Skip to content

Commit ff844be

Browse files
authored
Improve ergonomics for ExecutionPlanMetricsSet and MetricsSet (#21762)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - None ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> Sometimes, when an `ExecutionPlan` implementation is complex, different metrics are collected from different structs that compose the whole execution plan. These metrics need to eventually be served from the single entrypoint `ExecutionPlan::metrics()` or `DataSource::metrics()`, and the current api does not have good methods for merging several `ExecutionPlanMetricsSet` coming from different sources into a single one. ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> Add some basic conversion and iteration methods for `MetricsSet` and `ExecutionPlanMetricsSet`, in order to improve ergonomics around these structs. ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> This is purely just basic std trait implementations and method exposure, so as long as the code compiles, I don't think it needs further tests. ## Are there any user-facing changes? People will see some more available methods in the `MetricsSet` and `ExecutionPlanMetricsSet` structs for ergonomics. <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 8875956 commit ff844be

1 file changed

Lines changed: 87 additions & 0 deletions

File tree

  • datafusion/physical-expr-common/src/metrics

datafusion/physical-expr-common/src/metrics/mod.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use std::{
3030
borrow::Cow,
3131
fmt::{self, Debug, Display},
3232
sync::Arc,
33+
vec::IntoIter,
3334
};
3435

3536
// public exports
@@ -432,6 +433,38 @@ impl Display for MetricsSet {
432433
}
433434
}
434435

436+
impl IntoIterator for MetricsSet {
437+
type Item = Arc<Metric>;
438+
type IntoIter = IntoIter<Self::Item>;
439+
440+
fn into_iter(self) -> Self::IntoIter {
441+
self.metrics.into_iter()
442+
}
443+
}
444+
445+
impl<'a> IntoIterator for &'a MetricsSet {
446+
type Item = &'a Arc<Metric>;
447+
type IntoIter = std::slice::Iter<'a, Arc<Metric>>;
448+
449+
fn into_iter(self) -> Self::IntoIter {
450+
self.metrics.iter()
451+
}
452+
}
453+
454+
impl Extend<Arc<Metric>> for MetricsSet {
455+
fn extend<I: IntoIterator<Item = Arc<Metric>>>(&mut self, iter: I) {
456+
self.metrics.extend(iter);
457+
}
458+
}
459+
460+
impl FromIterator<Arc<Metric>> for MetricsSet {
461+
fn from_iter<T: IntoIterator<Item = Arc<Metric>>>(iter: T) -> Self {
462+
Self {
463+
metrics: iter.into_iter().collect(),
464+
}
465+
}
466+
}
467+
435468
/// A set of [`Metric`]s for an individual operator.
436469
///
437470
/// This structure is intended as a convenience for execution plan
@@ -465,6 +498,14 @@ impl ExecutionPlanMetricsSet {
465498
}
466499
}
467500

501+
impl From<MetricsSet> for ExecutionPlanMetricsSet {
502+
fn from(metrics: MetricsSet) -> Self {
503+
Self {
504+
inner: Arc::new(Mutex::new(metrics)),
505+
}
506+
}
507+
}
508+
468509
/// `name=value` pairs identifying a metric. This concept is called various things
469510
/// in various different systems:
470511
///
@@ -752,6 +793,52 @@ mod tests {
752793
};
753794
}
754795

796+
#[test]
797+
fn test_extend() {
798+
let mut metrics = MetricsSet::new();
799+
let m1 = Arc::new(Metric::new(MetricValue::OutputRows(Count::new()), None));
800+
let m2 = Arc::new(Metric::new(MetricValue::SpillCount(Count::new()), None));
801+
802+
metrics.extend([Arc::clone(&m1), Arc::clone(&m2)]);
803+
assert_eq!(metrics.iter().count(), 2);
804+
805+
let m3 = Arc::new(Metric::new(MetricValue::SpilledBytes(Count::new()), None));
806+
metrics.extend(std::iter::once(Arc::clone(&m3)));
807+
assert_eq!(metrics.iter().count(), 3);
808+
}
809+
810+
#[test]
811+
fn test_collect() {
812+
let m1 = Arc::new(Metric::new(MetricValue::OutputRows(Count::new()), None));
813+
let m2 = Arc::new(Metric::new(MetricValue::SpillCount(Count::new()), None));
814+
815+
let metrics: MetricsSet =
816+
vec![Arc::clone(&m1), Arc::clone(&m2)].into_iter().collect();
817+
assert_eq!(metrics.iter().count(), 2);
818+
819+
let empty: MetricsSet = std::iter::empty().collect();
820+
assert_eq!(empty.iter().count(), 0);
821+
}
822+
823+
#[test]
824+
fn test_into_iterator_by_ref() {
825+
let mut metrics = MetricsSet::new();
826+
metrics.push(Arc::new(Metric::new(
827+
MetricValue::OutputRows(Count::new()),
828+
None,
829+
)));
830+
metrics.push(Arc::new(Metric::new(
831+
MetricValue::SpillCount(Count::new()),
832+
None,
833+
)));
834+
835+
let mut count = 0;
836+
for _m in &metrics {
837+
count += 1;
838+
}
839+
assert_eq!(count, 2);
840+
}
841+
755842
#[test]
756843
fn test_sorted_for_display() {
757844
let metrics = ExecutionPlanMetricsSet::new();

0 commit comments

Comments
 (0)