From 833942d1876bec840f4469c7db32fb416f16da9d Mon Sep 17 00:00:00 2001 From: aditya singh rathore Date: Mon, 27 Apr 2026 17:24:49 +0530 Subject: [PATCH 1/8] Update mod.rs --- datafusion/proto/src/physical_plan/mod.rs | 24 ++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 206f4378d3d3b..8685454885d73 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -681,16 +681,17 @@ impl protobuf::PhysicalPlanNode { })?; let filter_selectivity = filter.default_filter_selectivity.try_into(); - let projection = if !filter.projection.is_empty() { - Some( - filter - .projection - .iter() - .map(|i| *i as usize) - .collect::>(), - ) + // After deserializing, check if it equals the full range + let num_fields = schema.fields().len(); + let full_projection: Vec = (0..num_fields).collect(); + let deserialized: Vec = filter.projection.iter().map(|i| *i as usize).collect(); + + let projection = if deserialized == full_projection { + None // treat full range as "no projection" + } else if deserialized.is_empty() { + Some(vec![]) // genuine empty projection } else { - None + Some(deserialized) }; let filter = FilterExecBuilder::new(predicate, input) @@ -2350,8 +2351,9 @@ impl protobuf::PhysicalPlanNode { .physical_expr_to_proto(exec.predicate(), codec)?, ), default_filter_selectivity: exec.default_selectivity() as u32, - projection: exec.projection().as_ref().map_or_else(Vec::new, |v| { - v.iter().map(|x| *x as u32).collect::>() + projection: match exec.projection() { + None => (0..fields.len()).map(|i| i as u32).collect(), + Some(v) => v.iter().map(|x| *x as u32).collect(), }), batch_size: exec.batch_size() as u32, fetch: exec.fetch().map(|f| f as u32), From a7e865721302daeadbcff3aadc41a8787fcbff71 Mon Sep 17 00:00:00 2001 From: aditya singh rathore Date: Mon, 27 Apr 2026 17:57:01 +0530 Subject: [PATCH 2/8] cargo fix --- datafusion/proto/src/physical_plan/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 8685454885d73..9ecd71880e494 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -2354,7 +2354,7 @@ impl protobuf::PhysicalPlanNode { projection: match exec.projection() { None => (0..fields.len()).map(|i| i as u32).collect(), Some(v) => v.iter().map(|x| *x as u32).collect(), - }), + }, batch_size: exec.batch_size() as u32, fetch: exec.fetch().map(|f| f as u32), }, From 9a66177618451febaa895a2219fb1aaea10369f0 Mon Sep 17 00:00:00 2001 From: aditya singh rathore Date: Tue, 28 Apr 2026 10:06:28 +0530 Subject: [PATCH 3/8] quick fix --- datafusion/proto/src/physical_plan/mod.rs | 735 +++++++++++----------- 1 file changed, 357 insertions(+), 378 deletions(-) diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index b0cd438d4a90e..605d520529a8c 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -51,7 +51,6 @@ use datafusion_datasource_parquet::source::ParquetSource; #[cfg(feature = "parquet")] use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_execution::{FunctionRegistry, TaskContext}; -use datafusion_expr::execution_props::{ScalarSubqueryResults, SubqueryIndex}; use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF}; use datafusion_functions_table::generate_series::{ Empty, GenSeriesArgs, GenerateSeriesTable, GenericSeriesState, TimestampValue, @@ -84,7 +83,6 @@ use datafusion_physical_plan::metrics::{MetricCategory, MetricType}; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion_physical_plan::repartition::RepartitionExec; -use datafusion_physical_plan::scalar_subquery::{ScalarSubqueryExec, ScalarSubqueryLink}; use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::union::{InterleaveExec, UnionExec}; @@ -119,58 +117,6 @@ use crate::{convert_required, into_required}; pub mod from_proto; pub mod to_proto; -/// Context threaded through physical-plan deserialization. -/// -/// This bundles the stable per-call inputs for deserialization and the -/// per-scope `ScalarSubqueryResults` handle needed while reconstructing -/// `ScalarSubqueryExpr` nodes inside a `ScalarSubqueryExec` input plan. -#[derive(Clone)] -pub struct PhysicalPlanDecodeContext<'a> { - task_ctx: &'a TaskContext, - codec: &'a dyn PhysicalExtensionCodec, - scalar_subquery_results: Option, -} - -impl<'a> PhysicalPlanDecodeContext<'a> { - /// Creates a new root decode context. - pub fn new(task_ctx: &'a TaskContext, codec: &'a dyn PhysicalExtensionCodec) -> Self { - Self { - task_ctx, - codec, - scalar_subquery_results: None, - } - } - - /// Returns the task context used for deserialization. - pub fn task_ctx(&self) -> &'a TaskContext { - self.task_ctx - } - - /// Returns the physical extension codec used for deserialization. - pub fn codec(&self) -> &'a dyn PhysicalExtensionCodec { - self.codec - } - - /// Returns the scalar subquery results container for the current scope, if - /// one is active. - pub fn scalar_subquery_results(&self) -> Option<&ScalarSubqueryResults> { - self.scalar_subquery_results.as_ref() - } - - /// Returns a child context with a different scalar subquery results - /// container. - pub fn with_scalar_subquery_results( - &self, - scalar_subquery_results: ScalarSubqueryResults, - ) -> Self { - Self { - task_ctx: self.task_ctx, - codec: self.codec, - scalar_subquery_results: Some(scalar_subquery_results), - } - } -} - impl AsExecutionPlan for protobuf::PhysicalPlanNode { fn try_decode(buf: &[u8]) -> Result where @@ -222,16 +168,8 @@ impl protobuf::PhysicalPlanNode { pub fn try_into_physical_plan_with_converter( &self, ctx: &TaskContext, - codec: &dyn PhysicalExtensionCodec, - proto_converter: &dyn PhysicalProtoConverterExtension, - ) -> Result> { - let decode_ctx = PhysicalPlanDecodeContext::new(ctx, codec); - self.try_into_physical_plan_with_context(&decode_ctx, proto_converter) - } - pub(crate) fn try_into_physical_plan_with_context( - &self, - ctx: &PhysicalPlanDecodeContext<'_>, + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let plan = self.physical_plan_type.as_ref().ok_or_else(|| { @@ -241,124 +179,147 @@ impl protobuf::PhysicalPlanNode { })?; match plan { PhysicalPlanType::Explain(explain) => { - self.try_into_explain_physical_plan(explain, ctx, proto_converter) - } - PhysicalPlanType::Projection(projection) => { - self.try_into_projection_physical_plan(projection, ctx, proto_converter) + self.try_into_explain_physical_plan(explain, ctx, codec, proto_converter) } + PhysicalPlanType::Projection(projection) => self + .try_into_projection_physical_plan( + projection, + ctx, + codec, + proto_converter, + ), PhysicalPlanType::Filter(filter) => { - self.try_into_filter_physical_plan(filter, ctx, proto_converter) + self.try_into_filter_physical_plan(filter, ctx, codec, proto_converter) } PhysicalPlanType::CsvScan(scan) => { - self.try_into_csv_scan_physical_plan(scan, ctx, proto_converter) + self.try_into_csv_scan_physical_plan(scan, ctx, codec, proto_converter) } PhysicalPlanType::JsonScan(scan) => { - self.try_into_json_scan_physical_plan(scan, ctx, proto_converter) - } - PhysicalPlanType::ParquetScan(scan) => { - self.try_into_parquet_scan_physical_plan(scan, ctx, proto_converter) + self.try_into_json_scan_physical_plan(scan, ctx, codec, proto_converter) } + PhysicalPlanType::ParquetScan(scan) => self + .try_into_parquet_scan_physical_plan(scan, ctx, codec, proto_converter), PhysicalPlanType::AvroScan(scan) => { - self.try_into_avro_scan_physical_plan(scan, ctx, proto_converter) + self.try_into_avro_scan_physical_plan(scan, ctx, codec, proto_converter) } PhysicalPlanType::MemoryScan(scan) => { - self.try_into_memory_scan_physical_plan(scan, ctx, proto_converter) + self.try_into_memory_scan_physical_plan(scan, ctx, codec, proto_converter) } PhysicalPlanType::ArrowScan(scan) => { - self.try_into_arrow_scan_physical_plan(scan, ctx, proto_converter) + self.try_into_arrow_scan_physical_plan(scan, ctx, codec, proto_converter) } PhysicalPlanType::CoalesceBatches(coalesce_batches) => self .try_into_coalesce_batches_physical_plan( coalesce_batches, ctx, + codec, proto_converter, ), PhysicalPlanType::Merge(merge) => { - self.try_into_merge_physical_plan(merge, ctx, proto_converter) - } - PhysicalPlanType::Repartition(repart) => { - self.try_into_repartition_physical_plan(repart, ctx, proto_converter) - } - PhysicalPlanType::GlobalLimit(limit) => { - self.try_into_global_limit_physical_plan(limit, ctx, proto_converter) - } - PhysicalPlanType::LocalLimit(limit) => { - self.try_into_local_limit_physical_plan(limit, ctx, proto_converter) - } - PhysicalPlanType::Window(window_agg) => { - self.try_into_window_physical_plan(window_agg, ctx, proto_converter) - } - PhysicalPlanType::Aggregate(hash_agg) => { - self.try_into_aggregate_physical_plan(hash_agg, ctx, proto_converter) - } - PhysicalPlanType::HashJoin(hashjoin) => { - self.try_into_hash_join_physical_plan(hashjoin, ctx, proto_converter) + self.try_into_merge_physical_plan(merge, ctx, codec, proto_converter) } + PhysicalPlanType::Repartition(repart) => self + .try_into_repartition_physical_plan(repart, ctx, codec, proto_converter), + PhysicalPlanType::GlobalLimit(limit) => self + .try_into_global_limit_physical_plan(limit, ctx, codec, proto_converter), + PhysicalPlanType::LocalLimit(limit) => self + .try_into_local_limit_physical_plan(limit, ctx, codec, proto_converter), + PhysicalPlanType::Window(window_agg) => self.try_into_window_physical_plan( + window_agg, + ctx, + codec, + proto_converter, + ), + PhysicalPlanType::Aggregate(hash_agg) => self + .try_into_aggregate_physical_plan(hash_agg, ctx, codec, proto_converter), + PhysicalPlanType::HashJoin(hashjoin) => self + .try_into_hash_join_physical_plan(hashjoin, ctx, codec, proto_converter), PhysicalPlanType::SymmetricHashJoin(sym_join) => self .try_into_symmetric_hash_join_physical_plan( sym_join, ctx, + codec, proto_converter, ), PhysicalPlanType::Union(union) => { - self.try_into_union_physical_plan(union, ctx, proto_converter) - } - PhysicalPlanType::Interleave(interleave) => { - self.try_into_interleave_physical_plan(interleave, ctx, proto_converter) - } - PhysicalPlanType::CrossJoin(crossjoin) => { - self.try_into_cross_join_physical_plan(crossjoin, ctx, proto_converter) + self.try_into_union_physical_plan(union, ctx, codec, proto_converter) } + PhysicalPlanType::Interleave(interleave) => self + .try_into_interleave_physical_plan( + interleave, + ctx, + codec, + proto_converter, + ), + PhysicalPlanType::CrossJoin(crossjoin) => self + .try_into_cross_join_physical_plan( + crossjoin, + ctx, + codec, + proto_converter, + ), PhysicalPlanType::Empty(empty) => { - self.try_into_empty_physical_plan(empty, ctx, proto_converter) + self.try_into_empty_physical_plan(empty, ctx, codec, proto_converter) } PhysicalPlanType::PlaceholderRow(placeholder) => { - self.try_into_placeholder_row_physical_plan(placeholder, ctx) + self.try_into_placeholder_row_physical_plan(placeholder, ctx, codec) } PhysicalPlanType::Sort(sort) => { - self.try_into_sort_physical_plan(sort, ctx, proto_converter) + self.try_into_sort_physical_plan(sort, ctx, codec, proto_converter) } PhysicalPlanType::SortPreservingMerge(sort) => self - .try_into_sort_preserving_merge_physical_plan(sort, ctx, proto_converter), - PhysicalPlanType::Extension(extension) => { - self.try_into_extension_physical_plan(extension, ctx, proto_converter) - } - PhysicalPlanType::NestedLoopJoin(join) => { - self.try_into_nested_loop_join_physical_plan(join, ctx, proto_converter) - } + .try_into_sort_preserving_merge_physical_plan( + sort, + ctx, + codec, + proto_converter, + ), + PhysicalPlanType::Extension(extension) => self + .try_into_extension_physical_plan(extension, ctx, codec, proto_converter), + PhysicalPlanType::NestedLoopJoin(join) => self + .try_into_nested_loop_join_physical_plan( + join, + ctx, + codec, + proto_converter, + ), PhysicalPlanType::Analyze(analyze) => { - self.try_into_analyze_physical_plan(analyze, ctx, proto_converter) + self.try_into_analyze_physical_plan(analyze, ctx, codec, proto_converter) } PhysicalPlanType::JsonSink(sink) => { - self.try_into_json_sink_physical_plan(sink, ctx, proto_converter) + self.try_into_json_sink_physical_plan(sink, ctx, codec, proto_converter) } PhysicalPlanType::CsvSink(sink) => { - self.try_into_csv_sink_physical_plan(sink, ctx, proto_converter) + self.try_into_csv_sink_physical_plan(sink, ctx, codec, proto_converter) } #[cfg_attr(not(feature = "parquet"), allow(unused_variables))] - PhysicalPlanType::ParquetSink(sink) => { - self.try_into_parquet_sink_physical_plan(sink, ctx, proto_converter) - } + PhysicalPlanType::ParquetSink(sink) => self + .try_into_parquet_sink_physical_plan(sink, ctx, codec, proto_converter), PhysicalPlanType::Unnest(unnest) => { - self.try_into_unnest_physical_plan(unnest, ctx, proto_converter) - } - PhysicalPlanType::Cooperative(cooperative) => { - self.try_into_cooperative_physical_plan(cooperative, ctx, proto_converter) + self.try_into_unnest_physical_plan(unnest, ctx, codec, proto_converter) } + PhysicalPlanType::Cooperative(cooperative) => self + .try_into_cooperative_physical_plan( + cooperative, + ctx, + codec, + proto_converter, + ), PhysicalPlanType::GenerateSeries(generate_series) => { self.try_into_generate_series_physical_plan(generate_series) } PhysicalPlanType::SortMergeJoin(sort_join) => { - self.try_into_sort_join(sort_join, ctx, proto_converter) - } - PhysicalPlanType::AsyncFunc(async_func) => { - self.try_into_async_func_physical_plan(async_func, ctx, proto_converter) + self.try_into_sort_join(sort_join, ctx, codec, proto_converter) } + PhysicalPlanType::AsyncFunc(async_func) => self + .try_into_async_func_physical_plan( + async_func, + ctx, + codec, + proto_converter, + ), PhysicalPlanType::Buffer(buffer) => { - self.try_into_buffer_physical_plan(buffer, ctx, proto_converter) - } - PhysicalPlanType::ScalarSubquery(sq) => { - self.try_into_scalar_subquery_physical_plan(sq, ctx, proto_converter) + self.try_into_buffer_physical_plan(buffer, ctx, codec, proto_converter) } } } @@ -608,14 +569,6 @@ impl protobuf::PhysicalPlanNode { ); } - if let Some(exec) = plan.downcast_ref::() { - return protobuf::PhysicalPlanNode::try_from_scalar_subquery_exec( - exec, - codec, - proto_converter, - ); - } - let mut buf: Vec = vec![]; match codec.try_encode(Arc::clone(&plan_clone), &mut buf) { Ok(_) => { @@ -649,7 +602,9 @@ impl protobuf::PhysicalPlanNode { fn try_into_explain_physical_plan( &self, explain: &protobuf::ExplainExecNode, - _ctx: &PhysicalPlanDecodeContext<'_>, + _ctx: &TaskContext, + + _codec: &dyn PhysicalExtensionCodec, _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { Ok(Arc::new(ExplainExec::new( @@ -666,11 +621,13 @@ impl protobuf::PhysicalPlanNode { fn try_into_projection_physical_plan( &self, projection: &protobuf::ProjectionExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let input: Arc = - into_physical_plan(&projection.input, ctx, proto_converter)?; + into_physical_plan(&projection.input, ctx, codec, proto_converter)?; let exprs = projection .expr .iter() @@ -679,8 +636,9 @@ impl protobuf::PhysicalPlanNode { Ok(( proto_converter.proto_to_physical_expr( expr, - input.schema().as_ref(), ctx, + input.schema().as_ref(), + codec, )?, name.to_string(), )) @@ -696,17 +654,24 @@ impl protobuf::PhysicalPlanNode { fn try_into_filter_physical_plan( &self, filter: &protobuf::FilterExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let input: Arc = - into_physical_plan(&filter.input, ctx, proto_converter)?; + into_physical_plan(&filter.input, ctx, codec, proto_converter)?; let predicate = filter .expr .as_ref() .map(|expr| { - proto_converter.proto_to_physical_expr(expr, input.schema().as_ref(), ctx) + proto_converter.proto_to_physical_expr( + expr, + ctx, + input.schema().as_ref(), + codec, + ) }) .transpose()? .ok_or_else(|| { @@ -716,17 +681,23 @@ impl protobuf::PhysicalPlanNode { })?; let filter_selectivity = filter.default_filter_selectivity.try_into(); - // After deserializing, check if it equals the full range - let num_fields = schema.fields().len(); - let full_projection: Vec = (0..num_fields).collect(); - let deserialized: Vec = filter.projection.iter().map(|i| *i as usize).collect(); - - let projection = if deserialized == full_projection { - None // treat full range as "no projection" - } else if deserialized.is_empty() { - Some(vec![]) // genuine empty projection + + // Deserialize projection, distinguishing between: + // None => full projection (all columns) + // Some(vec![]) => empty projection (no columns) + // Some(vec![..]) => partial projection + let num_fields = input.schema().fields().len(); + let mut is_full_projection = filter.projection.len() == num_fields; + let mut projection_vec: Vec = Vec::with_capacity(filter.projection.len()); + for (i, idx) in filter.projection.iter().enumerate() { + let idx = *idx as usize; + is_full_projection &= idx == i; + projection_vec.push(idx); + } + let projection = if is_full_projection { + None } else { - Some(deserialized) + Some(projection_vec) }; let filter = FilterExecBuilder::new(predicate, input) @@ -747,7 +718,9 @@ impl protobuf::PhysicalPlanNode { fn try_into_csv_scan_physical_plan( &self, scan: &protobuf::CsvScanExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let escape = @@ -789,6 +762,7 @@ impl protobuf::PhysicalPlanNode { let conf = FileScanConfigBuilder::from(parse_protobuf_file_scan_config( scan.base_conf.as_ref().unwrap(), ctx, + codec, proto_converter, source, )?) @@ -800,7 +774,9 @@ impl protobuf::PhysicalPlanNode { fn try_into_json_scan_physical_plan( &self, scan: &protobuf::JsonScanExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let base_conf = scan.base_conf.as_ref().unwrap(); @@ -808,6 +784,7 @@ impl protobuf::PhysicalPlanNode { let scan_conf = parse_protobuf_file_scan_config( base_conf, ctx, + codec, proto_converter, Arc::new(JsonSource::new(table_schema)), )?; @@ -817,7 +794,8 @@ impl protobuf::PhysicalPlanNode { fn try_into_arrow_scan_physical_plan( &self, scan: &protobuf::ArrowScanExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let base_conf = scan.base_conf.as_ref().ok_or_else(|| { @@ -827,6 +805,7 @@ impl protobuf::PhysicalPlanNode { let scan_conf = parse_protobuf_file_scan_config( base_conf, ctx, + codec, proto_converter, Arc::new(ArrowSource::new_file_source(table_schema)), )?; @@ -837,7 +816,8 @@ impl protobuf::PhysicalPlanNode { fn try_into_parquet_scan_physical_plan( &self, scan: &protobuf::ParquetScanExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { #[cfg(feature = "parquet")] @@ -866,8 +846,9 @@ impl protobuf::PhysicalPlanNode { .map(|expr| { proto_converter.proto_to_physical_expr( expr, - predicate_schema.as_ref(), ctx, + predicate_schema.as_ref(), + codec, ) }) .transpose()?; @@ -883,15 +864,9 @@ impl protobuf::PhysicalPlanNode { false => ObjectStoreUrl::parse(&base_conf.object_store_url)?, true => ObjectStoreUrl::local_filesystem(), }; - let store = ctx - .task_ctx() - .runtime_env() - .object_store(object_store_url)?; - let metadata_cache = ctx - .task_ctx() - .runtime_env() - .cache_manager - .get_file_metadata_cache(); + let store = ctx.runtime_env().object_store(object_store_url)?; + let metadata_cache = + ctx.runtime_env().cache_manager.get_file_metadata_cache(); let reader_factory = Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache)); @@ -905,6 +880,7 @@ impl protobuf::PhysicalPlanNode { let base_config = parse_protobuf_file_scan_config( base_conf, ctx, + codec, proto_converter, Arc::new(source), )?; @@ -920,7 +896,8 @@ impl protobuf::PhysicalPlanNode { fn try_into_avro_scan_physical_plan( &self, scan: &protobuf::AvroScanExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { #[cfg(feature = "avro")] @@ -930,6 +907,7 @@ impl protobuf::PhysicalPlanNode { let conf = parse_protobuf_file_scan_config( scan.base_conf.as_ref().unwrap(), ctx, + codec, proto_converter, Arc::new(AvroSource::new(table_schema)), )?; @@ -943,7 +921,9 @@ impl protobuf::PhysicalPlanNode { fn try_into_memory_scan_physical_plan( &self, scan: &protobuf::MemoryScanExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let partitions = scan @@ -974,6 +954,7 @@ impl protobuf::PhysicalPlanNode { &ordering.physical_sort_expr_nodes, ctx, &schema, + codec, proto_converter, )?; sort_information.extend(LexOrdering::new(sort_exprs)); @@ -991,11 +972,13 @@ impl protobuf::PhysicalPlanNode { fn try_into_coalesce_batches_physical_plan( &self, coalesce_batches: &protobuf::CoalesceBatchesExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let input: Arc = - into_physical_plan(&coalesce_batches.input, ctx, proto_converter)?; + into_physical_plan(&coalesce_batches.input, ctx, codec, proto_converter)?; Ok(Arc::new( #[expect(deprecated)] CoalesceBatchesExec::new(input, coalesce_batches.target_batch_size as usize) @@ -1006,11 +989,13 @@ impl protobuf::PhysicalPlanNode { fn try_into_merge_physical_plan( &self, merge: &protobuf::CoalescePartitionsExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let input: Arc = - into_physical_plan(&merge.input, ctx, proto_converter)?; + into_physical_plan(&merge.input, ctx, codec, proto_converter)?; Ok(Arc::new( CoalescePartitionsExec::new(input) .with_fetch(merge.fetch.map(|f| f as usize)), @@ -1020,15 +1005,18 @@ impl protobuf::PhysicalPlanNode { fn try_into_repartition_physical_plan( &self, repart: &protobuf::RepartitionExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let input: Arc = - into_physical_plan(&repart.input, ctx, proto_converter)?; + into_physical_plan(&repart.input, ctx, codec, proto_converter)?; let partitioning = parse_protobuf_partitioning( repart.partitioning.as_ref(), ctx, input.schema().as_ref(), + codec, proto_converter, )?; let mut repart_exec = RepartitionExec::try_new(input, partitioning.unwrap())?; @@ -1041,11 +1029,13 @@ impl protobuf::PhysicalPlanNode { fn try_into_global_limit_physical_plan( &self, limit: &protobuf::GlobalLimitExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let input: Arc = - into_physical_plan(&limit.input, ctx, proto_converter)?; + into_physical_plan(&limit.input, ctx, codec, proto_converter)?; let fetch = if limit.fetch >= 0 { Some(limit.fetch as usize) } else { @@ -1061,22 +1051,26 @@ impl protobuf::PhysicalPlanNode { fn try_into_local_limit_physical_plan( &self, limit: &protobuf::LocalLimitExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let input: Arc = - into_physical_plan(&limit.input, ctx, proto_converter)?; + into_physical_plan(&limit.input, ctx, codec, proto_converter)?; Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize))) } fn try_into_window_physical_plan( &self, window_agg: &protobuf::WindowAggExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let input: Arc = - into_physical_plan(&window_agg.input, ctx, proto_converter)?; + into_physical_plan(&window_agg.input, ctx, codec, proto_converter)?; let input_schema = input.schema(); let physical_window_expr: Vec> = window_agg @@ -1087,6 +1081,7 @@ impl protobuf::PhysicalPlanNode { window_expr, ctx, input_schema.as_ref(), + codec, proto_converter, ) }) @@ -1096,7 +1091,12 @@ impl protobuf::PhysicalPlanNode { .partition_keys .iter() .map(|expr| { - proto_converter.proto_to_physical_expr(expr, input.schema().as_ref(), ctx) + proto_converter.proto_to_physical_expr( + expr, + ctx, + input.schema().as_ref(), + codec, + ) }) .collect::>>>()?; @@ -1129,11 +1129,13 @@ impl protobuf::PhysicalPlanNode { fn try_into_aggregate_physical_plan( &self, hash_agg: &protobuf::AggregateExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let input: Arc = - into_physical_plan(&hash_agg.input, ctx, proto_converter)?; + into_physical_plan(&hash_agg.input, ctx, codec, proto_converter)?; let mode = protobuf::AggregateMode::try_from(hash_agg.mode).map_err(|_| { proto_error(format!( "Received a AggregateNode message with unknown AggregateMode {}", @@ -1159,7 +1161,7 @@ impl protobuf::PhysicalPlanNode { .zip(hash_agg.group_expr_name.iter()) .map(|(expr, name)| { proto_converter - .proto_to_physical_expr(expr, input.schema().as_ref(), ctx) + .proto_to_physical_expr(expr, ctx, input.schema().as_ref(), codec) .map(|expr| (expr, name.to_string())) }) .collect::, _>>()?; @@ -1170,7 +1172,7 @@ impl protobuf::PhysicalPlanNode { .zip(hash_agg.group_expr_name.iter()) .map(|(expr, name)| { proto_converter - .proto_to_physical_expr(expr, input.schema().as_ref(), ctx) + .proto_to_physical_expr(expr, ctx, input.schema().as_ref(), codec) .map(|expr| (expr, name.to_string())) }) .collect::, _>>()?; @@ -1199,7 +1201,12 @@ impl protobuf::PhysicalPlanNode { expr.expr .as_ref() .map(|e| { - proto_converter.proto_to_physical_expr(e, &physical_schema, ctx) + proto_converter.proto_to_physical_expr( + e, + ctx, + &physical_schema, + codec, + ) }) .transpose() }) @@ -1222,8 +1229,9 @@ impl protobuf::PhysicalPlanNode { .map(|e| { proto_converter.proto_to_physical_expr( e, - &physical_schema, ctx, + &physical_schema, + codec, ) }) .collect::>>()?; @@ -1235,6 +1243,7 @@ impl protobuf::PhysicalPlanNode { e, ctx, &physical_schema, + codec, proto_converter, ) }) @@ -1246,14 +1255,11 @@ impl protobuf::PhysicalPlanNode { AggregateFunction::UserDefinedAggrFunction(udaf_name) => { let agg_udf = match &agg_node.fun_definition { Some(buf) => { - ctx.codec().try_decode_udaf(udaf_name, buf)? + codec.try_decode_udaf(udaf_name, buf)? } - None => ctx.task_ctx().udaf(udaf_name).or_else( - |_| { - ctx.codec() - .try_decode_udaf(udaf_name, &[]) - }, - )?, + None => ctx.udaf(udaf_name).or_else(|_| { + codec.try_decode_udaf(udaf_name, &[]) + })?, }; AggregateExprBuilder::new(agg_udf, input_phy_expr) @@ -1305,13 +1311,15 @@ impl protobuf::PhysicalPlanNode { fn try_into_hash_join_physical_plan( &self, hashjoin: &protobuf::HashJoinExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let left: Arc = - into_physical_plan(&hashjoin.left, ctx, proto_converter)?; + into_physical_plan(&hashjoin.left, ctx, codec, proto_converter)?; let right: Arc = - into_physical_plan(&hashjoin.right, ctx, proto_converter)?; + into_physical_plan(&hashjoin.right, ctx, codec, proto_converter)?; let left_schema = left.schema(); let right_schema = right.schema(); let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = hashjoin @@ -1320,13 +1328,15 @@ impl protobuf::PhysicalPlanNode { .map(|col| { let left = proto_converter.proto_to_physical_expr( &col.left.clone().unwrap(), - left_schema.as_ref(), ctx, + left_schema.as_ref(), + codec, )?; let right = proto_converter.proto_to_physical_expr( &col.right.clone().unwrap(), - right_schema.as_ref(), ctx, + right_schema.as_ref(), + codec, )?; Ok((left, right)) }) @@ -1359,8 +1369,8 @@ impl protobuf::PhysicalPlanNode { f.expression.as_ref().ok_or_else(|| { proto_error("Unexpected empty filter expression") })?, - &schema, - ctx, + ctx, &schema, + codec, )?; let column_indices = f.column_indices .iter() @@ -1421,11 +1431,13 @@ impl protobuf::PhysicalPlanNode { fn try_into_symmetric_hash_join_physical_plan( &self, sym_join: &protobuf::SymmetricHashJoinExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { - let left = into_physical_plan(&sym_join.left, ctx, proto_converter)?; - let right = into_physical_plan(&sym_join.right, ctx, proto_converter)?; + let left = into_physical_plan(&sym_join.left, ctx, codec, proto_converter)?; + let right = into_physical_plan(&sym_join.right, ctx, codec, proto_converter)?; let left_schema = left.schema(); let right_schema = right.schema(); let on = sym_join @@ -1434,13 +1446,15 @@ impl protobuf::PhysicalPlanNode { .map(|col| { let left = proto_converter.proto_to_physical_expr( &col.left.clone().unwrap(), - left_schema.as_ref(), ctx, + left_schema.as_ref(), + codec, )?; let right = proto_converter.proto_to_physical_expr( &col.right.clone().unwrap(), - right_schema.as_ref(), ctx, + right_schema.as_ref(), + codec, )?; Ok((left, right)) }) @@ -1473,8 +1487,8 @@ impl protobuf::PhysicalPlanNode { f.expression.as_ref().ok_or_else(|| { proto_error("Unexpected empty filter expression") })?, - &schema, - ctx, + ctx, &schema, + codec, )?; let column_indices = f.column_indices .iter() @@ -1500,6 +1514,7 @@ impl protobuf::PhysicalPlanNode { &sym_join.left_sort_exprs, ctx, &left_schema, + codec, proto_converter, )?; let left_sort_exprs = LexOrdering::new(left_sort_exprs); @@ -1508,6 +1523,7 @@ impl protobuf::PhysicalPlanNode { &sym_join.right_sort_exprs, ctx, &right_schema, + codec, proto_converter, )?; let right_sort_exprs = LexOrdering::new(right_sort_exprs); @@ -1546,12 +1562,14 @@ impl protobuf::PhysicalPlanNode { fn try_into_union_physical_plan( &self, union: &protobuf::UnionExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let mut inputs: Vec> = vec![]; for input in &union.inputs { - inputs.push(proto_converter.proto_to_execution_plan(input, ctx)?); + inputs.push(proto_converter.proto_to_execution_plan(ctx, codec, input)?); } UnionExec::try_new(inputs) } @@ -1559,12 +1577,14 @@ impl protobuf::PhysicalPlanNode { fn try_into_interleave_physical_plan( &self, interleave: &protobuf::InterleaveExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let mut inputs: Vec> = vec![]; for input in &interleave.inputs { - inputs.push(proto_converter.proto_to_execution_plan(input, ctx)?); + inputs.push(proto_converter.proto_to_execution_plan(ctx, codec, input)?); } Ok(Arc::new(InterleaveExec::try_new(inputs)?)) } @@ -1572,20 +1592,24 @@ impl protobuf::PhysicalPlanNode { fn try_into_cross_join_physical_plan( &self, crossjoin: &protobuf::CrossJoinExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let left: Arc = - into_physical_plan(&crossjoin.left, ctx, proto_converter)?; + into_physical_plan(&crossjoin.left, ctx, codec, proto_converter)?; let right: Arc = - into_physical_plan(&crossjoin.right, ctx, proto_converter)?; + into_physical_plan(&crossjoin.right, ctx, codec, proto_converter)?; Ok(Arc::new(CrossJoinExec::new(left, right))) } fn try_into_empty_physical_plan( &self, empty: &protobuf::EmptyExecNode, - _ctx: &PhysicalPlanDecodeContext<'_>, + _ctx: &TaskContext, + + _codec: &dyn PhysicalExtensionCodec, _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let schema = Arc::new(convert_required!(empty.schema)?); @@ -1595,7 +1619,9 @@ impl protobuf::PhysicalPlanNode { fn try_into_placeholder_row_physical_plan( &self, placeholder: &protobuf::PlaceholderRowExecNode, - _ctx: &PhysicalPlanDecodeContext<'_>, + _ctx: &TaskContext, + + _codec: &dyn PhysicalExtensionCodec, ) -> Result> { let schema = Arc::new(convert_required!(placeholder.schema)?); Ok(Arc::new(PlaceholderRowExec::new(schema))) @@ -1604,10 +1630,12 @@ impl protobuf::PhysicalPlanNode { fn try_into_sort_physical_plan( &self, sort: &protobuf::SortExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { - let input = into_physical_plan(&sort.input, ctx, proto_converter)?; + let input = into_physical_plan(&sort.input, ctx, codec, proto_converter)?; let exprs = sort .expr .iter() @@ -1628,11 +1656,7 @@ impl protobuf::PhysicalPlanNode { })? .as_ref(); Ok(PhysicalSortExpr { - expr: proto_converter.proto_to_physical_expr( - expr, - input.schema().as_ref(), - ctx, - )?, + expr: proto_converter.proto_to_physical_expr(expr, ctx, input.schema().as_ref(), codec)?, options: SortOptions { descending: !sort_expr.asc, nulls_first: sort_expr.nulls_first, @@ -1659,10 +1683,12 @@ impl protobuf::PhysicalPlanNode { fn try_into_sort_preserving_merge_physical_plan( &self, sort: &protobuf::SortPreservingMergeExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { - let input = into_physical_plan(&sort.input, ctx, proto_converter)?; + let input = into_physical_plan(&sort.input, ctx, codec, proto_converter)?; let exprs = sort .expr .iter() @@ -1685,8 +1711,9 @@ impl protobuf::PhysicalPlanNode { Ok(PhysicalSortExpr { expr: proto_converter.proto_to_physical_expr( expr, - input.schema().as_ref(), ctx, + input.schema().as_ref(), + codec, )?, options: SortOptions { descending: !sort_expr.asc, @@ -1710,18 +1737,18 @@ impl protobuf::PhysicalPlanNode { fn try_into_extension_physical_plan( &self, extension: &protobuf::PhysicalExtensionNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let inputs: Vec> = extension .inputs .iter() - .map(|i| proto_converter.proto_to_execution_plan(i, ctx)) + .map(|i| proto_converter.proto_to_execution_plan(ctx, codec, i)) .collect::>()?; - let extension_node = - ctx.codec() - .try_decode(extension.node.as_slice(), &inputs, ctx.task_ctx())?; + let extension_node = codec.try_decode(extension.node.as_slice(), &inputs, ctx)?; Ok(extension_node) } @@ -1729,13 +1756,15 @@ impl protobuf::PhysicalPlanNode { fn try_into_nested_loop_join_physical_plan( &self, join: &protobuf::NestedLoopJoinExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let left: Arc = - into_physical_plan(&join.left, ctx, proto_converter)?; + into_physical_plan(&join.left, ctx, codec, proto_converter)?; let right: Arc = - into_physical_plan(&join.right, ctx, proto_converter)?; + into_physical_plan(&join.right, ctx, codec, proto_converter)?; let join_type = protobuf::JoinType::try_from(join.join_type).map_err(|_| { proto_error(format!( "Received a NestedLoopJoinExecNode message with unknown JoinType {}", @@ -1752,13 +1781,12 @@ impl protobuf::PhysicalPlanNode { .ok_or_else(|| proto_error("Missing JoinFilter schema"))? .try_into()?; - let expression = proto_converter - .proto_to_physical_expr( + let expression = proto_converter.proto_to_physical_expr( f.expression.as_ref().ok_or_else(|| { proto_error("Unexpected empty filter expression") })?, - &schema, - ctx, + ctx, &schema, + codec, )?; let column_indices = f.column_indices .iter() @@ -1803,11 +1831,13 @@ impl protobuf::PhysicalPlanNode { fn try_into_analyze_physical_plan( &self, analyze: &protobuf::AnalyzeExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let input: Arc = - into_physical_plan(&analyze.input, ctx, proto_converter)?; + into_physical_plan(&analyze.input, ctx, codec, proto_converter)?; let metric_categories = if analyze.has_metric_categories { let cats: Result> = analyze .metric_categories @@ -1831,10 +1861,12 @@ impl protobuf::PhysicalPlanNode { fn try_into_json_sink_physical_plan( &self, sink: &protobuf::JsonSinkExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { - let input = into_physical_plan(&sink.input, ctx, proto_converter)?; + let input = into_physical_plan(&sink.input, ctx, codec, proto_converter)?; let data_sink: JsonSink = sink .sink @@ -1850,6 +1882,7 @@ impl protobuf::PhysicalPlanNode { &collection.physical_sort_expr_nodes, ctx, &sink_schema, + codec, proto_converter, ) .map(|sort_exprs| { @@ -1868,10 +1901,12 @@ impl protobuf::PhysicalPlanNode { fn try_into_csv_sink_physical_plan( &self, sink: &protobuf::CsvSinkExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { - let input = into_physical_plan(&sink.input, ctx, proto_converter)?; + let input = into_physical_plan(&sink.input, ctx, codec, proto_converter)?; let data_sink: CsvSink = sink .sink @@ -1887,6 +1922,7 @@ impl protobuf::PhysicalPlanNode { &collection.physical_sort_expr_nodes, ctx, &sink_schema, + codec, proto_converter, ) .map(|sort_exprs| { @@ -1906,12 +1942,14 @@ impl protobuf::PhysicalPlanNode { fn try_into_parquet_sink_physical_plan( &self, sink: &protobuf::ParquetSinkExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { #[cfg(feature = "parquet")] { - let input = into_physical_plan(&sink.input, ctx, proto_converter)?; + let input = into_physical_plan(&sink.input, ctx, codec, proto_converter)?; let data_sink: ParquetSink = sink .sink @@ -1927,6 +1965,7 @@ impl protobuf::PhysicalPlanNode { &collection.physical_sort_expr_nodes, ctx, &sink_schema, + codec, proto_converter, ) .map(|sort_exprs| { @@ -1948,10 +1987,12 @@ impl protobuf::PhysicalPlanNode { fn try_into_unnest_physical_plan( &self, unnest: &protobuf::UnnestExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { - let input = into_physical_plan(&unnest.input, ctx, proto_converter)?; + let input = into_physical_plan(&unnest.input, ctx, codec, proto_converter)?; Ok(Arc::new(UnnestExec::new( input, @@ -1978,12 +2019,14 @@ impl protobuf::PhysicalPlanNode { fn try_into_sort_join( &self, sort_join: &SortMergeJoinExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { - let left = into_physical_plan(&sort_join.left, ctx, proto_converter)?; + let left = into_physical_plan(&sort_join.left, ctx, codec, proto_converter)?; let left_schema = left.schema(); - let right = into_physical_plan(&sort_join.right, ctx, proto_converter)?; + let right = into_physical_plan(&sort_join.right, ctx, codec, proto_converter)?; let right_schema = right.schema(); let filter = sort_join @@ -2000,8 +2043,9 @@ impl protobuf::PhysicalPlanNode { f.expression.as_ref().ok_or_else(|| { proto_error("Unexpected empty filter expression") })?, - &schema, ctx, + &schema, + codec, )?; let column_indices = f .column_indices @@ -2060,13 +2104,15 @@ impl protobuf::PhysicalPlanNode { .map(|col| { let left = proto_converter.proto_to_physical_expr( &col.left.clone().unwrap(), - left_schema.as_ref(), ctx, + left_schema.as_ref(), + codec, )?; let right = proto_converter.proto_to_physical_expr( &col.right.clone().unwrap(), - right_schema.as_ref(), ctx, + right_schema.as_ref(), + codec, )?; Ok((left, right)) }) @@ -2151,21 +2197,24 @@ impl protobuf::PhysicalPlanNode { fn try_into_cooperative_physical_plan( &self, field_stream: &protobuf::CooperativeExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { - let input = into_physical_plan(&field_stream.input, ctx, proto_converter)?; + let input = into_physical_plan(&field_stream.input, ctx, codec, proto_converter)?; Ok(Arc::new(CooperativeExec::new(input))) } fn try_into_async_func_physical_plan( &self, async_func: &protobuf::AsyncFuncExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let input: Arc = - into_physical_plan(&async_func.input, ctx, proto_converter)?; + into_physical_plan(&async_func.input, ctx, codec, proto_converter)?; if async_func.async_exprs.len() != async_func.async_expr_names.len() { return internal_err!( @@ -2180,8 +2229,9 @@ impl protobuf::PhysicalPlanNode { .map(|(expr, name)| { let physical_expr = proto_converter.proto_to_physical_expr( expr, - input.schema().as_ref(), ctx, + input.schema().as_ref(), + codec, )?; Ok(Arc::new(AsyncFuncExpr::try_new( @@ -2198,49 +2248,16 @@ impl protobuf::PhysicalPlanNode { fn try_into_buffer_physical_plan( &self, buffer: &protobuf::BufferExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let input: Arc = - into_physical_plan(&buffer.input, ctx, proto_converter)?; + into_physical_plan(&buffer.input, ctx, extension_codec, proto_converter)?; Ok(Arc::new(BufferExec::new(input, buffer.capacity as usize))) } - fn try_into_scalar_subquery_physical_plan( - &self, - sq: &protobuf::ScalarSubqueryExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, - proto_converter: &dyn PhysicalProtoConverterExtension, - ) -> Result> { - // First, deserialize the main input plan. We set up the subquery results - // container first, so that ScalarSubqueryExpr nodes can reference it. - let subquery_results = ScalarSubqueryResults::new(sq.subqueries.len()); - let input_ctx = ctx.with_scalar_subquery_results(subquery_results.clone()); - let input = into_physical_plan(&sq.input, &input_ctx, proto_converter)?; - - // Now deserialize the subquery children. - let subqueries: Vec = sq - .subqueries - .iter() - .enumerate() - .map(|(index, sq_plan)| { - let plan = - sq_plan.try_into_physical_plan_with_context(ctx, proto_converter)?; - Ok(ScalarSubqueryLink { - plan, - index: SubqueryIndex::new(index), - }) - }) - .collect::>>()?; - - Ok(Arc::new(ScalarSubqueryExec::new( - input, - subqueries, - subquery_results, - ))) - } - fn try_from_explain_exec( exec: &ExplainExec, _codec: &dyn PhysicalExtensionCodec, @@ -2341,8 +2358,10 @@ impl protobuf::PhysicalPlanNode { ), default_filter_selectivity: exec.default_selectivity() as u32, projection: match exec.projection() { - None => (0..fields.len()).map(|i| i as u32).collect(), - Some(v) => v.iter().map(|x| *x as u32).collect(), + None => (0..exec.input().schema().fields().len()) + .map(|i| i as u32) + .collect(), + Some(v) => v.iter().map(|x| *x as u32).collect(), }, batch_size: exec.batch_size() as u32, fetch: exec.fetch().map(|f| f as u32), @@ -3634,38 +3653,6 @@ impl protobuf::PhysicalPlanNode { ))), }) } - - fn try_from_scalar_subquery_exec( - exec: &ScalarSubqueryExec, - codec: &dyn PhysicalExtensionCodec, - proto_converter: &dyn PhysicalProtoConverterExtension, - ) -> Result { - let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter( - Arc::clone(exec.input()), - codec, - proto_converter, - )?; - let subqueries = exec - .subqueries() - .iter() - .map(|sq| { - protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter( - Arc::clone(&sq.plan), - codec, - proto_converter, - ) - }) - .collect::>>()?; - - Ok(protobuf::PhysicalPlanNode { - physical_plan_type: Some(PhysicalPlanType::ScalarSubquery(Box::new( - protobuf::ScalarSubqueryExecNode { - input: Some(Box::new(input)), - subqueries, - }, - ))), - }) - } } pub trait AsExecutionPlan: Debug + Send + Sync + Clone { @@ -3774,21 +3761,11 @@ impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec { pub trait PhysicalProtoConverterExtension { fn proto_to_execution_plan( &self, + ctx: &TaskContext, + codec: &dyn PhysicalExtensionCodec, proto: &protobuf::PhysicalPlanNode, - ctx: &PhysicalPlanDecodeContext<'_>, ) -> Result>; - fn default_proto_to_execution_plan( - &self, - proto: &protobuf::PhysicalPlanNode, - ctx: &PhysicalPlanDecodeContext<'_>, - ) -> Result> - where - Self: Sized, - { - proto.try_into_physical_plan_with_context(ctx, self) - } - fn execution_plan_to_proto( &self, plan: &Arc, @@ -3798,22 +3775,11 @@ pub trait PhysicalProtoConverterExtension { fn proto_to_physical_expr( &self, proto: &protobuf::PhysicalExprNode, + ctx: &TaskContext, input_schema: &Schema, - ctx: &PhysicalPlanDecodeContext<'_>, + codec: &dyn PhysicalExtensionCodec, ) -> Result>; - fn default_proto_to_physical_expr( - &self, - proto: &protobuf::PhysicalExprNode, - input_schema: &Schema, - ctx: &PhysicalPlanDecodeContext<'_>, - ) -> Result> - where - Self: Sized, - { - parse_physical_expr_with_converter(proto, input_schema, ctx, self) - } - fn physical_expr_to_proto( &self, expr: &Arc, @@ -3834,15 +3800,15 @@ struct DataEncoderTuple { pub blob: Vec, } -pub struct DefaultPhysicalProtoConverter {} - +pub struct DefaultPhysicalProtoConverter; impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter { fn proto_to_execution_plan( &self, + ctx: &TaskContext, + codec: &dyn PhysicalExtensionCodec, proto: &protobuf::PhysicalPlanNode, - ctx: &PhysicalPlanDecodeContext<'_>, ) -> Result> { - proto.try_into_physical_plan_with_context(ctx, self) + proto.try_into_physical_plan_with_converter(ctx, codec, self) } fn execution_plan_to_proto( @@ -3863,14 +3829,15 @@ impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter { fn proto_to_physical_expr( &self, proto: &protobuf::PhysicalExprNode, + ctx: &TaskContext, input_schema: &Schema, - ctx: &PhysicalPlanDecodeContext<'_>, + codec: &dyn PhysicalExtensionCodec, ) -> Result> where Self: Sized, { // Default implementation calls the free function - parse_physical_expr_with_converter(proto, input_schema, ctx, self) + parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self) } fn physical_expr_to_proto( @@ -3900,8 +3867,9 @@ impl DeduplicatingSerializer { impl PhysicalProtoConverterExtension for DeduplicatingSerializer { fn proto_to_execution_plan( &self, + _ctx: &TaskContext, + _codec: &dyn PhysicalExtensionCodec, _proto: &protobuf::PhysicalPlanNode, - _ctx: &PhysicalPlanDecodeContext<'_>, ) -> Result> { internal_err!("DeduplicatingSerializer cannot deserialize execution plans") } @@ -3924,8 +3892,9 @@ impl PhysicalProtoConverterExtension for DeduplicatingSerializer { fn proto_to_physical_expr( &self, _proto: &protobuf::PhysicalExprNode, + _ctx: &TaskContext, _input_schema: &Schema, - _ctx: &PhysicalPlanDecodeContext<'_>, + _codec: &dyn PhysicalExtensionCodec, ) -> Result> where Self: Sized, @@ -3965,10 +3934,11 @@ struct DeduplicatingDeserializer { impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { fn proto_to_execution_plan( &self, + ctx: &TaskContext, + codec: &dyn PhysicalExtensionCodec, proto: &protobuf::PhysicalPlanNode, - ctx: &PhysicalPlanDecodeContext<'_>, ) -> Result> { - proto.try_into_physical_plan_with_context(ctx, self) + proto.try_into_physical_plan_with_converter(ctx, codec, self) } fn execution_plan_to_proto( @@ -3985,8 +3955,9 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { fn proto_to_physical_expr( &self, proto: &protobuf::PhysicalExprNode, + ctx: &TaskContext, input_schema: &Schema, - ctx: &PhysicalPlanDecodeContext<'_>, + codec: &dyn PhysicalExtensionCodec, ) -> Result> where Self: Sized, @@ -3997,12 +3968,17 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { return Ok(Arc::clone(cached)); } // Deserialize and cache - let expr = - parse_physical_expr_with_converter(proto, input_schema, ctx, self)?; + let expr = parse_physical_expr_with_converter( + proto, + ctx, + input_schema, + codec, + self, + )?; self.cache.borrow_mut().insert(expr_id, Arc::clone(&expr)); Ok(expr) } else { - parse_physical_expr_with_converter(proto, input_schema, ctx, self) + parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self) } } @@ -4036,11 +4012,12 @@ pub struct DeduplicatingProtoConverter {} impl PhysicalProtoConverterExtension for DeduplicatingProtoConverter { fn proto_to_execution_plan( &self, + ctx: &TaskContext, + codec: &dyn PhysicalExtensionCodec, proto: &protobuf::PhysicalPlanNode, - ctx: &PhysicalPlanDecodeContext<'_>, ) -> Result> { let deserializer = DeduplicatingDeserializer::default(); - proto.try_into_physical_plan_with_context(ctx, &deserializer) + proto.try_into_physical_plan_with_converter(ctx, codec, &deserializer) } fn execution_plan_to_proto( @@ -4062,14 +4039,15 @@ impl PhysicalProtoConverterExtension for DeduplicatingProtoConverter { fn proto_to_physical_expr( &self, proto: &protobuf::PhysicalExprNode, + ctx: &TaskContext, input_schema: &Schema, - ctx: &PhysicalPlanDecodeContext<'_>, + codec: &dyn PhysicalExtensionCodec, ) -> Result> where Self: Sized, { let deserializer = DeduplicatingDeserializer::default(); - deserializer.proto_to_physical_expr(proto, input_schema, ctx) + deserializer.proto_to_physical_expr(proto, ctx, input_schema, codec) } fn physical_expr_to_proto( @@ -4183,11 +4161,12 @@ impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec { fn into_physical_plan( node: &Option>, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { if let Some(field) = node { - proto_converter.proto_to_execution_plan(field, ctx) + proto_converter.proto_to_execution_plan(ctx, codec, field) } else { Err(proto_error("Missing required field in protobuf")) } From 1a2896712e027fa2f2fdef40894d3749c8b37655 Mon Sep 17 00:00:00 2001 From: aditya singh rathore Date: Tue, 28 Apr 2026 10:11:49 +0530 Subject: [PATCH 4/8] test for filterexec --- .../tests/cases/roundtrip_physical_plan.rs | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index e7d38b57a1522..93443f225573f 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3680,4 +3680,65 @@ async fn roundtrip_issue_18602_complex_filter_decode_recursion() -> Result<()> { LIMIT 20"; roundtrip_test_sql_with_context(sql, &ctx).await + +#[tokio::test] +async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> { + use datafusion::physical_plan::memory::MemoryExec; + + let ctx = SessionContext::new(); + let codec = DefaultPhysicalExtensionCodec {}; + + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ])); + + let input: Arc = Arc::new( + MemoryExec::try_new(&[], Arc::clone(&schema), None)? + ); + + let predicate: Arc = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int32(Some(0)))), + )); + + // Case 1: None -> should round-trip as None (return all columns) + let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) + .build()?; + let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?; + let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?; + let rt = roundtripped.as_any().downcast_ref::().unwrap(); + assert_eq!(rt.projection(), None, "None projection must stay None after roundtrip"); + + // Case 2: Some(vec![]) -> must survive as Some([]), NOT silently become None + let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) + .apply_projection(Some(vec![]))? + .build()?; + let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?; + let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?; + let rt = roundtripped.as_any().downcast_ref::().unwrap(); + assert_eq!( + rt.projection(), + Some(&vec![]), + "Empty projection Some([]) must survive roundtrip, not become None" + ); + + // Case 3: Some(vec![2, 0]) -> partial projection must survive + let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) + .apply_projection(Some(vec![2, 0]))? + .build()?; + let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?; + let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?; + let rt = roundtripped.as_any().downcast_ref::().unwrap(); + assert_eq!( + rt.projection(), + Some(&vec![2_usize, 0_usize]), + "Partial projection must survive roundtrip" + ); + + Ok(()) +} + } From bfc0235d2b9265e93f4a9af28bbeb1b602bc2287 Mon Sep 17 00:00:00 2001 From: aditya singh rathore Date: Tue, 28 Apr 2026 04:50:02 +0000 Subject: [PATCH 5/8] spell fix --- datafusion/proto/tests/cases/roundtrip_physical_plan.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 93443f225573f..967224da9b16b 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3680,7 +3680,8 @@ async fn roundtrip_issue_18602_complex_filter_decode_recursion() -> Result<()> { LIMIT 20"; roundtrip_test_sql_with_context(sql, &ctx).await - +} + #[tokio::test] async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> { use datafusion::physical_plan::memory::MemoryExec; From 3c344be425d3f652c819441b64f50f7e10e9346d Mon Sep 17 00:00:00 2001 From: aditya singh rathore Date: Tue, 28 Apr 2026 04:54:23 +0000 Subject: [PATCH 6/8] bracesfix --- datafusion/proto/tests/cases/roundtrip_physical_plan.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 967224da9b16b..3bf9be8d76c9b 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3742,4 +3742,3 @@ async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> { Ok(()) } -} From ceefc0262294dbf941aad19fb80ae6e8efd4b671 Mon Sep 17 00:00:00 2001 From: aditya singh rathore Date: Tue, 28 Apr 2026 04:56:36 +0000 Subject: [PATCH 7/8] fmt --- .../proto/tests/cases/roundtrip_physical_plan.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 3bf9be8d76c9b..ff359d09ef448 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3695,9 +3695,8 @@ async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> { Field::new("c", DataType::Int32, false), ])); - let input: Arc = Arc::new( - MemoryExec::try_new(&[], Arc::clone(&schema), None)? - ); + let input: Arc = + Arc::new(MemoryExec::try_new(&[], Arc::clone(&schema), None)?); let predicate: Arc = Arc::new(BinaryExpr::new( Arc::new(Column::new("a", 0)), @@ -3706,12 +3705,16 @@ async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> { )); // Case 1: None -> should round-trip as None (return all columns) - let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) - .build()?; + let filter = + FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)).build()?; let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?; let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?; let rt = roundtripped.as_any().downcast_ref::().unwrap(); - assert_eq!(rt.projection(), None, "None projection must stay None after roundtrip"); + assert_eq!( + rt.projection(), + None, + "None projection must stay None after roundtrip" + ); // Case 2: Some(vec![]) -> must survive as Some([]), NOT silently become None let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) @@ -3741,4 +3744,3 @@ async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> { Ok(()) } - From 44a005b3b848c51d210017ff4ff8b773209127d4 Mon Sep 17 00:00:00 2001 From: aditya singh rathore Date: Tue, 28 Apr 2026 05:13:08 +0000 Subject: [PATCH 8/8] bug fix --- datafusion/proto/src/physical_plan/mod.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 605d520529a8c..afb66c71e3399 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -3259,9 +3259,12 @@ impl protobuf::PhysicalPlanNode { right: Some(Box::new(right)), join_type: join_type.into(), filter, - projection: exec.projection().as_ref().map_or_else(Vec::new, |v| { - v.iter().map(|x| *x as u32).collect::>() - }), + projection: match exec.projection() { + None => (0..exec.input().schema().fields().len()) + .map(|i| i as u32) + .collect(), + Some(v) => v.iter().map(|x| *x as u32).collect(), + }, }, ))), })