Skip to content

Commit 422b545

Browse files
fix: Track metrics in hash joins with empty build sides (#20810)
## Which issue does this PR close? - Closes #20809. ## Rationale for this change Show the correct metrics when executing hash joins. ## What changes are included in this PR? - The fast path in `process_probe_batch` now stores the output batch in `output_buffer` instead of returning it, since the metrics are only updated with batches stored there. - Added unit test. ## Are these changes tested? Yes. ## Are there any user-facing changes? No. --------- Co-authored-by: Yongting You <2010youy01@gmail.com>
1 parent fcb1c93 commit 422b545

2 files changed

Lines changed: 45 additions & 2 deletions

File tree

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -655,10 +655,10 @@ impl HashJoinStream {
655655
self.join_type,
656656
)?;
657657
timer.done();
658-
658+
self.output_buffer.push_batch(result)?;
659659
self.state = HashJoinStreamState::FetchProbeBatch;
660660

661-
return Ok(StatefulStreamResult::Ready(Some(result)));
661+
return Ok(StatefulStreamResult::Continue);
662662
}
663663

664664
// get the matched by join keys indices

datafusion/sqllogictest/test_files/joins.slt

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5056,6 +5056,49 @@ LEFT ANTI JOIN t2 ON k1 = k2
50565056
WHERE k1 < 0
50575057
----
50585058

5059+
# Also check that the reported number of output rows/batches are correct in the "empty build side"
5060+
# optimization.
5061+
# Issue: https://github.com/apache/datafusion/issues/20809
5062+
query TT
5063+
EXPLAIN ANALYZE
5064+
WITH t1 (k) AS (
5065+
VALUES (1), (2)
5066+
), t2 (k) AS (
5067+
VALUES (1)
5068+
)
5069+
SELECT *
5070+
FROM t1
5071+
LEFT ANTI JOIN (
5072+
SELECT *
5073+
FROM t2
5074+
WHERE k <> 1
5075+
) t2 ON t1.k = t2.k;
5076+
----
5077+
Plan with Metrics
5078+
01)HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k@0, k@0)], metrics=[output_rows=2, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>, output_batches=1, array_map_created_count=0, build_input_batches=0, build_input_rows=0, input_batches=1, input_rows=2, build_mem_used=<slt:ignore>, build_time=<slt:ignore>, join_time=<slt:ignore>, avg_fanout=N/A (0/0), probe_hit_rate=0% (0/2)]
5079+
02)--ProjectionExec: expr=[column1@0 as k], metrics=[output_rows=0, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>, output_batches=0, expr_0_eval_time=<slt:ignore>]
5080+
03)----FilterExec: column1@0 != 1, metrics=[output_rows=0, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>, output_batches=0, selectivity=0% (0/1)]
5081+
04)------DataSourceExec: partitions=1, partition_sizes=[1], metrics=[]
5082+
05)--ProjectionExec: expr=[column1@0 as k], metrics=[output_rows=2, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>, output_batches=1, expr_0_eval_time=<slt:ignore>]
5083+
06)----DataSourceExec: partitions=1, partition_sizes=[1], metrics=[]
5084+
5085+
query I
5086+
WITH t1 (k) AS (
5087+
VALUES (1), (2)
5088+
), t2 (k) AS (
5089+
VALUES (1)
5090+
)
5091+
SELECT *
5092+
FROM t1
5093+
LEFT ANTI JOIN (
5094+
SELECT *
5095+
FROM t2
5096+
WHERE k <> 1
5097+
) t2 ON t1.k = t2.k;
5098+
----
5099+
1
5100+
2
5101+
50595102
# Mark testing
50605103
statement ok
50615104
CREATE OR REPLACE TABLE t1(b INT, c INT, d INT);

0 commit comments

Comments
 (0)