Skip to content

Commit 7dea35a

Browse files
committed
implement_custom_stabby_friendly_ffi
1 parent 65aa893 commit 7dea35a

31 files changed

Lines changed: 353 additions & 227 deletions

Cargo.lock

Lines changed: 0 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion-examples/examples/ffi/ffi_example_table_provider/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ edition = { workspace = true }
2222
publish = false
2323

2424
[dependencies]
25-
abi_stable = "0.11.3"
2625
arrow = { workspace = true }
2726
datafusion = { workspace = true }
2827
datafusion-ffi = { workspace = true }

datafusion-examples/examples/ffi/ffi_example_table_provider/src/lib.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,12 @@
1717

1818
use std::sync::Arc;
1919

20-
use abi_stable::{export_root_module, prefix_type::PrefixTypeTrait};
2120
use arrow::array::RecordBatch;
2221
use arrow::datatypes::{DataType, Field, Schema};
2322
use datafusion::{common::record_batch, datasource::MemTable};
2423
use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
2524
use datafusion_ffi::table_provider::FFI_TableProvider;
26-
use ffi_module_interface::{TableProviderModule, TableProviderModuleRef};
25+
use ffi_module_interface::TableProviderModule;
2726

2827
fn create_record_batch(start_value: i32, num_values: usize) -> RecordBatch {
2928
let end_value = start_value + num_values as i32;
@@ -56,11 +55,10 @@ extern "C" fn construct_simple_table_provider(
5655
FFI_TableProvider::new_with_ffi_codec(Arc::new(table_provider), true, None, codec)
5756
}
5857

59-
#[export_root_module]
58+
#[unsafe(no_mangle)]
6059
/// This defines the entry point for using the module.
61-
pub fn get_simple_memory_table() -> TableProviderModuleRef {
60+
pub unsafe extern "C" fn create_table_provider_module() -> TableProviderModule {
6261
TableProviderModule {
6362
create_table: construct_simple_table_provider,
6463
}
65-
.leak_into_prefix()
6664
}

datafusion-examples/examples/ffi/ffi_module_interface/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,4 @@ publish = false
2525
workspace = true
2626

2727
[dependencies]
28-
abi_stable = "0.11.3"
2928
datafusion-ffi = { workspace = true }

datafusion-examples/examples/ffi/ffi_module_interface/src/lib.rs

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,10 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use abi_stable::{
19-
StableAbi, declare_root_module_statics,
20-
library::{LibraryError, RootModule},
21-
package_version_strings,
22-
sabi_types::VersionStrings,
23-
};
2418
use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
2519
use datafusion_ffi::table_provider::FFI_TableProvider;
2620

2721
#[repr(C)]
28-
#[derive(StableAbi)]
29-
#[sabi(kind(Prefix(prefix_ref = TableProviderModuleRef)))]
3022
/// This struct defines the module interfaces. It is to be shared by
3123
/// both the module loading program and library that implements the
3224
/// module. It is possible to move this definition into the loading
@@ -38,13 +30,10 @@ pub struct TableProviderModule {
3830
extern "C" fn(codec: FFI_LogicalExtensionCodec) -> FFI_TableProvider,
3931
}
4032

41-
impl RootModule for TableProviderModuleRef {
42-
declare_root_module_statics! {TableProviderModuleRef}
43-
const BASE_NAME: &'static str = "ffi_example_table_provider";
44-
const NAME: &'static str = "ffi_example_table_provider";
45-
const VERSION_STRINGS: VersionStrings = package_version_strings!();
33+
/// # Safety
34+
/// This function is intended to be called from a dynamically loaded library.
35+
/// The caller must ensure proper initialization.
36+
pub type CreateTableProviderModuleFn = unsafe extern "C" fn() -> TableProviderModule;
4637

47-
fn initialization(self) -> Result<Self, LibraryError> {
48-
Ok(self)
49-
}
50-
}
38+
/// The symbol name that the module loader will look for when loading the library.
39+
pub const CREATE_TABLE_PROVIDER_MODULE_SYMBOL: &str = "create_table_provider_module";

datafusion/ffi/src/catalog_provider.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,14 @@ use datafusion_common::error::Result;
2424
use datafusion_proto::logical_plan::{
2525
DefaultLogicalExtensionCodec, LogicalExtensionCodec,
2626
};
27-
use stabby::option::Option as StabbyOption;
28-
use stabby::result::Result as StabbyResult;
2927
use stabby::string::String as StabbyString;
3028
use stabby::vec::Vec as StabbyVec;
3129
use tokio::runtime::Handle;
3230

3331
use crate::execution::FFI_TaskContextProvider;
3432
use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
3533
use crate::schema_provider::{FFI_SchemaProvider, ForeignSchemaProvider};
36-
use crate::util::FFIResult;
34+
use crate::util::{FFIResult, FfiOption, FfiResult};
3735
use crate::{df_result, rresult_return};
3836

3937
/// A stable struct for sharing [`CatalogProvider`] across FFI boundaries.
@@ -45,21 +43,21 @@ pub struct FFI_CatalogProvider {
4543
pub schema: unsafe extern "C" fn(
4644
provider: &Self,
4745
name: StabbyString,
48-
) -> StabbyOption<FFI_SchemaProvider>,
46+
) -> FfiOption<FFI_SchemaProvider>,
4947

5048
pub register_schema:
5149
unsafe extern "C" fn(
5250
provider: &Self,
5351
name: StabbyString,
5452
schema: &FFI_SchemaProvider,
55-
) -> FFIResult<StabbyOption<FFI_SchemaProvider>>,
53+
) -> FFIResult<FfiOption<FFI_SchemaProvider>>,
5654

5755
pub deregister_schema:
5856
unsafe extern "C" fn(
5957
provider: &Self,
6058
name: StabbyString,
6159
cascade: bool,
62-
) -> FFIResult<StabbyOption<FFI_SchemaProvider>>,
60+
) -> FFIResult<FfiOption<FFI_SchemaProvider>>,
6361

6462
pub logical_codec: FFI_LogicalExtensionCodec,
6563

@@ -119,7 +117,7 @@ unsafe extern "C" fn schema_names_fn_wrapper(
119117
unsafe extern "C" fn schema_fn_wrapper(
120118
provider: &FFI_CatalogProvider,
121119
name: StabbyString,
122-
) -> StabbyOption<FFI_SchemaProvider> {
120+
) -> FfiOption<FFI_SchemaProvider> {
123121
unsafe {
124122
let maybe_schema = provider.inner().schema(name.as_str());
125123
maybe_schema
@@ -138,7 +136,7 @@ unsafe extern "C" fn register_schema_fn_wrapper(
138136
provider: &FFI_CatalogProvider,
139137
name: StabbyString,
140138
schema: &FFI_SchemaProvider,
141-
) -> FFIResult<StabbyOption<FFI_SchemaProvider>> {
139+
) -> FFIResult<FfiOption<FFI_SchemaProvider>> {
142140
unsafe {
143141
let runtime = provider.runtime();
144142
let inner_provider = provider.inner();
@@ -155,23 +153,23 @@ unsafe extern "C" fn register_schema_fn_wrapper(
155153
})
156154
.into();
157155

158-
StabbyResult::Ok(returned_schema)
156+
FfiResult::Ok(returned_schema)
159157
}
160158
}
161159

162160
unsafe extern "C" fn deregister_schema_fn_wrapper(
163161
provider: &FFI_CatalogProvider,
164162
name: StabbyString,
165163
cascade: bool,
166-
) -> FFIResult<StabbyOption<FFI_SchemaProvider>> {
164+
) -> FFIResult<FfiOption<FFI_SchemaProvider>> {
167165
unsafe {
168166
let runtime = provider.runtime();
169167
let inner_provider = provider.inner();
170168

171169
let maybe_schema =
172170
rresult_return!(inner_provider.deregister_schema(name.as_str(), cascade));
173171

174-
StabbyResult::Ok(
172+
FfiResult::Ok(
175173
maybe_schema
176174
.map(|schema| {
177175
FFI_SchemaProvider::new_with_ffi_codec(

datafusion/ffi/src/catalog_provider_list.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@ use datafusion_catalog::{CatalogProvider, CatalogProviderList};
2323
use datafusion_proto::logical_plan::{
2424
DefaultLogicalExtensionCodec, LogicalExtensionCodec,
2525
};
26-
use stabby::option::Option as StabbyOption;
2726
use stabby::string::String as StabbyString;
2827
use stabby::vec::Vec as StabbyVec;
2928
use tokio::runtime::Handle;
3029

3130
use crate::catalog_provider::{FFI_CatalogProvider, ForeignCatalogProvider};
3231
use crate::execution::FFI_TaskContextProvider;
3332
use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
33+
use crate::util::FfiOption;
3434

3535
/// A stable struct for sharing [`CatalogProviderList`] across FFI boundaries.
3636
#[repr(C)]
@@ -41,7 +41,7 @@ pub struct FFI_CatalogProviderList {
4141
&Self,
4242
name: StabbyString,
4343
catalog: &FFI_CatalogProvider,
44-
) -> StabbyOption<FFI_CatalogProvider>,
44+
) -> FfiOption<FFI_CatalogProvider>,
4545

4646
/// List of existing catalogs
4747
pub catalog_names: unsafe extern "C" fn(&Self) -> StabbyVec<StabbyString>,
@@ -50,7 +50,7 @@ pub struct FFI_CatalogProviderList {
5050
pub catalog: unsafe extern "C" fn(
5151
&Self,
5252
name: StabbyString,
53-
) -> StabbyOption<FFI_CatalogProvider>,
53+
) -> FfiOption<FFI_CatalogProvider>,
5454

5555
pub logical_codec: FFI_LogicalExtensionCodec,
5656

@@ -111,7 +111,7 @@ unsafe extern "C" fn register_catalog_fn_wrapper(
111111
provider: &FFI_CatalogProviderList,
112112
name: StabbyString,
113113
catalog: &FFI_CatalogProvider,
114-
) -> StabbyOption<FFI_CatalogProvider> {
114+
) -> FfiOption<FFI_CatalogProvider> {
115115
unsafe {
116116
let runtime = provider.runtime();
117117
let inner_provider = provider.inner();
@@ -133,7 +133,7 @@ unsafe extern "C" fn register_catalog_fn_wrapper(
133133
unsafe extern "C" fn catalog_fn_wrapper(
134134
provider: &FFI_CatalogProviderList,
135135
name: StabbyString,
136-
) -> StabbyOption<FFI_CatalogProvider> {
136+
) -> FfiOption<FFI_CatalogProvider> {
137137
unsafe {
138138
let runtime = provider.runtime();
139139
let inner_provider = provider.inner();

datafusion/ffi/src/config/extension_options.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,13 @@ use std::ffi::c_void;
2121

2222
use datafusion_common::config::{ConfigEntry, ConfigExtension, ExtensionOptions};
2323
use datafusion_common::{Result, exec_err};
24-
use stabby::result::Result as StabbyResult;
24+
2525
use stabby::str::Str as StabbyStr;
2626
use stabby::string::String as StabbyString;
2727
use stabby::vec::Vec as StabbyVec;
2828

2929
use crate::df_result;
30+
use crate::util::{FFIResult, FfiResult};
3031

3132
/// A stable struct for sharing [`ExtensionOptions`] across FFI boundaries.
3233
///
@@ -50,7 +51,7 @@ pub struct FFI_ExtensionOptions {
5051
&mut Self,
5152
key: StabbyStr,
5253
value: StabbyStr,
53-
) -> StabbyResult<(), StabbyString>,
54+
) -> FFIResult<()>,
5455

5556
/// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`]
5657
pub entries: unsafe extern "C" fn(&Self) -> StabbyVec<(StabbyString, StabbyString)>,
@@ -98,11 +99,11 @@ unsafe extern "C" fn set_fn_wrapper(
9899
options: &mut FFI_ExtensionOptions,
99100
key: StabbyStr,
100101
value: StabbyStr,
101-
) -> StabbyResult<(), StabbyString> {
102+
) -> FFIResult<()> {
102103
let _ = options
103104
.inner_mut()
104105
.insert(key.as_str().into(), value.as_str().into());
105-
StabbyResult::Ok(())
106+
FfiResult::Ok(())
106107
}
107108

108109
unsafe extern "C" fn entries_fn_wrapper(

datafusion/ffi/src/execution/task_ctx.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,15 @@ use datafusion_execution::runtime_env::RuntimeEnv;
2424
use datafusion_expr::{
2525
AggregateUDF, AggregateUDFImpl, ScalarUDF, ScalarUDFImpl, WindowUDF, WindowUDFImpl,
2626
};
27-
use stabby::option::Option as StabbyOption;
27+
2828
use stabby::string::String as StabbyString;
2929
use stabby::vec::Vec as StabbyVec;
3030

3131
use crate::session::config::FFI_SessionConfig;
3232
use crate::udaf::FFI_AggregateUDF;
3333
use crate::udf::FFI_ScalarUDF;
3434
use crate::udwf::FFI_WindowUDF;
35+
use crate::util::FfiOption;
3536

3637
/// A stable struct for sharing [`TaskContext`] across FFI boundaries.
3738
#[repr(C)]
@@ -41,7 +42,7 @@ pub struct FFI_TaskContext {
4142
pub session_id: unsafe extern "C" fn(&Self) -> StabbyString,
4243

4344
/// Return the task ID.
44-
pub task_id: unsafe extern "C" fn(&Self) -> StabbyOption<StabbyString>,
45+
pub task_id: unsafe extern "C" fn(&Self) -> FfiOption<StabbyString>,
4546

4647
/// Return the session configuration.
4748
pub session_config: unsafe extern "C" fn(&Self) -> FFI_SessionConfig,
@@ -93,7 +94,7 @@ unsafe extern "C" fn session_id_fn_wrapper(ctx: &FFI_TaskContext) -> StabbyStrin
9394

9495
unsafe extern "C" fn task_id_fn_wrapper(
9596
ctx: &FFI_TaskContext,
96-
) -> StabbyOption<StabbyString> {
97+
) -> FfiOption<StabbyString> {
9798
unsafe {
9899
let ctx = ctx.inner();
99100
ctx.task_id().map(|s| s.as_str().into()).into()
@@ -202,7 +203,7 @@ impl From<FFI_TaskContext> for Arc<TaskContext> {
202203
let udf = <Arc<dyn ScalarUDFImpl>>::from(&kv_pair.1);
203204

204205
(
205-
kv_pair.0.into_string(),
206+
kv_pair.0.to_string(),
206207
Arc::new(ScalarUDF::new_from_shared_impl(udf)),
207208
)
208209
})
@@ -213,7 +214,7 @@ impl From<FFI_TaskContext> for Arc<TaskContext> {
213214
let udaf = <Arc<dyn AggregateUDFImpl>>::from(&kv_pair.1);
214215

215216
(
216-
kv_pair.0.into_string(),
217+
kv_pair.0.to_string(),
217218
Arc::new(AggregateUDF::new_from_shared_impl(udaf)),
218219
)
219220
})
@@ -224,7 +225,7 @@ impl From<FFI_TaskContext> for Arc<TaskContext> {
224225
let udwf = <Arc<dyn WindowUDFImpl>>::from(&kv_pair.1);
225226

226227
(
227-
kv_pair.0.into_string(),
228+
kv_pair.0.to_string(),
228229
Arc::new(WindowUDF::new_from_shared_impl(udwf)),
229230
)
230231
})

datafusion/ffi/src/execution_plan.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,13 +102,10 @@ unsafe extern "C" fn children_fn_wrapper(
102102
let plan = &(*private_data).plan;
103103
let runtime = &(*private_data).runtime;
104104

105-
let children: Vec<_> = plan
106-
.children()
105+
plan.children()
107106
.into_iter()
108107
.map(|child| FFI_ExecutionPlan::new(Arc::clone(child), runtime.clone()))
109-
.collect();
110-
111-
children.into()
108+
.collect()
112109
}
113110
}
114111

0 commit comments

Comments
 (0)