Skip to content

Commit 75d07ce

Browse files
timsaucerCopilot
andauthored
Implement configuration extension support (#1391)
* Implement config options * Update examples and tests * pyo3 update * Add docstring * rat * Update examples/datafusion-ffi-example/python/tests/_test_config.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update crates/core/src/context.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update crates/core/src/context.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 207fc16 commit 75d07ce

File tree

6 files changed

+213
-1
lines changed

6 files changed

+213
-1
lines changed

crates/core/src/context.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ use datafusion::prelude::{
4949
};
5050
use datafusion_ffi::catalog_provider::FFI_CatalogProvider;
5151
use datafusion_ffi::catalog_provider_list::FFI_CatalogProviderList;
52+
use datafusion_ffi::config::extension_options::FFI_ExtensionOptions;
5253
use datafusion_ffi::execution::FFI_TaskContextProvider;
5354
use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
5455
use datafusion_ffi::table_provider_factory::FFI_TableProviderFactory;
@@ -184,6 +185,33 @@ impl PySessionConfig {
184185
fn set(&self, key: &str, value: &str) -> Self {
185186
Self::from(self.config.clone().set_str(key, value))
186187
}
188+
189+
pub fn with_extension(&self, extension: Bound<PyAny>) -> PyResult<Self> {
190+
if !extension.hasattr("__datafusion_extension_options__")? {
191+
return Err(pyo3::exceptions::PyAttributeError::new_err(
192+
"Expected extension object to define __datafusion_extension_options__()",
193+
));
194+
}
195+
let capsule = extension.call_method0("__datafusion_extension_options__")?;
196+
let capsule = capsule.cast::<PyCapsule>()?;
197+
198+
let extension: NonNull<FFI_ExtensionOptions> = capsule
199+
.pointer_checked(Some(c_str!("datafusion_extension_options")))?
200+
.cast();
201+
let mut extension = unsafe { extension.as_ref() }.clone();
202+
203+
let mut config = self.config.clone();
204+
let options = config.options_mut();
205+
if let Some(prior_extension) = options.extensions.get::<FFI_ExtensionOptions>() {
206+
extension
207+
.merge(prior_extension)
208+
.map_err(py_datafusion_err)?;
209+
}
210+
211+
options.extensions.insert(extension);
212+
213+
Ok(Self::from(config))
214+
}
187215
}
188216

189217
/// Runtime options for a SessionContext

crates/core/src/dataset_exec.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ impl DatasetExec {
111111

112112
let scanner = dataset.call_method("scanner", (), Some(&kwargs))?;
113113

114-
let schema = Arc::new(
114+
let schema: SchemaRef = Arc::new(
115115
scanner
116116
.getattr("projected_schema")?
117117
.extract::<PyArrowType<_>>()?
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from datafusion import SessionConfig, SessionContext
19+
from datafusion_ffi_example import MyConfig
20+
21+
22+
def test_config_extension_show_set():
23+
config = MyConfig()
24+
config = SessionConfig(
25+
{"datafusion.catalog.information_schema": "true"}
26+
).with_extension(config)
27+
config.set("my_config.baz_count", "42")
28+
ctx = SessionContext(config)
29+
30+
result = ctx.sql("SHOW my_config.baz_count;").collect()
31+
assert result[0][1][0].as_py() == "42"
32+
33+
ctx.sql("SET my_config.baz_count=1;")
34+
result = ctx.sql("SHOW my_config.baz_count;").collect()
35+
assert result[0][1][0].as_py() == "1"
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::any::Any;
19+
20+
use datafusion_common::config::{
21+
ConfigEntry, ConfigExtension, ConfigField, ExtensionOptions, Visit,
22+
};
23+
use datafusion_common::{DataFusionError, config_err};
24+
use datafusion_ffi::config::extension_options::FFI_ExtensionOptions;
25+
use pyo3::exceptions::PyRuntimeError;
26+
use pyo3::types::PyCapsule;
27+
use pyo3::{Bound, PyResult, Python, pyclass, pymethods};
28+
29+
/// My own config options.
30+
#[pyclass(
31+
from_py_object,
32+
name = "MyConfig",
33+
module = "datafusion_ffi_example",
34+
subclass
35+
)]
36+
#[derive(Clone, Debug)]
37+
pub struct MyConfig {
38+
/// Should "foo" be replaced by "bar"?
39+
pub foo_to_bar: bool,
40+
41+
/// How many "baz" should be created?
42+
pub baz_count: usize,
43+
}
44+
45+
#[pymethods]
46+
impl MyConfig {
47+
#[new]
48+
fn new() -> Self {
49+
Self::default()
50+
}
51+
52+
fn __datafusion_extension_options__<'py>(
53+
&self,
54+
py: Python<'py>,
55+
) -> PyResult<Bound<'py, PyCapsule>> {
56+
let name = cr"datafusion_extension_options".into();
57+
58+
let mut config = FFI_ExtensionOptions::default();
59+
config
60+
.add_config(self)
61+
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
62+
63+
PyCapsule::new(py, config, Some(name))
64+
}
65+
}
66+
67+
impl Default for MyConfig {
68+
fn default() -> Self {
69+
Self {
70+
foo_to_bar: true,
71+
baz_count: 1337,
72+
}
73+
}
74+
}
75+
76+
impl ConfigExtension for MyConfig {
77+
const PREFIX: &'static str = "my_config";
78+
}
79+
80+
impl ExtensionOptions for MyConfig {
81+
fn as_any(&self) -> &dyn Any {
82+
self
83+
}
84+
85+
fn as_any_mut(&mut self) -> &mut dyn Any {
86+
self
87+
}
88+
89+
fn cloned(&self) -> Box<dyn ExtensionOptions> {
90+
Box::new(self.clone())
91+
}
92+
93+
fn set(&mut self, key: &str, value: &str) -> datafusion_common::Result<()> {
94+
datafusion_common::config::ConfigField::set(self, key, value)
95+
}
96+
97+
fn entries(&self) -> Vec<ConfigEntry> {
98+
vec![
99+
ConfigEntry {
100+
key: "foo_to_bar".to_owned(),
101+
value: Some(format!("{}", self.foo_to_bar)),
102+
description: "foo to bar",
103+
},
104+
ConfigEntry {
105+
key: "baz_count".to_owned(),
106+
value: Some(format!("{}", self.baz_count)),
107+
description: "baz count",
108+
},
109+
]
110+
}
111+
}
112+
113+
impl ConfigField for MyConfig {
114+
fn visit<V: Visit>(&self, v: &mut V, _key: &str, _description: &'static str) {
115+
let key = "foo_to_bar";
116+
let desc = "foo to bar";
117+
self.foo_to_bar.visit(v, key, desc);
118+
119+
let key = "baz_count";
120+
let desc = "baz count";
121+
self.baz_count.visit(v, key, desc);
122+
}
123+
124+
fn set(&mut self, key: &str, value: &str) -> Result<(), DataFusionError> {
125+
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
126+
match key {
127+
"foo_to_bar" => self.foo_to_bar.set(rem, value.as_ref()),
128+
"baz_count" => self.baz_count.set(rem, value.as_ref()),
129+
130+
_ => config_err!("Config value \"{}\" not found on MyConfig", key),
131+
}
132+
}
133+
}

examples/datafusion-ffi-example/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use pyo3::prelude::*;
1919

2020
use crate::aggregate_udf::MySumUDF;
2121
use crate::catalog_provider::{FixedSchemaProvider, MyCatalogProvider, MyCatalogProviderList};
22+
use crate::config::MyConfig;
2223
use crate::scalar_udf::IsNullUDF;
2324
use crate::table_function::MyTableFunction;
2425
use crate::table_provider::MyTableProvider;
@@ -27,6 +28,7 @@ use crate::window_udf::MyRankUDF;
2728

2829
pub(crate) mod aggregate_udf;
2930
pub(crate) mod catalog_provider;
31+
pub(crate) mod config;
3032
pub(crate) mod scalar_udf;
3133
pub(crate) mod table_function;
3234
pub(crate) mod table_provider;
@@ -46,5 +48,6 @@ fn datafusion_ffi_example(m: &Bound<'_, PyModule>) -> PyResult<()> {
4648
m.add_class::<IsNullUDF>()?;
4749
m.add_class::<MySumUDF>()?;
4850
m.add_class::<MyRankUDF>()?;
51+
m.add_class::<MyConfig>()?;
4952
Ok(())
5053
}

python/datafusion/context.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,19 @@ def set(self, key: str, value: str) -> SessionConfig:
296296
self.config_internal = self.config_internal.set(key, value)
297297
return self
298298

299+
def with_extension(self, extension: Any) -> SessionConfig:
300+
"""Create a new configuration using an extension.
301+
302+
Args:
303+
extension: A custom configuration extension object. These are
304+
shared from another DataFusion extension library.
305+
306+
Returns:
307+
A new :py:class:`SessionConfig` object with the updated setting.
308+
"""
309+
self.config_internal = self.config_internal.with_extension(extension)
310+
return self
311+
299312

300313
class RuntimeEnvBuilder:
301314
"""Runtime configuration options."""

0 commit comments

Comments
 (0)