@@ -10,7 +10,7 @@ use datafusion::error::Result;
1010use datafusion:: execution:: { SendableRecordBatchStream , TaskContext } ;
1111use datafusion:: physical_plan:: stream:: RecordBatchStreamAdapter ;
1212use datafusion:: physical_plan:: streaming:: PartitionStream ;
13- use postgres_types:: Oid ;
13+ use postgres_types:: { Oid , Type } ;
1414use tokio:: sync:: RwLock ;
1515
1616use crate :: pg_catalog:: catalog_info:: CatalogInfo ;
@@ -141,7 +141,7 @@ impl<C: CatalogInfo> PgAttributeTable<C> {
141141
142142 attrelids. push ( table_oid as i32 ) ;
143143 attnames. push ( field. name ( ) . clone ( ) ) ;
144- atttypids. push ( pg_type_oid) ;
144+ atttypids. push ( pg_type_oid as i32 ) ;
145145 attstattargets. push ( -1 ) ; // Default statistics target
146146 attlens. push ( type_len) ;
147147 attnums. push ( attnum) ;
@@ -211,31 +211,22 @@ impl<C: CatalogInfo> PgAttributeTable<C> {
211211 }
212212
213213 /// Map DataFusion data types to PostgreSQL type information
214- fn datafusion_to_pg_type ( data_type : & DataType ) -> ( i32 , i16 , bool , & ' static str , & ' static str ) {
215- match data_type {
216- DataType :: Boolean => ( 16 , 1 , true , "c" , "p" ) , // bool
217- DataType :: Int8 => ( 18 , 1 , true , "c" , "p" ) , // char
218- DataType :: Int16 => ( 21 , 2 , true , "s" , "p" ) , // int2
219- DataType :: Int32 => ( 23 , 4 , true , "i" , "p" ) , // int4
220- DataType :: Int64 => ( 20 , 8 , true , "d" , "p" ) , // int8
221- DataType :: UInt8 => ( 18 , 2 , true , "s" , "p" ) , // char
222- DataType :: UInt16 => ( 21 , 4 , true , "i" , "p" ) , // int2
223- DataType :: UInt32 => ( 23 , 8 , true , "d" , "p" ) , // int4
224- DataType :: UInt64 => ( 20 , -1 , false , "i" , "m" ) , // int8
225- DataType :: Float32 => ( 700 , 4 , true , "i" , "p" ) , // float4
226- DataType :: Float64 => ( 701 , 8 , true , "d" , "p" ) , // float8
227- DataType :: Utf8 => ( 25 , -1 , false , "i" , "x" ) , // text
228- DataType :: LargeUtf8 => ( 25 , -1 , false , "i" , "x" ) , // text
229- DataType :: Binary => ( 17 , -1 , false , "i" , "x" ) , // bytea
230- DataType :: LargeBinary => ( 17 , -1 , false , "i" , "x" ) , // bytea
231- DataType :: Date32 => ( 1082 , 4 , true , "i" , "p" ) , // date
232- DataType :: Date64 => ( 1082 , 4 , true , "i" , "p" ) , // date
233- DataType :: Time32 ( _) => ( 1083 , 8 , true , "d" , "p" ) , // time
234- DataType :: Time64 ( _) => ( 1083 , 8 , true , "d" , "p" ) , // time
235- DataType :: Timestamp ( _, _) => ( 1114 , 8 , true , "d" , "p" ) , // timestamp
236- DataType :: Decimal128 ( _, _) => ( 1700 , -1 , false , "i" , "m" ) , // numeric
237- DataType :: Decimal256 ( _, _) => ( 1700 , -1 , false , "i" , "m" ) , // numeric
238- _ => ( 25 , -1 , false , "i" , "x" ) , // Default to text for unknown types
214+ fn datafusion_to_pg_type ( data_type : & DataType ) -> ( u32 , i16 , bool , & ' static str , & ' static str ) {
215+ match arrow_pg:: datatypes:: into_pg_type ( data_type) {
216+ Ok ( t @ Type :: BOOL ) => ( t. oid ( ) , 1 , true , "c" , "p" ) ,
217+ Ok ( t @ Type :: CHAR ) => ( t. oid ( ) , 1 , true , "c" , "p" ) ,
218+ Ok ( t @ Type :: INT2 ) => ( t. oid ( ) , 2 , true , "s" , "p" ) ,
219+ Ok ( t @ Type :: INT4 ) => ( t. oid ( ) , 4 , true , "i" , "p" ) ,
220+ Ok ( t @ Type :: INT8 ) => ( t. oid ( ) , 8 , true , "d" , "p" ) ,
221+ Ok ( t @ Type :: FLOAT4 ) => ( t. oid ( ) , 4 , true , "i" , "p" ) ,
222+ Ok ( t @ Type :: FLOAT8 ) => ( t. oid ( ) , 8 , true , "d" , "p" ) ,
223+ Ok ( t @ Type :: TEXT ) => ( t. oid ( ) , -1 , false , "i" , "x" ) ,
224+ Ok ( t @ Type :: BYTEA ) => ( t. oid ( ) , -1 , false , "i" , "x" ) ,
225+ Ok ( t @ Type :: DATE ) => ( t. oid ( ) , 4 , true , "i" , "p" ) ,
226+ Ok ( t @ Type :: TIME ) => ( t. oid ( ) , 8 , true , "d" , "p" ) ,
227+ Ok ( t @ Type :: TIMESTAMP ) => ( t. oid ( ) , 8 , true , "d" , "p" ) ,
228+ Ok ( t @ Type :: NUMERIC ) => ( t. oid ( ) , -1 , false , "i" , "m" ) ,
229+ _ => ( Type :: TEXT . oid ( ) , -1 , false , "i" , "x" ) , // Default to text for unknown types
239230 }
240231 }
241232}
0 commit comments