Skip to content

Commit c51b888

Browse files
committed
Implement FFI_ExtensionOptions
1 parent 132b043 commit c51b888

5 files changed

Lines changed: 397 additions & 33 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1256,7 +1256,7 @@ impl<'a> TryInto<arrow::util::display::FormatOptions<'a>> for &'a FormatOptions
12561256
}
12571257

12581258
/// A key value pair, with a corresponding description
1259-
#[derive(Debug, Hash, PartialEq, Eq)]
1259+
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
12601260
pub struct ConfigEntry {
12611261
/// A unique string to identify this config value
12621262
pub key: String,
@@ -1352,6 +1352,8 @@ impl ConfigField for ConfigOptions {
13521352
}
13531353
}
13541354

1355+
pub const DATAFUSION_FFI_CONFIG_NAMESPACE: &str = "datafusion_ffi";
1356+
13551357
impl ConfigOptions {
13561358
/// Creates a new [`ConfigOptions`] with default values
13571359
pub fn new() -> Self {
@@ -1366,12 +1368,12 @@ impl ConfigOptions {
13661368

13671369
/// Set a configuration option
13681370
pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
1369-
let Some((prefix, key)) = key.split_once('.') else {
1371+
let Some((mut prefix, mut inner_key)) = key.split_once('.') else {
13701372
return _config_err!("could not find config namespace for key \"{key}\"");
13711373
};
13721374

13731375
if prefix == "datafusion" {
1374-
if key == "optimizer.enable_dynamic_filter_pushdown" {
1376+
if inner_key == "optimizer.enable_dynamic_filter_pushdown" {
13751377
let bool_value = value.parse::<bool>().map_err(|e| {
13761378
DataFusionError::Configuration(format!(
13771379
"Failed to parse '{value}' as bool: {e}",
@@ -1386,13 +1388,23 @@ impl ConfigOptions {
13861388
}
13871389
return Ok(());
13881390
}
1389-
return ConfigField::set(self, key, value);
1391+
return ConfigField::set(self, inner_key, value);
1392+
}
1393+
1394+
if !self.extensions.0.contains_key(prefix)
1395+
&& self
1396+
.extensions
1397+
.0
1398+
.contains_key(DATAFUSION_FFI_CONFIG_NAMESPACE)
1399+
{
1400+
inner_key = key;
1401+
prefix = DATAFUSION_FFI_CONFIG_NAMESPACE;
13901402
}
13911403

13921404
let Some(e) = self.extensions.0.get_mut(prefix) else {
13931405
return _config_err!("Could not find config namespace \"{prefix}\"");
13941406
};
1395-
e.0.set(key, value)
1407+
e.0.set(inner_key, value)
13961408
}
13971409

13981410
/// Create new [`ConfigOptions`], taking values from environment variables
@@ -2157,7 +2169,7 @@ impl TableOptions {
21572169
///
21582170
/// A result indicating success or failure in setting the configuration option.
21592171
pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
2160-
let Some((prefix, _)) = key.split_once('.') else {
2172+
let Some((mut prefix, _)) = key.split_once('.') else {
21612173
return _config_err!("could not find config namespace for key \"{key}\"");
21622174
};
21632175

@@ -2169,6 +2181,15 @@ impl TableOptions {
21692181
return Ok(());
21702182
}
21712183

2184+
if !self.extensions.0.contains_key(prefix)
2185+
&& self
2186+
.extensions
2187+
.0
2188+
.contains_key(DATAFUSION_FFI_CONFIG_NAMESPACE)
2189+
{
2190+
prefix = DATAFUSION_FFI_CONFIG_NAMESPACE;
2191+
}
2192+
21722193
let Some(e) = self.extensions.0.get_mut(prefix) else {
21732194
return _config_err!("Could not find config namespace \"{prefix}\"");
21742195
};
Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::any::Any;
19+
use std::collections::HashMap;
20+
use std::ffi::c_void;
21+
22+
use abi_stable::StableAbi;
23+
use abi_stable::std_types::{RResult, RStr, RString, RVec, Tuple2};
24+
use datafusion_common::config::{ConfigEntry, ConfigExtension, ExtensionOptions};
25+
use datafusion_common::{Result, exec_err};
26+
27+
use crate::df_result;
28+
29+
/// A stable struct for sharing [`ExtensionOptions`] across FFI boundaries.
30+
///
31+
/// Unlike other FFI structs in this crate, we do not construct a foreign
32+
/// variant of this object. This is due to the typical method for interacting
33+
/// with extension options is by creating a local struct of your concrete type.
34+
/// To support this methodology use the `to_extension` method instead.
35+
///
36+
/// When using [`FFI_ExtensionOptions`] with multiple extensions, all extension
37+
/// values are stored on a single [`FFI_ExtensionOptions`] object. The keys
38+
/// are stored with the full path prefix to avoid overwriting values when using
39+
/// multiple extensions.
40+
#[repr(C)]
41+
#[derive(Debug, StableAbi)]
42+
pub struct FFI_ExtensionOptions {
43+
/// Return a deep clone of this [`ExtensionOptions`]
44+
pub cloned: unsafe extern "C" fn(&Self) -> FFI_ExtensionOptions,
45+
46+
/// Set the given `key`, `value` pair
47+
pub set:
48+
unsafe extern "C" fn(&mut Self, key: RStr, value: RStr) -> RResult<(), RString>,
49+
50+
/// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`]
51+
pub entries: unsafe extern "C" fn(&Self) -> RVec<Tuple2<RString, RString>>,
52+
53+
/// Release the memory of the private data when it is no longer being used.
54+
pub release: unsafe extern "C" fn(&mut Self),
55+
56+
/// Internal data. This is only to be accessed by the provider of the options.
57+
pub private_data: *mut c_void,
58+
}
59+
60+
unsafe impl Send for FFI_ExtensionOptions {}
61+
unsafe impl Sync for FFI_ExtensionOptions {}
62+
63+
pub struct ExtensionOptionsPrivateData {
64+
pub options: HashMap<String, String>,
65+
}
66+
67+
impl FFI_ExtensionOptions {
68+
#[inline]
69+
fn inner_mut(&mut self) -> &mut HashMap<String, String> {
70+
let private_data = self.private_data as *mut ExtensionOptionsPrivateData;
71+
unsafe { &mut (*private_data).options }
72+
}
73+
74+
#[inline]
75+
fn inner(&self) -> &HashMap<String, String> {
76+
let private_data = self.private_data as *const ExtensionOptionsPrivateData;
77+
unsafe { &(*private_data).options }
78+
}
79+
}
80+
81+
unsafe extern "C" fn cloned_fn_wrapper(
82+
options: &FFI_ExtensionOptions,
83+
) -> FFI_ExtensionOptions {
84+
options
85+
.inner()
86+
.iter()
87+
.map(|(k, v)| (k.to_owned(), v.to_owned()))
88+
.collect::<HashMap<String, String>>()
89+
.into()
90+
}
91+
92+
unsafe extern "C" fn set_fn_wrapper(
93+
options: &mut FFI_ExtensionOptions,
94+
key: RStr,
95+
value: RStr,
96+
) -> RResult<(), RString> {
97+
let _ = options.inner_mut().insert(key.into(), value.into());
98+
RResult::ROk(())
99+
}
100+
101+
unsafe extern "C" fn entries_fn_wrapper(
102+
options: &FFI_ExtensionOptions,
103+
) -> RVec<Tuple2<RString, RString>> {
104+
options
105+
.inner()
106+
.iter()
107+
.map(|(key, value)| (key.to_owned().into(), value.to_owned().into()).into())
108+
.collect()
109+
}
110+
111+
unsafe extern "C" fn release_fn_wrapper(options: &mut FFI_ExtensionOptions) {
112+
let private_data = unsafe {
113+
Box::from_raw(options.private_data as *mut ExtensionOptionsPrivateData)
114+
};
115+
drop(private_data);
116+
}
117+
118+
impl Default for FFI_ExtensionOptions {
119+
fn default() -> Self {
120+
HashMap::new().into()
121+
}
122+
}
123+
124+
impl From<HashMap<String, String>> for FFI_ExtensionOptions {
125+
fn from(options: HashMap<String, String>) -> Self {
126+
let private_data = ExtensionOptionsPrivateData { options };
127+
128+
Self {
129+
cloned: cloned_fn_wrapper,
130+
set: set_fn_wrapper,
131+
entries: entries_fn_wrapper,
132+
release: release_fn_wrapper,
133+
private_data: Box::into_raw(Box::new(private_data)) as *mut c_void,
134+
}
135+
}
136+
}
137+
138+
impl Drop for FFI_ExtensionOptions {
139+
fn drop(&mut self) {
140+
unsafe { (self.release)(self) }
141+
}
142+
}
143+
144+
impl Clone for FFI_ExtensionOptions {
145+
fn clone(&self) -> Self {
146+
unsafe { (self.cloned)(self) }
147+
}
148+
}
149+
150+
impl ConfigExtension for FFI_ExtensionOptions {
151+
const PREFIX: &'static str =
152+
datafusion_common::config::DATAFUSION_FFI_CONFIG_NAMESPACE;
153+
}
154+
155+
impl ExtensionOptions for FFI_ExtensionOptions {
156+
fn as_any(&self) -> &dyn Any {
157+
self
158+
}
159+
160+
fn as_any_mut(&mut self) -> &mut dyn Any {
161+
self
162+
}
163+
164+
fn cloned(&self) -> Box<dyn ExtensionOptions> {
165+
let ffi_options = unsafe { (self.cloned)(self) };
166+
Box::new(ffi_options)
167+
}
168+
169+
fn set(&mut self, key: &str, value: &str) -> Result<()> {
170+
if key.split_once('.').is_none() {
171+
return exec_err!("Unable to set FFI config value without namespace set");
172+
};
173+
174+
df_result!(unsafe { (self.set)(self, key.into(), value.into()) })
175+
}
176+
177+
fn entries(&self) -> Vec<ConfigEntry> {
178+
unsafe {
179+
(self.entries)(self)
180+
.into_iter()
181+
.map(|entry_tuple| ConfigEntry {
182+
key: entry_tuple.0.into(),
183+
value: Some(entry_tuple.1.into()),
184+
description: "ffi_config_options",
185+
})
186+
.collect()
187+
}
188+
}
189+
}
190+
191+
impl FFI_ExtensionOptions {
192+
/// Add all of the values in a concrete configuration extension to the
193+
/// FFI variant. This is safe to call on either side of the FFI
194+
/// boundary.
195+
pub fn add_config<C: ConfigExtension>(&mut self, config: &C) -> Result<()> {
196+
for entry in config.entries() {
197+
if let Some(value) = entry.value {
198+
let key = format!("{}.{}", C::PREFIX, entry.key);
199+
self.set(key.as_str(), value.as_str())?;
200+
}
201+
}
202+
203+
Ok(())
204+
}
205+
206+
/// Merge another `FFI_ExtensionOptions` configurations into this one.
207+
/// This is safe to call on either side of the FFI boundary.
208+
pub fn merge(&mut self, other: &FFI_ExtensionOptions) -> Result<()> {
209+
for entry in other.entries() {
210+
if let Some(value) = entry.value {
211+
self.set(entry.key.as_str(), value.as_str())?;
212+
}
213+
}
214+
Ok(())
215+
}
216+
217+
/// Create a concrete extension type from the FFI variant.
218+
/// This is safe to call on either side of the FFI boundary.
219+
pub fn to_extension<C: ConfigExtension + Default>(&self) -> Result<C> {
220+
let mut result = C::default();
221+
222+
unsafe {
223+
for entry in (self.entries)(self) {
224+
let key = entry.0.as_str();
225+
let value = entry.1.as_str();
226+
227+
if let Some((prefix, inner_key)) = key.split_once('.')
228+
&& prefix == C::PREFIX
229+
{
230+
result.set(inner_key, value)?;
231+
}
232+
}
233+
}
234+
235+
Ok(result)
236+
}
237+
}
238+
239+
#[cfg(test)]
240+
mod tests {
241+
use datafusion_common::config::{ConfigExtension, ConfigOptions};
242+
use datafusion_common::extensions_options;
243+
244+
use crate::config::extension_options::FFI_ExtensionOptions;
245+
246+
// Define a new configuration struct using the `extensions_options` macro
247+
extensions_options! {
248+
/// My own config options.
249+
pub struct MyConfig {
250+
/// Should "foo" be replaced by "bar"?
251+
pub foo_to_bar: bool, default = true
252+
253+
/// How many "baz" should be created?
254+
pub baz_count: usize, default = 1337
255+
}
256+
}
257+
258+
impl ConfigExtension for MyConfig {
259+
const PREFIX: &'static str = "my_config";
260+
}
261+
262+
#[test]
263+
fn round_trip_ffi_extension_options() {
264+
// set up config struct and register extension
265+
let mut config = ConfigOptions::default();
266+
let mut ffi_options = FFI_ExtensionOptions::default();
267+
ffi_options.add_config(&MyConfig::default()).unwrap();
268+
269+
config.extensions.insert(ffi_options);
270+
271+
// overwrite config default
272+
config.set("my_config.baz_count", "42").unwrap();
273+
274+
// check config state
275+
let returned_ffi_config =
276+
config.extensions.get::<FFI_ExtensionOptions>().unwrap();
277+
let my_config: MyConfig = returned_ffi_config.to_extension().unwrap();
278+
279+
// check default value
280+
assert!(my_config.foo_to_bar);
281+
282+
// check overwritten value
283+
assert_eq!(my_config.baz_count, 42);
284+
}
285+
}

0 commit comments

Comments
 (0)