Skip to content

Commit 7f66e30

Browse files
committed
Prepare comprehensive tests
1 parent 16405ec commit 7f66e30

2 files changed

Lines changed: 223 additions & 38 deletions

File tree

datafusion/core/src/dataframe/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ use datafusion_functions_aggregate::expr_fn::{
7373
use async_trait::async_trait;
7474
use datafusion_catalog::Session;
7575

76+
#[derive(Clone)]
7677
/// Contains options that control how data is
7778
/// written out from a DataFrame
7879
pub struct DataFrameWriteOptions {

datafusion/core/tests/dataframe/mod.rs

Lines changed: 222 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -6736,61 +6736,127 @@ async fn test_duplicate_state_fields_for_dfschema_construct() -> Result<()> {
67366736
Ok(())
67376737
}
67386738

6739-
#[tokio::test]
6740-
async fn write_partitioned_parquet_results_with_prefix() -> Result<()> {
6741-
// create partitioned input file and context
6742-
let tmp_dir = TempDir::new()?;
6739+
struct FixtureDataGen {
6740+
_tmp_dir: TempDir,
6741+
out_dir: String,
6742+
ctx: SessionContext,
6743+
}
6744+
6745+
impl FixtureDataGen {
6746+
fn register_local_table(
6747+
out_dir: impl AsRef<Path>,
6748+
ctx: &SessionContext,
6749+
) -> Result<()> {
6750+
// Create an in-memory table with two non-nullable string columns, c1 and c2
6751+
let schema = Arc::new(Schema::new(vec![
6752+
Field::new("c1", DataType::Utf8, false),
6753+
Field::new("c2", DataType::Utf8, false),
6754+
]));
6755+
6756+
let record_batch = RecordBatch::try_new(
6757+
schema.clone(),
6758+
vec![
6759+
Arc::new(StringArray::from(vec!["abc", "def"])),
6760+
Arc::new(StringArray::from(vec!["123", "456"])),
6761+
],
6762+
)?;
67436763

6744-
let mut config = SessionConfig::new();
6745-
config.options_mut().execution.partitioned_file_prefix_name = "prefix".to_owned();
6746-
let ctx = SessionContext::new_with_config(config);
6764+
let mem_table = Arc::new(MemTable::try_new(schema, vec![vec![record_batch]])?);
67476765

6748-
// Create an in memory table with schema C1 and C2, both strings
6749-
let schema = Arc::new(Schema::new(vec![
6750-
Field::new("c1", DataType::Utf8, false),
6751-
Field::new("c2", DataType::Utf8, false),
6752-
]));
6766+
// Register the table in the context
6767+
ctx.register_table("test", mem_table)?;
67536768

6754-
let record_batch = RecordBatch::try_new(
6755-
schema.clone(),
6756-
vec![
6757-
Arc::new(StringArray::from(vec!["abc", "def"])),
6758-
Arc::new(StringArray::from(vec!["123", "456"])),
6759-
],
6760-
)?;
6769+
let local = Arc::new(LocalFileSystem::new_with_prefix(&out_dir)?);
6770+
let local_url = Url::parse("file://local").unwrap();
6771+
ctx.register_object_store(&local_url, local);
67616772

6762-
let mem_table = Arc::new(MemTable::try_new(schema, vec![vec![record_batch]])?);
6773+
Ok(())
6774+
}
67636775

6764-
// Register the table in the context
6765-
ctx.register_table("test", mem_table)?;
6776+
async fn prepare_execution_plan_writes(config: SessionConfig) -> Result<Self> {
6777+
let tmp_dir = TempDir::new()?;
67666778

6767-
let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
6768-
let local_url = Url::parse("file://local").unwrap();
6769-
ctx.register_object_store(&local_url, local);
6779+
let ctx = SessionContext::new_with_config(config);
67706780

6771-
// execute a simple query and write the results to parquet
6772-
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
6773-
let out_dir_url = format!("file://{out_dir}");
6781+
Self::register_local_table(&tmp_dir, &ctx)?;
6782+
6783+
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
6784+
let out_dir_url = format!("file://{out_dir}");
6785+
6786+
let df = ctx.sql("SELECT c1, c2 FROM test").await?;
6787+
let plan = df.create_physical_plan().await?;
6788+
6789+
ctx.write_parquet(plan.clone(), &out_dir_url, None).await?;
6790+
ctx.write_csv(plan.clone(), &out_dir_url).await?;
6791+
ctx.write_json(plan.clone(), &out_dir_url).await?;
6792+
6793+
Ok(Self {
6794+
_tmp_dir: tmp_dir,
6795+
out_dir,
6796+
ctx,
6797+
})
6798+
}
6799+
6800+
// Initializes the basic test table and writes it as Parquet, CSV, and JSON using `write_opts`
6801+
//
6802+
// Available columns: c1, c2
6803+
//
6804+
// Returns the fixture holding the temporary directory, the output directory path, and the session context
6805+
async fn prepare_direct_df_writes(
6806+
config: SessionConfig,
6807+
write_opts: DataFrameWriteOptions,
6808+
) -> Result<Self> {
6809+
let tmp_dir = TempDir::new()?;
6810+
6811+
let ctx = SessionContext::new_with_config(config);
6812+
6813+
Self::register_local_table(&tmp_dir, &ctx)?;
6814+
6815+
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
6816+
let out_dir_url = format!("file://{out_dir}");
6817+
6818+
let df = ctx.sql("SELECT c1, c2 FROM test").await?;
6819+
6820+
df.clone()
6821+
.write_parquet(&out_dir_url, write_opts.clone(), None)
6822+
.await?;
6823+
df.clone()
6824+
.write_csv(&out_dir_url, write_opts.clone(), None)
6825+
.await?;
6826+
df.write_json(&out_dir_url, write_opts.clone(), None)
6827+
.await?;
6828+
6829+
Ok(Self {
6830+
_tmp_dir: tmp_dir,
6831+
out_dir,
6832+
ctx,
6833+
})
6834+
}
6835+
}
6836+
6837+
#[tokio::test]
6838+
async fn write_partitioned_results_with_prefix() -> Result<()> {
6839+
let mut config = SessionConfig::new();
6840+
config.options_mut().execution.partitioned_file_prefix_name = "prefix-".to_owned();
67746841

6775-
// Write the results to parquet with partitioning
6776-
let df = ctx.sql("SELECT c1, c2 FROM test").await?;
67776842
let df_write_options =
67786843
DataFrameWriteOptions::new().with_partition_by(vec![String::from("c2")]);
6844+
let FixtureDataGen {
6845+
_tmp_dir,
6846+
out_dir,
6847+
ctx,
6848+
} = FixtureDataGen::prepare_direct_df_writes(config, df_write_options).await?;
67796849

6780-
df.write_parquet(&out_dir_url, df_write_options, None)
6781-
.await?;
6782-
6783-
// Explicitly read the parquet file at c2=123 to verify the physical files are partitioned with
6784-
// specified prefix
6785-
let partitioned_file = format!("{out_dir}/c2=123/prefix*");
6850+
let partitioned_file = format!("{out_dir}/c2=123/prefix-*");
67866851
let filter_df = ctx
67876852
.read_parquet(&partitioned_file, ParquetReadOptions::default())
67886853
.await?;
67896854

67906855
// Check that the c2 column is gone and that c1 is abc.
6791-
let results = filter_df.collect().await?;
6856+
let results_parquet = filter_df.collect().await?;
6857+
let results_parquet_display = batches_to_string(&results_parquet);
67926858
assert_snapshot!(
6793-
batches_to_string(&results),
6859+
results_parquet_display.as_str(),
67946860
@r###"
67956861
+-----+
67966862
| c1 |
@@ -6800,5 +6866,123 @@ async fn write_partitioned_parquet_results_with_prefix() -> Result<()> {
68006866
"###
68016867
);
68026868

6869+
let results_csv = ctx
6870+
.read_csv(&partitioned_file, Default::default())
6871+
.await?
6872+
.collect()
6873+
.await?;
6874+
assert_eq!(
6875+
results_parquet_display.as_str(),
6876+
batches_to_string(&results_csv)
6877+
);
6878+
6879+
let results_json = ctx
6880+
.read_json(&partitioned_file, Default::default())
6881+
.await?
6882+
.collect()
6883+
.await?;
6884+
assert_eq!(results_parquet_display, batches_to_string(&results_json));
6885+
6886+
Ok(())
6887+
}
6888+
6889+
#[tokio::test]
6890+
async fn write_physical_plan_results_with_prefix() -> Result<()> {
6891+
let mut config = SessionConfig::new();
6892+
config.options_mut().execution.partitioned_file_prefix_name = "prefix-".to_owned();
6893+
6894+
let FixtureDataGen {
6895+
_tmp_dir,
6896+
out_dir,
6897+
ctx,
6898+
} = FixtureDataGen::prepare_execution_plan_writes(config).await?;
6899+
6900+
let partitioned_file = format!("{out_dir}/c2=123/prefix-*");
6901+
6902+
let df = ctx
6903+
.read_parquet(&partitioned_file, Default::default())
6904+
.await?;
6905+
let results_parquet = df.collect().await?;
6906+
let results_parquet_display = batches_to_string(&results_parquet);
6907+
assert_snapshot!(
6908+
results_parquet_display.as_str(),
6909+
@r###"
6910+
+-----+-----+
6911+
| c1 | c2 |
6912+
+-----+-----+
6913+
| abc | 123 |
6914+
| def | 456 |
6915+
+-----+-----+
6916+
"###
6917+
);
6918+
6919+
let results_csv = ctx
6920+
.read_csv(&partitioned_file, Default::default())
6921+
.await?
6922+
.collect()
6923+
.await?;
6924+
assert_eq!(
6925+
results_parquet_display.as_str(),
6926+
batches_to_string(&results_csv)
6927+
);
6928+
6929+
let results_json = ctx
6930+
.read_json(&partitioned_file, Default::default())
6931+
.await?
6932+
.collect()
6933+
.await?;
6934+
assert_eq!(results_parquet_display, batches_to_string(&results_json));
6935+
6936+
Ok(())
6937+
}
6938+
6939+
#[tokio::test]
6940+
async fn write_parts_parquet_results_with_prefix() -> Result<()> {
6941+
let mut config = SessionConfig::new();
6942+
config.options_mut().execution.partitioned_file_prefix_name = "prefix-".to_owned();
6943+
6944+
let df_write_options = DataFrameWriteOptions::new();
6945+
let FixtureDataGen {
6946+
_tmp_dir,
6947+
out_dir,
6948+
ctx,
6949+
} = FixtureDataGen::prepare_direct_df_writes(config, df_write_options).await?;
6950+
6951+
let partitioned_file = format!("{out_dir}/prefix-*");
6952+
6953+
let df = ctx
6954+
.read_parquet(&partitioned_file, Default::default())
6955+
.await?;
6956+
let results_parquet = df.collect().await?;
6957+
let results_parquet_display = batches_to_string(&results_parquet);
6958+
assert_snapshot!(
6959+
results_parquet_display.as_str(),
6960+
@r###"
6961+
+-----+-----+
6962+
| c1 | c2 |
6963+
+-----+-----+
6964+
| abc | 123 |
6965+
| def | 456 |
6966+
+-----+-----+
6967+
"###
6968+
);
6969+
6970+
let results_csv = ctx
6971+
.read_csv(&partitioned_file, Default::default())
6972+
.await?
6973+
.collect()
6974+
.await?;
6975+
assert_eq!(
6976+
results_parquet_display.as_str(),
6977+
batches_to_string(&results_csv)
6978+
);
6979+
6980+
let results_json = ctx
6981+
.read_json(&partitioned_file, Default::default())
6982+
.await?
6983+
.collect()
6984+
.await?;
6985+
assert_eq!(results_parquet_display, batches_to_string(&results_json));
6986+
68036987
Ok(())
68046988
}

0 commit comments

Comments
 (0)