Commit bfa0ea8

Hash join buffering on probe side (#19761)
## Which issue does this PR close?

It does not close any issue, but it is related to:

- #17494
- #15885
- #18942

## Rationale for this change

This is one of a batch of PRs that aim to improve hash join performance:

- #19759
- #19760
- This PR

It adds a new `BufferExec` node on top of the probe side of hash joins, so that some work is performed eagerly before the build side of the hash join has completely finished.

### Why should this speed up joins?

To understand the impact of this PR, it helps to understand how streams work in Rust: creating a stream does not perform any work; progress is made only when the stream is polled.

This means that whenever we call `.execute()` on an `ExecutionPlan` (like the probe side of a join), nothing happens: not even the most basic TCP connections or system calls are performed. Instead, all this work is delayed as long as possible, until the first poll is made to the stream, losing the opportunity to make some early progress.

This gets worse when multiple hash joins are chained together: they execute in cascade, like falling dominoes. That keeps the memory footprint small, but it underutilizes the machine's resources, which could be used to run the query faster.

> [!NOTE]
> Even though this shows an overall performance improvement in the benchmarks, it can cause performance degradation on queries with dynamic filters, so hash join buffering is disabled by default and users can opt in.
> Follow-up work will be needed to make this interact well with dynamic filters.

## What changes are included in this PR?

Adds a new `HashJoinBuffering` physical optimizer rule that idempotently places `BufferExec` nodes on the probe side of hash joins:

```
          ┌───────────────────┐
          │   HashJoinExec    │
          └─────▲────────▲────┘
        ┌───────┘        └─────────┐
        │                          │
┌────────────────┐        ┌─────────────────┐
│   Build side   │    +   │   BufferExec    │
└────────────────┘        └────────▲────────┘
                                   │
                          ┌────────┴────────┐
                          │   Probe side    │
                          └─────────────────┘
```

## Are these changes tested?

Yes, by existing tests.

## Are there any user-facing changes?

Not by default. Users can now opt in to this feature with the `hash_join_buffering_capacity` config parameter.

---

# Results

> [!NOTE]
> A small number of TPC-DS queries have regressed because, with eager buffering, they do not benefit from dynamic filters as much. This is the main reason for leaving this config parameter disabled by default until there is a proper way to interact with dynamic filters inside the `BufferExec` node.

```
./bench.sh compare main hash-join-buffering-on-probe-side
Comparing main and hash-join-buffering-on-probe-side
--------------------
Benchmark tpcds_sf1.json
--------------------
┏━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query     ┃       main ┃ hash-join-buffering-on-probe-side ┃        Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1  │   39.70 ms │                          18.30 ms │ +2.17x faster │
│ QQuery 2  │  134.95 ms │                          57.64 ms │ +2.34x faster │
│ QQuery 3  │  103.08 ms │                          89.88 ms │ +1.15x faster │
│ QQuery 4  │ 1034.04 ms │                         340.84 ms │ +3.03x faster │
│ QQuery 5  │  155.97 ms │                         139.44 ms │ +1.12x faster │
│ QQuery 6  │  591.97 ms │                         523.17 ms │ +1.13x faster │
│ QQuery 7  │  304.47 ms │                         235.52 ms │ +1.29x faster │
│ QQuery 8  │  100.02 ms │                          90.65 ms │ +1.10x faster │
│ QQuery 9  │   91.86 ms │                          92.25 ms │     no change │
│ QQuery 10 │   90.97 ms │                          47.59 ms │ +1.91x faster │
│ QQuery 11 │  649.77 ms │                         217.42 ms │ +2.99x faster │
│ QQuery 12 │   38.12 ms │                          31.71 ms │ +1.20x faster │
│ QQuery 13 │  337.93 ms │                         302.24 ms │ +1.12x faster │
│ QQuery 14 │  797.93 ms │                         439.40 ms │ +1.82x faster │
│ QQuery 15 │   28.14 ms │                          47.75 ms │  1.70x slower │
│ QQuery 16 │   34.08 ms │                         103.14 ms │  3.03x slower │
│ QQuery 17 │  225.43 ms │                         152.29 ms │ +1.48x faster │
│ QQuery 18 │  118.09 ms │                         164.11 ms │  1.39x slower │
│ QQuery 19 │  143.39 ms │                         120.07 ms │ +1.19x faster │
│ QQuery 20 │   12.70 ms │                          52.15 ms │  4.11x slower │
│ QQuery 21 │   16.74 ms │                         184.55 ms │ 11.02x slower │
│ QQuery 22 │  311.97 ms │                         358.70 ms │  1.15x slower │
│ QQuery 23 │  807.41 ms │                         531.22 ms │ +1.52x faster │
│ QQuery 24 │  347.90 ms │                         279.01 ms │ +1.25x faster │
│ QQuery 25 │  313.20 ms │                         183.26 ms │ +1.71x faster │
│ QQuery 26 │   83.57 ms │                         124.28 ms │  1.49x slower │
│ QQuery 27 │  300.93 ms │                         237.28 ms │ +1.27x faster │
│ QQuery 28 │  130.79 ms │                         129.64 ms │     no change │
│ QQuery 29 │  267.08 ms │                         157.55 ms │ +1.70x faster │
│ QQuery 30 │   37.23 ms │                          25.98 ms │ +1.43x faster │
│ QQuery 31 │  128.57 ms │                         102.96 ms │ +1.25x faster │
│ QQuery 32 │   50.16 ms │                          42.77 ms │ +1.17x faster │
│ QQuery 33 │  114.06 ms │                         110.83 ms │     no change │
│ QQuery 34 │   89.27 ms │                          77.19 ms │ +1.16x faster │
│ QQuery 35 │   86.66 ms │                          50.86 ms │ +1.70x faster │
│ QQuery 36 │  173.00 ms │                         160.46 ms │ +1.08x faster │
│ QQuery 37 │  157.69 ms │                         153.57 ms │     no change │
│ QQuery 38 │   62.53 ms │                          52.28 ms │ +1.20x faster │
│ QQuery 39 │   83.38 ms │                         394.28 ms │  4.73x slower │
│ QQuery 40 │   87.64 ms │                          77.15 ms │ +1.14x faster │
│ QQuery 41 │   16.23 ms │                          15.05 ms │ +1.08x faster │
│ QQuery 42 │   93.24 ms │                          88.03 ms │ +1.06x faster │
│ QQuery 43 │   72.64 ms │                          63.49 ms │ +1.14x faster │
│ QQuery 44 │    9.06 ms │                           7.80 ms │ +1.16x faster │
│ QQuery 45 │   55.46 ms │                          34.12 ms │ +1.63x faster │
│ QQuery 46 │  185.75 ms │                         163.09 ms │ +1.14x faster │
│ QQuery 47 │  529.01 ms │                         143.05 ms │ +3.70x faster │
│ QQuery 48 │  236.59 ms │                         198.08 ms │ +1.19x faster │
│ QQuery 49 │  208.83 ms │                         191.07 ms │ +1.09x faster │
│ QQuery 50 │  176.04 ms │                         143.57 ms │ +1.23x faster │
│ QQuery 51 │  140.97 ms │                          96.36 ms │ +1.46x faster │
│ QQuery 52 │   92.83 ms │                          86.68 ms │ +1.07x faster │
│ QQuery 53 │   90.46 ms │                          83.34 ms │ +1.09x faster │
│ QQuery 54 │  135.74 ms │                         116.89 ms │ +1.16x faster │
│ QQuery 55 │   91.55 ms │                          87.18 ms │     no change │
│ QQuery 56 │  113.12 ms │                         111.00 ms │     no change │
│ QQuery 57 │  129.43 ms │                          78.69 ms │ +1.64x faster │
│ QQuery 58 │  229.68 ms │                         165.27 ms │ +1.39x faster │
│ QQuery 59 │  161.24 ms │                         125.57 ms │ +1.28x faster │
│ QQuery 60 │  116.86 ms │                         111.38 ms │     no change │
│ QQuery 61 │  150.19 ms │                         143.00 ms │     no change │
│ QQuery 62 │  426.70 ms │                         413.02 ms │     no change │
│ QQuery 63 │   93.41 ms │                          81.94 ms │ +1.14x faster │
│ QQuery 64 │  578.51 ms │                         442.41 ms │ +1.31x faster │
│ QQuery 65 │  201.75 ms │                          87.46 ms │ +2.31x faster │
│ QQuery 66 │  181.57 ms │                         184.28 ms │     no change │
│ QQuery 67 │  246.39 ms │                         226.38 ms │ +1.09x faster │
│ QQuery 68 │  230.40 ms │                         212.41 ms │ +1.08x faster │
│ QQuery 69 │   91.30 ms │                          46.05 ms │ +1.98x faster │
│ QQuery 70 │  270.46 ms │                         232.65 ms │ +1.16x faster │
│ QQuery 71 │  111.93 ms │                         107.35 ms │     no change │
│ QQuery 72 │  562.16 ms │                         435.56 ms │ +1.29x faster │
│ QQuery 73 │   85.66 ms │                          81.05 ms │ +1.06x faster │
│ QQuery 74 │  371.14 ms │                         148.67 ms │ +2.50x faster │
│ QQuery 75 │  221.61 ms │                         170.13 ms │ +1.30x faster │
│ QQuery 76 │  122.88 ms │                         107.23 ms │ +1.15x faster │
│ QQuery 77 │  163.52 ms │                         140.98 ms │ +1.16x faster │
│ QQuery 78 │  313.92 ms │                         205.72 ms │ +1.53x faster │
│ QQuery 79 │  187.31 ms │                         163.73 ms │ +1.14x faster │
│ QQuery 80 │  262.86 ms │                         240.16 ms │ +1.09x faster │
│ QQuery 81 │   21.89 ms │                          18.13 ms │ +1.21x faster │
│ QQuery 82 │  172.40 ms │                         159.50 ms │ +1.08x faster │
│ QQuery 83 │   45.38 ms │                          22.20 ms │ +2.04x faster │
│ QQuery 84 │   39.41 ms │                          30.58 ms │ +1.29x faster │
│ QQuery 85 │  141.02 ms │                          73.78 ms │ +1.91x faster │
│ QQuery 86 │   31.68 ms │                          28.44 ms │ +1.11x faster │
│ QQuery 87 │   63.57 ms │                          53.78 ms │ +1.18x faster │
│ QQuery 88 │   88.01 ms │                          74.09 ms │ +1.19x faster │
│ QQuery 89 │  105.44 ms │                          85.16 ms │ +1.24x faster │
│ QQuery 90 │   19.76 ms │                          16.27 ms │ +1.21x faster │
│ QQuery 91 │   52.31 ms │                          33.46 ms │ +1.56x faster │
│ QQuery 92 │   50.04 ms │                          25.38 ms │ +1.97x faster │
│ QQuery 93 │  143.48 ms │                         130.62 ms │ +1.10x faster │
│ QQuery 94 │   50.84 ms │                          45.57 ms │ +1.12x faster │
│ QQuery 95 │  131.03 ms │                          57.60 ms │ +2.27x faster │
│ QQuery 96 │   60.62 ms │                          53.24 ms │ +1.14x faster │
│ QQuery 97 │   95.60 ms │                          67.33 ms │ +1.42x faster │
│ QQuery 98 │  125.88 ms │                         103.03 ms │ +1.22x faster │
│ QQuery 99 │ 4475.55 ms │                        4459.77 ms │     no change │
└───────────┴────────────┴───────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)                                │ 22354.66ms │
│ Total Time (hash-join-buffering-on-probe-side)   │ 18217.21ms │
│ Average Time (main)                              │   225.80ms │
│ Average Time (hash-join-buffering-on-probe-side) │   184.01ms │
│ Queries Faster                                   │         79 │
│ Queries Slower                                   │          8 │
│ Queries with No Change                           │         12 │
│ Queries with Failure                             │          0 │
└──────────────────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_sf1.json
--------------------
┏━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query     ┃      main ┃ hash-join-buffering-on-probe-side ┃        Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1  │  42.94 ms │                          45.96 ms │  1.07x slower │
│ QQuery 2  │  20.64 ms │                          13.00 ms │ +1.59x faster │
│ QQuery 3  │  30.07 ms │                          24.52 ms │ +1.23x faster │
│ QQuery 4  │  17.22 ms │                          16.39 ms │     no change │
│ QQuery 5  │  98.91 ms │                          41.25 ms │ +2.40x faster │
│ QQuery 6  │  18.67 ms │                          18.23 ms │     no change │
│ QQuery 7  │ 104.82 ms │                          46.45 ms │ +2.26x faster │
│ QQuery 8  │  97.98 ms │                          34.09 ms │ +2.87x faster │
│ QQuery 9  │  86.25 ms │                          43.32 ms │ +1.99x faster │
│ QQuery 10 │ 106.09 ms │                          41.49 ms │ +2.56x faster │
│ QQuery 11 │  13.77 ms │                          11.15 ms │ +1.24x faster │
│ QQuery 12 │  54.57 ms │                          30.04 ms │ +1.82x faster │
│ QQuery 13 │  21.71 ms │                          21.74 ms │     no change │
│ QQuery 14 │  51.38 ms │                          21.87 ms │ +2.35x faster │
│ QQuery 15 │  35.13 ms │                          27.95 ms │ +1.26x faster │
│ QQuery 16 │  13.04 ms │                          12.05 ms │ +1.08x faster │
│ QQuery 17 │  82.94 ms │                          53.05 ms │ +1.56x faster │
│ QQuery 18 │ 109.92 ms │                          61.16 ms │ +1.80x faster │
│ QQuery 19 │  37.57 ms │                          37.79 ms │     no change │
│ QQuery 20 │  60.77 ms │                          26.21 ms │ +2.32x faster │
│ QQuery 21 │  78.28 ms │                          54.11 ms │ +1.45x faster │
│ QQuery 22 │   8.48 ms │                           8.73 ms │     no change │
└───────────┴───────────┴───────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                                ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main)                                │ 1191.17ms │
│ Total Time (hash-join-buffering-on-probe-side)   │  690.57ms │
│ Average Time (main)                              │   54.14ms │
│ Average Time (hash-join-buffering-on-probe-side) │   31.39ms │
│ Queries Faster                                   │        16 │
│ Queries Slower                                   │         1 │
│ Queries with No Change                           │         5 │
│ Queries with Failure                             │         0 │
└──────────────────────────────────────────────────┴───────────┘
--------------------
Benchmark tpch_sf10.json
--------------------
┏━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query     ┃      main ┃ hash-join-buffering-on-probe-side ┃        Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1  │ 373.55 ms │                         331.39 ms │ +1.13x faster │
│ QQuery 2  │ 150.76 ms │                          89.50 ms │ +1.68x faster │
│ QQuery 3  │ 320.66 ms │                         271.20 ms │ +1.18x faster │
│ QQuery 4  │ 132.21 ms │                         115.69 ms │ +1.14x faster │
│ QQuery 5  │ 462.29 ms │                         401.73 ms │ +1.15x faster │
│ QQuery 6  │ 150.85 ms │                         124.88 ms │ +1.21x faster │
│ QQuery 7  │ 631.13 ms │                         547.52 ms │ +1.15x faster │
│ QQuery 8  │ 593.22 ms │                         445.85 ms │ +1.33x faster │
│ QQuery 9  │ 780.22 ms │                         657.02 ms │ +1.19x faster │
│ QQuery 10 │ 495.89 ms │                         324.21 ms │ +1.53x faster │
│ QQuery 11 │ 144.90 ms │                          88.97 ms │ +1.63x faster │
│ QQuery 12 │ 263.50 ms │                         188.59 ms │ +1.40x faster │
│ QQuery 13 │ 287.01 ms │                         217.33 ms │ +1.32x faster │
│ QQuery 14 │ 248.88 ms │                         166.86 ms │ +1.49x faster │
│ QQuery 15 │ 399.52 ms │                         280.62 ms │ +1.42x faster │
│ QQuery 16 │  97.62 ms │                          65.14 ms │ +1.50x faster │
│ QQuery 17 │ 780.01 ms │                         641.17 ms │ +1.22x faster │
│ QQuery 18 │ 824.42 ms │                         696.09 ms │ +1.18x faster │
│ QQuery 19 │ 367.17 ms │                         268.54 ms │ +1.37x faster │
│ QQuery 20 │ 332.86 ms │                         241.19 ms │ +1.38x faster │
│ QQuery 21 │ 856.49 ms │                         697.65 ms │ +1.23x faster │
│ QQuery 22 │  89.72 ms │                          72.73 ms │ +1.23x faster │
└───────────┴───────────┴───────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                                ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main)                                │ 8782.89ms │
│ Total Time (hash-join-buffering-on-probe-side)   │ 6933.87ms │
│ Average Time (main)                              │  399.22ms │
│ Average Time (hash-join-buffering-on-probe-side) │  315.18ms │
│ Queries Faster                                   │        22 │
│ Queries Slower                                   │         0 │
│ Queries with No Change                           │         0 │
│ Queries with Failure                             │         0 │
└──────────────────────────────────────────────────┴───────────┘
```
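
The per-query "Change" column can hide the aggregate picture, so here is a small sketch that derives the overall speedup of each benchmark from the "Total Time" rows in the summaries above. The `speedup` helper and the `totals` array are illustrative names, not part of the benchmark tooling; only the numbers come from the output.

```rust
/// Overall speedup: total time on `main` divided by total time on the branch.
/// (Hypothetical helper, not part of `bench.sh`.)
fn speedup(main_ms: f64, branch_ms: f64) -> f64 {
    main_ms / branch_ms
}

fn main() {
    // (benchmark, total ms on main, total ms on the branch), copied from the summaries.
    let totals = [
        ("tpcds_sf1", 22354.66, 18217.21),
        ("tpch_sf1", 1191.17, 690.57),
        ("tpch_sf10", 8782.89, 6933.87),
    ];
    for &(name, main_ms, branch_ms) in totals.iter() {
        println!("{}: {:.2}x overall", name, speedup(main_ms, branch_ms));
    }
}
```

By these totals the branch is roughly 1.23x faster on TPC-DS SF1, 1.72x on TPC-H SF1, and 1.27x on TPC-H SF10, even though individual TPC-DS queries regress.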
1 parent 5211a8b commit bfa0ea8

11 files changed

Lines changed: 154 additions & 1 deletion

benchmarks/src/imdb/run.rs

Lines changed: 8 additions & 0 deletions
@@ -92,6 +92,10 @@ pub struct RunOpt {
     /// True by default.
     #[arg(short = 'j', long = "prefer_hash_join", default_value = "true")]
     prefer_hash_join: BoolDefaultTrue,
+
+    /// How many bytes to buffer on the probe side of hash joins.
+    #[arg(long, default_value = "0")]
+    hash_join_buffering_capacity: usize,
 }
 
 fn map_query_id_to_str(query_id: usize) -> &'static str {
@@ -306,6 +310,8 @@ impl RunOpt {
             .config()?
             .with_collect_statistics(!self.disable_statistics);
         config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
+        config.options_mut().execution.hash_join_buffering_capacity =
+            self.hash_join_buffering_capacity;
         let rt_builder = self.common.runtime_env_builder()?;
         let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
 
@@ -527,6 +533,7 @@ mod tests {
             output_path: None,
             disable_statistics: false,
             prefer_hash_join: true,
+            hash_join_buffering_capacity: 0,
         };
         opt.register_tables(&ctx).await?;
         let queries = get_query_sql(map_query_id_to_str(query))?;
@@ -563,6 +570,7 @@ mod tests {
             output_path: None,
             disable_statistics: false,
             prefer_hash_join: true,
+            hash_join_buffering_capacity: 0,
         };
         opt.register_tables(&ctx).await?;
         let queries = get_query_sql(map_query_id_to_str(query))?;

benchmarks/src/tpcds/run.rs

Lines changed: 6 additions & 0 deletions
@@ -144,6 +144,10 @@ pub struct RunOpt {
     /// The tables should have been created with the `--sort` option for this to have any effect.
     #[arg(short = 't', long = "sorted")]
     sorted: bool,
+
+    /// How many bytes to buffer on the probe side of hash joins.
+    #[arg(long, default_value = "0")]
+    hash_join_buffering_capacity: usize,
 }
 
 impl RunOpt {
@@ -162,6 +166,8 @@ impl RunOpt {
         config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
         config.options_mut().optimizer.enable_piecewise_merge_join =
             self.enable_piecewise_merge_join;
+        config.options_mut().execution.hash_join_buffering_capacity =
+            self.hash_join_buffering_capacity;
         let rt_builder = self.common.runtime_env_builder()?;
         let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
         // register tables

benchmarks/src/tpch/run.rs

Lines changed: 8 additions & 0 deletions
@@ -105,6 +105,10 @@ pub struct RunOpt {
     /// The tables should have been created with the `--sort` option for this to have any effect.
     #[arg(short = 't', long = "sorted")]
     sorted: bool,
+
+    /// How many bytes to buffer on the probe side of hash joins.
+    #[arg(long, default_value = "0")]
+    hash_join_buffering_capacity: usize,
 }
 
 impl RunOpt {
@@ -123,6 +127,8 @@ impl RunOpt {
         config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
         config.options_mut().optimizer.enable_piecewise_merge_join =
             self.enable_piecewise_merge_join;
+        config.options_mut().execution.hash_join_buffering_capacity =
+            self.hash_join_buffering_capacity;
         let rt_builder = self.common.runtime_env_builder()?;
         let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
         // register tables
@@ -392,6 +398,7 @@ mod tests {
             prefer_hash_join: true,
             enable_piecewise_merge_join: false,
             sorted: false,
+            hash_join_buffering_capacity: 0,
         };
         opt.register_tables(&ctx).await?;
         let queries = get_query_sql(query)?;
@@ -430,6 +437,7 @@ mod tests {
             prefer_hash_join: true,
             enable_piecewise_merge_join: false,
             sorted: false,
+            hash_join_buffering_capacity: 0,
         };
         opt.register_tables(&ctx).await?;
         let queries = get_query_sql(query)?;

datafusion/common/src/config.rs

Lines changed: 15 additions & 0 deletions
@@ -669,6 +669,21 @@ config_namespace! {
         /// # Default
         /// `false` — ANSI SQL mode is disabled by default.
         pub enable_ansi_mode: bool, default = false
+
+        /// How many bytes to buffer in the probe side of hash joins while the build side is
+        /// concurrently being built.
+        ///
+        /// Without this, hash joins will wait until the full materialization of the build side
+        /// before polling the probe side. This is useful in scenarios where the query is not
+        /// completely CPU bounded, allowing to do some early work concurrently and reducing the
+        /// latency of the query.
+        ///
+        /// Note that when hash join buffering is enabled, the probe side will start eagerly
+        /// polling data, not giving time for the producer side of dynamic filters to produce any
+        /// meaningful predicate. Queries with dynamic filters might see performance degradation.
+        ///
+        /// Disabled by default, set to a number greater than 0 for enabling it.
+        pub hash_join_buffering_capacity: usize, default = 0
     }
 }
 
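
The option's documentation contrasts waiting for the first poll with polling the probe side eagerly. The following std-only sketch illustrates that difference under simplifying assumptions: `LazyProbe`, `NoopWake`, `lazy_demo`, and `eager_demo` are hypothetical names, the "batches" are plain integers, and the channel capacity counts batches rather than bytes. It is not how `BufferExec` is implemented; it only shows that a Rust future does nothing until polled, while a bounded buffer fed by a background producer makes progress immediately.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

/// Hypothetical stand-in for a probe-side stream: it only counts its polls.
struct LazyProbe {
    polls: Arc<AtomicUsize>,
}

impl Future for LazyProbe {
    type Output = usize;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<usize> {
        // Work happens here, and only when somebody polls.
        Poll::Ready(self.polls.fetch_add(1, Ordering::SeqCst) + 1)
    }
}

/// Waker that does nothing; enough to poll a future by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns (polls seen right after creation, polls seen after one manual poll).
fn lazy_demo() -> (usize, usize) {
    let polls = Arc::new(AtomicUsize::new(0));
    let mut probe = Box::pin(LazyProbe { polls: Arc::clone(&polls) });
    let after_creation = polls.load(Ordering::SeqCst);

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let _ = probe.as_mut().poll(&mut cx);
    (after_creation, polls.load(Ordering::SeqCst))
}

/// Eager variant: the producer thread starts right away and blocks once
/// `capacity` batches are waiting in the bounded channel.
fn eager_demo(capacity: usize, total: usize) -> Vec<usize> {
    let (tx, rx) = sync_channel(capacity);
    let producer = thread::spawn(move || {
        for batch in 0..total {
            if tx.send(batch).is_err() {
                break; // the consumer went away
            }
        }
    });
    // A real consumer could be busy building the hash table at this point.
    let batches: Vec<usize> = rx.iter().collect();
    producer.join().expect("producer thread panicked");
    batches
}

fn main() {
    println!("lazy: {:?}", lazy_demo());
    println!("eager: {:?}", eager_demo(2, 5));
}
```

The bounded channel mirrors the trade-off described in the option's documentation: a larger capacity lets more probe-side work happen early, at the cost of holding more data in memory.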

Lines changed: 103 additions & 0 deletions
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::PhysicalOptimizerRule;
+use datafusion_common::JoinSide;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_physical_plan::ExecutionPlan;
+use datafusion_physical_plan::buffer::BufferExec;
+use datafusion_physical_plan::joins::HashJoinExec;
+use std::sync::Arc;
+
+/// Looks for all the [HashJoinExec]s in the plan and places a [BufferExec] node with the
+/// configured capacity in the probe side:
+///
+/// ```text
+///           ┌───────────────────┐
+///           │   HashJoinExec    │
+///           └─────▲────────▲────┘
+///         ┌───────┘        └─────────┐
+///         │                          │
+/// ┌────────────────┐        ┌─────────────────┐
+/// │   Build side   │    +   │   BufferExec    │
+/// └────────────────┘        └────────▲────────┘
+///                                    │
+///                           ┌────────┴────────┐
+///                           │   Probe side    │
+///                           └─────────────────┘
+/// ```
+///
+/// Which allows eagerly pulling it even before the build side has completely finished.
+#[derive(Debug, Default)]
+pub struct HashJoinBuffering {}
+
+impl HashJoinBuffering {
+    pub fn new() -> Self {
+        Self::default()
+    }
+}
+
+impl PhysicalOptimizerRule for HashJoinBuffering {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+        let capacity = config.execution.hash_join_buffering_capacity;
+        if capacity == 0 {
+            return Ok(plan);
+        }
+
+        plan.transform_down(|plan| {
+            let Some(node) = plan.as_any().downcast_ref::<HashJoinExec>() else {
+                return Ok(Transformed::no(plan));
+            };
+            let plan = Arc::clone(&plan);
+            Ok(Transformed::yes(
+                if HashJoinExec::probe_side() == JoinSide::Left {
+                    // Do not stack BufferExec nodes together.
+                    if node.left.as_any().downcast_ref::<BufferExec>().is_some() {
+                        return Ok(Transformed::no(plan));
+                    }
+                    plan.with_new_children(vec![
+                        Arc::new(BufferExec::new(Arc::clone(&node.left), capacity)),
+                        Arc::clone(&node.right),
+                    ])?
+                } else {
+                    // Do not stack BufferExec nodes together.
+                    if node.right.as_any().downcast_ref::<BufferExec>().is_some() {
+                        return Ok(Transformed::no(plan));
+                    }
+                    plan.with_new_children(vec![
+                        Arc::clone(&node.left),
+                        Arc::new(BufferExec::new(Arc::clone(&node.right), capacity)),
+                    ])?
+                },
+            ))
+        })
+        .data()
+    }
+
+    fn name(&self) -> &str {
+        "HashJoinBuffering"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
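
To make the rule's two guards easier to see in isolation (the `capacity == 0` early return and the "do not stack `BufferExec` nodes" check), here is a toy model on a hand-rolled plan enum. Everything in it is hypothetical: the `Plan` enum, `buffer_probe_sides`, and `sample_plan` are illustrative names, the probe side is a named field rather than the left or right child, and the recursion is a plain bottom-up walk instead of `transform_down`. It sketches the idea, not the DataFusion implementation.

```rust
/// Toy plan tree (assumption: the real rule operates on `Arc<dyn ExecutionPlan>`).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(&'static str),
    Buffer { input: Box<Plan>, capacity: usize },
    HashJoin { build: Box<Plan>, probe: Box<Plan> },
}

/// Wraps the probe side of every hash join in a `Buffer`, unless one is already there.
fn buffer_probe_sides(plan: Plan, capacity: usize) -> Plan {
    // Capacity 0 means the feature is disabled: leave the plan untouched.
    if capacity == 0 {
        return plan;
    }
    match plan {
        Plan::Scan(name) => Plan::Scan(name),
        Plan::Buffer { input, capacity: existing } => Plan::Buffer {
            input: Box::new(buffer_probe_sides(*input, capacity)),
            capacity: existing,
        },
        Plan::HashJoin { build, probe } => {
            let build = buffer_probe_sides(*build, capacity);
            let probe = buffer_probe_sides(*probe, capacity);
            // Do not stack buffers: this is what makes a second pass a no-op.
            let probe = if matches!(probe, Plan::Buffer { .. }) {
                probe
            } else {
                Plan::Buffer { input: Box::new(probe), capacity }
            };
            Plan::HashJoin { build: Box::new(build), probe: Box::new(probe) }
        }
    }
}

/// A single join between two scans, used as example input.
fn sample_plan() -> Plan {
    Plan::HashJoin {
        build: Box::new(Plan::Scan("dim")),
        probe: Box::new(Plan::Scan("fact")),
    }
}

fn main() {
    let once = buffer_probe_sides(sample_plan(), 1024);
    let twice = buffer_probe_sides(once.clone(), 1024);
    println!("{:?}", once);
    println!("second pass changed the plan: {}", once != twice);
}
```

Running the function twice yields the same tree, which is the idempotence the PR description refers to; with capacity 0 the input comes back unchanged.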

datafusion/physical-optimizer/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -39,6 +39,7 @@ pub mod optimizer;
 pub mod output_requirements;
 pub mod projection_pushdown;
 pub use datafusion_pruning as pruning;
+pub mod hash_join_buffering;
 pub mod pushdown_sort;
 pub mod sanity_checker;
 pub mod topk_aggregation;

datafusion/physical-optimizer/src/optimizer.rs

Lines changed: 5 additions & 0 deletions
@@ -35,6 +35,7 @@ use crate::sanity_checker::SanityCheckPlan;
 use crate::topk_aggregation::TopKAggregation;
 use crate::update_aggr_exprs::OptimizeAggregateOrder;
 
+use crate::hash_join_buffering::HashJoinBuffering;
 use crate::limit_pushdown_past_window::LimitPushPastWindows;
 use crate::pushdown_sort::PushdownSort;
 use datafusion_common::Result;
@@ -137,6 +138,10 @@ impl PhysicalOptimizer {
             // This can possibly be combined with [LimitPushdown]
             // It needs to come after [EnforceSorting]
             Arc::new(LimitPushPastWindows::new()),
+            // The HashJoinBuffering rule adds a BufferExec node with the configured capacity
+            // in the probe side of hash joins. That way, the probe side gets eagerly polled before
+            // the build side is completely finished.
+            Arc::new(HashJoinBuffering::new()),
             // The LimitPushdown rule tries to push limits down as far as possible,
             // replacing operators with fetching variants, or adding limits
             // past operators that support limit pushdown.

datafusion/sqllogictest/test_files/explain.slt

Lines changed: 4 additions & 0 deletions
@@ -243,6 +243,7 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
+physical_plan after HashJoinBuffering SAME TEXT AS ABOVE
 physical_plan after LimitPushdown SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after PushdownSort SAME TEXT AS ABOVE
@@ -323,6 +324,7 @@ physical_plan after OutputRequirements
 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]: ScanBytes=Exact(32)),(Col[1]: ScanBytes=Inexact(24)),(Col[2]: ScanBytes=Exact(32)),(Col[3]: ScanBytes=Exact(32)),(Col[4]: ScanBytes=Exact(32)),(Col[5]: ScanBytes=Exact(64)),(Col[6]: ScanBytes=Exact(32)),(Col[7]: ScanBytes=Exact(64)),(Col[8]: ScanBytes=Inexact(88)),(Col[9]: ScanBytes=Inexact(49)),(Col[10]: ScanBytes=Exact(64))]]
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
+physical_plan after HashJoinBuffering SAME TEXT AS ABOVE
 physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]: ScanBytes=Exact(32)),(Col[1]: ScanBytes=Inexact(24)),(Col[2]: ScanBytes=Exact(32)),(Col[3]: ScanBytes=Exact(32)),(Col[4]: ScanBytes=Exact(32)),(Col[5]: ScanBytes=Exact(64)),(Col[6]: ScanBytes=Exact(32)),(Col[7]: ScanBytes=Exact(64)),(Col[8]: ScanBytes=Inexact(88)),(Col[9]: ScanBytes=Inexact(49)),(Col[10]: ScanBytes=Exact(64))]]
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after PushdownSort SAME TEXT AS ABOVE
@@ -367,6 +369,7 @@ physical_plan after OutputRequirements
 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
+physical_plan after HashJoinBuffering SAME TEXT AS ABOVE
 physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after PushdownSort SAME TEXT AS ABOVE
@@ -608,6 +611,7 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
+physical_plan after HashJoinBuffering SAME TEXT AS ABOVE
 physical_plan after LimitPushdown SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after PushdownSort SAME TEXT AS ABOVE

datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 2 additions & 0 deletions
@@ -220,6 +220,7 @@ datafusion.execution.collect_statistics true
 datafusion.execution.enable_ansi_mode false
 datafusion.execution.enable_recursive_ctes true
 datafusion.execution.enforce_batch_size_in_joins false
+datafusion.execution.hash_join_buffering_capacity 0
 datafusion.execution.keep_partition_by_columns false
 datafusion.execution.listing_table_factory_infer_partitions true
 datafusion.execution.listing_table_ignore_subdirectory true
@@ -358,6 +359,7 @@ datafusion.execution.collect_statistics true Should DataFusion collect statistic
 datafusion.execution.enable_ansi_mode false Whether to enable ANSI SQL mode. The flag is experimental and relevant only for DataFusion Spark built-in functions When `enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL semantics for expressions, casting, and error handling. This means: - **Strict type coercion rules:** implicit casts between incompatible types are disallowed. - **Standard SQL arithmetic behavior:** operations such as division by zero, numeric overflow, or invalid casts raise runtime errors rather than returning `NULL` or adjusted values. - **Consistent ANSI behavior** for string concatenation, comparisons, and `NULL` handling. When `enable_ansi_mode` is `false` (the default), the engine uses a more permissive, non-ANSI mode designed for user convenience and backward compatibility. In this mode: - Implicit casts between types are allowed (e.g., string to integer when possible). - Arithmetic operations are more lenient — for example, `abs()` on the minimum representable integer value returns the input value instead of raising overflow. - Division by zero or invalid casts may return `NULL` instead of failing. # Default `false` — ANSI SQL mode is disabled by default.
 datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs
 datafusion.execution.enforce_batch_size_in_joins false Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower.
+datafusion.execution.hash_join_buffering_capacity 0 How many bytes to buffer in the probe side of hash joins while the build side is concurrently being built. Without this, hash joins will wait until the full materialization of the build side before polling the probe side. This is useful in scenarios where the query is not completely CPU bounded, allowing to do some early work concurrently and reducing the latency of the query. Note that when hash join buffering is enabled, the probe side will start eagerly polling data, not giving time for the producer side of dynamic filters to produce any meaningful predicate. Queries with dynamic filters might see performance degradation. Disabled by default, set to a number greater than 0 for enabling it.
 datafusion.execution.keep_partition_by_columns false Should DataFusion keep the columns used for partition_by in the output RecordBatches
 datafusion.execution.listing_table_factory_infer_partitions true Should a `ListingTable` created through the `ListingTableFactory` infer table partitions from Hive compliant directories. Defaults to true (partition columns are inferred and will be represented in the table schema).
 datafusion.execution.listing_table_ignore_subdirectory true Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`).

datafusion/sqllogictest/test_files/struct.slt

Lines changed: 1 addition & 1 deletion
@@ -1666,4 +1666,4 @@ order by id;
 3 2 150
 
 statement ok
-drop table t_agg_window;
+drop table t_agg_window;
