Skip to content

Commit d0692b8

Browse files
authored
bench: Scale sort benchmarks to 1M rows to exercise merge path (#21630)
## Which issue does this PR close? - Partially addresses #21543. Also needed to properly evaluate the ExternalSorter refactor in #21629, which improves the merge path. ## Rationale for this change Current sort benchmarks use 100K rows across 8 partitions (~12.5K rows per partition, ~100KB for integers). This falls below the `sort_in_place_threshold_bytes` (1MB), so the "sort partitioned" benchmarks always take the concat-and-sort-in-place path and never exercise the sort-then-merge path that dominates real workloads. ## What changes are included in this PR? Parameterizes the sort benchmark on input size, running each case at both 100K rows (existing) and 1M rows (new). At 1M rows, each partition holds ~125K rows (~1MB for integers), which exercises the merge path. - `INPUT_SIZE` constant replaced with `INPUT_SIZES` array: `[(100_000, "100k"), (1_000_000, "1M")]` - `DataGenerator` takes `input_size` as a constructor parameter - All stream generator functions accept `input_size` - Benchmark names include size label (e.g. `sort partitioned i64 100k`, `sort partitioned i64 1M`) - Data distribution and cardinality ratios are preserved across sizes ## Are these changes tested? Benchmark compiles and runs. No functional test changes. ## Are there any user-facing changes? No.
1 parent 26c6121 commit d0692b8

1 file changed

Lines changed: 136 additions & 82 deletions

File tree

datafusion/core/benches/sort.rs

Lines changed: 136 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -102,61 +102,104 @@ const NUM_STREAMS: usize = 8;
102102
/// The size of each batch within each stream
103103
const BATCH_SIZE: usize = 1024;
104104

105-
/// Total number of input rows to generate
106-
const INPUT_SIZE: u64 = 100000;
105+
/// Input sizes to benchmark. The small size (100K) exercises the
106+
/// in-memory concat-and-sort path; the large size (1M) exercises
107+
/// the sort-then-merge path with high fan-in.
108+
const INPUT_SIZES: &[(u64, &str)] = &[(100_000, "100k"), (1_000_000, "1M")];
107109

108110
type PartitionedBatches = Vec<Vec<RecordBatch>>;
111+
type StreamGenerator = Box<dyn Fn(bool) -> PartitionedBatches>;
109112

110113
fn criterion_benchmark(c: &mut Criterion) {
111-
let cases: Vec<(&str, &dyn Fn(bool) -> PartitionedBatches)> = vec![
112-
("i64", &i64_streams),
113-
("f64", &f64_streams),
114-
("utf8 low cardinality", &utf8_low_cardinality_streams),
115-
("utf8 high cardinality", &utf8_high_cardinality_streams),
116-
(
117-
"utf8 view low cardinality",
118-
&utf8_view_low_cardinality_streams,
119-
),
120-
(
121-
"utf8 view high cardinality",
122-
&utf8_view_high_cardinality_streams,
123-
),
124-
("utf8 tuple", &utf8_tuple_streams),
125-
("utf8 view tuple", &utf8_view_tuple_streams),
126-
("utf8 dictionary", &dictionary_streams),
127-
("utf8 dictionary tuple", &dictionary_tuple_streams),
128-
("mixed dictionary tuple", &mixed_dictionary_tuple_streams),
129-
("mixed tuple", &mixed_tuple_streams),
130-
(
131-
"mixed tuple with utf8 view",
132-
&mixed_tuple_with_utf8_view_streams,
133-
),
134-
];
135-
136-
for (name, f) in cases {
137-
c.bench_function(&format!("merge sorted {name}"), |b| {
138-
let data = f(true);
139-
let case = BenchCase::merge_sorted(&data);
140-
b.iter(move || case.run())
141-
});
142-
143-
c.bench_function(&format!("sort merge {name}"), |b| {
144-
let data = f(false);
145-
let case = BenchCase::sort_merge(&data);
146-
b.iter(move || case.run())
147-
});
148-
149-
c.bench_function(&format!("sort {name}"), |b| {
150-
let data = f(false);
151-
let case = BenchCase::sort(&data);
152-
b.iter(move || case.run())
153-
});
154-
155-
c.bench_function(&format!("sort partitioned {name}"), |b| {
156-
let data = f(false);
157-
let case = BenchCase::sort_partitioned(&data);
158-
b.iter(move || case.run())
159-
});
114+
for &(input_size, size_label) in INPUT_SIZES {
115+
let cases: Vec<(&str, StreamGenerator)> = vec![
116+
(
117+
"i64",
118+
Box::new(move |sorted| i64_streams(sorted, input_size)),
119+
),
120+
(
121+
"f64",
122+
Box::new(move |sorted| f64_streams(sorted, input_size)),
123+
),
124+
(
125+
"utf8 low cardinality",
126+
Box::new(move |sorted| utf8_low_cardinality_streams(sorted, input_size)),
127+
),
128+
(
129+
"utf8 high cardinality",
130+
Box::new(move |sorted| utf8_high_cardinality_streams(sorted, input_size)),
131+
),
132+
(
133+
"utf8 view low cardinality",
134+
Box::new(move |sorted| {
135+
utf8_view_low_cardinality_streams(sorted, input_size)
136+
}),
137+
),
138+
(
139+
"utf8 view high cardinality",
140+
Box::new(move |sorted| {
141+
utf8_view_high_cardinality_streams(sorted, input_size)
142+
}),
143+
),
144+
(
145+
"utf8 tuple",
146+
Box::new(move |sorted| utf8_tuple_streams(sorted, input_size)),
147+
),
148+
(
149+
"utf8 view tuple",
150+
Box::new(move |sorted| utf8_view_tuple_streams(sorted, input_size)),
151+
),
152+
(
153+
"utf8 dictionary",
154+
Box::new(move |sorted| dictionary_streams(sorted, input_size)),
155+
),
156+
(
157+
"utf8 dictionary tuple",
158+
Box::new(move |sorted| dictionary_tuple_streams(sorted, input_size)),
159+
),
160+
(
161+
"mixed dictionary tuple",
162+
Box::new(move |sorted| {
163+
mixed_dictionary_tuple_streams(sorted, input_size)
164+
}),
165+
),
166+
(
167+
"mixed tuple",
168+
Box::new(move |sorted| mixed_tuple_streams(sorted, input_size)),
169+
),
170+
(
171+
"mixed tuple with utf8 view",
172+
Box::new(move |sorted| {
173+
mixed_tuple_with_utf8_view_streams(sorted, input_size)
174+
}),
175+
),
176+
];
177+
178+
for (name, f) in &cases {
179+
c.bench_function(&format!("merge sorted {name} {size_label}"), |b| {
180+
let data = f(true);
181+
let case = BenchCase::merge_sorted(&data);
182+
b.iter(move || case.run())
183+
});
184+
185+
c.bench_function(&format!("sort merge {name} {size_label}"), |b| {
186+
let data = f(false);
187+
let case = BenchCase::sort_merge(&data);
188+
b.iter(move || case.run())
189+
});
190+
191+
c.bench_function(&format!("sort {name} {size_label}"), |b| {
192+
let data = f(false);
193+
let case = BenchCase::sort(&data);
194+
b.iter(move || case.run())
195+
});
196+
197+
c.bench_function(&format!("sort partitioned {name} {size_label}"), |b| {
198+
let data = f(false);
199+
let case = BenchCase::sort_partitioned(&data);
200+
b.iter(move || case.run())
201+
});
202+
}
160203
}
161204
}
162205

@@ -279,8 +322,8 @@ fn make_sort_exprs(schema: &Schema) -> LexOrdering {
279322
}
280323

281324
/// Create streams of int64 (where approximately 1/3 of the values are repeated)
282-
fn i64_streams(sorted: bool) -> PartitionedBatches {
283-
let mut values = DataGenerator::new().i64_values();
325+
fn i64_streams(sorted: bool, input_size: u64) -> PartitionedBatches {
326+
let mut values = DataGenerator::new(input_size).i64_values();
284327
if sorted {
285328
values.sort_unstable();
286329
}
@@ -293,8 +336,8 @@ fn i64_streams(sorted: bool) -> PartitionedBatches {
293336

294337
/// Create streams of f64 (where approximately 1/3 values are repeated)
295338
/// with the same distribution as i64_streams
296-
fn f64_streams(sorted: bool) -> PartitionedBatches {
297-
let mut values = DataGenerator::new().f64_values();
339+
fn f64_streams(sorted: bool, input_size: u64) -> PartitionedBatches {
340+
let mut values = DataGenerator::new(input_size).f64_values();
298341
if sorted {
299342
values.sort_unstable_by(|a, b| a.total_cmp(b));
300343
}
@@ -306,8 +349,8 @@ fn f64_streams(sorted: bool) -> PartitionedBatches {
306349
}
307350

308351
/// Create streams of random low cardinality utf8 values
309-
fn utf8_low_cardinality_streams(sorted: bool) -> PartitionedBatches {
310-
let mut values = DataGenerator::new().utf8_low_cardinality_values();
352+
fn utf8_low_cardinality_streams(sorted: bool, input_size: u64) -> PartitionedBatches {
353+
let mut values = DataGenerator::new(input_size).utf8_low_cardinality_values();
311354
if sorted {
312355
values.sort_unstable();
313356
}
@@ -318,8 +361,11 @@ fn utf8_low_cardinality_streams(sorted: bool) -> PartitionedBatches {
318361
}
319362

320363
/// Create streams of random low cardinality utf8_view values
321-
fn utf8_view_low_cardinality_streams(sorted: bool) -> PartitionedBatches {
322-
let mut values = DataGenerator::new().utf8_low_cardinality_values();
364+
fn utf8_view_low_cardinality_streams(
365+
sorted: bool,
366+
input_size: u64,
367+
) -> PartitionedBatches {
368+
let mut values = DataGenerator::new(input_size).utf8_low_cardinality_values();
323369
if sorted {
324370
values.sort_unstable();
325371
}
@@ -330,8 +376,11 @@ fn utf8_view_low_cardinality_streams(sorted: bool) -> PartitionedBatches {
330376
}
331377

332378
/// Create streams of high cardinality (~ no duplicates) utf8_view values
333-
fn utf8_view_high_cardinality_streams(sorted: bool) -> PartitionedBatches {
334-
let mut values = DataGenerator::new().utf8_high_cardinality_values();
379+
fn utf8_view_high_cardinality_streams(
380+
sorted: bool,
381+
input_size: u64,
382+
) -> PartitionedBatches {
383+
let mut values = DataGenerator::new(input_size).utf8_high_cardinality_values();
335384
if sorted {
336385
values.sort_unstable();
337386
}
@@ -342,8 +391,8 @@ fn utf8_view_high_cardinality_streams(sorted: bool) -> PartitionedBatches {
342391
}
343392

344393
/// Create streams of high cardinality (~ no duplicates) utf8 values
345-
fn utf8_high_cardinality_streams(sorted: bool) -> PartitionedBatches {
346-
let mut values = DataGenerator::new().utf8_high_cardinality_values();
394+
fn utf8_high_cardinality_streams(sorted: bool, input_size: u64) -> PartitionedBatches {
395+
let mut values = DataGenerator::new(input_size).utf8_high_cardinality_values();
347396
if sorted {
348397
values.sort_unstable();
349398
}
@@ -354,8 +403,8 @@ fn utf8_high_cardinality_streams(sorted: bool) -> PartitionedBatches {
354403
}
355404

356405
/// Create a batch of (utf8_low, utf8_low, utf8_high)
357-
fn utf8_tuple_streams(sorted: bool) -> PartitionedBatches {
358-
let mut data_gen = DataGenerator::new();
406+
fn utf8_tuple_streams(sorted: bool, input_size: u64) -> PartitionedBatches {
407+
let mut data_gen = DataGenerator::new(input_size);
359408

360409
// need to sort by the combined key, so combine them together
361410
let mut tuples: Vec<_> = data_gen
@@ -387,8 +436,8 @@ fn utf8_tuple_streams(sorted: bool) -> PartitionedBatches {
387436
}
388437

389438
/// Create a batch of (utf8_view_low, utf8_view_low, utf8_view_high)
390-
fn utf8_view_tuple_streams(sorted: bool) -> PartitionedBatches {
391-
let mut data_gen = DataGenerator::new();
439+
fn utf8_view_tuple_streams(sorted: bool, input_size: u64) -> PartitionedBatches {
440+
let mut data_gen = DataGenerator::new(input_size);
392441

393442
// need to sort by the combined key, so combine them together
394443
let mut tuples: Vec<_> = data_gen
@@ -420,8 +469,8 @@ fn utf8_view_tuple_streams(sorted: bool) -> PartitionedBatches {
420469
}
421470

422471
/// Create a batch of (f64, utf8_low, utf8_low, i64)
423-
fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches {
424-
let mut data_gen = DataGenerator::new();
472+
fn mixed_tuple_streams(sorted: bool, input_size: u64) -> PartitionedBatches {
473+
let mut data_gen = DataGenerator::new(input_size);
425474

426475
// need to sort by the combined key, so combine them together
427476
let mut tuples: Vec<_> = data_gen
@@ -458,8 +507,11 @@ fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches {
458507
}
459508

460509
/// Create a batch of (f64, utf8_view_low, utf8_view_low, i64)
461-
fn mixed_tuple_with_utf8_view_streams(sorted: bool) -> PartitionedBatches {
462-
let mut data_gen = DataGenerator::new();
510+
fn mixed_tuple_with_utf8_view_streams(
511+
sorted: bool,
512+
input_size: u64,
513+
) -> PartitionedBatches {
514+
let mut data_gen = DataGenerator::new(input_size);
463515

464516
// need to sort by the combined key, so combine them together
465517
let mut tuples: Vec<_> = data_gen
@@ -496,8 +548,8 @@ fn mixed_tuple_with_utf8_view_streams(sorted: bool) -> PartitionedBatches {
496548
}
497549

498550
/// Create a batch of (utf8_dict)
499-
fn dictionary_streams(sorted: bool) -> PartitionedBatches {
500-
let mut data_gen = DataGenerator::new();
551+
fn dictionary_streams(sorted: bool, input_size: u64) -> PartitionedBatches {
552+
let mut data_gen = DataGenerator::new(input_size);
501553
let mut values = data_gen.utf8_low_cardinality_values();
502554
if sorted {
503555
values.sort_unstable();
@@ -511,8 +563,8 @@ fn dictionary_streams(sorted: bool) -> PartitionedBatches {
511563
}
512564

513565
/// Create a batch of (utf8_dict, utf8_dict, utf8_dict)
514-
fn dictionary_tuple_streams(sorted: bool) -> PartitionedBatches {
515-
let mut data_gen = DataGenerator::new();
566+
fn dictionary_tuple_streams(sorted: bool, input_size: u64) -> PartitionedBatches {
567+
let mut data_gen = DataGenerator::new(input_size);
516568
let mut tuples: Vec<_> = data_gen
517569
.utf8_low_cardinality_values()
518570
.into_iter()
@@ -542,8 +594,8 @@ fn dictionary_tuple_streams(sorted: bool) -> PartitionedBatches {
542594
}
543595

544596
/// Create a batch of (utf8_dict, utf8_dict, utf8_dict, i64)
545-
fn mixed_dictionary_tuple_streams(sorted: bool) -> PartitionedBatches {
546-
let mut data_gen = DataGenerator::new();
597+
fn mixed_dictionary_tuple_streams(sorted: bool, input_size: u64) -> PartitionedBatches {
598+
let mut data_gen = DataGenerator::new(input_size);
547599
let mut tuples: Vec<_> = data_gen
548600
.utf8_low_cardinality_values()
549601
.into_iter()
@@ -579,19 +631,21 @@ fn mixed_dictionary_tuple_streams(sorted: bool) -> PartitionedBatches {
579631
/// Encapsulates creating data for this test
580632
struct DataGenerator {
581633
rng: StdRng,
634+
input_size: u64,
582635
}
583636

584637
impl DataGenerator {
585-
fn new() -> Self {
638+
fn new(input_size: u64) -> Self {
586639
Self {
587640
rng: StdRng::seed_from_u64(42),
641+
input_size,
588642
}
589643
}
590644

591645
/// Create an array of sorted i64 values (where approximately 1/3 of the values are repeated)
592646
fn i64_values(&mut self) -> Vec<i64> {
593-
let mut vec: Vec<_> = (0..INPUT_SIZE)
594-
.map(|_| self.rng.random_range(0..INPUT_SIZE as i64))
647+
let mut vec: Vec<_> = (0..self.input_size)
648+
.map(|_| self.rng.random_range(0..self.input_size as i64))
595649
.collect();
596650

597651
vec.sort_unstable();
@@ -614,7 +668,7 @@ impl DataGenerator {
614668
.collect::<Vec<_>>();
615669

616670
// pick from the 100 strings randomly
617-
let mut input = (0..INPUT_SIZE)
671+
let mut input = (0..self.input_size)
618672
.map(|_| {
619673
let idx = self.rng.random_range(0..strings.len());
620674
let s = Arc::clone(&strings[idx]);
@@ -629,7 +683,7 @@ impl DataGenerator {
629683
/// Create sorted values of high cardinality (~ no duplicates) utf8 values
630684
fn utf8_high_cardinality_values(&mut self) -> Vec<Option<String>> {
631685
// make random strings
632-
let mut input = (0..INPUT_SIZE)
686+
let mut input = (0..self.input_size)
633687
.map(|_| Some(self.random_string()))
634688
.collect::<Vec<_>>();
635689

0 commit comments

Comments
 (0)