Commit a4a3923

Improve ergonomics for ExecutionPlanMetricsSet and MetricsSet (apache#21762)
## Which issue does this PR close?

- None

## Rationale for this change

When an `ExecutionPlan` implementation is complex, its metrics are often collected by several different structs that together make up the plan. These metrics must eventually be served from a single entry point, `ExecutionPlan::metrics()` or `DataSource::metrics()`, and the current API has no good methods for merging several `ExecutionPlanMetricsSet`s from different sources into one.

## What changes are included in this PR?

Add some basic conversion and iteration methods for `MetricsSet` and `ExecutionPlanMetricsSet` to improve the ergonomics of these structs.

## Are these changes tested?

These are only basic std trait implementations and method exposure, so as long as the code compiles, I don't think further tests are needed.

## Are there any user-facing changes?

Users will see a few more methods available on the `MetricsSet` and `ExecutionPlanMetricsSet` structs.

(cherry picked from commit ff844be)
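
The merge scenario described in the rationale can be sketched as follows. This is a minimal illustration, not DataFusion code: `Metric` and `MetricsSet` here are simplified stand-ins that only mirror the trait implementations added in this commit, and `merge_metrics` is a hypothetical helper name.

```rust
use std::sync::Arc;

// Simplified stand-in for DataFusion's `Metric` (assumption: the real type
// carries a `MetricValue`, labels, and an optional partition).
#[derive(Debug)]
struct Metric {
    name: &'static str,
    value: usize,
}

// Simplified stand-in for `MetricsSet`, wrapping a Vec as in the diff.
#[derive(Debug, Default)]
struct MetricsSet {
    metrics: Vec<Arc<Metric>>,
}

// The three impls below mirror the ones added by this commit.
impl IntoIterator for MetricsSet {
    type Item = Arc<Metric>;
    type IntoIter = std::vec::IntoIter<Self::Item>;

    fn into_iter(self) -> Self::IntoIter {
        self.metrics.into_iter()
    }
}

impl Extend<Arc<Metric>> for MetricsSet {
    fn extend<I: IntoIterator<Item = Arc<Metric>>>(&mut self, iter: I) {
        self.metrics.extend(iter);
    }
}

impl FromIterator<Arc<Metric>> for MetricsSet {
    fn from_iter<T: IntoIterator<Item = Arc<Metric>>>(iter: T) -> Self {
        Self {
            metrics: iter.into_iter().collect(),
        }
    }
}

/// Hypothetical helper: merge the sets gathered by several components
/// into one set, preserving the order in which the parts are given.
fn merge_metrics(parts: Vec<MetricsSet>) -> MetricsSet {
    // `flatten` relies on `IntoIterator for MetricsSet`;
    // `collect` relies on `FromIterator<Arc<Metric>>`.
    parts.into_iter().flatten().collect()
}
```

With these impls, a component can also fold another component's metrics into its own set in place with `set.extend(other)`, since an owned `MetricsSet` is itself an iterator source of `Arc<Metric>`.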
1 parent a0763db commit a4a3923

1 file changed

Lines changed: 87 additions & 0 deletions

datafusion/physical-expr-common/src/metrics/mod.rs

@@ -29,6 +29,7 @@ use std::{
     borrow::Cow,
     fmt::{Debug, Display},
     sync::Arc,
+    vec::IntoIter,
 };

 // public exports
@@ -398,6 +399,38 @@ impl Display for MetricsSet {
     }
 }

+impl IntoIterator for MetricsSet {
+    type Item = Arc<Metric>;
+    type IntoIter = IntoIter<Self::Item>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.metrics.into_iter()
+    }
+}
+
+impl<'a> IntoIterator for &'a MetricsSet {
+    type Item = &'a Arc<Metric>;
+    type IntoIter = std::slice::Iter<'a, Arc<Metric>>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.metrics.iter()
+    }
+}
+
+impl Extend<Arc<Metric>> for MetricsSet {
+    fn extend<I: IntoIterator<Item = Arc<Metric>>>(&mut self, iter: I) {
+        self.metrics.extend(iter);
+    }
+}
+
+impl FromIterator<Arc<Metric>> for MetricsSet {
+    fn from_iter<T: IntoIterator<Item = Arc<Metric>>>(iter: T) -> Self {
+        Self {
+            metrics: iter.into_iter().collect(),
+        }
+    }
+}
+
 /// A set of [`Metric`]s for an individual operator.
 ///
 /// This structure is intended as a convenience for execution plan
@@ -431,6 +464,14 @@ impl ExecutionPlanMetricsSet {
     }
 }

+impl From<MetricsSet> for ExecutionPlanMetricsSet {
+    fn from(metrics: MetricsSet) -> Self {
+        Self {
+            inner: Arc::new(Mutex::new(metrics)),
+        }
+    }
+}
+
 /// `name=value` pairs identifying a metric. This concept is called various things
 /// in various different systems:
 ///
@@ -718,6 +759,52 @@ mod tests {
         };
     }

+    #[test]
+    fn test_extend() {
+        let mut metrics = MetricsSet::new();
+        let m1 = Arc::new(Metric::new(MetricValue::OutputRows(Count::new()), None));
+        let m2 = Arc::new(Metric::new(MetricValue::SpillCount(Count::new()), None));
+
+        metrics.extend([Arc::clone(&m1), Arc::clone(&m2)]);
+        assert_eq!(metrics.iter().count(), 2);
+
+        let m3 = Arc::new(Metric::new(MetricValue::SpilledBytes(Count::new()), None));
+        metrics.extend(std::iter::once(Arc::clone(&m3)));
+        assert_eq!(metrics.iter().count(), 3);
+    }
+
+    #[test]
+    fn test_collect() {
+        let m1 = Arc::new(Metric::new(MetricValue::OutputRows(Count::new()), None));
+        let m2 = Arc::new(Metric::new(MetricValue::SpillCount(Count::new()), None));
+
+        let metrics: MetricsSet =
+            vec![Arc::clone(&m1), Arc::clone(&m2)].into_iter().collect();
+        assert_eq!(metrics.iter().count(), 2);
+
+        let empty: MetricsSet = std::iter::empty().collect();
+        assert_eq!(empty.iter().count(), 0);
+    }
+
+    #[test]
+    fn test_into_iterator_by_ref() {
+        let mut metrics = MetricsSet::new();
+        metrics.push(Arc::new(Metric::new(
+            MetricValue::OutputRows(Count::new()),
+            None,
+        )));
+        metrics.push(Arc::new(Metric::new(
+            MetricValue::SpillCount(Count::new()),
+            None,
+        )));
+
+        let mut count = 0;
+        for _m in &metrics {
+            count += 1;
+        }
+        assert_eq!(count, 2);
+    }
+
     #[test]
     fn test_sorted_for_display() {
         let metrics = ExecutionPlanMetricsSet::new();
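
The `From<MetricsSet>` conversion and the by-reference iterator added above could be used together roughly as below. This is a sketch with stand-in types, not the crate's own definitions: the diff only shows the name `Mutex`, so `std::sync::Mutex` is an assumption here, and `build_plan_metrics` and `total_value` are hypothetical helper names.

```rust
use std::sync::{Arc, Mutex};

// Simplified stand-in for DataFusion's `Metric` (assumption).
#[derive(Debug)]
struct Metric {
    name: &'static str,
    value: usize,
}

// Simplified stand-in for `MetricsSet`.
#[derive(Debug, Default)]
struct MetricsSet {
    metrics: Vec<Arc<Metric>>,
}

// Borrowing iteration, mirroring `impl IntoIterator for &MetricsSet`.
impl<'a> IntoIterator for &'a MetricsSet {
    type Item = &'a Arc<Metric>;
    type IntoIter = std::slice::Iter<'a, Arc<Metric>>;

    fn into_iter(self) -> Self::IntoIter {
        self.metrics.iter()
    }
}

impl FromIterator<Arc<Metric>> for MetricsSet {
    fn from_iter<T: IntoIterator<Item = Arc<Metric>>>(iter: T) -> Self {
        Self {
            metrics: iter.into_iter().collect(),
        }
    }
}

// Stand-in for `ExecutionPlanMetricsSet`: a shared, lockable `MetricsSet`.
#[derive(Debug, Clone)]
struct ExecutionPlanMetricsSet {
    inner: Arc<Mutex<MetricsSet>>,
}

// Mirrors the `From<MetricsSet>` impl added by this commit.
impl From<MetricsSet> for ExecutionPlanMetricsSet {
    fn from(metrics: MetricsSet) -> Self {
        Self {
            inner: Arc::new(Mutex::new(metrics)),
        }
    }
}

/// Hypothetical helper: collect loose metrics into a set, then wrap the
/// set in the shared plan-level container via `From`.
fn build_plan_metrics(collected: Vec<Arc<Metric>>) -> ExecutionPlanMetricsSet {
    let set: MetricsSet = collected.into_iter().collect();
    ExecutionPlanMetricsSet::from(set)
}

/// Hypothetical helper: sum all values without consuming the set.
fn total_value(set: &MetricsSet) -> usize {
    let mut total = 0;
    for metric in set {
        total += metric.value;
    }
    total
}
```

Iterating with `for metric in set` on a `&MetricsSet` leaves the set intact, which matters once it sits behind a lock and is shared between components.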
