Commit 95ef332

theirix and alamb authored
perf: optimise first_value, last_value aggregate function (#21383)
## Which issue does this PR close?

- Refers to #17899

## Rationale for this change

A performance improvement of `first_last.rs` (up to 36%), based on the original author's observations.

UPD: rebased on yesterday's refactoring in #17899 - cc @UBarney @alamb

## What changes are included in this PR?

- Optimise memory allocation in `take_state` - no need to copy vectors and buffers
- Optimise extracting single elements
- Pre-compute common data for sorting
~~- Rename structs and functions as recommended in TODOs - a majority of changes in this PR~~
- Add benchmark. It's pretty complicated to test aggregates with grouping, since many operations are stateful, so I introduced end-to-end `evaluate` test (to actually test taking state) and `convert_to_state` (as in other benches)

## Are these changes tested?

- Existing unit and integration tests
- A new bench has a meaningful result

Improvements: up to 36%

Raw bench result:

<details>

first_value convert_to_state nulls=0%, filter=false time: [98.086 µs 99.014 µs 100.25 µs] change: [−15.400% −13.532% −11.559%] (p = 0.00 < 0.05) Performance has improved. Found 17 outliers among 100 measurements (17.00%) 4 (4.00%) low mild 3 (3.00%) high mild 10 (10.00%) high severe Benchmarking first_value evaluate_bench nulls=0%, filter=false, first(2): Collecting 100 samples in estimated 7.1575 s (10k ite first_value evaluate_bench nulls=0%, filter=false, first(2) time: [54.938 µs 55.482 µs 56.060 µs] change: [−38.163% −36.295% −34.463%] (p = 0.00 < 0.05) Performance has improved. Found 9 outliers among 100 measurements (9.00%) 5 (5.00%) high mild 4 (4.00%) high severe Benchmarking first_value evaluate_bench nulls=0%, filter=false, all: Collecting 100 samples in estimated 7.0741 s (10k iteratio first_value evaluate_bench nulls=0%, filter=false, all time: [50.624 µs 51.092 µs 51.612 µs] change: [−17.955% −16.332% −14.593%] (p = 0.00 < 0.05) Performance has improved.
Found 9 outliers among 100 measurements (9.00%) 5 (5.00%) high mild 4 (4.00%) high severe first_value convert_to_state nulls=0%, filter=true time: [2.0647 µs 2.0881 µs 2.1148 µs] change: [−7.6314% −6.2174% −4.8530%] (p = 0.00 < 0.05) Performance has improved. Found 8 outliers among 100 measurements (8.00%) 2 (2.00%) high mild 6 (6.00%) high severe Benchmarking first_value evaluate_bench nulls=0%, filter=true, first(2): Collecting 100 samples in estimated 9.6354 s (10k iter first_value evaluate_bench nulls=0%, filter=true, first(2) time: [54.708 µs 55.240 µs 55.805 µs] change: [−27.249% −24.755% −22.195%] (p = 0.00 < 0.05) Performance has improved. Found 5 outliers among 100 measurements (5.00%) 3 (3.00%) high mild 2 (2.00%) high severe Benchmarking first_value evaluate_bench nulls=0%, filter=true, all: Collecting 100 samples in estimated 9.6394 s (10k iteration first_value evaluate_bench nulls=0%, filter=true, all time: [50.540 µs 50.963 µs 51.424 µs] change: [−6.8346% −3.9146% −0.8864%] (p = 0.01 < 0.05) Change within noise threshold. Found 4 outliers among 100 measurements (4.00%) 1 (1.00%) high mild 3 (3.00%) high severe Benchmarking first_value convert_to_state nulls=90%, filter=false: Collecting 100 samples in estimated 5.0440 s (50k iterations first_value convert_to_state nulls=90%, filter=false time: [98.054 µs 98.996 µs 100.10 µs] change: [−3.4798% −2.2129% −1.0173%] (p = 0.00 < 0.05) Performance has improved. Found 19 outliers among 100 measurements (19.00%) 1 (1.00%) low severe 8 (8.00%) low mild 2 (2.00%) high mild 8 (8.00%) high severe Benchmarking first_value evaluate_bench nulls=90%, filter=false, first(2): Collecting 100 samples in estimated 8.5385 s (10k it first_value evaluate_bench nulls=90%, filter=false, first(2) time: [53.780 µs 54.673 µs 55.639 µs] change: [−17.702% −15.978% −14.173%] (p = 0.00 < 0.05) Performance has improved. 
Found 8 outliers among 100 measurements (8.00%) 1 (1.00%) low mild 5 (5.00%) high mild 2 (2.00%) high severe Benchmarking first_value evaluate_bench nulls=90%, filter=false, all: Collecting 100 samples in estimated 8.2692 s (10k iterati first_value evaluate_bench nulls=90%, filter=false, all time: [49.851 µs 50.289 µs 50.755 µs] change: [−4.8554% −3.1896% −1.3951%] (p = 0.00 < 0.05) Performance has improved. Found 8 outliers among 100 measurements (8.00%) 3 (3.00%) low severe 2 (2.00%) high mild 3 (3.00%) high severe Benchmarking first_value convert_to_state nulls=90%, filter=true: Collecting 100 samples in estimated 5.0077 s (2.4M iterations first_value convert_to_state nulls=90%, filter=true time: [2.0339 µs 2.0465 µs 2.0603 µs] change: [−1.8037% −0.7068% +0.3821%] (p = 0.22 > 0.05) No change in performance detected. Found 8 outliers among 100 measurements (8.00%) 2 (2.00%) low mild 3 (3.00%) high mild 3 (3.00%) high severe Benchmarking first_value evaluate_bench nulls=90%, filter=true, first(2): Collecting 100 samples in estimated 9.7925 s (10k ite first_value evaluate_bench nulls=90%, filter=true, first(2) time: [54.544 µs 55.119 µs 55.720 µs] change: [−15.717% −13.982% −12.279%] (p = 0.00 < 0.05) Performance has improved. Found 2 outliers among 100 measurements (2.00%) 1 (1.00%) low mild 1 (1.00%) high severe Benchmarking first_value evaluate_bench nulls=90%, filter=true, all: Collecting 100 samples in estimated 9.7401 s (10k iteratio first_value evaluate_bench nulls=90%, filter=true, all time: [50.126 µs 50.886 µs 51.703 µs] change: [+0.8379% +4.1713% +7.1024%] (p = 0.00 < 0.05) Change within noise threshold. Found 9 outliers among 100 measurements (9.00%) 6 (6.00%) high mild 3 (3.00%) high severe last_value convert_to_state nulls=0%, filter=false time: [97.957 µs 98.314 µs 98.692 µs] change: [−2.8086% −2.0315% −1.2541%] (p = 0.00 < 0.05) Performance has improved. 
Found 10 outliers among 100 measurements (10.00%) 7 (7.00%) high mild 3 (3.00%) high severe Benchmarking last_value evaluate_bench nulls=0%, filter=false, first(2): Collecting 100 samples in estimated 7.0582 s (10k iter last_value evaluate_bench nulls=0%, filter=false, first(2) time: [52.692 µs 53.414 µs 54.144 µs] change: [−22.228% −20.636% −19.153%] (p = 0.00 < 0.05) Performance has improved. Found 5 outliers among 100 measurements (5.00%) 1 (1.00%) low mild 2 (2.00%) high mild 2 (2.00%) high severe Benchmarking last_value evaluate_bench nulls=0%, filter=false, all: Collecting 100 samples in estimated 6.9411 s (10k iteration last_value evaluate_bench nulls=0%, filter=false, all time: [49.781 µs 50.226 µs 50.793 µs] change: [−1.9658% −0.1634% +1.5825%] (p = 0.86 > 0.05) No change in performance detected. Found 11 outliers among 100 measurements (11.00%) 1 (1.00%) low mild 6 (6.00%) high mild 4 (4.00%) high severe last_value convert_to_state nulls=0%, filter=true time: [2.0639 µs 2.0781 µs 2.0949 µs] change: [−0.7535% +0.4491% +1.6752%] (p = 0.47 > 0.05) No change in performance detected. Found 11 outliers among 100 measurements (11.00%) 3 (3.00%) low mild 4 (4.00%) high mild 4 (4.00%) high severe Benchmarking last_value evaluate_bench nulls=0%, filter=true, first(2): Collecting 100 samples in estimated 9.8040 s (10k itera last_value evaluate_bench nulls=0%, filter=true, first(2) time: [53.779 µs 54.311 µs 54.868 µs] change: [−15.863% −14.071% −12.391%] (p = 0.00 < 0.05) Performance has improved. Found 7 outliers among 100 measurements (7.00%) 4 (4.00%) high mild 3 (3.00%) high severe Benchmarking last_value evaluate_bench nulls=0%, filter=true, all: Collecting 100 samples in estimated 9.6860 s (10k iterations last_value evaluate_bench nulls=0%, filter=true, all time: [50.276 µs 50.794 µs 51.429 µs] change: [−1.6780% +0.0697% +1.9541%] (p = 0.94 > 0.05) No change in performance detected. 
Found 6 outliers among 100 measurements (6.00%) 2 (2.00%) high mild 4 (4.00%) high severe last_value convert_to_state nulls=90%, filter=false time: [97.508 µs 98.412 µs 99.486 µs] change: [−1.2994% −0.3404% +0.8319%] (p = 0.52 > 0.05) No change in performance detected. Found 9 outliers among 100 measurements (9.00%) 2 (2.00%) low mild 3 (3.00%) high mild 4 (4.00%) high severe Benchmarking last_value evaluate_bench nulls=90%, filter=false, first(2): Collecting 100 samples in estimated 8.9282 s (10k ite last_value evaluate_bench nulls=90%, filter=false, first(2) time: [54.064 µs 54.748 µs 55.433 µs] change: [−15.790% −13.678% −11.790%] (p = 0.00 < 0.05) Performance has improved. Found 5 outliers among 100 measurements (5.00%) 3 (3.00%) low mild 1 (1.00%) high mild 1 (1.00%) high severe Benchmarking last_value evaluate_bench nulls=90%, filter=false, all: Collecting 100 samples in estimated 9.2411 s (10k iteratio last_value evaluate_bench nulls=90%, filter=false, all time: [49.964 µs 50.731 µs 51.630 µs] change: [−2.4530% −0.2470% +1.8407%] (p = 0.82 > 0.05) No change in performance detected. Found 9 outliers among 100 measurements (9.00%) 8 (8.00%) high mild 1 (1.00%) high severe last_value convert_to_state nulls=90%, filter=true time: [2.0660 µs 2.0874 µs 2.1139 µs] change: [−3.2299% −1.8585% −0.5850%] (p = 0.01 < 0.05) Change within noise threshold. Found 10 outliers among 100 measurements (10.00%) 4 (4.00%) high mild 6 (6.00%) high severe Benchmarking last_value evaluate_bench nulls=90%, filter=true, first(2): Warming up for 3.0000 s Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.6s, enable flat sampling, or reduce sample count to 60. 
Benchmarking last_value evaluate_bench nulls=90%, filter=true, first(2): Collecting 100 samples in estimated 5.6274 s (5050 ite last_value evaluate_bench nulls=90%, filter=true, first(2) time: [53.565 µs 54.399 µs 55.381 µs] change: [−18.757% −16.680% −14.334%] (p = 0.00 < 0.05) Performance has improved. Found 8 outliers among 100 measurements (8.00%) 8 (8.00%) high mild Benchmarking last_value evaluate_bench nulls=90%, filter=true, all: Warming up for 3.0000 s Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.3s, enable flat sampling, or reduce sample count to 60. Benchmarking last_value evaluate_bench nulls=90%, filter=true, all: Collecting 100 samples in estimated 5.3402 s (5050 iteratio last_value evaluate_bench nulls=90%, filter=true, all time: [49.473 µs 50.055 µs 50.743 µs] change: [−41.448% −32.836% −24.471%] (p = 0.00 < 0.05) Performance has improved. Found 9 outliers among 100 measurements (9.00%) 5 (5.00%) high mild 4 (4.00%) high severe cargo bench --bench first_last -- --baseline main-first_last3 810.16s user 14.72s system 135% cpu 10:09.30 total irix@tau ~/projects/third-party/datafusion [optimise-first_last (theirix/optimise-first_last)?] % time cargo bench --bench first_last -- --baseline main-first_last4 Compiling datafusion-functions-aggregate v53.0.0 (/Users/irix/projects/third-party/datafusion/datafusion/functions-aggregate) Finished `bench` profile [optimized] target(s) in 5m 04s Running benches/first_last.rs (target/release/deps/first_last-c2812404ab64de23) Gnuplot not found, using plotters backend first_value convert_to_state nulls=0%, filter=false time: [100.29 µs 100.92 µs 101.62 µs] change: [−1.3141% −0.4446% +0.4530%] (p = 0.34 > 0.05) No change in performance detected. 
Found 9 outliers among 100 measurements (9.00%) 7 (7.00%) high mild 2 (2.00%) high severe Benchmarking first_value evaluate_bench nulls=0%, filter=false, first(2): Collecting 100 samples in estimated 7.6058 s (10k ite first_value evaluate_bench nulls=0%, filter=false, first(2) time: [57.325 µs 58.842 µs 60.567 µs] change: [−14.545% −12.765% −10.817%] (p = 0.00 < 0.05) Performance has improved. Found 5 outliers among 100 measurements (5.00%) 3 (3.00%) high mild 2 (2.00%) high severe Benchmarking first_value evaluate_bench nulls=0%, filter=false, all: Collecting 100 samples in estimated 7.1421 s (10k iteratio first_value evaluate_bench nulls=0%, filter=false, all time: [52.632 µs 53.182 µs 53.814 µs] change: [−9.5351% −6.9990% −4.3937%] (p = 0.00 < 0.05) Performance has improved. Found 10 outliers among 100 measurements (10.00%) 1 (1.00%) low mild 6 (6.00%) high mild 3 (3.00%) high severe first_value convert_to_state nulls=0%, filter=true time: [2.2486 µs 2.3289 µs 2.4224 µs] change: [−14.956% −12.528% −9.6570%] (p = 0.00 < 0.05) Performance has improved. Found 19 outliers among 100 measurements (19.00%) 2 (2.00%) high mild 17 (17.00%) high severe Benchmarking first_value evaluate_bench nulls=0%, filter=true, first(2): Warming up for 3.0000 s Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.3s, enable flat sampling, or reduce sample count to 60. Benchmarking first_value evaluate_bench nulls=0%, filter=true, first(2): Collecting 100 samples in estimated 5.3276 s (5050 ite first_value evaluate_bench nulls=0%, filter=true, first(2) time: [60.696 µs 62.295 µs 63.977 µs] change: [−28.986% −27.090% −25.120%] (p = 0.00 < 0.05) Performance has improved. Found 7 outliers among 100 measurements (7.00%) 2 (2.00%) low mild 5 (5.00%) high mild Benchmarking first_value evaluate_bench nulls=0%, filter=true, all: Warming up for 3.0000 s Warning: Unable to complete 100 samples in 5.0s. 
You may wish to increase target time to 5.1s, enable flat sampling, or reduce sample count to 60. Benchmarking first_value evaluate_bench nulls=0%, filter=true, all: Collecting 100 samples in estimated 5.1224 s (5050 iteratio first_value evaluate_bench nulls=0%, filter=true, all time: [53.834 µs 54.827 µs 55.913 µs] change: [−12.918% −8.8195% −4.0336%] (p = 0.00 < 0.05) Performance has improved. Found 7 outliers among 100 measurements (7.00%) 3 (3.00%) low mild 1 (1.00%) high mild 3 (3.00%) high severe Benchmarking first_value convert_to_state nulls=90%, filter=false: Collecting 100 samples in estimated 5.1237 s (50k iterations first_value convert_to_state nulls=90%, filter=false time: [104.46 µs 106.26 µs 108.42 µs] change: [−11.589% −8.9599% −6.3106%] (p = 0.00 < 0.05) Performance has improved. Found 6 outliers among 100 measurements (6.00%) 6 (6.00%) high mild Benchmarking first_value evaluate_bench nulls=90%, filter=false, first(2): Collecting 100 samples in estimated 8.1885 s (10k it first_value evaluate_bench nulls=90%, filter=false, first(2) time: [59.528 µs 60.651 µs 61.809 µs] change: [−21.550% −19.288% −16.929%] (p = 0.00 < 0.05) Performance has improved. Found 5 outliers among 100 measurements (5.00%) 5 (5.00%) high mild Benchmarking first_value evaluate_bench nulls=90%, filter=false, all: Collecting 100 samples in estimated 8.2091 s (10k iterati first_value evaluate_bench nulls=90%, filter=false, all time: [53.384 µs 54.251 µs 55.248 µs] change: [−9.1283% −6.6250% −4.4009%] (p = 0.00 < 0.05) Performance has improved. Found 13 outliers among 100 measurements (13.00%) 7 (7.00%) high mild 6 (6.00%) high severe Benchmarking first_value convert_to_state nulls=90%, filter=true: Collecting 100 samples in estimated 5.0021 s (2.4M iterations first_value convert_to_state nulls=90%, filter=true time: [2.1172 µs 2.1355 µs 2.1555 µs] change: [−0.6685% +0.5813% +1.7649%] (p = 0.37 > 0.05) No change in performance detected. 
Found 7 outliers among 100 measurements (7.00%) 3 (3.00%) high mild 4 (4.00%) high severe Benchmarking first_value evaluate_bench nulls=90%, filter=true, first(2): Warming up for 3.0000 s Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.2s, enable flat sampling, or reduce sample count to 60. Benchmarking first_value evaluate_bench nulls=90%, filter=true, first(2): Collecting 100 samples in estimated 5.2407 s (5050 it first_value evaluate_bench nulls=90%, filter=true, first(2) time: [57.516 µs 58.757 µs 60.089 µs] change: [−18.305% −15.732% −13.092%] (p = 0.00 < 0.05) Performance has improved. Found 5 outliers among 100 measurements (5.00%) 5 (5.00%) high mild Benchmarking first_value evaluate_bench nulls=90%, filter=true, all: Warming up for 3.0000 s Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.3s, enable flat sampling, or reduce sample count to 60. Benchmarking first_value evaluate_bench nulls=90%, filter=true, all: Collecting 100 samples in estimated 5.2574 s (5050 iterati first_value evaluate_bench nulls=90%, filter=true, all time: [63.381 µs 65.433 µs 67.411 µs] change: [+5.8137% +12.240% +19.268%] (p = 0.00 < 0.05) Performance has regressed. Found 10 outliers among 100 measurements (10.00%) 1 (1.00%) high mild 9 (9.00%) high severe last_value convert_to_state nulls=0%, filter=false time: [100.59 µs 101.71 µs 103.06 µs] change: [−0.8042% +0.1307% +1.0555%] (p = 0.79 > 0.05) No change in performance detected. Found 11 outliers among 100 measurements (11.00%) 2 (2.00%) low mild 6 (6.00%) high mild 3 (3.00%) high severe Benchmarking last_value evaluate_bench nulls=0%, filter=false, first(2): Collecting 100 samples in estimated 7.3301 s (10k iter last_value evaluate_bench nulls=0%, filter=false, first(2) time: [59.031 µs 60.067 µs 61.215 µs] change: [−20.562% −15.398% −10.596%] (p = 0.00 < 0.05) Performance has improved. 
Benchmarking last_value evaluate_bench nulls=0%, filter=false, all: Collecting 100 samples in estimated 7.4307 s (10k iteration last_value evaluate_bench nulls=0%, filter=false, all time: [54.526 µs 55.568 µs 56.708 µs] change: [−9.5126% −5.3244% −1.2387%] (p = 0.02 < 0.05) Performance has improved. Found 3 outliers among 100 measurements (3.00%) 2 (2.00%) high mild 1 (1.00%) high severe last_value convert_to_state nulls=0%, filter=true time: [2.1428 µs 2.1838 µs 2.2289 µs] change: [−5.7061% −3.1097% −0.6074%] (p = 0.02 < 0.05) Change within noise threshold. Benchmarking last_value evaluate_bench nulls=0%, filter=true, first(2): Warming up for 3.0000 s Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.1s, enable flat sampling, or reduce sample count to 70. Benchmarking last_value evaluate_bench nulls=0%, filter=true, first(2): Collecting 100 samples in estimated 5.0694 s (5050 iter last_value evaluate_bench nulls=0%, filter=true, first(2) time: [63.666 µs 66.076 µs 68.373 µs] change: [−17.910% −13.091% −8.0825%] (p = 0.00 < 0.05) Performance has improved. Found 2 outliers among 100 measurements (2.00%) 2 (2.00%) high mild Benchmarking last_value evaluate_bench nulls=0%, filter=true, all: Warming up for 3.0000 s Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.1s, enable flat sampling, or reduce sample count to 60. Benchmarking last_value evaluate_bench nulls=0%, filter=true, all: Collecting 100 samples in estimated 5.1148 s (5050 iteration last_value evaluate_bench nulls=0%, filter=true, all time: [51.817 µs 52.699 µs 53.714 µs] change: [−15.171% −12.649% −10.079%] (p = 0.00 < 0.05) Performance has improved. 
Found 7 outliers among 100 measurements (7.00%) 1 (1.00%) low mild 3 (3.00%) high mild 3 (3.00%) high severe last_value convert_to_state nulls=90%, filter=false time: [107.51 µs 110.89 µs 114.78 µs] change: [−0.7613% +1.8668% +4.7036%] (p = 0.17 > 0.05) No change in performance detected. Found 3 outliers among 100 measurements (3.00%) 3 (3.00%) high mild Benchmarking last_value evaluate_bench nulls=90%, filter=false, first(2): Collecting 100 samples in estimated 9.6846 s (10k ite last_value evaluate_bench nulls=90%, filter=false, first(2) time: [58.180 µs 58.969 µs 59.915 µs] change: [−19.674% −16.125% −12.269%] (p = 0.00 < 0.05) Performance has improved. Found 2 outliers among 100 measurements (2.00%) 1 (1.00%) high mild 1 (1.00%) high severe Benchmarking last_value evaluate_bench nulls=90%, filter=false, all: Collecting 100 samples in estimated 9.2340 s (10k iteratio last_value evaluate_bench nulls=90%, filter=false, all time: [52.432 µs 53.242 µs 54.103 µs] change: [−12.306% −9.3583% −6.3687%] (p = 0.00 < 0.05) Performance has improved. Found 8 outliers among 100 measurements (8.00%) 7 (7.00%) high mild 1 (1.00%) high severe last_value convert_to_state nulls=90%, filter=true time: [2.0731 µs 2.0901 µs 2.1137 µs] change: [−10.926% −9.9495% −8.9933%] (p = 0.00 < 0.05) Performance has improved. Found 8 outliers among 100 measurements (8.00%) 3 (3.00%) low mild 2 (2.00%) high mild 3 (3.00%) high severe Benchmarking last_value evaluate_bench nulls=90%, filter=true, first(2): Warming up for 3.0000 s Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.7s, enable flat sampling, or reduce sample count to 60. Benchmarking last_value evaluate_bench nulls=90%, filter=true, first(2): Collecting 100 samples in estimated 5.7484 s (5050 ite last_value evaluate_bench nulls=90%, filter=true, first(2) time: [56.238 µs 57.148 µs 58.202 µs] change: [−31.872% −29.982% −27.838%] (p = 0.00 < 0.05) Performance has improved. 
Found 7 outliers among 100 measurements (7.00%) 4 (4.00%) high mild 3 (3.00%) high severe Benchmarking last_value evaluate_bench nulls=90%, filter=true, all: Warming up for 3.0000 s Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.8s, enable flat sampling, or reduce sample count to 60. Benchmarking last_value evaluate_bench nulls=90%, filter=true, all: Collecting 100 samples in estimated 5.7562 s (5050 iteratio last_value evaluate_bench nulls=90%, filter=true, all time: [51.057 µs 52.223 µs 53.542 µs] change: [−21.819% −19.372% −16.998%] (p = 0.00 < 0.05) Performance has improved. Found 7 outliers among 100 measurements (7.00%) 1 (1.00%) low mild 4 (4.00%) high mild 2 (2.00%) high severe cargo bench --bench first_last -- --baseline main-first_last4 786.76s user 16.16s system 135% cpu 9:50.80 total

</details>

## Are there any user-facing changes?

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 1bb588e commit 95ef332

2 files changed

Lines changed: 22 additions & 29 deletions

datafusion/functions-aggregate/src/first_last.rs

Lines changed: 20 additions & 23 deletions
@@ -426,7 +426,7 @@ impl<S: ValueState> FirstLastGroupsAccumulator<S> {
             return Ok(true);
         }
 
-        assert!(new_ordering_values.len() == self.ordering_req.len());
+        debug_assert!(new_ordering_values.len() == self.ordering_req.len());
         let current_ordering = &self.orderings[group_idx];
         compare_rows(current_ordering, new_ordering_values, &self.sort_options).map(|x| {
             if self.pick_first_in_group {
@@ -485,7 +485,7 @@ impl<S: ValueState> FirstLastGroupsAccumulator<S> {
         self.state.update(group_idx, array, idx)?;
         self.is_sets.set_bit(group_idx, true);
 
-        assert!(orderings.len() == self.ordering_req.len());
+        debug_assert!(orderings.len() == self.ordering_req.len());
         let old_size = ScalarValue::size_of_vec(&self.orderings[group_idx]);
         self.orderings[group_idx].clear();
         self.orderings[group_idx].extend_from_slice(orderings);
@@ -650,7 +650,7 @@ impl<S: ValueState + 'static> GroupsAccumulator for FirstLastGroupsAccumulator<S
             ordering_cols.push(Vec::with_capacity(self.orderings.len()));
         }
         for row in orderings.into_iter() {
-            assert_eq!(row.len(), self.ordering_req.len());
+            debug_assert!(row.len() == self.ordering_req.len());
             for (col_idx, ordering) in row.into_iter().enumerate() {
                 ordering_cols[col_idx].push(ordering);
             }
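
The three hunks above swap `assert!`/`assert_eq!` for `debug_assert!`. In Rust, `debug_assert!` is only checked when debug assertions are enabled (the default for unoptimized builds), and its condition is not evaluated at all otherwise. The following is a minimal, std-only sketch of that behaviour; `checked_len` and `sum_row` are hypothetical names used for illustration and are not part of DataFusion.

```rust
use std::cell::Cell;

/// Hypothetical helper: returns the row length and counts how often it is
/// called, so a caller can observe whether the assertion condition ran.
fn checked_len(row: &[i64], calls: &Cell<u32>) -> usize {
    calls.set(calls.get() + 1);
    row.len()
}

/// Sums a row whose width is expected to equal `expected_width`.
/// The invariant is validated only when debug assertions are enabled;
/// in that case a mismatch panics, otherwise the check is skipped.
fn sum_row(row: &[i64], expected_width: usize, calls: &Cell<u32>) -> i64 {
    debug_assert!(checked_len(row, calls) == expected_width);
    row.iter().sum()
}
```

Calling `sum_row(&[1, 2, 3], 3, &calls)` returns the sum either way; the counter is incremented only in builds with debug assertions. The trade-off is that a violated invariant would go undetected in release builds, which is presumably acceptable here because the lengths are internal invariants rather than user input.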
@@ -784,8 +784,7 @@ impl Accumulator for TrivialFirstValueAccumulator {
                 first_idx = Some(0);
             }
             if let Some(first_idx) = first_idx {
-                let mut row = get_row_at_idx(values, first_idx)?;
-                self.first = row.swap_remove(0);
+                self.first = ScalarValue::try_from_array(&values[0], first_idx)?;
                 self.first.compact();
                 self.is_set = true;
             }
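
The hunk above stops materializing a whole row just to keep its first entry and instead reads the one needed element from the first column. A rough, std-only sketch of the two shapes follows; `Scalar`, `row_at`, `first_column_via_row` and `first_column_direct` are hypothetical stand-ins, not the DataFusion helpers themselves.

```rust
/// A toy "scalar": `None` models a null value.
type Scalar = Option<i64>;

/// Materializes one scalar per column for row `idx`
/// (a stand-in for what a row-extraction helper does).
fn row_at(columns: &[Vec<Scalar>], idx: usize) -> Vec<Scalar> {
    columns.iter().map(|col| col[idx]).collect()
}

/// Old shape: build the whole row, then keep only the first entry.
fn first_column_via_row(columns: &[Vec<Scalar>], idx: usize) -> Scalar {
    let mut row = row_at(columns, idx);
    row.swap_remove(0)
}

/// New shape: read the single needed element directly,
/// without allocating a temporary Vec.
fn first_column_direct(columns: &[Vec<Scalar>], idx: usize) -> Scalar {
    columns[0][idx]
}
```

Both functions return the same value for every valid index; the direct version simply avoids the intermediate allocation and the conversion of columns that are never used.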
@@ -831,6 +830,8 @@ pub struct FirstValueAccumulator {
     orderings: Vec<ScalarValue>,
     // Stores the applicable ordering requirement.
     ordering_req: LexOrdering,
+    // derived from `ordering_req`.
+    sort_options: Vec<SortOptions>,
     // Stores whether incoming data already satisfies the ordering requirement.
     is_input_pre_ordered: bool,
     // Ignore null values.
@@ -850,11 +851,13 @@ impl FirstValueAccumulator {
             .iter()
             .map(ScalarValue::try_from)
             .collect::<Result<_>>()?;
+        let sort_options = get_sort_options(&ordering_req);
         ScalarValue::try_from(data_type).map(|first| Self {
             first,
             is_set: false,
             orderings,
             ordering_req,
+            sort_options,
             is_input_pre_ordered,
             ignore_nulls,
         })
@@ -927,12 +930,8 @@ impl Accumulator for FirstValueAccumulator {
                 let row = get_row_at_idx(values, first_idx)?;
                 if !self.is_set
                     || (!self.is_input_pre_ordered
-                        && compare_rows(
-                            &self.orderings,
-                            &row[1..],
-                            &get_sort_options(&self.ordering_req),
-                        )?
-                        .is_gt())
+                        && compare_rows(&self.orderings, &row[1..], &self.sort_options)?
+                            .is_gt())
                 {
                     self.update_with_new_row(row);
                 }
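
The `sort_options` field introduced above is derived once in the constructor and then borrowed on every comparison, instead of rebuilding a fresh vector per call. A simplified, std-only sketch of that pattern is shown below; `SortOpt`, `OrderingExpr`, `sort_options_of`, `compare_rows_sketch` and `FirstTracker` are all hypothetical names, and the comparison ignores nulls for brevity.

```rust
use std::cmp::Ordering;

/// Hypothetical stand-in for a per-column sort option.
#[derive(Clone, Copy)]
struct SortOpt {
    descending: bool,
}

/// Hypothetical stand-in for one entry of an ordering requirement.
struct OrderingExpr {
    options: SortOpt,
}

/// Derives the per-column options; allocates a new Vec on every call.
fn sort_options_of(ordering_req: &[OrderingExpr]) -> Vec<SortOpt> {
    ordering_req.iter().map(|e| e.options).collect()
}

/// Lexicographic comparison of two rows under the given options.
fn compare_rows_sketch(a: &[i64], b: &[i64], opts: &[SortOpt]) -> Ordering {
    for ((x, y), opt) in a.iter().zip(b).zip(opts) {
        let ord = if opt.descending { y.cmp(x) } else { x.cmp(y) };
        if ord != Ordering::Equal {
            return ord;
        }
    }
    Ordering::Equal
}

struct FirstTracker {
    best: Option<Vec<i64>>,
    // Derived once from the ordering requirement and reused per update.
    sort_options: Vec<SortOpt>,
}

impl FirstTracker {
    fn new(ordering_req: &[OrderingExpr]) -> Self {
        Self { best: None, sort_options: sort_options_of(ordering_req) }
    }

    /// Keeps the row that sorts first; the options are only borrowed here.
    fn update(&mut self, row: &[i64]) {
        let replace = match &self.best {
            None => true,
            Some(cur) => compare_rows_sketch(cur, row, &self.sort_options).is_gt(),
        };
        if replace {
            self.best = Some(row.to_vec());
        }
    }
}
```

With an ascending first column and a descending second column, feeding the rows `[2, 5]`, `[1, 0]` and `[1, 9]` leaves `[1, 9]` as the tracked row. The cached vector costs one extra field per accumulator, which is small compared with an allocation on every batch.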
@@ -960,10 +959,10 @@ impl Accumulator for FirstValueAccumulator {
             let mut first_row = get_row_at_idx(&filtered_states, first_idx)?;
             // When collecting orderings, we exclude the is_set flag from the state.
             let first_ordering = &first_row[1..is_set_idx];
-            let sort_options = get_sort_options(&self.ordering_req);
             // Either there is no existing value, or there is an earlier version in new data.
             if !self.is_set
-                || compare_rows(&self.orderings, first_ordering, &sort_options)?.is_gt()
+                || compare_rows(&self.orderings, first_ordering, &self.sort_options)?
+                    .is_gt()
             {
                 // Update with first value in the state. Note that we should exclude the
                 // is_set flag from the state. Otherwise, we will end up with a state
@@ -1176,8 +1175,7 @@ impl Accumulator for TrivialLastValueAccumulator {
                 last_idx = Some(value.len() - 1);
             }
             if let Some(last_idx) = last_idx {
-                let mut row = get_row_at_idx(values, last_idx)?;
-                self.last = row.swap_remove(0);
+                self.last = ScalarValue::try_from_array(&values[0], last_idx)?;
                 self.last.compact();
                 self.is_set = true;
             }
@@ -1221,6 +1219,8 @@ struct LastValueAccumulator {
     orderings: Vec<ScalarValue>,
     // Stores the applicable ordering requirement.
     ordering_req: LexOrdering,
+    // derived from `ordering_req`.
+    sort_options: Vec<SortOptions>,
     // Stores whether incoming data already satisfies the ordering requirement.
     is_input_pre_ordered: bool,
     // Ignore null values.
@@ -1240,11 +1240,13 @@ impl LastValueAccumulator {
             .iter()
             .map(ScalarValue::try_from)
             .collect::<Result<_>>()?;
+        let sort_options = get_sort_options(&ordering_req);
         ScalarValue::try_from(data_type).map(|last| Self {
             last,
             is_set: false,
             orderings,
             ordering_req,
+            sort_options,
             is_input_pre_ordered,
             ignore_nulls,
         })
@@ -1317,12 +1319,7 @@ impl Accumulator for LastValueAccumulator {
             // Update when there is a more recent entry
             if !self.is_set
                 || self.is_input_pre_ordered
-                || compare_rows(
-                    &self.orderings,
-                    orderings,
-                    &get_sort_options(&self.ordering_req),
-                )?
-                .is_lt()
+                || compare_rows(&self.orderings, orderings, &self.sort_options)?.is_lt()
             {
                 self.update_with_new_row(row);
             }
@@ -1350,12 +1347,12 @@ impl Accumulator for LastValueAccumulator {
             let mut last_row = get_row_at_idx(&filtered_states, last_idx)?;
             // When collecting orderings, we exclude the is_set flag from the state.
             let last_ordering = &last_row[1..is_set_idx];
-            let sort_options = get_sort_options(&self.ordering_req);
             // Either there is no existing value, or there is a newer (latest)
             // version in the new data:
             if !self.is_set
                 || self.is_input_pre_ordered
-                || compare_rows(&self.orderings, last_ordering, &sort_options)?.is_lt()
+                || compare_rows(&self.orderings, last_ordering, &self.sort_options)?
+                    .is_lt()
             {
                 // Update with last value in the state. Note that we should exclude the
                 // is_set flag from the state. Otherwise, we will end up with a state

datafusion/functions-aggregate/src/first_last/state.rs

Lines changed: 2 additions & 6 deletions
@@ -300,13 +300,9 @@ pub(crate) fn take_need(
         EmitTo::First(n) => {
             // split off the first N values in seen_values
             //
-            // TODO make this more efficient rather than two
-            // copies and bitwise manipulation
-            let first_n: BooleanBuffer = bool_buf.iter().take(n).collect();
+            let first_n: BooleanBuffer = bool_buf.slice(0, n);
             // reset the existing buffer
-            for b in bool_buf.iter().skip(n) {
-                bool_buf_builder.append(b);
-            }
+            bool_buf_builder.append_buffer(&bool_buf.slice(n, bool_buf.len() - n));
             first_n
         }
     }
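
The change to `take_need` replaces per-bit iteration with two range operations on the buffer. The idea can be sketched with plain slices, assuming a `&[bool]` in place of the bit-packed `BooleanBuffer`; `split_per_element` and `split_by_slice` are hypothetical names, and unlike the real `slice` call this sketch still copies the data into new vectors.

```rust
/// Old shape: walk the flags one element at a time, once for the
/// first `n` values and once more for the remainder.
fn split_per_element(flags: &[bool], n: usize) -> (Vec<bool>, Vec<bool>) {
    let first_n: Vec<bool> = flags.iter().copied().take(n).collect();
    let mut rest = Vec::new();
    for b in flags.iter().copied().skip(n) {
        rest.push(b);
    }
    (first_n, rest)
}

/// New shape: address two contiguous ranges and copy each in bulk.
/// Note: `split_at` panics if `n > flags.len()`, so callers must
/// guarantee that `n` is in range.
fn split_by_slice(flags: &[bool], n: usize) -> (Vec<bool>, Vec<bool>) {
    let (head, tail) = flags.split_at(n);
    (head.to_vec(), tail.to_vec())
}
```

For any `n` up to the buffer length the two functions produce identical halves; the range-based version expresses the split as offsets and lengths, which is what lets a buffer type handle it without touching individual bits.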
