Skip to content

Commit 2faa4dd

Browse files
adriangb and claude committed
Categorize all built-in custom metrics
Opt in every custom counter/gauge created by DataFusion's own operators (parquet, file_stream, joins, aggregates, topk, unnest, buffer) so that category filtering works cleanly out of the box. For example `bytes_scanned` → Bytes, `pushdown_rows_pruned` → Rows, `peak_mem_used` → Bytes, `row_replacements` → Rows, etc. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent b307bf2 commit 2faa4dd

14 files changed

Lines changed: 102 additions & 54 deletions

File tree

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -56,7 +56,7 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext};
5656
use datafusion_expr::dml::InsertOp;
5757
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
5858
use datafusion_physical_plan::metrics::{
59-
ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
59+
ExecutionPlanMetricsSet, MetricBuilder, MetricCategory, MetricsSet,
6060
};
6161
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
6262
use datafusion_session::Session;
@@ -1339,13 +1339,15 @@ impl FileSink for ParquetSink {
13391339
mut file_stream_rx: DemuxedStreamReceiver,
13401340
object_store: Arc<dyn ObjectStore>,
13411341
) -> Result<u64> {
1342-
let rows_written_counter =
1343-
MetricBuilder::new(&self.metrics).global_counter("rows_written");
1342+
let rows_written_counter = MetricBuilder::new(&self.metrics)
1343+
.with_category(MetricCategory::Rows)
1344+
.global_counter("rows_written");
13441345
// Note: bytes_written is the sum of compressed row group sizes, which
13451346
// may differ slightly from the actual on-disk file size (excludes footer,
13461347
// page indexes, and other Parquet metadata overhead).
1347-
let bytes_written_counter =
1348-
MetricBuilder::new(&self.metrics).global_counter("bytes_written");
1348+
let bytes_written_counter = MetricBuilder::new(&self.metrics)
1349+
.with_category(MetricCategory::Bytes)
1350+
.global_counter("bytes_written");
13491351
let elapsed_compute = MetricBuilder::new(&self.metrics).elapsed_compute(0);
13501352

13511353
let write_start = datafusion_common::instant::Instant::now();

datafusion/datasource-parquet/src/metrics.rs

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -16,8 +16,8 @@
1616
// under the License.
1717

1818
use datafusion_physical_plan::metrics::{
19-
Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricType, PruningMetrics,
20-
RatioMergeStrategy, RatioMetrics, Time,
19+
Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricCategory, MetricType,
20+
PruningMetrics, RatioMergeStrategy, RatioMetrics, Time,
2121
};
2222

2323
/// Stores metrics about the parquet execution for a particular parquet file.
@@ -126,6 +126,7 @@ impl ParquetFileMetrics {
126126
let bytes_scanned = MetricBuilder::new(metrics)
127127
.with_new_label("filename", filename.to_string())
128128
.with_type(MetricType::SUMMARY)
129+
.with_category(MetricCategory::Bytes)
129130
.counter("bytes_scanned", partition);
130131

131132
let metadata_load_time = MetricBuilder::new(metrics)
@@ -151,13 +152,16 @@ impl ParquetFileMetrics {
151152
// -----------------------
152153
let predicate_evaluation_errors = MetricBuilder::new(metrics)
153154
.with_new_label("filename", filename.to_string())
155+
.with_category(MetricCategory::Rows)
154156
.counter("predicate_evaluation_errors", partition);
155157

156158
let pushdown_rows_pruned = MetricBuilder::new(metrics)
157159
.with_new_label("filename", filename.to_string())
160+
.with_category(MetricCategory::Rows)
158161
.counter("pushdown_rows_pruned", partition);
159162
let pushdown_rows_matched = MetricBuilder::new(metrics)
160163
.with_new_label("filename", filename.to_string())
164+
.with_category(MetricCategory::Rows)
161165
.counter("pushdown_rows_matched", partition);
162166

163167
let row_pushdown_eval_time = MetricBuilder::new(metrics)
@@ -180,10 +184,12 @@ impl ParquetFileMetrics {
180184

181185
let predicate_cache_inner_records = MetricBuilder::new(metrics)
182186
.with_new_label("filename", filename.to_string())
187+
.with_category(MetricCategory::Rows)
183188
.gauge("predicate_cache_inner_records", partition);
184189

185190
let predicate_cache_records = MetricBuilder::new(metrics)
186191
.with_new_label("filename", filename.to_string())
192+
.with_category(MetricCategory::Rows)
187193
.gauge("predicate_cache_records", partition);
188194

189195
Self {

datafusion/datasource-parquet/src/opener.rs

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -48,7 +48,8 @@ use datafusion_physical_expr_common::physical_expr::{
4848
PhysicalExpr, is_dynamic_physical_expr,
4949
};
5050
use datafusion_physical_plan::metrics::{
51-
BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, PruningMetrics,
51+
BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricCategory,
52+
PruningMetrics,
5253
};
5354
use datafusion_pruning::{FilePruner, PruningPredicate, build_pruning_predicate};
5455

@@ -217,6 +218,7 @@ impl FileOpener for ParquetOpener {
217218
let metrics = self.metrics.clone();
218219

219220
let predicate_creation_errors = MetricBuilder::new(&self.metrics)
221+
.with_category(MetricCategory::Rows)
220222
.global_counter("num_predicate_creation_errors");
221223

222224
let expr_adapter_factory = Arc::clone(&self.expr_adapter_factory);

datafusion/datasource/src/file_stream.rs

Lines changed: 13 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,7 @@ use arrow::datatypes::SchemaRef;
3232
use datafusion_common::error::Result;
3333
use datafusion_execution::RecordBatchStream;
3434
use datafusion_physical_plan::metrics::{
35-
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
35+
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory, Time,
3636
};
3737

3838
use arrow::record_batch::RecordBatch;
@@ -369,16 +369,21 @@ impl FileStreamMetrics {
369369
start: None,
370370
};
371371

372-
let file_open_errors =
373-
MetricBuilder::new(metrics).counter("file_open_errors", partition);
372+
let file_open_errors = MetricBuilder::new(metrics)
373+
.with_category(MetricCategory::Rows)
374+
.counter("file_open_errors", partition);
374375

375-
let file_scan_errors =
376-
MetricBuilder::new(metrics).counter("file_scan_errors", partition);
376+
let file_scan_errors = MetricBuilder::new(metrics)
377+
.with_category(MetricCategory::Rows)
378+
.counter("file_scan_errors", partition);
377379

378-
let files_opened = MetricBuilder::new(metrics).counter("files_opened", partition);
380+
let files_opened = MetricBuilder::new(metrics)
381+
.with_category(MetricCategory::Rows)
382+
.counter("files_opened", partition);
379383

380-
let files_processed =
381-
MetricBuilder::new(metrics).counter("files_processed", partition);
384+
let files_processed = MetricBuilder::new(metrics)
385+
.with_category(MetricCategory::Rows)
386+
.counter("files_processed", partition);
382387

383388
Self {
384389
time_opening,

datafusion/physical-expr-common/src/metrics/baseline.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -215,6 +215,7 @@ impl SplitMetrics {
215215
pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
216216
Self {
217217
batches_split: MetricBuilder::new(metrics)
218+
.with_category(super::MetricCategory::Rows)
218219
.counter("batches_split", partition),
219220
}
220221
}

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,7 @@ use crate::aggregates::{
2929
AggregateInputMode, AggregateMode, AggregateOutputMode, PhysicalGroupBy,
3030
create_schema, evaluate_group_by, evaluate_many, evaluate_optional,
3131
};
32-
use crate::metrics::{BaselineMetrics, MetricBuilder, RecordOutput};
32+
use crate::metrics::{BaselineMetrics, MetricBuilder, MetricCategory, RecordOutput};
3333
use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
3434
use crate::spill::spill_manager::{GetSlicedSize, SpillManager};
3535
use crate::{PhysicalExpr, aggregates, metrics};
@@ -613,6 +613,7 @@ impl GroupedHashAggregateStream {
613613
merging_aggregate_arguments,
614614
merging_group_by: PhysicalGroupBy::new_single(merging_group_by_expr),
615615
peak_mem_used: MetricBuilder::new(&agg.metrics)
616+
.with_category(MetricCategory::Bytes)
616617
.gauge("peak_mem_used", partition),
617618
spill_manager,
618619
};
@@ -637,6 +638,7 @@ impl GroupedHashAggregateStream {
637638
let probe_ratio_threshold =
638639
options.skip_partial_aggregation_probe_ratio_threshold;
639640
let skipped_aggregation_rows = MetricBuilder::new(&agg.metrics)
641+
.with_category(MetricCategory::Rows)
640642
.counter("skipped_aggregation_rows", partition);
641643
Some(SkipAggregationProbe::new(
642644
probe_rows_threshold,

datafusion/physical-plan/src/buffer.rs

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -37,7 +37,7 @@ use datafusion_common_runtime::SpawnedTask;
3737
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
3838
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
3939
use datafusion_physical_expr_common::metrics::{
40-
ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
40+
ExecutionPlanMetricsSet, MetricBuilder, MetricCategory, MetricsSet,
4141
};
4242
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
4343
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
@@ -204,12 +204,16 @@ impl ExecutionPlan for BufferExec {
204204
let curr_mem_in = Arc::new(AtomicUsize::new(0));
205205
let curr_mem_out = Arc::clone(&curr_mem_in);
206206
let mut max_mem_in = 0;
207-
let max_mem = MetricBuilder::new(&self.metrics).gauge("max_mem_used", partition);
207+
let max_mem = MetricBuilder::new(&self.metrics)
208+
.with_category(MetricCategory::Bytes)
209+
.gauge("max_mem_used", partition);
208210

209211
let curr_queued_in = Arc::new(AtomicUsize::new(0));
210212
let curr_queued_out = Arc::clone(&curr_queued_in);
211213
let mut max_queued_in = 0;
212-
let max_queued = MetricBuilder::new(&self.metrics).gauge("max_queued", partition);
214+
let max_queued = MetricBuilder::new(&self.metrics)
215+
.with_category(MetricCategory::Rows)
216+
.gauge("max_queued", partition);
213217

214218
// Capture metrics when an element is queued on the stream.
215219
let in_stream = in_stream.inspect_ok(move |v| {

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -46,7 +46,7 @@ use crate::joins::utils::{
4646
swap_join_projection, update_hash,
4747
};
4848
use crate::joins::{JoinOn, JoinOnRef, PartitionMode, SharedBitmapBuilder};
49-
use crate::metrics::{Count, MetricBuilder};
49+
use crate::metrics::{Count, MetricBuilder, MetricCategory};
5050
use crate::projection::{
5151
EmbeddedProjection, JoinData, ProjectionExec, try_embed_projection,
5252
try_pushdown_through_join,
@@ -1317,6 +1317,7 @@ impl ExecutionPlan for HashJoinExec {
13171317
let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
13181318

13191319
let array_map_created_count = MetricBuilder::new(&self.metrics)
1320+
.with_category(MetricCategory::Rows)
13201321
.counter(ARRAY_MAP_CREATED_COUNT_METRIC_NAME, partition);
13211322

13221323
let left_fut = match self.mode {

datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
//! Module for tracking Sort Merge Join metrics
1919
2020
use crate::metrics::{
21-
BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, SpillMetrics,
22-
Time,
21+
BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricCategory,
22+
SpillMetrics, Time,
2323
};
2424

2525
/// Metrics for SortMergeJoinExec
@@ -42,10 +42,15 @@ pub(super) struct SortMergeJoinMetrics {
4242
impl SortMergeJoinMetrics {
4343
pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
4444
let join_time = MetricBuilder::new(metrics).subset_time("join_time", partition);
45-
let input_batches =
46-
MetricBuilder::new(metrics).counter("input_batches", partition);
47-
let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
48-
let peak_mem_used = MetricBuilder::new(metrics).gauge("peak_mem_used", partition);
45+
let input_batches = MetricBuilder::new(metrics)
46+
.with_category(MetricCategory::Rows)
47+
.counter("input_batches", partition);
48+
let input_rows = MetricBuilder::new(metrics)
49+
.with_category(MetricCategory::Rows)
50+
.counter("input_rows", partition);
51+
let peak_mem_used = MetricBuilder::new(metrics)
52+
.with_category(MetricCategory::Bytes)
53+
.gauge("peak_mem_used", partition);
4954
let spill_metrics = SpillMetrics::new(metrics, partition);
5055

5156
let baseline_metrics = BaselineMetrics::new(metrics, partition);

datafusion/physical-plan/src/joins/stream_join_utils.rs

Lines changed: 18 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -28,7 +28,9 @@ use crate::joins::join_hash_map::{
2828
update_from_iter,
2929
};
3030
use crate::joins::utils::{JoinFilter, JoinHashMapType};
31-
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder};
31+
use crate::metrics::{
32+
BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory,
33+
};
3234
use crate::{ExecutionPlan, metrics};
3335

3436
use arrow::array::{
@@ -700,26 +702,31 @@ pub struct StreamJoinMetrics {
700702

701703
impl StreamJoinMetrics {
702704
pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
703-
let input_batches =
704-
MetricBuilder::new(metrics).counter("left_input_batches", partition);
705-
let input_rows =
706-
MetricBuilder::new(metrics).counter("left_input_rows", partition);
705+
let input_batches = MetricBuilder::new(metrics)
706+
.with_category(MetricCategory::Rows)
707+
.counter("left_input_batches", partition);
708+
let input_rows = MetricBuilder::new(metrics)
709+
.with_category(MetricCategory::Rows)
710+
.counter("left_input_rows", partition);
707711
let left = StreamJoinSideMetrics {
708712
input_batches,
709713
input_rows,
710714
};
711715

712-
let input_batches =
713-
MetricBuilder::new(metrics).counter("right_input_batches", partition);
714-
let input_rows =
715-
MetricBuilder::new(metrics).counter("right_input_rows", partition);
716+
let input_batches = MetricBuilder::new(metrics)
717+
.with_category(MetricCategory::Rows)
718+
.counter("right_input_batches", partition);
719+
let input_rows = MetricBuilder::new(metrics)
720+
.with_category(MetricCategory::Rows)
721+
.counter("right_input_rows", partition);
716722
let right = StreamJoinSideMetrics {
717723
input_batches,
718724
input_rows,
719725
};
720726

721-
let stream_memory_usage =
722-
MetricBuilder::new(metrics).gauge("stream_memory_usage", partition);
727+
let stream_memory_usage = MetricBuilder::new(metrics)
728+
.with_category(MetricCategory::Bytes)
729+
.gauge("stream_memory_usage", partition);
723730

724731
Self {
725732
left,

0 commit comments

Comments (0)