Skip to content

Commit 0ac434d

Browse files
authored
Add case-heavy LEFT JOIN benchmark and debug timing/logging for PushDownFilter hot paths (#20664)
## Which issue does this PR close? * Part of #20002. ## Rationale for this change The `PushDownFilter` optimizer rule shows a severe planner-time performance pathology in the `sql_planner_extended` benchmark, where profiling indicates it dominates total planning CPU time and repeatedly recomputes expression types. This PR adds a deterministic, CASE-heavy LEFT JOIN benchmark to reliably reproduce the worst-case behavior and introduces lightweight debug-only timing + counters inside `push_down_filter` to make it easier to pinpoint expensive sub-sections (e.g. predicate simplification and join predicate inference) during profiling. ## What changes are included in this PR? * **Benchmark: add a deterministic CASE-heavy LEFT JOIN workload** * Adds `build_case_heavy_left_join_query` and helpers to construct a CASE-nested predicate chain over a `LEFT JOIN`. * Adds a new benchmark `logical_plan_optimize_hotspot_case_heavy_left_join` to stress planning/optimization time. * Adds an A/B benchmark group `push_down_filter_hotspot_case_heavy_left_join_ab` that sweeps predicate counts and CASE depth, comparing: * default optimizer with `push_down_filter` enabled * optimizer with `push_down_filter` removed * **Optimizer instrumentation (debug-only)** * Adds a small `with_debug_timing` helper gated by `log_enabled!(Debug)` to record microsecond timings for specific sections. * Instruments and logs: * time spent in `infer_join_predicates` * time spent in `simplify_predicates` * counts of parent predicates, `on_filters`, inferred join predicates * before/after predicate counts for simplification ## Are these changes tested? * No new unit/integration tests were added because this PR is focused on **benchmarking and debug-only instrumentation** rather than changing optimizer semantics. 
* Coverage is provided by: * compiling/running the `sql_planner_extended` benchmark * validating both benchmark variants (with/without `push_down_filter`) produce optimized plans without errors * enabling `RUST_LOG=debug` to confirm timing sections and counters emit as expected ## Are there any user-facing changes? * No user-facing behavior changes. * The optimizer logic is unchanged; only **debug logging** is added (emits only when `RUST_LOG` enables Debug for the relevant modules). * Benchmark suite additions only affect developers running benches. ## LLM-generated code disclosure This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested.
1 parent 02ce571 commit 0ac434d

2 files changed

Lines changed: 276 additions & 6 deletions

File tree

datafusion/core/benches/sql_planner_extended.rs

Lines changed: 236 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use arrow::array::{ArrayRef, RecordBatch};
1919
use arrow_schema::DataType;
2020
use arrow_schema::TimeUnit::Nanosecond;
21-
use criterion::{Criterion, criterion_group, criterion_main};
21+
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
2222
use datafusion::prelude::{DataFrame, SessionContext};
2323
use datafusion_catalog::MemTable;
2424
use datafusion_common::ScalarValue;
@@ -27,6 +27,7 @@ use datafusion_expr::{cast, col, lit, not, try_cast, when};
2727
use datafusion_functions::expr_fn::{
2828
btrim, length, regexp_like, regexp_replace, to_timestamp, upper,
2929
};
30+
use std::fmt::Write;
3031
use std::hint::black_box;
3132
use std::ops::Rem;
3233
use std::sync::Arc;
@@ -212,21 +213,253 @@ fn build_test_data_frame(ctx: &SessionContext, rt: &Runtime) -> DataFrame {
212213
})
213214
}
214215

215-
fn criterion_benchmark(c: &mut Criterion) {
216+
/// Build a CASE-heavy dataframe over a non-inner join to stress
217+
/// planner-time filter pushdown and nullability/type inference.
218+
fn build_case_heavy_left_join_df(ctx: &SessionContext, rt: &Runtime) -> DataFrame {
219+
register_string_table(ctx, 100, 1000);
220+
let query = build_case_heavy_left_join_query(30, 1);
221+
rt.block_on(async { ctx.sql(&query).await.unwrap() })
222+
}
223+
224+
/// Build a SQL query whose WHERE clause is `predicate_count` conjuncts,
/// each wrapped in `case_depth` nested CASE expressions mixing left and
/// right join-side columns. Deterministic so profile runs are comparable.
fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) -> String {
    let prefix = "SELECT l.c0, r.c0 AS rc0 FROM t l LEFT JOIN t r ON l.c0 = r.c0 WHERE ";

    // Degenerate sweep point: no predicates at all.
    if predicate_count == 0 {
        return format!("{prefix}TRUE");
    }

    // Keep this deterministic so comparisons between profiles are stable.
    let conjuncts: Vec<String> = (0..predicate_count)
        .map(|i| {
            // Innermost expression references a rotating left-side column.
            let mut expr = format!("length(l.c{})", i % 20);
            // Each nesting level wraps the previous expression in a CASE that
            // also touches a right-side column, stressing nullability inference.
            for depth in 0..case_depth {
                let left_col = (i + depth + 1) % 20;
                let right_col = (i + depth + 2) % 20;
                expr = format!(
                    "CASE WHEN l.c{left_col} IS NOT NULL THEN {expr} ELSE length(r.c{right_col}) END"
                );
            }
            format!("{expr} > 2")
        })
        .collect();

    format!("{prefix}{}", conjuncts.join(" AND "))
}
254+
255+
/// Plan the CASE-heavy LEFT JOIN query on a fresh context, optionally with
/// the `push_down_filter` rule removed so A/B runs can isolate its cost.
fn build_case_heavy_left_join_df_with_push_down_filter(
    rt: &Runtime,
    predicate_count: usize,
    case_depth: usize,
    push_down_filter_enabled: bool,
) -> DataFrame {
    let ctx = SessionContext::new();
    register_string_table(&ctx, 100, 1000);

    if !push_down_filter_enabled {
        // Removal must succeed, otherwise the A/B comparison is meaningless.
        assert!(
            ctx.remove_optimizer_rule("push_down_filter"),
            "push_down_filter rule should be present in the default optimizer"
        );
    }

    let sql = build_case_heavy_left_join_query(predicate_count, case_depth);
    rt.block_on(async { ctx.sql(&sql).await.unwrap() })
}
274+
275+
/// Build a control query with the same LEFT JOIN shape but `coalesce`
/// nesting instead of CASE, to separate CASE-specific planner cost from
/// generic predicate-count cost. Deterministic for stable comparisons.
fn build_non_case_left_join_query(
    predicate_count: usize,
    nesting_depth: usize,
) -> String {
    let prefix = "SELECT l.c0, r.c0 AS rc0 FROM t l LEFT JOIN t r ON l.c0 = r.c0 WHERE ";

    // Degenerate sweep point: no predicates at all.
    if predicate_count == 0 {
        return format!("{prefix}TRUE");
    }

    // Keep this deterministic so comparisons between profiles are stable.
    let conjuncts: Vec<String> = (0..predicate_count)
        .map(|i| {
            // Start from a rotating left-side column...
            let mut expr = format!("l.c{}", i % 20);
            // ...and wrap it in `nesting_depth` coalesce calls over right-side columns.
            for depth in 0..nesting_depth {
                expr = format!("coalesce({expr}, r.c{})", (i + depth + 1) % 20);
            }
            format!("length({expr}) > 2")
        })
        .collect();

    format!("{prefix}{}", conjuncts.join(" AND "))
}
306+
307+
/// Plan the coalesce-based control query on a fresh context, optionally with
/// the `push_down_filter` rule removed so A/B runs can isolate its cost.
fn build_non_case_left_join_df_with_push_down_filter(
    rt: &Runtime,
    predicate_count: usize,
    nesting_depth: usize,
    push_down_filter_enabled: bool,
) -> DataFrame {
    let ctx = SessionContext::new();
    register_string_table(&ctx, 100, 1000);

    if !push_down_filter_enabled {
        // Removal must succeed, otherwise the A/B comparison is meaningless.
        assert!(
            ctx.remove_optimizer_rule("push_down_filter"),
            "push_down_filter rule should be present in the default optimizer"
        );
    }

    let sql = build_non_case_left_join_query(predicate_count, nesting_depth);
    rt.block_on(async { ctx.sql(&sql).await.unwrap() })
}
326+
327+
fn criterion_benchmark(c: &mut Criterion) {
328+
let baseline_ctx = SessionContext::new();
329+
let case_heavy_ctx = SessionContext::new();
217330
let rt = Runtime::new().unwrap();
218331

219332
// validate logical plan optimize performance
220333
// https://github.com/apache/datafusion/issues/17261
221334

222-
let df = build_test_data_frame(&ctx, &rt);
335+
let df = build_test_data_frame(&baseline_ctx, &rt);
336+
let case_heavy_left_join_df = build_case_heavy_left_join_df(&case_heavy_ctx, &rt);
223337

224338
c.bench_function("logical_plan_optimize", |b| {
225339
b.iter(|| {
226340
let df_clone = df.clone();
227341
black_box(rt.block_on(async { df_clone.into_optimized_plan().unwrap() }));
228342
})
229343
});
344+
345+
c.bench_function("logical_plan_optimize_hotspot_case_heavy_left_join", |b| {
346+
b.iter(|| {
347+
let df_clone = case_heavy_left_join_df.clone();
348+
black_box(rt.block_on(async { df_clone.into_optimized_plan().unwrap() }));
349+
})
350+
});
351+
352+
let predicate_sweep = [10, 20, 30, 40, 60];
353+
let case_depth_sweep = [1, 2, 3];
354+
355+
let mut hotspot_group =
356+
c.benchmark_group("push_down_filter_hotspot_case_heavy_left_join_ab");
357+
for case_depth in case_depth_sweep {
358+
for predicate_count in predicate_sweep {
359+
let with_push_down_filter =
360+
build_case_heavy_left_join_df_with_push_down_filter(
361+
&rt,
362+
predicate_count,
363+
case_depth,
364+
true,
365+
);
366+
let without_push_down_filter =
367+
build_case_heavy_left_join_df_with_push_down_filter(
368+
&rt,
369+
predicate_count,
370+
case_depth,
371+
false,
372+
);
373+
374+
let input_label =
375+
format!("predicates={predicate_count},case_depth={case_depth}");
376+
// A/B interpretation:
377+
// - with_push_down_filter: default optimizer path (rule enabled)
378+
// - without_push_down_filter: control path with the rule removed
379+
// Compare both IDs at the same sweep point to isolate rule impact.
380+
hotspot_group.bench_with_input(
381+
BenchmarkId::new("with_push_down_filter", &input_label),
382+
&with_push_down_filter,
383+
|b, df| {
384+
b.iter(|| {
385+
let df_clone = df.clone();
386+
black_box(
387+
rt.block_on(async {
388+
df_clone.into_optimized_plan().unwrap()
389+
}),
390+
);
391+
})
392+
},
393+
);
394+
hotspot_group.bench_with_input(
395+
BenchmarkId::new("without_push_down_filter", &input_label),
396+
&without_push_down_filter,
397+
|b, df| {
398+
b.iter(|| {
399+
let df_clone = df.clone();
400+
black_box(
401+
rt.block_on(async {
402+
df_clone.into_optimized_plan().unwrap()
403+
}),
404+
);
405+
})
406+
},
407+
);
408+
}
409+
}
410+
hotspot_group.finish();
411+
412+
let mut control_group =
413+
c.benchmark_group("push_down_filter_control_non_case_left_join_ab");
414+
for nesting_depth in case_depth_sweep {
415+
for predicate_count in predicate_sweep {
416+
let with_push_down_filter = build_non_case_left_join_df_with_push_down_filter(
417+
&rt,
418+
predicate_count,
419+
nesting_depth,
420+
true,
421+
);
422+
let without_push_down_filter =
423+
build_non_case_left_join_df_with_push_down_filter(
424+
&rt,
425+
predicate_count,
426+
nesting_depth,
427+
false,
428+
);
429+
430+
let input_label =
431+
format!("predicates={predicate_count},nesting_depth={nesting_depth}");
432+
control_group.bench_with_input(
433+
BenchmarkId::new("with_push_down_filter", &input_label),
434+
&with_push_down_filter,
435+
|b, df| {
436+
b.iter(|| {
437+
let df_clone = df.clone();
438+
black_box(
439+
rt.block_on(async {
440+
df_clone.into_optimized_plan().unwrap()
441+
}),
442+
);
443+
})
444+
},
445+
);
446+
control_group.bench_with_input(
447+
BenchmarkId::new("without_push_down_filter", &input_label),
448+
&without_push_down_filter,
449+
|b, df| {
450+
b.iter(|| {
451+
let df_clone = df.clone();
452+
black_box(
453+
rt.block_on(async {
454+
df_clone.into_optimized_plan().unwrap()
455+
}),
456+
);
457+
})
458+
},
459+
);
460+
}
461+
}
462+
control_group.finish();
230463
}
231464

232465
criterion_group!(benches, criterion_benchmark);

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ use std::sync::Arc;
2323
use arrow::datatypes::DataType;
2424
use indexmap::IndexSet;
2525
use itertools::Itertools;
26+
use log::{Level, debug, log_enabled};
2627

28+
use datafusion_common::instant::Instant;
2729
use datafusion_common::tree_node::{
2830
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
2931
};
@@ -525,8 +527,19 @@ fn push_down_join(
525527
.map_or_else(Vec::new, |filter| split_conjunction_owned(filter.clone()));
526528

527529
// Are there any new join predicates that can be inferred from the filter expressions?
528-
let inferred_join_predicates =
529-
infer_join_predicates(&join, &predicates, &on_filters)?;
530+
let inferred_join_predicates = with_debug_timing("infer_join_predicates", || {
531+
infer_join_predicates(&join, &predicates, &on_filters)
532+
})?;
533+
534+
if log_enabled!(Level::Debug) {
535+
debug!(
536+
"push_down_filter: join_type={:?}, parent_predicates={}, on_filters={}, inferred_join_predicates={}",
537+
join.join_type,
538+
predicates.len(),
539+
on_filters.len(),
540+
inferred_join_predicates.len()
541+
);
542+
}
530543

531544
if on_filters.is_empty()
532545
&& predicates.is_empty()
@@ -765,7 +778,15 @@ impl OptimizerRule for PushDownFilter {
765778

766779
let predicate = split_conjunction_owned(filter.predicate.clone());
767780
let old_predicate_len = predicate.len();
768-
let new_predicates = simplify_predicates(predicate)?;
781+
let new_predicates =
782+
with_debug_timing("simplify_predicates", || simplify_predicates(predicate))?;
783+
if log_enabled!(Level::Debug) {
784+
debug!(
785+
"push_down_filter: simplify_predicates old_count={}, new_count={}",
786+
old_predicate_len,
787+
new_predicates.len()
788+
);
789+
}
769790
if old_predicate_len != new_predicates.len() {
770791
let Some(new_predicate) = conjunction(new_predicates) else {
771792
// new_predicates is empty - remove the filter entirely
@@ -1377,6 +1398,22 @@ impl PushDownFilter {
13771398
}
13781399
}
13791400

1401+
fn with_debug_timing<T, F>(label: &'static str, f: F) -> Result<T>
1402+
where
1403+
F: FnOnce() -> Result<T>,
1404+
{
1405+
if !log_enabled!(Level::Debug) {
1406+
return f();
1407+
}
1408+
let start = Instant::now();
1409+
let result = f();
1410+
debug!(
1411+
"push_down_filter_timing: section={label}, elapsed_us={}",
1412+
start.elapsed().as_micros()
1413+
);
1414+
result
1415+
}
1416+
13801417
/// replaces columns by its name on the projection.
13811418
pub fn replace_cols_by_name(
13821419
e: Expr,

0 commit comments

Comments
 (0)