@@ -26,7 +26,7 @@ use arrow_schema::SchemaRef;
2626use arrow_schema:: ffi:: FFI_ArrowSchema ;
2727use async_ffi:: { FfiFuture , FutureExt } ;
2828use async_trait:: async_trait;
29- use datafusion_common:: config:: { ConfigOptions , TableOptions } ;
29+ use datafusion_common:: config:: { ConfigFileType , ConfigOptions , TableOptions } ;
3030use datafusion_common:: { DFSchema , DataFusionError } ;
3131use datafusion_execution:: TaskContext ;
3232use datafusion_execution:: config:: SessionConfig ;
@@ -240,20 +240,38 @@ unsafe extern "C" fn window_functions_fn_wrapper(
240240 . collect ( )
241241}
242242
243- fn table_options_to_rhash ( options : & TableOptions ) -> RHashMap < RString , RString > {
244- options
243+ fn table_options_to_rhash ( mut options : TableOptions ) -> RHashMap < RString , RString > {
244+ // It is important that we mutate options here and set current format
245+ // to None so that when we call `entries()` we get ALL format entries.
246+ // We will pass current_format as a special case and strip it on the
247+ // other side of the boundary.
248+ let current_format = options. current_format . take ( ) ;
249+ let mut options: HashMap < RString , RString > = options
245250 . entries ( )
246251 . into_iter ( )
247252 . filter_map ( |entry| entry. value . map ( |v| ( entry. key . into ( ) , v. into ( ) ) ) )
248- . collect ( )
253+ . collect ( ) ;
254+ if let Some ( current_format) = current_format {
255+ options. insert (
256+ "datafusion_ffi.table_current_format" . into ( ) ,
257+ match current_format {
258+ ConfigFileType :: JSON => "json" ,
259+ ConfigFileType :: PARQUET => "parquet" ,
260+ ConfigFileType :: CSV => "csv" ,
261+ }
262+ . into ( ) ,
263+ ) ;
264+ }
265+
266+ options. into ( )
249267}
250268
251269unsafe extern "C" fn table_options_fn_wrapper (
252270 session : & FFI_SessionRef ,
253271) -> RHashMap < RString , RString > {
254272 let session = session. inner ( ) ;
255273 let table_options = session. table_options ( ) ;
256- table_options_to_rhash ( table_options)
274+ table_options_to_rhash ( table_options. clone ( ) )
257275}
258276
259277unsafe extern "C" fn default_table_options_fn_wrapper (
@@ -262,7 +280,7 @@ unsafe extern "C" fn default_table_options_fn_wrapper(
262280 let session = session. inner ( ) ;
263281 let table_options = session. default_table_options ( ) ;
264282
265- table_options_to_rhash ( & table_options)
283+ table_options_to_rhash ( table_options)
266284}
267285
268286unsafe extern "C" fn task_ctx_fn_wrapper ( session : & FFI_SessionRef ) -> FFI_TaskContext {
@@ -438,15 +456,70 @@ impl Clone for FFI_SessionRef {
438456}
439457
440458fn table_options_from_rhashmap ( options : RHashMap < RString , RString > ) -> TableOptions {
441- let options = options
459+ let mut options: HashMap < String , String > = options
442460 . into_iter ( )
443461 . map ( |kv_pair| ( kv_pair. 0 . into_string ( ) , kv_pair. 1 . into_string ( ) ) )
444462 . collect ( ) ;
463+ let current_format = options. remove ( "datafusion_ffi.table_current_format" ) ;
464+
465+ let mut table_options = TableOptions :: default ( ) ;
466+ let formats = [
467+ ConfigFileType :: CSV ,
468+ ConfigFileType :: JSON ,
469+ ConfigFileType :: PARQUET ,
470+ ] ;
471+ for format in formats {
472+ // It is imperative that if new enum variants are added below that they be
473+ // included in the formats list above and in the extension check below.
474+ let format_name = match & format {
475+ ConfigFileType :: CSV => "csv" ,
476+ ConfigFileType :: PARQUET => "parquet" ,
477+ ConfigFileType :: JSON => "json" ,
478+ } ;
479+ let format_options: HashMap < String , String > = options
480+ . iter ( )
481+ . filter_map ( |( k, v) | {
482+ let ( prefix, key) = k. split_once ( "." ) ?;
483+ if prefix == format_name {
484+ Some ( ( format ! ( "format.{key}" ) , v. to_owned ( ) ) )
485+ } else {
486+ None
487+ }
488+ } )
489+ . collect ( ) ;
490+ if !format_options. is_empty ( ) {
491+ table_options. current_format = Some ( format. clone ( ) ) ;
492+ table_options
493+ . alter_with_string_hash_map ( & format_options)
494+ . unwrap_or_else ( |err| log:: warn!( "Error parsing table options: {err}" ) ) ;
495+ }
496+ }
497+
498+ let extension_options: HashMap < String , String > = options
499+ . iter ( )
500+ . filter_map ( |( k, v) | {
501+ let ( prefix, _) = k. split_once ( "." ) ?;
502+ if ![ "json" , "parquet" , "csv" ] . contains ( & prefix) {
503+ Some ( ( k. to_owned ( ) , v. to_owned ( ) ) )
504+ } else {
505+ None
506+ }
507+ } )
508+ . collect ( ) ;
509+ if !extension_options. is_empty ( ) {
510+ table_options
511+ . alter_with_string_hash_map ( & extension_options)
512+ . unwrap_or_else ( |err| log:: warn!( "Error parsing table options: {err}" ) ) ;
513+ }
445514
446- TableOptions :: from_string_hash_map ( & options) . unwrap_or_else ( |err| {
447- log:: warn!( "Error parsing default table options: {err}" ) ;
448- TableOptions :: default ( )
449- } )
515+ table_options. current_format =
516+ current_format. and_then ( |format| match format. as_str ( ) {
517+ "csv" => Some ( ConfigFileType :: CSV ) ,
518+ "parquet" => Some ( ConfigFileType :: PARQUET ) ,
519+ "json" => Some ( ConfigFileType :: JSON ) ,
520+ _ => None ,
521+ } ) ;
522+ table_options
450523}
451524
452525#[ async_trait]
@@ -556,6 +629,7 @@ mod tests {
556629 use std:: sync:: Arc ;
557630
558631 use arrow_schema:: { DataType , Field , Schema } ;
632+ use datafusion:: execution:: SessionStateBuilder ;
559633 use datafusion_common:: DataFusionError ;
560634 use datafusion_expr:: col;
561635 use datafusion_expr:: registry:: FunctionRegistry ;
@@ -566,7 +640,16 @@ mod tests {
566640 #[ tokio:: test]
567641 async fn test_ffi_session ( ) -> Result < ( ) , DataFusionError > {
568642 let ( ctx, task_ctx_provider) = crate :: util:: tests:: test_session_and_ctx ( ) ;
569- let state = ctx. state ( ) ;
643+ let mut table_options = TableOptions :: default ( ) ;
644+ table_options. csv . has_header = Some ( true ) ;
645+ table_options. json . schema_infer_max_rec = Some ( 10 ) ;
646+ table_options. parquet . global . coerce_int96 = Some ( "123456789" . into ( ) ) ;
647+ table_options. current_format = Some ( ConfigFileType :: JSON ) ;
648+
649+ let state = SessionStateBuilder :: new_from_existing ( ctx. state ( ) )
650+ . with_table_options ( table_options)
651+ . build ( ) ;
652+
570653 let logical_codec = FFI_LogicalExtensionCodec :: new (
571654 Arc :: new ( DefaultLogicalExtensionCodec { } ) ,
572655 None ,
0 commit comments