Skip to content

Commit 51346f7

Browse files
timsaucerdavisp
authored andcommitted
Implement config options
1 parent 21990b0 commit 51346f7

5 files changed

Lines changed: 143 additions & 1 deletion

File tree

crates/core/src/context.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ use datafusion::prelude::{
4949
};
5050
use datafusion_ffi::catalog_provider::FFI_CatalogProvider;
5151
use datafusion_ffi::catalog_provider_list::FFI_CatalogProviderList;
52+
use datafusion_ffi::config::extension_options::FFI_ExtensionOptions;
5253
use datafusion_ffi::execution::FFI_TaskContextProvider;
5354
use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
5455
use datafusion_ffi::table_provider_factory::FFI_TableProviderFactory;
@@ -185,6 +186,27 @@ impl PySessionConfig {
185186
fn set(&self, key: &str, value: &str) -> Self {
186187
Self::from(self.config.clone().set_str(key, value))
187188
}
189+
190+
pub fn with_extension(&self, extension: Bound<PyAny>) -> PyResult<Self> {
191+
let capsule = extension.call_method0("__datafusion_extension_options__")?;
192+
let capsule = capsule.downcast::<PyCapsule>().map_err(py_datafusion_err)?;
193+
194+
validate_pycapsule(capsule, "datafusion_extension_options")?;
195+
196+
let mut extension = unsafe { capsule.reference::<FFI_ExtensionOptions>() }.clone();
197+
198+
let mut config = self.config.clone();
199+
let options = config.options_mut();
200+
if let Some(prior_extension) = options.extensions.get::<FFI_ExtensionOptions>() {
201+
extension
202+
.merge(prior_extension)
203+
.map_err(py_datafusion_err)?;
204+
}
205+
206+
options.extensions.insert(extension);
207+
208+
Ok(Self::from(config))
209+
}
188210
}
189211

190212
/// Runtime options for a SessionContext

crates/core/src/dataset_exec.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ impl DatasetExec {
111111

112112
let scanner = dataset.call_method("scanner", (), Some(&kwargs))?;
113113

114-
let schema = Arc::new(
114+
let schema: SchemaRef = Arc::new(
115115
scanner
116116
.getattr("projected_schema")?
117117
.extract::<PyArrowType<_>>()?
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import pyarrow as pa
2+
from datafusion import SessionConfig
3+
from datafusion_ffi_example import MyConfig
4+
5+
def test_catalog_provider():
6+
config = MyConfig()
7+
config = SessionConfig().with_extension(config)
8+
config.set("my_config.baz_count", "42")
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
use datafusion::common::{config_err, DataFusionError};
2+
use datafusion::config::{ConfigEntry, ConfigExtension, ConfigField, ExtensionOptions, Visit};
3+
use pyo3::{pyclass, pymethods, Bound, PyResult, Python};
4+
use std::any::Any;
5+
use datafusion_ffi::config::extension_options::FFI_ExtensionOptions;
6+
use pyo3::exceptions::PyRuntimeError;
7+
use pyo3::types::PyCapsule;
8+
9+
/// My own config options.
10+
#[pyclass(name = "MyConfig", module = "datafusion_ffi_example", subclass)]
11+
#[derive(Clone, Debug)]
12+
pub struct MyConfig {
13+
/// Should "foo" be replaced by "bar"?
14+
pub foo_to_bar: bool,
15+
16+
/// How many "baz" should be created?
17+
pub baz_count: usize,
18+
}
19+
20+
#[pymethods]
21+
impl MyConfig {
22+
#[new]
23+
fn new() -> Self {
24+
Self::default()
25+
}
26+
27+
28+
fn __datafusion_extension_options__<'py>(
29+
&self,
30+
py: Python<'py>,
31+
) -> PyResult<Bound<'py, PyCapsule>> {
32+
let name = cr"datafusion_extension_options".into();
33+
34+
let mut config = FFI_ExtensionOptions::default();
35+
config.add_config(self).map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
36+
37+
PyCapsule::new(py, config, Some(name))
38+
}
39+
}
40+
41+
impl Default for MyConfig {
42+
fn default() -> Self {
43+
Self {
44+
foo_to_bar: true,
45+
baz_count: 1337,
46+
}
47+
}
48+
}
49+
50+
impl ConfigExtension for MyConfig {
51+
const PREFIX: &'static str = "my_config";
52+
}
53+
54+
impl ExtensionOptions for MyConfig {
55+
fn as_any(&self) -> &dyn Any {
56+
self
57+
}
58+
59+
fn as_any_mut(&mut self) -> &mut dyn Any {
60+
self
61+
}
62+
63+
fn cloned(&self) -> Box<dyn ExtensionOptions> {
64+
Box::new(self.clone())
65+
}
66+
67+
fn set(&mut self, key: &str, value: &str) -> datafusion::common::Result<()> {
68+
datafusion::config::ConfigField::set(self, key, value)
69+
}
70+
71+
fn entries(&self) -> Vec<ConfigEntry> {
72+
vec![
73+
ConfigEntry {
74+
key: "foo_to_bar".to_owned(),
75+
value: Some(format!("{}", self.foo_to_bar)),
76+
description: "foo to bar",
77+
},
78+
ConfigEntry {
79+
key: "baz_count".to_owned(),
80+
value: Some(format!("{}", self.baz_count)),
81+
description: "baz count",
82+
},
83+
]
84+
}
85+
}
86+
87+
impl ConfigField for MyConfig {
88+
fn visit<V: Visit>(&self, v: &mut V, _key: &str, _description: &'static str) {
89+
let key = "foo_to_bar";
90+
let desc = "foo to bar";
91+
self.foo_to_bar.visit(v, key, desc);
92+
93+
let key = "baz_count";
94+
let desc = "baz count";
95+
self.baz_count.visit(v, key, desc);
96+
}
97+
98+
fn set(&mut self, key: &str, value: &str) -> Result<(), DataFusionError> {
99+
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
100+
match key {
101+
"foo_to_bar" => self.foo_to_bar.set(rem, value.as_ref()),
102+
"baz_count" => self.baz_count.set(rem, value.as_ref()),
103+
104+
_ => config_err!("Config value \"{}\" not found on MyConfig", key),
105+
}
106+
107+
}
108+
}

python/datafusion/context.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,10 @@ def set(self, key: str, value: str) -> SessionConfig:
296296
self.config_internal = self.config_internal.set(key, value)
297297
return self
298298

299+
def with_extension(self, extension: Any) -> SessionConfig:
300+
self.config_internal = self.config_internal.with_extension(extension)
301+
return self
302+
299303

300304
class RuntimeEnvBuilder:
301305
"""Runtime configuration options."""

0 commit comments

Comments
 (0)