Skip to content

Commit e701962

Browse files
Snowflake Dialect
1 parent 6acaa9c commit e701962

2 files changed

Lines changed: 201 additions & 23 deletions

File tree

datafusion/sql/src/unparser/plan.rs

Lines changed: 113 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,10 @@ impl Unparser<'_> {
423423
// The projection generated by the `RecursiveUnnestRewriter` from a UNNEST relation will have
424424
// only one expression, which is the placeholder column generated by the rewriter.
425425
let unnest_input_type = if p.expr.len() == 1 {
426-
Self::check_unnest_placeholder_with_outer_ref(&p.expr[0])
426+
Self::check_unnest_placeholder_with_outer_ref(
427+
&p.expr[0],
428+
self.dialect.unnest_as_lateral_flatten(),
429+
)
427430
} else {
428431
None
429432
};
@@ -434,26 +437,98 @@ impl Unparser<'_> {
434437
self.try_unnest_to_lateral_flatten_sql(unnest)?
435438
{
436439
let alias_name = flatten_relation.alias_name().to_string();
437-
relation.flatten(flatten_relation);
438440

439-
// Only set the projection if it hasn't already been set by
440-
// an outer Projection (e.g. SELECT * FROM t, UNNEST(...)).
441-
// When the outer Projection already called
442-
// reconstruct_select_statement, the SELECT list is correct
443-
// and we must not replace it with just `_unnest."VALUE"`.
441+
// Check if the Unnest source is a real query (subselect)
442+
// vs an inline array (EmptyRelation).
443+
let inner_projection = match unnest.input.as_ref() {
444+
LogicalPlan::Projection(proj) => proj,
445+
other => {
446+
return internal_err!(
447+
"Unnest input is not a Projection: {other:?}"
448+
);
449+
}
450+
};
451+
452+
if matches!(
453+
inner_projection.input.as_ref(),
454+
LogicalPlan::EmptyRelation(_)
455+
) {
456+
// Inline array case (e.g. UNNEST([1,2,3])):
457+
// FLATTEN is the sole FROM source.
458+
relation.flatten(flatten_relation);
459+
460+
if !select.already_projected() {
461+
let value_expr =
462+
self.build_flatten_value_select_item(&alias_name);
463+
select.projection(vec![value_expr]);
464+
}
465+
466+
return self.select_to_sql_recursively(
467+
p.input.as_ref(),
468+
query,
469+
select,
470+
relation,
471+
);
472+
}
473+
474+
// Non-empty source (table, subquery, etc.):
475+
// The FLATTEN references columns/expressions from the source.
476+
// Convert the inner Projection's expression to SQL for the
477+
// FLATTEN INPUT, then store the FLATTEN to be added as a
478+
// CROSS JOIN after the source relation is processed.
479+
let first_expr = inner_projection.expr.first().ok_or_else(|| {
480+
DataFusionError::Internal(
481+
"Unnest inner projection has no expressions".to_string(),
482+
)
483+
})?;
484+
// Strip the __unnest_placeholder alias to get the raw expression
485+
let raw_expr = match first_expr {
486+
Expr::Alias(Alias { expr, .. }) => expr.as_ref(),
487+
other => other,
488+
};
489+
let input_sql = self.expr_to_sql(raw_expr)?;
490+
491+
let mut flatten = FlattenRelationBuilder::default();
492+
flatten.input_expr(input_sql);
493+
flatten.outer(unnest.options.preserve_nulls);
494+
444495
if !select.already_projected() {
445496
let value_expr =
446497
self.build_flatten_value_select_item(&alias_name);
447498
select.projection(vec![value_expr]);
448-
select.already_projected();
449499
}
450500

451-
return self.select_to_sql_recursively(
501+
// Recurse into the Unnest → inner source to set the primary
502+
// relation (table scan, subquery, etc.), then add FLATTEN
503+
// as a CROSS JOIN.
504+
self.select_to_sql_recursively(
452505
p.input.as_ref(),
453506
query,
454507
select,
455508
relation,
456-
);
509+
)?;
510+
511+
let flatten_factor = flatten.build().map_err(|e| {
512+
DataFusionError::Internal(format!("Failed to build FLATTEN: {e}"))
513+
})?;
514+
let cross_join = ast::Join {
515+
relation: flatten_factor,
516+
global: false,
517+
join_operator: ast::JoinOperator::CrossJoin(
518+
ast::JoinConstraint::None,
519+
),
520+
};
521+
if let Some(mut from) = select.pop_from() {
522+
from.push_join(cross_join);
523+
select.push_from(from);
524+
} else {
525+
// No existing FROM — create one with just the FLATTEN
526+
let mut twj = TableWithJoinsBuilder::default();
527+
twj.push_join(cross_join);
528+
select.push_from(twj);
529+
}
530+
531+
return Ok(());
457532
}
458533
if self.dialect.unnest_as_table_factor()
459534
&& unnest_input_type.is_some()
@@ -1153,15 +1228,35 @@ impl Unparser<'_> {
11531228
/// - If the column is not a placeholder column, return [None].
11541229
///
11551230
/// `outer_ref` is the display result of [Expr::OuterReferenceColumn]
1156-
fn check_unnest_placeholder_with_outer_ref(expr: &Expr) -> Option<UnnestInputType> {
1157-
if let Expr::Alias(Alias { expr, .. }) = expr
1158-
&& let Expr::Column(Column { name, .. }) = expr.as_ref()
1159-
&& let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER)
1160-
{
1161-
if prefix.starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) {
1162-
return Some(UnnestInputType::OuterReference);
1231+
/// When `deep_peel` is true, peels through multiple Alias layers to find
1232+
/// the inner Column. This is needed for Snowflake FLATTEN where user aliases
1233+
/// (e.g. `AS items`) add extra Alias wrappers around the placeholder column.
1234+
fn check_unnest_placeholder_with_outer_ref(
1235+
expr: &Expr,
1236+
deep_peel: bool,
1237+
) -> Option<UnnestInputType> {
1238+
let inner = match expr {
1239+
Expr::Alias(Alias { expr, .. }) => {
1240+
if deep_peel {
1241+
// Peel through all Alias layers
1242+
let mut e = expr.as_ref();
1243+
while let Expr::Alias(Alias { expr: next, .. }) = e {
1244+
e = next.as_ref();
1245+
}
1246+
e
1247+
} else {
1248+
expr.as_ref()
1249+
}
1250+
}
1251+
_ => return None,
1252+
};
1253+
if let Expr::Column(Column { name, .. }) = inner {
1254+
if let Some(prefix) = name.strip_prefix(UNNEST_PLACEHOLDER) {
1255+
if prefix.starts_with(&format!("({OUTER_REFERENCE_COLUMN_PREFIX}(")) {
1256+
return Some(UnnestInputType::OuterReference);
1257+
}
1258+
return Some(UnnestInputType::Scalar);
11631259
}
1164-
return Some(UnnestInputType::Scalar);
11651260
}
11661261
None
11671262
}

datafusion/sql/tests/cases/plan_to_sql.rs

Lines changed: 88 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ use datafusion_expr::test::function_stub::{
2626
count_udaf, max, max_udaf, min_udaf, sum, sum_udaf,
2727
};
2828
use datafusion_expr::{
29-
EmptyRelation, Expr, Extension, LogicalPlan, LogicalPlanBuilder, Union,
30-
UserDefinedLogicalNode, UserDefinedLogicalNodeCore, WindowFrame,
29+
ColumnarValue, EmptyRelation, Expr, Extension, LogicalPlan, LogicalPlanBuilder,
30+
ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Union,
31+
UserDefinedLogicalNode, UserDefinedLogicalNodeCore, Volatility, WindowFrame,
3132
WindowFunctionDefinition, cast, col, lit, table_scan, wildcard,
3233
};
3334
use datafusion_functions::unicode;
@@ -3032,7 +3033,7 @@ fn snowflake_unnest_to_lateral_flatten_cross_join_inline() -> Result<(), DataFus
30323033
sql: "SELECT * FROM UNNEST([1,2,3]) u(c1) JOIN j1 ON u.c1 = j1.j1_id",
30333034
parser_dialect: GenericDialect {},
30343035
unparser_dialect: snowflake,
3035-
expected: @r#"SELECT "u"."c1", "j1"."j1_id", "j1"."j1_string" FROM (SELECT "_unnest"."VALUE" AS "c1" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS _unnest) AS "u" INNER JOIN "j1" ON ("u"."c1" = "j1"."j1_id")"#,
3036+
expected: @r#"SELECT "u"."c1", "j1"."j1_id", "j1"."j1_string" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "u" INNER JOIN "j1" ON ("u"."c1" = "j1"."j1_id")"#,
30363037
);
30373038
Ok(())
30383039
}
@@ -3072,7 +3073,7 @@ fn snowflake_flatten_select_unnest_with_alias() -> Result<(), DataFusionError> {
30723073
sql: "SELECT UNNEST([1,2,3]) as c1",
30733074
parser_dialect: GenericDialect {},
30743075
unparser_dialect: snowflake,
3075-
expected: @r#"SELECT "_unnest"."VALUE" AS "c1" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS _unnest"#,
3076+
expected: @r#"SELECT _unnest."VALUE" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS _unnest"#,
30763077
);
30773078
Ok(())
30783079
}
@@ -3096,7 +3097,89 @@ fn snowflake_flatten_from_unnest_with_table_alias() -> Result<(), DataFusionErro
30963097
sql: "SELECT * FROM UNNEST([1,2,3]) AS t1 (c1)",
30973098
parser_dialect: GenericDialect {},
30983099
unparser_dialect: snowflake,
3099-
expected: @r#"SELECT "t1"."c1" FROM (SELECT "_unnest"."VALUE" AS "c1" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS _unnest) AS "t1""#,
3100+
expected: @r#"SELECT "t1"."c1" FROM LATERAL FLATTEN(INPUT => [1, 2, 3]) AS "t1""#,
3101+
);
3102+
Ok(())
3103+
}
3104+
3105+
#[test]
3106+
fn snowflake_flatten_unnest_from_subselect() -> Result<(), DataFusionError> {
3107+
// UNNEST operating on an array column produced by a subselect.
3108+
// Uses unnest_table which has array_col (List<Int64>).
3109+
// The filter uses array_col IS NOT NULL — a simple predicate
3110+
// that doesn't involve struct types (which Snowflake FLATTEN can't handle).
3111+
let snowflake = SnowflakeDialect::new();
3112+
roundtrip_statement_with_dialect_helper!(
3113+
sql: "SELECT UNNEST(array_col) FROM (SELECT array_col FROM unnest_table WHERE array_col IS NOT NULL LIMIT 3)",
3114+
parser_dialect: GenericDialect {},
3115+
unparser_dialect: snowflake,
3116+
expected: @r#"SELECT _unnest."VALUE" FROM (SELECT "unnest_table"."array_col" FROM "unnest_table" WHERE "unnest_table"."array_col" IS NOT NULL LIMIT 3) CROSS JOIN LATERAL FLATTEN(INPUT => "unnest_table"."array_col") AS _unnest"#,
31003117
);
31013118
Ok(())
31023119
}
3120+
3121+
/// Dummy scalar UDF for testing — takes a string and returns List<Int64>.
3122+
#[derive(Debug, PartialEq, Eq, Hash)]
3123+
struct JsonGetArrayUdf {
3124+
signature: Signature,
3125+
}
3126+
3127+
impl JsonGetArrayUdf {
3128+
fn new() -> Self {
3129+
Self {
3130+
signature: Signature::exact(vec![DataType::Utf8], Volatility::Immutable),
3131+
}
3132+
}
3133+
}
3134+
3135+
impl ScalarUDFImpl for JsonGetArrayUdf {
3136+
fn name(&self) -> &str {
3137+
"json_get_array"
3138+
}
3139+
fn signature(&self) -> &Signature {
3140+
&self.signature
3141+
}
3142+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
3143+
Ok(DataType::List(Arc::new(Field::new_list_field(
3144+
DataType::Int64,
3145+
true,
3146+
))))
3147+
}
3148+
fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
3149+
unimplemented!("test stub")
3150+
}
3151+
}
3152+
3153+
#[test]
3154+
fn snowflake_flatten_unnest_udf_result() -> Result<(), DataFusionError> {
3155+
// UNNEST on a UDF result: json_get_array(col) returns List<Int64>,
3156+
// then UNNEST flattens it. This simulates a common Snowflake pattern
3157+
// where a UDF parses JSON into an array, then FLATTEN expands it.
3158+
let sql = "SELECT UNNEST(json_get_array(j1_string)) AS items FROM j1 LIMIT 5";
3159+
3160+
let statement = Parser::new(&GenericDialect {})
3161+
.try_with_sql(sql)?
3162+
.parse_statement()?;
3163+
3164+
let state = MockSessionState::default()
3165+
.with_aggregate_function(max_udaf())
3166+
.with_aggregate_function(min_udaf())
3167+
.with_scalar_function(Arc::new(ScalarUDF::new_from_impl(JsonGetArrayUdf::new())))
3168+
.with_expr_planner(Arc::new(CoreFunctionPlanner::default()))
3169+
.with_expr_planner(Arc::new(NestedFunctionPlanner))
3170+
.with_expr_planner(Arc::new(FieldAccessPlanner));
3171+
3172+
let context = MockContextProvider { state };
3173+
let sql_to_rel = SqlToRel::new(&context);
3174+
let plan = sql_to_rel
3175+
.sql_statement_to_plan(statement)
3176+
.unwrap_or_else(|e| panic!("Failed to parse sql: {sql}\n{e}"));
3177+
3178+
let snowflake = SnowflakeDialect::new();
3179+
let unparser = Unparser::new(&snowflake);
3180+
let result = unparser.plan_to_sql(&plan)?;
3181+
let actual = result.to_string();
3182+
3183+
insta::assert_snapshot!(actual, @r#"SELECT _unnest."VALUE" FROM "j1" CROSS JOIN LATERAL FLATTEN(INPUT => json_get_array("j1"."j1_string")) AS _unnest LIMIT 5"#);
3184+
Ok(())
3185+
}

0 commit comments

Comments
 (0)