Skip to content

Commit 122de73

Browse files
committed
fix: generate multi-file benchmark data with scrambled RG order
Both the inexact and overlap benchmark data generation had problems:

1. The original single-file ORDER BY approach caused the parquet writer to merge rows from adjacent chunks at RG boundaries, widening RG ranges to ~6M and making reorder_by_statistics a no-op.
2. The per-file split fix (one RG per file) meant reorder_by_statistics had nothing to reorder within each file, since each had only 1 RG. RG reorder is an intra-file optimization.

Fix by generating multiple files where each file has MULTIPLE row groups with scrambled order:

- inexact: 3 files x ~20 RGs each (scrambled within each file)
- overlap: 5 files x ~12 RGs each (different permutation)

Each RG has a narrow l_orderkey range (~100K) but appears in scrambled order within its file. This properly tests:

- Row-group-level reorder (reorder_by_statistics within each file)
- TopK threshold initialization from RG statistics
- File-level ordering effects

Uses pyarrow to read RGs from a sorted temp file and redistribute them into multiple output files with scrambled RG order.
1 parent 29f1acd commit 122de73

1 file changed

Lines changed: 103 additions & 44 deletions

File tree

benchmarks/bench.sh

Lines changed: 103 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,9 @@ clickbench_extended: ClickBench \"inspired\" queries against a single parquet
109109
# Sort Pushdown Benchmarks
110110
sort_pushdown: Sort pushdown baseline (no WITH ORDER) on TPC-H data (SF=1)
111111
sort_pushdown_sorted: Sort pushdown with WITH ORDER — tests sort elimination on non-overlapping files
112-
sort_pushdown_inexact: Sort pushdown Inexact path (--sorted DESC) — tests reverse scan + RG reorder
113-
sort_pushdown_inexact_unsorted: Sort pushdown Inexact path (no WITH ORDER) — tests Unsupported path + RG reorder
114-
sort_pushdown_inexact_overlap: Sort pushdown Inexact path — partially overlapping RGs (streaming data scenario)
112+
sort_pushdown_inexact: Sort pushdown Inexact path (--sorted DESC) — multi-file with scrambled RGs, tests reverse scan + RG reorder
113+
sort_pushdown_inexact_unsorted: Sort pushdown Inexact path (no WITH ORDER) — same data, tests Unsupported path + RG reorder
114+
sort_pushdown_inexact_overlap: Sort pushdown Inexact path — multi-file scrambled RGs (streaming data scenario)
115115
116116
# Sorted Data Benchmarks (ORDER BY Optimization)
117117
clickbench_sorted: ClickBench queries on pre-sorted data using prefer_existing_sort (tests sort elimination optimization)
@@ -1154,84 +1154,143 @@ run_sort_pushdown_sorted() {
11541154

11551155
# Generates data for sort pushdown Inexact benchmark.
11561156
#
1157-
# Produces a single large lineitem parquet file where row groups have
1158-
# NON-OVERLAPPING but OUT-OF-ORDER l_orderkey ranges (each RG internally
1159-
# sorted, RGs shuffled). This simulates append-heavy workloads where data
1160-
# is written in batches at different times.
1157+
# Produces multiple parquet files where each file has MULTIPLE row groups
1158+
# with scrambled RG order. This tests both:
1159+
# - Row-group-level reorder within each file (reorder_by_statistics)
1160+
# - TopK threshold initialization from RG statistics
1161+
#
1162+
# Strategy:
1163+
# 1. Write a single sorted file with small (100K-row) RGs (~61 RGs total).
1164+
# 2. Use pyarrow to redistribute RGs into N_FILES files, scrambling the
1165+
# RG order within each file using a deterministic permutation.
1166+
# Each file gets ~61/N_FILES RGs with narrow, non-overlapping ranges
1167+
# but in scrambled order.
1168+
#
1169+
# Writing a single file with ORDER BY scramble does NOT work: the parquet
1170+
# writer merges rows from adjacent chunks at RG boundaries, widening
1171+
# ranges and defeating reorder_by_statistics.
1172+
#
1173+
# Requires pyarrow (pip install pyarrow).
11611174
data_sort_pushdown_inexact() {
11621175
INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact/lineitem"
11631176
if [ -d "${INEXACT_DIR}" ] && [ "$(ls -A ${INEXACT_DIR}/*.parquet 2>/dev/null)" ]; then
11641177
echo "Sort pushdown Inexact data already exists at ${INEXACT_DIR}"
11651178
return
11661179
fi
11671180

1168-
echo "Generating sort pushdown Inexact benchmark data (single file, shuffled RGs)..."
1181+
echo "Generating sort pushdown Inexact benchmark data (multi-file, scrambled RGs)..."
11691182

11701183
# Re-use the sort_pushdown data as the source (generate if missing)
11711184
data_sort_pushdown
11721185

11731186
mkdir -p "${INEXACT_DIR}"
11741187
SRC_DIR="${DATA_DIR}/sort_pushdown/lineitem"
11751188

1176-
# Use datafusion-cli to bucket rows into 64 groups by a deterministic
1177-
# scrambler, then sort within each bucket by orderkey. This produces
1178-
# ~64 RG-sized segments where each has a tight orderkey range but the
1179-
# segments appear in scrambled (non-sorted) order in the file.
1189+
# Step 1: Write a single sorted file with small (100K-row) RGs
1190+
TMPFILE="${INEXACT_DIR}/_sorted_small_rgs.parquet"
11801191
(cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c "
11811192
CREATE EXTERNAL TABLE src
11821193
STORED AS PARQUET
11831194
LOCATION '${SRC_DIR}';
11841195
1185-
COPY (
1186-
SELECT * FROM src
1187-
ORDER BY
1188-
(l_orderkey * 1664525 + 1013904223) % 64,
1189-
l_orderkey
1190-
)
1191-
TO '${INEXACT_DIR}/shuffled.parquet'
1196+
COPY (SELECT * FROM src ORDER BY l_orderkey)
1197+
TO '${TMPFILE}'
11921198
STORED AS PARQUET
11931199
OPTIONS ('format.max_row_group_size' '100000');
11941200
")
11951201

1196-
echo "Sort pushdown Inexact shuffled data generated at ${INEXACT_DIR}"
1202+
# Step 2: Redistribute RGs into 3 files with scrambled RG order.
1203+
# Each file gets ~20 RGs. RG assignment: rg_idx % 3 determines file,
1204+
# permutation (rg_idx * 41 + 7) % n scrambles the order within file.
1205+
python3 -c "
1206+
import pyarrow.parquet as pq
1207+
1208+
pf = pq.ParquetFile('${TMPFILE}')
1209+
n = pf.metadata.num_row_groups
1210+
n_files = 3
1211+
1212+
# Assign each RG to a file, scramble order within each file
1213+
file_rgs = [[] for _ in range(n_files)]
1214+
for rg_idx in range(n):
1215+
slot = (rg_idx * 41 + 7) % n # scrambled index
1216+
file_id = slot % n_files
1217+
file_rgs[file_id].append(rg_idx)
1218+
1219+
# Write each file with its assigned RGs (in scrambled order)
1220+
for file_id in range(n_files):
1221+
rgs = file_rgs[file_id]
1222+
if not rgs:
1223+
continue
1224+
tables = [pf.read_row_group(rg) for rg in rgs]
1225+
writer = pq.ParquetWriter(
1226+
'${INEXACT_DIR}/part_%03d.parquet' % file_id,
1227+
pf.schema_arrow)
1228+
for t in tables:
1229+
writer.write_table(t)
1230+
writer.close()
1231+
print(f'File part_{file_id:03d}.parquet: {len(rgs)} RGs')
1232+
"
1233+
1234+
rm -f "${TMPFILE}"
1235+
echo "Sort pushdown Inexact data generated at ${INEXACT_DIR}"
11971236
ls -la "${INEXACT_DIR}"
11981237

1199-
# Also generate a file with partially overlapping row groups.
1200-
# Simulates streaming data with network delays: each chunk is mostly
1201-
# in order but has a small overlap with the next chunk (±5% of the
1202-
# chunk range). This is the pattern described by @adriangb — data
1203-
# arriving with timestamps that are generally increasing but with
1204-
# network-induced jitter causing small overlaps between row groups.
1238+
# Also generate overlap data: same strategy but with different file count
1239+
# and permutation. Simulates streaming data with network delays where
1240+
# chunks arrive out of sequence.
1241+
#
1242+
# Requires pyarrow (pip install pyarrow).
12051243
OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap/lineitem"
12061244
if [ -d "${OVERLAP_DIR}" ] && [ "$(ls -A ${OVERLAP_DIR}/*.parquet 2>/dev/null)" ]; then
12071245
echo "Sort pushdown Inexact overlap data already exists at ${OVERLAP_DIR}"
12081246
return
12091247
fi
12101248

1211-
echo "Generating sort pushdown Inexact overlap data (partially overlapping RGs)..."
1249+
echo "Generating sort pushdown Inexact overlap data (multi-file, scrambled RGs)..."
12121250
mkdir -p "${OVERLAP_DIR}"
12131251

1252+
# Step 1: Write a single sorted file with small (100K-row) RGs
1253+
TMPFILE="${OVERLAP_DIR}/_sorted_small_rgs.parquet"
12141254
(cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c "
12151255
CREATE EXTERNAL TABLE src
12161256
STORED AS PARQUET
12171257
LOCATION '${SRC_DIR}';
12181258
1219-
-- Add jitter to l_orderkey: shift each row by a random-ish offset
1220-
-- proportional to its position. This creates overlap between adjacent
1221-
-- row groups while preserving the general ascending trend.
1222-
-- Formula: l_orderkey + (l_orderkey * 7 % 5000) - 2500
1223-
-- This adds ±2500 jitter, creating ~5K overlap between adjacent 100K-row RGs.
1224-
COPY (
1225-
SELECT * FROM src
1226-
ORDER BY l_orderkey + (l_orderkey * 7 % 5000) - 2500
1227-
)
1228-
TO '${OVERLAP_DIR}/overlapping.parquet'
1259+
COPY (SELECT * FROM src ORDER BY l_orderkey)
1260+
TO '${TMPFILE}'
12291261
STORED AS PARQUET
12301262
OPTIONS ('format.max_row_group_size' '100000');
12311263
")
12321264

1233-
echo "Sort pushdown Inexact overlap data generated at ${OVERLAP_DIR}"
1234-
ls -la "${OVERLAP_DIR}"
1265+
# Step 2: Redistribute into 5 files with scrambled RG order.
1266+
python3 -c "
1267+
import pyarrow.parquet as pq
1268+
1269+
pf = pq.ParquetFile('${TMPFILE}')
1270+
n = pf.metadata.num_row_groups
1271+
n_files = 5
1272+
1273+
file_rgs = [[] for _ in range(n_files)]
1274+
for rg_idx in range(n):
1275+
slot = (rg_idx * 37 + 13) % n
1276+
file_id = slot % n_files
1277+
file_rgs[file_id].append(rg_idx)
1278+
1279+
for file_id in range(n_files):
1280+
rgs = file_rgs[file_id]
1281+
if not rgs:
1282+
continue
1283+
tables = [pf.read_row_group(rg) for rg in rgs]
1284+
writer = pq.ParquetWriter(
1285+
'${OVERLAP_DIR}/part_%03d.parquet' % file_id,
1286+
pf.schema_arrow)
1287+
for t in tables:
1288+
writer.write_table(t)
1289+
writer.close()
1290+
print(f'File part_{file_id:03d}.parquet: {len(rgs)} RGs')
1291+
"
1292+
1293+
rm -f "${TMPFILE}"
12351294
}
12361295

12371296
# Runs the sort pushdown Inexact benchmark (tests RG reorder by statistics).
@@ -1240,7 +1299,7 @@ data_sort_pushdown_inexact() {
12401299
run_sort_pushdown_inexact() {
12411300
INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact"
12421301
RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact.json"
1243-
echo "Running sort pushdown Inexact benchmark (--sorted, DESC, reverse scan path)..."
1302+
echo "Running sort pushdown Inexact benchmark (multi-file scrambled RGs, --sorted DESC)..."
12441303
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
12451304
debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
12461305
}
@@ -1256,13 +1315,13 @@ run_sort_pushdown_inexact_unsorted() {
12561315
debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_unsorted" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
12571316
}
12581317

1259-
# Runs the sort pushdown benchmark with partially overlapping RGs.
1260-
# Simulates streaming data with network jitter — RGs are mostly in order
1261-
# but have small overlaps (±2500 orderkey jitter between adjacent RGs).
1318+
# Runs the sort pushdown benchmark with multi-file scrambled RG order.
1319+
# Simulates streaming data with network delays — multiple files, each with
1320+
# scrambled RGs. Tests both RG-level reorder and TopK stats initialization.
12621321
run_sort_pushdown_inexact_overlap() {
12631322
OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap"
12641323
RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact_overlap.json"
1265-
echo "Running sort pushdown Inexact benchmark (overlapping RGs, streaming data pattern)..."
1324+
echo "Running sort pushdown Inexact benchmark (multi-file scrambled RGs, streaming data pattern)..."
12661325
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
12671326
debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${OVERLAP_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_overlap" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
12681327
}

0 commit comments

Comments
 (0)