@@ -6856,3 +6856,255 @@ async fn test_duplicate_state_fields_for_dfschema_construct() -> Result<()> {
68566856
68576857 Ok ( ( ) )
68586858}
6859+
/// Test fixture bundling a temporary output directory with a
/// `SessionContext` that has a small in-memory `test` table registered
/// (see `FixtureDataGen::register_local_table`: columns c1, c2, both Utf8).
struct FixtureDataGen {
    // Held only to keep the temp directory alive (and thus readable)
    // for the duration of the test; dropped — and deleted — with the fixture.
    _tmp_dir: TempDir,
    // Filesystem path of the write destination inside `_tmp_dir`;
    // built as `<tmp>/out/` by the `prepare_*` constructors.
    out_dir: String,
    // Context used both to write the data and to read it back.
    ctx: SessionContext,
}
6865+
6866+ impl FixtureDataGen {
6867+ fn register_local_table (
6868+ out_dir : impl AsRef < Path > ,
6869+ ctx : & SessionContext ,
6870+ ) -> Result < ( ) > {
6871+ // Create an in memory table with schema C1 and C2, both strings
6872+ let schema = Arc :: new ( Schema :: new ( vec ! [
6873+ Field :: new( "c1" , DataType :: Utf8 , false ) ,
6874+ Field :: new( "c2" , DataType :: Utf8 , false ) ,
6875+ ] ) ) ;
6876+
6877+ let record_batch = RecordBatch :: try_new (
6878+ schema. clone ( ) ,
6879+ vec ! [
6880+ Arc :: new( StringArray :: from( vec![ "abc" , "def" ] ) ) ,
6881+ Arc :: new( StringArray :: from( vec![ "123" , "456" ] ) ) ,
6882+ ] ,
6883+ ) ?;
6884+
6885+ let mem_table = Arc :: new ( MemTable :: try_new ( schema, vec ! [ vec![ record_batch] ] ) ?) ;
6886+
6887+ // Register the table in the context
6888+ ctx. register_table ( "test" , mem_table) ?;
6889+
6890+ let local = Arc :: new ( LocalFileSystem :: new_with_prefix ( & out_dir) ?) ;
6891+ let local_url = Url :: parse ( "file://local" ) . unwrap ( ) ;
6892+ ctx. register_object_store ( & local_url, local) ;
6893+
6894+ Ok ( ( ) )
6895+ }
6896+
6897+ // initializes basic data and writes it using via executing physical plan
6898+ //
6899+ // Available columns: c1, c2
6900+ async fn prepare_execution_plan_writes ( config : SessionConfig ) -> Result < Self > {
6901+ let tmp_dir = TempDir :: new ( ) ?;
6902+
6903+ let ctx = SessionContext :: new_with_config ( config) ;
6904+
6905+ Self :: register_local_table ( & tmp_dir, & ctx) ?;
6906+
6907+ let out_dir = tmp_dir. as_ref ( ) . to_str ( ) . unwrap ( ) . to_string ( ) + "/out/" ;
6908+ let out_dir_url = format ! ( "file://{out_dir}" ) ;
6909+
6910+ let df = ctx. sql ( "SELECT c1, c2 FROM test" ) . await ?;
6911+ let plan = df. create_physical_plan ( ) . await ?;
6912+
6913+ ctx. write_parquet ( plan. clone ( ) , & out_dir_url, None ) . await ?;
6914+ ctx. write_csv ( plan. clone ( ) , & out_dir_url) . await ?;
6915+ ctx. write_json ( plan. clone ( ) , & out_dir_url) . await ?;
6916+
6917+ Ok ( Self {
6918+ _tmp_dir : tmp_dir,
6919+ out_dir,
6920+ ctx,
6921+ } )
6922+ }
6923+
6924+ // initializes basic data and writes it using `write_opts`
6925+ //
6926+ // Available columns: c1, c2
6927+ async fn prepare_direct_df_writes (
6928+ config : SessionConfig ,
6929+ write_opts : DataFrameWriteOptions ,
6930+ ) -> Result < Self > {
6931+ let tmp_dir = TempDir :: new ( ) ?;
6932+
6933+ let ctx = SessionContext :: new_with_config ( config) ;
6934+
6935+ Self :: register_local_table ( & tmp_dir, & ctx) ?;
6936+
6937+ let out_dir = tmp_dir. as_ref ( ) . to_str ( ) . unwrap ( ) . to_string ( ) + "/out/" ;
6938+ let out_dir_url = format ! ( "file://{out_dir}" ) ;
6939+
6940+ let df = ctx. sql ( "SELECT c1, c2 FROM test" ) . await ?;
6941+
6942+ df. clone ( )
6943+ . write_parquet ( & out_dir_url, write_opts. clone ( ) , None )
6944+ . await ?;
6945+ df. clone ( )
6946+ . write_csv ( & out_dir_url, write_opts. clone ( ) , None )
6947+ . await ?;
6948+ df. write_json ( & out_dir_url, write_opts. clone ( ) , None )
6949+ . await ?;
6950+
6951+ Ok ( Self {
6952+ _tmp_dir : tmp_dir,
6953+ out_dir,
6954+ ctx,
6955+ } )
6956+ }
6957+ }
6958+
6959+ #[ tokio:: test]
6960+ async fn write_partitioned_results_with_prefix ( ) -> Result < ( ) > {
6961+ let mut config = SessionConfig :: new ( ) ;
6962+ config. options_mut ( ) . execution . partitioned_file_prefix_name = "prefix-" . to_owned ( ) ;
6963+
6964+ let df_write_options =
6965+ DataFrameWriteOptions :: new ( ) . with_partition_by ( vec ! [ String :: from( "c2" ) ] ) ;
6966+ let FixtureDataGen {
6967+ _tmp_dir,
6968+ out_dir,
6969+ ctx,
6970+ } = FixtureDataGen :: prepare_direct_df_writes ( config, df_write_options) . await ?;
6971+
6972+ let partitioned_file = format ! ( "{out_dir}/c2=123/prefix-*" ) ;
6973+ let filter_df = ctx
6974+ . read_parquet ( & partitioned_file, ParquetReadOptions :: default ( ) )
6975+ . await ?;
6976+
6977+ // Check that the c2 column is gone and that c1 is abc.
6978+ let results_parquet = filter_df. collect ( ) . await ?;
6979+ let results_parquet_display = batches_to_string ( & results_parquet) ;
6980+ assert_snapshot ! (
6981+ results_parquet_display. as_str( ) ,
6982+ @r###"
6983+ +-----+
6984+ | c1 |
6985+ +-----+
6986+ | abc |
6987+ +-----+
6988+ "###
6989+ ) ;
6990+
6991+ let results_csv = ctx
6992+ . read_csv ( & partitioned_file, Default :: default ( ) )
6993+ . await ?
6994+ . collect ( )
6995+ . await ?;
6996+ assert_eq ! (
6997+ results_parquet_display. as_str( ) ,
6998+ batches_to_string( & results_csv)
6999+ ) ;
7000+
7001+ let results_json = ctx
7002+ . read_json ( & partitioned_file, Default :: default ( ) )
7003+ . await ?
7004+ . collect ( )
7005+ . await ?;
7006+ assert_eq ! ( results_parquet_display, batches_to_string( & results_json) ) ;
7007+
7008+ Ok ( ( ) )
7009+ }
7010+
7011+ #[ tokio:: test]
7012+ async fn write_physical_plan_results_with_prefix ( ) -> Result < ( ) > {
7013+ let mut config = SessionConfig :: new ( ) ;
7014+ config. options_mut ( ) . execution . partitioned_file_prefix_name = "prefix-" . to_owned ( ) ;
7015+
7016+ let FixtureDataGen {
7017+ _tmp_dir,
7018+ out_dir,
7019+ ctx,
7020+ } = FixtureDataGen :: prepare_execution_plan_writes ( config) . await ?;
7021+
7022+ let partitioned_file = format ! ( "{out_dir}/prefix-*" ) ;
7023+
7024+ let df = ctx
7025+ . read_parquet ( & partitioned_file, Default :: default ( ) )
7026+ . await ?;
7027+ let results_parquet = df. collect ( ) . await ?;
7028+ let results_parquet_display = batches_to_string ( & results_parquet) ;
7029+ assert_snapshot ! (
7030+ results_parquet_display. as_str( ) ,
7031+ @r###"
7032+ +-----+-----+
7033+ | c1 | c2 |
7034+ +-----+-----+
7035+ | abc | 123 |
7036+ | def | 456 |
7037+ +-----+-----+
7038+ "###
7039+ ) ;
7040+
7041+ let results_csv = ctx
7042+ . read_csv ( & partitioned_file, Default :: default ( ) )
7043+ . await ?
7044+ . collect ( )
7045+ . await ?;
7046+ assert_eq ! (
7047+ results_parquet_display. as_str( ) ,
7048+ batches_to_string( & results_csv)
7049+ ) ;
7050+
7051+ let results_json = ctx
7052+ . read_json ( & partitioned_file, Default :: default ( ) )
7053+ . await ?
7054+ . collect ( )
7055+ . await ?;
7056+ assert_eq ! ( results_parquet_display, batches_to_string( & results_json) ) ;
7057+
7058+ Ok ( ( ) )
7059+ }
7060+
7061+ #[ tokio:: test]
7062+ async fn write_parts_parquet_results_with_prefix ( ) -> Result < ( ) > {
7063+ let mut config = SessionConfig :: new ( ) ;
7064+ config. options_mut ( ) . execution . partitioned_file_prefix_name = "prefix-" . to_owned ( ) ;
7065+
7066+ let df_write_options = DataFrameWriteOptions :: new ( ) ;
7067+ let FixtureDataGen {
7068+ _tmp_dir,
7069+ out_dir,
7070+ ctx,
7071+ } = FixtureDataGen :: prepare_direct_df_writes ( config, df_write_options) . await ?;
7072+
7073+ let partitioned_file = format ! ( "{out_dir}/prefix-*" ) ;
7074+
7075+ let df = ctx
7076+ . read_parquet ( & partitioned_file, Default :: default ( ) )
7077+ . await ?;
7078+ let results_parquet = df. collect ( ) . await ?;
7079+ let results_parquet_display = batches_to_string ( & results_parquet) ;
7080+ assert_snapshot ! (
7081+ results_parquet_display. as_str( ) ,
7082+ @r###"
7083+ +-----+-----+
7084+ | c1 | c2 |
7085+ +-----+-----+
7086+ | abc | 123 |
7087+ | def | 456 |
7088+ +-----+-----+
7089+ "###
7090+ ) ;
7091+
7092+ let results_csv = ctx
7093+ . read_csv ( & partitioned_file, Default :: default ( ) )
7094+ . await ?
7095+ . collect ( )
7096+ . await ?;
7097+ assert_eq ! (
7098+ results_parquet_display. as_str( ) ,
7099+ batches_to_string( & results_csv)
7100+ ) ;
7101+
7102+ let results_json = ctx
7103+ . read_json ( & partitioned_file, Default :: default ( ) )
7104+ . await ?
7105+ . collect ( )
7106+ . await ?;
7107+ assert_eq ! ( results_parquet_display, batches_to_string( & results_json) ) ;
7108+
7109+ Ok ( ( ) )
7110+ }
0 commit comments