Skip to content

Commit 70bb8b4

Browse files
committed
Handle canceled partitioned hash join filters lazily
1 parent d17d5e4 commit 70bb8b4

3 files changed

Lines changed: 301 additions & 324 deletions

File tree

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 17 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,7 @@ use crate::joins::Map;
3535
use crate::joins::array_map::ArrayMap;
3636
use crate::joins::hash_join::inlist_builder::build_struct_inlist_values;
3737
use crate::joins::hash_join::shared_bounds::{
38-
ColumnBounds, PartitionBounds, PartitionBuildData, PushdownStrategy,
39-
SharedBuildAccumulator,
38+
ColumnBounds, PartitionBounds, PushdownStrategy, SharedBuildAccumulator,
4039
};
4140
use crate::joins::hash_join::stream::{
4241
BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
@@ -76,10 +75,9 @@ use datafusion_common::config::ConfigOptions;
7675
use datafusion_common::tree_node::TreeNodeRecursion;
7776
use datafusion_common::utils::memory::estimate_memory_size;
7877
use datafusion_common::{
79-
DataFusionError, JoinSide, JoinType, NullEquality, Result, assert_or_internal_err,
80-
internal_err, plan_err, project_schema,
78+
JoinSide, JoinType, NullEquality, Result, assert_or_internal_err, internal_err,
79+
plan_err, project_schema,
8180
};
82-
use datafusion_common_runtime::SpawnedTask;
8381
use datafusion_execution::TaskContext;
8482
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
8583
use datafusion_expr::Accumulator;
@@ -96,7 +94,6 @@ use datafusion_physical_expr_common::physical_expr::fmt_sql;
9694
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
9795
use futures::TryStreamExt;
9896
use parking_lot::Mutex;
99-
use tokio::sync::oneshot;
10097

10198
use super::partitioned_hash_eval::SeededRandomState;
10299

@@ -1373,60 +1370,19 @@ impl ExecutionPlan for HashJoinExec {
13731370
let reservation =
13741371
MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
13751372
.register(context.memory_pool());
1376-
let build_accumulator = build_accumulator
1377-
.as_ref()
1378-
.map(|acc| (Arc::clone(acc), partition));
1379-
let random_state = self.random_state.random_state().clone();
1380-
let with_visited_indices_bitmap =
1381-
need_produce_result_in_final(self.join_type);
1382-
let config = Arc::clone(context.session_config().options());
1383-
let null_equality = self.null_equality;
1384-
let background_join_metrics = join_metrics.clone();
1385-
1386-
if let Some((build_accumulator, partition)) = build_accumulator {
1387-
let (tx, rx) = oneshot::channel();
1388-
let background_build_accumulator = Arc::clone(&build_accumulator);
1389-
let task = SpawnedTask::spawn(async move {
1390-
let result = collect_left_input_and_maybe_report(
1391-
random_state,
1392-
left_stream,
1393-
on_left.clone(),
1394-
background_join_metrics,
1395-
reservation,
1396-
with_visited_indices_bitmap,
1397-
1,
1398-
enable_dynamic_filter_pushdown,
1399-
config,
1400-
null_equality,
1401-
array_map_created_count,
1402-
Some((background_build_accumulator, partition)),
1403-
)
1404-
.await;
1405-
let _ = tx.send(result);
1406-
});
1407-
build_accumulator.register_background_task(task);
1408-
OnceFut::new(async move {
1409-
rx.await.map_err(|err| {
1410-
DataFusionError::Execution(format!(
1411-
"hash join build task ended before sending its result: {err}"
1412-
))
1413-
})?
1414-
})
1415-
} else {
1416-
OnceFut::new(collect_left_input(
1417-
self.random_state.random_state().clone(),
1418-
left_stream,
1419-
on_left.clone(),
1420-
join_metrics.clone(),
1421-
reservation,
1422-
need_produce_result_in_final(self.join_type),
1423-
1,
1424-
enable_dynamic_filter_pushdown,
1425-
Arc::clone(context.session_config().options()),
1426-
self.null_equality,
1427-
array_map_created_count,
1428-
))
1429-
}
1373+
OnceFut::new(collect_left_input(
1374+
self.random_state.random_state().clone(),
1375+
left_stream,
1376+
on_left.clone(),
1377+
join_metrics.clone(),
1378+
reservation,
1379+
need_produce_result_in_final(self.join_type),
1380+
1,
1381+
enable_dynamic_filter_pushdown,
1382+
Arc::clone(context.session_config().options()),
1383+
self.null_equality,
1384+
array_map_created_count,
1385+
))
14301386
}
14311387
PartitionMode::Auto => {
14321388
return plan_err!(
@@ -1457,8 +1413,7 @@ impl ExecutionPlan for HashJoinExec {
14571413
.map(|(_, right_expr)| Arc::clone(right_expr))
14581414
.collect::<Vec<_>>();
14591415
let stream_build_accumulator = match self.mode {
1460-
PartitionMode::Partitioned => None,
1461-
PartitionMode::CollectLeft => build_accumulator,
1416+
PartitionMode::Partitioned | PartitionMode::CollectLeft => build_accumulator,
14621417
PartitionMode::Auto => unreachable!(
14631418
"PartitionMode::Auto should not be present at execution time"
14641419
),
@@ -2141,51 +2096,6 @@ async fn collect_left_input(
21412096
Ok(data)
21422097
}
21432098

2144-
#[expect(clippy::too_many_arguments)]
2145-
async fn collect_left_input_and_maybe_report(
2146-
random_state: RandomState,
2147-
left_stream: SendableRecordBatchStream,
2148-
on_left: Vec<PhysicalExprRef>,
2149-
metrics: BuildProbeJoinMetrics,
2150-
reservation: MemoryReservation,
2151-
with_visited_indices_bitmap: bool,
2152-
probe_threads_count: usize,
2153-
should_compute_dynamic_filters: bool,
2154-
config: Arc<ConfigOptions>,
2155-
null_equality: NullEquality,
2156-
array_map_created_count: Count,
2157-
build_accumulator: Option<(Arc<SharedBuildAccumulator>, usize)>,
2158-
) -> Result<JoinLeftData> {
2159-
let left_data = collect_left_input(
2160-
random_state,
2161-
left_stream,
2162-
on_left,
2163-
metrics,
2164-
reservation,
2165-
with_visited_indices_bitmap,
2166-
probe_threads_count,
2167-
should_compute_dynamic_filters,
2168-
config,
2169-
null_equality,
2170-
array_map_created_count,
2171-
)
2172-
.await?;
2173-
2174-
if let Some((build_accumulator, partition_id)) = build_accumulator {
2175-
let build_data = PartitionBuildData::Partitioned {
2176-
partition_id,
2177-
pushdown: left_data.membership().clone(),
2178-
bounds: left_data
2179-
.bounds
2180-
.clone()
2181-
.unwrap_or_else(|| PartitionBounds::new(vec![])),
2182-
};
2183-
build_accumulator.report_build_data(build_data).await?;
2184-
}
2185-
2186-
Ok(left_data)
2187-
}
2188-
21892099
#[cfg(test)]
21902100
mod tests {
21912101
use super::*;

0 commit comments

Comments
 (0)