Skip to content

Commit e922af9

Browse files
[branch-53] correct parquet leaf index mapping when schema contains struct cols (#20698) (#20884)
- Part of #19692 - Closes #20695 on branch-53 This PR: - Backports #20698 from @friendlymatthew to the branch-53 line Co-authored-by: Matthew Kim <38759997+friendlymatthew@users.noreply.github.com>
1 parent 76751f1 commit e922af9

2 files changed

Lines changed: 140 additions & 35 deletions

File tree

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 92 additions & 34 deletions
Original file line number · Diff line number · Diff line change
@@ -242,10 +242,10 @@ impl FilterCandidateBuilder {
242242

243243
let root_indices: Vec<_> =
244244
required_columns.required_columns.into_iter().collect();
245+
245246
let leaf_indices = leaf_indices_for_roots(
246247
&root_indices,
247248
metadata.file_metadata().schema_descr(),
248-
required_columns.nested,
249249
);
250250

251251
let projected_schema = Arc::new(self.file_schema.project(&root_indices)?);
@@ -277,8 +277,6 @@ struct PushdownChecker<'schema> {
277277
projected_columns: bool,
278278
/// Indices into the file schema of columns required to evaluate the expression.
279279
required_columns: Vec<usize>,
280-
/// Tracks the nested column behavior found during traversal.
281-
nested_behavior: NestedColumnSupport,
282280
/// Whether nested list columns are supported by the predicate semantics.
283281
allow_list_columns: bool,
284282
/// The Arrow schema of the parquet file.
@@ -291,7 +289,6 @@ impl<'schema> PushdownChecker<'schema> {
291289
non_primitive_columns: false,
292290
projected_columns: false,
293291
required_columns: Vec::new(),
294-
nested_behavior: NestedColumnSupport::PrimitiveOnly,
295292
allow_list_columns,
296293
file_schema,
297294
}
@@ -324,16 +321,11 @@ impl<'schema> PushdownChecker<'schema> {
324321
/// `None` if the type is supported and pushdown can continue.
325322
fn handle_nested_type(&mut self, data_type: &DataType) -> Option<TreeNodeRecursion> {
326323
if self.is_nested_type_supported(data_type) {
327-
// Update to ListsSupported if we haven't encountered unsupported types yet
328-
if self.nested_behavior == NestedColumnSupport::PrimitiveOnly {
329-
self.nested_behavior = NestedColumnSupport::ListsSupported;
330-
}
331324
None
332325
} else {
333326
// Block pushdown for unsupported nested types:
334327
// - Structs (regardless of predicate support)
335328
// - Lists without supported predicates
336-
self.nested_behavior = NestedColumnSupport::Unsupported;
337329
self.non_primitive_columns = true;
338330
Some(TreeNodeRecursion::Jump)
339331
}
@@ -368,7 +360,6 @@ impl<'schema> PushdownChecker<'schema> {
368360
self.required_columns.dedup();
369361
PushdownColumns {
370362
required_columns: self.required_columns,
371-
nested: self.nested_behavior,
372363
}
373364
}
374365
}
@@ -391,29 +382,13 @@ impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
391382
///
392383
/// This enum makes explicit the different states a predicate can be in
393384
/// with respect to nested column handling during Parquet decoding.
394-
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
395-
enum NestedColumnSupport {
396-
/// Expression references only primitive (non-nested) columns.
397-
/// These can always be pushed down to the Parquet decoder.
398-
PrimitiveOnly,
399-
/// Expression references list columns with supported predicates
400-
/// (e.g., array_has, array_has_all, IS NULL).
401-
/// These can be pushed down to the Parquet decoder.
402-
ListsSupported,
403-
/// Expression references unsupported nested types (e.g., structs)
404-
/// or list columns without supported predicates.
405-
/// These cannot be pushed down and must be evaluated after decoding.
406-
Unsupported,
407-
}
408-
409385
/// Result of checking which columns are required for filter pushdown.
410386
#[derive(Debug)]
411387
struct PushdownColumns {
412388
/// Sorted, unique column indices into the file schema required to evaluate
413389
/// the filter expression. Must be in ascending order for correct schema
414390
/// projection matching.
415391
required_columns: Vec<usize>,
416-
nested: NestedColumnSupport,
417392
}
418393

419394
/// Checks if a given expression can be pushed down to the parquet decoder.
@@ -437,15 +412,13 @@ fn pushdown_columns(
437412
fn leaf_indices_for_roots(
438413
root_indices: &[usize],
439414
schema_descr: &SchemaDescriptor,
440-
nested: NestedColumnSupport,
441415
) -> Vec<usize> {
442-
// For primitive-only columns, root indices ARE the leaf indices
443-
if nested == NestedColumnSupport::PrimitiveOnly {
444-
return root_indices.to_vec();
445-
}
446-
447-
// For List columns, expand to the single leaf column (item field)
448-
// For Struct columns (unsupported), this would expand to multiple leaves
416+
// Always map root (Arrow) indices to Parquet leaf indices via the schema
417+
// descriptor. Arrow root indices only equal Parquet leaf indices when the
418+
// schema has no group columns (Struct, Map, etc.); when group columns
419+
// exist, their children become separate leaves and shift all subsequent
420+
// leaf indices.
421+
// Struct columns are unsupported.
449422
let root_set: BTreeSet<_> = root_indices.iter().copied().collect();
450423

451424
(0..schema_descr.num_columns())
@@ -1088,6 +1061,91 @@ mod test {
10881061
.expect("parsing schema")
10891062
}
10901063

1064+
/// Regression test: when a schema has Struct columns, Arrow field indices diverge
1065+
/// from Parquet leaf indices (Struct children become separate leaves). The
1066+
/// `PrimitiveOnly` fast-path in `leaf_indices_for_roots` assumes they are equal,
1067+
/// so a filter on a primitive column *after* a Struct gets the wrong leaf index.
1068+
///
1069+
/// Schema:
1070+
/// Arrow indices: col_a=0 struct_col=1 col_b=2
1071+
/// Parquet leaves: col_a=0 struct_col.x=1 struct_col.y=2 col_b=3
1072+
///
1073+
/// A filter on col_b should project Parquet leaf 3, but the bug causes it to
1074+
/// project leaf 2 (struct_col.y).
1075+
#[test]
1076+
fn test_filter_pushdown_leaf_index_with_struct_in_schema() {
1077+
use arrow::array::{Int32Array, StringArray, StructArray};
1078+
1079+
let schema = Arc::new(Schema::new(vec![
1080+
Field::new("col_a", DataType::Int32, false),
1081+
Field::new(
1082+
"struct_col",
1083+
DataType::Struct(
1084+
vec![
1085+
Arc::new(Field::new("x", DataType::Int32, true)),
1086+
Arc::new(Field::new("y", DataType::Int32, true)),
1087+
]
1088+
.into(),
1089+
),
1090+
true,
1091+
),
1092+
Field::new("col_b", DataType::Utf8, false),
1093+
]));
1094+
1095+
let col_a = Arc::new(Int32Array::from(vec![1, 2, 3]));
1096+
let struct_col = Arc::new(StructArray::from(vec![
1097+
(
1098+
Arc::new(Field::new("x", DataType::Int32, true)),
1099+
Arc::new(Int32Array::from(vec![10, 20, 30])) as _,
1100+
),
1101+
(
1102+
Arc::new(Field::new("y", DataType::Int32, true)),
1103+
Arc::new(Int32Array::from(vec![100, 200, 300])) as _,
1104+
),
1105+
]));
1106+
let col_b = Arc::new(StringArray::from(vec!["aaa", "target", "zzz"]));
1107+
1108+
let batch =
1109+
RecordBatch::try_new(Arc::clone(&schema), vec![col_a, struct_col, col_b])
1110+
.unwrap();
1111+
1112+
let file = NamedTempFile::new().expect("temp file");
1113+
let mut writer =
1114+
ArrowWriter::try_new(file.reopen().unwrap(), Arc::clone(&schema), None)
1115+
.expect("writer");
1116+
writer.write(&batch).expect("write batch");
1117+
writer.close().expect("close writer");
1118+
1119+
let reader_file = file.reopen().expect("reopen file");
1120+
let builder = ParquetRecordBatchReaderBuilder::try_new(reader_file)
1121+
.expect("reader builder");
1122+
let metadata = builder.metadata().clone();
1123+
let file_schema = builder.schema().clone();
1124+
1125+
// sanity check: 4 Parquet leaves, 3 Arrow fields
1126+
assert_eq!(metadata.file_metadata().schema_descr().num_columns(), 4);
1127+
assert_eq!(file_schema.fields().len(), 3);
1128+
1129+
// build a filter candidate for `col_b = 'target'` through the public API
1130+
let expr = col("col_b").eq(Expr::Literal(
1131+
ScalarValue::Utf8(Some("target".to_string())),
1132+
None,
1133+
));
1134+
let expr = logical2physical(&expr, &file_schema);
1135+
1136+
let candidate = FilterCandidateBuilder::new(expr, file_schema)
1137+
.build(&metadata)
1138+
.expect("building candidate")
1139+
.expect("filter on primitive col_b should be pushable");
1140+
1141+
// col_b is Parquet leaf 3 (shifted by struct_col's two children).
1142+
assert_eq!(
1143+
candidate.projection.leaf_indices,
1144+
vec![3],
1145+
"leaf_indices should be [3] for col_b"
1146+
);
1147+
}
1148+
10911149
/// Sanity check that the given expression could be evaluated against the given schema without any errors.
10921150
/// This will fail if the expression references columns that are not in the schema or if the types of the columns are incompatible, etc.
10931151
fn check_expression_can_evaluate_against_schema(

datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt

Lines changed: 48 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -812,7 +812,6 @@ WHERE h2o_parquet_20696.time >= '1970-01-01T00:00:00.000000050Z'
812812
72.4 53.4 51
813813
70.4 50.4 50
814814

815-
816815
statement ok
817816
set datafusion.execution.parquet.pushdown_filters = true;
818817

@@ -842,3 +841,51 @@ DROP TABLE o2_parquet_20696;
842841
# Cleanup settings
843842
statement ok
844843
set datafusion.execution.parquet.pushdown_filters = false;
844+
845+
##########
846+
# Regression test: filter pushdown with Struct columns in schema
847+
#
848+
# When a schema has Struct columns, Arrow field indices diverge from Parquet
849+
# leaf indices (Struct children become separate leaves). A filter on a
850+
# primitive column *after* a Struct must use the correct Parquet leaf index.
851+
#
852+
# Schema:
853+
# Arrow: col_a=0 struct_col=1 col_b=2
854+
# Parquet: col_a=0 struct_col.x=1 struct_col.y=2 col_b=3
855+
##########
856+
857+
statement ok
858+
set datafusion.execution.parquet.pushdown_filters = true;
859+
860+
statement ok
861+
COPY (
862+
SELECT
863+
column1 as col_a,
864+
column2 as struct_col,
865+
column3 as col_b
866+
FROM VALUES
867+
(1, {x: 10, y: 100}, 'aaa'),
868+
(2, {x: 20, y: 200}, 'target'),
869+
(3, {x: 30, y: 300}, 'zzz')
870+
) TO 'test_files/scratch/parquet_filter_pushdown/struct_filter.parquet'
871+
STORED AS PARQUET;
872+
873+
statement ok
874+
CREATE EXTERNAL TABLE t_struct_filter
875+
STORED AS PARQUET
876+
LOCATION 'test_files/scratch/parquet_filter_pushdown/struct_filter.parquet';
877+
878+
# Filter on col_b (the primitive column after the struct).
879+
# Before the fix, this returned 0 rows because the filter read struct_col.y
880+
# (Parquet leaf 2) instead of col_b (Parquet leaf 3).
881+
query IT
882+
SELECT col_a, col_b FROM t_struct_filter WHERE col_b = 'target';
883+
----
884+
2 target
885+
886+
# Clean up
887+
statement ok
888+
set datafusion.execution.parquet.pushdown_filters = false;
889+
890+
statement ok
891+
DROP TABLE t_struct_filter;

0 commit comments

Comments (0)