Skip to content

Commit 285e03e

Browse files
adriangbclaude
andcommitted
Add metric category filtering for EXPLAIN ANALYZE
Introduces `MetricCategory` (Rows, Bytes, Timing) so that EXPLAIN ANALYZE output can be narrowed to only deterministic metrics, which is especially useful in sqllogictest (.slt) files where timing values would otherwise require `<slt:ignore>` markers everywhere. Each `Metric` now optionally declares a category via `MetricBuilder::with_category()`. Well-known builder methods (`output_rows`, `elapsed_compute`, …) set the category automatically; custom counters/gauges default to "always included". A new session config `datafusion.explain.analyze_categories` accepts `all` (default), `none`, or a comma-separated list of `rows`, `bytes`, `timing` to control which categories appear. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 9b726bc commit 285e03e

15 files changed

Lines changed: 536 additions & 36 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use arrow_ipc::CompressionType;
2222
#[cfg(feature = "parquet_encryption")]
2323
use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
2424
use crate::error::_config_err;
25-
use crate::format::{ExplainAnalyzeLevel, ExplainFormat};
25+
use crate::format::{ExplainAnalyzeCategories, ExplainAnalyzeLevel, ExplainFormat};
2626
use crate::parquet_config::DFParquetWriterVersion;
2727
use crate::parsers::CompressionTypeVariant;
2828
use crate::utils::get_available_parallelism;
@@ -1212,6 +1212,12 @@ config_namespace! {
12121212
/// "summary" shows common metrics for high-level insights.
12131213
/// "dev" provides deep operator-level introspection for developers.
12141214
pub analyze_level: ExplainAnalyzeLevel, default = ExplainAnalyzeLevel::Dev
1215+
1216+
/// Which metric categories to include in "EXPLAIN ANALYZE" output.
1217+
/// Comma-separated list of: "rows", "bytes", "timing".
1218+
/// Use "none" to show plan structure only, or "all" (default) to show everything.
1219+
/// Metrics without a declared category are always shown (except with "none").
1220+
pub analyze_categories: ExplainAnalyzeCategories, default = ExplainAnalyzeCategories::All
12151221
}
12161222
}
12171223

datafusion/common/src/format.rs

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,142 @@ impl ConfigField for ExplainFormat {
206206
}
207207
}
208208

209+
/// Classifies a metric by what it measures.
210+
///
211+
/// This is orthogonal to [`MetricType`] (SUMMARY / DEV), which controls
212+
/// *verbosity*. `MetricCategory` controls *what kind of value* is shown,
213+
/// so that `EXPLAIN ANALYZE` output can be narrowed to only the categories
214+
/// that are useful in a given context.
215+
///
216+
/// For testing, the key property is **determinism**:
217+
/// - [`Rows`](Self::Rows) and [`Bytes`](Self::Bytes) depend on the plan
218+
/// and the data, so they are deterministic across runs (given the same
219+
/// input).
220+
/// - [`Timing`](Self::Timing) depends on hardware, system load, scheduling,
221+
/// etc., so it varies from run to run even on the same machine.
222+
///
223+
/// [`MetricCategory`] is especially useful in sqllogictest (`.slt`) files:
224+
/// setting `datafusion.explain.analyze_categories = 'rows'` lets a test
225+
/// assert on row-count metrics without sprinkling `<slt:ignore>` over every
226+
/// timing value.
227+
///
228+
/// Metrics that do not declare a category (the default for custom
229+
/// `Count` / `Gauge` metrics) are **always included** unless the config
230+
/// is set to `'none'`.
231+
///
232+
/// [`MetricType`]: datafusion_physical_expr_common::metrics::MetricType
233+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
234+
pub enum MetricCategory {
235+
/// Row counts and related dimensionless counters: `output_rows`,
236+
/// `spilled_rows`, `output_batches`, pruning metrics, ratios, etc.
237+
///
238+
/// Deterministic given the same plan and data.
239+
Rows,
240+
/// Byte measurements: `output_bytes`, `spilled_bytes`,
241+
/// `current_memory_usage`, `bytes_scanned`, etc.
242+
///
243+
/// Deterministic given the same plan and data.
244+
Bytes,
245+
/// Wall-clock durations and timestamps: `elapsed_compute`,
246+
/// operator-defined `Time` metrics, `start_timestamp` /
247+
/// `end_timestamp`, etc.
248+
///
249+
/// **Non-deterministic** — varies across runs even on the same hardware.
250+
Timing,
251+
}
252+
253+
/// Controls which [`MetricCategory`] values are shown in `EXPLAIN ANALYZE`.
254+
///
255+
/// Set via `SET datafusion.explain.analyze_categories = '...'`.
256+
///
257+
/// See [`MetricCategory`] for the determinism properties that motivate
258+
/// this filter.
259+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
260+
pub enum ExplainAnalyzeCategories {
261+
/// Show all metrics regardless of category (the default).
262+
All,
263+
/// Show only metrics whose category is in the list.
264+
/// Metrics that have no declared category are still included
265+
/// (they are treated as "always on").
266+
///
267+
/// An **empty** vec means "plan only" — suppress all metrics.
268+
Only(Vec<MetricCategory>),
269+
}
270+
271+
impl Default for ExplainAnalyzeCategories {
272+
fn default() -> Self {
273+
Self::All
274+
}
275+
}
276+
277+
impl FromStr for ExplainAnalyzeCategories {
278+
type Err = DataFusionError;
279+
280+
fn from_str(s: &str) -> Result<Self, Self::Err> {
281+
let s = s.trim().to_lowercase();
282+
match s.as_str() {
283+
"all" => Ok(Self::All),
284+
"none" => Ok(Self::Only(vec![])),
285+
other => {
286+
let mut cats = Vec::new();
287+
for part in other.split(',') {
288+
let part = part.trim();
289+
match part {
290+
"rows" => cats.push(MetricCategory::Rows),
291+
"bytes" => cats.push(MetricCategory::Bytes),
292+
"timing" => cats.push(MetricCategory::Timing),
293+
unknown => {
294+
return Err(DataFusionError::Configuration(format!(
295+
"Invalid metric category '{unknown}'. \
296+
Expected 'all', 'none', or comma-separated list of \
297+
'rows', 'bytes', 'timing'."
298+
)));
299+
}
300+
}
301+
}
302+
cats.dedup();
303+
Ok(Self::Only(cats))
304+
}
305+
}
306+
}
307+
}
308+
309+
impl Display for ExplainAnalyzeCategories {
310+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
311+
match self {
312+
Self::All => write!(f, "all"),
313+
Self::Only(cats) if cats.is_empty() => write!(f, "none"),
314+
Self::Only(cats) => {
315+
let mut first = true;
316+
for cat in cats {
317+
if !first {
318+
write!(f, ",")?;
319+
}
320+
first = false;
321+
let s = match cat {
322+
MetricCategory::Rows => "rows",
323+
MetricCategory::Bytes => "bytes",
324+
MetricCategory::Timing => "timing",
325+
};
326+
write!(f, "{s}")?;
327+
}
328+
Ok(())
329+
}
330+
}
331+
}
332+
}
333+
334+
impl ConfigField for ExplainAnalyzeCategories {
335+
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
336+
v.some(key, self, description)
337+
}
338+
339+
fn set(&mut self, _: &str, value: &str) -> Result<()> {
340+
*self = ExplainAnalyzeCategories::from_str(value)?;
341+
Ok(())
342+
}
343+
}
344+
209345
/// Verbosity levels controlling how `EXPLAIN ANALYZE` renders metrics
210346
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
211347
pub enum ExplainAnalyzeLevel {

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ mod tests {
235235
false,
236236
false,
237237
vec![MetricType::SUMMARY, MetricType::DEV],
238+
None,
238239
// use a new ParquetSource to avoid sharing execution metrics
239240
self.build_parquet_exec(
240241
file_group.clone(),

datafusion/core/src/physical_planner.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ use arrow_schema::Field;
6464
use datafusion_catalog::ScanArgs;
6565
use datafusion_common::Column;
6666
use datafusion_common::display::ToStringifiedPlan;
67-
use datafusion_common::format::ExplainAnalyzeLevel;
67+
use datafusion_common::format::{ExplainAnalyzeCategories, ExplainAnalyzeLevel};
6868
use datafusion_common::tree_node::{
6969
Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor,
7070
};
@@ -2720,10 +2720,17 @@ impl DefaultPhysicalPlanner {
27202720
ExplainAnalyzeLevel::Summary => vec![MetricType::SUMMARY],
27212721
ExplainAnalyzeLevel::Dev => vec![MetricType::SUMMARY, MetricType::DEV],
27222722
};
2723+
let analyze_categories =
2724+
session_state.config_options().explain.analyze_categories.clone();
2725+
let metric_categories = match analyze_categories {
2726+
ExplainAnalyzeCategories::All => None,
2727+
ExplainAnalyzeCategories::Only(cats) => Some(cats),
2728+
};
27232729
Ok(Arc::new(AnalyzeExec::new(
27242730
a.verbose,
27252731
show_statistics,
27262732
metric_types,
2733+
metric_categories,
27272734
input,
27282735
schema,
27292736
)))

datafusion/core/tests/sql/explain_analyze.rs

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ use rstest::rstest;
2222
use datafusion::config::ConfigOptions;
2323
use datafusion::physical_plan::display::DisplayableExecutionPlan;
2424
use datafusion::physical_plan::metrics::Timestamp;
25-
use datafusion_common::format::ExplainAnalyzeLevel;
25+
use datafusion_common::format::{
26+
ExplainAnalyzeCategories, ExplainAnalyzeLevel, MetricCategory,
27+
};
2628
use object_store::path::Path;
2729

2830
#[tokio::test]
@@ -219,6 +221,23 @@ async fn collect_plan_with_context(
219221
.to_string()
220222
}
221223

224+
async fn collect_plan_with_categories(
225+
sql_str: &str,
226+
categories: ExplainAnalyzeCategories,
227+
) -> String {
228+
let ctx = SessionContext::new();
229+
{
230+
let state = ctx.state_ref();
231+
let mut state = state.write();
232+
state.config_mut().options_mut().explain.analyze_categories = categories;
233+
}
234+
let dataframe = ctx.sql(sql_str).await.unwrap();
235+
let batches = dataframe.collect().await.unwrap();
236+
arrow::util::pretty::pretty_format_batches(&batches)
237+
.unwrap()
238+
.to_string()
239+
}
240+
222241
async fn collect_plan(sql_str: &str, level: ExplainAnalyzeLevel) -> String {
223242
let ctx = SessionContext::new();
224243
collect_plan_with_context(sql_str, &ctx, level).await
@@ -1160,3 +1179,78 @@ async fn explain_analyze_hash_join() {
11601179
);
11611180
}
11621181
}
1182+
1183+
#[tokio::test]
1184+
async fn explain_analyze_categories() {
1185+
let sql = "EXPLAIN ANALYZE \
1186+
SELECT * \
1187+
FROM generate_series(10) as t1(v1) \
1188+
ORDER BY v1 DESC";
1189+
1190+
for (categories, needle, should_contain) in [
1191+
// "rows" category: output_rows yes, elapsed_compute no, output_bytes no
1192+
(
1193+
ExplainAnalyzeCategories::Only(vec![MetricCategory::Rows]),
1194+
"output_rows",
1195+
true,
1196+
),
1197+
(
1198+
ExplainAnalyzeCategories::Only(vec![MetricCategory::Rows]),
1199+
"elapsed_compute",
1200+
false,
1201+
),
1202+
(
1203+
ExplainAnalyzeCategories::Only(vec![MetricCategory::Rows]),
1204+
"output_bytes",
1205+
false,
1206+
),
1207+
// "none" — plan only, no metrics at all
1208+
(
1209+
ExplainAnalyzeCategories::Only(vec![]),
1210+
"output_rows",
1211+
false,
1212+
),
1213+
(
1214+
ExplainAnalyzeCategories::Only(vec![]),
1215+
"elapsed_compute",
1216+
false,
1217+
),
1218+
// "all" — everything shown
1219+
(ExplainAnalyzeCategories::All, "output_rows", true),
1220+
(ExplainAnalyzeCategories::All, "elapsed_compute", true),
1221+
(ExplainAnalyzeCategories::All, "output_bytes", true),
1222+
// "rows,bytes" — row + byte metrics, no timing
1223+
(
1224+
ExplainAnalyzeCategories::Only(vec![
1225+
MetricCategory::Rows,
1226+
MetricCategory::Bytes,
1227+
]),
1228+
"output_rows",
1229+
true,
1230+
),
1231+
(
1232+
ExplainAnalyzeCategories::Only(vec![
1233+
MetricCategory::Rows,
1234+
MetricCategory::Bytes,
1235+
]),
1236+
"output_bytes",
1237+
true,
1238+
),
1239+
(
1240+
ExplainAnalyzeCategories::Only(vec![
1241+
MetricCategory::Rows,
1242+
MetricCategory::Bytes,
1243+
]),
1244+
"elapsed_compute",
1245+
false,
1246+
),
1247+
] {
1248+
let plan = collect_plan_with_categories(sql, categories.clone()).await;
1249+
assert_eq!(
1250+
plan.contains(needle),
1251+
should_contain,
1252+
"plan for categories {categories:?} should{} contain '{needle}':\n{plan}",
1253+
if should_contain { "" } else { " NOT" }
1254+
);
1255+
}
1256+
}

0 commit comments

Comments
 (0)