Skip to content

Commit 3aefba7

Browse files
authored
fix: fix elapsed_compute metric in ParquetSink to report encoding time only (#21825)
## Which issue does this PR close? - Closes #21797. ## Rationale for this change `ParquetSink` registered an `elapsed_compute` metric using a single wall-clock timer that spanned the entire write operation — upstream batch wait, CPU Arrow→Parquet encoding, and object-store I/O all rolled into one number. This made the metric misleading: it inflated `elapsed_compute` with I/O latency, which is inconsistent with how every other operator in DataFusion reports this metric (CPU time only). ## What changes are included in this PR? Two write paths are fixed independently: **Sequential path** (`allow_single_file_parallelism = false` or CDC enabled): - A new `TimingWriter<W>` wrapper implements `AsyncFileWriter` and records wall-clock time spent in I/O calls (`write` / `complete`). - The total time inside `writer.write()` and `writer.close()` is accumulated in `total_write_time`. After all tasks join, `elapsed_compute` is set to `total_write_time − io_time`, isolating pure Arrow→Parquet encoding time. **Parallel path** (`allow_single_file_parallelism = true`, default): - `encoding_time: Time` (a clone of the registered `elapsed_compute` metric) is threaded through the five-function call chain down to the two leaf sites: `writer.write()` in `column_serializer_task` and `writer.close()` in `spawn_rg_join_and_finalize_task`. Since `Time` is `Arc<AtomicUsize>`, all concurrent column tasks accumulate directly into the registered metric. - Note: on the parallel path, `append_to_row_group()` in `concatenate_parallel_row_groups` is interleaved with I/O and cannot be cleanly isolated. It is excluded from `elapsed_compute`. This is acceptable since it operates on already-encoded data and represents a small fraction of total encoding CPU time. ## Are these changes tested? Yes. Two tests are added/extended in `datafusion/core/src/dataframe/parquet.rs`: - `test_parquet_sink_metrics_sequential` (new): verifies `elapsed_compute > 0` with `allow_single_file_parallelism = false`. 
- `test_parquet_sink_metrics_parallel`: extended with an `elapsed_compute > 0` assertion (this test previously lacked that assertion, even though it exercises the parallel path). The existing `test_parquet_sink_metrics` test (parallel path, default config) already asserted `elapsed_compute > 0` and continues to pass. ## Are there any user-facing changes? No API changes. The `elapsed_compute` metric was already surfaced — this PR makes its value accurate rather than introducing a new metric.
1 parent 73ca6a5 commit 3aefba7

7 files changed

Lines changed: 241 additions & 51 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ parquet = { version = "58.1.0", default-features = false, features = [
183183
] }
184184
pbjson = { version = "0.9.0" }
185185
pbjson-types = "0.9"
186+
pin-project = "1"
186187
# Should match arrow-flight's version of prost.
187188
prost = "0.14.1"
188189
rand = "0.9"

datafusion/core/src/dataframe/parquet.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,6 +558,66 @@ mod tests {
558558

559559
assert_eq!(metric_usize(&aggregated, "rows_written"), 50);
560560
assert!(metric_usize(&aggregated, "bytes_written") > 0);
561+
assert!(
562+
metric_usize(&aggregated, "elapsed_compute") > 0,
563+
"expected elapsed_compute > 0 on parallel path"
564+
);
565+
566+
Ok(())
567+
}
568+
569+
/// Test that ParquetSink reports a non-zero elapsed_compute on the sequential
570+
/// write path (allow_single_file_parallelism = false), where elapsed_compute
571+
/// is computed as total_write_time - io_time via TimingWriter.
572+
#[tokio::test]
573+
async fn test_parquet_sink_metrics_sequential() -> Result<()> {
574+
use arrow::array::Int32Array;
575+
use arrow::datatypes::{DataType, Field, Schema};
576+
use arrow::record_batch::RecordBatch;
577+
use datafusion_execution::TaskContext;
578+
579+
use futures::TryStreamExt;
580+
581+
let ctx = SessionContext::new();
582+
ctx.sql("SET datafusion.execution.parquet.allow_single_file_parallelism = false")
583+
.await?
584+
.collect()
585+
.await?;
586+
587+
let tmp_dir = TempDir::new()?;
588+
let output_path = tmp_dir.path().join("metrics_sequential.parquet");
589+
let output_path_str = output_path.to_str().unwrap();
590+
591+
let schema =
592+
Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
593+
let ids: Vec<i32> = (0..50).collect();
594+
let batch = RecordBatch::try_new(
595+
Arc::clone(&schema),
596+
vec![Arc::new(Int32Array::from(ids))],
597+
)?;
598+
ctx.register_batch("source_seq", batch)?;
599+
600+
let df = ctx
601+
.sql(&format!(
602+
"COPY source_seq TO '{output_path_str}' STORED AS PARQUET"
603+
))
604+
.await?;
605+
let plan = df.create_physical_plan().await?;
606+
let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
607+
let stream = plan.execute(0, task_ctx)?;
608+
let _batches: Vec<_> = stream.try_collect().await?;
609+
610+
let metrics = plan
611+
.metrics()
612+
.expect("DataSinkExec should return metrics from ParquetSink");
613+
let aggregated = metrics.aggregate_by_name();
614+
615+
assert_eq!(metric_usize(&aggregated, "rows_written"), 50);
616+
assert!(metric_usize(&aggregated, "bytes_written") > 0);
617+
assert!(
618+
metric_usize(&aggregated, "elapsed_compute") > 0,
619+
"expected elapsed_compute > 0 on sequential path"
620+
);
561621

562622
Ok(())
563623
}

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 75 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext};
5555
use datafusion_expr::dml::InsertOp;
5656
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
5757
use datafusion_physical_plan::metrics::{
58-
ExecutionPlanMetricsSet, MetricBuilder, MetricCategory, MetricsSet,
58+
ElapsedComputeFutureExt, ExecutionPlanMetricsSet, MetricBuilder, MetricCategory,
59+
MetricsSet, Time,
5960
};
6061
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
6162
use datafusion_session::Session;
@@ -1340,8 +1341,6 @@ impl FileSink for ParquetSink {
13401341
.global_counter("bytes_written");
13411342
let elapsed_compute = MetricBuilder::new(&self.metrics).elapsed_compute(0);
13421343

1343-
let write_start = datafusion_common::instant::Instant::now();
1344-
13451344
let parquet_opts = &self.parquet_options;
13461345

13471346
let mut file_write_tasks: JoinSet<
@@ -1375,17 +1374,20 @@ impl FileSink for ParquetSink {
13751374
.await?;
13761375
let reservation = MemoryConsumer::new(format!("ParquetSink[{path}]"))
13771376
.register(context.memory_pool());
1378-
file_write_tasks.spawn(async move {
1379-
while let Some(batch) = rx.recv().await {
1380-
writer.write(&batch).await?;
1381-
reservation.try_resize(writer.memory_size())?;
1377+
file_write_tasks.spawn(
1378+
async move {
1379+
while let Some(batch) = rx.recv().await {
1380+
writer.write(&batch).await?;
1381+
reservation.try_resize(writer.memory_size())?;
1382+
}
1383+
let parquet_meta_data = writer
1384+
.close()
1385+
.await
1386+
.map_err(|e| DataFusionError::ParquetError(Box::new(e)))?;
1387+
Ok((path, parquet_meta_data))
13821388
}
1383-
let parquet_meta_data = writer
1384-
.close()
1385-
.await
1386-
.map_err(|e| DataFusionError::ParquetError(Box::new(e)))?;
1387-
Ok((path, parquet_meta_data))
1388-
});
1389+
.with_elapsed_compute(elapsed_compute.clone()),
1390+
);
13891391
} else {
13901392
let writer = ObjectWriterBuilder::new(
13911393
// Parquet files as a whole are never compressed, since they
@@ -1402,20 +1404,20 @@ impl FileSink for ParquetSink {
14021404
.objectstore_writer_buffer_size,
14031405
))
14041406
.build()?;
1405-
let schema = get_writer_schema(&self.config);
1406-
let props = parquet_props.clone();
1407-
let skip_arrow_metadata = self.parquet_options.global.skip_arrow_metadata;
1408-
let parallel_options_clone = parallel_options.clone();
1409-
let pool = Arc::clone(context.memory_pool());
1407+
let ctx = ParquetFileWriteContext {
1408+
schema: get_writer_schema(&self.config),
1409+
props: Arc::new(parquet_props),
1410+
skip_arrow_metadata: self.parquet_options.global.skip_arrow_metadata,
1411+
parallel_options: Arc::new(parallel_options.clone()),
1412+
pool: Arc::clone(context.memory_pool()),
1413+
};
1414+
let encoding_time = elapsed_compute.clone();
14101415
file_write_tasks.spawn(async move {
14111416
let parquet_meta_data = output_single_parquet_file_parallelized(
14121417
writer,
14131418
rx,
1414-
schema,
1415-
&props,
1416-
skip_arrow_metadata,
1417-
parallel_options_clone,
1418-
pool,
1419+
ctx,
1420+
encoding_time,
14191421
)
14201422
.await?;
14211423
Ok((path, parquet_meta_data))
@@ -1456,8 +1458,6 @@ impl FileSink for ParquetSink {
14561458
.await
14571459
.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
14581460

1459-
elapsed_compute.add_elapsed(write_start);
1460-
14611461
Ok(rows_written_counter.value() as u64)
14621462
}
14631463
}
@@ -1487,8 +1487,10 @@ async fn column_serializer_task(
14871487
mut rx: Receiver<ArrowLeafColumn>,
14881488
mut writer: ArrowColumnWriter,
14891489
reservation: MemoryReservation,
1490+
encoding_time: Time,
14901491
) -> Result<(ArrowColumnWriter, MemoryReservation)> {
14911492
while let Some(col) = rx.recv().await {
1493+
let _timer = encoding_time.timer();
14921494
writer.write(&col)?;
14931495
reservation.try_resize(writer.memory_size())?;
14941496
}
@@ -1505,6 +1507,7 @@ fn spawn_column_parallel_row_group_writer(
15051507
col_writers: Vec<ArrowColumnWriter>,
15061508
max_buffer_size: usize,
15071509
pool: &Arc<dyn MemoryPool>,
1510+
encoding_time: &Time,
15081511
) -> Result<(Vec<ColumnWriterTask>, Vec<ColSender>)> {
15091512
let num_columns = col_writers.len();
15101513

@@ -1522,6 +1525,7 @@ fn spawn_column_parallel_row_group_writer(
15221525
receive_array,
15231526
writer,
15241527
reservation,
1528+
encoding_time.clone(),
15251529
));
15261530
col_writer_tasks.push(task);
15271531
}
@@ -1536,6 +1540,22 @@ struct ParallelParquetWriterOptions {
15361540
max_buffered_record_batches_per_stream: usize,
15371541
}
15381542

1543+
/// Write configuration inputs shared across all parallel tasks that encode a
1544+
/// single Parquet file. These values are invariant for the duration of one file
1545+
/// write and do not change per row-group or per column.
1546+
///
1547+
/// Separating these from per-call parameters (`object_store_writer`, `data`,
1548+
/// `encoding_time`) keeps the deep parallel call chain below the argument-count
1549+
/// limit without mixing configuration with runtime state.
1550+
#[derive(Clone)]
1551+
struct ParquetFileWriteContext {
1552+
schema: Arc<Schema>,
1553+
props: Arc<WriterProperties>,
1554+
skip_arrow_metadata: bool,
1555+
parallel_options: Arc<ParallelParquetWriterOptions>,
1556+
pool: Arc<dyn MemoryPool>,
1557+
}
1558+
15391559
/// This is the return type of calling [ArrowColumnWriter].close() on each column
15401560
/// i.e. the Vec of encoded columns which can be appended to a row group
15411561
type RBStreamSerializeResult = Result<(Vec<ArrowColumnChunk>, MemoryReservation, usize)>;
@@ -1570,6 +1590,7 @@ fn spawn_rg_join_and_finalize_task(
15701590
column_writer_tasks: Vec<ColumnWriterTask>,
15711591
rg_rows: usize,
15721592
pool: &Arc<dyn MemoryPool>,
1593+
encoding_time: Time,
15731594
) -> SpawnedTask<RBStreamSerializeResult> {
15741595
let rg_reservation =
15751596
MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(pool);
@@ -1584,6 +1605,7 @@ fn spawn_rg_join_and_finalize_task(
15841605
.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
15851606
let encoded_size = writer.get_estimated_total_bytes();
15861607
rg_reservation.grow(encoded_size);
1608+
let _timer = encoding_time.timer();
15871609
finalized_rg.push(writer.close()?);
15881610
}
15891611

@@ -1603,21 +1625,25 @@ fn spawn_parquet_parallel_serialization_task(
16031625
row_group_writer_factory: ArrowRowGroupWriterFactory,
16041626
mut data: Receiver<RecordBatch>,
16051627
serialize_tx: Sender<SpawnedTask<RBStreamSerializeResult>>,
1606-
schema: Arc<Schema>,
1607-
writer_props: Arc<WriterProperties>,
1608-
parallel_options: Arc<ParallelParquetWriterOptions>,
1609-
pool: Arc<dyn MemoryPool>,
1628+
ctx: ParquetFileWriteContext,
1629+
encoding_time: Time,
16101630
) -> SpawnedTask<Result<(), DataFusionError>> {
16111631
SpawnedTask::spawn(async move {
1612-
let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream;
1613-
let max_row_group_rows = writer_props
1632+
let max_buffer_rb = ctx.parallel_options.max_buffered_record_batches_per_stream;
1633+
let max_row_group_rows = ctx
1634+
.props
16141635
.max_row_group_row_count()
16151636
.unwrap_or(DEFAULT_MAX_ROW_GROUP_ROW_COUNT);
16161637
let mut row_group_index = 0;
16171638
let col_writers =
16181639
row_group_writer_factory.create_column_writers(row_group_index)?;
16191640
let (mut column_writer_handles, mut col_array_channels) =
1620-
spawn_column_parallel_row_group_writer(col_writers, max_buffer_rb, &pool)?;
1641+
spawn_column_parallel_row_group_writer(
1642+
col_writers,
1643+
max_buffer_rb,
1644+
&ctx.pool,
1645+
&encoding_time,
1646+
)?;
16211647
let mut current_rg_rows = 0;
16221648

16231649
while let Some(mut rb) = data.recv().await {
@@ -1629,7 +1655,7 @@ fn spawn_parquet_parallel_serialization_task(
16291655
send_arrays_to_col_writers(
16301656
&col_array_channels,
16311657
&rb,
1632-
Arc::clone(&schema),
1658+
Arc::clone(&ctx.schema),
16331659
)
16341660
.await?;
16351661
current_rg_rows += rb.num_rows();
@@ -1640,7 +1666,7 @@ fn spawn_parquet_parallel_serialization_task(
16401666
send_arrays_to_col_writers(
16411667
&col_array_channels,
16421668
&a,
1643-
Arc::clone(&schema),
1669+
Arc::clone(&ctx.schema),
16441670
)
16451671
.await?;
16461672

@@ -1651,7 +1677,8 @@ fn spawn_parquet_parallel_serialization_task(
16511677
let finalize_rg_task = spawn_rg_join_and_finalize_task(
16521678
column_writer_handles,
16531679
max_row_group_rows,
1654-
&pool,
1680+
&ctx.pool,
1681+
encoding_time.clone(),
16551682
);
16561683

16571684
// Do not surface error from closed channel (means something
@@ -1670,7 +1697,8 @@ fn spawn_parquet_parallel_serialization_task(
16701697
spawn_column_parallel_row_group_writer(
16711698
col_writers,
16721699
max_buffer_rb,
1673-
&pool,
1700+
&ctx.pool,
1701+
&encoding_time,
16741702
)?;
16751703
}
16761704
}
@@ -1682,7 +1710,8 @@ fn spawn_parquet_parallel_serialization_task(
16821710
let finalize_rg_task = spawn_rg_join_and_finalize_task(
16831711
column_writer_handles,
16841712
current_rg_rows,
1685-
&pool,
1713+
&ctx.pool,
1714+
encoding_time.clone(),
16861715
);
16871716

16881717
// Do not surface error from closed channel (means something
@@ -1749,37 +1778,32 @@ async fn concatenate_parallel_row_groups(
17491778
async fn output_single_parquet_file_parallelized(
17501779
object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
17511780
data: Receiver<RecordBatch>,
1752-
output_schema: Arc<Schema>,
1753-
parquet_props: &WriterProperties,
1754-
skip_arrow_metadata: bool,
1755-
parallel_options: ParallelParquetWriterOptions,
1756-
pool: Arc<dyn MemoryPool>,
1781+
ctx: ParquetFileWriteContext,
1782+
encoding_time: Time,
17571783
) -> Result<ParquetMetaData> {
1758-
let max_rowgroups = parallel_options.max_parallel_row_groups;
1784+
let max_rowgroups = ctx.parallel_options.max_parallel_row_groups;
17591785
// Buffer size of this channel limits maximum number of RowGroups being worked on in parallel
17601786
let (serialize_tx, serialize_rx) =
17611787
mpsc::channel::<SpawnedTask<RBStreamSerializeResult>>(max_rowgroups);
17621788

1763-
let arc_props = Arc::new(parquet_props.clone());
17641789
let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
17651790
let options = ArrowWriterOptions::new()
1766-
.with_properties(parquet_props.clone())
1767-
.with_skip_arrow_metadata(skip_arrow_metadata);
1791+
.with_properties((*ctx.props).clone())
1792+
.with_skip_arrow_metadata(ctx.skip_arrow_metadata);
17681793
let writer = ArrowWriter::try_new_with_options(
17691794
merged_buff.clone(),
1770-
Arc::clone(&output_schema),
1795+
Arc::clone(&ctx.schema),
17711796
options,
17721797
)?;
17731798
let (writer, row_group_writer_factory) = writer.into_serialized_writer()?;
17741799

1800+
let pool = Arc::clone(&ctx.pool);
17751801
let launch_serialization_task = spawn_parquet_parallel_serialization_task(
17761802
row_group_writer_factory,
17771803
data,
17781804
serialize_tx,
1779-
Arc::clone(&output_schema),
1780-
Arc::clone(&arc_props),
1781-
parallel_options.into(),
1782-
Arc::clone(&pool),
1805+
ctx,
1806+
encoding_time,
17831807
);
17841808
let parquet_meta_data = concatenate_parallel_row_groups(
17851809
writer,

datafusion/physical-expr-common/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ hashbrown = { workspace = true }
4949
indexmap = { workspace = true }
5050
itertools = { workspace = true }
5151
parking_lot = { workspace = true }
52+
pin-project = { workspace = true }
5253

5354
[dev-dependencies]
5455
criterion = { workspace = true }

0 commit comments

Comments
 (0)