Skip to content

Commit 909d8c6

Browse files
More snowflake dialect fixes for unnest
1 parent 065e111 commit 909d8c6

2 files changed

Lines changed: 221 additions & 43 deletions

File tree

datafusion/sql/src/unparser/plan.rs

Lines changed: 122 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -437,30 +437,31 @@ impl Unparser<'_> {
437437
};
438438
if self.dialect.unnest_as_lateral_flatten()
439439
&& unnest_input_type.is_some()
440-
&& let LogicalPlan::Unnest(unnest) = p.input.as_ref()
440+
&& let Some(unnest) = Self::peel_to_unnest(p.input.as_ref())
441441
&& let Some(flatten_relation) =
442442
self.try_unnest_to_lateral_flatten_sql(unnest)?
443443
{
444444
let alias_name = flatten_relation.alias_name().to_string();
445445

446446
// Check if the Unnest source is a real query (subselect)
447447
// vs an inline array (EmptyRelation).
448-
let inner_projection = match unnest.input.as_ref() {
449-
LogicalPlan::Projection(proj) => proj,
450-
LogicalPlan::SubqueryAlias(alias) => match alias.input.as_ref() {
451-
LogicalPlan::Projection(proj) => proj,
452-
other => {
453-
return internal_err!(
454-
"Unnest input (through SubqueryAlias) is not a Projection: {other:?}"
455-
);
456-
}
457-
},
458-
other => {
459-
return internal_err!(
460-
"Unnest input is not a Projection: {other:?}"
461-
);
462-
}
463-
};
448+
let inner_projection =
449+
Self::peel_to_inner_projection(unnest.input.as_ref())
450+
.ok_or_else(|| {
451+
DataFusionError::Internal(format!(
452+
"Unnest input is not a Projection: {:?}",
453+
unnest.input
454+
))
455+
})?;
456+
457+
// Apply any intermediate Limit/Sort modifiers and get
458+
// a reference to the Unnest LogicalPlan node for recursion.
459+
// This bypasses the Limit/Sort handlers (which would create
460+
// unwanted derived subqueries when already_projected is set).
461+
let unnest_plan = self.apply_transparent_and_find_unnest_plan(
462+
p.input.as_ref(),
463+
query,
464+
)?;
464465

465466
if matches!(
466467
inner_projection.input.as_ref(),
@@ -479,7 +480,7 @@ impl Unparser<'_> {
479480
}
480481

481482
return self.select_to_sql_recursively(
482-
p.input.as_ref(),
483+
unnest_plan,
483484
query,
484485
select,
485486
relation,
@@ -518,12 +519,7 @@ impl Unparser<'_> {
518519
// Recurse into the Unnest → inner source to set the primary
519520
// relation (table scan, subquery, etc.), then add FLATTEN
520521
// as a CROSS JOIN.
521-
self.select_to_sql_recursively(
522-
p.input.as_ref(),
523-
query,
524-
select,
525-
relation,
526-
)?;
522+
self.select_to_sql_recursively(unnest_plan, query, select, relation)?;
527523

528524
let flatten_factor = flatten.build().map_err(|e| {
529525
DataFusionError::Internal(format!("Failed to build FLATTEN: {e}"))
@@ -550,14 +546,18 @@ impl Unparser<'_> {
550546
if self.dialect.unnest_as_table_factor()
551547
&& unnest_input_type.is_some()
552548
&& user_alias.is_none() // Skip if user alias present — fall through to reconstruct_select_statement which preserves aliases
553-
&& let LogicalPlan::Unnest(unnest) = &p.input.as_ref()
549+
&& let Some(unnest) = Self::peel_to_unnest(p.input.as_ref())
554550
&& let Some(unnest_relation) =
555551
self.try_unnest_to_table_factor_sql(unnest)?
556552
{
557553
relation.unnest(unnest_relation);
558-
return self.select_to_sql_recursively(
554+
let unnest_plan = self.apply_transparent_and_find_unnest_plan(
559555
p.input.as_ref(),
560556
query,
557+
)?;
558+
return self.select_to_sql_recursively(
559+
unnest_plan,
560+
query,
561561
select,
562562
relation,
563563
);
@@ -1201,13 +1201,9 @@ impl Unparser<'_> {
12011201
// | Projection: table.col1, table.col2 AS UNNEST(table.col2)
12021202
// | Filter: table.col3 = Int64(3)
12031203
// | TableScan: table projection=None
1204-
if let LogicalPlan::Projection(p) = unnest.input.as_ref() {
1205-
// continue with projection input
1206-
self.select_to_sql_recursively(&p.input, query, select, relation)
1207-
} else if let LogicalPlan::SubqueryAlias(alias) = unnest.input.as_ref()
1208-
&& let LogicalPlan::Projection(p) = alias.input.as_ref()
1209-
{
1210-
// SubqueryAlias wraps the Projection (e.g. passthrough tables)
1204+
if let Some(p) = Self::peel_to_inner_projection(unnest.input.as_ref()) {
1205+
// Skip the inner Projection (synthetic rewriter node)
1206+
// and continue with its input.
12111207
self.select_to_sql_recursively(&p.input, query, select, relation)
12121208
} else {
12131209
internal_err!("Unnest input is not a Projection: {unnest:?}")
@@ -1242,6 +1238,96 @@ impl Unparser<'_> {
12421238
}
12431239
}
12441240

1241+
/// Locate the `Unnest` node beneath any chain of Limit / Sort nodes.
1242+
///
1243+
/// A Limit or Sort may sit between the outer Projection and the Unnest
1244+
/// node. These nodes modify result quantity or ordering but do not change
1245+
/// the plan shape for unnest detection, so they are skipped here. This
1246+
/// helper is a pure lookup and emits no SQL: the call sites pair it with
1247+
/// [`Self::apply_transparent_and_find_unnest_plan`], which is what attaches
1248+
/// LIMIT / OFFSET / ORDER BY to the query builder.
///
/// Returns `Some(&Unnest)` for the first Unnest reached, or `None` as soon
/// as any node other than Limit / Sort / Unnest is encountered.
1249+
fn peel_to_unnest(plan: &LogicalPlan) -> Option<&Unnest> {
1250+
match plan {
1251+
LogicalPlan::Unnest(unnest) => Some(unnest),
1252+
// Limit and Sort are looked through by recursing into their single input.
LogicalPlan::Limit(limit) => Self::peel_to_unnest(limit.input.as_ref()),
1253+
LogicalPlan::Sort(sort) => Self::peel_to_unnest(sort.input.as_ref()),
1254+
// Any other node ends the search: the unnest fast path does not apply.
_ => None,
1255+
}
1256+
}
1257+
1258+
/// Walk through "transparent" plan nodes (SubqueryAlias, Limit, Sort) to
1259+
/// find the inner Projection that feeds an Unnest node.
1260+
///
1261+
/// The inner Projection is created by the `RecursiveUnnestRewriter` and
1262+
/// contains the array expression that the Unnest operates on. A
1263+
/// `SubqueryAlias` (e.g. from a virtual/passthrough table) may wrap the
1264+
/// Projection; Limit/Sort are also handled for robustness.
1265+
fn peel_to_inner_projection(plan: &LogicalPlan) -> Option<&Projection> {
1266+
match plan {
1267+
LogicalPlan::Projection(p) => Some(p),
1268+
LogicalPlan::SubqueryAlias(alias) => {
1269+
Self::peel_to_inner_projection(alias.input.as_ref())
1270+
}
1271+
LogicalPlan::Limit(limit) => {
1272+
Self::peel_to_inner_projection(limit.input.as_ref())
1273+
}
1274+
LogicalPlan::Sort(sort) => {
1275+
Self::peel_to_inner_projection(sort.input.as_ref())
1276+
}
1277+
_ => None,
1278+
}
1279+
}
1280+
1281+
/// Walk through transparent nodes (Limit, Sort) between the outer
1282+
/// Projection and the Unnest, applying their SQL modifiers (LIMIT,
1283+
/// OFFSET, ORDER BY) to the query builder. Returns a reference to the
1284+
/// Unnest `LogicalPlan` node so the caller can recurse into it directly,
1285+
/// bypassing the intermediate handlers that would otherwise create
1286+
/// unwanted derived subqueries.
///
/// # Errors
///
/// Returns an internal error when a Sort is encountered while `query` is
/// `None`, or when a node other than Unnest / Limit / Sort is reached.
/// Failures from `expr_to_sql` and `sorts_to_sql` are propagated with `?`.
1287+
fn apply_transparent_and_find_unnest_plan<'a>(
1288+
&self,
1289+
plan: &'a LogicalPlan,
1290+
query: &mut Option<QueryBuilder>,
1291+
) -> Result<&'a LogicalPlan> {
1292+
match plan {
1293+
// Reached the target: hand back the plan node itself, not the inner `Unnest`.
LogicalPlan::Unnest(_) => Ok(plan),
1294+
LogicalPlan::Limit(limit) => {
1295+
// NOTE(review): when `query` is `None` the fetch/skip are silently
// dropped, whereas the Sort arm below returns an error in the same
// situation — confirm this asymmetry is intended.
if let Some(fetch) = &limit.fetch
1296+
&& let Some(q) = query.as_mut()
1297+
{
1298+
q.limit(Some(self.expr_to_sql(fetch)?));
1299+
}
1300+
if let Some(skip) = &limit.skip
1301+
&& let Some(q) = query.as_mut()
1302+
{
1303+
q.offset(Some(ast::Offset {
1304+
rows: ast::OffsetRows::None,
1305+
value: self.expr_to_sql(skip)?,
1306+
}));
1307+
}
1308+
// Keep descending: more Limit/Sort nodes may sit above the Unnest.
self.apply_transparent_and_find_unnest_plan(limit.input.as_ref(), query)
1309+
}
1310+
LogicalPlan::Sort(sort) => {
1311+
let Some(query_ref) = query.as_mut() else {
1312+
return internal_err!(
1313+
"Sort between Projection and Unnest requires a statement context."
1314+
);
1315+
};
1316+
// A Sort can carry its own fetch; it is rendered as a numeric LIMIT.
// NOTE(review): if an enclosing Limit already called `limit(..)`, this
// is a second call on the same builder — presumably the later value
// wins; verify against `QueryBuilder::limit`.
if let Some(fetch) = sort.fetch {
1317+
query_ref.limit(Some(ast::Expr::value(ast::Value::Number(
1318+
fetch.to_string(),
1319+
false,
1320+
))));
1321+
}
1322+
query_ref.order_by(self.sorts_to_sql(&sort.expr)?);
1323+
self.apply_transparent_and_find_unnest_plan(sort.input.as_ref(), query)
1324+
}
1325+
// Only Limit and Sort are treated as transparent; anything else is a bug
// in the caller's shape detection.
other => {
1326+
internal_err!("Unexpected node between Projection and Unnest: {other:?}")
1327+
}
1328+
}
1329+
}
1330+
12451331
/// Try to find the placeholder column name generated by `RecursiveUnnestRewriter`.
12461332
///
12471333
/// - If the column is a placeholder column match the pattern `Expr::Alias(Expr::Column("__unnest_placeholder(...)"))`,
@@ -1338,16 +1424,9 @@ impl Unparser<'_> {
13381424
&self,
13391425
unnest: &Unnest,
13401426
) -> Result<Option<FlattenRelationBuilder>> {
1341-
let projection = match unnest.input.as_ref() {
1342-
LogicalPlan::Projection(p) => p,
1343-
LogicalPlan::SubqueryAlias(alias) => {
1344-
if let LogicalPlan::Projection(p) = alias.input.as_ref() {
1345-
p
1346-
} else {
1347-
return Ok(None);
1348-
}
1349-
}
1350-
_ => return Ok(None),
1427+
let Some(projection) = Self::peel_to_inner_projection(unnest.input.as_ref())
1428+
else {
1429+
return Ok(None);
13511430
};
13521431

13531432
// For now, handle the simple case of a single expression to flatten.

datafusion/sql/tests/cases/plan_to_sql.rs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3184,6 +3184,105 @@ fn snowflake_flatten_unnest_udf_result() -> Result<(), DataFusionError> {
31843184
Ok(())
31853185
}
31863186

3187+
/// Snowflake unparsing must still emit `LATERAL FLATTEN` when a Limit sits
/// between the outer Projection and the Unnest, and must render that Limit
/// as a trailing `LIMIT` clause.
#[test]
3188+
fn snowflake_flatten_limit_between_projection_and_unnest() -> Result<(), DataFusionError>
3189+
{
3190+
// Build: Projection → Limit → Unnest → Projection → TableScan
3191+
// The optimizer can insert a Limit between the outer Projection and the
3192+
// Unnest. The FLATTEN code path must look through transparent nodes
3193+
// (Limit, Sort) to find the Unnest.
3194+
// Single nullable column `items` holding a list of Utf8 values.
let schema = Schema::new(vec![Field::new(
3195+
"items",
3196+
DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))),
3197+
true,
3198+
)]);
3199+
3200+
// The plan is built bottom-up: each builder call wraps the previous node.
let plan = table_scan(Some("source"), &schema, None)?
3201+
.project(vec![col("items").alias("__unnest_placeholder(items)")])?
3202+
.unnest_column("__unnest_placeholder(items)")?
3203+
.limit(0, Some(5))? // Limit (skip 0, fetch 5) BETWEEN outer Projection and Unnest
3204+
.project(vec![col("__unnest_placeholder(items)").alias("item")])?
3205+
.build()?;
3206+
3207+
let snowflake = SnowflakeDialect::new();
3208+
let unparser = Unparser::new(&snowflake);
3209+
let result = unparser.plan_to_sql(&plan)?;
3210+
let actual = result.to_string();
3211+
3212+
// Must contain LATERAL FLATTEN — the Limit must not prevent FLATTEN detection.
// The snapshot pins the full statement: a single flat SELECT with a trailing
// LIMIT 5, no derived subquery and no OFFSET clause.
3213+
insta::assert_snapshot!(actual, @r#"SELECT _unnest."VALUE" AS "item" FROM "source" CROSS JOIN LATERAL FLATTEN(INPUT => "source"."items", OUTER => true) AS _unnest LIMIT 5"#);
3214+
Ok(())
3215+
}
3216+
3217+
/// Snowflake unparsing must still emit `LATERAL FLATTEN` when a Sort sits
/// between the outer Projection and the Unnest, and must keep the ordering
/// as an `ORDER BY` clause.
#[test]
3218+
fn snowflake_flatten_sort_between_projection_and_unnest() -> Result<(), DataFusionError> {
3219+
// Build: Projection → Sort → Unnest → Projection → TableScan
3220+
// Same as Limit test but with Sort instead.
3221+
// Single nullable column `items` holding a list of Utf8 values.
let schema = Schema::new(vec![Field::new(
3222+
"items",
3223+
DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))),
3224+
true,
3225+
)]);
3226+
3227+
// `sort(true, true)` presumably means ascending, nulls first
// (DataFusion `Expr::sort(asc, nulls_first)`) — the assertions below do
// not depend on the direction.
let plan = table_scan(Some("source"), &schema, None)?
3228+
.project(vec![col("items").alias("__unnest_placeholder(items)")])?
3229+
.unnest_column("__unnest_placeholder(items)")?
3230+
.sort(vec![col("__unnest_placeholder(items)").sort(true, true)])?
3231+
.project(vec![col("__unnest_placeholder(items)").alias("item")])?
3232+
.build()?;
3233+
3234+
let snowflake = SnowflakeDialect::new();
3235+
let unparser = Unparser::new(&snowflake);
3236+
let result = unparser.plan_to_sql(&plan)?;
3237+
let actual = result.to_string();
3238+
3239+
// Must contain LATERAL FLATTEN — the Sort must not prevent FLATTEN detection.
// NOTE(review): these are substring checks only; unlike the Limit test, the
// exact SQL (including which column ORDER BY references) is not pinned.
3240+
assert!(
3241+
actual.contains("LATERAL FLATTEN"),
3242+
"Expected LATERAL FLATTEN in SQL, got: {actual}"
3243+
);
3244+
assert!(
3245+
actual.contains("ORDER BY"),
3246+
"Expected ORDER BY in SQL, got: {actual}"
3247+
);
3248+
Ok(())
3249+
}
3250+
3251+
/// Snowflake unparsing must emit `LATERAL FLATTEN` and a trailing `LIMIT`
/// when a Limit sits above the Unnest and a SubqueryAlias sits below it.
#[test]
3252+
fn snowflake_flatten_limit_between_projection_and_unnest_with_subquery_alias()
3253+
-> Result<(), DataFusionError> {
3254+
// Build: Projection → Limit → Unnest → SubqueryAlias → Projection → TableScan
3255+
// Combines the Limit and SubqueryAlias transparent node patterns.
3256+
// Single nullable column `items` holding a list of Utf8 values.
let schema = Schema::new(vec![Field::new(
3257+
"items",
3258+
DataType::List(Arc::new(Field::new_list_field(DataType::Utf8, true))),
3259+
true,
3260+
)]);
3261+
3262+
// `.alias("t")` wraps the inner Projection in the SubqueryAlias; the Limit
// (skip 0, fetch 10) is added above the Unnest.
let plan = table_scan(Some("source"), &schema, None)?
3263+
.project(vec![col("items").alias("__unnest_placeholder(items)")])?
3264+
.alias("t")?
3265+
.unnest_column("__unnest_placeholder(items)")?
3266+
.limit(0, Some(10))?
3267+
.project(vec![col("__unnest_placeholder(items)").alias("item")])?
3268+
.build()?;
3269+
3270+
let snowflake = SnowflakeDialect::new();
3271+
let unparser = Unparser::new(&snowflake);
3272+
let result = unparser.plan_to_sql(&plan)?;
3273+
let actual = result.to_string();
3274+
3275+
// Substring checks only: FLATTEN detection must survive both wrappers, and
// the Limit must surface as `LIMIT 10`. The exact SQL is not pinned here.
assert!(
3276+
actual.contains("LATERAL FLATTEN"),
3277+
"Expected LATERAL FLATTEN in SQL, got: {actual}"
3278+
);
3279+
assert!(
3280+
actual.contains("LIMIT 10"),
3281+
"Expected LIMIT 10 in SQL, got: {actual}"
3282+
);
3283+
Ok(())
3284+
}
3285+
31873286
#[test]
31883287
fn snowflake_unnest_through_subquery_alias() -> Result<(), DataFusionError> {
31893288
// Build: Projection → Unnest → SubqueryAlias → Projection → TableScan

0 commit comments

Comments
 (0)