Skip to content

Commit 310dd5d

Browse files
authored
Support Dictionary Arrays in MIN/MAX Aggregates (#21315)
## Which issue does this PR close? * Closes #21150. --- ## Rationale for this change The existing implementation of `min`/`max` does not correctly support dictionary-encoded arrays. Previously, dictionary arrays were handled by directly evaluating their underlying values array, which is semantically incorrect because: * It may include unreferenced values that do not appear in the logical dataset * It ignores nulls in the key array * It does not preserve dictionary key semantics in scalar results This leads to incorrect aggregation results for dictionary types. This PR introduces a logical row-based evaluation for dictionary arrays and ensures scalar comparisons correctly unwrap and rewrap dictionary values when needed. --- ## What changes are included in this PR? * Add logical row-based min/max computation (`scalar_row_extreme`) for: * Dictionary arrays * Struct, List, LargeList, and FixedSizeList types * Replace previous dictionary handling that operated on `values()` with correct row-wise evaluation * Introduce `requires_logical_row_scan` to centralize fallback logic for complex types * Enhance scalar comparison logic: * Unwrap dictionary scalars before comparison * Rewrap results when both inputs are dictionaries with matching key types * Validate key type compatibility * Improve error messaging for incompatible scalar comparisons * Remove obsolete `min_max_batch_generic` implementation --- ## Are these changes tested? Yes. Comprehensive tests have been added to validate correctness across multiple scenarios: * Basic dictionary min/max behavior * Handling of null keys and null values * Ignoring unreferenced dictionary values * Multi-batch aggregation behavior * Float dictionary handling including `NaN` and infinities These tests ensure correctness and guard against regressions. --- ## Are there any user-facing changes? Yes. The behavior of `min` and `max` on dictionary-encoded arrays is now: * Correct and semantically aligned with logical row values * Consistent with other data types Previously incorrect results may now differ, which is a correctness fix rather than a breaking API change. --- ## LLM-generated code disclosure This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested.
1 parent f802ed1 commit 310dd5d

2 files changed

Lines changed: 329 additions & 29 deletions

File tree

datafusion/functions-aggregate-common/src/min_max.rs

Lines changed: 152 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
//! Basic min/max functionality shared across DataFusion aggregate functions
1919
2020
use arrow::array::{
21-
ArrayRef, AsArray as _, BinaryArray, BinaryViewArray, BooleanArray, Date32Array,
22-
Date64Array, Decimal32Array, Decimal64Array, Decimal128Array, Decimal256Array,
21+
ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array,
22+
Decimal32Array, Decimal64Array, Decimal128Array, Decimal256Array,
2323
DurationMicrosecondArray, DurationMillisecondArray, DurationNanosecondArray,
2424
DurationSecondArray, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array,
2525
Int8Array, Int16Array, Int32Array, Int64Array, IntervalDayTimeArray,
@@ -141,10 +141,25 @@ macro_rules! min_max_generic {
141141
}};
142142
}
143143

144-
// min/max of two scalar values of the same type
145144
macro_rules! min_max {
146145
($VALUE:expr, $DELTA:expr, $OP:ident) => {{
147-
Ok(match ($VALUE, $DELTA) {
146+
match choose_min_max!($OP) {
147+
Ordering::Greater => Ok(min_max_scalar_impl!($VALUE, $DELTA, min)),
148+
Ordering::Less => Ok(min_max_scalar_impl!($VALUE, $DELTA, max)),
149+
Ordering::Equal => {
150+
unreachable!("min/max comparisons do not use equal ordering")
151+
}
152+
}
153+
}};
154+
}
155+
156+
// min/max of two logically compatible scalar values.
157+
// Dictionary scalars participate by comparing their inner logical values.
158+
// When both inputs are dictionaries, matching key types are preserved in the
159+
// result; differing key types remain an unexpected invariant violation.
160+
macro_rules! min_max_scalar_impl {
161+
($VALUE:expr, $DELTA:expr, $OP:ident) => {{
162+
match ($VALUE, $DELTA) {
148163
(ScalarValue::Null, ScalarValue::Null) => ScalarValue::Null,
149164
(
150165
lhs @ ScalarValue::Decimal32(lhsv, lhsp, lhss),
@@ -413,16 +428,54 @@ macro_rules! min_max {
413428
min_max_generic!(lhs, rhs, $OP)
414429
}
415430

431+
(
432+
ScalarValue::Dictionary(lhs_dict_key_type, lhs_dict_value),
433+
ScalarValue::Dictionary(rhs_dict_key_type, rhs_dict_value),
434+
) => {
435+
if lhs_dict_key_type != rhs_dict_key_type {
436+
return internal_err!(
437+
"MIN/MAX is not expected to receive dictionary scalars with different key types ({:?} vs {:?})",
438+
lhs_dict_key_type,
439+
rhs_dict_key_type
440+
);
441+
}
442+
443+
let result = min_max_scalar(
444+
lhs_dict_value.as_ref(),
445+
rhs_dict_value.as_ref(),
446+
choose_min_max!($OP),
447+
)?;
448+
ScalarValue::Dictionary(lhs_dict_key_type.clone(), Box::new(result))
449+
}
450+
(ScalarValue::Dictionary(_, lhs_dict_value), rhs_scalar) => {
451+
min_max_scalar(lhs_dict_value.as_ref(), rhs_scalar, choose_min_max!($OP))?
452+
}
453+
(lhs_scalar, ScalarValue::Dictionary(_, rhs_dict_value)) => {
454+
min_max_scalar(lhs_scalar, rhs_dict_value.as_ref(), choose_min_max!($OP))?
455+
}
456+
416457
e => {
417458
return internal_err!(
418-
"MIN/MAX is not expected to receive scalars of incompatible types {:?}",
459+
"MIN/MAX is not expected to receive logically incompatible scalar values {:?}",
419460
e
420461
)
421462
}
422-
})
463+
}
423464
}};
424465
}
425466

467+
fn min_max_scalar(
468+
lhs: &ScalarValue,
469+
rhs: &ScalarValue,
470+
ordering: Ordering,
471+
) -> Result<ScalarValue> {
472+
match ordering {
473+
Ordering::Greater => Ok(min_max_scalar_impl!(lhs, rhs, min)),
474+
Ordering::Less => Ok(min_max_scalar_impl!(lhs, rhs, max)),
475+
Ordering::Equal => unreachable!("min/max comparisons do not use equal ordering"),
476+
}
477+
}
478+
426479
/// An accumulator to compute the maximum value
427480
#[derive(Debug, Clone)]
428481
pub struct MaxAccumulator {
@@ -760,16 +813,11 @@ pub fn min_batch(values: &ArrayRef) -> Result<ScalarValue> {
760813
min_binary_view
761814
)
762815
}
763-
DataType::Struct(_) => min_max_batch_generic(values, Ordering::Greater)?,
764-
DataType::List(_) => min_max_batch_generic(values, Ordering::Greater)?,
765-
DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Greater)?,
766-
DataType::FixedSizeList(_, _) => {
767-
min_max_batch_generic(values, Ordering::Greater)?
768-
}
769-
DataType::Dictionary(_, _) => {
770-
let values = values.as_any_dictionary().values();
771-
min_batch(values)?
772-
}
816+
DataType::Struct(_)
817+
| DataType::List(_)
818+
| DataType::LargeList(_)
819+
| DataType::FixedSizeList(_, _)
820+
| DataType::Dictionary(_, _) => min_max_batch_generic(values, Ordering::Greater)?,
773821
_ => min_max_batch!(values, min),
774822
})
775823
}
@@ -843,14 +891,94 @@ pub fn max_batch(values: &ArrayRef) -> Result<ScalarValue> {
843891
let value = value.map(|e| e.to_vec());
844892
ScalarValue::FixedSizeBinary(*size, value)
845893
}
846-
DataType::Struct(_) => min_max_batch_generic(values, Ordering::Less)?,
847-
DataType::List(_) => min_max_batch_generic(values, Ordering::Less)?,
848-
DataType::LargeList(_) => min_max_batch_generic(values, Ordering::Less)?,
849-
DataType::FixedSizeList(_, _) => min_max_batch_generic(values, Ordering::Less)?,
850-
DataType::Dictionary(_, _) => {
851-
let values = values.as_any_dictionary().values();
852-
max_batch(values)?
853-
}
894+
DataType::Struct(_)
895+
| DataType::List(_)
896+
| DataType::LargeList(_)
897+
| DataType::FixedSizeList(_, _)
898+
| DataType::Dictionary(_, _) => min_max_batch_generic(values, Ordering::Less)?,
854899
_ => min_max_batch!(values, max),
855900
})
856901
}
902+
903+
#[cfg(test)]
904+
mod tests {
905+
use super::*;
906+
907+
#[test]
908+
fn min_max_dictionary_and_scalar_compare_by_inner_value() -> Result<()> {
909+
let dictionary = ScalarValue::Dictionary(
910+
Box::new(DataType::Int32),
911+
Box::new(ScalarValue::Float32(Some(1.0))),
912+
);
913+
let scalar = ScalarValue::Float32(Some(2.0));
914+
915+
let result = min_max_scalar(&dictionary, &scalar, Ordering::Less)?;
916+
917+
assert_eq!(result, ScalarValue::Float32(Some(2.0)));
918+
Ok(())
919+
}
920+
921+
#[test]
922+
fn min_max_dictionary_same_key_type_rewraps_result() -> Result<()> {
923+
let lhs = ScalarValue::Dictionary(
924+
Box::new(DataType::Int32),
925+
Box::new(ScalarValue::Float32(Some(1.0))),
926+
);
927+
let rhs = ScalarValue::Dictionary(
928+
Box::new(DataType::Int32),
929+
Box::new(ScalarValue::Float32(Some(2.0))),
930+
);
931+
932+
let result = min_max_scalar(&lhs, &rhs, Ordering::Less)?;
933+
934+
assert_eq!(
935+
result,
936+
ScalarValue::Dictionary(
937+
Box::new(DataType::Int32),
938+
Box::new(ScalarValue::Float32(Some(2.0))),
939+
)
940+
);
941+
Ok(())
942+
}
943+
944+
#[test]
945+
fn min_max_dictionary_different_key_types_error() -> Result<()> {
946+
let lhs = ScalarValue::Dictionary(
947+
Box::new(DataType::Int8),
948+
Box::new(ScalarValue::Float32(Some(1.0))),
949+
);
950+
let rhs = ScalarValue::Dictionary(
951+
Box::new(DataType::Int32),
952+
Box::new(ScalarValue::Float32(Some(2.0))),
953+
);
954+
955+
let error: DataFusionError =
956+
min_max_scalar(&lhs, &rhs, Ordering::Less).unwrap_err();
957+
958+
assert!(
959+
error
960+
.to_string()
961+
.contains("dictionary scalars with different key types")
962+
);
963+
Ok(())
964+
}
965+
966+
#[test]
967+
fn min_max_dictionary_and_incompatible_scalar_error() -> Result<()> {
968+
let dictionary = ScalarValue::Dictionary(
969+
Box::new(DataType::Int32),
970+
Box::new(ScalarValue::Float32(Some(1.0))),
971+
);
972+
let scalar = ScalarValue::Int32(Some(2));
973+
974+
let error: DataFusionError =
975+
min_max_scalar(&dictionary, &scalar, Ordering::Less).unwrap_err();
976+
977+
assert!(
978+
error
979+
.to_string()
980+
.contains("logically incompatible scalar values")
981+
);
982+
Ok(())
983+
}
984+
}

0 commit comments

Comments
 (0)