Skip to content

Commit 8ad7685

Browse files
committed
extract more
1 parent aa930d6 commit 8ad7685

1 file changed

Lines changed: 129 additions & 52 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 129 additions & 52 deletions
Original file line number | Diff line number | Diff line change
@@ -211,6 +211,19 @@ struct MetadataLoadedParquetOpen {
211211
options: ArrowReaderOptions,
212212
}
213213

214+
/// Result of CPU-only preparation after metadata has been loaded.
215+
///
216+
/// This captures the file schema coercions and file-specific pruning predicates
217+
/// so the next async step only has to fetch any missing page index data.
218+
struct FiltersPreparedParquetOpen {
219+
loaded: MetadataLoadedParquetOpen,
220+
physical_file_schema: SchemaRef,
221+
projection: ProjectionExprs,
222+
predicate: Option<Arc<dyn PhysicalExpr>>,
223+
pruning_predicate: Option<Arc<PruningPredicate>>,
224+
page_pruning_predicate: Option<Arc<PagePruningAccessPlanFilter>>,
225+
}
226+
214227
impl ParquetMorselizerState {
215228
/// Perform the CPU-only setup for opening a parquet file.
216229
fn prepare_open_file(
@@ -399,51 +412,20 @@ impl ParquetMorselizerState {
399412
})
400413
}
401414

402-
/// Continue opening once parquet metadata has been loaded.
403-
async fn execute_metadata_loaded_open(
415+
/// Prepare file-specific filters and schema coercions after metadata is loaded.
416+
fn prepare_loaded_metadata(
404417
loaded: MetadataLoadedParquetOpen,
405-
) -> Result<BoxStream<'static, Result<RecordBatch>>> {
418+
) -> Result<FiltersPreparedParquetOpen> {
406419
let MetadataLoadedParquetOpen {
407-
prepared,
420+
mut prepared,
408421
mut reader_metadata,
409422
mut options,
410423
} = loaded;
411-
let PreparedParquetOpen {
412-
state,
413-
partitioned_file,
414-
file_range,
415-
extensions,
416-
file_metrics,
417-
file_pruner,
418-
metadata_size_hint,
419-
mut async_file_reader,
420-
logical_file_schema,
421-
output_schema,
422-
mut projection,
423-
mut predicate,
424-
#[cfg(feature = "parquet_encryption")]
425-
file_decryption_properties: _,
426-
} = prepared;
427-
428-
let file_name = partitioned_file.object_meta.location.to_string();
429-
let batch_size = state.batch_size;
430-
let reorder_predicates = state.reorder_filters;
431-
let pushdown_filters = state.pushdown_filters;
432-
let force_filter_selections = state.force_filter_selections;
424+
let state = Arc::clone(&prepared.state);
433425
let coerce_int96 = state.coerce_int96;
434-
let enable_bloom_filter = state.enable_bloom_filter;
435-
let enable_row_group_stats_pruning = state.enable_row_group_stats_pruning;
436-
let limit = state.limit;
437-
let parquet_file_reader_factory = Arc::clone(&state.parquet_file_reader_factory);
438-
let partition_index = state.partition_index;
439-
let metrics = state.metrics.clone();
440426
let predicate_creation_errors = MetricBuilder::new(&state.metrics)
441427
.global_counter("num_predicate_creation_errors");
442428
let expr_adapter_factory = Arc::clone(&state.expr_adapter_factory);
443-
let enable_page_index = state.enable_page_index;
444-
let max_predicate_cache_size = state.max_predicate_cache_size;
445-
let reverse_row_groups = state.reverse_row_groups;
446-
let preserve_order = state.preserve_order;
447429

448430
// Note about schemas: we are actually dealing with **3 different schemas** here:
449431
// - The table schema as defined by the TableProvider.
@@ -452,6 +434,7 @@ impl ParquetMorselizerState {
452434
// This is what the physical file schema is coerced to.
453435
// - The physical file schema: this is the schema that the arrow-rs
454436
// parquet reader will actually produce.
437+
let logical_file_schema = Arc::clone(&prepared.logical_file_schema);
455438
let mut physical_file_schema = Arc::clone(reader_metadata.schema());
456439

457440
// The schema loaded from the file may not be the same as the
@@ -495,32 +478,109 @@ impl ParquetMorselizerState {
495478
Arc::clone(&physical_file_schema),
496479
)?;
497480
let simplifier = PhysicalExprSimplifier::new(&physical_file_schema);
498-
predicate = predicate
481+
prepared.predicate = prepared
482+
.predicate
499483
.map(|p| simplifier.simplify(rewriter.rewrite(p)?))
500484
.transpose()?;
501485
// Adapt projections to the physical file schema as well
502-
projection =
503-
projection.try_map_exprs(|p| simplifier.simplify(rewriter.rewrite(p)?))?;
486+
prepared.projection = prepared
487+
.projection
488+
.try_map_exprs(|p| simplifier.simplify(rewriter.rewrite(p)?))?;
504489

505490
// Build predicates for this specific file
506491
let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates(
507-
predicate.as_ref(),
492+
prepared.predicate.as_ref(),
508493
&physical_file_schema,
509494
&predicate_creation_errors,
510495
);
511496

512-
// The page index is not stored inline in the parquet footer so the
513-
// code above may not have read the page index structures yet. If we
514-
// need them for reading and they aren't yet loaded, we need to load them now.
515-
if should_enable_page_index(enable_page_index, &page_pruning_predicate) {
516-
reader_metadata = load_page_index(
497+
let projection = prepared.projection.clone();
498+
let predicate = prepared.predicate.clone();
499+
500+
Ok(FiltersPreparedParquetOpen {
501+
loaded: MetadataLoadedParquetOpen {
502+
prepared,
517503
reader_metadata,
518-
&mut async_file_reader,
504+
options,
505+
},
506+
physical_file_schema,
507+
projection,
508+
predicate,
509+
pruning_predicate,
510+
page_pruning_predicate,
511+
})
512+
}
513+
514+
/// Fetch the page index if it is needed and missing from the loaded metadata.
515+
async fn load_prepared_page_index(
516+
mut prepared: FiltersPreparedParquetOpen,
517+
) -> Result<FiltersPreparedParquetOpen> {
518+
let enable_page_index = prepared.loaded.prepared.state.enable_page_index;
519+
if should_enable_page_index(enable_page_index, &prepared.page_pruning_predicate)
520+
{
521+
prepared.loaded.reader_metadata = load_page_index(
522+
prepared.loaded.reader_metadata,
523+
&mut prepared.loaded.prepared.async_file_reader,
519524
// Since we're manually loading the page index, the option here should not matter, but we pass it in for consistency.
520-
options.with_page_index_policy(PageIndexPolicy::Optional),
525+
prepared
526+
.loaded
527+
.options
528+
.clone()
529+
.with_page_index_policy(PageIndexPolicy::Optional),
521530
)
522531
.await?;
523532
}
533+
Ok(prepared)
534+
}
535+
536+
/// Continue opening once metadata-derived filters and page indexes are ready.
537+
async fn execute_metadata_loaded_open(
538+
prepared: FiltersPreparedParquetOpen,
539+
) -> Result<BoxStream<'static, Result<RecordBatch>>> {
540+
let FiltersPreparedParquetOpen {
541+
loaded,
542+
physical_file_schema,
543+
projection,
544+
predicate,
545+
pruning_predicate,
546+
page_pruning_predicate,
547+
} = prepared;
548+
let MetadataLoadedParquetOpen {
549+
prepared,
550+
reader_metadata,
551+
options: _,
552+
} = loaded;
553+
let PreparedParquetOpen {
554+
state,
555+
partitioned_file,
556+
file_range,
557+
extensions,
558+
file_metrics,
559+
file_pruner,
560+
metadata_size_hint,
561+
mut async_file_reader,
562+
logical_file_schema: _,
563+
output_schema,
564+
projection: _,
565+
predicate: _,
566+
#[cfg(feature = "parquet_encryption")]
567+
file_decryption_properties: _,
568+
} = prepared;
569+
570+
let file_name = partitioned_file.object_meta.location.to_string();
571+
let batch_size = state.batch_size;
572+
let reorder_predicates = state.reorder_filters;
573+
let pushdown_filters = state.pushdown_filters;
574+
let force_filter_selections = state.force_filter_selections;
575+
let enable_bloom_filter = state.enable_bloom_filter;
576+
let enable_row_group_stats_pruning = state.enable_row_group_stats_pruning;
577+
let limit = state.limit;
578+
let parquet_file_reader_factory = Arc::clone(&state.parquet_file_reader_factory);
579+
let partition_index = state.partition_index;
580+
let metrics = state.metrics.clone();
581+
let max_predicate_cache_size = state.max_predicate_cache_size;
582+
let reverse_row_groups = state.reverse_row_groups;
583+
let preserve_order = state.preserve_order;
524584

525585
// ------------------------------------------------------------
526586
// Step: prune row groups by range, predicate and bloom filter
@@ -612,8 +672,7 @@ impl ParquetMorselizerState {
612672
// be ruled out using page metadata, rows from other columns
613673
// with that range can be skipped as well
614674
// --------------------------------------------------------
615-
if enable_page_index
616-
&& !access_plan.is_empty()
675+
if !access_plan.is_empty()
617676
&& let Some(p) = page_pruning_predicate
618677
{
619678
access_plan = p.prune_plan_with_page_index(
@@ -775,8 +834,10 @@ enum ReadyState {
775834
PrepareEncryption(Box<PreparedParquetOpen>),
776835
/// Planner can do file-level pruning before requesting parquet metadata.
777836
PruneFiles(Box<PreparedParquetOpen>),
778-
/// Planner has loaded parquet metadata and is ready for further planning.
779-
Prepared(Box<MetadataLoadedParquetOpen>),
837+
/// Planner has loaded parquet metadata and can do CPU-only filter preparation.
838+
PrepareFilters(Box<MetadataLoadedParquetOpen>),
839+
/// Planner has prepared filters and can request any missing page index data.
840+
Prepared(Box<FiltersPreparedParquetOpen>),
780841
/// Planner has a fully prepared stream ready to wrap as a morsel.
781842
EmitMorsel(BoxStream<'static, Result<RecordBatch>>),
782843
}
@@ -796,6 +857,10 @@ impl ReadyState {
796857
}
797858

798859
fn prepared(prepared: MetadataLoadedParquetOpen) -> Self {
860+
Self::PrepareFilters(prepared.into())
861+
}
862+
863+
fn filters_prepared(prepared: FiltersPreparedParquetOpen) -> Self {
799864
Self::Prepared(prepared.into())
800865
}
801866

@@ -845,6 +910,7 @@ impl ParquetMorselPlannerState {
845910
#[cfg(feature = "parquet_encryption")]
846911
ReadyState::PrepareEncryption(_) => "Ready(PrepareEncryption)",
847912
ReadyState::PruneFiles(_) => "Ready(PruneFiles)",
913+
ReadyState::PrepareFilters(_) => "Ready(PrepareFilters)",
848914
ReadyState::Prepared(_) => "Ready(Prepared)",
849915
ReadyState::EmitMorsel(_) => "Ready(EmitMorsel)",
850916
},
@@ -961,9 +1027,20 @@ impl MorselPlanner for ParquetMorselPlanner {
9611027
Ok(ReadyState::prepared(loaded))
9621028
}))
9631029
}
1030+
ReadyState::PrepareFilters(prepared) => {
1031+
let prepared =
1032+
ParquetMorselizerState::prepare_loaded_metadata(*prepared)?;
1033+
self.state = ParquetMorselPlannerState::ready(
1034+
ReadyState::filters_prepared(prepared),
1035+
);
1036+
Ok(Some(MorselPlan::new()))
1037+
}
9641038
ReadyState::Prepared(prepared) => Ok(self.schedule_io(async move {
1039+
let prepared =
1040+
ParquetMorselizerState::load_prepared_page_index(*prepared)
1041+
.await?;
9651042
let stream =
966-
ParquetMorselizerState::execute_metadata_loaded_open(*prepared)
1043+
ParquetMorselizerState::execute_metadata_loaded_open(prepared)
9671044
.await?;
9681045
Ok(ReadyState::emit_morsel(stream))
9691046
})),

0 commit comments

Comments
 (0)