Skip to content

Commit 776b723

Browse files
alamb and adriangb authored
Rewrite FileStream in terms of Morsel API (#21342)
Stacked on - #21327 - #21340 ## Which issue does this PR close? - part of #20529 - Broken out of #20820 ## Rationale for this change The Morsel API allows for finer-grained parallelism (and IO). It is important to have the FileStream work in terms of the Morsel API to allow future features (like workstealing, etc.) ## What changes are included in this PR? I apologize for the large diff; note that about 1/2 of this PR is tests and a test framework to test the calling sequence of FileStream. 1. Rewrite FileStream in terms of the Morsel API 2. Add snapshot driven test to document the I/O and CPU patterns in FileStream 3. Add snapshot based tests that show the ordering of files ## Are these changes tested? Yes by existing functional and benchmark tests, as well as new functional snapshot based tests ## Are there any user-facing changes? No (not yet) --------- Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
1 parent 6db3899 commit 776b723

13 files changed

Lines changed: 1829 additions & 348 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/datasource-parquet/src/opener.rs

Lines changed: 105 additions & 167 deletions
Large diffs are not rendered by default.

datafusion/datasource-parquet/src/source.rs

Lines changed: 46 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,15 @@ use std::sync::Arc;
2323

2424
use crate::DefaultParquetFileReaderFactory;
2525
use crate::ParquetFileReaderFactory;
26+
use crate::opener::ParquetMorselizer;
2627
use crate::opener::build_pruning_predicates;
27-
use crate::opener::{ParquetMorselizer, ParquetOpener};
2828
use crate::row_filter::can_expr_be_pushed_down_with_schemas;
2929
use datafusion_common::config::ConfigOptions;
3030
#[cfg(feature = "parquet_encryption")]
3131
use datafusion_common::config::EncryptionFactoryOptions;
3232
use datafusion_datasource::as_file_source;
3333
use datafusion_datasource::file_stream::FileOpener;
34+
use datafusion_datasource::morsel::Morselizer;
3435

3536
use arrow::datatypes::TimeUnit;
3637
use datafusion_common::DataFusionError;
@@ -246,12 +247,12 @@ use parquet::encryption::decrypt::FileDecryptionProperties;
246247
/// # Execution Overview
247248
///
248249
/// * Step 1: `DataSourceExec::execute` is called, returning a `FileStream`
249-
/// configured to open parquet files with a `ParquetOpener`.
250+
/// configured to morselize parquet files with a `ParquetMorselizer`.
250251
///
251-
/// * Step 2: When the stream is polled, the `ParquetOpener` is called to open
252-
/// the file.
252+
/// * Step 2: When the stream is polled, the `ParquetMorselizer` is called to
253+
/// plan the file.
253254
///
254-
/// * Step 3: The `ParquetOpener` gets the [`ParquetMetaData`] (file metadata)
255+
/// * Step 3: The `ParquetMorselizer` gets the [`ParquetMetaData`] (file metadata)
255256
/// via [`ParquetFileReaderFactory`], creating a `ParquetAccessPlan` by
256257
/// applying predicates to metadata. The plan and projections are used to
257258
/// determine what pages must be read.
@@ -511,11 +512,22 @@ impl From<ParquetSource> for Arc<dyn FileSource> {
511512

512513
impl FileSource for ParquetSource {
513514
fn create_file_opener(
515+
&self,
516+
_object_store: Arc<dyn ObjectStore>,
517+
_base_config: &FileScanConfig,
518+
_partition: usize,
519+
) -> datafusion_common::Result<Arc<dyn FileOpener>> {
520+
datafusion_common::internal_err!(
521+
"ParquetSource::create_file_opener called but it supports the Morsel API, please use that instead"
522+
)
523+
}
524+
525+
fn create_morselizer(
514526
&self,
515527
object_store: Arc<dyn ObjectStore>,
516528
base_config: &FileScanConfig,
517529
partition: usize,
518-
) -> datafusion_common::Result<Arc<dyn FileOpener>> {
530+
) -> datafusion_common::Result<Box<dyn Morselizer>> {
519531
let expr_adapter_factory = base_config
520532
.expr_adapter_factory
521533
.clone()
@@ -542,37 +554,34 @@ impl FileSource for ParquetSource {
542554
.as_ref()
543555
.map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap());
544556

545-
let opener = Arc::new(ParquetOpener {
546-
morselizer: ParquetMorselizer {
547-
partition_index: partition,
548-
projection: self.projection.clone(),
549-
batch_size: self
550-
.batch_size
551-
.expect("Batch size must set before creating ParquetOpener"),
552-
limit: base_config.limit,
553-
preserve_order: base_config.preserve_order,
554-
predicate: self.predicate.clone(),
555-
table_schema: self.table_schema.clone(),
556-
metadata_size_hint: self.metadata_size_hint,
557-
metrics: self.metrics().clone(),
558-
parquet_file_reader_factory,
559-
pushdown_filters: self.pushdown_filters(),
560-
reorder_filters: self.reorder_filters(),
561-
force_filter_selections: self.force_filter_selections(),
562-
enable_page_index: self.enable_page_index(),
563-
enable_bloom_filter: self.bloom_filter_on_read(),
564-
enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
565-
coerce_int96,
566-
#[cfg(feature = "parquet_encryption")]
567-
file_decryption_properties,
568-
expr_adapter_factory,
569-
#[cfg(feature = "parquet_encryption")]
570-
encryption_factory: self.get_encryption_factory_with_config(),
571-
max_predicate_cache_size: self.max_predicate_cache_size(),
572-
reverse_row_groups: self.reverse_row_groups,
573-
},
574-
});
575-
Ok(opener)
557+
Ok(Box::new(ParquetMorselizer {
558+
partition_index: partition,
559+
projection: self.projection.clone(),
560+
batch_size: self
561+
.batch_size
562+
.expect("Batch size must set before creating ParquetMorselizer"),
563+
limit: base_config.limit,
564+
preserve_order: base_config.preserve_order,
565+
predicate: self.predicate.clone(),
566+
table_schema: self.table_schema.clone(),
567+
metadata_size_hint: self.metadata_size_hint,
568+
metrics: self.metrics().clone(),
569+
parquet_file_reader_factory,
570+
pushdown_filters: self.pushdown_filters(),
571+
reorder_filters: self.reorder_filters(),
572+
force_filter_selections: self.force_filter_selections(),
573+
enable_page_index: self.enable_page_index(),
574+
enable_bloom_filter: self.bloom_filter_on_read(),
575+
enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
576+
coerce_int96,
577+
#[cfg(feature = "parquet_encryption")]
578+
file_decryption_properties,
579+
expr_adapter_factory,
580+
#[cfg(feature = "parquet_encryption")]
581+
encryption_factory: self.get_encryption_factory_with_config(),
582+
max_predicate_cache_size: self.max_predicate_cache_size(),
583+
reverse_row_groups: self.reverse_row_groups,
584+
}))
576585
}
577586

578587
fn as_any(&self) -> &dyn Any {

datafusion/datasource/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ version.workspace = true
3131
all-features = true
3232

3333
[features]
34+
backtrace = ["datafusion-common/backtrace"]
3435
compression = ["async-compression", "liblzma", "bzip2", "flate2", "zstd", "tokio-util"]
3536
default = ["compression"]
3637

@@ -72,6 +73,7 @@ zstd = { workspace = true, optional = true }
7273

7374
[dev-dependencies]
7475
criterion = { workspace = true }
76+
insta = { workspace = true }
7577
tempfile = { workspace = true }
7678

7779
# Note: add additional linter rules in lib.rs.

datafusion/datasource/src/file.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use std::sync::Arc;
2525
use crate::file_groups::FileGroupPartitioner;
2626
use crate::file_scan_config::FileScanConfig;
2727
use crate::file_stream::FileOpener;
28+
use crate::morsel::{FileOpenerMorselizer, Morselizer};
2829
#[expect(deprecated)]
2930
use crate::schema_adapter::SchemaAdapterFactory;
3031
use datafusion_common::config::ConfigOptions;
@@ -63,13 +64,33 @@ pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn FileSource>
6364
///
6465
/// [`DataSource`]: crate::source::DataSource
6566
pub trait FileSource: Send + Sync {
66-
/// Creates a `dyn FileOpener` based on given parameters
67+
/// Creates a `dyn FileOpener` based on given parameters.
68+
///
69+
/// Note: File sources with a native morsel implementation should return an
70+
/// error from this method and implement [`Self::create_morselizer`] instead.
6771
fn create_file_opener(
6872
&self,
6973
object_store: Arc<dyn ObjectStore>,
7074
base_config: &FileScanConfig,
7175
partition: usize,
7276
) -> Result<Arc<dyn FileOpener>>;
77+
78+
/// Creates a `dyn Morselizer` based on given parameters.
79+
///
80+
/// The default implementation preserves existing behavior by adapting the
81+
/// legacy [`FileOpener`] API into a [`Morselizer`].
82+
///
83+
/// It is preferred to implement the [`Morselizer`] API directly by
84+
/// implementing this method.
85+
fn create_morselizer(
86+
&self,
87+
object_store: Arc<dyn ObjectStore>,
88+
base_config: &FileScanConfig,
89+
partition: usize,
90+
) -> Result<Box<dyn Morselizer>> {
91+
let opener = self.create_file_opener(object_store, base_config, partition)?;
92+
Ok(Box::new(FileOpenerMorselizer::new(opener)))
93+
}
7394
/// Any
7495
fn as_any(&self) -> &dyn Any;
7596

datafusion/datasource/src/file_scan_config/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -587,11 +587,11 @@ impl DataSource for FileScanConfig {
587587

588588
let source = self.file_source.with_batch_size(batch_size);
589589

590-
let opener = source.create_file_opener(object_store, self, partition)?;
590+
let morselizer = source.create_morselizer(object_store, self, partition)?;
591591

592592
let stream = FileStreamBuilder::new(self)
593593
.with_partition(partition)
594-
.with_file_opener(opener)
594+
.with_morselizer(morselizer)
595595
.with_metrics(source.metrics())
596596
.build()?;
597597
Ok(Box::pin(cooperative(stream)))

datafusion/datasource/src/file_stream/builder.rs

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
use std::sync::Arc;
1919

2020
use crate::file_scan_config::FileScanConfig;
21+
use crate::file_stream::scan_state::ScanState;
22+
use crate::morsel::{FileOpenerMorselizer, Morselizer};
2123
use datafusion_common::{Result, internal_err};
2224
use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
2325

@@ -28,7 +30,7 @@ use super::{FileOpener, FileStream, FileStreamState, OnError};
2830
pub struct FileStreamBuilder<'a> {
2931
config: &'a FileScanConfig,
3032
partition: Option<usize>,
31-
file_opener: Option<Arc<dyn FileOpener>>,
33+
morselizer: Option<Box<dyn Morselizer>>,
3234
metrics: Option<&'a ExecutionPlanMetricsSet>,
3335
on_error: OnError,
3436
}
@@ -39,7 +41,7 @@ impl<'a> FileStreamBuilder<'a> {
3941
Self {
4042
config,
4143
partition: None,
42-
file_opener: None,
44+
morselizer: None,
4345
metrics: None,
4446
on_error: OnError::Fail,
4547
}
@@ -52,8 +54,18 @@ impl<'a> FileStreamBuilder<'a> {
5254
}
5355

5456
/// Configure the [`FileOpener`] used to open files.
57+
///
58+
/// This will overwrite any setting from [`Self::with_morselizer`]
5559
pub fn with_file_opener(mut self, file_opener: Arc<dyn FileOpener>) -> Self {
56-
self.file_opener = Some(file_opener);
60+
self.morselizer = Some(Box::new(FileOpenerMorselizer::new(file_opener)));
61+
self
62+
}
63+
64+
/// Configure the [`Morselizer`] used to open files.
65+
///
66+
/// This will overwrite any setting from [`Self::with_file_opener`]
67+
pub fn with_morselizer(mut self, morselizer: Box<dyn Morselizer>) -> Self {
68+
self.morselizer = Some(morselizer);
5769
self
5870
}
5971

@@ -74,16 +86,16 @@ impl<'a> FileStreamBuilder<'a> {
7486
let Self {
7587
config,
7688
partition,
77-
file_opener,
89+
morselizer,
7890
metrics,
7991
on_error,
8092
} = self;
8193

8294
let Some(partition) = partition else {
8395
return internal_err!("FileStreamBuilder missing required partition");
8496
};
85-
let Some(file_opener) = file_opener else {
86-
return internal_err!("FileStreamBuilder missing required file_opener");
97+
let Some(morselizer) = morselizer else {
98+
return internal_err!("FileStreamBuilder missing required morselizer");
8799
};
88100
let Some(metrics) = metrics else {
89101
return internal_err!("FileStreamBuilder missing required metrics");
@@ -95,15 +107,19 @@ impl<'a> FileStreamBuilder<'a> {
95107
);
96108
};
97109

110+
let file_stream_metrics = FileStreamMetrics::new(metrics, partition);
111+
let scan_state = Box::new(ScanState::new(
112+
file_group.into_inner(),
113+
config.limit,
114+
morselizer,
115+
on_error,
116+
file_stream_metrics,
117+
));
118+
98119
Ok(FileStream {
99-
file_iter: file_group.into_inner().into_iter().collect(),
100120
projected_schema,
101-
remain: config.limit,
102-
file_opener,
103-
state: FileStreamState::Idle,
104-
file_stream_metrics: FileStreamMetrics::new(metrics, partition),
121+
state: FileStreamState::Scan { scan_state },
105122
baseline_metrics: BaselineMetrics::new(metrics, partition),
106-
on_error,
107123
})
108124
}
109125
}

datafusion/datasource/src/file_stream/metrics.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ pub struct FileStreamMetrics {
7777
/// Wall clock time elapsed for data decompression + decoding
7878
///
7979
/// Time spent waiting for the FileStream's input.
80-
pub time_processing: StartableTime,
80+
pub time_processing: Time,
8181
/// Count of errors opening file.
8282
///
8383
/// If using `OnError::Skip` this will provide a count of the number of files
@@ -126,11 +126,8 @@ impl FileStreamMetrics {
126126
start: None,
127127
};
128128

129-
let time_processing = StartableTime {
130-
metrics: MetricBuilder::new(metrics)
131-
.subset_time("time_elapsed_processing", partition),
132-
start: None,
133-
};
129+
let time_processing =
130+
MetricBuilder::new(metrics).subset_time("time_elapsed_processing", partition);
134131

135132
let file_open_errors = MetricBuilder::new(metrics)
136133
.with_category(MetricCategory::Rows)

0 commit comments

Comments
 (0)