Skip to content

Commit 498a825

Browse files
authored
Merge branch 'main' into rm-aggregates-integers
2 parents 9d2790d + d530276 commit 498a825

109 files changed

Lines changed: 3724 additions & 682 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.lock

Lines changed: 19 additions & 43 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion-examples/README.md

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -88,18 +88,19 @@ cargo run --example dataframe -- dataframe
8888

8989
#### Category: Single Process
9090

91-
| Subcommand | File Path | Description |
92-
| -------------------- | ----------------------------------------------------------------------------------------- | ------------------------------------------------------ |
93-
| catalog | [`data_io/catalog.rs`](examples/data_io/catalog.rs) | Register tables into a custom catalog |
94-
| json_shredding | [`data_io/json_shredding.rs`](examples/data_io/json_shredding.rs) | Implement filter rewriting for JSON shredding |
95-
| parquet_adv_idx | [`data_io/parquet_advanced_index.rs`](examples/data_io/parquet_advanced_index.rs) | Create a secondary index across multiple parquet files |
96-
| parquet_emb_idx | [`data_io/parquet_embedded_index.rs`](examples/data_io/parquet_embedded_index.rs) | Store a custom index inside Parquet files |
97-
| parquet_enc | [`data_io/parquet_encrypted.rs`](examples/data_io/parquet_encrypted.rs) | Read & write encrypted Parquet files |
98-
| parquet_enc_with_kms | [`data_io/parquet_encrypted_with_kms.rs`](examples/data_io/parquet_encrypted_with_kms.rs) | Encrypted Parquet I/O using a KMS-backed factory |
99-
| parquet_exec_visitor | [`data_io/parquet_exec_visitor.rs`](examples/data_io/parquet_exec_visitor.rs) | Extract statistics by visiting an ExecutionPlan |
100-
| parquet_idx | [`data_io/parquet_index.rs`](examples/data_io/parquet_index.rs) | Create a secondary index |
101-
| query_http_csv | [`data_io/query_http_csv.rs`](examples/data_io/query_http_csv.rs) | Query CSV files via HTTP |
102-
| remote_catalog | [`data_io/remote_catalog.rs`](examples/data_io/remote_catalog.rs) | Interact with a remote catalog |
91+
| Subcommand | File Path | Description |
92+
| ---------------------- | ----------------------------------------------------------------------------------------- | ------------------------------------------------------------------------- |
93+
| catalog | [`data_io/catalog.rs`](examples/data_io/catalog.rs) | Register tables into a custom catalog |
94+
| in_memory_object_store | [`data_io/in_memory_object_store.rs`](examples/data_io/in_memory_object_store.rs) | Read CSV from an in-memory object store (pattern applies to JSON/Parquet) |
95+
| json_shredding | [`data_io/json_shredding.rs`](examples/data_io/json_shredding.rs) | Implement filter rewriting for JSON shredding |
96+
| parquet_adv_idx | [`data_io/parquet_advanced_index.rs`](examples/data_io/parquet_advanced_index.rs) | Create a secondary index across multiple parquet files |
97+
| parquet_emb_idx | [`data_io/parquet_embedded_index.rs`](examples/data_io/parquet_embedded_index.rs) | Store a custom index inside Parquet files |
98+
| parquet_enc | [`data_io/parquet_encrypted.rs`](examples/data_io/parquet_encrypted.rs) | Read & write encrypted Parquet files |
99+
| parquet_enc_with_kms | [`data_io/parquet_encrypted_with_kms.rs`](examples/data_io/parquet_encrypted_with_kms.rs) | Encrypted Parquet I/O using a KMS-backed factory |
100+
| parquet_exec_visitor | [`data_io/parquet_exec_visitor.rs`](examples/data_io/parquet_exec_visitor.rs) | Extract statistics by visiting an ExecutionPlan |
101+
| parquet_idx | [`data_io/parquet_index.rs`](examples/data_io/parquet_index.rs) | Create a secondary index |
102+
| query_http_csv | [`data_io/query_http_csv.rs`](examples/data_io/query_http_csv.rs) | Query CSV files via HTTP |
103+
| remote_catalog | [`data_io/remote_catalog.rs`](examples/data_io/remote_catalog.rs) | Interact with a remote catalog |
103104

104105
## DataFrame Examples
105106

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! See `main.rs` for how to run it.
19+
//!
20+
//! This follows the recommended approach: implement the `ObjectStore` trait
21+
//! (or use an existing implementation), register it with DataFusion, and then
22+
//! read a URL "path" from that store.
23+
//! See the in-memory reference implementation:
24+
//! https://docs.rs/object_store/latest/object_store/memory/struct.InMemory.html
25+
26+
use std::sync::Arc;
27+
28+
use arrow::datatypes::{DataType, Field, Schema};
29+
use datafusion::assert_batches_eq;
30+
use datafusion::common::Result;
31+
use datafusion::execution::object_store::ObjectStoreUrl;
32+
use datafusion::prelude::{CsvReadOptions, SessionContext};
33+
use object_store::memory::InMemory;
34+
use object_store::path::Path;
35+
use object_store::{ObjectStore, ObjectStoreExt, PutPayload};
36+
37+
/// Demonstrates reading CSV data from an in-memory object store.
38+
///
39+
/// The same pattern applies to JSON/Parquet: register a store for a URL
40+
/// prefix, write bytes into the store, then read via that URL.
41+
pub async fn in_memory_object_store() -> Result<()> {
42+
let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
43+
let ctx = SessionContext::new();
44+
let object_store_url = ObjectStoreUrl::parse("memory://")?;
45+
// Register a URL prefix to route reads through this object store.
46+
ctx.register_object_store(object_store_url.as_ref(), Arc::clone(&store));
47+
48+
let schema = Schema::new(vec![
49+
Field::new("id", DataType::Int64, false),
50+
Field::new("name", DataType::Utf8, false),
51+
]);
52+
53+
println!("=== CSV from memory ===");
54+
let csv_path = Path::from("/people.csv");
55+
let csv_data = b"id,name\n1,Alice\n2,Bob\n";
56+
// Write bytes into the in-memory object store.
57+
store
58+
.put(&csv_path, PutPayload::from_static(csv_data))
59+
.await?;
60+
// Read using the URL that matches the registered prefix.
61+
let csv = ctx
62+
.read_csv(
63+
"memory:///people.csv",
64+
CsvReadOptions::new().schema(&schema),
65+
)
66+
.await?
67+
.collect()
68+
.await?;
69+
#[rustfmt::skip]
70+
let expected = [
71+
"+----+-------+",
72+
"| id | name |",
73+
"+----+-------+",
74+
"| 1 | Alice |",
75+
"| 2 | Bob |",
76+
"+----+-------+",
77+
];
78+
assert_batches_eq!(expected, &csv);
79+
80+
Ok(())
81+
}

datafusion-examples/examples/data_io/main.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
//!
2222
//! ## Usage
2323
//! ```bash
24-
//! cargo run --example data_io -- [all|catalog|json_shredding|parquet_adv_idx|parquet_emb_idx|parquet_enc_with_kms|parquet_enc|parquet_exec_visitor|parquet_idx|query_http_csv|remote_catalog]
24+
//! cargo run --example data_io -- [all|catalog|in_memory_object_store|json_shredding|parquet_adv_idx|parquet_emb_idx|parquet_enc_with_kms|parquet_enc|parquet_exec_visitor|parquet_idx|query_http_csv|remote_catalog]
2525
//! ```
2626
//!
2727
//! Each subcommand runs a corresponding example:
@@ -30,6 +30,9 @@
3030
//! - `catalog`
3131
//! (file: catalog.rs, desc: Register tables into a custom catalog)
3232
//!
33+
//! - `in_memory_object_store`
34+
//! (file: in_memory_object_store.rs, desc: Read CSV from an in-memory object store (pattern applies to JSON/Parquet))
35+
//!
3336
//! - `json_shredding`
3437
//! (file: json_shredding.rs, desc: Implement filter rewriting for JSON shredding)
3538
//!
@@ -58,6 +61,7 @@
5861
//! (file: remote_catalog.rs, desc: Interact with a remote catalog)
5962
6063
mod catalog;
64+
mod in_memory_object_store;
6165
mod json_shredding;
6266
mod parquet_advanced_index;
6367
mod parquet_embedded_index;
@@ -77,6 +81,7 @@ use strum_macros::{Display, EnumIter, EnumString, VariantNames};
7781
enum ExampleKind {
7882
All,
7983
Catalog,
84+
InMemoryObjectStore,
8085
JsonShredding,
8186
ParquetAdvIdx,
8287
ParquetEmbIdx,
@@ -104,6 +109,9 @@ impl ExampleKind {
104109
}
105110
}
106111
ExampleKind::Catalog => catalog::catalog().await?,
112+
ExampleKind::InMemoryObjectStore => {
113+
in_memory_object_store::in_memory_object_store().await?
114+
}
107115
ExampleKind::JsonShredding => json_shredding::json_shredding().await?,
108116
ExampleKind::ParquetAdvIdx => {
109117
parquet_advanced_index::parquet_advanced_index().await?

datafusion-examples/examples/query_planning/expr_api.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -175,9 +175,10 @@ fn simplify_demo() -> Result<()> {
175175
// the ExecutionProps carries information needed to simplify
176176
// expressions, such as the current time (to evaluate `now()`
177177
// correctly)
178-
let context = SimplifyContext::default()
178+
let context = SimplifyContext::builder()
179179
.with_schema(schema)
180-
.with_current_time();
180+
.with_current_time()
181+
.build();
181182
let simplifier = ExprSimplifier::new(context);
182183

183184
// And then call the simplify_expr function:
@@ -192,9 +193,10 @@ fn simplify_demo() -> Result<()> {
192193

193194
// here are some other examples of what DataFusion is capable of
194195
let schema = Schema::new(vec![make_field("i", DataType::Int64)]).to_dfschema_ref()?;
195-
let context = SimplifyContext::default()
196+
let context = SimplifyContext::builder()
196197
.with_schema(Arc::clone(&schema))
197-
.with_current_time();
198+
.with_current_time()
199+
.build();
198200
let simplifier = ExprSimplifier::new(context);
199201

200202
// basic arithmetic simplification
@@ -554,9 +556,10 @@ fn type_coercion_demo() -> Result<()> {
554556
assert!(physical_expr.evaluate(&batch).is_ok());
555557

556558
// 2. Type coercion with `ExprSimplifier::coerce`.
557-
let context = SimplifyContext::default()
559+
let context = SimplifyContext::builder()
558560
.with_schema(Arc::new(df_schema.clone()))
559-
.with_current_time();
561+
.with_current_time()
562+
.build();
560563
let simplifier = ExprSimplifier::new(context);
561564
let coerced_expr = simplifier.coerce(expr.clone(), &df_schema)?;
562565
let physical_expr = datafusion::physical_expr::create_physical_expr(

0 commit comments

Comments
 (0)