Skip to content

Commit 8d9501a

Browse files
committed
Merge remote-tracking branch 'upstream/main'
2 parents ff8068a + 6a770aa commit 8d9501a

44 files changed

Lines changed: 2715 additions & 414 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/core/tests/physical_optimizer/projection_pushdown.rs

Lines changed: 119 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
4646
use datafusion_physical_optimizer::projection_pushdown::ProjectionPushdown;
4747
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
4848
use datafusion_physical_plan::coop::CooperativeExec;
49-
use datafusion_physical_plan::filter::FilterExec;
49+
use datafusion_physical_plan::filter::{FilterExec, FilterExecBuilder};
5050
use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
5151
use datafusion_physical_plan::joins::{
5252
HashJoinExec, NestedLoopJoinExec, PartitionMode, StreamJoinPartitionMode,
@@ -1754,3 +1754,121 @@ fn test_hash_join_empty_projection_embeds() -> Result<()> {
17541754

17551755
Ok(())
17561756
}
1757+
1758+
/// Regression test for <https://github.com/apache/datafusion/issues/21459>
1759+
///
1760+
/// When a `ProjectionExec` sits on top of a `FilterExec` that already carries
1761+
/// an embedded projection, the `ProjectionPushdown` optimizer must not panic.
1762+
///
1763+
/// Before the fix, `FilterExecBuilder::from(self)` copied stale projection
1764+
/// indices (e.g. `[0, 1, 2]`). After swapping, the new input was narrower
1765+
/// (2 columns), so `.build()` panicked with "project index out of bounds".
///
/// The test snapshots the displayed plan twice: once as constructed, and once
/// after running `ProjectionPushdown`, so both the absence of a panic and the
/// resulting plan shape are pinned.
1766+
#[test]
1767+
fn test_filter_with_embedded_projection_after_projection() -> Result<()> {
1768+
// DataSourceExec: [a, b, c, d, e]
1769+
let csv = create_simple_csv_exec();
1770+
1771+
// FilterExec: a > 0, projection=[0, 1, 2] → output: [a, b, c]
1772+
let predicate = Arc::new(BinaryExpr::new(
1773+
Arc::new(Column::new("a", 0)),
1774+
Operator::Gt,
1775+
Arc::new(Literal::new(ScalarValue::Int32(Some(0)))),
1776+
));
1777+
let filter: Arc<dyn ExecutionPlan> = Arc::new(
1778+
FilterExecBuilder::new(predicate, csv)
1779+
.apply_projection(Some(vec![0, 1, 2]))?
1780+
.build()?,
1781+
);
1782+
1783+
// ProjectionExec: narrows [a, b, c] → [a, b]
1784+
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
1785+
vec![
1786+
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "a"),
1787+
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "b"),
1788+
],
1789+
filter,
1790+
)?);
1791+
1792+
// Pin the starting plan: projection over a filter that carries its own
// embedded projection, over the five-column scan.
let initial = displayable(projection.as_ref()).indent(true).to_string();
1793+
let actual = initial.trim();
1794+
assert_snapshot!(
1795+
actual,
1796+
@r"
1797+
ProjectionExec: expr=[a@0 as a, b@1 as b]
1798+
FilterExec: a@0 > 0, projection=[a@0, b@1, c@2]
1799+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
1800+
"
1801+
);
1802+
1803+
// This must not panic
1804+
let after_optimize =
1805+
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
1806+
let after_optimize_string = displayable(after_optimize.as_ref())
1807+
.indent(true)
1808+
.to_string();
1809+
let actual = after_optimize_string.trim();
1810+
// Expected result: the outer ProjectionExec is gone, the filter no longer
// shows an embedded projection, and the scan reads only [a, b].
assert_snapshot!(
1811+
actual,
1812+
@r"
1813+
FilterExec: a@0 > 0
1814+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b], file_type=csv, has_header=false
1815+
"
1816+
);
1817+
1818+
Ok(())
1819+
}
1820+
1821+
/// Variant of `test_filter_with_embedded_projection_after_projection` in which
/// the outer `ProjectionExec` also renames columns.
1822+
/// Ensures the rename is preserved after the projection pushdown swap.
1823+
#[test]
1824+
fn test_filter_with_embedded_projection_after_renaming_projection() -> Result<()> {
1825+
// DataSourceExec: [a, b, c, d, e]
let csv = create_simple_csv_exec();
1826+
1827+
// FilterExec: b > 10, projection=[0, 1, 2, 3] → output: [a, b, c, d]
1828+
let predicate = Arc::new(BinaryExpr::new(
1829+
Arc::new(Column::new("b", 1)),
1830+
Operator::Gt,
1831+
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1832+
));
1833+
let filter: Arc<dyn ExecutionPlan> = Arc::new(
1834+
FilterExecBuilder::new(predicate, csv)
1835+
.apply_projection(Some(vec![0, 1, 2, 3]))?
1836+
.build()?,
1837+
);
1838+
1839+
// ProjectionExec: [a as x, b as y] — narrows and renames
1840+
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
1841+
vec![
1842+
ProjectionExpr::new(Arc::new(Column::new("a", 0)), "x"),
1843+
ProjectionExpr::new(Arc::new(Column::new("b", 1)), "y"),
1844+
],
1845+
filter,
1846+
)?);
1847+
1848+
// Pin the starting plan before the optimizer runs.
let initial = displayable(projection.as_ref()).indent(true).to_string();
1849+
let actual = initial.trim();
1850+
assert_snapshot!(
1851+
actual,
1852+
@r"
1853+
ProjectionExec: expr=[a@0 as x, b@1 as y]
1854+
FilterExec: b@1 > 10, projection=[a@0, b@1, c@2, d@3]
1855+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
1856+
"
1857+
);
1858+
1859+
// Running the optimizer must succeed (the `?` fails the test on error).
let after_optimize =
1860+
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
1861+
let after_optimize_string = displayable(after_optimize.as_ref())
1862+
.indent(true)
1863+
.to_string();
1864+
let actual = after_optimize_string.trim();
1865+
// Expected result: the renames appear in the scan's projection and the
// filter predicate refers to the renamed column (`y@1`).
assert_snapshot!(
1866+
actual,
1867+
@r"
1868+
FilterExec: y@1 > 10
1869+
DataSourceExec: file_groups={1 group: [[x]]}, projection=[a@0 as x, b@1 as y], file_type=csv, has_header=false
1870+
"
1871+
);
1872+
1873+
Ok(())
1874+
}

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -593,7 +593,23 @@ impl LogicalPlanBuilder {
593593
self,
594594
expr: Vec<(impl Into<SelectExpr>, bool)>,
595595
) -> Result<Self> {
596-
project_with_validation(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
596+
project_with_validation(Arc::unwrap_or_clone(self.plan), expr, None)
597+
.map(Self::new)
598+
}
599+
600+
/// Apply a projection, aliasing non-Column/non-Alias expressions to
601+
/// match the field names from the provided schema.
602+
pub fn project_with_validation_and_schema(
603+
self,
604+
expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
605+
schema: &DFSchemaRef,
606+
) -> Result<Self> {
607+
project_with_validation(
608+
Arc::unwrap_or_clone(self.plan),
609+
expr.into_iter().map(|e| (e, true)),
610+
Some(schema),
611+
)
612+
.map(Self::new)
597613
}
598614

599615
/// Select the given column indices
@@ -1916,7 +1932,7 @@ pub fn project(
19161932
plan: LogicalPlan,
19171933
expr: impl IntoIterator<Item = impl Into<SelectExpr>>,
19181934
) -> Result<LogicalPlan> {
1919-
project_with_validation(plan, expr.into_iter().map(|e| (e, true)))
1935+
project_with_validation(plan, expr.into_iter().map(|e| (e, true)), None)
19201936
}
19211937

19221938
/// Create Projection. Similar to project except that the expressions
@@ -1929,6 +1945,7 @@ pub fn project(
19291945
fn project_with_validation(
19301946
plan: LogicalPlan,
19311947
expr: impl IntoIterator<Item = (impl Into<SelectExpr>, bool)>,
1948+
schema: Option<&DFSchemaRef>,
19321949
) -> Result<LogicalPlan> {
19331950
let mut projected_expr = vec![];
19341951
let mut has_wildcard = false;
@@ -1987,12 +2004,24 @@ fn project_with_validation(
19872004
}
19882005
}
19892006
}
2007+
19902008
if has_wildcard && projected_expr.is_empty() && !plan.schema().fields().is_empty() {
19912009
return plan_err!(
19922010
"SELECT list is empty after resolving * expressions, \
19932011
the wildcard expanded to zero columns"
19942012
);
19952013
}
2014+
2015+
// When inside a set expression, alias non-Column/non-Alias expressions
2016+
// to match the left side's field names, avoiding duplicate name errors.
2017+
if let Some(schema) = &schema {
2018+
for (expr, field) in projected_expr.iter_mut().zip(schema.fields()) {
2019+
if !matches!(expr, Expr::Column(_) | Expr::Alias(_)) {
2020+
*expr = std::mem::take(expr).alias(field.name());
2021+
}
2022+
}
2023+
}
2024+
19962025
validate_unique_names("Projections", projected_expr.iter())?;
19972026

19982027
Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 69 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use crate::utils::{
4545
grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
4646
};
4747
use crate::{
48-
BinaryExpr, CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable,
48+
BinaryExpr, CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable, GroupingSet,
4949
LogicalPlanBuilder, Operator, Prepare, TableProviderFilterPushDown, TableSource,
5050
WindowFunctionDefinition, build_join_schema, expr_vec_fmt, requalify_sides_if_needed,
5151
};
@@ -3595,11 +3595,12 @@ impl Aggregate {
35953595
.into_iter()
35963596
.map(|(q, f)| (q, f.as_ref().clone().with_nullable(true).into()))
35973597
.collect::<Vec<_>>();
3598+
let max_ordinal = max_grouping_set_duplicate_ordinal(&group_expr);
35983599
qualified_fields.push((
35993600
None,
36003601
Field::new(
36013602
Self::INTERNAL_GROUPING_ID,
3602-
Self::grouping_id_type(qualified_fields.len()),
3603+
Self::grouping_id_type(qualified_fields.len(), max_ordinal),
36033604
false,
36043605
)
36053606
.into(),
@@ -3685,15 +3686,24 @@ impl Aggregate {
36853686
}
36863687

36873688
/// Returns the data type of the grouping id.
3688-
/// The grouping ID value is a bitmask where each set bit
3689-
/// indicates that the corresponding grouping expression is
3690-
/// null
3691-
pub fn grouping_id_type(group_exprs: usize) -> DataType {
3692-
if group_exprs <= 8 {
3689+
///
3690+
/// The grouping ID packs two pieces of information into a single integer:
3691+
/// - The low `group_exprs` bits are the semantic bitmask (a set bit means the
3692+
/// corresponding grouping expression is NULL for this grouping set).
3693+
/// - The bits above position `group_exprs` encode a duplicate ordinal that
3694+
/// distinguishes multiple occurrences of the same grouping set pattern.
3695+
///
3696+
/// `max_ordinal` is the highest ordinal value that will appear (0 when there
3697+
/// are no duplicate grouping sets). The type is chosen to be the smallest
3698+
/// unsigned integer that can represent both parts.
3699+
pub fn grouping_id_type(group_exprs: usize, max_ordinal: usize) -> DataType {
3700+
let ordinal_bits = usize::BITS as usize - max_ordinal.leading_zeros() as usize;
3701+
let total_bits = group_exprs + ordinal_bits;
3702+
if total_bits <= 8 {
36933703
DataType::UInt8
3694-
} else if group_exprs <= 16 {
3704+
} else if total_bits <= 16 {
36953705
DataType::UInt16
3696-
} else if group_exprs <= 32 {
3706+
} else if total_bits <= 32 {
36973707
DataType::UInt32
36983708
} else {
36993709
DataType::UInt64
@@ -3702,21 +3712,36 @@ impl Aggregate {
37023712

37033713
/// Internal column used when the aggregation is a grouping set.
37043714
///
3705-
/// This column contains a bitmask where each bit represents a grouping
3706-
/// expression. The least significant bit corresponds to the rightmost
3707-
/// grouping expression. A bit value of 0 indicates that the corresponding
3708-
/// column is included in the grouping set, while a value of 1 means it is excluded.
3715+
/// This column packs two values into a single unsigned integer:
3716+
///
3717+
/// - **Low bits (positions 0 .. n-1)**: a semantic bitmask where each bit
3718+
/// represents one of the `n` grouping expressions. The least significant
3719+
/// bit corresponds to the rightmost grouping expression. A `1` bit means
3720+
/// the corresponding column is replaced with `NULL` for this grouping set;
3721+
/// a `0` bit means it is included.
3722+
/// - **High bits (positions n and above)**: a *duplicate ordinal* that
3723+
/// distinguishes multiple occurrences of the same semantic grouping set
3724+
/// pattern within a single query. The ordinal is `0` for the first
3725+
/// occurrence, `1` for the second, and so on.
3726+
///
3727+
/// The integer type is chosen by [`Self::grouping_id_type`] to be the
3728+
/// smallest `UInt8 / UInt16 / UInt32 / UInt64` that can represent both
3729+
/// parts.
37093730
///
3710-
/// For example, for the grouping expressions CUBE(a, b), the grouping ID
3711-
/// column will have the following values:
3731+
/// For example, for the grouping expressions CUBE(a, b) (no duplicates),
3732+
/// the grouping ID column will have the following values:
37123733
/// 0b00: Both `a` and `b` are included
37133734
/// 0b01: `b` is excluded
37143735
/// 0b10: `a` is excluded
37153736
/// 0b11: Both `a` and `b` are excluded
37163737
///
3717-
/// This internal column is necessary because excluded columns are replaced
3718-
/// with `NULL` values. To handle these cases correctly, we must distinguish
3719-
/// between an actual `NULL` value in a column and a column being excluded from the set.
3738+
/// When the same set appears twice and `n = 2`, the duplicate ordinal is
3739+
/// packed into bit 2:
3740+
/// first occurrence: `0b0_01` (ordinal = 0, mask = 0b01)
3741+
/// second occurrence: `0b1_01` (ordinal = 1, mask = 0b01)
3742+
///
3743+
/// The GROUPING function always masks the value with `(1 << n) - 1` before
3744+
/// interpreting it so the ordinal bits are invisible to user-facing SQL.
37203745
pub const INTERNAL_GROUPING_ID: &'static str = "__grouping_id";
37213746
}
37223747

@@ -3737,6 +3762,24 @@ impl PartialOrd for Aggregate {
37373762
}
37383763
}
37393764

3765+
/// Returns the highest duplicate ordinal across all grouping sets in `group_expr`.
3766+
///
3767+
/// The ordinal for each occurrence of a grouping set pattern is its 0-based
3768+
/// index among identical entries. For example, if the same set appears three
3769+
/// times, the ordinals are 0, 1, 2 and this function returns 2.
3770+
/// Returns 0 when no grouping set is duplicated.
3771+
fn max_grouping_set_duplicate_ordinal(group_expr: &[Expr]) -> usize {
3772+
if let Some(Expr::GroupingSet(GroupingSet::GroupingSets(sets))) = group_expr.first() {
3773+
let mut counts: HashMap<&[Expr], usize> = HashMap::new();
3774+
for set in sets {
3775+
*counts.entry(set).or_insert(0) += 1;
3776+
}
3777+
counts.into_values().max().unwrap_or(0).saturating_sub(1)
3778+
} else {
3779+
0
3780+
}
3781+
}
3782+
37403783
/// Checks whether any expression in `group_expr` contains `Expr::GroupingSet`.
37413784
fn contains_grouping_set(group_expr: &[Expr]) -> bool {
37423785
group_expr
@@ -5053,6 +5096,14 @@ mod tests {
50535096
);
50545097
}
50555098

5099+
#[test]
fn grouping_id_type_accounts_for_duplicate_ordinal_bits() {
    // Each case is (group_exprs, max_ordinal, expected type). The ordinal
    // needs as many bits as its binary representation (0 -> 0 bits,
    // 1 -> 1 bit, 2..=3 -> 2 bits, 4..=7 -> 3 bits), and the chosen type
    // must hold `group_exprs` mask bits plus those ordinal bits.
    let cases = [
        // No grouping columns and no duplicates still yields the narrowest type.
        (0, 0, DataType::UInt8),
        // 8 grouping columns fit in UInt8 when there are no duplicate ordinals,
        // but adding one duplicate ordinal bit widens the type to UInt16.
        (8, 0, DataType::UInt8),
        (8, 1, DataType::UInt16),
        // Mask bits + ordinal bits landing exactly on 8 still fit in UInt8.
        (7, 1, DataType::UInt8),
        (6, 3, DataType::UInt8),
        // A three-bit ordinal pushes 6 mask bits past the UInt8 limit.
        (6, 4, DataType::UInt16),
        // The same widening applies at the 16- and 32-bit thresholds.
        (16, 0, DataType::UInt16),
        (16, 1, DataType::UInt32),
        (32, 0, DataType::UInt32),
        (32, 1, DataType::UInt64),
    ];

    for (group_exprs, max_ordinal, expected) in cases {
        assert_eq!(
            Aggregate::grouping_id_type(group_exprs, max_ordinal),
            expected,
            "group_exprs={group_exprs}, max_ordinal={max_ordinal}"
        );
    }
}
5106+
50565107
#[test]
50575108
fn test_filter_is_scalar() {
50585109
// test empty placeholder

datafusion/functions/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ regex_expressions = ["regex"]
5959
# enable string functions
6060
string_expressions = ["uuid"]
6161
# enable unicode functions
62-
unicode_expressions = ["unicode-segmentation"]
62+
unicode_expressions = []
6363

6464
[lib]
6565
name = "datafusion_functions"
@@ -87,7 +87,6 @@ num-traits = { workspace = true }
8787
rand = { workspace = true }
8888
regex = { workspace = true, optional = true }
8989
sha2 = { workspace = true, optional = true }
90-
unicode-segmentation = { version = "^1.13.2", optional = true }
9190
uuid = { workspace = true, features = ["v4"], optional = true }
9291

9392
[dev-dependencies]

datafusion/functions/benches/split_part.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,23 @@ fn criterion_benchmark(c: &mut Criterion) {
210210
);
211211
}
212212

213+
// Utf8View, very long parts (256 bytes), position 1
214+
{
215+
let strings = gen_string_array(N_ROWS, 5, 256, ".", true);
216+
let delimiter = ColumnarValue::Scalar(ScalarValue::Utf8View(Some(".".into())));
217+
let position = ColumnarValue::Scalar(ScalarValue::Int64(Some(1)));
218+
bench_split_part(
219+
&mut group,
220+
&split_part_func,
221+
&config_options,
222+
"scalar_utf8view_very_long_parts",
223+
"pos_first",
224+
strings,
225+
delimiter,
226+
position,
227+
);
228+
}
229+
213230
// ── Array delimiter and position ─────────────────
214231

215232
// Utf8, single-char delimiter, array args

0 commit comments

Comments
 (0)