Skip to content

Commit aefebec

Browse files
Dandandan and claude committed
Use take instead of interleave for single-batch build side in hash join
When the build side has only one batch, use `take` directly instead of `interleave`, avoiding the overhead of multi-array dispatch and the extra (batch_idx, row_idx) allocations. This is the common case for CollectLeft mode and fixes a regression in TPC-DS Q74.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 114e182 commit aefebec

1 file changed

Lines changed: 79 additions & 43 deletions

File tree

  • datafusion/physical-plan/src/joins

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 79 additions & 43 deletions
Original file line number | Diff line number | Diff line change
@@ -992,6 +992,26 @@ fn new_empty_schema_batch(schema: &Schema, row_count: usize) -> Result<RecordBat
992992
)?)
993993
}
994994

995+
/// Apply null mask from build_indices onto a result array.
996+
/// Used for outer joins where unmatched rows are represented as null indices.
997+
fn apply_null_mask(result: ArrayRef, build_indices: &UInt64Array) -> Result<ArrayRef> {
998+
if let Some(idx_nulls) = build_indices.nulls() {
999+
let data = result.to_data();
1000+
let combined_nulls = if let Some(existing) = data.nulls() {
1001+
NullBuffer::new(existing.inner() & idx_nulls.inner())
1002+
} else {
1003+
idx_nulls.clone()
1004+
};
1005+
Ok(arrow::array::make_array(
1006+
data.into_builder()
1007+
.null_bit_buffer(Some(combined_nulls.into_inner().into_inner()))
1008+
.build()?,
1009+
))
1010+
} else {
1011+
Ok(result)
1012+
}
1013+
}
1014+
9951015
/// Decode packed composite indices into (batch_index, row_index) pairs.
9961016
///
9971017
/// Each u64 value encodes `(batch_idx << 32) | row_idx`.
@@ -1033,13 +1053,31 @@ pub(crate) fn build_batch_from_indices(
10331053
return new_empty_schema_batch(schema, row_count);
10341054
}
10351055

1036-
// Pre-compute interleave indices for build-side columns (shared across all build columns)
1037-
let interleave_indices = if build_batches.is_empty()
1056+
// Determine build-side gathering strategy:
1057+
// - Single batch: use `take` directly (packed value == row index since batch_idx is 0)
1058+
// - Multiple batches: use `interleave` with decoded (batch_idx, row_idx) pairs
1059+
// - No batches or all-null indices: emit null arrays
1060+
enum BuildGather {
1061+
/// Single build batch — use `take` with row indices cast to u32
1062+
SingleBatch(UInt32Array),
1063+
/// Multiple build batches — use `interleave` with (batch_idx, row_idx) pairs
1064+
MultiBatch(Vec<(usize, usize)>),
1065+
/// All build indices are null (outer join with no matches)
1066+
AllNull,
1067+
}
1068+
1069+
let build_gather = if build_batches.is_empty()
10381070
|| build_indices.null_count() == build_indices.len()
10391071
{
1040-
None
1072+
BuildGather::AllNull
1073+
} else if build_batches.len() == 1 {
1074+
// Single batch: packed value == row index (batch_idx is 0), use take directly
1075+
// Preserve the null buffer from build_indices for outer joins
1076+
let values: Vec<u32> = build_indices.values().iter().map(|&v| v as u32).collect();
1077+
let row_indices = UInt32Array::new(values.into(), build_indices.nulls().cloned());
1078+
BuildGather::SingleBatch(row_indices)
10411079
} else {
1042-
Some(packed_indices_to_interleave(build_indices))
1080+
BuildGather::MultiBatch(packed_indices_to_interleave(build_indices))
10431081
};
10441082

10451083
let mut columns: Vec<Arc<dyn Array>> = Vec::with_capacity(schema.fields().len());
@@ -1048,48 +1086,44 @@ pub(crate) fn build_batch_from_indices(
10481086
let array = if column_index.side == JoinSide::None {
10491087
Arc::new(compute::is_not_null(probe_indices)?)
10501088
} else if column_index.side == build_side {
1051-
if let Some(ref il_indices) = interleave_indices {
1052-
// Gather column arrays from all build batches
1053-
let arrays: Vec<&dyn Array> = build_batches
1054-
.iter()
1055-
.map(|b| b.column(column_index.index).as_ref())
1056-
.collect();
1057-
let result = compute::interleave(&arrays, il_indices)?;
1058-
// Apply null mask from build_indices (for outer joins where
1059-
// unmatched rows are represented as null indices)
1060-
if build_indices.null_count() > 0 {
1061-
if let Some(idx_nulls) = build_indices.nulls() {
1062-
let data = result.to_data();
1063-
let combined_nulls = if let Some(existing) = data.nulls() {
1064-
NullBuffer::new(existing.inner() & idx_nulls.inner())
1065-
} else {
1066-
idx_nulls.clone()
1067-
};
1068-
arrow::array::make_array(
1069-
data.into_builder()
1070-
.null_bit_buffer(Some(
1071-
combined_nulls.into_inner().into_inner(),
1072-
))
1073-
.build()?,
1074-
)
1089+
match &build_gather {
1090+
BuildGather::SingleBatch(row_indices) => {
1091+
// take() already handles null indices (produces null output),
1092+
// so no extra null mask needed
1093+
take(
1094+
build_batches[0].column(column_index.index).as_ref(),
1095+
row_indices,
1096+
None,
1097+
)?
1098+
}
1099+
BuildGather::MultiBatch(il_indices) => {
1100+
// Gather column arrays from all build batches
1101+
let arrays: Vec<&dyn Array> = build_batches
1102+
.iter()
1103+
.map(|b| b.column(column_index.index).as_ref())
1104+
.collect();
1105+
let result = compute::interleave(&arrays, il_indices)?;
1106+
// Apply null mask from build_indices (for outer joins where
1107+
// unmatched rows are represented as null indices)
1108+
if build_indices.null_count() > 0 {
1109+
apply_null_mask(result, build_indices)?
10751110
} else {
10761111
result
10771112
}
1078-
} else {
1079-
result
10801113
}
1081-
} else {
1082-
// All build indices are null (outer join with no matches)
1083-
// Use the build batch schema to get the correct data type
1084-
// (column_index.index refers to the build batch columns, not the output schema)
1085-
let data_type = if let Some(b) = build_batches.first() {
1086-
b.column(column_index.index).data_type().clone()
1087-
} else {
1088-
// No build batches available (empty build side in CollectLeft mode).
1089-
// Use the output schema field at the current column position.
1090-
schema.field(columns.len()).data_type().clone()
1091-
};
1092-
new_null_array(&data_type, build_indices.len())
1114+
BuildGather::AllNull => {
1115+
// All build indices are null (outer join with no matches)
1116+
// Use the build batch schema to get the correct data type
1117+
// (column_index.index refers to the build batch columns, not the output schema)
1118+
let data_type = if let Some(b) = build_batches.first() {
1119+
b.column(column_index.index).data_type().clone()
1120+
} else {
1121+
// No build batches available (empty build side in CollectLeft mode).
1122+
// Use the output schema field at the current column position.
1123+
schema.field(columns.len()).data_type().clone()
1124+
};
1125+
new_null_array(&data_type, build_indices.len())
1126+
}
10931127
}
10941128
} else {
10951129
let array = probe_batch.column(column_index.index);
@@ -1879,7 +1913,9 @@ pub(super) fn equal_rows_arr(
18791913

18801914
let mut equal: BooleanArray = eq_dyn_null(&arr_left, &arr_right, null_equality)?;
18811915

1882-
for (key_idx, right_key_array) in right_arrays.iter().enumerate().take(num_keys).skip(1) {
1916+
for (key_idx, right_key_array) in
1917+
right_arrays.iter().enumerate().take(num_keys).skip(1)
1918+
{
18831919
let arr_left = gather_left_key(key_idx)?;
18841920
let arr_right = take(right_key_array.as_ref(), indices_right, None)?;
18851921
let eq_result =

0 commit comments

Comments
 (0)