Skip to content

Commit efaf690

Browse files
committed
first iteration on intern()
1 parent 14d4d80 commit efaf690

5 files changed

Lines changed: 501 additions & 312 deletions

File tree

datafusion/physical-plan/src/aggregates/group_values/mod.rs

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,13 @@ pub(crate) use single_group_by::primitive::HashValue;
4141
use crate::aggregates::{
4242
group_values::single_group_by::{
4343
boolean::GroupValuesBoolean, bytes::GroupValuesBytes,
44-
bytes_view::GroupValuesBytesView, primitive::GroupValuesPrimitive, dictionary::GroupValuesDictionary,
44+
bytes_view::GroupValuesBytesView, dictionary::GroupValuesDictionary,
45+
primitive::GroupValuesPrimitive,
4546
},
4647
order::GroupOrdering,
4748
};
49+
use arrow::array::*;
50+
use std::sync::Arc;
4851

4952
mod metrics;
5053
mod null_builder;
@@ -199,11 +202,24 @@ pub fn new_group_values(
199202
DataType::Boolean => {
200203
return Ok(Box::new(GroupValuesBoolean::new()));
201204
}
202-
/*DataType::Dictionary(_, _) => {
203-
println!("dictionary type detected, using SingleDictionaryGroupValues");
204-
return Ok(Box::new(SingleDictionaryGroupValues::new()));
205-
206-
}*/
205+
DataType::Dictionary(key_type, value_type) => {
206+
if supported_single_dictionary_value(value_type) {
207+
println!("dictionary type detected, using GroupValuesDictionary");
208+
return match key_type.as_ref() { // TODO: turn this into a macro
209+
DataType::Int8 => Ok(Box::new(GroupValuesDictionary::<arrow::datatypes::Int8Type>::new(value_type))),
210+
DataType::Int16 => Ok(Box::new(GroupValuesDictionary::<arrow::datatypes::Int16Type>::new(value_type))),
211+
DataType::Int32 => Ok(Box::new(GroupValuesDictionary::<arrow::datatypes::Int32Type>::new(value_type))),
212+
DataType::Int64 => Ok(Box::new(GroupValuesDictionary::<arrow::datatypes::Int64Type>::new(value_type))),
213+
DataType::UInt8 => Ok(Box::new(GroupValuesDictionary::<arrow::datatypes::UInt8Type>::new(value_type))),
214+
DataType::UInt16 => Ok(Box::new(GroupValuesDictionary::<arrow::datatypes::UInt16Type>::new(value_type))),
215+
DataType::UInt32 => Ok(Box::new(GroupValuesDictionary::<arrow::datatypes::UInt32Type>::new(value_type))),
216+
DataType::UInt64 => Ok(Box::new(GroupValuesDictionary::<arrow::datatypes::UInt64Type>::new(value_type))),
217+
_ => Err(datafusion_common::DataFusionError::NotImplemented(
218+
format!("Unsupported dictionary key type: {:?}", key_type)
219+
)),
220+
};
221+
}
222+
}
207223
_ => {}
208224
}
209225
}
@@ -219,3 +235,24 @@ pub fn new_group_values(
219235
Ok(Box::new(GroupValuesRows::try_new(schema)?))
220236
}
221237
}
238+
239+
fn supported_single_dictionary_value(t: &DataType) -> bool {
240+
matches!(
241+
t,
242+
DataType::Utf8
243+
| DataType::LargeUtf8
244+
| DataType::Binary
245+
| DataType::LargeBinary
246+
| DataType::Utf8View
247+
| DataType::BinaryView
248+
| DataType::Int8
249+
| DataType::Int16
250+
| DataType::Int32
251+
| DataType::Int64
252+
| DataType::UInt8
253+
| DataType::UInt16
254+
| DataType::UInt32
255+
| DataType::UInt64
256+
)
257+
}
258+

datafusion/physical-plan/src/aggregates/group_values/row.rs

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -473,24 +473,21 @@ mod playground {
473473

474474
#[tokio::test]
475475
async fn test_trivial_group_by_dictionary() -> Result<()> {
476+
use crate::aggregates::RecordBatch;
477+
use crate::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
478+
use crate::common::collect;
479+
use crate::test::TestMemoryExec;
476480
use arrow::array::DictionaryArray;
477481
use arrow::datatypes::{DataType, Field, Schema};
478482
use datafusion_functions_aggregate::count::count_udaf;
479483
use datafusion_physical_expr::aggregate::AggregateExprBuilder;
480484
use datafusion_physical_expr::expressions::col;
481-
use crate::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
482-
use crate::test::TestMemoryExec;
483-
use crate::aggregates::RecordBatch;
484-
use crate::common::collect;
485485

486486
// Create schema with dictionary column and value column
487487
let schema = Arc::new(Schema::new(vec![
488488
Field::new(
489489
"color",
490-
DataType::Dictionary(
491-
Box::new(DataType::UInt8),
492-
Box::new(DataType::Utf8),
493-
),
490+
DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
494491
false,
495492
),
496493
Field::new("amount", DataType::UInt32, false),
@@ -499,11 +496,9 @@ mod playground {
499496
// Create dictionary array
500497
let values = StringArray::from(vec!["red", "blue", "green"]);
501498
let keys = arrow::array::UInt8Array::from(vec![0, 1, 0, 2, 1]);
502-
let dict_array: ArrayRef =
503-
Arc::new(DictionaryArray::<arrow::datatypes::UInt8Type>::try_new(
504-
keys,
505-
Arc::new(values),
506-
)?);
499+
let dict_array: ArrayRef = Arc::new(DictionaryArray::<
500+
arrow::datatypes::UInt8Type,
501+
>::try_new(keys, Arc::new(values))?);
507502

508503
// Create value column
509504
let amount_array: ArrayRef =
@@ -514,7 +509,8 @@ mod playground {
514509
RecordBatch::try_new(Arc::clone(&schema), vec![dict_array, amount_array])?;
515510

516511
// Create in-memory source with the batch
517-
let source = TestMemoryExec::try_new(&vec![vec![batch]], Arc::clone(&schema), None)?;
512+
let source =
513+
TestMemoryExec::try_new(&vec![vec![batch]], Arc::clone(&schema), None)?;
518514

519515
// Create GROUP BY expression
520516
let group_expr = vec![(col("color", &schema)?, "color".to_string())];
@@ -537,11 +533,9 @@ mod playground {
537533
Arc::clone(&schema),
538534
)?;
539535

540-
541536
let output =
542537
collect(aggregate_exec.execute(0, Arc::new(TaskContext::default()))?).await?;
543538
println!("Output batch: {:#?}", output);
544539
Ok(())
545540
}
546-
547541
}

0 commit comments

Comments
 (0)