feat: globally sort shared FileStream work queue on Inexact sort pushdown#60
Draft
…down

After apache#21182 sorts files per-group and apache#21351 lets partitions share a single work queue via `SharedWorkSource`, the per-group order is lost at the queue level: each group's files land as a contiguous block regardless of cross-group stats. For Inexact pushdown (e.g. DESC TopK with a dynamic filter) we want workers pulling files in globally best-first order so the filter threshold tightens fastest across the whole scan.

Approach: carry a `work_order_hint: Option<LexOrdering>` on `FileScanConfig`, a "desired-but-not-required" ordering populated by the Inexact sort-pushdown branches. `SharedWorkSource::from_config` reads it and seeds the queue via a new `sort_files_globally_by_statistics` helper that flattens all files, builds one `MinMaxStatistics`, and orders by min value. Falls back to today's flat `flat_map(FileGroup::iter)` order when no hint is set or stats are unusable. `file_groups` composition is untouched, so `output_partitioning()` stays stable.

Skipped on the reverse-scan path (`reverse_file_groups = true`): the per-group reversal must not be overridden by a globally ascending sort. A globally descending variant is a follow-up.

Tests cover hint population on both Inexact branches, the reverse-scan leave-none case, and `SharedWorkSource::from_config` honouring the hint, falling back on missing stats, and preserving today's behaviour when no hint is present.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
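To illustrate why queue order matters for a dynamic TopK filter, here is a toy simulation. It is not DataFusion code: `File`, `files_opened`, and `demo` are hypothetical names, and it assumes an ascending `LIMIT k` query so that "best-first" matches the min-ascending order the helper produces. A file is skipped once its min statistic exceeds the current k-th best value.

```rust
/// Hypothetical file: its min statistic plus the values it holds.
struct File {
    min: i64,
    values: Vec<i64>,
}

/// Simulate `ORDER BY v ASC LIMIT k` over files in queue order.
/// Returns how many files had to be opened (not pruned by the threshold).
fn files_opened(queue: &[&File], k: usize) -> usize {
    assert!(k > 0, "this sketch assumes k >= 1");
    let mut best: Vec<i64> = Vec::new(); // the k smallest values seen so far
    let mut opened = 0;
    for file in queue {
        // A threshold only exists once k values have been collected.
        if best.len() == k && file.min > best[k - 1] {
            continue; // every value in this file is worse than the threshold
        }
        opened += 1;
        best.extend(&file.values);
        best.sort();
        best.truncate(k);
    }
    opened
}

/// Returns (files opened in group-concatenation order, in global min order).
fn demo() -> (usize, usize) {
    let files = vec![
        File { min: 30, values: vec![30, 90] }, // group 0
        File { min: 50, values: vec![50, 60] }, // group 0
        File { min: 1, values: vec![1, 2] },    // group 1
        File { min: 3, values: vec![3, 4] },    // group 1
    ];
    let flat: Vec<&File> = files.iter().collect();
    let mut global = flat.clone();
    global.sort_by_key(|f| f.min); // global min-ascending order
    (files_opened(&flat, 2), files_opened(&global, 2))
}

fn main() {
    let (flat, global) = demo();
    println!("files opened: flat order = {flat}, global order = {global}");
}
```

In this toy input the group-concatenation order opens three files before the threshold becomes useful, while the globally sorted order opens one. Real scans run workers concurrently, so actual savings depend on timing and on the statistics available.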
Which issue does this PR close?
Rationale for this change
After apache#21182 sorts files per-group and apache#21351 lets partitions share a single work queue via `SharedWorkSource`, the per-group order is lost at the queue level: each group's files land as a contiguous block regardless of cross-group stats. For Inexact pushdown (the common case for TopK DESC with a dynamic filter) we want workers pulling files in globally best-first order so the filter threshold tightens fastest across the whole scan.

Today's flow (Inexact, `SharedWorkSource` active):

Worker 0 gets `G0_lo` (whatever its max is, not necessarily the global best). TopK's dynamic filter tightens based on whatever max the first-completed scan happens to produce, not the global max.

What changes are included in this PR?
Carry a `work_order_hint: Option<LexOrdering>` on `FileScanConfig`: a desired-but-not-required ordering populated by the Inexact sort-pushdown branches (in `rebuild_with_source` and `try_sort_file_groups_by_statistics`).

`SharedWorkSource::from_config` reads the hint and seeds the queue via a new `sort_files_globally_by_statistics` helper that flattens all files, builds one `MinMaxStatistics`, and orders by min value. Falls back to today's `flat_map(FileGroup::iter)` order when no hint is set or stats are unusable.

`file_groups` composition is untouched, so `output_partitioning()` stays stable and parallelism is preserved.

The hint is skipped on the reverse-scan path (`reverse_file_groups = true`): the per-group reversal must not be overridden by a globally ascending sort. A globally descending variant for that path is a possible follow-up.

Files changed:

- `datafusion/datasource/src/file_scan_config/mod.rs`: new `work_order_hint` field, threaded through builder, `From<FileScanConfig>` and `build()`.
- `datafusion/datasource/src/file_scan_config/sort_pushdown.rs`: new `sort_files_globally_by_statistics` helper; hint populated in both Inexact branches.
- `datafusion/datasource/src/file_stream/work_source.rs`: `SharedWorkSource::from_config` consumes the hint; new unit tests for queue seeding.

Are these changes tested?
Yes:

- `sort_pushdown_inexact_populates_work_order_hint`: interleaved-stats multi-group Inexact scan; asserts the hint matches the requested ordering and partition count is preserved.
- `sort_pushdown_rebuild_reverse_leaves_hint_none`: reverse-scan path leaves `work_order_hint = None`.
- `from_config_without_hint_preserves_flat_group_order`: no-hint case matches today's queue order exactly.
- `from_config_with_hint_seeds_globally_sorted_queue`: queue is drained in global min-ascending order across groups, not group-concatenation order.
- `from_config_falls_back_when_stats_missing`: any file without stats falls back to flat order without panicking.

All 142 `datafusion-datasource` lib tests pass. Clippy clean on `datafusion-datasource --all-targets --all-features`.

Broader tests (full workspace clippy, physical-optimizer integration, `sort_pushdown.slt`) deferred to CI: local disk was exhausted by prior worktree build caches and the worktree `target` couldn't finish compiling the SLT binary. No changes outside `datafusion-datasource`, so the blast radius is narrow.

Are there any user-facing changes?
No. `work_order_hint` is `pub(crate)` and purely an internal scheduling hint; no API changes. Existing behaviour is preserved when the hint is `None`.
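
For reviewers who want the queue-seeding behaviour in miniature, the following is a self-contained sketch of the idea, not the code in this PR. `FileEntry`, `sort_globally_by_min`, `seed_queue`, and the `hint_set` flag are hypothetical stand-ins for the real `PartitionedFile`, `sort_files_globally_by_statistics`, `SharedWorkSource::from_config`, and `work_order_hint`. A single `i64` min value replaces `MinMaxStatistics`.

```rust
// Hypothetical stand-in for a file with optional min statistics.
#[derive(Debug, Clone, PartialEq)]
struct FileEntry {
    name: &'static str,
    /// Minimum of the sort column from file statistics, if known.
    min: Option<i64>,
}

type FileGroup = Vec<FileEntry>;

/// Flatten all groups and order by min value.
/// Returns `None` if any file lacks statistics.
fn sort_globally_by_min(groups: &[FileGroup]) -> Option<Vec<FileEntry>> {
    let mut keyed: Vec<(i64, FileEntry)> = Vec::new();
    for file in groups.iter().flatten() {
        keyed.push((file.min?, file.clone()));
    }
    // Stable sort: files with equal mins keep their flat order.
    keyed.sort_by_key(|(min, _)| *min);
    Some(keyed.into_iter().map(|(_, f)| f).collect())
}

/// Seed the shared queue: globally sorted when a hint is set and stats are
/// usable, otherwise the flat group-concatenation order.
fn seed_queue(groups: &[FileGroup], hint_set: bool) -> Vec<FileEntry> {
    let flat = || groups.iter().flatten().cloned().collect::<Vec<_>>();
    if !hint_set {
        return flat();
    }
    sort_globally_by_min(groups).unwrap_or_else(flat)
}

fn queue_names(groups: &[FileGroup], hint_set: bool) -> Vec<&'static str> {
    seed_queue(groups, hint_set).into_iter().map(|f| f.name).collect()
}

/// Two groups, each sorted internally, with interleaved min values.
fn sample_groups() -> Vec<FileGroup> {
    vec![
        vec![
            FileEntry { name: "a", min: Some(10) },
            FileEntry { name: "b", min: Some(40) },
        ],
        vec![
            FileEntry { name: "c", min: Some(5) },
            FileEntry { name: "d", min: Some(30) },
        ],
    ]
}

fn main() {
    let groups = sample_groups();
    println!("no hint: {:?}", queue_names(&groups, false)); // ["a", "b", "c", "d"]
    println!("hint:    {:?}", queue_names(&groups, true)); // ["c", "a", "d", "b"]
}
```

The grouping passed in is never mutated, mirroring the point that `file_groups` composition stays untouched; only the order in which the queue hands out files changes.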