Skip to content

Commit 2487518

Browse files
committed
Preserve compact dynamic filter plan shapes
1 parent 92a4adb commit 2487518

1 file changed

Lines changed: 73 additions & 55 deletions

File tree

datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs

Lines changed: 73 additions & 55 deletions
Original file line number | Diff line number | Diff line change
@@ -576,65 +576,83 @@ impl SharedBuildAccumulator {
576576
lit(ScalarValue::UInt64(Some(num_partitions as u64))),
577577
)) as Arc<dyn PhysicalExpr>;
578578

579-
let when_then_branches = partitions
580-
.iter()
581-
.enumerate()
582-
.map(|(partition_id, partition)| -> Result<_> {
583-
let then_expr = match partition {
584-
PartitionStatus::Reported(partition)
585-
if matches!(
586-
partition.pushdown,
587-
PushdownStrategy::Empty
588-
) =>
589-
{
590-
lit(false)
591-
}
592-
PartitionStatus::Reported(partition) => {
593-
let membership_expr = create_membership_predicate(
594-
&self.on_right,
595-
partition.pushdown.clone(),
596-
&HASH_JOIN_SEED,
597-
self.probe_schema.as_ref(),
598-
)?;
599-
let bounds_expr = create_bounds_predicate(
600-
&self.on_right,
601-
&partition.bounds,
602-
);
603-
match (membership_expr, bounds_expr) {
604-
(Some(membership), Some(bounds)) => Arc::new(
605-
BinaryExpr::new(
606-
bounds,
607-
Operator::And,
608-
membership,
609-
),
610-
)
611-
as Arc<dyn PhysicalExpr>,
612-
(Some(membership), None) => membership,
613-
(None, Some(bounds)) => bounds,
614-
(None, None) => lit(true),
615-
}
616-
}
617-
PartitionStatus::CanceledUnknown => lit(true),
618-
PartitionStatus::Pending => {
619-
return datafusion_common::internal_err!(
620-
"attempted to finalize dynamic filter with pending partition"
579+
let mut real_branches = Vec::new();
580+
let mut empty_partition_ids = Vec::new();
581+
let mut has_canceled_unknown = false;
582+
583+
for (partition_id, partition) in partitions.iter().enumerate() {
584+
match partition {
585+
PartitionStatus::Reported(partition)
586+
if matches!(partition.pushdown, PushdownStrategy::Empty) =>
587+
{
588+
empty_partition_ids.push(partition_id);
589+
}
590+
PartitionStatus::Reported(partition) => {
591+
let membership_expr = create_membership_predicate(
592+
&self.on_right,
593+
partition.pushdown.clone(),
594+
&HASH_JOIN_SEED,
595+
self.probe_schema.as_ref(),
596+
)?;
597+
let bounds_expr = create_bounds_predicate(
598+
&self.on_right,
599+
&partition.bounds,
600+
);
601+
let then_expr = match (membership_expr, bounds_expr) {
602+
(Some(membership), Some(bounds)) => Arc::new(
603+
BinaryExpr::new(bounds, Operator::And, membership),
621604
)
622-
}
623-
};
624-
Ok((
625-
lit(ScalarValue::UInt64(Some(partition_id as u64))),
626-
then_expr,
627-
))
628-
})
629-
.collect::<Result<Vec<_>>>()?;
630-
631-
let filter_expr = if when_then_branches.len() == 1 {
632-
Arc::clone(&when_then_branches[0].1)
605+
as Arc<dyn PhysicalExpr>,
606+
(Some(membership), None) => membership,
607+
(None, Some(bounds)) => bounds,
608+
(None, None) => lit(true),
609+
};
610+
real_branches.push((
611+
lit(ScalarValue::UInt64(Some(partition_id as u64))),
612+
then_expr,
613+
));
614+
}
615+
PartitionStatus::CanceledUnknown => {
616+
has_canceled_unknown = true;
617+
}
618+
PartitionStatus::Pending => {
619+
return datafusion_common::internal_err!(
620+
"attempted to finalize dynamic filter with pending partition"
621+
);
622+
}
623+
}
624+
}
625+
626+
let filter_expr = if has_canceled_unknown {
627+
let mut when_then_branches = empty_partition_ids
628+
.into_iter()
629+
.map(|partition_id| {
630+
(
631+
lit(ScalarValue::UInt64(Some(partition_id as u64))),
632+
lit(false),
633+
)
634+
})
635+
.collect::<Vec<_>>();
636+
when_then_branches.extend(real_branches);
637+
638+
if when_then_branches.is_empty() {
639+
lit(true)
640+
} else {
641+
Arc::new(CaseExpr::try_new(
642+
Some(modulo_expr),
643+
when_then_branches,
644+
Some(lit(true)),
645+
)?) as Arc<dyn PhysicalExpr>
646+
}
647+
} else if real_branches.is_empty() {
648+
lit(false)
649+
} else if num_partitions == 1 {
650+
Arc::clone(&real_branches[0].1)
633651
} else {
634652
Arc::new(CaseExpr::try_new(
635653
Some(modulo_expr),
636-
when_then_branches,
637-
Some(lit(true)),
654+
real_branches,
655+
Some(lit(false)),
638656
)?) as Arc<dyn PhysicalExpr>
639657
};
640658

0 commit comments

Comments
 (0)