Skip to content

Commit e82aff6

Browse files
committed
Sketch out a Morselize API
1 parent 8609288 commit e82aff6

9 files changed

Lines changed: 3314 additions & 204 deletions

File tree

datafusion/datasource-parquet/src/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub mod access_plan;
2424
pub mod file_format;
2525
pub mod metadata;
2626
mod metrics;
27+
mod morsel;
2728
mod opener;
2829
mod page_filter;
2930
mod reader;
@@ -37,6 +38,7 @@ mod writer;
3738
pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
3839
pub use file_format::*;
3940
pub use metrics::ParquetFileMetrics;
41+
pub use morsel::ParquetMorselizer;
4042
pub use page_filter::PagePruningAccessPlanFilter;
4143
pub use reader::*; // Expose so downstream crates can use it
4244
pub use row_filter::build_row_filter;

datafusion/datasource-parquet/src/morsel.rs

Lines changed: 2593 additions & 0 deletions
Large diffs are not rendered by default.

datafusion/datasource-parquet/src/source.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ use std::sync::Arc;
2323

2424
use crate::DefaultParquetFileReaderFactory;
2525
use crate::ParquetFileReaderFactory;
26+
use crate::ParquetMorselizer;
27+
use crate::morsel::{EncryptionContext, ParquetMorselizerState};
2628
use crate::opener::ParquetOpener;
2729
use crate::opener::build_pruning_predicates;
2830
use crate::row_filter::can_expr_be_pushed_down_with_schemas;
@@ -31,6 +33,7 @@ use datafusion_common::config::ConfigOptions;
3133
use datafusion_common::config::EncryptionFactoryOptions;
3234
use datafusion_datasource::as_file_source;
3335
use datafusion_datasource::file_stream::FileOpener;
36+
use datafusion_datasource::morsel::Morselizer;
3437

3538
use arrow::datatypes::TimeUnit;
3639
use datafusion_common::DataFusionError;
@@ -573,6 +576,76 @@ impl FileSource for ParquetSource {
573576
Ok(opener)
574577
}
575578

579+
fn create_morselizer(
580+
&self,
581+
object_store: Arc<dyn ObjectStore>,
582+
base_config: &FileScanConfig,
583+
partition: usize,
584+
) -> datafusion_common::Result<Box<dyn Morselizer>> {
585+
let expr_adapter_factory = base_config
586+
.expr_adapter_factory
587+
.clone()
588+
.unwrap_or_else(|| Arc::new(DefaultPhysicalExprAdapterFactory) as _);
589+
590+
let parquet_file_reader_factory =
591+
self.parquet_file_reader_factory.clone().unwrap_or_else(|| {
592+
Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _
593+
});
594+
595+
#[cfg(not(feature = "parquet_encryption"))]
596+
let encryption_context = EncryptionContext::default();
597+
598+
#[cfg(feature = "parquet_encryption")]
599+
let encryption_context = {
600+
let file_decryption_properties = self
601+
.table_parquet_options()
602+
.crypto
603+
.file_decryption
604+
.clone()
605+
.map(FileDecryptionProperties::from)
606+
.map(Arc::new);
607+
608+
EncryptionContext::new(
609+
file_decryption_properties,
610+
self.get_encryption_factory_with_config(),
611+
)
612+
};
613+
614+
let coerce_int96 = self
615+
.table_parquet_options
616+
.global
617+
.coerce_int96
618+
.as_ref()
619+
.map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap());
620+
621+
let morselizer = ParquetMorselizer::new(ParquetMorselizerState {
622+
partition_index: partition,
623+
projection: self.projection.clone(),
624+
batch_size: self
625+
.batch_size
626+
.expect("Batch size must set before creating ParquetMorselizer"),
627+
limit: base_config.limit,
628+
preserve_order: base_config.preserve_order,
629+
predicate: self.predicate.clone(),
630+
table_schema: self.table_schema.clone(),
631+
metadata_size_hint: self.metadata_size_hint,
632+
metrics: self.metrics().clone(),
633+
parquet_file_reader_factory,
634+
pushdown_filters: self.pushdown_filters(),
635+
reorder_filters: self.reorder_filters(),
636+
force_filter_selections: self.force_filter_selections(),
637+
enable_page_index: self.enable_page_index(),
638+
enable_bloom_filter: self.bloom_filter_on_read(),
639+
enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
640+
coerce_int96,
641+
expr_adapter_factory,
642+
encryption_context,
643+
max_predicate_cache_size: self.max_predicate_cache_size(),
644+
reverse_row_groups: self.reverse_row_groups,
645+
});
646+
Ok(Box::new(morselizer))
647+
}
648+
576649
fn as_any(&self) -> &dyn Any {
577650
self
578651
}

datafusion/datasource/src/file.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use std::sync::Arc;
2525
use crate::file_groups::FileGroupPartitioner;
2626
use crate::file_scan_config::FileScanConfig;
2727
use crate::file_stream::FileOpener;
28+
use crate::morsel::{FileOpenerMorselizer, Morselizer};
2829
#[expect(deprecated)]
2930
use crate::schema_adapter::SchemaAdapterFactory;
3031
use datafusion_common::config::ConfigOptions;
@@ -70,6 +71,21 @@ pub trait FileSource: Send + Sync {
7071
base_config: &FileScanConfig,
7172
partition: usize,
7273
) -> Result<Arc<dyn FileOpener>>;
74+
75+
/// Creates a `dyn Morselizer` based on given parameters.
76+
///
77+
/// The default implementation preserves existing behavior by adapting the
78+
/// legacy [`FileOpener`] API into a [`Morselizer`]. File formats with a
79+
/// native morsel-driven implementation should override this method.
80+
fn create_morselizer(
81+
&self,
82+
object_store: Arc<dyn ObjectStore>,
83+
base_config: &FileScanConfig,
84+
partition: usize,
85+
) -> Result<Box<dyn Morselizer>> {
86+
let opener = self.create_file_opener(object_store, base_config, partition)?;
87+
Ok(Box::new(FileOpenerMorselizer::new(opener)))
88+
}
7389
/// Any
7490
fn as_any(&self) -> &dyn Any;
7591

datafusion/datasource/src/file_scan_config.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -586,9 +586,14 @@ impl DataSource for FileScanConfig {
586586

587587
let source = self.file_source.with_batch_size(batch_size);
588588

589-
let opener = source.create_file_opener(object_store, self, partition)?;
589+
let morselizer = source.create_morselizer(object_store, self, partition)?;
590590

591-
let stream = FileStream::new(self, partition, opener, source.metrics())?;
591+
let stream = FileStream::new_with_morselizer(
592+
self,
593+
partition,
594+
morselizer,
595+
source.metrics(),
596+
)?;
592597
Ok(Box::pin(cooperative(stream)))
593598
}
594599

0 commit comments

Comments
 (0)