Skip to content

Commit 5cf7348

Browse files
committed
refactor
1 parent b13a270 commit 5cf7348

21 files changed

Lines changed: 228 additions & 188 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use arrow_ipc::CompressionType;
2222
#[cfg(feature = "parquet_encryption")]
2323
use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
2424
use crate::error::_config_err;
25-
use crate::format::{ExplainAnalyzeCategories, ExplainAnalyzeLevel, ExplainFormat};
25+
use crate::format::{ExplainAnalyzeCategories, ExplainFormat, MetricType};
2626
use crate::parquet_config::DFParquetWriterVersion;
2727
use crate::parsers::CompressionTypeVariant;
2828
use crate::utils::get_available_parallelism;
@@ -1211,12 +1211,12 @@ config_namespace! {
12111211
/// Verbosity level for "EXPLAIN ANALYZE". Default is "dev"
12121212
/// "summary" shows common metrics for high-level insights.
12131213
/// "dev" provides deep operator-level introspection for developers.
1214-
pub analyze_level: ExplainAnalyzeLevel, default = ExplainAnalyzeLevel::Dev
1214+
pub analyze_level: MetricType, default = MetricType::Dev
12151215

12161216
/// Which metric categories to include in "EXPLAIN ANALYZE" output.
1217-
/// Comma-separated list of: "rows", "bytes", "timing".
1217+
/// Comma-separated list of: "rows", "bytes", "timing", "uncategorized".
12181218
/// Use "none" to show plan structure only, or "all" (default) to show everything.
1219-
/// Metrics without a declared category are always shown (except with "none").
1219+
/// Metrics without a declared category are treated as "uncategorized".
12201220
pub analyze_categories: ExplainAnalyzeCategories, default = ExplainAnalyzeCategories::All
12211221
}
12221222
}

datafusion/common/src/format.rs

Lines changed: 126 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -206,17 +206,90 @@ impl ConfigField for ExplainFormat {
206206
}
207207
}
208208

209+
/// Categorizes metrics so the display layer can choose the desired verbosity.
210+
///
211+
/// The `datafusion.explain.analyze_level` configuration controls which
212+
/// type is shown:
213+
/// - `"dev"` (the default): all metrics are shown.
214+
/// - `"summary"`: only metrics tagged as `Summary` are shown.
215+
///
216+
/// This is orthogonal to [`MetricCategory`], which filters by *what kind*
217+
/// of value a metric represents (rows / bytes / timing).
218+
///
219+
/// # Difference from `EXPLAIN ANALYZE VERBOSE`
220+
///
221+
/// The `VERBOSE` keyword controls whether per-partition metrics are shown
222+
/// (when specified) or aggregated metrics are displayed (when omitted).
223+
/// In contrast, `MetricType` determines which *levels* of metrics are
224+
/// displayed.
225+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
226+
pub enum MetricType {
227+
/// Common metrics for high-level insights (answering which operator is slow)
228+
Summary,
229+
/// For deep operator-level introspection for developers
230+
Dev,
231+
}
232+
233+
impl MetricType {
234+
/// Returns the set of metric types that should be shown for this level.
235+
///
236+
/// `Dev` is a superset of `Summary`: when the user selects
237+
/// `analyze_level = 'dev'`, both `Summary` and `Dev` metrics are shown.
238+
pub fn included_types(self) -> Vec<MetricType> {
239+
match self {
240+
MetricType::Summary => vec![MetricType::Summary],
241+
MetricType::Dev => vec![MetricType::Summary, MetricType::Dev],
242+
}
243+
}
244+
}
245+
246+
impl FromStr for MetricType {
247+
type Err = DataFusionError;
248+
249+
fn from_str(s: &str) -> Result<Self, Self::Err> {
250+
match s.trim().to_lowercase().as_str() {
251+
"summary" => Ok(Self::Summary),
252+
"dev" => Ok(Self::Dev),
253+
other => Err(DataFusionError::Configuration(format!(
254+
"Invalid explain analyze level. Expected 'summary' or 'dev'. Got '{other}'"
255+
))),
256+
}
257+
}
258+
}
259+
260+
impl Display for MetricType {
261+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
262+
match self {
263+
Self::Summary => write!(f, "summary"),
264+
Self::Dev => write!(f, "dev"),
265+
}
266+
}
267+
}
268+
269+
impl ConfigField for MetricType {
270+
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
271+
v.some(key, self, description)
272+
}
273+
274+
fn set(&mut self, _: &str, value: &str) -> Result<()> {
275+
*self = MetricType::from_str(value)?;
276+
Ok(())
277+
}
278+
}
279+
209280
/// Classifies a metric by what it measures.
210281
///
211-
/// This is orthogonal to [`MetricType`] (SUMMARY / DEV), which controls
282+
/// This is orthogonal to [`MetricType`] (Summary / Dev), which controls
212283
/// *verbosity*. `MetricCategory` controls *what kind of value* is shown,
213284
/// so that `EXPLAIN ANALYZE` output can be narrowed to only the categories
214285
/// that are useful in a given context.
215286
///
216-
/// For testing, the key property is **determinism**:
217-
/// - [`Rows`](Self::Rows) and [`Bytes`](Self::Bytes) depend on the plan
218-
/// and the data, so they are deterministic across runs (given the same
219-
/// input).
287+
/// In particular this is useful for testing since metrics differ in their stability across runs:
288+
/// - [`Rows`](Self::Rows) and [`Bytes`](Self::Bytes) depend only on the plan
289+
/// and the data, so they are mostly deterministic across runs (given the same
290+
/// input). Variations can existing e.g. because of non-deterministic ordering
291+
/// of evaluation between threads.
292+
/// Running with a single target partition often makes these metrics stable enough to assert on in tests.
220293
/// - [`Timing`](Self::Timing) depends on hardware, system load, scheduling,
221294
/// etc., so it varies from run to run even on the same machine.
222295
///
@@ -226,28 +299,66 @@ impl ConfigField for ExplainFormat {
226299
/// timing value.
227300
///
228301
/// Metrics that do not declare a category (the default for custom
229-
/// `Count` / `Gauge` metrics) are **always included** unless the config
230-
/// is set to `'none'`.
231-
///
232-
/// [`MetricType`]: datafusion_physical_expr_common::metrics::MetricType
302+
/// `Count` / `Gauge` metrics) are treated as
303+
/// [`Uncategorized`](Self::Uncategorized) for filtering purposes.
233304
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
234305
pub enum MetricCategory {
235306
/// Row counts and related dimensionless counters: `output_rows`,
236307
/// `spilled_rows`, `output_batches`, pruning metrics, ratios, etc.
237308
///
238-
/// Deterministic given the same plan and data.
309+
/// Mostly deterministic given the same plan and data.
239310
Rows,
240311
/// Byte measurements: `output_bytes`, `spilled_bytes`,
241312
/// `current_memory_usage`, `bytes_scanned`, etc.
242313
///
243-
/// Deterministic given the same plan and data.
314+
/// Mostly deterministic given the same plan and data.
244315
Bytes,
245316
/// Wall-clock durations and timestamps: `elapsed_compute`,
246317
/// operator-defined `Time` metrics, `start_timestamp` /
247318
/// `end_timestamp`, etc.
248319
///
249320
/// **Non-deterministic** — varies across runs even on the same hardware.
250321
Timing,
322+
/// Catch-all for metrics that do not fit into [`Rows`](Self::Rows),
323+
/// [`Bytes`](Self::Bytes), or [`Timing`](Self::Timing).
324+
///
325+
/// Custom `Count` / `Gauge` metrics that are not explicitly assigned
326+
/// a category are treated as `Uncategorized` for filtering purposes.
327+
///
328+
/// This variant lets users explicitly include or exclude these
329+
/// metrics, e.g.:
330+
/// ```sql
331+
/// SET datafusion.explain.analyze_categories = 'rows, bytes, uncategorized';
332+
/// ```
333+
Uncategorized,
334+
}
335+
336+
impl FromStr for MetricCategory {
337+
type Err = DataFusionError;
338+
339+
fn from_str(s: &str) -> Result<Self, Self::Err> {
340+
match s.trim().to_lowercase().as_str() {
341+
"rows" => Ok(Self::Rows),
342+
"bytes" => Ok(Self::Bytes),
343+
"timing" => Ok(Self::Timing),
344+
"uncategorized" => Ok(Self::Uncategorized),
345+
other => Err(DataFusionError::Configuration(format!(
346+
"Invalid metric category '{other}'. \
347+
Expected 'rows', 'bytes', 'timing', or 'uncategorized'."
348+
))),
349+
}
350+
}
351+
}
352+
353+
impl Display for MetricCategory {
354+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
355+
match self {
356+
Self::Rows => write!(f, "rows"),
357+
Self::Bytes => write!(f, "bytes"),
358+
Self::Timing => write!(f, "timing"),
359+
Self::Uncategorized => write!(f, "uncategorized"),
360+
}
361+
}
251362
}
252363

253364
/// Controls which [`MetricCategory`] values are shown in `EXPLAIN ANALYZE`.
@@ -262,8 +373,8 @@ pub enum ExplainAnalyzeCategories {
262373
#[default]
263374
All,
264375
/// Show only metrics whose category is in the list.
265-
/// Metrics that have no declared category are still included
266-
/// (they are treated as "always on").
376+
/// Metrics with no declared category are treated as
377+
/// [`Uncategorized`](MetricCategory::Uncategorized) for filtering.
267378
///
268379
/// An **empty** vec means "plan only" — suppress all metrics.
269380
Only(Vec<MetricCategory>),
@@ -280,19 +391,7 @@ impl FromStr for ExplainAnalyzeCategories {
280391
other => {
281392
let mut cats = Vec::new();
282393
for part in other.split(',') {
283-
let part = part.trim();
284-
match part {
285-
"rows" => cats.push(MetricCategory::Rows),
286-
"bytes" => cats.push(MetricCategory::Bytes),
287-
"timing" => cats.push(MetricCategory::Timing),
288-
unknown => {
289-
return Err(DataFusionError::Configuration(format!(
290-
"Invalid metric category '{unknown}'. \
291-
Expected 'all', 'none', or comma-separated list of \
292-
'rows', 'bytes', 'timing'."
293-
)));
294-
}
295-
}
394+
cats.push(part.trim().parse::<MetricCategory>()?);
296395
}
297396
cats.dedup();
298397
Ok(Self::Only(cats))
@@ -313,12 +412,7 @@ impl Display for ExplainAnalyzeCategories {
313412
write!(f, ",")?;
314413
}
315414
first = false;
316-
let s = match cat {
317-
MetricCategory::Rows => "rows",
318-
MetricCategory::Bytes => "bytes",
319-
MetricCategory::Timing => "timing",
320-
};
321-
write!(f, "{s}")?;
415+
write!(f, "{cat}")?;
322416
}
323417
Ok(())
324418
}
@@ -336,48 +430,3 @@ impl ConfigField for ExplainAnalyzeCategories {
336430
Ok(())
337431
}
338432
}
339-
340-
/// Verbosity levels controlling how `EXPLAIN ANALYZE` renders metrics
341-
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
342-
pub enum ExplainAnalyzeLevel {
343-
/// Show a compact view containing high-level metrics
344-
Summary,
345-
/// Show a developer-focused view with per-operator details
346-
Dev,
347-
// When adding new enum, update the error message in `from_str()` accordingly.
348-
}
349-
350-
impl FromStr for ExplainAnalyzeLevel {
351-
type Err = DataFusionError;
352-
353-
fn from_str(level: &str) -> Result<Self, Self::Err> {
354-
match level.to_lowercase().as_str() {
355-
"summary" => Ok(ExplainAnalyzeLevel::Summary),
356-
"dev" => Ok(ExplainAnalyzeLevel::Dev),
357-
other => Err(DataFusionError::Configuration(format!(
358-
"Invalid explain analyze level. Expected 'summary' or 'dev'. Got '{other}'"
359-
))),
360-
}
361-
}
362-
}
363-
364-
impl Display for ExplainAnalyzeLevel {
365-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
366-
let s = match self {
367-
ExplainAnalyzeLevel::Summary => "summary",
368-
ExplainAnalyzeLevel::Dev => "dev",
369-
};
370-
write!(f, "{s}")
371-
}
372-
}
373-
374-
impl ConfigField for ExplainAnalyzeLevel {
375-
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
376-
v.some(key, self, description)
377-
}
378-
379-
fn set(&mut self, _: &str, value: &str) -> Result<()> {
380-
*self = ExplainAnalyzeLevel::from_str(value)?;
381-
Ok(())
382-
}
383-
}

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ mod tests {
234234
let analyze_exec = Arc::new(AnalyzeExec::new(
235235
false,
236236
false,
237-
vec![MetricType::SUMMARY, MetricType::DEV],
237+
vec![MetricType::Summary, MetricType::Dev],
238238
None,
239239
// use a new ParquetSource to avoid sharing execution metrics
240240
self.build_parquet_exec(

datafusion/core/src/physical_planner.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ use arrow_schema::Field;
6464
use datafusion_catalog::ScanArgs;
6565
use datafusion_common::Column;
6666
use datafusion_common::display::ToStringifiedPlan;
67-
use datafusion_common::format::{ExplainAnalyzeCategories, ExplainAnalyzeLevel};
67+
use datafusion_common::format::ExplainAnalyzeCategories;
6868
use datafusion_common::tree_node::{
6969
Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor,
7070
};
@@ -99,7 +99,6 @@ use datafusion_physical_optimizer::PhysicalOptimizerRule;
9999
use datafusion_physical_plan::empty::EmptyExec;
100100
use datafusion_physical_plan::execution_plan::InvariantLevel;
101101
use datafusion_physical_plan::joins::PiecewiseMergeJoinExec;
102-
use datafusion_physical_plan::metrics::MetricType;
103102
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
104103
use datafusion_physical_plan::recursive_query::RecursiveQueryExec;
105104
use datafusion_physical_plan::unnest::ListUnnest;
@@ -2716,10 +2715,7 @@ impl DefaultPhysicalPlanner {
27162715
let schema = Arc::clone(a.schema.inner());
27172716
let show_statistics = session_state.config_options().explain.show_statistics;
27182717
let analyze_level = session_state.config_options().explain.analyze_level;
2719-
let metric_types = match analyze_level {
2720-
ExplainAnalyzeLevel::Summary => vec![MetricType::SUMMARY],
2721-
ExplainAnalyzeLevel::Dev => vec![MetricType::SUMMARY, MetricType::DEV],
2722-
};
2718+
let metric_types = analyze_level.included_types();
27232719
let analyze_categories = session_state
27242720
.config_options()
27252721
.explain

0 commit comments

Comments
 (0)