Skip to content

Commit d72b0b8

Browse files
authored
fix: preserve None projection semantics across FFI boundary in ForeignTableProvider::scan (#20393)
## Which issue does this PR close?

N/A (newly discovered bug). This was originally found in apache/sedona-db while working on a custom plan node: apache/sedona-db#611 (comment)

## Rationale for this change

`ForeignTableProvider::scan()` converts a `None` projection (meaning "return all columns") into an empty `RVec<usize>` before passing it across the FFI boundary. On the receiving side, `scan_fn_wrapper` always wraps the received `RVec` in `Some(...)`, passing `Some(&vec![])` to the inner `TableProvider::scan()`. This means "project zero columns" — the exact opposite of the intended "project all columns."

The root cause is that the `FFI_TableProvider::scan` function signature uses `RVec<usize>` for the projections parameter. Since `RVec<usize>` cannot represent `None`, the `None` vs. empty-vec distinction is lost at the FFI boundary.

## What changes are included in this PR?

Three coordinated changes in `datafusion/ffi/src/table_provider.rs`:

1. **FFI struct definition**: Changed `scan` function pointer signature from `RVec<usize>` to `ROption<RVec<usize>>` for the projections parameter, matching how `limit` already uses `ROption<usize>` for the same `None`-vs-value distinction.
2. **Receiver side** (`scan_fn_wrapper`): Converts `ROption<RVec<usize>>` via `.into_option().map(...)` and passes `projections.as_ref()` to the inner provider, preserving `None` semantics.
3. **Sender side** (`ForeignTableProvider::scan`): Converts `Option<&Vec<usize>>` to `ROption<RVec<usize>>` via `.into()` instead of using `unwrap_or_default()`.

Plus a new unit test `test_scan_with_none_projection_returns_all_columns` that directly exercises the FFI round-trip with `projection=None` and verifies all 3 columns are returned. Also fixed the existing `test_aggregation` test to set `library_marker_id = mock_foreign_marker_id` so it actually exercises the FFI path instead of taking the local bypass.

## How are these changes tested?

- New test `test_scan_with_none_projection_returns_all_columns`: creates a 3-column MemTable, wraps it through FFI with the foreign marker set, calls `scan(None)`, and asserts 3 columns are returned (previously returned 0).

## Are these changes safe?

This is a **breaking FFI ABI change** to the `FFI_TableProvider::scan` function pointer signature. However:

- The `abi_stable` crate's `#[derive(StableAbi)]` generates layout checks at dylib load time, so mismatched dylibs will be caught at load rather than causing silent corruption.
- All existing providers construct `FFI_TableProvider` via `::new()` or `::new_with_ffi_codec()`, which internally wire up `scan_fn_wrapper` — nobody constructs the `scan` function pointer manually.
1 parent 33c922f commit d72b0b8

1 file changed

Lines changed: 69 additions & 8 deletions

File tree

datafusion/ffi/src/table_provider.rs

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ pub struct FFI_TableProvider {
108108
scan: unsafe extern "C" fn(
109109
provider: &Self,
110110
session: FFI_SessionRef,
111-
projections: RVec<usize>,
111+
projections: ROption<RVec<usize>>,
112112
filters_serialized: RVec<u8>,
113113
limit: ROption<usize>,
114114
) -> FfiFuture<FFIResult<FFI_ExecutionPlan>>,
@@ -232,7 +232,7 @@ unsafe extern "C" fn supports_filters_pushdown_fn_wrapper(
232232
unsafe extern "C" fn scan_fn_wrapper(
233233
provider: &FFI_TableProvider,
234234
session: FFI_SessionRef,
235-
projections: RVec<usize>,
235+
projections: ROption<RVec<usize>>,
236236
filters_serialized: RVec<u8>,
237237
limit: ROption<usize>,
238238
) -> FfiFuture<FFIResult<FFI_ExecutionPlan>> {
@@ -269,11 +269,12 @@ unsafe extern "C" fn scan_fn_wrapper(
269269
}
270270
};
271271

272-
let projections: Vec<_> = projections.into_iter().collect();
272+
let projections: Option<Vec<usize>> =
273+
projections.into_option().map(|p| p.into_iter().collect());
273274

274275
let plan = rresult_return!(
275276
internal_provider
276-
.scan(session, Some(&projections), &filters, limit.into())
277+
.scan(session, projections.as_ref(), &filters, limit.into())
277278
.await
278279
);
279280

@@ -461,8 +462,9 @@ impl TableProvider for ForeignTableProvider {
461462
) -> Result<Arc<dyn ExecutionPlan>> {
462463
let session = FFI_SessionRef::new(session, None, self.0.logical_codec.clone());
463464

464-
let projections: Option<RVec<usize>> =
465-
projection.map(|p| p.iter().map(|v| v.to_owned()).collect());
465+
let projections: ROption<RVec<usize>> = projection
466+
.map(|p| p.iter().map(|v| v.to_owned()).collect())
467+
.into();
466468

467469
let codec: Arc<dyn LogicalExtensionCodec> = (&self.0.logical_codec).into();
468470
let filter_list = LogicalExprList {
@@ -474,7 +476,7 @@ impl TableProvider for ForeignTableProvider {
474476
let maybe_plan = (self.0.scan)(
475477
&self.0,
476478
session,
477-
projections.unwrap_or_default(),
479+
projections,
478480
filters_serialized,
479481
limit.into(),
480482
)
@@ -658,8 +660,9 @@ mod tests {
658660

659661
let provider = Arc::new(MemTable::try_new(schema, vec![vec![batch1]])?);
660662

661-
let ffi_provider =
663+
let mut ffi_provider =
662664
FFI_TableProvider::new(provider, true, None, task_ctx_provider, None);
665+
ffi_provider.library_marker_id = crate::mock_foreign_marker_id;
663666

664667
let foreign_table_provider: Arc<dyn TableProvider> = (&ffi_provider).into();
665668

@@ -712,4 +715,62 @@ mod tests {
712715

713716
Ok(())
714717
}
718+
719+
#[tokio::test]
720+
async fn test_scan_with_none_projection_returns_all_columns() -> Result<()> {
721+
use arrow::datatypes::Field;
722+
use datafusion::arrow::array::Float32Array;
723+
use datafusion::arrow::datatypes::DataType;
724+
use datafusion::arrow::record_batch::RecordBatch;
725+
use datafusion::datasource::MemTable;
726+
use datafusion::physical_plan::collect;
727+
728+
let schema = Arc::new(Schema::new(vec![
729+
Field::new("a", DataType::Float32, false),
730+
Field::new("b", DataType::Float32, false),
731+
Field::new("c", DataType::Float32, false),
732+
]));
733+
734+
let batch = RecordBatch::try_new(
735+
Arc::clone(&schema),
736+
vec![
737+
Arc::new(Float32Array::from(vec![1.0, 2.0])),
738+
Arc::new(Float32Array::from(vec![3.0, 4.0])),
739+
Arc::new(Float32Array::from(vec![5.0, 6.0])),
740+
],
741+
)?;
742+
743+
let provider =
744+
Arc::new(MemTable::try_new(Arc::clone(&schema), vec![vec![batch]])?);
745+
746+
let ctx = Arc::new(SessionContext::new());
747+
let task_ctx_provider = Arc::clone(&ctx) as Arc<dyn TaskContextProvider>;
748+
let task_ctx_provider = FFI_TaskContextProvider::from(&task_ctx_provider);
749+
750+
// Wrap in FFI and force the foreign path (not local bypass)
751+
let mut ffi_provider =
752+
FFI_TableProvider::new(provider, true, None, task_ctx_provider, None);
753+
ffi_provider.library_marker_id = crate::mock_foreign_marker_id;
754+
755+
let foreign_table_provider: Arc<dyn TableProvider> = (&ffi_provider).into();
756+
757+
// Call scan with projection=None, meaning "return all columns"
758+
let plan = foreign_table_provider
759+
.scan(&ctx.state(), None, &[], None)
760+
.await?;
761+
assert_eq!(
762+
plan.schema().fields().len(),
763+
3,
764+
"scan(projection=None) should return all columns; got {}",
765+
plan.schema().fields().len()
766+
);
767+
768+
// Also verify we can execute and get correct data
769+
let batches = collect(plan, ctx.task_ctx()).await?;
770+
assert_eq!(batches.len(), 1);
771+
assert_eq!(batches[0].num_columns(), 3);
772+
assert_eq!(batches[0].num_rows(), 2);
773+
774+
Ok(())
775+
}
715776
}

0 commit comments

Comments
 (0)