Skip to content

Commit d59bc72

Browse files
authored
fix(proto): correctly serialize FilterExec empty projection (#21885)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #21871 ## Rationale for this change FilterExec supports two semantically different projection states: - None → return all columns (full projection) - Some(vec![]) → return no columns (empty projection) However, both cases were being serialized identically as an empty vector in the proto representation. During deserialization, an empty vector was always mapped back to None, meaning an empty projection would silently become a full projection after a serde round-trip. ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> No <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent ea0928c commit d59bc72

2 files changed

Lines changed: 58 additions & 13 deletions

File tree

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -716,18 +716,23 @@ impl protobuf::PhysicalPlanNode {
716716
})?;
717717

718718
let filter_selectivity = filter.default_filter_selectivity.try_into();
719-
let projection = if !filter.projection.is_empty() {
720-
Some(
721-
filter
722-
.projection
723-
.iter()
724-
.map(|i| *i as usize)
725-
.collect::<Vec<_>>(),
726-
)
727-
} else {
719+
// Preserve the `None` state across proto boundaries. Proto cannot distinguish
720+
// between `None` (full projection) and `Some(vec![])` (empty projection) since
721+
// both serialize as an empty list. If all columns are included, we reconstruct
722+
// `None` to avoid losing this semantic distinction on deserialization.
723+
let num_fields = input.schema().fields().len();
724+
let mut is_full_projection = filter.projection.len() == num_fields;
725+
let mut projection_vec: Vec<usize> = Vec::with_capacity(filter.projection.len());
726+
for (i, idx) in filter.projection.iter().enumerate() {
727+
let idx = *idx as usize;
728+
is_full_projection &= idx == i;
729+
projection_vec.push(idx);
730+
}
731+
let projection = if is_full_projection {
728732
None
733+
} else {
734+
Some(projection_vec)
729735
};
730-
731736
let filter = FilterExecBuilder::new(predicate, input)
732737
.apply_projection(projection)?
733738
.with_batch_size(filter.batch_size as usize)
@@ -2339,9 +2344,12 @@ impl protobuf::PhysicalPlanNode {
23392344
.physical_expr_to_proto(exec.predicate(), codec)?,
23402345
),
23412346
default_filter_selectivity: exec.default_selectivity() as u32,
2342-
projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
2343-
v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2344-
}),
2347+
projection: match exec.projection() {
2348+
None => (0..exec.input().schema().fields().len())
2349+
.map(|i| i as u32)
2350+
.collect(),
2351+
Some(v) => v.iter().map(|x| *x as u32).collect(),
2352+
},
23452353
batch_size: exec.batch_size() as u32,
23462354
fetch: exec.fetch().map(|f| f as u32),
23472355
},

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3681,3 +3681,40 @@ async fn roundtrip_issue_18602_complex_filter_decode_recursion() -> Result<()> {
36813681

36823682
roundtrip_test_sql_with_context(sql, &ctx).await
36833683
}
3684+
3685+
#[test]
3686+
fn roundtrip_filter_with_none_projection() -> Result<()> {
3687+
let schema = Arc::new(Schema::new(vec![
3688+
Field::new("a", DataType::Int32, false),
3689+
Field::new("b", DataType::Int32, false),
3690+
Field::new("c", DataType::Int32, false),
3691+
]));
3692+
let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
3693+
Arc::new(Column::new("a", 0)),
3694+
Operator::Gt,
3695+
lit(ScalarValue::Int32(Some(0))),
3696+
));
3697+
let input: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(Arc::clone(&schema)));
3698+
3699+
// Case 1: None projection (return all columns)
3700+
roundtrip_test(Arc::new(FilterExec::try_new(
3701+
Arc::clone(&predicate),
3702+
Arc::clone(&input),
3703+
)?))?;
3704+
3705+
// Case 2: Some(vec![]) — explicitly empty projection
3706+
roundtrip_test(Arc::new(
3707+
FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input))
3708+
.apply_projection(Some(vec![]))?
3709+
.build()?,
3710+
))?;
3711+
3712+
// Case 3: Some(vec![2, 0]) — partial projection
3713+
roundtrip_test(Arc::new(
3714+
FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input))
3715+
.apply_projection(Some(vec![2, 0]))?
3716+
.build()?,
3717+
))?;
3718+
3719+
Ok(())
3720+
}

0 commit comments

Comments
 (0)