Skip to content

Commit f1c643a

Browse files
authored
fix: LazyMemoryExec should produce independent streams per execute() (#21565)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #. ## Rationale for this change LazyMemoryExec::execute() shares the same generator instance across multiple calls via Arc::clone, so a second call to execute(0) continues from where the first left off instead of starting from the beginning. This is inconsistent with how other ExecutionPlan implementations behave, where each execute() call produces an independent stream. This was discovered while writing e2e tests for NestedLoopJoinExec memory-limited execution (#21448), where the OOM fallback path re-executes the left child plan and got incomplete results. ## What changes are included in this PR? LazyMemoryExec::execute() was sharing the same generator instance (via Arc::clone) across multiple calls, causing streams to share mutable state. This meant a second call to execute(0) would continue from where the first call left off, instead of starting from the beginning. Fix by calling reset_state() on the generator to create a fresh instance for each execute() call, matching the expected ExecutionPlan semantics that each execute() produces an independent stream. ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Unit test ## Are there any user-facing changes? No <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent cda7b5c commit f1c643a

1 file changed

Lines changed: 40 additions & 1 deletion

File tree

datafusion/physical-plan/src/memory.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,10 +344,14 @@ impl ExecutionPlan for LazyMemoryExec {
344344

345345
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
346346

347+
// Create a fresh generator via reset_state() so that each execute()
348+
// call produces an independent stream starting from the beginning.
349+
let generator = self.batch_generators[partition].read().reset_state();
350+
347351
let stream = LazyMemoryStream {
348352
schema: Arc::clone(&self.schema),
349353
projection: self.projection.clone(),
350-
generator: Arc::clone(&self.batch_generators[partition]),
354+
generator,
351355
baseline_metrics,
352356
};
353357
Ok(Box::pin(cooperative(stream)))
@@ -531,6 +535,41 @@ mod lazy_memory_tests {
531535
Ok(())
532536
}
533537

538+
/// Verify that calling execute(0) twice on the same LazyMemoryExec
539+
/// produces independent streams with the same data.
540+
#[tokio::test]
541+
async fn test_lazy_memory_exec_multiple_executions_are_independent() -> Result<()> {
542+
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
543+
let generator = TestGenerator {
544+
counter: 0,
545+
max_batches: 3,
546+
batch_size: 2,
547+
schema: Arc::clone(&schema),
548+
};
549+
550+
let exec =
551+
LazyMemoryExec::try_new(schema, vec![Arc::new(RwLock::new(generator))])?;
552+
let task_ctx = Arc::new(TaskContext::default());
553+
554+
// First execution — consume all batches
555+
let batches_1 = collect(exec.execute(0, Arc::clone(&task_ctx))?).await?;
556+
let total_rows_1: usize = batches_1.iter().map(|b| b.num_rows()).sum();
557+
assert_eq!(total_rows_1, 6);
558+
559+
// Second execution — should produce the same data, not continue
560+
// from where the first execution left off
561+
let batches_2 = collect(exec.execute(0, Arc::clone(&task_ctx))?).await?;
562+
let total_rows_2: usize = batches_2.iter().map(|b| b.num_rows()).sum();
563+
assert_eq!(total_rows_2, 6);
564+
565+
// Verify contents are identical
566+
for (b1, b2) in batches_1.iter().zip(batches_2.iter()) {
567+
assert_eq!(b1, b2);
568+
}
569+
570+
Ok(())
571+
}
572+
534573
#[tokio::test]
535574
async fn test_lazy_memory_exec_invalid_partition() -> Result<()> {
536575
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));

0 commit comments

Comments
 (0)