Skip to content

Commit a03bdca

Browse files
committed
fix
1 parent a53942d commit a03bdca

2 files changed

Lines changed: 56 additions & 0 deletions

File tree

datafusion/physical-plan/src/sorts/merge.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,19 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
208208
cx: &mut Context<'_>,
209209
) -> Poll<Option<Result<RecordBatch>>> {
210210
if self.done {
211+
// When `build_record_batch()` hits an i32 offset overflow (i.e.
212+
// combined string offsets exceed i32::MAX, ≈2.1 GB), it emits a partial batch
213+
// and keeps the remaining rows in `self.in_progress.indices`.
214+
// Drain those leftover rows before terminating the stream,
215+
// otherwise they would be silently dropped.
216+
// Repeated overflows are fine — each poll emits another partial
217+
// batch until `in_progress` is fully drained.
218+
if !self.in_progress.is_empty() {
219+
let before = self.in_progress.len();
220+
let result = self.in_progress.build_record_batch();
221+
self.produced += before - self.in_progress.len();
222+
return Poll::Ready(result.transpose());
223+
}
211224
return Poll::Ready(None);
212225
}
213226
// Once all partitions have set their corresponding cursors for the loser tree,

datafusion/physical-plan/src/sorts/sort_preserving_merge.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1539,4 +1539,47 @@ mod tests {
15391539
Err(_) => exec_err!("SortPreservingMerge caused a deadlock"),
15401540
}
15411541
}
1542+
1543+
/// Test that SortPreservingMerge with FETCH does not silently drop rows
1544+
/// when an interleave offset overflow forces a partial batch.
1545+
#[tokio::test]
1546+
async fn test_sort_merge_fetch_interleave_overflow() -> Result<()> {
1547+
// Each string is 768 MiB (~805 MB). Three rows total → ~2.25 GiB > i32::MAX,
1548+
// which forces `build_record_batch` to emit a partial batch.
1549+
let big_str: String = "x".repeat(768 * 1024 * 1024);
1550+
1551+
let schema = Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, false)]));
1552+
1553+
// Create 3 single-row partitions, each with one large string.
1554+
let mut partitions = Vec::new();
1555+
for _ in 0..3 {
1556+
let array = StringArray::from(vec![big_str.as_str()]);
1557+
let batch = RecordBatch::try_new(
1558+
Arc::clone(&schema),
1559+
vec![Arc::new(array) as ArrayRef],
1560+
)?;
1561+
partitions.push(vec![batch]);
1562+
}
1563+
1564+
let input = TestMemoryExec::try_new_exec(&partitions, Arc::clone(&schema), None)?;
1565+
1566+
let sort_exprs: LexOrdering = [PhysicalSortExpr::new_default(Arc::new(
1567+
Column::new("s", 0),
1568+
)
1569+
as Arc<dyn PhysicalExpr>)]
1570+
.into();
1571+
1572+
let spm = SortPreservingMergeExec::new(sort_exprs, input).with_fetch(Some(3));
1573+
1574+
let task_ctx = Arc::new(TaskContext::default());
1575+
let batches = collect(Arc::new(spm), task_ctx).await?;
1576+
1577+
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1578+
assert_eq!(
1579+
total_rows, 3,
1580+
"Expected all 3 rows to be emitted despite interleave overflow, got {total_rows}"
1581+
);
1582+
1583+
Ok(())
1584+
}
15421585
}

0 commit comments

Comments (0)