Skip to content

Commit 00e36e8

Browse files
authored
fix: Return probe_side.len() for RightMark/Anti count(*) queries (#20710)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #20669 . ## Rationale for this change Return probe_side.len() for count(*) queries <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? slt tests <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 953bdf4 commit 00e36e8

4 files changed

Lines changed: 63 additions & 1 deletion

File tree

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,7 @@ impl HashJoinStream {
713713
filter,
714714
JoinSide::Left,
715715
None,
716+
self.join_type,
716717
)?
717718
} else {
718719
(left_indices, right_indices)
@@ -781,6 +782,7 @@ impl HashJoinStream {
781782
&right_indices,
782783
&self.column_indices,
783784
join_side,
785+
self.join_type,
784786
)?;
785787

786788
let push_status = self.output_buffer.push_batch(batch)?;
@@ -899,6 +901,7 @@ impl HashJoinStream {
899901
&right_side,
900902
&self.column_indices,
901903
JoinSide::Left,
904+
self.join_type,
902905
)?;
903906
let push_status = self.output_buffer.push_batch(batch)?;
904907

datafusion/physical-plan/src/joins/symmetric_hash_join.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -959,6 +959,7 @@ pub(crate) fn build_side_determined_results(
959959
&probe_indices,
960960
column_indices,
961961
build_hash_joiner.build_side,
962+
join_type,
962963
)
963964
.map(|batch| (batch.num_rows() > 0).then_some(batch))
964965
} else {
@@ -1022,6 +1023,7 @@ pub(crate) fn join_with_probe_batch(
10221023
filter,
10231024
build_hash_joiner.build_side,
10241025
None,
1026+
join_type,
10251027
)?
10261028
} else {
10271029
(build_indices, probe_indices)
@@ -1060,6 +1062,7 @@ pub(crate) fn join_with_probe_batch(
10601062
&probe_indices,
10611063
column_indices,
10621064
build_hash_joiner.build_side,
1065+
join_type,
10631066
)
10641067
.map(|batch| (batch.num_rows() > 0).then_some(batch))
10651068
}

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -910,6 +910,7 @@ pub(crate) fn get_final_indices_from_bit_map(
910910
(left_indices, right_indices)
911911
}
912912

913+
#[expect(clippy::too_many_arguments)]
913914
pub(crate) fn apply_join_filter_to_indices(
914915
build_input_buffer: &RecordBatch,
915916
probe_batch: &RecordBatch,
@@ -918,6 +919,7 @@ pub(crate) fn apply_join_filter_to_indices(
918919
filter: &JoinFilter,
919920
build_side: JoinSide,
920921
max_intermediate_size: Option<usize>,
922+
join_type: JoinType,
921923
) -> Result<(UInt64Array, UInt32Array)> {
922924
if build_indices.is_empty() && probe_indices.is_empty() {
923925
return Ok((build_indices, probe_indices));
@@ -938,6 +940,7 @@ pub(crate) fn apply_join_filter_to_indices(
938940
&probe_indices.slice(i, len),
939941
filter.column_indices(),
940942
build_side,
943+
join_type,
941944
)?;
942945
let filter_result = filter
943946
.expression()
@@ -959,6 +962,7 @@ pub(crate) fn apply_join_filter_to_indices(
959962
&probe_indices,
960963
filter.column_indices(),
961964
build_side,
965+
join_type,
962966
)?;
963967

964968
filter
@@ -990,6 +994,7 @@ fn new_empty_schema_batch(schema: &Schema, row_count: usize) -> Result<RecordBat
990994

991995
/// Returns a new [RecordBatch] by combining the `left` and `right` according to `indices`.
992996
/// The resulting batch has [Schema] `schema`.
997+
#[expect(clippy::too_many_arguments)]
993998
pub(crate) fn build_batch_from_indices(
994999
schema: &Schema,
9951000
build_input_buffer: &RecordBatch,
@@ -998,9 +1003,17 @@ pub(crate) fn build_batch_from_indices(
9981003
probe_indices: &UInt32Array,
9991004
column_indices: &[ColumnIndex],
10001005
build_side: JoinSide,
1006+
join_type: JoinType,
10011007
) -> Result<RecordBatch> {
10021008
if schema.fields().is_empty() {
1003-
return new_empty_schema_batch(schema, build_indices.len());
1009+
// For RightAnti and RightSemi joins, after `adjust_indices_by_join_type`
1010+
// the build_indices were untouched so only probe_indices hold the actual
1011+
// row count.
1012+
let row_count = match join_type {
1013+
JoinType::RightAnti | JoinType::RightSemi => probe_indices.len(),
1014+
_ => build_indices.len(),
1015+
};
1016+
return new_empty_schema_batch(schema, row_count);
10041017
}
10051018

10061019
// build the columns of the new [RecordBatch]:

datafusion/sqllogictest/test_files/joins.slt

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5318,3 +5318,46 @@ DROP TABLE issue_20437_small;
53185318

53195319
statement count 0
53205320
DROP TABLE issue_20437_large;
5321+
5322+
# Test count(*) with right semi/anti joins returns correct row counts
5323+
# issue: https://github.com/apache/datafusion/issues/20669
5324+
5325+
statement ok
5326+
CREATE TABLE t1 (k INT, v INT);
5327+
5328+
statement ok
5329+
CREATE TABLE t2 (k INT, v INT);
5330+
5331+
statement ok
5332+
INSERT INTO t1 SELECT i AS k, i AS v FROM generate_series(1, 100) t(i);
5333+
5334+
statement ok
5335+
INSERT INTO t2 VALUES (1, 1);
5336+
5337+
query I
5338+
WITH t AS (
5339+
SELECT *
5340+
FROM t1
5341+
LEFT ANTI JOIN t2 ON t1.k = t2.k
5342+
)
5343+
SELECT count(*)
5344+
FROM t;
5345+
----
5346+
99
5347+
5348+
query I
5349+
WITH t AS (
5350+
SELECT *
5351+
FROM t1
5352+
LEFT SEMI JOIN t2 ON t1.k = t2.k
5353+
)
5354+
SELECT count(*)
5355+
FROM t;
5356+
----
5357+
1
5358+
5359+
statement count 0
5360+
DROP TABLE t1;
5361+
5362+
statement count 0
5363+
DROP TABLE t2;

0 commit comments

Comments
 (0)