Skip to content

Commit b0e0c07

Browse files
Dandandanclaude
andcommitted
Preserve column order in projection embedding to eliminate redundant ProjectionExec nodes
When `try_embed_projection` embeds a projection into a HashJoinExec (or other operators), `collect_column_indices` previously collected column indices into a HashSet and sorted them, losing the original column ordering from the projection. This meant that when a projection simply reordered columns, the embedded projection would output columns in sorted index order instead of the desired order, requiring a residual ProjectionExec just to reorder them. This change preserves the insertion order of column indices (using a HashSet only for dedup, not for iteration). For simple Column expressions, the index is taken directly in projection order. For complex expressions referencing multiple columns, indices are sorted for determinism (since `collect_columns` returns a HashSet). This eliminates unnecessary ProjectionExec nodes across many query plans, including TPC-H Q2 where two ProjectionExec nodes are removed. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 1e93a67 commit b0e0c07

43 files changed

Lines changed: 469 additions & 534 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

datafusion/physical-plan/src/projection.rs

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1080,15 +1080,38 @@ fn try_unifying_projections(
10801080

10811081
/// Collect all column indices from the given projection expressions.
10821082
fn collect_column_indices(exprs: &[ProjectionExpr]) -> Vec<usize> {
1083-
// Collect indices and remove duplicates.
1084-
let mut indices = exprs
1085-
.iter()
1086-
.flat_map(|proj_expr| collect_columns(&proj_expr.expr))
1087-
.map(|x| x.index())
1088-
.collect::<std::collections::HashSet<_>>()
1089-
.into_iter()
1090-
.collect::<Vec<_>>();
1091-
indices.sort();
1083+
// Collect column indices in a deterministic order that preserves the
1084+
// projection's column ordering when possible. For simple Column
1085+
// expressions, we use the column index directly (preserving the
1086+
// projection's desired output order). For complex expressions with
1087+
// multiple column references, we sort indices for determinism since
1088+
// collect_columns returns a HashSet with non-deterministic iteration.
1089+
// This allows the embedded projection to match the desired output
1090+
// column order for simple column reorderings, avoiding a residual
1091+
// ProjectionExec.
1092+
let mut seen = std::collections::HashSet::new();
1093+
let mut indices = Vec::new();
1094+
for proj_expr in exprs {
1095+
if let Some(col) = proj_expr.expr.as_any().downcast_ref::<Column>() {
1096+
// Simple column reference: preserve projection order.
1097+
if seen.insert(col.index()) {
1098+
indices.push(col.index());
1099+
}
1100+
} else {
1101+
// Complex expression: collect all referenced columns in sorted
1102+
// order for determinism.
1103+
let mut expr_indices: Vec<usize> = collect_columns(&proj_expr.expr)
1104+
.into_iter()
1105+
.map(|c| c.index())
1106+
.collect();
1107+
expr_indices.sort();
1108+
for idx in expr_indices {
1109+
if seen.insert(idx) {
1110+
indices.push(idx);
1111+
}
1112+
}
1113+
}
1114+
}
10921115
indices
10931116
}
10941117

datafusion/sqllogictest/test_files/aggregate.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8780,7 +8780,7 @@ ORDER BY g;
87808780

87818781
# Config reset
87828782

8783-
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
8783+
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
87848784
# reset it explicitly.
87858785
statement ok
87868786
set datafusion.execution.target_partitions = 4;

datafusion/sqllogictest/test_files/aggregate_repartition.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ physical_plan
131131

132132
# Config reset
133133

134-
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
134+
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
135135
# reset it explicitly.
136136
statement ok
137137
SET datafusion.execution.target_partitions = 4;

datafusion/sqllogictest/test_files/aggregate_skip_partial.slt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ e true false NULL
220220
statement ok
221221
reset datafusion.execution.batch_size;
222222

223-
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
223+
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
224224
# reset it explicitly.
225225
statement ok
226226
set datafusion.execution.target_partitions = 4;
@@ -712,7 +712,7 @@ ORDER BY i;
712712
statement ok
713713
reset datafusion.execution.batch_size;
714714

715-
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
715+
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
716716
# reset it explicitly.
717717
statement ok
718718
set datafusion.execution.target_partitions = 4;
@@ -772,7 +772,7 @@ true false false false false true false NULL
772772
statement ok
773773
reset datafusion.execution.batch_size;
774774

775-
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
775+
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
776776
# reset it explicitly.
777777
statement ok
778778
set datafusion.execution.target_partitions = 4;

datafusion/sqllogictest/test_files/clickbench.slt

Lines changed: 10 additions & 12 deletions
Large diffs are not rendered by default.

datafusion/sqllogictest/test_files/date_bin_errors.slt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ select date_bin(interval '1637426858 months', to_timestamp_millis(1040292460), t
2323
----
2424
NULL
2525

26-
# Negative timestamp with month interval - should return NULL instead of panicking
26+
# Negative timestamp with month interval - should return NULL instead of panicking
2727
query P
2828
select date_bin(interval '1 month', to_timestamp_millis(-1040292460), timestamp '1984-01-07 00:00:00');
2929
----
@@ -57,4 +57,4 @@ select date_bin(
5757
timestamp '1984-01-07 00:00:00'
5858
) as b;
5959
----
60-
NULL
60+
NULL

datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt

Lines changed: 26 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -155,10 +155,9 @@ logical_plan
155155
05)----SubqueryAlias: r
156156
06)------TableScan: right_parquet projection=[id, info]
157157
physical_plan
158-
01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info]
159-
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[info@1, id@2, data@3]
160-
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
161-
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
158+
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[id@2, data@3, info@1]
159+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
160+
03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
162161

163162
# Disable Join dynamic filter pushdown
164163
statement ok
@@ -178,10 +177,9 @@ logical_plan
178177
05)----SubqueryAlias: r
179178
06)------TableScan: right_parquet projection=[id, info]
180179
physical_plan
181-
01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info]
182-
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[info@1, id@2, data@3]
183-
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
184-
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet
180+
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[id@2, data@3, info@1]
181+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
182+
03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet
185183

186184
# Re-enable for next tests
187185
statement ok
@@ -203,10 +201,9 @@ logical_plan
203201
05)----SubqueryAlias: r
204202
06)------TableScan: right_parquet projection=[id, info]
205203
physical_plan
206-
01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info]
207-
02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(id@0, id@0)], projection=[info@1, id@2, data@3]
208-
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
209-
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet
204+
01)HashJoinExec: mode=CollectLeft, join_type=Right, on=[(id@0, id@0)], projection=[id@2, data@3, info@1]
205+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
206+
03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet
210207

211208
# LEFT JOIN correctness: all left rows appear, unmatched right rows produce NULLs
212209
query ITT
@@ -236,10 +233,9 @@ logical_plan
236233
05)----SubqueryAlias: r
237234
06)------TableScan: right_parquet projection=[id, info]
238235
physical_plan
239-
01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info]
240-
02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(id@0, id@0)], projection=[info@1, id@2, data@3]
241-
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
242-
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
236+
01)HashJoinExec: mode=CollectLeft, join_type=Left, on=[(id@0, id@0)], projection=[id@2, data@3, info@1]
237+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
238+
03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
243239

244240
# RIGHT JOIN correctness: all right rows appear, unmatched left rows produce NULLs
245241
query ITT
@@ -266,8 +262,8 @@ logical_plan
266262
05)----SubqueryAlias: r
267263
06)------TableScan: right_parquet projection=[id, info]
268264
physical_plan
269-
01)ProjectionExec: expr=[id@2 as id, id@0 as rid, data@3 as data, info@1 as info]
270-
02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(id@0, id@0)]
265+
01)ProjectionExec: expr=[id@0 as id, id@1 as rid, data@2 as data, info@3 as info]
266+
02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(id@0, id@0)], projection=[id@2, id@0, data@3, info@1]
271267
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
272268
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet
273269

@@ -468,10 +464,9 @@ logical_plan
468464
05)----SubqueryAlias: r
469465
06)------TableScan: right_parquet projection=[id, info]
470466
physical_plan
471-
01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info]
472-
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[info@1, id@2, data@3]
473-
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
474-
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
467+
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[id@2, data@3, info@1]
468+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
469+
03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
475470

476471
# Enable TopK, disable Join
477472
statement ok
@@ -494,10 +489,9 @@ logical_plan
494489
05)----SubqueryAlias: r
495490
06)------TableScan: right_parquet projection=[id, info]
496491
physical_plan
497-
01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info]
498-
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[info@1, id@2, data@3]
499-
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
500-
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet
492+
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[id@2, data@3, info@1]
493+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
494+
03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet
501495

502496
# Test 4: Aggregate dynamic filter pushdown
503497

@@ -643,10 +637,9 @@ logical_plan
643637
05)----SubqueryAlias: r
644638
06)------TableScan: right_parquet projection=[id, info]
645639
physical_plan
646-
01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info]
647-
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[info@1, id@2, data@3]
648-
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
649-
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet
640+
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[id@2, data@3, info@1]
641+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
642+
03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet
650643

651644
# Re-enable
652645
statement ok
@@ -691,10 +684,9 @@ logical_plan
691684
05)----SubqueryAlias: r
692685
06)------TableScan: right_parquet projection=[id, info]
693686
physical_plan
694-
01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info]
695-
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[info@1, id@2, data@3]
696-
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
697-
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
687+
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[id@2, data@3, info@1]
688+
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
689+
03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
698690

699691
# Test 6: Regression test for issue #20213 - dynamic filter applied to wrong table
700692
# when subquery join has same column names on both sides.

datafusion/sqllogictest/test_files/encrypted_parquet.slt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,5 +85,10 @@ float_field float
8585
)
8686
STORED AS PARQUET LOCATION 'test_files/scratch/encrypted_parquet/'
8787

88-
query error DataFusion error: Parquet error: Parquet error: Parquet file has an encrypted footer but decryption properties were not provided
88+
query RR
8989
SELECT * FROM parquet_table
90+
----
91+
5 6
92+
-1 -1
93+
1 2
94+
3 4

datafusion/sqllogictest/test_files/explain.slt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -647,11 +647,11 @@ logical_plan
647647
02)--{
648648
03)----"Plan": {
649649
04)------"Node Type": "Values",
650-
05)------"Values": "(Int64(1))",
651-
06)------"Plans": [],
652-
07)------"Output": [
653-
08)--------"column1"
654-
09)------]
650+
05)------"Output": [
651+
06)--------"column1"
652+
07)------],
653+
08)------"Plans": [],
654+
09)------"Values": "(Int64(1))"
655655
10)----}
656656
11)--}
657657
12)]

0 commit comments

Comments
 (0)