Skip to content

Commit 0345b44

Browse files
committed
If we have foreign objects, get the underlying ffi object instead of wrapping
1 parent dd86bbd commit 0345b44

21 files changed

Lines changed: 69 additions & 50 deletions

datafusion-examples/examples/custom_data_source/adapter_serialization.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -308,10 +308,6 @@ impl PhysicalExtensionCodec for AdapterPreservingCodec {
308308
"try_encode not used - adapter wrapping happens in serialize_physical_plan"
309309
)
310310
}
311-
312-
fn as_any(&self) -> &dyn std::any::Any {
313-
self
314-
}
315311
}
316312

317313
impl PhysicalProtoConverterExtension for AdapterPreservingCodec {

datafusion-examples/examples/proto/composed_extension_codec.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -158,10 +158,6 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec {
158158
internal_err!("Not supported")
159159
}
160160
}
161-
162-
fn as_any(&self) -> &dyn Any {
163-
self
164-
}
165161
}
166162

167163
#[derive(Debug)]
@@ -236,8 +232,4 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec {
236232
internal_err!("Not supported")
237233
}
238234
}
239-
240-
fn as_any(&self) -> &dyn Any {
241-
self
242-
}
243235
}

datafusion-examples/examples/proto/expression_deduplication.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -199,10 +199,6 @@ impl PhysicalExtensionCodec for CachingCodec {
199199
) -> Result<()> {
200200
datafusion::common::not_impl_err!("No custom extension nodes")
201201
}
202-
203-
fn as_any(&self) -> &dyn std::any::Any {
204-
self
205-
}
206202
}
207203

208204
impl PhysicalProtoConverterExtension for CachingCodec {

datafusion/catalog/src/table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ pub trait TableProviderFactory: Debug + Sync + Send {
486486
}
487487

488488
/// A trait for table function implementations
489-
pub trait TableFunctionImpl: Debug + Sync + Send {
489+
pub trait TableFunctionImpl: Debug + Sync + Send + Any {
490490
/// Create a table provider
491491
fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
492492
}

datafusion/ffi/src/catalog_provider.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,11 @@ impl FFI_CatalogProvider {
250250
runtime: Option<Handle>,
251251
logical_codec: FFI_LogicalExtensionCodec,
252252
) -> Self {
253+
if let Some(provider) = provider.as_any().downcast_ref::<ForeignCatalogProvider>()
254+
{
255+
return provider.0.clone();
256+
}
257+
253258
let private_data = Box::new(ProviderPrivateData { provider, runtime });
254259

255260
Self {

datafusion/ffi/src/catalog_provider_list.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,13 @@ impl FFI_CatalogProviderList {
212212
runtime: Option<Handle>,
213213
logical_codec: FFI_LogicalExtensionCodec,
214214
) -> Self {
215+
if let Some(provider) = provider
216+
.as_any()
217+
.downcast_ref::<ForeignCatalogProviderList>()
218+
{
219+
return provider.0.clone();
220+
}
221+
215222
let private_data = Box::new(ProviderPrivateData { provider, runtime });
216223

217224
Self {

datafusion/ffi/src/proto/logical_extension_codec.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::any::Any;
1819
use std::ffi::c_void;
1920
use std::sync::Arc;
2021

@@ -296,6 +297,12 @@ impl FFI_LogicalExtensionCodec {
296297
runtime: Option<Handle>,
297298
task_ctx_provider: impl Into<FFI_TaskContextProvider>,
298299
) -> Self {
300+
if let Some(codec) = (Arc::clone(&codec) as Arc<dyn Any>)
301+
.downcast_ref::<ForeignLogicalExtensionCodec>()
302+
{
303+
return codec.0.clone();
304+
}
305+
299306
let task_ctx_provider = task_ctx_provider.into();
300307
let private_data = Box::new(LogicalExtensionCodecPrivateData { codec, runtime });
301308

datafusion/ffi/src/proto/physical_extension_codec.rs

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::ffi::c_void;
19-
use std::sync::Arc;
20-
2118
use abi_stable::StableAbi;
2219
use abi_stable::std_types::{RResult, RSlice, RStr, RVec};
2320
use datafusion_common::error::Result;
@@ -27,6 +24,9 @@ use datafusion_expr::{
2724
};
2825
use datafusion_physical_plan::ExecutionPlan;
2926
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
27+
use std::any::Any;
28+
use std::ffi::c_void;
29+
use std::sync::Arc;
3030
use tokio::runtime::Handle;
3131

3232
use crate::execution::FFI_TaskContextProvider;
@@ -271,8 +271,7 @@ impl FFI_PhysicalExtensionCodec {
271271
runtime: Option<Handle>,
272272
task_ctx_provider: impl Into<FFI_TaskContextProvider>,
273273
) -> Self {
274-
if let Some(codec) = codec
275-
.as_any()
274+
if let Some(codec) = (Arc::clone(&codec) as Arc<dyn Any>)
276275
.downcast_ref::<ForeignPhysicalExtensionCodec>()
277276
{
278277
return codec.0.clone();
@@ -409,10 +408,6 @@ impl PhysicalExtensionCodec for ForeignPhysicalExtensionCodec {
409408

410409
Ok(())
411410
}
412-
413-
fn as_any(&self) -> &dyn std::any::Any {
414-
self
415-
}
416411
}
417412

418413
#[cfg(test)]
@@ -568,10 +563,6 @@ pub(crate) mod tests {
568563

569564
Ok(())
570565
}
571-
572-
fn as_any(&self) -> &dyn std::any::Any {
573-
self
574-
}
575566
}
576567

577568
fn create_test_exec() -> Arc<dyn ExecutionPlan> {

datafusion/ffi/src/schema_provider.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,11 @@ impl FFI_SchemaProvider {
259259
runtime: Option<Handle>,
260260
logical_codec: FFI_LogicalExtensionCodec,
261261
) -> Self {
262+
if let Some(provider) = provider.as_any().downcast_ref::<ForeignSchemaProvider>()
263+
{
264+
return provider.0.clone();
265+
}
266+
262267
let owner_name = provider.owner_name().map(|s| s.into()).into();
263268
let private_data = Box::new(ProviderPrivateData { provider, runtime });
264269

datafusion/ffi/src/session/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,10 @@ impl FFI_SessionRef {
321321
runtime: Option<Handle>,
322322
logical_codec: FFI_LogicalExtensionCodec,
323323
) -> Self {
324+
if let Some(session) = session.as_any().downcast_ref::<ForeignSession>() {
325+
return session.session.clone();
326+
}
327+
324328
let private_data = Box::new(SessionPrivateData { session, runtime });
325329

326330
Self {

0 commit comments

Comments
 (0)