// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

18+ use std:: { any:: Any , fmt:: Debug , sync:: Arc } ;
19+ use pyo3:: { pyclass, pymethods, Bound , PyResult , Python } ;
20+
21+ use arrow:: datatypes:: Schema ;
22+ use async_trait:: async_trait;
23+ use datafusion:: {
24+ catalog:: {
25+ CatalogProvider , MemoryCatalogProvider , MemorySchemaProvider , SchemaProvider ,
26+ TableProvider ,
27+ } ,
28+ common:: exec_err,
29+ datasource:: MemTable ,
30+ error:: { DataFusionError , Result } ,
31+ } ;
32+ use datafusion_ffi:: catalog_provider:: FFI_CatalogProvider ;
33+ use pyo3:: types:: PyCapsule ;
34+
35+ pub fn my_table ( ) -> Arc < dyn TableProvider + ' static > {
36+ use arrow:: datatypes:: { DataType , Field } ;
37+ use datafusion:: common:: record_batch;
38+
39+ let schema = Arc :: new ( Schema :: new ( vec ! [
40+ Field :: new( "units" , DataType :: Int32 , true ) ,
41+ Field :: new( "price" , DataType :: Float64 , true ) ,
42+ ] ) ) ;
43+
44+ let partitions = vec ! [
45+ record_batch!(
46+ ( "units" , Int32 , vec![ 10 , 20 , 30 ] ) ,
47+ ( "price" , Float64 , vec![ 1.0 , 2.0 , 5.0 ] )
48+ )
49+ . unwrap( ) ,
50+ record_batch!(
51+ ( "units" , Int32 , vec![ 5 , 7 ] ) ,
52+ ( "price" , Float64 , vec![ 1.5 , 2.5 ] )
53+ )
54+ . unwrap( ) ,
55+ ] ;
56+
57+ Arc :: new ( MemTable :: try_new ( schema, vec ! [ partitions] ) . unwrap ( ) )
58+ }
59+
/// A schema provider pre-populated with a single in-memory table named
/// `my_table` (see [`my_table`]). All `SchemaProvider` operations delegate
/// to the wrapped [`MemorySchemaProvider`].
#[derive(Debug)]
pub struct FixedSchemaProvider {
    // Backing in-memory provider that owns the registered tables.
    inner: MemorySchemaProvider,
}
64+
65+ impl Default for FixedSchemaProvider {
66+ fn default ( ) -> Self {
67+ let inner = MemorySchemaProvider :: new ( ) ;
68+
69+ let table = my_table ( ) ;
70+
71+ let _ = inner
72+ . register_table ( "my_table" . to_string ( ) , table)
73+ . unwrap ( ) ;
74+
75+ Self { inner }
76+ }
77+ }
78+
// Pure delegation: every method simply forwards to the wrapped
// MemorySchemaProvider, so this type behaves exactly like an in-memory
// schema that happens to start out non-empty.
#[async_trait]
impl SchemaProvider for FixedSchemaProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn table_names(&self) -> Vec<String> {
        self.inner.table_names()
    }

    async fn table(
        &self,
        name: &str,
    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
        self.inner.table(name).await
    }

    fn register_table(
        &self,
        name: String,
        table: Arc<dyn TableProvider>,
    ) -> Result<Option<Arc<dyn TableProvider>>> {
        self.inner.register_table(name, table)
    }

    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
        self.inner.deregister_table(name)
    }

    fn table_exist(&self, name: &str) -> bool {
        self.inner.table_exist(name)
    }
}
112+
113+
/// This catalog provider is intended only for unit tests. It is pre-populated
/// with one schema, `my_schema`, and otherwise delegates all catalog
/// operations unchanged to an in-memory [`MemoryCatalogProvider`].
/// (The code places no restriction on schema names — `register_schema`
/// forwards unconditionally.)
#[pyclass(name = "MyCatalogProvider", module = "datafusion_ffi_example", subclass)]
#[derive(Debug)]
pub(crate) struct MyCatalogProvider {
    // Backing in-memory catalog that owns the registered schemas.
    inner: MemoryCatalogProvider,
}
121+
122+ impl Default for MyCatalogProvider {
123+ fn default ( ) -> Self {
124+ let inner = MemoryCatalogProvider :: new ( ) ;
125+
126+ let schema_name: & str = "my_schema" ;
127+ let _ = inner. register_schema ( schema_name, Arc :: new ( FixedSchemaProvider :: default ( ) ) ) ;
128+
129+ Self { inner }
130+ }
131+ }
132+
// Pure delegation: every method forwards to the wrapped
// MemoryCatalogProvider without adding any behavior.
impl CatalogProvider for MyCatalogProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema_names(&self) -> Vec<String> {
        self.inner.schema_names()
    }

    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        self.inner.schema(name)
    }

    fn register_schema(
        &self,
        name: &str,
        schema: Arc<dyn SchemaProvider>,
    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
        self.inner.register_schema(name, schema)
    }

    fn deregister_schema(
        &self,
        name: &str,
        cascade: bool,
    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
        self.inner.deregister_schema(name, cascade)
    }
}
162+
#[pymethods]
impl MyCatalogProvider {
    /// Python-visible constructor; starts from the pre-populated default.
    #[new]
    pub fn new() -> Self {
        Self {
            inner: Default::default(),
        }
    }

    /// Exports a catalog provider over the DataFusion FFI PyCapsule
    /// protocol, for consumption by `datafusion-python`.
    ///
    /// NOTE(review): the capsule wraps a freshly constructed
    /// `MyCatalogProvider::default()`, not `self`, so any schemas or tables
    /// registered on this Python object after construction are NOT reflected
    /// in the exported provider — confirm this is intentional for the test
    /// fixture.
    pub fn __datafusion_catalog_provider__<'py>(
        &self,
        py: Python<'py>,
    ) -> PyResult<Bound<'py, PyCapsule>> {
        // Capsule name expected by the datafusion FFI discovery protocol.
        let name = cr"datafusion_catalog_provider".into();
        let catalog_provider =
            FFI_CatalogProvider::new(Arc::new(MyCatalogProvider::default()), None);

        PyCapsule::new(py, catalog_provider, Some(name))
    }
}