
Add lambda support and array_transform udf #21679

Open
gstvg wants to merge 70 commits into apache:main from gstvg:lambda_and_array_transform

Conversation

Contributor

@gstvg gstvg commented Apr 16, 2026

This is a clean version of #18921, to make it easier to review.

This is a breaking change, due to new variants added to the Expr enum, new methods on the Session, FunctionRegistry, and ContextProvider traits, and a new argument on TaskContext::new.

This PR adds support for lambdas with column capture and the array_transform function used to test the lambda implementation.

Example usage:

SELECT array_transform([2, 3], v -> v != 2);

[false, true]

-- arbitrarily nested lambdas are also supported
SELECT array_transform([[[2, 3]]], m -> array_transform(m, l -> array_transform(l, v -> v*2)));

[[[4, 6]]]

Note: column capture has been removed for now and will be added in a follow-up PR, see #21172

Comments on the code snippets in this doc show what value each struct, variant, or field would hold after planning the first example above. Some literals are simplified pseudocode.

Three new Expr variants are added: HigherOrderFunction, owning a new trait HigherOrderUDF, which is like ScalarFunction/ScalarUDFImpl with support for lambdas; Lambda, holding the lambda body and its parameter names; and LambdaVariable, which is like Column but for lambda parameters.

Their logical representations:

enum Expr {
    // array_transform([2, 3], v -> v != 2)
    HigherOrderFunction(HigherOrderFunction),
    // v -> v != 2
    Lambda(Lambda),
    // v, of the lambda body: v != 2
    LambdaVariable(LambdaVariable),
   ...
}

// array_transform([2, 3], v -> v != 2)
struct HigherOrderFunction {
    // global instance of array_transform
    pub func: Arc<dyn HigherOrderUDF>,
    // [Expr::ScalarValue([2, 3]), Expr::Lambda(v -> v != 2)]
    pub args: Vec<Expr>,
}

// v -> v != 2
struct Lambda {
    // ["v"]
    pub params: Vec<String>,
    // v != 2
    pub body: Box<Expr>,
}

// v, of the lambda body: v != 2
struct LambdaVariable {
    // "v"
    pub name: String,
    // Field::new("", DataType::Int32, false) 
    // Note: a follow on PR will make this field optional
    // to free expr_api from specifying it beforehand, 
    // and add resolve_lambda_variables method to Expr,
    // similar to Expr::Placeholder, see #21172
    pub field: FieldRef, 
    pub spans: Spans,
}

The example would be planned into a tree like this:

HigherOrderFunctionExpression
  name: array_transform
  children:
    1. ListExpression [2,3]
    2. LambdaExpression
         parameters: ["v"]
         body:
            BinaryExpression (!=)
              left:
                 LambdaVariableExpression("v", Field::new("", Int32, false))
              right:
                 LiteralExpression("2")

The physical counterparts definition:

struct HigherOrderFunctionExpr {
    // global instance of array_transform
    fun: Arc<dyn HigherOrderUDF>,
    // "array_transform"
    name: String,
    // [LiteralExpr([2, 3]), LambdaExpr("v -> v != 2")]
    args: Vec<Arc<dyn PhysicalExpr>>,
    // [1], the positions in args that contain lambdas
    lambda_positions: Vec<usize>,
    // Field::new("", DataType::new_list(DataType::Boolean, false), false)
    return_field: FieldRef,
    config_options: Arc<ConfigOptions>, 
}


struct LambdaExpr {
    // ["v"]
    params: Vec<String>,
    // v != 2
    body: Arc<dyn PhysicalExpr>,
}

struct LambdaVariable {
    // Field::new("v", DataType::Int32, false)
    field: FieldRef,
    // 0, the first and only parameter, "v"
    index: usize,
}

Note: for those who primarily want to check whether this lambda implementation supports their use case and don't want to spend much time here, it's okay to skip most collapsed blocks, as those serve mostly to help code reviewers, with the exception of HigherOrderUDF and the array_transform implementation of the relevant HigherOrderUDF methods, which are collapsed due to their size.

The added HigherOrderUDF trait is almost a clone of ScalarUDFImpl, with the exception of:

  1. return_field_from_args and invoke_with_args, where args.args is now a list of enums with two variants, Value or Lambda, instead of a list of values
  2. the addition of lambda_parameters, which returns a Field for each parameter supported by every lambda argument, based on the Fields of the non-lambda arguments
  3. the removal of return_field and the deprecated is_nullable and display_name
  4. it does not yet include analogues to the methods preimage, placement, evaluate_bounds, propagate_constraints, output_ordering and preserves_lex_ordering
HigherOrderUDF
trait HigherOrderUDF {
    /// Return the fields of all the parameters supported by all the lambdas supported by this function,
    /// based on the fields of the value arguments. If a lambda supports multiple parameters, or if multiple
    /// lambdas are supported and some are optional, all should be returned,
    /// regardless of whether they are used on a particular invocation
    ///
    /// Tip: If you have a [`HigherOrderFunction`] invocation, you can call the helper
    /// [`HigherOrderFunction::lambda_parameters`] instead of this method directly
    ///
    /// [`HigherOrderFunction`]: crate::expr::HigherOrderFunction
    /// [`HigherOrderFunction::lambda_parameters`]: crate::expr::HigherOrderFunction::lambda_parameters
    ///
    /// Example for array_transform:
    ///
    /// `array_transform([2.0, 8.0], v -> v > 4.0)`
    ///
    /// ```ignore
    /// let lambda_parameters = array_transform.lambda_parameters(&[
    ///      Arc::new(Field::new("", DataType::new_list(DataType::Float32, false), false)), // the Field of the literal `[2.0, 8.0]`
    /// ])?;
    ///
    /// assert_eq!(
    ///      lambda_parameters,
    ///      vec![
    ///         // the lambda supported parameters, regardless of how many are actually used
    ///         vec![
    ///             // the value being transformed
    ///             Field::new("", DataType::Float32, false),
    ///         ]
    ///      ]
    /// )
    /// ```
    ///
    /// The implementation can assume that some other part of the code has coerced
    /// the actual argument types to match [`Self::signature`].
    fn lambda_parameters(&self, value_fields: &[FieldRef]) -> Result<Vec<Vec<Field>>>;
    fn return_field_from_args(&self, args: LambdaReturnFieldArgs) -> Result<FieldRef>;
    fn invoke_with_args(&self, args: HigherOrderFunctionArgs) -> Result<ColumnarValue>;
   // ... omitted methods that are similar in ScalarUDFImpl
}

/// An argument to a higher-order function: either a value or a lambda
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ValueOrLambda<V, L> {
    /// A value with associated data
    Value(V),
    /// A lambda with associated data
    Lambda(L),
}

/// Information about arguments passed to the function
///
/// This structure contains metadata about how the function was called
/// such as the type of the arguments, any scalar arguments and if the
/// arguments can (ever) be null
///
/// See [`HigherOrderUDF::return_field_from_args`] for more information
#[derive(Clone, Debug)]
pub struct LambdaReturnFieldArgs<'a> {
    /// The fields of the arguments to the function
    ///
    /// If argument `i` to the function is a lambda, it will be the field of the result of the
    /// lambda if evaluated with the parameters returned from [`HigherOrderUDF::lambda_parameters`]
    ///
    /// For example, with `array_transform([1], v -> v == 5)`
    /// this field will be `[
    ///     ValueOrLambda::Value(Field::new("", DataType::List(DataType::Int32), false)),
    ///     ValueOrLambda::Lambda(Field::new("", DataType::Boolean, false))
    /// ]`
    pub arg_fields: &'a [ValueOrLambda<FieldRef, FieldRef>],
    /// Is argument `i` to the function a scalar (constant)?
    ///
    /// If the argument `i` is not a scalar, it will be None
    ///
    /// For example, if a function is called like `array_transform([1], v -> v == 5)`
    /// this field will be `[Some(ScalarValue::List(...)), None]`
    pub scalar_arguments: &'a [Option<&'a ScalarValue>],
}

/// Arguments passed to [`HigherOrderUDF::invoke_with_args`] when invoking a
/// lambda function.
#[derive(Debug, Clone)]
pub struct HigherOrderFunctionArgs {
    /// The evaluated arguments and lambdas to the function
    pub args: Vec<ValueOrLambda<ColumnarValue, LambdaArgument>>,
    /// Field associated with each arg, if it exists
    /// For lambdas, it will be the field of the result of
    /// the lambda if evaluated with the parameters
    /// returned from [`HigherOrderUDF::lambda_parameters`]
    pub arg_fields: Vec<ValueOrLambda<FieldRef, FieldRef>>,
    /// The number of rows in the record batch being evaluated
    pub number_rows: usize,
    /// The return field of the function, as returned
    /// from `return_field_from_args` when creating the
    /// physical expression from the logical expression
    pub return_field: FieldRef,
    /// The config options at execution time
    pub config_options: Arc<ConfigOptions>,
}

/// A lambda argument to a HigherOrderFunction
#[derive(Clone, Debug)]
pub struct LambdaArgument {
    /// The parameters defined in this lambda
    ///
    /// For example, for `array_transform([2], v -> -v)`,
    /// this will be `vec![Field::new("v", DataType::Int32, true)]`
    params: Vec<FieldRef>,
    /// The body of the lambda
    ///
    /// For example, for `array_transform([2], v -> -v)`,
    /// this will be the physical expression of `-v`
    body: Arc<dyn PhysicalExpr>,
}

impl LambdaArgument {
    /// Evaluate this lambda.
    /// `args` should evaluate to the value of each parameter
    /// of the corresponding lambda returned from [HigherOrderUDF::lambda_parameters].
    pub fn evaluate(
        &self,
        args: &[&dyn Fn() -> Result<ArrayRef>],
    ) -> Result<ColumnarValue> {
        let columns = args
            .iter()
            .take(self.params.len())
            .map(|arg| arg())
            .collect::<Result<_>>()?;

        let schema = Arc::new(Schema::new(self.params.clone()));

        let batch = RecordBatch::try_new(schema, columns)?;

        self.body.evaluate(&batch)
    }
}
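The closure-based `args` above let a higher-order function materialize only the parameter columns a lambda actually declares. Below is a runnable, dependency-free sketch of that pattern, using plain `Vec<i64>` in place of Arrow arrays; all names here (`MiniLambda`, `Column`) are illustrative stand-ins, not the PR's types.

```rust
// Sketch of the lazy-parameter pattern behind LambdaArgument::evaluate.
type Column = Vec<i64>;

struct MiniLambda {
    /// how many parameters the lambda declares, e.g. 1 for `v -> v * 2`
    n_params: usize,
    /// the body, applied to the materialized parameter columns
    body: fn(&[Column]) -> Column,
}

impl MiniLambda {
    /// mirrors `LambdaArgument::evaluate(&self, args: &[&dyn Fn() -> Result<ArrayRef>])`:
    /// only the first `n_params` thunks are ever called
    fn evaluate(&self, args: &[&dyn Fn() -> Column]) -> Column {
        let columns: Vec<Column> =
            args.iter().take(self.n_params).map(|arg| arg()).collect();
        (self.body)(&columns)
    }
}

fn main() {
    // models `v -> v * 2` over the flattened list values of `[2, 3]`
    let double = MiniLambda {
        n_params: 1,
        body: |cols| cols[0].iter().map(|v| v * 2).collect(),
    };
    let values = || vec![2i64, 3];
    // a hypothetical second parameter (say, the element index) is never built,
    // because the lambda declares only one parameter
    let index = || -> Column { panic!("never materialized") };
    let out = double.evaluate(&[&values, &index]);
    assert_eq!(out, vec![4, 6]);
    println!("{out:?}"); // [4, 6]
}
```

This is why `evaluate` takes thunks rather than pre-built arrays: a UDF can offer optional parameters (such as an element index) without paying for them on invocations that don't use them.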
array_transform lambda_parameters implementation
impl HigherOrderUDF for ArrayTransform {
    fn lambda_parameters(&self, value_fields: &[FieldRef]) -> Result<Vec<Vec<Field>>> {
        let list = if value_fields.len() == 1 {
            &value_fields[0]
        } else {
            return plan_err!(
                "{} function requires 1 value argument, got {}",
                self.name(),
                value_fields.len()
            );
        };

        let field = match list.data_type() {
            DataType::List(field) => field,
            DataType::LargeList(field) => field,
            DataType::FixedSizeList(field, _) => field,
            _ => return plan_err!("expected list, got {list}"),
        };

        // we don't need to check whether the lambda declares more parameters than supported,
        // e.g. array_transform([], (v, i, j) -> v+i+j), as DataFusion will do that for us
        let value = Field::new("", field.data_type().clone(), field.is_nullable())
            .with_metadata(field.metadata().clone());

        Ok(vec![vec![value]])
    }
}
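The shape of `lambda_parameters` can be sketched without Arrow at all. The runnable mini version below uses hypothetical `MiniDataType`/`MiniField` types standing in for arrow's, and derives the single lambda parameter from the list argument's element field, as the real implementation above does:

```rust
// Mini model of arrow's Field/DataType, just enough for the sketch.
#[derive(Clone, Debug, PartialEq)]
enum MiniDataType {
    Int32,
    List(Box<MiniField>),
}

#[derive(Clone, Debug, PartialEq)]
struct MiniField {
    data_type: MiniDataType,
    nullable: bool,
}

// Outer Vec: one entry per supported lambda argument.
// Inner Vec: one field per parameter that lambda may declare.
fn lambda_parameters(value_fields: &[MiniField]) -> Result<Vec<Vec<MiniField>>, String> {
    let [list] = value_fields else {
        return Err(format!(
            "array_transform requires 1 value argument, got {}",
            value_fields.len()
        ));
    };
    match &list.data_type {
        // the lambda's only parameter is the list's element field
        MiniDataType::List(elem) => Ok(vec![vec![(**elem).clone()]]),
        other => Err(format!("expected list, got {other:?}")),
    }
}

fn main() {
    // the field of the literal [2, 3]
    let list = MiniField {
        data_type: MiniDataType::List(Box::new(MiniField {
            data_type: MiniDataType::Int32,
            nullable: false,
        })),
        nullable: false,
    };
    let params = lambda_parameters(&[list]).unwrap();
    // one lambda argument, with one Int32 parameter (the list element)
    assert_eq!(params[0][0].data_type, MiniDataType::Int32);
    println!("{params:?}");
}
```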
array_transform return_field_from_args implementation
fn value_lambda_pair<'a, V: Debug, L: Debug>(
    name: &str,
    args: &'a [ValueOrLambda<V, L>],
) -> Result<(&'a V, &'a L)> {
    let [value, lambda] = take_function_args(name, args)?;

    let (ValueOrLambda::Value(value), ValueOrLambda::Lambda(lambda)) = (value, lambda)
    else {
        return plan_err!(
            "{name} expects a value followed by a lambda, got {value:?} and {lambda:?}"
        );
    };

    Ok((value, lambda))
}

impl HigherOrderUDF for ArrayTransform {
    fn return_field_from_args(
        &self,
        args: LambdaReturnFieldArgs,
    ) -> Result<Arc<Field>> {
        let (list, lambda) = value_lambda_pair(self.name(), args.arg_fields)?;

        // lambda is the resulting field of executing the lambda body
        // with the parameters returned in lambda_parameters
        let field = Arc::new(Field::new(
            Field::LIST_FIELD_DEFAULT_NAME,
            lambda.data_type().clone(),
            lambda.is_nullable(),
        ));

        let return_type = match list.data_type() {
            DataType::List(_) => DataType::List(field),
            DataType::LargeList(_) => DataType::LargeList(field),
            DataType::FixedSizeList(_, size) => DataType::FixedSizeList(field, *size),
            other => plan_err!("expected list, got {other}")?,
        };

        Ok(Arc::new(Field::new("", return_type, list.is_nullable())))
    }
}
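The return-type propagation above can likewise be sketched standalone: keep the list shape of the first argument and swap the element type for the lambda's result type. A runnable mini version, with a hypothetical `MiniDataType` standing in for arrow's `DataType`:

```rust
#[derive(Clone, Debug, PartialEq)]
enum MiniDataType {
    Int32,
    Boolean,
    List(Box<MiniDataType>),
}

// Models ArrayTransform::return_field_from_args: preserve the list shape,
// replace the element type with the lambda's result type.
fn return_type(
    list: &MiniDataType,
    lambda_result: &MiniDataType,
) -> Result<MiniDataType, String> {
    match list {
        MiniDataType::List(_) => Ok(MiniDataType::List(Box::new(lambda_result.clone()))),
        other => Err(format!("expected list, got {other:?}")),
    }
}

fn main() {
    // array_transform([2, 3], v -> v != 2):
    // List(Int32) argument + Boolean lambda result => List(Boolean)
    let out = return_type(
        &MiniDataType::List(Box::new(MiniDataType::Int32)),
        &MiniDataType::Boolean,
    )
    .unwrap();
    assert_eq!(out, MiniDataType::List(Box::new(MiniDataType::Boolean)));
    println!("{out:?}");
}
```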
array_transform invoke_with_args implementation
impl HigherOrderUDF for ArrayTransform {
    fn invoke_with_args(&self, args: HigherOrderFunctionArgs) -> Result<ColumnarValue> {
        let (list, lambda) = value_lambda_pair(self.name(), &args.args)?;

        let list_array = list.to_array(args.number_rows)?;

        // Fast path for fully null input array and also the only way to safely work with
        // a fully null fixed size list array as it can't be handled by remove_list_null_values below
        if list_array.null_count() == list_array.len() {
            return Ok(ColumnarValue::Array(new_null_array(
                args.return_type(),
                list_array.len(),
            )));
        }

        // as per list_values docs, if list_array is sliced, list_values will be sliced too,
        // so before constructing the transformed array below, we must adjust the list offsets with
        // adjust_offsets_for_slice
        let list_values = list_values(&list_array)?;

        // by passing closures, lambda.evaluate can evaluate only those actually needed
        let values_param = || Ok(Arc::clone(&list_values));

        // call the transforming lambda
        let transformed_values = lambda
            .evaluate(&[&values_param])?
            .into_array(list_values.len())?;

        let field = match args.return_field.data_type() {
            DataType::List(field)
            | DataType::LargeList(field)
            | DataType::FixedSizeList(field, _) => Arc::clone(field),
            _ => {
                return exec_err!(
                    "{} expected ScalarFunctionArgs.return_field to be a list, got {}",
                    self.name(),
                    args.return_field
                );
            }
        };

        let transformed_list = match list_array.data_type() {
            DataType::List(_) => {
                let list = list_array.as_list();

                // since we called list_values above which would return sliced values for
                // a sliced list, we must adjust the offsets here as otherwise they would be invalid
                let adjusted_offsets = adjust_offsets_for_slice(list);

                Arc::new(ListArray::new(
                    field,
                    adjusted_offsets,
                    transformed_values,
                    list.nulls().cloned(),
                )) as ArrayRef
            }
            DataType::LargeList(_) => {
                let large_list = list_array.as_list::<i64>();

                // since we called list_values above which would return sliced values for
                // a sliced list, we must adjust the offsets here as otherwise they would be invalid
                let adjusted_offsets = adjust_offsets_for_slice(large_list);

                Arc::new(LargeListArray::new(
                    field,
                    adjusted_offsets,
                    transformed_values,
                    large_list.nulls().cloned(),
                ))
            }
            DataType::FixedSizeList(_, value_length) => {
                Arc::new(FixedSizeListArray::new(
                    field,
                    *value_length,
                    transformed_values,
                    list_array.as_fixed_size_list().nulls().cloned(),
                ))
            }
            other => exec_err!("expected list, got {other}")?,
        };

        Ok(ColumnarValue::Array(transformed_list))
    }
}
How relevant HigherOrderUDF methods would be called and what they would return during planning and evaluation of the example
// this is called at sql planning
let lambda_parameters = lambda_udf.lambda_parameters(&[
    Field::new("", DataType::new_list(DataType::Int32, false), false), // the Field of the [2, 3] literal
])?;

assert_eq!(
    lambda_parameters,
    vec![
            // the parameters that *can* be declared on the lambda, and not only 
            // those actually declared: the implementation doesn't need to care 
            // about it
            vec![
                Field::new("", DataType::Int32, false), // the list inner value
            ]]
);



// this is called every time an ExprSchemable method is called on a HigherOrderFunction
let return_field = array_transform.return_field_from_args(LambdaReturnFieldArgs {
    arg_fields: &[
        ValueOrLambda::Value(Field::new("", DataType::new_list(DataType::Int32, false), false)),
        ValueOrLambda::Lambda(Field::new("", DataType::Boolean, false)), // the return_field of the expression "v != 2" when "v" is of the type returned in lambda_parameters
    ],
    scalar_arguments // irrelevant
})?;

assert_eq!(return_field, Field::new("", DataType::new_list(DataType::Boolean, false), false));



let value = array_transform.invoke_with_args(HigherOrderFunctionArgs {
    args: vec![
        ValueOrLambda::Value(List([2, 3])),
        ValueOrLambda::Lambda(LambdaArgument of `v -> v != 2`),
    ],
    arg_fields, // same as above
    number_rows: 1,
    return_field, // same as above
    config_options, // irrelevant
})?;

assert_eq!(value, BooleanArray::from([false, true]))


A HigherOrderUDF/HigherOrderUDFImpl pair, like ScalarUDF/ScalarUDFImpl, was not used because those exist only to maintain backwards compatibility with the older API, see #8045.


Why LambdaVariable and not Column:

Existing tree traversals that operate on columns would break if some column nodes referenced a lambda parameter instead of a real column. In the example query, projection pushdown would try to push down the lambda parameter "v", which doesn't exist in table "t".

Example of another traversal that would break:

fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
    let mut used_columns = HashSet::new();
    expr.apply(|expr| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            // if this is a lambda column, this function will break
            used_columns.insert(col.index());
        }
        Ok(TreeNodeRecursion::Continue)
    });
    ...
}

Furthermore, the implementations of ExprSchemable and PhysicalExpr::return_field for Column expect that the schema they receive as an argument contains an entry for the column's name, which is not the case for lambda parameters.
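A runnable sketch of why a dedicated variant sidesteps the traversal problem: with a hypothetical `MiniExpr` enum standing in for DataFusion's `Expr`, an existing-style column-collecting traversal needs no lambda awareness at all, because lambda parameters are simply a different variant.

```rust
#[derive(Debug)]
enum MiniExpr {
    Column(String),
    LambdaVariable(String),
    NotEq(Box<MiniExpr>, Box<MiniExpr>),
    Literal(i64),
}

// An existing-style traversal: collects referenced columns, untouched by lambdas.
fn used_columns(expr: &MiniExpr, out: &mut Vec<String>) {
    match expr {
        MiniExpr::Column(name) => out.push(name.clone()),
        // lambda parameters are a different variant, so they are skipped
        // automatically instead of being treated as columns of some table
        MiniExpr::LambdaVariable(_) => {}
        MiniExpr::NotEq(l, r) => {
            used_columns(l, out);
            used_columns(r, out);
        }
        MiniExpr::Literal(_) => {}
    }
}

fn main() {
    // the lambda body `v != 2` alongside a real column reference `a`
    let body = MiniExpr::NotEq(
        Box::new(MiniExpr::LambdaVariable("v".into())),
        Box::new(MiniExpr::Literal(2)),
    );
    let expr = MiniExpr::NotEq(Box::new(MiniExpr::Column("a".into())), Box::new(body));
    let mut cols = Vec::new();
    used_columns(&expr, &mut cols);
    // only the real column is collected; "v" is never seen as a Column
    assert_eq!(cols, vec!["a".to_string()]);
    println!("{cols:?}"); // ["a"]
}
```

If lambda parameters reused the Column variant instead, every such traversal would need an extra check, as the alternatives below illustrate.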

By including a FieldRef on LambdaVariable, resolved at construction time in the SQL planner, the ExprSchemable and PhysicalExpr::return_field implementations simply return its own Field:

LambdaVariable ExprSchemable and PhysicalExpr::return_field implementation
impl ExprSchemable for Expr {
   fn to_field(
        &self,
        schema: &dyn ExprSchema,
    ) -> Result<(Option<TableReference>, Arc<Field>)> {
        let (relation, schema_name) = self.qualified_name();
        let field = match self {
           Expr::LambdaVariable(l) => Ok(Arc::clone(&l.field)),
           ...
        }?;

        Ok((
            relation,
            Arc::new(field.as_ref().clone().with_name(schema_name)),
        ))
    }
    ...
}

impl PhysicalExpr for LambdaVariable {
    fn return_field(&self, _input_schema: &Schema) -> Result<FieldRef> {
        Ok(Arc::clone(&self.field))
    }
    ...
}

Possible alternatives, discarded due to complexity, required downstream changes, and implementation size:
  1. Add a new set of TreeNode methods that provide the set of lambda parameter names seen during the traversal, so column nodes can be tested for whether they refer to a regular column or to a lambda parameter. Any downstream user that wants to support lambdas would need to use these methods instead of the existing ones. This would also add 1k+ lines to the PR.
impl Expr {
    pub fn transform_with_lambdas_params<
        F: FnMut(Self, &HashSet<String>) -> Result<Transformed<Self>>,
    >(
        self,
        mut f: F,
    ) -> Result<Transformed<Self>> {}
}

What minimize_join_filter would look like:

fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
    let mut used_columns = HashSet::new();
    expr.apply_with_lambdas_params(|expr, lambdas_params| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            // don't include lambda parameters
            if !lambdas_params.contains(col.name()) {
                used_columns.insert(col.index());
            }
        }
        Ok(TreeNodeRecursion::Continue)
    })
    ...
}
  2. Add a flag to the Column node indicating whether it refers to a lambda parameter. This still requires checking for it in existing tree traversals that work on Columns (30+), and also downstream.
//logical
struct Column {
    pub relation: Option<TableReference>,
    pub name: String,
    pub spans: Spans,
    pub is_lambda_parameter: bool,
}

//physical
struct Column {
    name: String,
    index: usize,
    is_lambda_parameter: bool,
}

What minimize_join_filter would look like:

fn minimize_join_filter(expr: Arc<dyn PhysicalExpr>, ...) -> JoinFilter {
    let mut used_columns = HashSet::new();
    expr.apply(|expr| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            // don't include lambda parameters
            if !col.is_lambda_parameter {
                used_columns.insert(col.index());
            }
        }
        Ok(TreeNodeRecursion::Continue)
    })
    ...
}
  3. Add a new set of TreeNode methods that provide a schema including the lambda parameters in scope for the node being visited/transformed:
impl Expr {
    pub fn transform_with_schema<
        F: FnMut(Self, &DFSchema) -> Result<Transformed<Self>>,
    >(
        self,
        schema: &DFSchema,
        f: F,
    ) -> Result<Transformed<Self>> { ... }
    ... other methods
}

For any given HigherOrderFunction found during the traversal, a new schema is created for each lambda argument that contains its parameters, as returned from HigherOrderUDF::lambda_parameters.
What it would look like:

pub fn infer_placeholder_types(self, schema: &DFSchema) -> Result<(Expr, bool)> {
        let mut has_placeholder = false;
        // Provide the schema as the first argument.
        // The transforming closure receives an adjusted_schema as argument
        self.transform_with_schema(schema, |mut expr, adjusted_schema| {
            match &mut expr {
                // Default to assuming the arguments are the same type
                Expr::BinaryExpr(BinaryExpr { left, op: _, right }) => {
                    // use adjusted_schema and not schema: these expressions may contain
                    // columns referring to a lambda parameter, whose Field is only
                    // available in adjusted_schema and not in schema
                    rewrite_placeholder(left.as_mut(), right.as_ref(), adjusted_schema)?;
                    rewrite_placeholder(right.as_mut(), left.as_ref(), adjusted_schema)?;
                }
    ....
  4. Make a schema available through LogicalPlan and ExecutionPlan nodes that includes all lambda parameters from all expressions owned by the node, and use this schema for tree traversals. For nodes that don't own any expressions, the regular schema can be returned.
impl LogicalPlan {
    fn lambda_extended_schema(&self) -> &DFSchema;
}

trait ExecutionPlan {
    fn lambda_extended_schema(&self) -> &DFSchema;
}

//usage
impl LogicalPlan {
    pub fn replace_params_with_values(
            self,
            param_values: &ParamValues,
        ) -> Result<LogicalPlan> {
            self.transform_up_with_subqueries(|plan| {
                // use plan.lambda_extended_schema(), which contains lambda parameters,
                // instead of plan.schema(), which won't
                let lambda_extended_schema = Arc::clone(plan.lambda_extended_schema());
                let name_preserver = NamePreserver::new(&plan);
                plan.map_expressions(|e| {
                    // if this expression is a child of a lambda and contains columns referring to its
                    // parameters, lambda_extended_schema already contains them
                    let (e, has_placeholder) = e.infer_placeholder_types(&lambda_extended_schema)?;
    ....

Comment on lines +113 to +124
let arg_fields = args
    .iter()
    .enumerate()
    .map(|(i, e)| match e.downcast_ref::<LambdaExpr>() {
        Some(lambda) => {
            lambda_positions.push(i);

            Ok(ValueOrLambda::Lambda(lambda.body().return_field(schema)?))
        }
        None => Ok(ValueOrLambda::Value(e.return_field(schema)?)),
    })
    .collect::<Result<Vec<_>>>()?;
Member

@rluvaton rluvaton Apr 20, 2026

It looks like you match a lambda expression here only if the argument is a lambda expression, but the wrapped_lambda helper function allows for nesting. Because this will not allow nesting, I would remove wrapped_lambda; if you do want to support wrapping, I would add tests for those cases (wrapped and not wrapped).

_ => return None,
}
}
}
Member

I think this should be removed as I said in:

But anyway, this has a bug when matching a lambda expression, for example matching array_filter:

array_transform(array_filter(list, v -> v > 0), v -> 1 / v) 

and also for expressions that have more than one child:

if wrapped_lambda(&children[*i]).is_none() {
    return plan_err!("unable to unwrap lambda from {}", &children[*i]);
}
}
Member

What about added lambda arguments? This only checks whether the lambda positions from before are still lambda expressions.

fun: Arc::clone(&self.fun),
args: children,
lambda_positions: self.lambda_positions.clone(),
return_field: Arc::clone(&self.return_field),
Member

FYI, the return field might not match the function output after the new arguments, but you don't have the schema here so you can't check that, and I see ScalarFunctionExpr has the same problem.

Comment on lines +147 to +148
let value = Field::new("", field.data_type().clone(), field.is_nullable())
    .with_metadata(field.metadata().clone());
Member

Why remove the name? I would think that just returning the list field is enough.

Ok(vec![coerced])
}

fn lambda_parameters(&self, value_fields: &[FieldRef]) -> Result<Vec<Vec<Field>>> {
Member

@rluvaton rluvaton Apr 20, 2026

How would I implement the Spark reduce function, where there are 2 lambda functions and the input of the second one depends on the output of the first one? It seems like the current lambda functions infra does not allow that.


let expected_list = create_i32_fsl(
    3,
    vec![1, 5, 10, 1, 5, 10, 1, 5, 10],
Member

@rluvaton rluvaton Apr 20, 2026

Please add a comment explaining why these are the expected list values for the nulls (since we reuse the first valid list output).

Comment on lines +187 to +190
return Ok(ColumnarValue::Array(new_null_array(
    args.return_type(),
    list_array.len(),
)));
Member

Nit: this can return just ColumnarValue::Scalar


// Fast path for fully null input array and also the only way to safely work with
// a fully null fixed size list array as it can't be handled by remove_list_null_values below
if list_array.null_count() == list_array.len() {
Member

Nit: you can add another fast path for when all values are either null or empty; you can check that via list_array.offsets[0] == list_array.offsets[last],

for list and large list

Comment on lines +437 to +443
let list = create_i32_fsl(
    3,
    // 0 here for one of the values behind null, so if it is evaluated
    // it will fail due to divide by 0
    vec![100, 20, 10, 0, 1, 2, 0, 1, 50],
    Some(NullBuffer::from(vec![true, false, false])),
);
Member

Can you create a fixed size list where the number of items is not equal to the list length (here the fixed size list type is fixed(3) while the list itself has 3 items as well)? I think this can hide bugs, since I'm pretty sure you have one with fixed size and nulls.

DataType::LargeList(_) => {
    Ok(Arc::new(truncate_list_nulls(array.as_list::<i64>())?))
}
DataType::FixedSizeList(_, _) => replace_nulls_with_first_valid(array),
Member

This can cause unintended behavior: if I have random in the lambda expression, or another expression that has state, this will mutate more than needed.

Comment on lines +333 to +337
/// // the value being transformed
/// Field::new("", DataType::Float32, false),
/// // the 1-based index being transformed, not used on the example above,
/// //but implementations doesn't need to care about it
/// Field::new("", DataType::Int32, false),
Member

This creates 2 fields with the same name, both empty; please add a comment about that. Is it required to return an empty name?

Comment on lines +375 to +383
/// Whether List, LargeList and FixedSizeList arguments should have it's
/// non-empty null sublists cleaned by Datafusion before invoking this function
///
/// The default implementation always returns true and should only be implemented
/// if you want to handle non-empty null sublists yourself
///
/// fully null fixed size list arrays should always be handled regardless of
/// the return of this function
// todo: extend this to listview and maps when remove_list_null_values supports it
Member

Please add a reference to the function that does the cleaning, so users can see the reference implementation and what it does.

Comment on lines +413 to +430
/// Determines which of the arguments passed to this function are evaluated eagerly
/// and which may be evaluated lazily.
///
/// If this function returns `None`, all arguments are eagerly evaluated.
/// Returning `None` is a micro optimization that saves a needless `Vec`
/// allocation.
///
/// If the function returns `Some((eager, lazy))`, `eager` are the
/// arguments that are always evaluated, and `lazy` are the
/// arguments that may be evaluated lazily (i.e. may not be evaluated at all
/// in some cases).
///
/// Implementations must ensure that the two returned `Vec`s are disjoint,
/// and that each argument from `args` is present in one of the two `Vec`s.
///
/// When overriding this function, [HigherOrderUDF::short_circuits] must
/// be overridden to return `true`.
fn conditional_arguments<'a>(
Member

can you clarify this? This talks about the UDF arguments and NOT about the lambda function arguments

Comment on lines +240 to +243
/// this field will be `[
/// ValueOrLambda::Value(Field::new("", DataType::List(DataType::Int32), false)),
/// ValueOrLambda::Lambda(Field::new("", DataType::Boolean, false))
/// ]`
Member

Suggested change
/// this field will be `[
/// ValueOrLambda::Value(Field::new("", DataType::List(DataType::Int32), false)),
/// ValueOrLambda::Lambda(Field::new("", DataType::Boolean, false))
/// ]`
/// this field will be
/// ```ignore
/// [
/// ValueOrLambda::Value(Field::new("", DataType::List(DataType::Int32), false)),
/// ValueOrLambda::Lambda(Field::new("", DataType::Boolean, false))
/// ]
/// ```

///
/// See [`array_transform.rs`] for a commented complete implementation
///
/// [`array_transform.rs`]: https://github.com/apache/datafusion/blob/main/datafusion/functions-nested/src/array_transform.rs
Member

@comphead do we have some kind of CI check that verifies that links are valid?

Contributor

Unfortunately we don't, but thanks for bringing this up. We need to add it.

Contributor

Filed #21747

Comment on lines +1594 to +1615
#[test]
fn test_remove_list_null_values_fsl() {
let list = Arc::new(create_i32_fsl(
3,
vec![100, 20, 10, 0, 0, 0, 0, 0, 0],
Some(NullBuffer::from(vec![true, false, false])),
)) as ArrayRef;

let res = remove_list_null_values(&list).unwrap();

let expected = Arc::new(create_i32_fsl(
3,
vec![100, 20, 10, 100, 20, 10, 100, 20, 10],
Some(NullBuffer::from(vec![true, false, false])),
)) as ArrayRef;

assert_eq!(&res, &expected);
// check above skips inner value of nulls
assert_eq!(
res.as_fixed_size_list().values(),
expected.as_fixed_size_list().values()
);
Member

can you please add more tests for fixed size list:

  1. no valid values
  2. all valid values
  3. empty list
  4. fixed size is 0

@rluvaton
Member

Looking at the code, how can we support implementing the Spark reduce lambda function, where the input of the second lambda (the finish function) depends on the output of the first function (merge)?

How can we do that in SQL, and how can we do that when we are creating the physical expr ourselves (like Comet does)?

I just want to know if we can implement that without creating breaking changes and whether the current infrastructure supports it

@pepijnve
Contributor

Looking at the code how can we support implementing spark reduce
...

I just wanted to second this question. During my last pass through the changes I think I spotted a comment stating that some construct was there to support reduce. It would be good to already have at least a proof-of-concept implementation of this function in place as part of this PR to prove that the design works. The rule of thumb of trying to make at least three distinct implementations of a new abstraction so you understand the various needs probably applies here.

@LiaCastaneda
Contributor

LiaCastaneda commented Apr 20, 2026

IIRC this PR supports multiple lambdas (link to comment @pepijnve mentions). It's up to the caller to handle the execution of the multiple lambdas in invoke_with_args: call merge first and then finish. For the SQL syntax, I'm not familiar with how the parser works, but I imagine the shape would be similar to how it's expressed in other engines.

@gstvg
Contributor Author

gstvg commented Apr 20, 2026

@rluvaton @pepijnve @LiaCastaneda Thanks for bringing this up. This also applies to the accumulator parameter of the first lambda. In the past, I used the datatype of the start value in a local PoC. But thinking about it again, there are edge cases like reduce([1, 2], [], (acc, v) -> array_concat(acc, [to_string(v)]), str_array -> str_array[0]): the datatype of the start argument, [], would be List(Null), which is not what the user wants (List(Utf8)). Via SQL, that would require an explicit cast on the initial value to work, which is not great. Via expr_api I would expect the user to provide the start value with the correct type, and when consuming a Spark plan it should be of the correct type anyway.

On the other hand, DuckDB list_reduce without an initial value would be restricted to either functions that reduce into the same type as the list value (or worse, to expressions that are correctly planned with an accumulator of DataType::Null). Some related DuckDB issues: duckdb#17009 duckdb#21032

I checked ClickHouse and Snowflake and both require an initial value, just like Spark. @LiaCastaneda the Trino link also shows that it requires an initial value

I'll experiment with a new HigherOrderUDF method (we may keep both or use only this new one) and implement reduce with it:

    enum LambdaParametersStatus {
        Finished,
        CallAgain
    }

    fn multi_step_lambda_parameters(
        &self,
        value_fields: &[FieldRef],
        previous_step: Option<Vec<Vec<Field>>>,
    ) -> Result<(Vec<Vec<Field>>, LambdaParametersStatus)> {
        Ok((self.lambda_parameters(value_fields)?, LambdaParametersStatus::Finished))
    }

how can we do that [...] when we are creating the physical expr ourselves (like Comet does)?

The Spark lambda variable includes its datatype and nullability, so I think that lambda_parameters doesn't need to be invoked; then I assume the rest of the execution is the same as with SQL or expr_api

@LiaCastaneda Invoking the current version would indeed work for reduce/list_reduce, but the problem is during planning: the accumulator parameter datatype of the merge lambda depends on the lambda itself, and the parameter of the finish lambda depends on the output of the merge lambda, which is not possible to express with the current single-step lambda_parameters

[a, aa, aaa, aaaa, aaaaa]

query ?
SELECT array_transform([1,2,3,4,5], v -> list_repeat("a", v));
Contributor

❤️

//! # fn udafs(&self) -> HashSet<String> { unimplemented!() }
//! # fn udwfs(&self) -> HashSet<String> { unimplemented!() }
//! # fn udf(&self, _name: &str) -> Result<Arc<ScalarUDF>> { unimplemented!() }
//! # fn udhof(&self, name: &str) -> Result<Arc<dyn HigherOrderUDF>> { unimplemented!() }
Contributor

this is still old name

//! # impl FunctionRegistry for SessionContext {
//! # fn register_udf(&mut self, _udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> { Ok (None) }
//! # fn udfs(&self) -> HashSet<String> { unimplemented!() }
//! # fn udhofs(&self) -> HashSet<String> { unimplemented!() }
Contributor

same

)
}

fn udhof(&self, name: &str) -> Result<Arc<dyn datafusion_expr::HigherOrderUDF>> {
Contributor

please have a meaningful name

vec![]
}

fn udhofs(&self) -> std::collections::HashSet<String> {
Contributor

please have a meaningful name

}
}

fn all_unique(params: &[sqlparser::ast::Ident]) -> bool {
Contributor

let's add a comment about what exactly is being checked for uniqueness

statement ok
drop table t;

statement ok
Contributor

checked tests with DuckDB, results are correct

@LiaCastaneda
Contributor

@LiaCastaneda Invoking the current version would indeed work for reduce/list_reduce, but the problem is during planning: the accumulator parameter datatype of the merge lambda depends on the lambda itself, and the parameter of the finish lambda depends on the output of the merge lambda, which is not possible to express with the current single-step lambda_parameters

I understand, thanks for explaining. So the issue with these kinds of lambdas is that we can't always rely on inferring the type from the init value. Looks like we have two issues with type inference:

the following query would break because we don't know the init type:
SELECT reduce(array(1, 2, 3), [], (acc, x) -> array_append(acc, x), acc -> array_length(acc))
^ that would be fixed by requiring the user to provide a validly typed value, same as other engines.

And also, iiuc we also need a solution to handle cases like:
SELECT reduce(array(1, 2, 3), 0, (acc, x) -> cast(acc as varchar), acc -> upper(acc))
because the init type is different from the finish lambda's input type, which is where the multistep solution you mention helps.

Labels

api change Changes the API exposed to users of the crate catalog Related to the catalog crate common Related to common crate core Core DataFusion crate datasource Changes to the datasource crate documentation Improvements or additions to documentation execution Related to the execution crate ffi Changes to the ffi crate functions Changes to functions implementation logical-expr Logical plan and expressions optimizer Optimizer rules physical-expr Changes to the physical-expr crates proto Related to proto crate spark sql SQL Planner sqllogictest SQL Logic Tests (.slt) substrait Changes to the substrait crate
