Skip to content

Commit dfddaf3

Browse files
committed
Fix datasource-*
1 parent 7f66e30 commit dfddaf3

4 files changed

Lines changed: 10 additions & 16 deletions

File tree

datafusion/core/tests/dataframe/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6897,7 +6897,7 @@ async fn write_physical_plan_results_with_prefix() -> Result<()> {
68976897
ctx,
68986898
} = FixtureDataGen::prepare_execution_plan_writes(config).await?;
68996899

6900-
let partitioned_file = format!("{out_dir}/c2=123/prefix-*");
6900+
let partitioned_file = format!("{out_dir}/prefix-*");
69016901

69026902
let df = ctx
69036903
.read_parquet(&partitioned_file, Default::default())

datafusion/datasource-csv/src/source.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -455,15 +455,13 @@ pub async fn plan_to_csv(
455455
let store = task_ctx.runtime_env().object_store(&object_store_url)?;
456456
let exec_options = &task_ctx.session_config().options().execution;
457457
let writer_buffer_size = exec_options.objectstore_writer_buffer_size;
458+
let file_name_prefix = exec_options.partitioned_file_prefix_name.as_str();
459+
458460
let mut join_set = JoinSet::new();
459461
for i in 0..plan.output_partitioning().partition_count() {
460462
let storeref = Arc::clone(&store);
461463
let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
462-
let filename = format!(
463-
"{}/{}part-{i}.csv",
464-
exec_options.partitioned_file_prefix_name,
465-
parsed.prefix()
466-
);
464+
let filename = format!("{}/{file_name_prefix}part-{i}.csv", parsed.prefix(),);
467465
let file = object_store::path::Path::parse(filename)?;
468466

469467
let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;

datafusion/datasource-json/src/source.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -277,15 +277,13 @@ pub async fn plan_to_json(
277277
let store = task_ctx.runtime_env().object_store(&object_store_url)?;
278278
let exec_options = &task_ctx.session_config().options().execution;
279279
let writer_buffer_size = exec_options.objectstore_writer_buffer_size;
280+
let file_name_prefix = exec_options.partitioned_file_prefix_name.as_str();
281+
280282
let mut join_set = JoinSet::new();
281283
for i in 0..plan.output_partitioning().partition_count() {
282284
let storeref = Arc::clone(&store);
283285
let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
284-
let filename = format!(
285-
"{}/{}part-{i}.json",
286-
exec_options.partitioned_file_prefix_name,
287-
parsed.prefix()
288-
);
286+
let filename = format!("{}/{file_name_prefix}part-{i}.json", parsed.prefix());
289287
let file = object_store::path::Path::parse(filename)?;
290288

291289
let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;

datafusion/datasource-parquet/src/writer.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,11 @@ pub async fn plan_to_parquet(
4040
let store = task_ctx.runtime_env().object_store(&object_store_url)?;
4141
let mut join_set = JoinSet::new();
4242
let exec_options = &task_ctx.session_config().options().execution;
43+
let file_name_prefix = exec_options.partitioned_file_prefix_name.as_str();
44+
4345
for i in 0..plan.output_partitioning().partition_count() {
4446
let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
45-
let filename = format!(
46-
"{}/{}part-{i}.parquet",
47-
exec_options.partitioned_file_prefix_name,
48-
parsed.prefix()
49-
);
47+
let filename = format!("{}/{file_name_prefix}part-{i}.parquet", parsed.prefix());
5048
let file = Path::parse(filename)?;
5149
let propclone = writer_properties.clone();
5250

0 commit comments

Comments
 (0)