
Commit 2190059

Introduce way to customize prefix of multi file outputs
Add test to illustrate prefixed parquet files. Update docs with the new execution parameter `partitioned_file_prefix_name`.
1 parent d1800db commit 2190059

9 files changed: 284 additions & 21 deletions


datafusion/common/src/config.rs

Lines changed: 8 additions & 0 deletions
```diff
@@ -706,6 +706,14 @@ config_namespace! {
         ///
         /// Disabled by default, set to a number greater than 0 for enabling it.
         pub hash_join_buffering_capacity: usize, default = 0
+
+        /// Prefix to use when generating file names in multi-file output.
+        ///
+        /// When the prefix is a non-empty string, it is used to generate file names as
+        /// `{partitioned_file_prefix_name}{datafusion generated suffix}`.
+        ///
+        /// Defaults to the empty string.
+        pub partitioned_file_prefix_name: String, default = String::new()
     }
 }
```

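For reference, a minimal end-to-end sketch of the new option, adapted from the tests added below (the table contents and the `/tmp/out/` path here are illustrative, not from the commit):

```rust
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let mut config = SessionConfig::new();
    // Every file produced by a multi-file write will now start with "prefix-".
    config.options_mut().execution.partitioned_file_prefix_name = "prefix-".to_owned();
    let ctx = SessionContext::new_with_config(config);

    ctx.sql("CREATE TABLE t AS VALUES ('abc', '123'), ('def', '456')")
        .await?;

    // Output files are named `prefix-<generated suffix>.parquet`
    // instead of `<generated suffix>.parquet`.
    ctx.table("t")
        .await?
        .write_parquet("/tmp/out/", DataFrameWriteOptions::new(), None)
        .await?;
    Ok(())
}
```
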
datafusion/core/src/dataframe/mod.rs

Lines changed: 1 addition & 0 deletions
```diff
@@ -72,6 +72,7 @@ use async_trait::async_trait;
 use datafusion_catalog::Session;
 use datafusion_expr::extension_types::DFArrayFormatterFactory;
 
+#[derive(Clone)]
 /// Contains options that control how data is
 /// written out from a DataFrame
 pub struct DataFrameWriteOptions {
```

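Deriving `Clone` is what lets one `DataFrameWriteOptions` value drive several writes, as the new tests below do. A small sketch (`write_all_formats` is a hypothetical helper; the method calls mirror the DataFrame API used in the tests):

```rust
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
use datafusion::error::Result;

// Reuse one options value across all three formats; `write_*` takes the
// options by value, so this only compiles now that the type is `Clone`.
async fn write_all_formats(df: DataFrame, out_dir_url: &str) -> Result<()> {
    let opts = DataFrameWriteOptions::new().with_partition_by(vec!["c2".to_string()]);
    df.clone().write_parquet(out_dir_url, opts.clone(), None).await?;
    df.clone().write_csv(out_dir_url, opts.clone(), None).await?;
    df.write_json(out_dir_url, opts, None).await?;
    Ok(())
}
```
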
datafusion/core/tests/dataframe/mod.rs

Lines changed: 252 additions & 0 deletions
All 252 changed lines are additions, appended after `test_grouping_with_alias` (hunk `@@ -6898,3 +6898,255 @@`):

```rust
struct FixtureDataGen {
    _tmp_dir: TempDir,
    out_dir: String,
    ctx: SessionContext,
}

impl FixtureDataGen {
    fn register_local_table(
        out_dir: impl AsRef<Path>,
        ctx: &SessionContext,
    ) -> Result<()> {
        // Create an in-memory table with columns c1 and c2, both strings
        let schema = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Utf8, false),
            Field::new("c2", DataType::Utf8, false),
        ]));

        let record_batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["abc", "def"])),
                Arc::new(StringArray::from(vec!["123", "456"])),
            ],
        )?;

        let mem_table = Arc::new(MemTable::try_new(schema, vec![vec![record_batch]])?);

        // Register the table in the context
        ctx.register_table("test", mem_table)?;

        let local = Arc::new(LocalFileSystem::new_with_prefix(&out_dir)?);
        let local_url = Url::parse("file://local").unwrap();
        ctx.register_object_store(&local_url, local);

        Ok(())
    }

    // Initializes basic data and writes it by executing the physical plan.
    //
    // Available columns: c1, c2
    async fn prepare_execution_plan_writes(config: SessionConfig) -> Result<Self> {
        let tmp_dir = TempDir::new()?;

        let ctx = SessionContext::new_with_config(config);

        Self::register_local_table(&tmp_dir, &ctx)?;

        let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
        let out_dir_url = format!("file://{out_dir}");

        let df = ctx.sql("SELECT c1, c2 FROM test").await?;
        let plan = df.create_physical_plan().await?;

        ctx.write_parquet(plan.clone(), &out_dir_url, None).await?;
        ctx.write_csv(plan.clone(), &out_dir_url).await?;
        ctx.write_json(plan.clone(), &out_dir_url).await?;

        Ok(Self {
            _tmp_dir: tmp_dir,
            out_dir,
            ctx,
        })
    }

    // Initializes basic data and writes it using `write_opts`.
    //
    // Available columns: c1, c2
    async fn prepare_direct_df_writes(
        config: SessionConfig,
        write_opts: DataFrameWriteOptions,
    ) -> Result<Self> {
        let tmp_dir = TempDir::new()?;

        let ctx = SessionContext::new_with_config(config);

        Self::register_local_table(&tmp_dir, &ctx)?;

        let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
        let out_dir_url = format!("file://{out_dir}");

        let df = ctx.sql("SELECT c1, c2 FROM test").await?;

        df.clone()
            .write_parquet(&out_dir_url, write_opts.clone(), None)
            .await?;
        df.clone()
            .write_csv(&out_dir_url, write_opts.clone(), None)
            .await?;
        df.write_json(&out_dir_url, write_opts.clone(), None)
            .await?;

        Ok(Self {
            _tmp_dir: tmp_dir,
            out_dir,
            ctx,
        })
    }
}

#[tokio::test]
async fn write_partitioned_results_with_prefix() -> Result<()> {
    let mut config = SessionConfig::new();
    config.options_mut().execution.partitioned_file_prefix_name = "prefix-".to_owned();

    let df_write_options =
        DataFrameWriteOptions::new().with_partition_by(vec![String::from("c2")]);
    let FixtureDataGen {
        _tmp_dir,
        out_dir,
        ctx,
    } = FixtureDataGen::prepare_direct_df_writes(config, df_write_options).await?;

    let partitioned_file = format!("{out_dir}/c2=123/prefix-*");
    let filter_df = ctx
        .read_parquet(&partitioned_file, ParquetReadOptions::default())
        .await?;

    // Check that the c2 column is gone and that c1 is abc.
    let results_parquet = filter_df.collect().await?;
    let results_parquet_display = batches_to_string(&results_parquet);
    assert_snapshot!(
        results_parquet_display.as_str(),
        @r###"
    +-----+
    | c1  |
    +-----+
    | abc |
    +-----+
    "###
    );

    let results_csv = ctx
        .read_csv(&partitioned_file, Default::default())
        .await?
        .collect()
        .await?;
    assert_eq!(
        results_parquet_display.as_str(),
        batches_to_string(&results_csv)
    );

    let results_json = ctx
        .read_json(&partitioned_file, Default::default())
        .await?
        .collect()
        .await?;
    assert_eq!(results_parquet_display, batches_to_string(&results_json));

    Ok(())
}

#[tokio::test]
async fn write_physical_plan_results_with_prefix() -> Result<()> {
    let mut config = SessionConfig::new();
    config.options_mut().execution.partitioned_file_prefix_name = "prefix-".to_owned();

    let FixtureDataGen {
        _tmp_dir,
        out_dir,
        ctx,
    } = FixtureDataGen::prepare_execution_plan_writes(config).await?;

    let partitioned_file = format!("{out_dir}/prefix-*");

    let df = ctx
        .read_parquet(&partitioned_file, Default::default())
        .await?;
    let results_parquet = df.collect().await?;
    let results_parquet_display = batches_to_string(&results_parquet);
    assert_snapshot!(
        results_parquet_display.as_str(),
        @r###"
    +-----+-----+
    | c1  | c2  |
    +-----+-----+
    | abc | 123 |
    | def | 456 |
    +-----+-----+
    "###
    );

    let results_csv = ctx
        .read_csv(&partitioned_file, Default::default())
        .await?
        .collect()
        .await?;
    assert_eq!(
        results_parquet_display.as_str(),
        batches_to_string(&results_csv)
    );

    let results_json = ctx
        .read_json(&partitioned_file, Default::default())
        .await?
        .collect()
        .await?;
    assert_eq!(results_parquet_display, batches_to_string(&results_json));

    Ok(())
}

#[tokio::test]
async fn write_parts_parquet_results_with_prefix() -> Result<()> {
    let mut config = SessionConfig::new();
    config.options_mut().execution.partitioned_file_prefix_name = "prefix-".to_owned();

    let df_write_options = DataFrameWriteOptions::new();
    let FixtureDataGen {
        _tmp_dir,
        out_dir,
        ctx,
    } = FixtureDataGen::prepare_direct_df_writes(config, df_write_options).await?;

    let partitioned_file = format!("{out_dir}/prefix-*");

    let df = ctx
        .read_parquet(&partitioned_file, Default::default())
        .await?;
    let results_parquet = df.collect().await?;
    let results_parquet_display = batches_to_string(&results_parquet);
    assert_snapshot!(
        results_parquet_display.as_str(),
        @r###"
    +-----+-----+
    | c1  | c2  |
    +-----+-----+
    | abc | 123 |
    | def | 456 |
    +-----+-----+
    "###
    );

    let results_csv = ctx
        .read_csv(&partitioned_file, Default::default())
        .await?
        .collect()
        .await?;
    assert_eq!(
        results_parquet_display.as_str(),
        batches_to_string(&results_csv)
    );

    let results_json = ctx
        .read_json(&partitioned_file, Default::default())
        .await?
        .collect()
        .await?;
    assert_eq!(results_parquet_display, batches_to_string(&results_json));

    Ok(())
}
```

datafusion/datasource-csv/src/source.rs

Lines changed: 5 additions & 6 deletions
```diff
@@ -463,16 +463,15 @@ pub async fn plan_to_csv(
     let parsed = ListingTableUrl::parse(path)?;
     let object_store_url = parsed.object_store();
     let store = task_ctx.runtime_env().object_store(&object_store_url)?;
-    let writer_buffer_size = task_ctx
-        .session_config()
-        .options()
-        .execution
-        .objectstore_writer_buffer_size;
+    let exec_options = &task_ctx.session_config().options().execution;
+    let writer_buffer_size = exec_options.objectstore_writer_buffer_size;
+    let file_name_prefix = exec_options.partitioned_file_prefix_name.as_str();
+
     let mut join_set = JoinSet::new();
     for i in 0..plan.output_partitioning().partition_count() {
         let storeref = Arc::clone(&store);
         let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
-        let filename = format!("{}/part-{i}.csv", parsed.prefix());
+        let filename = format!("{}/{file_name_prefix}part-{i}.csv", parsed.prefix());
         let file = object_store::path::Path::parse(filename)?;
 
         let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
```

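The same splice happens in `plan_to_json` and `plan_to_parquet` below. A standalone sketch of the naming scheme (`part_file_name` is a hypothetical helper; the format string mirrors the diff):

```rust
// Hypothetical helper illustrating the file naming used by plan_to_csv,
// plan_to_json, and plan_to_parquet: the configured prefix is spliced in
// front of the existing `part-{i}` stem.
fn part_file_name(dir: &str, file_name_prefix: &str, i: usize, ext: &str) -> String {
    format!("{dir}/{file_name_prefix}part-{i}.{ext}")
}

fn main() {
    assert_eq!(part_file_name("out", "prefix-", 0, "csv"), "out/prefix-part-0.csv");
    // The default empty prefix leaves the historical names untouched:
    assert_eq!(part_file_name("out", "", 1, "json"), "out/part-1.json");
}
```
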
datafusion/datasource-json/src/source.rs

Lines changed: 5 additions & 6 deletions
```diff
@@ -488,16 +488,15 @@ pub async fn plan_to_json(
     let parsed = ListingTableUrl::parse(path)?;
     let object_store_url = parsed.object_store();
     let store = task_ctx.runtime_env().object_store(&object_store_url)?;
-    let writer_buffer_size = task_ctx
-        .session_config()
-        .options()
-        .execution
-        .objectstore_writer_buffer_size;
+    let exec_options = &task_ctx.session_config().options().execution;
+    let writer_buffer_size = exec_options.objectstore_writer_buffer_size;
+    let file_name_prefix = exec_options.partitioned_file_prefix_name.as_str();
+
     let mut join_set = JoinSet::new();
     for i in 0..plan.output_partitioning().partition_count() {
         let storeref = Arc::clone(&store);
         let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
-        let filename = format!("{}/part-{i}.json", parsed.prefix());
+        let filename = format!("{}/{file_name_prefix}part-{i}.json", parsed.prefix());
         let file = object_store::path::Path::parse(filename)?;
 
         let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
```

datafusion/datasource-parquet/src/writer.rs

Lines changed: 5 additions & 6 deletions
```diff
@@ -39,21 +39,20 @@ pub async fn plan_to_parquet(
     let object_store_url = parsed.object_store();
     let store = task_ctx.runtime_env().object_store(&object_store_url)?;
     let mut join_set = JoinSet::new();
+    let exec_options = &task_ctx.session_config().options().execution;
+    let file_name_prefix = exec_options.partitioned_file_prefix_name.as_str();
+
     for i in 0..plan.output_partitioning().partition_count() {
         let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
-        let filename = format!("{}/part-{i}.parquet", parsed.prefix());
+        let filename = format!("{}/{file_name_prefix}part-{i}.parquet", parsed.prefix());
         let file = Path::parse(filename)?;
         let propclone = writer_properties.clone();
 
         let storeref = Arc::clone(&store);
         let buf_writer = BufWriter::with_capacity(
             storeref,
             file.clone(),
-            task_ctx
-                .session_config()
-                .options()
-                .execution
-                .objectstore_writer_buffer_size,
+            exec_options.objectstore_writer_buffer_size,
         );
         let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
         join_set.spawn(async move {
```

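`plan_to_parquet` streams each partition through an `object_store` buffered writer sized by `objectstore_writer_buffer_size`. A minimal standalone sketch of that pattern (in-memory store; the path, payload, and 10 MiB capacity are illustrative):

```rust
use std::sync::Arc;

use object_store::buffered::BufWriter;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let path = Path::parse("out/prefix-part-0.parquet")?;

    // Buffer up to `capacity` bytes before switching to multipart upload,
    // as the writer above does with `objectstore_writer_buffer_size`.
    let mut writer = BufWriter::with_capacity(Arc::clone(&store), path, 10 * 1024 * 1024);
    writer.write_all(b"...parquet bytes...").await?;
    writer.shutdown().await?; // finalize the upload
    Ok(())
}
```
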
datafusion/datasource/src/write/demux.rs

Lines changed: 5 additions & 3 deletions
```diff
@@ -157,7 +157,8 @@ async fn row_count_demuxer(
     let max_buffered_batches = exec_options.max_buffered_batches_per_output_file;
     let minimum_parallel_files = exec_options.minimum_parallel_output_files;
     let mut part_idx = 0;
-    let write_id = rand::distr::Alphanumeric.sample_string(&mut rand::rng(), 16);
+    let mut write_id = exec_options.partitioned_file_prefix_name.clone();
+    rand::distr::Alphanumeric.append_string(&mut rand::rng(), &mut write_id, 16);
 
     let mut open_file_streams = Vec::with_capacity(minimum_parallel_files);
 
@@ -302,9 +303,10 @@ async fn hive_style_partitions_demuxer(
     file_extension: String,
     keep_partition_by_columns: bool,
 ) -> Result<()> {
-    let write_id = rand::distr::Alphanumeric.sample_string(&mut rand::rng(), 16);
-
     let exec_options = &context.session_config().options().execution;
+    let mut write_id = exec_options.partitioned_file_prefix_name.clone();
+    rand::distr::Alphanumeric.append_string(&mut rand::rng(), &mut write_id, 16);
+
     let max_buffered_recordbatches = exec_options.max_buffered_batches_per_output_file;
 
     // To support non string partition col types, cast the type to &str first
```

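For demuxed writes the prefix is prepended to the random write id rather than to a `part-{i}` stem. A minimal sketch of that construction, assuming the `rand` 0.9 API used in the diff ("prefix-" stands in for the configured `partitioned_file_prefix_name`):

```rust
use rand::distr::{Alphanumeric, SampleString};

fn main() {
    // Configured `partitioned_file_prefix_name`; "prefix-" is illustrative.
    let configured_prefix = "prefix-".to_string();

    // Mirror the demuxer: start from the prefix, then append 16 random
    // alphanumeric characters to form the per-write id.
    let mut write_id = configured_prefix.clone();
    Alphanumeric.append_string(&mut rand::rng(), &mut write_id, 16);

    // e.g. "prefix-x7Qk2mZp0aB4cD9e"
    println!("{write_id}");
}
```
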
datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 2 additions & 0 deletions
```diff
@@ -262,6 +262,7 @@ datafusion.execution.parquet.statistics_truncate_length 64
 datafusion.execution.parquet.use_content_defined_chunking NULL
 datafusion.execution.parquet.write_batch_size 1024
 datafusion.execution.parquet.writer_version 1.0
+datafusion.execution.partitioned_file_prefix_name (empty)
 datafusion.execution.perfect_hash_join_min_key_density 0.15
 datafusion.execution.perfect_hash_join_small_build_threshold 1024
 datafusion.execution.planning_concurrency 13
@@ -409,6 +410,7 @@ datafusion.execution.parquet.statistics_truncate_length 64 (writing) Sets statis
 datafusion.execution.parquet.use_content_defined_chunking NULL (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When `Some`, CDC is enabled with the given options; when `None` (the default), CDC is disabled. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups.
 datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in rows
 datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer version valid values are "1.0" and "2.0"
+datafusion.execution.partitioned_file_prefix_name (empty) Prefix to use when generating file names in multi-file output. When the prefix is a non-empty string, it is used to generate file names as `{partitioned_file_prefix_name}{datafusion generated suffix}`. Defaults to the empty string.
 datafusion.execution.perfect_hash_join_min_key_density 0.15 The minimum required density of join keys on the build side to consider a perfect hash join (see `HashJoinExec` for more details). Density is calculated as: `(number of rows) / (max_key - min_key + 1)`. A perfect hash join may be used if the actual key density > this value. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future.
 datafusion.execution.perfect_hash_join_small_build_threshold 1024 A perfect hash join (see `HashJoinExec` for more details) will be considered if the range of keys (max - min) on the build side is < this threshold. This provides a fast path for joins with very small key ranges, bypassing the density check. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future.
 datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system
```

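Since this is a regular execution option, it should also be reachable from SQL like its neighbours in this file. A hedged sketch, assuming `SET`/`SHOW` handle this new option the same way they handle other `datafusion.execution.*` settings:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // Assumption: the new option is settable via SQL like other
    // datafusion.execution.* settings.
    ctx.sql("SET datafusion.execution.partitioned_file_prefix_name = 'prefix-'")
        .await?;
    ctx.sql("SHOW datafusion.execution.partitioned_file_prefix_name")
        .await?
        .show()
        .await?;
    Ok(())
}
```
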