
Commit cc670e8

Better document the relationship between FileFormat::projection / FileFormat::filter and FileScanConfig::Statistics (#20188)
## Which issue does this PR close?

- Part of #20173

## Rationale for this change

I am debugging an issue related to the interplay of pre-existing orderings, filtering, and projections in `FileScanConfig`. As part of that I am trying to understand how `Statistics` are handled by `FileScanConfig` -- specifically, relatively speaking, when the projection and filtering are applied.

After some study, I have found that the statistics are (supposed?) to be from *before* applying the Filter and Projection from the scan, so let's document that better. I also found the schemas involved to be a bit confusing, and I would also like to use this PR to validate my understanding of the intended design.

## What changes are included in this PR?

Update documentation.

## Are these changes tested?

By CI.

## Are there any user-facing changes?

Just documentation changes, no functional changes.
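The relationship this commit documents -- statistics describe the table *before* the scan's filter and projection are applied -- can be sketched with a small self-contained model. This is not the DataFusion API: `Precision` and `TableStatistics` below are simplified stand-ins, and the rule they encode is the documented behavior that a pushed-down filter makes the stored statistics inexact without adjusting the counts.

```rust
// Minimal model (not the real DataFusion types) of the documented behavior:
// per-column statistics are stored for the full, unprojected table schema;
// a projection selects a subset of them; the presence of a filter downgrades
// the surviving statistics to "inexact" without changing the counts.

#[derive(Clone, Debug, PartialEq)]
enum Precision {
    Exact(usize),
    Inexact(usize),
}

#[derive(Clone, Debug)]
struct TableStatistics {
    /// One entry per column of the *unprojected* table schema.
    column_null_counts: Vec<Precision>,
}

impl TableStatistics {
    /// Apply a projection (column indices) and, if a filter is present,
    /// mark every statistic as inexact without adjusting the counts.
    fn project(&self, projection: &[usize], has_filter: bool) -> TableStatistics {
        let column_null_counts = projection
            .iter()
            .map(|&i| {
                let stat = self.column_null_counts[i].clone();
                if has_filter {
                    match stat {
                        Precision::Exact(n) | Precision::Inexact(n) => Precision::Inexact(n),
                    }
                } else {
                    stat
                }
            })
            .collect();
        TableStatistics { column_null_counts }
    }
}

fn main() {
    // Statistics for a 3-column table (a, b, c), before projection/filtering.
    let stats = TableStatistics {
        column_null_counts: vec![
            Precision::Exact(0),
            Precision::Exact(5),
            Precision::Exact(2),
        ],
    };

    // Projection keeps columns c and a; a filter is present, so the
    // surviving statistics become inexact (counts unchanged).
    let projected = stats.project(&[2, 0], true);
    assert_eq!(
        projected.column_null_counts,
        vec![Precision::Inexact(2), Precision::Inexact(0)]
    );
}
```

With no filter present, the same projection would return the statistics unchanged (still `Exact`), which is the distinction the updated docs call out.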
1 parent 7788c73 commit cc670e8

4 files changed

Lines changed: 90 additions & 17 deletions


datafusion/datasource/src/file.rs

Lines changed: 37 additions & 4 deletions
```diff
@@ -46,6 +46,12 @@ pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn FileSource>
 
 /// file format specific behaviors for elements in [`DataSource`]
 ///
+/// # Schema information
+/// There are two important schemas for a [`FileSource`]:
+/// 1. [`Self::table_schema`] -- the schema for the overall "table"
+/// 2. The logical output schema, comprised of [`Self::table_schema`] with
+///    [`Self::projection`] applied
+///
 /// See more details on specific implementations:
 /// * [`ArrowSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ArrowSource.html)
 /// * [`AvroSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.AvroSource.html)
@@ -64,24 +70,38 @@ pub trait FileSource: Send + Sync {
     ) -> Result<Arc<dyn FileOpener>>;
     /// Any
     fn as_any(&self) -> &dyn Any;
+
     /// Returns the table schema for this file source.
     ///
-    /// This always returns the unprojected schema (the full schema of the data).
+    /// This always returns the unprojected schema (the full schema of the data)
+    /// without [`Self::projection`] applied.
+    ///
+    /// The output schema of this `FileSource` is this TableSchema
+    /// with [`Self::projection`] applied.
     fn table_schema(&self) -> &crate::table_schema::TableSchema;
+
     /// Initialize new type with batch size configuration
     fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>;
-    /// Returns the filter expression that will be applied during the file scan.
+
+    /// Returns the filter expression that will be applied *during* the file scan.
+    ///
+    /// These expressions are in terms of the unprojected [`Self::table_schema`].
     fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
         None
     }
-    /// Return the projection that will be applied to the output stream on top of the table schema.
+
+    /// Return the projection that will be applied to the output stream on top
+    /// of [`Self::table_schema`].
     fn projection(&self) -> Option<&ProjectionExprs> {
         None
     }
+
     /// Return execution plan metrics
     fn metrics(&self) -> &ExecutionPlanMetricsSet;
+
     /// String representation of file source such as "csv", "json", "parquet"
     fn file_type(&self) -> &str;
+
     /// Format FileType specific information
     fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result {
         Ok(())
@@ -135,6 +155,19 @@ pub trait FileSource: Send + Sync {
     }
 
     /// Try to push down filters into this FileSource.
+    ///
+    /// `filters` must be in terms of the unprojected table schema (file schema
+    /// plus partition columns), before any projection is applied.
+    ///
+    /// Any filters that this FileSource chooses to evaluate itself should be
+    /// returned as `PushedDown::Yes` in the result, along with a FileSource
+    /// instance that incorporates those filters. Such filters are logically
+    /// applied "during" the file scan, meaning they may refer to columns not
+    /// included in the final output projection.
+    ///
+    /// Filters that cannot be pushed down should be marked as `PushedDown::No`,
+    /// and will be evaluated by an execution plan after the file source.
+    ///
     /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
     ///
     /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
@@ -220,7 +253,7 @@ pub trait FileSource: Send + Sync {
         Ok(SortOrderPushdownResult::Unsupported)
     }
 
-    /// Try to push down a projection into a this FileSource.
+    /// Try to push down a projection into this FileSource.
     ///
     /// `FileSource` implementations that support projection pushdown should
     /// override this method and return a new `FileSource` instance with the
```
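The two schemas the new `FileSource` docs distinguish can be sketched with a minimal model. The types are illustrative (plain string lists rather than Arrow schemas): the unprojected table schema, the logical output schema obtained by applying the projection, and a scan-time filter that may reference a column absent from the output.

```rust
// Simplified model (not the real DataFusion types) of the two schemas:
// the unprojected "table" schema, and the logical output schema, which
// is the table schema with the projection applied.

/// The full table schema: file columns followed by partition columns.
fn table_schema() -> Vec<&'static str> {
    vec!["a", "b", "c", "year"]
}

/// Apply a projection (a list of column indices into the table schema)
/// to compute the logical output schema.
fn output_schema(table: &[&'static str], projection: &[usize]) -> Vec<&'static str> {
    projection.iter().map(|&i| table[i]).collect()
}

fn main() {
    let table = table_schema();

    // The output schema keeps only the projected columns, in projection order.
    let output = output_schema(&table, &[2, 0]);
    assert_eq!(output, vec!["c", "a"]);

    // A filter evaluated *during* the scan is resolved against the full
    // table schema, so it may reference a column ("b") that never appears
    // in the output schema -- which is why `FileSource::filter` is
    // documented as being in terms of the unprojected table schema.
    let filter_column = "b";
    assert!(table.contains(&filter_column));
    assert!(!output.contains(&filter_column));
}
```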

datafusion/datasource/src/file_scan_config.rs

Lines changed: 46 additions & 13 deletions
```diff
@@ -55,10 +55,21 @@ use datafusion_physical_plan::{
 use log::{debug, warn};
 use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc};
 
-/// The base configurations for a [`DataSourceExec`], the a physical plan for
-/// any given file format.
+/// [`FileScanConfig`] represents scanning data from a group of files
 ///
-/// Use [`DataSourceExec::from_data_source`] to create a [`DataSourceExec`] from a ``FileScanConfig`.
+/// `FileScanConfig` is used to create a [`DataSourceExec`], the physical plan
+/// for scanning files with a particular file format.
+///
+/// The [`FileSource`] (e.g. `ParquetSource`, `CsvSource`, etc.) is responsible
+/// for creating the actual execution plan to read the files based on a
+/// `FileScanConfig`. Fields in a `FileScanConfig` such as Statistics represent
+/// information about the files **before** any projection or filtering is
+/// applied in the file source.
+///
+/// Use [`FileScanConfigBuilder`] to construct a `FileScanConfig`.
+///
+/// Use [`DataSourceExec::from_data_source`] to create a [`DataSourceExec`] from
+/// a `FileScanConfig`.
 ///
 /// # Example
 /// ```
@@ -169,8 +180,11 @@ pub struct FileScanConfig {
     /// Expression adapter used to adapt filters and projections that are pushed down into the scan
     /// from the logical schema to the physical schema of the file.
     pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
-    /// Unprojected statistics for the table (file schema + partition columns).
-    /// These are projected on-demand via `projected_stats()`.
+    /// Statistics for the entire table (file schema + partition columns).
+    /// See [`FileScanConfigBuilder::with_statistics`] for more details.
+    ///
+    /// The effective statistics are computed on-demand via
+    /// [`ProjectionExprs::project_statistics`].
     ///
     /// Note that this field is pub(crate) because accessing it directly from outside
     /// would be incorrect if there are filters being applied, thus this should be accessed
@@ -283,31 +297,35 @@ impl FileScanConfigBuilder {
         }
     }
 
-    /// Set the maximum number of records to read from this plan. If `None`,
-    /// all records after filtering are returned.
+    /// Set the maximum number of records to read from this plan.
+    ///
+    /// If `None`, all records after filtering are returned.
     pub fn with_limit(mut self, limit: Option<usize>) -> Self {
         self.limit = limit;
         self
     }
 
     /// Set whether the limit should be order-sensitive.
+    ///
     /// When `true`, files must be read in the exact order specified to produce
     /// correct results (e.g., for `ORDER BY ... LIMIT` queries). When `false`,
-    /// DataFusion may reorder file processing for optimization without affecting correctness.
+    /// DataFusion may reorder file processing for optimization without
+    /// affecting correctness.
     pub fn with_preserve_order(mut self, order_sensitive: bool) -> Self {
         self.preserve_order = order_sensitive;
         self
     }
 
     /// Set the file source for scanning files.
     ///
-    /// This method allows you to change the file source implementation (e.g. ParquetSource, CsvSource, etc.)
-    /// after the builder has been created.
+    /// This method allows you to change the file source implementation (e.g.
+    /// ParquetSource, CsvSource, etc.) after the builder has been created.
     pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
         self.file_source = file_source;
         self
     }
 
+    /// Return the table schema
     pub fn table_schema(&self) -> &SchemaRef {
         self.file_source.table_schema().table_schema()
     }
@@ -332,7 +350,12 @@ impl FileScanConfigBuilder {
 
     /// Set the columns on which to project the data using column indices.
     ///
-    /// Indexes that are higher than the number of columns of `file_schema` refer to `table_partition_cols`.
+    /// This method attempts to push down the projection to the underlying file
+    /// source if supported. If the file source does not support projection
+    /// pushdown, an error is returned.
+    ///
+    /// Indexes that are higher than the number of columns of `file_schema`
+    /// refer to `table_partition_cols`.
     pub fn with_projection_indices(
         mut self,
         indices: Option<Vec<usize>>,
@@ -371,8 +394,18 @@ impl FileScanConfigBuilder {
         self
     }
 
-    /// Set the estimated overall statistics of the files, taking `filters` into account.
-    /// Defaults to [`Statistics::new_unknown`].
+    /// Set the statistics of the files, including partition
+    /// columns. Defaults to [`Statistics::new_unknown`].
+    ///
+    /// These statistics are for the entire table (file schema + partition
+    /// columns) before any projection or filtering is applied. Projections are
+    /// applied when statistics are retrieved, and if a filter is present,
+    /// [`FileScanConfig::statistics`] will mark the statistics as inexact
+    /// (counts are not adjusted).
+    ///
+    /// Projections and filters may be applied by the file source, either by
+    /// [`Self::with_projection_indices`] or a preexisting
+    /// [`FileSource::projection`] or [`FileSource::filter`].
     pub fn with_statistics(mut self, statistics: Statistics) -> Self {
        self.statistics = Some(statistics);
        self
```
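The index rule documented for `with_projection_indices` (indices past the end of `file_schema` refer to `table_partition_cols`) can be sketched as follows; `resolve_projected_columns` is a hypothetical helper for illustration, not a DataFusion function.

```rust
// Sketch of the documented index rule: a projection index below the file
// schema's column count selects a file column; anything at or above it
// selects a partition column, offset by the file schema's length.

fn resolve_projected_columns(
    file_schema: &[&str],
    partition_cols: &[&str],
    indices: &[usize],
) -> Vec<String> {
    indices
        .iter()
        .map(|&i| {
            if i < file_schema.len() {
                // Index into the file's own columns.
                file_schema[i].to_string()
            } else {
                // Indexes >= file_schema.len() refer to partition columns.
                partition_cols[i - file_schema.len()].to_string()
            }
        })
        .collect()
}

fn main() {
    let file_schema = ["a", "b", "c"];
    let partition_cols = ["year", "month"];

    // Index 3 is past the 3 file columns, so it selects partition column "year".
    let cols = resolve_projected_columns(&file_schema, &partition_cols, &[0, 3]);
    assert_eq!(cols, vec!["a".to_string(), "year".to_string()]);
}
```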

datafusion/datasource/src/source.rs

Lines changed: 6 additions & 0 deletions
```diff
@@ -178,7 +178,13 @@ pub trait DataSource: Send + Sync + Debug {
         &self,
         _projection: &ProjectionExprs,
     ) -> Result<Option<Arc<dyn DataSource>>>;
+
     /// Try to push down filters into this DataSource.
+    ///
+    /// These filters are in terms of the output schema of this DataSource (e.g.
+    /// [`Self::eq_properties`] and output of any projections pushed into the
+    /// source), not the original table schema.
+    ///
     /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
     ///
     /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
```
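A small sketch of why this distinction matters, under the assumption that a projection has already been pushed into the source: a filter expressed against the `DataSource` output schema must have its column references remapped before it can be compared against the unprojected table schema. `output_to_table_index` here is illustrative, not part of DataFusion.

```rust
// Model of the schema distinction above: a `DataSource` filter refers to
// columns of the *output* schema, while a `FileSource` filter refers to the
// unprojected table schema. If projection [2, 0] was pushed into the source,
// output column 0 is table column 2, so a filter's column references must be
// remapped through the projection on the way down.

/// Map a column index in the output schema back to its index in the
/// unprojected table schema, given the pushed-down projection.
fn output_to_table_index(projection: &[usize], output_index: usize) -> usize {
    projection[output_index]
}

fn main() {
    // Projection pushed into the source: output = [table col 2, table col 0].
    let projection = [2usize, 0];

    // A filter on output column 0 actually constrains table column 2.
    assert_eq!(output_to_table_index(&projection, 0), 2);
    assert_eq!(output_to_table_index(&projection, 1), 0);
}
```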

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 1 addition & 0 deletions
```diff
@@ -577,6 +577,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     }
 
     /// Handle the result of a child pushdown.
+    ///
     /// This method is called as we recurse back up the plan tree after pushing
     /// filters down to child nodes via [`ExecutionPlan::gather_filters_for_pushdown`].
     /// It allows the current node to process the results of filter pushdown from
```
