Skip to content

Commit 71b6c31

Browse files
committed
Minimal workable example, tested against df-python
1 parent 0f5ca5e commit 71b6c31

3 files changed

Lines changed: 78 additions & 78 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1340,12 +1340,12 @@ impl ConfigOptions {
13401340

13411341
/// Set a configuration option
13421342
pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
1343-
let Some((prefix, key)) = key.split_once('.') else {
1343+
let Some((mut prefix, mut inner_key)) = key.split_once('.') else {
13441344
return _config_err!("could not find config namespace for key \"{key}\"");
13451345
};
13461346

13471347
if prefix == "datafusion" {
1348-
if key == "optimizer.enable_dynamic_filter_pushdown" {
1348+
if inner_key == "optimizer.enable_dynamic_filter_pushdown" {
13491349
let bool_value = value.parse::<bool>().map_err(|e| {
13501350
DataFusionError::Configuration(format!(
13511351
"Failed to parse '{value}' as bool: {e}",
@@ -1360,13 +1360,18 @@ impl ConfigOptions {
13601360
}
13611361
return Ok(());
13621362
}
1363-
return ConfigField::set(self, key, value);
1363+
return ConfigField::set(self, inner_key, value);
1364+
}
1365+
1366+
if !self.extensions.0.contains_key(prefix) && self.extensions.0.contains_key("datafusion_ffi") {
1367+
inner_key = key;
1368+
prefix = "datafusion_ffi";
13641369
}
13651370

13661371
let Some(e) = self.extensions.0.get_mut(prefix) else {
13671372
return _config_err!("Could not find config namespace \"{prefix}\"");
13681373
};
1369-
e.0.set(key, value)
1374+
e.0.set(inner_key, value)
13701375
}
13711376

13721377
/// Create new [`ConfigOptions`], taking values from environment variables
@@ -1613,6 +1618,7 @@ impl Extensions {
16131618

16141619
/// Retrieves the extension of the given type if any
16151620
pub fn get_mut<T: ConfigExtension>(&mut self) -> Option<&mut T> {
1621+
println!("extensions trying get_mut on prefix {}", T::PREFIX);
16161622
let e = self.0.get_mut(T::PREFIX)?;
16171623
e.0.as_any_mut().downcast_mut()
16181624
}
@@ -2131,7 +2137,7 @@ impl TableOptions {
21312137
///
21322138
/// A result indicating success or failure in setting the configuration option.
21332139
pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
2134-
let Some((prefix, _)) = key.split_once('.') else {
2140+
let Some((mut prefix, _)) = key.split_once('.') else {
21352141
return _config_err!("could not find config namespace for key \"{key}\"");
21362142
};
21372143

@@ -2143,6 +2149,12 @@ impl TableOptions {
21432149
return Ok(());
21442150
}
21452151

2152+
if !self.extensions.0.contains_key(prefix) && self.extensions.0.contains_key("datafusion_ffi") {
2153+
prefix = "datafusion_ffi";
2154+
} else {
2155+
println!("Existing keys {:?}", self.extensions.0.keys());
2156+
}
2157+
21462158
let Some(e) = self.extensions.0.get_mut(prefix) else {
21472159
return _config_err!("Could not find config namespace \"{prefix}\"");
21482160
};

datafusion/ffi/src/config/extension_options.rs

Lines changed: 61 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,8 @@ use std::ffi::c_void;
2121

2222
use abi_stable::StableAbi;
2323
use abi_stable::std_types::{RResult, RStr, RString, RVec, Tuple2};
24-
use datafusion::error::Result;
2524
use datafusion_common::config::{ConfigEntry, ConfigExtension, ExtensionOptions};
26-
use datafusion_common::{DataFusionError, exec_err};
25+
use datafusion_common::{Result, exec_err};
2726

2827
use crate::df_result;
2928

@@ -71,15 +70,15 @@ pub struct ExtensionOptionsPrivateData {
7170

7271
impl FFI_ExtensionOptions {
7372
#[inline]
74-
unsafe fn inner_mut(&mut self) -> &mut HashMap<String, String> {
73+
fn inner_mut(&mut self) -> &mut HashMap<String, String> {
7574
let private_data = self.private_data as *mut ExtensionOptionsPrivateData;
76-
&mut (*private_data).options
75+
unsafe { &mut (*private_data).options }
7776
}
7877

7978
#[inline]
80-
unsafe fn inner(&self) -> &HashMap<String, String> {
79+
fn inner(&self) -> &HashMap<String, String> {
8180
let private_data = self.private_data as *const ExtensionOptionsPrivateData;
82-
&(*private_data).options
81+
unsafe { &(*private_data).options }
8382
}
8483
}
8584

@@ -114,8 +113,9 @@ unsafe extern "C" fn entries_fn_wrapper(
114113
}
115114

116115
unsafe extern "C" fn release_fn_wrapper(options: &mut FFI_ExtensionOptions) {
117-
let private_data =
118-
Box::from_raw(options.private_data as *mut ExtensionOptionsPrivateData);
116+
let private_data = unsafe {
117+
Box::from_raw(options.private_data as *mut ExtensionOptionsPrivateData)
118+
};
119119
drop(private_data);
120120
}
121121

@@ -145,44 +145,25 @@ impl Drop for FFI_ExtensionOptions {
145145
}
146146
}
147147

148+
impl Clone for FFI_ExtensionOptions {
149+
fn clone(&self) -> Self {
150+
unsafe { (self.cloned)(&self) }
151+
}
152+
}
153+
148154
/// This struct is used to access an UDF provided by a foreign
149155
/// library across a FFI boundary.
150156
///
151157
/// The ForeignExtensionOptions is to be used by the caller of the UDF, so it has
152158
/// no knowledge or access to the private data. All interaction with the UDF
153159
/// must occur through the functions defined in FFI_ExtensionOptions.
154-
#[derive(Debug)]
155-
pub struct ForeignExtensionOptions(FFI_ExtensionOptions);
156-
157-
unsafe impl Send for ForeignExtensionOptions {}
158-
unsafe impl Sync for ForeignExtensionOptions {}
159-
160-
// impl<T : ConfigExtension + Default> TryFrom<FFI_ExtensionOptions> for T {
161-
// type Error = DataFusionError;
160+
// #[derive(Debug)]
161+
// pub struct ForeignExtensionOptions(FFI_ExtensionOptions);
162162
//
163-
// fn try_from(options: &FFI_ExtensionOptions) -> Result<Self, Self::Error> {
164-
// let mut config = T::default();
165-
//
166-
// let mut found = false;
167-
// unsafe {
168-
// for entry_tuple in (options.entries)(&options).into_iter() {
169-
// if let ROption::RSome(value) = entry_tuple.1 {
170-
// if let Some((namespace, key)) = entry_tuple.0.as_str().split_once('.')
171-
// {
172-
// if namespace == T::PREFIX {
173-
// found = true;
174-
// config.set(key, value.as_str())?;
175-
// }
176-
// }
177-
// }
178-
// }
179-
// }
180-
//
181-
// Ok(config)
182-
// }
183-
// }
163+
// unsafe impl Send for ForeignExtensionOptions {}
164+
// unsafe impl Sync for ForeignExtensionOptions {}
184165

185-
impl ForeignExtensionOptions {
166+
impl FFI_ExtensionOptions {
186167
pub fn add_config<C: ConfigExtension>(&mut self, config: &C) -> Result<()> {
187168
for entry in config.entries() {
188169
if let Some(value) = entry.value {
@@ -193,13 +174,22 @@ impl ForeignExtensionOptions {
193174

194175
Ok(())
195176
}
177+
178+
pub fn merge(&mut self, other: &FFI_ExtensionOptions) -> Result<()> {
179+
for entry in other.entries() {
180+
if let Some(value) = entry.value {
181+
self.set(entry.key.as_str(), value.as_str())?;
182+
}
183+
}
184+
Ok(())
185+
}
196186
}
197187

198-
impl ConfigExtension for ForeignExtensionOptions {
188+
impl ConfigExtension for FFI_ExtensionOptions {
199189
const PREFIX: &'static str = "datafusion_ffi";
200190
}
201191

202-
impl ExtensionOptions for ForeignExtensionOptions {
192+
impl ExtensionOptions for FFI_ExtensionOptions {
203193
fn as_any(&self) -> &dyn Any {
204194
self
205195
}
@@ -209,23 +199,21 @@ impl ExtensionOptions for ForeignExtensionOptions {
209199
}
210200

211201
fn cloned(&self) -> Box<dyn ExtensionOptions> {
212-
let ffi_options = unsafe { (self.0.cloned)(&self.0) };
213-
let foreign_options = ForeignExtensionOptions(ffi_options);
214-
Box::new(foreign_options)
202+
let ffi_options = unsafe { (self.cloned)(&self) };
203+
Box::new(ffi_options)
215204
}
216205

217206
fn set(&mut self, key: &str, value: &str) -> Result<()> {
218-
println!("Setting {key} = {value}");
219207
if key.split_once('.').is_none() {
220208
return exec_err!("Unable to set FFI config value without namespace set");
221209
};
222210

223-
df_result!(unsafe { (self.0.set)(&mut self.0, key.into(), value.into()) })
211+
df_result!(unsafe { (self.set)(self, key.into(), value.into()) })
224212
}
225213

226214
fn entries(&self) -> Vec<ConfigEntry> {
227215
unsafe {
228-
(self.0.entries)(&self.0)
216+
(self.entries)(&self)
229217
.into_iter()
230218
.map(|entry_tuple| ConfigEntry {
231219
key: entry_tuple.0.into(),
@@ -237,18 +225,24 @@ impl ExtensionOptions for ForeignExtensionOptions {
237225
}
238226
}
239227

240-
// TODO: Maybe get rid of ForeignExtensionOptions?
241-
impl<C: ConfigExtension + Default> TryFrom<&ForeignExtensionOptions> for C {
242-
type Error = DataFusionError;
243-
fn try_from(options: &ForeignExtensionOptions) -> Result<Self> {
228+
impl FFI_ExtensionOptions {
229+
pub fn to_extension<C: ConfigExtension + Default>(&self) -> Result<C> {
244230
let mut result = C::default();
245-
for entry in options.entries() {
246-
if let Some(value) = entry.value {
247-
result.set(entry.key.as_str(), value.as_str())?;
231+
232+
unsafe {
233+
for entry in (self.entries)(&self) {
234+
let key = entry.0.as_str();
235+
let value = entry.1.as_str();
236+
237+
if let Some((prefix, inner_key)) = key.split_once('.')
238+
&& prefix == C::PREFIX
239+
{
240+
result.set(inner_key, value)?;
241+
}
248242
}
249243
}
250244

251-
result
245+
Ok(result)
252246
}
253247
}
254248

@@ -257,9 +251,7 @@ mod tests {
257251
use datafusion_common::config::{ConfigExtension, ConfigOptions};
258252
use datafusion_common::extensions_options;
259253

260-
use crate::config::extension_options::{
261-
FFI_ExtensionOptions, ForeignExtensionOptions,
262-
};
254+
use crate::config::extension_options::FFI_ExtensionOptions;
263255

264256
// Define a new configuration struct using the `extensions_options` macro
265257
extensions_options! {
@@ -281,26 +273,23 @@ mod tests {
281273
fn round_trip_ffi_extension_options() {
282274
// set up config struct and register extension
283275
let mut config = ConfigOptions::default();
284-
let mut foreign_options =
285-
ForeignExtensionOptions(FFI_ExtensionOptions::default());
286-
foreign_options.add_config(&MyConfig::default()).unwrap();
276+
let mut ffi_options = FFI_ExtensionOptions::default();
277+
ffi_options.add_config(&MyConfig::default()).unwrap();
287278

288-
config.extensions.insert(foreign_options);
289-
// config.extensions.insert(MyConfig::default());
279+
config.extensions.insert(ffi_options);
290280

291281
// overwrite config default
292282
config.set("my_config.baz_count", "42").unwrap();
293283

294284
// check config state
295-
let my_config = config.extensions.get::<MyConfig>().unwrap();
296-
assert!(my_config.foo_to_bar,);
297-
assert_eq!(my_config.baz_count, 42,);
298-
299-
// let boxed_config = Box::new(MyConfig::default()) as Box<dyn ExtensionOptions>;
300-
// let mut ffi_config = FFI_ExtensionOptions::from(boxed_config);
301-
// ffi_config.library_marker_id = crate::mock_foreign_marker_id;
302-
// let foreign_config: Box<dyn ExtensionOptions> = ffi_config.into();
303-
//
304-
// config.extensions.insert(foreign_config);
285+
let returned_ffi_config =
286+
config.extensions.get::<FFI_ExtensionOptions>().unwrap();
287+
let my_config: MyConfig = returned_ffi_config.to_extension().unwrap();
288+
289+
// check default value
290+
assert!(my_config.foo_to_bar);
291+
292+
// check overwritten value
293+
assert_eq!(my_config.baz_count, 42);
305294
}
306295
}

datafusion/ffi/src/udwf/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,6 @@ impl From<&FFI_SortOptions> for SortOptions {
399399
mod tests {
400400
use std::sync::Arc;
401401

402-
use crate::tests::create_record_batch;
403402
use crate::udwf::{FFI_WindowUDF, ForeignWindowUDF};
404403
use arrow::array::{ArrayRef, RecordBatch, create_array};
405404
use datafusion::functions_window::lead_lag::{WindowShift, lag_udwf};

0 commit comments

Comments
 (0)