Commit b3b721f

feat: make DataFrame::create_physical_plan take &self instead of self (#20562)
## Which issue does this PR close?

- Closes #20561

## Rationale for this change

Previously `create_physical_plan` consumed the `DataFrame`, making it impossible to inspect (e.g. log) the physical plan and then execute the same `DataFrame` (e.g. via `write_parquet` or `collect`) without first cloning it.

Since the method only needs `&LogicalPlan` (which it forwards to `SessionState::create_physical_plan`), there is no reason to take ownership. Changing the signature to `&self` makes the common pattern of "get plan for logging, then write/collect" work naturally.

Also removes the now-unnecessary `self.clone()` in `DataFrame::cache` that was introduced for the same reason.

## What changes are included in this PR?

Changing `self` to `&self`

## Are these changes tested?

Yes

## Are there any user-facing changes?

---------

Co-authored-by: xanderbailey <xanderbailey@users.noreply.github.com>
1 parent c17c87c commit b3b721f
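
The ownership argument in the rationale can be illustrated with a small, dependency-free sketch. `ToyPlan`, `ToyFrame`, and their methods are hypothetical stand-ins invented for this example; they are not DataFusion types and only mirror the shape of the receivers involved (`self` versus `&self`, with a consuming `collect`).

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a logical plan (not the DataFusion type).
#[derive(Clone, Debug)]
struct ToyPlan {
    description: String,
}

/// Hypothetical stand-in for a DataFrame (not the DataFusion type).
#[derive(Clone, Debug)]
struct ToyFrame {
    plan: ToyPlan,
    rows: Vec<i64>,
}

impl ToyFrame {
    /// Old shape: takes `self`, so the frame is moved into the call.
    fn physical_plan_consuming(self) -> Arc<String> {
        Arc::new(format!("Exec({})", self.plan.description))
    }

    /// New shape: borrows `self`; only `&self.plan` is read, so a borrow suffices.
    fn physical_plan(&self) -> Arc<String> {
        Arc::new(format!("Exec({})", self.plan.description))
    }

    /// Consuming execution method, analogous in shape to `collect`.
    fn collect(self) -> Vec<i64> {
        self.rows
    }
}

fn main() {
    let df = ToyFrame {
        plan: ToyPlan {
            description: "scan example.csv".to_string(),
        },
        rows: vec![1, 2, 3],
    };

    // With the consuming signature, a clone is needed to keep `df` alive.
    let old_plan = df.clone().physical_plan_consuming();

    // With the borrowing signature, no clone is needed.
    let new_plan = df.physical_plan();
    println!("plan for logging: {new_plan}");

    // `df` has not been moved, so it can still be executed.
    let rows = df.collect();
    println!("collected {} rows", rows.len());

    // Both signatures produce the same plan; only ownership differs.
    assert_eq!(old_plan, new_plan);
}
```

In this sketch, calling `physical_plan_consuming` directly on `df` (without `.clone()`) would make the later `df.collect()` a compile error, which is the situation the old `create_physical_plan(self)` signature forced on callers.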

2 files changed

Lines changed: 27 additions & 3 deletions

datafusion/core/src/dataframe/mod.rs

Lines changed: 6 additions & 3 deletions
@@ -295,8 +295,11 @@ impl DataFrame {
         self.session_state.create_logical_expr(sql, df_schema)
     }
 
-    /// Consume the DataFrame and produce a physical plan
-    pub async fn create_physical_plan(self) -> Result<Arc<dyn ExecutionPlan>> {
+    /// Create a physical plan from this DataFrame.
+    ///
+    /// The `DataFrame` remains accessible after this call, so you can inspect
+    /// the plan and still call [`DataFrame::collect`] or other execution methods.
+    pub async fn create_physical_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
         self.session_state.create_physical_plan(&self.plan).await
     }
 
@@ -2398,7 +2401,7 @@ impl DataFrame {
         } else {
             let context = SessionContext::new_with_state((*self.session_state).clone());
             // The schema is consistent with the output
-            let plan = self.clone().create_physical_plan().await?;
+            let plan = self.create_physical_plan().await?;
             let schema = plan.schema();
             let task_ctx = Arc::new(self.task_ctx());
             let partitions = collect_partitioned(plan, task_ctx).await?;

datafusion/core/src/dataframe/parquet.rs

Lines changed: 21 additions & 0 deletions
@@ -610,4 +610,25 @@ mod tests {
 
         Ok(())
     }
+
+    /// Test that `create_physical_plan` does not consume the `DataFrame`, so
+    /// callers can inspect (e.g. log) the physical plan and then still call
+    /// `write_parquet` or any other execution method on the same `DataFrame`.
+    #[tokio::test]
+    async fn create_physical_plan_does_not_consume_dataframe() -> Result<()> {
+        use crate::prelude::CsvReadOptions;
+        let ctx = SessionContext::new();
+        let df = ctx
+            .read_csv("tests/data/example.csv", CsvReadOptions::new())
+            .await?;
+
+        // Obtain the physical plan for inspection without consuming `df`.
+        let _physical_plan = df.create_physical_plan().await?;
+
+        // `df` is still usable — collect the results.
+        let batches = df.collect().await?;
+        assert!(!batches.is_empty());
+
+        Ok(())
+    }
 }
