|
| 1 | +// Licensed to the Apache Software Foundation (ASF) under one |
| 2 | +// or more contributor license agreements. See the NOTICE file |
| 3 | +// distributed with this work for additional information |
| 4 | +// regarding copyright ownership. The ASF licenses this file |
| 5 | +// to you under the Apache License, Version 2.0 (the |
| 6 | +// "License"); you may not use this file except in compliance |
| 7 | +// with the License. You may obtain a copy of the License at |
| 8 | +// |
| 9 | +// http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | +// |
| 11 | +// Unless required by applicable law or agreed to in writing, |
| 12 | +// software distributed under the License is distributed on an |
| 13 | +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 14 | +// KIND, either express or implied. See the License for the |
| 15 | +// specific language governing permissions and limitations |
| 16 | +// under the License. |
| 17 | +// |
| 18 | +// use std::any::Any; |
| 19 | +// use std::collections::HashMap; |
| 20 | +// use std::ffi::c_void; |
| 21 | +// |
| 22 | +// use abi_stable::std_types::{RResult, RStr, RString, RVec, Tuple2}; |
| 23 | +// use abi_stable::StableAbi; |
| 24 | +// use datafusion::error::Result; |
| 25 | +// use datafusion_common::config::{ConfigEntry, ConfigExtension, Extensions}; |
| 26 | +// use datafusion_common::exec_err; |
| 27 | +// |
| 28 | +// use crate::df_result; |
| 29 | +// |
| 30 | +// /// A stable struct for sharing [`Extensions`] across FFI boundaries. |
| 31 | +// /// For an explanation of each field, see the corresponding function |
| 32 | +// /// defined in [`Extensions`]. |
| 33 | +// #[repr(C)] |
| 34 | +// #[derive(Debug, StableAbi)] |
| 35 | +// #[allow(non_camel_case_types)] |
| 36 | +// pub struct FFI_Extensions { |
| 37 | +// pub cloned: unsafe extern "C" fn(&Self) -> FFI_Extensions, |
| 38 | +// |
| 39 | +// pub set: |
| 40 | +// unsafe extern "C" fn(&mut Self, key: RStr, value: RStr) -> RResult<(), RString>, |
| 41 | +// |
| 42 | +// pub entries: unsafe extern "C" fn(&Self) -> RVec<Tuple2<RString, RString>>, |
| 43 | +// |
| 44 | +// /// Release the memory of the private data when it is no longer being used. |
| 45 | +// pub release: unsafe extern "C" fn(&mut Self), |
| 46 | +// |
| 47 | +// /// Internal data. This is only to be accessed by the provider of the options. |
| 48 | +// /// A [`ForeignExtensions`] should never attempt to access this data. |
| 49 | +// pub private_data: *mut c_void, |
| 50 | +// } |
| 51 | +// |
| 52 | +// // TODO(tsaucer) We have a problem in datafusion_common::config::Extension::get |
| 53 | +// // which relies on knowing the concrete types of the extensions so that we can |
| 54 | +// // use their PREFIX for insertion of configs. We cannot work around this using |
| 55 | +// // things like `fn namespace() -> &'static str` because we must be able to do |
| 56 | +// // this without having an instance. Instead we will go to an approach of having |
| 57 | +// // a concrete FFI_ForeignConfigExtension and add a check into all of the methods |
| 58 | +// // in the above `get` (and similar) methods to check to see if we have an FFI |
| 59 | +// // configs. If so we get the concrete FFI config and then have a method that will |
| 60 | +// // convert from FFI_ForeignExtensionConfig into the concrete type. Somehow our |
| 61 | +// // FFI library will need to make this as easy an experience as they are used to |
| 62 | +// // so maybe we need to implement something at the `Extensions` level in addition |
| 63 | +// // to the ConfigExtension. |
| 64 | +// |
| 65 | +// unsafe impl Send for FFI_Extensions {} |
| 66 | +// unsafe impl Sync for FFI_Extensions {} |
| 67 | +// |
| 68 | +// pub struct ExtensionsPrivateData { |
| 69 | +// pub options: HashMap<String, String>, |
| 70 | +// } |
| 71 | +// |
| 72 | +// impl FFI_Extensions { |
| 73 | +// #[inline] |
| 74 | +// unsafe fn inner_mut(&mut self) -> &mut HashMap<String, String> { |
| 75 | +// let private_data = self.private_data as *mut ExtensionsPrivateData; |
| 76 | +// &mut (*private_data).options |
| 77 | +// } |
| 78 | +// |
| 79 | +// #[inline] |
| 80 | +// unsafe fn inner(&self) -> &HashMap<String, String> { |
| 81 | +// let private_data = self.private_data as *const ExtensionsPrivateData; |
| 82 | +// &(*private_data).options |
| 83 | +// } |
| 84 | +// } |
| 85 | +// |
| 86 | +// unsafe extern "C" fn cloned_fn_wrapper( |
| 87 | +// options: &FFI_Extensions, |
| 88 | +// ) -> FFI_Extensions { |
| 89 | +// options |
| 90 | +// .inner() |
| 91 | +// .iter() |
| 92 | +// .map(|(k, v)| (k.to_owned(), v.to_owned())) |
| 93 | +// .collect::<HashMap<String, String>>() |
| 94 | +// .into() |
| 95 | +// } |
| 96 | +// |
| 97 | +// unsafe extern "C" fn set_fn_wrapper( |
| 98 | +// options: &mut FFI_Extensions, |
| 99 | +// key: RStr, |
| 100 | +// value: RStr, |
| 101 | +// ) -> RResult<(), RString> { |
| 102 | +// let _ = options.inner_mut().insert(key.into(), value.into()); |
| 103 | +// RResult::ROk(()) |
| 104 | +// } |
| 105 | +// |
| 106 | +// unsafe extern "C" fn entries_fn_wrapper( |
| 107 | +// options: &FFI_Extensions, |
| 108 | +// ) -> RVec<Tuple2<RString, RString>> { |
| 109 | +// options |
| 110 | +// .inner() |
| 111 | +// .iter() |
| 112 | +// .map(|(key, value)| (key.to_owned().into(), value.to_owned().into()).into()) |
| 113 | +// .collect() |
| 114 | +// } |
| 115 | +// |
| 116 | +// unsafe extern "C" fn release_fn_wrapper(options: &mut FFI_Extensions) { |
| 117 | +// let private_data = |
| 118 | +// Box::from_raw(options.private_data as *mut ExtensionsPrivateData); |
| 119 | +// drop(private_data); |
| 120 | +// } |
| 121 | +// |
| 122 | +// impl Default for FFI_Extensions { |
| 123 | +// fn default() -> Self { |
| 124 | +// HashMap::new().into() |
| 125 | +// } |
| 126 | +// } |
| 127 | +// |
| 128 | +// impl From<HashMap<String, String>> for FFI_Extensions { |
| 129 | +// fn from(options: HashMap<String, String>) -> Self { |
| 130 | +// let private_data = ExtensionsPrivateData { options }; |
| 131 | +// |
| 132 | +// Self { |
| 133 | +// cloned: cloned_fn_wrapper, |
| 134 | +// set: set_fn_wrapper, |
| 135 | +// entries: entries_fn_wrapper, |
| 136 | +// release: release_fn_wrapper, |
| 137 | +// private_data: Box::into_raw(Box::new(private_data)) as *mut c_void, |
| 138 | +// } |
| 139 | +// } |
| 140 | +// } |
| 141 | +// |
| 142 | +// impl Drop for FFI_Extensions { |
| 143 | +// fn drop(&mut self) { |
| 144 | +// unsafe { (self.release)(self) } |
| 145 | +// } |
| 146 | +// } |
| 147 | +// |
| 148 | +// /// This struct is used to access an UDF provided by a foreign |
| 149 | +// /// library across a FFI boundary. |
| 150 | +// /// |
| 151 | +// /// The ForeignExtensions is to be used by the caller of the UDF, so it has |
| 152 | +// /// no knowledge or access to the private data. All interaction with the UDF |
| 153 | +// /// must occur through the functions defined in FFI_Extensions. |
| 154 | +// #[derive(Debug)] |
| 155 | +// pub struct ForeignExtensions(FFI_Extensions); |
| 156 | +// |
| 157 | +// unsafe impl Send for ForeignExtensions {} |
| 158 | +// unsafe impl Sync for ForeignExtensions {} |
| 159 | +// |
| 160 | +// // impl<T : ConfigExtension + Default> TryFrom<FFI_Extensions> for T { |
| 161 | +// // type Error = DataFusionError; |
| 162 | +// // |
| 163 | +// // fn try_from(options: &FFI_Extensions) -> Result<Self, Self::Error> { |
| 164 | +// // let mut config = T::default(); |
| 165 | +// // |
| 166 | +// // let mut found = false; |
| 167 | +// // unsafe { |
| 168 | +// // for entry_tuple in (options.entries)(&options).into_iter() { |
| 169 | +// // if let ROption::RSome(value) = entry_tuple.1 { |
| 170 | +// // if let Some((namespace, key)) = entry_tuple.0.as_str().split_once('.') |
| 171 | +// // { |
| 172 | +// // if namespace == T::PREFIX { |
| 173 | +// // found = true; |
| 174 | +// // config.set(key, value.as_str())?; |
| 175 | +// // } |
| 176 | +// // } |
| 177 | +// // } |
| 178 | +// // } |
| 179 | +// // } |
| 180 | +// // |
| 181 | +// // Ok(config) |
| 182 | +// // } |
| 183 | +// // } |
| 184 | +// |
| 185 | +// impl ForeignExtensions { |
| 186 | +// pub fn add_config<C: ConfigExtension>(&mut self, config: &C) -> Result<()> { |
| 187 | +// for entry in config.entries() { |
| 188 | +// if let Some(value) = entry.value { |
| 189 | +// let key = format!("{}.{}", C::PREFIX, entry.key); |
| 190 | +// self.set(key.as_str(), value.as_str())?; |
| 191 | +// } |
| 192 | +// } |
| 193 | +// |
| 194 | +// Ok(()) |
| 195 | +// } |
| 196 | +// } |
| 197 | +// |
| 198 | +// impl ConfigExtension for ForeignExtensions { |
| 199 | +// const PREFIX: &'static str = "datafusion_ffi"; |
| 200 | +// } |
| 201 | +// |
| 202 | +// impl Extensions for ForeignExtensions { |
| 203 | +// fn as_any(&self) -> &dyn Any { |
| 204 | +// self |
| 205 | +// } |
| 206 | +// |
| 207 | +// fn as_any_mut(&mut self) -> &mut dyn Any { |
| 208 | +// self |
| 209 | +// } |
| 210 | +// |
| 211 | +// fn cloned(&self) -> Box<dyn Extensions> { |
| 212 | +// let ffi_options = unsafe { (self.0.cloned)(&self.0) }; |
| 213 | +// let foreign_options = ForeignExtensions(ffi_options); |
| 214 | +// Box::new(foreign_options) |
| 215 | +// } |
| 216 | +// |
| 217 | +// fn set(&mut self, key: &str, value: &str) -> Result<()> { |
| 218 | +// println!("Setting {key} = {value}"); |
| 219 | +// let Some((namespace, key)) = key.split_once('.') else { |
| 220 | +// return exec_err!("Unable to set FFI config value without namespace set"); |
| 221 | +// }; |
| 222 | +// |
| 223 | +// // if namespace != ForeignExtensions::PREFIX { |
| 224 | +// // return exec_err!("Unexpected namespace {namespace} set for FFI config"); |
| 225 | +// // } |
| 226 | +// |
| 227 | +// df_result!(unsafe { (self.0.set)(&mut self.0, key.into(), value.into()) }) |
| 228 | +// } |
| 229 | +// |
| 230 | +// fn entries(&self) -> Vec<ConfigEntry> { |
| 231 | +// unsafe { |
| 232 | +// (self.0.entries)(&self.0) |
| 233 | +// .into_iter() |
| 234 | +// .map(|entry_tuple| ConfigEntry { |
| 235 | +// key: entry_tuple.0.into(), |
| 236 | +// value: Some(entry_tuple.1.into()), |
| 237 | +// description: "ffi_config_options", |
| 238 | +// }) |
| 239 | +// .collect() |
| 240 | +// } |
| 241 | +// } |
| 242 | +// } |
| 243 | +// |
| 244 | +// impl<C: ConfigExtension> From<&ForeignExtensions> for C { |
| 245 | +// fn from(options: &ForeignExtensions) -> Self { |
| 246 | +// |
| 247 | +// } |
| 248 | +// } |
| 249 | +// |
| 250 | +// #[cfg(test)] |
| 251 | +// mod tests { |
| 252 | +// use datafusion_common::config::{ConfigExtension, ConfigOptions}; |
| 253 | +// use datafusion_common::extensions_options; |
| 254 | +// |
| 255 | +// use crate::config_options::{FFI_Extensions, ForeignExtensions}; |
| 256 | +// |
| 257 | +// // Define a new configuration struct using the `extensions_options` macro |
| 258 | +// extensions_options! { |
| 259 | +// /// My own config options. |
| 260 | +// pub struct MyConfig { |
| 261 | +// /// Should "foo" be replaced by "bar"? |
| 262 | +// pub foo_to_bar: bool, default = true |
| 263 | +// |
| 264 | +// /// How many "baz" should be created? |
| 265 | +// pub baz_count: usize, default = 1337 |
| 266 | +// } |
| 267 | +// } |
| 268 | +// |
| 269 | +// impl ConfigExtension for MyConfig { |
| 270 | +// const PREFIX: &'static str = "my_config"; |
| 271 | +// } |
| 272 | +// |
| 273 | +// #[test] |
| 274 | +// fn round_trip_ffi_extension_options() { |
| 275 | +// // set up config struct and register extension |
| 276 | +// let mut config = ConfigOptions::default(); |
| 277 | +// let mut foreign_options = ForeignExtensions(FFI_Extensions::default()); |
| 278 | +// foreign_options.add_config(&MyConfig::default()).unwrap(); |
| 279 | +// |
| 280 | +// config.extensions.insert(foreign_options); |
| 281 | +// // config.extensions.insert(MyConfig::default()); |
| 282 | +// |
| 283 | +// // overwrite config default |
| 284 | +// config.set("my_config.baz_count", "42").unwrap(); |
| 285 | +// |
| 286 | +// // check config state |
| 287 | +// let my_config = config.extensions.get::<MyConfig>().unwrap(); |
| 288 | +// assert!(my_config.foo_to_bar,); |
| 289 | +// assert_eq!(my_config.baz_count, 42,); |
| 290 | +// |
| 291 | +// // let boxed_config = Box::new(MyConfig::default()) as Box<dyn Extensions>; |
| 292 | +// // let mut ffi_config = FFI_Extensions::from(boxed_config); |
| 293 | +// // ffi_config.library_marker_id = crate::mock_foreign_marker_id; |
| 294 | +// // let foreign_config: Box<dyn Extensions> = ffi_config.into(); |
| 295 | +// // |
| 296 | +// // config.extensions.insert(foreign_config); |
| 297 | +// } |
| 298 | +// } |
0 commit comments