Skip to content

Commit 8c413f5

Browse files
committed
feat: add TableProvider support and related functionality
1 parent 8d7281b commit 8c413f5

6 files changed

Lines changed: 90 additions & 45 deletions

File tree

python/datafusion/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
from . import functions, object_store, substrait, unparser
3434

3535
# The following imports are okay to remain as opaque to the user.
36-
from ._internal import Config
36+
from ._internal import Config, TableProvider
3737
from .catalog import Catalog, Database, Table
3838
from .col import col, column
3939
from .common import (
@@ -90,6 +90,7 @@
9090
"SessionContext",
9191
"Table",
9292
"TableFunction",
93+
"TableProvider",
9394
"WindowFrame",
9495
"WindowUDF",
9596
"catalog",

python/datafusion/dataframe.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
from datafusion._internal import DataFrame as DataFrameInternal
4141
from datafusion._internal import ParquetColumnOptions as ParquetColumnOptionsInternal
4242
from datafusion._internal import ParquetWriterOptions as ParquetWriterOptionsInternal
43+
from datafusion._internal import TableProvider as TableProviderInternal
4344
from datafusion.expr import Expr, SortExpr, sort_or_default
4445
from datafusion.plan import ExecutionPlan, LogicalPlan
4546
from datafusion.record_batch import RecordBatchStream
@@ -307,8 +308,8 @@ def __init__(self, df: DataFrameInternal) -> None:
307308
"""
308309
self.df = df
309310

310-
def into_view(self) -> pa.Table:
311-
"""Convert DataFrame as a ViewTable which can be used in register_table."""
311+
def into_view(self) -> TableProviderInternal:
312+
"""Convert ``DataFrame`` into a ``TableProvider`` view for registration."""
312313
return self.df.into_view()
313314

314315
def __getitem__(self, key: str | list[str]) -> DataFrame:

src/dataframe.rs

Lines changed: 8 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -36,20 +36,19 @@ use datafusion::error::DataFusionError;
3636
use datafusion::execution::SendableRecordBatchStream;
3737
use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
3838
use datafusion::prelude::*;
39-
use datafusion_ffi::table_provider::FFI_TableProvider;
4039
use futures::{StreamExt, TryStreamExt};
4140
use pyo3::exceptions::PyValueError;
4241
use pyo3::prelude::*;
4342
use pyo3::pybacked::PyBackedStr;
4443
use pyo3::types::{PyCapsule, PyList, PyTuple, PyTupleMethods};
4544
use tokio::task::JoinHandle;
4645

47-
use crate::catalog::PyTable;
4846
use crate::errors::{py_datafusion_err, to_datafusion_err, PyDataFusionError};
4947
use crate::expr::sort_expr::to_sort_expressions;
5048
use crate::physical_plan::PyExecutionPlan;
5149
use crate::record_batch::PyRecordBatchStream;
5250
use crate::sql::logical::PyLogicalPlan;
51+
use crate::table::PyTableProvider;
5352
use crate::utils::{
5453
get_tokio_runtime, is_ipython_env, py_obj_to_scalar_value, validate_pycapsule, wait_for_future,
5554
};
@@ -58,40 +57,6 @@ use crate::{
5857
expr::{sort_expr::PySortExpr, PyExpr},
5958
};
6059

61-
// https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116
62-
// - we have not decided on the table_provider approach yet
63-
// this is an interim implementation
64-
#[pyclass(name = "TableProvider", module = "datafusion")]
65-
pub struct PyTableProvider {
66-
provider: Arc<dyn TableProvider + Send>,
67-
}
68-
69-
impl PyTableProvider {
70-
pub fn new(provider: Arc<dyn TableProvider>) -> Self {
71-
Self { provider }
72-
}
73-
74-
pub fn as_table(&self) -> PyTable {
75-
let table_provider: Arc<dyn TableProvider> = self.provider.clone();
76-
PyTable::new(table_provider)
77-
}
78-
}
79-
80-
#[pymethods]
81-
impl PyTableProvider {
82-
fn __datafusion_table_provider__<'py>(
83-
&self,
84-
py: Python<'py>,
85-
) -> PyResult<Bound<'py, PyCapsule>> {
86-
let name = CString::new("datafusion_table_provider").unwrap();
87-
88-
let runtime = get_tokio_runtime().0.handle().clone();
89-
let provider = FFI_TableProvider::new(Arc::clone(&self.provider), false, Some(runtime));
90-
91-
PyCapsule::new(py, provider, Some(name.clone()))
92-
}
93-
}
94-
9560
/// Configuration for DataFrame display formatting
9661
#[derive(Debug, Clone)]
9762
pub struct FormatterConfig {
@@ -303,6 +268,10 @@ impl PyDataFrame {
303268
}
304269
}
305270

271+
pub(crate) fn into_view_provider(&self) -> Arc<dyn TableProvider> {
272+
self.df.as_ref().clone().into_view()
273+
}
274+
306275
fn prepare_repr_string(&mut self, py: Python, as_html: bool) -> PyDataFusionResult<String> {
307276
// Get the Python formatter and config
308277
let PythonFormatter { formatter, config } = get_python_formatter_with_config(py)?;
@@ -436,14 +405,12 @@ impl PyDataFrame {
436405
/// https://github.com/apache/datafusion-python/pull/1016#discussion_r1983239116
437406
/// - we have not decided on the table_provider approach yet
438407
#[allow(clippy::wrong_self_convention)]
439-
fn into_view(&self) -> PyDataFusionResult<PyTable> {
408+
fn into_view(&self) -> PyDataFusionResult<PyTableProvider> {
440409
// Call the underlying Rust DataFrame::into_view method.
441410
// Note that the Rust method consumes self; here we clone the inner Arc<DataFrame>
442411
// so that we don’t invalidate this PyDataFrame.
443-
let table_provider = self.df.as_ref().clone().into_view();
444-
let table_provider = PyTableProvider::new(table_provider);
445-
446-
Ok(table_provider.as_table())
412+
let table_provider = self.into_view_provider();
413+
Ok(PyTableProvider::new(table_provider))
447414
}
448415

449416
#[pyo3(signature = (*args))]

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ pub mod pyarrow_util;
5252
mod record_batch;
5353
pub mod sql;
5454
pub mod store;
55+
pub mod table;
5556
pub mod unparser;
5657

5758
#[cfg(feature = "substrait")]
@@ -97,6 +98,7 @@ fn _internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {
9798
m.add_class::<physical_plan::PyExecutionPlan>()?;
9899
m.add_class::<record_batch::PyRecordBatch>()?;
99100
m.add_class::<record_batch::PyRecordBatchStream>()?;
101+
m.add_class::<table::PyTableProvider>()?;
100102

101103
let catalog = PyModule::new(py, "catalog")?;
102104
catalog::init_module(&catalog)?;

src/table.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::ffi::CString;
19+
use std::sync::Arc;
20+
21+
use datafusion::datasource::TableProvider;
22+
use datafusion_ffi::table_provider::{FFI_TableProvider, ForeignTableProvider};
23+
use pyo3::prelude::*;
24+
use pyo3::types::PyCapsule;
25+
26+
use crate::dataframe::PyDataFrame;
27+
use crate::errors::{py_datafusion_err, PyDataFusionResult};
28+
use crate::utils::{get_tokio_runtime, validate_pycapsule};
29+
30+
/// Represents a table provider that can be registered with DataFusion
31+
#[pyclass(name = "TableProvider", module = "datafusion")]
32+
pub struct PyTableProvider {
33+
pub(crate) provider: Arc<dyn TableProvider + Send>,
34+
}
35+
36+
impl PyTableProvider {
37+
pub(crate) fn new(provider: Arc<dyn TableProvider>) -> Self {
38+
Self { provider }
39+
}
40+
}
41+
42+
#[pymethods]
43+
impl PyTableProvider {
44+
/// Create a `TableProvider` from a PyCapsule containing an FFI pointer
45+
#[staticmethod]
46+
pub fn from_capsule(capsule: Bound<'_, PyAny>) -> PyResult<Self> {
47+
let capsule = capsule.downcast::<PyCapsule>().map_err(py_datafusion_err)?;
48+
validate_pycapsule(&capsule, "datafusion_table_provider")?;
49+
50+
let provider = unsafe { capsule.reference::<FFI_TableProvider>() };
51+
let provider: ForeignTableProvider = provider.into();
52+
53+
Ok(Self::new(Arc::new(provider)))
54+
}
55+
56+
/// Create a `TableProvider` from a DataFrame by converting it into a view
57+
#[staticmethod]
58+
pub fn from_view(df: &PyDataFrame) -> PyDataFusionResult<Self> {
59+
let table_provider = df.into_view_provider();
60+
Ok(Self::new(table_provider))
61+
}
62+
63+
fn __datafusion_table_provider__<'py>(
64+
&self,
65+
py: Python<'py>,
66+
) -> PyResult<Bound<'py, PyCapsule>> {
67+
let name = CString::new("datafusion_table_provider").unwrap();
68+
69+
let runtime = get_tokio_runtime().0.handle().clone();
70+
let provider = FFI_TableProvider::new(Arc::clone(&self.provider), false, Some(runtime));
71+
72+
PyCapsule::new(py, provider, Some(name.clone()))
73+
}
74+
}

src/udtf.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
use pyo3::prelude::*;
1919
use std::sync::Arc;
2020

21-
use crate::dataframe::PyTableProvider;
2221
use crate::errors::{py_datafusion_err, to_datafusion_err};
2322
use crate::expr::PyExpr;
23+
use crate::table::PyTableProvider;
2424
use crate::utils::validate_pycapsule;
2525
use datafusion::catalog::{TableFunctionImpl, TableProvider};
2626
use datafusion::error::Result as DataFusionResult;

0 commit comments

Comments (0)