Skip to content

Commit 1bd60e9

Browse files
committed
fix some issues
1 parent dbab02b commit 1bd60e9

7 files changed

Lines changed: 28 additions & 31 deletions

File tree

datafusion-examples/examples/data_io/json_shredding.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ pub async fn json_shredding() -> Result<()> {
144144
let plan = format!("{}", arrow::util::pretty::pretty_format_batches(&batches)?);
145145
println!("{plan}");
146146
assert_contains!(&plan, "row_groups_pruned_statistics=2 total → 1 matched");
147-
assert_contains!(&plan, "pushdown_rows_pruned=1");
147+
assert_contains!(&plan, "pushdown_rows_pruned=0");
148148

149149
Ok(())
150150
}

datafusion/common/src/config.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -779,10 +779,10 @@ config_namespace! {
779779
/// (reading) Fraction of total dataset rows to use for the statistics
780780
/// collection phase. When > 0 and the dataset row count is known, the
781781
/// effective collection threshold is max(min_rows, fraction * total_rows).
782-
/// 0.0 (default) = disabled, use filter_statistics_collection_min_rows only.
783-
/// 0.05 = collect stats on at least 5% of the dataset.
782+
/// 0.0 = disabled, use filter_statistics_collection_min_rows only.
783+
/// 0.05 (default) = collect stats on at least 5% of the dataset.
784784
/// Must be in [0.0, 1.0].
785-
pub filter_statistics_collection_fraction: f64, default = 0.0
785+
pub filter_statistics_collection_fraction: f64, default = 0.05
786786

787787
// The following options affect writing to parquet files
788788
// and map to parquet::file::properties::WriterProperties

datafusion/core/tests/physical_optimizer/filter_pushdown.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
263263
- SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false]
264264
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
265265
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
266-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ]
266+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=Optional(DynamicFilter [ empty ]) AND Optional(DynamicFilter [ empty ])
267267
"
268268
);
269269

@@ -287,7 +287,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
287287
- SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb]
288288
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
289289
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
290-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab AND d@0 IN (SET) ([aa, ab]) ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
290+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=Optional(DynamicFilter [ d@0 >= aa AND d@0 <= ab AND d@0 IN (SET) ([aa, ab]) ]) AND Optional(DynamicFilter [ e@1 IS NULL OR e@1 < bb ])
291291
"
292292
);
293293
}
@@ -1003,7 +1003,7 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
10031003
Ok:
10041004
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
10051005
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
1006-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
1006+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=Optional(DynamicFilter [ empty ])
10071007
",
10081008
);
10091009

@@ -1037,7 +1037,7 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
10371037
@r"
10381038
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
10391039
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
1040-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ]
1040+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=Optional(DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ])
10411041
"
10421042
);
10431043
}
@@ -1213,7 +1213,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
12131213
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
12141214
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
12151215
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1216-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
1216+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=Optional(DynamicFilter [ empty ])
12171217
"
12181218
);
12191219

@@ -1247,7 +1247,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
12471247
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
12481248
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
12491249
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1250-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 2 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) ELSE false END ]
1250+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=Optional(DynamicFilter [ CASE hash_repartition % 12 WHEN 2 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:ab,c1:bb}]) WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}]) ELSE false END ])
12511251
"
12521252
);
12531253

@@ -1265,7 +1265,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
12651265
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
12661266
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
12671267
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1268-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ]
1268+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=Optional(DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ])
12691269
"
12701270
);
12711271

@@ -1405,7 +1405,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() {
14051405
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
14061406
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
14071407
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1408-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
1408+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=Optional(DynamicFilter [ empty ])
14091409
"
14101410
);
14111411

@@ -1437,7 +1437,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_collect_left() {
14371437
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
14381438
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
14391439
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1440-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ]
1440+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=Optional(DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ])
14411441
"
14421442
);
14431443

@@ -1579,8 +1579,8 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
15791579
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
15801580
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
15811581
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
1582-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
1583-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
1582+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=Optional(DynamicFilter [ empty ])
1583+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=Optional(DynamicFilter [ empty ])
15841584
",
15851585
);
15861586

@@ -1610,8 +1610,8 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
16101610
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
16111611
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
16121612
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
1613-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab AND b@0 IN (SET) ([aa, ab]) ]
1614-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb AND d@0 IN (SET) ([ca, cb]) ]
1613+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=Optional(DynamicFilter [ b@0 >= aa AND b@0 <= ab AND b@0 IN (SET) ([aa, ab]) ])
1614+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=Optional(DynamicFilter [ d@0 >= ca AND d@0 <= cb AND d@0 IN (SET) ([ca, cb]) ])
16151615
"
16161616
);
16171617
}
@@ -3095,7 +3095,7 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() {
30953095
- RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
30963096
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
30973097
- RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
3098-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
3098+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=Optional(DynamicFilter [ empty ])
30993099
"
31003100
);
31013101

@@ -3120,7 +3120,7 @@ async fn test_hashjoin_dynamic_filter_all_partitions_empty() {
31203120
- RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
31213121
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
31223122
- RepartitionExec: partitioning=Hash([a@0, b@1], 4), input_partitions=1
3123-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ false ]
3123+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true, predicate=Optional(DynamicFilter [ false ])
31243124
"
31253125
);
31263126
}
@@ -3222,7 +3222,7 @@ async fn test_hashjoin_dynamic_filter_with_nulls() {
32223222
@r"
32233223
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
32243224
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
3225-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
3225+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=Optional(DynamicFilter [ empty ])
32263226
"
32273227
);
32283228

@@ -3245,7 +3245,7 @@ async fn test_hashjoin_dynamic_filter_with_nulls() {
32453245
@r"
32463246
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
32473247
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b], file_type=test, pushdown_supported=true
3248-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= 1 AND b@1 <= 2 AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:1}, {c0:,c1:2}, {c0:ab,c1:}]) ]
3248+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=Optional(DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= 1 AND b@1 <= 2 AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:1}, {c0:,c1:2}, {c0:ab,c1:}]) ])
32493249
"
32503250
);
32513251

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1032,7 +1032,7 @@ struct DatafusionArrowPredicateWithMetrics {
10321032
}
10331033

10341034
impl DatafusionArrowPredicateWithMetrics {
1035-
#[allow(clippy::too_many_arguments)]
1035+
#[expect(clippy::too_many_arguments)]
10361036
fn try_new(
10371037
candidate: FilterCandidate,
10381038
metadata: &ParquetMetaData,

datafusion/datasource-parquet/src/selectivity.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ impl SelectivityTracker {
430430
/// Partition filters into row_filters and post_scan based on bytes/sec throughput.
431431
///
432432
/// This is the non-grouped variant used primarily for testing. For production
433-
/// use, prefer [`partition_filters_grouped`] which also handles correlation-based
433+
/// use, prefer [`Self::partition_filters_grouped`] which also handles correlation-based
434434
/// grouping.
435435
///
436436
/// - `AllPromoted` (0.0): all filters pushed as row filters

datafusion/proto/src/generated/datafusion_proto_common.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -876,10 +876,7 @@ pub struct ParquetOptions {
876876
pub filter_pushdown_min_bytes_per_sec_opt: ::core::option::Option<
877877
parquet_options::FilterPushdownMinBytesPerSecOpt,
878878
>,
879-
#[prost(
880-
oneof = "parquet_options::FilterCorrelationThresholdOpt",
881-
tags = "36"
882-
)]
879+
#[prost(oneof = "parquet_options::FilterCorrelationThresholdOpt", tags = "36")]
883880
pub filter_correlation_threshold_opt: ::core::option::Option<
884881
parquet_options::FilterCorrelationThresholdOpt,
885882
>,

0 commit comments

Comments
 (0)