@@ -3422,6 +3422,176 @@ fn test_pushdown_through_aggregate_grouping_sets_with_reordered_input() {
34223422 ) ;
34233423}
34243424
3425+ #[ tokio:: test]
3426+ async fn test_hashjoin_dynamic_filter_pushdown_through_aggregate_with_reordered_input ( ) {
3427+ // Reproduces the real-world scenario where a HashJoin's dynamic filter must
3428+ // push through an AggregateExec whose input columns are reordered by a
3429+ // ProjectionExec. The plan looks like:
3430+ //
3431+ // HashJoinExec: on=[(bhandle@0, bhandle@0)] ← creates DynFilter
3432+ // DataSourceExec (build side)
3433+ // AggregateExec: gby=[bhandle@1], aggr=[min(value)] ← must pass filter
3434+ // ProjectionExec: [value@1, bhandle@0] ← reorders columns
3435+ // DataSourceExec (probe side) ← should receive filter
3436+ //
3437+ // Without the fix, the aggregate blocks the dynamic filter because
3438+ // grouping expr Column("bhandle", 1) != filter Column("bhandle", 0).
3439+
3440+ use datafusion_common:: JoinType ;
3441+ use datafusion_physical_plan:: joins:: { HashJoinExec , PartitionMode } ;
3442+
3443+ // Build side: simple lookup table with bhandle values
3444+ let build_batches = vec ! [ record_batch!(
3445+ ( "bhandle" , Utf8 , [ "h1" , "h2" ] )
3446+ )
3447+ . unwrap( ) ] ;
3448+ let build_schema = Arc :: new ( Schema :: new ( vec ! [
3449+ Field :: new( "bhandle" , DataType :: Utf8 , false ) ,
3450+ ] ) ) ;
3451+ let build_scan = TestScanBuilder :: new ( Arc :: clone ( & build_schema) )
3452+ . with_support ( true )
3453+ . with_batches ( build_batches)
3454+ . build ( ) ;
3455+
3456+ // Probe side: scan with (bhandle, value)
3457+ let probe_batches = vec ! [ record_batch!(
3458+ ( "bhandle" , Utf8 , [ "h1" , "h2" , "h3" , "h4" ] ) ,
3459+ ( "value" , Float64 , [ 1.0 , 2.0 , 3.0 , 4.0 ] )
3460+ )
3461+ . unwrap( ) ] ;
3462+ let probe_schema = Arc :: new ( Schema :: new ( vec ! [
3463+ Field :: new( "bhandle" , DataType :: Utf8 , false ) ,
3464+ Field :: new( "value" , DataType :: Float64 , false ) ,
3465+ ] ) ) ;
3466+ let probe_scan = TestScanBuilder :: new ( Arc :: clone ( & probe_schema) )
3467+ . with_support ( true )
3468+ . with_batches ( probe_batches)
3469+ . build ( ) ;
3470+
3471+ // ProjectionExec reorders (bhandle, value) → (value, bhandle)
3472+ let reordered_schema = Arc :: new ( Schema :: new ( vec ! [
3473+ Field :: new( "value" , DataType :: Float64 , false ) ,
3474+ Field :: new( "bhandle" , DataType :: Utf8 , false ) ,
3475+ ] ) ) ;
3476+ let projection = Arc :: new (
3477+ ProjectionExec :: try_new (
3478+ vec ! [
3479+ ( col( "value" , & probe_schema) . unwrap( ) , "value" . to_string( ) ) ,
3480+ (
3481+ col( "bhandle" , & probe_schema) . unwrap( ) ,
3482+ "bhandle" . to_string( ) ,
3483+ ) ,
3484+ ] ,
3485+ probe_scan,
3486+ )
3487+ . unwrap ( ) ,
3488+ ) ;
3489+
3490+ // AggregateExec: GROUP BY bhandle@1, min(value@0)
3491+ let aggregate_expr = vec ! [
3492+ AggregateExprBuilder :: new(
3493+ min_udaf( ) ,
3494+ vec![ col( "value" , & reordered_schema) . unwrap( ) ] ,
3495+ )
3496+ . schema( reordered_schema. clone( ) )
3497+ . alias( "min_value" )
3498+ . build( )
3499+ . map( Arc :: new)
3500+ . unwrap( ) ,
3501+ ] ;
3502+ let group_by = PhysicalGroupBy :: new_single ( vec ! [ (
3503+ col( "bhandle" , & reordered_schema) . unwrap( ) , // bhandle@1 in input
3504+ "bhandle" . to_string( ) ,
3505+ ) ] ) ;
3506+
3507+ let aggregate = Arc :: new (
3508+ AggregateExec :: try_new (
3509+ AggregateMode :: Single ,
3510+ group_by,
3511+ aggregate_expr,
3512+ vec ! [ None ] ,
3513+ projection,
3514+ reordered_schema,
3515+ )
3516+ . unwrap ( ) ,
3517+ ) ;
3518+ // Aggregate output schema: (bhandle@0, min_value@1)
3519+
3520+ // HashJoinExec: CollectLeft, join on bhandle
3521+ let agg_output_schema = aggregate. schema ( ) ;
3522+ let plan = Arc :: new (
3523+ HashJoinExec :: try_new (
3524+ build_scan,
3525+ aggregate,
3526+ vec ! [ (
3527+ col( "bhandle" , & build_schema) . unwrap( ) ,
3528+ col( "bhandle" , & agg_output_schema) . unwrap( ) ,
3529+ ) ] ,
3530+ None ,
3531+ & JoinType :: Inner ,
3532+ None ,
3533+ PartitionMode :: CollectLeft ,
3534+ datafusion_common:: NullEquality :: NullEqualsNothing ,
3535+ false ,
3536+ )
3537+ . unwrap ( ) ,
3538+ ) as Arc < dyn ExecutionPlan > ;
3539+
3540+ // Post-optimization: the HashJoin's dynamic filter on bhandle should push
3541+ // through the aggregate and reach the probe-side DataSource.
3542+ insta:: assert_snapshot!(
3543+ OptimizationTest :: new( Arc :: clone( & plan) , FilterPushdown :: new_post_optimization( ) , true ) ,
3544+ @r"
3545+ OptimizationTest:
3546+ input:
3547+ - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(bhandle@0, bhandle@0)]
3548+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[bhandle], file_type=test, pushdown_supported=true
3549+ - AggregateExec: mode=Single, gby=[bhandle@1 as bhandle], aggr=[min_value]
3550+ - ProjectionExec: expr=[value@1 as value, bhandle@0 as bhandle]
3551+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[bhandle, value], file_type=test, pushdown_supported=true
3552+ output:
3553+ Ok:
3554+ - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(bhandle@0, bhandle@0)]
3555+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[bhandle], file_type=test, pushdown_supported=true
3556+ - AggregateExec: mode=Single, gby=[bhandle@1 as bhandle], aggr=[min_value]
3557+ - ProjectionExec: expr=[value@1 as value, bhandle@0 as bhandle]
3558+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[bhandle, value], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
3559+ "
3560+ ) ;
3561+
3562+ // Actually execute the plan to verify the dynamic filter is populated
3563+ let mut config = ConfigOptions :: default ( ) ;
3564+ config. execution . parquet . pushdown_filters = true ;
3565+ config. optimizer . enable_dynamic_filter_pushdown = true ;
3566+ config. optimizer . enable_aggregate_dynamic_filter_pushdown = true ;
3567+ let plan = FilterPushdown :: new_post_optimization ( )
3568+ . optimize ( plan, & config)
3569+ . unwrap ( ) ;
3570+
3571+ let session_config = SessionConfig :: new ( ) . with_batch_size ( 10 ) ;
3572+ let session_ctx = SessionContext :: new_with_config ( session_config) ;
3573+ session_ctx. register_object_store (
3574+ ObjectStoreUrl :: parse ( "test://" ) . unwrap ( ) . as_ref ( ) ,
3575+ Arc :: new ( InMemory :: new ( ) ) ,
3576+ ) ;
3577+ let state = session_ctx. state ( ) ;
3578+ let task_ctx = state. task_ctx ( ) ;
3579+ let mut stream = plan. execute ( 0 , Arc :: clone ( & task_ctx) ) . unwrap ( ) ;
3580+ stream. next ( ) . await . unwrap ( ) . unwrap ( ) ;
3581+
3582+ // After execution, the dynamic filter should be populated with bhandle values
3583+ insta:: assert_snapshot!(
3584+ format!( "{}" , format_plan_for_test( & plan) ) ,
3585+ @r"
3586+ - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(bhandle@0, bhandle@0)]
3587+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[bhandle], file_type=test, pushdown_supported=true
3588+ - AggregateExec: mode=Single, gby=[bhandle@1 as bhandle], aggr=[min_value]
3589+ - ProjectionExec: expr=[value@1 as value, bhandle@0 as bhandle]
3590+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[bhandle, value], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ bhandle@0 >= h1 AND bhandle@0 <= h2 AND bhandle@0 IN (SET) ([h1, h2]) ]
3591+ "
3592+ ) ;
3593+ }
3594+
34253595#[ test]
34263596fn test_pushdown_with_computed_grouping_key ( ) {
34273597 // Test filter pushdown with computed grouping expression
0 commit comments