Skip to content

Commit ab28234

Browse files
authored
Support columns_sorted in row_filters (#20497)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #3476. ## Rationale for this change Improving predicate ordering for predicate pushdown <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? Building on changes from #3477 and #7528 - Implement the `columns_sorted` function - Change `should_enable_page_index` to use index when choose to reorder predicates in config <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? Yes, unit tests <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? No <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 1f59d32 commit ab28234

2 files changed

Lines changed: 7 additions & 33 deletions

File tree

benchmarks/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -879,13 +879,13 @@ The benchmark includes queries that:
879879

880880
The sorted dataset is automatically generated from the ClickBench partitioned dataset. You can configure the memory used during the sorting process with the `DATAFUSION_MEMORY_GB` environment variable. The default memory limit is 12GB.
881881
```bash
882-
./bench.sh data data_sorted_clickbench
882+
./bench.sh data clickbench_sorted
883883
```
884884

885885
To create the sorted dataset, for example with 16GB of memory, run:
886886

887887
```bash
888-
DATAFUSION_MEMORY_GB=16 ./bench.sh data data_sorted_clickbench
888+
DATAFUSION_MEMORY_GB=16 ./bench.sh data clickbench_sorted
889889
```
890890

891891
This command will:
@@ -896,7 +896,7 @@ This command will:
896896
#### Running the Benchmark
897897

898898
```bash
899-
./bench.sh run data_sorted_clickbench
899+
./bench.sh run clickbench_sorted
900900
```
901901

902902
This runs queries against the pre-sorted dataset with the `--sorted-by EventTime` flag, which informs DataFusion that the data is pre-sorted, allowing it to optimize away redundant sort operations.

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 4 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,9 @@
5050
//! 2. Determine whether each predicate can be evaluated as an `ArrowPredicate`.
5151
//! 3. Determine, for each predicate, the total compressed size of all
5252
//! columns required to evaluate the predicate.
53-
//! 4. Determine, for each predicate, whether all columns required to
54-
//! evaluate the expression are sorted.
55-
//! 5. Re-order the predicate by total size (from step 3).
56-
//! 6. Partition the predicates according to whether they are sorted (from step 4)
57-
//! 7. "Compile" each predicate `Expr` to a `DatafusionArrowPredicate`.
58-
//! 8. Build the `RowFilter` with the sorted predicates followed by
59-
//! the unsorted predicates. Within each partition, predicates are
60-
//! still be sorted by size.
53+
//! 4. Re-order predicates by total size (from step 3).
54+
//! 5. "Compile" each predicate `Expr` to a `DatafusionArrowPredicate`.
55+
//! 6. Build the `RowFilter` from the ordered predicates.
6156
//!
6257
//! List-aware predicates (for example, `array_has`, `array_has_all`, and
6358
//! `array_has_any`) can be evaluated directly during Parquet decoding.
@@ -70,7 +65,6 @@
7065
//! - `WHERE s['value'] > 5` — pushed down (accesses a primitive leaf)
7166
//! - `WHERE s IS NOT NULL` — not pushed down (references the whole struct)
7267
73-
use std::cmp::Ordering;
7468
use std::collections::BTreeSet;
7569
use std::sync::Arc;
7670

@@ -197,8 +191,6 @@ pub(crate) struct FilterCandidate {
197191
/// the filter and to order the filters when `reorder_predicates` is true.
198192
/// This is generated by summing the compressed size of all columns that the filter references.
199193
required_bytes: usize,
200-
/// Can this filter use an index (e.g. a page index) to prune rows?
201-
can_use_index: bool,
202194
/// Column indices into the parquet file schema required to evaluate this filter.
203195
projection: LeafProjection,
204196
/// The Arrow schema containing only the columns required by this filter,
@@ -273,12 +265,9 @@ impl FilterCandidateBuilder {
273265
);
274266

275267
let required_bytes = size_of_columns(&leaf_indices, metadata)?;
276-
let can_use_index = columns_sorted(&leaf_indices, metadata)?;
277-
278268
Ok(Some(FilterCandidate {
279269
expr: self.expr,
280270
required_bytes,
281-
can_use_index,
282271
projection: LeafProjection { leaf_indices },
283272
filter_schema: projected_schema,
284273
}))
@@ -817,16 +806,6 @@ fn size_of_columns(columns: &[usize], metadata: &ParquetMetaData) -> Result<usiz
817806
Ok(total_size)
818807
}
819808

820-
/// For a given set of `Column`s required for predicate `Expr` determine whether
821-
/// all columns are sorted.
822-
///
823-
/// Sorted columns may be queried more efficiently in the presence of
824-
/// a PageIndex.
825-
fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result<bool> {
826-
// TODO How do we know this?
827-
Ok(false)
828-
}
829-
830809
/// Build a [`RowFilter`] from the given predicate expression if possible.
831810
///
832811
/// # Arguments
@@ -881,12 +860,7 @@ pub fn build_row_filter(
881860
}
882861

883862
if reorder_predicates {
884-
candidates.sort_unstable_by(|c1, c2| {
885-
match c1.can_use_index.cmp(&c2.can_use_index) {
886-
Ordering::Equal => c1.required_bytes.cmp(&c2.required_bytes),
887-
ord => ord,
888-
}
889-
});
863+
candidates.sort_unstable_by_key(|c| c.required_bytes);
890864
}
891865

892866
// To avoid double-counting metrics when multiple predicates are used:

0 commit comments

Comments
 (0)