Skip to content

Commit 16405ec

Browse files
committed
Add test to illustrate prefixed parquet files
1 parent 803766a commit 16405ec

1 file changed

Lines changed: 67 additions & 0 deletions

File tree

  • datafusion/core/tests/dataframe

datafusion/core/tests/dataframe/mod.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6735,3 +6735,70 @@ async fn test_duplicate_state_fields_for_dfschema_construct() -> Result<()> {
67356735

67366736
Ok(())
67376737
}
6738+
6739+
#[tokio::test]
6740+
async fn write_partitioned_parquet_results_with_prefix() -> Result<()> {
6741+
// create partitioned input file and context
6742+
let tmp_dir = TempDir::new()?;
6743+
6744+
let mut config = SessionConfig::new();
6745+
config.options_mut().execution.partitioned_file_prefix_name = "prefix".to_owned();
6746+
let ctx = SessionContext::new_with_config(config);
6747+
6748+
// Create an in memory table with schema C1 and C2, both strings
6749+
let schema = Arc::new(Schema::new(vec![
6750+
Field::new("c1", DataType::Utf8, false),
6751+
Field::new("c2", DataType::Utf8, false),
6752+
]));
6753+
6754+
let record_batch = RecordBatch::try_new(
6755+
schema.clone(),
6756+
vec![
6757+
Arc::new(StringArray::from(vec!["abc", "def"])),
6758+
Arc::new(StringArray::from(vec!["123", "456"])),
6759+
],
6760+
)?;
6761+
6762+
let mem_table = Arc::new(MemTable::try_new(schema, vec![vec![record_batch]])?);
6763+
6764+
// Register the table in the context
6765+
ctx.register_table("test", mem_table)?;
6766+
6767+
let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
6768+
let local_url = Url::parse("file://local").unwrap();
6769+
ctx.register_object_store(&local_url, local);
6770+
6771+
// execute a simple query and write the results to parquet
6772+
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
6773+
let out_dir_url = format!("file://{out_dir}");
6774+
6775+
// Write the results to parquet with partitioning
6776+
let df = ctx.sql("SELECT c1, c2 FROM test").await?;
6777+
let df_write_options =
6778+
DataFrameWriteOptions::new().with_partition_by(vec![String::from("c2")]);
6779+
6780+
df.write_parquet(&out_dir_url, df_write_options, None)
6781+
.await?;
6782+
6783+
// Explicitly read the parquet file at c2=123 to verify the physical files are partitioned with
6784+
// specified prefix
6785+
let partitioned_file = format!("{out_dir}/c2=123/prefix*");
6786+
let filter_df = ctx
6787+
.read_parquet(&partitioned_file, ParquetReadOptions::default())
6788+
.await?;
6789+
6790+
// Check that the c2 column is gone and that c1 is abc.
6791+
let results = filter_df.collect().await?;
6792+
assert_snapshot!(
6793+
batches_to_string(&results),
6794+
@r###"
6795+
+-----+
6796+
| c1 |
6797+
+-----+
6798+
| abc |
6799+
+-----+
6800+
"###
6801+
);
6802+
6803+
Ok(())
6804+
}

0 commit comments

Comments
 (0)