Skip to content

Commit d79f538

Browse files
committed
wip
1 parent 34e12b7 commit d79f538

8 files changed

Lines changed: 559 additions & 72 deletions

File tree

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 12 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -210,7 +210,7 @@ impl ParquetOptions {
210210
skip_arrow_metadata: _,
211211
max_predicate_cache_size: _,
212212
filter_pushdown_min_bytes_per_sec: _, // not used for writer props
213-
filter_correlation_threshold: _, // not used for writer props
213+
filter_correlation_threshold: _, // not used for writer props
214214
filter_statistics_collection_min_rows: _, // not used for writer props
215215
filter_statistics_collection_fraction: _, // not used for writer props
216216
} = self;
@@ -465,6 +465,11 @@ mod tests {
465465
coerce_int96: None,
466466
max_predicate_cache_size: defaults.max_predicate_cache_size,
467467
filter_pushdown_min_bytes_per_sec: defaults.filter_pushdown_min_bytes_per_sec,
468+
filter_correlation_threshold: defaults.filter_correlation_threshold,
469+
filter_statistics_collection_fraction: defaults
470+
.filter_statistics_collection_fraction,
471+
filter_statistics_collection_min_rows: defaults
472+
.filter_statistics_collection_min_rows,
468473
}
469474
}
470475

@@ -581,6 +586,12 @@ mod tests {
581586
coerce_int96: None,
582587
filter_pushdown_min_bytes_per_sec: global_options_defaults
583588
.filter_pushdown_min_bytes_per_sec,
589+
filter_correlation_threshold: global_options_defaults
590+
.filter_correlation_threshold,
591+
filter_statistics_collection_fraction: global_options_defaults
592+
.filter_statistics_collection_fraction,
593+
filter_statistics_collection_min_rows: global_options_defaults
594+
.filter_statistics_collection_min_rows,
584595
},
585596
column_specific_options,
586597
key_value_metadata,

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -524,7 +524,8 @@ impl FileFormat for ParquetFormat {
524524
.cloned()
525525
.ok_or_else(|| internal_datafusion_err!("Expected ParquetSource"))?;
526526
source = source.with_table_parquet_options(self.options.clone());
527-
source = source.with_filter_pushdown_min_bytes_per_sec(filter_pushdown_min_bytes_per_sec);
527+
source = source
528+
.with_filter_pushdown_min_bytes_per_sec(filter_pushdown_min_bytes_per_sec);
528529

529530
// Use the CachedParquetFileReaderFactory
530531
let metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();

datafusion/datasource-parquet/src/opener.rs

Lines changed: 17 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -588,11 +588,24 @@ impl FileOpener for ParquetOpener {
588588
// metrics from the arrow reader itself
589589
let arrow_reader_metrics = ArrowReaderMetrics::enabled();
590590

591-
// Compute bytes_per_row from file metadata for the bytes/sec metric.
591+
// Compute bytes_per_row from file metadata and feed it into the
592+
// selectivity tracker's running average (used by partition_filters_grouped).
592593
let bytes_per_row = compute_bytes_per_row(
593594
file_metadata.as_ref(),
594595
&projection.column_indices(),
595596
);
597+
if let Some(bpr) = bytes_per_row {
598+
let file_rows: u64 = file_metadata
599+
.row_groups()
600+
.iter()
601+
.map(|rg| rg.num_rows() as u64)
602+
.sum();
603+
if file_rows > 0 {
604+
selectivity_tracker
605+
.write()
606+
.update_bytes_per_row(bpr, file_rows);
607+
}
608+
}
596609

597610
// Acquire tracker lock once for both partitioning and row filter building.
598611
let (post_scan_filters, projection, mask) = {
@@ -611,7 +624,7 @@ impl FileOpener for ParquetOpener {
611624
.into_iter()
612625
.map(Arc::clone)
613626
.collect();
614-
tracker.partition_filters_grouped(conjuncts, bytes_per_row)
627+
tracker.partition_filters_grouped(conjuncts)
615628
} else {
616629
PartitionedFiltersGrouped {
617630
row_filter_groups: vec![],
@@ -623,18 +636,14 @@ impl FileOpener for ParquetOpener {
623636
// but don't add filter expressions as projection columns.
624637
let mut all_indices: Vec<usize> = projection.column_indices();
625638
for filter in &post_scan {
626-
for col in datafusion_physical_expr::utils::collect_columns(filter)
627-
{
639+
for col in datafusion_physical_expr::utils::collect_columns(filter) {
628640
let idx = col.index();
629641
if !all_indices.contains(&idx) {
630642
all_indices.push(idx);
631643
}
632644
}
633645
}
634-
let mask = ProjectionMask::roots(
635-
builder.parquet_schema(),
636-
all_indices,
637-
);
646+
let mask = ProjectionMask::roots(builder.parquet_schema(), all_indices);
638647

639648
// Build row filter from groups of correlated filters
640649
if !row_filter_groups.is_empty() {

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -895,9 +895,11 @@ pub fn build_row_filter_from_groups(
895895
// Compound can't push down -- these filters will need to be
896896
// handled as post-scan. For now, try each individually.
897897
for expr in group {
898-
if let Ok(Some(candidate)) =
899-
FilterCandidateBuilder::new(Arc::clone(&expr), Arc::clone(file_schema))
900-
.build(metadata)
898+
if let Ok(Some(candidate)) = FilterCandidateBuilder::new(
899+
Arc::clone(&expr),
900+
Arc::clone(file_schema),
901+
)
902+
.build(metadata)
901903
{
902904
candidates_with_exprs.push((vec![expr], candidate));
903905
}

0 commit comments

Comments
 (0)