Rich-T-kid/Dictionary-encoding-Hash-optmize #21589
Rich-T-kid wants to merge 20 commits into apache:main from rich-t-kid/Dictionary-encoding-Hash-optmize
Conversation
Currently the CI is breaking because my current emit implementation returns the values directly instead of returning a dictionary-encoded column, and I'm not handling nulls.
run benchmarks
Hi @Rich-T-kid, thanks for the request (#21589 (comment)). Only whitelisted users can trigger benchmarks. Allowed users: Dandandan, Fokko, Jefffrey, Omega359, adriangb, alamb, asubiotto, brunal, buraksenn, cetra3, codephage2020, comphead, erenavsarogullari, etseidl, friendlymatthew, gabotechs, geoffreyclaude, grtlr, haohuaijin, jonathanc-n, kevinjqliu, klion26, kosiew, kumarUjjawal, kunalsinghdadhwal, liamzwbao, mbutrovich, mzabaluev, neilconway, rluvaton, sdf-jkl, timsaucer, xudong963, zhuqi-lucas. File an issue against this benchmark runner
run benchmarks
🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing rich-t-kid/Dictionary-encoding-Hash-optmize (aa69892) to 37cd3de (merge-base) diff using: clickbench_partitioned | File an issue against this benchmark runner

🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing rich-t-kid/Dictionary-encoding-Hash-optmize (aa69892) to 37cd3de (merge-base) diff using: tpch | File an issue against this benchmark runner

🤖 Benchmark running (GKE) | trigger | CPU Details (lscpu) | Comparing rich-t-kid/Dictionary-encoding-Hash-optmize (aa69892) to 37cd3de (merge-base) diff using: tpcds | File an issue against this benchmark runner
🤖 Benchmark completed (GKE) | trigger | CPU Details (lscpu) | Resource Usage: tpch — base (merge-base) vs. tpch — branch | File an issue against this benchmark runner

🤖 Benchmark completed (GKE) | trigger | CPU Details (lscpu) | Resource Usage: tpcds — base (merge-base) vs. tpcds — branch | File an issue against this benchmark runner

🤖 Benchmark completed (GKE) | trigger | CPU Details (lscpu) | Resource Usage: clickbench_partitioned — base (merge-base) vs. clickbench_partitioned — branch | File an issue against this benchmark runner
Wrote some criterion benchmarks and they reflect what the query benchmarks show. I think storing ScalarValue instances directly in the hash map is causing performance issues. Need to profile to see heap allocations.

ScalarValue::try_from_array (38%) — allocating a new ScalarValue heap object for every single row means lots of heap allocations are happening on the hot path (_xzm_free). A solution may be to pre-allocate space for the unique values array and the hashmap.
Optimization 1 — Hash caching: Instead of hashing each row's value individually, pre-compute hashes for all d distinct values in the values array once per batch and cache them in a Vec indexed by dictionary key. For a batch of n rows with d distinct values this reduces hash computations from O(n) to O(d).

Optimization 2 — Eliminate ScalarValue: Replace HashMap<ScalarValue, usize> with a structure that stores raw hashes and raw string slices pointing directly into the Arrow buffer. This eliminates per-row heap allocation (ScalarValue::try_from_array) and deallocation (drop_in_place), which the profiler shows account for ~60% of intern time combined.

Optimization 3 — Pre-allocate using occupancy: Use dict_array.occupancy().count_set_bits() to determine the number of truly distinct non-null values in the batch upfront and pre-allocate internal storage accordingly. This avoids incremental Vec growth during intern.
```rust
let raw = Self::get_raw_bytes(values, value_idx);
if let Some((group_id, _)) = entries
    .iter()
    .find(|(_, stored_bytes)| raw == stored_bytes.as_slice())
```
Is this comparison for hash collisions?
Yes, there's a chance of hash collisions where hash_array[i] == hash_array[j] but original_array[i] != original_array[j]. So when a hash already exists in the hash table, we can't just return that group id, we need to verify the actual value matches. We iterate through the vector of values that map to this hash, comparing each one to find the entry where the actual value matches, and return that group id. If no match is found it's a new group.
```rust
*/
// stores the order new unique elements are seen for self.emit()
seen_elements: Vec<Vec<u8>>, // Box<dyn Builder> doesn't provide the flexibility of building partition arrays that we'd need to support emit::First(N)
```
Maybe instead of a Vec<Vec<u8>> we can use another structure like ArrowBytesMap (doc: https://docs.rs/datafusion/latest/datafusion/physical_expr_common/binary_map/struct.ArrowBytesMap.html). That would provide better memory tracking, but I don't think the insert functions support dictionary right now.
I think this is a good idea with the current approach. The only issue I can think of is in the future when other value types are supported. ArrowBytesMap only supports:
Utf8,
Utf8View,
Binary,
BinaryView.
In my revised PR I could abstract away how seen_elements are stored into a trait comprised of store() / retrieve(N). This should make it easier for future work to integrate with ArrowDictionaryKeyType while also giving us a performance boost in the string/binary case.
Also, I think it's worth benchmarking this. The ArrowBytesMap insert API forces the client to insert arrays, not single elements. It's very unlikely that the keys array is a 1-1 mapping of the values array, which means for each insertion we'd have to do an Arc::new(StringArray::from(values[idx])). The overhead that comes with creating a new array (heap allocation for the buffer + heap allocation for the offsets + heap allocation for the null bitmap) might negate the benefits the specialized map provides.
Also, the ArrowBytesMap doesn't allow for partial takes. Each call to retrieve values from ArrowBytesMap drains the entire data structure, which doesn't blend well with emit, since emit allows for partial takes.
Null handling
How nulls are currently handled
Why this is hard to extend to other types: This approach works for Utf8 because there exist byte sequences that are invalid UTF-8, which can serve as an unambiguous sentinel value that could never be confused with real data. However, this is difficult to extend to other types. For example, there is no equivalent concept of an 'invalid' binary buffer, since any sequence of raw bytes is valid by definition. This problem extends to numeric types as well.
```rust
type GroupEntry = (usize, Option<Vec<u8>>);

pub struct GroupValuesDictionary<K: ArrowDictionaryKeyType + Send> {
    // stores the order new unique elements are seen for self.emit()
    seen_elements: Vec<Option<Vec<u8>>>, // Box<dyn Builder> doesn't provide the flexibility of building partition arrays that we'd need to support emit::First(N)
```
Looking at other implementations, I see for nulls they keep this in a separate field and use MaybeNullBufferBuilder
```rust
        canonical_indices.push(i);
    }
}
// build new deduplicated values array using take
```
I'm surprised Arrow does not enforce values to be unique; theoretically, when you insert a value into a dict that was there before, we only update the keys array, no?
Yeah, for example in group_by.slt @ 4801 there is this test:

```sql
# Table with an int column and Dict<UInt8> column:
statement ok
CREATE TABLE uint8_dict AS VALUES
(1, arrow_cast('A', 'Dictionary(UInt8, Utf8)')),
(2, arrow_cast('B', 'Dictionary(UInt8, Utf8)')),
(2, arrow_cast('A', 'Dictionary(UInt8, Utf8)')),
(4, arrow_cast('A', 'Dictionary(UInt8, Utf8)')),
(1, arrow_cast('C', 'Dictionary(UInt8, Utf8)')),
(1, arrow_cast('A', 'Dictionary(UInt8, Utf8)'));

# Group by the non-dict column
query TI rowsort
SELECT column2, count(column1) FROM uint8_dict GROUP BY column2;
----
A 4
B 1
C 1
```

What .Intern() receives is:

```text
keys   : PrimitiveArray<UInt8> [0, 1, 2, 3, 4, 5]
values : StringArray ["A", "B", "A", "A", "C", "A"]
```

I'm going to go through the arrow docs to see if this is intended behavior.
```rust
if list_array.is_null(index) {
    panic!() // this cannot happen. leaving this here as an invariant
}
```

I see other places in the codebase follow this pattern:

```diff
-if list_array.is_null(index) {
-    panic!() // this cannot happen. leaving this here as an invariant
-}
+debug_assert!(!values.is_null(index));
```
```rust
if cols.len() != 1 {
    return Err(datafusion_common::DataFusionError::Internal(
        "GroupValuesDictionary only supports single column group by".to_string(),
    ));
}
```

Suggested change:

```diff
-if cols.len() != 1 {
-    return Err(datafusion_common::DataFusionError::Internal(
-        "GroupValuesDictionary only supports single column group by".to_string(),
-    ));
-}
+assert_eq!(cols.len(), 1);
```
Do you think it's worth adding a diagram explaining how the two-phase grouping works? It took me a while to understand it, and a diagram Claude made helped a lot:
```text
Input batch: DictionaryArray
  values = ["ap-south", "us-east"]   (d=2)
  keys   = [0, 1, 0, 0, 1]           (n=5)

Pre-step: hash all d values once
  value_hashes = [h("ap-south"), h("us-east")]

Pass 1 — d iterations (build key_to_group from persistent cache)
  unique_dict_value_mapping: {h("ap-south") -> (group_id=0), ...}
                      │
  for each value in values array ────┘
    lookup in map → key_to_group[0] = Some(0)
                    key_to_group[1] = Some(1)

Pass 2 — n iterations (assign group ids to every row)
  for each key in keys array:
    key=0 → key_to_group[0] = Some(0) → groups.push(0)
    key=1 → key_to_group[1] = Some(1) → groups.push(1)
    key=0 → key_to_group[0] = Some(0) → groups.push(0)
    key=0 → key_to_group[0] = Some(0) → groups.push(0)
    key=1 → key_to_group[1] = Some(1) → groups.push(1)

Output: groups = [0, 1, 0, 0, 1]
```
I'm currently working on this, should be done by the evening.
```rust
#[cfg(test)]
mod basic_functionality {
    use super::*;
```
These cases should already be covered by the existing sqllogictest files -- maybe we can keep only the ones with tricky edge cases that sqllogictests don't guarantee to cover, wdyt?
This makes sense. I'll remove the majority of the test cases besides the null edge case tests and the regression test. Like you mentioned, the other cases should be caught by existing tests.
run benchmarks
Benchmark for this request failed. Last 20 lines of output: (collapsed). File an issue against this benchmark runner
2 similar comments
gabotechs
left a comment
I have not reviewed this PR in depth, but 3500 lines of new code sounds like unreasonably many for what this PR has to offer from a feature standpoint.
One of the things I notice is that the code overall looks a bit too LLM-y:
- An unreasonable amount of tests (3000 lines of tests for 400 lines of code)
- Comments that just state what the code is doing
- Over-use of highly complex macros, when avoiding them can give a much simpler implementation
- Not using DataFusion error builders, and instead constructing raw errors with fully qualified paths
One piece of advice for getting some interest from committers to prioritize reviewing this PR would be to significantly shrink its size and reduce its complexity as much as possible. If there's an opportunity to split the PR into multiple chunks that can be independently shipped, that's also welcome.
```rust
let raw = Self::get_raw_bytes(child, i);
bytes.extend_from_slice(&(raw.len() as i32).to_ne_bytes());
bytes.extend_from_slice(&raw);
}
```
Store the length prefix as u64 to account for LargeUtf8 offsets taking up 64 bits.
Which issue does this PR close?
This PR makes an effort towards #7000 (materializing dictionary columns) and closes #21466.
A separate follow-up PR aims to close the multi-column + dictionary column case.
Rationale for this change
This PR implements a specialized GroupValues implementation for single-column dictionary-encoded GROUP BY columns, motivated by the dictionary encoding efficiency work tracked in #7647.
GroupValuesRows was inefficient for dictionary-encoded columns because it runs the full RowConverter pipeline on every row, decoding the dictionary back to its underlying string values and serializing them into a packed row format for comparison. This completely discards the integer key structure that dictionary encoding provides and does O(n) expensive string operations when the same d distinct values could be processed just once.
Initial approach
The first implementation used HashMap<ScalarValue, usize> to map group values to group indices. While correct, profiling revealed this was significantly slower than GroupValuesRows due to per-row heap allocation from ScalarValue::try_from_array, with ~60% of intern time spent on allocation and deallocation.
Final approach
After profiling and iteration, the implementation now uses a two-pass strategy that directly exploits dictionary encoding's structure. Pass 1 iterates the small values array (d distinct values) once, building a key_to_group lookup via raw byte comparison and pre-computed hashes. Pass 2 iterates the keys array (n rows) using only cheap array index lookups — no hashing, no byte comparison, no hashmap lookup in the hot path. This reduces the expensive work from O(n) to O(d) per batch.
Benchmarks show consistent 1.9x–2.7x improvement over GroupValuesRows across all cardinality and batch size configurations with no regressions.
What changes are included in this PR?
Update the match statement in new_group_values to include a custom dictionary-encoding branch that works for single fields of type DictionaryArray.
Are these changes tested?
Yes, a large portion of the PR is tests. These include:
Are there any user-facing changes?
No, everything is internal.