Skip to content

Commit bdb79be

Browse files
committed
update slts
1 parent d79f538 commit bdb79be

6 files changed

Lines changed: 26 additions & 18 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -577,9 +577,8 @@ impl FileOpener for ParquetOpener {
577577
// Apply the prepared plan to the builder
578578
builder = prepared_plan.apply_to_builder(builder);
579579

580-
if let Some(limit) = limit {
581-
builder = builder.with_limit(limit)
582-
}
580+
// Note: limit is applied later, after filter partitioning,
581+
// because post-scan filters must run before limiting.
583582

584583
if let Some(max_predicate_cache_size) = max_predicate_cache_size {
585584
builder = builder.with_max_predicate_cache_size(max_predicate_cache_size);
@@ -672,6 +671,15 @@ impl FileOpener for ParquetOpener {
672671
};
673672
// tracker lock released here
674673

674+
// Apply limit to the reader only when there are no post-scan filters.
675+
// If post-scan filters exist, the limit must be enforced after filtering
676+
// (otherwise the reader stops reading before the filter can find matches).
677+
if post_scan_filters.is_empty() {
678+
if let Some(limit) = limit {
679+
builder = builder.with_limit(limit);
680+
}
681+
}
682+
675683
let stream = builder
676684
.with_projection(mask)
677685
.with_batch_size(batch_size)

datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ physical_plan
158158
01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info]
159159
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[info@1, id@2, data@3]
160160
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
161-
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
161+
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=Optional(DynamicFilter [ empty ])
162162

163163
# Disable Join dynamic filter pushdown
164164
statement ok
@@ -213,7 +213,7 @@ physical_plan
213213
01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info]
214214
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[info@1, id@2, data@3]
215215
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
216-
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
216+
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=Optional(DynamicFilter [ empty ])
217217

218218
# Enable TopK, disable Join
219219
statement ok
@@ -436,7 +436,7 @@ physical_plan
436436
01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info]
437437
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)], projection=[info@1, id@2, data@3]
438438
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id, info], file_type=parquet
439-
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
439+
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=Optional(DynamicFilter [ empty ])
440440

441441
# Cleanup
442442

datafusion/sqllogictest/test_files/preserve_file_partitioning.slt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ physical_plan
370370
10)------------------FilterExec: service@2 = log
371371
11)--------------------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
372372
12)----------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension/data.parquet]]}, projection=[d_dkey, env, service], file_type=parquet, predicate=service@2 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)]
373-
13)----------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
373+
13)----------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet, predicate=Optional(DynamicFilter [ empty ])
374374

375375
# Verify results without optimization
376376
query TTTIR rowsort
@@ -422,7 +422,7 @@ physical_plan
422422
07)------------FilterExec: service@2 = log
423423
08)--------------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
424424
09)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension/data.parquet]]}, projection=[d_dkey, env, service], file_type=parquet, predicate=service@2 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)]
425-
10)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilter [ empty ]
425+
10)----------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet, predicate=Optional(DynamicFilter [ empty ])
426426

427427
query TTTIR rowsort
428428
SELECT f.f_dkey, MAX(d.env), MAX(d.service), count(*), sum(f.value)
@@ -648,7 +648,7 @@ physical_plan
648648
06)----------RepartitionExec: partitioning=Hash([d_dkey@1], 3), input_partitions=3
649649
07)------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=C/data.parquet]]}, projection=[env, d_dkey], file_type=parquet
650650
08)----------RepartitionExec: partitioning=Hash([f_dkey@1], 3), input_partitions=3
651-
09)------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet, predicate=DynamicFilter [ empty ]
651+
09)------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet, predicate=Optional(DynamicFilter [ empty ])
652652

653653
query TTR rowsort
654654
SELECT f.f_dkey, d.env, sum(f.value)

datafusion/sqllogictest/test_files/projection_pushdown.slt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1457,7 +1457,7 @@ physical_plan
14571457
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)]
14581458
02)--FilterExec: __datafusion_extracted_1@0 > 150, projection=[id@1]
14591459
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[get_field(s@1, value) as __datafusion_extracted_1, id], file_type=parquet
1460-
04)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ]
1460+
04)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[id], file_type=parquet, predicate=Optional(DynamicFilter [ empty ])
14611461

14621462
# Verify correctness - id matches and value > 150
14631463
query II
@@ -1497,7 +1497,7 @@ physical_plan
14971497
02)--FilterExec: __datafusion_extracted_1@0 > 100, projection=[id@1]
14981498
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[get_field(s@1, value) as __datafusion_extracted_1, id], file_type=parquet
14991499
04)--FilterExec: __datafusion_extracted_2@0 > 3, projection=[id@1]
1500-
05)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[get_field(s@1, level) as __datafusion_extracted_2, id], file_type=parquet, predicate=DynamicFilter [ empty ]
1500+
05)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[get_field(s@1, level) as __datafusion_extracted_2, id], file_type=parquet, predicate=Optional(DynamicFilter [ empty ])
15011501

15021502
# Verify correctness - id matches, value > 100, and level > 3
15031503
# Matching ids where value > 100: 2(200), 3(150), 4(300), 5(250)
@@ -1533,7 +1533,7 @@ physical_plan
15331533
01)ProjectionExec: expr=[id@1 as id, __datafusion_extracted_1@0 as simple_struct.s[label], __datafusion_extracted_2@2 as join_right.s[role]]
15341534
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@1, id@1)], projection=[__datafusion_extracted_1@0, id@1, __datafusion_extracted_2@2]
15351535
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[get_field(s@1, label) as __datafusion_extracted_1, id], file_type=parquet
1536-
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[get_field(s@1, role) as __datafusion_extracted_2, id], file_type=parquet, predicate=DynamicFilter [ empty ]
1536+
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[get_field(s@1, role) as __datafusion_extracted_2, id], file_type=parquet, predicate=Optional(DynamicFilter [ empty ])
15371537

15381538
# Verify correctness
15391539
query ITT
@@ -1565,7 +1565,7 @@ logical_plan
15651565
physical_plan
15661566
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@0, id@0)]
15671567
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[id], file_type=parquet
1568-
03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ]
1568+
03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[id], file_type=parquet, predicate=Optional(DynamicFilter [ empty ])
15691569

15701570
# Verify correctness
15711571
query II
@@ -1896,7 +1896,7 @@ physical_plan
18961896
01)ProjectionExec: expr=[__datafusion_extracted_3@1 as s.s[value], __datafusion_extracted_4@0 as j.s[role]]
18971897
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@2, id@2)], filter=__datafusion_extracted_1@1 > __datafusion_extracted_2@0, projection=[__datafusion_extracted_4@1, __datafusion_extracted_3@4]
18981898
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[get_field(s@1, level) as __datafusion_extracted_2, get_field(s@1, role) as __datafusion_extracted_4, id], file_type=parquet
1899-
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[get_field(s@1, value) as __datafusion_extracted_1, get_field(s@1, value) as __datafusion_extracted_3, id], file_type=parquet, predicate=DynamicFilter [ empty ]
1899+
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[get_field(s@1, value) as __datafusion_extracted_1, get_field(s@1, value) as __datafusion_extracted_3, id], file_type=parquet, predicate=Optional(DynamicFilter [ empty ])
19001900

19011901
# Verify correctness - only admin roles match (ids 1 and 4)
19021902
query II
@@ -1932,7 +1932,7 @@ logical_plan
19321932
physical_plan
19331933
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(id@1, id@1)], filter=__datafusion_extracted_1@0 > __datafusion_extracted_2@1, projection=[id@1, id@3]
19341934
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[get_field(s@1, value) as __datafusion_extracted_1, id], file_type=parquet
1935-
03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[get_field(s@1, level) as __datafusion_extracted_2, id], file_type=parquet, predicate=DynamicFilter [ empty ]
1935+
03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]}, projection=[get_field(s@1, level) as __datafusion_extracted_2, id], file_type=parquet, predicate=Optional(DynamicFilter [ empty ])
19361936

19371937
# Verify correctness - all rows match since value >> level for all ids
19381938
# simple_struct: (1,100), (2,200), (3,150), (4,300), (5,250)

datafusion/sqllogictest/test_files/push_down_filter.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ physical_plan
303303
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)]
304304
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet
305305
03)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
306-
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilter [ empty ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[]
306+
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND Optional(DynamicFilter [ empty ]), pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[]
307307

308308
statement ok
309309
drop table small_table;

0 commit comments

Comments
 (0)