Skip to content

Commit f823854

Browse files
committed
More testing
1 parent 280e815 commit f823854

3 files changed

Lines changed: 91 additions & 67 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1230,7 +1230,7 @@ impl<'a> TryInto<arrow::util::display::FormatOptions<'a>> for &'a FormatOptions
12301230
}
12311231

12321232
/// A key value pair, with a corresponding description
1233-
#[derive(Debug, Hash, PartialEq, Eq)]
1233+
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
12341234
pub struct ConfigEntry {
12351235
/// A unique string to identify this config value
12361236
pub key: String,

datafusion/ffi/src/config_options.rs

Lines changed: 77 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,17 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::{any::Any, ffi::c_void, ops::Deref};
18+
use std::any::Any;
1919
use std::collections::HashMap;
20-
use abi_stable::{
21-
std_types::{RHashMap, ROption, RResult, RStr, RString, RVec, Tuple3},
22-
RTuple, StableAbi,
23-
};
24-
use arrow::{array::ArrayRef, error::ArrowError};
25-
use datafusion::{
26-
error::{DataFusionError, Result},
27-
scalar::ScalarValue,
28-
};
20+
use std::ffi::c_void;
21+
22+
use abi_stable::StableAbi;
23+
use abi_stable::std_types::{RResult, RStr, RString, RVec, Tuple2};
24+
use datafusion::error::Result;
2925
use datafusion_common::config::{ConfigEntry, ConfigExtension, ExtensionOptions};
30-
use prost::Message;
3126
use datafusion_common::exec_err;
32-
use crate::{arrow_wrappers::WrappedArray, df_result, rresult, rresult_return};
27+
28+
use crate::df_result;
3329

3430
/// A stable struct for sharing [`ExtensionOptions`] across FFI boundaries.
3531
/// For an explanation of each field, see the corresponding function
@@ -43,11 +39,7 @@ pub struct FFI_ExtensionOptions {
4339
pub set:
4440
unsafe extern "C" fn(&mut Self, key: RStr, value: RStr) -> RResult<(), RString>,
4541

46-
pub entries: unsafe extern "C" fn(
47-
&Self,
48-
) -> RVec<
49-
Tuple3<RString, ROption<RString>, RStr<'static>>,
50-
>,
42+
pub entries: unsafe extern "C" fn(&Self) -> RVec<Tuple2<RString, RString>>,
5143

5244
/// Release the memory of the private data when it is no longer being used.
5345
pub release: unsafe extern "C" fn(&mut Self),
@@ -81,32 +73,30 @@ impl FFI_ExtensionOptions {
8173
unsafe extern "C" fn cloned_fn_wrapper(
8274
options: &FFI_ExtensionOptions,
8375
) -> FFI_ExtensionOptions {
84-
options.inner().cloned().into()
76+
options
77+
.inner()
78+
.iter()
79+
.map(|(k, v)| (k.to_owned(), v.to_owned()))
80+
.collect::<HashMap<String, String>>()
81+
.into()
8582
}
8683

8784
unsafe extern "C" fn set_fn_wrapper(
8885
options: &mut FFI_ExtensionOptions,
8986
key: RStr,
9087
value: RStr,
9188
) -> RResult<(), RString> {
92-
rresult!(options.inner_mut().set(key.into(), value.into()))
89+
let _ = options.inner_mut().insert(key.into(), value.into());
90+
RResult::ROk(())
9391
}
9492

9593
unsafe extern "C" fn entries_fn_wrapper(
9694
options: &FFI_ExtensionOptions,
97-
) -> RVec<Tuple3<RString, ROption<RString>, RStr<'static>>> {
95+
) -> RVec<Tuple2<RString, RString>> {
9896
options
9997
.inner()
100-
.entries()
101-
.into_iter()
102-
.map(|entry| {
103-
(
104-
entry.key.into(),
105-
entry.value.map(Into::into).into(),
106-
entry.description.into(),
107-
)
108-
.into()
109-
})
98+
.iter()
99+
.map(|(key, value)| (key.to_owned().into(), value.to_owned().into()).into())
110100
.collect()
111101
}
112102

@@ -117,8 +107,14 @@ unsafe extern "C" fn release_fn_wrapper(options: &mut FFI_ExtensionOptions) {
117107
}
118108

119109
impl Default for FFI_ExtensionOptions {
120-
fn default() -> Self {
121-
let private_data = ExtensionOptionsPrivateData { options: HashMap::new() };
110+
fn default() -> Self {
111+
HashMap::new().into()
112+
}
113+
}
114+
115+
impl From<HashMap<String, String>> for FFI_ExtensionOptions {
116+
fn from(options: HashMap<String, String>) -> Self {
117+
let private_data = ExtensionOptionsPrivateData { options };
122118

123119
Self {
124120
cloned: cloned_fn_wrapper,
@@ -148,27 +144,41 @@ pub struct ForeignExtensionOptions(FFI_ExtensionOptions);
148144
unsafe impl Send for ForeignExtensionOptions {}
149145
unsafe impl Sync for ForeignExtensionOptions {}
150146

151-
impl<T: ConfigExtension + Default> TryFrom<FFI_ExtensionOptions> for T {
152-
type Error = DataFusionError;
153-
154-
fn try_from(options: &FFI_ExtensionOptions) -> Result<Self, Self::Error> {
155-
let mut config = T::default();
156-
157-
let mut found = false;
158-
unsafe {
159-
for entry_tuple in (options.entries)(&options)
160-
.into_iter() {
161-
if let ROption::RSome(value) = entry_tuple.1
162-
&& let Some((namespace, key)) = entry_tuple.0.as_str().split_once('.') {
163-
if namespace == T::PREFIX {
164-
found = true;
165-
config.set(key, value.as_str())?;
166-
}
167-
}
147+
// impl<T : ConfigExtension + Default> TryFrom<FFI_ExtensionOptions> for T {
148+
// type Error = DataFusionError;
149+
//
150+
// fn try_from(options: &FFI_ExtensionOptions) -> Result<Self, Self::Error> {
151+
// let mut config = T::default();
152+
//
153+
// let mut found = false;
154+
// unsafe {
155+
// for entry_tuple in (options.entries)(&options).into_iter() {
156+
// if let ROption::RSome(value) = entry_tuple.1 {
157+
// if let Some((namespace, key)) = entry_tuple.0.as_str().split_once('.')
158+
// {
159+
// if namespace == T::PREFIX {
160+
// found = true;
161+
// config.set(key, value.as_str())?;
162+
// }
163+
// }
164+
// }
165+
// }
166+
// }
167+
//
168+
// Ok(config)
169+
// }
170+
// }
171+
172+
impl ForeignExtensionOptions {
173+
pub fn add_config<C: ConfigExtension>(&mut self, config: &C) -> Result<()> {
174+
for entry in config.entries() {
175+
if let Some(value) = entry.value {
176+
let key = format!("{}.{}", C::PREFIX, entry.key);
177+
self.set(key.as_str(), value.as_str())?;
168178
}
169179
}
170180

171-
Ok(config)
181+
Ok(())
172182
}
173183
}
174184

@@ -186,17 +196,20 @@ impl ExtensionOptions for ForeignExtensionOptions {
186196
}
187197

188198
fn cloned(&self) -> Box<dyn ExtensionOptions> {
189-
unsafe { (self.0.cloned)(&self.0).into() }
199+
let ffi_options = unsafe { (self.0.cloned)(&self.0) };
200+
let foreign_options = ForeignExtensionOptions(ffi_options);
201+
Box::new(foreign_options)
190202
}
191203

192204
fn set(&mut self, key: &str, value: &str) -> Result<()> {
205+
println!("Setting {key} = {value}");
193206
let Some((namespace, key)) = key.split_once('.') else {
194207
return exec_err!("Unable to set FFI config value without namespace set");
195208
};
196209

197-
if namespace != ForeignExtensionOptions::PREFIX {
198-
return exec_err!("Unexpected namespace {namespace} set for FFI config");
199-
}
210+
// if namespace != ForeignExtensionOptions::PREFIX {
211+
// return exec_err!("Unexpected namespace {namespace} set for FFI config");
212+
// }
200213

201214
df_result!(unsafe { (self.0.set)(&mut self.0, key.into(), value.into()) })
202215
}
@@ -207,8 +220,8 @@ impl ExtensionOptions for ForeignExtensionOptions {
207220
.into_iter()
208221
.map(|entry_tuple| ConfigEntry {
209222
key: entry_tuple.0.into(),
210-
value: entry_tuple.1.map(Into::into).into(),
211-
description: entry_tuple.2.into(),
223+
value: Some(entry_tuple.1.into()),
224+
description: "ffi_config_options",
212225
})
213226
.collect()
214227
}
@@ -217,12 +230,10 @@ impl ExtensionOptions for ForeignExtensionOptions {
217230

218231
#[cfg(test)]
219232
mod tests {
220-
use datafusion_common::{
221-
config::{ConfigExtension, ConfigOptions, ExtensionOptions},
222-
extensions_options,
223-
};
233+
use datafusion_common::config::{ConfigExtension, ConfigOptions};
234+
use datafusion_common::extensions_options;
224235

225-
use crate::config_options::FFI_ExtensionOptions;
236+
use crate::config_options::{FFI_ExtensionOptions, ForeignExtensionOptions};
226237

227238
// Define a new configuration struct using the `extensions_options` macro
228239
extensions_options! {
@@ -244,7 +255,11 @@ mod tests {
244255
fn round_trip_ffi_extension_options() {
245256
// set up config struct and register extension
246257
let mut config = ConfigOptions::default();
247-
config.extensions.insert(FFI_ExtensionOptions::default());
258+
let mut foreign_options =
259+
ForeignExtensionOptions(FFI_ExtensionOptions::default());
260+
foreign_options.add_config(&MyConfig::default()).unwrap();
261+
262+
config.extensions.insert(foreign_options);
248263
// config.extensions.insert(MyConfig::default());
249264

250265
// overwrite config default

datafusion/ffi/src/udwf/mod.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -399,14 +399,14 @@ impl From<&FFI_SortOptions> for SortOptions {
399399
mod tests {
400400
use std::sync::Arc;
401401

402-
use arrow::array::{ArrayRef, create_array};
402+
use crate::tests::create_record_batch;
403+
use crate::udwf::{FFI_WindowUDF, ForeignWindowUDF};
404+
use arrow::array::{ArrayRef, RecordBatch, create_array};
403405
use datafusion::functions_window::lead_lag::{WindowShift, lag_udwf};
404406
use datafusion::logical_expr::expr::Sort;
405407
use datafusion::logical_expr::{ExprFunctionExt, WindowUDF, WindowUDFImpl, col};
406408
use datafusion::prelude::SessionContext;
407-
408-
use crate::tests::create_record_batch;
409-
use crate::udwf::{FFI_WindowUDF, ForeignWindowUDF};
409+
use datafusion_common::record_batch;
410410

411411
fn create_test_foreign_udwf(
412412
original_udwf: impl WindowUDFImpl + 'static,
@@ -437,11 +437,20 @@ mod tests {
437437
Ok(())
438438
}
439439

440+
fn create_record_batch(start_value: i32, num_values: usize) -> RecordBatch {
441+
let end_value = start_value + num_values as i32;
442+
let a_vals: Vec<i32> = (start_value..end_value).collect();
443+
let b_vals: Vec<f64> = a_vals.iter().map(|v| *v as f64).collect();
444+
445+
record_batch!(("a", Int32, a_vals), ("b", Float64, b_vals)).unwrap()
446+
}
447+
440448
#[tokio::test]
441449
async fn test_lag_udwf() -> datafusion::common::Result<()> {
442450
let udwf = create_test_foreign_udwf(WindowShift::lag())?;
443451

444452
let ctx = SessionContext::default();
453+
445454
let df = ctx.read_batch(create_record_batch(-5, 5))?;
446455

447456
let df = df.select(vec![

0 commit comments

Comments
 (0)