Skip to content

Commit 41acbf8

Browse files
alambjonathanc-n
andauthored
[branch-52] fix: Return probe_side.len() for RightMark/Anti count(*) queries (#20710) (#20881)
- Part of #20855 - Closes #20669 on branch-52 This PR: - Backports #20710 from @jonathanc-n to the branch-52 line Co-authored-by: Jonathan Chen <chenleejonathan@gmail.com>
1 parent a5f6fbb commit 41acbf8

4 files changed

Lines changed: 63 additions & 1 deletion

File tree

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,7 @@ impl HashJoinStream {
639639
filter,
640640
JoinSide::Left,
641641
None,
642+
self.join_type,
642643
)?
643644
} else {
644645
(left_indices, right_indices)
@@ -707,6 +708,7 @@ impl HashJoinStream {
707708
&right_indices,
708709
&self.column_indices,
709710
join_side,
711+
self.join_type,
710712
)?;
711713

712714
self.output_buffer.push_batch(batch)?;
@@ -770,6 +772,7 @@ impl HashJoinStream {
770772
&right_side,
771773
&self.column_indices,
772774
JoinSide::Left,
775+
self.join_type,
773776
)?;
774777
self.output_buffer.push_batch(batch)?;
775778
}

datafusion/physical-plan/src/joins/symmetric_hash_join.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -930,6 +930,7 @@ pub(crate) fn build_side_determined_results(
930930
&probe_indices,
931931
column_indices,
932932
build_hash_joiner.build_side,
933+
join_type,
933934
)
934935
.map(|batch| (batch.num_rows() > 0).then_some(batch))
935936
} else {
@@ -993,6 +994,7 @@ pub(crate) fn join_with_probe_batch(
993994
filter,
994995
build_hash_joiner.build_side,
995996
None,
997+
join_type,
996998
)?
997999
} else {
9981000
(build_indices, probe_indices)
@@ -1031,6 +1033,7 @@ pub(crate) fn join_with_probe_batch(
10311033
&probe_indices,
10321034
column_indices,
10331035
build_hash_joiner.build_side,
1036+
join_type,
10341037
)
10351038
.map(|batch| (batch.num_rows() > 0).then_some(batch))
10361039
}

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -910,6 +910,7 @@ pub(crate) fn get_final_indices_from_bit_map(
910910
(left_indices, right_indices)
911911
}
912912

913+
#[expect(clippy::too_many_arguments)]
913914
pub(crate) fn apply_join_filter_to_indices(
914915
build_input_buffer: &RecordBatch,
915916
probe_batch: &RecordBatch,
@@ -918,6 +919,7 @@ pub(crate) fn apply_join_filter_to_indices(
918919
filter: &JoinFilter,
919920
build_side: JoinSide,
920921
max_intermediate_size: Option<usize>,
922+
join_type: JoinType,
921923
) -> Result<(UInt64Array, UInt32Array)> {
922924
if build_indices.is_empty() && probe_indices.is_empty() {
923925
return Ok((build_indices, probe_indices));
@@ -938,6 +940,7 @@ pub(crate) fn apply_join_filter_to_indices(
938940
&probe_indices.slice(i, len),
939941
filter.column_indices(),
940942
build_side,
943+
join_type,
941944
)?;
942945
let filter_result = filter
943946
.expression()
@@ -959,6 +962,7 @@ pub(crate) fn apply_join_filter_to_indices(
959962
&probe_indices,
960963
filter.column_indices(),
961964
build_side,
965+
join_type,
962966
)?;
963967

964968
filter
@@ -979,6 +983,7 @@ pub(crate) fn apply_join_filter_to_indices(
979983

980984
/// Returns a new [RecordBatch] by combining the `left` and `right` according to `indices`.
981985
/// The resulting batch has [Schema] `schema`.
986+
#[expect(clippy::too_many_arguments)]
982987
pub(crate) fn build_batch_from_indices(
983988
schema: &Schema,
984989
build_input_buffer: &RecordBatch,
@@ -987,11 +992,19 @@ pub(crate) fn build_batch_from_indices(
987992
probe_indices: &UInt32Array,
988993
column_indices: &[ColumnIndex],
989994
build_side: JoinSide,
995+
join_type: JoinType,
990996
) -> Result<RecordBatch> {
991997
if schema.fields().is_empty() {
998+
// For RightAnti and RightSemi joins, after `adjust_indices_by_join_type`
999+
// the build_indices were untouched so only probe_indices hold the actual
1000+
// row count.
1001+
let row_count = match join_type {
1002+
JoinType::RightAnti | JoinType::RightSemi => probe_indices.len(),
1003+
_ => build_indices.len(),
1004+
};
9921005
let options = RecordBatchOptions::new()
9931006
.with_match_field_names(true)
994-
.with_row_count(Some(build_indices.len()));
1007+
.with_row_count(Some(row_count));
9951008

9961009
return Ok(RecordBatch::try_new_with_options(
9971010
Arc::new(schema.clone()),

datafusion/sqllogictest/test_files/joins.slt

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5226,3 +5226,46 @@ DROP TABLE issue_20437_small;
52265226

52275227
statement count 0
52285228
DROP TABLE issue_20437_large;
5229+
5230+
# Test count(*) with right semi/anti joins returns correct row counts
5231+
# issue: https://github.com/apache/datafusion/issues/20669
5232+
5233+
statement ok
5234+
CREATE TABLE t1 (k INT, v INT);
5235+
5236+
statement ok
5237+
CREATE TABLE t2 (k INT, v INT);
5238+
5239+
statement ok
5240+
INSERT INTO t1 SELECT i AS k, i AS v FROM generate_series(1, 100) t(i);
5241+
5242+
statement ok
5243+
INSERT INTO t2 VALUES (1, 1);
5244+
5245+
query I
5246+
WITH t AS (
5247+
SELECT *
5248+
FROM t1
5249+
LEFT ANTI JOIN t2 ON t1.k = t2.k
5250+
)
5251+
SELECT count(*)
5252+
FROM t;
5253+
----
5254+
99
5255+
5256+
query I
5257+
WITH t AS (
5258+
SELECT *
5259+
FROM t1
5260+
LEFT SEMI JOIN t2 ON t1.k = t2.k
5261+
)
5262+
SELECT count(*)
5263+
FROM t;
5264+
----
5265+
1
5266+
5267+
statement count 0
5268+
DROP TABLE t1;
5269+
5270+
statement count 0
5271+
DROP TABLE t2;

0 commit comments

Comments
 (0)