diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 6145df65032e3..afb66c71e3399 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -51,7 +51,6 @@ use datafusion_datasource_parquet::source::ParquetSource; #[cfg(feature = "parquet")] use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_execution::{FunctionRegistry, TaskContext}; -use datafusion_expr::execution_props::{ScalarSubqueryResults, SubqueryIndex}; use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF}; use datafusion_functions_table::generate_series::{ Empty, GenSeriesArgs, GenerateSeriesTable, GenericSeriesState, TimestampValue, @@ -84,7 +83,6 @@ use datafusion_physical_plan::metrics::{MetricCategory, MetricType}; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion_physical_plan::repartition::RepartitionExec; -use datafusion_physical_plan::scalar_subquery::{ScalarSubqueryExec, ScalarSubqueryLink}; use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::union::{InterleaveExec, UnionExec}; @@ -119,58 +117,6 @@ use crate::{convert_required, into_required}; pub mod from_proto; pub mod to_proto; -/// Context threaded through physical-plan deserialization. -/// -/// This bundles the stable per-call inputs for deserialization and the -/// per-scope `ScalarSubqueryResults` handle needed while reconstructing -/// `ScalarSubqueryExpr` nodes inside a `ScalarSubqueryExec` input plan. -#[derive(Clone)] -pub struct PhysicalPlanDecodeContext<'a> { - task_ctx: &'a TaskContext, - codec: &'a dyn PhysicalExtensionCodec, - scalar_subquery_results: Option, -} - -impl<'a> PhysicalPlanDecodeContext<'a> { - /// Creates a new root decode context. - pub fn new(task_ctx: &'a TaskContext, codec: &'a dyn PhysicalExtensionCodec) -> Self { - Self { - task_ctx, - codec, - scalar_subquery_results: None, - } - } - - /// Returns the task context used for deserialization. - pub fn task_ctx(&self) -> &'a TaskContext { - self.task_ctx - } - - /// Returns the physical extension codec used for deserialization. - pub fn codec(&self) -> &'a dyn PhysicalExtensionCodec { - self.codec - } - - /// Returns the scalar subquery results container for the current scope, if - /// one is active. - pub fn scalar_subquery_results(&self) -> Option<&ScalarSubqueryResults> { - self.scalar_subquery_results.as_ref() - } - - /// Returns a child context with a different scalar subquery results - /// container. - pub fn with_scalar_subquery_results( - &self, - scalar_subquery_results: ScalarSubqueryResults, - ) -> Self { - Self { - task_ctx: self.task_ctx, - codec: self.codec, - scalar_subquery_results: Some(scalar_subquery_results), - } - } -} - impl AsExecutionPlan for protobuf::PhysicalPlanNode { fn try_decode(buf: &[u8]) -> Result where @@ -222,16 +168,8 @@ impl protobuf::PhysicalPlanNode { pub fn try_into_physical_plan_with_converter( &self, ctx: &TaskContext, - codec: &dyn PhysicalExtensionCodec, - proto_converter: &dyn PhysicalProtoConverterExtension, - ) -> Result> { - let decode_ctx = PhysicalPlanDecodeContext::new(ctx, codec); - self.try_into_physical_plan_with_context(&decode_ctx, proto_converter) - } - pub(crate) fn try_into_physical_plan_with_context( - &self, - ctx: &PhysicalPlanDecodeContext<'_>, + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let plan = self.physical_plan_type.as_ref().ok_or_else(|| { @@ -241,124 +179,147 @@ impl protobuf::PhysicalPlanNode { })?; match plan { PhysicalPlanType::Explain(explain) => { - self.try_into_explain_physical_plan(explain, ctx, proto_converter) - } - PhysicalPlanType::Projection(projection) => { - self.try_into_projection_physical_plan(projection, ctx, proto_converter) + self.try_into_explain_physical_plan(explain, ctx, codec, proto_converter) } + PhysicalPlanType::Projection(projection) => self + .try_into_projection_physical_plan( + projection, + ctx, + codec, + proto_converter, + ), PhysicalPlanType::Filter(filter) => { - self.try_into_filter_physical_plan(filter, ctx, proto_converter) + self.try_into_filter_physical_plan(filter, ctx, codec, proto_converter) } PhysicalPlanType::CsvScan(scan) => { - self.try_into_csv_scan_physical_plan(scan, ctx, proto_converter) + self.try_into_csv_scan_physical_plan(scan, ctx, codec, proto_converter) } PhysicalPlanType::JsonScan(scan) => { - self.try_into_json_scan_physical_plan(scan, ctx, proto_converter) - } - PhysicalPlanType::ParquetScan(scan) => { - self.try_into_parquet_scan_physical_plan(scan, ctx, proto_converter) + self.try_into_json_scan_physical_plan(scan, ctx, codec, proto_converter) } + PhysicalPlanType::ParquetScan(scan) => self + .try_into_parquet_scan_physical_plan(scan, ctx, codec, proto_converter), PhysicalPlanType::AvroScan(scan) => { - self.try_into_avro_scan_physical_plan(scan, ctx, proto_converter) + self.try_into_avro_scan_physical_plan(scan, ctx, codec, proto_converter) } PhysicalPlanType::MemoryScan(scan) => { - self.try_into_memory_scan_physical_plan(scan, ctx, proto_converter) + self.try_into_memory_scan_physical_plan(scan, ctx, codec, proto_converter) } PhysicalPlanType::ArrowScan(scan) => { - self.try_into_arrow_scan_physical_plan(scan, ctx, proto_converter) + self.try_into_arrow_scan_physical_plan(scan, ctx, codec, proto_converter) } PhysicalPlanType::CoalesceBatches(coalesce_batches) => self .try_into_coalesce_batches_physical_plan( coalesce_batches, ctx, + codec, proto_converter, ), PhysicalPlanType::Merge(merge) => { - self.try_into_merge_physical_plan(merge, ctx, proto_converter) - } - PhysicalPlanType::Repartition(repart) => { - self.try_into_repartition_physical_plan(repart, ctx, proto_converter) - } - PhysicalPlanType::GlobalLimit(limit) => { - self.try_into_global_limit_physical_plan(limit, ctx, proto_converter) - } - PhysicalPlanType::LocalLimit(limit) => { - self.try_into_local_limit_physical_plan(limit, ctx, proto_converter) - } - PhysicalPlanType::Window(window_agg) => { - self.try_into_window_physical_plan(window_agg, ctx, proto_converter) - } - PhysicalPlanType::Aggregate(hash_agg) => { - self.try_into_aggregate_physical_plan(hash_agg, ctx, proto_converter) - } - PhysicalPlanType::HashJoin(hashjoin) => { - self.try_into_hash_join_physical_plan(hashjoin, ctx, proto_converter) + self.try_into_merge_physical_plan(merge, ctx, codec, proto_converter) } + PhysicalPlanType::Repartition(repart) => self + .try_into_repartition_physical_plan(repart, ctx, codec, proto_converter), + PhysicalPlanType::GlobalLimit(limit) => self + .try_into_global_limit_physical_plan(limit, ctx, codec, proto_converter), + PhysicalPlanType::LocalLimit(limit) => self + .try_into_local_limit_physical_plan(limit, ctx, codec, proto_converter), + PhysicalPlanType::Window(window_agg) => self.try_into_window_physical_plan( + window_agg, + ctx, + codec, + proto_converter, + ), + PhysicalPlanType::Aggregate(hash_agg) => self + .try_into_aggregate_physical_plan(hash_agg, ctx, codec, proto_converter), + PhysicalPlanType::HashJoin(hashjoin) => self + .try_into_hash_join_physical_plan(hashjoin, ctx, codec, proto_converter), PhysicalPlanType::SymmetricHashJoin(sym_join) => self .try_into_symmetric_hash_join_physical_plan( sym_join, ctx, + codec, proto_converter, ), PhysicalPlanType::Union(union) => { - self.try_into_union_physical_plan(union, ctx, proto_converter) - } - PhysicalPlanType::Interleave(interleave) => { - self.try_into_interleave_physical_plan(interleave, ctx, proto_converter) - } - PhysicalPlanType::CrossJoin(crossjoin) => { - self.try_into_cross_join_physical_plan(crossjoin, ctx, proto_converter) + self.try_into_union_physical_plan(union, ctx, codec, proto_converter) } + PhysicalPlanType::Interleave(interleave) => self + .try_into_interleave_physical_plan( + interleave, + ctx, + codec, + proto_converter, + ), + PhysicalPlanType::CrossJoin(crossjoin) => self + .try_into_cross_join_physical_plan( + crossjoin, + ctx, + codec, + proto_converter, + ), PhysicalPlanType::Empty(empty) => { - self.try_into_empty_physical_plan(empty, ctx, proto_converter) + self.try_into_empty_physical_plan(empty, ctx, codec, proto_converter) } PhysicalPlanType::PlaceholderRow(placeholder) => { - self.try_into_placeholder_row_physical_plan(placeholder, ctx) + self.try_into_placeholder_row_physical_plan(placeholder, ctx, codec) } PhysicalPlanType::Sort(sort) => { - self.try_into_sort_physical_plan(sort, ctx, proto_converter) + self.try_into_sort_physical_plan(sort, ctx, codec, proto_converter) } PhysicalPlanType::SortPreservingMerge(sort) => self - .try_into_sort_preserving_merge_physical_plan(sort, ctx, proto_converter), - PhysicalPlanType::Extension(extension) => { - self.try_into_extension_physical_plan(extension, ctx, proto_converter) - } - PhysicalPlanType::NestedLoopJoin(join) => { - self.try_into_nested_loop_join_physical_plan(join, ctx, proto_converter) - } + .try_into_sort_preserving_merge_physical_plan( + sort, + ctx, + codec, + proto_converter, + ), + PhysicalPlanType::Extension(extension) => self + .try_into_extension_physical_plan(extension, ctx, codec, proto_converter), + PhysicalPlanType::NestedLoopJoin(join) => self + .try_into_nested_loop_join_physical_plan( + join, + ctx, + codec, + proto_converter, + ), PhysicalPlanType::Analyze(analyze) => { - self.try_into_analyze_physical_plan(analyze, ctx, proto_converter) + self.try_into_analyze_physical_plan(analyze, ctx, codec, proto_converter) } PhysicalPlanType::JsonSink(sink) => { - self.try_into_json_sink_physical_plan(sink, ctx, proto_converter) + self.try_into_json_sink_physical_plan(sink, ctx, codec, proto_converter) } PhysicalPlanType::CsvSink(sink) => { - self.try_into_csv_sink_physical_plan(sink, ctx, proto_converter) + self.try_into_csv_sink_physical_plan(sink, ctx, codec, proto_converter) } #[cfg_attr(not(feature = "parquet"), allow(unused_variables))] - PhysicalPlanType::ParquetSink(sink) => { - self.try_into_parquet_sink_physical_plan(sink, ctx, proto_converter) - } + PhysicalPlanType::ParquetSink(sink) => self + .try_into_parquet_sink_physical_plan(sink, ctx, codec, proto_converter), PhysicalPlanType::Unnest(unnest) => { - self.try_into_unnest_physical_plan(unnest, ctx, proto_converter) - } - PhysicalPlanType::Cooperative(cooperative) => { - self.try_into_cooperative_physical_plan(cooperative, ctx, proto_converter) + self.try_into_unnest_physical_plan(unnest, ctx, codec, proto_converter) } + PhysicalPlanType::Cooperative(cooperative) => self + .try_into_cooperative_physical_plan( + cooperative, + ctx, + codec, + proto_converter, + ), PhysicalPlanType::GenerateSeries(generate_series) => { self.try_into_generate_series_physical_plan(generate_series) } PhysicalPlanType::SortMergeJoin(sort_join) => { - self.try_into_sort_join(sort_join, ctx, proto_converter) - } - PhysicalPlanType::AsyncFunc(async_func) => { - self.try_into_async_func_physical_plan(async_func, ctx, proto_converter) + self.try_into_sort_join(sort_join, ctx, codec, proto_converter) } + PhysicalPlanType::AsyncFunc(async_func) => self + .try_into_async_func_physical_plan( + async_func, + ctx, + codec, + proto_converter, + ), PhysicalPlanType::Buffer(buffer) => { - self.try_into_buffer_physical_plan(buffer, ctx, proto_converter) - } - PhysicalPlanType::ScalarSubquery(sq) => { - self.try_into_scalar_subquery_physical_plan(sq, ctx, proto_converter) + self.try_into_buffer_physical_plan(buffer, ctx, codec, proto_converter) } } } @@ -608,14 +569,6 @@ impl protobuf::PhysicalPlanNode { ); } - if let Some(exec) = plan.downcast_ref::() { - return protobuf::PhysicalPlanNode::try_from_scalar_subquery_exec( - exec, - codec, - proto_converter, - ); - } - let mut buf: Vec = vec![]; match codec.try_encode(Arc::clone(&plan_clone), &mut buf) { Ok(_) => { @@ -649,7 +602,9 @@ impl protobuf::PhysicalPlanNode { fn try_into_explain_physical_plan( &self, explain: &protobuf::ExplainExecNode, - _ctx: &PhysicalPlanDecodeContext<'_>, + _ctx: &TaskContext, + + _codec: &dyn PhysicalExtensionCodec, _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { Ok(Arc::new(ExplainExec::new( @@ -666,11 +621,13 @@ impl protobuf::PhysicalPlanNode { fn try_into_projection_physical_plan( &self, projection: &protobuf::ProjectionExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let input: Arc = - into_physical_plan(&projection.input, ctx, proto_converter)?; + into_physical_plan(&projection.input, ctx, codec, proto_converter)?; let exprs = projection .expr .iter() @@ -679,8 +636,9 @@ impl protobuf::PhysicalPlanNode { Ok(( proto_converter.proto_to_physical_expr( expr, - input.schema().as_ref(), ctx, + input.schema().as_ref(), + codec, )?, name.to_string(), )) @@ -696,17 +654,24 @@ impl protobuf::PhysicalPlanNode { fn try_into_filter_physical_plan( &self, filter: &protobuf::FilterExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let input: Arc = - into_physical_plan(&filter.input, ctx, proto_converter)?; + into_physical_plan(&filter.input, ctx, codec, proto_converter)?; let predicate = filter .expr .as_ref() .map(|expr| { - proto_converter.proto_to_physical_expr(expr, input.schema().as_ref(), ctx) + proto_converter.proto_to_physical_expr( + expr, + ctx, + input.schema().as_ref(), + codec, + ) }) .transpose()? .ok_or_else(|| { @@ -716,16 +681,23 @@ impl protobuf::PhysicalPlanNode { })?; let filter_selectivity = filter.default_filter_selectivity.try_into(); - let projection = if !filter.projection.is_empty() { - Some( - filter - .projection - .iter() - .map(|i| *i as usize) - .collect::>(), - ) - } else { + + // Deserialize projection, distinguishing between: + // None => full projection (all columns) + // Some(vec![]) => empty projection (no columns) + // Some(vec![..]) => partial projection + let num_fields = input.schema().fields().len(); + let mut is_full_projection = filter.projection.len() == num_fields; + let mut projection_vec: Vec = Vec::with_capacity(filter.projection.len()); + for (i, idx) in filter.projection.iter().enumerate() { + let idx = *idx as usize; + is_full_projection &= idx == i; + projection_vec.push(idx); + } + let projection = if is_full_projection { None + } else { + Some(projection_vec) }; let filter = FilterExecBuilder::new(predicate, input) @@ -746,7 +718,9 @@ impl protobuf::PhysicalPlanNode { fn try_into_csv_scan_physical_plan( &self, scan: &protobuf::CsvScanExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let escape = @@ -788,6 +762,7 @@ impl protobuf::PhysicalPlanNode { let conf = FileScanConfigBuilder::from(parse_protobuf_file_scan_config( scan.base_conf.as_ref().unwrap(), ctx, + codec, proto_converter, source, )?) @@ -799,7 +774,9 @@ impl protobuf::PhysicalPlanNode { fn try_into_json_scan_physical_plan( &self, scan: &protobuf::JsonScanExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let base_conf = scan.base_conf.as_ref().unwrap(); @@ -807,6 +784,7 @@ impl protobuf::PhysicalPlanNode { let scan_conf = parse_protobuf_file_scan_config( base_conf, ctx, + codec, proto_converter, Arc::new(JsonSource::new(table_schema)), )?; @@ -816,7 +794,8 @@ impl protobuf::PhysicalPlanNode { fn try_into_arrow_scan_physical_plan( &self, scan: &protobuf::ArrowScanExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let base_conf = scan.base_conf.as_ref().ok_or_else(|| { @@ -826,6 +805,7 @@ impl protobuf::PhysicalPlanNode { let scan_conf = parse_protobuf_file_scan_config( base_conf, ctx, + codec, proto_converter, Arc::new(ArrowSource::new_file_source(table_schema)), )?; @@ -836,7 +816,8 @@ impl protobuf::PhysicalPlanNode { fn try_into_parquet_scan_physical_plan( &self, scan: &protobuf::ParquetScanExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { #[cfg(feature = "parquet")] @@ -865,8 +846,9 @@ impl protobuf::PhysicalPlanNode { .map(|expr| { proto_converter.proto_to_physical_expr( expr, - predicate_schema.as_ref(), ctx, + predicate_schema.as_ref(), + codec, ) }) .transpose()?; @@ -882,15 +864,9 @@ impl protobuf::PhysicalPlanNode { false => ObjectStoreUrl::parse(&base_conf.object_store_url)?, true => ObjectStoreUrl::local_filesystem(), }; - let store = ctx - .task_ctx() - .runtime_env() - .object_store(object_store_url)?; - let metadata_cache = ctx - .task_ctx() - .runtime_env() - .cache_manager - .get_file_metadata_cache(); + let store = ctx.runtime_env().object_store(object_store_url)?; + let metadata_cache = + ctx.runtime_env().cache_manager.get_file_metadata_cache(); let reader_factory = Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache)); @@ -904,6 +880,7 @@ impl protobuf::PhysicalPlanNode { let base_config = parse_protobuf_file_scan_config( base_conf, ctx, + codec, proto_converter, Arc::new(source), )?; @@ -919,7 +896,8 @@ impl protobuf::PhysicalPlanNode { fn try_into_avro_scan_physical_plan( &self, scan: &protobuf::AvroScanExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { #[cfg(feature = "avro")] @@ -929,6 +907,7 @@ impl protobuf::PhysicalPlanNode { let conf = parse_protobuf_file_scan_config( scan.base_conf.as_ref().unwrap(), ctx, + codec, proto_converter, Arc::new(AvroSource::new(table_schema)), )?; @@ -942,7 +921,9 @@ impl protobuf::PhysicalPlanNode { fn try_into_memory_scan_physical_plan( &self, scan: &protobuf::MemoryScanExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let partitions = scan @@ -973,6 +954,7 @@ impl protobuf::PhysicalPlanNode { &ordering.physical_sort_expr_nodes, ctx, &schema, + codec, proto_converter, )?; sort_information.extend(LexOrdering::new(sort_exprs)); @@ -990,11 +972,13 @@ impl protobuf::PhysicalPlanNode { fn try_into_coalesce_batches_physical_plan( &self, coalesce_batches: &protobuf::CoalesceBatchesExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let input: Arc = - into_physical_plan(&coalesce_batches.input, ctx, proto_converter)?; + into_physical_plan(&coalesce_batches.input, ctx, codec, proto_converter)?; Ok(Arc::new( #[expect(deprecated)] CoalesceBatchesExec::new(input, coalesce_batches.target_batch_size as usize) @@ -1005,11 +989,13 @@ impl protobuf::PhysicalPlanNode { fn try_into_merge_physical_plan( &self, merge: &protobuf::CoalescePartitionsExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let input: Arc = - into_physical_plan(&merge.input, ctx, proto_converter)?; + into_physical_plan(&merge.input, ctx, codec, proto_converter)?; Ok(Arc::new( CoalescePartitionsExec::new(input) .with_fetch(merge.fetch.map(|f| f as usize)), @@ -1019,15 +1005,18 @@ impl protobuf::PhysicalPlanNode { fn try_into_repartition_physical_plan( &self, repart: &protobuf::RepartitionExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let input: Arc = - into_physical_plan(&repart.input, ctx, proto_converter)?; + into_physical_plan(&repart.input, ctx, codec, proto_converter)?; let partitioning = parse_protobuf_partitioning( repart.partitioning.as_ref(), ctx, input.schema().as_ref(), + codec, proto_converter, )?; let mut repart_exec = RepartitionExec::try_new(input, partitioning.unwrap())?; @@ -1040,11 +1029,13 @@ impl protobuf::PhysicalPlanNode { fn try_into_global_limit_physical_plan( &self, limit: &protobuf::GlobalLimitExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let input: Arc = - into_physical_plan(&limit.input, ctx, proto_converter)?; + into_physical_plan(&limit.input, ctx, codec, proto_converter)?; let fetch = if limit.fetch >= 0 { Some(limit.fetch as usize) } else { @@ -1060,22 +1051,26 @@ impl protobuf::PhysicalPlanNode { fn try_into_local_limit_physical_plan( &self, limit: &protobuf::LocalLimitExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let input: Arc = - into_physical_plan(&limit.input, ctx, proto_converter)?; + into_physical_plan(&limit.input, ctx, codec, proto_converter)?; Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize))) } fn try_into_window_physical_plan( &self, window_agg: &protobuf::WindowAggExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let input: Arc = - into_physical_plan(&window_agg.input, ctx, proto_converter)?; + into_physical_plan(&window_agg.input, ctx, codec, proto_converter)?; let input_schema = input.schema(); let physical_window_expr: Vec> = window_agg @@ -1086,6 +1081,7 @@ impl protobuf::PhysicalPlanNode { window_expr, ctx, input_schema.as_ref(), + codec, proto_converter, ) }) @@ -1095,7 +1091,12 @@ impl protobuf::PhysicalPlanNode { .partition_keys .iter() .map(|expr| { - proto_converter.proto_to_physical_expr(expr, input.schema().as_ref(), ctx) + proto_converter.proto_to_physical_expr( + expr, + ctx, + input.schema().as_ref(), + codec, + ) }) .collect::>>>()?; @@ -1128,11 +1129,13 @@ impl protobuf::PhysicalPlanNode { fn try_into_aggregate_physical_plan( &self, hash_agg: &protobuf::AggregateExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let input: Arc = - into_physical_plan(&hash_agg.input, ctx, proto_converter)?; + into_physical_plan(&hash_agg.input, ctx, codec, proto_converter)?; let mode = protobuf::AggregateMode::try_from(hash_agg.mode).map_err(|_| { proto_error(format!( "Received a AggregateNode message with unknown AggregateMode {}", @@ -1158,7 +1161,7 @@ impl protobuf::PhysicalPlanNode { .zip(hash_agg.group_expr_name.iter()) .map(|(expr, name)| { proto_converter - .proto_to_physical_expr(expr, input.schema().as_ref(), ctx) + .proto_to_physical_expr(expr, ctx, input.schema().as_ref(), codec) .map(|expr| (expr, name.to_string())) }) .collect::, _>>()?; @@ -1169,7 +1172,7 @@ impl protobuf::PhysicalPlanNode { .zip(hash_agg.group_expr_name.iter()) .map(|(expr, name)| { proto_converter - .proto_to_physical_expr(expr, input.schema().as_ref(), ctx) + .proto_to_physical_expr(expr, ctx, input.schema().as_ref(), codec) .map(|expr| (expr, name.to_string())) }) .collect::, _>>()?; @@ -1198,7 +1201,12 @@ impl protobuf::PhysicalPlanNode { expr.expr .as_ref() .map(|e| { - proto_converter.proto_to_physical_expr(e, &physical_schema, ctx) + proto_converter.proto_to_physical_expr( + e, + ctx, + &physical_schema, + codec, + ) }) .transpose() }) @@ -1221,8 +1229,9 @@ impl protobuf::PhysicalPlanNode { .map(|e| { proto_converter.proto_to_physical_expr( e, - &physical_schema, ctx, + &physical_schema, + codec, ) }) .collect::>>()?; @@ -1234,6 +1243,7 @@ impl protobuf::PhysicalPlanNode { e, ctx, &physical_schema, + codec, proto_converter, ) }) @@ -1245,14 +1255,11 @@ impl protobuf::PhysicalPlanNode { AggregateFunction::UserDefinedAggrFunction(udaf_name) => { let agg_udf = match &agg_node.fun_definition { Some(buf) => { - ctx.codec().try_decode_udaf(udaf_name, buf)? + codec.try_decode_udaf(udaf_name, buf)? } - None => ctx.task_ctx().udaf(udaf_name).or_else( - |_| { - ctx.codec() - .try_decode_udaf(udaf_name, &[]) - }, - )?, + None => ctx.udaf(udaf_name).or_else(|_| { + codec.try_decode_udaf(udaf_name, &[]) + })?, }; AggregateExprBuilder::new(agg_udf, input_phy_expr) @@ -1304,13 +1311,15 @@ impl protobuf::PhysicalPlanNode { fn try_into_hash_join_physical_plan( &self, hashjoin: &protobuf::HashJoinExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let left: Arc = - into_physical_plan(&hashjoin.left, ctx, proto_converter)?; + into_physical_plan(&hashjoin.left, ctx, codec, proto_converter)?; let right: Arc = - into_physical_plan(&hashjoin.right, ctx, proto_converter)?; + into_physical_plan(&hashjoin.right, ctx, codec, proto_converter)?; let left_schema = left.schema(); let right_schema = right.schema(); let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = hashjoin @@ -1319,13 +1328,15 @@ impl protobuf::PhysicalPlanNode { .map(|col| { let left = proto_converter.proto_to_physical_expr( &col.left.clone().unwrap(), - left_schema.as_ref(), ctx, + left_schema.as_ref(), + codec, )?; let right = proto_converter.proto_to_physical_expr( &col.right.clone().unwrap(), - right_schema.as_ref(), ctx, + right_schema.as_ref(), + codec, )?; Ok((left, right)) }) @@ -1358,8 +1369,8 @@ impl protobuf::PhysicalPlanNode { f.expression.as_ref().ok_or_else(|| { proto_error("Unexpected empty filter expression") })?, - &schema, - ctx, + ctx, &schema, + codec, )?; let column_indices = f.column_indices .iter() @@ -1420,11 +1431,13 @@ impl protobuf::PhysicalPlanNode { fn try_into_symmetric_hash_join_physical_plan( &self, sym_join: &protobuf::SymmetricHashJoinExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { - let left = into_physical_plan(&sym_join.left, ctx, proto_converter)?; - let right = into_physical_plan(&sym_join.right, ctx, proto_converter)?; + let left = into_physical_plan(&sym_join.left, ctx, codec, proto_converter)?; + let right = into_physical_plan(&sym_join.right, ctx, codec, proto_converter)?; let left_schema = left.schema(); let right_schema = right.schema(); let on = sym_join @@ -1433,13 +1446,15 @@ impl protobuf::PhysicalPlanNode { .map(|col| { let left = proto_converter.proto_to_physical_expr( &col.left.clone().unwrap(), - left_schema.as_ref(), ctx, + left_schema.as_ref(), + codec, )?; let right = proto_converter.proto_to_physical_expr( &col.right.clone().unwrap(), - right_schema.as_ref(), ctx, + right_schema.as_ref(), + codec, )?; Ok((left, right)) }) @@ -1472,8 +1487,8 @@ impl protobuf::PhysicalPlanNode { f.expression.as_ref().ok_or_else(|| { proto_error("Unexpected empty filter expression") })?, - &schema, - ctx, + ctx, &schema, + codec, )?; let column_indices = f.column_indices .iter() @@ -1499,6 +1514,7 @@ impl protobuf::PhysicalPlanNode { &sym_join.left_sort_exprs, ctx, &left_schema, + codec, proto_converter, )?; let left_sort_exprs = LexOrdering::new(left_sort_exprs); @@ -1507,6 +1523,7 @@ impl protobuf::PhysicalPlanNode { &sym_join.right_sort_exprs, ctx, &right_schema, + codec, proto_converter, )?; let right_sort_exprs = LexOrdering::new(right_sort_exprs); @@ -1545,12 +1562,14 @@ impl protobuf::PhysicalPlanNode { fn try_into_union_physical_plan( &self, union: &protobuf::UnionExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let mut inputs: Vec> = vec![]; for input in &union.inputs { - inputs.push(proto_converter.proto_to_execution_plan(input, ctx)?); + inputs.push(proto_converter.proto_to_execution_plan(ctx, codec, input)?); } UnionExec::try_new(inputs) } @@ -1558,12 +1577,14 @@ impl protobuf::PhysicalPlanNode { fn try_into_interleave_physical_plan( &self, interleave: &protobuf::InterleaveExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let mut inputs: Vec> = vec![]; for input in &interleave.inputs { - inputs.push(proto_converter.proto_to_execution_plan(input, ctx)?); + inputs.push(proto_converter.proto_to_execution_plan(ctx, codec, input)?); } Ok(Arc::new(InterleaveExec::try_new(inputs)?)) } @@ -1571,20 +1592,24 @@ impl protobuf::PhysicalPlanNode { fn try_into_cross_join_physical_plan( &self, crossjoin: &protobuf::CrossJoinExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let left: Arc = - into_physical_plan(&crossjoin.left, ctx, proto_converter)?; + into_physical_plan(&crossjoin.left, ctx, codec, proto_converter)?; let right: Arc = - into_physical_plan(&crossjoin.right, ctx, proto_converter)?; + into_physical_plan(&crossjoin.right, ctx, codec, proto_converter)?; Ok(Arc::new(CrossJoinExec::new(left, right))) } fn try_into_empty_physical_plan( &self, empty: &protobuf::EmptyExecNode, - _ctx: &PhysicalPlanDecodeContext<'_>, + _ctx: &TaskContext, + + _codec: &dyn PhysicalExtensionCodec, _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let schema = Arc::new(convert_required!(empty.schema)?); @@ -1594,7 +1619,9 @@ impl protobuf::PhysicalPlanNode { fn try_into_placeholder_row_physical_plan( &self, placeholder: &protobuf::PlaceholderRowExecNode, - _ctx: &PhysicalPlanDecodeContext<'_>, + _ctx: &TaskContext, + + _codec: &dyn PhysicalExtensionCodec, ) -> Result> { let schema = Arc::new(convert_required!(placeholder.schema)?); Ok(Arc::new(PlaceholderRowExec::new(schema))) @@ -1603,10 +1630,12 @@ impl protobuf::PhysicalPlanNode { fn try_into_sort_physical_plan( &self, sort: &protobuf::SortExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { - let input = into_physical_plan(&sort.input, ctx, proto_converter)?; + let input = into_physical_plan(&sort.input, ctx, codec, proto_converter)?; let exprs = sort .expr .iter() @@ -1627,11 +1656,7 @@ impl protobuf::PhysicalPlanNode { })? .as_ref(); Ok(PhysicalSortExpr { - expr: proto_converter.proto_to_physical_expr( - expr, - input.schema().as_ref(), - ctx, - )?, + expr: proto_converter.proto_to_physical_expr(expr, ctx, input.schema().as_ref(), codec)?, options: SortOptions { descending: !sort_expr.asc, nulls_first: sort_expr.nulls_first, @@ -1658,10 +1683,12 @@ impl protobuf::PhysicalPlanNode { fn try_into_sort_preserving_merge_physical_plan( &self, sort: &protobuf::SortPreservingMergeExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { - let input = into_physical_plan(&sort.input, ctx, proto_converter)?; + let input = into_physical_plan(&sort.input, ctx, codec, proto_converter)?; let exprs = sort .expr .iter() @@ -1684,8 +1711,9 @@ impl protobuf::PhysicalPlanNode { Ok(PhysicalSortExpr { expr: proto_converter.proto_to_physical_expr( expr, - input.schema().as_ref(), ctx, + input.schema().as_ref(), + codec, )?, options: SortOptions { descending: !sort_expr.asc, @@ -1709,18 +1737,18 @@ impl protobuf::PhysicalPlanNode { fn try_into_extension_physical_plan( &self, extension: &protobuf::PhysicalExtensionNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let inputs: Vec> = extension .inputs .iter() - .map(|i| proto_converter.proto_to_execution_plan(i, ctx)) + .map(|i| proto_converter.proto_to_execution_plan(ctx, codec, i)) .collect::>()?; - let extension_node = - ctx.codec() - .try_decode(extension.node.as_slice(), &inputs, ctx.task_ctx())?; + let extension_node = codec.try_decode(extension.node.as_slice(), &inputs, ctx)?; Ok(extension_node) } @@ -1728,13 +1756,15 @@ impl protobuf::PhysicalPlanNode { fn try_into_nested_loop_join_physical_plan( &self, join: &protobuf::NestedLoopJoinExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let left: Arc = - into_physical_plan(&join.left, ctx, proto_converter)?; + into_physical_plan(&join.left, ctx, codec, proto_converter)?; let right: Arc = - into_physical_plan(&join.right, ctx, proto_converter)?; + into_physical_plan(&join.right, ctx, codec, proto_converter)?; let join_type = protobuf::JoinType::try_from(join.join_type).map_err(|_| { proto_error(format!( "Received a NestedLoopJoinExecNode message with unknown JoinType {}", @@ -1751,13 +1781,12 @@ impl protobuf::PhysicalPlanNode { .ok_or_else(|| proto_error("Missing JoinFilter schema"))? .try_into()?; - let expression = proto_converter - .proto_to_physical_expr( + let expression = proto_converter.proto_to_physical_expr( f.expression.as_ref().ok_or_else(|| { proto_error("Unexpected empty filter expression") })?, - &schema, - ctx, + ctx, &schema, + codec, )?; let column_indices = f.column_indices .iter() @@ -1802,11 +1831,13 @@ impl protobuf::PhysicalPlanNode { fn try_into_analyze_physical_plan( &self, analyze: &protobuf::AnalyzeExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let input: Arc = - into_physical_plan(&analyze.input, ctx, proto_converter)?; + into_physical_plan(&analyze.input, ctx, codec, proto_converter)?; let metric_categories = if analyze.has_metric_categories { let cats: Result> = analyze .metric_categories @@ -1830,10 +1861,12 @@ impl protobuf::PhysicalPlanNode { fn try_into_json_sink_physical_plan( &self, sink: &protobuf::JsonSinkExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { - let input = into_physical_plan(&sink.input, ctx, proto_converter)?; + let input = into_physical_plan(&sink.input, ctx, codec, proto_converter)?; let data_sink: JsonSink = sink .sink @@ -1849,6 +1882,7 @@ impl protobuf::PhysicalPlanNode { &collection.physical_sort_expr_nodes, ctx, &sink_schema, + codec, proto_converter, ) .map(|sort_exprs| { @@ -1867,10 +1901,12 @@ impl protobuf::PhysicalPlanNode { fn try_into_csv_sink_physical_plan( &self, sink: &protobuf::CsvSinkExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { - let input = into_physical_plan(&sink.input, ctx, proto_converter)?; + let input = into_physical_plan(&sink.input, ctx, codec, proto_converter)?; let data_sink: CsvSink = sink .sink @@ -1886,6 +1922,7 @@ impl protobuf::PhysicalPlanNode { &collection.physical_sort_expr_nodes, ctx, &sink_schema, + codec, proto_converter, ) .map(|sort_exprs| { @@ -1905,12 +1942,14 @@ impl protobuf::PhysicalPlanNode { fn try_into_parquet_sink_physical_plan( &self, sink: &protobuf::ParquetSinkExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { #[cfg(feature = "parquet")] { - let input = into_physical_plan(&sink.input, ctx, proto_converter)?; + let input = into_physical_plan(&sink.input, ctx, codec, proto_converter)?; let data_sink: ParquetSink = sink .sink @@ -1926,6 +1965,7 @@ impl protobuf::PhysicalPlanNode { &collection.physical_sort_expr_nodes, ctx, &sink_schema, + codec, proto_converter, ) .map(|sort_exprs| { @@ -1947,10 +1987,12 @@ impl protobuf::PhysicalPlanNode { fn try_into_unnest_physical_plan( &self, unnest: &protobuf::UnnestExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { - let input = into_physical_plan(&unnest.input, ctx, proto_converter)?; + let input = into_physical_plan(&unnest.input, ctx, codec, proto_converter)?; Ok(Arc::new(UnnestExec::new( input, @@ -1977,12 +2019,14 @@ impl protobuf::PhysicalPlanNode { fn try_into_sort_join( &self, sort_join: &SortMergeJoinExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { - let left = into_physical_plan(&sort_join.left, ctx, proto_converter)?; + let left = into_physical_plan(&sort_join.left, ctx, codec, proto_converter)?; let left_schema = left.schema(); - let right = into_physical_plan(&sort_join.right, ctx, proto_converter)?; + let right = into_physical_plan(&sort_join.right, ctx, codec, proto_converter)?; let right_schema = right.schema(); let filter = sort_join @@ -1999,8 +2043,9 @@ impl protobuf::PhysicalPlanNode { f.expression.as_ref().ok_or_else(|| { proto_error("Unexpected empty filter expression") })?, - &schema, ctx, + &schema, + codec, )?; let column_indices = f .column_indices @@ -2059,13 +2104,15 @@ impl protobuf::PhysicalPlanNode { .map(|col| { let left = proto_converter.proto_to_physical_expr( &col.left.clone().unwrap(), - left_schema.as_ref(), ctx, + left_schema.as_ref(), + codec, )?; let right = proto_converter.proto_to_physical_expr( &col.right.clone().unwrap(), - right_schema.as_ref(), ctx, + right_schema.as_ref(), + codec, )?; Ok((left, right)) }) @@ -2150,21 +2197,24 @@ impl protobuf::PhysicalPlanNode { fn try_into_cooperative_physical_plan( &self, field_stream: &protobuf::CooperativeExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { - let input = into_physical_plan(&field_stream.input, ctx, proto_converter)?; + let input = into_physical_plan(&field_stream.input, ctx, codec, proto_converter)?; Ok(Arc::new(CooperativeExec::new(input))) } fn try_into_async_func_physical_plan( &self, async_func: &protobuf::AsyncFuncExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let input: Arc = - into_physical_plan(&async_func.input, ctx, proto_converter)?; + into_physical_plan(&async_func.input, ctx, codec, proto_converter)?; if async_func.async_exprs.len() != async_func.async_expr_names.len() { return internal_err!( @@ -2179,8 +2229,9 @@ impl protobuf::PhysicalPlanNode { .map(|(expr, name)| { let physical_expr = proto_converter.proto_to_physical_expr( expr, - input.schema().as_ref(), ctx, + input.schema().as_ref(), + codec, )?; Ok(Arc::new(AsyncFuncExpr::try_new( @@ -2197,49 +2248,16 @@ impl protobuf::PhysicalPlanNode { fn try_into_buffer_physical_plan( &self, buffer: &protobuf::BufferExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + extension_codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { let input: Arc = - into_physical_plan(&buffer.input, ctx, proto_converter)?; + into_physical_plan(&buffer.input, ctx, extension_codec, proto_converter)?; Ok(Arc::new(BufferExec::new(input, buffer.capacity as usize))) } - fn try_into_scalar_subquery_physical_plan( - &self, - sq: &protobuf::ScalarSubqueryExecNode, - ctx: &PhysicalPlanDecodeContext<'_>, - proto_converter: &dyn PhysicalProtoConverterExtension, - ) -> Result> { - // First, deserialize the main input plan. We set up the subquery results - // container first, so that ScalarSubqueryExpr nodes can reference it. - let subquery_results = ScalarSubqueryResults::new(sq.subqueries.len()); - let input_ctx = ctx.with_scalar_subquery_results(subquery_results.clone()); - let input = into_physical_plan(&sq.input, &input_ctx, proto_converter)?; - - // Now deserialize the subquery children. - let subqueries: Vec = sq - .subqueries - .iter() - .enumerate() - .map(|(index, sq_plan)| { - let plan = - sq_plan.try_into_physical_plan_with_context(ctx, proto_converter)?; - Ok(ScalarSubqueryLink { - plan, - index: SubqueryIndex::new(index), - }) - }) - .collect::>>()?; - - Ok(Arc::new(ScalarSubqueryExec::new( - input, - subqueries, - subquery_results, - ))) - } - fn try_from_explain_exec( exec: &ExplainExec, _codec: &dyn PhysicalExtensionCodec, @@ -2339,9 +2357,12 @@ impl protobuf::PhysicalPlanNode { .physical_expr_to_proto(exec.predicate(), codec)?, ), default_filter_selectivity: exec.default_selectivity() as u32, - projection: exec.projection().as_ref().map_or_else(Vec::new, |v| { - v.iter().map(|x| *x as u32).collect::>() - }), + projection: match exec.projection() { + None => (0..exec.input().schema().fields().len()) + .map(|i| i as u32) + .collect(), + Some(v) => v.iter().map(|x| *x as u32).collect(), + }, batch_size: exec.batch_size() as u32, fetch: exec.fetch().map(|f| f as u32), }, @@ -3238,9 +3259,12 @@ impl protobuf::PhysicalPlanNode { right: Some(Box::new(right)), join_type: join_type.into(), filter, - projection: exec.projection().as_ref().map_or_else(Vec::new, |v| { - v.iter().map(|x| *x as u32).collect::>() - }), + projection: match exec.projection() { + None => (0..exec.input().schema().fields().len()) + .map(|i| i as u32) + .collect(), + Some(v) => v.iter().map(|x| *x as u32).collect(), + }, }, ))), }) @@ -3632,38 +3656,6 @@ impl protobuf::PhysicalPlanNode { ))), }) } - - fn try_from_scalar_subquery_exec( - exec: &ScalarSubqueryExec, - codec: &dyn PhysicalExtensionCodec, - proto_converter: &dyn PhysicalProtoConverterExtension, - ) -> Result { - let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter( - Arc::clone(exec.input()), - codec, - proto_converter, - )?; - let subqueries = exec - .subqueries() - .iter() - .map(|sq| { - protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter( - Arc::clone(&sq.plan), - codec, - proto_converter, - ) - }) - .collect::>>()?; - - Ok(protobuf::PhysicalPlanNode { - physical_plan_type: Some(PhysicalPlanType::ScalarSubquery(Box::new( - protobuf::ScalarSubqueryExecNode { - input: Some(Box::new(input)), - subqueries, - }, - ))), - }) - } } pub trait AsExecutionPlan: Debug + Send + Sync + Clone { @@ -3772,21 +3764,11 @@ impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec { pub trait PhysicalProtoConverterExtension { fn proto_to_execution_plan( &self, + ctx: &TaskContext, + codec: &dyn PhysicalExtensionCodec, proto: &protobuf::PhysicalPlanNode, - ctx: &PhysicalPlanDecodeContext<'_>, ) -> Result>; - fn default_proto_to_execution_plan( - &self, - proto: &protobuf::PhysicalPlanNode, - ctx: &PhysicalPlanDecodeContext<'_>, - ) -> Result> - where - Self: Sized, - { - proto.try_into_physical_plan_with_context(ctx, self) - } - fn execution_plan_to_proto( &self, plan: &Arc, @@ -3796,22 +3778,11 @@ pub trait PhysicalProtoConverterExtension { fn proto_to_physical_expr( &self, proto: &protobuf::PhysicalExprNode, + ctx: &TaskContext, input_schema: &Schema, - ctx: &PhysicalPlanDecodeContext<'_>, + codec: &dyn PhysicalExtensionCodec, ) -> Result>; - fn default_proto_to_physical_expr( - &self, - proto: &protobuf::PhysicalExprNode, - input_schema: &Schema, - ctx: &PhysicalPlanDecodeContext<'_>, - ) -> Result> - where - Self: Sized, - { - parse_physical_expr_with_converter(proto, input_schema, ctx, self) - } - fn physical_expr_to_proto( &self, expr: &Arc, @@ -3832,15 +3803,15 @@ struct DataEncoderTuple { pub blob: Vec, } -pub struct DefaultPhysicalProtoConverter {} - +pub struct DefaultPhysicalProtoConverter; impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter { fn proto_to_execution_plan( &self, + ctx: &TaskContext, + codec: &dyn PhysicalExtensionCodec, proto: &protobuf::PhysicalPlanNode, - ctx: &PhysicalPlanDecodeContext<'_>, ) -> Result> { - proto.try_into_physical_plan_with_context(ctx, self) + proto.try_into_physical_plan_with_converter(ctx, codec, self) } fn execution_plan_to_proto( @@ -3861,14 +3832,15 @@ impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter { fn proto_to_physical_expr( &self, proto: &protobuf::PhysicalExprNode, + ctx: &TaskContext, input_schema: &Schema, - ctx: &PhysicalPlanDecodeContext<'_>, + codec: &dyn PhysicalExtensionCodec, ) -> Result> where Self: Sized, { // Default implementation calls the free function - parse_physical_expr_with_converter(proto, input_schema, ctx, self) + parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self) } fn physical_expr_to_proto( @@ -3898,8 +3870,9 @@ impl DeduplicatingSerializer { impl PhysicalProtoConverterExtension for DeduplicatingSerializer { fn proto_to_execution_plan( &self, + _ctx: &TaskContext, + _codec: &dyn PhysicalExtensionCodec, _proto: &protobuf::PhysicalPlanNode, - _ctx: &PhysicalPlanDecodeContext<'_>, ) -> Result> { internal_err!("DeduplicatingSerializer cannot deserialize execution plans") } @@ -3922,8 +3895,9 @@ impl PhysicalProtoConverterExtension for DeduplicatingSerializer { fn proto_to_physical_expr( &self, _proto: &protobuf::PhysicalExprNode, + _ctx: &TaskContext, _input_schema: &Schema, - _ctx: &PhysicalPlanDecodeContext<'_>, + _codec: &dyn PhysicalExtensionCodec, ) -> Result> where Self: Sized, @@ -3963,10 +3937,11 @@ struct DeduplicatingDeserializer { impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { fn proto_to_execution_plan( &self, + ctx: &TaskContext, + codec: &dyn PhysicalExtensionCodec, proto: &protobuf::PhysicalPlanNode, - ctx: &PhysicalPlanDecodeContext<'_>, ) -> Result> { - proto.try_into_physical_plan_with_context(ctx, self) + proto.try_into_physical_plan_with_converter(ctx, codec, self) } fn execution_plan_to_proto( @@ -3983,8 +3958,9 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { fn proto_to_physical_expr( &self, proto: &protobuf::PhysicalExprNode, + ctx: &TaskContext, input_schema: &Schema, - ctx: &PhysicalPlanDecodeContext<'_>, + codec: &dyn PhysicalExtensionCodec, ) -> Result> where Self: Sized, @@ -3995,12 +3971,17 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { return Ok(Arc::clone(cached)); } // Deserialize and cache - let expr = - parse_physical_expr_with_converter(proto, input_schema, ctx, self)?; + let expr = parse_physical_expr_with_converter( + proto, + ctx, + input_schema, + codec, + self, + )?; self.cache.borrow_mut().insert(expr_id, Arc::clone(&expr)); Ok(expr) } else { - parse_physical_expr_with_converter(proto, input_schema, ctx, self) + parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self) } } @@ -4034,11 +4015,12 @@ pub struct DeduplicatingProtoConverter {} impl PhysicalProtoConverterExtension for DeduplicatingProtoConverter { fn proto_to_execution_plan( &self, + ctx: &TaskContext, + codec: &dyn PhysicalExtensionCodec, proto: &protobuf::PhysicalPlanNode, - ctx: &PhysicalPlanDecodeContext<'_>, ) -> Result> { let deserializer = DeduplicatingDeserializer::default(); - proto.try_into_physical_plan_with_context(ctx, &deserializer) + proto.try_into_physical_plan_with_converter(ctx, codec, &deserializer) } fn execution_plan_to_proto( @@ -4060,14 +4042,15 @@ impl PhysicalProtoConverterExtension for DeduplicatingProtoConverter { fn proto_to_physical_expr( &self, proto: &protobuf::PhysicalExprNode, + ctx: &TaskContext, input_schema: &Schema, - ctx: &PhysicalPlanDecodeContext<'_>, + codec: &dyn PhysicalExtensionCodec, ) -> Result> where Self: Sized, { let deserializer = DeduplicatingDeserializer::default(); - deserializer.proto_to_physical_expr(proto, input_schema, ctx) + deserializer.proto_to_physical_expr(proto, ctx, input_schema, codec) } fn physical_expr_to_proto( @@ -4181,11 +4164,12 @@ impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec { fn into_physical_plan( node: &Option>, - ctx: &PhysicalPlanDecodeContext<'_>, + ctx: &TaskContext, + codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { if let Some(field) = node { - proto_converter.proto_to_execution_plan(field, ctx) + proto_converter.proto_to_execution_plan(ctx, codec, field) } else { Err(proto_error("Missing required field in protobuf")) } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index e7d38b57a1522..ff359d09ef448 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3681,3 +3681,66 @@ async fn roundtrip_issue_18602_complex_filter_decode_recursion() -> Result<()> { roundtrip_test_sql_with_context(sql, &ctx).await } + +#[tokio::test] +async fn test_filter_exec_projection_serde_roundtrip() -> Result<()> { + use datafusion::physical_plan::memory::MemoryExec; + + let ctx = SessionContext::new(); + let codec = DefaultPhysicalExtensionCodec {}; + + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ])); + + let input: Arc = + Arc::new(MemoryExec::try_new(&[], Arc::clone(&schema), None)?); + + let predicate: Arc = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int32(Some(0)))), + )); + + // Case 1: None -> should round-trip as None (return all columns) + let filter = + FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)).build()?; + let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?; + let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?; + let rt = roundtripped.as_any().downcast_ref::().unwrap(); + assert_eq!( + rt.projection(), + None, + "None projection must stay None after roundtrip" + ); + + // Case 2: Some(vec![]) -> must survive as Some([]), NOT silently become None + let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) + .apply_projection(Some(vec![]))? + .build()?; + let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?; + let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?; + let rt = roundtripped.as_any().downcast_ref::().unwrap(); + assert_eq!( + rt.projection(), + Some(&vec![]), + "Empty projection Some([]) must survive roundtrip, not become None" + ); + + // Case 3: Some(vec![2, 0]) -> partial projection must survive + let filter = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) + .apply_projection(Some(vec![2, 0]))? + .build()?; + let proto = PhysicalPlanNode::try_from_physical_plan(Arc::new(filter) as _, &codec)?; + let roundtripped = proto.try_into_physical_plan(ctx.task_ctx().as_ref(), &codec)?; + let rt = roundtripped.as_any().downcast_ref::().unwrap(); + assert_eq!( + rt.projection(), + Some(&vec![2_usize, 0_usize]), + "Partial projection must survive roundtrip" + ); + + Ok(()) +}