Skip to content

Commit 17cbff0

Browse files
authored
Misc hash / hash aggregation performance improvements (#19910)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #19912 ## Rationale for this change Just a couple of optimizations for hash table lookups usage in hash aggregate. ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 23f5003 commit 17cbff0

4 files changed

Lines changed: 26 additions & 21 deletions

File tree

datafusion/common/src/utils/proxy.rs

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ pub trait HashTableAllocExt {
121121
///
122122
/// Returns the bucket where the element was inserted.
123123
/// Note that allocation counts capacity, not size.
124+
/// Panics:
125+
/// Assumes the element is not already present, and may panic if it does
124126
///
125127
/// # Example:
126128
/// ```
@@ -134,7 +136,7 @@ pub trait HashTableAllocExt {
134136
/// assert_eq!(allocated, 64);
135137
///
136138
/// // insert more values
137-
/// for i in 0..100 {
139+
/// for i in 2..100 {
138140
/// table.insert_accounted(i, hash_fn, &mut allocated);
139141
/// }
140142
/// assert_eq!(allocated, 400);
@@ -161,22 +163,24 @@ where
161163
) {
162164
let hash = hasher(&x);
163165

164-
// NOTE: `find_entry` does NOT grow!
165-
match self.find_entry(hash, |y| y == &x) {
166-
Ok(_occupied) => {}
167-
Err(_absent) => {
168-
if self.len() == self.capacity() {
169-
// need to request more memory
170-
let bump_elements = self.capacity().max(16);
171-
let bump_size = bump_elements * size_of::<T>();
172-
*accounting = (*accounting).checked_add(bump_size).expect("overflow");
166+
if cfg!(debug_assertions) {
167+
// In debug mode, check that the element is not already present
168+
debug_assert!(
169+
self.find_entry(hash, |y| y == &x).is_err(),
170+
"attempted to insert duplicate element into HashTableAllocExt::insert_accounted"
171+
);
172+
}
173173

174-
self.reserve(bump_elements, &hasher);
175-
}
174+
if self.len() == self.capacity() {
175+
// need to request more memory
176+
let bump_elements = self.capacity().max(16);
177+
let bump_size = bump_elements * size_of::<T>();
178+
*accounting = (*accounting).checked_add(bump_size).expect("overflow");
176179

177-
// still need to insert the element since first try failed
178-
self.entry(hash, |y| y == &x, hasher).insert(x);
179-
}
180+
self.reserve(bump_elements, &hasher);
180181
}
182+
183+
// We assume the element is not already present
184+
self.insert_unique(hash, x, hasher);
181185
}
182186
}

datafusion/physical-expr-common/src/binary_map.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ where
389389
// is value is already present in the set?
390390
let entry = self.map.find_mut(hash, |header| {
391391
// compare value if hashes match
392-
if header.len != value_len {
392+
if header.hash != hash || header.len != value_len {
393393
return false;
394394
}
395395
// value is stored inline so no need to consult buffer
@@ -427,7 +427,7 @@ where
427427
// Check if the value is already present in the set
428428
let entry = self.map.find_mut(hash, |header| {
429429
// compare value if hashes match
430-
if header.len != value_len {
430+
if header.hash != hash {
431431
return false;
432432
}
433433
// Need to compare the bytes in the buffer

datafusion/physical-expr-common/src/binary_view_map.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -273,11 +273,10 @@ where
273273
let value: &[u8] = value.as_ref();
274274

275275
let entry = self.map.find_mut(hash, |header| {
276-
let v = self.builder.get_value(header.view_idx);
277-
278-
if v.len() != value.len() {
276+
if header.hash != hash {
279277
return false;
280278
}
279+
let v = self.builder.get_value(header.view_idx);
281280

282281
v == value
283282
});

datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,9 @@ where
128128
let hash = key.hash(state);
129129
let insert = self.map.entry(
130130
hash,
131-
|&(g, _)| unsafe { self.values.get_unchecked(g).is_eq(key) },
131+
|&(g, h)| unsafe {
132+
hash == h && self.values.get_unchecked(g).is_eq(key)
133+
},
132134
|&(_, h)| h,
133135
);
134136

0 commit comments

Comments
 (0)