Skip to content

Commit 12771ca

Browse files
committed
address comments
1 parent a3f8191 commit 12771ca

1 file changed

Lines changed: 16 additions & 17 deletions

File tree

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -934,39 +934,35 @@ fn add_hash_on_top(
934934
/// ordering would cause the parent to switch from streaming to blocking,
935935
/// keeping the order-preserving variant is beneficial.
936936
///
937-
/// Only applicable to single-child operators; returns false for multi-child
938-
/// operators (e.g. joins) where child substitution semantics are ambiguous.
937+
/// Only applicable to single-child operators; returns `Ok(false)` for
938+
/// multi-child operators (e.g. joins) where child substitution semantics are
939+
/// ambiguous.
939940
fn preserving_order_enables_streaming(
940941
parent: &Arc<dyn ExecutionPlan>,
941942
ordered_child: &Arc<dyn ExecutionPlan>,
942-
) -> bool {
943+
) -> Result<bool> {
943944
// Only applicable to single-child operators that maintain input order
944945
// (e.g. AggregateExec in PartiallySorted mode). Operators that don't
945946
// maintain input order (e.g. SortExec) handle ordering themselves —
946947
// preserving SPM for them is unnecessary.
947948
if parent.children().len() != 1 {
948-
return false;
949+
return Ok(false);
949950
}
950951
if !parent.maintains_input_order()[0] {
951-
return false;
952+
return Ok(false);
952953
}
953954
// Build parent with the ordered child
954955
let with_ordered =
955-
match Arc::clone(parent).with_new_children(vec![Arc::clone(ordered_child)]) {
956-
Ok(p) => p,
957-
Err(_) => return false,
958-
};
956+
Arc::clone(parent).with_new_children(vec![Arc::clone(ordered_child)])?;
959957
if with_ordered.pipeline_behavior() == EmissionType::Final {
960958
// Parent is blocking even with ordering — no benefit
961-
return false;
959+
return Ok(false);
962960
}
963-
// Build parent with an unordered child (simulating CoalescePartitionsExec)
961+
// Build parent with an unordered child via CoalescePartitionsExec.
964962
let unordered_child: Arc<dyn ExecutionPlan> =
965963
Arc::new(CoalescePartitionsExec::new(Arc::clone(ordered_child)));
966-
match Arc::clone(parent).with_new_children(vec![unordered_child]) {
967-
Ok(without_ordered) => without_ordered.pipeline_behavior() == EmissionType::Final,
968-
Err(_) => false,
969-
}
964+
let without_ordered = Arc::clone(parent).with_new_children(vec![unordered_child])?;
965+
Ok(without_ordered.pipeline_behavior() == EmissionType::Final)
970966
}
971967

972968
/// # Returns
@@ -1381,8 +1377,11 @@ pub fn ensure_distribution(
13811377
}
13821378
};
13831379

1384-
let streaming_benefit = child.data
1385-
&& preserving_order_enables_streaming(&plan, &child.plan);
1380+
let streaming_benefit = if child.data {
1381+
preserving_order_enables_streaming(&plan, &child.plan)?
1382+
} else {
1383+
false
1384+
};
13861385

13871386
// There is an ordering requirement of the operator:
13881387
if let Some(required_input_ordering) = required_input_ordering {

0 commit comments

Comments
 (0)