@@ -109,9 +109,9 @@ clickbench_extended: ClickBench \"inspired\" queries against a single parquet
109109# Sort Pushdown Benchmarks
110110sort_pushdown: Sort pushdown baseline (no WITH ORDER) on TPC-H data (SF=1)
111111sort_pushdown_sorted: Sort pushdown with WITH ORDER — tests sort elimination on non-overlapping files
112- sort_pushdown_inexact: Sort pushdown Inexact path (--sorted DESC) — multi-file with scrambled RGs, tests reverse scan + RG reorder
113- sort_pushdown_inexact_unsorted: Sort pushdown Inexact path (no WITH ORDER) — same data, tests Unsupported path + RG reorder
114- sort_pushdown_inexact_overlap: Sort pushdown Inexact path — multi-file scrambled RGs (streaming data scenario)
112+ sort_pushdown_inexact: Sort pushdown Inexact path (--sorted DESC) — tests reverse scan + RG reorder
113+ sort_pushdown_inexact_unsorted: Sort pushdown Inexact path (no WITH ORDER) — tests Unsupported path + RG reorder
114+ sort_pushdown_inexact_overlap: Sort pushdown Inexact path — partially overlapping RGs (streaming data scenario)
115115
116116# Sorted Data Benchmarks (ORDER BY Optimization)
117117clickbench_sorted: ClickBench queries on pre-sorted data using prefer_existing_sort (tests sort elimination optimization)
@@ -1154,143 +1154,84 @@ run_sort_pushdown_sorted() {
11541154
11551155# Generates data for sort pushdown Inexact benchmark.
11561156#
1157- # Produces multiple parquet files where each file has MULTIPLE row groups
1158- # with scrambled RG order. This tests both:
1159- # - Row-group-level reorder within each file (reorder_by_statistics)
1160- # - TopK threshold initialization from RG statistics
1161- #
1162- # Strategy:
1163- # 1. Write a single sorted file with small (100K-row) RGs (~61 RGs total).
1164- # 2. Use pyarrow to redistribute RGs into N_FILES files, scrambling the
1165- # RG order within each file using a deterministic permutation.
1166- # Each file gets ~61/N_FILES RGs with narrow, non-overlapping ranges
1167- # but in scrambled order.
1168- #
1169- # Writing a single file with ORDER BY scramble does NOT work: the parquet
1170- # writer merges rows from adjacent chunks at RG boundaries, widening
1171- # ranges and defeating reorder_by_statistics.
1172- #
1173- # Requires pyarrow (pip install pyarrow).
1157+ # Produces a single large lineitem parquet file where row groups have
1158+ # NON-OVERLAPPING but OUT-OF-ORDER l_orderkey ranges (each RG internally
1159+ # sorted, RGs shuffled). This simulates append-heavy workloads where data
1160+ # is written in batches at different times.
11741161data_sort_pushdown_inexact () {
11751162 INEXACT_DIR=" ${DATA_DIR} /sort_pushdown_inexact/lineitem"
11761163 if [ -d " ${INEXACT_DIR} " ] && [ " $( ls -A ${INEXACT_DIR} /* .parquet 2> /dev/null) " ]; then
11771164 echo " Sort pushdown Inexact data already exists at ${INEXACT_DIR} "
11771164 # NOTE(review): this early return also skips the overlap-data
11771164 # generation further below — if only the overlap dataset is
11771164 # deleted, it is never regenerated while this directory exists.
11781165 return
11791166 fi
11801167
1181- echo " Generating sort pushdown Inexact benchmark data (multi- file, scrambled RGs)..."
1168+ echo " Generating sort pushdown Inexact benchmark data (single file, shuffled RGs)..."
11821169
11831170 # Re-use the sort_pushdown data as the source (generate if missing)
11841171 data_sort_pushdown
11851172
11861173 mkdir -p " ${INEXACT_DIR} "
11871174 SRC_DIR=" ${DATA_DIR} /sort_pushdown/lineitem"
11881175
1189- # Step 1: Write a single sorted file with small (100K-row) RGs
1190- TMPFILE=" ${INEXACT_DIR} /_sorted_small_rgs.parquet"
1176+ # Use datafusion-cli to bucket rows into RG-sized groups via a
1177+ # deterministic scrambler, then sort within each bucket by orderkey,
1178+ # so the segments appear in scrambled (non-sorted) order in the file.
1179+ # NOTE(review): for each RG to get a *tight* l_orderkey range, the
1179+ # scrambler must permute CONTIGUOUS key chunks whose size lines up
1179+ # with max_row_group_size — hashing individual keys (e.g. an LCG
1179+ # taken mod 64 per key) spreads every bucket across the full key
1179+ # range, and misaligned bucket/RG boundaries re-widen the stats
1179+ # (the exact failure the removed pyarrow generator avoided).
11911180 (cd " ${SCRIPT_DIR} /.." && cargo run --release -p datafusion-cli -- -c "
11921181 CREATE EXTERNAL TABLE src
11931182 STORED AS PARQUET
11941183 LOCATION '${SRC_DIR} ';
11951184
1196- COPY (SELECT * FROM src ORDER BY l_orderkey)
1197- TO '${TMPFILE} '
1185+ COPY (
1186+ -- Assign rows to exact 100K-row chunks (aligned with
1186+ -- format.max_row_group_size below, so chunk boundaries and RG
1186+ -- boundaries coincide) and emit the chunks in a scrambled but
1186+ -- deterministic order: each RG then holds one contiguous, tight
1186+ -- l_orderkey range. The tail past row 6,000,000 (SF=1 lineitem
1186+ -- has 6,001,215 rows — TODO confirm) is kept last so it cannot
1186+ -- break the 100K alignment of the preceding chunks.
1186+ SELECT * EXCEPT (rn) FROM (
1187+ SELECT *, row_number() OVER (ORDER BY l_orderkey) AS rn
1187+ FROM src
1187+ )
1188+ ORDER BY
1188+ CASE WHEN rn > 6000000 THEN 60
1188+ ELSE (((rn - 1) / 100000) * 41 + 7) % 60 END,
1189+ rn
1190+ )
1191+ TO '${INEXACT_DIR} /shuffled.parquet'
11981192 STORED AS PARQUET
11991193 OPTIONS ('format.max_row_group_size' '100000');
12001194 " )
12011195
1202- # Step 2: Redistribute RGs into 3 files with scrambled RG order.
1203- # Each file gets ~20 RGs. RG assignment: rg_idx % 3 determines file,
1204- # permutation (rg_idx * 41 + 7) % n scrambles the order within file.
1205- python3 -c "
1206- import pyarrow.parquet as pq
1207-
1208- pf = pq.ParquetFile('${TMPFILE} ')
1209- n = pf.metadata.num_row_groups
1210- n_files = 3
1211-
1212- # Assign each RG to a file, scramble order within each file
1213- file_rgs = [[] for _ in range(n_files)]
1214- for rg_idx in range(n):
1215- slot = (rg_idx * 41 + 7) % n # scrambled index
1216- file_id = slot % n_files
1217- file_rgs[file_id].append(rg_idx)
1218-
1219- # Write each file with its assigned RGs (in scrambled order)
1220- for file_id in range(n_files):
1221- rgs = file_rgs[file_id]
1222- if not rgs:
1223- continue
1224- tables = [pf.read_row_group(rg) for rg in rgs]
1225- writer = pq.ParquetWriter(
1226- '${INEXACT_DIR} /part_%03d.parquet' % file_id,
1227- pf.schema_arrow)
1228- for t in tables:
1229- writer.write_table(t)
1230- writer.close()
1231- print(f'File part_{file_id:03d}.parquet: {len(rgs)} RGs')
1232- "
1233-
1234- rm -f " ${TMPFILE} "
1235- echo " Sort pushdown Inexact data generated at ${INEXACT_DIR} "
1196+ echo " Sort pushdown Inexact shuffled data generated at ${INEXACT_DIR} "
12361197 ls -la " ${INEXACT_DIR} "
12371198
1238- # Also generate overlap data: same strategy but with different file count
1239- # and permutation. Simulates streaming data with network delays where
1240- # chunks arrive out of sequence.
1241- #
1242- # Requires pyarrow (pip install pyarrow).
1199+ # Also generate a file with partially overlapping row groups.
1200+ # Simulates streaming data with network delays: each chunk is mostly
1201+ # in order but has a small overlap with the next chunk (±5% of the
1202+ # chunk range). This is the pattern described by @adriangb — data
1203+ # arriving with timestamps that are generally increasing but with
1204+ # network-induced jitter causing small overlaps between row groups.
12431205 OVERLAP_DIR=" ${DATA_DIR} /sort_pushdown_inexact_overlap/lineitem"
12441206 if [ -d " ${OVERLAP_DIR} " ] && [ " $( ls -A ${OVERLAP_DIR} /* .parquet 2> /dev/null) " ]; then
12451207 echo " Sort pushdown Inexact overlap data already exists at ${OVERLAP_DIR} "
12461208 return
12471209 fi
12481210
1249- echo " Generating sort pushdown Inexact overlap data (multi-file, scrambled RGs)..."
1211+ echo " Generating sort pushdown Inexact overlap data (partially overlapping RGs)..."
12501212 mkdir -p " ${OVERLAP_DIR} "
12511213
1252- # Step 1: Write a single sorted file with small (100K-row) RGs
1253- TMPFILE=" ${OVERLAP_DIR} /_sorted_small_rgs.parquet"
12541214 (cd " ${SCRIPT_DIR} /.." && cargo run --release -p datafusion-cli -- -c "
12551215 CREATE EXTERNAL TABLE src
12561216 STORED AS PARQUET
12571217 LOCATION '${SRC_DIR} ';
12581218
1259- COPY (SELECT * FROM src ORDER BY l_orderkey)
1260- TO '${TMPFILE} '
1219+ -- Add jitter to l_orderkey: shift each row's sort key by a
1220+ -- deterministic sawtooth offset, (l_orderkey * 7 % 5000) - 2500,
1221+ -- which takes values in [-2500, +2499]. Rows stay locally shuffled
1222+ -- but globally ascending, so adjacent 100K-row RGs end up with
1223+ -- l_orderkey ranges that overlap by roughly 5K keys.
1224+ COPY (
1225+ SELECT * FROM src
1226+ ORDER BY l_orderkey + (l_orderkey * 7 % 5000) - 2500
1227+ )
1228+ TO '${OVERLAP_DIR} /overlapping.parquet'
12611229 STORED AS PARQUET
12621230 OPTIONS ('format.max_row_group_size' '100000');
12631231 " )
12641232
1265- # Step 2: Redistribute into 5 files with scrambled RG order.
1266- python3 -c "
1267- import pyarrow.parquet as pq
1268-
1269- pf = pq.ParquetFile('${TMPFILE} ')
1270- n = pf.metadata.num_row_groups
1271- n_files = 5
1272-
1273- file_rgs = [[] for _ in range(n_files)]
1274- for rg_idx in range(n):
1275- slot = (rg_idx * 37 + 13) % n
1276- file_id = slot % n_files
1277- file_rgs[file_id].append(rg_idx)
1278-
1279- for file_id in range(n_files):
1280- rgs = file_rgs[file_id]
1281- if not rgs:
1282- continue
1283- tables = [pf.read_row_group(rg) for rg in rgs]
1284- writer = pq.ParquetWriter(
1285- '${OVERLAP_DIR} /part_%03d.parquet' % file_id,
1286- pf.schema_arrow)
1287- for t in tables:
1288- writer.write_table(t)
1289- writer.close()
1290- print(f'File part_{file_id:03d}.parquet: {len(rgs)} RGs')
1291- "
1292-
1293- rm -f " ${TMPFILE} "
1233+ echo " Sort pushdown Inexact overlap data generated at ${OVERLAP_DIR} "
1234+ ls -la " ${OVERLAP_DIR} "
12941235}
12951236
12961237# Runs the sort pushdown Inexact benchmark (tests RG reorder by statistics).
@@ -1299,7 +1240,7 @@ for file_id in range(n_files):
12991240run_sort_pushdown_inexact () {
13001241 INEXACT_DIR=" ${DATA_DIR} /sort_pushdown_inexact"
13011242 RESULTS_FILE=" ${RESULTS_DIR} /sort_pushdown_inexact.json"
1302- echo " Running sort pushdown Inexact benchmark (multi-file scrambled RGs, -- sorted DESC)..."
1243+ echo " Running sort pushdown Inexact benchmark (--sorted DESC, reverse scan path)..."
13031244 DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
13041245 debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path " ${INEXACT_DIR} " --queries-path " ${SCRIPT_DIR} /queries/sort_pushdown_inexact" -o " ${RESULTS_FILE} " ${QUERY_ARG} ${LATENCY_ARG}
13051246}
@@ -1315,13 +1256,13 @@ run_sort_pushdown_inexact_unsorted() {
13151256 debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --iterations 5 --path " ${INEXACT_DIR} " --queries-path " ${SCRIPT_DIR} /queries/sort_pushdown_inexact_unsorted" -o " ${RESULTS_FILE} " ${QUERY_ARG} ${LATENCY_ARG}
13161257}
13171258
1318- # Runs the sort pushdown benchmark with multi-file scrambled RG order .
1319- # Simulates streaming data with network delays — multiple files, each with
1320- # scrambled RGs. Tests both RG-level reorder and TopK stats initialization .
1259+ # Runs the sort pushdown benchmark with partially overlapping RGs .
1260+ # Simulates streaming data with network jitter — RGs are mostly in order
1261+ # but have small overlaps (±2500 orderkey jitter between adjacent RGs) .
13211262run_sort_pushdown_inexact_overlap () {
13221263 OVERLAP_DIR=" ${DATA_DIR} /sort_pushdown_inexact_overlap"
13231264 RESULTS_FILE=" ${RESULTS_DIR} /sort_pushdown_inexact_overlap.json"
1324- echo " Running sort pushdown Inexact benchmark (multi-file scrambled RGs, streaming data pattern)..."
1265+ echo " Running sort pushdown Inexact benchmark (overlapping RGs, streaming data pattern)..."
13251266 DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
13261267 debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path " ${OVERLAP_DIR} " --queries-path " ${SCRIPT_DIR} /queries/sort_pushdown_inexact_overlap" -o " ${RESULTS_FILE} " ${QUERY_ARG} ${LATENCY_ARG}
13271268}
0 commit comments