Skip to content

Commit 83c2c01

Browse files
adriangbclaude
andauthored
fix: rebind RecursiveQueryExec batches to the declared output schema (#21770)
## Which issue does this PR close? - Closes #. <!-- Happy to file a tracking issue if the project prefers. --> ## Rationale for this change A recursive CTE whose anchor aliases a computed column (e.g. `upper(val) AS val`) and whose recursive term leaves the same expression un-aliased (`upper(r.val)`) currently returns the wrong column name — but only when the outer query has both `ORDER BY` and `LIMIT`. The plan-level schema is correct (taken from the anchor), but `RecursiveQueryExec` forwards recursive-term `RecordBatch`es with their native schemas intact. Downstream consumers that key on `batch.schema().field(i).name()` — `SortExec`'s TopK path, CSV/JSON writers, user-code `collect`ors — then observe the leaked recursive-branch name instead of the anchor's. MRE (fails on `datafusion-cli` pre-fix): ```sql CREATE TABLE records (id VARCHAR NOT NULL, parent_id VARCHAR, ts TIMESTAMP NOT NULL, val VARCHAR); INSERT INTO records VALUES ('a00', NULL, TIMESTAMP '2025-01-01 00:00:00', 'v_span'), ('a01', 'a00', TIMESTAMP '2025-01-01 00:00:01', 'v_log_1'), ('a02', 'a01', TIMESTAMP '2025-01-01 00:00:02', 'v_log_2'), ('a03', 'a02', TIMESTAMP '2025-01-01 00:00:03', 'v_log_3'); WITH RECURSIVE descendants AS ( SELECT id, parent_id, ts, upper(val) AS val FROM records WHERE id = 'a00' UNION ALL SELECT r.id, r.parent_id, r.ts, upper(r.val) FROM records r INNER JOIN descendants d ON r.parent_id = d.id ) SELECT id, parent_id, ts, val FROM descendants ORDER BY ts ASC LIMIT 10; ``` Pre-fix header column reads `upper(r.val)`; expected `val`. Only `ORDER BY + LIMIT` triggers it because: - `SortExec` without fetch re-materialises batches via `ExternalSorter` (stable schema). - `LimitExec` without sort sits above `RecursiveQueryExec`, never mixing branches. - `SortExec` with fetch uses the TopK path, which emits `interleave_record_batch` output that carries whichever input batch's schema was used last. ## What changes are included in this PR? In `RecursiveQueryStream::push_batch`, rebind each emitted batch to the declared output schema (taken from the anchor term). Logical-plan coercion in `LogicalPlanBuilder::to_recursive_query` already guarantees matching column types, so this is a zero-copy field rebind. 14 lines of production code + comment. ## Are these changes tested? Yes. - `datafusion/core/tests/sql/select.rs::test_recursive_cte_batch_schema_stable_with_order_by_limit` — runs the MRE and asserts every collected `RecordBatch`'s schema field names equal `["id", "parent_id", "ts", "val"]`. Fails pre-fix with `left: ["id", "parent_id", "ts", "upper(r.val)"]`. - `datafusion/sqllogictest/test_files/cte.slt` — round-trips the buggy query through a headered CSV file (whose header is written from each batch's schema) and re-reads it as headerless CSV so the header row is compared as a data row. SLT otherwise cannot assert column names directly, so this is the only way to surface the leak inside SLT. Both regression tests were verified to fail on the base branch before the fix was applied and pass after. ## Are there any user-facing changes? Recursive CTEs with mismatched anchor/recursive column names will now emit batches with the anchor-declared names consistently, regardless of downstream operators. No API changes. Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 4fbdfd0 commit 83c2c01

3 files changed

Lines changed: 153 additions & 0 deletions

File tree

datafusion/core/tests/sql/select.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,3 +431,65 @@ async fn test_select_cast_date_literal_to_timestamp_overflow() -> Result<()> {
431431
);
432432
Ok(())
433433
}
434+
435+
// Regression test: a recursive CTE whose anchor aliases a computed column
436+
// (`upper(val) AS val`) and whose recursive term leaves the same expression
437+
// un-aliased must still produce batches whose schema field names come from
438+
// the anchor term — especially when the outer query uses ORDER BY + LIMIT
439+
// (the TopK path passes batch schemas through verbatim, so any drift in
440+
// RecursiveQueryExec's emitted batches is visible downstream).
441+
#[tokio::test]
442+
async fn test_recursive_cte_batch_schema_stable_with_order_by_limit() -> Result<()> {
443+
let ctx = SessionContext::new();
444+
ctx.sql(
445+
"CREATE TABLE records (\
446+
id VARCHAR NOT NULL, \
447+
parent_id VARCHAR, \
448+
ts TIMESTAMP NOT NULL, \
449+
val VARCHAR\
450+
)",
451+
)
452+
.await?
453+
.collect()
454+
.await?;
455+
ctx.sql(
456+
"INSERT INTO records VALUES \
457+
('a00', NULL, TIMESTAMP '2025-01-01 00:00:00', 'v_span'), \
458+
('a01', 'a00', TIMESTAMP '2025-01-01 00:00:01', 'v_log_1'), \
459+
('a02', 'a01', TIMESTAMP '2025-01-01 00:00:02', 'v_log_2'), \
460+
('a03', 'a02', TIMESTAMP '2025-01-01 00:00:03', 'v_log_3'), \
461+
('a04', 'a03', TIMESTAMP '2025-01-01 00:00:04', 'v_log_4'), \
462+
('a05', 'a04', TIMESTAMP '2025-01-01 00:00:05', 'v_log_5')",
463+
)
464+
.await?
465+
.collect()
466+
.await?;
467+
468+
let results = ctx
469+
.sql(
470+
"WITH RECURSIVE descendants AS (\
471+
SELECT id, parent_id, ts, upper(val) AS val \
472+
FROM records WHERE id = 'a00' \
473+
UNION ALL \
474+
SELECT r.id, r.parent_id, r.ts, upper(r.val) \
475+
FROM records r INNER JOIN descendants d ON r.parent_id = d.id \
476+
) \
477+
SELECT id, parent_id, ts, val FROM descendants ORDER BY ts ASC LIMIT 10",
478+
)
479+
.await?
480+
.collect()
481+
.await?;
482+
483+
let expected_names = ["id", "parent_id", "ts", "val"];
484+
assert!(!results.is_empty(), "expected at least one batch");
485+
for (i, batch) in results.iter().enumerate() {
486+
let schema = batch.schema();
487+
let actual_names: Vec<&str> =
488+
schema.fields().iter().map(|f| f.name().as_str()).collect();
489+
assert_eq!(
490+
actual_names, expected_names,
491+
"batch {i} schema field names leaked from recursive branch"
492+
);
493+
}
494+
Ok(())
495+
}

datafusion/physical-plan/src/recursive_query.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,20 @@ impl RecursiveQueryStream {
317317
mut batch: RecordBatch,
318318
) -> Poll<Option<Result<RecordBatch>>> {
319319
let baseline_metrics = self.baseline_metrics.clone();
320+
321+
// Rebind to the declared output schema. The recursive term is planned
322+
// independently from the static term and its projection may leave
323+
// columns un-aliased (e.g. `upper(r.val)` vs the anchor's
324+
// `upper(val) AS val`); downstream consumers that key on
325+
// `batch.schema().field(i).name()` (TopK, CSV/JSON writers, custom
326+
// collectors) would otherwise see the recursive branch's names leak
327+
// through. Logical-plan coercion guarantees matching types, so this
328+
// is a zero-copy field rebind.
329+
if batch.schema() != self.schema {
330+
batch =
331+
RecordBatch::try_new(Arc::clone(&self.schema), batch.columns().to_vec())?;
332+
}
333+
320334
if let Some(deduplicator) = &mut self.distinct_deduplicator {
321335
let _timer_guard = baseline_metrics.elapsed_compute().timer();
322336
batch = deduplicator.deduplicate(&batch)?;

datafusion/sqllogictest/test_files/cte.slt

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1223,6 +1223,83 @@ physical_plan
12231223
07)------RepartitionExec: partitioning=Hash([start@0], 4), input_partitions=1
12241224
08)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/recursive_cte/closure.csv]]}, projection=[start, end], file_type=csv, has_header=true
12251225

1226+
# Regression test: recursive CTE column-name stability under ORDER BY + LIMIT.
1227+
#
1228+
# The recursive CTE below has an anchor that aliases a computed column
1229+
# (`upper(val) AS val`) while the recursive term leaves the same expression
1230+
# un-aliased (`upper(r.val)`). RecursiveQueryExec declares its output schema
1231+
# from the anchor, so the plan-level column is `val`; but if it forwards the
1232+
# recursive term's batches without rebinding their schema to the anchor's,
1233+
# downstream consumers that read `batch.schema().field(i).name()` observe
1234+
# the leaked `upper(r.val)` name. The outer `ORDER BY ts LIMIT 10` routes
1235+
# through the TopK path in SortExec, which passes batch schemas through
1236+
# verbatim — so the leak surfaces there.
1237+
#
1238+
# sqllogictest compares row values and can't assert column names directly.
1239+
# To surface the leak inside SLT, we `COPY` the result to a CSV file with a
1240+
# header row (the CSV writer names header columns from each batch's own
1241+
# schema) and then read the file back as headerless CSV so the header line
1242+
# appears as a data row we can compare against the expected column names.
1243+
statement ok
1244+
CREATE TABLE cte_schema_records (
1245+
id VARCHAR NOT NULL,
1246+
parent_id VARCHAR,
1247+
ts TIMESTAMP NOT NULL,
1248+
val VARCHAR
1249+
);
1250+
1251+
statement ok
1252+
INSERT INTO cte_schema_records VALUES
1253+
('a00', NULL, TIMESTAMP '2025-01-01 00:00:00', 'v_span'),
1254+
('a01', 'a00', TIMESTAMP '2025-01-01 00:00:01', 'v_log_1'),
1255+
('a02', 'a01', TIMESTAMP '2025-01-01 00:00:02', 'v_log_2'),
1256+
('a03', 'a02', TIMESTAMP '2025-01-01 00:00:03', 'v_log_3'),
1257+
('a04', 'a03', TIMESTAMP '2025-01-01 00:00:04', 'v_log_4'),
1258+
('a05', 'a04', TIMESTAMP '2025-01-01 00:00:05', 'v_log_5');
1259+
1260+
query I
1261+
COPY (WITH RECURSIVE descendants AS (
1262+
SELECT id, parent_id, ts, upper(val) AS val
1263+
FROM cte_schema_records WHERE id = 'a00'
1264+
UNION ALL
1265+
SELECT r.id, r.parent_id, r.ts, upper(r.val)
1266+
FROM cte_schema_records r
1267+
INNER JOIN descendants d ON r.parent_id = d.id
1268+
)
1269+
SELECT id, parent_id, ts, val
1270+
FROM descendants
1271+
ORDER BY ts ASC
1272+
LIMIT 10)
1273+
TO 'test_files/scratch/cte/recursive_order_limit_schema/'
1274+
STORED AS CSV OPTIONS ('format.has_header' 'true');
1275+
----
1276+
6
1277+
1278+
statement ok
1279+
CREATE EXTERNAL TABLE cte_schema_reread (
1280+
id VARCHAR, parent_id VARCHAR, ts VARCHAR, val VARCHAR
1281+
)
1282+
STORED AS CSV
1283+
LOCATION 'test_files/scratch/cte/recursive_order_limit_schema/'
1284+
OPTIONS ('format.has_header' 'false');
1285+
1286+
# The CSV header row appears as a data row here because we re-read with
1287+
# has_header='false'. It must match the CTE's declared column names; if
1288+
# RecursiveQueryExec leaked the recursive branch's schema, the last field
1289+
# would read `upper(r.val)` instead of `val`.
1290+
query TTTT
1291+
SELECT id, parent_id, ts, val
1292+
FROM cte_schema_reread
1293+
WHERE id = 'id';
1294+
----
1295+
id parent_id ts val
1296+
1297+
statement ok
1298+
DROP TABLE cte_schema_reread;
1299+
1300+
statement ok
1301+
DROP TABLE cte_schema_records;
1302+
12261303
statement count 0
12271304
set datafusion.execution.enable_recursive_ctes = false;
12281305

0 commit comments

Comments
 (0)