@@ -6853,3 +6853,255 @@ async fn test_duplicate_state_fields_for_dfschema_construct() -> Result<()> {
    Ok(())
}
6856+
/// Test fixture produced by the `prepare_*` constructors below: holds the
/// session context and the directory the test data was written to.
struct FixtureDataGen {
    // Kept alive (never read) so the temp directory is not deleted while
    // the test is still reading files out of it.
    _tmp_dir: TempDir,
    // Output directory path ("<tmp>/out/", note the trailing slash).
    out_dir: String,
    // Context with the `test` table and a local object store registered.
    ctx: SessionContext,
}
6862+
6863+ impl FixtureDataGen {
6864+ fn register_local_table (
6865+ out_dir : impl AsRef < Path > ,
6866+ ctx : & SessionContext ,
6867+ ) -> Result < ( ) > {
6868+ // Create an in memory table with schema C1 and C2, both strings
6869+ let schema = Arc :: new ( Schema :: new ( vec ! [
6870+ Field :: new( "c1" , DataType :: Utf8 , false ) ,
6871+ Field :: new( "c2" , DataType :: Utf8 , false ) ,
6872+ ] ) ) ;
6873+
6874+ let record_batch = RecordBatch :: try_new (
6875+ schema. clone ( ) ,
6876+ vec ! [
6877+ Arc :: new( StringArray :: from( vec![ "abc" , "def" ] ) ) ,
6878+ Arc :: new( StringArray :: from( vec![ "123" , "456" ] ) ) ,
6879+ ] ,
6880+ ) ?;
6881+
6882+ let mem_table = Arc :: new ( MemTable :: try_new ( schema, vec ! [ vec![ record_batch] ] ) ?) ;
6883+
6884+ // Register the table in the context
6885+ ctx. register_table ( "test" , mem_table) ?;
6886+
6887+ let local = Arc :: new ( LocalFileSystem :: new_with_prefix ( & out_dir) ?) ;
6888+ let local_url = Url :: parse ( "file://local" ) . unwrap ( ) ;
6889+ ctx. register_object_store ( & local_url, local) ;
6890+
6891+ Ok ( ( ) )
6892+ }
6893+
6894+ // initializes basic data and writes it using via executing physical plan
6895+ //
6896+ // Available columns: c1, c2
6897+ async fn prepare_execution_plan_writes ( config : SessionConfig ) -> Result < Self > {
6898+ let tmp_dir = TempDir :: new ( ) ?;
6899+
6900+ let ctx = SessionContext :: new_with_config ( config) ;
6901+
6902+ Self :: register_local_table ( & tmp_dir, & ctx) ?;
6903+
6904+ let out_dir = tmp_dir. as_ref ( ) . to_str ( ) . unwrap ( ) . to_string ( ) + "/out/" ;
6905+ let out_dir_url = format ! ( "file://{out_dir}" ) ;
6906+
6907+ let df = ctx. sql ( "SELECT c1, c2 FROM test" ) . await ?;
6908+ let plan = df. create_physical_plan ( ) . await ?;
6909+
6910+ ctx. write_parquet ( plan. clone ( ) , & out_dir_url, None ) . await ?;
6911+ ctx. write_csv ( plan. clone ( ) , & out_dir_url) . await ?;
6912+ ctx. write_json ( plan. clone ( ) , & out_dir_url) . await ?;
6913+
6914+ Ok ( Self {
6915+ _tmp_dir : tmp_dir,
6916+ out_dir,
6917+ ctx,
6918+ } )
6919+ }
6920+
6921+ // initializes basic data and writes it using `write_opts`
6922+ //
6923+ // Available columns: c1, c2
6924+ async fn prepare_direct_df_writes (
6925+ config : SessionConfig ,
6926+ write_opts : DataFrameWriteOptions ,
6927+ ) -> Result < Self > {
6928+ let tmp_dir = TempDir :: new ( ) ?;
6929+
6930+ let ctx = SessionContext :: new_with_config ( config) ;
6931+
6932+ Self :: register_local_table ( & tmp_dir, & ctx) ?;
6933+
6934+ let out_dir = tmp_dir. as_ref ( ) . to_str ( ) . unwrap ( ) . to_string ( ) + "/out/" ;
6935+ let out_dir_url = format ! ( "file://{out_dir}" ) ;
6936+
6937+ let df = ctx. sql ( "SELECT c1, c2 FROM test" ) . await ?;
6938+
6939+ df. clone ( )
6940+ . write_parquet ( & out_dir_url, write_opts. clone ( ) , None )
6941+ . await ?;
6942+ df. clone ( )
6943+ . write_csv ( & out_dir_url, write_opts. clone ( ) , None )
6944+ . await ?;
6945+ df. write_json ( & out_dir_url, write_opts. clone ( ) , None )
6946+ . await ?;
6947+
6948+ Ok ( Self {
6949+ _tmp_dir : tmp_dir,
6950+ out_dir,
6951+ ctx,
6952+ } )
6953+ }
6954+ }
6955+
6956+ #[ tokio:: test]
6957+ async fn write_partitioned_results_with_prefix ( ) -> Result < ( ) > {
6958+ let mut config = SessionConfig :: new ( ) ;
6959+ config. options_mut ( ) . execution . partitioned_file_prefix_name = "prefix-" . to_owned ( ) ;
6960+
6961+ let df_write_options =
6962+ DataFrameWriteOptions :: new ( ) . with_partition_by ( vec ! [ String :: from( "c2" ) ] ) ;
6963+ let FixtureDataGen {
6964+ _tmp_dir,
6965+ out_dir,
6966+ ctx,
6967+ } = FixtureDataGen :: prepare_direct_df_writes ( config, df_write_options) . await ?;
6968+
6969+ let partitioned_file = format ! ( "{out_dir}/c2=123/prefix-*" ) ;
6970+ let filter_df = ctx
6971+ . read_parquet ( & partitioned_file, ParquetReadOptions :: default ( ) )
6972+ . await ?;
6973+
6974+ // Check that the c2 column is gone and that c1 is abc.
6975+ let results_parquet = filter_df. collect ( ) . await ?;
6976+ let results_parquet_display = batches_to_string ( & results_parquet) ;
6977+ assert_snapshot ! (
6978+ results_parquet_display. as_str( ) ,
6979+ @r###"
6980+ +-----+
6981+ | c1 |
6982+ +-----+
6983+ | abc |
6984+ +-----+
6985+ "###
6986+ ) ;
6987+
6988+ let results_csv = ctx
6989+ . read_csv ( & partitioned_file, Default :: default ( ) )
6990+ . await ?
6991+ . collect ( )
6992+ . await ?;
6993+ assert_eq ! (
6994+ results_parquet_display. as_str( ) ,
6995+ batches_to_string( & results_csv)
6996+ ) ;
6997+
6998+ let results_json = ctx
6999+ . read_json ( & partitioned_file, Default :: default ( ) )
7000+ . await ?
7001+ . collect ( )
7002+ . await ?;
7003+ assert_eq ! ( results_parquet_display, batches_to_string( & results_json) ) ;
7004+
7005+ Ok ( ( ) )
7006+ }
7007+
7008+ #[ tokio:: test]
7009+ async fn write_physical_plan_results_with_prefix ( ) -> Result < ( ) > {
7010+ let mut config = SessionConfig :: new ( ) ;
7011+ config. options_mut ( ) . execution . partitioned_file_prefix_name = "prefix-" . to_owned ( ) ;
7012+
7013+ let FixtureDataGen {
7014+ _tmp_dir,
7015+ out_dir,
7016+ ctx,
7017+ } = FixtureDataGen :: prepare_execution_plan_writes ( config) . await ?;
7018+
7019+ let partitioned_file = format ! ( "{out_dir}/prefix-*" ) ;
7020+
7021+ let df = ctx
7022+ . read_parquet ( & partitioned_file, Default :: default ( ) )
7023+ . await ?;
7024+ let results_parquet = df. collect ( ) . await ?;
7025+ let results_parquet_display = batches_to_string ( & results_parquet) ;
7026+ assert_snapshot ! (
7027+ results_parquet_display. as_str( ) ,
7028+ @r###"
7029+ +-----+-----+
7030+ | c1 | c2 |
7031+ +-----+-----+
7032+ | abc | 123 |
7033+ | def | 456 |
7034+ +-----+-----+
7035+ "###
7036+ ) ;
7037+
7038+ let results_csv = ctx
7039+ . read_csv ( & partitioned_file, Default :: default ( ) )
7040+ . await ?
7041+ . collect ( )
7042+ . await ?;
7043+ assert_eq ! (
7044+ results_parquet_display. as_str( ) ,
7045+ batches_to_string( & results_csv)
7046+ ) ;
7047+
7048+ let results_json = ctx
7049+ . read_json ( & partitioned_file, Default :: default ( ) )
7050+ . await ?
7051+ . collect ( )
7052+ . await ?;
7053+ assert_eq ! ( results_parquet_display, batches_to_string( & results_json) ) ;
7054+
7055+ Ok ( ( ) )
7056+ }
7057+
7058+ #[ tokio:: test]
7059+ async fn write_parts_parquet_results_with_prefix ( ) -> Result < ( ) > {
7060+ let mut config = SessionConfig :: new ( ) ;
7061+ config. options_mut ( ) . execution . partitioned_file_prefix_name = "prefix-" . to_owned ( ) ;
7062+
7063+ let df_write_options = DataFrameWriteOptions :: new ( ) ;
7064+ let FixtureDataGen {
7065+ _tmp_dir,
7066+ out_dir,
7067+ ctx,
7068+ } = FixtureDataGen :: prepare_direct_df_writes ( config, df_write_options) . await ?;
7069+
7070+ let partitioned_file = format ! ( "{out_dir}/prefix-*" ) ;
7071+
7072+ let df = ctx
7073+ . read_parquet ( & partitioned_file, Default :: default ( ) )
7074+ . await ?;
7075+ let results_parquet = df. collect ( ) . await ?;
7076+ let results_parquet_display = batches_to_string ( & results_parquet) ;
7077+ assert_snapshot ! (
7078+ results_parquet_display. as_str( ) ,
7079+ @r###"
7080+ +-----+-----+
7081+ | c1 | c2 |
7082+ +-----+-----+
7083+ | abc | 123 |
7084+ | def | 456 |
7085+ +-----+-----+
7086+ "###
7087+ ) ;
7088+
7089+ let results_csv = ctx
7090+ . read_csv ( & partitioned_file, Default :: default ( ) )
7091+ . await ?
7092+ . collect ( )
7093+ . await ?;
7094+ assert_eq ! (
7095+ results_parquet_display. as_str( ) ,
7096+ batches_to_string( & results_csv)
7097+ ) ;
7098+
7099+ let results_json = ctx
7100+ . read_json ( & partitioned_file, Default :: default ( ) )
7101+ . await ?
7102+ . collect ( )
7103+ . await ?;
7104+ assert_eq ! ( results_parquet_display, batches_to_string( & results_json) ) ;
7105+
7106+ Ok ( ( ) )
7107+ }
0 commit comments