Skip to content

Commit 93ae1b8

Browse files
authored
fix: try again to fix Miri in ParquetOpener (#21680)
## Which issue does this PR close? - Closes #21662. ## Rationale for this change #21663 removed the nested async block in `PushDecoderStreamState::transition` to fix a Stacked Borrows violation under miri. However, miri still flags the same violation because `&mut self` is still a single opaque borrow — inlining the async block alone doesn't let miri split the disjoint field borrows across the `.await` yield point. Comet CI reproduces this reliably: https://github.com/apache/datafusion-comet/actions/runs/24518967597/job/71671004017?pr=3916 ## What changes are included in this PR? Change `PushDecoderStreamState::transition` to take `self` by value instead of `&mut self`. With `&mut self`, the generated future stores a mutable reference, and when `unfold` pins and polls it, miri sees the `&mut self` as a single opaque borrow that conflicts across the `.await` yield point. With owned `self`, the future owns the state directly — no reference means no Stacked Borrows conflict. The struct fields are all heap-allocated or reference-counted, so the move is just pointer-sized copies, not a deep copy. ## Are these changes tested? Existing tests cover this code path. The fix is validated by miri passing in CI (the same test that currently fails: `test_nested_types_extract_missing_struct_names_missing_field`). We'll run Comet CI against this branch first to confirm the miri violation is resolved before merging. ## Are there any user-facing changes? No.
1 parent 7731130 commit 93ae1b8

1 file changed

Lines changed: 13 additions & 14 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1199,10 +1199,7 @@ impl RowGroupsPrunedParquetOpen {
11991199
predicate_cache_records,
12001200
baseline_metrics: prepared.baseline_metrics,
12011201
},
1202-
|mut state| async move {
1203-
let result = state.transition().await;
1204-
result.map(|r| (r, state))
1205-
},
1202+
|state| async move { state.transition().await },
12061203
)
12071204
.fuse();
12081205

@@ -1248,15 +1245,15 @@ impl PushDecoderStreamState {
12481245
/// fetched from the [`AsyncFileReader`] and fed back into the decoder.
12491246
/// - [`Data`](DecodeResult::Data) – a decoded batch is projected and returned.
12501247
/// - [`Finished`](DecodeResult::Finished) – signals end-of-stream (`None`).
1251-
async fn transition(&mut self) -> Option<Result<RecordBatch>> {
1248+
///
1249+
/// Takes `self` by value (rather than `&mut self`) so the generated future
1250+
/// owns the state directly. This avoids a Stacked Borrows violation under
1251+
/// miri where `&mut self` creates a single opaque borrow that conflicts
1252+
/// with `unfold`'s ownership across yield points.
1253+
async fn transition(mut self) -> Option<(Result<RecordBatch>, Self)> {
12521254
loop {
12531255
match self.decoder.try_decode() {
12541256
Ok(DecodeResult::NeedsData(ranges)) => {
1255-
// IO (get_byte_ranges) and CPU (push_ranges) are still
1256-
// decoupled — they just can't live in a nested async block
1257-
// because that captures `&mut self` as one opaque borrow,
1258-
// which violates Stacked Borrows across the yield point.
1259-
// Inlining lets the compiler split the disjoint field borrows.
12601257
let data = self
12611258
.reader
12621259
.get_byte_ranges(ranges.clone())
@@ -1265,24 +1262,26 @@ impl PushDecoderStreamState {
12651262
match data {
12661263
Ok(data) => {
12671264
if let Err(e) = self.decoder.push_ranges(ranges, data) {
1268-
return Some(Err(DataFusionError::from(e)));
1265+
return Some((Err(DataFusionError::from(e)), self));
12691266
}
12701267
}
1271-
Err(e) => return Some(Err(e)),
1268+
Err(e) => return Some((Err(e), self)),
12721269
}
12731270
}
12741271
Ok(DecodeResult::Data(batch)) => {
12751272
let mut timer = self.baseline_metrics.elapsed_compute().timer();
12761273
self.copy_arrow_reader_metrics();
12771274
let result = self.project_batch(&batch);
12781275
timer.stop();
1279-
return Some(result);
1276+
// Release the borrow on baseline_metrics before moving self
1277+
drop(timer);
1278+
return Some((result, self));
12801279
}
12811280
Ok(DecodeResult::Finished) => {
12821281
return None;
12831282
}
12841283
Err(e) => {
1285-
return Some(Err(DataFusionError::from(e)));
1284+
return Some((Err(DataFusionError::from(e)), self));
12861285
}
12871286
}
12881287
}

0 commit comments

Comments
 (0)