@@ -35,8 +35,7 @@ use crate::joins::Map;
3535use crate :: joins:: array_map:: ArrayMap ;
3636use crate :: joins:: hash_join:: inlist_builder:: build_struct_inlist_values;
3737use crate :: joins:: hash_join:: shared_bounds:: {
38- ColumnBounds , PartitionBounds , PartitionBuildData , PushdownStrategy ,
39- SharedBuildAccumulator ,
38+ ColumnBounds , PartitionBounds , PushdownStrategy , SharedBuildAccumulator ,
4039} ;
4140use crate :: joins:: hash_join:: stream:: {
4241 BuildSide , BuildSideInitialState , HashJoinStream , HashJoinStreamState ,
@@ -76,10 +75,9 @@ use datafusion_common::config::ConfigOptions;
7675use datafusion_common:: tree_node:: TreeNodeRecursion ;
7776use datafusion_common:: utils:: memory:: estimate_memory_size;
7877use datafusion_common:: {
79- DataFusionError , JoinSide , JoinType , NullEquality , Result , assert_or_internal_err,
80- internal_err , plan_err, project_schema,
78+ JoinSide , JoinType , NullEquality , Result , assert_or_internal_err, internal_err ,
79+ plan_err, project_schema,
8180} ;
82- use datafusion_common_runtime:: SpawnedTask ;
8381use datafusion_execution:: TaskContext ;
8482use datafusion_execution:: memory_pool:: { MemoryConsumer , MemoryReservation } ;
8583use datafusion_expr:: Accumulator ;
@@ -96,7 +94,6 @@ use datafusion_physical_expr_common::physical_expr::fmt_sql;
9694use datafusion_physical_expr_common:: utils:: evaluate_expressions_to_arrays;
9795use futures:: TryStreamExt ;
9896use parking_lot:: Mutex ;
99- use tokio:: sync:: oneshot;
10097
10198use super :: partitioned_hash_eval:: SeededRandomState ;
10299
@@ -1373,60 +1370,19 @@ impl ExecutionPlan for HashJoinExec {
13731370 let reservation =
13741371 MemoryConsumer :: new ( format ! ( "HashJoinInput[{partition}]" ) )
13751372 . register ( context. memory_pool ( ) ) ;
1376- let build_accumulator = build_accumulator
1377- . as_ref ( )
1378- . map ( |acc| ( Arc :: clone ( acc) , partition) ) ;
1379- let random_state = self . random_state . random_state ( ) . clone ( ) ;
1380- let with_visited_indices_bitmap =
1381- need_produce_result_in_final ( self . join_type ) ;
1382- let config = Arc :: clone ( context. session_config ( ) . options ( ) ) ;
1383- let null_equality = self . null_equality ;
1384- let background_join_metrics = join_metrics. clone ( ) ;
1385-
1386- if let Some ( ( build_accumulator, partition) ) = build_accumulator {
1387- let ( tx, rx) = oneshot:: channel ( ) ;
1388- let background_build_accumulator = Arc :: clone ( & build_accumulator) ;
1389- let task = SpawnedTask :: spawn ( async move {
1390- let result = collect_left_input_and_maybe_report (
1391- random_state,
1392- left_stream,
1393- on_left. clone ( ) ,
1394- background_join_metrics,
1395- reservation,
1396- with_visited_indices_bitmap,
1397- 1 ,
1398- enable_dynamic_filter_pushdown,
1399- config,
1400- null_equality,
1401- array_map_created_count,
1402- Some ( ( background_build_accumulator, partition) ) ,
1403- )
1404- . await ;
1405- let _ = tx. send ( result) ;
1406- } ) ;
1407- build_accumulator. register_background_task ( task) ;
1408- OnceFut :: new ( async move {
1409- rx. await . map_err ( |err| {
1410- DataFusionError :: Execution ( format ! (
1411- "hash join build task ended before sending its result: {err}"
1412- ) )
1413- } ) ?
1414- } )
1415- } else {
1416- OnceFut :: new ( collect_left_input (
1417- self . random_state . random_state ( ) . clone ( ) ,
1418- left_stream,
1419- on_left. clone ( ) ,
1420- join_metrics. clone ( ) ,
1421- reservation,
1422- need_produce_result_in_final ( self . join_type ) ,
1423- 1 ,
1424- enable_dynamic_filter_pushdown,
1425- Arc :: clone ( context. session_config ( ) . options ( ) ) ,
1426- self . null_equality ,
1427- array_map_created_count,
1428- ) )
1429- }
1373+ OnceFut :: new ( collect_left_input (
1374+ self . random_state . random_state ( ) . clone ( ) ,
1375+ left_stream,
1376+ on_left. clone ( ) ,
1377+ join_metrics. clone ( ) ,
1378+ reservation,
1379+ need_produce_result_in_final ( self . join_type ) ,
1380+ 1 ,
1381+ enable_dynamic_filter_pushdown,
1382+ Arc :: clone ( context. session_config ( ) . options ( ) ) ,
1383+ self . null_equality ,
1384+ array_map_created_count,
1385+ ) )
14301386 }
14311387 PartitionMode :: Auto => {
14321388 return plan_err ! (
@@ -1457,8 +1413,7 @@ impl ExecutionPlan for HashJoinExec {
14571413 . map ( |( _, right_expr) | Arc :: clone ( right_expr) )
14581414 . collect :: < Vec < _ > > ( ) ;
14591415 let stream_build_accumulator = match self . mode {
1460- PartitionMode :: Partitioned => None ,
1461- PartitionMode :: CollectLeft => build_accumulator,
1416+ PartitionMode :: Partitioned | PartitionMode :: CollectLeft => build_accumulator,
14621417 PartitionMode :: Auto => unreachable ! (
14631418 "PartitionMode::Auto should not be present at execution time"
14641419 ) ,
@@ -2135,51 +2090,6 @@ async fn collect_left_input(
21352090 Ok ( data)
21362091}
21372092
2138- #[ expect( clippy:: too_many_arguments) ]
2139- async fn collect_left_input_and_maybe_report (
2140- random_state : RandomState ,
2141- left_stream : SendableRecordBatchStream ,
2142- on_left : Vec < PhysicalExprRef > ,
2143- metrics : BuildProbeJoinMetrics ,
2144- reservation : MemoryReservation ,
2145- with_visited_indices_bitmap : bool ,
2146- probe_threads_count : usize ,
2147- should_compute_dynamic_filters : bool ,
2148- config : Arc < ConfigOptions > ,
2149- null_equality : NullEquality ,
2150- array_map_created_count : Count ,
2151- build_accumulator : Option < ( Arc < SharedBuildAccumulator > , usize ) > ,
2152- ) -> Result < JoinLeftData > {
2153- let left_data = collect_left_input (
2154- random_state,
2155- left_stream,
2156- on_left,
2157- metrics,
2158- reservation,
2159- with_visited_indices_bitmap,
2160- probe_threads_count,
2161- should_compute_dynamic_filters,
2162- config,
2163- null_equality,
2164- array_map_created_count,
2165- )
2166- . await ?;
2167-
2168- if let Some ( ( build_accumulator, partition_id) ) = build_accumulator {
2169- let build_data = PartitionBuildData :: Partitioned {
2170- partition_id,
2171- pushdown : left_data. membership ( ) . clone ( ) ,
2172- bounds : left_data
2173- . bounds
2174- . clone ( )
2175- . unwrap_or_else ( || PartitionBounds :: new ( vec ! [ ] ) ) ,
2176- } ;
2177- build_accumulator. report_build_data ( build_data) . await ?;
2178- }
2179-
2180- Ok ( left_data)
2181- }
2182-
21832093#[ cfg( test) ]
21842094mod tests {
21852095 use super :: * ;
0 commit comments