@@ -6854,3 +6854,255 @@ async fn test_duplicate_state_fields_for_dfschema_construct() -> Result<()> {
68546854
68556855 Ok ( ( ) )
68566856}
6857+
6858+ struct FixtureDataGen {
6859+ _tmp_dir : TempDir ,
6860+ out_dir : String ,
6861+ ctx : SessionContext ,
6862+ }
6863+
6864+ impl FixtureDataGen {
6865+ fn register_local_table (
6866+ out_dir : impl AsRef < Path > ,
6867+ ctx : & SessionContext ,
6868+ ) -> Result < ( ) > {
6869+ // Create an in memory table with schema C1 and C2, both strings
6870+ let schema = Arc :: new ( Schema :: new ( vec ! [
6871+ Field :: new( "c1" , DataType :: Utf8 , false ) ,
6872+ Field :: new( "c2" , DataType :: Utf8 , false ) ,
6873+ ] ) ) ;
6874+
6875+ let record_batch = RecordBatch :: try_new (
6876+ schema. clone ( ) ,
6877+ vec ! [
6878+ Arc :: new( StringArray :: from( vec![ "abc" , "def" ] ) ) ,
6879+ Arc :: new( StringArray :: from( vec![ "123" , "456" ] ) ) ,
6880+ ] ,
6881+ ) ?;
6882+
6883+ let mem_table = Arc :: new ( MemTable :: try_new ( schema, vec ! [ vec![ record_batch] ] ) ?) ;
6884+
6885+ // Register the table in the context
6886+ ctx. register_table ( "test" , mem_table) ?;
6887+
6888+ let local = Arc :: new ( LocalFileSystem :: new_with_prefix ( & out_dir) ?) ;
6889+ let local_url = Url :: parse ( "file://local" ) . unwrap ( ) ;
6890+ ctx. register_object_store ( & local_url, local) ;
6891+
6892+ Ok ( ( ) )
6893+ }
6894+
6895+ // initializes basic data and writes it using via executing physical plan
6896+ //
6897+ // Available columns: c1, c2
6898+ async fn prepare_execution_plan_writes ( config : SessionConfig ) -> Result < Self > {
6899+ let tmp_dir = TempDir :: new ( ) ?;
6900+
6901+ let ctx = SessionContext :: new_with_config ( config) ;
6902+
6903+ Self :: register_local_table ( & tmp_dir, & ctx) ?;
6904+
6905+ let out_dir = tmp_dir. as_ref ( ) . to_str ( ) . unwrap ( ) . to_string ( ) + "/out/" ;
6906+ let out_dir_url = format ! ( "file://{out_dir}" ) ;
6907+
6908+ let df = ctx. sql ( "SELECT c1, c2 FROM test" ) . await ?;
6909+ let plan = df. create_physical_plan ( ) . await ?;
6910+
6911+ ctx. write_parquet ( plan. clone ( ) , & out_dir_url, None ) . await ?;
6912+ ctx. write_csv ( plan. clone ( ) , & out_dir_url) . await ?;
6913+ ctx. write_json ( plan. clone ( ) , & out_dir_url) . await ?;
6914+
6915+ Ok ( Self {
6916+ _tmp_dir : tmp_dir,
6917+ out_dir,
6918+ ctx,
6919+ } )
6920+ }
6921+
6922+ // initializes basic data and writes it using `write_opts`
6923+ //
6924+ // Available columns: c1, c2
6925+ async fn prepare_direct_df_writes (
6926+ config : SessionConfig ,
6927+ write_opts : DataFrameWriteOptions ,
6928+ ) -> Result < Self > {
6929+ let tmp_dir = TempDir :: new ( ) ?;
6930+
6931+ let ctx = SessionContext :: new_with_config ( config) ;
6932+
6933+ Self :: register_local_table ( & tmp_dir, & ctx) ?;
6934+
6935+ let out_dir = tmp_dir. as_ref ( ) . to_str ( ) . unwrap ( ) . to_string ( ) + "/out/" ;
6936+ let out_dir_url = format ! ( "file://{out_dir}" ) ;
6937+
6938+ let df = ctx. sql ( "SELECT c1, c2 FROM test" ) . await ?;
6939+
6940+ df. clone ( )
6941+ . write_parquet ( & out_dir_url, write_opts. clone ( ) , None )
6942+ . await ?;
6943+ df. clone ( )
6944+ . write_csv ( & out_dir_url, write_opts. clone ( ) , None )
6945+ . await ?;
6946+ df. write_json ( & out_dir_url, write_opts. clone ( ) , None )
6947+ . await ?;
6948+
6949+ Ok ( Self {
6950+ _tmp_dir : tmp_dir,
6951+ out_dir,
6952+ ctx,
6953+ } )
6954+ }
6955+ }
6956+
6957+ #[ tokio:: test]
6958+ async fn write_partitioned_results_with_prefix ( ) -> Result < ( ) > {
6959+ let mut config = SessionConfig :: new ( ) ;
6960+ config. options_mut ( ) . execution . partitioned_file_prefix_name = "prefix-" . to_owned ( ) ;
6961+
6962+ let df_write_options =
6963+ DataFrameWriteOptions :: new ( ) . with_partition_by ( vec ! [ String :: from( "c2" ) ] ) ;
6964+ let FixtureDataGen {
6965+ _tmp_dir,
6966+ out_dir,
6967+ ctx,
6968+ } = FixtureDataGen :: prepare_direct_df_writes ( config, df_write_options) . await ?;
6969+
6970+ let partitioned_file = format ! ( "{out_dir}/c2=123/prefix-*" ) ;
6971+ let filter_df = ctx
6972+ . read_parquet ( & partitioned_file, ParquetReadOptions :: default ( ) )
6973+ . await ?;
6974+
6975+ // Check that the c2 column is gone and that c1 is abc.
6976+ let results_parquet = filter_df. collect ( ) . await ?;
6977+ let results_parquet_display = batches_to_string ( & results_parquet) ;
6978+ assert_snapshot ! (
6979+ results_parquet_display. as_str( ) ,
6980+ @r###"
6981+ +-----+
6982+ | c1 |
6983+ +-----+
6984+ | abc |
6985+ +-----+
6986+ "###
6987+ ) ;
6988+
6989+ let results_csv = ctx
6990+ . read_csv ( & partitioned_file, Default :: default ( ) )
6991+ . await ?
6992+ . collect ( )
6993+ . await ?;
6994+ assert_eq ! (
6995+ results_parquet_display. as_str( ) ,
6996+ batches_to_string( & results_csv)
6997+ ) ;
6998+
6999+ let results_json = ctx
7000+ . read_json ( & partitioned_file, Default :: default ( ) )
7001+ . await ?
7002+ . collect ( )
7003+ . await ?;
7004+ assert_eq ! ( results_parquet_display, batches_to_string( & results_json) ) ;
7005+
7006+ Ok ( ( ) )
7007+ }
7008+
7009+ #[ tokio:: test]
7010+ async fn write_physical_plan_results_with_prefix ( ) -> Result < ( ) > {
7011+ let mut config = SessionConfig :: new ( ) ;
7012+ config. options_mut ( ) . execution . partitioned_file_prefix_name = "prefix-" . to_owned ( ) ;
7013+
7014+ let FixtureDataGen {
7015+ _tmp_dir,
7016+ out_dir,
7017+ ctx,
7018+ } = FixtureDataGen :: prepare_execution_plan_writes ( config) . await ?;
7019+
7020+ let partitioned_file = format ! ( "{out_dir}/prefix-*" ) ;
7021+
7022+ let df = ctx
7023+ . read_parquet ( & partitioned_file, Default :: default ( ) )
7024+ . await ?;
7025+ let results_parquet = df. collect ( ) . await ?;
7026+ let results_parquet_display = batches_to_string ( & results_parquet) ;
7027+ assert_snapshot ! (
7028+ results_parquet_display. as_str( ) ,
7029+ @r###"
7030+ +-----+-----+
7031+ | c1 | c2 |
7032+ +-----+-----+
7033+ | abc | 123 |
7034+ | def | 456 |
7035+ +-----+-----+
7036+ "###
7037+ ) ;
7038+
7039+ let results_csv = ctx
7040+ . read_csv ( & partitioned_file, Default :: default ( ) )
7041+ . await ?
7042+ . collect ( )
7043+ . await ?;
7044+ assert_eq ! (
7045+ results_parquet_display. as_str( ) ,
7046+ batches_to_string( & results_csv)
7047+ ) ;
7048+
7049+ let results_json = ctx
7050+ . read_json ( & partitioned_file, Default :: default ( ) )
7051+ . await ?
7052+ . collect ( )
7053+ . await ?;
7054+ assert_eq ! ( results_parquet_display, batches_to_string( & results_json) ) ;
7055+
7056+ Ok ( ( ) )
7057+ }
7058+
7059+ #[ tokio:: test]
7060+ async fn write_parts_parquet_results_with_prefix ( ) -> Result < ( ) > {
7061+ let mut config = SessionConfig :: new ( ) ;
7062+ config. options_mut ( ) . execution . partitioned_file_prefix_name = "prefix-" . to_owned ( ) ;
7063+
7064+ let df_write_options = DataFrameWriteOptions :: new ( ) ;
7065+ let FixtureDataGen {
7066+ _tmp_dir,
7067+ out_dir,
7068+ ctx,
7069+ } = FixtureDataGen :: prepare_direct_df_writes ( config, df_write_options) . await ?;
7070+
7071+ let partitioned_file = format ! ( "{out_dir}/prefix-*" ) ;
7072+
7073+ let df = ctx
7074+ . read_parquet ( & partitioned_file, Default :: default ( ) )
7075+ . await ?;
7076+ let results_parquet = df. collect ( ) . await ?;
7077+ let results_parquet_display = batches_to_string ( & results_parquet) ;
7078+ assert_snapshot ! (
7079+ results_parquet_display. as_str( ) ,
7080+ @r###"
7081+ +-----+-----+
7082+ | c1 | c2 |
7083+ +-----+-----+
7084+ | abc | 123 |
7085+ | def | 456 |
7086+ +-----+-----+
7087+ "###
7088+ ) ;
7089+
7090+ let results_csv = ctx
7091+ . read_csv ( & partitioned_file, Default :: default ( ) )
7092+ . await ?
7093+ . collect ( )
7094+ . await ?;
7095+ assert_eq ! (
7096+ results_parquet_display. as_str( ) ,
7097+ batches_to_string( & results_csv)
7098+ ) ;
7099+
7100+ let results_json = ctx
7101+ . read_json ( & partitioned_file, Default :: default ( ) )
7102+ . await ?
7103+ . collect ( )
7104+ . await ?;
7105+ assert_eq ! ( results_parquet_display, batches_to_string( & results_json) ) ;
7106+
7107+ Ok ( ( ) )
7108+ }
0 commit comments