Skip to content

Commit 803766a

Browse files
committed
Introduce a way to customize the prefix of multi-file outputs
1 parent 10c933c commit 803766a

5 files changed

Lines changed: 32 additions & 21 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -649,6 +649,12 @@ config_namespace! {
649649
/// # Default
650650
/// `false` — ANSI SQL mode is disabled by default.
651651
pub enable_ansi_mode: bool, default = false
652+
653+
/// Prefix to use when generating the file name in multi-file output
654+
///
655+
/// When the prefix is a non-empty string, it will be used to generate the file name as
656+
/// `{partitioned_file_prefix_name}{datafusion generated suffix}`
657+
pub partitioned_file_prefix_name: String, default = String::new()
652658
}
653659
}
654660

datafusion/datasource-csv/src/source.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -453,16 +453,17 @@ pub async fn plan_to_csv(
453453
let parsed = ListingTableUrl::parse(path)?;
454454
let object_store_url = parsed.object_store();
455455
let store = task_ctx.runtime_env().object_store(&object_store_url)?;
456-
let writer_buffer_size = task_ctx
457-
.session_config()
458-
.options()
459-
.execution
460-
.objectstore_writer_buffer_size;
456+
let exec_options = &task_ctx.session_config().options().execution;
457+
let writer_buffer_size = exec_options.objectstore_writer_buffer_size;
461458
let mut join_set = JoinSet::new();
462459
for i in 0..plan.output_partitioning().partition_count() {
463460
let storeref = Arc::clone(&store);
464461
let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
465-
let filename = format!("{}/part-{i}.csv", parsed.prefix());
462+
let filename = format!(
463+
"{}/{}part-{i}.csv",
464+
exec_options.partitioned_file_prefix_name,
465+
parsed.prefix()
466+
);
466467
let file = object_store::path::Path::parse(filename)?;
467468

468469
let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;

datafusion/datasource-json/src/source.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -275,16 +275,17 @@ pub async fn plan_to_json(
275275
let parsed = ListingTableUrl::parse(path)?;
276276
let object_store_url = parsed.object_store();
277277
let store = task_ctx.runtime_env().object_store(&object_store_url)?;
278-
let writer_buffer_size = task_ctx
279-
.session_config()
280-
.options()
281-
.execution
282-
.objectstore_writer_buffer_size;
278+
let exec_options = &task_ctx.session_config().options().execution;
279+
let writer_buffer_size = exec_options.objectstore_writer_buffer_size;
283280
let mut join_set = JoinSet::new();
284281
for i in 0..plan.output_partitioning().partition_count() {
285282
let storeref = Arc::clone(&store);
286283
let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
287-
let filename = format!("{}/part-{i}.json", parsed.prefix());
284+
let filename = format!(
285+
"{}/{}part-{i}.json",
286+
exec_options.partitioned_file_prefix_name,
287+
parsed.prefix()
288+
);
288289
let file = object_store::path::Path::parse(filename)?;
289290

290291
let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;

datafusion/datasource-parquet/src/writer.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,21 +39,22 @@ pub async fn plan_to_parquet(
3939
let object_store_url = parsed.object_store();
4040
let store = task_ctx.runtime_env().object_store(&object_store_url)?;
4141
let mut join_set = JoinSet::new();
42+
let exec_options = &task_ctx.session_config().options().execution;
4243
for i in 0..plan.output_partitioning().partition_count() {
4344
let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
44-
let filename = format!("{}/part-{i}.parquet", parsed.prefix());
45+
let filename = format!(
46+
"{}/{}part-{i}.parquet",
47+
exec_options.partitioned_file_prefix_name,
48+
parsed.prefix()
49+
);
4550
let file = Path::parse(filename)?;
4651
let propclone = writer_properties.clone();
4752

4853
let storeref = Arc::clone(&store);
4954
let buf_writer = BufWriter::with_capacity(
5055
storeref,
5156
file.clone(),
52-
task_ctx
53-
.session_config()
54-
.options()
55-
.execution
56-
.objectstore_writer_buffer_size,
57+
exec_options.objectstore_writer_buffer_size,
5758
);
5859
let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
5960
join_set.spawn(async move {

datafusion/datasource/src/write/demux.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,8 @@ async fn row_count_demuxer(
156156
let max_buffered_batches = exec_options.max_buffered_batches_per_output_file;
157157
let minimum_parallel_files = exec_options.minimum_parallel_output_files;
158158
let mut part_idx = 0;
159-
let write_id = rand::distr::Alphanumeric.sample_string(&mut rand::rng(), 16);
159+
let mut write_id = exec_options.partitioned_file_prefix_name.clone();
160+
rand::distr::Alphanumeric.append_string(&mut rand::rng(), &mut write_id, 16);
160161

161162
let mut open_file_streams = Vec::with_capacity(minimum_parallel_files);
162163

@@ -283,9 +284,10 @@ async fn hive_style_partitions_demuxer(
283284
file_extension: String,
284285
keep_partition_by_columns: bool,
285286
) -> Result<()> {
286-
let write_id = rand::distr::Alphanumeric.sample_string(&mut rand::rng(), 16);
287-
288287
let exec_options = &context.session_config().options().execution;
288+
let mut write_id = exec_options.partitioned_file_prefix_name.clone();
289+
rand::distr::Alphanumeric.append_string(&mut rand::rng(), &mut write_id, 16);
290+
289291
let max_buffered_recordbatches = exec_options.max_buffered_batches_per_output_file;
290292

291293
// To support non string partition col types, cast the type to &str first

0 commit comments

Comments
 (0)