Skip to content

Commit 9fd84e7

Browse files
authored
Introduce ProjectionExprs::unproject_exprs/project_exprs and improve docs (#20193)
## Which issue does this PR close? - Part of #20173 ## Rationale for this change I am going through how the various layers of expression pushdown and schema rewrites work, and I spent a long time confused about what the `sync_with_child` parameter on `update_expr` did -- like what was the different between`update_expr(.., true)` vs `update_expr(..., false)` 😕 After some study I concluded it controls which way the rewrite is done (either project the expressions to refer to the projection expressions, or the opposite, 'unproject' an expression and substitute the projection definitions back in) ## What changes are included in this PR? 1. Introduce ProjectionExprs::unproject_exprs/project_exprs 2. rename `sync_with_child` to `unproject` 3. Improve documentation about what is done ## Are these changes tested? Yes by existing ci ## Are there any user-facing changes? There are new APIs, but no changes to existing APIs
1 parent 4d06c40 commit 9fd84e7

2 files changed

Lines changed: 98 additions & 49 deletions

File tree

datafusion/datasource/src/file_scan_config.rs

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -842,37 +842,27 @@ impl DataSource for FileScanConfig {
842842
config: &ConfigOptions,
843843
) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
844844
// Remap filter Column indices to match the table schema (file + partition columns).
845-
// This is necessary because filters may have been created against a different schema
846-
// (e.g., after projection pushdown) and need to be remapped to the table schema
847-
// before being passed to the file source and ultimately serialized.
848-
// For example, the filter being pushed down is `c1_c2 > 5` and it was created
849-
// against the output schema of the this `DataSource` which has projection `c1 + c2 as c1_c2`.
850-
// Thus we need to rewrite the filter back to `c1 + c2 > 5` before passing it to the file source.
845+
// This is necessary because filters refer to the output schema of this `DataSource`
846+
// (e.g., after projection pushdown has been applied) and need to be remapped to the table schema
847+
// before being passed to the file source
848+
//
849+
// For example, consider a filter `c1_c2 > 5` being pushed down. If the
850+
// `DataSource` has a projection `c1 + c2 as c1_c2`, the filter must be rewritten
851+
// to refer to the table schema `c1 + c2 > 5`
851852
let table_schema = self.file_source.table_schema().table_schema();
852-
// If there's a projection with aliases, first map the filters back through
853-
// the projection expressions before remapping to the table schema.
854853
let filters_to_remap = if let Some(projection) = self.file_source.projection() {
855-
use datafusion_physical_plan::projection::update_expr;
856854
filters
857855
.into_iter()
858-
.map(|filter| {
859-
update_expr(&filter, projection.as_ref(), true)?.ok_or_else(|| {
860-
internal_datafusion_err!(
861-
"Failed to map filter expression through projection: {}",
862-
filter
863-
)
864-
})
865-
})
856+
.map(|filter| projection.unproject_expr(&filter))
866857
.collect::<Result<Vec<_>>>()?
867858
} else {
868859
filters
869860
};
870861
// Now remap column indices to match the table schema.
871-
let remapped_filters: Result<Vec<_>> = filters_to_remap
862+
let remapped_filters = filters_to_remap
872863
.into_iter()
873-
.map(|filter| reassign_expr_columns(filter, table_schema.as_ref()))
874-
.collect();
875-
let remapped_filters = remapped_filters?;
864+
.map(|filter| reassign_expr_columns(filter, table_schema))
865+
.collect::<Result<Vec<_>>>()?;
876866

877867
let result = self
878868
.file_source

datafusion/physical-expr/src/projection.rs

Lines changed: 87 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -372,16 +372,8 @@ impl ProjectionExprs {
372372
pub fn try_merge(&self, other: &ProjectionExprs) -> Result<ProjectionExprs> {
373373
let mut new_exprs = Vec::with_capacity(other.exprs.len());
374374
for proj_expr in other.exprs.iter() {
375-
let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)?
376-
.ok_or_else(|| {
377-
internal_datafusion_err!(
378-
"Failed to combine projections: expression {} could not be applied on top of existing projections {}",
379-
proj_expr.expr,
380-
self.exprs.iter().map(|e| format!("{e}")).join(", ")
381-
)
382-
})?;
383375
new_exprs.push(ProjectionExpr {
384-
expr: new_expr,
376+
expr: self.unproject_expr(&proj_expr.expr)?,
385377
alias: proj_expr.alias.clone(),
386378
});
387379
}
@@ -450,9 +442,16 @@ impl ProjectionExprs {
450442
}
451443

452444
/// Project a schema according to this projection.
453-
/// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a` is at index 0 and `b` is at index 1,
454-
/// if the input schema is `[a: Int32, b: Int32, c: Int32]`, the output schema would be `[x: Int32, y: Int32]`.
455-
/// Fields' metadata are preserved from the input schema.
445+
///
446+
/// For example, given a projection:
447+
/// * `SELECT a AS x, b + 1 AS y`
448+
/// * where `a` is at index 0
449+
/// * `b` is at index 1
450+
///
451+
/// If the input schema is `[a: Int32, b: Int32, c: Int32]`, the output
452+
/// schema would be `[x: Int32, y: Int32]`.
453+
///
454+
/// Note that [`Field`] metadata are preserved from the input schema.
456455
pub fn project_schema(&self, input_schema: &Schema) -> Result<Schema> {
457456
let fields: Result<Vec<Field>> = self
458457
.exprs
@@ -481,6 +480,48 @@ impl ProjectionExprs {
481480
))
482481
}
483482

483+
/// "unproject" an expression by applying this projection in reverse,
484+
/// returning a new set of expressions that reference the original input
485+
/// columns.
486+
///
487+
/// For example, consider
488+
/// * an expression `c1_c2 > 5`, and a schema `[c1, c2]`
489+
/// * a projection `c1 + c2 as c1_c2`
490+
///
491+
/// This method would rewrite the expression to `c1 + c2 > 5`
492+
pub fn unproject_expr(
493+
&self,
494+
expr: &Arc<dyn PhysicalExpr>,
495+
) -> Result<Arc<dyn PhysicalExpr>> {
496+
update_expr(expr, &self.exprs, true)?.ok_or_else(|| {
497+
internal_datafusion_err!(
498+
"Failed to unproject an expression {} with ProjectionExprs {}",
499+
expr,
500+
self.exprs.iter().map(|e| format!("{e}")).join(", ")
501+
)
502+
})
503+
}
504+
505+
/// "project" an expression using these projection's expressions
506+
///
507+
/// For example, consider
508+
/// * an expression `c1 + c2 > 5`, and a schema `[c1, c2]`
509+
/// * a projection `c1 + c2 as c1_c2`
510+
///
511+
/// * This method would rewrite the expression to `c1_c2 > 5`
512+
pub fn project_expr(
513+
&self,
514+
expr: &Arc<dyn PhysicalExpr>,
515+
) -> Result<Arc<dyn PhysicalExpr>> {
516+
update_expr(expr, &self.exprs, false)?.ok_or_else(|| {
517+
internal_datafusion_err!(
518+
"Failed to project an expression {} with ProjectionExprs {}",
519+
expr,
520+
self.exprs.iter().map(|e| format!("{e}")).join(", ")
521+
)
522+
})
523+
}
524+
484525
/// Create a new [`Projector`] from this projection and an input schema.
485526
///
486527
/// A [`Projector`] can be used to apply this projection to record batches.
@@ -812,26 +853,44 @@ pub fn combine_projections(
812853
))
813854
}
814855

815-
/// The function operates in two modes:
856+
/// The function projects / unprojects an expression with respect to set of
857+
/// projection expressions.
858+
///
859+
/// See also [`ProjectionExprs::unproject_expr`] and [`ProjectionExprs::project_expr`]
860+
///
861+
/// 1) When `unproject` is `true`:
862+
///
863+
/// Rewrites an expression with respect to the projection expressions,
864+
/// effectively "unprojecting" it to reference the original input columns.
865+
///
866+
/// For example, given
867+
/// * the expressions `a@1 + b@2` and `c@0`
868+
/// * and projection expressions `c@2, a@0, b@1`
869+
///
870+
/// Then
871+
/// * `a@1 + b@2` becomes `a@0 + b@1`
872+
/// * `c@0` becomes `c@2`
873+
///
874+
/// 2) When `unproject` is `false`:
816875
///
817-
/// 1) When `sync_with_child` is `true`:
876+
/// Rewrites the expression to reference the projected expressions,
877+
/// effectively "projecting" it. The resulting expression will reference the
878+
/// indices as they appear in the projection.
818879
///
819-
/// The function updates the indices of `expr` if the expression resides
820-
/// in the input plan. For instance, given the expressions `a@1 + b@2`
821-
/// and `c@0` with the input schema `c@2, a@0, b@1`, the expressions are
822-
/// updated to `a@0 + b@1` and `c@2`.
880+
/// If the expression cannot be rewritten after the projection, it returns
881+
/// `None`.
823882
///
824-
/// 2) When `sync_with_child` is `false`:
883+
/// For example, given
884+
/// * the expressions `c@0`, `a@1` and `b@2`
885+
/// * the projection `a@1 as a, c@0 as c_new`,
825886
///
826-
/// The function determines how the expression would be updated if a projection
827-
/// was placed before the plan associated with the expression. If the expression
828-
/// cannot be rewritten after the projection, it returns `None`. For example,
829-
/// given the expressions `c@0`, `a@1` and `b@2`, and the projection with
830-
/// an output schema of `a, c_new`, then `c@0` becomes `c_new@1`, `a@1` becomes
831-
/// `a@0`, but `b@2` results in `None` since the projection does not include `b`.
887+
/// Then
888+
/// * `c@0` becomes `c_new@1`
889+
/// * `a@1` becomes `a@0`
890+
/// * `b@2` results in `None` since the projection does not include `b`.
832891
///
833892
/// # Errors
834-
/// This function returns an error if `sync_with_child` is `true` and if any expression references
893+
/// This function returns an error if `unproject` is `true` and if any expression references
835894
/// an index that is out of bounds for `projected_exprs`.
836895
/// For example:
837896
///
@@ -842,7 +901,7 @@ pub fn combine_projections(
842901
pub fn update_expr(
843902
expr: &Arc<dyn PhysicalExpr>,
844903
projected_exprs: &[ProjectionExpr],
845-
sync_with_child: bool,
904+
unproject: bool,
846905
) -> Result<Option<Arc<dyn PhysicalExpr>>> {
847906
#[derive(Debug, PartialEq)]
848907
enum RewriteState {
@@ -866,7 +925,7 @@ pub fn update_expr(
866925
let Some(column) = expr.as_any().downcast_ref::<Column>() else {
867926
return Ok(Transformed::no(expr));
868927
};
869-
if sync_with_child {
928+
if unproject {
870929
state = RewriteState::RewrittenValid;
871930
// Update the index of `column`:
872931
let projected_expr = projected_exprs.get(column.index()).ok_or_else(|| {

0 commit comments

Comments
 (0)