
Commit 0143dfe

physical_optimizer: preserve_file_partitions when num file groups < target_partitions (#21533)
## Rationale for this change

`datafusion.optimizer.preserve_file_partitions` would not actually preserve the file partitions when the number of file groups is less than `target_partitions`. This is unexpected behavior: if a user wants to preserve file partitions, it is because they want to avoid repartitioning.

Before:

```
ProjectionExec
  AggregateExec: mode=FinalPartitioned, gby=[f_dkey, timestamp]
    RepartitionExec: partitioning=Hash([f_dkey, timestamp], 4), input_partitions=3
      AggregateExec: mode=Partial, gby=[f_dkey, timestamp]
        DataSourceExec: file_groups=3, projection=[timestamp, value, f_dkey]
```

After:

```
ProjectionExec
  AggregateExec: mode=SinglePartitioned, gby=[f_dkey, timestamp]
    DataSourceExec: file_groups=3, projection=[timestamp, value, f_dkey]
```

## What changes are included in this PR?

This change fixes that by updating one line in the `enforce_distribution` optimizer rule.

## Are these changes tested?

Yes, in the `preserve_file_partitions` SLT.

## Are there any user-facing changes?

No.
1 parent d3cedb2 commit 0143dfe

2 files changed

Lines changed: 61 additions & 1 deletion


datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 7 additions & 1 deletion
```diff
@@ -1328,8 +1328,14 @@ pub fn ensure_distribution(
         })
     );

-    let allow_subset_satisfy_partitioning = current_partitions
+    let allow_subset_satisfy_partitioning = (current_partitions
         >= subset_satisfaction_threshold
+        // `preserve_file_partitions` exposes existing file-group
+        // partitioning to the optimizer. Respect it when the only
+        // reason to repartition would be to increase partition count
+        // beyond the preserved file-group count.
+        || (config.optimizer.preserve_file_partitions > 0
+            && current_partitions < target_partitions))
         && !is_partitioned_join
         && !requires_grouping_id;
```

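The change above widens a single boolean predicate. The following is a minimal standalone sketch of that predicate so the before/after behavior can be seen in isolation; the free function, its signature, and the threshold value used in the examples are illustrative assumptions (in the real `ensure_distribution`, these values come from the optimizer config and the plan context rather than plain parameters).

```rust
// Illustrative sketch of the predicate changed in this commit. In
// DataFusion proper this logic lives inline in `ensure_distribution`;
// the parameter list here is a hypothetical flattening for clarity.
fn allow_subset_satisfy_partitioning(
    current_partitions: usize,
    target_partitions: usize,
    subset_satisfaction_threshold: usize,
    preserve_file_partitions: usize,
    is_partitioned_join: bool,
    requires_grouping_id: bool,
) -> bool {
    (current_partitions >= subset_satisfaction_threshold
        // New clause from this commit: when file-group partitioning is
        // preserved, do not repartition just to raise the partition
        // count from the file-group count up to target_partitions.
        || (preserve_file_partitions > 0
            && current_partitions < target_partitions))
        && !is_partitioned_join
        && !requires_grouping_id
}

fn main() {
    // The scenario from the commit message: 3 file groups,
    // target_partitions = 4, preserve_file_partitions = 1.
    // With the fix, subset satisfaction is allowed (no RepartitionExec).
    assert!(allow_subset_satisfy_partitioning(3, 4, 8, 1, false, false));
    // Without preserve_file_partitions, the old threshold-only path
    // applies and 3 < 8 disallows it (a repartition is inserted).
    assert!(!allow_subset_satisfy_partitioning(3, 4, 8, 0, false, false));
    // Partitioned joins and grouping-id aggregates still opt out.
    assert!(!allow_subset_satisfy_partitioning(3, 4, 8, 1, true, false));
}
```

Note that the new clause only fires when `current_partitions < target_partitions`, i.e. exactly the case where the sole motivation for repartitioning would be to increase parallelism past the preserved file-group count.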
datafusion/sqllogictest/test_files/preserve_file_partitioning.slt

Lines changed: 54 additions & 0 deletions
```diff
@@ -698,6 +698,60 @@ A dev 772.4
 B prod 614.4
 C prod 2017.6

+##########
+# TEST 13: Partitioned Join where Number of File Groups is less than target_partitions
+# With preserve_file_partitions enabled, we should still avoid repartitioning
+##########
+
+statement ok
+set datafusion.execution.target_partitions = 4;
+
+statement ok
+set datafusion.optimizer.preserve_file_partitions = 1;
+
+query TT
+EXPLAIN SELECT f_dkey, timestamp,
+  COUNT(*), AVG(value)
+FROM fact_table
+GROUP BY f_dkey, timestamp;
+----
+logical_plan
+01)Projection: fact_table.f_dkey, fact_table.timestamp, count(Int64(1)) AS count(*), avg(fact_table.value)
+02)--Aggregate: groupBy=[[fact_table.f_dkey, fact_table.timestamp]], aggr=[[count(Int64(1)), avg(fact_table.value)]]
+03)----TableScan: fact_table projection=[timestamp, value, f_dkey]
+physical_plan
+01)ProjectionExec: expr=[f_dkey@0 as f_dkey, timestamp@1 as timestamp, count(Int64(1))@2 as count(*), avg(fact_table.value)@3 as avg(fact_table.value)]
+02)--AggregateExec: mode=SinglePartitioned, gby=[f_dkey@2 as f_dkey, timestamp@0 as timestamp], aggr=[count(Int64(1)), avg(fact_table.value)]
+03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], file_type=parquet
+
+query TPIR rowsort
+SELECT f_dkey, timestamp,
+  COUNT(*), AVG(value)
+FROM fact_table
+GROUP BY f_dkey, timestamp;
+----
+A 2023-01-01T09:00:00 1 95.5
+A 2023-01-01T09:00:10 1 102.3
+A 2023-01-01T09:00:20 1 98.7
+A 2023-01-01T09:12:20 1 105.1
+A 2023-01-01T09:12:30 1 100
+A 2023-01-01T09:12:40 1 150
+A 2023-01-01T09:12:50 1 120.8
+B 2023-01-01T09:00:00 1 75.2
+B 2023-01-01T09:00:10 1 82.4
+B 2023-01-01T09:00:20 1 78.9
+B 2023-01-01T09:00:30 1 85.6
+B 2023-01-01T09:12:30 1 80
+B 2023-01-01T09:12:40 1 120
+B 2023-01-01T09:12:50 1 92.3
+C 2023-01-01T09:00:00 1 300.5
+C 2023-01-01T09:00:10 1 285.7
+C 2023-01-01T09:00:20 1 310.2
+C 2023-01-01T09:00:30 1 295.8
+C 2023-01-01T09:00:40 1 300
+C 2023-01-01T09:12:40 1 250
+C 2023-01-01T09:12:50 1 275.4
+
 ##########
 # CLEANUP
 ##########
```
