Skip to content

Commit 494dc35

Browse files
authored
Merge branch 'main' into feat_migrate_ffi_to_stabby
2 parents 71f1fde + 766dff1 commit 494dc35

8 files changed

Lines changed: 199 additions & 30 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/core/tests/sql/select.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,3 +431,65 @@ async fn test_select_cast_date_literal_to_timestamp_overflow() -> Result<()> {
431431
);
432432
Ok(())
433433
}
434+
435+
// Regression test: a recursive CTE whose anchor aliases a computed column
436+
// (`upper(val) AS val`) and whose recursive term leaves the same expression
437+
// un-aliased must still produce batches whose schema field names come from
438+
// the anchor term — especially when the outer query uses ORDER BY + LIMIT
439+
// (the TopK path passes batch schemas through verbatim, so any drift in
440+
// RecursiveQueryExec's emitted batches is visible downstream).
441+
#[tokio::test]
442+
async fn test_recursive_cte_batch_schema_stable_with_order_by_limit() -> Result<()> {
443+
let ctx = SessionContext::new();
444+
ctx.sql(
445+
"CREATE TABLE records (\
446+
id VARCHAR NOT NULL, \
447+
parent_id VARCHAR, \
448+
ts TIMESTAMP NOT NULL, \
449+
val VARCHAR\
450+
)",
451+
)
452+
.await?
453+
.collect()
454+
.await?;
455+
ctx.sql(
456+
"INSERT INTO records VALUES \
457+
('a00', NULL, TIMESTAMP '2025-01-01 00:00:00', 'v_span'), \
458+
('a01', 'a00', TIMESTAMP '2025-01-01 00:00:01', 'v_log_1'), \
459+
('a02', 'a01', TIMESTAMP '2025-01-01 00:00:02', 'v_log_2'), \
460+
('a03', 'a02', TIMESTAMP '2025-01-01 00:00:03', 'v_log_3'), \
461+
('a04', 'a03', TIMESTAMP '2025-01-01 00:00:04', 'v_log_4'), \
462+
('a05', 'a04', TIMESTAMP '2025-01-01 00:00:05', 'v_log_5')",
463+
)
464+
.await?
465+
.collect()
466+
.await?;
467+
468+
let results = ctx
469+
.sql(
470+
"WITH RECURSIVE descendants AS (\
471+
SELECT id, parent_id, ts, upper(val) AS val \
472+
FROM records WHERE id = 'a00' \
473+
UNION ALL \
474+
SELECT r.id, r.parent_id, r.ts, upper(r.val) \
475+
FROM records r INNER JOIN descendants d ON r.parent_id = d.id \
476+
) \
477+
SELECT id, parent_id, ts, val FROM descendants ORDER BY ts ASC LIMIT 10",
478+
)
479+
.await?
480+
.collect()
481+
.await?;
482+
483+
let expected_names = ["id", "parent_id", "ts", "val"];
484+
assert!(!results.is_empty(), "expected at least one batch");
485+
for (i, batch) in results.iter().enumerate() {
486+
let schema = batch.schema();
487+
let actual_names: Vec<&str> =
488+
schema.fields().iter().map(|f| f.name().as_str()).collect();
489+
assert_eq!(
490+
actual_names, expected_names,
491+
"batch {i} schema field names leaked from recursive branch"
492+
);
493+
}
494+
Ok(())
495+
}

datafusion/functions/src/string/concat.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ use std::sync::Arc;
2222

2323
use crate::string::concat;
2424
use crate::strings::{
25-
ColumnarValueRef, LargeStringArrayBuilder, StringArrayBuilder, StringViewArrayBuilder,
25+
ColumnarValueRef, ConcatLargeStringBuilder, ConcatStringBuilder,
26+
ConcatStringViewBuilder,
2627
};
2728
use datafusion_common::cast::{as_binary_array, as_string_array, as_string_view_array};
2829
use datafusion_common::{
@@ -242,7 +243,7 @@ impl ScalarUDFImpl for ConcatFunc {
242243

243244
match return_datatype {
244245
DataType::Utf8 => {
245-
let mut builder = StringArrayBuilder::with_capacity(len, data_size);
246+
let mut builder = ConcatStringBuilder::with_capacity(len, data_size);
246247
for i in 0..len {
247248
columns
248249
.iter()
@@ -254,7 +255,7 @@ impl ScalarUDFImpl for ConcatFunc {
254255
Ok(ColumnarValue::Array(Arc::new(string_array)))
255256
}
256257
DataType::Utf8View => {
257-
let mut builder = StringViewArrayBuilder::with_capacity(len, data_size);
258+
let mut builder = ConcatStringViewBuilder::with_capacity(len, data_size);
258259
for i in 0..len {
259260
columns
260261
.iter()
@@ -266,7 +267,7 @@ impl ScalarUDFImpl for ConcatFunc {
266267
Ok(ColumnarValue::Array(Arc::new(string_array)))
267268
}
268269
DataType::LargeUtf8 => {
269-
let mut builder = LargeStringArrayBuilder::with_capacity(len, data_size);
270+
let mut builder = ConcatLargeStringBuilder::with_capacity(len, data_size);
270271
for i in 0..len {
271272
columns
272273
.iter()

datafusion/functions/src/string/concat_ws.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ use crate::string::concat;
2424
use crate::string::concat::simplify_concat;
2525
use crate::string::concat_ws;
2626
use crate::strings::{
27-
ColumnarValueRef, LargeStringArrayBuilder, StringArrayBuilder, StringViewArrayBuilder,
27+
ColumnarValueRef, ConcatLargeStringBuilder, ConcatStringBuilder,
28+
ConcatStringViewBuilder,
2829
};
2930
use datafusion_common::cast::{
3031
as_large_string_array, as_string_array, as_string_view_array,
@@ -311,7 +312,7 @@ impl ScalarUDFImpl for ConcatWsFunc {
311312

312313
match return_datatype {
313314
DataType::Utf8View => {
314-
let mut builder = StringViewArrayBuilder::with_capacity(len, data_size);
315+
let mut builder = ConcatStringViewBuilder::with_capacity(len, data_size);
315316
for i in 0..len {
316317
if !sep.is_valid(i) {
317318
builder.append_offset()?;
@@ -332,7 +333,7 @@ impl ScalarUDFImpl for ConcatWsFunc {
332333
Ok(ColumnarValue::Array(Arc::new(builder.finish(sep.nulls())?)))
333334
}
334335
DataType::LargeUtf8 => {
335-
let mut builder = LargeStringArrayBuilder::with_capacity(len, data_size);
336+
let mut builder = ConcatLargeStringBuilder::with_capacity(len, data_size);
336337
for i in 0..len {
337338
if !sep.is_valid(i) {
338339
builder.append_offset()?;
@@ -353,7 +354,7 @@ impl ScalarUDFImpl for ConcatWsFunc {
353354
Ok(ColumnarValue::Array(Arc::new(builder.finish(sep.nulls())?)))
354355
}
355356
_ => {
356-
let mut builder = StringArrayBuilder::with_capacity(len, data_size);
357+
let mut builder = ConcatStringBuilder::with_capacity(len, data_size);
357358
for i in 0..len {
358359
if !sep.is_valid(i) {
359360
builder.append_offset()?;

datafusion/functions/src/strings.rs

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,20 @@ use arrow::array::{
2626
use arrow::buffer::{Buffer, MutableBuffer, NullBuffer, ScalarBuffer};
2727
use arrow::datatypes::DataType;
2828

29-
/// Optimized version of the StringBuilder in Arrow that:
30-
/// 1. Precalculating the expected length of the result, avoiding reallocations.
31-
/// 2. Avoids creating / incrementally creating a `NullBufferBuilder`
32-
pub struct StringArrayBuilder {
29+
/// Builder used by `concat`/`concat_ws` to assemble a [`StringArray`] one row
30+
/// at a time from multiple input columns.
31+
///
32+
/// Each row is written via repeated `write` calls, followed by a single
33+
/// `append_offset` call to commit the row. The output null buffer is supplied
34+
/// by the caller at `finish` time.
35+
pub(crate) struct ConcatStringBuilder {
3336
offsets_buffer: MutableBuffer,
3437
value_buffer: MutableBuffer,
3538
/// If true, a safety check is required during the `finish` call
3639
tainted: bool,
3740
}
3841

39-
impl StringArrayBuilder {
42+
impl ConcatStringBuilder {
4043
pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
4144
let capacity = item_capacity
4245
.checked_add(1)
@@ -151,21 +154,21 @@ impl StringArrayBuilder {
151154
}
152155
}
153156

154-
/// Optimized version of Arrow's [`StringViewBuilder`]. Rather than adding NULLs
155-
/// on a row-by-row basis, the caller should provide nulls when calling
156-
/// [`finish`](Self::finish). This allows callers to compute nulls more
157-
/// efficiently (e.g., via bulk bitmap operations).
157+
/// Builder used by `concat`/`concat_ws` to assemble a [`StringViewArray`] one
158+
/// row at a time from multiple input columns.
158159
///
159-
/// [`StringViewBuilder`]: arrow::array::StringViewBuilder
160-
pub struct StringViewArrayBuilder {
160+
/// Each row is written via repeated `write` calls, followed by a single
161+
/// `append_offset` call to commit the row as a single string view. The output
162+
/// null buffer is supplied by the caller at `finish` time.
163+
pub(crate) struct ConcatStringViewBuilder {
161164
views: Vec<u128>,
162165
data: Vec<u8>,
163166
block: Vec<u8>,
164167
/// If true, a safety check is required during the `append_offset` call
165168
tainted: bool,
166169
}
167170

168-
impl StringViewArrayBuilder {
171+
impl ConcatStringViewBuilder {
169172
pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
170173
Self {
171174
views: Vec::with_capacity(item_capacity),
@@ -286,14 +289,17 @@ impl StringViewArrayBuilder {
286289
}
287290
}
288291

289-
pub struct LargeStringArrayBuilder {
292+
/// Builder used by `concat`/`concat_ws` to assemble a [`LargeStringArray`] one
293+
/// row at a time from multiple input columns. See [`ConcatStringBuilder`] for
294+
/// details on the row-composition contract.
295+
pub(crate) struct ConcatLargeStringBuilder {
290296
offsets_buffer: MutableBuffer,
291297
value_buffer: MutableBuffer,
292298
/// If true, a safety check is required during the `finish` call
293299
tainted: bool,
294300
}
295301

296-
impl LargeStringArrayBuilder {
302+
impl ConcatLargeStringBuilder {
297303
pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
298304
let capacity = item_capacity
299305
.checked_add(1)
@@ -426,7 +432,7 @@ impl LargeStringArrayBuilder {
426432
/// LLVM is apparently overly eager to inline this function into some hot loops,
427433
/// which bloats them and regresses performance, so we disable inlining for now.
428434
#[inline(never)]
429-
pub fn append_view(
435+
pub(crate) fn append_view(
430436
views_buffer: &mut Vec<u128>,
431437
original_view: &u128,
432438
substr: &str,
@@ -447,7 +453,7 @@ pub fn append_view(
447453
}
448454

449455
#[derive(Debug)]
450-
pub enum ColumnarValueRef<'a> {
456+
pub(crate) enum ColumnarValueRef<'a> {
451457
Scalar(&'a [u8]),
452458
NullableArray(&'a StringArray),
453459
NonNullableArray(&'a StringArray),
@@ -497,13 +503,13 @@ mod tests {
497503

498504
#[test]
499505
#[should_panic(expected = "capacity integer overflow")]
500-
fn test_overflow_string_array_builder() {
501-
let _builder = StringArrayBuilder::with_capacity(usize::MAX, usize::MAX);
506+
fn test_overflow_concat_string_builder() {
507+
let _builder = ConcatStringBuilder::with_capacity(usize::MAX, usize::MAX);
502508
}
503509

504510
#[test]
505511
#[should_panic(expected = "capacity integer overflow")]
506-
fn test_overflow_large_string_array_builder() {
507-
let _builder = LargeStringArrayBuilder::with_capacity(usize::MAX, usize::MAX);
512+
fn test_overflow_concat_large_string_builder() {
513+
let _builder = ConcatLargeStringBuilder::with_capacity(usize::MAX, usize::MAX);
508514
}
509515
}

datafusion/physical-plan/src/recursive_query.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,20 @@ impl RecursiveQueryStream {
317317
mut batch: RecordBatch,
318318
) -> Poll<Option<Result<RecordBatch>>> {
319319
let baseline_metrics = self.baseline_metrics.clone();
320+
321+
// Rebind to the declared output schema. The recursive term is planned
322+
// independently from the static term and its projection may leave
323+
// columns un-aliased (e.g. `upper(r.val)` vs the anchor's
324+
// `upper(val) AS val`); downstream consumers that key on
325+
// `batch.schema().field(i).name()` (TopK, CSV/JSON writers, custom
326+
// collectors) would otherwise see the recursive branch's names leak
327+
// through. Logical-plan coercion guarantees matching types, so this
328+
// is a zero-copy field rebind.
329+
if batch.schema() != self.schema {
330+
batch =
331+
RecordBatch::try_new(Arc::clone(&self.schema), batch.columns().to_vec())?;
332+
}
333+
320334
if let Some(deduplicator) = &mut self.distinct_deduplicator {
321335
let _timer_guard = baseline_metrics.elapsed_compute().timer();
322336
batch = deduplicator.deduplicate(&batch)?;

datafusion/sqllogictest/test_files/cte.slt

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1223,6 +1223,83 @@ physical_plan
12231223
07)------RepartitionExec: partitioning=Hash([start@0], 4), input_partitions=1
12241224
08)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/recursive_cte/closure.csv]]}, projection=[start, end], file_type=csv, has_header=true
12251225

1226+
# Regression test: recursive CTE column-name stability under ORDER BY + LIMIT.
1227+
#
1228+
# The recursive CTE below has an anchor that aliases a computed column
1229+
# (`upper(val) AS val`) while the recursive term leaves the same expression
1230+
# un-aliased (`upper(r.val)`). RecursiveQueryExec declares its output schema
1231+
# from the anchor, so the plan-level column is `val`; but if it forwards the
1232+
# recursive term's batches without rebinding their schema to the anchor's,
1233+
# downstream consumers that read `batch.schema().field(i).name()` observe
1234+
# the leaked `upper(r.val)` name. The outer `ORDER BY ts LIMIT 10` routes
1235+
# through the TopK path in SortExec, which passes batch schemas through
1236+
# verbatim — so the leak surfaces there.
1237+
#
1238+
# sqllogictest compares row values and can't assert column names directly.
1239+
# To surface the leak inside SLT, we `COPY` the result to a CSV file with a
1240+
# header row (the CSV writer names header columns from each batch's own
1241+
# schema) and then read the file back as headerless CSV so the header line
1242+
# appears as a data row we can compare against the expected column names.
1243+
statement ok
1244+
CREATE TABLE cte_schema_records (
1245+
id VARCHAR NOT NULL,
1246+
parent_id VARCHAR,
1247+
ts TIMESTAMP NOT NULL,
1248+
val VARCHAR
1249+
);
1250+
1251+
statement ok
1252+
INSERT INTO cte_schema_records VALUES
1253+
('a00', NULL, TIMESTAMP '2025-01-01 00:00:00', 'v_span'),
1254+
('a01', 'a00', TIMESTAMP '2025-01-01 00:00:01', 'v_log_1'),
1255+
('a02', 'a01', TIMESTAMP '2025-01-01 00:00:02', 'v_log_2'),
1256+
('a03', 'a02', TIMESTAMP '2025-01-01 00:00:03', 'v_log_3'),
1257+
('a04', 'a03', TIMESTAMP '2025-01-01 00:00:04', 'v_log_4'),
1258+
('a05', 'a04', TIMESTAMP '2025-01-01 00:00:05', 'v_log_5');
1259+
1260+
query I
1261+
COPY (WITH RECURSIVE descendants AS (
1262+
SELECT id, parent_id, ts, upper(val) AS val
1263+
FROM cte_schema_records WHERE id = 'a00'
1264+
UNION ALL
1265+
SELECT r.id, r.parent_id, r.ts, upper(r.val)
1266+
FROM cte_schema_records r
1267+
INNER JOIN descendants d ON r.parent_id = d.id
1268+
)
1269+
SELECT id, parent_id, ts, val
1270+
FROM descendants
1271+
ORDER BY ts ASC
1272+
LIMIT 10)
1273+
TO 'test_files/scratch/cte/recursive_order_limit_schema/'
1274+
STORED AS CSV OPTIONS ('format.has_header' 'true');
1275+
----
1276+
6
1277+
1278+
statement ok
1279+
CREATE EXTERNAL TABLE cte_schema_reread (
1280+
id VARCHAR, parent_id VARCHAR, ts VARCHAR, val VARCHAR
1281+
)
1282+
STORED AS CSV
1283+
LOCATION 'test_files/scratch/cte/recursive_order_limit_schema/'
1284+
OPTIONS ('format.has_header' 'false');
1285+
1286+
# The CSV header row appears as a data row here because we re-read with
1287+
# has_header='false'. It must match the CTE's declared column names; if
1288+
# RecursiveQueryExec leaked the recursive branch's schema, the last field
1289+
# would read `upper(r.val)` instead of `val`.
1290+
query TTTT
1291+
SELECT id, parent_id, ts, val
1292+
FROM cte_schema_reread
1293+
WHERE id = 'id';
1294+
----
1295+
id parent_id ts val
1296+
1297+
statement ok
1298+
DROP TABLE cte_schema_reread;
1299+
1300+
statement ok
1301+
DROP TABLE cte_schema_records;
1302+
12261303
statement count 0
12271304
set datafusion.execution.enable_recursive_ctes = false;
12281305

docs/source/library-user-guide/upgrading/54.0.0.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,14 @@ The difference is only observable for strings containing combining characters
320320
clusters (e.g., ZWJ emoji sequences). For ASCII and most common Unicode text,
321321
behavior is unchanged.
322322

323+
### Items in `datafusion_functions::strings` are no longer public
324+
325+
`StringArrayBuilder`, `LargeStringArrayBuilder`, `StringViewArrayBuilder`,
326+
`ColumnarValueRef`, and `append_view` have been reduced to `pub(crate)`. They
327+
were only ever used to implement `concat` and `concat_ws` inside the crate. If
328+
you were importing them externally, use Arrow's corresponding builders with a
329+
caller-computed `NullBuffer`.
330+
323331
[#17861]: https://github.com/apache/datafusion/pull/17861
324332

325333
### `approx_percentile_cont`, `approx_percentile_cont_with_weight`, `approx_median` now coerce to floats

0 commit comments

Comments
 (0)