@@ -1196,42 +1196,60 @@ data_sort_pushdown_inexact() {
11961196 echo " Sort pushdown Inexact shuffled data generated at ${INEXACT_DIR} "
11971197 ls -la " ${INEXACT_DIR} "
11981198
1199- # Also generate a file with partially overlapping row groups.
1200- # Simulates streaming data with network delays: each chunk is mostly
1201- # in order but has a small overlap with the next chunk (±5% of the
1202- # chunk range). This is the pattern described by @adriangb — data
1203- # arriving with timestamps that are generally increasing but with
1204- # network-induced jitter causing small overlaps between row groups.
1199+ # Also generate files with scrambled file order.
1200+ # Simulates streaming data with network delays: chunks arrive out
1201+ # of sequence. This is the pattern described by @adriangb — data
1202+ # arriving with timestamps that are generally increasing but
1203+ # network-induced delays cause chunks to arrive out of order.
1204+ #
1205+ # Strategy:
1206+ # 1. Write a sorted parquet file with small RGs (100K rows each),
1207+ # producing ~61 RGs with narrow, non-overlapping l_orderkey ranges.
1208+ # 2. Split into individual files (one per RG) and rename using a
1209+ # deterministic permutation so alphabetical file order !=
1210+ # l_orderkey order. reorder_by_statistics fixes this.
1211+ #
1212+ # Writing a single file with ORDER BY and jitter/scrambling does NOT
1213+ # work: the parquet writer merges rows from adjacent chunks into the
1214+ # same RG at chunk boundaries, widening RG ranges and defeating
1215+ # reorder. Splitting into separate files avoids this.
1216+ #
1217+ # Requires pyarrow for splitting (pip install pyarrow).
12051218 OVERLAP_DIR=" ${DATA_DIR} /sort_pushdown_inexact_overlap/lineitem"
12061219 if [ -d " ${OVERLAP_DIR} " ] && [ " $( ls -A ${OVERLAP_DIR} /* .parquet 2> /dev/null) " ]; then
12071220 echo " Sort pushdown Inexact overlap data already exists at ${OVERLAP_DIR} "
12081221 return
12091222 fi
12101223
1211- echo " Generating sort pushdown Inexact overlap data (partially overlapping RGs )..."
1224+ echo " Generating sort pushdown Inexact overlap data (scrambled file order )..."
12121225 mkdir -p " ${OVERLAP_DIR} "
12131226
1227+ # Step 1: Write a single sorted file with small (100K-row) RGs
1228+ TMPFILE=" ${OVERLAP_DIR} /_sorted_small_rgs.parquet"
12141229 (cd " ${SCRIPT_DIR} /.." && cargo run --release -p datafusion-cli -- -c "
12151230 CREATE EXTERNAL TABLE src
12161231 STORED AS PARQUET
12171232 LOCATION '${SRC_DIR} ';
12181233
1219- -- Add jitter to l_orderkey: shift each row by a random-ish offset
1220- -- proportional to its position. This creates overlap between adjacent
1221- -- row groups while preserving the general ascending trend.
1222- -- Formula: l_orderkey + (l_orderkey * 7 % 5000) - 2500
1223- -- This adds ±2500 jitter, creating ~5K overlap between adjacent 100K-row RGs.
1224- COPY (
1225- SELECT * FROM src
1226- ORDER BY l_orderkey + (l_orderkey * 7 % 5000) - 2500
1227- )
1228- TO '${OVERLAP_DIR} /overlapping.parquet'
1234+ COPY (SELECT * FROM src ORDER BY l_orderkey)
1235+ TO '${TMPFILE} '
12291236 STORED AS PARQUET
12301237 OPTIONS ('format.max_row_group_size' '100000');
12311238 " )
12321239
1233- echo " Sort pushdown Inexact overlap data generated at ${OVERLAP_DIR} "
1234- ls -la " ${OVERLAP_DIR} "
1240+ # Step 2: Split into per-RG files with scrambled names via pyarrow
1241+ python3 -c "
1242+ import pyarrow.parquet as pq
1243+ pf = pq.ParquetFile('${TMPFILE} ')
1244+ n = pf.metadata.num_row_groups
1245+ for rg_idx in range(n):
1246+ slot = (rg_idx * 37 + 13) % n
1247+ table = pf.read_row_group(rg_idx)
1248+ pq.write_table(table, '${OVERLAP_DIR} /chunk_%03d.parquet' % slot)
1249+ print(f'Split {n} RGs into scrambled files')
1250+ "
1251+
1252+ rm -f " ${TMPFILE} "
12351253}
12361254
12371255# Runs the sort pushdown Inexact benchmark (tests RG reorder by statistics).
@@ -1256,13 +1274,13 @@ run_sort_pushdown_inexact_unsorted() {
12561274 debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --iterations 5 --path " ${INEXACT_DIR} " --queries-path " ${SCRIPT_DIR} /queries/sort_pushdown_inexact_unsorted" -o " ${RESULTS_FILE} " ${QUERY_ARG} ${LATENCY_ARG}
12571275}
12581276
1259- # Runs the sort pushdown benchmark with partially overlapping RGs .
1260- # Simulates streaming data with network jitter — RGs are mostly in order
1261- # but have small overlaps (±2500 orderkey jitter between adjacent RGs) .
1277+ # Runs the sort pushdown benchmark with scrambled file order .
1278+ # Simulates streaming data with network delays — files have narrow ranges
1279+ # but appear in scrambled order. Tests reorder_by_statistics effectiveness .
12621280run_sort_pushdown_inexact_overlap () {
12631281 OVERLAP_DIR=" ${DATA_DIR} /sort_pushdown_inexact_overlap"
12641282 RESULTS_FILE=" ${RESULTS_DIR} /sort_pushdown_inexact_overlap.json"
1265- echo " Running sort pushdown Inexact benchmark (overlapping RGs , streaming data pattern)..."
1283+ echo " Running sort pushdown Inexact benchmark (scrambled file order , streaming data pattern)..."
12661284 DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
12671285 debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path " ${OVERLAP_DIR} " --queries-path " ${SCRIPT_DIR} /queries/sort_pushdown_inexact_overlap" -o " ${RESULTS_FILE} " ${QUERY_ARG} ${LATENCY_ARG}
12681286}
0 commit comments