Skip to content

Commit 6e943ae

Browse files
Dandandan and claude committed
Preserve column order in projection embedding to eliminate redundant ProjectionExec nodes
When `try_embed_projection` embeds a projection into a HashJoinExec (or other operators), `collect_column_indices` previously collected column indices into a HashSet and sorted them, losing the original column ordering from the projection. This meant that when a projection simply reordered columns, the embedded projection would output columns in sorted index order instead of the desired order, requiring a residual ProjectionExec just to reorder them.

This change preserves the insertion order of column indices:
- For simple Column expressions: the index is taken directly in projection order.
- For complex expressions referencing multiple columns: indices are sorted for determinism (since `collect_columns` returns a HashSet).

Additionally, the final embedding decision now checks whether the embedding actually reduced the number of columns. If the residual ProjectionExec is not removable AND no columns were eliminated, the embedding is skipped (it would only add an unnecessary column reorder).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 1e93a67 commit 6e943ae

42 files changed

Lines changed: 456 additions & 520 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

datafusion/physical-plan/src/projection.rs

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -607,13 +607,7 @@ pub fn try_embed_projection<Exec: EmbeddedProjection + 'static>(
607607
return Ok(None);
608608
};
609609

610-
// If the projection indices is the same as the input columns, we don't need to embed the projection to hash join.
611-
// Check the projection_index is 0..n-1 and the length of projection_index is the same as the length of execution_plan schema fields.
612-
if projection_index.len() == projection_index.last().unwrap() + 1
613-
&& projection_index.len() == execution_plan.schema().fields().len()
614-
{
615-
return Ok(None);
616-
}
610+
let columns_reduced = projection_index.len() < execution_plan.schema().fields().len();
617611

618612
let new_execution_plan =
619613
Arc::new(execution_plan.with_projection(Some(projection_index.to_vec()))?);
@@ -648,9 +642,16 @@ pub fn try_embed_projection<Exec: EmbeddedProjection + 'static>(
648642
Arc::clone(&new_execution_plan) as _,
649643
)?);
650644
if is_projection_removable(&new_projection) {
645+
// Residual is identity — embedding fully absorbed the projection.
651646
Ok(Some(new_execution_plan))
652-
} else {
647+
} else if columns_reduced {
648+
// Embedding reduced columns even though a residual is still needed
649+
// for renames or expressions — worth keeping.
653650
Ok(Some(new_projection))
651+
} else {
652+
// No columns eliminated and residual still needed — embedding just
653+
// adds an unnecessary column reorder inside the operator.
654+
Ok(None)
654655
}
655656
}
656657

@@ -1080,15 +1081,38 @@ fn try_unifying_projections(
10801081

10811082
/// Collect all column indices from the given projection expressions.
10821083
fn collect_column_indices(exprs: &[ProjectionExpr]) -> Vec<usize> {
1083-
// Collect indices and remove duplicates.
1084-
let mut indices = exprs
1085-
.iter()
1086-
.flat_map(|proj_expr| collect_columns(&proj_expr.expr))
1087-
.map(|x| x.index())
1088-
.collect::<std::collections::HashSet<_>>()
1089-
.into_iter()
1090-
.collect::<Vec<_>>();
1091-
indices.sort();
1084+
// Collect column indices in a deterministic order that preserves the
1085+
// projection's column ordering when possible. For simple Column
1086+
// expressions, we use the column index directly (preserving the
1087+
// projection's desired output order). For complex expressions with
1088+
// multiple column references, we sort indices for determinism since
1089+
// collect_columns returns a HashSet with non-deterministic iteration.
1090+
// This allows the embedded projection to match the desired output
1091+
// column order for simple column reorderings, avoiding a residual
1092+
// ProjectionExec.
1093+
let mut seen = std::collections::HashSet::new();
1094+
let mut indices = Vec::new();
1095+
for proj_expr in exprs {
1096+
if let Some(col) = proj_expr.expr.as_any().downcast_ref::<Column>() {
1097+
// Simple column reference: preserve projection order.
1098+
if seen.insert(col.index()) {
1099+
indices.push(col.index());
1100+
}
1101+
} else {
1102+
// Complex expression: collect all referenced columns in sorted
1103+
// order for determinism.
1104+
let mut expr_indices: Vec<usize> = collect_columns(&proj_expr.expr)
1105+
.into_iter()
1106+
.map(|c| c.index())
1107+
.collect();
1108+
expr_indices.sort();
1109+
for idx in expr_indices {
1110+
if seen.insert(idx) {
1111+
indices.push(idx);
1112+
}
1113+
}
1114+
}
1115+
}
10921116
indices
10931117
}
10941118

datafusion/sqllogictest/test_files/aggregate.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8780,7 +8780,7 @@ ORDER BY g;
87808780

87818781
# Config reset
87828782

8783-
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
8783+
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
87848784
# reset it explicitly.
87858785
statement ok
87868786
set datafusion.execution.target_partitions = 4;

datafusion/sqllogictest/test_files/aggregate_repartition.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ physical_plan
131131

132132
# Config reset
133133

134-
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
134+
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
135135
# reset it explicitly.
136136
statement ok
137137
SET datafusion.execution.target_partitions = 4;

datafusion/sqllogictest/test_files/aggregate_skip_partial.slt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ e true false NULL
220220
statement ok
221221
reset datafusion.execution.batch_size;
222222

223-
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
223+
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
224224
# reset it explicitly.
225225
statement ok
226226
set datafusion.execution.target_partitions = 4;
@@ -712,7 +712,7 @@ ORDER BY i;
712712
statement ok
713713
reset datafusion.execution.batch_size;
714714

715-
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
715+
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
716716
# reset it explicitly.
717717
statement ok
718718
set datafusion.execution.target_partitions = 4;
@@ -772,7 +772,7 @@ true false false false false true false NULL
772772
statement ok
773773
reset datafusion.execution.batch_size;
774774

775-
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
775+
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
776776
# reset it explicitly.
777777
statement ok
778778
set datafusion.execution.target_partitions = 4;

datafusion/sqllogictest/test_files/clickbench.slt

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -673,10 +673,9 @@ physical_plan
673673
01)ProjectionExec: expr=[SearchPhrase@0 as SearchPhrase]
674674
02)--SortPreservingMergeExec: [EventTime@1 ASC NULLS LAST], fetch=10
675675
03)----SortExec: TopK(fetch=10), expr=[EventTime@1 ASC NULLS LAST], preserve_partitioning=[true]
676-
04)------ProjectionExec: expr=[SearchPhrase@1 as SearchPhrase, EventTime@0 as EventTime]
677-
05)--------FilterExec: SearchPhrase@1 !=
678-
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
679-
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[EventTime, SearchPhrase], file_type=parquet, predicate=SearchPhrase@39 != AND DynamicFilter [ empty ], pruning_predicate=SearchPhrase_null_count@2 != row_count@3 AND (SearchPhrase_min@0 != OR != SearchPhrase_max@1), required_guarantees=[SearchPhrase not in ()]
676+
04)------FilterExec: SearchPhrase@1 != , projection=[SearchPhrase@1, EventTime@0]
677+
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
678+
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[EventTime, SearchPhrase], file_type=parquet, predicate=SearchPhrase@39 != AND DynamicFilter [ empty ], pruning_predicate=SearchPhrase_null_count@2 != row_count@3 AND (SearchPhrase_min@0 != OR != SearchPhrase_max@1), required_guarantees=[SearchPhrase not in ()]
680679

681680
query T
682681
SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY "EventTime" LIMIT 10;
@@ -717,10 +716,9 @@ physical_plan
717716
01)ProjectionExec: expr=[SearchPhrase@0 as SearchPhrase]
718717
02)--SortPreservingMergeExec: [EventTime@1 ASC NULLS LAST, SearchPhrase@0 ASC NULLS LAST], fetch=10
719718
03)----SortExec: TopK(fetch=10), expr=[EventTime@1 ASC NULLS LAST, SearchPhrase@0 ASC NULLS LAST], preserve_partitioning=[true]
720-
04)------ProjectionExec: expr=[SearchPhrase@1 as SearchPhrase, EventTime@0 as EventTime]
721-
05)--------FilterExec: SearchPhrase@1 !=
722-
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
723-
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[EventTime, SearchPhrase], file_type=parquet, predicate=SearchPhrase@39 != AND DynamicFilter [ empty ], pruning_predicate=SearchPhrase_null_count@2 != row_count@3 AND (SearchPhrase_min@0 != OR != SearchPhrase_max@1), required_guarantees=[SearchPhrase not in ()]
719+
04)------FilterExec: SearchPhrase@1 != , projection=[SearchPhrase@1, EventTime@0]
720+
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
721+
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[EventTime, SearchPhrase], file_type=parquet, predicate=SearchPhrase@39 != AND DynamicFilter [ empty ], pruning_predicate=SearchPhrase_null_count@2 != row_count@3 AND (SearchPhrase_min@0 != OR != SearchPhrase_max@1), required_guarantees=[SearchPhrase not in ()]
724722

725723
query T
726724
SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY "EventTime", "SearchPhrase" LIMIT 10;
@@ -1110,8 +1108,8 @@ physical_plan
11101108
05)--------AggregateExec: mode=FinalPartitioned, gby=[URLHash@0 as URLHash, EventDate@1 as EventDate], aggr=[count(Int64(1))]
11111109
06)----------RepartitionExec: partitioning=Hash([URLHash@0, EventDate@1], 4), input_partitions=4
11121110
07)------------AggregateExec: mode=Partial, gby=[URLHash@0 as URLHash, EventDate@1 as EventDate], aggr=[count(Int64(1))]
1113-
08)--------------ProjectionExec: expr=[URLHash@1 as URLHash, CAST(CAST(EventDate@0 AS Int32) AS Date32) as EventDate]
1114-
09)----------------FilterExec: CounterID@1 = 62 AND CAST(CAST(EventDate@0 AS Int32) AS Date32) >= 2013-07-01 AND CAST(CAST(EventDate@0 AS Int32) AS Date32) <= 2013-07-31 AND IsRefresh@2 = 0 AND (TraficSourceID@3 = -1 OR TraficSourceID@3 = 6) AND RefererHash@4 = 3594120000172545465, projection=[EventDate@0, URLHash@5]
1111+
08)--------------ProjectionExec: expr=[URLHash@0 as URLHash, CAST(CAST(EventDate@1 AS Int32) AS Date32) as EventDate]
1112+
09)----------------FilterExec: CounterID@1 = 62 AND CAST(CAST(EventDate@0 AS Int32) AS Date32) >= 2013-07-01 AND CAST(CAST(EventDate@0 AS Int32) AS Date32) <= 2013-07-31 AND IsRefresh@2 = 0 AND (TraficSourceID@3 = -1 OR TraficSourceID@3 = 6) AND RefererHash@4 = 3594120000172545465, projection=[URLHash@5, EventDate@0]
11151113
10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
11161114
11)--------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[EventDate, CounterID, IsRefresh, TraficSourceID, RefererHash, URLHash], file_type=parquet, predicate=CounterID@6 = 62 AND CAST(CAST(EventDate@5 AS Int32) AS Date32) >= 2013-07-01 AND CAST(CAST(EventDate@5 AS Int32) AS Date32) <= 2013-07-31 AND IsRefresh@15 = 0 AND (TraficSourceID@37 = -1 OR TraficSourceID@37 = 6) AND RefererHash@102 = 3594120000172545465, pruning_predicate=CounterID_null_count@2 != row_count@3 AND CounterID_min@0 <= 62 AND 62 <= CounterID_max@1 AND EventDate_null_count@5 != row_count@3 AND CAST(CAST(EventDate_max@4 AS Int32) AS Date32) >= 2013-07-01 AND EventDate_null_count@5 != row_count@3 AND CAST(CAST(EventDate_min@6 AS Int32) AS Date32) <= 2013-07-31 AND IsRefresh_null_count@9 != row_count@3 AND IsRefresh_min@7 <= 0 AND 0 <= IsRefresh_max@8 AND (TraficSourceID_null_count@12 != row_count@3 AND TraficSourceID_min@10 <= -1 AND -1 <= TraficSourceID_max@11 OR TraficSourceID_null_count@12 != row_count@3 AND TraficSourceID_min@10 <= 6 AND 6 <= TraficSourceID_max@11) AND RefererHash_null_count@15 != row_count@3 AND RefererHash_min@13 <= 3594120000172545465 AND 3594120000172545465 <= RefererHash_max@14, required_guarantees=[CounterID in (62), IsRefresh in (0), RefererHash in (3594120000172545465), TraficSourceID in (-1, 6)]
11171115

datafusion/sqllogictest/test_files/date_bin_errors.slt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ select date_bin(interval '1637426858 months', to_timestamp_millis(1040292460), t
2323
----
2424
NULL
2525

26-
# Negative timestamp with month interval - should return NULL instead of panicking
26+
# Negative timestamp with month interval - should return NULL instead of panicking
2727
query P
2828
select date_bin(interval '1 month', to_timestamp_millis(-1040292460), timestamp '1984-01-07 00:00:00');
2929
----
@@ -57,4 +57,4 @@ select date_bin(
5757
timestamp '1984-01-07 00:00:00'
5858
) as b;
5959
----
60-
NULL
60+
NULL

0 commit comments

Comments
 (0)