Skip to content

Commit 34e12b7

Browse files
committed
add % of dataset trigger
1 parent b95348b commit 34e12b7

15 files changed

Lines changed: 301 additions & 25 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -776,6 +776,14 @@ config_namespace! {
776776
/// Set to 0 to disable the collection phase entirely.
777777
pub filter_statistics_collection_min_rows: u64, default = 10_000
778778

779+
/// (reading) Fraction of total dataset rows to use for the statistics
780+
/// collection phase. When > 0 and the dataset row count is known, the
781+
/// effective collection threshold is max(min_rows, fraction * total_rows).
782+
/// 0.0 (default) = disabled, use filter_statistics_collection_min_rows only.
783+
/// 0.05 = collect stats on at least 5% of the dataset.
784+
/// Must be in [0.0, 1.0].
785+
pub filter_statistics_collection_fraction: f64, default = 0.0
786+
779787
// The following options affect writing to parquet files
780788
// and map to parquet::file::properties::WriterProperties
781789

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ impl ParquetOptions {
212212
filter_pushdown_min_bytes_per_sec: _, // not used for writer props
213213
filter_correlation_threshold: _, // not used for writer props
214214
filter_statistics_collection_min_rows: _, // not used for writer props
215+
filter_statistics_collection_fraction: _, // not used for writer props
215216
} = self;
216217

217218
let mut builder = WriterProperties::builder()

datafusion/datasource-parquet/src/opener.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1499,7 +1499,7 @@ mod test {
14991499
// (skipping adaptive logic), and min_rows_for_collection=0 to
15001500
// disable the collection phase.
15011501
selectivity_tracker: Arc::new(parking_lot::RwLock::new(
1502-
SelectivityTracker::new_with_config(0.0, 1.5, 0),
1502+
SelectivityTracker::new_with_config(0.0, 1.5, 0, 0.0),
15031503
)),
15041504
}
15051505
}

datafusion/datasource-parquet/src/selectivity.rs

Lines changed: 97 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,13 @@ pub struct SelectivityTracker {
253253
/// During collection, all filters go to post-scan for accurate measurement.
254254
/// Default: 10_000
255255
min_rows_for_collection: u64,
256+
/// Fraction of total dataset rows for collection phase (0.0 = disabled).
257+
/// When > 0 and dataset size is known, effective threshold =
258+
/// max(min_rows_for_collection, (fraction * total_rows) as u64).
259+
collection_fraction: f64,
260+
/// Resolved minimum rows after notify_dataset_rows() is called.
261+
/// None = not yet resolved (use min_rows_for_collection as-is).
262+
resolved_min_rows: Option<u64>,
256263
}
257264

258265
impl Default for SelectivityTracker {
@@ -275,6 +282,8 @@ impl SelectivityTracker {
275282
min_bytes_per_sec,
276283
correlation_threshold: 1.5,
277284
min_rows_for_collection: 10_000,
285+
collection_fraction: 0.0,
286+
resolved_min_rows: None,
278287
}
279288
}
280289

@@ -283,13 +292,16 @@ impl SelectivityTracker {
283292
min_bytes_per_sec: f64,
284293
correlation_threshold: f64,
285294
min_rows_for_collection: u64,
295+
collection_fraction: f64,
286296
) -> Self {
287297
Self {
288298
stats: HashMap::new(),
289299
correlations: HashMap::new(),
290300
min_bytes_per_sec,
291301
correlation_threshold,
292302
min_rows_for_collection,
303+
collection_fraction,
304+
resolved_min_rows: None,
293305
}
294306
}
295307

@@ -298,6 +310,25 @@ impl SelectivityTracker {
298310
self.min_bytes_per_sec
299311
}
300312

313+
/// Returns the effective minimum rows for collection, taking into account
314+
/// the fraction-based threshold if it has been resolved.
315+
fn effective_min_rows(&self) -> u64 {
316+
self.resolved_min_rows.unwrap_or(self.min_rows_for_collection)
317+
}
318+
319+
/// Notify the tracker of the total dataset row count so the fraction-based
320+
/// threshold can be resolved.
321+
///
322+
/// When `collection_fraction > 0`, computes:
323+
/// `resolved_min_rows = max(min_rows_for_collection, (fraction * total_rows) as u64)`
324+
pub fn notify_dataset_rows(&mut self, total_rows: u64) {
325+
if self.collection_fraction > 0.0 {
326+
let fraction_rows = (self.collection_fraction * total_rows as f64) as u64;
327+
self.resolved_min_rows =
328+
Some(self.min_rows_for_collection.max(fraction_rows));
329+
}
330+
}
331+
301332
/// Get the effectiveness for a filter expression, if known.
302333
pub fn get_effectiveness(&self, expr: &Arc<dyn PhysicalExpr>) -> Option<f64> {
303334
let key = ExprKey::new(expr);
@@ -314,17 +345,16 @@ impl SelectivityTracker {
314345
/// Returns false if the effective minimum row threshold is 0 (collection
315346
/// disabled); with no stats yet (no filters registered) it returns true.
316347
pub fn in_collection_phase(&self) -> bool {
317-
if self.min_rows_for_collection == 0 {
348+
let min_rows = self.effective_min_rows();
349+
if min_rows == 0 {
318350
return false;
319351
}
320352
if self.stats.is_empty() {
321353
// No filters registered yet - treat as collection phase
322354
// so the first file's filters go to post-scan for measurement
323355
return true;
324356
}
325-
self.stats
326-
.values()
327-
.any(|s| s.rows_total < self.min_rows_for_collection)
357+
self.stats.values().any(|s| s.rows_total < min_rows)
328358
}
329359

330360
/// Partition filters into row_filters and post_scan based on bytes/sec throughput.
@@ -498,16 +528,15 @@ impl SelectivityTracker {
498528
let stats_b = self.stats.get(&key_b)?;
499529

500530
// Need sufficient data
501-
if stats_a.rows_total < self.min_rows_for_collection
502-
|| stats_b.rows_total < self.min_rows_for_collection
503-
{
531+
let min_rows = self.effective_min_rows();
532+
if stats_a.rows_total < min_rows || stats_b.rows_total < min_rows {
504533
return None;
505534
}
506535

507536
let pair_key = PairKey::new(&key_a, &key_b);
508537
let pair_stats = self.correlations.get(&pair_key)?;
509538

510-
if pair_stats.rows_total < self.min_rows_for_collection {
539+
if pair_stats.rows_total < min_rows {
511540
return None;
512541
}
513542

@@ -890,7 +919,7 @@ mod tests {
890919

891920
#[test]
892921
fn test_correlation_stats_update() {
893-
let mut tracker = SelectivityTracker::new_with_config(0.0, 1.5, 100);
922+
let mut tracker = SelectivityTracker::new_with_config(0.0, 1.5, 100, 0.0);
894923

895924
let filter_a = make_filter("a", 5);
896925
let filter_b = make_filter("a", 10);
@@ -916,7 +945,7 @@ mod tests {
916945

917946
#[test]
918947
fn test_correlation_ratio_independent() {
919-
let mut tracker = SelectivityTracker::new_with_config(0.0, 1.5, 100);
948+
let mut tracker = SelectivityTracker::new_with_config(0.0, 1.5, 100, 0.0);
920949

921950
let filter_a = make_filter("a", 5);
922951
let filter_b = make_filter("a", 10);
@@ -932,7 +961,7 @@ mod tests {
932961

933962
#[test]
934963
fn test_correlation_ratio_insufficient_data() {
935-
let mut tracker = SelectivityTracker::new_with_config(0.0, 1.5, 1000);
964+
let mut tracker = SelectivityTracker::new_with_config(0.0, 1.5, 1000, 0.0);
936965

937966
let filter_a = make_filter("a", 5);
938967
let filter_b = make_filter("a", 10);
@@ -947,7 +976,7 @@ mod tests {
947976

948977
#[test]
949978
fn test_in_collection_phase() {
950-
let mut tracker = SelectivityTracker::new_with_config(100.0, 1.5, 1000);
979+
let mut tracker = SelectivityTracker::new_with_config(100.0, 1.5, 1000, 0.0);
951980

952981
// No stats yet - in collection phase
953982
assert!(tracker.in_collection_phase());
@@ -966,15 +995,15 @@ mod tests {
966995

967996
#[test]
968997
fn test_in_collection_phase_disabled() {
969-
let tracker = SelectivityTracker::new_with_config(100.0, 1.5, 0);
998+
let tracker = SelectivityTracker::new_with_config(100.0, 1.5, 0, 0.0);
970999

9711000
// min_rows = 0 means collection is disabled
9721001
assert!(!tracker.in_collection_phase());
9731002
}
9741003

9751004
#[test]
9761005
fn test_partition_filters_grouped_collection_phase() {
977-
let tracker = SelectivityTracker::new_with_config(100.0, 1.5, 10_000);
1006+
let tracker = SelectivityTracker::new_with_config(100.0, 1.5, 10_000, 0.0);
9781007

9791008
let filter_a = make_filter("a", 5);
9801009
let filter_b = make_filter("a", 10);
@@ -991,7 +1020,7 @@ mod tests {
9911020

9921021
#[test]
9931022
fn test_partition_filters_grouped_all_independent() {
994-
let mut tracker = SelectivityTracker::new_with_config(0.0, 1.5, 100);
1023+
let mut tracker = SelectivityTracker::new_with_config(0.0, 1.5, 100, 0.0);
9951024

9961025
let filter_a = make_filter("a", 5);
9971026
let filter_b = make_filter("a", 10);
@@ -1018,7 +1047,7 @@ mod tests {
10181047

10191048
#[test]
10201049
fn test_partition_filters_grouped_correlated() {
1021-
let mut tracker = SelectivityTracker::new_with_config(0.0, 1.5, 100);
1050+
let mut tracker = SelectivityTracker::new_with_config(0.0, 1.5, 100, 0.0);
10221051

10231052
let filter_a = make_filter("a", 5);
10241053
let filter_b = make_filter("a", 10);
@@ -1043,7 +1072,7 @@ mod tests {
10431072

10441073
#[test]
10451074
fn test_partition_filters_grouped_mixed() {
1046-
let mut tracker = SelectivityTracker::new_with_config(0.0, 1.5, 100);
1075+
let mut tracker = SelectivityTracker::new_with_config(0.0, 1.5, 100, 0.0);
10471076

10481077
let filter_a = make_filter("a", 5);
10491078
let filter_b = make_filter("a", 10);
@@ -1096,7 +1125,7 @@ mod tests {
10961125

10971126
#[test]
10981127
fn test_partition_filters_grouped_single_filter() {
1099-
let mut tracker = SelectivityTracker::new_with_config(0.0, 1.5, 100);
1128+
let mut tracker = SelectivityTracker::new_with_config(0.0, 1.5, 100, 0.0);
11001129

11011130
let filter_a = make_filter("a", 5);
11021131
tracker.update(&filter_a, 10, 100, 0);
@@ -1113,7 +1142,7 @@ mod tests {
11131142
#[test]
11141143
fn test_partition_filters_grouped_with_low_throughput() {
11151144
// Use a bytes/sec threshold: 100 bytes/sec
1116-
let mut tracker = SelectivityTracker::new_with_config(100.0, 1.5, 100);
1145+
let mut tracker = SelectivityTracker::new_with_config(100.0, 1.5, 100, 0.0);
11171146

11181147
let filter_a = make_filter("a", 5);
11191148
let filter_b = make_filter("a", 10);
@@ -1136,4 +1165,53 @@ mod tests {
11361165
assert_eq!(result.row_filter_groups[0].len(), 1);
11371166
assert_eq!(result.post_scan.len(), 1);
11381167
}
1168+
1169+
#[test]
fn test_notify_dataset_rows_resolves_fraction() {
    // Floor of 100 rows, fraction of 5%.
    let mut sel = SelectivityTracker::new_with_config(0.0, 1.5, 100, 0.05);

    // Nothing has been resolved yet, so the configured floor applies.
    assert_eq!(sel.effective_min_rows(), 100);

    // 5% of 10_000 rows is 500, which exceeds the floor of 100.
    sel.notify_dataset_rows(10_000);
    assert_eq!(sel.effective_min_rows(), 500);
}
1180+
1181+
#[test]
fn test_notify_dataset_rows_floor_behavior() {
    // Floor of 1000 rows, fraction of 5%.
    let mut sel = SelectivityTracker::new_with_config(0.0, 1.5, 1000, 0.05);

    // 5% of 10_000 rows is only 500, so the larger floor of 1000 wins.
    sel.notify_dataset_rows(10_000);
    assert_eq!(sel.effective_min_rows(), 1000);
}
1189+
1190+
#[test]
fn test_notify_dataset_rows_fraction_disabled() {
    // A fraction of 0.0 switches the fraction-based threshold off.
    let mut sel = SelectivityTracker::new_with_config(0.0, 1.5, 100, 0.0);

    // Reporting the dataset size must not resolve anything:
    // resolved_min_rows stays None and the floor of 100 is still used.
    sel.notify_dataset_rows(1_000_000);
    assert_eq!(sel.effective_min_rows(), 100);
}
1198+
1199+
#[test]
fn test_collection_phase_with_fraction() {
    let mut sel = SelectivityTracker::new_with_config(0.0, 1.5, 100, 0.05);
    let predicate = make_filter("a", 5);

    // 5% of 100_000 rows resolves the threshold to 5000.
    sel.notify_dataset_rows(100_000);
    assert_eq!(sel.effective_min_rows(), 5000);

    // After 200 rows the filter is still below the threshold (200 < 5000).
    sel.update(&predicate, 80, 200, 10_000_000);
    assert!(sel.in_collection_phase());

    // Another 5000 rows pushes it past the threshold.
    sel.update(&predicate, 1000, 5000, 50_000_000);
    assert!(!sel.in_collection_phase());
}
11391217
}

datafusion/datasource-parquet/src/source.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,7 @@ impl ParquetSource {
355355
opts.filter_pushdown_min_bytes_per_sec,
356356
opts.filter_correlation_threshold,
357357
opts.filter_statistics_collection_min_rows,
358+
opts.filter_statistics_collection_fraction,
358359
),
359360
));
360361
self.table_parquet_options = table_parquet_options;
@@ -511,6 +512,7 @@ impl ParquetSource {
511512
min_bytes_per_sec,
512513
opts.filter_correlation_threshold,
513514
opts.filter_statistics_collection_min_rows,
515+
opts.filter_statistics_collection_fraction,
514516
),
515517
));
516518
self
@@ -633,6 +635,16 @@ impl FileSource for ParquetSource {
633635
reverse_row_groups: self.reverse_row_groups,
634636
selectivity_tracker: Arc::clone(&self.selectivity_tracker),
635637
});
638+
639+
// Notify the selectivity tracker of the total dataset row count
640+
// so fraction-based collection thresholds can be resolved.
641+
if let Some(&total_rows) = base_config.statistics().num_rows.get_value()
642+
{
643+
self.selectivity_tracker
644+
.write()
645+
.notify_dataset_rows(total_rows as u64);
646+
}
647+
636648
Ok(opener)
637649
}
638650

datafusion/proto-common/proto/datafusion_common.proto

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -607,6 +607,18 @@ message ParquetOptions {
607607
oneof filter_pushdown_min_bytes_per_sec_opt {
608608
double filter_pushdown_min_bytes_per_sec = 35;
609609
}
610+
611+
oneof filter_correlation_threshold_opt {
612+
double filter_correlation_threshold = 36;
613+
}
614+
615+
oneof filter_statistics_collection_min_rows_opt {
616+
uint64 filter_statistics_collection_min_rows = 37;
617+
}
618+
619+
oneof filter_statistics_collection_fraction_opt {
620+
double filter_statistics_collection_fraction = 38;
621+
}
610622
}
611623

612624
enum JoinSide {

datafusion/proto-common/src/from_proto/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1093,8 +1093,15 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
10931093
filter_pushdown_min_bytes_per_sec: value.filter_pushdown_min_bytes_per_sec_opt.map(|opt| match opt {
10941094
protobuf::parquet_options::FilterPushdownMinBytesPerSecOpt::FilterPushdownMinBytesPerSec(v) => v,
10951095
}).unwrap_or(f64::INFINITY),
1096-
filter_correlation_threshold: 1.5,
1097-
filter_statistics_collection_min_rows: 10_000,
1096+
filter_correlation_threshold: value.filter_correlation_threshold_opt.map(|opt| match opt {
1097+
protobuf::parquet_options::FilterCorrelationThresholdOpt::FilterCorrelationThreshold(v) => v,
1098+
}).unwrap_or(1.5),
1099+
filter_statistics_collection_min_rows: value.filter_statistics_collection_min_rows_opt.map(|opt| match opt {
1100+
protobuf::parquet_options::FilterStatisticsCollectionMinRowsOpt::FilterStatisticsCollectionMinRows(v) => v,
1101+
}).unwrap_or(10_000),
1102+
filter_statistics_collection_fraction: value.filter_statistics_collection_fraction_opt.map(|opt| match opt {
1103+
protobuf::parquet_options::FilterStatisticsCollectionFractionOpt::FilterStatisticsCollectionFraction(v) => v,
1104+
}).unwrap_or(0.0),
10981105
})
10991106
}
11001107
}

0 commit comments

Comments
 (0)