Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
13ae590
Remove as_any function from execution plans and use trait upcasting
timsaucer Mar 28, 2026
d1bb4ad
Merge branch 'main' into chore/remove-as-any-execution-plan
timsaucer Mar 30, 2026
c6e9e53
we already had &dyn so no need for as_ref()
timsaucer Mar 31, 2026
5562887
Update unit tests
timsaucer Mar 31, 2026
cde29a5
Add warning about Arc<dyn Trait> downcasting pitfall in upgrade guide
timsaucer Mar 31, 2026
7a4b653
Use Any::is() instead of downcast_ref().is_some() in tests
timsaucer Mar 31, 2026
5d3a083
Merge branch 'main' into chore/remove-as-any-execution-plan
timsaucer Mar 31, 2026
da69a26
Add downcast_ref and is methods to dyn ExecutionPlan
timsaucer Mar 31, 2026
ec73f02
Add downcast_ref and is methods to dyn ScalarUDFImpl, AggregateUDFImp…
timsaucer Mar 31, 2026
0a2881f
Use ExecutionPlan::downcast_ref and is() instead of as &dyn Any patterns
timsaucer Mar 31, 2026
5c8a154
Use ExecutionPlan::downcast_ref and is() instead of as &dyn Any patterns
timsaucer Apr 1, 2026
c36b282
Clean up remaining ExecutionPlan as &dyn Any patterns
timsaucer Apr 1, 2026
3fa1869
Use ScalarUDFImpl::downcast_ref and is() instead of as &dyn Any patterns
timsaucer Apr 1, 2026
fa7fa2d
Use AggregateUDFImpl::downcast_ref and is() instead of as &dyn Any pa…
timsaucer Apr 1, 2026
02457bd
Use WindowUDFImpl::downcast_ref and is() instead of as &dyn Any patterns
timsaucer Apr 1, 2026
9233df3
We got a little too carried away with the 'as &dyn Any' pattern and a…
timsaucer Apr 1, 2026
1109532
Merge branch 'main' into chore/remove-as-any-execution-plan
timsaucer Apr 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ impl PhysicalProtoConverterExtension for AdapterPreservingCodec {
extension_codec: &dyn PhysicalExtensionCodec,
) -> Result<PhysicalPlanNode> {
// Check if this is a DataSourceExec with adapter
if let Some(exec) = plan.as_any().downcast_ref::<DataSourceExec>()
if let Some(exec) = plan.downcast_ref::<DataSourceExec>()
&& let Some(config) =
exec.data_source().as_any().downcast_ref::<FileScanConfig>()
&& let Some(adapter_factory) = &config.expr_adapter_factory
Expand Down Expand Up @@ -481,7 +481,7 @@ fn inject_adapter_into_plan(
plan: Arc<dyn ExecutionPlan>,
adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
) -> Result<Arc<dyn ExecutionPlan>> {
if let Some(exec) = plan.as_any().downcast_ref::<DataSourceExec>()
if let Some(exec) = plan.downcast_ref::<DataSourceExec>()
&& let Some(config) = exec.data_source().as_any().downcast_ref::<FileScanConfig>()
{
let new_config = FileScanConfigBuilder::from(config.clone())
Expand All @@ -497,7 +497,7 @@ fn inject_adapter_into_plan(
fn verify_adapter_in_plan(plan: &Arc<dyn ExecutionPlan>, label: &str) -> bool {
// Walk the plan tree to find DataSourceExec with adapter
fn check_plan(plan: &dyn ExecutionPlan) -> bool {
if let Some(exec) = plan.as_any().downcast_ref::<DataSourceExec>()
if let Some(exec) = plan.downcast_ref::<DataSourceExec>()
&& let Some(config) =
exec.data_source().as_any().downcast_ref::<FileScanConfig>()
&& config.expr_adapter_factory.is_some()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,6 @@ impl ExecutionPlan for CustomExec {
"CustomExec"
}

fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl ExecutionPlanVisitor for ParquetExecVisitor {
/// or `post_visit` (visit each node after its children/inputs)
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
// If needed match on a specific `ExecutionPlan` node type
if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>()
if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>()
&& let Some((file_config, _)) =
data_source_exec.downcast_to_file_source::<ParquetSource>()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ use datafusion::physical_plan::{
};
use datafusion::prelude::*;
use futures::stream::{StreamExt, TryStreamExt};
use std::any::Any;
use std::fmt;
use std::sync::Arc;

Expand Down Expand Up @@ -226,10 +225,6 @@ impl ExecutionPlan for BufferingExecutionPlan {
"BufferingExecutionPlan"
}

fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}
Expand Down
13 changes: 2 additions & 11 deletions datafusion-examples/examples/proto/composed_extension_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
//! DeltaScan
//! ```

use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;

Expand Down Expand Up @@ -103,10 +102,6 @@ impl ExecutionPlan for ParentExec {
"ParentExec"
}

fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &Arc<datafusion::physical_plan::PlanProperties> {
unreachable!()
}
Expand Down Expand Up @@ -161,7 +156,7 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec {
}

fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
if node.as_any().downcast_ref::<ParentExec>().is_some() {
if node.is::<ParentExec>() {
buf.extend_from_slice("ParentExec".as_bytes());
Ok(())
} else {
Expand All @@ -188,10 +183,6 @@ impl ExecutionPlan for ChildExec {
"ChildExec"
}

fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &Arc<datafusion::physical_plan::PlanProperties> {
unreachable!()
}
Expand Down Expand Up @@ -244,7 +235,7 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec {
}

fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
if node.as_any().downcast_ref::<ChildExec>().is_some() {
if node.is::<ChildExec>() {
buf.extend_from_slice("ChildExec".as_bytes());
Ok(())
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,7 @@ pub async fn expression_deduplication() -> Result<()> {

// Step 5: check that we deduplicated expressions
println!("Step 5: Checking for deduplicated expressions...");
let Some(filter_exec) = deserialized_plan.as_any().downcast_ref::<FilterExec>()
else {
let Some(filter_exec) = deserialized_plan.downcast_ref::<FilterExec>() else {
panic!("Deserialized plan is not a FilterExec");
};
let predicate = Arc::clone(filter_exec.predicate());
Expand Down
5 changes: 0 additions & 5 deletions datafusion-examples/examples/relation_planner/table_sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@
//! ```

use std::{
any::Any,
fmt::{self, Debug, Formatter},
hash::{Hash, Hasher},
pin::Pin,
Expand Down Expand Up @@ -682,10 +681,6 @@ impl ExecutionPlan for SampleExec {
"SampleExec"
}

fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
Expand Down
4 changes: 0 additions & 4 deletions datafusion/catalog/src/memory/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,10 +594,6 @@ impl ExecutionPlan for DmlResultExec {
"DmlResultExec"
}

fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ impl ListingTableConfigExt for ListingTableConfig {

#[cfg(test)]
mod tests {

#[cfg(feature = "parquet")]
use crate::datasource::file_format::parquet::ParquetFormat;
use crate::datasource::listing::table::ListingTableConfigExt;
Expand Down Expand Up @@ -404,7 +405,7 @@ mod tests {
.await
.expect("Empty execution plan");

assert!(scan.as_any().is::<EmptyExec>());
assert!(scan.is::<EmptyExec>());
assert_eq!(
columns(&scan.schema()),
vec!["a".to_owned(), "p1".to_owned()]
Expand Down
21 changes: 3 additions & 18 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3625,7 +3625,6 @@ mod tests {

let execution_plan = plan(&logical_plan).await?;
let final_hash_agg = execution_plan
.as_any()
.downcast_ref::<AggregateExec>()
.expect("hash aggregate");
assert_eq!(
Expand Down Expand Up @@ -3653,7 +3652,6 @@ mod tests {

let execution_plan = plan(&logical_plan).await?;
let final_hash_agg = execution_plan
.as_any()
.downcast_ref::<AggregateExec>()
.expect("hash aggregate");
assert_eq!(
Expand Down Expand Up @@ -3788,7 +3786,7 @@ mod tests {
.unwrap();

let plan = plan(&logical_plan).await.unwrap();
if let Some(plan) = plan.as_any().downcast_ref::<ExplainExec>() {
if let Some(plan) = plan.downcast_ref::<ExplainExec>() {
let stringified_plans = plan.stringified_plans();
assert!(stringified_plans.len() >= 4);
assert!(
Expand Down Expand Up @@ -3856,7 +3854,7 @@ mod tests {
.handle_explain(&explain, &ctx.state())
.await
.unwrap();
if let Some(plan) = plan.as_any().downcast_ref::<ExplainExec>() {
if let Some(plan) = plan.downcast_ref::<ExplainExec>() {
let stringified_plans = plan.stringified_plans();
assert_eq!(stringified_plans.len(), 1);
assert_eq!(stringified_plans[0].plan.as_str(), "Test Err");
Expand Down Expand Up @@ -3996,10 +3994,6 @@ mod tests {
}

/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
Expand Down Expand Up @@ -4162,9 +4156,6 @@ digraph {
fn schema(&self) -> SchemaRef {
Arc::new(Schema::empty())
}
fn as_any(&self) -> &dyn Any {
unimplemented!()
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
self.0.iter().collect::<Vec<_>>()
}
Expand Down Expand Up @@ -4217,9 +4208,6 @@ digraph {
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}
fn as_any(&self) -> &dyn Any {
unimplemented!()
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
unimplemented!()
}
Expand Down Expand Up @@ -4344,9 +4332,6 @@ digraph {
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}
fn as_any(&self) -> &dyn Any {
unimplemented!()
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
Expand Down Expand Up @@ -4758,6 +4743,6 @@ digraph {
.unwrap();

assert_eq!(plan.schema(), schema);
assert!(plan.as_any().is::<EmptyExec>());
assert!(plan.is::<EmptyExec>());
}
}
2 changes: 1 addition & 1 deletion datafusion/core/src/test_util/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl TestParquetFile {
/// Recursively searches for DataSourceExec and returns the metrics
/// on the first one it finds
pub fn parquet_metrics(plan: &Arc<dyn ExecutionPlan>) -> Option<MetricsSet> {
if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>()
if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>()
&& data_source_exec
.downcast_to_file_source::<ParquetSource>()
.is_some()
Expand Down
6 changes: 1 addition & 5 deletions datafusion/core/tests/custom_sources_cases/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,6 @@ impl ExecutionPlan for CustomExecutionPlan {
Self::static_name()
}

fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
Expand Down Expand Up @@ -335,7 +331,7 @@ async fn optimizers_catch_all_statistics() {

#[expect(clippy::needless_pass_by_value)]
fn contains_place_holder_exec(plan: Arc<dyn ExecutionPlan>) -> bool {
if plan.as_any().is::<PlaceholderRowExec>() {
if plan.is::<PlaceholderRowExec>() {
true
} else if plan.children().len() != 1 {
false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,6 @@ impl ExecutionPlan for CustomPlan {
Self::static_name()
}

fn as_any(&self) -> &dyn std::any::Any {
self
}

fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
Expand Down
4 changes: 0 additions & 4 deletions datafusion/core/tests/custom_sources_cases/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,6 @@ impl ExecutionPlan for StatisticsValidation {
Self::static_name()
}

fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ async fn verify_ordered_aggregate(frame: &DataFrame, expected_sort: bool) {
type Node = Arc<dyn ExecutionPlan>;

fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
if let Some(exec) = node.as_any().downcast_ref::<AggregateExec>() {
if let Some(exec) = node.downcast_ref::<AggregateExec>() {
if self.expected_sort {
assert!(matches!(
exec.input_order_mode(),
Expand Down
5 changes: 0 additions & 5 deletions datafusion/core/tests/fuzz_cases/once_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
};
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::{Arc, Mutex};

Expand Down Expand Up @@ -80,10 +79,6 @@ impl ExecutionPlan for OnceExec {
Self::static_name()
}

fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &Arc<PlanProperties> {
&self.cache
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async fn test_repartition_memory_limit() {
let mut metrics = None;
Arc::clone(&plan)
.transform_down(|node| {
if node.as_any().is::<RepartitionExec>() {
if node.is::<RepartitionExec>() {
metrics = node.metrics();
}
Ok(Transformed::no(node))
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/tests/parquet/file_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async fn check_stats_precision_with_filter_pushdown() {
.unwrap();

assert!(
optimized_exec.as_any().is::<DataSourceExec>(),
optimized_exec.is::<DataSourceExec>(),
"Sanity check that the pushdown did what we expected"
);
// Scan with filter pushdown, stats are inexact
Expand Down Expand Up @@ -196,7 +196,7 @@ async fn list_files_with_session_level_cache() {
//Session 1 first time list files
assert_eq!(get_list_file_cache_size(&state1), 0);
let exec1 = table1.scan(&state1, None, &[], None).await.unwrap();
let data_source_exec = exec1.as_any().downcast_ref::<DataSourceExec>().unwrap();
let data_source_exec = exec1.downcast_ref::<DataSourceExec>().unwrap();
let data_source = data_source_exec.data_source();
let parquet1 = data_source
.as_any()
Expand All @@ -212,7 +212,7 @@ async fn list_files_with_session_level_cache() {
//check session 1 cache result not show in session 2
assert_eq!(get_list_file_cache_size(&state2), 0);
let exec2 = table2.scan(&state2, None, &[], None).await.unwrap();
let data_source_exec = exec2.as_any().downcast_ref::<DataSourceExec>().unwrap();
let data_source_exec = exec2.downcast_ref::<DataSourceExec>().unwrap();
let data_source = data_source_exec.data_source();
let parquet2 = data_source
.as_any()
Expand All @@ -228,7 +228,7 @@ async fn list_files_with_session_level_cache() {
//check session 1 cache result not show in session 2
assert_eq!(get_list_file_cache_size(&state1), 1);
let exec3 = table1.scan(&state1, None, &[], None).await.unwrap();
let data_source_exec = exec3.as_any().downcast_ref::<DataSourceExec>().unwrap();
let data_source_exec = exec3.downcast_ref::<DataSourceExec>().unwrap();
let data_source = data_source_exec.data_source();
let parquet3 = data_source
.as_any()
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/parquet/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl MetricsFinder {
impl ExecutionPlanVisitor for MetricsFinder {
type Error = std::convert::Infallible;
fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>()
if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>()
&& data_source_exec
.downcast_to_file_source::<ParquetSource>()
.is_some()
Expand Down
Loading
Loading