Skip to content

Commit 27c9cda

Browse files
correct parquet leaf index mapping when schema contains struct cols (#20698)
## Which issue does this PR close? - Closes #20695 ## Rationale for this change Row filter pushdown assumed Arrow field indices equal Parquet leaf indices, which breaks when Struct columns are present because their children expand into separate leaves and shift all subsequent indices.
1 parent 340ef60 commit 27c9cda

2 files changed

Lines changed: 140 additions & 35 deletions

File tree

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 92 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -242,10 +242,10 @@ impl FilterCandidateBuilder {
242242

243243
let root_indices: Vec<_> =
244244
required_columns.required_columns.into_iter().collect();
245+
245246
let leaf_indices = leaf_indices_for_roots(
246247
&root_indices,
247248
metadata.file_metadata().schema_descr(),
248-
required_columns.nested,
249249
);
250250

251251
let projected_schema = Arc::new(self.file_schema.project(&root_indices)?);
@@ -277,8 +277,6 @@ struct PushdownChecker<'schema> {
277277
projected_columns: bool,
278278
/// Indices into the file schema of columns required to evaluate the expression.
279279
required_columns: Vec<usize>,
280-
/// Tracks the nested column behavior found during traversal.
281-
nested_behavior: NestedColumnSupport,
282280
/// Whether nested list columns are supported by the predicate semantics.
283281
allow_list_columns: bool,
284282
/// The Arrow schema of the parquet file.
@@ -291,7 +289,6 @@ impl<'schema> PushdownChecker<'schema> {
291289
non_primitive_columns: false,
292290
projected_columns: false,
293291
required_columns: Vec::new(),
294-
nested_behavior: NestedColumnSupport::PrimitiveOnly,
295292
allow_list_columns,
296293
file_schema,
297294
}
@@ -324,16 +321,11 @@ impl<'schema> PushdownChecker<'schema> {
324321
/// `None` if the type is supported and pushdown can continue.
325322
fn handle_nested_type(&mut self, data_type: &DataType) -> Option<TreeNodeRecursion> {
326323
if self.is_nested_type_supported(data_type) {
327-
// Update to ListsSupported if we haven't encountered unsupported types yet
328-
if self.nested_behavior == NestedColumnSupport::PrimitiveOnly {
329-
self.nested_behavior = NestedColumnSupport::ListsSupported;
330-
}
331324
None
332325
} else {
333326
// Block pushdown for unsupported nested types:
334327
// - Structs (regardless of predicate support)
335328
// - Lists without supported predicates
336-
self.nested_behavior = NestedColumnSupport::Unsupported;
337329
self.non_primitive_columns = true;
338330
Some(TreeNodeRecursion::Jump)
339331
}
@@ -368,7 +360,6 @@ impl<'schema> PushdownChecker<'schema> {
368360
self.required_columns.dedup();
369361
PushdownColumns {
370362
required_columns: self.required_columns,
371-
nested: self.nested_behavior,
372363
}
373364
}
374365
}
@@ -391,29 +382,13 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
391382
///
392383
/// This enum makes explicit the different states a predicate can be in
393384
/// with respect to nested column handling during Parquet decoding.
394-
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
395-
enum NestedColumnSupport {
396-
/// Expression references only primitive (non-nested) columns.
397-
/// These can always be pushed down to the Parquet decoder.
398-
PrimitiveOnly,
399-
/// Expression references list columns with supported predicates
400-
/// (e.g., array_has, array_has_all, IS NULL).
401-
/// These can be pushed down to the Parquet decoder.
402-
ListsSupported,
403-
/// Expression references unsupported nested types (e.g., structs)
404-
/// or list columns without supported predicates.
405-
/// These cannot be pushed down and must be evaluated after decoding.
406-
Unsupported,
407-
}
408-
409385
/// Result of checking which columns are required for filter pushdown.
410386
#[derive(Debug)]
411387
struct PushdownColumns {
412388
/// Sorted, unique column indices into the file schema required to evaluate
413389
/// the filter expression. Must be in ascending order for correct schema
414390
/// projection matching.
415391
required_columns: Vec<usize>,
416-
nested: NestedColumnSupport,
417392
}
418393

419394
/// Checks if a given expression can be pushed down to the parquet decoder.
@@ -437,15 +412,13 @@ fn pushdown_columns(
437412
fn leaf_indices_for_roots(
438413
root_indices: &[usize],
439414
schema_descr: &SchemaDescriptor,
440-
nested: NestedColumnSupport,
441415
) -> Vec<usize> {
442-
// For primitive-only columns, root indices ARE the leaf indices
443-
if nested == NestedColumnSupport::PrimitiveOnly {
444-
return root_indices.to_vec();
445-
}
446-
447-
// For List columns, expand to the single leaf column (item field)
448-
// For Struct columns (unsupported), this would expand to multiple leaves
416+
// Always map root (Arrow) indices to Parquet leaf indices via the schema
417+
// descriptor. Arrow root indices only equal Parquet leaf indices when the
418+
// schema has no group columns (Struct, Map, etc.); when group columns
419+
// exist, their children become separate leaves and shift all subsequent
420+
// leaf indices.
421+
// Struct columns are unsupported.
449422
let root_set: BTreeSet<_> = root_indices.iter().copied().collect();
450423

451424
(0..schema_descr.num_columns())
@@ -1088,6 +1061,91 @@ mod test {
10881061
.expect("parsing schema")
10891062
}
10901063

1064+
/// Regression test: when a schema has Struct columns, Arrow field indices diverge
1065+
/// from Parquet leaf indices (Struct children become separate leaves). The
1066+
/// `PrimitiveOnly` fast-path in `leaf_indices_for_roots` assumes they are equal,
1067+
/// so a filter on a primitive column *after* a Struct gets the wrong leaf index.
1068+
///
1069+
/// Schema:
1070+
/// Arrow indices: col_a=0 struct_col=1 col_b=2
1071+
/// Parquet leaves: col_a=0 struct_col.x=1 struct_col.y=2 col_b=3
1072+
///
1073+
/// A filter on col_b should project Parquet leaf 3, but the bug causes it to
1074+
/// project leaf 2 (struct_col.y).
1075+
#[test]
1076+
fn test_filter_pushdown_leaf_index_with_struct_in_schema() {
1077+
use arrow::array::{Int32Array, StringArray, StructArray};
1078+
1079+
let schema = Arc::new(Schema::new(vec![
1080+
Field::new("col_a", DataType::Int32, false),
1081+
Field::new(
1082+
"struct_col",
1083+
DataType::Struct(
1084+
vec![
1085+
Arc::new(Field::new("x", DataType::Int32, true)),
1086+
Arc::new(Field::new("y", DataType::Int32, true)),
1087+
]
1088+
.into(),
1089+
),
1090+
true,
1091+
),
1092+
Field::new("col_b", DataType::Utf8, false),
1093+
]));
1094+
1095+
let col_a = Arc::new(Int32Array::from(vec![1, 2, 3]));
1096+
let struct_col = Arc::new(StructArray::from(vec![
1097+
(
1098+
Arc::new(Field::new("x", DataType::Int32, true)),
1099+
Arc::new(Int32Array::from(vec![10, 20, 30])) as _,
1100+
),
1101+
(
1102+
Arc::new(Field::new("y", DataType::Int32, true)),
1103+
Arc::new(Int32Array::from(vec![100, 200, 300])) as _,
1104+
),
1105+
]));
1106+
let col_b = Arc::new(StringArray::from(vec!["aaa", "target", "zzz"]));
1107+
1108+
let batch =
1109+
RecordBatch::try_new(Arc::clone(&schema), vec![col_a, struct_col, col_b])
1110+
.unwrap();
1111+
1112+
let file = NamedTempFile::new().expect("temp file");
1113+
let mut writer =
1114+
ArrowWriter::try_new(file.reopen().unwrap(), Arc::clone(&schema), None)
1115+
.expect("writer");
1116+
writer.write(&batch).expect("write batch");
1117+
writer.close().expect("close writer");
1118+
1119+
let reader_file = file.reopen().expect("reopen file");
1120+
let builder = ParquetRecordBatchReaderBuilder::try_new(reader_file)
1121+
.expect("reader builder");
1122+
let metadata = builder.metadata().clone();
1123+
let file_schema = builder.schema().clone();
1124+
1125+
// sanity check: 4 Parquet leaves, 3 Arrow fields
1126+
assert_eq!(metadata.file_metadata().schema_descr().num_columns(), 4);
1127+
assert_eq!(file_schema.fields().len(), 3);
1128+
1129+
// build a filter candidate for `col_b = 'target'` through the public API
1130+
let expr = col("col_b").eq(Expr::Literal(
1131+
ScalarValue::Utf8(Some("target".to_string())),
1132+
None,
1133+
));
1134+
let expr = logical2physical(&expr, &file_schema);
1135+
1136+
let candidate = FilterCandidateBuilder::new(expr, file_schema)
1137+
.build(&metadata)
1138+
.expect("building candidate")
1139+
.expect("filter on primitive col_b should be pushable");
1140+
1141+
// col_b is Parquet leaf 3 (shifted by struct_col's two children).
1142+
assert_eq!(
1143+
candidate.projection.leaf_indices,
1144+
vec![3],
1145+
"leaf_indices should be [3] for col_b"
1146+
);
1147+
}
1148+
10911149
/// Sanity check that the given expression could be evaluated against the given schema without any errors.
10921150
/// This will fail if the expression references columns that are not in the schema or if the types of the columns are incompatible, etc.
10931151
fn check_expression_can_evaluate_against_schema(

datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -812,7 +812,6 @@ WHERE h2o_parquet_20696.time >= '1970-01-01T00:00:00.000000050Z'
812812
72.4 53.4 51
813813
70.4 50.4 50
814814

815-
816815
statement ok
817816
set datafusion.execution.parquet.pushdown_filters = true;
818817

@@ -842,3 +841,51 @@ DROP TABLE o2_parquet_20696;
842841
# Cleanup settings
843842
statement ok
844843
set datafusion.execution.parquet.pushdown_filters = false;
844+
845+
##########
846+
# Regression test: filter pushdown with Struct columns in schema
847+
#
848+
# When a schema has Struct columns, Arrow field indices diverge from Parquet
849+
# leaf indices (Struct children become separate leaves). A filter on a
850+
# primitive column *after* a Struct must use the correct Parquet leaf index.
851+
#
852+
# Schema:
853+
# Arrow: col_a=0 struct_col=1 col_b=2
854+
# Parquet: col_a=0 struct_col.x=1 struct_col.y=2 col_b=3
855+
##########
856+
857+
statement ok
858+
set datafusion.execution.parquet.pushdown_filters = true;
859+
860+
statement ok
861+
COPY (
862+
SELECT
863+
column1 as col_a,
864+
column2 as struct_col,
865+
column3 as col_b
866+
FROM VALUES
867+
(1, {x: 10, y: 100}, 'aaa'),
868+
(2, {x: 20, y: 200}, 'target'),
869+
(3, {x: 30, y: 300}, 'zzz')
870+
) TO 'test_files/scratch/parquet_filter_pushdown/struct_filter.parquet'
871+
STORED AS PARQUET;
872+
873+
statement ok
874+
CREATE EXTERNAL TABLE t_struct_filter
875+
STORED AS PARQUET
876+
LOCATION 'test_files/scratch/parquet_filter_pushdown/struct_filter.parquet';
877+
878+
# Filter on col_b (the primitive column after the struct).
879+
# Before the fix, this returned 0 rows because the filter read struct_col.y
880+
# (Parquet leaf 2) instead of col_b (Parquet leaf 3).
881+
query IT
882+
SELECT col_a, col_b FROM t_struct_filter WHERE col_b = 'target';
883+
----
884+
2 target
885+
886+
# Clean up
887+
statement ok
888+
set datafusion.execution.parquet.pushdown_filters = false;
889+
890+
statement ok
891+
DROP TABLE t_struct_filter;

0 commit comments

Comments (0)