Skip to content

Commit 397a967

Browse files
authored
Merge branch 'main' into feat/sort-file-groups-by-statistics
2 parents f9de9be + a910b03 commit 397a967

7 files changed

Lines changed: 438 additions & 393 deletions

File tree

datafusion/datasource/src/memory.rs

Lines changed: 52 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -243,13 +243,22 @@ impl DataSource for MemorySourceConfig {
243243
&exprs,
244244
self.projection().as_ref().unwrap_or(&all_projections),
245245
);
246-
247-
MemorySourceConfig::try_new(
248-
self.partitions(),
249-
self.original_schema(),
250-
Some(new_projections),
251-
)
252-
.map(|s| Arc::new(s) as Arc<dyn DataSource>)
246+
let projected_schema =
247+
project_schema(&self.schema, Some(&new_projections));
248+
249+
projected_schema.map(|projected_schema| {
250+
// Clone self to preserve all metadata (fetch, sort_information,
251+
// show_sizes, etc.) then update only the projection-related fields.
252+
let mut new_source = self.clone();
253+
new_source.projection = Some(new_projections);
254+
new_source.projected_schema = projected_schema;
255+
// Project sort information to match the new projection
256+
new_source.sort_information = project_orderings(
257+
&new_source.sort_information,
258+
&new_source.projected_schema,
259+
);
260+
Arc::new(new_source) as Arc<dyn DataSource>
261+
})
253262
})
254263
.transpose()
255264
}
@@ -897,6 +906,42 @@ mod tests {
897906
Ok(())
898907
}
899908

909+
/// Test that `try_swapping_with_projection` preserves the `fetch` limit.
910+
/// Regression test for <https://github.com/apache/datafusion/issues/21176>
///
/// The test builds a `MemorySourceConfig` with a limit of 5, pushes a
/// column-reordering projection into it, and asserts that the resulting
/// source still reports `fetch == Some(5)`.
911+
#[test]
912+
fn try_swapping_with_projection_preserves_fetch() {
913+
use datafusion_physical_expr::projection::ProjectionExprs;
914+
// Three-column schema (a: Int32, b: Utf8, c: Int64) so that a projection
// can both drop a column and reorder the remaining ones.
915+
let schema = Arc::new(Schema::new(vec![
916+
Field::new("a", DataType::Int32, false),
917+
Field::new("b", DataType::Utf8, false),
918+
Field::new("c", DataType::Int64, false),
919+
]));
// Single partition holding one batch produced by the `batch` test helper.
// NOTE(review): `batch` is defined outside this view — presumably the
// argument is a row count; confirm the helper's columns match `schema`.
920+
let partitions: Vec<Vec<RecordBatch>> = vec![vec![batch(10)]];
921+
let source = MemorySourceConfig::try_new(&partitions, schema.clone(), None)
922+
.unwrap()
923+
.with_limit(Some(5));
924+
// Sanity check: the limit is in place before the projection is pushed down.
925+
assert_eq!(source.fetch, Some(5));
926+
927+
// Create a projection that reorders columns: [c, a] (indices 2, 0)
928+
let projection = ProjectionExprs::from_indices(&[2, 0], &schema);
// The return value is unwrapped twice — presumably an outer `Result` and an
// inner `Option` (confirm against the `DataSource` trait signature). Both
// layers must be present for the test to proceed.
929+
let swapped = source
930+
.try_swapping_with_projection(&projection)
931+
.unwrap()
932+
.unwrap();
// Downcast the returned source back to `MemorySourceConfig` so its `fetch`
// field can be inspected directly.
933+
let new_source = swapped
934+
.as_any()
935+
.downcast_ref::<MemorySourceConfig>()
936+
.unwrap();
937+
// The actual regression check: the limit set above must still be present.
938+
assert_eq!(
939+
new_source.fetch,
940+
Some(5),
941+
"fetch limit must be preserved after projection pushdown"
942+
);
943+
}
944+
900945
#[tokio::test]
901946
async fn values_empty_case() -> Result<()> {
902947
let schema = aggr_test_schema();

0 commit comments

Comments
 (0)