Skip to content

Commit c6c5afd

Browse files
committed
feat: BufferExec waits for hash join dynamic filters before buffering
Adds ProducerKind to DynamicFilterPhysicalExpr to distinguish between hash join (safe to wait), TopK, and Aggregate (would deadlock) producers. BufferExec now walks its child subtree at execution time, collects any HashJoin dynamic filters, and waits for them to complete before the background task begins polling the input stream. This allows scans below BufferExec to benefit from dynamic filters rather than being eagerly read before the filter is populated. A new session config option `hash_join_buffering_dynamic_filter_wait_ms` controls the wait behavior (0 = disabled, usize::MAX = wait indefinitely, any other value = timeout in ms). Defaults to usize::MAX. Closes #20778
1 parent dd8760d commit c6c5afd

12 files changed

Lines changed: 335 additions & 27 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -684,6 +684,17 @@ config_namespace! {
684684
///
685685
/// Disabled by default; set to a number greater than 0 to enable it.
686686
pub hash_join_buffering_capacity: usize, default = 0
687+
688+
/// How long (in milliseconds) the probe-side buffer should wait for hash join dynamic
689+
/// filters to be fully populated before starting to buffer data.
690+
///
691+
/// When `hash_join_buffering_capacity` is enabled and dynamic filters are present below
692+
/// the buffer, waiting for the filter to complete allows the input scan to benefit from
693+
/// the filter and skip reading unnecessary data.
694+
///
695+
/// Set to 0 to disable waiting (buffer immediately, same as previous behavior).
696+
/// Set to `usize::MAX` to wait indefinitely until the filter is complete; any other value is a timeout in milliseconds.
697+
pub hash_join_buffering_dynamic_filter_wait_ms: usize, default = usize::MAX
687698
}
688699
}
689700

datafusion/core/tests/physical_optimizer/filter_pushdown.rs

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5146,3 +5146,182 @@ async fn test_hashjoin_dynamic_filter_pushdown_left_semi_join() {
51465146
"
51475147
);
51485148
}
5149+
5150+
/// Helper: build and optimize a HashJoin plan with dynamic filter pushdown + buffering enabled.
5151+
/// Returns the optimized plan ready for execution.
5152+
fn build_buffered_hash_join_plan(
5153+
build_scan: Arc<dyn ExecutionPlan>,
5154+
probe_scan: Arc<dyn ExecutionPlan>,
5155+
build_schema: &SchemaRef,
5156+
probe_schema: &SchemaRef,
5157+
join_type: JoinType,
5158+
config: &ConfigOptions,
5159+
) -> Arc<dyn ExecutionPlan> {
5160+
use datafusion_common::NullEquality;
5161+
use datafusion_physical_optimizer::hash_join_buffering::HashJoinBuffering;
5162+
5163+
let join = Arc::new(
5164+
HashJoinExec::try_new(
5165+
build_scan,
5166+
probe_scan,
5167+
vec![(
5168+
col("a", build_schema).unwrap(),
5169+
col("a", probe_schema).unwrap(),
5170+
)],
5171+
None,
5172+
&join_type,
5173+
None,
5174+
PartitionMode::CollectLeft,
5175+
NullEquality::NullEqualsNothing,
5176+
false,
5177+
)
5178+
.unwrap(),
5179+
) as Arc<dyn ExecutionPlan>;
5180+
5181+
// Apply dynamic filter pushdown so the probe scan receives the dynamic filter
5182+
let plan = FilterPushdown::new_post_optimization()
5183+
.optimize(join, config)
5184+
.unwrap();
5185+
5186+
// Apply buffering so BufferExec wraps the probe side
5187+
HashJoinBuffering::new().optimize(plan, config).unwrap()
5188+
}
5189+
5190+
/// Test that BufferExec correctly waits for a HashJoin dynamic filter before buffering,
5191+
/// and that the query produces correct results.
5192+
#[tokio::test]
5193+
async fn test_buffer_exec_waits_for_hash_join_dynamic_filter() {
5194+
let build_batches =
5195+
vec![record_batch!(("a", Utf8, ["aa", "ab"]), ("b", Int32, [1, 2])).unwrap()];
5196+
let build_schema = Arc::new(Schema::new(vec![
5197+
Field::new("a", DataType::Utf8, false),
5198+
Field::new("b", DataType::Int32, false),
5199+
]));
5200+
let build_scan = TestScanBuilder::new(Arc::clone(&build_schema))
5201+
.with_support(true)
5202+
.with_batches(build_batches)
5203+
.build();
5204+
5205+
let probe_batches = vec![
5206+
record_batch!(
5207+
("a", Utf8, ["aa", "ab", "ac"]),
5208+
("c", Float64, [1.0, 2.0, 3.0])
5209+
)
5210+
.unwrap(),
5211+
];
5212+
let probe_schema = Arc::new(Schema::new(vec![
5213+
Field::new("a", DataType::Utf8, false),
5214+
Field::new("c", DataType::Float64, false),
5215+
]));
5216+
let probe_scan = TestScanBuilder::new(Arc::clone(&probe_schema))
5217+
.with_support(true)
5218+
.with_batches(probe_batches)
5219+
.build();
5220+
5221+
let mut config = ConfigOptions::default();
5222+
config.optimizer.enable_dynamic_filter_pushdown = true;
5223+
config.execution.hash_join_buffering_capacity = 1024 * 1024;
5224+
config.execution.hash_join_buffering_dynamic_filter_wait_ms = usize::MAX;
5225+
5226+
let plan = build_buffered_hash_join_plan(
5227+
build_scan,
5228+
probe_scan,
5229+
&build_schema,
5230+
&probe_schema,
5231+
JoinType::Inner,
5232+
&config,
5233+
);
5234+
5235+
let session_config = SessionConfig::from(config);
5236+
let session_ctx = SessionContext::new_with_config(session_config);
5237+
session_ctx.register_object_store(
5238+
ObjectStoreUrl::parse("test://").unwrap().as_ref(),
5239+
Arc::new(InMemory::new()),
5240+
);
5241+
let task_ctx = session_ctx.state().task_ctx();
5242+
let batches = collect(plan, task_ctx).await.unwrap();
5243+
5244+
// "ac" has no match on build side so only "aa" and "ab" rows appear
5245+
let result = format!("{}", pretty_format_batches(&batches).unwrap());
5246+
insta::assert_snapshot!(result, @r"
5247+
+----+---+----+-----+
5248+
| a | b | a | c |
5249+
+----+---+----+-----+
5250+
| aa | 1 | aa | 1.0 |
5251+
| ab | 2 | ab | 2.0 |
5252+
+----+---+----+-----+
5253+
");
5254+
}
5255+
5256+
/// Test that BufferExec + HashJoin with a TopK above does not deadlock.
5257+
/// The TopK filter is above BufferExec, so it is NOT in the child subtree
5258+
/// and will not be waited on.
5259+
#[tokio::test]
5260+
async fn test_buffer_exec_hash_join_with_topk_above_no_deadlock() {
5261+
let build_batches = vec![
5262+
record_batch!(("a", Utf8, ["aa", "ab", "ac"]), ("b", Int32, [1, 2, 3])).unwrap(),
5263+
];
5264+
let build_schema = Arc::new(Schema::new(vec![
5265+
Field::new("a", DataType::Utf8, false),
5266+
Field::new("b", DataType::Int32, false),
5267+
]));
5268+
let build_scan = TestScanBuilder::new(Arc::clone(&build_schema))
5269+
.with_support(true)
5270+
.with_batches(build_batches)
5271+
.build();
5272+
5273+
let probe_batches = vec![
5274+
record_batch!(
5275+
("a", Utf8, ["aa", "ab", "ac", "ad"]),
5276+
("c", Float64, [1.0, 2.0, 3.0, 4.0])
5277+
)
5278+
.unwrap(),
5279+
];
5280+
let probe_schema = Arc::new(Schema::new(vec![
5281+
Field::new("a", DataType::Utf8, false),
5282+
Field::new("c", DataType::Float64, false),
5283+
]));
5284+
let probe_scan = TestScanBuilder::new(Arc::clone(&probe_schema))
5285+
.with_support(true)
5286+
.with_batches(probe_batches)
5287+
.build();
5288+
5289+
let mut config = ConfigOptions::default();
5290+
config.optimizer.enable_dynamic_filter_pushdown = true;
5291+
config.execution.hash_join_buffering_capacity = 1024 * 1024;
5292+
config.execution.hash_join_buffering_dynamic_filter_wait_ms = usize::MAX;
5293+
5294+
let plan = build_buffered_hash_join_plan(
5295+
build_scan,
5296+
probe_scan,
5297+
&build_schema,
5298+
&probe_schema,
5299+
JoinType::Inner,
5300+
&config,
5301+
);
5302+
5303+
// Wrap with TopK (SortExec with fetch) above the join
5304+
let sort_expr =
5305+
PhysicalSortExpr::new(col("a", &plan.schema()).unwrap(), SortOptions::default());
5306+
let topk = Arc::new(
5307+
SortExec::new(LexOrdering::new(vec![sort_expr]).unwrap(), plan)
5308+
.with_fetch(Some(2)),
5309+
) as Arc<dyn ExecutionPlan>;
5310+
5311+
let session_config = SessionConfig::from(config);
5312+
let session_ctx = SessionContext::new_with_config(session_config);
5313+
session_ctx.register_object_store(
5314+
ObjectStoreUrl::parse("test://").unwrap().as_ref(),
5315+
Arc::new(InMemory::new()),
5316+
);
5317+
let task_ctx = session_ctx.state().task_ctx();
5318+
5319+
// Should complete without deadlock
5320+
let batches =
5321+
tokio::time::timeout(std::time::Duration::from_secs(10), collect(topk, task_ctx))
5322+
.await
5323+
.expect("should not deadlock")
5324+
.unwrap();
5325+
5326+
assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
5327+
}

datafusion/datasource-parquet/src/opener.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1588,7 +1588,7 @@ mod test {
15881588
use datafusion_expr::{col, lit};
15891589
use datafusion_physical_expr::{
15901590
PhysicalExpr,
1591-
expressions::{Column, DynamicFilterPhysicalExpr, Literal},
1591+
expressions::{Column, DynamicFilterPhysicalExpr, Literal, ProducerKind},
15921592
planner::logical2physical,
15931593
projection::ProjectionExprs,
15941594
};
@@ -1922,6 +1922,7 @@ mod test {
19221922
Arc::new(DynamicFilterPhysicalExpr::new(
19231923
expr.children().into_iter().map(Arc::clone).collect(),
19241924
expr,
1925+
ProducerKind::HashJoin,
19251926
))
19261927
}
19271928

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,26 @@ use datafusion_common::{
2828
use datafusion_expr::ColumnarValue;
2929
use datafusion_physical_expr_common::physical_expr::DynHash;
3030

31+
/// Identifies which kind of operator produced a [`DynamicFilterPhysicalExpr`].
32+
///
33+
/// This is used by operators like [`BufferExec`] to decide whether it is safe
34+
/// to wait for a dynamic filter to complete before starting execution.
35+
/// Only [`ProducerKind::HashJoin`] filters are guaranteed to be fully populated
36+
/// before the probe side starts, making them safe to wait on.
37+
/// [`ProducerKind::TopK`] and [`ProducerKind::Aggregate`] filters are populated
38+
/// incrementally during execution and waiting on them would cause a deadlock.
39+
///
40+
/// [`BufferExec`]: datafusion_physical_plan::buffer::BufferExec
41+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
42+
pub enum ProducerKind {
43+
/// Produced by a hash join build side. Safe to wait on — fully populated before probe starts.
44+
HashJoin,
45+
/// Produced by a TopK operator. Not safe to wait on — populated incrementally.
46+
TopK,
47+
/// Produced by an aggregate operator. Not safe to wait on — may never complete.
48+
Aggregate,
49+
}
50+
3151
/// State of a dynamic filter, tracking both updates and completion.
3252
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3353
enum FilterState {
@@ -55,7 +75,7 @@ impl FilterState {
5575
/// For more background, please also see the [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog]
5676
///
5777
/// [Dynamic Filters: Passing Information Between Operators During Execution for 25x Faster Queries blog]: https://datafusion.apache.org/blog/2025/09/10/dynamic-filters
58-
#[derive(Debug)]
78+
#[derive(Debug, Clone)]
5979
pub struct DynamicFilterPhysicalExpr {
6080
/// The original children of this PhysicalExpr, if any.
6181
/// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`)
@@ -74,6 +94,8 @@ pub struct DynamicFilterPhysicalExpr {
7494
/// But this can have overhead in production, so it's only included in our tests.
7595
data_type: Arc<RwLock<Option<DataType>>>,
7696
nullable: Arc<RwLock<Option<bool>>>,
97+
/// The kind of operator that produced this dynamic filter.
98+
producer_kind: ProducerKind,
7799
}
78100

79101
#[derive(Debug)]
@@ -168,6 +190,7 @@ impl DynamicFilterPhysicalExpr {
168190
pub fn new(
169191
children: Vec<Arc<dyn PhysicalExpr>>,
170192
inner: Arc<dyn PhysicalExpr>,
193+
producer_kind: ProducerKind,
171194
) -> Self {
172195
let (state_watch, _) = watch::channel(FilterState::InProgress { generation: 1 });
173196
Self {
@@ -177,6 +200,7 @@ impl DynamicFilterPhysicalExpr {
177200
state_watch,
178201
data_type: Arc::new(RwLock::new(None)),
179202
nullable: Arc::new(RwLock::new(None)),
203+
producer_kind,
180204
}
181205
}
182206

@@ -289,6 +313,11 @@ impl DynamicFilterPhysicalExpr {
289313
let _ = rx.wait_for(|state| state.generation() > current_gen).await;
290314
}
291315

316+
/// Returns the [`ProducerKind`] of this dynamic filter.
317+
pub fn producer_kind(&self) -> ProducerKind {
318+
self.producer_kind
319+
}
320+
292321
/// Wait asynchronously until this dynamic filter is marked as complete.
293322
///
294323
/// This method returns immediately if the filter is already complete.
@@ -372,6 +401,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
372401
state_watch: self.state_watch.clone(),
373402
data_type: Arc::clone(&self.data_type),
374403
nullable: Arc::clone(&self.nullable),
404+
producer_kind: self.producer_kind,
375405
}))
376406
}
377407

@@ -478,6 +508,7 @@ mod test {
478508
let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
479509
vec![col("a", &table_schema).unwrap()],
480510
expr as Arc<dyn PhysicalExpr>,
511+
ProducerKind::HashJoin,
481512
));
482513
// Simulate two `ParquetSource` files with different filter schemas
483514
// Both of these should hit the same inner `PhysicalExpr` even after `update()` is called
@@ -632,6 +663,7 @@ mod test {
632663
let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
633664
vec![],
634665
lit(42) as Arc<dyn PhysicalExpr>,
666+
ProducerKind::HashJoin,
635667
));
636668

637669
// Mark as complete immediately
@@ -667,6 +699,7 @@ mod test {
667699
let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
668700
vec![Arc::clone(&col_a), Arc::clone(&col_b)],
669701
expr as Arc<dyn PhysicalExpr>,
702+
ProducerKind::HashJoin,
670703
));
671704

672705
// Clone the Arc (two references to the same DynamicFilterPhysicalExpr)
@@ -730,6 +763,7 @@ mod test {
730763
let filter = Arc::new(DynamicFilterPhysicalExpr::new(
731764
vec![],
732765
lit(true) as Arc<dyn PhysicalExpr>,
766+
ProducerKind::HashJoin,
733767
));
734768

735769
// Initially, only one reference to the inner Arc exists

datafusion/physical-expr/src/expressions/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pub use cast::{CastExpr, cast};
4545
pub use cast_column::CastColumnExpr;
4646
pub use column::{Column, col, with_new_schema};
4747
pub use datafusion_expr::utils::format_state_name;
48-
pub use dynamic_filters::DynamicFilterPhysicalExpr;
48+
pub use dynamic_filters::{DynamicFilterPhysicalExpr, ProducerKind};
4949
pub use in_list::{InListExpr, in_list};
5050
pub use is_not_null::{IsNotNullExpr, is_not_null};
5151
pub use is_null::{IsNullExpr, is_null};

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ use datafusion_execution::TaskContext;
5252
use datafusion_expr::{Accumulator, Aggregate};
5353
use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
5454
use datafusion_physical_expr::equivalence::ProjectionMapping;
55-
use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr, lit};
55+
use datafusion_physical_expr::expressions::{
56+
Column, DynamicFilterPhysicalExpr, ProducerKind, lit,
57+
};
5658
use datafusion_physical_expr::{
5759
ConstExpr, EquivalenceProperties, physical_exprs_contains,
5860
};
@@ -1167,7 +1169,11 @@ impl AggregateExec {
11671169

11681170
if !aggr_dyn_filters.is_empty() {
11691171
self.dynamic_filter = Some(Arc::new(AggrDynFilter {
1170-
filter: Arc::new(DynamicFilterPhysicalExpr::new(all_cols, lit(true))),
1172+
filter: Arc::new(DynamicFilterPhysicalExpr::new(
1173+
all_cols,
1174+
lit(true),
1175+
ProducerKind::Aggregate,
1176+
)),
11711177
supported_accumulators_info: aggr_dyn_filters,
11721178
}))
11731179
}

0 commit comments

Comments
 (0)