Skip to content

Commit 8552245

Browse files
committed
Use SpawnedTask for eager hash join build reporting
1 parent 03f4947 commit 8552245

2 files changed

Lines changed: 43 additions & 16 deletions

File tree

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ use datafusion_common::{
7979
DataFusionError, JoinSide, JoinType, NullEquality, Result, assert_or_internal_err,
8080
internal_err, plan_err, project_schema,
8181
};
82+
use datafusion_common_runtime::SpawnedTask;
8283
use datafusion_execution::TaskContext;
8384
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
8485
use datafusion_expr::Accumulator;
@@ -95,6 +96,7 @@ use datafusion_physical_expr_common::physical_expr::fmt_sql;
9596
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
9697
use futures::TryStreamExt;
9798
use parking_lot::Mutex;
99+
use tokio::sync::oneshot;
98100

99101
use super::partitioned_hash_eval::SeededRandomState;
100102

@@ -1374,25 +1376,42 @@ impl ExecutionPlan for HashJoinExec {
13741376
let build_accumulator = build_accumulator
13751377
.as_ref()
13761378
.map(|acc| (Arc::clone(acc), partition));
1379+
let random_state = self.random_state.random_state().clone();
1380+
let with_visited_indices_bitmap =
1381+
need_produce_result_in_final(self.join_type);
1382+
let config = Arc::clone(context.session_config().options());
1383+
let null_equality = self.null_equality;
1384+
let background_join_metrics = join_metrics.clone();
13771385

13781386
if build_accumulator.is_some() {
1379-
let task = tokio::task::spawn(collect_left_input_and_maybe_report(
1380-
self.random_state.random_state().clone(),
1381-
left_stream,
1382-
on_left.clone(),
1383-
join_metrics.clone(),
1384-
reservation,
1385-
need_produce_result_in_final(self.join_type),
1386-
1,
1387-
enable_dynamic_filter_pushdown,
1388-
Arc::clone(context.session_config().options()),
1389-
self.null_equality,
1390-
array_map_created_count,
1391-
build_accumulator,
1392-
));
1387+
let (tx, rx) = oneshot::channel();
1388+
let (build_accumulator, partition) =
1389+
build_accumulator.expect("checked is_some above");
1390+
let background_build_accumulator = Arc::clone(&build_accumulator);
1391+
let task = SpawnedTask::spawn(async move {
1392+
let result = collect_left_input_and_maybe_report(
1393+
random_state,
1394+
left_stream,
1395+
on_left.clone(),
1396+
background_join_metrics,
1397+
reservation,
1398+
with_visited_indices_bitmap,
1399+
1,
1400+
enable_dynamic_filter_pushdown,
1401+
config,
1402+
null_equality,
1403+
array_map_created_count,
1404+
Some((background_build_accumulator, partition)),
1405+
)
1406+
.await;
1407+
let _ = tx.send(result);
1408+
});
1409+
build_accumulator.register_background_task(task);
13931410
OnceFut::new(async move {
1394-
task.await.map_err(|err| {
1395-
DataFusionError::ExecutionJoin(Box::new(err))
1411+
rx.await.map_err(|err| {
1412+
DataFusionError::Execution(format!(
1413+
"hash join build task ended before sending its result: {err}"
1414+
))
13961415
})?
13971416
})
13981417
} else {

datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use arrow::array::ArrayRef;
3434
use arrow::datatypes::{DataType, Field, Schema};
3535
use datafusion_common::config::ConfigOptions;
3636
use datafusion_common::{Result, ScalarValue};
37+
use datafusion_common_runtime::SpawnedTask;
3738
use datafusion_expr::Operator;
3839
use datafusion_functions::core::r#struct as struct_func;
3940
use datafusion_physical_expr::expressions::{
@@ -216,6 +217,8 @@ fn create_bounds_predicate(
216217
pub(crate) struct SharedBuildAccumulator {
217218
/// Build-side data protected by a single mutex to avoid ordering concerns
218219
inner: Mutex<AccumulatedBuildData>,
220+
/// Keeps background build/report tasks alive until all partitions have reported.
221+
background_tasks: Mutex<Vec<SpawnedTask<()>>>,
219222
barrier: Barrier,
220223
/// Dynamic filter for pushdown to probe side
221224
dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
@@ -337,6 +340,7 @@ impl SharedBuildAccumulator {
337340

338341
Self {
339342
inner: Mutex::new(mode_data),
343+
background_tasks: Mutex::new(vec![]),
340344
barrier: Barrier::new(expected_calls),
341345
dynamic_filter,
342346
on_right,
@@ -345,6 +349,10 @@ impl SharedBuildAccumulator {
345349
}
346350
}
347351

352+
pub(crate) fn register_background_task(&self, task: SpawnedTask<()>) {
353+
self.background_tasks.lock().push(task);
354+
}
355+
348356
/// Report build-side data from a partition
349357
///
350358
/// This unified method handles both CollectLeft and Partitioned modes. When all partitions

0 commit comments

Comments
 (0)