Add lambda support and array_transform udf #21679
Conversation
```rust
let arg_fields = args
    .iter()
    .enumerate()
    .map(|(i, e)| match e.downcast_ref::<LambdaExpr>() {
        Some(lambda) => {
            lambda_positions.push(i);

            Ok(ValueOrLambda::Lambda(lambda.body().return_field(schema)?))
        }
        None => Ok(ValueOrLambda::Value(e.return_field(schema)?)),
    })
    .collect::<Result<Vec<_>>>()?;
```
It looks like you match a lambda expression here only if the argument itself is a lambda expression, but the `wrapped_lambda` helper function allows for nesting. Because this will not allow nesting, I would remove `wrapped_lambda`; if you do want to support wrapping, I would add tests for both cases (wrapped and not wrapped).
```rust
            _ => return None,
        }
    }
}
```
I think this should be removed, as I said above. In any case, this has a bug when matching lambda expressions, for example when matching `array_filter`: `array_transform(array_filter(list, v -> v > 0), v -> 1 / v)`, and also for expressions that have more than one child.
```rust
        if wrapped_lambda(&children[*i]).is_none() {
            return plan_err!("unable to unwrap lambda from {}", &children[*i]);
        }
    }
```
What about added lambda arguments? This only checks whether the lambda positions from before are still lambda expressions.
```rust
fun: Arc::clone(&self.fun),
args: children,
lambda_positions: self.lambda_positions.clone(),
return_field: Arc::clone(&self.return_field),
```
FYI, the return field might not match the function output after the new arguments are applied, but you don't have the schema here so you can't check that. I see `ScalarFunctionExpr` has the same problem.
```rust
let value = Field::new("", field.data_type().clone(), field.is_nullable())
    .with_metadata(field.metadata().clone());
```
Why remove the name? I would think that just returning the list field is enough.
```rust
    Ok(vec![coerced])
}

fn lambda_parameters(&self, value_fields: &[FieldRef]) -> Result<Vec<Vec<Field>>> {
```
How would I implement the Spark `reduce` function, where there are 2 lambda functions and the input of the second one depends on the output of the first one? It seems like the current implementation of the lambda functions infra does not allow that.
```rust
let expected_list = create_i32_fsl(
    3,
    vec![1, 5, 10, 1, 5, 10, 1, 5, 10],
```
Please add a comment explaining why these are the expected list values for the nulls (since we reuse the first valid list output).
```rust
return Ok(ColumnarValue::Array(new_null_array(
    args.return_type(),
    list_array.len(),
)));
```
Nit: this can return just `ColumnarValue::Scalar`.
```rust
// Fast path for fully null input array and also the only way to safely work with
// a fully null fixed size list array as it can't be handled by remove_list_null_values below
if list_array.null_count() == list_array.len() {
```
Nit: you can add another fast path for when all values are either null or empty; for List and LargeList you can check that by verifying `list_array.offsets()[0] == list_array.offsets()[last]`.
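The invariant behind this suggestion can be sketched on a plain offsets slice; the accessor names on real Arrow arrays differ, so this only models the check itself:

```rust
// Sketch (assumption, not the PR's code): List/LargeList offsets are
// monotonically non-decreasing, so if the first and last offsets are equal,
// every sublist is empty (null sublists included) and the whole lambda
// evaluation can be skipped.
fn all_null_or_empty(offsets: &[i64]) -> bool {
    offsets.first() == offsets.last()
}
```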
```rust
let list = create_i32_fsl(
    3,
    // 0 here for one of the values behind null, so if it will be evaluated
    // it will fail due to divide by 0
    vec![100, 20, 10, 0, 1, 2, 0, 1, 50],
    Some(NullBuffer::from(vec![true, false, false])),
);
```
Can you create a fixed size list where the number of items is not equal to the list length? (Here the fixed size list type is fixed(3) while the list itself also has 3 items.) I think this can hide bugs, since I'm pretty sure you have them with fixed size and nulls.
```rust
DataType::LargeList(_) => {
    Ok(Arc::new(truncate_list_nulls(array.as_list::<i64>())?))
}
DataType::FixedSizeList(_, _) => replace_nulls_with_first_valid(array),
```
This can cause unintended behavior: if I have `random()` in the lambda expression, or another expression that has state, this will evaluate it more times than needed.
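A simplified model of what replacing nulls with the first valid sublist might look like, using a flat values buffer and a validity mask instead of Arrow arrays (the real `replace_nulls_with_first_valid` in the PR operates on arrays; this is an assumed sketch of its semantics):

```rust
// Sketch: for a fixed-size list modeled as a flat values buffer plus a
// per-list validity mask, copy the first valid sublist's values over every
// null slot so the lambda never sees arbitrary garbage values.
fn replace_nulls_with_first_valid(size: usize, values: &mut [i32], valid: &[bool]) {
    // Find the first valid sublist, if any
    let Some(first) = valid.iter().position(|v| *v) else {
        return; // fully null arrays are handled by the earlier fast path
    };
    let template: Vec<i32> = values[first * size..(first + 1) * size].to_vec();
    for (i, is_valid) in valid.iter().enumerate() {
        if !is_valid {
            // overwrite the slot behind the null with the valid template
            values[i * size..(i + 1) * size].copy_from_slice(&template);
        }
    }
}
```

As the review notes, the lambda is still evaluated over the copied slots, so a stateful expression would run more times than the number of valid rows.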
```rust
/// // the value being transformed
/// Field::new("", DataType::Float32, false),
/// // the 1-based index being transformed, not used in the example above,
/// // but implementations don't need to care about it
/// Field::new("", DataType::Int32, false),
```
This creates 2 fields with the same name, and both empty; please add a comment about that. Is it required to return an empty name?
```rust
/// Whether List, LargeList and FixedSizeList arguments should have their
/// non-empty null sublists cleaned by DataFusion before invoking this function
///
/// The default implementation always returns true and should only be overridden
/// if you want to handle non-empty null sublists yourself
///
/// Fully null fixed size list arrays should always be handled regardless of
/// the return value of this function
// todo: extend this to ListView and maps when remove_list_null_values supports it
```
Please add a reference to the function that does the cleaning, so users can see a reference implementation and what it does.
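To make the "cleaning" concrete, here is a sketch of what removing non-empty null sublists could mean for an offset-encoded list, modeled with plain vectors rather than Arrow arrays (the real `remove_list_null_values` works on arrays; its exact shape is an assumption here):

```rust
// Sketch: given list offsets, a flat values buffer and per-list validity,
// rebuild the offsets so every null sublist becomes empty, dropping its
// values from the buffer. Models what a cleaning pass could do before the
// lambda is evaluated over the values buffer.
fn clean_null_sublists(
    offsets: &[usize],
    values: &[i32],
    valid: &[bool],
) -> (Vec<usize>, Vec<i32>) {
    let mut new_offsets = vec![0];
    let mut new_values = Vec::new();
    for (i, win) in offsets.windows(2).enumerate() {
        if valid[i] {
            // keep the values of valid sublists
            new_values.extend_from_slice(&values[win[0]..win[1]]);
        }
        // null sublists contribute no values, i.e. they become empty
        new_offsets.push(new_values.len());
    }
    (new_offsets, new_values)
}
```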
```rust
/// Determines which of the arguments passed to this function are evaluated eagerly
/// and which may be evaluated lazily.
///
/// If this function returns `None`, all arguments are eagerly evaluated.
/// Returning `None` is a micro optimization that saves a needless `Vec`
/// allocation.
///
/// If the function returns `Some`, it returns (`eager`, `lazy`) where `eager`
/// are the arguments that are always evaluated, and `lazy` are the
/// arguments that may be evaluated lazily (i.e. may not be evaluated at all
/// in some cases).
///
/// Implementations must ensure that the two returned `Vec`s are disjoint,
/// and that each argument from `args` is present in one of the two `Vec`s.
///
/// When overriding this function, [HigherOrderUDF::short_circuits] must
/// be overridden to return `true`.
fn conditional_arguments<'a>(
```
Can you clarify that this talks about the UDF arguments and NOT about the lambda function arguments?
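The invariant the doc comment describes (disjoint `eager`/`lazy` sets covering every argument) can be sketched as a small check; the index-based representation is an assumption, not the actual `conditional_arguments` signature:

```rust
use std::collections::HashSet;

// Hypothetical helper: validate that an (eager, lazy) split of argument
// indices is disjoint and covers every argument exactly once.
fn valid_partition(num_args: usize, eager: &[usize], lazy: &[usize]) -> bool {
    let mut seen = HashSet::new();
    for &i in eager.iter().chain(lazy) {
        // reject out-of-range indices and duplicates (overlap between the sets)
        if i >= num_args || !seen.insert(i) {
            return false;
        }
    }
    // every argument must appear in exactly one of the two sets
    seen.len() == num_args
}
```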
````rust
/// this field will be `[
/// ValueOrLambda::Value(Field::new("", DataType::List(DataType::Int32), false)),
/// ValueOrLambda::Lambda(Field::new("", DataType::Boolean, false))
/// ]`
````

Suggested change:

````rust
/// this field will be
/// ```ignore
/// [
/// ValueOrLambda::Value(Field::new("", DataType::List(DataType::Int32), false)),
/// ValueOrLambda::Lambda(Field::new("", DataType::Boolean, false))
/// ]
/// ```
````
```rust
///
/// See [`array_transform.rs`] for a commented complete implementation
///
/// [`array_transform.rs`]: https://github.com/apache/datafusion/blob/main/datafusion/functions-nested/src/array_transform.rs
```
@comphead do we have some kind of CI check that verifies that links are valid?
Unfortunately we don't, but thanks for bringing this up. We need to add it.
```rust
#[test]
fn test_remove_list_null_values_fsl() {
    let list = Arc::new(create_i32_fsl(
        3,
        vec![100, 20, 10, 0, 0, 0, 0, 0, 0],
        Some(NullBuffer::from(vec![true, false, false])),
    )) as ArrayRef;

    let res = remove_list_null_values(&list).unwrap();

    let expected = Arc::new(create_i32_fsl(
        3,
        vec![100, 20, 10, 100, 20, 10, 100, 20, 10],
        Some(NullBuffer::from(vec![true, false, false])),
    )) as ArrayRef;

    assert_eq!(&res, &expected);
    // check above skips inner value of nulls
    assert_eq!(
        res.as_fixed_size_list().values(),
        expected.as_fixed_size_list().values()
    );
```
Can you please add more tests for fixed size list:
- no valid values
- all valid values
- empty list
- fixed size of 0
Looking at the code, how can we support implementing Spark `reduce`: how can we do that in SQL, and how can we do that when we are creating the physical expr ourselves (like Comet does)? I just want to know if we can implement that without creating breaking changes and whether the current infrastructure supports it.
I just wanted to second this question. During my last pass through the changes I think I spotted a comment stating that some construct was there to support this.
IIRC this PR supports multiple lambdas (link to the comment @pepijnve mentions). It's up to the caller to handle the execution of the multiple lambdas.
@rluvaton @pepijnve @LiaCastaneda Thanks for bringing this up. This also applies to the accumulator parameter of the first lambda as well. In the past, I used the datatype of the start value on a local PoC, but thinking about it again, there are edge cases. On the other hand, DuckDB `list_reduce` without an initial value would be restricted to either functions that reduce into the same type as the list value (or, worse, to expressions that are only correctly planned with such an accumulator). I checked ClickHouse and Snowflake, and both require an initial value just like Spark. @LiaCastaneda the Trino link also shows that it requires an initial value.

I'll experiment with a new multi-step API:

```rust
enum LambdaParametersStatus {
    Finished,
    CallAgain,
}

fn multi_step_lambda_parameters(
    &self,
    value_fields: &[FieldRef],
    previous_step: Option<Vec<Vec<Field>>>,
) -> Result<(Vec<Vec<Field>>, LambdaParametersStatus)> {
    Ok((self.lambda_parameters(value_fields)?, LambdaParametersStatus::Finished))
}
```

The Spark lambda variable includes its datatype and nullability, so I think that works. @LiaCastaneda For invoking, the current version would indeed work for reduce/list_reduce, but the problem is during planning, as the accumulator parameter datatype of the merge lambda depends on the lambda itself, and the parameter of the finish lambda depends on the output of the merge lambda, which is not possible to express with the current single-step `lambda_parameters`.
```
[a, aa, aaa, aaaa, aaaaa]

query ?
SELECT array_transform([1,2,3,4,5], v -> list_repeat("a", v));
```
```rust
//! # fn udafs(&self) -> HashSet<String> { unimplemented!() }
//! # fn udwfs(&self) -> HashSet<String> { unimplemented!() }
//! # fn udf(&self, _name: &str) -> Result<Arc<ScalarUDF>> { unimplemented!() }
//! # fn udhof(&self, name: &str) -> Result<Arc<dyn HigherOrderUDF>> { unimplemented!() }
```

```rust
//! # impl FunctionRegistry for SessionContext {
//! # fn register_udf(&mut self, _udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> { Ok(None) }
//! # fn udfs(&self) -> HashSet<String> { unimplemented!() }
//! # fn udhofs(&self) -> HashSet<String> { unimplemented!() }
```
```rust
    )
}

fn udhof(&self, name: &str) -> Result<Arc<dyn datafusion_expr::HigherOrderUDF>> {
```
Please use a meaningful name.
```rust
    vec![]
}

fn udhofs(&self) -> std::collections::HashSet<String> {
```
Please use a meaningful name.
```rust
    }
}

fn all_unique(params: &[sqlparser::ast::Ident]) -> bool {
```
Let's add a comment stating what exactly is being checked for uniqueness.
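A sketch of what such a helper presumably checks, using plain string names instead of `sqlparser::ast::Ident` (that the real function compares identifier values is an assumption here):

```rust
use std::collections::HashSet;

// Sketch: returns true when no lambda parameter name repeats, e.g. rejecting
// a lambda like `(v, v) -> ...`. The real all_unique takes
// &[sqlparser::ast::Ident]; this simplified version compares plain strings.
fn all_unique(params: &[&str]) -> bool {
    let mut seen = HashSet::new();
    // HashSet::insert returns false on a duplicate, so `all` stops there
    params.iter().all(|name| seen.insert(*name))
}
```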
```
statement ok
drop table t;

statement ok
```
Checked the tests with DuckDB; the results are correct.
I understand, thanks for explaining. So the issue with these kinds of lambdas is that we can't always rely on inferring the type from the init value. It looks like we have two issues with type inference: a query would break because we don't know the init type, and also, IIUC, we need a solution to handle cases where one lambda's parameter type depends on another lambda's output.
This is a clean version of #18921 to make it easier to review.
This is a breaking change due to adding a variant to the `Expr` enum, new methods on traits `Session`, `FunctionRegistry` and `ContextProvider`, and a new arg on `TaskContext::new`.

This PR adds support for lambdas with column capture and the `array_transform` function used to test the lambda implementation.

Example usage:
Note: column capture has been removed for now and will be added in a follow-up PR, see #21172.
Some comments on code snippets in this doc show what value each struct, variant or field would hold after planning the first example above. Some literals are simplified pseudocode.
3 new `Expr` variants are added: `HigherOrderFunction`, owning a new trait `HigherOrderUDF`, which is like a `ScalarFunction`/`ScalarUDFImpl` with support for lambdas; `Lambda`, for the lambda body and its parameter names; and `LambdaVariable`, which is like `Column` but for lambda parameters.

Their logical representations:
The example would be planned into a tree like this:
The physical counterparts definition:
Note: For those who primarily want to check if this lambda implementation supports their use case and don't want to spend much time here, it's okay to skip most collapsed blocks, as those serve mostly to help code reviewers, with the exception of `HigherOrderUDF` and the `array_transform` implementation of `HigherOrderUDF`'s relevant methods, collapsed due to their size.

The added `HigherOrderUDF` trait is almost a clone of `ScalarUDFImpl`, with the exception of:

- `return_field_from_args` and `invoke_with_args`, where now `args.args` is a list of enums with two variants, `Value` or `Lambda`, instead of a list of values
- `lambda_parameters`, which returns a `Field` for each parameter supported for every lambda argument, based on the `Field`s of the non-lambda arguments
- `return_field` and the deprecated ones, `is_nullable` and `display_name`

HigherOrderUDF
array_transform lambda_parameters implementation
array_transform return_field_from_args implementation
array_transform invoke_with_args implementation
How relevant HigherOrderUDF methods would be called and what they would return during planning and evaluation of the example
A pair `HigherOrderUDF`/`HigherOrderUDFImpl` like `ScalarFunction` was not used because those exist only to maintain backwards compatibility with the older API, see #8045.
Why `LambdaVariable` and not `Column`:

Existing tree traversals that operate on columns would break if some column nodes referenced a lambda parameter and not a real column. In the example query, projection pushdown would try to push down the lambda parameter "v", which doesn't exist in table "t".
Example code of another traversal that would break:
Furthermore, the implementation of `ExprSchemable` and `PhysicalExpr::return_field` for `Column` expects that the schema it receives as an argument contains an entry for its name, which is not the case for lambda parameters.

By including a `FieldRef` on `LambdaVariable` that is resolved at construction time in the SQL planner, `ExprSchemable` and `PhysicalExpr::return_field` simply return its own `Field`:

LambdaVariable ExprSchemable and PhysicalExpr::return_field implementation
Possible alternatives discarded due to complexity, requiring downstream changes and implementation size:
How `minimize_join_filter` would look:
For any given `HigherOrderFunction` found during the traversal, a new schema is created for each lambda argument that contains its parameters, returned from `HigherOrderUDF::lambda_parameters`.
How it would look: