Add Python bindings for accessing ExecutionMetrics #1381
```diff
@@ -29,6 +29,8 @@
 __all__ = [
     "ExecutionPlan",
     "LogicalPlan",
+    "Metric",
+    "MetricsSet",
 ]
```
```diff
@@ -151,3 +153,107 @@ def to_proto(self) -> bytes:
         Tables created in memory from record batches are currently not supported.
         """
         return self._raw_plan.to_proto()
+
+    def metrics(self) -> MetricsSet | None:
+        """Return metrics for this plan node after execution, or None if unavailable."""
+        raw = self._raw_plan.metrics()
+        if raw is None:
+            return None
+        return MetricsSet(raw)
+
+    def collect_metrics(self) -> list[tuple[str, MetricsSet]]:
+        """Walk the plan tree and collect metrics from all operators.
+
+        Returns a list of (operator_name, MetricsSet) tuples.
```

> **Member:** "Walk the plan tree and collect metrics" probably does not make a lot of sense to someone other than a developer. I think we can make this more user focused.

> **Member:** I haven't dug in, but is …

> **Contributor (author):** Updated!

```diff
+        """
+        result: list[tuple[str, MetricsSet]] = []
+
+        def _walk(node: ExecutionPlan) -> None:
+            ms = node.metrics()
+            if ms is not None:
+                result.append((node.display(), ms))
+            for child in node.children():
+                _walk(child)
+
+        _walk(self)
+        return result
```
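The recursion in `collect_metrics` is a plain pre-order traversal. As a standalone sketch — using a hypothetical `FakeNode` stand-in rather than the real `ExecutionPlan` bindings — the walk looks like this:

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class FakeNode:
    """Illustrative stand-in for an ExecutionPlan node (not the real binding type)."""

    name: str
    metrics: dict | None = None
    children: list["FakeNode"] = field(default_factory=list)


def collect_metrics(node: FakeNode) -> list[tuple[str, dict]]:
    """Pre-order walk mirroring the _walk helper above: visit the node, then its children."""
    result: list[tuple[str, dict]] = []
    if node.metrics is not None:
        result.append((node.name, node.metrics))
    for child in node.children:
        result.extend(collect_metrics(child))
    return result


# A tiny two-level plan: a filter over a scan.
scan = FakeNode("DataSourceExec", {"output_rows": 3})
filt = FakeNode("FilterExec", {"output_rows": 2}, children=[scan])

print(collect_metrics(filt))
# → [('FilterExec', {'output_rows': 2}), ('DataSourceExec', {'output_rows': 3})]
```

Parents come before children in the result, so the root operator's metrics are always first.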
```diff
+
+
+class MetricsSet:
+    """A set of metrics for a single execution plan operator.
+
+    Provides both individual metric access and convenience aggregations
+    across partitions.
```

> **Member:** A bit of an explanation is probably useful here. Again, I don't think we can assume the user understands that there are both individual execution plan metrics as well as aggregates. I think that some operators have metrics that cannot be aggregated. In general I suspect we really do need some high-level documentation with examples we can point to that makes all of this more concrete.

> **Member:** On second read I now see this is aggregating across partitions. So does that mean the …

```diff
+    """
```
```diff
+
+    def __init__(self, raw: df_internal.MetricsSet) -> None:
+        """This constructor should not be called by the end user."""
+        self._raw = raw
+
+    def metrics(self) -> list[Metric]:
+        """Return all individual metrics in this set."""
+        return [Metric(m) for m in self._raw.metrics()]
+
+    @property
+    def output_rows(self) -> int | None:
+        """Sum of output_rows across all partitions."""
+        return self._raw.output_rows()
+
+    @property
+    def elapsed_compute(self) -> int | None:
+        """Sum of elapsed_compute across all partitions, in nanoseconds."""
```

> **Member:** We probably want to describe what …

```diff
+        return self._raw.elapsed_compute()
+
+    @property
+    def spill_count(self) -> int | None:
+        """Sum of spill_count across all partitions."""
```

> **Member:** Same with spill count. Do you know what units it has?

> **Contributor (author):** From my understanding it seems to be spill-to-disk events (so no units). Updated the doc.

```diff
+        return self._raw.spill_count()
```
```diff
+
+    @property
+    def spilled_bytes(self) -> int | None:
+        """Sum of spilled_bytes across all partitions."""
+        return self._raw.spilled_bytes()
+
+    @property
+    def spilled_rows(self) -> int | None:
+        """Sum of spilled_rows across all partitions."""
+        return self._raw.spilled_rows()
+
+    def sum_by_name(self, name: str) -> int | None:
+        """Return the sum of metrics matching the given name."""
```

> **Member:** Sum across partitions, I presume. This is different because it works for an arbitrary metric, right?

> **Contributor (author):** Yes, that's right.

```diff
+        return self._raw.sum_by_name(name)
+
+    def __repr__(self) -> str:
+        """Return a string representation of the metrics set."""
+        return repr(self._raw)
```
```diff
+
+
+class Metric:
+    """A single execution metric with name, value, partition, and labels."""
+
+    def __init__(self, raw: df_internal.Metric) -> None:
+        """This constructor should not be called by the end user."""
+        self._raw = raw
+
+    @property
+    def name(self) -> str:
+        """The name of this metric (e.g. ``output_rows``)."""
+        return self._raw.name
+
+    @property
+    def value(self) -> int | None:
+        """The numeric value of this metric, or None for non-numeric types."""
+        return self._raw.value
```

> **Member:** Is it not possible to give values for non-numeric metrics?

> **Contributor (author):** Yes, I think right now it uses an option; unrecognised variants return None. We could possibly return richer types by returning …

```diff
+
+    @property
+    def partition(self) -> int | None:
+        """The partition this metric applies to, or None if global."""
```

> **Member:** Do you know if this partition is a hash or an index or something else? I am trying to figure out how a user makes use of this information.

> **Contributor (author):** This is just a 0-based partition id. Updated the docstring to call it a "0-based partition index" and clarified that None just means the metric isn't tied to any specific partition.

```diff
+        return self._raw.partition
+
+    def labels(self) -> dict[str, str]:
+        """Return the labels associated with this metric."""
```

> **Member:** Something to include in user documentation is an example of these labels.

```diff
+        return self._raw.labels()
+
+    def __repr__(self) -> str:
+        """Return a string representation of the metric."""
+        return repr(self._raw)
```
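Since each `Metric` carries an optional 0-based partition index, a typical consumer buckets metrics by partition, with `None` collecting metrics not tied to any specific partition. A minimal sketch, again using illustrative dict records rather than the real `Metric` type:

```python
from collections import defaultdict


def group_by_partition(metrics: list[dict]) -> dict:
    """Bucket metric names by their 0-based partition index; None holds global metrics."""
    grouped = defaultdict(list)
    for m in metrics:
        grouped[m["partition"]].append(m["name"])
    return dict(grouped)


records = [
    {"name": "output_rows", "partition": 0},
    {"name": "output_rows", "partition": 1},
    {"name": "mem_used", "partition": None},  # not tied to a specific partition
]

print(group_by_partition(records))
# → {0: ['output_rows'], 1: ['output_rows'], None: ['mem_used']}
```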
```diff
@@ -16,7 +16,13 @@
 # under the License.

 import pytest
-from datafusion import ExecutionPlan, LogicalPlan, SessionContext
+from datafusion import (
+    ExecutionPlan,
+    LogicalPlan,
+    Metric,
+    MetricsSet,
+    SessionContext,
+)


 # Note: We must use CSV because memory tables are currently not supported for
```
```diff
@@ -40,3 +46,101 @@ def test_logical_plan_to_proto(ctx, df) -> None:
     execution_plan = ExecutionPlan.from_proto(ctx, execution_plan_bytes)

     assert str(original_execution_plan) == str(execution_plan)
+
+
+def test_metrics_tree_walk() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+    df.collect()
+    plan = df.execution_plan()
+
+    results = plan.collect_metrics()
+    assert len(results) >= 1
+    found_metrics = False
+    for name, ms in results:
+        assert isinstance(name, str)
+        assert isinstance(ms, MetricsSet)
+        if ms.output_rows is not None and ms.output_rows > 0:
+            found_metrics = True
+    assert found_metrics
+
+
+def test_metric_properties() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+    df.collect()
+    plan = df.execution_plan()
+
+    for _, ms in plan.collect_metrics():
+        r = repr(ms)
+        assert isinstance(r, str)
+        for metric in ms.metrics():
+            assert isinstance(metric, Metric)
+            assert isinstance(metric.name, str)
+            assert len(metric.name) > 0
+            assert metric.partition is None or isinstance(metric.partition, int)
+            assert isinstance(metric.labels(), dict)
+            mr = repr(metric)
+            assert isinstance(mr, str)
+            assert len(mr) > 0
+            return
+    pytest.skip("No metrics found")
```

> **Member:** I don't think we want a skip here. I think we want to ensure in our unit test that we do generate metrics. I think we should know a priori what metrics we're getting in this test. While some values we can't use, some parts of it we should be able to test directly, like the metric name and labels.

> **Contributor (author):** Removed all pytest.skip calls.
```diff
+
+
+def test_no_metrics_before_execution() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1), (2), (3)")
+    df = ctx.sql("SELECT * FROM t")
+    plan = df.execution_plan()
+    ms = plan.metrics()
+    assert ms is None or ms.output_rows is None or ms.output_rows == 0
```

> **Member:** We should know exactly which condition should be hit. Do you know when the metrics are None vs. return no useful output?

> **Contributor (author):** metrics() returns None when the plan node hasn't been executed yet. After execution, it returns a MetricsSet which always contains at least the metrics the operator recorded. I've updated test_no_metrics_before_execution to assert ms is None directly, rather than the previous looser check, since before execution the root node should have no MetricsSet at all.
```diff
+
+
+def test_collect_partitioned_metrics() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+
+    df.collect_partitioned()
+    plan = df.execution_plan()
+
+    found_metrics = False
+    for _, ms in plan.collect_metrics():
+        if ms.output_rows is not None and ms.output_rows > 0:
```

> **Member:** We should know exactly how many output rows to expect.

> **Member:** Also for the unit tests to follow.

> **Contributor (author):** Done!

```diff
+            found_metrics = True
+    assert found_metrics
```
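For the test data used here, the exact count the reviewer asks for is easy to derive by hand: three input rows, of which two satisfy the `column1 > 1` predicate.

```python
# The VALUES clause from the test, as plain tuples.
rows = [(1, "a"), (2, "b"), (3, "c")]

# WHERE column1 > 1 keeps the rows whose first column exceeds 1.
expected = [r for r in rows if r[0] > 1]

assert expected == [(2, "b"), (3, "c")]
assert len(expected) == 2  # the exact output_rows the filter should report
```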
```diff
+
+
+def test_execute_stream_metrics() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+
+    for _ in df.execute_stream():
+        pass
+
+    plan = df.execution_plan()
+    found_metrics = False
+    for _, ms in plan.collect_metrics():
+        if ms.output_rows is not None and ms.output_rows > 0:
+            found_metrics = True
+    assert found_metrics
+
+
+def test_execute_stream_partitioned_metrics() -> None:
+    ctx = SessionContext()
+    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")
+
+    for stream in df.execute_stream_partitioned():
+        for _ in stream:
+            pass
+
+    plan = df.execution_plan()
+    found_metrics = False
+    for _, ms in plan.collect_metrics():
+        if ms.output_rows is not None and ms.output_rows > 0:
+            found_metrics = True
+    assert found_metrics
```
```diff
@@ -49,6 +49,13 @@ use pyo3::pybacked::PyBackedStr;
 use pyo3::types::{PyCapsule, PyList, PyTuple, PyTupleMethods};

 use crate::common::data_type::PyScalarValue;
+use datafusion::physical_plan::{
+    ExecutionPlan as DFExecutionPlan,
+    collect as df_collect,
+    collect_partitioned as df_collect_partitioned,
+    execute_stream as df_execute_stream,
+    execute_stream_partitioned as df_execute_stream_partitioned,
+};
 use crate::errors::{PyDataFusionError, PyDataFusionResult, py_datafusion_err};
 use crate::expr::PyExpr;
 use crate::expr::sort_expr::{PySortExpr, to_sort_expressions};
```
```diff
@@ -289,6 +296,9 @@ pub struct PyDataFrame {

     // In IPython environment cache batches between __repr__ and _repr_html_ calls.
     batches: SharedCachedBatches,
+
+    // Cache the last physical plan so that metrics are available after execution.
+    last_plan: Arc<Mutex<Option<Arc<dyn DFExecutionPlan>>>>,
 }

 impl PyDataFrame {

@@ -297,6 +307,7 @@ impl PyDataFrame {
         Self {
             df: Arc::new(df),
             batches: Arc::new(Mutex::new(None)),
+            last_plan: Arc::new(Mutex::new(None)),
         }
     }
```
```diff
@@ -626,7 +637,12 @@ impl PyDataFrame {
     /// Unless some order is specified in the plan, there is no
     /// guarantee of the order of the result.
     fn collect<'py>(&self, py: Python<'py>) -> PyResult<Vec<Bound<'py, PyAny>>> {
-        let batches = wait_for_future(py, self.df.as_ref().clone().collect())?
+        let df = self.df.as_ref().clone();
+        let plan = wait_for_future(py, df.create_physical_plan())?
+            .map_err(PyDataFusionError::from)?;
+        *self.last_plan.lock() = Some(Arc::clone(&plan));
+        let task_ctx = Arc::new(self.df.as_ref().task_ctx());
+        let batches = wait_for_future(py, df_collect(plan, task_ctx))?
```

> **Member:** If I run …

```diff
             .map_err(PyDataFusionError::from)?;
         // cannot use PyResult<Vec<RecordBatch>> return type due to
         // https://github.com/PyO3/pyo3/issues/1813
```
```diff
@@ -642,7 +658,12 @@ impl PyDataFrame {
     /// Executes this DataFrame and collects all results into a vector of vector of RecordBatch
     /// maintaining the input partitioning.
     fn collect_partitioned<'py>(&self, py: Python<'py>) -> PyResult<Vec<Vec<Bound<'py, PyAny>>>> {
-        let batches = wait_for_future(py, self.df.as_ref().clone().collect_partitioned())?
+        let df = self.df.as_ref().clone();
+        let plan = wait_for_future(py, df.create_physical_plan())?
+            .map_err(PyDataFusionError::from)?;
+        *self.last_plan.lock() = Some(Arc::clone(&plan));
+        let task_ctx = Arc::new(self.df.as_ref().task_ctx());
+        let batches = wait_for_future(py, df_collect_partitioned(plan, task_ctx))?
             .map_err(PyDataFusionError::from)?;

         batches
```
```diff
@@ -802,7 +823,13 @@ impl PyDataFrame {
     }

     /// Get the execution plan for this `DataFrame`
+    ///
+    /// If the DataFrame has already been executed (e.g. via `collect()`),
+    /// returns the cached plan which includes populated metrics.
     fn execution_plan(&self, py: Python) -> PyDataFusionResult<PyExecutionPlan> {
+        if let Some(plan) = self.last_plan.lock().as_ref() {
+            return Ok(PyExecutionPlan::new(Arc::clone(plan)));
+        }
         let plan = wait_for_future(py, self.df.as_ref().clone().create_physical_plan())??;
         Ok(plan.into())
     }
```
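The caching behavior this hunk introduces can be modeled in a few lines of Python. The toy `DataFrameSketch` below is illustrative, not the real bindings: the plan created by `collect()` is stashed in `last_plan`, so a later `execution_plan()` call returns the executed, metrics-bearing plan instead of building a fresh one.

```python
class DataFrameSketch:
    """Toy model of the last_plan cache: collect() caches the physical plan it
    executes, and execution_plan() prefers that cached plan when present."""

    def __init__(self) -> None:
        self._last_plan = None

    def _create_physical_plan(self) -> dict:
        # A freshly planned node has recorded no metrics yet.
        return {"metrics": None}

    def collect(self) -> list:
        plan = self._create_physical_plan()
        self._last_plan = plan  # cache the plan object before executing it
        plan["metrics"] = {"output_rows": 2}  # execution populates metrics in place
        return ["batch"]

    def execution_plan(self) -> dict:
        # Prefer the cached, already-executed plan; otherwise plan from scratch.
        if self._last_plan is not None:
            return self._last_plan
        return self._create_physical_plan()


df = DataFrameSketch()
assert df.execution_plan()["metrics"] is None  # before execution: no metrics
df.collect()
assert df.execution_plan()["metrics"] == {"output_rows": 2}  # cached plan carries them
```

The shared mutable dict plays the role of the `Arc<dyn ExecutionPlan>` in the Rust code: both callers see the same plan object, so metrics written during execution are visible afterwards.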
```diff
@@ -1127,13 +1154,22 @@ impl PyDataFrame {

     fn execute_stream(&self, py: Python) -> PyDataFusionResult<PyRecordBatchStream> {
         let df = self.df.as_ref().clone();
-        let stream = spawn_future(py, async move { df.execute_stream().await })?;
+        let plan = wait_for_future(py, df.create_physical_plan())??;
+        *self.last_plan.lock() = Some(Arc::clone(&plan));
+        let task_ctx = Arc::new(self.df.as_ref().task_ctx());
+        let stream = spawn_future(py, async move { df_execute_stream(plan, task_ctx) })?;
         Ok(PyRecordBatchStream::new(stream))
     }

     fn execute_stream_partitioned(&self, py: Python) -> PyResult<Vec<PyRecordBatchStream>> {
         let df = self.df.as_ref().clone();
-        let streams = spawn_future(py, async move { df.execute_stream_partitioned().await })?;
+        let plan = wait_for_future(py, df.create_physical_plan())?
+            .map_err(PyDataFusionError::from)?;
+        *self.last_plan.lock() = Some(Arc::clone(&plan));
+        let task_ctx = Arc::new(self.df.as_ref().task_ctx());
```

> **Member:** It feels like we're doing this in a bunch of places, so maybe make a private helper function.

```diff
+        let streams = spawn_future(py, async move {
+            df_execute_stream_partitioned(plan, task_ctx)
+        })?;
         Ok(streams.into_iter().map(PyRecordBatchStream::new).collect())
     }
```
> **Member:** This is leading me to think we should have some high-level documentation, probably in the DataFrame page (or a subpage under it). Some of the things it would be good to do are to explain to a user what kinds of information they could find under these metrics and why that data is not available until after the DataFrame has been executed.

> **Contributor (author):** +1, I think that would be super helpful. I can extend this to include a new user-facing RST page covering things like what metrics are, when they're available, how the physical plan tree maps to operators, etc.