Skip to content

Commit 587f4c0

Browse files
UBarneyalamb
andauthored
feat: support GroupsAccumulator for first_value and last_value with string/binary types (#21090)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #17899. ## Rationale for this change ``` ┏━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓ ┃ Query ┃ main ┃ first_val_group_acc ┃ Change ┃ ┡━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩ │ QQuery 0 │ 872.01 ms │ 904.35 ms │ no change │ │ QQuery 1 │ 156.37 ms │ 164.83 ms │ 1.05x slower │ │ QQuery 2 │ 448.58 ms │ 497.05 ms │ 1.11x slower │ │ QQuery 3 │ 233.99 ms │ 274.10 ms │ 1.17x slower │ │ QQuery 4 │ 1448.99 ms │ 1556.95 ms │ 1.07x slower │ │ QQuery 5 │ 10816.83 ms │ 11315.69 ms │ no change │ │ QQuery 6 │ 2053.16 ms │ 2030.02 ms │ no change │ │ QQuery 7 │ 2154.74 ms │ 2274.63 ms │ 1.06x slower │ │ QQuery 8 │ 405.62 ms │ 405.72 ms │ no change │ │ QQuery 9 │ 17160.65 ms │ 4167.34 ms │ +4.12x faster │ │ QQuery 10 │ 1206.03 ms │ 1090.69 ms │ +1.11x faster │ │ QQuery 11 │ 2437.12 ms │ 2446.51 ms │ no change │ │ QQuery 12 │ 331.27 ms │ 317.73 ms │ no change │ └───────────┴─────────────┴─────────────────────┴───────────────┘ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓ ┃ Benchmark Summary ┃ ┃ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩ │ Total Time (main) │ 39725.35ms │ │ Total Time (first_val_group_acc) │ 27445.62ms │ │ Average Time (main) │ 3055.80ms │ │ Average Time (first_val_group_acc) │ 2111.20ms │ │ Queries Faster │ 2 │ │ Queries Slower │ 5 │ │ Queries with No Change │ 6 │ │ Queries with Failure │ 0 │ └────────────────────────────────────┴────────────┘ ``` Previously, the `first_value` and `last_value` aggregate functions only supported GroupsAccumulator for primitive types. For string or binary types (Utf8, LargeUtf8, Binary, etc.), they fell back to the slower row-based Accumulator path. This change implements a specialized state management for byte-based types, enabling high-performance grouped aggregation for strings and binary data, especially when used with `ORDER BY`. <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? - New `ValueState` Trait: Abstracted the state management for `first_value` and `last_value` to support different storage backends. - `PrimitiveValueState` : Re-implemented the existing primitive handling using the new trait. - `BytesValueState`: Added a new state implementation for Utf8, LargeUtf8, Utf8View, Binary, LargeBinary, and BinaryView. It optimizes memory by reusing `Vec<u8>` buffers for group updates. - Refactored `FirstLastGroupsAccumulato`r: Migrated the accumulator to use the generic ValueState trait, allowing it to handle both primitive and byte types uniformly. <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? YES <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 3d2e6b2 commit 587f4c0

9 files changed

Lines changed: 838 additions & 325 deletions

File tree

benchmarks/queries/clickbench/README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,22 @@ Results look like
228228
Elapsed 30.195 seconds.
229229
```
230230

231+
232+
### Q9-Q12: FIRST_VALUE Aggregation Performance
233+
234+
These queries test the performance of the `FIRST_VALUE` aggregation function with different data types and grouping cardinalities.
235+
236+
| Query | `FIRST_VALUE` Column | Column Type | Group By Column | Group By Type | Number of Groups |
237+
|-------|----------------------|-------------|-----------------|---------------|------------------|
238+
| Q9 | `URL` | `Utf8` | `UserID` | `Int64` | 17,630,976 |
239+
| Q10 | `URL` | `Utf8` | `OS` | `Int16` | 91 |
240+
| Q11 | `WatchID` | `Int64` | `UserID` | `Int64` | 17,630,976 |
241+
| Q12 | `WatchID` | `Int64` | `OS` | `Int16` | 91 |
242+
243+
244+
245+
246+
231247
## Data Notes
232248

233249
Here are some interesting statistics about the data used in the queries
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
-- Must set for ClickBench hits_partitioned dataset. See https://github.com/apache/datafusion/issues/16591
2+
-- set datafusion.execution.parquet.binary_as_string = true
3+
4+
SELECT MAX(len) FROM (
5+
SELECT LENGTH(FIRST_VALUE("URL" ORDER BY "EventTime")) as len
6+
FROM hits
7+
GROUP BY "OS"
8+
);
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
-- Must set for ClickBench hits_partitioned dataset. See https://github.com/apache/datafusion/issues/16591
2+
-- set datafusion.execution.parquet.binary_as_string = true
3+
4+
SELECT MAX(fv) FROM (
5+
SELECT FIRST_VALUE("WatchID" ORDER BY "EventTime") as fv
6+
FROM hits
7+
GROUP BY "UserID"
8+
);
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
-- Must set for ClickBench hits_partitioned dataset. See https://github.com/apache/datafusion/issues/16591
2+
-- set datafusion.execution.parquet.binary_as_string = true
3+
4+
SELECT MAX(fv) FROM (
5+
SELECT FIRST_VALUE("WatchID" ORDER BY "EventTime") as fv
6+
FROM hits
7+
GROUP BY "OS"
8+
);
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
-- Must set for ClickBench hits_partitioned dataset. See https://github.com/apache/datafusion/issues/16591
2+
-- set datafusion.execution.parquet.binary_as_string = true
3+
4+
SELECT "RegionID", "UserAgent", "OS", AVG(to_timestamp("ResponseEndTiming")-to_timestamp("ResponseStartTiming")) as avg_response_time, AVG(to_timestamp("ResponseEndTiming")-to_timestamp("ConnectTiming")) as avg_latency FROM hits GROUP BY "RegionID", "UserAgent", "OS" ORDER BY avg_latency DESC limit 10;
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
-- Must set for ClickBench hits_partitioned dataset. See https://github.com/apache/datafusion/issues/16591
2+
-- set datafusion.execution.parquet.binary_as_string = true
3+
4+
SELECT MAX(len) FROM (
5+
SELECT LENGTH(FIRST_VALUE("URL" ORDER BY "EventTime")) as len
6+
FROM hits
7+
GROUP BY "UserID"
8+
);

0 commit comments

Comments
 (0)