Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
13ae590
Remove as_any function from execution plans and use trait upcasting
timsaucer Mar 28, 2026
d1bb4ad
Merge branch 'main' into chore/remove-as-any-execution-plan
timsaucer Mar 30, 2026
c6e9e53
we already had &dyn so no need for as_ref()
timsaucer Mar 31, 2026
5562887
Update unit tests
timsaucer Mar 31, 2026
cde29a5
Add warning about Arc<dyn Trait> downcasting pitfall in upgrade guide
timsaucer Mar 31, 2026
7a4b653
Use Any::is() instead of downcast_ref().is_some() in tests
timsaucer Mar 31, 2026
5d3a083
Merge branch 'main' into chore/remove-as-any-execution-plan
timsaucer Mar 31, 2026
da69a26
Add downcast_ref and is methods to dyn ExecutionPlan
timsaucer Mar 31, 2026
ec73f02
Add downcast_ref and is methods to dyn ScalarUDFImpl, AggregateUDFImp…
timsaucer Mar 31, 2026
0a2881f
Use ExecutionPlan::downcast_ref and is() instead of as &dyn Any patterns
timsaucer Mar 31, 2026
5c8a154
Use ExecutionPlan::downcast_ref and is() instead of as &dyn Any patterns
timsaucer Apr 1, 2026
c36b282
Clean up remaining ExecutionPlan as &dyn Any patterns
timsaucer Apr 1, 2026
3fa1869
Use ScalarUDFImpl::downcast_ref and is() instead of as &dyn Any patterns
timsaucer Apr 1, 2026
fa7fa2d
Use AggregateUDFImpl::downcast_ref and is() instead of as &dyn Any pa…
timsaucer Apr 1, 2026
02457bd
Use WindowUDFImpl::downcast_ref and is() instead of as &dyn Any patterns
timsaucer Apr 1, 2026
9233df3
We got a little too carried away with the 'as &dyn Any' pattern and a…
timsaucer Apr 1, 2026
1109532
Merge branch 'main' into chore/remove-as-any-execution-plan
timsaucer Apr 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
//! of the `PhysicalExtensionCodec` interception pattern. Both plan and expression
//! serialization route through the codec, enabling interception at every node in the tree.

use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;

Expand Down Expand Up @@ -317,7 +318,7 @@ impl PhysicalProtoConverterExtension for AdapterPreservingCodec {
extension_codec: &dyn PhysicalExtensionCodec,
) -> Result<PhysicalPlanNode> {
// Check if this is a DataSourceExec with adapter
if let Some(exec) = plan.as_any().downcast_ref::<DataSourceExec>()
if let Some(exec) = (plan.as_ref() as &dyn Any).downcast_ref::<DataSourceExec>()
&& let Some(config) =
exec.data_source().as_any().downcast_ref::<FileScanConfig>()
&& let Some(adapter_factory) = &config.expr_adapter_factory
Expand Down Expand Up @@ -481,7 +482,7 @@ fn inject_adapter_into_plan(
plan: Arc<dyn ExecutionPlan>,
adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
) -> Result<Arc<dyn ExecutionPlan>> {
if let Some(exec) = plan.as_any().downcast_ref::<DataSourceExec>()
if let Some(exec) = (plan.as_ref() as &dyn Any).downcast_ref::<DataSourceExec>()
&& let Some(config) = exec.data_source().as_any().downcast_ref::<FileScanConfig>()
{
let new_config = FileScanConfigBuilder::from(config.clone())
Expand All @@ -497,7 +498,7 @@ fn inject_adapter_into_plan(
fn verify_adapter_in_plan(plan: &Arc<dyn ExecutionPlan>, label: &str) -> bool {
// Walk the plan tree to find DataSourceExec with adapter
fn check_plan(plan: &dyn ExecutionPlan) -> bool {
if let Some(exec) = plan.as_any().downcast_ref::<DataSourceExec>()
if let Some(exec) = (plan as &dyn Any).downcast_ref::<DataSourceExec>()
&& let Some(config) =
exec.data_source().as_any().downcast_ref::<FileScanConfig>()
&& config.expr_adapter_factory.is_some()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,6 @@ impl ExecutionPlan for CustomExec {
"CustomExec"
}

fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
Expand Down
4 changes: 3 additions & 1 deletion datafusion-examples/examples/data_io/parquet_exec_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! See `main.rs` for how to run it.

use std::any::Any;
use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
Expand Down Expand Up @@ -104,7 +105,8 @@ impl ExecutionPlanVisitor for ParquetExecVisitor {
/// or `post_visit` (visit each node after its children/inputs)
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
// If needed match on a specific `ExecutionPlan` node type
if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>()
if let Some(data_source_exec) =
(plan as &dyn Any).downcast_ref::<DataSourceExec>()
&& let Some((file_config, _)) =
data_source_exec.downcast_to_file_source::<ParquetSource>()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ use datafusion::physical_plan::{
};
use datafusion::prelude::*;
use futures::stream::{StreamExt, TryStreamExt};
use std::any::Any;
use std::fmt;
use std::sync::Arc;

Expand Down Expand Up @@ -226,10 +225,6 @@ impl ExecutionPlan for BufferingExecutionPlan {
"BufferingExecutionPlan"
}

fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}
Expand Down
18 changes: 8 additions & 10 deletions datafusion-examples/examples/proto/composed_extension_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,6 @@ impl ExecutionPlan for ParentExec {
"ParentExec"
}

fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &Arc<datafusion::physical_plan::PlanProperties> {
unreachable!()
}
Expand Down Expand Up @@ -161,7 +157,10 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec {
}

fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
if node.as_any().downcast_ref::<ParentExec>().is_some() {
if (node.as_ref() as &dyn Any)
.downcast_ref::<ParentExec>()
.is_some()
{
buf.extend_from_slice("ParentExec".as_bytes());
Ok(())
} else {
Expand All @@ -188,10 +187,6 @@ impl ExecutionPlan for ChildExec {
"ChildExec"
}

fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &Arc<datafusion::physical_plan::PlanProperties> {
unreachable!()
}
Expand Down Expand Up @@ -244,7 +239,10 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec {
}

fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
if node.as_any().downcast_ref::<ChildExec>().is_some() {
if (node.as_ref() as &dyn Any)
.downcast_ref::<ChildExec>()
.is_some()
{
buf.extend_from_slice("ChildExec".as_bytes());
Ok(())
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
//! This demonstrates the decorator pattern enabled by the `PhysicalExtensionCodec` trait,
//! where all expression serialization/deserialization routes through the codec methods.

use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::{Arc, RwLock};
Expand Down Expand Up @@ -124,7 +125,8 @@ pub async fn expression_deduplication() -> Result<()> {

// Step 5: check that we deduplicated expressions
println!("Step 5: Checking for deduplicated expressions...");
let Some(filter_exec) = deserialized_plan.as_any().downcast_ref::<FilterExec>()
let Some(filter_exec) =
(deserialized_plan.as_ref() as &dyn Any).downcast_ref::<FilterExec>()
else {
panic!("Deserialized plan is not a FilterExec");
};
Expand Down
5 changes: 0 additions & 5 deletions datafusion-examples/examples/relation_planner/table_sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@
//! ```

use std::{
any::Any,
fmt::{self, Debug, Formatter},
hash::{Hash, Hasher},
pin::Pin,
Expand Down Expand Up @@ -682,10 +681,6 @@ impl ExecutionPlan for SampleExec {
"SampleExec"
}

fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
Expand Down
4 changes: 0 additions & 4 deletions datafusion/catalog/src/memory/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,10 +594,6 @@ impl ExecutionPlan for DmlResultExec {
"DmlResultExec"
}

fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
Expand Down
4 changes: 3 additions & 1 deletion datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ impl ListingTableConfigExt for ListingTableConfig {

#[cfg(test)]
mod tests {
use std::any::Any;

#[cfg(feature = "parquet")]
use crate::datasource::file_format::parquet::ParquetFormat;
use crate::datasource::listing::table::ListingTableConfigExt;
Expand Down Expand Up @@ -404,7 +406,7 @@ mod tests {
.await
.expect("Empty execution plan");

assert!(scan.as_any().is::<EmptyExec>());
assert!((scan.as_ref() as &dyn Any).is::<EmptyExec>());
assert_eq!(
columns(&scan.schema()),
vec!["a".to_owned(), "p1".to_owned()]
Expand Down
25 changes: 5 additions & 20 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3624,8 +3624,7 @@ mod tests {
.build()?;

let execution_plan = plan(&logical_plan).await?;
let final_hash_agg = execution_plan
.as_any()
let final_hash_agg = (execution_plan.as_ref() as &dyn Any)
.downcast_ref::<AggregateExec>()
.expect("hash aggregate");
assert_eq!(
Expand All @@ -3652,8 +3651,7 @@ mod tests {
.build()?;

let execution_plan = plan(&logical_plan).await?;
let final_hash_agg = execution_plan
.as_any()
let final_hash_agg = (execution_plan.as_ref() as &dyn Any)
.downcast_ref::<AggregateExec>()
.expect("hash aggregate");
assert_eq!(
Expand Down Expand Up @@ -3788,7 +3786,7 @@ mod tests {
.unwrap();

let plan = plan(&logical_plan).await.unwrap();
if let Some(plan) = plan.as_any().downcast_ref::<ExplainExec>() {
if let Some(plan) = (plan.as_ref() as &dyn Any).downcast_ref::<ExplainExec>() {
let stringified_plans = plan.stringified_plans();
assert!(stringified_plans.len() >= 4);
assert!(
Expand Down Expand Up @@ -3856,7 +3854,7 @@ mod tests {
.handle_explain(&explain, &ctx.state())
.await
.unwrap();
if let Some(plan) = plan.as_any().downcast_ref::<ExplainExec>() {
if let Some(plan) = (plan.as_ref() as &dyn Any).downcast_ref::<ExplainExec>() {
let stringified_plans = plan.stringified_plans();
assert_eq!(stringified_plans.len(), 1);
assert_eq!(stringified_plans[0].plan.as_str(), "Test Err");
Expand Down Expand Up @@ -3996,10 +3994,6 @@ mod tests {
}

/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
Expand Down Expand Up @@ -4162,9 +4156,6 @@ digraph {
fn schema(&self) -> SchemaRef {
Arc::new(Schema::empty())
}
fn as_any(&self) -> &dyn Any {
unimplemented!()
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
self.0.iter().collect::<Vec<_>>()
}
Expand Down Expand Up @@ -4217,9 +4208,6 @@ digraph {
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}
fn as_any(&self) -> &dyn Any {
unimplemented!()
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
unimplemented!()
}
Expand Down Expand Up @@ -4344,9 +4332,6 @@ digraph {
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}
fn as_any(&self) -> &dyn Any {
unimplemented!()
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
Expand Down Expand Up @@ -4758,6 +4743,6 @@ digraph {
.unwrap();

assert_eq!(plan.schema(), schema);
assert!(plan.as_any().is::<EmptyExec>());
assert!((plan.as_ref() as &dyn Any).is::<EmptyExec>());
}
}
3 changes: 2 additions & 1 deletion datafusion/core/src/test_util/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ impl TestParquetFile {
/// Recursively searches for DataSourceExec and returns the metrics
/// on the first one it finds
pub fn parquet_metrics(plan: &Arc<dyn ExecutionPlan>) -> Option<MetricsSet> {
if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>()
if let Some(data_source_exec) =
(plan.as_ref() as &dyn std::any::Any).downcast_ref::<DataSourceExec>()
&& data_source_exec
.downcast_to_file_source::<ParquetSource>()
.is_some()
Expand Down
6 changes: 1 addition & 5 deletions datafusion/core/tests/custom_sources_cases/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,6 @@ impl ExecutionPlan for CustomExecutionPlan {
Self::static_name()
}

fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
Expand Down Expand Up @@ -335,7 +331,7 @@ async fn optimizers_catch_all_statistics() {

#[expect(clippy::needless_pass_by_value)]
fn contains_place_holder_exec(plan: Arc<dyn ExecutionPlan>) -> bool {
if plan.as_any().is::<PlaceholderRowExec>() {
if (plan.as_ref() as &dyn Any).is::<PlaceholderRowExec>() {
true
} else if plan.children().len() != 1 {
false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,6 @@ impl ExecutionPlan for CustomPlan {
Self::static_name()
}

fn as_any(&self) -> &dyn std::any::Any {
self
}

fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
Expand Down
4 changes: 0 additions & 4 deletions datafusion/core/tests/custom_sources_cases/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,6 @@ impl ExecutionPlan for StatisticsValidation {
Self::static_name()
}

fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
Expand Down
4 changes: 3 additions & 1 deletion datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,9 @@ async fn verify_ordered_aggregate(frame: &DataFrame, expected_sort: bool) {
type Node = Arc<dyn ExecutionPlan>;

fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
if let Some(exec) = node.as_any().downcast_ref::<AggregateExec>() {
if let Some(exec) =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will be honest that the previous syntax is quite a bit nicer 🤔

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. Moved to draft, at least for the time being.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the push back @alamb. If you take another look, I think we now have a very clean downcast. I also updated the UDFs at the same time and applied the pattern. They have many fewer places where we do downcast_ref or is so it wasn't so noticeable.

(node.as_ref() as &dyn std::any::Any).downcast_ref::<AggregateExec>()
{
if self.expected_sort {
assert!(matches!(
exec.input_order_mode(),
Expand Down
5 changes: 0 additions & 5 deletions datafusion/core/tests/fuzz_cases/once_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
};
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::{Arc, Mutex};

Expand Down Expand Up @@ -80,10 +79,6 @@ impl ExecutionPlan for OnceExec {
Self::static_name()
}

fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/tests/memory_limit/repartition_mem_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, RecordBatch};
Expand Down Expand Up @@ -74,7 +75,7 @@ async fn test_repartition_memory_limit() {
let mut metrics = None;
Arc::clone(&plan)
.transform_down(|node| {
if node.as_any().is::<RepartitionExec>() {
if (node.as_ref() as &dyn Any).is::<RepartitionExec>() {
metrics = node.metrics();
}
Ok(Transformed::no(node))
Expand Down
Loading
Loading