
Commit d530276

ernestprovo23 and alamb authored

Fix elapsed_compute metric for Parquet DataSourceExec (#20767)
## Which issue does this PR close?

Closes part of #18195 — specifically the `elapsed_compute` baseline metric sub-item for Parquet scans.

## Rationale

`EXPLAIN ANALYZE` on Parquet scans reports `elapsed_compute` values like `14ns` for full table scans, which is misleading. The metric was never populated because no timer wrapped the per-batch compute work in the Parquet scan path.

## What changes are included in this PR?

Follows the same pattern established in PR #18901 (the CSV fix):

1. Added a `BaselineMetrics` instantiation in `ParquetOpener::open()` using the existing `metrics` and `partition_index` fields
2. Wrapped the per-batch stream `.map()` closure with an `elapsed_compute` timer that measures the projection, schema replacement, and metrics-copy work

Single file changed: `datafusion/datasource-parquet/src/opener.rs` (+7, -3 lines)

## Are these changes tested?

- All 81 existing tests in `datafusion-datasource-parquet` pass
- Metric correctness is verified by observing realistic `elapsed_compute` values in `EXPLAIN ANALYZE` output (no longer nanosecond-level values for real scans)
- Per maintainer guidance from @2010YOUY01: "Testing if we have the time measured correct is tricky, I don't think there is a good way to do it. But for a large parquet file scan, several nanoseconds is definitely not reasonable."

## Are there any user-facing changes?

`EXPLAIN ANALYZE` output for Parquet scans will now show accurate `elapsed_compute` values reflecting the actual CPU time spent on per-batch processing.

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
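The fix follows a scoped-timer pattern: start a timer just before the per-batch compute work, stop it before the batch is yielded, and accumulate the elapsed time into the `elapsed_compute` counter so waiting on I/O is never counted. Below is a minimal standalone sketch of that pattern; the `Time` and `Timer` types here are simplified single-threaded stand-ins for DataFusion's real metrics types (which live in `datafusion_physical_plan::metrics` and are `Arc`-backed atomics), not the actual API.

```rust
use std::time::Instant;

// Simplified stand-in for DataFusion's `Time` metric: an accumulating
// nanosecond counter that hands out scoped timers.
#[derive(Default)]
struct Time {
    nanos: u64,
}

impl Time {
    // Start a scoped timer; the elapsed duration is added back on `stop`.
    fn timer(&mut self) -> Timer<'_> {
        Timer {
            time: self,
            start: Instant::now(),
        }
    }

    fn value(&self) -> u64 {
        self.nanos
    }
}

struct Timer<'a> {
    time: &'a mut Time,
    start: Instant,
}

impl Timer<'_> {
    // Consuming `self` ensures a timer is stopped at most once.
    fn stop(self) {
        self.time.nanos += self.start.elapsed().as_nanos() as u64;
    }
}

fn main() {
    let mut elapsed_compute = Time::default();

    for batch_id in 0..3u64 {
        // Timer starts just before the per-batch compute work...
        let timer = elapsed_compute.timer();
        // ...stand-in workload for projection / schema replacement / metrics copy...
        let _sum: u64 = (0..10_000).map(|x: u64| x.wrapping_mul(batch_id)).sum();
        // ...and stops before the batch is yielded, so I/O wait is excluded.
        timer.stop();
    }

    println!("elapsed_compute = {}ns", elapsed_compute.value());
}
```

The key design point mirrored from the diff: the timer wraps only the CPU-bound batch processing, which is why a real scan now reports a realistic `elapsed_compute` instead of a few nanoseconds.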
1 parent f30b85c commit d530276

2 files changed: 9 additions & 3 deletions

### datafusion/datasource-parquet/src/opener.rs

Lines changed: 8 additions & 2 deletions
```diff
@@ -48,7 +48,7 @@ use datafusion_physical_expr_common::physical_expr::{
     PhysicalExpr, is_dynamic_physical_expr,
 };
 use datafusion_physical_plan::metrics::{
-    Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, PruningMetrics,
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, PruningMetrics,
 };
 use datafusion_pruning::{FilePruner, PruningPredicate, build_pruning_predicate};

@@ -135,6 +135,7 @@ impl FileOpener for ParquetOpener {
         let file_name = file_location.to_string();
         let file_metrics =
             ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);
+        let baseline_metrics = BaselineMetrics::new(&self.metrics, self.partition_index);

         let metadata_size_hint = partitioned_file
             .metadata_size_hint

@@ -605,6 +606,7 @@ impl FileOpener for ParquetOpener {
                 arrow_reader_metrics,
                 predicate_cache_inner_records,
                 predicate_cache_records,
+                baseline_metrics,
             },
             |mut state| async move {
                 let result = state.transition().await;

@@ -646,6 +648,7 @@ struct PushDecoderStreamState {
     arrow_reader_metrics: ArrowReaderMetrics,
     predicate_cache_inner_records: Gauge,
     predicate_cache_records: Gauge,
+    baseline_metrics: BaselineMetrics,
 }

 impl PushDecoderStreamState {

@@ -671,8 +674,11 @@ impl PushDecoderStreamState {
                 }
             }
             Ok(DecodeResult::Data(batch)) => {
+                let mut timer = self.baseline_metrics.elapsed_compute().timer();
                 self.copy_arrow_reader_metrics();
-                return Some(self.project_batch(&batch));
+                let result = self.project_batch(&batch);
+                timer.stop();
+                return Some(result);
             }
             Ok(DecodeResult::Finished) => {
                 return None;
```

### datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt

Lines changed: 1 addition & 1 deletion
```diff
@@ -104,7 +104,7 @@ Plan with Metrics
 03)----ProjectionExec: expr=[id@0 as id, value@1 as v, value@1 + id@0 as name], metrics=[output_rows=10, <slt:ignore>]
 04)------FilterExec: value@1 > 3, metrics=[output_rows=10, <slt:ignore>, selectivity=100% (10/10)]
 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, metrics=[output_rows=10, <slt:ignore>]
-06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]}, projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND DynamicFilter [ value@1 IS NULL OR value@1 > 800 ], pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND (value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 > 800), required_guarantees=[], metrics=[output_rows=10, elapsed_compute=1ns, output_bytes=80.0 B, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched -> 1 fully matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=210, metadata_load_time=<slt:ignore>, scan_efficiency_ratio=18% (210/1.15 K)]
+06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]}, projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND DynamicFilter [ value@1 IS NULL OR value@1 > 800 ], pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND (value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 > 800), required_guarantees=[], metrics=[output_rows=10, elapsed_compute=<slt:ignore>, output_bytes=80.0 B, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched -> 1 fully matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=1 total → 1 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=210, metadata_load_time=<slt:ignore>, scan_efficiency_ratio=18% (210/1.15 K)]

 statement ok
 set datafusion.explain.analyze_level = dev;
```
