Skip to content

Commit b774113

Browse files
committed
Implement FFI_ExtensionOptions
1 parent 566bcde commit b774113

5 files changed

Lines changed: 398 additions & 33 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1231,7 +1231,7 @@ impl<'a> TryInto<arrow::util::display::FormatOptions<'a>> for &'a FormatOptions
12311231
}
12321232

12331233
/// A key value pair, with a corresponding description
1234-
#[derive(Debug, Hash, PartialEq, Eq)]
1234+
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
12351235
pub struct ConfigEntry {
12361236
/// A unique string to identify this config value
12371237
pub key: String,
@@ -1327,6 +1327,8 @@ impl ConfigField for ConfigOptions {
13271327
}
13281328
}
13291329

1330+
/// Namespace under which FFI-provided extension options are registered.
///
/// The `set` methods in this file redirect to this namespace when the prefix of
/// the supplied key is not a registered extension but an extension with this
/// prefix is registered.
pub const DATAFUSION_FFI_CONFIG_NAMESPACE: &str = "datafusion_ffi";
1331+
13301332
impl ConfigOptions {
13311333
/// Creates a new [`ConfigOptions`] with default values
13321334
pub fn new() -> Self {
@@ -1341,12 +1343,12 @@ impl ConfigOptions {
13411343

13421344
/// Set a configuration option
13431345
pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
1344-
let Some((prefix, key)) = key.split_once('.') else {
1346+
let Some((mut prefix, mut inner_key)) = key.split_once('.') else {
13451347
return _config_err!("could not find config namespace for key \"{key}\"");
13461348
};
13471349

13481350
if prefix == "datafusion" {
1349-
if key == "optimizer.enable_dynamic_filter_pushdown" {
1351+
if inner_key == "optimizer.enable_dynamic_filter_pushdown" {
13501352
let bool_value = value.parse::<bool>().map_err(|e| {
13511353
DataFusionError::Configuration(format!(
13521354
"Failed to parse '{value}' as bool: {e}",
@@ -1361,13 +1363,23 @@ impl ConfigOptions {
13611363
}
13621364
return Ok(());
13631365
}
1364-
return ConfigField::set(self, key, value);
1366+
return ConfigField::set(self, inner_key, value);
1367+
}
1368+
1369+
if !self.extensions.0.contains_key(prefix)
1370+
&& self
1371+
.extensions
1372+
.0
1373+
.contains_key(DATAFUSION_FFI_CONFIG_NAMESPACE)
1374+
{
1375+
inner_key = key;
1376+
prefix = DATAFUSION_FFI_CONFIG_NAMESPACE;
13651377
}
13661378

13671379
let Some(e) = self.extensions.0.get_mut(prefix) else {
13681380
return _config_err!("Could not find config namespace \"{prefix}\"");
13691381
};
1370-
e.0.set(key, value)
1382+
e.0.set(inner_key, value)
13711383
}
13721384

13731385
/// Create new [`ConfigOptions`], taking values from environment variables
@@ -2132,7 +2144,7 @@ impl TableOptions {
21322144
///
21332145
/// A result indicating success or failure in setting the configuration option.
21342146
pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
2135-
let Some((prefix, _)) = key.split_once('.') else {
2147+
let Some((mut prefix, _)) = key.split_once('.') else {
21362148
return _config_err!("could not find config namespace for key \"{key}\"");
21372149
};
21382150

@@ -2144,6 +2156,15 @@ impl TableOptions {
21442156
return Ok(());
21452157
}
21462158

2159+
if !self.extensions.0.contains_key(prefix)
2160+
&& self
2161+
.extensions
2162+
.0
2163+
.contains_key(DATAFUSION_FFI_CONFIG_NAMESPACE)
2164+
{
2165+
prefix = DATAFUSION_FFI_CONFIG_NAMESPACE;
2166+
}
2167+
21472168
let Some(e) = self.extensions.0.get_mut(prefix) else {
21482169
return _config_err!("Could not find config namespace \"{prefix}\"");
21492170
};
Lines changed: 286 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,286 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::any::Any;
19+
use std::collections::HashMap;
20+
use std::ffi::c_void;
21+
22+
use abi_stable::StableAbi;
23+
use abi_stable::std_types::{RResult, RStr, RString, RVec, Tuple2};
24+
use datafusion_common::config::{ConfigEntry, ConfigExtension, ExtensionOptions};
25+
use datafusion_common::{Result, exec_err};
26+
27+
use crate::df_result;
28+
29+
/// A stable struct for sharing [`ExtensionOptions`] across FFI boundaries.
30+
///
31+
/// Unlike other FFI structs in this crate, we do not construct a foreign
32+
/// variant of this object. This is because the typical way to interact
33+
/// with extension options is to create a local struct of your concrete type.
34+
/// To support this pattern use the `to_extension` method instead.
35+
///
36+
/// When using [`FFI_ExtensionOptions`] with multiple extensions, all extension
37+
/// values are stored on a single [`FFI_ExtensionOptions`] object. The keys
38+
/// are stored with the full path prefix to avoid overwriting values when using
39+
/// multiple extensions.
40+
#[repr(C)]
41+
#[derive(Debug, StableAbi)]
42+
pub struct FFI_ExtensionOptions {
43+
/// Return a deep clone of this [`ExtensionOptions`]
44+
pub cloned: unsafe extern "C" fn(&Self) -> FFI_ExtensionOptions,
45+
46+
/// Set the given `key`, `value` pair
47+
pub set:
48+
unsafe extern "C" fn(&mut Self, key: RStr, value: RStr) -> RResult<(), RString>,
49+
50+
/// Returns every `(key, value)` pair stored in this [`ExtensionOptions`]
51+
pub entries: unsafe extern "C" fn(&Self) -> RVec<Tuple2<RString, RString>>,
52+
53+
/// Release the memory of the private data when it is no longer being used.
54+
pub release: unsafe extern "C" fn(&mut Self),
55+
56+
/// Internal data. This is only to be accessed by the provider of the options.
57+
/// Code on the other side of the FFI boundary should never access this data.
58+
pub private_data: *mut c_void,
59+
}
60+
61+
// The raw `private_data` pointer field is what keeps these auto traits from
// being derived automatically. The payload installed by this module's `From`
// impl is a boxed `ExtensionOptionsPrivateData`, i.e. a plain
// `HashMap<String, String>`.
// NOTE(review): soundness relies on every provider storing a payload that is
// itself safe to send and share across threads — confirm for foreign providers.
unsafe impl Send for FFI_ExtensionOptions {}
62+
unsafe impl Sync for FFI_ExtensionOptions {}
63+
64+
/// Payload that this module stores behind `FFI_ExtensionOptions::private_data`.
pub struct ExtensionOptionsPrivateData {
65+
/// Option values keyed by their full key, including the namespace prefix of
/// the extension they belong to (for example `my_config.baz_count`).
pub options: HashMap<String, String>,
66+
}
67+
68+
impl FFI_ExtensionOptions {
69+
#[inline]
70+
fn inner_mut(&mut self) -> &mut HashMap<String, String> {
71+
let private_data = self.private_data as *mut ExtensionOptionsPrivateData;
72+
unsafe { &mut (*private_data).options }
73+
}
74+
75+
#[inline]
76+
fn inner(&self) -> &HashMap<String, String> {
77+
let private_data = self.private_data as *const ExtensionOptionsPrivateData;
78+
unsafe { &(*private_data).options }
79+
}
80+
}
81+
82+
/// FFI entry point behind `FFI_ExtensionOptions::cloned`.
///
/// Produces a new, independently owned `FFI_ExtensionOptions` whose option map
/// is a deep copy of the one held by `options`.
unsafe extern "C" fn cloned_fn_wrapper(
    options: &FFI_ExtensionOptions,
) -> FFI_ExtensionOptions {
    // `HashMap<String, String>: Clone` already performs the deep copy; there is
    // no need to re-collect the entries one by one.
    options.inner().clone().into()
}
92+
93+
unsafe extern "C" fn set_fn_wrapper(
94+
options: &mut FFI_ExtensionOptions,
95+
key: RStr,
96+
value: RStr,
97+
) -> RResult<(), RString> {
98+
let _ = options.inner_mut().insert(key.into(), value.into());
99+
RResult::ROk(())
100+
}
101+
102+
unsafe extern "C" fn entries_fn_wrapper(
103+
options: &FFI_ExtensionOptions,
104+
) -> RVec<Tuple2<RString, RString>> {
105+
options
106+
.inner()
107+
.iter()
108+
.map(|(key, value)| (key.to_owned().into(), value.to_owned().into()).into())
109+
.collect()
110+
}
111+
112+
unsafe extern "C" fn release_fn_wrapper(options: &mut FFI_ExtensionOptions) {
113+
let private_data = unsafe {
114+
Box::from_raw(options.private_data as *mut ExtensionOptionsPrivateData)
115+
};
116+
drop(private_data);
117+
}
118+
119+
impl Default for FFI_ExtensionOptions {
120+
fn default() -> Self {
121+
HashMap::new().into()
122+
}
123+
}
124+
125+
impl From<HashMap<String, String>> for FFI_ExtensionOptions {
126+
fn from(options: HashMap<String, String>) -> Self {
127+
let private_data = ExtensionOptionsPrivateData { options };
128+
129+
Self {
130+
cloned: cloned_fn_wrapper,
131+
set: set_fn_wrapper,
132+
entries: entries_fn_wrapper,
133+
release: release_fn_wrapper,
134+
private_data: Box::into_raw(Box::new(private_data)) as *mut c_void,
135+
}
136+
}
137+
}
138+
139+
impl Drop for FFI_ExtensionOptions {
140+
fn drop(&mut self) {
141+
unsafe { (self.release)(self) }
142+
}
143+
}
144+
145+
impl Clone for FFI_ExtensionOptions {
146+
fn clone(&self) -> Self {
147+
unsafe { (self.cloned)(self) }
148+
}
149+
}
150+
151+
// Every FFI-backed options object registers under the one shared FFI
// namespace; the individual extensions it carries are told apart by the
// namespace prefix embedded in each stored key.
impl ConfigExtension for FFI_ExtensionOptions {
152+
const PREFIX: &'static str =
153+
datafusion_common::config::DATAFUSION_FFI_CONFIG_NAMESPACE;
154+
}
155+
156+
impl ExtensionOptions for FFI_ExtensionOptions {
157+
fn as_any(&self) -> &dyn Any {
158+
self
159+
}
160+
161+
fn as_any_mut(&mut self) -> &mut dyn Any {
162+
self
163+
}
164+
165+
fn cloned(&self) -> Box<dyn ExtensionOptions> {
166+
let ffi_options = unsafe { (self.cloned)(self) };
167+
Box::new(ffi_options)
168+
}
169+
170+
fn set(&mut self, key: &str, value: &str) -> Result<()> {
171+
if key.split_once('.').is_none() {
172+
return exec_err!("Unable to set FFI config value without namespace set");
173+
};
174+
175+
df_result!(unsafe { (self.set)(self, key.into(), value.into()) })
176+
}
177+
178+
fn entries(&self) -> Vec<ConfigEntry> {
179+
unsafe {
180+
(self.entries)(self)
181+
.into_iter()
182+
.map(|entry_tuple| ConfigEntry {
183+
key: entry_tuple.0.into(),
184+
value: Some(entry_tuple.1.into()),
185+
description: "ffi_config_options",
186+
})
187+
.collect()
188+
}
189+
}
190+
}
191+
192+
impl FFI_ExtensionOptions {
193+
/// Add all of the values in a concrete configuration extension to the
194+
/// FFI variant. This is safe to call on either side of the FFI
195+
/// boundary.
196+
pub fn add_config<C: ConfigExtension>(&mut self, config: &C) -> Result<()> {
197+
for entry in config.entries() {
198+
if let Some(value) = entry.value {
199+
let key = format!("{}.{}", C::PREFIX, entry.key);
200+
self.set(key.as_str(), value.as_str())?;
201+
}
202+
}
203+
204+
Ok(())
205+
}
206+
207+
/// Merge another `FFI_ExtensionOptions` configurations into this one.
208+
/// This is safe to call on either side of the FFI boundary.
209+
pub fn merge(&mut self, other: &FFI_ExtensionOptions) -> Result<()> {
210+
for entry in other.entries() {
211+
if let Some(value) = entry.value {
212+
self.set(entry.key.as_str(), value.as_str())?;
213+
}
214+
}
215+
Ok(())
216+
}
217+
218+
/// Create a concrete extension type from the FFI variant.
219+
/// This is safe to call on either side of the FFI boundary.
220+
pub fn to_extension<C: ConfigExtension + Default>(&self) -> Result<C> {
221+
let mut result = C::default();
222+
223+
unsafe {
224+
for entry in (self.entries)(self) {
225+
let key = entry.0.as_str();
226+
let value = entry.1.as_str();
227+
228+
if let Some((prefix, inner_key)) = key.split_once('.')
229+
&& prefix == C::PREFIX
230+
{
231+
result.set(inner_key, value)?;
232+
}
233+
}
234+
}
235+
236+
Ok(result)
237+
}
238+
}
239+
240+
#[cfg(test)]
mod tests {
    use datafusion_common::config::{ConfigExtension, ConfigOptions};
    use datafusion_common::extensions_options;

    use crate::config::extension_options::FFI_ExtensionOptions;

    // Concrete extension used to exercise the FFI wrapper.
    extensions_options! {
        /// My own config options.
        pub struct MyConfig {
            /// Should "foo" be replaced by "bar"?
            pub foo_to_bar: bool, default = true

            /// How many "baz" should be created?
            pub baz_count: usize, default = 1337
        }
    }

    impl ConfigExtension for MyConfig {
        const PREFIX: &'static str = "my_config";
    }

    /// Values pushed into the FFI wrapper, then updated through
    /// `ConfigOptions::set`, must come back out via `to_extension`.
    #[test]
    fn round_trip_ffi_extension_options() {
        // Seed the FFI wrapper with the defaults of the concrete extension.
        let mut ffi_options = FFI_ExtensionOptions::default();
        ffi_options.add_config(&MyConfig::default()).unwrap();

        // Register the wrapper and override one default through the session config.
        let mut config = ConfigOptions::default();
        config.extensions.insert(ffi_options);
        config.set("my_config.baz_count", "42").unwrap();

        // Rebuild the concrete extension from whatever the wrapper now holds.
        let my_config = config
            .extensions
            .get::<FFI_ExtensionOptions>()
            .unwrap()
            .to_extension::<MyConfig>()
            .unwrap();

        // Untouched value keeps its default.
        assert!(my_config.foo_to_bar);

        // Overridden value is visible.
        assert_eq!(my_config.baz_count, 42);
    }
}

0 commit comments

Comments
 (0)