Skip to content

Commit f12160a

Browse files
committed
Introduce way to customize prefix of multi file outputs
Add test to illustrate prefixed parquet files. Update docs with the new execution parameter `partitioned_file_prefix_name`.
1 parent 32e6fe8 commit f12160a

8 files changed

Lines changed: 280 additions & 21 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -649,6 +649,12 @@ config_namespace! {
649649
/// # Default
650650
/// `false` — ANSI SQL mode is disabled by default.
651651
pub enable_ansi_mode: bool, default = false
652+
653+
/// Prefix to use when generating file names in multi-file output
654+
///
655+
/// When the prefix is a non-empty string, it will be used to generate file names as
656+
/// `{partitioned_file_prefix_name}{datafusion generated suffix}`
657+
pub partitioned_file_prefix_name: String, default = String::new()
652658
}
653659
}
654660

datafusion/core/src/dataframe/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ use datafusion_functions_aggregate::expr_fn::{
7272
use async_trait::async_trait;
7373
use datafusion_catalog::Session;
7474

75+
#[derive(Clone)]
7576
/// Contains options that control how data is
7677
/// written out from a DataFrame
7778
pub struct DataFrameWriteOptions {

datafusion/core/tests/dataframe/mod.rs

Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6745,3 +6745,255 @@ async fn test_duplicate_state_fields_for_dfschema_construct() -> Result<()> {
67456745

67466746
Ok(())
67476747
}
6748+
6749+
struct FixtureDataGen {
6750+
_tmp_dir: TempDir,
6751+
out_dir: String,
6752+
ctx: SessionContext,
6753+
}
6754+
6755+
impl FixtureDataGen {
6756+
fn register_local_table(
6757+
out_dir: impl AsRef<Path>,
6758+
ctx: &SessionContext,
6759+
) -> Result<()> {
6760+
// Create an in memory table with schema C1 and C2, both strings
6761+
let schema = Arc::new(Schema::new(vec![
6762+
Field::new("c1", DataType::Utf8, false),
6763+
Field::new("c2", DataType::Utf8, false),
6764+
]));
6765+
6766+
let record_batch = RecordBatch::try_new(
6767+
schema.clone(),
6768+
vec![
6769+
Arc::new(StringArray::from(vec!["abc", "def"])),
6770+
Arc::new(StringArray::from(vec!["123", "456"])),
6771+
],
6772+
)?;
6773+
6774+
let mem_table = Arc::new(MemTable::try_new(schema, vec![vec![record_batch]])?);
6775+
6776+
// Register the table in the context
6777+
ctx.register_table("test", mem_table)?;
6778+
6779+
let local = Arc::new(LocalFileSystem::new_with_prefix(&out_dir)?);
6780+
let local_url = Url::parse("file://local").unwrap();
6781+
ctx.register_object_store(&local_url, local);
6782+
6783+
Ok(())
6784+
}
6785+
6786+
// initializes basic data and writes it by executing the physical plan
6787+
//
6788+
// Available columns: c1, c2
6789+
async fn prepare_execution_plan_writes(config: SessionConfig) -> Result<Self> {
6790+
let tmp_dir = TempDir::new()?;
6791+
6792+
let ctx = SessionContext::new_with_config(config);
6793+
6794+
Self::register_local_table(&tmp_dir, &ctx)?;
6795+
6796+
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
6797+
let out_dir_url = format!("file://{out_dir}");
6798+
6799+
let df = ctx.sql("SELECT c1, c2 FROM test").await?;
6800+
let plan = df.create_physical_plan().await?;
6801+
6802+
ctx.write_parquet(plan.clone(), &out_dir_url, None).await?;
6803+
ctx.write_csv(plan.clone(), &out_dir_url).await?;
6804+
ctx.write_json(plan.clone(), &out_dir_url).await?;
6805+
6806+
Ok(Self {
6807+
_tmp_dir: tmp_dir,
6808+
out_dir,
6809+
ctx,
6810+
})
6811+
}
6812+
6813+
// initializes basic data and writes it using `write_opts`
6814+
//
6815+
// Available columns: c1, c2
6816+
async fn prepare_direct_df_writes(
6817+
config: SessionConfig,
6818+
write_opts: DataFrameWriteOptions,
6819+
) -> Result<Self> {
6820+
let tmp_dir = TempDir::new()?;
6821+
6822+
let ctx = SessionContext::new_with_config(config);
6823+
6824+
Self::register_local_table(&tmp_dir, &ctx)?;
6825+
6826+
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
6827+
let out_dir_url = format!("file://{out_dir}");
6828+
6829+
let df = ctx.sql("SELECT c1, c2 FROM test").await?;
6830+
6831+
df.clone()
6832+
.write_parquet(&out_dir_url, write_opts.clone(), None)
6833+
.await?;
6834+
df.clone()
6835+
.write_csv(&out_dir_url, write_opts.clone(), None)
6836+
.await?;
6837+
df.write_json(&out_dir_url, write_opts.clone(), None)
6838+
.await?;
6839+
6840+
Ok(Self {
6841+
_tmp_dir: tmp_dir,
6842+
out_dir,
6843+
ctx,
6844+
})
6845+
}
6846+
}
6847+
6848+
#[tokio::test]
6849+
async fn write_partitioned_results_with_prefix() -> Result<()> {
6850+
let mut config = SessionConfig::new();
6851+
config.options_mut().execution.partitioned_file_prefix_name = "prefix-".to_owned();
6852+
6853+
let df_write_options =
6854+
DataFrameWriteOptions::new().with_partition_by(vec![String::from("c2")]);
6855+
let FixtureDataGen {
6856+
_tmp_dir,
6857+
out_dir,
6858+
ctx,
6859+
} = FixtureDataGen::prepare_direct_df_writes(config, df_write_options).await?;
6860+
6861+
let partitioned_file = format!("{out_dir}/c2=123/prefix-*");
6862+
let filter_df = ctx
6863+
.read_parquet(&partitioned_file, ParquetReadOptions::default())
6864+
.await?;
6865+
6866+
// Check that the c2 column is gone and that c1 is abc.
6867+
let results_parquet = filter_df.collect().await?;
6868+
let results_parquet_display = batches_to_string(&results_parquet);
6869+
assert_snapshot!(
6870+
results_parquet_display.as_str(),
6871+
@r###"
6872+
+-----+
6873+
| c1 |
6874+
+-----+
6875+
| abc |
6876+
+-----+
6877+
"###
6878+
);
6879+
6880+
let results_csv = ctx
6881+
.read_csv(&partitioned_file, Default::default())
6882+
.await?
6883+
.collect()
6884+
.await?;
6885+
assert_eq!(
6886+
results_parquet_display.as_str(),
6887+
batches_to_string(&results_csv)
6888+
);
6889+
6890+
let results_json = ctx
6891+
.read_json(&partitioned_file, Default::default())
6892+
.await?
6893+
.collect()
6894+
.await?;
6895+
assert_eq!(results_parquet_display, batches_to_string(&results_json));
6896+
6897+
Ok(())
6898+
}
6899+
6900+
#[tokio::test]
6901+
async fn write_physical_plan_results_with_prefix() -> Result<()> {
6902+
let mut config = SessionConfig::new();
6903+
config.options_mut().execution.partitioned_file_prefix_name = "prefix-".to_owned();
6904+
6905+
let FixtureDataGen {
6906+
_tmp_dir,
6907+
out_dir,
6908+
ctx,
6909+
} = FixtureDataGen::prepare_execution_plan_writes(config).await?;
6910+
6911+
let partitioned_file = format!("{out_dir}/prefix-*");
6912+
6913+
let df = ctx
6914+
.read_parquet(&partitioned_file, Default::default())
6915+
.await?;
6916+
let results_parquet = df.collect().await?;
6917+
let results_parquet_display = batches_to_string(&results_parquet);
6918+
assert_snapshot!(
6919+
results_parquet_display.as_str(),
6920+
@r###"
6921+
+-----+-----+
6922+
| c1 | c2 |
6923+
+-----+-----+
6924+
| abc | 123 |
6925+
| def | 456 |
6926+
+-----+-----+
6927+
"###
6928+
);
6929+
6930+
let results_csv = ctx
6931+
.read_csv(&partitioned_file, Default::default())
6932+
.await?
6933+
.collect()
6934+
.await?;
6935+
assert_eq!(
6936+
results_parquet_display.as_str(),
6937+
batches_to_string(&results_csv)
6938+
);
6939+
6940+
let results_json = ctx
6941+
.read_json(&partitioned_file, Default::default())
6942+
.await?
6943+
.collect()
6944+
.await?;
6945+
assert_eq!(results_parquet_display, batches_to_string(&results_json));
6946+
6947+
Ok(())
6948+
}
6949+
6950+
#[tokio::test]
6951+
async fn write_parts_parquet_results_with_prefix() -> Result<()> {
6952+
let mut config = SessionConfig::new();
6953+
config.options_mut().execution.partitioned_file_prefix_name = "prefix-".to_owned();
6954+
6955+
let df_write_options = DataFrameWriteOptions::new();
6956+
let FixtureDataGen {
6957+
_tmp_dir,
6958+
out_dir,
6959+
ctx,
6960+
} = FixtureDataGen::prepare_direct_df_writes(config, df_write_options).await?;
6961+
6962+
let partitioned_file = format!("{out_dir}/prefix-*");
6963+
6964+
let df = ctx
6965+
.read_parquet(&partitioned_file, Default::default())
6966+
.await?;
6967+
let results_parquet = df.collect().await?;
6968+
let results_parquet_display = batches_to_string(&results_parquet);
6969+
assert_snapshot!(
6970+
results_parquet_display.as_str(),
6971+
@r###"
6972+
+-----+-----+
6973+
| c1 | c2 |
6974+
+-----+-----+
6975+
| abc | 123 |
6976+
| def | 456 |
6977+
+-----+-----+
6978+
"###
6979+
);
6980+
6981+
let results_csv = ctx
6982+
.read_csv(&partitioned_file, Default::default())
6983+
.await?
6984+
.collect()
6985+
.await?;
6986+
assert_eq!(
6987+
results_parquet_display.as_str(),
6988+
batches_to_string(&results_csv)
6989+
);
6990+
6991+
let results_json = ctx
6992+
.read_json(&partitioned_file, Default::default())
6993+
.await?
6994+
.collect()
6995+
.await?;
6996+
assert_eq!(results_parquet_display, batches_to_string(&results_json));
6997+
6998+
Ok(())
6999+
}

datafusion/datasource-csv/src/source.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -448,16 +448,15 @@ pub async fn plan_to_csv(
448448
let parsed = ListingTableUrl::parse(path)?;
449449
let object_store_url = parsed.object_store();
450450
let store = task_ctx.runtime_env().object_store(&object_store_url)?;
451-
let writer_buffer_size = task_ctx
452-
.session_config()
453-
.options()
454-
.execution
455-
.objectstore_writer_buffer_size;
451+
let exec_options = &task_ctx.session_config().options().execution;
452+
let writer_buffer_size = exec_options.objectstore_writer_buffer_size;
453+
let file_name_prefix = exec_options.partitioned_file_prefix_name.as_str();
454+
456455
let mut join_set = JoinSet::new();
457456
for i in 0..plan.output_partitioning().partition_count() {
458457
let storeref = Arc::clone(&store);
459458
let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
460-
let filename = format!("{}/part-{i}.csv", parsed.prefix());
459+
let filename = format!("{}/{file_name_prefix}part-{i}.csv", parsed.prefix(),);
461460
let file = object_store::path::Path::parse(filename)?;
462461

463462
let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;

datafusion/datasource-json/src/source.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -258,16 +258,15 @@ pub async fn plan_to_json(
258258
let parsed = ListingTableUrl::parse(path)?;
259259
let object_store_url = parsed.object_store();
260260
let store = task_ctx.runtime_env().object_store(&object_store_url)?;
261-
let writer_buffer_size = task_ctx
262-
.session_config()
263-
.options()
264-
.execution
265-
.objectstore_writer_buffer_size;
261+
let exec_options = &task_ctx.session_config().options().execution;
262+
let writer_buffer_size = exec_options.objectstore_writer_buffer_size;
263+
let file_name_prefix = exec_options.partitioned_file_prefix_name.as_str();
264+
266265
let mut join_set = JoinSet::new();
267266
for i in 0..plan.output_partitioning().partition_count() {
268267
let storeref = Arc::clone(&store);
269268
let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
270-
let filename = format!("{}/part-{i}.json", parsed.prefix());
269+
let filename = format!("{}/{file_name_prefix}part-{i}.json", parsed.prefix());
271270
let file = object_store::path::Path::parse(filename)?;
272271

273272
let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;

datafusion/datasource-parquet/src/writer.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,21 +39,20 @@ pub async fn plan_to_parquet(
3939
let object_store_url = parsed.object_store();
4040
let store = task_ctx.runtime_env().object_store(&object_store_url)?;
4141
let mut join_set = JoinSet::new();
42+
let exec_options = &task_ctx.session_config().options().execution;
43+
let file_name_prefix = exec_options.partitioned_file_prefix_name.as_str();
44+
4245
for i in 0..plan.output_partitioning().partition_count() {
4346
let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
44-
let filename = format!("{}/part-{i}.parquet", parsed.prefix());
47+
let filename = format!("{}/{file_name_prefix}part-{i}.parquet", parsed.prefix());
4548
let file = Path::parse(filename)?;
4649
let propclone = writer_properties.clone();
4750

4851
let storeref = Arc::clone(&store);
4952
let buf_writer = BufWriter::with_capacity(
5053
storeref,
5154
file.clone(),
52-
task_ctx
53-
.session_config()
54-
.options()
55-
.execution
56-
.objectstore_writer_buffer_size,
55+
exec_options.objectstore_writer_buffer_size,
5756
);
5857
let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
5958
join_set.spawn(async move {

datafusion/datasource/src/write/demux.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,8 @@ async fn row_count_demuxer(
156156
let max_buffered_batches = exec_options.max_buffered_batches_per_output_file;
157157
let minimum_parallel_files = exec_options.minimum_parallel_output_files;
158158
let mut part_idx = 0;
159-
let write_id = rand::distr::Alphanumeric.sample_string(&mut rand::rng(), 16);
159+
let mut write_id = exec_options.partitioned_file_prefix_name.clone();
160+
rand::distr::Alphanumeric.append_string(&mut rand::rng(), &mut write_id, 16);
160161

161162
let mut open_file_streams = Vec::with_capacity(minimum_parallel_files);
162163

@@ -283,9 +284,10 @@ async fn hive_style_partitions_demuxer(
283284
file_extension: String,
284285
keep_partition_by_columns: bool,
285286
) -> Result<()> {
286-
let write_id = rand::distr::Alphanumeric.sample_string(&mut rand::rng(), 16);
287-
288287
let exec_options = &context.session_config().options().execution;
288+
let mut write_id = exec_options.partitioned_file_prefix_name.clone();
289+
rand::distr::Alphanumeric.append_string(&mut rand::rng(), &mut write_id, 16);
290+
289291
let max_buffered_recordbatches = exec_options.max_buffered_batches_per_output_file;
290292

291293
// To support non string partition col types, cast the type to &str first

0 commit comments

Comments
 (0)