Skip to content

Commit 0423028

Browse files
alexanderbianchigabotechs
authored and committed
Add support for nested lists in substrait consumer (apache#20953)
## Rationale for this change Adds support for nested array expressions to the Substrait consumer. Defined in [algebra.proto](https://github.com/substrait-io/substrait/blob/main/proto/substrait/algebra.proto#L1162). ## What changes are included in this PR? Implements the previously unimplemented `consume_nested` for `NestedType::List`. ## Are these changes tested? Yes, unit tests match the testing pattern for Substrait literals in `consumer/expr/literal.rs`. A snapshot test is added for the `make_array()` path. ## Are there any user-facing changes? Users will now be able to send nested list expressions. This change is purely additive; all previously consumable Substrait plans will continue to work. (cherry picked from commit 4010a55)
1 parent 2d96876 commit 0423028

File tree

5 files changed

+257
-4
lines changed

5 files changed

+257
-4
lines changed

datafusion/substrait/src/logical_plan/consumer/expr/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ mod field_reference;
2121
mod function_arguments;
2222
mod if_then;
2323
mod literal;
24+
mod nested;
2425
mod scalar_function;
2526
mod singular_or_list;
2627
mod subquery;
@@ -32,6 +33,7 @@ pub use field_reference::*;
3233
pub use function_arguments::*;
3334
pub use if_then::*;
3435
pub use literal::*;
36+
pub use nested::*;
3537
pub use scalar_function::*;
3638
pub use singular_or_list::*;
3739
pub use subquery::*;
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::logical_plan::consumer::SubstraitConsumer;
19+
use datafusion::common::{DFSchema, not_impl_err, substrait_err};
20+
use datafusion::execution::FunctionRegistry;
21+
use datafusion::logical_expr::Expr;
22+
use substrait::proto::expression::Nested;
23+
use substrait::proto::expression::nested::NestedType;
24+
25+
/// Converts a Substrait [Nested] expression into a DataFusion [Expr].
26+
///
27+
/// Substrait Nested expressions represent complex type constructors (list, struct, map)
28+
/// where elements are full expressions rather than just literals. This is used by
29+
/// producers that emit `Nested { list: ... }` for array construction, as opposed to
30+
/// `Literal { list: ... }` which only supports scalar values.
31+
pub async fn from_nested(
32+
consumer: &impl SubstraitConsumer,
33+
nested: &Nested,
34+
input_schema: &DFSchema,
35+
) -> datafusion::common::Result<Expr> {
36+
let Some(nested_type) = &nested.nested_type else {
37+
return substrait_err!("Nested expression requires a nested_type");
38+
};
39+
40+
match nested_type {
41+
NestedType::List(list) => {
42+
if list.values.is_empty() {
43+
return substrait_err!(
44+
"Empty Nested lists are not supported; use Literal.empty_list instead"
45+
);
46+
}
47+
48+
let mut args = Vec::with_capacity(list.values.len());
49+
for value in &list.values {
50+
args.push(consumer.consume_expression(value, input_schema).await?);
51+
}
52+
53+
let make_array_udf = consumer.get_function_registry().udf("make_array")?;
54+
Ok(Expr::ScalarFunction(
55+
datafusion::logical_expr::expr::ScalarFunction::new_udf(
56+
make_array_udf,
57+
args,
58+
),
59+
))
60+
}
61+
NestedType::Struct(_) => {
62+
not_impl_err!("Nested struct expressions are not yet supported")
63+
}
64+
NestedType::Map(_) => {
65+
not_impl_err!("Nested map expressions are not yet supported")
66+
}
67+
}
68+
}
69+
70+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logical_plan::consumer::utils::tests::test_consumer;
    use substrait::proto::Expression;
    use substrait::proto::expression::nested::List;
    use substrait::proto::expression::literal::LiteralType;
    use substrait::proto::expression::{Literal, RexType};

    /// Wraps `value` in a non-nullable Substrait i64 literal expression.
    fn make_i64_literal(value: i64) -> Expression {
        let literal = Literal {
            nullable: false,
            type_variation_reference: 0,
            literal_type: Some(LiteralType::I64(value)),
        };
        Expression {
            rex_type: Some(RexType::Literal(literal)),
        }
    }

    /// Builds a `Nested` message with the given nullability and payload and a
    /// zero type-variation reference.
    fn nested_with(nullable: bool, nested_type: Option<NestedType>) -> Nested {
        Nested {
            nullable,
            type_variation_reference: 0,
            nested_type,
        }
    }

    /// A list of three i64 literals becomes a three-argument `make_array` call.
    #[tokio::test]
    async fn nested_list_with_literals() -> datafusion::common::Result<()> {
        let consumer = test_consumer();
        let schema = DFSchema::empty();
        let values: Vec<Expression> = [1, 2, 3].into_iter().map(make_i64_literal).collect();
        let nested = nested_with(false, Some(NestedType::List(List { values })));

        let expr = from_nested(&consumer, &nested, &schema).await?;
        assert_eq!(expr.to_string(), "make_array(Int64(1), Int64(2), Int64(3))");

        Ok(())
    }

    /// A list without any values is rejected with a descriptive error.
    #[tokio::test]
    async fn nested_list_empty_rejected() -> datafusion::common::Result<()> {
        let consumer = test_consumer();
        let schema = DFSchema::empty();
        let nested = nested_with(true, Some(NestedType::List(List { values: vec![] })));

        let result = from_nested(&consumer, &nested, &schema).await;
        assert!(result.is_err());
        let message = result.unwrap_err().to_string();
        assert!(message.contains("Empty Nested lists are not supported"));

        Ok(())
    }

    /// A `Nested` message with no `nested_type` set is rejected.
    #[tokio::test]
    async fn nested_missing_type() -> datafusion::common::Result<()> {
        let consumer = test_consumer();
        let schema = DFSchema::empty();
        let nested = nested_with(false, None);

        let result = from_nested(&consumer, &nested, &schema).await;
        assert!(result.is_err());
        let message = result.unwrap_err().to_string();
        assert!(message.contains("nested_type"));

        Ok(())
    }
}

datafusion/substrait/src/logical_plan/consumer/substrait_consumer.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use super::{
1919
from_aggregate_rel, from_cast, from_cross_rel, from_exchange_rel, from_fetch_rel,
2020
from_field_reference, from_filter_rel, from_if_then, from_join_rel, from_literal,
21-
from_project_rel, from_read_rel, from_scalar_function, from_set_rel,
21+
from_nested, from_project_rel, from_read_rel, from_scalar_function, from_set_rel,
2222
from_singular_or_list, from_sort_rel, from_subquery, from_substrait_rel,
2323
from_substrait_rex, from_window_function,
2424
};
@@ -342,10 +342,10 @@ pub trait SubstraitConsumer: Send + Sync + Sized {
342342

343343
async fn consume_nested(
344344
&self,
345-
_expr: &Nested,
346-
_input_schema: &DFSchema,
345+
expr: &Nested,
346+
input_schema: &DFSchema,
347347
) -> datafusion::common::Result<Expr> {
348-
not_impl_err!("Nested expression not supported")
348+
from_nested(self, expr, input_schema).await
349349
}
350350

351351
async fn consume_enum(

datafusion/substrait/tests/cases/logical_plans.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,4 +270,27 @@ mod tests {
270270

271271
Ok(())
272272
}
273+
274+
#[tokio::test]
275+
async fn nested_list_expressions() -> Result<()> {
276+
// Checks that a Substrait Nested list expression whose elements are
277+
// non-literal expressions (column references) is planned as a call to the make_array UDF.
278+
let proto_plan =
279+
read_json("tests/testdata/test_plans/nested_list_expressions.substrait.json");
280+
let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto_plan)?;
281+
let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;
282+
283+
// Expected logical plan: one projection that calls make_array on columns
// a and b of DATA and aliases the result to my_list, over a scan of DATA.
assert_snapshot!(
284+
plan,
285+
@r"
286+
Projection: make_array(DATA.a, DATA.b) AS my_list
287+
TableScan: DATA
288+
"
289+
);
290+
291+
// Trigger execution to ensure the plan is valid beyond its textual form
292+
DataFrame::new(ctx.state(), plan).show().await?;
293+
294+
Ok(())
295+
}
273296
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
{
2+
"relations": [
3+
{
4+
"root": {
5+
"input": {
6+
"project": {
7+
"common": {
8+
"emit": {
9+
"outputMapping": [2]
10+
}
11+
},
12+
"input": {
13+
"read": {
14+
"common": {
15+
"direct": {}
16+
},
17+
"baseSchema": {
18+
"names": ["a", "b"],
19+
"struct": {
20+
"types": [
21+
{
22+
"i32": {
23+
"nullability": "NULLABILITY_NULLABLE"
24+
}
25+
},
26+
{
27+
"i32": {
28+
"nullability": "NULLABILITY_NULLABLE"
29+
}
30+
}
31+
],
32+
"nullability": "NULLABILITY_REQUIRED"
33+
}
34+
},
35+
"namedTable": {
36+
"names": ["DATA"]
37+
}
38+
}
39+
},
40+
"expressions": [
41+
{
42+
"nested": {
43+
"nullable": false,
44+
"list": {
45+
"values": [
46+
{
47+
"selection": {
48+
"directReference": {
49+
"structField": {
50+
"field": 0
51+
}
52+
},
53+
"rootReference": {}
54+
}
55+
},
56+
{
57+
"selection": {
58+
"directReference": {
59+
"structField": {
60+
"field": 1
61+
}
62+
},
63+
"rootReference": {}
64+
}
65+
}
66+
]
67+
}
68+
}
69+
}
70+
]
71+
}
72+
},
73+
"names": ["my_list"]
74+
}
75+
}
76+
]
77+
}

0 commit comments

Comments
 (0)