Skip to content

Commit bb87ff6

Browse files
authored
Merge branch 'main' into feat/reorder-row-groups-by-stats
2 parents f4aecb9 + 64619a6 commit bb87ff6

6 files changed

Lines changed: 173 additions & 44 deletions

File tree

benchmarks/README.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -900,3 +900,44 @@ This command will:
900900
```
901901

902902
This runs queries against the pre-sorted dataset with the `--sorted-by EventTime` flag, which informs DataFusion that the data is pre-sorted, allowing it to optimize away redundant sort operations.
903+
904+
## Sort Pushdown
905+
906+
Benchmarks for sort pushdown optimizations on TPC-H lineitem data (SF=1).
907+
908+
### Variants
909+
910+
| Benchmark | Description |
911+
|-----------|-------------|
912+
| `sort_pushdown` | Baseline — no `WITH ORDER`, tests standard sort behavior |
913+
| `sort_pushdown_sorted` | With `WITH ORDER` — tests sort elimination on sorted files |
914+
| `sort_pushdown_inexact` | Inexact path (`--sorted` DESC) — multi-file with scrambled RGs, tests reverse scan + RG reorder |
915+
| `sort_pushdown_inexact_unsorted` | No `WITH ORDER` — same data, tests Unsupported path + RG reorder |
916+
| `sort_pushdown_inexact_overlap` | Multi-file scrambled RGs — streaming data scenario |
917+
918+
### Queries
919+
920+
**sort_pushdown / sort_pushdown_sorted** (q1-q8):
921+
- q1-q4: ASC queries (sort elimination with `--sorted`)
922+
- q5-q8: DESC LIMIT queries (reverse scan + TopK optimization with `--sorted`)
923+
924+
**sort_pushdown_inexact** (q1-q4): DESC LIMIT queries on scrambled data
925+
926+
### Data Generation
927+
928+
The inexact/overlap data requires pyarrow (`pip install pyarrow`) to generate
929+
multi-file parquet with scrambled row group order. DataFusion's COPY cannot produce
930+
narrow-range RGs in scrambled order because the parquet writer merges rows from
931+
adjacent chunks at RG boundaries.
932+
933+
### Running
934+
935+
```bash
936+
# Generate data and run all sort pushdown benchmarks
937+
./bench.sh data sort_pushdown
938+
./bench.sh data sort_pushdown_inexact
939+
./bench.sh run sort_pushdown
940+
./bench.sh run sort_pushdown_sorted
941+
./bench.sh run sort_pushdown_inexact
942+
./bench.sh run sort_pushdown_inexact_overlap
943+
```

benchmarks/bench.sh

Lines changed: 110 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,9 @@ clickbench_extended: ClickBench \"inspired\" queries against a single parquet
109109
# Sort Pushdown Benchmarks
110110
sort_pushdown: Sort pushdown baseline (no WITH ORDER) on TPC-H data (SF=1)
111111
sort_pushdown_sorted: Sort pushdown with WITH ORDER — tests sort elimination on non-overlapping files
112-
sort_pushdown_inexact: Sort pushdown Inexact path (--sorted DESC) — tests reverse scan + RG reorder
113-
sort_pushdown_inexact_unsorted: Sort pushdown Inexact path (no WITH ORDER) — tests Unsupported path + RG reorder
114-
sort_pushdown_inexact_overlap: Sort pushdown Inexact path — partially overlapping RGs (streaming data scenario)
112+
sort_pushdown_inexact: Sort pushdown Inexact path (--sorted DESC) — multi-file with scrambled RGs, tests reverse scan + RG reorder
113+
sort_pushdown_inexact_unsorted: Sort pushdown Inexact path (no WITH ORDER) — same data, tests Unsupported path + RG reorder
114+
sort_pushdown_inexact_overlap: Sort pushdown Inexact path — multi-file scrambled RGs (streaming data scenario)
115115
116116
# Sorted Data Benchmarks (ORDER BY Optimization)
117117
clickbench_sorted: ClickBench queries on pre-sorted data using prefer_existing_sort (tests sort elimination optimization)
@@ -1154,84 +1154,150 @@ run_sort_pushdown_sorted() {
11541154

11551155
# Writes a single parquet file, sorted by l_orderkey, with small (100K-row)
# row groups, using datafusion-cli COPY from the given source table dir.
# Arguments:
#   $1 - source parquet directory (lineitem table)
#   $2 - output parquet file path
_sort_pushdown_write_sorted_small_rgs() {
    local src_dir="$1" out_file="$2"
    (cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c "
CREATE EXTERNAL TABLE src
STORED AS PARQUET
LOCATION '${src_dir}';
COPY (SELECT * FROM src ORDER BY l_orderkey)
TO '${out_file}'
STORED AS PARQUET
OPTIONS ('format.max_row_group_size' '100000');
")
}

# Redistributes the row groups of a sorted parquet file into N files,
# scrambling the RG order WITHIN each file via a deterministic permutation.
# Arguments:
#   $1 - source parquet file (sorted, many small RGs)
#   $2 - output directory (receives part_NNN.parquet)
#   $3 - number of output files
#   $4 - permutation multiplier (pick coprime with the RG count)
#   $5 - permutation offset
# Each RG gets slot = (rg_idx * mult + off) % n; slot % n_files picks the
# output file, and RGs are written in ascending slot order, which is a
# scrambled (non-monotonic) rg_idx order within each file.
_sort_pushdown_scramble_rgs() {
    local src="$1" out_dir="$2" n_files="$3" mult="$4" off="$5"
    # Paths are passed via argv (not shell-interpolated into the script) so
    # special characters in DATA_DIR cannot break or inject into the Python.
    python3 - "$src" "$out_dir" "$n_files" "$mult" "$off" <<'PYEOF'
import sys
import pyarrow.parquet as pq

src, out_dir = sys.argv[1], sys.argv[2]
n_files, mult, off = int(sys.argv[3]), int(sys.argv[4]), int(sys.argv[5])

pf = pq.ParquetFile(src)
n = pf.metadata.num_row_groups

# slot is a permutation of rg_idx when mult is coprime with n.
# BUGFIX: the previous version appended in ascending rg_idx order, so each
# file's RGs stayed sorted and the "scramble" was a no-op for
# reorder_by_statistics. Sorting by slot gives a genuinely scrambled
# within-file order while keeping the assignment deterministic.
file_rgs = [[] for _ in range(n_files)]
for rg_idx in range(n):
    slot = (rg_idx * mult + off) % n
    file_rgs[slot % n_files].append((slot, rg_idx))

for file_id, slotted in enumerate(file_rgs):
    rgs = [rg for _, rg in sorted(slotted)]
    if not rgs:
        continue
    writer = pq.ParquetWriter(
        '%s/part_%03d.parquet' % (out_dir, file_id),
        pf.schema_arrow)
    for rg in rgs:
        writer.write_table(pf.read_row_group(rg))
    writer.close()
    print(f'File part_{file_id:03d}.parquet: {len(rgs)} RGs')
PYEOF
}

# Generates data for sort pushdown Inexact benchmark.
#
# Produces multiple parquet files where each file has MULTIPLE row groups
# with scrambled RG order. This tests both:
#   - Row-group-level reorder within each file (reorder_by_statistics)
#   - TopK threshold initialization from RG statistics
#
# Strategy:
#   1. Write a single sorted file with small (100K-row) RGs.
#   2. Use pyarrow to redistribute RGs into N files, scrambling the RG order
#      within each file using a deterministic permutation. Each file gets
#      narrow, non-overlapping ranges in scrambled order.
#
# Writing a single file with ORDER BY scramble does NOT work: the parquet
# writer merges rows from adjacent chunks at RG boundaries, widening
# ranges and defeating reorder_by_statistics.
#
# Also generates the overlap dataset (same strategy, different file count
# and permutation) which simulates streaming data with network delays where
# chunks arrive out of sequence.
#
# Requires pyarrow (pip install pyarrow).
data_sort_pushdown_inexact() {
    # Check pyarrow dependency up front (needed to split/scramble RGs)
    if ! python3 -c "import pyarrow" 2>/dev/null; then
        echo "Error: pyarrow is required for sort pushdown Inexact data generation."
        echo "Install with: pip install pyarrow"
        return 1
    fi

    # Re-use the sort_pushdown data as the source (generate if missing)
    data_sort_pushdown
    SRC_DIR="${DATA_DIR}/sort_pushdown/lineitem"

    INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact/lineitem"
    # BUGFIX: this used to 'return' early when the inexact data existed,
    # which meant the overlap dataset below was never generated on re-runs.
    # Each dataset is now guarded independently.
    if [ -d "${INEXACT_DIR}" ] && [ -n "$(ls -A "${INEXACT_DIR}"/*.parquet 2>/dev/null)" ]; then
        echo "Sort pushdown Inexact data already exists at ${INEXACT_DIR}"
    else
        echo "Generating sort pushdown Inexact benchmark data (multi-file, scrambled RGs)..."
        mkdir -p "${INEXACT_DIR}"

        # Step 1: single sorted file with small RGs
        TMPFILE="${INEXACT_DIR}/_sorted_small_rgs.parquet"
        _sort_pushdown_write_sorted_small_rgs "${SRC_DIR}" "${TMPFILE}"

        # Step 2: 3 files, permutation slot = (rg_idx * 41 + 7) % n
        _sort_pushdown_scramble_rgs "${TMPFILE}" "${INEXACT_DIR}" 3 41 7

        rm -f "${TMPFILE}"
        echo "Sort pushdown Inexact data generated at ${INEXACT_DIR}"
        ls -la "${INEXACT_DIR}"
    fi

    OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap/lineitem"
    if [ -d "${OVERLAP_DIR}" ] && [ -n "$(ls -A "${OVERLAP_DIR}"/*.parquet 2>/dev/null)" ]; then
        echo "Sort pushdown Inexact overlap data already exists at ${OVERLAP_DIR}"
        return
    fi

    echo "Generating sort pushdown Inexact overlap data (multi-file, scrambled RGs)..."
    mkdir -p "${OVERLAP_DIR}"

    # Step 1: single sorted file with small RGs
    TMPFILE="${OVERLAP_DIR}/_sorted_small_rgs.parquet"
    _sort_pushdown_write_sorted_small_rgs "${SRC_DIR}" "${TMPFILE}"

    # Step 2: 5 files, permutation slot = (rg_idx * 37 + 13) % n — same
    # strategy, different fan-out (streaming data scenario)
    _sort_pushdown_scramble_rgs "${TMPFILE}" "${OVERLAP_DIR}" 5 37 13

    rm -f "${TMPFILE}"
    echo "Sort pushdown Inexact overlap data generated at ${OVERLAP_DIR}"
    ls -la "${OVERLAP_DIR}"
}
12361302

12371303
# Runs the sort pushdown Inexact benchmark (tests RG reorder by statistics).
#
# Globals:   DATA_DIR, RESULTS_DIR, SCRIPT_DIR, CARGO_COMMAND,
#            QUERY_ARG, LATENCY_ARG (read)
# Outputs:   benchmark results JSON at ${RESULTS_DIR}/sort_pushdown_inexact.json
#
# Passes --sorted so the table is declared ordered even though the files'
# row groups are scrambled, exercising the Inexact ordering path.
# Parquet filter pushdown is enabled via the env var for this run only.
run_sort_pushdown_inexact() {
    INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact"
    RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact.json"
    echo "Running sort pushdown Inexact benchmark (multi-file scrambled RGs, --sorted DESC)..."
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
    debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
}
@@ -1256,13 +1322,13 @@ run_sort_pushdown_inexact_unsorted() {
12561322
debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_unsorted" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
12571323
}
12581324

1259-
# Runs the sort pushdown benchmark with partially overlapping RGs.
1260-
# Simulates streaming data with network jitterRGs are mostly in order
1261-
# but have small overlaps (±2500 orderkey jitter between adjacent RGs).
1325+
# Runs the sort pushdown benchmark with multi-file scrambled RG order.
1326+
# Simulates streaming data with network delaysmultiple files, each with
1327+
# scrambled RGs. Tests both RG-level reorder and TopK stats initialization.
12621328
run_sort_pushdown_inexact_overlap() {
12631329
OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap"
12641330
RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact_overlap.json"
1265-
echo "Running sort pushdown Inexact benchmark (overlapping RGs, streaming data pattern)..."
1331+
echo "Running sort pushdown Inexact benchmark (multi-file scrambled RGs, streaming data pattern)..."
12661332
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
12671333
debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${OVERLAP_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_overlap" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
12681334
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
-- Reverse scan: ORDER BY DESC LIMIT (narrow projection)
-- With --sorted: reverse_row_groups=true + TopK + stats init + cumulative prune
-- Without --sorted: full TopK sort over all data
-- Narrow 3-column projection keeps decode cost low so the measured time is
-- dominated by the scan/sort strategy rather than column materialization.
SELECT l_orderkey, l_partkey, l_suppkey
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 100
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
-- Reverse scan: ORDER BY DESC LIMIT larger fetch (narrow projection)
-- Same shape as the LIMIT 100 variant but with a 10x larger fetch, so the
-- TopK threshold is updated across more row groups before pruning kicks in.
SELECT l_orderkey, l_partkey, l_suppkey
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 1000
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
-- Reverse scan: wide projection + DESC LIMIT
-- SELECT * decodes every lineitem column, measuring reverse-scan + TopK
-- behavior when column materialization cost is significant.
SELECT *
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 100
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
-- Reverse scan: wide projection + DESC LIMIT larger fetch
-- Wide (SELECT *) variant with a 10x larger fetch; stresses both full-row
-- decode and TopK threshold updates across more row groups.
SELECT *
FROM lineitem
ORDER BY l_orderkey DESC
LIMIT 1000

0 commit comments

Comments
 (0)