feat: support GroupsAccumulator for first_value and last_value with string/binary types #21090
Conversation
```rust
/// to correctly implement `RESPECT NULLS` behavior.
///
pub(crate) struct BytesValueState {
    vals: Vec<Option<Vec<u8>>>,
```
I think this can be much more efficiently stored as values Vec<u8> and offsets Vec<OffsetType>
I plan to implement it using the following approach and then run some benchmarks:
Data Structures
- `vals: Vec<u8>`: a single, contiguous flat buffer for all raw bytes.
- `offsets: Vec<usize>`: the starting position of each group's data in the buffer.
- `lengths: Vec<usize>`: the logical length of the current value for each group.
- `capacities: Vec<usize>`: the physical space allocated for each group (enables in-place overwrites if `new_len <= capacity`).
- `active_bytes: usize`: a running counter of the sum of all current `lengths` (used to track fragmentation and trigger GC).
Update Logic
- In-place overwrite: if `new_len <= capacity`, we overwrite the existing slot at the current `offset`. We update the logical `length`, while `capacity` and `offset` remain unchanged.
- Append: if `new_len > capacity`, we append the value to the end of `vals` and update the `offset`, `length`, and `capacity` to point to the new location.
GC (Compaction) Logic
- Trigger: when the buffer grows too large (e.g., `vals.len() > active_bytes * 2`).
- Action: re-allocate a new buffer and copy only the latest valid data for each group, to clear "dead" bytes left behind by the append path.
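The structures and update/GC rules above could be sketched as follows. This is a minimal illustration under the stated design, with hypothetical names (`FlatBytesState`, `set_value`, `compact`) rather than the PR's actual code:

```rust
// Illustrative sketch of the flat-buffer layout described above.
// All names are hypothetical, not DataFusion APIs.
struct FlatBytesState {
    vals: Vec<u8>,          // contiguous buffer for all raw bytes
    offsets: Vec<usize>,    // start of each group's slot in `vals`
    lengths: Vec<usize>,    // logical length of each group's current value
    capacities: Vec<usize>, // physical slot size (allows in-place overwrite)
    active_bytes: usize,    // sum of `lengths`; tracks fragmentation for GC
}

impl FlatBytesState {
    fn new(num_groups: usize) -> Self {
        Self {
            vals: Vec::new(),
            offsets: vec![0; num_groups],
            lengths: vec![0; num_groups],
            capacities: vec![0; num_groups],
            active_bytes: 0,
        }
    }

    fn set_value(&mut self, group: usize, value: &[u8]) {
        self.active_bytes = self.active_bytes - self.lengths[group] + value.len();
        if value.len() <= self.capacities[group] {
            // In-place overwrite: reuse the existing slot; offset/capacity unchanged.
            let start = self.offsets[group];
            self.vals[start..start + value.len()].copy_from_slice(value);
            self.lengths[group] = value.len();
        } else {
            // Append path: the old slot becomes dead bytes.
            self.offsets[group] = self.vals.len();
            self.lengths[group] = value.len();
            self.capacities[group] = value.len();
            self.vals.extend_from_slice(value);
        }
        // GC trigger: more than half the buffer is dead bytes.
        if self.vals.len() > self.active_bytes * 2 {
            self.compact();
        }
    }

    // Copy only the latest valid data for each group into a fresh buffer.
    fn compact(&mut self) {
        let mut new_vals = Vec::with_capacity(self.active_bytes);
        for g in 0..self.offsets.len() {
            let (start, len) = (self.offsets[g], self.lengths[g]);
            self.offsets[g] = new_vals.len();
            self.capacities[g] = len;
            new_vals.extend_from_slice(&self.vals[start..start + len]);
        }
        self.vals = new_vals;
    }

    fn get(&self, group: usize) -> &[u8] {
        &self.vals[self.offsets[group]..self.offsets[group] + self.lengths[group]]
    }
}
```

The in-place path keeps hot updates allocation-free at the cost of dead bytes, which the compaction pass reclaims once they dominate the buffer.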
This is only needed for `last_value`, no?

Or wait, never mind. I see the queries have an explicit `ORDER BY`.
After implementing the flattened approach with Vec<u8>, the performance actually regressed in several scenarios. Below are the benchmark results:
| ID | SQL | first_val_str Time(s) | first_last_flatten_acc Time(s) | Performance Change | Note |
|---|---|---|---|---|---|
| 1 | `select t.id1, first_value(t.id3 order by t.id2, t.id4) as r2 from 'benchmarks/data/h2o/G1_1e8_1e8_100_0.parquet' as t group by t.id1, t.v1;` | 0.690 | 0.666 | +1.04x faster 🚀 | Length of `t.id3` is constant (12) per group |
| 2 | `select l_shipmode, first_value(l_partkey order by l_orderkey, l_linenumber, l_comment, l_suppkey, l_tax) from 'benchmarks/data/tpch_sf10/lineitem' group by l_shipmode;` | 0.724 | 0.776 | 1.07x slower 🐌 | |
| 3 | `select t.id2, t.id4, first_value(t.v1 order by t.id2, t.id4) as r2 from 'benchmarks/data/h2o/G1_1e8_1e8_100_0.parquet' as t group by t.id2, t.id4;` | 7.136 | 7.226 | 1.01x slower 🐌 | |
| 4 | `SELECT l_suppkey, FIRST_VALUE(l_comment ORDER BY l_orderkey DESC) as fv FROM 'benchmarks/data/tpch_sf10/lineitem' GROUP BY l_suppkey;` | 2.914 | 3.206 | 1.10x slower 🐌 | `l_comment` length varies (10-43) per group |
| 5 | `select t.id1, last_value(t.id3 order by t.id2, t.id4) as r2 from 'benchmarks/data/h2o/G1_1e8_1e8_100_0.parquet' as t group by t.id1, t.v1;` | 0.745 | 0.747 | 1.00x slower 🐌 | |
| 6 | `select l_shipmode, last_value(l_partkey order by l_orderkey, l_linenumber, l_comment, l_suppkey, l_tax) from 'benchmarks/data/tpch_sf10/lineitem' group by l_shipmode;` | 0.802 | 0.780 | +1.03x faster 🚀 | |
| 7 | `select t.id2, t.id4, last_value(t.v1 order by t.id2, t.id4) as r2 from 'benchmarks/data/h2o/G1_1e8_1e8_100_0.parquet' as t group by t.id2, t.id4;` | 7.256 | 7.277 | 1.00x slower 🐌 | |
Flame graphs indicate that the overhead from resize operations is the primary bottleneck.
For `first_value` / `last_value` queries without an `ORDER BY` clause, each group's value is only set once, since there is no explicit ordering requirement. In this scenario, it might be more efficient to implement a dedicated `ValueState` using `Vec<Vec<u8>>` to store strings/bytes, where each inner `Vec<u8>` has a fixed maximum size. We can pre-allocate this capacity during creation to avoid frequent resizing. Note that for these unordered queries, `last_value` can be implemented with the same logic as `first_value`.
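A minimal sketch of that write-once idea, assuming a hypothetical `WriteOnceBytesState` (not DataFusion's actual API), where `reserve` is the pre-allocated per-group capacity:

```rust
// Illustrative sketch for the no-ORDER-BY fast path described above.
// All names are hypothetical, not DataFusion APIs.
struct WriteOnceBytesState {
    vals: Vec<Option<Vec<u8>>>,
    reserve: usize, // per-group capacity pre-allocated on first write
}

impl WriteOnceBytesState {
    fn new(reserve: usize) -> Self {
        Self { vals: Vec::new(), reserve }
    }

    /// Grow the state to hold `total` groups.
    fn resize_groups(&mut self, total: usize) {
        self.vals.resize_with(total, || None);
    }

    /// With no ORDER BY, each group's value is set at most once, so
    /// first-write-wins suffices; `last_value` can share this logic.
    fn set_if_unset(&mut self, group: usize, value: &[u8]) {
        if self.vals[group].is_none() {
            // Pre-allocate up to `reserve` to avoid later resizing.
            let mut buf = Vec::with_capacity(self.reserve.max(value.len()));
            buf.extend_from_slice(value);
            self.vals[group] = Some(buf);
        }
    }
}
```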
Hmm yes, this is similar to the block-based aggregation storage @Rachelint was working on (resizes are heavy, especially for write-only data).
```rust
fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
    use DataType::*;
    !args.order_bys.is_empty()
```
We might want to consider adding a new `GroupsAccumulator` to handle cases without `ORDER BY`, or perhaps implement a fast path in `FirstLastGroupsAccumulator` for when no ordering is required.
neilconway left a comment:
Would it be useful to add a self-contained benchmark to measure the performance change here?
```rust
fn update(&mut self, group_idx: usize, array: &ArrayRef, idx: usize) -> Result<()> {
    if array.is_null(idx) {
        self.vals[group_idx] = None;
```
Decrement `total_capacity` here? A unit test for the update-with-null case might be useful as well.
Thanks for the catch! I've updated the code to decrement `total_capacity` here and added a unit test for the update-with-null case, as suggested.
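For illustration, the decrement-on-null fix could look roughly like this free-standing sketch (the `set_null` helper and its signature are hypothetical, not the PR's actual code):

```rust
// Illustrative sketch: when a group's value is replaced by NULL, the
// accumulator's memory accounting must release the old buffer's capacity.
// `set_null` is a hypothetical helper, not a DataFusion API.
fn set_null(
    vals: &mut Vec<Option<Vec<u8>>>,
    total_capacity: &mut usize,
    group_idx: usize,
) {
    // `take()` clears the slot and yields the old value, if any.
    if let Some(old) = vals[group_idx].take() {
        *total_capacity -= old.capacity();
    }
}
```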
```rust
/// Note: While this is not a batch interface, it is not a performance bottleneck.
/// In heavy aggregation benchmarks, the overhead of this method is typically less than 1%.
///
/// Benchmarked queries with < 1% `update` overhead:
```
This seems like more detail than necessary?
Done. I've simplified this and removed the extra details to keep it concise.
```text
#################
# first_value on strings/binary with groups and ordering
```
Test `last_value` as well?
Good point. I've added a test case for `last_value` in `aggregate.slt` as well.
```diff
@@ -342,8 +422,7 @@ where
     // buffer for `get_filtered_min_of_each_group`
```
These comments should be updated.

Fixed. I've updated the comments to correctly reflect the current implementation.
```rust
        .unwrap()
}

fn create_groups_primitive_accumulator<T: ArrowPrimitiveType + Send>(
```
`create_groups_primitive_accumulator` and `create_groups_bytes_accumulator` are identical except for the `ValueState`; we could use a single function and pass in the value state as an argument?
Agreed. I've refactored these into a single function and passed the state in as an argument.
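A sketch of that refactor, with illustrative trait and struct shapes (not DataFusion's actual definitions): one generic constructor parameterized over the `ValueState`, replacing the two nearly identical functions.

```rust
// Illustrative sketch of a single generic constructor over ValueState.
// Trait methods, fields, and the i64 stand-in are assumptions, not
// DataFusion's actual definitions.
trait ValueState {
    /// Ensure the state has room for `total_num_groups` groups.
    fn resize(&mut self, total_num_groups: usize);
}

#[derive(Default)]
struct PrimitiveValueState {
    vals: Vec<Option<i64>>, // stand-in for generic primitive storage
}

impl ValueState for PrimitiveValueState {
    fn resize(&mut self, total_num_groups: usize) {
        self.vals.resize(total_num_groups, None);
    }
}

#[derive(Default)]
struct BytesValueState {
    vals: Vec<Option<Vec<u8>>>,
}

impl ValueState for BytesValueState {
    fn resize(&mut self, total_num_groups: usize) {
        self.vals.resize_with(total_num_groups, || None);
    }
}

struct FirstLastGroupsAccumulator<S: ValueState> {
    state: S,
    is_first: bool, // true for first_value, false for last_value
}

/// One constructor for both state types; the caller passes the state in.
fn create_groups_accumulator<S: ValueState>(
    state: S,
    is_first: bool,
) -> FirstLastGroupsAccumulator<S> {
    FirstLastGroupsAccumulator { state, is_first }
}
```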
```sql
-- Must set for ClickBench hits_partitioned dataset. See https://github.com/apache/datafusion/issues/16591
-- set datafusion.execution.parquet.binary_as_string = true

SELECT "RegionID", "UserAgent", "OS", AVG(to_timestamp("ResponseEndTiming")-to_timestamp("ResponseStartTiming")) as avg_response_time, AVG(to_timestamp("ResponseEndTiming")-to_timestamp("ConnectTiming")) as avg_latency FROM hits GROUP BY "RegionID", "UserAgent", "OS" ORDER BY avg_latency DESC limit 10;
```
I added

Hmm, I think it is more typical to either add a new criterion-based benchmark or extend an existing one to include a benchmark for

I believe

What do you think?
run benchmark clickbench_extended |
🤖 Benchmark running (GKE). Comparing first_val_group_acc_string (a29ef7d) to ccaf802 (merge-base) using: clickbench_extended.
🤖 Benchmark completed (GKE). Resource usage reported for clickbench_extended: base (merge-base) vs. branch.
It looks like the benchmark SQL from the main branch is being used, instead of the benchmark SQL from this branch.
| Query | `FIRST_VALUE` Column | Column Type | Group By Column | Group By Type | Number of Groups |
|-------|----------------------|-------------|-----------------|---------------|------------------|
| Q9 | `URL` | `Utf8` | `UserID` | `Int64` | 17,630,976 |
| Q10 | `URL` | `Utf8` | `OS` | `Int16` | 91 |
This table formatting looks a bit off
```rust
self.vals[group_idx]
    .as_ref()
    .inspect(|x| self.total_capacity += x.capacity());
```
I think this would be more idiomatic:

```rust
if let Some(v) = &self.vals[group_idx] {
    self.total_capacity += v.capacity();
}
```
Maybe we can clean this up in a follow-on PR?

I'll clean this up when I address #21090 (comment).
Yes, if you want to compare this branch with our scripts, you need to make a PR with just the benchmarks that we can merge to main first.
Thanks @UBarney @Dandandan and @neilconway -- this is great.
I added a function-level microbenchmark (not a SQL benchmark, but it could still be useful) and a few more improvements in #21383.
…tring/binary types (apache#21090)

## Which issue does this PR close?

- Closes apache#17899.

## Rationale for this change

```text
┏━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query     ┃ main        ┃ first_val_group_acc ┃ Change        ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0  │ 872.01 ms   │ 904.35 ms           │ no change     │
│ QQuery 1  │ 156.37 ms   │ 164.83 ms           │ 1.05x slower  │
│ QQuery 2  │ 448.58 ms   │ 497.05 ms           │ 1.11x slower  │
│ QQuery 3  │ 233.99 ms   │ 274.10 ms           │ 1.17x slower  │
│ QQuery 4  │ 1448.99 ms  │ 1556.95 ms          │ 1.07x slower  │
│ QQuery 5  │ 10816.83 ms │ 11315.69 ms         │ no change     │
│ QQuery 6  │ 2053.16 ms  │ 2030.02 ms          │ no change     │
│ QQuery 7  │ 2154.74 ms  │ 2274.63 ms          │ 1.06x slower  │
│ QQuery 8  │ 405.62 ms   │ 405.72 ms           │ no change     │
│ QQuery 9  │ 17160.65 ms │ 4167.34 ms          │ +4.12x faster │
│ QQuery 10 │ 1206.03 ms  │ 1090.69 ms          │ +1.11x faster │
│ QQuery 11 │ 2437.12 ms  │ 2446.51 ms          │ no change     │
│ QQuery 12 │ 331.27 ms   │ 317.73 ms           │ no change     │
└───────────┴─────────────┴─────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                  ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)                  │ 39725.35ms │
│ Total Time (first_val_group_acc)   │ 27445.62ms │
│ Average Time (main)                │ 3055.80ms  │
│ Average Time (first_val_group_acc) │ 2111.20ms  │
│ Queries Faster                     │ 2          │
│ Queries Slower                     │ 5          │
│ Queries with No Change             │ 6          │
│ Queries with Failure               │ 0          │
└────────────────────────────────────┴────────────┘
```

Previously, the `first_value` and `last_value` aggregate functions only supported GroupsAccumulator for primitive types. For string or binary types (Utf8, LargeUtf8, Binary, etc.), they fell back to the slower row-based Accumulator path. This change implements specialized state management for byte-based types, enabling high-performance grouped aggregation for strings and binary data, especially when used with `ORDER BY`.

## What changes are included in this PR?

- New `ValueState` trait: abstracts the state management for `first_value` and `last_value` to support different storage backends.
- `PrimitiveValueState`: re-implements the existing primitive handling using the new trait.
- `BytesValueState`: adds a new state implementation for Utf8, LargeUtf8, Utf8View, Binary, LargeBinary, and BinaryView. It optimizes memory by reusing `Vec<u8>` buffers for group updates.
- Refactored `FirstLastGroupsAccumulator`: migrates the accumulator to use the generic `ValueState` trait, allowing it to handle both primitive and byte types uniformly.

## Are these changes tested?

Yes.

## Are there any user-facing changes?

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>