Skip to content

Commit edf8ad3

Browse files
authored
fix(benchmarks): correct TPC-H benchmark SQL (#21615)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #21368. ## Rationale for this change TPC-H query 11 was using a fixed `0.0001` threshold in the HAVING clause. Per the TPC-H spec, this value must be `0.0001 / SF`. This means the benchmark query is only correct for scale factor 1. For larger scale factors the filter becomes too strict, and for smaller scale factors it becomes too loose. There are also few benchmark queries using fixed end dates where the spec uses `date + interval`. Those are equivalent but using intervals matches the TPC-H query definitions more closely. <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? - Make TPC-H query 11 use scale-factor substitution. - Add scale factor support in the benchmark runner. - Infer scale factor from dataset paths like `tpch_sf10`. - Pass the scale factor from `bench.sh`. - Keep the old query-loading entry point working with the default scale factor of 1. - Update query 5, 6, 10, 12, and 14 to use interval-based date ranges. - Add regression tests for scale-factor substitution, scale-factor parsing, and invalid scale factors. <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? Yes <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? TPC-H benchmark query 11 now returns correct results when the scale factor is not 1. <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 7b2f284 commit edf8ad3

9 files changed

Lines changed: 190 additions & 18 deletions

File tree

benchmarks/bench.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -677,7 +677,7 @@ run_tpch() {
677677
echo "Running tpch benchmark..."
678678

679679
FORMAT=$2
680-
debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format ${FORMAT} -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
680+
debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --scale-factor "${SCALE_FACTOR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format ${FORMAT} -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
681681
}
682682

683683
# Runs the tpch in memory (needs tpch parquet data)
@@ -693,7 +693,7 @@ run_tpch_mem() {
693693
echo "RESULTS_FILE: ${RESULTS_FILE}"
694694
echo "Running tpch_mem benchmark..."
695695
# -m means in memory
696-
debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
696+
debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --scale-factor "${SCALE_FACTOR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
697697
}
698698

699699
# Runs the tpcds benchmark

benchmarks/queries/q10.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ where
1616
c_custkey = o_custkey
1717
and l_orderkey = o_orderkey
1818
and o_orderdate >= date '1993-10-01'
19-
and o_orderdate < date '1994-01-01'
19+
and o_orderdate < date '1993-10-01' + interval '3' month
2020
and l_returnflag = 'R'
2121
and c_nationkey = n_nationkey
2222
group by

benchmarks/queries/q11.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ group by
1313
ps_partkey having
1414
sum(ps_supplycost * ps_availqty) > (
1515
select
16-
sum(ps_supplycost * ps_availqty) * 0.0001
16+
sum(ps_supplycost * ps_availqty) * 0.0001 /* __TPCH_Q11_FRACTION__ */
1717
from
1818
partsupp,
1919
supplier,
@@ -24,4 +24,4 @@ group by
2424
and n_name = 'GERMANY'
2525
)
2626
order by
27-
value desc;
27+
value desc;

benchmarks/queries/q12.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ where
2323
and l_commitdate < l_receiptdate
2424
and l_shipdate < l_commitdate
2525
and l_receiptdate >= date '1994-01-01'
26-
and l_receiptdate < date '1995-01-01'
26+
and l_receiptdate < date '1994-01-01' + interval '1' year
2727
group by
2828
l_shipmode
2929
order by
30-
l_shipmode;
30+
l_shipmode;

benchmarks/queries/q14.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,4 @@ from
1010
where
1111
l_partkey = p_partkey
1212
and l_shipdate >= date '1995-09-01'
13-
and l_shipdate < date '1995-10-01';
13+
and l_shipdate < date '1995-09-01' + interval '1' month;

benchmarks/queries/q5.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ where
1717
and n_regionkey = r_regionkey
1818
and r_name = 'ASIA'
1919
and o_orderdate >= date '1994-01-01'
20-
and o_orderdate < date '1995-01-01'
20+
and o_orderdate < date '1994-01-01' + interval '1' year
2121
group by
2222
n_name
2323
order by
24-
revenue desc;
24+
revenue desc;

benchmarks/queries/q6.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@ from
44
lineitem
55
where
66
l_shipdate >= date '1994-01-01'
7-
and l_shipdate < date '1995-01-01'
7+
and l_shipdate < date '1994-01-01' + interval '1' year
88
and l_discount between 0.06 - 0.01 and 0.06 + 0.01
9-
and l_quantity < 24;
9+
and l_quantity < 24;

benchmarks/src/tpch/mod.rs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ pub const TPCH_TABLES: &[&str] = &[
3333

3434
pub const TPCH_QUERY_START_ID: usize = 1;
3535
pub const TPCH_QUERY_END_ID: usize = 22;
36+
const TPCH_Q11_FRACTION_SENTINEL: &str = "0.0001 /* __TPCH_Q11_FRACTION__ */";
3637

3738
/// The `.tbl` file contains a trailing column
3839
pub fn get_tbl_tpch_table_schema(table: &str) -> Schema {
@@ -139,6 +140,21 @@ pub fn get_tpch_table_schema(table: &str) -> Schema {
139140

140141
/// Get the SQL statements from the specified query file
141142
pub fn get_query_sql(query: usize) -> Result<Vec<String>> {
143+
get_query_sql_for_scale_factor(query, 1.0)
144+
}
145+
146+
/// Get the SQL statements from the specified query file using the provided scale factor for
147+
/// TPC-H substitutions such as Q11 FRACTION.
148+
pub fn get_query_sql_for_scale_factor(
149+
query: usize,
150+
scale_factor: f64,
151+
) -> Result<Vec<String>> {
152+
if !(scale_factor.is_finite() && scale_factor > 0.0) {
153+
return plan_err!(
154+
"invalid scale factor. Expected a positive finite value, got {scale_factor}"
155+
);
156+
}
157+
142158
if query > 0 && query < 23 {
143159
let possibilities = vec![
144160
format!("queries/q{query}.sql"),
@@ -148,6 +164,7 @@ pub fn get_query_sql(query: usize) -> Result<Vec<String>> {
148164
for filename in possibilities {
149165
match fs::read_to_string(&filename) {
150166
Ok(contents) => {
167+
let contents = customize_query_sql(query, contents, scale_factor)?;
151168
return Ok(contents
152169
.split(';')
153170
.map(|s| s.trim())
@@ -164,6 +181,27 @@ pub fn get_query_sql(query: usize) -> Result<Vec<String>> {
164181
}
165182
}
166183

184+
fn customize_query_sql(
185+
query: usize,
186+
contents: String,
187+
scale_factor: f64,
188+
) -> Result<String> {
189+
if query != 11 {
190+
return Ok(contents);
191+
}
192+
193+
if !contents.contains(TPCH_Q11_FRACTION_SENTINEL) {
194+
return plan_err!(
195+
"invalid query 11. Missing fraction marker {TPCH_Q11_FRACTION_SENTINEL}"
196+
);
197+
}
198+
199+
Ok(contents.replace(
200+
TPCH_Q11_FRACTION_SENTINEL,
201+
&format!("(0.0001 / {scale_factor})"),
202+
))
203+
}
204+
167205
pub const QUERY_LIMIT: [Option<usize>; 22] = [
168206
None,
169207
Some(100),
@@ -188,3 +226,51 @@ pub const QUERY_LIMIT: [Option<usize>; 22] = [
188226
Some(100),
189227
None,
190228
];
229+
230+
#[cfg(test)]
231+
mod tests {
232+
use super::{get_query_sql, get_query_sql_for_scale_factor};
233+
use datafusion::error::Result;
234+
235+
fn get_single_query(query: usize) -> Result<String> {
236+
let mut queries = get_query_sql(query)?;
237+
assert_eq!(queries.len(), 1);
238+
Ok(queries.remove(0))
239+
}
240+
241+
fn get_single_query_for_scale_factor(
242+
query: usize,
243+
scale_factor: f64,
244+
) -> Result<String> {
245+
let mut queries = get_query_sql_for_scale_factor(query, scale_factor)?;
246+
assert_eq!(queries.len(), 1);
247+
Ok(queries.remove(0))
248+
}
249+
250+
#[test]
251+
fn q11_uses_scale_factor_substitution() -> Result<()> {
252+
let sf1_sql = get_single_query(11)?;
253+
assert!(sf1_sql.contains("(0.0001 / 1)"));
254+
255+
let sf01_sql = get_single_query_for_scale_factor(11, 0.1)?;
256+
assert!(sf01_sql.contains("(0.0001 / 0.1)"));
257+
258+
let sf10_sql = get_single_query_for_scale_factor(11, 10.0)?;
259+
assert!(sf10_sql.contains("(0.0001 / 10)"));
260+
261+
let sf30_sql = get_single_query_for_scale_factor(11, 30.0)?;
262+
assert!(sf30_sql.contains("(0.0001 / 30)"));
263+
assert!(!sf10_sql.contains("__TPCH_Q11_FRACTION__"));
264+
Ok(())
265+
}
266+
267+
#[test]
268+
fn interval_queries_use_interval_arithmetic() -> Result<()> {
269+
assert!(get_single_query(5)?.contains("date '1994-01-01' + interval '1' year"));
270+
assert!(get_single_query(6)?.contains("date '1994-01-01' + interval '1' year"));
271+
assert!(get_single_query(10)?.contains("date '1993-10-01' + interval '3' month"));
272+
assert!(get_single_query(12)?.contains("date '1994-01-01' + interval '1' year"));
273+
assert!(get_single_query(14)?.contains("date '1995-09-01' + interval '1' month"));
274+
Ok(())
275+
}
276+
}

benchmarks/src/tpch/run.rs

Lines changed: 92 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,18 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::path::PathBuf;
18+
use std::path::{Path, PathBuf};
1919
use std::sync::Arc;
2020

2121
use super::{
22-
TPCH_QUERY_END_ID, TPCH_QUERY_START_ID, TPCH_TABLES, get_query_sql,
22+
TPCH_QUERY_END_ID, TPCH_QUERY_START_ID, TPCH_TABLES, get_query_sql_for_scale_factor,
2323
get_tbl_tpch_table_schema, get_tpch_table_schema,
2424
};
2525
use crate::util::{BenchmarkRun, CommonOpt, QueryResult, print_memory_stats};
2626

2727
use arrow::record_batch::RecordBatch;
2828
use arrow::util::pretty::{self, pretty_format_batches};
29+
use datafusion::common::exec_err;
2930
use datafusion::datasource::file_format::FileFormat;
3031
use datafusion::datasource::file_format::csv::CsvFormat;
3132
use datafusion::datasource::file_format::parquet::ParquetFormat;
@@ -71,6 +72,11 @@ pub struct RunOpt {
7172
#[arg(required = true, short = 'p', long = "path")]
7273
path: PathBuf,
7374

75+
/// TPC-H scale factor used for query substitutions such as Q11 FRACTION.
76+
/// If omitted, the benchmark tries to infer it from paths like `.../tpch_sf10/...`.
77+
#[arg(long)]
78+
scale_factor: Option<f64>,
79+
7480
/// File format: `csv` or `parquet`
7581
#[arg(short = 'f', long = "format", default_value = "csv")]
7682
file_format: String,
@@ -133,10 +139,11 @@ impl RunOpt {
133139
let ctx = SessionContext::new_with_config_rt(config, rt);
134140
// register tables
135141
self.register_tables(&ctx).await?;
142+
let scale_factor = self.scale_factor()?;
136143

137144
for query_id in query_range {
138145
benchmark_run.start_new_case(&format!("Query {query_id}"));
139-
let query_run = self.benchmark_query(query_id, &ctx).await;
146+
let query_run = self.benchmark_query(query_id, scale_factor, &ctx).await;
140147
match query_run {
141148
Ok(query_results) => {
142149
for iter in query_results {
@@ -157,13 +164,14 @@ impl RunOpt {
157164
async fn benchmark_query(
158165
&self,
159166
query_id: usize,
167+
scale_factor: f64,
160168
ctx: &SessionContext,
161169
) -> Result<Vec<QueryResult>> {
162170
let mut millis = vec![];
163171
// run benchmark
164172
let mut query_results = vec![];
165173

166-
let sql = &get_query_sql(query_id)?;
174+
let sql = &get_query_sql_for_scale_factor(query_id, scale_factor)?;
167175

168176
for i in 0..self.iterations() {
169177
let start = Instant::now();
@@ -346,6 +354,82 @@ impl RunOpt {
346354
.partitions
347355
.unwrap_or_else(get_available_parallelism)
348356
}
357+
358+
fn scale_factor(&self) -> Result<f64> {
359+
resolve_scale_factor(self.scale_factor, &self.path)
360+
}
361+
}
362+
363+
fn resolve_scale_factor(scale_factor: Option<f64>, path: &Path) -> Result<f64> {
364+
let scale_factor = scale_factor
365+
.or_else(|| infer_scale_factor_from_path(path))
366+
.unwrap_or(1.0);
367+
368+
if scale_factor.is_finite() && scale_factor > 0.0 {
369+
Ok(scale_factor)
370+
} else {
371+
exec_err!(
372+
"Invalid TPC-H scale factor {scale_factor}. Expected a positive finite value"
373+
)
374+
}
375+
}
376+
377+
fn infer_scale_factor_from_path(path: &Path) -> Option<f64> {
378+
path.iter().find_map(|component| {
379+
component
380+
.to_str()?
381+
.strip_prefix("tpch_sf")?
382+
.parse::<f64>()
383+
.ok()
384+
})
385+
}
386+
387+
#[cfg(test)]
388+
mod scale_factor_tests {
389+
use std::path::Path;
390+
391+
use super::{infer_scale_factor_from_path, resolve_scale_factor};
392+
use datafusion::error::Result;
393+
394+
#[test]
395+
fn uses_explicit_scale_factor_when_provided() -> Result<()> {
396+
let scale_factor =
397+
resolve_scale_factor(Some(30.0), Path::new("benchmarks/data/tpch_sf10"))?;
398+
assert_eq!(scale_factor, 30.0);
399+
Ok(())
400+
}
401+
402+
#[test]
403+
fn infers_scale_factor_from_standard_tpch_path() -> Result<()> {
404+
let scale_factor =
405+
resolve_scale_factor(None, Path::new("benchmarks/data/tpch_sf10"))?;
406+
assert_eq!(scale_factor, 10.0);
407+
assert_eq!(
408+
infer_scale_factor_from_path(Path::new("benchmarks/data/tpch_sf0.1")),
409+
Some(0.1)
410+
);
411+
Ok(())
412+
}
413+
414+
#[test]
415+
fn defaults_to_sf1_when_path_has_no_scale_factor() -> Result<()> {
416+
let scale_factor = resolve_scale_factor(None, Path::new("benchmarks/data"))?;
417+
assert_eq!(scale_factor, 1.0);
418+
Ok(())
419+
}
420+
421+
#[test]
422+
fn rejects_invalid_scale_factors() {
423+
assert!(resolve_scale_factor(Some(0.0), Path::new("benchmarks/data")).is_err());
424+
assert!(resolve_scale_factor(Some(-1.0), Path::new("benchmarks/data")).is_err());
425+
assert!(
426+
resolve_scale_factor(Some(f64::NAN), Path::new("benchmarks/data")).is_err()
427+
);
428+
assert!(
429+
resolve_scale_factor(Some(f64::INFINITY), Path::new("benchmarks/data"))
430+
.is_err()
431+
);
432+
}
349433
}
350434

351435
#[cfg(test)]
@@ -392,6 +476,7 @@ mod tests {
392476
query: Some(query),
393477
common,
394478
path: PathBuf::from(path.to_string()),
479+
scale_factor: Some(1.0),
395480
file_format: "tbl".to_string(),
396481
mem_table: false,
397482
output_path: None,
@@ -402,7 +487,7 @@ mod tests {
402487
hash_join_buffering_capacity: 0,
403488
};
404489
opt.register_tables(&ctx).await?;
405-
let queries = get_query_sql(query)?;
490+
let queries = crate::tpch::get_query_sql(query)?;
406491
for query in queries {
407492
let plan = ctx.sql(&query).await?;
408493
let plan = plan.into_optimized_plan()?;
@@ -432,6 +517,7 @@ mod tests {
432517
query: Some(query),
433518
common,
434519
path: PathBuf::from(path.to_string()),
520+
scale_factor: Some(1.0),
435521
file_format: "tbl".to_string(),
436522
mem_table: false,
437523
output_path: None,
@@ -442,7 +528,7 @@ mod tests {
442528
hash_join_buffering_capacity: 0,
443529
};
444530
opt.register_tables(&ctx).await?;
445-
let queries = get_query_sql(query)?;
531+
let queries = crate::tpch::get_query_sql(query)?;
446532
for query in queries {
447533
let plan = ctx.sql(&query).await?;
448534
let plan = plan.create_physical_plan().await?;

0 commit comments

Comments
 (0)