diff --git a/Cargo.lock b/Cargo.lock index bbec97ed7ff3d..21220089d0f41 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1968,6 +1968,7 @@ dependencies = [ "liblzma", "log", "object_store", + "parking_lot", "rand 0.9.2", "tempfile", "tokio", diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 72549d5b8705f..16588330551a9 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -24,9 +24,11 @@ use crate::{ ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory, apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter, }; -use arrow::array::{RecordBatch, RecordBatchOptions}; +use arrow::array::{RecordBatch, RecordBatchOptions, RecordBatchReader}; use arrow::datatypes::DataType; -use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; +use datafusion_datasource::morsel::{ + Morsel, MorselPlan, MorselPlanner, MorselStream, Morselizer, +}; use datafusion_physical_expr::projection::{ProjectionExprs, Projector}; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr_adapter::replace_columns_with_literals; @@ -34,16 +36,12 @@ use std::collections::HashMap; use std::fmt; use std::future::Future; use std::mem; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; +use std::sync::{Arc, Mutex}; use arrow::datatypes::{Schema, SchemaRef, TimeUnit}; use datafusion_common::encryption::FileDecryptionProperties; use datafusion_common::stats::Precision; -use datafusion_common::{ - ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, exec_err, -}; +use datafusion_common::{ColumnStatistics, Result, ScalarValue, Statistics, exec_err}; use datafusion_datasource::{PartitionedFile, TableSchema}; use datafusion_physical_expr::simplifier::PhysicalExprSimplifier; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; @@ -60,15 +58,13 @@ use datafusion_pruning::{FilePruner, PruningPredicate, build_pruning_predicate}; use datafusion_common::config::EncryptionFactoryOptions; #[cfg(feature = "parquet_encryption")] use datafusion_execution::parquet_encryption::EncryptionFactory; -use futures::{ - FutureExt, Stream, StreamExt, future::BoxFuture, ready, stream::BoxStream, -}; +use futures::{FutureExt, future::BoxFuture}; use log::debug; use parquet::DecodeResult; use parquet::arrow::ParquetRecordBatchStreamBuilder; use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; use parquet::arrow::arrow_reader::{ - ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy, + ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader, RowSelectionPolicy, }; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::parquet_column; @@ -190,10 +186,9 @@ impl Morselizer for ParquetMorselizer { /// PruneWithBloomFilters /// | /// v -/// BuildStream +/// BuildDecoder /// | -/// v -/// Done +/// Done /// ``` /// /// Note: `LoadEncryption` is only present when the `parquet_encryption` feature is @@ -218,18 +213,15 @@ enum ParquetOpenState { PrepareFilters(Box), /// Loading [Parquet Page Index](https://parquet.apache.org/docs/file-format/pageindex/) LoadPageIndex(BoxFuture<'static, Result>), - /// Pruning Row Groups + /// Pruning Row Groups with RowGroup and PageIndex statistics PruneWithStatistics(Box), /// Loading bloom filters required for row-group pruning LoadBloomFilters(BoxFuture<'static, Result>), /// Pruning with preloaded Bloom Filters PruneWithBloomFilters(Box), - /// Builds the final reader stream - /// - /// TODO: split state as this currently does both I/O and CPU work. - BuildStream(Box), - /// Terminal state: the final opened stream is ready to return. - Ready(BoxStream<'static, Result>), + /// Building and advancing the parquet push decoder until the next + /// reader-backed morsel or I/O boundary is discovered. + BuildDecoder(Box), /// Terminal state: reading complete Done, } @@ -247,8 +239,7 @@ impl fmt::Debug for ParquetOpenState { ParquetOpenState::PruneWithStatistics(_) => "PruneWithStatistics", ParquetOpenState::LoadBloomFilters(_) => "LoadBloomFilters", ParquetOpenState::PruneWithBloomFilters(_) => "PruneWithBloomFilters", - ParquetOpenState::BuildStream(_) => "BuildStream", - ParquetOpenState::Ready(_) => "Ready", + ParquetOpenState::BuildDecoder(_) => "BuildDecoder", ParquetOpenState::Done => "Done", }; f.write_str(state) @@ -393,13 +384,16 @@ impl ParquetOpenState { ParquetOpenState::LoadBloomFilters(future) => { Ok(ParquetOpenState::LoadBloomFilters(future)) } - ParquetOpenState::PruneWithBloomFilters(loaded) => Ok( - ParquetOpenState::BuildStream(Box::new(loaded.prune_bloom_filters())), - ), - ParquetOpenState::BuildStream(prepared) => { - Ok(ParquetOpenState::Ready(prepared.build_stream()?)) + ParquetOpenState::PruneWithBloomFilters(loaded) => { + Ok(ParquetOpenState::BuildDecoder(Box::new( + ParquetBuildDecoderState::NotStarted(Box::new( + loaded.prune_bloom_filters(), + )), + ))) + } + ParquetOpenState::BuildDecoder(state) => { + Ok(ParquetOpenState::BuildDecoder(state)) } - ParquetOpenState::Ready(stream) => Ok(ParquetOpenState::Ready(stream)), ParquetOpenState::Done => { panic!("ParquetOpenFuture polled after completion"); } @@ -409,12 +403,12 @@ impl ParquetOpenState { /// Implements the Morsel API struct ParquetStreamMorsel { - stream: BoxStream<'static, Result>, + reader: Box, } impl ParquetStreamMorsel { - fn new(stream: BoxStream<'static, Result>) -> Self { - Self { stream } + fn new(reader: Box) -> Self { + Self { reader } } } @@ -426,8 +420,8 @@ impl fmt::Debug for ParquetStreamMorsel { } impl Morsel for ParquetStreamMorsel { - fn into_stream(self: Box) -> BoxStream<'static, Result> { - self.stream + fn into_stream(self: Box) -> Result { + Ok(MorselStream::Sync(self.reader)) } } @@ -515,11 +509,7 @@ impl MorselPlanner for ParquetMorselPlanner { ))) }))) } - ParquetOpenState::Ready(stream) => { - let morsels: Vec> = - vec![Box::new(ParquetStreamMorsel::new(stream))]; - Ok(Some(MorselPlan::new().with_morsels(morsels))) - } + ParquetOpenState::BuildDecoder(state) => state.plan(), ParquetOpenState::Done => Ok(None), cpu_state => Ok(Some( MorselPlan::new() @@ -1055,8 +1045,8 @@ impl BloomFiltersLoadedParquetOpen { } impl RowGroupsPrunedParquetOpen { - /// Build the final parquet stream once all pruning work is complete. - fn build_stream(self) -> Result>> { + /// Build the decoder-backed planner state once all pruning work is complete. + fn build_decoder(self) -> Result { let RowGroupsPrunedParquetOpen { prepared, mut row_groups, @@ -1184,112 +1174,92 @@ impl RowGroupsPrunedParquetOpen { .projection .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?; let projector = projection.make_projector(&stream_schema)?; - let output_schema = Arc::clone(&prepared.output_schema); - let files_ranges_pruned_statistics = - prepared.file_metrics.files_ranges_pruned_statistics.clone(); - let stream = futures::stream::unfold( - PushDecoderStreamState { - decoder, - reader: prepared.async_file_reader, - projector, - output_schema, + let file_pruner = prepared.file_pruner.map(|file_pruner| { + Arc::new(SharedFilePruner::new( + file_pruner, + prepared.file_metrics.files_ranges_pruned_statistics.clone(), + )) + }); + + Ok(ParquetDecoderPlannerState { + decoder, + reader: prepared.async_file_reader, + context: Arc::new(ParquetDecoderContext { + projector: Arc::new(projector), + output_schema: Arc::clone(&prepared.output_schema), replace_schema, arrow_reader_metrics, predicate_cache_inner_records, predicate_cache_records, baseline_metrics: prepared.baseline_metrics, - }, - |mut state| async move { - let result = state.transition().await; - result.map(|r| (r, state)) - }, - ) - .fuse(); - - // Wrap the stream so a dynamic filter can stop the file scan early. - if let Some(file_pruner) = prepared.file_pruner { - let stream = stream.boxed(); - Ok(EarlyStoppingStream::new( - stream, file_pruner, - files_ranges_pruned_statistics, - ) - .boxed()) - } else { - Ok(stream.boxed()) + }), + }) + } +} + +/// State carried by [`ParquetOpenState::BuildDecoder`]. +/// +/// The first time this state is planned it finishes CPU-only decoder +/// construction. Subsequent calls keep the planner in `BuildDecoder` while it +/// repeatedly asks `ParquetPushDecoder::try_next_reader` for either a ready +/// reader-backed morsel or the next I/O boundary. +enum ParquetBuildDecoderState { + NotStarted(Box), + Ready(Box), +} + +impl ParquetBuildDecoderState { + fn plan(mut self: Box) -> Result> { + loop { + match *self { + Self::NotStarted(prepared) => { + self = Box::new(Self::Ready(Box::new(prepared.build_decoder()?))); + } + Self::Ready(state) => return state.plan(), + } } } } -/// State for a stream that decodes a single Parquet file using a push-based decoder. +/// Shared decode/projection context reused by all morsels for one parquet file. /// -/// The [`transition`](Self::transition) method drives the decoder in a loop: it requests -/// byte ranges from the [`AsyncFileReader`], pushes the fetched data into the -/// [`ParquetPushDecoder`], and yields projected [`RecordBatch`]es until the file is -/// fully consumed. -struct PushDecoderStreamState { - decoder: ParquetPushDecoder, - reader: Box, - projector: Projector, +/// The planner can split a file into multiple reader-backed morsels, but the +/// projection logic, metrics plumbing, and optional late file-pruning behavior +/// must remain consistent across all of them. This context holds that shared +/// immutable state. +struct ParquetDecoderContext { + projector: Arc, output_schema: Arc, replace_schema: bool, arrow_reader_metrics: ArrowReaderMetrics, predicate_cache_inner_records: Gauge, predicate_cache_records: Gauge, baseline_metrics: BaselineMetrics, + file_pruner: Option>, } -impl PushDecoderStreamState { - /// Advances the decoder state machine until the next [`RecordBatch`] is - /// produced, the file is fully consumed, or an error occurs. - /// - /// On each iteration the decoder is polled via [`ParquetPushDecoder::try_decode`]: - /// - [`NeedsData`](DecodeResult::NeedsData) – the requested byte ranges are - /// fetched from the [`AsyncFileReader`] and fed back into the decoder. - /// - [`Data`](DecodeResult::Data) – a decoded batch is projected and returned. - /// - [`Finished`](DecodeResult::Finished) – signals end-of-stream (`None`). - async fn transition(&mut self) -> Option> { - loop { - match self.decoder.try_decode() { - Ok(DecodeResult::NeedsData(ranges)) => { - // IO (get_byte_ranges) and CPU (push_ranges) are still - // decoupled — they just can't live in a nested async block - // because that captures `&mut self` as one opaque borrow, - // which violates Stacked Borrows across the yield point. - // Inlining lets the compiler split the disjoint field borrows. - let data = self - .reader - .get_byte_ranges(ranges.clone()) - .await - .map_err(DataFusionError::from); - match data { - Ok(data) => { - if let Err(e) = self.decoder.push_ranges(ranges, data) { - return Some(Err(DataFusionError::from(e))); - } - } - Err(e) => return Some(Err(e)), - } - } - Ok(DecodeResult::Data(batch)) => { - let mut timer = self.baseline_metrics.elapsed_compute().timer(); - self.copy_arrow_reader_metrics(); - let result = self.project_batch(&batch); - timer.stop(); - return Some(result); - } - Ok(DecodeResult::Finished) => { - return None; - } - Err(e) => { - return Some(Err(DataFusionError::from(e))); - } - } +impl ParquetDecoderContext { + fn decode_batch( + &self, + batch: std::result::Result, + ) -> Result> { + let mut timer = self.baseline_metrics.elapsed_compute().timer(); + self.copy_arrow_reader_metrics(); + let batch = self.project_batch(&batch?)?; + timer.stop(); + + // Re-check dynamic file pruning after each decoded batch so a filter + // that became selective mid-scan can terminate the rest of the file. + if let Some(file_pruner) = &self.file_pruner + && file_pruner.should_prune()? + { + return Ok(None); } + + Ok(Some(batch)) } - /// Copies metrics from ArrowReaderMetrics (the metrics collected by the - /// arrow-rs parquet reader) to the parquet file metrics for DataFusion fn copy_arrow_reader_metrics(&self) { if let Some(v) = self.arrow_reader_metrics.records_read_from_inner() { self.predicate_cache_inner_records.set(v); @@ -1322,6 +1292,182 @@ impl PushDecoderStreamState { } } +/// Synchronous reader returned from parquet morsels. +/// +/// The underlying parquet reader is already fully I/O-ready. This wrapper keeps +/// the CPU-only post-processing that DataFusion still needs for each batch: +/// expression projection, schema replacement, metrics propagation, and optional +/// late file pruning. +struct ProjectedParquetRecordBatchReader { + reader: ParquetRecordBatchReader, + context: Arc, + done: bool, +} + +impl Iterator for ProjectedParquetRecordBatchReader { + type Item = arrow::error::Result; + + fn next(&mut self) -> Option { + if self.done { + return None; + } + + match self.reader.next() { + Some(batch) => match self.context.decode_batch(batch) { + Ok(Some(batch)) => Some(Ok(batch)), + Ok(None) => { + self.done = true; + None + } + Err(err) => { + self.done = true; + Some(Err(err.into())) + } + }, + None => { + self.done = true; + None + } + } + } +} + +impl RecordBatchReader for ProjectedParquetRecordBatchReader { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.context.output_schema) + } +} + +/// Planner state after parquet setup and pruning have completed. +/// +/// This is the parquet-specific bridge between the existing morsel API and the +/// push decoder's `try_next_reader` API. Each call either produces one +/// reader-backed morsel, discovers the next I/O boundary, or finishes the +/// file. The planner itself stays in `BuildDecoder` the whole time. +struct ParquetDecoderPlannerState { + decoder: ParquetPushDecoder, + reader: Box, + context: Arc, +} + +impl ParquetDecoderPlannerState { + fn plan(mut self: Box) -> Result> { + match self.decoder.try_next_reader()? { + DecodeResult::NeedsData(ranges) => { + if self.should_prune()? { + Ok(None) + } else { + Ok(Some(self.schedule_io(ranges))) + } + } + DecodeResult::Data(reader) => { + if self.should_prune()? { + return Ok(None); + } + let morsels: Vec> = + vec![Box::new(ParquetStreamMorsel::new(Box::new( + ProjectedParquetRecordBatchReader { + reader, + context: Arc::clone(&self.context), + done: false, + }, + )))]; + Ok(Some(MorselPlan::new().with_morsels(morsels).with_planners( + vec![Box::new(ParquetMorselPlanner { + state: ParquetOpenState::BuildDecoder(Box::new( + ParquetBuildDecoderState::Ready(self), + )), + })], + ))) + } + DecodeResult::Finished => Ok(None), + } + } + + fn should_prune(&self) -> Result { + if let Some(file_pruner) = &self.context.file_pruner { + file_pruner.should_prune() + } else { + Ok(false) + } + } + + fn schedule_io(self: Box, ranges: Vec>) -> MorselPlan { + MorselPlan::new().with_pending_planner(self.into_pending_planner(ranges)) + } + + async fn into_pending_planner( + self: Box, + ranges: Vec>, + ) -> Result> { + let state = self.fetch_ranges(ranges).await?; + Ok(Box::new(ParquetMorselPlanner { + state: ParquetOpenState::BuildDecoder(Box::new( + ParquetBuildDecoderState::Ready(state), + )), + }) as Box) + } + + async fn fetch_ranges( + mut self: Box, + ranges: Vec>, + ) -> Result> { + let data = self.reader.get_byte_ranges(ranges.clone()).await?; + self.decoder.push_ranges(ranges, data)?; + Ok(self) + } +} + +impl fmt::Debug for ParquetDecoderPlannerState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ParquetDecoderPlannerState") + .finish_non_exhaustive() + } +} + +struct SharedFilePruner { + state: Mutex, + files_ranges_pruned_statistics: PruningMetrics, +} + +struct SharedFilePrunerState { + file_pruner: FilePruner, + pruned: bool, +} + +impl SharedFilePruner { + fn new( + file_pruner: FilePruner, + files_ranges_pruned_statistics: PruningMetrics, + ) -> Self { + Self { + state: Mutex::new(SharedFilePrunerState { + file_pruner, + pruned: false, + }), + files_ranges_pruned_statistics, + } + } + + fn should_prune(&self) -> Result { + let mut state = self.state.lock().expect("shared file pruner lock poisoned"); + if state.pruned { + return Ok(true); + } + + // Multiple morsels for the same file can consult this pruner. Once a + // dynamic filter proves the file can be skipped, remember that result + // and adjust the file-level pruning metrics exactly once. + if state.file_pruner.should_prune()? { + self.files_ranges_pruned_statistics.add_pruned(1); + self.files_ranges_pruned_statistics.subtract_matched(1); + state.pruned = true; + } + + Ok(state.pruned) + } +} + type ConstantColumns = HashMap; /// Extract constant column values from statistics, keyed by column name in the logical file schema. @@ -1384,85 +1530,6 @@ fn constant_value_from_stats( None } -/// Wraps an inner RecordBatchStream and a [`FilePruner`] -/// -/// This can terminate the scan early when some dynamic filters is updated after -/// the scan starts, so we discover after the scan starts that the file can be -/// pruned (can't have matching rows). -struct EarlyStoppingStream { - /// Has the stream finished processing? All subsequent polls will return - /// None - done: bool, - file_pruner: FilePruner, - files_ranges_pruned_statistics: PruningMetrics, - /// The inner stream - inner: S, -} - -impl EarlyStoppingStream { - pub fn new( - stream: S, - file_pruner: FilePruner, - files_ranges_pruned_statistics: PruningMetrics, - ) -> Self { - Self { - done: false, - inner: stream, - file_pruner, - files_ranges_pruned_statistics, - } - } -} - -impl EarlyStoppingStream -where - S: Stream> + Unpin, -{ - fn check_prune(&mut self, input: Result) -> Result> { - let batch = input?; - - // Since dynamic filters may have been updated, see if we can stop - // reading this stream entirely. - if self.file_pruner.should_prune()? { - self.files_ranges_pruned_statistics.add_pruned(1); - // Previously this file range has been counted as matched - self.files_ranges_pruned_statistics.subtract_matched(1); - self.done = true; - Ok(None) - } else { - // Return the adapted batch - Ok(Some(batch)) - } - } -} - -impl Stream for EarlyStoppingStream -where - S: Stream> + Unpin, -{ - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - if self.done { - return Poll::Ready(None); - } - match ready!(self.inner.poll_next_unpin(cx)) { - None => { - // input done - self.done = true; - Poll::Ready(None) - } - Some(input_batch) => { - let output = self.check_prune(input_batch); - Poll::Ready(output.transpose()) - } - } - } -} - #[derive(Default)] struct EncryptionContext { #[cfg(feature = "parquet_encryption")] @@ -1633,7 +1700,7 @@ mod test { ColumnStatistics, ScalarValue, Statistics, internal_err, record_batch, stats::Precision, }; - use datafusion_datasource::morsel::{Morsel, Morselizer}; + use datafusion_datasource::morsel::{Morsel, MorselPlan, Morselizer}; use datafusion_datasource::{PartitionedFile, TableSchema}; use datafusion_expr::{col, lit}; use datafusion_physical_expr::{ @@ -1822,7 +1889,7 @@ mod test { } /// Test helper that drives a [`ParquetMorselizer`] to completion and returns - /// the first stream morsel it produces. + /// a stream containing the output from all morsels in logical order. /// /// This mirrors how `FileStream` consumes the morsel APIs: it repeatedly /// plans CPU work, awaits any discovered I/O futures, and feeds the planner @@ -1833,14 +1900,27 @@ mod test { ) -> Result>> { let mut planners = VecDeque::from([morselizer.plan_file(file)?]); let mut morsels: VecDeque> = VecDeque::new(); + let mut batches = Vec::new(); loop { if let Some(morsel) = morsels.pop_front() { - return Ok(Box::pin(morsel.into_stream())); + match morsel.into_stream()? { + MorselStream::Async(mut stream) => { + while let Some(batch) = stream.next().await { + batches.push(batch); + } + } + MorselStream::Sync(reader) => { + for batch in reader { + batches.push(Ok(batch?)); + } + } + }; + continue; } let Some(planner) = planners.pop_front() else { - return Ok(Box::pin(futures::stream::empty())); + return Ok(Box::pin(futures::stream::iter(batches))); }; if let Some(mut plan) = planner.plan()? { @@ -1859,6 +1939,39 @@ mod test { } } + /// Drives planning until the first plan that yields at least one morsel. + async fn first_morsel_plan( + morselizer: &ParquetMorselizer, + file: PartitionedFile, + ) -> Result { + let mut planners = VecDeque::from([morselizer.plan_file(file)?]); + + loop { + let Some(planner) = planners.pop_front() else { + return internal_err!("planner finished without producing a morsel"); + }; + + if let Some(mut plan) = planner.plan()? { + let morsels = plan.take_morsels(); + if !morsels.is_empty() { + plan = plan.with_morsels(morsels); + return Ok(plan); + } + + planners.extend(plan.take_ready_planners()); + + if let Some(pending_planner) = plan.take_pending_planner() { + planners.push_front(pending_planner.await?); + continue; + } + + if planners.is_empty() { + return internal_err!("planner returned an empty morsel plan"); + } + } + } + } + fn constant_int_stats() -> (Statistics, SchemaRef) { let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, false), @@ -2070,6 +2183,46 @@ mod test { assert_eq!(num_rows, 0); } + #[tokio::test] + async fn test_first_parquet_morsel_returns_sync_reader_and_continues_build_decoder() { + let store = Arc::new(InMemory::new()) as Arc; + + let batch = record_batch!(("a", Int32, vec![1, 2, 3, 4, 5, 6])).unwrap(); + let props = WriterProperties::builder() + .set_max_row_group_row_count(Some(2)) + .build(); + let data_size = write_parquet_batches( + Arc::clone(&store), + "prefetch.parquet", + vec![batch], + Some(props), + ) + .await; + + let file = PartitionedFile::new("prefetch.parquet", data_size as u64); + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let opener = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(schema) + .with_projection_indices(&[0]) + .with_reverse_row_groups(false) + .build(); + + let mut plan = first_morsel_plan(&opener, file).await.unwrap(); + let mut morsels = plan.take_morsels(); + assert_eq!(morsels.len(), 1, "expected one ready parquet morsel"); + match morsels.pop().unwrap().into_stream().unwrap() { + MorselStream::Sync(_) => {} + MorselStream::Async(_) => { + panic!("parquet morsel should now expose a sync reader") + } + } + assert!( + !plan.take_ready_planners().is_empty(), + "first parquet morsel plan should keep BuildDecoder ready to continue" + ); + } + #[tokio::test] async fn test_prune_on_partition_statistics_with_dynamic_expression() { let store = Arc::new(InMemory::new()) as Arc; diff --git a/datafusion/datasource/Cargo.toml b/datafusion/datasource/Cargo.toml index 4027521658977..40e2271f45205 100644 --- a/datafusion/datasource/Cargo.toml +++ b/datafusion/datasource/Cargo.toml @@ -64,6 +64,7 @@ itertools = { workspace = true } liblzma = { workspace = true, optional = true } log = { workspace = true } object_store = { workspace = true } +parking_lot = { workspace = true } rand = { workspace = true } tempfile = { workspace = true, optional = true } tokio = { workspace = true } diff --git a/datafusion/datasource/src/file_scan_config/mod.rs b/datafusion/datasource/src/file_scan_config/mod.rs index c1afd35da31ab..21b977a963ffc 100644 --- a/datafusion/datasource/src/file_scan_config/mod.rs +++ b/datafusion/datasource/src/file_scan_config/mod.rs @@ -24,7 +24,8 @@ use crate::file_groups::FileGroup; use crate::{ PartitionedFile, display::FileGroupsDisplay, file::FileSource, file_compression_type::FileCompressionType, file_stream::FileStreamBuilder, - source::DataSource, statistics::MinMaxStatistics, + file_stream::work_source::SharedWorkSource, source::DataSource, + statistics::MinMaxStatistics, }; use arrow::datatypes::FieldRef; use arrow::datatypes::{DataType, Schema, SchemaRef}; @@ -38,6 +39,7 @@ use datafusion_execution::{ }; use datafusion_expr::Operator; +use crate::source::OpenArgs; use datafusion_physical_expr::expressions::{BinaryExpr, Column}; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr::utils::reassign_expr_columns; @@ -580,6 +582,15 @@ impl DataSource for FileScanConfig { partition: usize, context: Arc, ) -> Result { + self.open_with_args(OpenArgs::new(partition, context)) + } + + fn open_with_args(&self, args: OpenArgs) -> Result { + let OpenArgs { + partition, + context, + sibling_state, + } = args; let object_store = context.runtime_env().object_store(&self.object_store_url)?; let batch_size = self .batch_size @@ -589,8 +600,17 @@ impl DataSource for FileScanConfig { let morselizer = source.create_morselizer(object_store, self, partition)?; + // Extract the shared work source from the sibling state if it exists. + // This allows multiple sibling streams to steal work from a single + // shared queue of unopened files. + let shared_work_source = sibling_state + .as_ref() + .and_then(|state| state.downcast_ref::()) + .cloned(); + let stream = FileStreamBuilder::new(self) .with_partition(partition) + .with_shared_work_source(shared_work_source) .with_morselizer(morselizer) .with_metrics(source.metrics()) .build()?; @@ -991,6 +1011,20 @@ impl DataSource for FileScanConfig { // Delegate to the file source self.file_source.apply_expressions(f) } + + /// Create any shared state that should be passed between sibling streams + /// during one execution. + /// + /// This returns `None` when sibling streams must not share work, such as + /// when file order must be preserved or the file groups define the output + /// partitioning needed for the rest of the plan + fn create_sibling_state(&self) -> Option> { + if self.preserve_order || self.partitioned_by_file_group { + return None; + } + + Some(Arc::new(SharedWorkSource::from_config(self)) as Arc) + } } impl FileScanConfig { @@ -1368,19 +1402,33 @@ mod tests { use super::*; use crate::TableSchema; + use crate::source::DataSourceExec; use crate::test_util::col; use crate::{ generate_test_files, test_util::MockSource, tests::aggr_test_schema, verify_sort_integrity, }; + use arrow::array::{Int32Array, RecordBatch}; use arrow::datatypes::Field; use datafusion_common::ColumnStatistics; use datafusion_common::stats::Precision; + use datafusion_common::tree_node::TreeNodeRecursion; + use datafusion_common::{Result, assert_batches_eq, internal_err}; + use datafusion_execution::TaskContext; use datafusion_expr::SortExpr; + use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::create_physical_sort_expr; use datafusion_physical_expr::expressions::Literal; use datafusion_physical_expr::projection::ProjectionExpr; + use datafusion_physical_expr::projection::ProjectionExprs; + use datafusion_physical_plan::ExecutionPlan; + use datafusion_physical_plan::execution_plan::collect; + use futures::FutureExt as _; + use futures::StreamExt as _; + use futures::stream; + use object_store::ObjectStore; + use std::fmt::Debug; #[derive(Clone)] struct InexactSortPushdownSource { @@ -1400,7 +1448,7 @@ mod tests { impl FileSource for InexactSortPushdownSource { fn create_file_opener( &self, - _object_store: Arc, + _object_store: Arc, _base_config: &FileScanConfig, _partition: usize, ) -> Result> { @@ -2288,6 +2336,88 @@ mod tests { assert_eq!(partition_stats.total_byte_size, Precision::Exact(800)); } + /// Regression test for reusing a `DataSourceExec` after its execution-local + /// shared work queue has been drained. + /// + /// This test uses a single file group with two files so the scan creates a + /// shared unopened-file queue. Executing after `reset_state` must recreate + /// the shared queue and return the same rows again. + #[tokio::test] + async fn reset_state_recreates_shared_work_source() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Int32, + false, + )])); + let file_source = Arc::new( + MockSource::new(Arc::clone(&schema)) + .with_file_opener(Arc::new(ResetStateTestFileOpener { schema })), + ); + + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_group(FileGroup::new(vec![ + PartitionedFile::new("file1.parquet", 100), + PartitionedFile::new("file2.parquet", 100), + ])) + .build(); + + let exec: Arc = DataSourceExec::from_data_source(config); + let task_ctx = Arc::new(TaskContext::default()); + + // Running the same scan after resetting the state, should + // produce the same answer. + let first_run = collect(Arc::clone(&exec), Arc::clone(&task_ctx)).await?; + let reset_exec = exec.reset_state()?; + let second_run = collect(reset_exec, task_ctx).await?; + + let expected = [ + "+-------+", + "| value |", + "+-------+", + "| 1 |", + "| 2 |", + "+-------+", + ]; + assert_batches_eq!(expected, &first_run); + assert_batches_eq!(expected, &second_run); + + Ok(()) + } + + /// Test-only `FileOpener` that turns file names like `file1.parquet` into a + /// single-batch stream containing that numeric value + #[derive(Debug)] + struct ResetStateTestFileOpener { + schema: SchemaRef, + } + + impl crate::file_stream::FileOpener for ResetStateTestFileOpener { + fn open( + &self, + file: PartitionedFile, + ) -> Result { + let value = file + .object_meta + .location + .as_ref() + .trim_start_matches("file") + .trim_end_matches(".parquet") + .parse::() + .expect("invalid test file name"); + let schema = Arc::clone(&self.schema); + Ok(async move { + let batch = RecordBatch::try_new( + schema, + vec![Arc::new(Int32Array::from(vec![value]))], + ) + .expect("test batch should be valid"); + Ok(stream::iter(vec![Ok(batch)]).boxed()) + } + .boxed()) + } + } + #[test] fn test_output_partitioning_not_partitioned_by_file_group() { let file_schema = aggr_test_schema(); @@ -2473,7 +2603,7 @@ mod tests { impl FileSource for ExactSortPushdownSource { fn create_file_opener( &self, - _object_store: Arc, + _object_store: Arc, _base_config: &FileScanConfig, _partition: usize, ) -> Result> { diff --git a/datafusion/datasource/src/file_stream/builder.rs b/datafusion/datasource/src/file_stream/builder.rs index efe9c39ce3b38..7034e902550a9 100644 --- a/datafusion/datasource/src/file_stream/builder.rs +++ b/datafusion/datasource/src/file_stream/builder.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use crate::file_scan_config::FileScanConfig; use crate::file_stream::scan_state::ScanState; +use crate::file_stream::work_source::{SharedWorkSource, WorkSource}; use crate::morsel::{FileOpenerMorselizer, Morselizer}; use datafusion_common::{Result, internal_err}; use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; @@ -33,10 +34,11 @@ pub struct FileStreamBuilder<'a> { morselizer: Option>, metrics: Option<&'a ExecutionPlanMetricsSet>, on_error: OnError, + shared_work_source: Option, } impl<'a> FileStreamBuilder<'a> { - /// Create a new builder. + /// Create a new builder for [`FileStream`]. pub fn new(config: &'a FileScanConfig) -> Self { Self { config, @@ -44,6 +46,7 @@ impl<'a> FileStreamBuilder<'a> { morselizer: None, metrics: None, on_error: OnError::Fail, + shared_work_source: None, } } @@ -81,6 +84,15 @@ impl<'a> FileStreamBuilder<'a> { self } + /// Configure the [`SharedWorkSource`] for sibling work stealing. + pub(crate) fn with_shared_work_source( + mut self, + shared_work_source: Option, + ) -> Self { + self.shared_work_source = shared_work_source; + self + } + /// Build the configured [`FileStream`]. pub fn build(self) -> Result { let Self { @@ -89,6 +101,7 @@ impl<'a> FileStreamBuilder<'a> { morselizer, metrics, on_error, + shared_work_source, } = self; let Some(partition) = partition else { @@ -106,10 +119,14 @@ impl<'a> FileStreamBuilder<'a> { "FileStreamBuilder invalid partition index: {partition}" ); }; + let work_source = match shared_work_source { + Some(shared) => WorkSource::Shared(shared), + None => WorkSource::Local(file_group.into_inner().into()), + }; let file_stream_metrics = FileStreamMetrics::new(metrics, partition); let scan_state = Box::new(ScanState::new( - file_group.into_inner(), + work_source, config.limit, morselizer, on_error, diff --git a/datafusion/datasource/src/file_stream/mod.rs b/datafusion/datasource/src/file_stream/mod.rs index ff71f16023080..e277690cff810 100644 --- a/datafusion/datasource/src/file_stream/mod.rs +++ b/datafusion/datasource/src/file_stream/mod.rs @@ -24,6 +24,7 @@ mod builder; mod metrics; mod scan_state; +pub(crate) mod work_source; use std::pin::Pin; use std::sync::Arc; @@ -175,6 +176,7 @@ mod tests { IoFutureId, MockMorselizer, MockPlanBuilder, MockPlanner, MorselId, PendingPlannerBuilder, PollsToResolve, }; + use crate::source::DataSource; use crate::tests::make_partition; use crate::{PartitionedFile, TableSchema}; use arrow::array::{AsArray, RecordBatch}; @@ -184,14 +186,22 @@ mod tests { use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::{FutureExt as _, StreamExt as _}; + use std::collections::{BTreeMap, VecDeque}; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; - use crate::file_stream::{FileOpenFuture, FileOpener, FileStreamBuilder, OnError}; + use crate::file_stream::{ + FileOpenFuture, FileOpener, FileStream, FileStreamBuilder, OnError, + work_source::SharedWorkSource, + }; use crate::test_util::MockSource; use datafusion_common::{assert_batches_eq, exec_err, internal_err}; + /// Test identifier for one `FileStream` partition. + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] + struct PartitionId(usize); + /// Test `FileOpener` which will simulate errors during file opening or scanning #[derive(Default)] struct TestOpener { @@ -758,8 +768,8 @@ mod tests { async fn morsel_two_ios_one_batch() -> Result<()> { let test = FileStreamMorselTest::new().with_file( MockPlanner::builder("file1.parquet") - .add_plan(PendingPlannerBuilder::new(IoFutureId(1)).build()) - .add_plan(PendingPlannerBuilder::new(IoFutureId(2)).build()) + .add_plan(PendingPlannerBuilder::new(IoFutureId(1))) + .add_plan(PendingPlannerBuilder::new(IoFutureId(2))) .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 42)) .return_none(), ); @@ -871,8 +881,7 @@ mod tests { async fn morsel_ready_child_planner() -> Result<()> { let child_planner = MockPlanner::builder("child planner") .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 42)) - .return_none() - .build(); + .return_none(); let test = FileStreamMorselTest::new().with_file( MockPlanner::builder("file1.parquet") @@ -1001,11 +1010,331 @@ mod tests { Ok(()) } - /// Tests how FileStream opens and processes files. + /// Return a morsel test with two partitions: + /// Partition 0: file1, file2, file3 + /// Partition 1: file4 + /// + /// Partition 1 has only 1 file but it polled first 4 times + fn two_partition_morsel_test() -> FileStreamMorselTest { + FileStreamMorselTest::new() + // Partition 0 has three files + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file1.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 101)) + .return_none(), + ) + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file2.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 102)) + .return_none(), + ) + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file3.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(12), 103)) + .return_none(), + ) + // Partition 1 has only one file, but is polled first + .with_file_in_partition( + PartitionId(1), + MockPlanner::builder("file4.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(13), 201)) + .return_none(), + ) + .with_reads(vec![ + PartitionId(1), + PartitionId(1), + PartitionId(1), + PartitionId(1), + PartitionId(1), + ]) + } + + /// Verifies that an idle sibling stream can steal shared files from + /// another stream once it exhausts its own local work. + #[tokio::test] + async fn morsel_shared_files_can_be_stolen() -> Result<()> { + let test = two_partition_morsel_test().with_file_stream_events(false); + + // Partition 0 starts with 3 files, but Partition 1 is polled first. + // Since Partition 1 is polled first, it will run all the files even those + // that were assigned to Partition 0. + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Done + ----- Partition 1 ----- + Batch: 101 + Batch: 102 + Batch: 103 + Batch: 201 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Verifies that a stream that must preserve order keeps its files local + /// and therefore cannot steal from a sibling shared queue. + #[tokio::test] + async fn morsel_preserve_order_keeps_files_local() -> Result<()> { + // same fixture as `morsel_shared_files_can_be_stolen` but marked as + // preserve-order + let test = two_partition_morsel_test() + .with_preserve_order(true) + .with_file_stream_events(false); + + // Even though that Partition 1 is polled first, it can not steal files + // from partition 0. The three files originally assigned to Partition 0 + // must be evaluated by Partition 0. + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Batch: 101 + Batch: 102 + Batch: 103 + Done + ----- Partition 1 ----- + Batch: 201 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Verifies that `partitioned_by_file_group` disables shared work stealing. + #[tokio::test] + async fn morsel_partitioned_by_file_group_keeps_files_local() -> Result<()> { + // same fixture as `morsel_shared_files_can_be_stolen` but marked as + // preserve-partitioned + let test = two_partition_morsel_test() + .with_partitioned_by_file_group(true) + .with_file_stream_events(false); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Batch: 101 + Batch: 102 + Batch: 103 + Done + ----- Partition 1 ----- + Batch: 201 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Verifies that an empty sibling can immediately steal shared files when + /// it is polled before the stream that originally owned them. + #[tokio::test] + async fn morsel_empty_sibling_can_steal() -> Result<()> { + let test = FileStreamMorselTest::new() + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file1.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 101)) + .return_none(), + ) + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file2.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 102)) + .return_none(), + ) + // Poll the empty sibling first so it steals both files. + .with_reads(vec![PartitionId(1), PartitionId(1), PartitionId(1)]) + .with_file_stream_events(false); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Done + ----- Partition 1 ----- + Batch: 101 + Batch: 102 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Ensures that if a sibling is built and polled + /// before another sibling has been built and contributed its files to the + /// shared queue, the first sibling does not finish prematurely. + #[tokio::test] + async fn morsel_empty_sibling_can_finish_before_shared_work_exists() -> Result<()> { + let test = FileStreamMorselTest::new() + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file1.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 101)) + .return_none(), + ) + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file2.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 102)) + .return_none(), + ) + // Build streams lazily so partition 1 can poll the shared queue + // before partition 0 has contributed its files. Once partition 0 + // is built, a later poll of partition 1 can still steal one of + // them from the shared queue. + .with_build_streams_on_first_read(true) + .with_reads(vec![PartitionId(1), PartitionId(0), PartitionId(1)]) + .with_file_stream_events(false); + + // Partition 1 polls too early once, then later steals one file after + // partition 0 has populated the shared queue. + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Batch: 102 + Done + ----- Partition 1 ----- + Batch: 101 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Verifies that a sibling hitting its limit does not count shared files + /// left in the queue as already processed by that stream. + #[tokio::test] + async fn morsel_shared_limit_does_not_double_count_files_processed() -> Result<()> { + let test = two_partition_morsel_test(); + let unlimited_config = test.test_config(); + let limited_config = test.clone().with_limit(1).test_config(); + let shared_work_source = limited_config + .create_sibling_state() + .and_then(|state| state.as_ref().downcast_ref::().cloned()) + .expect("shared work source"); + let limited_metrics = ExecutionPlanMetricsSet::new(); + let unlimited_metrics = ExecutionPlanMetricsSet::new(); + + let limited_stream = FileStreamBuilder::new(&limited_config) + .with_partition(1) + .with_shared_work_source(Some(shared_work_source.clone())) + .with_morselizer(Box::new(test.morselizer.clone())) + .with_metrics(&limited_metrics) + .build()?; + + let unlimited_stream = FileStreamBuilder::new(&unlimited_config) + .with_partition(0) + .with_shared_work_source(Some(shared_work_source)) + .with_morselizer(Box::new(test.morselizer)) + .with_metrics(&unlimited_metrics) + .build()?; + + let limited_output = drain_stream_output(limited_stream).await?; + let unlimited_output = drain_stream_output(unlimited_stream).await?; + + insta::assert_snapshot!(format!( + "----- Limited Stream -----\n{limited_output}\n----- Unlimited Stream -----\n{unlimited_output}" + ), @r" + ----- Limited Stream ----- + Batch: 101 + ----- Unlimited Stream ----- + Batch: 102 + Batch: 103 + Batch: 201 + "); + + assert_eq!( + metric_count(&limited_metrics, "files_opened"), + 1, + "the limited stream should only open the file that produced its output" + ); + assert_eq!( + metric_count(&limited_metrics, "files_processed"), + 1, + "the limited stream should only mark its own file as processed" + ); + assert_eq!( + metric_count(&unlimited_metrics, "files_opened"), + 3, + "the draining stream should open the remaining shared files" + ); + assert_eq!( + metric_count(&unlimited_metrics, "files_processed"), + 3, + "the draining stream should process exactly the files it opened" + ); + + Ok(()) + } + + /// Verifies that one fast sibling can drain shared files that originated + /// in more than one other partition. + #[tokio::test] + async fn morsel_one_sibling_can_drain_multiple_siblings() -> Result<()> { + let test = FileStreamMorselTest::new() + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file1.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 101)) + .return_none(), + ) + // Partition 1 has two files + .with_file_in_partition( + PartitionId(1), + MockPlanner::builder("file2.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 102)) + .return_none(), + ) + .with_file_in_partition( + PartitionId(1), + MockPlanner::builder("file3.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(12), 103)) + .return_none(), + ) + // Partition 2 starts empty but is polled first, so it should drain + // the shared queue across both sibling partitions. + .with_reads(vec![ + PartitionId(2), + PartitionId(2), + PartitionId(1), + PartitionId(2), + ]) + .with_file_stream_events(false); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Done + ----- Partition 1 ----- + Batch: 103 + Done + ----- Partition 2 ----- + Batch: 101 + Batch: 102 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Tests how one or more `FileStream`s consume morselized file work. #[derive(Clone)] struct FileStreamMorselTest { morselizer: MockMorselizer, - file_names: Vec, + partition_files: BTreeMap>, + preserve_order: bool, + partitioned_by_file_group: bool, + file_stream_events: bool, + build_streams_on_first_read: bool, + reads: Vec, limit: Option, } @@ -1014,75 +1343,238 @@ mod tests { fn new() -> Self { Self { morselizer: MockMorselizer::new(), - file_names: vec![], + partition_files: BTreeMap::new(), + preserve_order: false, + partitioned_by_file_group: false, + file_stream_events: true, + build_streams_on_first_read: false, + reads: vec![], limit: None, } } - /// Adds one file and its root planner to the test input. - fn with_file(mut self, planner: impl Into) -> Self { + /// Adds one file and its root planner to partition 0. + fn with_file(self, planner: impl Into) -> Self { + self.with_file_in_partition(PartitionId(0), planner) + } + + /// Adds one file and its root planner to the specified input partition. + fn with_file_in_partition( + mut self, + partition: PartitionId, + planner: impl Into, + ) -> Self { let planner = planner.into(); - self.file_names.push(planner.file_path().to_string()); - self.morselizer = self.morselizer.with_file(planner); + let file_path = planner.file_path().to_string(); + self.morselizer = self.morselizer.with_planner(planner); + self.partition_files + .entry(partition) + .or_default() + .push(file_path); self } - /// Sets a global output limit for the stream. + /// Marks the stream (and all partitions) to preserve the specified file + /// order. + fn with_preserve_order(mut self, preserve_order: bool) -> Self { + self.preserve_order = preserve_order; + self + } + + /// Marks the test scan as pre-partitioned by file group, which should + /// force each stream to keep its own files local. + fn with_partitioned_by_file_group( + mut self, + partitioned_by_file_group: bool, + ) -> Self { + self.partitioned_by_file_group = partitioned_by_file_group; + self + } + + /// Controls whether scheduler events are included in the snapshot. + /// + /// When disabled, `run()` still includes the event section header but + /// replaces the trace with a fixed placeholder so tests can focus only + /// on the output batches. + fn with_file_stream_events(mut self, file_stream_events: bool) -> Self { + self.file_stream_events = file_stream_events; + self + } + + /// Controls whether streams are all built up front or lazily on their + /// first read. + /// + /// The default builds all streams before polling begins, which matches + /// normal execution. Tests may enable lazy creation to model races + /// where one sibling polls before another has contributed its files to + /// the shared queue. + fn with_build_streams_on_first_read( + mut self, + build_streams_on_first_read: bool, + ) -> Self { + self.build_streams_on_first_read = build_streams_on_first_read; + self + } + + /// Sets the partition polling order. + /// + /// `run()` polls these partitions in the listed order first. After + /// those explicit reads are exhausted, it completes to round + /// robin across all configured partitions, skipping any streams that + /// have already finished. + /// + /// This allows testing early scheduling decisions explicit in a test + /// while avoiding a fully scripted poll trace for the remainder. + fn with_reads(mut self, reads: Vec) -> Self { + self.reads = reads; + self + } + + /// Sets a global output limit for all streams created by this test. fn with_limit(mut self, limit: usize) -> Self { self.limit = Some(limit); self } - /// Runs the test returns combined output and scheduler trace text as a String. + /// Runs the test and returns combined stream output and scheduler + /// trace text. async fn run(self) -> Result { let observer = self.morselizer.observer().clone(); observer.clear(); - let config = self.test_config(); let metrics_set = ExecutionPlanMetricsSet::new(); - let mut stream = FileStreamBuilder::new(&config) - .with_partition(0) - .with_morselizer(Box::new(self.morselizer)) - .with_metrics(&metrics_set) - .build()?; + let partition_count = self.num_partitions(); - let mut stream_contents = Vec::new(); - while let Some(result) = stream.next().await { - match result { - Ok(batch) => { - let col = batch.column(0).as_primitive::(); - let batch_id = col.value(0); - stream_contents.push(format!("Batch: {batch_id}")); - } - Err(e) => { - // Pull the actual message for external errors rather than - // relying on DataFusionError formatting, which changes - // if backtraces are enabled, etc - let message = if let DataFusionError::External(generic) = e { - generic.to_string() - } else { - e.to_string() - }; - stream_contents.push(format!("Error: {message}")); - } + let mut partitions = (0..partition_count) + .map(|_| PartitionState::new()) + .collect::>(); + + let mut build_order = Vec::new(); + for partition in self.reads.iter().map(|partition| partition.0) { + if !build_order.contains(&partition) { + build_order.push(partition); + } + } + for partition in 0..partition_count { + if !build_order.contains(&partition) { + build_order.push(partition); } } - stream_contents.push("Done".to_string()); - Ok(format!( - "----- Output Stream -----\n{}\n----- File Stream Events -----\n{}", - stream_contents.join("\n"), + let config = self.test_config(); + // `DataSourceExec::execute` creates one execution-local shared + // state object via `create_sibling_state()` and then passes it + // to `open_with_sibling_state(...)`. These tests build + // `FileStream`s directly, bypassing `DataSourceExec`, so they must + // perform the same setup explicitly when exercising sibling-stream + // work stealing. + let shared_work_source = config.create_sibling_state().and_then(|state| { + state.as_ref().downcast_ref::().cloned() + }); + if !self.build_streams_on_first_read { + for partition in build_order { + let stream = FileStreamBuilder::new(&config) + .with_partition(partition) + .with_shared_work_source(shared_work_source.clone()) + .with_morselizer(Box::new(self.morselizer.clone())) + .with_metrics(&metrics_set) + .build()?; + partitions[partition].set_stream(stream); + } + } + + let mut initial_reads: VecDeque<_> = self.reads.into(); + let mut next_round_robin = 0; + + while !initial_reads.is_empty() + || partitions.iter().any(PartitionState::is_active) + { + let partition = if let Some(partition) = initial_reads.pop_front() { + partition.0 + } else { + let partition = next_round_robin; + next_round_robin = (next_round_robin + 1) % partition_count.max(1); + partition + }; + + let partition_state = &mut partitions[partition]; + + if self.build_streams_on_first_read && !partition_state.built { + let stream = FileStreamBuilder::new(&config) + .with_partition(partition) + .with_shared_work_source(shared_work_source.clone()) + .with_morselizer(Box::new(self.morselizer.clone())) + .with_metrics(&metrics_set) + .build()?; + partition_state.set_stream(stream); + } + + let Some(stream) = partition_state.stream.as_mut() else { + continue; + }; + + match stream.next().await { + Some(result) => partition_state.push_output(format_result(result)), + None => partition_state.finish(), + } + } + + let output_text = if partition_count == 1 { + format!( + "----- Output Stream -----\n{}", + partitions[0].output.join("\n") + ) + } else { + partitions + .into_iter() + .enumerate() + .map(|(partition, state)| { + format!( + "----- Partition {} -----\n{}", + partition, + state.output.join("\n") + ) + }) + .collect::>() + .join("\n") + }; + + let file_stream_events = if self.file_stream_events { observer.format_events() + } else { + "(omitted due to with_file_stream_events(false))".to_string() + }; + + Ok(format!( + "{output_text}\n----- File Stream Events -----\n{file_stream_events}", )) } - /// Builds the `FileScanConfig` for the configured mock file set. + /// Returns the number of configured partitions, including empty ones + /// that appear only in the explicit read schedule. + fn num_partitions(&self) -> usize { + self.partition_files + .keys() + .map(|partition| partition.0 + 1) + .chain(self.reads.iter().map(|partition| partition.0 + 1)) + .max() + .unwrap_or(1) + } + + /// Builds a `FileScanConfig` covering every configured partition. fn test_config(&self) -> FileScanConfig { - let file_group = self - .file_names - .iter() - .map(|name| PartitionedFile::new(name, 10)) - .collect(); + let file_groups = (0..self.num_partitions()) + .map(|partition| { + self.partition_files + .get(&PartitionId(partition)) + .into_iter() + .flat_map(|files| files.iter()) + .map(|name| PartitionedFile::new(name, 10)) + .collect::>() + .into() + }) + .collect::>(); + let table_schema = TableSchema::new( Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])), vec![], @@ -1091,9 +1583,94 @@ mod tests { ObjectStoreUrl::parse("test:///").unwrap(), Arc::new(MockSource::new(table_schema)), ) - .with_file_group(file_group) + .with_file_groups(file_groups) .with_limit(self.limit) + .with_preserve_order(self.preserve_order) + .with_partitioned_by_file_group(self.partitioned_by_file_group) .build() } } + + /// Formats one stream poll result into a stable snapshot line. + fn format_result(result: Result) -> String { + match result { + Ok(batch) => { + let col = batch.column(0).as_primitive::(); + let batch_id = col.value(0); + format!("Batch: {batch_id}") + } + Err(e) => { + // Pull the actual message for external errors rather than + // relying on DataFusionError formatting, which changes if + // backtraces are enabled, etc. + let message = if let DataFusionError::External(generic) = e { + generic.to_string() + } else { + e.to_string() + }; + format!("Error: {message}") + } + } + } + + async fn drain_stream_output(stream: FileStream) -> Result { + let output = stream + .collect::>() + .await + .into_iter() + .map(|result| result.map(|batch| format_result(Ok(batch)))) + .collect::>>()?; + Ok(output.join("\n")) + } + + fn metric_count(metrics: &ExecutionPlanMetricsSet, name: &str) -> usize { + metrics + .clone_inner() + .sum_by_name(name) + .unwrap_or_else(|| panic!("missing metric: {name}")) + .as_usize() + } + + /// Test-only state for one stream partition in [`FileStreamMorselTest`]. + struct PartitionState { + /// Whether the `FileStream` for this partition has been built yet. + built: bool, + /// The live stream, if this partition has not finished yet. + stream: Option, + /// Snapshot lines produced by this partition. + output: Vec, + } + + impl PartitionState { + /// Create an unbuilt partition with no output yet. + fn new() -> Self { + Self { + built: false, + stream: None, + output: vec![], + } + } + + /// Returns true if this partition might still produce output. + fn is_active(&self) -> bool { + !self.built || self.stream.is_some() + } + + /// Records that this partition's stream has been built. + fn set_stream(&mut self, stream: FileStream) { + self.stream = Some(stream); + self.built = true; + } + + /// Records one formatted output line for this partition. + fn push_output(&mut self, line: String) { + self.output.push(line); + } + + /// Marks this partition as finished. + fn finish(&mut self) { + self.push_output("Done".to_string()); + self.stream = None; + } + } } diff --git a/datafusion/datasource/src/file_stream/scan_state.rs b/datafusion/datasource/src/file_stream/scan_state.rs index 025164c29c8f2..3d5d2388918ca 100644 --- a/datafusion/datasource/src/file_stream/scan_state.rs +++ b/datafusion/datasource/src/file_stream/scan_state.rs @@ -19,14 +19,15 @@ use datafusion_common::internal_datafusion_err; use std::collections::VecDeque; use std::task::{Context, Poll}; -use crate::PartitionedFile; -use crate::morsel::{Morsel, MorselPlanner, Morselizer, PendingMorselPlanner}; +use crate::morsel::{ + Morsel, MorselPlanner, MorselStream, Morselizer, PendingMorselPlanner, +}; use arrow::record_batch::RecordBatch; use datafusion_common::{DataFusionError, Result}; use datafusion_physical_plan::metrics::ScopedTimerGuard; -use futures::stream::BoxStream; use futures::{FutureExt as _, StreamExt as _}; +use super::work_source::WorkSource; use super::{FileStreamMetrics, OnError}; /// State [`FileStreamState::Scan`]. @@ -45,7 +46,7 @@ use super::{FileStreamMetrics, OnError}; /// # State Transitions /// /// ```text -/// file_iter +/// work_source /// | /// v /// morselizer.plan_file(file) @@ -62,8 +63,8 @@ use super::{FileStreamMetrics, OnError}; /// /// [`FileStreamState::Scan`]: super::FileStreamState::Scan pub(super) struct ScanState { - /// Files that still need to be planned. - file_iter: VecDeque, + /// Unopened files that still need to be planned for this stream. + work_source: WorkSource, /// Remaining row limit, if any. remain: Option, /// The morselizer used to plan files. @@ -75,8 +76,11 @@ pub(super) struct ScanState { /// Ready morsels for the current file. ready_morsels: VecDeque>, /// The active reader, if any. - reader: Option>>, - /// The currently outstanding I/O, if any. + morsel_stream: Option, + /// The single planner currently blocked on I/O, if any. + /// + /// Once the I/O completes, yields the next planner and is pushed back + /// onto `ready_planners`. pending_planner: Option, /// Metrics for the active scan queues. metrics: FileStreamMetrics, @@ -84,21 +88,20 @@ pub(super) struct ScanState { impl ScanState { pub(super) fn new( - file_iter: impl Into>, + work_source: WorkSource, remain: Option, morselizer: Box, on_error: OnError, metrics: FileStreamMetrics, ) -> Self { - let file_iter = file_iter.into(); Self { - file_iter, + work_source, remain, morselizer, on_error, ready_planners: Default::default(), ready_morsels: Default::default(), - reader: None, + morsel_stream: None, pending_planner: None, metrics, } @@ -121,8 +124,8 @@ impl ScanState { /// The return [`ScanAndReturn`] tells `poll_inner` how to update the /// outer `FileStreamState`. pub(super) fn poll_scan(&mut self, cx: &mut Context<'_>) -> ScanAndReturn { - let _processing_timer: ScopedTimerGuard<'_> = - self.metrics.time_processing.timer(); + let time_processing = self.metrics.time_processing.clone(); + let _timer: ScopedTimerGuard<'_> = time_processing.timer(); // Try and resolve outstanding IO first. If it is still pending, check // the current reader or ready morsels before yielding. New planning @@ -153,61 +156,8 @@ impl ScanState { } // Next try and get the next batch from the active reader, if any. - if let Some(reader) = self.reader.as_mut() { - match reader.poll_next_unpin(cx) { - // Morsels should ideally only expose ready-to-decode streams, - // but tolerate pending readers here. - Poll::Pending => return ScanAndReturn::Return(Poll::Pending), - Poll::Ready(Some(Ok(batch))) => { - self.metrics.time_scanning_until_data.stop(); - self.metrics.time_scanning_total.stop(); - // Apply any remaining row limit. - let (batch, finished) = match &mut self.remain { - Some(remain) => { - if *remain > batch.num_rows() { - *remain -= batch.num_rows(); - self.metrics.time_scanning_total.start(); - (batch, false) - } else { - let batch = batch.slice(0, *remain); - let done = 1 + self.file_iter.len(); - self.metrics.files_processed.add(done); - *remain = 0; - (batch, true) - } - } - None => { - self.metrics.time_scanning_total.start(); - (batch, false) - } - }; - return if finished { - ScanAndReturn::Done(Some(Ok(batch))) - } else { - ScanAndReturn::Return(Poll::Ready(Some(Ok(batch)))) - }; - } - Poll::Ready(Some(Err(err))) => { - self.reader = None; - self.metrics.file_scan_errors.add(1); - self.metrics.time_scanning_until_data.stop(); - self.metrics.time_scanning_total.stop(); - return match self.on_error { - OnError::Skip => { - self.metrics.files_processed.add(1); - ScanAndReturn::Continue - } - OnError::Fail => ScanAndReturn::Error(err), - }; - } - Poll::Ready(None) => { - self.reader = None; - self.metrics.files_processed.add(1); - self.metrics.time_scanning_until_data.stop(); - self.metrics.time_scanning_total.stop(); - return ScanAndReturn::Continue; - } - } + if let Some(scan_and_return) = self.poll_reader(cx) { + return scan_and_return; } // No active reader, but a morsel is ready to become the reader. @@ -215,7 +165,20 @@ impl ScanState { self.metrics.time_opening.stop(); self.metrics.time_scanning_until_data.start(); self.metrics.time_scanning_total.start(); - self.reader = Some(morsel.into_stream()); + let morsel_stream = match morsel.into_stream() { + Ok(stream) => stream, + Err(e) => { + self.metrics.file_open_errors.add(1); + return match self.on_error { + OnError::Skip => { + self.metrics.files_processed.add(1); + ScanAndReturn::Continue + } + OnError::Fail => ScanAndReturn::Error(e), + }; + } + }; + self.morsel_stream = Some(morsel_stream); return ScanAndReturn::Continue; } @@ -263,8 +226,8 @@ impl ScanState { }; } - // No outstanding work remains, so morselize the next unopened file. - let part_file = match self.file_iter.pop_front() { + // No outstanding work remains, so begin planning the next unopened file. + let part_file = match self.work_source.pop_front() { Some(part_file) => part_file, None => return ScanAndReturn::Done(None), }; @@ -287,6 +250,86 @@ impl ScanState { }, } } + + /// if there is an active reader, poll it for the next batch and return it + /// if ready. Otherwise, return None. + fn poll_reader(&mut self, cx: &mut Context<'_>) -> Option { + let morsel_stream = self.morsel_stream.as_mut()?; + + let ret = match morsel_stream { + MorselStream::Sync(reader) => match reader.next() { + Some(Ok(batch)) => self.handle_batch(batch), + Some(Err(err)) => self.handle_scan_error(DataFusionError::from(err)), + None => self.handle_reader_finished(), + }, + MorselStream::Async(reader) => match reader.poll_next_unpin(cx) { + // Morsels should ideally only expose ready-to-decode streams, + // but tolerate pending readers here. + Poll::Pending => ScanAndReturn::Return(Poll::Pending), + Poll::Ready(Some(Ok(batch))) => self.handle_batch(batch), + Poll::Ready(Some(Err(err))) => self.handle_scan_error(err), + Poll::Ready(None) => self.handle_reader_finished(), + }, + }; + Some(ret) + } + + fn handle_batch(&mut self, batch: RecordBatch) -> ScanAndReturn { + self.metrics.time_scanning_until_data.stop(); + self.metrics.time_scanning_total.stop(); + let (batch, finished) = match &mut self.remain { + Some(remain) => { + if *remain > batch.num_rows() { + *remain -= batch.num_rows(); + self.metrics.time_scanning_total.start(); + (batch, false) + } else { + let batch = batch.slice(0, *remain); + let done = 1 + self.work_source.skipped_on_limit(); + self.metrics.files_processed.add(done); + *remain = 0; + (batch, true) + } + } + None => { + self.metrics.time_scanning_total.start(); + (batch, false) + } + }; + + if finished { + ScanAndReturn::Done(Some(Ok(batch))) + } else { + ScanAndReturn::Return(Poll::Ready(Some(Ok(batch)))) + } + } + + fn handle_scan_error(&mut self, err: DataFusionError) -> ScanAndReturn { + self.morsel_stream = None; + self.metrics.file_scan_errors.add(1); + self.metrics.time_scanning_until_data.stop(); + self.metrics.time_scanning_total.stop(); + match self.on_error { + OnError::Skip => { + self.metrics.files_processed.add(1); + ScanAndReturn::Continue + } + OnError::Fail => ScanAndReturn::Error(err), + } + } + + fn handle_reader_finished(&mut self) -> ScanAndReturn { + self.morsel_stream = None; + self.metrics.time_scanning_until_data.stop(); + self.metrics.time_scanning_total.stop(); + if self.ready_morsels.is_empty() + && self.ready_planners.is_empty() + && self.pending_planner.is_none() + { + self.metrics.files_processed.add(1); + } + ScanAndReturn::Continue + } } /// What should be done on the next iteration of [`ScanState::poll_scan`]? diff --git a/datafusion/datasource/src/file_stream/work_source.rs b/datafusion/datasource/src/file_stream/work_source.rs new file mode 100644 index 0000000000000..7f31dacca9592 --- /dev/null +++ b/datafusion/datasource/src/file_stream/work_source.rs @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::VecDeque; +use std::sync::Arc; + +use crate::PartitionedFile; +use crate::file_groups::FileGroup; +use crate::file_scan_config::FileScanConfig; +use parking_lot::Mutex; + +/// Source of work for `ScanState`. +/// +/// Streams that may share work across siblings use [`WorkSource::Shared`], +/// while streams that can not share work (e.g. because they must preserve file +/// order) use [`WorkSource::Local`]. +#[derive(Debug, Clone)] +pub(super) enum WorkSource { + /// Files this stream will plan locally without sharing them. + Local(VecDeque), + /// Files shared with sibling streams. + Shared(SharedWorkSource), +} + +impl WorkSource { + /// Pop the next file to plan from this work source. + pub(super) fn pop_front(&mut self) -> Option { + match self { + Self::Local(files) => files.pop_front(), + Self::Shared(shared) => shared.pop_front(), + } + } + + /// Return how many queued files should be counted as already processed + /// when this stream stops early after hitting a global limit. + pub(super) fn skipped_on_limit(&self) -> usize { + match self { + Self::Local(files) => files.len(), + Self::Shared(_) => 0, + } + } +} + +/// Shared source of work for sibling `FileStream`s +/// +/// The queue is created once per execution and shared by all reorderable +/// sibling streams for that execution. Whichever stream becomes idle first may +/// take the next unopened file from the front of the queue. +/// +/// It uses a [`Mutex`] internally to provide thread-safe access +/// to the shared file queue. +#[derive(Debug, Clone)] +pub(crate) struct SharedWorkSource { + inner: Arc, +} + +#[derive(Debug, Default)] +pub(super) struct SharedWorkSourceInner { + files: Mutex>, +} + +impl SharedWorkSource { + /// Create a shared work source containing the provided unopened files. + pub(crate) fn new(files: impl IntoIterator) -> Self { + let files = files.into_iter().collect(); + Self { + inner: Arc::new(SharedWorkSourceInner { + files: Mutex::new(files), + }), + } + } + + /// Create a shared work source for the unopened files in `config`. + pub(crate) fn from_config(config: &FileScanConfig) -> Self { + Self::new(config.file_groups.iter().flat_map(FileGroup::iter).cloned()) + } + + /// Pop the next file from the shared work queue. + /// + /// Returns `None` if the queue is empty + fn pop_front(&self) -> Option { + self.inner.files.lock().pop_front() + } +} diff --git a/datafusion/datasource/src/morsel/adapters.rs b/datafusion/datasource/src/morsel/adapters.rs index 6fa6d4916771d..ed80bb8d33cca 100644 --- a/datafusion/datasource/src/morsel/adapters.rs +++ b/datafusion/datasource/src/morsel/adapters.rs @@ -17,7 +17,7 @@ use crate::PartitionedFile; use crate::file_stream::FileOpener; -use crate::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; +use crate::morsel::{Morsel, MorselPlan, MorselPlanner, MorselStream, Morselizer}; use arrow::array::RecordBatch; use datafusion_common::Result; use futures::FutureExt; @@ -116,7 +116,7 @@ impl Debug for FileStreamMorsel { } impl Morsel for FileStreamMorsel { - fn into_stream(self: Box) -> BoxStream<'static, Result> { - self.stream + fn into_stream(self: Box) -> Result { + Ok(MorselStream::Async(self.stream)) } } diff --git a/datafusion/datasource/src/morsel/mocks.rs b/datafusion/datasource/src/morsel/mocks.rs index cd1fa3732ffea..d0f64a70ff59f 100644 --- a/datafusion/datasource/src/morsel/mocks.rs +++ b/datafusion/datasource/src/morsel/mocks.rs @@ -24,11 +24,10 @@ use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use crate::PartitionedFile; -use crate::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; +use crate::morsel::{Morsel, MorselPlan, MorselPlanner, MorselStream, Morselizer}; use arrow::array::{Int32Array, RecordBatch}; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::{DataFusionError, Result, internal_datafusion_err}; -use futures::stream::BoxStream; use futures::{Future, FutureExt}; // Use thin wrappers around usize so the test setups are more explicit @@ -295,8 +294,11 @@ impl MockPlanBuilder { } /// Add a ready child planner - pub(crate) fn with_ready_planner(self, ready_planners: MockPlanner) -> Self { - self.with_ready_planners(vec![ready_planners]) + pub(crate) fn with_ready_planner( + self, + ready_planner: impl Into, + ) -> Self { + self.with_ready_planners(vec![ready_planner.into()]) } /// Add ready child planners produced by this planning step. @@ -430,8 +432,9 @@ impl MockMorselizer { &self.observer } - /// Associates a file path with the planner spec used to open it. - pub(crate) fn with_file(mut self, planner: MockPlanner) -> Self { + /// Specify the return planner for the specified file_path + pub(crate) fn with_planner(mut self, planner: impl Into) -> Self { + let planner = planner.into(); self.files.insert(planner.file_path.clone(), planner); self } @@ -625,16 +628,16 @@ impl MockMorsel { } impl Morsel for MockMorsel { - fn into_stream(self: Box) -> BoxStream<'static, Result> { + fn into_stream(self: Box) -> Result { self.observer.push(MorselEvent::MorselStreamStarted { morsel_id: self.morsel_id, }); - Box::pin(MockMorselStream { + Ok(MorselStream::Async(Box::pin(MockMorselStream { observer: self.observer.clone(), morsel_id: self.morsel_id, batch_ids: self.batch_ids.into(), finished: false, - }) + }))) } } diff --git a/datafusion/datasource/src/morsel/mod.rs b/datafusion/datasource/src/morsel/mod.rs index 7b5066ca07a26..2947abe990917 100644 --- a/datafusion/datasource/src/morsel/mod.rs +++ b/datafusion/datasource/src/morsel/mod.rs @@ -32,7 +32,7 @@ pub(crate) mod mocks; use crate::PartitionedFile; pub(crate) use adapters::FileOpenerMorselizer; -use arrow::array::RecordBatch; +use arrow::array::{RecordBatch, RecordBatchReader}; use datafusion_common::Result; use futures::FutureExt; use futures::future::BoxFuture; @@ -41,6 +41,25 @@ use std::fmt::Debug; use std::pin::Pin; use std::task::{Context, Poll}; +/// Return value of [`Morsel::into_stream`] +pub enum MorselStream { + /// An `async` stream of [`RecordBatch`]es + /// + /// For maximum performance, this stream should *only* do I/O operations and + /// not CPU bound work, but the trait does not enforce this. + /// + /// Implementers should prefer returning [`Self::Sync`] when possible to + /// ensure IO and CPU are separated more completely. + Async(BoxStream<'static, Result>), + /// A [`RecordBatchReader`] that does not require any more I/O to produce + /// batches. + /// + /// Note: this API is explicitly not `async` so that it is clear the morsel + /// has all the necessary data to proceed and does not require any more I/O + /// work (such as reading from the file) + Sync(Box), +} + /// A Morsel of work ready to resolve to a stream of [`RecordBatch`]es. /// /// This represents a single morsel of work that is ready to be processed. It @@ -48,10 +67,7 @@ use std::task::{Context, Poll}; /// into a stream of [`RecordBatch`]es for processing by the execution engine. pub trait Morsel: Send + Debug { /// Consume this morsel and produce a stream of [`RecordBatch`]es for processing. - /// - /// Note: This may do CPU work to decode already-loaded data, but should not - /// do any I/O work such as reading from the file. - fn into_stream(self: Box) -> BoxStream<'static, Result>; + fn into_stream(self: Box) -> Result; } /// A Morselizer takes a single [`PartitionedFile`] and creates the initial planner diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 81e15d0a2a092..4bdf7bf795f41 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -20,7 +20,7 @@ use std::any::Any; use std::fmt; use std::fmt::{Debug, Formatter}; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_plan::execution_plan::{ @@ -123,12 +123,23 @@ use datafusion_physical_plan::filter_pushdown::{ /// └─────────────────────┘ /// ``` pub trait DataSource: Send + Sync + Debug { + /// Open the specified output partition and return its stream of + /// [`RecordBatch`]es. + /// + /// This should be used by data sources that do not need any sibling + /// coordination. Data sources that want to use per-execution shared state + /// (for example, to reorder work across partitions at runtime) should + /// implement [`Self::open_with_args`] instead. + /// + /// [`RecordBatch`]: arrow::record_batch::RecordBatch fn open( &self, partition: usize, context: Arc, ) -> Result; + fn as_any(&self) -> &dyn Any; + /// Format this source for display in explain plans fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result; @@ -246,6 +257,55 @@ pub trait DataSource: Send + Sync + Debug { ) -> Option> { None } + + /// Create per execution state to share across sibling instances of this + /// data source during one execution. + /// + /// Returns `None` (the default) if this data source has + /// no sibling-shared execution state. + fn create_sibling_state(&self) -> Option> { + None + } + + /// Open a partition using optional sibling-shared execution state. + /// + /// The default implementation ignores the additional state and delegates to + /// [`Self::open`]. + fn open_with_args(&self, args: OpenArgs) -> Result { + self.open(args.partition, args.context) + } +} + +/// Arguments for [`DataSource::open_with_args`] +#[derive(Debug, Clone)] +pub struct OpenArgs { + /// Which partition to open + pub partition: usize, + /// The task context for execution + pub context: Arc, + /// Optional sibling-shared execution state, see + /// [`DataSource::create_sibling_state`] for details. + pub sibling_state: Option>, +} + +impl OpenArgs { + /// Create a new OpenArgs with required arguments + pub fn new(partition: usize, context: Arc) -> Self { + Self { + partition, + context, + sibling_state: None, + } + } + + /// Set sibling shared state + pub fn with_shared_state( + mut self, + sibling_state: Option>, + ) -> Self { + self.sibling_state = sibling_state; + self + } } /// [`ExecutionPlan`] that reads one or more files @@ -266,6 +326,12 @@ pub struct DataSourceExec { data_source: Arc, /// Cached plan properties such as sort order cache: Arc, + /// Per execution state shared across partitions of this plan. + /// + /// Created by [`DataSource::create_sibling_state`] + /// and then passed to + /// [`DataSource::open_with_args`]. + execution_state: Arc>>>, } impl DisplayAs for DataSourceExec { @@ -339,8 +405,15 @@ impl ExecutionPlan for DataSourceExec { partition: usize, context: Arc, ) -> Result { - let stream = self.data_source.open(partition, Arc::clone(&context))?; + let shared_state = self + .execution_state + .get_or_init(|| self.data_source.create_sibling_state()) + .clone(); + let args = OpenArgs::new(partition, Arc::clone(&context)) + .with_shared_state(shared_state); + let stream = self.data_source.open_with_args(args)?; let batch_size = context.session_config().batch_size(); + log::debug!( "Batch splitting enabled for partition {partition}: batch_size={batch_size}" ); @@ -377,8 +450,13 @@ impl ExecutionPlan for DataSourceExec { fn with_fetch(&self, limit: Option) -> Option> { let data_source = self.data_source.with_fetch(limit)?; let cache = Arc::clone(&self.cache); + let execution_state = Arc::new(OnceLock::new()); - Some(Arc::new(Self { data_source, cache })) + Some(Arc::new(Self { + data_source, + cache, + execution_state, + })) } fn fetch(&self) -> Option { @@ -471,6 +549,12 @@ impl ExecutionPlan for DataSourceExec { as Arc }) } + + fn reset_state(self: Arc) -> Result> { + let mut new_exec = Arc::unwrap_or_clone(self); + new_exec.execution_state = Arc::new(OnceLock::new()); + Ok(Arc::new(new_exec)) + } } impl DataSourceExec { @@ -484,6 +568,7 @@ impl DataSourceExec { Self { data_source, cache: Arc::new(cache), + execution_state: Arc::new(OnceLock::new()), } } @@ -495,6 +580,7 @@ impl DataSourceExec { pub fn with_data_source(mut self, data_source: Arc) -> Self { self.cache = Arc::new(Self::compute_properties(&data_source)); self.data_source = data_source; + self.execution_state = Arc::new(OnceLock::new()); self } diff --git a/datafusion/datasource/src/test_util.rs b/datafusion/datasource/src/test_util.rs index 3a9e78943b07b..5ce0f1419d11d 100644 --- a/datafusion/datasource/src/test_util.rs +++ b/datafusion/datasource/src/test_util.rs @@ -34,6 +34,7 @@ pub(crate) struct MockSource { filter: Option>, table_schema: crate::table_schema::TableSchema, projection: crate::projection::SplitProjection, + file_opener: Option>, } impl Default for MockSource { @@ -45,6 +46,7 @@ impl Default for MockSource { filter: None, projection: crate::projection::SplitProjection::unprojected(&table_schema), table_schema, + file_opener: None, } } } @@ -57,6 +59,7 @@ impl MockSource { filter: None, projection: crate::projection::SplitProjection::unprojected(&table_schema), table_schema, + file_opener: None, } } @@ -64,6 +67,11 @@ impl MockSource { self.filter = Some(filter); self } + + pub fn with_file_opener(mut self, file_opener: Arc) -> Self { + self.file_opener = Some(file_opener); + self + } } impl FileSource for MockSource { @@ -73,7 +81,9 @@ impl FileSource for MockSource { _base_config: &FileScanConfig, _partition: usize, ) -> Result> { - unimplemented!() + self.file_opener.clone().ok_or_else(|| { + datafusion_common::internal_datafusion_err!("MockSource missing FileOpener") + }) } fn as_any(&self) -> &dyn std::any::Any {