Skip to content

Commit f0b2a4a

Browse files
authored
Merge branch 'main' into implement_groups_accumulator_count_distinct_primitive_types
2 parents 01831af + 16e578d commit f0b2a4a

File tree

49 files changed

+3179
-1829
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

49 files changed

+3179
-1829
lines changed

datafusion-examples/examples/relation_planner/pivot_unpivot.rs

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,25 @@ async fn run_examples(ctx: &SessionContext) -> Result<()> {
217217
+--------+
218218
");
219219

220+
// Example 7: PIVOT on a quoted mixed-case column
221+
// Reuses the parsed column expression so quoted identifiers keep their case.
222+
let results = run_example(
223+
ctx,
224+
"Example 7: PIVOT with quoted mixed-case column",
225+
r#"SELECT * FROM point_stats
226+
PIVOT (MAX(max_value) FOR "pointNumber" IN ('16951' AS p16951, '16952' AS p16952)) AS p
227+
ORDER BY ts"#,
228+
)
229+
.await?;
230+
assert_snapshot!(results, @r"
231+
+----------------------+------+--------+--------+
232+
| ts | port | p16951 | p16952 |
233+
+----------------------+------+--------+--------+
234+
| 2024-09-01T10:00:00Z | 2411 | 10 | 20 |
235+
| 2024-09-01T10:01:00Z | 2411 | 30 | 40 |
236+
+----------------------+------+--------+--------+
237+
");
238+
220239
Ok(())
221240
}
222241

@@ -288,6 +307,34 @@ fn register_sample_data(ctx: &SessionContext) -> Result<()> {
288307
])?,
289308
)?;
290309

310+
// point_stats: grouped data with a quoted mixed-case pivot column.
311+
ctx.register_batch(
312+
"point_stats",
313+
RecordBatch::try_from_iter(vec![
314+
(
315+
"ts",
316+
Arc::new(StringArray::from(vec![
317+
"2024-09-01T10:00:00Z",
318+
"2024-09-01T10:00:00Z",
319+
"2024-09-01T10:01:00Z",
320+
"2024-09-01T10:01:00Z",
321+
])) as ArrayRef,
322+
),
323+
(
324+
"pointNumber",
325+
Arc::new(StringArray::from(vec!["16951", "16952", "16951", "16952"])),
326+
),
327+
(
328+
"port",
329+
Arc::new(StringArray::from(vec!["2411", "2411", "2411", "2411"])),
330+
),
331+
(
332+
"max_value",
333+
Arc::new(Int64Array::from(vec![10, 20, 30, 40])),
334+
),
335+
])?,
336+
)?;
337+
291338
Ok(())
292339
}
293340

@@ -415,11 +462,12 @@ fn plan_pivot(
415462
.collect();
416463

417464
let group_by_cols: Vec<Expr> = schema
418-
.fields()
419465
.iter()
420-
.map(|f| f.name().as_str())
421-
.filter(|name| *name != pivot_col_name.as_str() && !agg_input_cols.contains(name))
422-
.map(col)
466+
.filter(|(_, field)| {
467+
let name = field.name();
468+
name != pivot_col_name.as_str() && !agg_input_cols.contains(&name.as_str())
469+
})
470+
.map(Expr::from)
423471
.collect();
424472

425473
// Build CASE expressions for each (aggregate, pivot_value) pair
@@ -434,7 +482,7 @@ fn plan_pivot(
434482

435483
for (value_alias, pivot_value) in &pivot_values {
436484
// CASE pivot_col WHEN pivot_value THEN agg_input END
437-
let case_expr = case(col(&pivot_col_name))
485+
let case_expr = case(pivot_col.clone())
438486
.when(pivot_value.clone(), agg_input.clone())
439487
.end()?;
440488

datafusion/core/benches/sql_planner.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,8 @@ fn register_clickbench_hits_table(rt: &Runtime) -> SessionContext {
130130
format!("{BENCHMARKS_PATH_2}{CLICKBENCH_DATA_PATH}")
131131
};
132132

133-
let sql = format!("CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '{path}'");
133+
let sql =
134+
format!("CREATE EXTERNAL TABLE hits_raw STORED AS PARQUET LOCATION '{path}'");
134135

135136
// ClickBench partitioned dataset was written by an ancient version of pyarrow
136137
// that wrote strings with the wrong logical type. To read it correctly, we must
@@ -139,6 +140,17 @@ fn register_clickbench_hits_table(rt: &Runtime) -> SessionContext {
139140
.unwrap();
140141
rt.block_on(ctx.sql(&sql)).unwrap();
141142

143+
// ClickBench stores EventDate as UInt16 (days since 1970-01-01). Create a view
144+
// that exposes it as SQL DATE so that queries comparing it with date literals
145+
// (e.g. "EventDate >= '2013-07-01'") work correctly during planning.
146+
rt.block_on(ctx.sql(
147+
"CREATE VIEW hits AS \
148+
SELECT * EXCEPT (\"EventDate\"), \
149+
CAST(CAST(\"EventDate\" AS INTEGER) AS DATE) AS \"EventDate\" \
150+
FROM hits_raw",
151+
))
152+
.unwrap();
153+
142154
let count =
143155
rt.block_on(async { ctx.table("hits").await.unwrap().count().await.unwrap() });
144156
assert!(count > 0);

datafusion/core/tests/memory_limit/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ async fn sort_merge_join_spill() {
213213
.with_config(config)
214214
.with_disk_manager_builder(DiskManagerBuilder::default())
215215
.with_scenario(Scenario::AccessLogStreaming)
216+
.with_expected_success()
216217
.run()
217218
.await
218219
}

datafusion/datasource-parquet/src/opener.rs

Lines changed: 89 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -920,12 +920,22 @@ impl MetadataLoadedParquetOpen {
920920
prepared.physical_file_schema = Arc::clone(&physical_file_schema);
921921

922922
// Build predicates for this specific file
923-
let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates(
923+
let pruning_predicate = build_pruning_predicates(
924924
prepared.predicate.as_ref(),
925925
&physical_file_schema,
926926
&prepared.predicate_creation_errors,
927927
);
928928

929+
// Only build page pruning predicate if page index is enabled
930+
let page_pruning_predicate = if prepared.enable_page_index {
931+
prepared.predicate.as_ref().and_then(|predicate| {
932+
let p = build_page_pruning_predicate(predicate, &physical_file_schema);
933+
(p.filter_number() > 0).then_some(p)
934+
})
935+
} else {
936+
None
937+
};
938+
929939
Ok(FiltersPreparedParquetOpen {
930940
loaded: MetadataLoadedParquetOpen {
931941
prepared,
@@ -945,10 +955,7 @@ impl FiltersPreparedParquetOpen {
945955
// metadata load above may not have read the page index structures yet.
946956
// If we need them for reading and they aren't yet loaded, we need to
947957
// load them now.
948-
if should_enable_page_index(
949-
self.loaded.prepared.enable_page_index,
950-
&self.page_pruning_predicate,
951-
) {
958+
if self.page_pruning_predicate.is_some() {
952959
self.loaded.reader_metadata = load_page_index(
953960
self.loaded.reader_metadata,
954961
&mut self.loaded.prepared.async_file_reader,
@@ -1661,20 +1668,13 @@ pub(crate) fn build_pruning_predicates(
16611668
predicate: Option<&Arc<dyn PhysicalExpr>>,
16621669
file_schema: &SchemaRef,
16631670
predicate_creation_errors: &Count,
1664-
) -> (
1665-
Option<Arc<PruningPredicate>>,
1666-
Option<Arc<PagePruningAccessPlanFilter>>,
1667-
) {
1668-
let Some(predicate) = predicate.as_ref() else {
1669-
return (None, None);
1670-
};
1671-
let pruning_predicate = build_pruning_predicate(
1671+
) -> Option<Arc<PruningPredicate>> {
1672+
let predicate = predicate.as_ref()?;
1673+
build_pruning_predicate(
16721674
Arc::clone(predicate),
16731675
file_schema,
16741676
predicate_creation_errors,
1675-
);
1676-
let page_pruning_predicate = build_page_pruning_predicate(predicate, file_schema);
1677-
(pruning_predicate, Some(page_pruning_predicate))
1677+
)
16781678
}
16791679

16801680
/// Returns a `ArrowReaderMetadata` with the page index loaded, loading
@@ -1708,18 +1708,6 @@ async fn load_page_index<T: AsyncFileReader>(
17081708
}
17091709
}
17101710

1711-
fn should_enable_page_index(
1712-
enable_page_index: bool,
1713-
page_pruning_predicate: &Option<Arc<PagePruningAccessPlanFilter>>,
1714-
) -> bool {
1715-
enable_page_index
1716-
&& page_pruning_predicate.is_some()
1717-
&& page_pruning_predicate
1718-
.as_ref()
1719-
.map(|p| p.filter_number() > 0)
1720-
.unwrap_or(false)
1721-
}
1722-
17231711
#[cfg(test)]
17241712
mod test {
17251713
use std::sync::Arc;
@@ -1849,6 +1837,12 @@ mod test {
18491837
self
18501838
}
18511839

1840+
/// Enable page index.
1841+
fn with_enable_page_index(mut self, enable: bool) -> Self {
1842+
self.enable_page_index = enable;
1843+
self
1844+
}
1845+
18521846
/// Set reverse row groups flag.
18531847
fn with_reverse_row_groups(mut self, enable: bool) -> Self {
18541848
self.reverse_row_groups = enable;
@@ -2713,4 +2707,71 @@ mod test {
27132707
"Reverse scan with non-contiguous row groups should correctly map RowSelection"
27142708
);
27152709
}
2710+
2711+
/// Test that page pruning predicates are only built and applied when `enable_page_index` is true.
2712+
///
2713+
/// The file has a single row group with 10 pages (10 rows each, values 1..=100).
2714+
/// With page index enabled, pages whose max value <= 90 are pruned, returning only
2715+
/// the last page (rows 91..100). With page index disabled, all 100 rows are returned
2716+
/// since neither pushdown nor row-group pruning is active.
2717+
#[tokio::test]
2718+
async fn test_page_pruning_predicate_respects_enable_page_index() {
2719+
use parquet::file::properties::WriterProperties;
2720+
2721+
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
2722+
2723+
// 100 rows with values 1..=100, written as a single row group with 10 rows per page
2724+
let values: Vec<i32> = (1..=100).collect();
2725+
let batch = record_batch!((
2726+
"a",
2727+
Int32,
2728+
values.iter().map(|v| Some(*v)).collect::<Vec<_>>()
2729+
))
2730+
.unwrap();
2731+
let props = WriterProperties::builder()
2732+
.set_data_page_row_count_limit(10)
2733+
.set_write_batch_size(10)
2734+
.build();
2735+
let schema = batch.schema();
2736+
let data_size = write_parquet_batches(
2737+
Arc::clone(&store),
2738+
"test.parquet",
2739+
vec![batch],
2740+
Some(props),
2741+
)
2742+
.await;
2743+
2744+
let file = PartitionedFile::new("test.parquet".to_string(), data_size as u64);
2745+
2746+
// predicate: a > 90 — should allow page index to prune first 9 pages
2747+
let predicate = logical2physical(&col("a").gt(lit(90i32)), &schema);
2748+
2749+
let make_opener = |enable_page_index| {
2750+
ParquetOpenerBuilder::new()
2751+
.with_store(Arc::clone(&store))
2752+
.with_schema(Arc::clone(&schema))
2753+
.with_predicate(Arc::clone(&predicate))
2754+
.with_enable_page_index(enable_page_index)
2755+
// disable pushdown and row-group pruning so the only pruning path is page index
2756+
.with_pushdown_filters(false)
2757+
.with_row_group_stats_pruning(false)
2758+
.build()
2759+
};
2760+
let (_, rows_with_page_index) = count_batches_and_rows(
2761+
make_opener(true).open(file.clone()).unwrap().await.unwrap(),
2762+
)
2763+
.await;
2764+
let (_, rows_without_page_index) =
2765+
count_batches_and_rows(make_opener(false).open(file).unwrap().await.unwrap())
2766+
.await;
2767+
2768+
assert_eq!(
2769+
rows_with_page_index, 10,
2770+
"page index should prune 9 of 10 pages"
2771+
);
2772+
assert_eq!(
2773+
rows_without_page_index, 100,
2774+
"without page index all rows are returned"
2775+
);
2776+
}
27162777
}

datafusion/datasource-parquet/src/source.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -629,17 +629,17 @@ impl FileSource for ParquetSource {
629629
write!(f, ", reverse_row_groups=true")?;
630630
}
631631

632-
// Try to build a the pruning predicates.
632+
// Try to build the pruning predicates.
633633
// These are only generated here because it's useful to have *some*
634634
// idea of what pushdown is happening when viewing plans.
635-
// However it is important to note that these predicates are *not*
635+
// However, it is important to note that these predicates are *not*
636636
// necessarily the predicates that are actually evaluated:
637637
// the actual predicates are built in reference to the physical schema of
638638
// each file, which we do not have at this point and hence cannot use.
639-
// Instead we use the logical schema of the file (the table schema without partition columns).
639+
// Instead, we use the logical schema of the file (the table schema without partition columns).
640640
if let Some(predicate) = &self.predicate {
641641
let predicate_creation_errors = Count::new();
642-
if let (Some(pruning_predicate), _) = build_pruning_predicates(
642+
if let Some(pruning_predicate) = build_pruning_predicates(
643643
Some(predicate),
644644
self.table_schema.table_schema(),
645645
&predicate_creation_errors,

0 commit comments

Comments
 (0)