Commit 3f475b1

hcrosse and alamb authored
feat: make BatchPartitioner::partition_iter public (#21341)
## Which issue does this PR close?

- Closes #21311.

## Rationale for this change

`BatchPartitioner::partition_iter` is already used internally as the core implementation behind the public `partition` method, and was intentionally factored out to support both sync and async consumption patterns. However, since it's private, downstream crates like Ballista can't use the iterator directly and are forced to run both CPU-bound partitioning and I/O together in a sync closure.

## What changes are included in this PR?

Changed `partition_iter` visibility from private to public.

## Are these changes tested?

The existing tests for `BatchPartitioner` cover `partition_iter` indirectly through the `partition` method, which delegates to it. No behavioral change was made.

## Are there any user-facing changes?

`BatchPartitioner::partition_iter` is now part of the public API. This is a purely additive change with no breaking impact.

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 95630ed commit 3f475b1

1 file changed

Lines changed: 14 additions & 5 deletions

datafusion/physical-plan/src/repartition/mod.rs

@@ -545,12 +545,21 @@ impl BatchPartitioner {
         })
     }
 
-    /// Actual implementation of [`partition`](Self::partition).
+    /// Returns an iterator of `(partition_index, RecordBatch)` pairs for the given batch.
     ///
-    /// The reason this was pulled out is that we need to have a variant of `partition` that works w/ sync functions,
-    /// and one that works w/ async. Using an iterator as an intermediate representation was the best way to achieve
-    /// this (so we don't need to clone the entire implementation).
-    fn partition_iter(
+    /// This is useful for async consumers that want to separate CPU-bound partitioning
+    /// from I/O. For example, you can iterate results on the async side and send them
+    /// through a channel, while performing file I/O on a blocking task:
+    ///
+    /// ```ignore
+    /// for result in partitioner.partition_iter(batch)? {
+    ///     let (partition, batch) = result?;
+    ///     tx.send((partition, batch)).await?;
+    /// }
+    /// ```
+    ///
+    /// The sync [`partition`](Self::partition) method is implemented on top of this.
+    pub fn partition_iter(
         &mut self,
         batch: RecordBatch,
     ) -> Result<impl Iterator<Item = Result<(usize, RecordBatch)>> + Send + '_> {
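
The doc example in the hunk is marked `ignore`, so it is never compiled. The sketch below illustrates the same pattern using only the standard library. It is not DataFusion code: `ToyPartitioner`, `Batch`, `send_partitions`, and `partition_and_write` are hypothetical stand-ins, modulo partitioning is an arbitrary choice, and an OS thread with `std::sync::mpsc` replaces the async channel and blocking task. It shows a closure-based `partition` built on top of `partition_iter`, and a producer that partitions on one thread while a writer thread receives the pieces.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a `RecordBatch`: just a list of row keys.
type Batch = Vec<u64>;
type Result<T> = std::result::Result<T, String>;

/// Hypothetical stand-in for `BatchPartitioner` (partitions by `key % n`).
struct ToyPartitioner {
    num_partitions: usize,
}

impl ToyPartitioner {
    /// Iterator form: yields `(partition_index, batch)` pairs, skipping empty partitions.
    fn partition_iter(
        &mut self,
        batch: Batch,
    ) -> Result<impl Iterator<Item = Result<(usize, Batch)>> + Send> {
        if self.num_partitions == 0 {
            return Err("num_partitions must be non-zero".to_string());
        }
        let n = self.num_partitions;
        let mut buckets: Vec<Batch> = vec![Vec::new(); n];
        for key in batch {
            buckets[(key % n as u64) as usize].push(key);
        }
        Ok(buckets
            .into_iter()
            .enumerate()
            .filter(|(_, rows)| !rows.is_empty())
            .map(Ok::<(usize, Batch), String>))
    }

    /// Closure form, implemented on top of `partition_iter`.
    fn partition<F>(&mut self, batch: Batch, mut f: F) -> Result<()>
    where
        F: FnMut(usize, Batch) -> Result<()>,
    {
        for item in self.partition_iter(batch)? {
            let (idx, part) = item?;
            f(idx, part)?;
        }
        Ok(())
    }
}

/// Producer side: CPU-bound partitioning, handing each piece to the channel.
fn send_partitions(
    partitioner: &mut ToyPartitioner,
    batch: Batch,
    tx: &mpsc::Sender<(usize, Batch)>,
) -> Result<()> {
    for item in partitioner.partition_iter(batch)? {
        tx.send(item?).map_err(|e| e.to_string())?;
    }
    Ok(())
}

/// Partitions on the calling thread while a writer thread drains the channel.
fn partition_and_write(batch: Batch, num_partitions: usize) -> Result<Vec<(usize, Batch)>> {
    let (tx, rx) = mpsc::channel::<(usize, Batch)>();
    // Writer thread: stands in for blocking file I/O; here it only collects what arrives.
    let writer = thread::spawn(move || rx.into_iter().collect::<Vec<_>>());

    let mut partitioner = ToyPartitioner { num_partitions };
    let outcome = send_partitions(&mut partitioner, batch, &tx);
    drop(tx); // close the channel so the writer loop ends

    let written = writer
        .join()
        .map_err(|_| "writer thread panicked".to_string())?;
    outcome?;
    Ok(written)
}

fn main() {
    let written = partition_and_write(vec![1, 2, 3, 4, 5, 6], 3).expect("partitioning failed");
    for (idx, rows) in &written {
        println!("partition {idx}: {rows:?}");
    }

    // The closure-based form remains available for purely synchronous callers.
    let mut partitioner = ToyPartitioner { num_partitions: 2 };
    partitioner
        .partition(vec![1, 3, 5], |idx, rows| {
            println!("sync partition {idx}: {rows:?}");
            Ok(())
        })
        .expect("partitioning failed");
}
```

With only the closure form, the per-partition work has to happen inside the closure on the partitioning thread. With the iterator form, the caller decides where each `(partition_index, batch)` pair goes next, which is the use case the commit message describes for Ballista.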
