Skip to content

Commit 19e9c06

Browse files
alambKontinuation
andauthored
[branch-53] fix: preserve None projection semantics across FFI boundary in ForeignTableProvider::scan (#20393) (#20895)
- Part of #19692 This PR: - Backports #20393 from @Kontinuation to the branch-53 line Co-authored-by: Kristin Cowalcijk <bo@wherobots.com>
1 parent 05e00ae commit 19e9c06

1 file changed

Lines changed: 69 additions & 8 deletions

File tree

datafusion/ffi/src/table_provider.rs

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ pub struct FFI_TableProvider {
108108
scan: unsafe extern "C" fn(
109109
provider: &Self,
110110
session: FFI_SessionRef,
111-
projections: RVec<usize>,
111+
projections: ROption<RVec<usize>>,
112112
filters_serialized: RVec<u8>,
113113
limit: ROption<usize>,
114114
) -> FfiFuture<FFIResult<FFI_ExecutionPlan>>,
@@ -232,7 +232,7 @@ unsafe extern "C" fn supports_filters_pushdown_fn_wrapper(
232232
unsafe extern "C" fn scan_fn_wrapper(
233233
provider: &FFI_TableProvider,
234234
session: FFI_SessionRef,
235-
projections: RVec<usize>,
235+
projections: ROption<RVec<usize>>,
236236
filters_serialized: RVec<u8>,
237237
limit: ROption<usize>,
238238
) -> FfiFuture<FFIResult<FFI_ExecutionPlan>> {
@@ -269,11 +269,12 @@ unsafe extern "C" fn scan_fn_wrapper(
269269
}
270270
};
271271

272-
let projections: Vec<_> = projections.into_iter().collect();
272+
let projections: Option<Vec<usize>> =
273+
projections.into_option().map(|p| p.into_iter().collect());
273274

274275
let plan = rresult_return!(
275276
internal_provider
276-
.scan(session, Some(&projections), &filters, limit.into())
277+
.scan(session, projections.as_ref(), &filters, limit.into())
277278
.await
278279
);
279280

@@ -461,8 +462,9 @@ impl TableProvider for ForeignTableProvider {
461462
) -> Result<Arc<dyn ExecutionPlan>> {
462463
let session = FFI_SessionRef::new(session, None, self.0.logical_codec.clone());
463464

464-
let projections: Option<RVec<usize>> =
465-
projection.map(|p| p.iter().map(|v| v.to_owned()).collect());
465+
let projections: ROption<RVec<usize>> = projection
466+
.map(|p| p.iter().map(|v| v.to_owned()).collect())
467+
.into();
466468

467469
let codec: Arc<dyn LogicalExtensionCodec> = (&self.0.logical_codec).into();
468470
let filter_list = LogicalExprList {
@@ -474,7 +476,7 @@ impl TableProvider for ForeignTableProvider {
474476
let maybe_plan = (self.0.scan)(
475477
&self.0,
476478
session,
477-
projections.unwrap_or_default(),
479+
projections,
478480
filters_serialized,
479481
limit.into(),
480482
)
@@ -658,8 +660,9 @@ mod tests {
658660

659661
let provider = Arc::new(MemTable::try_new(schema, vec![vec![batch1]])?);
660662

661-
let ffi_provider =
663+
let mut ffi_provider =
662664
FFI_TableProvider::new(provider, true, None, task_ctx_provider, None);
665+
ffi_provider.library_marker_id = crate::mock_foreign_marker_id;
663666

664667
let foreign_table_provider: Arc<dyn TableProvider> = (&ffi_provider).into();
665668

@@ -712,4 +715,62 @@ mod tests {
712715

713716
Ok(())
714717
}
718+
719+
#[tokio::test]
720+
async fn test_scan_with_none_projection_returns_all_columns() -> Result<()> {
721+
use arrow::datatypes::Field;
722+
use datafusion::arrow::array::Float32Array;
723+
use datafusion::arrow::datatypes::DataType;
724+
use datafusion::arrow::record_batch::RecordBatch;
725+
use datafusion::datasource::MemTable;
726+
use datafusion::physical_plan::collect;
727+
728+
let schema = Arc::new(Schema::new(vec![
729+
Field::new("a", DataType::Float32, false),
730+
Field::new("b", DataType::Float32, false),
731+
Field::new("c", DataType::Float32, false),
732+
]));
733+
734+
let batch = RecordBatch::try_new(
735+
Arc::clone(&schema),
736+
vec![
737+
Arc::new(Float32Array::from(vec![1.0, 2.0])),
738+
Arc::new(Float32Array::from(vec![3.0, 4.0])),
739+
Arc::new(Float32Array::from(vec![5.0, 6.0])),
740+
],
741+
)?;
742+
743+
let provider =
744+
Arc::new(MemTable::try_new(Arc::clone(&schema), vec![vec![batch]])?);
745+
746+
let ctx = Arc::new(SessionContext::new());
747+
let task_ctx_provider = Arc::clone(&ctx) as Arc<dyn TaskContextProvider>;
748+
let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider);
749+
750+
// Wrap in FFI and force the foreign path (not local bypass)
751+
let mut ffi_provider =
752+
FFI_TableProvider::new(provider, true, None, task_ctx_provider, None);
753+
ffi_provider.library_marker_id = crate::mock_foreign_marker_id;
754+
755+
let foreign_table_provider: Arc<dyn TableProvider> = (&ffi_provider).into();
756+
757+
// Call scan with projection=None, meaning "return all columns"
758+
let plan = foreign_table_provider
759+
.scan(&ctx.state(), None, &[], None)
760+
.await?;
761+
assert_eq!(
762+
plan.schema().fields().len(),
763+
3,
764+
"scan(projection=None) should return all columns; got {}",
765+
plan.schema().fields().len()
766+
);
767+
768+
// Also verify we can execute and get correct data
769+
let batches = collect(plan, ctx.task_ctx()).await?;
770+
assert_eq!(batches.len(), 1);
771+
assert_eq!(batches[0].num_columns(), 3);
772+
assert_eq!(batches[0].num_rows(), 2);
773+
774+
Ok(())
775+
}
715776
}

0 commit comments

Comments
 (0)