Skip to content

Commit 54f4fd4

Browse files
committed
perf: move stats init before PruningPredicate build + fix CastExpr unwrap
Critical fix: PruningPredicate compiles the expression at build time, so the DynamicFilterPhysicalExpr must be updated BEFORE the predicate is built. Previously, stats init ran after the build, making row-group pruning ineffective for the current file.

Also fixes:
- Unwrap CastExpr to find the inner Column (projection may add casts).
- Use a default limit of 1 when the scan limit is None (the TopK fetch lives at the SortExec level and is not pushed down to the scan).
- Only initialize the threshold in the sort-pushdown path, to avoid tie-breaking changes for non-sort-pushdown TopK queries.

Local benchmark (single file with 61 sorted row groups, DESC LIMIT):
- Baseline: 22-25 ms per query
- Feature: 0.4-1.2 ms per query (20-58x faster)
1 parent c94bdcc commit 54f4fd4

2 files changed

Lines changed: 151 additions & 75 deletions

File tree

benchmarks/bench.sh

Lines changed: 102 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,9 @@ clickbench_extended: ClickBench \"inspired\" queries against a single parquet
109109
# Sort Pushdown Benchmarks
110110
sort_pushdown: Sort pushdown baseline (no WITH ORDER) on TPC-H data (SF=1)
111111
sort_pushdown_sorted: Sort pushdown with WITH ORDER — tests sort elimination on non-overlapping files
112-
sort_pushdown_inexact: Sort pushdown Inexact path (--sorted DESC) — tests reverse scan + RG reorder
113-
sort_pushdown_inexact_unsorted: Sort pushdown Inexact path (no WITH ORDER) — tests Unsupported path + RG reorder
114-
sort_pushdown_inexact_overlap: Sort pushdown Inexact path — partially overlapping RGs (streaming data scenario)
112+
sort_pushdown_inexact: Sort pushdown Inexact path (--sorted DESC) — multi-file with scrambled RGs, tests reverse scan + RG reorder
113+
sort_pushdown_inexact_unsorted: Sort pushdown Inexact path (no WITH ORDER) — same data, tests Unsupported path + RG reorder
114+
sort_pushdown_inexact_overlap: Sort pushdown Inexact path — multi-file scrambled RGs (streaming data scenario)
115115
116116
# Sorted Data Benchmarks (ORDER BY Optimization)
117117
clickbench_sorted: ClickBench queries on pre-sorted data using prefer_existing_sort (tests sort elimination optimization)
@@ -1154,93 +1154,143 @@ run_sort_pushdown_sorted() {
11541154

11551155
# Generates data for sort pushdown Inexact benchmark.
11561156
#
1157-
# Produces a single large lineitem parquet file where row groups have
1158-
# NON-OVERLAPPING but OUT-OF-ORDER l_orderkey ranges (each RG internally
1159-
# sorted, RGs shuffled). This simulates append-heavy workloads where data
1160-
# is written in batches at different times.
1157+
# Produces multiple parquet files where each file has MULTIPLE row groups
1158+
# with scrambled RG order. This tests both:
1159+
# - Row-group-level reorder within each file (reorder_by_statistics)
1160+
# - TopK threshold initialization from RG statistics
1161+
#
1162+
# Strategy:
1163+
# 1. Write a single sorted file with small (100K-row) RGs (~61 RGs total).
1164+
# 2. Use pyarrow to redistribute RGs into N_FILES files, scrambling the
1165+
# RG order within each file using a deterministic permutation.
1166+
# Each file gets ~61/N_FILES RGs with narrow, non-overlapping ranges
1167+
# but in scrambled order.
1168+
#
1169+
# Writing a single file with ORDER BY scramble does NOT work: the parquet
1170+
# writer merges rows from adjacent chunks at RG boundaries, widening
1171+
# ranges and defeating reorder_by_statistics.
1172+
#
1173+
# Requires pyarrow (pip install pyarrow).
11611174
data_sort_pushdown_inexact() {
11621175
INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact/lineitem"
11631176
if [ -d "${INEXACT_DIR}" ] && [ "$(ls -A ${INEXACT_DIR}/*.parquet 2>/dev/null)" ]; then
11641177
echo "Sort pushdown Inexact data already exists at ${INEXACT_DIR}"
11651178
return
11661179
fi
11671180

1168-
echo "Generating sort pushdown Inexact benchmark data (single file, shuffled RGs)..."
1181+
echo "Generating sort pushdown Inexact benchmark data (multi-file, scrambled RGs)..."
11691182

11701183
# Re-use the sort_pushdown data as the source (generate if missing)
11711184
data_sort_pushdown
11721185

11731186
mkdir -p "${INEXACT_DIR}"
11741187
SRC_DIR="${DATA_DIR}/sort_pushdown/lineitem"
11751188

1176-
# Use datafusion-cli to bucket rows into 64 groups by a deterministic
1177-
# scrambler, then sort within each bucket by orderkey. This produces
1178-
# ~64 RG-sized segments where each has a tight orderkey range but the
1179-
# segments appear in scrambled (non-sorted) order in the file.
1189+
# Step 1: Write a single sorted file with small (100K-row) RGs
1190+
TMPFILE="${INEXACT_DIR}/_sorted_small_rgs.parquet"
11801191
(cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c "
11811192
CREATE EXTERNAL TABLE src
11821193
STORED AS PARQUET
11831194
LOCATION '${SRC_DIR}';
11841195
1185-
COPY (
1186-
SELECT * FROM src
1187-
ORDER BY
1188-
(l_orderkey * 1664525 + 1013904223) % 64,
1189-
l_orderkey
1190-
)
1191-
TO '${INEXACT_DIR}/shuffled.parquet'
1196+
COPY (SELECT * FROM src ORDER BY l_orderkey)
1197+
TO '${TMPFILE}'
11921198
STORED AS PARQUET
11931199
OPTIONS ('format.max_row_group_size' '100000');
11941200
")
11951201

1196-
echo "Sort pushdown Inexact shuffled data generated at ${INEXACT_DIR}"
1202+
# Step 2: Redistribute RGs into 3 files with scrambled RG order.
1203+
# Each file gets ~20 RGs. RG assignment: rg_idx % 3 determines file,
1204+
# permutation (rg_idx * 41 + 7) % n scrambles the order within file.
1205+
python3 -c "
1206+
import pyarrow.parquet as pq
1207+
1208+
pf = pq.ParquetFile('${TMPFILE}')
1209+
n = pf.metadata.num_row_groups
1210+
n_files = 3
1211+
1212+
# Assign each RG to a file, scramble order within each file
1213+
file_rgs = [[] for _ in range(n_files)]
1214+
for rg_idx in range(n):
1215+
slot = (rg_idx * 41 + 7) % n # scrambled index
1216+
file_id = slot % n_files
1217+
file_rgs[file_id].append(rg_idx)
1218+
1219+
# Write each file with its assigned RGs (in scrambled order)
1220+
for file_id in range(n_files):
1221+
rgs = file_rgs[file_id]
1222+
if not rgs:
1223+
continue
1224+
tables = [pf.read_row_group(rg) for rg in rgs]
1225+
writer = pq.ParquetWriter(
1226+
'${INEXACT_DIR}/part_%03d.parquet' % file_id,
1227+
pf.schema_arrow)
1228+
for t in tables:
1229+
writer.write_table(t)
1230+
writer.close()
1231+
print(f'File part_{file_id:03d}.parquet: {len(rgs)} RGs')
1232+
"
1233+
1234+
rm -f "${TMPFILE}"
1235+
echo "Sort pushdown Inexact data generated at ${INEXACT_DIR}"
11971236
ls -la "${INEXACT_DIR}"
11981237

1199-
# Also generate a file with partially overlapping AND out-of-order
1200-
# row groups. Simulates streaming data with network delays: chunks
1201-
# arrive out of sequence, and each chunk has small jitter causing
1202-
# overlap with neighbors. This is the pattern described by
1203-
# @adriangb — data arriving with timestamps that are generally
1204-
# increasing but network-induced delays cause chunks to arrive
1205-
# out of order with small overlaps between row groups.
1238+
# Also generate overlap data: same strategy but with different file count
1239+
# and permutation. Simulates streaming data with network delays where
1240+
# chunks arrive out of sequence.
12061241
#
1207-
# Strategy: bucket rows into 60 chunks (~100K rows each), sort
1208-
# within each chunk (with jitter for overlap), then scramble the
1209-
# chunk order using a deterministic permutation. This produces
1210-
# RGs that are individually sorted but appear in scrambled order
1211-
# in the file — so reorder_by_statistics has real work to do.
1242+
# Requires pyarrow (pip install pyarrow).
12121243
OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap/lineitem"
12131244
if [ -d "${OVERLAP_DIR}" ] && [ "$(ls -A ${OVERLAP_DIR}/*.parquet 2>/dev/null)" ]; then
12141245
echo "Sort pushdown Inexact overlap data already exists at ${OVERLAP_DIR}"
12151246
return
12161247
fi
12171248

1218-
echo "Generating sort pushdown Inexact overlap data (scrambled + overlapping RGs)..."
1249+
echo "Generating sort pushdown Inexact overlap data (multi-file, scrambled RGs)..."
12191250
mkdir -p "${OVERLAP_DIR}"
12201251

1252+
# Step 1: Write a single sorted file with small (100K-row) RGs
1253+
TMPFILE="${OVERLAP_DIR}/_sorted_small_rgs.parquet"
12211254
(cd "${SCRIPT_DIR}/.." && cargo run --release -p datafusion-cli -- -c "
12221255
CREATE EXTERNAL TABLE src
12231256
STORED AS PARQUET
12241257
LOCATION '${SRC_DIR}';
12251258
1226-
-- Bucket into 60 chunks (each ~100K rows), sort within each chunk,
1227-
-- then scramble chunk order. This produces overlapping RGs that are
1228-
-- individually sorted but appear in scrambled order in the file.
1229-
COPY (
1230-
SELECT * FROM src
1231-
ORDER BY
1232-
-- Scramble chunk order: chunk_id -> permuted_chunk_id
1233-
(CAST(l_orderkey / 100000 AS INT) * 37 + 13) % 60,
1234-
-- Within each chunk, add small jitter for overlap
1235-
l_orderkey + (l_orderkey * 7 % 5000) - 2500
1236-
)
1237-
TO '${OVERLAP_DIR}/overlapping.parquet'
1259+
COPY (SELECT * FROM src ORDER BY l_orderkey)
1260+
TO '${TMPFILE}'
12381261
STORED AS PARQUET
12391262
OPTIONS ('format.max_row_group_size' '100000');
12401263
")
12411264

1242-
echo "Sort pushdown Inexact overlap data generated at ${OVERLAP_DIR}"
1243-
ls -la "${OVERLAP_DIR}"
1265+
# Step 2: Redistribute into 5 files with scrambled RG order.
1266+
python3 -c "
1267+
import pyarrow.parquet as pq
1268+
1269+
pf = pq.ParquetFile('${TMPFILE}')
1270+
n = pf.metadata.num_row_groups
1271+
n_files = 5
1272+
1273+
file_rgs = [[] for _ in range(n_files)]
1274+
for rg_idx in range(n):
1275+
slot = (rg_idx * 37 + 13) % n
1276+
file_id = slot % n_files
1277+
file_rgs[file_id].append(rg_idx)
1278+
1279+
for file_id in range(n_files):
1280+
rgs = file_rgs[file_id]
1281+
if not rgs:
1282+
continue
1283+
tables = [pf.read_row_group(rg) for rg in rgs]
1284+
writer = pq.ParquetWriter(
1285+
'${OVERLAP_DIR}/part_%03d.parquet' % file_id,
1286+
pf.schema_arrow)
1287+
for t in tables:
1288+
writer.write_table(t)
1289+
writer.close()
1290+
print(f'File part_{file_id:03d}.parquet: {len(rgs)} RGs')
1291+
"
1292+
1293+
rm -f "${TMPFILE}"
12441294
}
12451295

12461296
# Runs the sort pushdown Inexact benchmark (tests RG reorder by statistics).
@@ -1249,7 +1299,7 @@ data_sort_pushdown_inexact() {
12491299
run_sort_pushdown_inexact() {
12501300
INEXACT_DIR="${DATA_DIR}/sort_pushdown_inexact"
12511301
RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact.json"
1252-
echo "Running sort pushdown Inexact benchmark (--sorted, DESC, reverse scan path)..."
1302+
echo "Running sort pushdown Inexact benchmark (multi-file scrambled RGs, --sorted DESC)..."
12531303
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
12541304
debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
12551305
}
@@ -1265,13 +1315,13 @@ run_sort_pushdown_inexact_unsorted() {
12651315
debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --iterations 5 --path "${INEXACT_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_unsorted" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
12661316
}
12671317

1268-
# Runs the sort pushdown benchmark with scrambled + overlapping RGs.
1269-
# Simulates streaming data with network delays — RGs are out of order
1270-
# AND have small overlaps (jitter). Tests reorder_by_statistics effectiveness.
1318+
# Runs the sort pushdown benchmark with multi-file scrambled RG order.
1319+
# Simulates streaming data with network delays — multiple files, each with
1320+
# scrambled RGs. Tests both RG-level reorder and TopK stats initialization.
12711321
run_sort_pushdown_inexact_overlap() {
12721322
OVERLAP_DIR="${DATA_DIR}/sort_pushdown_inexact_overlap"
12731323
RESULTS_FILE="${RESULTS_DIR}/sort_pushdown_inexact_overlap.json"
1274-
echo "Running sort pushdown Inexact benchmark (overlapping RGs, streaming data pattern)..."
1324+
echo "Running sort pushdown Inexact benchmark (multi-file scrambled RGs, streaming data pattern)..."
12751325
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true \
12761326
debug_run $CARGO_COMMAND --bin dfbench -- sort-pushdown --sorted --iterations 5 --path "${OVERLAP_DIR}" --queries-path "${SCRIPT_DIR}/queries/sort_pushdown_inexact_overlap" -o "${RESULTS_FILE}" ${QUERY_ARG} ${LATENCY_ARG}
12771327
}

datafusion/datasource-parquet/src/opener.rs

Lines changed: 49 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -834,6 +834,30 @@ impl MetadataLoadedParquetOpen {
834834
}
835835
prepared.physical_file_schema = Arc::clone(&physical_file_schema);
836836

837+
// Initialize TopK dynamic filter threshold from row group statistics
838+
// BEFORE building the pruning predicate. The PruningPredicate compiles
839+
// the expression at build time, so the DynamicFilterPhysicalExpr must
840+
// already have the threshold set for pruning to be effective.
841+
// Only initialize TopK threshold when sort pushdown is active.
842+
// For non-sort-pushdown TopK, pruning changes which RGs are read,
843+
// altering tie-breaking for equal values (e.g. NULLs).
844+
if prepared.sort_order_for_reorder.is_some() {
845+
let file_metadata = reader_metadata.metadata();
846+
let rg_metadata = file_metadata.row_groups();
847+
let topk_limit = prepared.limit.unwrap_or(1);
848+
if let Some(predicate) = &prepared.predicate
849+
&& let Err(e) = try_init_topk_threshold(
850+
predicate,
851+
topk_limit,
852+
rg_metadata,
853+
&physical_file_schema,
854+
reader_metadata.parquet_schema(),
855+
)
856+
{
857+
debug!("Skipping TopK threshold init from statistics: {e}");
858+
}
859+
}
860+
837861
// Build predicates for this specific file
838862
let pruning_predicate = build_pruning_predicates(
839863
prepared.predicate.as_ref(),
@@ -905,22 +929,6 @@ impl FiltersPreparedParquetOpen {
905929
row_groups.prune_by_range(rg_metadata, range);
906930
}
907931

908-
// Initialize TopK dynamic filter threshold from row group statistics
909-
// BEFORE row group pruning, so that this file's own RGs can be pruned
910-
// by the threshold. For the first file, this sets the initial threshold;
911-
// for subsequent files, the threshold is already set by earlier files.
912-
if let (Some(predicate), Some(limit)) = (&prepared.predicate, prepared.limit)
913-
&& let Err(e) = try_init_topk_threshold(
914-
predicate,
915-
limit,
916-
rg_metadata,
917-
&prepared.physical_file_schema,
918-
loaded.reader_metadata.parquet_schema(),
919-
)
920-
{
921-
debug!("Skipping TopK threshold initialization from statistics: {e}");
922-
}
923-
924932
// If there is a predicate that can be evaluated against the metadata
925933
if let Some(predicate) = self.pruning_predicate.as_ref().map(|p| p.as_ref()) {
926934
if prepared.enable_row_group_stats_pruning {
@@ -1350,15 +1358,18 @@ fn try_init_topk_threshold(
13501358

13511359
let is_descending = sort_options[0].descending;
13521360

1353-
// The child must be a Column expression so we can look up statistics.
1361+
// The child must be a Column expression (possibly wrapped in CastExpr).
13541362
let children = dynamic_filter.children();
13551363
let col_expr: Arc<dyn PhysicalExpr> = Arc::clone(children[0]);
1356-
let col_any: &dyn std::any::Any = col_expr.as_ref();
1357-
let column = col_any.downcast_ref::<Column>().ok_or_else(|| {
1358-
DataFusionError::Internal(
1359-
"TopK threshold init: sort child is not a Column expression".to_string(),
1360-
)
1361-
})?;
1364+
let column = match find_column_in_expr(&col_expr) {
1365+
Some(col) => col,
1366+
None => {
1367+
debug!(
1368+
"Skipping TopK threshold init: cannot find Column in child expr {col_expr:?}",
1369+
);
1370+
return Ok(());
1371+
}
1372+
};
13621373

13631374
let col_name = column.name();
13641375

@@ -1439,6 +1450,21 @@ fn find_dynamic_filter(
14391450
None
14401451
}
14411452

1453+
/// Find a [`Column`] expression by unwrapping wrappers like `CastExpr`.
1454+
fn find_column_in_expr(expr: &Arc<dyn PhysicalExpr>) -> Option<Column> {
1455+
// Direct Column
1456+
let any_ref: &dyn std::any::Any = expr.as_ref();
1457+
if let Some(col) = any_ref.downcast_ref::<Column>() {
1458+
return Some(col.clone());
1459+
}
1460+
// Unwrap single-child wrappers (e.g. CastExpr)
1461+
let children = expr.children();
1462+
if children.len() == 1 {
1463+
return find_column_in_expr(children[0]);
1464+
}
1465+
None
1466+
}
1467+
14421468
/// Compute the best threshold from row group statistics.
14431469
///
14441470
/// For `want_max = true` (DESC): finds the maximum value from the stats array

0 commit comments

Comments
 (0)