Commit 1bb588e

neilconway and alamb authored
perf: Implement physical execution of uncorrelated scalar subqueries (#21240)
## Which issue does this PR close?

- Closes #3781.
- Closes #18181.

## Rationale for this change

Previously, DataFusion evaluated uncorrelated scalar subqueries by transforming them into joins. This has three shortcomings:

1. Scalar subqueries that return > 1 row were allowed, producing incorrect query results. Such queries should instead result in a runtime error.
2. Performance. Evaluating scalar subqueries as a join requires going through the join machinery. More importantly, it means that UDFs that have specialized handling of scalar inputs cannot use those code paths for scalar subqueries, which often results in significantly slower query execution (e.g., #18181). It also makes filter pushdown for scalar subquery filters more difficult (#21324)
3. Uncorrelated scalar subqueries previously did not work in `ORDER BY` or `JOIN ON`, or as arguments to an aggregate function. Those cases are now supported.

This PR introduces physical execution of uncorrelated scalar subqueries:

* Uncorrelated subqueries are left in the plan by the optimizer, not rewritten into joins
* The physical planner collects uncorrelated scalar subqueries and plans them recursively (supporting nested subqueries). We add a `ScalarSubqueryExec` plan node to the top of any physical plan with uncorrelated subqueries: it has N+1 children, N subqueries and its "main" input, which is the rest of the query plan. The subquery expression in the parent plan is replaced with a `ScalarSubqueryExpr`.
* `ScalarSubqueryExec` manages the execution of the subqueries. Subquery evaluation is done in parallel (for a given query level), but at present it happens strictly before evaluation of the parent query. This might be improved in the future (#21591).
* `ScalarSubqueryExpr` reads its value from a shared slot that `ScalarSubqueryExec` populates when the subquery finishes; the physical planner assigns each subquery its slot index via `ExecutionProps`.

This architecture makes it easy to avoid the shortcomings described above. Performance seems roughly unchanged (benchmarks added in this PR), but in situations like #18181, we can now leverage scalar fast-paths; in the case of #18181 specifically, this improves performance from ~800 ms to ~30 ms.

## What changes are included in this PR?

* Modify subquery rewriter to not transform subqueries -> joins
* Collect and plan uncorrelated scalar subqueries in the physical planner, and wire up `ScalarSubqueryExpr`
* Support for subqueries in physical plan serialization/deserialization using `PhysicalProtoConverterExtension` to wire up `ScalarSubqueryExpr` correctly
* Support for subqueries in logical plan serialization/deserialization
* Add various SLT tests and update expected plan shapes for some tests

## Are these changes tested?

Yes. New SLT coverage for cardinality errors, `ORDER BY` / `JOIN ON` / aggregate-arg contexts, nested uncorrelated subqueries, duplicate-subquery deduplication, and partition-pruning filters; new roundtrip tests for logical and physical plan serialization.

## Are there any user-facing changes?

SQL:

* Uncorrelated scalar subqueries that return more than one row now result in a runtime error, instead of silently producing incorrect results.
* Uncorrelated scalar subqueries now work in `ORDER BY`, `JOIN ON`, and as aggregate function arguments.

Rust APIs:

* In `datafusion-proto`, breaking changes to `Serializeable::from_bytes_with_registry` (renamed to `from_bytes_with_ctx`), `parse_expr` / `parse_sorts` / `parse_exprs`, and the `PhysicalProtoConverterExtension` trait.

Plan shape:

* `LogicalPlan::Subquery` nodes will now be preserved in the logical plan
* Physical plans can now contain `ScalarSubqueryExec` plan node and `ScalarSubqueryExpr` expressions

The wire format has also changed to include scalar subqueries.

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
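The shared-slot design described in the rationale can be illustrated with a small standalone sketch. It uses only the standard library and is not DataFusion's implementation: `Value`, `Slot`, `scalar_from_rows`, `populate_slots`, and `read_slot` are hypothetical names, subqueries are modelled as already-collected rows, and the parallel evaluation mentioned above is not modelled. Mapping zero rows to NULL follows standard SQL scalar-subquery semantics and is an assumption here, since the commit message only specifies the more-than-one-row case.

```rust
use std::sync::{Arc, OnceLock};

/// Stand-in for a scalar value (hypothetical; not DataFusion's `ScalarValue`).
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    Int(i64),
}

/// Hypothetical shared slot: written once by the driver, read by the expression.
type Slot = Arc<OnceLock<Value>>;

/// Reduce a subquery's rows to one scalar.
/// Assumption: zero rows -> NULL; more than one row -> runtime error.
fn scalar_from_rows(rows: &[i64]) -> Result<Value, String> {
    match rows {
        [] => Ok(Value::Null),
        [v] => Ok(Value::Int(*v)),
        _ => Err(format!(
            "scalar subquery returned {} rows, expected at most 1",
            rows.len()
        )),
    }
}

/// Role of the exec node in this sketch: evaluate every subquery and fill
/// its slot before the main input is allowed to run.
fn populate_slots(subquery_rows: &[Vec<i64>], slots: &[Slot]) -> Result<(), String> {
    for (rows, slot) in subquery_rows.iter().zip(slots) {
        let value = scalar_from_rows(rows)?;
        slot.set(value)
            .map_err(|_| "slot already populated".to_string())?;
    }
    Ok(())
}

/// Role of the expression in this sketch: read the value at its slot index.
fn read_slot(slots: &[Slot], index: usize) -> Result<Value, String> {
    slots
        .get(index)
        .and_then(|slot| slot.get())
        .cloned()
        .ok_or_else(|| format!("slot {index} not populated"))
}
```

Because the expression sees a plain scalar rather than a joined column, callers that special-case scalar arguments can take their fast path, which is the effect the commit message attributes to #18181.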
1 parent ca1d39d commit 1bb588e

41 files changed

Lines changed: 3559 additions & 1110 deletions

.github/workflows/large_files.yml

Lines changed: 2 additions & 2 deletions
@@ -34,9 +34,9 @@ jobs:
           fetch-depth: 0
       - name: Check size of new Git objects
         env:
-          # 1 MB ought to be enough for anybody.
+          # 1.5 MB ought to be enough for anybody.
           # TODO in case we may want to consciously commit a bigger file to the repo without using Git LFS we may disable the check e.g. with a label
-          MAX_FILE_SIZE_BYTES: 1048576
+          MAX_FILE_SIZE_BYTES: 1572864
         shell: bash
         run: |
           if [ "${{ github.event_name }}" = "merge_group" ]; then

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

datafusion-examples/examples/custom_data_source/adapter_serialization.rs

Lines changed: 17 additions & 21 deletions
@@ -17,9 +17,9 @@
 
 //! See `main.rs` for how to run it.
 //!
-//! This example demonstrates how to use the `PhysicalExtensionCodec` trait's
-//! interception methods (`serialize_physical_plan` and `deserialize_physical_plan`)
-//! to implement custom serialization logic.
+//! This example demonstrates how to use the `PhysicalProtoConverterExtension`
+//! trait's interception methods (`execution_plan_to_proto` and
+//! `proto_to_execution_plan`) to implement custom serialization logic.
 //!
 //! The key insight is that `FileScanConfig::expr_adapter_factory` is NOT serialized by
 //! default. This example shows how to:
@@ -28,9 +28,10 @@
 //! 3. Store the inner DataSourceExec (without adapter) as a child in the extension's inputs field
 //! 4. Unwrap and restore the adapter during deserialization
 //!
-//! This demonstrates nested serialization (protobuf outer, JSON inner) and the power
-//! of the `PhysicalExtensionCodec` interception pattern. Both plan and expression
-//! serialization route through the codec, enabling interception at every node in the tree.
+//! This demonstrates nested serialization (protobuf outer, JSON inner) and the
+//! power of `PhysicalProtoConverterExtension`. Both plan and expression
+//! serialization route through converter hooks, enabling interception at every
+//! node in the tree.
 
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -61,7 +62,7 @@ use datafusion_proto::bytes::{
 use datafusion_proto::physical_plan::from_proto::parse_physical_expr_with_converter;
 use datafusion_proto::physical_plan::to_proto::serialize_physical_expr_with_converter;
 use datafusion_proto::physical_plan::{
-    PhysicalExtensionCodec, PhysicalProtoConverterExtension,
+    PhysicalExtensionCodec, PhysicalPlanDecodeContext, PhysicalProtoConverterExtension,
 };
 use datafusion_proto::protobuf::physical_plan_node::PhysicalPlanType;
 use datafusion_proto::protobuf::{
@@ -177,7 +178,7 @@ pub async fn adapter_serialization() -> Result<()> {
     println!("\n=== Example Complete! ===");
     println!("Key takeaways:");
     println!(
-        " 1. PhysicalExtensionCodec provides serialize_physical_plan/deserialize_physical_plan hooks"
+        " 1. PhysicalProtoConverterExtension provides execution_plan_to_proto/proto_to_execution_plan hooks"
     );
     println!(" 2. Custom metadata can be wrapped as PhysicalExtensionNode");
     println!(" 3. Nested serialization (protobuf + JSON) works seamlessly");
@@ -303,9 +304,10 @@ impl PhysicalExtensionCodec for AdapterPreservingCodec {
         _node: Arc<dyn ExecutionPlan>,
         _buf: &mut Vec<u8>,
     ) -> Result<()> {
-        // We don't need this for the example - we use serialize_physical_plan instead
+        // We don't need this for the example - adapter wrapping happens in
+        // `execution_plan_to_proto` instead.
         not_impl_err!(
-            "try_encode not used - adapter wrapping happens in serialize_physical_plan"
+            "try_encode not used - adapter wrapping happens in execution_plan_to_proto"
         )
     }
 }
@@ -371,9 +373,8 @@ impl PhysicalProtoConverterExtension for AdapterPreservingCodec {
     // Interception point: override deserialization to unwrap adapters
     fn proto_to_execution_plan(
         &self,
-        ctx: &TaskContext,
-        extension_codec: &dyn PhysicalExtensionCodec,
         proto: &PhysicalPlanNode,
+        ctx: &PhysicalPlanDecodeContext<'_>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Check if this is our custom extension wrapper
         if let Some(PhysicalPlanType::Extension(extension)) = &proto.physical_plan_type
@@ -395,11 +396,7 @@ impl PhysicalProtoConverterExtension for AdapterPreservingCodec {
             let inner_proto = &extension.inputs[0];
 
             // Deserialize the inner plan
-            let inner_plan = inner_proto.try_into_physical_plan_with_converter(
-                ctx,
-                extension_codec,
-                self,
-            )?;
+            let inner_plan = self.default_proto_to_execution_plan(inner_proto, ctx)?;
 
             // Recreate the adapter factory
             let adapter_factory = create_adapter_factory(&payload.adapter_metadata.tag);
@@ -409,17 +406,16 @@ impl PhysicalProtoConverterExtension for AdapterPreservingCodec {
         }
 
         // Not our extension - use default deserialization
-        proto.try_into_physical_plan_with_converter(ctx, extension_codec, self)
+        self.default_proto_to_execution_plan(proto, ctx)
     }
 
     fn proto_to_physical_expr(
         &self,
         proto: &PhysicalExprNode,
-        ctx: &TaskContext,
         input_schema: &Schema,
-        codec: &dyn PhysicalExtensionCodec,
+        ctx: &PhysicalPlanDecodeContext<'_>,
     ) -> Result<Arc<dyn PhysicalExpr>> {
-        parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)
+        parse_physical_expr_with_converter(proto, input_schema, ctx, self)
     }
 
     fn physical_expr_to_proto(
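
The interception pattern this example relies on (override a hook, handle a custom wrapper, otherwise delegate to a default method) can be sketched without any DataFusion types. Everything below is hypothetical and heavily simplified: `ProtoNode`, `Plan`, `Converter`, `default_decode`, and `decode` are illustrative stand-ins, not the `datafusion-proto` API.

```rust
use std::sync::Arc;

/// Hypothetical, simplified serialized plan node.
#[derive(Debug, Clone)]
enum ProtoNode {
    Scan { table: String },
    /// Custom wrapper carrying extra metadata around one inner node.
    Extension { tag: String, input: Box<ProtoNode> },
}

/// Hypothetical decoded plan; `adapter` is the state the default path drops.
#[derive(Debug, PartialEq)]
struct Plan {
    table: String,
    adapter: Option<String>,
}

trait Converter {
    /// Built-in decoding; it does not understand `Extension` wrappers.
    fn default_decode(&self, proto: &ProtoNode) -> Result<Arc<Plan>, String> {
        match proto {
            ProtoNode::Scan { table } => Ok(Arc::new(Plan {
                table: table.clone(),
                adapter: None,
            })),
            ProtoNode::Extension { tag, .. } => Err(format!("unknown extension `{tag}`")),
        }
    }

    /// Interception hook; implementors may override it and fall back to
    /// `default_decode` for nodes they do not handle.
    fn decode(&self, proto: &ProtoNode) -> Result<Arc<Plan>, String> {
        self.default_decode(proto)
    }
}

/// Converter that keeps the default behaviour.
struct DefaultConverter;
impl Converter for DefaultConverter {}

/// Converter that unwraps the custom wrapper and restores the adapter tag.
struct AdapterPreserving;
impl Converter for AdapterPreserving {
    fn decode(&self, proto: &ProtoNode) -> Result<Arc<Plan>, String> {
        if let ProtoNode::Extension { tag, input } = proto {
            // Decode the inner node with the default path, then re-attach
            // the metadata carried by the wrapper.
            let inner = self.default_decode(input)?;
            return Ok(Arc::new(Plan {
                table: inner.table.clone(),
                adapter: Some(tag.clone()),
            }));
        }
        // Not our wrapper: use default decoding.
        self.default_decode(proto)
    }
}
```

The design point is that the override only needs to know about its own wrapper; every other node keeps flowing through the default method.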

datafusion-examples/examples/proto/expression_deduplication.rs

Lines changed: 11 additions & 12 deletions
@@ -17,8 +17,9 @@
 
 //! See `main.rs` for how to run it.
 //!
-//! This example demonstrates how to use the `PhysicalExtensionCodec` trait's
-//! interception methods to implement expression deduplication during deserialization.
+//! This example demonstrates how to use the
+//! `PhysicalProtoConverterExtension` trait's interception methods to
+//! implement expression deduplication during deserialization.
 //!
 //! This pattern is inspired by PR #18192, which introduces expression caching
 //! to reduce memory usage when deserializing plans with duplicate expressions.
@@ -29,8 +30,9 @@
 //! 2. Reduce memory allocation during deserialization
 //! 3. Enable downstream optimizations that rely on Arc pointer equality
 //!
-//! This demonstrates the decorator pattern enabled by the `PhysicalExtensionCodec` trait,
-//! where all expression serialization/deserialization routes through the codec methods.
+//! This demonstrates the decorator pattern enabled by
+//! `PhysicalProtoConverterExtension`, where physical-expression
+//! serialization and deserialization route through converter hooks.
 
 use std::collections::HashMap;
 use std::fmt::Debug;
@@ -49,7 +51,7 @@ use datafusion::prelude::SessionContext;
 use datafusion_proto::physical_plan::from_proto::parse_physical_expr_with_converter;
 use datafusion_proto::physical_plan::to_proto::serialize_physical_expr_with_converter;
 use datafusion_proto::physical_plan::{
-    DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
+    DefaultPhysicalExtensionCodec, PhysicalExtensionCodec, PhysicalPlanDecodeContext,
     PhysicalProtoConverterExtension,
 };
 use datafusion_proto::protobuf::{PhysicalExprNode, PhysicalPlanNode};
@@ -202,11 +204,10 @@ impl PhysicalExtensionCodec for CachingCodec {
 impl PhysicalProtoConverterExtension for CachingCodec {
     fn proto_to_execution_plan(
         &self,
-        ctx: &TaskContext,
-        extension_codec: &dyn PhysicalExtensionCodec,
         proto: &PhysicalPlanNode,
+        ctx: &PhysicalPlanDecodeContext<'_>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        proto.try_into_physical_plan_with_converter(ctx, extension_codec, self)
+        self.default_proto_to_execution_plan(proto, ctx)
     }
 
     fn execution_plan_to_proto(
@@ -225,9 +226,8 @@ impl PhysicalProtoConverterExtension for CachingCodec {
     fn proto_to_physical_expr(
         &self,
         proto: &PhysicalExprNode,
-        ctx: &TaskContext,
         input_schema: &Schema,
-        codec: &dyn PhysicalExtensionCodec,
+        ctx: &PhysicalPlanDecodeContext<'_>,
     ) -> Result<Arc<dyn PhysicalExpr>> {
         // Create cache key from protobuf bytes
         let mut key = Vec::new();
@@ -249,8 +249,7 @@ impl PhysicalProtoConverterExtension for CachingCodec {
         }
 
         // Cache miss - deserialize and store
-        let expr =
-            parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)?;
+        let expr = parse_physical_expr_with_converter(proto, input_schema, ctx, self)?;
 
         // Store in cache
         {
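
The deduplication idea in this example (key a cache on the serialized bytes and hand back the same `Arc` on a hit) can be shown in isolation. The sketch below uses only the standard library; `Expr`, `decode_expr`, and `CachingDecoder` are hypothetical stand-ins rather than the example's actual types, and for brevity it holds the lock across decoding, which a real implementation may prefer to avoid.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical decoded expression.
#[derive(Debug, PartialEq)]
struct Expr(String);

/// Stand-in for real deserialization: interpret the bytes as UTF-8 text.
fn decode_expr(bytes: &[u8]) -> Result<Expr, String> {
    String::from_utf8(bytes.to_vec())
        .map(Expr)
        .map_err(|e| e.to_string())
}

/// Decoder that returns one shared `Arc` per distinct byte sequence.
#[derive(Default)]
struct CachingDecoder {
    cache: Mutex<HashMap<Vec<u8>, Arc<Expr>>>,
}

impl CachingDecoder {
    fn decode(&self, bytes: &[u8]) -> Result<Arc<Expr>, String> {
        let mut cache = self.cache.lock().map_err(|e| e.to_string())?;
        // Cache hit: reuse the existing allocation.
        if let Some(hit) = cache.get(bytes) {
            return Ok(Arc::clone(hit));
        }
        // Cache miss: decode once and remember the result.
        let expr = Arc::new(decode_expr(bytes)?);
        cache.insert(bytes.to_vec(), Arc::clone(&expr));
        Ok(expr)
    }
}
```

Two decodes of identical bytes then satisfy `Arc::ptr_eq`, which is the property the doc comment above says downstream optimizations can rely on.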

datafusion/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -144,6 +144,7 @@ datafusion-session = { workspace = true }
 datafusion-sql = { workspace = true, optional = true }
 flate2 = { workspace = true, optional = true }
 futures = { workspace = true }
+indexmap = { workspace = true }
 itertools = { workspace = true }
 liblzma = { workspace = true, optional = true }
 log = { workspace = true }
