Skip to content

Commit e5966b5

Browse files
authored
fix: linearized operands in physical binaryexpr protobuf to avoid recursion limit (#21031)
## Which issue does this PR close? - part of #18602. ## Rationale for this change When a SQL query contains many filter conditions (e.g., 40+ `AND`/`OR` clauses in a `WHERE`), serializing the physical plan to protobuf and deserializing it fails with `DecodeError: recursion limit reached`. [This is because prost has a default recursion limit of 100](https://docs.rs/prost/latest/src/prost/lib.rs.html#30), and each `BinaryExpr` nesting consumes ~2 levels of protobuf recursion depth, so a chain of ~50 AND conditions exceeds the limit. ## What changes are included in this PR? Applied the same **linearization** approach that [logical expressions already use](https://github.com/apache/datafusion/blob/b6b542e87b84f4744096106bea0de755b2e70cc5/datafusion/proto/src/logical_plan/to_proto.rs#L228-L256) that convert a left-deep tree to linearization list. Instead of encoding a chain of same-operator binary expressions as a deeply nested tree, we flatten it into a flat `operands` list: **Before (nested, O(n) recursion depth):** ``` BinaryExpr(AND) { l: BinaryExpr(AND) { l: BinaryExpr(AND) { l: a, r: b }, r: c }, r: d } ``` **After (flat, O(1) recursion depth for the chain):** ``` BinaryExpr(AND) { operands: [a, b, c, d] } ``` ## Are these changes tested? yes, add some test case ## Are there any user-facing changes?
1 parent 5a427cb commit e5966b5

File tree

6 files changed

+394
-26
lines changed

6 files changed

+394
-26
lines changed

datafusion/proto/proto/datafusion.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -981,6 +981,9 @@ message PhysicalBinaryExprNode {
981981
PhysicalExprNode l = 1;
982982
PhysicalExprNode r = 2;
983983
string op = 3;
984+
// Linearized operands for chains of the same operator (e.g. a AND b AND c).
985+
// When present, `l` and `r` are ignored and `operands` holds the flattened list.
986+
repeated PhysicalExprNode operands = 4;
984987
}
985988

986989
message PhysicalDateTimeIntervalExprNode {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 53 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -273,25 +273,59 @@ pub fn parse_physical_expr_with_converter(
273273
}
274274
ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)),
275275
ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)),
276-
ExprType::BinaryExpr(binary_expr) => Arc::new(BinaryExpr::new(
277-
parse_required_physical_expr(
278-
binary_expr.l.as_deref(),
279-
ctx,
280-
"left",
281-
input_schema,
282-
codec,
283-
proto_converter,
284-
)?,
285-
logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?,
286-
parse_required_physical_expr(
287-
binary_expr.r.as_deref(),
288-
ctx,
289-
"right",
290-
input_schema,
291-
codec,
292-
proto_converter,
293-
)?,
294-
)),
276+
ExprType::BinaryExpr(binary_expr) => {
277+
let op = logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?;
278+
if !binary_expr.operands.is_empty() {
279+
// New linearized format: reduce the flat operands list back into
280+
// a nested binary expression tree.
281+
let operands: Vec<Arc<dyn PhysicalExpr>> = binary_expr
282+
.operands
283+
.iter()
284+
.map(|e| {
285+
proto_converter.proto_to_physical_expr(
286+
e,
287+
ctx,
288+
input_schema,
289+
codec,
290+
)
291+
})
292+
.collect::<Result<Vec<_>>>()?;
293+
294+
if operands.len() < 2 {
295+
return Err(proto_error(
296+
"A binary expression must always have at least 2 operands",
297+
));
298+
}
299+
300+
operands
301+
.into_iter()
302+
.reduce(|left, right| Arc::new(BinaryExpr::new(left, op, right)))
303+
.expect(
304+
"Binary expression could not be reduced to a single expression.",
305+
)
306+
} else {
307+
// Legacy format with l/r fields
308+
Arc::new(BinaryExpr::new(
309+
parse_required_physical_expr(
310+
binary_expr.l.as_deref(),
311+
ctx,
312+
"left",
313+
input_schema,
314+
codec,
315+
proto_converter,
316+
)?,
317+
op,
318+
parse_required_physical_expr(
319+
binary_expr.r.as_deref(),
320+
ctx,
321+
"right",
322+
input_schema,
323+
codec,
324+
proto_converter,
325+
)?,
326+
))
327+
}
328+
}
295329
ExprType::AggregateExpr(_) => {
296330
return not_impl_err!(
297331
"Cannot convert aggregate expr node to physical expression"

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -306,14 +306,37 @@ pub fn serialize_physical_expr_with_converter(
306306
)),
307307
})
308308
} else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
309+
// Linearize a nested binary expression tree of the same operator
310+
// into a flat vector of operands to avoid deep recursion in proto.
311+
let op = expr.op();
312+
let mut operand_refs: Vec<&Arc<dyn PhysicalExpr>> = vec![expr.right()];
313+
let mut current_expr: &BinaryExpr = expr;
314+
loop {
315+
match current_expr.left().as_any().downcast_ref::<BinaryExpr>() {
316+
Some(bin) if bin.op() == op => {
317+
operand_refs.push(bin.right());
318+
current_expr = bin;
319+
}
320+
_ => {
321+
operand_refs.push(current_expr.left());
322+
break;
323+
}
324+
}
325+
}
326+
327+
// Reverse so operands are ordered from left innermost to right outermost
328+
operand_refs.reverse();
329+
330+
let operands = operand_refs
331+
.iter()
332+
.map(|e| proto_converter.physical_expr_to_proto(e, codec))
333+
.collect::<Result<Vec<_>>>()?;
334+
309335
let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
310-
l: Some(Box::new(
311-
proto_converter.physical_expr_to_proto(expr.left(), codec)?,
312-
)),
313-
r: Some(Box::new(
314-
proto_converter.physical_expr_to_proto(expr.right(), codec)?,
315-
)),
316-
op: format!("{:?}", expr.op()),
336+
l: None,
337+
r: None,
338+
op: format!("{:?}", op),
339+
operands,
317340
});
318341

319342
Ok(protobuf::PhysicalExprNode {

0 commit comments

Comments
 (0)