Skip to content

Commit f9a8451

Browse files
authored
Merge branch 'main' into fix/filter-pushdown-duplicate-columns
2 parents 61f819b + 2aab559 commit f9a8451

43 files changed

Lines changed: 2271 additions & 527 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benchmarks/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ Generate the data required for the compile profile helper (TPC-H SF=1):
9595
./bench.sh data compile_profile
9696
```
9797

98-
Run the benchmark across all default Cargo profiles (`dev`, `release`, `ci`, `release-nonlto`):
98+
Run the benchmark across all default Cargo profiles (`dev`, `release`, `ci`, `ci-optimized`, `release-nonlto`, `profiling`):
9999

100100
```shell
101101
./bench.sh run compile_profile

benchmarks/bench.sh

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,10 @@ clickbench_partitioned: ClickBench queries against partitioned (100 files) parqu
106106
clickbench_pushdown: ClickBench queries against partitioned (100 files) parquet w/ filter_pushdown enabled
107107
clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific)
108108
109+
# Sort Pushdown Benchmarks
110+
sort_pushdown: Sort pushdown baseline (no WITH ORDER) on TPC-H data (SF=1)
111+
sort_pushdown_sorted: Sort pushdown with WITH ORDER — tests sort elimination on non-overlapping files
112+
109113
# Sorted Data Benchmarks (ORDER BY Optimization)
110114
clickbench_sorted: ClickBench queries on pre-sorted data using prefer_existing_sort (tests sort elimination optimization)
111115
@@ -309,6 +313,10 @@ main() {
309313
# same data as for tpch
310314
data_tpch "1" "parquet"
311315
;;
316+
sort_pushdown|sort_pushdown_sorted)
317+
# same data as for tpch
318+
data_tpch "1" "parquet"
319+
;;
312320
sort_tpch)
313321
# same data as for tpch
314322
data_tpch "1" "parquet"
@@ -509,6 +517,12 @@ main() {
509517
external_aggr)
510518
run_external_aggr
511519
;;
520+
sort_pushdown)
521+
run_sort_pushdown
522+
;;
523+
sort_pushdown_sorted)
524+
run_sort_pushdown_sorted
525+
;;
512526
sort_tpch)
513527
run_sort_tpch "1"
514528
;;
@@ -1070,6 +1084,22 @@ run_external_aggr() {
10701084
debug_run $CARGO_COMMAND --bin external_aggr -- benchmark --partitions 4 --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG}
10711085
}
10721086

1087+
# Runs the sort pushdown benchmark (without WITH ORDER)
1088+
run_sort_pushdown() {
1089+
TPCH_DIR="${DATA_DIR}/tpch_sf1"
1090+
RESULTS_FILE="${RESULTS_DIR}/sort_pushdown.json"
1091+
echo "Running sort pushdown benchmark (no WITH ORDER)..."
1092+
debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
1093+
}
1094+
1095+
# Runs the sort pushdown benchmark with WITH ORDER (enables sort elimination)
1096+
run_sort_pushdown_sorted() {
1097+
TPCH_DIR="${DATA_DIR}/tpch_sf1"
1098+
RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_sorted.json"
1099+
echo "Running sort pushdown benchmark (with WITH ORDER)..."
1100+
debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
1101+
}
1102+
10731103
# Runs the sort integration benchmark
10741104
run_sort_tpch() {
10751105
SCALE_FACTOR=$1

benchmarks/compile_profile.py

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919

2020
"""Compile profile benchmark runner for DataFusion.
2121
22-
Builds the `tpch` benchmark binary with several Cargo profiles (e.g. `--release` or `--profile ci`), runs the full TPC-H suite against the Parquet data under `benchmarks/data/tpch_sf1`, and reports compile time, execution time, and resulting
23-
binary size.
22+
Builds the `dfbench` benchmark binary with several Cargo profiles
23+
(e.g. `--release` or `--profile ci`), runs the full TPC-H suite against
24+
the Parquet data under `benchmarks/data/tpch_sf1`, and reports compile
25+
time, execution time, and resulting binary size.
2426
2527
See `benchmarks/README.md` for usage.
2628
"""
@@ -40,12 +42,15 @@
4042
DEFAULT_ITERATIONS = 1
4143
DEFAULT_FORMAT = "parquet"
4244
DEFAULT_PARTITIONS: int | None = None
43-
TPCH_BINARY = "tpch.exe" if os.name == "nt" else "tpch"
45+
BENCHMARK_PACKAGE = "datafusion-benchmarks"
46+
BENCHMARK_BINARY = "dfbench.exe" if os.name == "nt" else "dfbench"
4447
PROFILE_TARGET_DIR = {
4548
"dev": "debug",
4649
"release": "release",
4750
"ci": "ci",
51+
"ci-optimized": "ci-optimized",
4852
"release-nonlto": "release-nonlto",
53+
"profiling": "profiling",
4954
}
5055

5156

@@ -62,7 +67,10 @@ def parse_args() -> argparse.Namespace:
6267
"--profiles",
6368
nargs="+",
6469
default=list(PROFILE_TARGET_DIR.keys()),
65-
help="Cargo profiles to test (default: dev release ci release-nonlto)",
70+
help=(
71+
"Cargo profiles to test "
72+
"(default: dev release ci ci-optimized release-nonlto profiling)"
73+
),
6674
)
6775
parser.add_argument(
6876
"--data",
@@ -84,9 +92,25 @@ def timed_run(command: Iterable[str]) -> float:
8492

8593
def cargo_build(profile: str) -> float:
8694
if profile == "dev":
87-
command = ["cargo", "build", "--bin", "tpch"]
95+
command = [
96+
"cargo",
97+
"build",
98+
"--package",
99+
BENCHMARK_PACKAGE,
100+
"--bin",
101+
"dfbench",
102+
]
88103
else:
89-
command = ["cargo", "build", "--profile", profile, "--bin", "tpch"]
104+
command = [
105+
"cargo",
106+
"build",
107+
"--profile",
108+
profile,
109+
"--package",
110+
BENCHMARK_PACKAGE,
111+
"--bin",
112+
"dfbench",
113+
]
90114
return timed_run(command)
91115

92116

@@ -102,14 +126,13 @@ def run_benchmark(profile: str, data_path: Path) -> float:
102126
binary_dir = PROFILE_TARGET_DIR.get(profile)
103127
if not binary_dir:
104128
raise ValueError(f"unknown profile '{profile}'")
105-
binary_path = REPO_ROOT / "target" / binary_dir / TPCH_BINARY
129+
binary_path = REPO_ROOT / "target" / binary_dir / BENCHMARK_BINARY
106130
if not binary_path.exists():
107131
raise FileNotFoundError(f"compiled binary not found at {binary_path}")
108132

109133
command = [
110134
str(binary_path),
111-
"benchmark",
112-
"datafusion",
135+
"tpch",
113136
"--iterations",
114137
str(DEFAULT_ITERATIONS),
115138
"--path",
@@ -132,7 +155,7 @@ def run_benchmark(profile: str, data_path: Path) -> float:
132155

133156
def binary_size(profile: str) -> int:
134157
binary_dir = PROFILE_TARGET_DIR[profile]
135-
binary_path = REPO_ROOT / "target" / binary_dir / TPCH_BINARY
158+
binary_path = REPO_ROOT / "target" / binary_dir / BENCHMARK_BINARY
136159
return binary_path.stat().st_size
137160

138161

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
-- Sort elimination: ORDER BY sort key ASC (full scan)
2+
-- With --sorted: SortExec removed, sequential scan in file order
3+
-- Without --sorted: full SortExec required
4+
SELECT l_orderkey, l_partkey, l_suppkey
5+
FROM lineitem
6+
ORDER BY l_orderkey
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
-- Sort elimination + limit pushdown
2+
-- With --sorted: SortExec removed + limit pushed to DataSourceExec
3+
-- Without --sorted: TopK sort over all data
4+
SELECT l_orderkey, l_partkey, l_suppkey
5+
FROM lineitem
6+
ORDER BY l_orderkey
7+
LIMIT 100
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
-- Sort elimination: wide projection (all columns)
2+
-- Tests sort elimination benefit with larger row payload
3+
SELECT *
4+
FROM lineitem
5+
ORDER BY l_orderkey
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
-- Sort elimination + limit: wide projection
2+
SELECT *
3+
FROM lineitem
4+
ORDER BY l_orderkey
5+
LIMIT 100

benchmarks/src/bin/dfbench.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
3434
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
3535

3636
use datafusion_benchmarks::{
37-
cancellation, clickbench, h2o, hj, imdb, nlj, smj, sort_tpch, tpcds, tpch,
37+
cancellation, clickbench, h2o, hj, imdb, nlj, smj, sort_pushdown, sort_tpch, tpcds,
38+
tpch,
3839
};
3940

4041
#[derive(Debug, Parser)]
@@ -53,6 +54,7 @@ enum Options {
5354
Imdb(imdb::RunOpt),
5455
Nlj(nlj::RunOpt),
5556
Smj(smj::RunOpt),
57+
SortPushdown(sort_pushdown::RunOpt),
5658
SortTpch(sort_tpch::RunOpt),
5759
Tpch(tpch::RunOpt),
5860
Tpcds(tpcds::RunOpt),
@@ -72,6 +74,7 @@ pub async fn main() -> Result<()> {
7274
Options::Imdb(opt) => Box::pin(opt.run()).await,
7375
Options::Nlj(opt) => opt.run().await,
7476
Options::Smj(opt) => opt.run().await,
77+
Options::SortPushdown(opt) => opt.run().await,
7578
Options::SortTpch(opt) => opt.run().await,
7679
Options::Tpch(opt) => Box::pin(opt.run()).await,
7780
Options::Tpcds(opt) => Box::pin(opt.run()).await,

benchmarks/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub mod hj;
2323
pub mod imdb;
2424
pub mod nlj;
2525
pub mod smj;
26+
pub mod sort_pushdown;
2627
pub mod sort_tpch;
2728
pub mod tpcds;
2829
pub mod tpch;

0 commit comments

Comments
 (0)