Skip to content

Commit c875ef9

Browse files
repro and disable dyn filter for preserve file partitions (#20175)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #20176 - Discussed this with @adriangb ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> Dynamic filter pushdown can produce incorrect results when `preserve_file_partitions` is enabled and a partitioned hash join is used. The file groups are Hive‑partitioned (value‑based) but reported as hash‑partitioned, so hash‑routed dynamic filters can drop valid rows. ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> - Disable join dynamic filter pushdown for `PartitionMode::Partitioned` when `preserve_file_partitions > 0`. - Add two sqllogictests that reproduce the issue and validate the fix. ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> - `cargo test --test sqllogictests -- preserve_file_partitioning` -> will pass - `git checkout main ‎datafusion/physical-plan/src/joins/hash_join/exec.rs` - `cargo test --test sqllogictests -- preserve_file_partitioning` -> will fail ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> Join dynamic filter pushdown is disabled when `preserve_file_partitions` is enabled and the join is partitioned. <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> cc: @NGA-TRAN @gabotechs
1 parent 48d20ad commit c875ef9

2 files changed

Lines changed: 151 additions & 6 deletions

File tree

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,26 @@ impl HashJoinExec {
719719
Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true)))
720720
}
721721

722+
fn allow_join_dynamic_filter_pushdown(&self, config: &ConfigOptions) -> bool {
723+
if !config.optimizer.enable_join_dynamic_filter_pushdown {
724+
return false;
725+
}
726+
727+
// `preserve_file_partitions` can report Hash partitioning for Hive-style
728+
// file groups, but those partitions are not actually hash-distributed.
729+
// Partitioned dynamic filters rely on hash routing, so disable them in
730+
// this mode to avoid incorrect results. Follow-up work: enable dynamic
731+
// filtering for preserve_file_partitioned scans (issue #20195).
732+
// https://github.com/apache/datafusion/issues/20195
733+
if config.optimizer.preserve_file_partitions > 0
734+
&& self.mode == PartitionMode::Partitioned
735+
{
736+
return false;
737+
}
738+
739+
true
740+
}
741+
722742
/// left (build) side which gets hashed
723743
pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
724744
&self.left
@@ -1179,11 +1199,8 @@ impl ExecutionPlan for HashJoinExec {
11791199
// - A dynamic filter exists
11801200
// - At least one consumer is holding a reference to it, this avoids expensive filter
11811201
// computation when disabled or when no consumer will use it.
1182-
let enable_dynamic_filter_pushdown = context
1183-
.session_config()
1184-
.options()
1185-
.optimizer
1186-
.enable_join_dynamic_filter_pushdown
1202+
let enable_dynamic_filter_pushdown = self
1203+
.allow_join_dynamic_filter_pushdown(context.session_config().options())
11871204
&& self
11881205
.dynamic_filter
11891206
.as_ref()
@@ -1413,7 +1430,7 @@ impl ExecutionPlan for HashJoinExec {
14131430

14141431
// Add dynamic filters in Post phase if enabled
14151432
if matches!(phase, FilterPushdownPhase::Post)
1416-
&& config.optimizer.enable_join_dynamic_filter_pushdown
1433+
&& self.allow_join_dynamic_filter_pushdown(config)
14171434
{
14181435
// Add actual dynamic filter to right side (probe side)
14191436
let dynamic_filter = Self::create_dynamic_filter(&self.on);

datafusion/sqllogictest/test_files/preserve_file_partitioning.slt

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,29 @@ STORED AS PARQUET;
101101
----
102102
4
103103

104+
# Create hive-partitioned dimension table (3 partitions matching fact_table)
105+
# For testing Partitioned joins with matching partition counts
106+
query I
107+
COPY (SELECT 'dev' as env, 'log' as service)
108+
TO 'test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=A/data.parquet'
109+
STORED AS PARQUET;
110+
----
111+
1
112+
113+
query I
114+
COPY (SELECT 'prod' as env, 'log' as service)
115+
TO 'test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=B/data.parquet'
116+
STORED AS PARQUET;
117+
----
118+
1
119+
120+
query I
121+
COPY (SELECT 'prod' as env, 'log' as service)
122+
TO 'test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=C/data.parquet'
123+
STORED AS PARQUET;
124+
----
125+
1
126+
104127
# Create high-cardinality fact table (5 partitions > 3 target_partitions)
105128
# For testing partition merging with consistent hashing
106129
query I
@@ -173,6 +196,13 @@ CREATE EXTERNAL TABLE dimension_table (d_dkey STRING, env STRING, service STRING
173196
STORED AS PARQUET
174197
LOCATION 'test_files/scratch/preserve_file_partitioning/dimension/';
175198

199+
# Hive-partitioned dimension table (3 partitions matching fact_table for Partitioned join tests)
200+
statement ok
201+
CREATE EXTERNAL TABLE dimension_table_partitioned (env STRING, service STRING)
202+
STORED AS PARQUET
203+
PARTITIONED BY (d_dkey STRING)
204+
LOCATION 'test_files/scratch/preserve_file_partitioning/dimension_partitioned/';
205+
176206
# 'High'-cardinality fact table (5 partitions > 3 target_partitions)
177207
statement ok
178208
CREATE EXTERNAL TABLE high_cardinality_table (timestamp TIMESTAMP, value DOUBLE)
@@ -579,6 +609,101 @@ C 1 300
579609
D 1 400
580610
E 1 500
581611

612+
##########
613+
# TEST 11: Partitioned Join with Matching Partition Counts - Without Optimization
614+
# fact_table (3 partitions) joins dimension_table_partitioned (3 partitions)
615+
# Shows RepartitionExec added when preserve_file_partitions is disabled
616+
##########
617+
618+
statement ok
619+
set datafusion.optimizer.preserve_file_partitions = 0;
620+
621+
# Force Partitioned join mode (not CollectLeft)
622+
statement ok
623+
set datafusion.optimizer.hash_join_single_partition_threshold = 0;
624+
625+
statement ok
626+
set datafusion.optimizer.hash_join_single_partition_threshold_rows = 0;
627+
628+
query TT
629+
EXPLAIN SELECT f.f_dkey, d.env, sum(f.value)
630+
FROM fact_table f
631+
INNER JOIN dimension_table_partitioned d ON f.f_dkey = d.d_dkey
632+
GROUP BY f.f_dkey, d.env;
633+
----
634+
logical_plan
635+
01)Aggregate: groupBy=[[f.f_dkey, d.env]], aggr=[[sum(f.value)]]
636+
02)--Projection: f.value, f.f_dkey, d.env
637+
03)----Inner Join: f.f_dkey = d.d_dkey
638+
04)------SubqueryAlias: f
639+
05)--------TableScan: fact_table projection=[value, f_dkey]
640+
06)------SubqueryAlias: d
641+
07)--------TableScan: dimension_table_partitioned projection=[env, d_dkey]
642+
physical_plan
643+
01)AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey, env@1 as env], aggr=[sum(f.value)]
644+
02)--RepartitionExec: partitioning=Hash([f_dkey@0, env@1], 3), input_partitions=3
645+
03)----AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey, env@2 as env], aggr=[sum(f.value)]
646+
04)------ProjectionExec: expr=[value@1 as value, f_dkey@2 as f_dkey, env@0 as env]
647+
05)--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@1, f_dkey@1)], projection=[env@0, value@2, f_dkey@3]
648+
06)----------RepartitionExec: partitioning=Hash([d_dkey@1], 3), input_partitions=3
649+
07)------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=C/data.parquet]]}, projection=[env, d_dkey], file_type=parquet
650+
08)----------RepartitionExec: partitioning=Hash([f_dkey@1], 3), input_partitions=3
651+
09)------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet, predicate=DynamicFilter [ empty ]
652+
653+
query TTR rowsort
654+
SELECT f.f_dkey, d.env, sum(f.value)
655+
FROM fact_table f
656+
INNER JOIN dimension_table_partitioned d ON f.f_dkey = d.d_dkey
657+
GROUP BY f.f_dkey, d.env;
658+
----
659+
A dev 772.4
660+
B prod 614.4
661+
C prod 2017.6
662+
663+
##########
664+
# TEST 12: Partitioned Join with Matching Partition Counts - With Optimization
665+
# Both tables have 3 partitions matching target_partitions=3
666+
# No RepartitionExec needed for join - partitions already satisfy the requirement
667+
# Dynamic filter pushdown is disabled in this mode because preserve_file_partitions
668+
# reports Hash partitioning for Hive-style file groups, which are not hash-routed.
669+
##########
670+
671+
statement ok
672+
set datafusion.optimizer.preserve_file_partitions = 1;
673+
674+
query TT
675+
EXPLAIN SELECT f.f_dkey, d.env, sum(f.value)
676+
FROM fact_table f
677+
INNER JOIN dimension_table_partitioned d ON f.f_dkey = d.d_dkey
678+
GROUP BY f.f_dkey, d.env;
679+
----
680+
logical_plan
681+
01)Aggregate: groupBy=[[f.f_dkey, d.env]], aggr=[[sum(f.value)]]
682+
02)--Projection: f.value, f.f_dkey, d.env
683+
03)----Inner Join: f.f_dkey = d.d_dkey
684+
04)------SubqueryAlias: f
685+
05)--------TableScan: fact_table projection=[value, f_dkey]
686+
06)------SubqueryAlias: d
687+
07)--------TableScan: dimension_table_partitioned projection=[env, d_dkey]
688+
physical_plan
689+
01)AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey, env@1 as env], aggr=[sum(f.value)]
690+
02)--RepartitionExec: partitioning=Hash([f_dkey@0, env@1], 3), input_partitions=3
691+
03)----AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey, env@2 as env], aggr=[sum(f.value)]
692+
04)------ProjectionExec: expr=[value@1 as value, f_dkey@2 as f_dkey, env@0 as env]
693+
05)--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@1, f_dkey@1)], projection=[env@0, value@2, f_dkey@3]
694+
06)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=C/data.parquet]]}, projection=[env, d_dkey], file_type=parquet
695+
07)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet
696+
697+
query TTR rowsort
698+
SELECT f.f_dkey, d.env, sum(f.value)
699+
FROM fact_table f
700+
INNER JOIN dimension_table_partitioned d ON f.f_dkey = d.d_dkey
701+
GROUP BY f.f_dkey, d.env;
702+
----
703+
A dev 772.4
704+
B prod 614.4
705+
C prod 2017.6
706+
582707
##########
583708
# CLEANUP
584709
##########
@@ -592,5 +717,8 @@ DROP TABLE fact_table_ordered;
592717
statement ok
593718
DROP TABLE dimension_table;
594719

720+
statement ok
721+
DROP TABLE dimension_table_partitioned;
722+
595723
statement ok
596724
DROP TABLE high_cardinality_table;

0 commit comments

Comments
 (0)