Skip to content

Commit 1a0c2e0

Browse files
authored
Use return_field_from_args in information schema and date_trunc (#20079)
## Which issue does this PR close?

- Closes #19870.

## Rationale for this change

Some UDFs/UDAFs implement `return_field_from_args` / `return_field` instead of `return_type`. The information schema was calling `return_type` directly, which fails for those functions. The default implementation of `return_field_from_args` already delegates to `return_type`, so switching to the newer API works for all functions.

## What changes are included in this PR?

- **`information_schema.rs`**: `get_udf_args_and_return_types` now calls `return_field_from_args` instead of `return_type`; `get_udaf_args_and_return_types` now calls `return_field` instead of `return_type`; `get_udwf_args_and_return_types` now derives a return type from `field` (via `WindowUDFFieldArgs`) instead of always reporting `None`. Removed stale comments referencing the old API.
- **`date_trunc.rs`**: `return_type` now returns `internal_err`, and `return_field_from_args` is self-contained (no longer delegates to `return_type`), following the same pattern as other UDFs like `named_struct` and `map_from_arrays` (ref: #19275).
- **`benches/date_trunc.rs`**: the benchmark now obtains its return field from `return_field_from_args` instead of building it from `return_type`.

## Are these changes tested?

Covered by existing information_schema sqllogictests and `datafusion-functions` unit tests.

## Are there any user-facing changes?

No.
1 parent c849374 commit 1a0c2e0

3 files changed

Lines changed: 69 additions & 31 deletions

File tree

datafusion/catalog/src/information_schema.rs

Lines changed: 52 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::{CatalogProviderList, SchemaProvider, TableProvider};
2424
use arrow::array::builder::{BooleanBuilder, UInt8Builder};
2525
use arrow::{
2626
array::{StringBuilder, UInt64Builder},
27-
datatypes::{DataType, Field, Schema, SchemaRef},
27+
datatypes::{DataType, Field, FieldRef, Schema, SchemaRef},
2828
record_batch::RecordBatch,
2929
};
3030
use async_trait::async_trait;
@@ -34,7 +34,10 @@ use datafusion_common::error::Result;
3434
use datafusion_common::types::NativeType;
3535
use datafusion_execution::TaskContext;
3636
use datafusion_execution::runtime_env::RuntimeEnv;
37-
use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF};
37+
use datafusion_expr::function::WindowUDFFieldArgs;
38+
use datafusion_expr::{
39+
AggregateUDF, ReturnFieldArgs, ScalarUDF, Signature, TypeSignature, WindowUDF,
40+
};
3841
use datafusion_expr::{TableType, Volatility};
3942
use datafusion_physical_plan::SendableRecordBatchStream;
4043
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
@@ -421,10 +424,24 @@ fn get_udf_args_and_return_types(
421424
Ok(arg_types
422425
.into_iter()
423426
.map(|arg_types| {
424-
// only handle the function which implemented [`ScalarUDFImpl::return_type`] method
427+
let arg_fields: Vec<FieldRef> = arg_types
428+
.iter()
429+
.enumerate()
430+
.map(|(i, t)| {
431+
Arc::new(Field::new(format!("arg_{i}"), t.clone(), true))
432+
})
433+
.collect();
434+
let scalar_arguments = vec![None; arg_fields.len()];
425435
let return_type = udf
426-
.return_type(&arg_types)
427-
.map(|t| remove_native_type_prefix(&NativeType::from(t)))
436+
.return_field_from_args(ReturnFieldArgs {
437+
arg_fields: &arg_fields,
438+
scalar_arguments: &scalar_arguments,
439+
})
440+
.map(|f| {
441+
remove_native_type_prefix(&NativeType::from(
442+
f.data_type().clone(),
443+
))
444+
})
428445
.ok();
429446
let arg_types = arg_types
430447
.into_iter()
@@ -447,11 +464,21 @@ fn get_udaf_args_and_return_types(
447464
Ok(arg_types
448465
.into_iter()
449466
.map(|arg_types| {
450-
// only handle the function which implemented [`ScalarUDFImpl::return_type`] method
467+
let arg_fields: Vec<FieldRef> = arg_types
468+
.iter()
469+
.enumerate()
470+
.map(|(i, t)| {
471+
Arc::new(Field::new(format!("arg_{i}"), t.clone(), true))
472+
})
473+
.collect();
451474
let return_type = udaf
452-
.return_type(&arg_types)
453-
.ok()
454-
.map(|t| remove_native_type_prefix(&NativeType::from(t)));
475+
.return_field(&arg_fields)
476+
.map(|f| {
477+
remove_native_type_prefix(&NativeType::from(
478+
f.data_type().clone(),
479+
))
480+
})
481+
.ok();
455482
let arg_types = arg_types
456483
.into_iter()
457484
.map(|t| remove_native_type_prefix(&NativeType::from(t)))
@@ -473,12 +500,26 @@ fn get_udwf_args_and_return_types(
473500
Ok(arg_types
474501
.into_iter()
475502
.map(|arg_types| {
476-
// only handle the function which implemented [`ScalarUDFImpl::return_type`] method
503+
let arg_fields: Vec<FieldRef> = arg_types
504+
.iter()
505+
.enumerate()
506+
.map(|(i, t)| {
507+
Arc::new(Field::new(format!("arg_{i}"), t.clone(), true))
508+
})
509+
.collect();
510+
let return_type = udwf
511+
.field(WindowUDFFieldArgs::new(&arg_fields, udwf.name()))
512+
.map(|f| {
513+
remove_native_type_prefix(&NativeType::from(
514+
f.data_type().clone(),
515+
))
516+
})
517+
.ok();
477518
let arg_types = arg_types
478519
.into_iter()
479520
.map(|t| remove_native_type_prefix(&NativeType::from(t)))
480521
.collect::<Vec<_>>();
481-
(arg_types, None)
522+
(arg_types, return_type)
482523
})
483524
.collect::<BTreeSet<_>>())
484525
}

datafusion/functions/benches/date_trunc.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use arrow::datatypes::Field;
2525
use criterion::{Criterion, criterion_group, criterion_main};
2626
use datafusion_common::ScalarValue;
2727
use datafusion_common::config::ConfigOptions;
28-
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs};
28+
use datafusion_expr::{ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs};
2929
use datafusion_functions::datetime::date_trunc;
3030
use rand::Rng;
3131
use rand::rngs::ThreadRng;
@@ -57,10 +57,13 @@ fn criterion_benchmark(c: &mut Criterion) {
5757
})
5858
.collect::<Vec<_>>();
5959

60-
let return_type = udf
61-
.return_type(&args.iter().map(|arg| arg.data_type()).collect::<Vec<_>>())
60+
let scalar_arguments = vec![None; arg_fields.len()];
61+
let return_field = udf
62+
.return_field_from_args(ReturnFieldArgs {
63+
arg_fields: &arg_fields,
64+
scalar_arguments: &scalar_arguments,
65+
})
6266
.unwrap();
63-
let return_field = Arc::new(Field::new("f", return_type, true));
6467
let config_options = Arc::new(ConfigOptions::default());
6568

6669
b.iter(|| {

datafusion/functions/src/datetime/date_trunc.rs

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use arrow::datatypes::{Field, FieldRef};
3838
use datafusion_common::cast::as_primitive_array;
3939
use datafusion_common::types::{NativeType, logical_date, logical_string};
4040
use datafusion_common::{
41-
DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err,
41+
DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err, internal_err,
4242
};
4343
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};
4444
use datafusion_expr::{
@@ -223,27 +223,21 @@ impl ScalarUDFImpl for DateTruncFunc {
223223
&self.signature
224224
}
225225

226-
// keep return_type implementation for information schema generation
227-
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
228-
if arg_types[1].is_null() {
229-
Ok(Timestamp(Nanosecond, None))
230-
} else {
231-
Ok(arg_types[1].clone())
232-
}
226+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
227+
internal_err!("return_field_from_args should be called instead")
233228
}
234229

235230
fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
236-
let data_types = args
237-
.arg_fields
238-
.iter()
239-
.map(|f| f.data_type())
240-
.cloned()
241-
.collect::<Vec<_>>();
242-
let return_type = self.return_type(&data_types)?;
231+
let field = &args.arg_fields[1];
232+
let return_type = if field.data_type().is_null() {
233+
Timestamp(Nanosecond, None)
234+
} else {
235+
field.data_type().clone()
236+
};
243237
Ok(Arc::new(Field::new(
244238
self.name(),
245239
return_type,
246-
args.arg_fields[1].is_nullable(),
240+
field.is_nullable(),
247241
)))
248242
}
249243

0 commit comments

Comments
 (0)