Skip to content

Commit 155ccea

Browse files
authored
Merge branch 'main' into feat_migrate_ffi_to_stabby
2 parents 510526e + 2c03881 commit 155ccea

33 files changed

Lines changed: 856 additions & 183 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use arrow_ipc::CompressionType;
2222
#[cfg(feature = "parquet_encryption")]
2323
use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
2424
use crate::error::_config_err;
25-
use crate::format::{ExplainAnalyzeLevel, ExplainFormat};
25+
use crate::format::{ExplainAnalyzeCategories, ExplainFormat, MetricType};
2626
use crate::parquet_config::DFParquetWriterVersion;
2727
use crate::parsers::CompressionTypeVariant;
2828
use crate::utils::get_available_parallelism;
@@ -1211,7 +1211,13 @@ config_namespace! {
12111211
/// Verbosity level for "EXPLAIN ANALYZE". Default is "dev"
12121212
/// "summary" shows common metrics for high-level insights.
12131213
/// "dev" provides deep operator-level introspection for developers.
1214-
pub analyze_level: ExplainAnalyzeLevel, default = ExplainAnalyzeLevel::Dev
1214+
pub analyze_level: MetricType, default = MetricType::Dev
1215+
1216+
/// Which metric categories to include in "EXPLAIN ANALYZE" output.
1217+
/// Comma-separated list of: "rows", "bytes", "timing", "uncategorized".
1218+
/// Use "none" to show plan structure only, or "all" (default) to show everything.
1219+
/// Metrics without a declared category are treated as "uncategorized".
1220+
pub analyze_categories: ExplainAnalyzeCategories, default = ExplainAnalyzeCategories::All
12151221
}
12161222
}
12171223

datafusion/common/src/format.rs

Lines changed: 198 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -206,47 +206,227 @@ impl ConfigField for ExplainFormat {
206206
}
207207
}
208208

209-
/// Verbosity levels controlling how `EXPLAIN ANALYZE` renders metrics
209+
/// Categorizes metrics so the display layer can choose the desired verbosity.
210+
///
211+
/// The `datafusion.explain.analyze_level` configuration controls which
212+
/// type is shown:
213+
/// - `"dev"` (the default): all metrics are shown.
214+
/// - `"summary"`: only metrics tagged as `Summary` are shown.
215+
///
216+
/// This is orthogonal to [`MetricCategory`], which filters by *what kind*
217+
/// of value a metric represents (rows / bytes / timing).
218+
///
219+
/// # Difference from `EXPLAIN ANALYZE VERBOSE`
220+
///
221+
/// The `VERBOSE` keyword controls whether per-partition metrics are shown
222+
/// (when specified) or aggregated metrics are displayed (when omitted).
223+
/// In contrast, `MetricType` determines which *levels* of metrics are
224+
/// displayed.
210225
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
211-
pub enum ExplainAnalyzeLevel {
212-
/// Show a compact view containing high-level metrics
226+
pub enum MetricType {
227+
/// Common metrics for high-level insights (answering which operator is slow)
213228
Summary,
214-
/// Show a developer-focused view with per-operator details
229+
/// For deep operator-level introspection for developers
215230
Dev,
216-
// When adding new enum, update the error message in `from_str()` accordingly.
217231
}
218232

219-
impl FromStr for ExplainAnalyzeLevel {
233+
impl MetricType {
234+
/// Returns the set of metric types that should be shown for this level.
235+
///
236+
/// `Dev` is a superset of `Summary`: when the user selects
237+
/// `analyze_level = 'dev'`, both `Summary` and `Dev` metrics are shown.
238+
pub fn included_types(self) -> Vec<MetricType> {
239+
match self {
240+
MetricType::Summary => vec![MetricType::Summary],
241+
MetricType::Dev => vec![MetricType::Summary, MetricType::Dev],
242+
}
243+
}
244+
}
245+
246+
impl FromStr for MetricType {
220247
type Err = DataFusionError;
221248

222-
fn from_str(level: &str) -> Result<Self, Self::Err> {
223-
match level.to_lowercase().as_str() {
224-
"summary" => Ok(ExplainAnalyzeLevel::Summary),
225-
"dev" => Ok(ExplainAnalyzeLevel::Dev),
249+
fn from_str(s: &str) -> Result<Self, Self::Err> {
250+
match s.trim().to_lowercase().as_str() {
251+
"summary" => Ok(Self::Summary),
252+
"dev" => Ok(Self::Dev),
226253
other => Err(DataFusionError::Configuration(format!(
227254
"Invalid explain analyze level. Expected 'summary' or 'dev'. Got '{other}'"
228255
))),
229256
}
230257
}
231258
}
232259

233-
impl Display for ExplainAnalyzeLevel {
260+
impl Display for MetricType {
234261
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
235-
let s = match self {
236-
ExplainAnalyzeLevel::Summary => "summary",
237-
ExplainAnalyzeLevel::Dev => "dev",
238-
};
239-
write!(f, "{s}")
262+
match self {
263+
Self::Summary => write!(f, "summary"),
264+
Self::Dev => write!(f, "dev"),
265+
}
266+
}
267+
}
268+
269+
impl ConfigField for MetricType {
270+
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
271+
v.some(key, self, description)
272+
}
273+
274+
fn set(&mut self, _: &str, value: &str) -> Result<()> {
275+
*self = MetricType::from_str(value)?;
276+
Ok(())
277+
}
278+
}
279+
280+
/// Classifies a metric by what it measures.
281+
///
282+
/// This is orthogonal to [`MetricType`] (Summary / Dev), which controls
283+
/// *verbosity*. `MetricCategory` controls *what kind of value* is shown,
284+
/// so that `EXPLAIN ANALYZE` output can be narrowed to only the categories
285+
/// that are useful in a given context.
286+
///
287+
/// In particular this is useful for testing since metrics differ in their stability across runs:
288+
/// - [`Rows`](Self::Rows) and [`Bytes`](Self::Bytes) depend only on the plan
289+
/// and the data, so they are mostly deterministic across runs (given the same
290+
/// input). Variations can exist, e.g. because of non-deterministic ordering
291+
/// of evaluation between threads.
292+
/// Running with a single target partition often makes these metrics stable enough to assert on in tests.
293+
/// - [`Timing`](Self::Timing) depends on hardware, system load, scheduling,
294+
/// etc., so it varies from run to run even on the same machine.
295+
///
296+
/// [`MetricCategory`] is especially useful in sqllogictest (`.slt`) files:
297+
/// setting `datafusion.explain.analyze_categories = 'rows'` lets a test
298+
/// assert on row-count metrics without sprinkling `<slt:ignore>` over every
299+
/// timing value.
300+
///
301+
/// Metrics that do not declare a category (the default for custom
302+
/// `Count` / `Gauge` metrics) are treated as
303+
/// [`Uncategorized`](Self::Uncategorized) for filtering purposes.
304+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
305+
pub enum MetricCategory {
306+
/// Row counts and related dimensionless counters: `output_rows`,
307+
/// `spilled_rows`, `output_batches`, pruning metrics, ratios, etc.
308+
///
309+
/// Mostly deterministic given the same plan and data.
310+
Rows,
311+
/// Byte measurements: `output_bytes`, `spilled_bytes`,
312+
/// `current_memory_usage`, `bytes_scanned`, etc.
313+
///
314+
/// Mostly deterministic given the same plan and data.
315+
Bytes,
316+
/// Wall-clock durations and timestamps: `elapsed_compute`,
317+
/// operator-defined `Time` metrics, `start_timestamp` /
318+
/// `end_timestamp`, etc.
319+
///
320+
/// **Non-deterministic** — varies across runs even on the same hardware.
321+
Timing,
322+
/// Catch-all for metrics that do not fit into [`Rows`](Self::Rows),
323+
/// [`Bytes`](Self::Bytes), or [`Timing`](Self::Timing).
324+
///
325+
/// Custom `Count` / `Gauge` metrics that are not explicitly assigned
326+
/// a category are treated as `Uncategorized` for filtering purposes.
327+
///
328+
/// This variant lets users explicitly include or exclude these
329+
/// metrics, e.g.:
330+
/// ```sql
331+
/// SET datafusion.explain.analyze_categories = 'rows, bytes, uncategorized';
332+
/// ```
333+
Uncategorized,
334+
}
335+
336+
impl FromStr for MetricCategory {
337+
type Err = DataFusionError;
338+
339+
fn from_str(s: &str) -> Result<Self, Self::Err> {
340+
match s.trim().to_lowercase().as_str() {
341+
"rows" => Ok(Self::Rows),
342+
"bytes" => Ok(Self::Bytes),
343+
"timing" => Ok(Self::Timing),
344+
"uncategorized" => Ok(Self::Uncategorized),
345+
other => Err(DataFusionError::Configuration(format!(
346+
"Invalid metric category '{other}'. \
347+
Expected 'rows', 'bytes', 'timing', or 'uncategorized'."
348+
))),
349+
}
350+
}
351+
}
352+
353+
impl Display for MetricCategory {
354+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
355+
match self {
356+
Self::Rows => write!(f, "rows"),
357+
Self::Bytes => write!(f, "bytes"),
358+
Self::Timing => write!(f, "timing"),
359+
Self::Uncategorized => write!(f, "uncategorized"),
360+
}
361+
}
362+
}
363+
364+
/// Controls which [`MetricCategory`] values are shown in `EXPLAIN ANALYZE`.
365+
///
366+
/// Set via `SET datafusion.explain.analyze_categories = '...'`.
367+
///
368+
/// See [`MetricCategory`] for the determinism properties that motivate
369+
/// this filter.
370+
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
371+
pub enum ExplainAnalyzeCategories {
372+
/// Show all metrics regardless of category (the default).
373+
#[default]
374+
All,
375+
/// Show only metrics whose category is in the list.
376+
/// Metrics with no declared category are treated as
377+
/// [`Uncategorized`](MetricCategory::Uncategorized) for filtering.
378+
///
379+
/// An **empty** vec means "plan only" — suppress all metrics.
380+
Only(Vec<MetricCategory>),
381+
}
382+
383+
impl FromStr for ExplainAnalyzeCategories {
384+
type Err = DataFusionError;
385+
386+
fn from_str(s: &str) -> Result<Self, Self::Err> {
387+
let s = s.trim().to_lowercase();
388+
match s.as_str() {
389+
"all" => Ok(Self::All),
390+
"none" => Ok(Self::Only(vec![])),
391+
other => {
392+
let mut cats = Vec::new();
393+
for part in other.split(',') {
394+
cats.push(part.trim().parse::<MetricCategory>()?);
395+
}
396+
cats.dedup();
397+
Ok(Self::Only(cats))
398+
}
399+
}
400+
}
401+
}
402+
403+
impl Display for ExplainAnalyzeCategories {
404+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
405+
match self {
406+
Self::All => write!(f, "all"),
407+
Self::Only(cats) if cats.is_empty() => write!(f, "none"),
408+
Self::Only(cats) => {
409+
let mut first = true;
410+
for cat in cats {
411+
if !first {
412+
write!(f, ",")?;
413+
}
414+
first = false;
415+
write!(f, "{cat}")?;
416+
}
417+
Ok(())
418+
}
419+
}
240420
}
241421
}
242422

243-
impl ConfigField for ExplainAnalyzeLevel {
423+
impl ConfigField for ExplainAnalyzeCategories {
244424
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
245425
v.some(key, self, description)
246426
}
247427

248428
fn set(&mut self, _: &str, value: &str) -> Result<()> {
249-
*self = ExplainAnalyzeLevel::from_str(value)?;
429+
*self = ExplainAnalyzeCategories::from_str(value)?;
250430
Ok(())
251431
}
252432
}

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,8 @@ mod tests {
234234
let analyze_exec = Arc::new(AnalyzeExec::new(
235235
false,
236236
false,
237-
vec![MetricType::SUMMARY, MetricType::DEV],
237+
vec![MetricType::Summary, MetricType::Dev],
238+
None,
238239
// use a new ParquetSource to avoid sharing execution metrics
239240
self.build_parquet_exec(
240241
file_group.clone(),

datafusion/core/src/physical_planner.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ use arrow_schema::Field;
6464
use datafusion_catalog::ScanArgs;
6565
use datafusion_common::Column;
6666
use datafusion_common::display::ToStringifiedPlan;
67-
use datafusion_common::format::ExplainAnalyzeLevel;
67+
use datafusion_common::format::ExplainAnalyzeCategories;
6868
use datafusion_common::tree_node::{
6969
Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor,
7070
};
@@ -99,7 +99,6 @@ use datafusion_physical_optimizer::PhysicalOptimizerRule;
9999
use datafusion_physical_plan::empty::EmptyExec;
100100
use datafusion_physical_plan::execution_plan::InvariantLevel;
101101
use datafusion_physical_plan::joins::PiecewiseMergeJoinExec;
102-
use datafusion_physical_plan::metrics::MetricType;
103102
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
104103
use datafusion_physical_plan::recursive_query::RecursiveQueryExec;
105104
use datafusion_physical_plan::unnest::ListUnnest;
@@ -2716,14 +2715,21 @@ impl DefaultPhysicalPlanner {
27162715
let schema = Arc::clone(a.schema.inner());
27172716
let show_statistics = session_state.config_options().explain.show_statistics;
27182717
let analyze_level = session_state.config_options().explain.analyze_level;
2719-
let metric_types = match analyze_level {
2720-
ExplainAnalyzeLevel::Summary => vec![MetricType::SUMMARY],
2721-
ExplainAnalyzeLevel::Dev => vec![MetricType::SUMMARY, MetricType::DEV],
2718+
let metric_types = analyze_level.included_types();
2719+
let analyze_categories = session_state
2720+
.config_options()
2721+
.explain
2722+
.analyze_categories
2723+
.clone();
2724+
let metric_categories = match analyze_categories {
2725+
ExplainAnalyzeCategories::All => None,
2726+
ExplainAnalyzeCategories::Only(cats) => Some(cats),
27222727
};
27232728
Ok(Arc::new(AnalyzeExec::new(
27242729
a.verbose,
27252730
show_statistics,
27262731
metric_types,
2732+
metric_categories,
27272733
input,
27282734
schema,
27292735
)))

0 commit comments

Comments
 (0)