Skip to content

Commit 9e70cdd

Browse files
alamb and kumarUjjawal authored
[branch-53] fix: sqllogictest cannot convert <subquery> to Substrait (#19739) (#20897)
- Part of #19692
- Closes #16281 on branch-53

This PR:
- Backports #19739 from @kumarUjjawal to the branch-53 line

Co-authored-by: Kumar Ujjawal <ujjawalpathak6@gmail.com>
1 parent 2f2bf32 commit 9e70cdd

7 files changed

Lines changed: 297 additions & 52 deletions

File tree

datafusion/substrait/src/logical_plan/producer/expr/field_reference.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,22 @@ pub(crate) fn try_to_substrait_field_reference(
7676
}
7777
}
7878

79+
/// Convert an outer reference column to a Substrait field reference.
80+
/// Outer reference columns reference columns from an outer query scope in correlated subqueries.
81+
/// We convert them the same way as regular columns since the subquery plan will be
82+
/// reconstructed with the proper schema context during consumption.
83+
pub fn from_outer_reference_column(
84+
col: &Column,
85+
schema: &DFSchemaRef,
86+
) -> datafusion::common::Result<Expression> {
87+
// OuterReferenceColumn is converted similarly to a regular column reference.
88+
// The schema provided should be the schema context in which the outer reference
89+
// column appears. During Substrait round-trip, the consumer will reconstruct
90+
// the outer reference based on the subquery context.
91+
let index = schema.index_of_column(col)?;
92+
substrait_field_ref(index)
93+
}
94+
7995
#[cfg(test)]
8096
mod tests {
8197
use super::*;

datafusion/substrait/src/logical_plan/producer/expr/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,17 +139,17 @@ pub fn to_substrait_rex(
139139
}
140140
Expr::WindowFunction(expr) => producer.handle_window_function(expr, schema),
141141
Expr::InList(expr) => producer.handle_in_list(expr, schema),
142-
Expr::Exists(expr) => not_impl_err!("Cannot convert {expr:?} to Substrait"),
142+
Expr::Exists(expr) => producer.handle_exists(expr, schema),
143143
Expr::InSubquery(expr) => producer.handle_in_subquery(expr, schema),
144144
Expr::SetComparison(expr) => producer.handle_set_comparison(expr, schema),
145-
Expr::ScalarSubquery(expr) => {
146-
not_impl_err!("Cannot convert {expr:?} to Substrait")
147-
}
145+
Expr::ScalarSubquery(expr) => producer.handle_scalar_subquery(expr, schema),
148146
#[expect(deprecated)]
149147
Expr::Wildcard { .. } => not_impl_err!("Cannot convert {expr:?} to Substrait"),
150148
Expr::GroupingSet(expr) => not_impl_err!("Cannot convert {expr:?} to Substrait"),
151149
Expr::Placeholder(expr) => not_impl_err!("Cannot convert {expr:?} to Substrait"),
152150
Expr::OuterReferenceColumn(_, _) => {
151+
// OuterReferenceColumn requires tracking outer query schema context for correlated
152+
// subqueries. This is a complex feature that is not yet implemented.
153153
not_impl_err!("Cannot convert {expr:?} to Substrait")
154154
}
155155
Expr::Unnest(expr) => not_impl_err!("Cannot convert {expr:?} to Substrait"),

datafusion/substrait/src/logical_plan/producer/expr/singular_or_list.rs

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,11 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::logical_plan::producer::SubstraitProducer;
18+
use crate::logical_plan::producer::{SubstraitProducer, negate};
1919
use datafusion::common::DFSchemaRef;
2020
use datafusion::logical_expr::expr::InList;
21-
use substrait::proto::expression::{RexType, ScalarFunction, SingularOrList};
22-
use substrait::proto::function_argument::ArgType;
23-
use substrait::proto::{Expression, FunctionArgument};
21+
use substrait::proto::Expression;
22+
use substrait::proto::expression::{RexType, SingularOrList};
2423

2524
pub fn from_in_list(
2625
producer: &mut impl SubstraitProducer,
@@ -46,20 +45,7 @@ pub fn from_in_list(
4645
};
4746

4847
if *negated {
49-
let function_anchor = producer.register_function("not".to_string());
50-
51-
#[expect(deprecated)]
52-
Ok(Expression {
53-
rex_type: Some(RexType::ScalarFunction(ScalarFunction {
54-
function_reference: function_anchor,
55-
arguments: vec![FunctionArgument {
56-
arg_type: Some(ArgType::Value(substrait_or_list)),
57-
}],
58-
output_type: None,
59-
args: vec![],
60-
options: vec![],
61-
})),
62-
})
48+
Ok(negate(producer, substrait_or_list))
6349
} else {
6450
Ok(substrait_or_list)
6551
}

datafusion/substrait/src/logical_plan/producer/expr/subquery.rs

Lines changed: 60 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,14 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::logical_plan::producer::SubstraitProducer;
18+
use crate::logical_plan::producer::{SubstraitProducer, negate};
1919
use datafusion::common::{DFSchemaRef, substrait_err};
20-
use datafusion::logical_expr::Operator;
21-
use datafusion::logical_expr::expr::{InSubquery, SetComparison, SetQuantifier};
22-
use substrait::proto::expression::subquery::InPredicate;
20+
use datafusion::logical_expr::expr::{Exists, InSubquery, SetComparison, SetQuantifier};
21+
use datafusion::logical_expr::{Operator, Subquery};
22+
use substrait::proto::Expression;
23+
use substrait::proto::expression::RexType;
2324
use substrait::proto::expression::subquery::set_comparison::{ComparisonOp, ReductionOp};
24-
use substrait::proto::expression::{RexType, ScalarFunction};
25-
use substrait::proto::function_argument::ArgType;
26-
use substrait::proto::{Expression, FunctionArgument};
25+
use substrait::proto::expression::subquery::{InPredicate, Scalar, SetPredicate};
2726

2827
pub fn from_in_subquery(
2928
producer: &mut impl SubstraitProducer,
@@ -54,20 +53,7 @@ pub fn from_in_subquery(
5453
))),
5554
};
5655
if *negated {
57-
let function_anchor = producer.register_function("not".to_string());
58-
59-
#[expect(deprecated)]
60-
Ok(Expression {
61-
rex_type: Some(RexType::ScalarFunction(ScalarFunction {
62-
function_reference: function_anchor,
63-
arguments: vec![FunctionArgument {
64-
arg_type: Some(ArgType::Value(substrait_subquery)),
65-
}],
66-
output_type: None,
67-
args: vec![],
68-
options: vec![],
69-
})),
70-
})
56+
Ok(negate(producer, substrait_subquery))
7157
} else {
7258
Ok(substrait_subquery)
7359
}
@@ -122,3 +108,56 @@ pub fn from_set_comparison(
122108
))),
123109
})
124110
}
111+
112+
/// Convert DataFusion ScalarSubquery to Substrait Scalar subquery type
113+
pub fn from_scalar_subquery(
114+
producer: &mut impl SubstraitProducer,
115+
subquery: &Subquery,
116+
_schema: &DFSchemaRef,
117+
) -> datafusion::common::Result<Expression> {
118+
let subquery_plan = producer.handle_plan(subquery.subquery.as_ref())?;
119+
120+
Ok(Expression {
121+
rex_type: Some(RexType::Subquery(Box::new(
122+
substrait::proto::expression::Subquery {
123+
subquery_type: Some(
124+
substrait::proto::expression::subquery::SubqueryType::Scalar(
125+
Box::new(Scalar {
126+
input: Some(subquery_plan),
127+
}),
128+
),
129+
),
130+
},
131+
))),
132+
})
133+
}
134+
135+
/// Convert DataFusion Exists expression to Substrait SetPredicate subquery type
136+
pub fn from_exists(
137+
producer: &mut impl SubstraitProducer,
138+
exists: &Exists,
139+
_schema: &DFSchemaRef,
140+
) -> datafusion::common::Result<Expression> {
141+
let subquery_plan = producer.handle_plan(exists.subquery.subquery.as_ref())?;
142+
143+
let substrait_exists = Expression {
144+
rex_type: Some(RexType::Subquery(Box::new(
145+
substrait::proto::expression::Subquery {
146+
subquery_type: Some(
147+
substrait::proto::expression::subquery::SubqueryType::SetPredicate(
148+
Box::new(SetPredicate {
149+
predicate_op: substrait::proto::expression::subquery::set_predicate::PredicateOp::Exists as i32,
150+
tuples: Some(subquery_plan),
151+
}),
152+
),
153+
),
154+
},
155+
))),
156+
};
157+
158+
if exists.negated {
159+
Ok(negate(producer, substrait_exists))
160+
} else {
161+
Ok(substrait_exists)
162+
}
163+
}

datafusion/substrait/src/logical_plan/producer/substrait_producer.rs

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,19 @@
1818
use crate::extensions::Extensions;
1919
use crate::logical_plan::producer::{
2020
from_aggregate, from_aggregate_function, from_alias, from_between, from_binary_expr,
21-
from_case, from_cast, from_column, from_distinct, from_empty_relation, from_filter,
22-
from_in_list, from_in_subquery, from_join, from_like, from_limit, from_literal,
23-
from_projection, from_repartition, from_scalar_function, from_set_comparison,
24-
from_sort, from_subquery_alias, from_table_scan, from_try_cast, from_unary_expr,
25-
from_union, from_values, from_window, from_window_function, to_substrait_rel,
26-
to_substrait_rex,
21+
from_case, from_cast, from_column, from_distinct, from_empty_relation, from_exists,
22+
from_filter, from_in_list, from_in_subquery, from_join, from_like, from_limit,
23+
from_literal, from_projection, from_repartition, from_scalar_function,
24+
from_scalar_subquery, from_set_comparison, from_sort, from_subquery_alias,
25+
from_table_scan, from_try_cast, from_unary_expr, from_union, from_values,
26+
from_window, from_window_function, to_substrait_rel, to_substrait_rex,
2727
};
2828
use datafusion::common::{Column, DFSchemaRef, ScalarValue, substrait_err};
2929
use datafusion::execution::SessionState;
3030
use datafusion::execution::registry::SerializerRegistry;
31+
use datafusion::logical_expr::Subquery;
3132
use datafusion::logical_expr::expr::{
32-
Alias, InList, InSubquery, SetComparison, WindowFunction,
33+
Alias, Exists, InList, InSubquery, SetComparison, WindowFunction,
3334
};
3435
use datafusion::logical_expr::{
3536
Aggregate, Between, BinaryExpr, Case, Cast, Distinct, EmptyRelation, Expr, Extension,
@@ -372,6 +373,21 @@ pub trait SubstraitProducer: Send + Sync + Sized {
372373
) -> datafusion::common::Result<Expression> {
373374
from_set_comparison(self, set_comparison, schema)
374375
}
376+
fn handle_scalar_subquery(
377+
&mut self,
378+
subquery: &Subquery,
379+
schema: &DFSchemaRef,
380+
) -> datafusion::common::Result<Expression> {
381+
from_scalar_subquery(self, subquery, schema)
382+
}
383+
384+
fn handle_exists(
385+
&mut self,
386+
exists: &Exists,
387+
schema: &DFSchemaRef,
388+
) -> datafusion::common::Result<Expression> {
389+
from_exists(self, exists, schema)
390+
}
375391
}
376392

377393
pub struct DefaultSubstraitProducer<'a> {

datafusion/substrait/src/logical_plan/producer/utils.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ use crate::logical_plan::producer::SubstraitProducer;
1919
use datafusion::arrow::datatypes::{DataType, Field, TimeUnit};
2020
use datafusion::common::{DFSchemaRef, plan_err};
2121
use datafusion::logical_expr::SortExpr;
22-
use substrait::proto::SortField;
2322
use substrait::proto::sort_field::{SortDirection, SortKind};
23+
use substrait::proto::{Expression, SortField};
2424

2525
// Substrait wants a list of all field names, including nested fields from structs,
2626
// also from within e.g. lists and maps. However, it does not want the list and map field names
@@ -85,3 +85,28 @@ pub(crate) fn to_substrait_precision(time_unit: &TimeUnit) -> i32 {
8585
TimeUnit::Nanosecond => 9,
8686
}
8787
}
88+
89+
/// Wraps an expression with a `not()` function.
90+
pub(crate) fn negate(
91+
producer: &mut impl SubstraitProducer,
92+
expr: Expression,
93+
) -> Expression {
94+
let function_anchor = producer.register_function("not".to_string());
95+
96+
#[expect(deprecated)]
97+
Expression {
98+
rex_type: Some(substrait::proto::expression::RexType::ScalarFunction(
99+
substrait::proto::expression::ScalarFunction {
100+
function_reference: function_anchor,
101+
arguments: vec![substrait::proto::FunctionArgument {
102+
arg_type: Some(substrait::proto::function_argument::ArgType::Value(
103+
expr,
104+
)),
105+
}],
106+
output_type: None,
107+
args: vec![],
108+
options: vec![],
109+
},
110+
)),
111+
}
112+
}

0 commit comments

Comments
 (0)