Skip to content

Commit 030621c

Browse files
authored
refactor: Rename FileSource::try_reverse_output to FileSource::try_pushdown_sort (#20043)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #19723. ## Rationale for this change - The API is not “reverse-only”: it takes an arbitrary requested ordering and can accept/deny/partially match it, so try_pushdown_sort is a more accurate name. - FileScanConfig::rebuild_with_source shouldn’t assume reversal; reversal is only valid when it helps satisfy the caller’s requested order. <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? - Renames FileSource::try_reverse_output to FileSource::try_pushdown_sort (keeps try_reverse_output as a deprecated shim for compatibility). - Updates Parquet’s implementation and internal call sites to use the new name. - Updates FileScanConfig::rebuild_with_source to take the caller’s requested ordering and only reverse file_groups when the request is actually a reverse of the current output_ordering (adds a unit test). <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? Yes <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? - Yes: FileSource implementers should prefer try_pushdown_sort; try_reverse_output remains but is deprecated. - Slight behavioral change: file group reversal during sort pushdown is now conditional instead of always reversing <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent d28a03c commit 030621c

4 files changed

Lines changed: 174 additions & 19 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,7 @@ impl FileOpener for ParquetOpener {
581581

582582
// ----------------------------------------------------------
583583
// Step: potentially reverse the access plan for performance.
584-
// See `ParquetSource::try_reverse_output` for the rationale.
584+
// See `ParquetSource::try_pushdown_sort` for the rationale.
585585
// ----------------------------------------------------------
586586
if reverse_row_groups {
587587
prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?;

datafusion/datasource-parquet/src/source.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -757,7 +757,7 @@ impl FileSource for ParquetSource {
757757
/// # Returns
758758
/// - `Inexact`: Created an optimized source (e.g., reversed scan) that approximates the order
759759
/// - `Unsupported`: Cannot optimize for this ordering
760-
fn try_reverse_output(
760+
fn try_pushdown_sort(
761761
&self,
762762
order: &[PhysicalSortExpr],
763763
eq_properties: &EquivalenceProperties,

datafusion/datasource/src/file.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,29 @@ pub trait FileSource: Send + Sync {
189189
/// * `Inexact` - Created a source optimized for ordering (e.g., reversed row groups) but not perfectly sorted
190190
/// * `Unsupported` - Cannot optimize for this ordering
191191
///
192-
/// Default implementation returns `Unsupported`.
192+
/// # Deprecation / migration notes
193+
/// - [`Self::try_reverse_output`] was renamed to this method and deprecated since `53.0.0`.
194+
/// Per DataFusion's deprecation guidelines, it will be removed in `59.0.0` or later
195+
/// (6 major versions or 6 months, whichever is longer).
196+
/// - New implementations should override [`Self::try_pushdown_sort`] directly.
197+
/// - For backwards compatibility, the default implementation of
198+
/// [`Self::try_pushdown_sort`] delegates to the deprecated
199+
/// [`Self::try_reverse_output`] until it is removed. After that point, the
200+
/// default implementation will return [`SortOrderPushdownResult::Unsupported`].
201+
fn try_pushdown_sort(
202+
&self,
203+
order: &[PhysicalSortExpr],
204+
eq_properties: &EquivalenceProperties,
205+
) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
206+
#[expect(deprecated)]
207+
self.try_reverse_output(order, eq_properties)
208+
}
209+
210+
/// Deprecated: Renamed to [`Self::try_pushdown_sort`].
211+
#[deprecated(
212+
since = "53.0.0",
213+
note = "Renamed to try_pushdown_sort. This method was never limited to reversing output. It will be removed in 59.0.0 or later."
214+
)]
193215
fn try_reverse_output(
194216
&self,
195217
_order: &[PhysicalSortExpr],
@@ -232,7 +254,7 @@ pub trait FileSource: Send + Sync {
232254
/// `SchemaAdapterFactory` has been removed. Use `PhysicalExprAdapterFactory` instead.
233255
/// See `upgrading.md` for more details.
234256
#[deprecated(
235-
since = "52.0.0",
257+
since = "53.0.0",
236258
note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
237259
)]
238260
#[expect(deprecated)]
@@ -250,7 +272,7 @@ pub trait FileSource: Send + Sync {
250272
/// `SchemaAdapterFactory` has been removed. Use `PhysicalExprAdapterFactory` instead.
251273
/// See `upgrading.md` for more details.
252274
#[deprecated(
253-
since = "52.0.0",
275+
since = "53.0.0",
254276
note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
255277
)]
256278
#[expect(deprecated)]

datafusion/datasource/src/file_scan_config.rs

Lines changed: 147 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -851,20 +851,20 @@ impl DataSource for FileScanConfig {
851851
&self,
852852
order: &[PhysicalSortExpr],
853853
) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
854-
// Delegate to FileSource to check if reverse scanning can satisfy the request.
854+
// Delegate to FileSource to see if it can optimize for the requested ordering.
855855
let pushdown_result = self
856856
.file_source
857-
.try_reverse_output(order, &self.eq_properties())?;
857+
.try_pushdown_sort(order, &self.eq_properties())?;
858858

859859
match pushdown_result {
860860
SortOrderPushdownResult::Exact { inner } => {
861861
Ok(SortOrderPushdownResult::Exact {
862-
inner: self.rebuild_with_source(inner, true)?,
862+
inner: self.rebuild_with_source(inner, true, order)?,
863863
})
864864
}
865865
SortOrderPushdownResult::Inexact { inner } => {
866866
Ok(SortOrderPushdownResult::Inexact {
867-
inner: self.rebuild_with_source(inner, false)?,
867+
inner: self.rebuild_with_source(inner, false, order)?,
868868
})
869869
}
870870
SortOrderPushdownResult::Unsupported => {
@@ -1157,19 +1157,44 @@ impl FileScanConfig {
11571157
&self,
11581158
new_file_source: Arc<dyn FileSource>,
11591159
is_exact: bool,
1160+
order: &[PhysicalSortExpr],
11601161
) -> Result<Arc<dyn DataSource>> {
11611162
let mut new_config = self.clone();
11621163

1163-
// Reverse file groups (FileScanConfig's responsibility)
1164-
new_config.file_groups = new_config
1165-
.file_groups
1166-
.into_iter()
1167-
.map(|group| {
1168-
let mut files = group.into_inner();
1169-
files.reverse();
1170-
files.into()
1171-
})
1172-
.collect();
1164+
// Reverse file order (within each group) if the caller is requesting a reversal of this
1165+
// scan's declared output ordering.
1166+
//
1167+
// Historically this function always reversed `file_groups` because it was only reached
1168+
// via `FileSource::try_reverse_output` (where a reversal was the only supported
1169+
// optimization).
1170+
//
1171+
// Now that `FileSource::try_pushdown_sort` is generic, we must not assume reversal: other
1172+
// optimizations may become possible (e.g. already-sorted data, statistics-based file
1173+
// reordering). Therefore we only reverse files when it is known to help satisfy the
1174+
// requested ordering.
1175+
let reverse_file_groups = if self.output_ordering.is_empty() {
1176+
false
1177+
} else if let Some(requested) = LexOrdering::new(order.iter().cloned()) {
1178+
let projected_schema = self.projected_schema()?;
1179+
let orderings = project_orderings(&self.output_ordering, &projected_schema);
1180+
orderings
1181+
.iter()
1182+
.any(|ordering| ordering.is_reverse(&requested))
1183+
} else {
1184+
false
1185+
};
1186+
1187+
if reverse_file_groups {
1188+
new_config.file_groups = new_config
1189+
.file_groups
1190+
.into_iter()
1191+
.map(|group| {
1192+
let mut files = group.into_inner();
1193+
files.reverse();
1194+
files.into()
1195+
})
1196+
.collect();
1197+
}
11731198

11741199
new_config.file_source = new_file_source;
11751200

@@ -1392,6 +1417,62 @@ mod tests {
13921417
use datafusion_physical_expr::projection::ProjectionExpr;
13931418
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
13941419

1420+
#[derive(Clone)]
1421+
struct InexactSortPushdownSource {
1422+
metrics: ExecutionPlanMetricsSet,
1423+
table_schema: TableSchema,
1424+
}
1425+
1426+
impl InexactSortPushdownSource {
1427+
fn new(table_schema: TableSchema) -> Self {
1428+
Self {
1429+
metrics: ExecutionPlanMetricsSet::new(),
1430+
table_schema,
1431+
}
1432+
}
1433+
}
1434+
1435+
impl FileSource for InexactSortPushdownSource {
1436+
fn create_file_opener(
1437+
&self,
1438+
_object_store: Arc<dyn object_store::ObjectStore>,
1439+
_base_config: &FileScanConfig,
1440+
_partition: usize,
1441+
) -> Result<Arc<dyn crate::file_stream::FileOpener>> {
1442+
unimplemented!()
1443+
}
1444+
1445+
fn as_any(&self) -> &dyn Any {
1446+
self
1447+
}
1448+
1449+
fn table_schema(&self) -> &TableSchema {
1450+
&self.table_schema
1451+
}
1452+
1453+
fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
1454+
Arc::new(self.clone())
1455+
}
1456+
1457+
fn metrics(&self) -> &ExecutionPlanMetricsSet {
1458+
&self.metrics
1459+
}
1460+
1461+
fn file_type(&self) -> &str {
1462+
"mock"
1463+
}
1464+
1465+
fn try_pushdown_sort(
1466+
&self,
1467+
_order: &[PhysicalSortExpr],
1468+
_eq_properties: &EquivalenceProperties,
1469+
) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
1470+
Ok(SortOrderPushdownResult::Inexact {
1471+
inner: Arc::new(self.clone()) as Arc<dyn FileSource>,
1472+
})
1473+
}
1474+
}
1475+
13951476
#[test]
13961477
fn physical_plan_config_no_projection_tab_cols_as_field() {
13971478
let file_schema = aggr_test_schema();
@@ -2337,4 +2418,56 @@ mod tests {
23372418
_ => panic!("Expected Hash partitioning"),
23382419
}
23392420
}
2421+
2422+
#[test]
2423+
fn try_pushdown_sort_reverses_file_groups_only_when_requested_is_reverse()
2424+
-> Result<()> {
2425+
let file_schema =
2426+
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
2427+
2428+
let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2429+
let file_source = Arc::new(InexactSortPushdownSource::new(table_schema));
2430+
2431+
let file_groups = vec![FileGroup::new(vec![
2432+
PartitionedFile::new("file1", 1),
2433+
PartitionedFile::new("file2", 1),
2434+
])];
2435+
2436+
let sort_expr_asc = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
2437+
let config =
2438+
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2439+
.with_file_groups(file_groups)
2440+
.with_output_ordering(vec![
2441+
LexOrdering::new(vec![sort_expr_asc.clone()]).unwrap(),
2442+
])
2443+
.build();
2444+
2445+
let requested_asc = vec![sort_expr_asc.clone()];
2446+
let result = config.try_pushdown_sort(&requested_asc)?;
2447+
let SortOrderPushdownResult::Inexact { inner } = result else {
2448+
panic!("Expected Inexact result");
2449+
};
2450+
let pushed_config = inner
2451+
.as_any()
2452+
.downcast_ref::<FileScanConfig>()
2453+
.expect("Expected FileScanConfig");
2454+
let pushed_files = pushed_config.file_groups[0].files();
2455+
assert_eq!(pushed_files[0].object_meta.location.as_ref(), "file1");
2456+
assert_eq!(pushed_files[1].object_meta.location.as_ref(), "file2");
2457+
2458+
let requested_desc = vec![sort_expr_asc.reverse()];
2459+
let result = config.try_pushdown_sort(&requested_desc)?;
2460+
let SortOrderPushdownResult::Inexact { inner } = result else {
2461+
panic!("Expected Inexact result");
2462+
};
2463+
let pushed_config = inner
2464+
.as_any()
2465+
.downcast_ref::<FileScanConfig>()
2466+
.expect("Expected FileScanConfig");
2467+
let pushed_files = pushed_config.file_groups[0].files();
2468+
assert_eq!(pushed_files[0].object_meta.location.as_ref(), "file2");
2469+
assert_eq!(pushed_files[1].object_meta.location.as_ref(), "file1");
2470+
2471+
Ok(())
2472+
}
23402473
}

0 commit comments

Comments
 (0)