
Commit 6aa52df

fix: generate scrambled benchmark data with correct per-file RG ranges
Both the inexact and overlap benchmark data generation had the same
fundamental problem: writing a single parquet file with ORDER BY
scramble/jitter causes the parquet writer to merge rows from adjacent
chunks into the same RG at chunk boundaries, widening RG ranges to span
the full data range (~6M instead of ~100K). This makes
reorder_by_statistics a no-op — there's nothing meaningful to reorder.

Fix both by using a two-step approach:

1. Write a single sorted file with small (100K-row) RGs.
2. Use pyarrow to split into per-RG files with scrambled names.

Each file has a narrow l_orderkey range (~100K) but appears in scrambled
alphabetical order, so reorder_by_statistics has real work to do.

Local benchmark results (feature branch vs upstream/main):

inexact:
  Q1 (DESC LIMIT 100):  4.99ms -> 4.27ms (-14%)
  Q2 (DESC LIMIT 1000): 2.54ms -> 2.26ms (-11%)

overlap:
  Q1 (DESC LIMIT 100):      5.03ms -> 3.97ms (-21%)
  Q4 (SELECT * LIMIT 1000): 5.25ms -> 4.90ms (-7%)
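To see that the fix took effect, the per-file l_orderkey ranges can be read
back from parquet statistics. A minimal sketch (not part of the commit),
assuming pyarrow is installed, the benchmark's chunk_*.parquet naming, and
that the files sit in the current directory; after the fix each file should
span roughly 100K keys rather than ~6M:

    import glob
    import pyarrow.parquet as pq

    # Print the min/max l_orderkey recorded in each file's row-group
    # statistics; narrow, non-overlapping per-file ranges are what give
    # reorder_by_statistics real work to do.
    for path in sorted(glob.glob('chunk_*.parquet')):
        md = pq.ParquetFile(path).metadata
        idx = md.schema.to_arrow_schema().get_field_index('l_orderkey')
        stats = md.row_group(0).column(idx).statistics
        print(f'{path}: l_orderkey in [{stats.min}, {stats.max}]')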
1 parent: afc0784

1 file changed: benchmarks/bench.sh (73 additions, 39 deletions)
@@ -1154,84 +1154,118 @@ run_sort_pushdown_sorted() {
 
 # Generates data for sort pushdown Inexact benchmark.
 #
-# Produces a single large lineitem parquet file where row groups have
-# NON-OVERLAPPING but OUT-OF-ORDER l_orderkey ranges (each RG internally
-# sorted, RGs shuffled). This simulates append-heavy workloads where data
+# Produces multiple parquet files where each file has a narrow, non-overlapping
+# l_orderkey range (internally sorted), but files appear in scrambled
+# alphabetical order. This simulates append-heavy workloads where data
 # is written in batches at different times.
+#
+# Strategy: write a single sorted file with small (100K-row) RGs, then
+# use pyarrow to split into per-RG files with scrambled names.
+# Writing a single file with ORDER BY scramble does NOT work: the parquet
+# writer merges rows from adjacent chunks at RG boundaries, widening
+# ranges and defeating reorder_by_statistics.
+#
+# Requires pyarrow for splitting (pip install pyarrow).
 data_sort_pushdown_inexact() {
     INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact/lineitem"
     if [ -d "${INEXACT_DIR}" ] && [ "$(ls -A ${INEXACT_DIR}/*.parquet 2>/dev/null)" ]; then
         echo "Sort pushdown Inexact data already exists at ${INEXACT_DIR}"
         return
     fi
 
-    echo "Generating sort pushdown Inexact benchmark data (single file, shuffled RGs)..."
+    echo "Generating sort pushdown Inexact benchmark data (scrambled file order)..."
 
     # Re-use the sort_pushdown data as the source (generate if missing)
     data_sort_pushdown
 
     mkdir -p "${INEXACT_DIR}"
     SRC_DIR="${DATA_DIR}/sort_pushdown/lineitem"
 
-    # Use datafusion-cli to bucket rows into 64 groups by a deterministic
-    # scrambler, then sort within each bucket by orderkey. This produces
-    # ~64 RG-sized segments where each has a tight orderkey range but the
-    # segments appear in scrambled (non-sorted) order in the file.
+    # Step 1: Write a single sorted file with small (100K-row) RGs
+    TMPFILE="${INEXACT_DIR}/_sorted_small_rgs.parquet"
     (cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c "
         CREATE EXTERNAL TABLE src
         STORED AS PARQUET
         LOCATION '${SRC_DIR}';
 
-        COPY (
-            SELECT * FROM src
-            ORDER BY
-                (l_orderkey * 1664525 + 1013904223) % 64,
-                l_orderkey
-        )
-        TO '${INEXACT_DIR}/shuffled.parquet'
+        COPY (SELECT * FROM src ORDER BY l_orderkey)
+        TO '${TMPFILE}'
         STORED AS PARQUET
         OPTIONS ('format.max_row_group_size' '100000');
     ")
 
-    echo "Sort pushdown Inexact shuffled data generated at ${INEXACT_DIR}"
+    # Step 2: Split into per-RG files with scrambled names via pyarrow.
+    # Uses a different permutation than the overlap benchmark so the two
+    # datasets exercise different file orderings.
+    python3 -c "
+import pyarrow.parquet as pq
+pf = pq.ParquetFile('${TMPFILE}')
+n = pf.metadata.num_row_groups
+for rg_idx in range(n):
+    slot = (rg_idx * 41 + 7) % n
+    table = pf.read_row_group(rg_idx)
+    pq.write_table(table, '${INEXACT_DIR}/chunk_%03d.parquet' % slot)
+print(f'Split {n} RGs into scrambled files')
+"
+
+    rm -f "${TMPFILE}"
+    echo "Sort pushdown Inexact data generated at ${INEXACT_DIR}"
     ls -la "${INEXACT_DIR}"
 
-    # Also generate a file with partially overlapping row groups.
-    # Simulates streaming data with network delays: each chunk is mostly
-    # in order but has a small overlap with the next chunk (±5% of the
-    # chunk range). This is the pattern described by @adriangb — data
-    # arriving with timestamps that are generally increasing but with
-    # network-induced jitter causing small overlaps between row groups.
+    # Also generate files with scrambled file order.
+    # Simulates streaming data with network delays: chunks arrive out
+    # of sequence. This is the pattern described by @adriangb — data
+    # arriving with timestamps that are generally increasing but
+    # network-induced delays cause chunks to arrive out of order.
+    #
+    # Strategy:
+    # 1. Write a sorted parquet file with small RGs (100K rows each),
+    #    producing ~61 RGs with narrow, non-overlapping l_orderkey ranges.
+    # 2. Split into individual files (one per RG) and rename using a
+    #    deterministic permutation so alphabetical file order !=
+    #    l_orderkey order. reorder_by_statistics fixes this.
+    #
+    # Writing a single file with ORDER BY and jitter/scrambling does NOT
+    # work: the parquet writer merges rows from adjacent chunks into the
+    # same RG at chunk boundaries, widening RG ranges and defeating
+    # reorder. Splitting into separate files avoids this.
+    #
+    # Requires pyarrow for splitting (pip install pyarrow).
     OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap/lineitem"
     if [ -d "${OVERLAP_DIR}" ] && [ "$(ls -A ${OVERLAP_DIR}/*.parquet 2>/dev/null)" ]; then
         echo "Sort pushdown Inexact overlap data already exists at ${OVERLAP_DIR}"
         return
     fi
 
-    echo "Generating sort pushdown Inexact overlap data (partially overlapping RGs)..."
+    echo "Generating sort pushdown Inexact overlap data (scrambled file order)..."
     mkdir -p "${OVERLAP_DIR}"
 
+    # Step 1: Write a single sorted file with small (100K-row) RGs
+    TMPFILE="${OVERLAP_DIR}/_sorted_small_rgs.parquet"
     (cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c "
         CREATE EXTERNAL TABLE src
         STORED AS PARQUET
         LOCATION '${SRC_DIR}';
 
-        -- Add jitter to l_orderkey: shift each row by a random-ish offset
-        -- proportional to its position. This creates overlap between adjacent
-        -- row groups while preserving the general ascending trend.
-        -- Formula: l_orderkey + (l_orderkey * 7 % 5000) - 2500
-        -- This adds ±2500 jitter, creating ~5K overlap between adjacent 100K-row RGs.
-        COPY (
-            SELECT * FROM src
-            ORDER BY l_orderkey + (l_orderkey * 7 % 5000) - 2500
-        )
-        TO '${OVERLAP_DIR}/overlapping.parquet'
+        COPY (SELECT * FROM src ORDER BY l_orderkey)
+        TO '${TMPFILE}'
         STORED AS PARQUET
         OPTIONS ('format.max_row_group_size' '100000');
     ")
 
-    echo "Sort pushdown Inexact overlap data generated at ${OVERLAP_DIR}"
-    ls -la "${OVERLAP_DIR}"
+    # Step 2: Split into per-RG files with scrambled names via pyarrow
+    python3 -c "
+import pyarrow.parquet as pq
+pf = pq.ParquetFile('${TMPFILE}')
+n = pf.metadata.num_row_groups
+for rg_idx in range(n):
+    slot = (rg_idx * 37 + 13) % n
+    table = pf.read_row_group(rg_idx)
+    pq.write_table(table, '${OVERLAP_DIR}/chunk_%03d.parquet' % slot)
+print(f'Split {n} RGs into scrambled files')
+"
+
+    rm -f "${TMPFILE}"
 }
 
 # Runs the sort pushdown Inexact benchmark (tests RG reorder by statistics).
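For readers who want the split step without a datafusion-cli build, the same
two-step approach can be sketched entirely in pyarrow. This is a hedged
stand-in, not the committed script: 'src.parquet' and the output names are
hypothetical, and step 1 in the actual script is performed by datafusion-cli's
COPY with OPTIONS ('format.max_row_group_size' '100000'):

    import pyarrow.parquet as pq

    # Step 1: one sorted file with small (100K-row) row groups, so each
    # row group covers a narrow, non-overlapping l_orderkey range.
    table = pq.read_table('src.parquet').sort_by('l_orderkey')
    pq.write_table(table, '_sorted_small_rgs.parquet', row_group_size=100_000)

    # Step 2: one file per row group, renamed through a deterministic
    # permutation so alphabetical order differs from l_orderkey order.
    pf = pq.ParquetFile('_sorted_small_rgs.parquet')
    n = pf.metadata.num_row_groups
    for rg_idx in range(n):
        slot = (rg_idx * 41 + 7) % n  # bijective while gcd(41, n) == 1
        pq.write_table(pf.read_row_group(rg_idx), f'chunk_{slot:03d}.parquet')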
@@ -1256,13 +1290,13 @@ run_sort_pushdown_inexact_unsorted() {
     debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_unsorted" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
 }
 
-# Runs the sort pushdown benchmark with partially overlapping RGs.
-# Simulates streaming data with network jitter — RGs are mostly in order
-# but have small overlaps (±2500 orderkey jitter between adjacent RGs).
+# Runs the sort pushdown benchmark with scrambled file order.
+# Simulates streaming data with network delays — files have narrow ranges
+# but appear in scrambled order. Tests reorder_by_statistics effectiveness.
 run_sort_pushdown_inexact_overlap() {
     OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap"
     RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact_overlap.json"
-    echo "Running sort pushdown Inexact benchmark (overlapping RGs, streaming data pattern)..."
+    echo "Running sort pushdown Inexact benchmark (scrambled file order, streaming data pattern)..."
     DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
     debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${OVERLAP_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_overlap" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
 }
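One property of the slot formulas worth noting: (rg_idx * 41 + 7) % n and
(rg_idx * 37 + 13) % n are affine maps modulo n, which permute the row groups
only when the multiplier is coprime with n; otherwise two RGs would land on
the same chunk name and one write would clobber the other. A quick self-check,
assuming the ~61 row groups the script's comments predict:

    from math import gcd

    def slots(n: int, a: int, b: int) -> list[int]:
        # File slot assigned to each row group by the affine scramble.
        return [(i * a + b) % n for i in range(n)]

    n = 61  # ~6.1M rows at 100K rows per RG
    for a, b in [(41, 7), (37, 13)]:
        assert gcd(a, n) == 1                  # coprime multiplier => bijection
        assert len(set(slots(n, a, b))) == n   # every RG gets a distinct file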
