Skip to content

Commit c3e66f7

Browse files
authored
Merge branch 'main' into feat_migrate_ffi_to_stabby
2 parents 3b22836 + 4bff17e commit c3e66f7

7 files changed

Lines changed: 886 additions & 392 deletions

File tree

datafusion/physical-expr-common/src/metrics/mod.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use std::{
3030
borrow::Cow,
3131
fmt::{self, Debug, Display},
3232
sync::Arc,
33+
vec::IntoIter,
3334
};
3435

3536
// public exports
@@ -432,6 +433,38 @@ impl Display for MetricsSet {
432433
}
433434
}
434435

436+
impl IntoIterator for MetricsSet {
437+
type Item = Arc<Metric>;
438+
type IntoIter = IntoIter<Self::Item>;
439+
440+
fn into_iter(self) -> Self::IntoIter {
441+
self.metrics.into_iter()
442+
}
443+
}
444+
445+
impl<'a> IntoIterator for &'a MetricsSet {
446+
type Item = &'a Arc<Metric>;
447+
type IntoIter = std::slice::Iter<'a, Arc<Metric>>;
448+
449+
fn into_iter(self) -> Self::IntoIter {
450+
self.metrics.iter()
451+
}
452+
}
453+
454+
impl Extend<Arc<Metric>> for MetricsSet {
455+
fn extend<I: IntoIterator<Item = Arc<Metric>>>(&mut self, iter: I) {
456+
self.metrics.extend(iter);
457+
}
458+
}
459+
460+
impl FromIterator<Arc<Metric>> for MetricsSet {
461+
fn from_iter<T: IntoIterator<Item = Arc<Metric>>>(iter: T) -> Self {
462+
Self {
463+
metrics: iter.into_iter().collect(),
464+
}
465+
}
466+
}
467+
435468
/// A set of [`Metric`]s for an individual operator.
436469
///
437470
/// This structure is intended as a convenience for execution plan
@@ -465,6 +498,14 @@ impl ExecutionPlanMetricsSet {
465498
}
466499
}
467500

501+
impl From<MetricsSet> for ExecutionPlanMetricsSet {
502+
fn from(metrics: MetricsSet) -> Self {
503+
Self {
504+
inner: Arc::new(Mutex::new(metrics)),
505+
}
506+
}
507+
}
508+
468509
/// `name=value` pairs identifying a metric. This concept is called various things
469510
/// in various different systems:
470511
///
@@ -752,6 +793,52 @@ mod tests {
752793
};
753794
}
754795

#[test]
fn test_extend() {
    // Extending from an array and from a single-item iterator should both
    // append to the set.
    let first = Arc::new(Metric::new(MetricValue::OutputRows(Count::new()), None));
    let second = Arc::new(Metric::new(MetricValue::SpillCount(Count::new()), None));

    let mut set = MetricsSet::new();
    set.extend([Arc::clone(&first), Arc::clone(&second)]);
    assert_eq!(set.iter().count(), 2);

    let third = Arc::new(Metric::new(MetricValue::SpilledBytes(Count::new()), None));
    set.extend(std::iter::once(Arc::clone(&third)));
    assert_eq!(set.iter().count(), 3);
}
809+
#[test]
fn test_collect() {
    // `collect()` must build a MetricsSet from any iterator of metrics,
    // including an empty one.
    let a = Arc::new(Metric::new(MetricValue::OutputRows(Count::new()), None));
    let b = Arc::new(Metric::new(MetricValue::SpillCount(Count::new()), None));

    let collected: MetricsSet = [Arc::clone(&a), Arc::clone(&b)].into_iter().collect();
    assert_eq!(collected.iter().count(), 2);

    let empty: MetricsSet = std::iter::empty().collect();
    assert_eq!(empty.iter().count(), 0);
}
#[test]
fn test_into_iterator_by_ref() {
    // Iterating `&MetricsSet` must visit every metric without consuming
    // the set.
    let mut set = MetricsSet::new();
    set.push(Arc::new(Metric::new(
        MetricValue::OutputRows(Count::new()),
        None,
    )));
    set.push(Arc::new(Metric::new(
        MetricValue::SpillCount(Count::new()),
        None,
    )));

    let visited = (&set).into_iter().count();
    assert_eq!(visited, 2);
}
755842
#[test]
756843
fn test_sorted_for_display() {
757844
let metrics = ExecutionPlanMetricsSet::new();

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 196 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1316,6 +1316,33 @@ impl ExecutionPlan for HashJoinExec {
13161316
.with_category(MetricCategory::Rows)
13171317
.counter(ARRAY_MAP_CREATED_COUNT_METRIC_NAME, partition);
13181318

1319+
// Initialize build_accumulator lazily with runtime partition counts (only if enabled)
1320+
// Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing
1321+
let repartition_random_state = REPARTITION_RANDOM_STATE;
1322+
let build_accumulator = enable_dynamic_filter_pushdown
1323+
.then(|| {
1324+
self.dynamic_filter.as_ref().map(|df| {
1325+
let filter = Arc::clone(&df.filter);
1326+
let on_right = self
1327+
.on
1328+
.iter()
1329+
.map(|(_, right_expr)| Arc::clone(right_expr))
1330+
.collect::<Vec<_>>();
1331+
Some(Arc::clone(df.build_accumulator.get_or_init(|| {
1332+
Arc::new(SharedBuildAccumulator::new_from_partition_mode(
1333+
self.mode,
1334+
self.left.as_ref(),
1335+
self.right.as_ref(),
1336+
filter,
1337+
on_right,
1338+
repartition_random_state,
1339+
))
1340+
})))
1341+
})
1342+
})
1343+
.flatten()
1344+
.flatten();
1345+
13191346
let left_fut = match self.mode {
13201347
PartitionMode::CollectLeft => self.left_fut.try_once(|| {
13211348
let left_stream = self.left.execute(0, Arc::clone(&context))?;
@@ -1343,7 +1370,6 @@ impl ExecutionPlan for HashJoinExec {
13431370
let reservation =
13441371
MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
13451372
.register(context.memory_pool());
1346-
13471373
OnceFut::new(collect_left_input(
13481374
self.random_state.random_state().clone(),
13491375
left_stream,
@@ -1368,33 +1394,6 @@ impl ExecutionPlan for HashJoinExec {
13681394

13691395
let batch_size = context.session_config().batch_size();
13701396

1371-
// Initialize build_accumulator lazily with runtime partition counts (only if enabled)
1372-
// Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing
1373-
let repartition_random_state = REPARTITION_RANDOM_STATE;
1374-
let build_accumulator = enable_dynamic_filter_pushdown
1375-
.then(|| {
1376-
self.dynamic_filter.as_ref().map(|df| {
1377-
let filter = Arc::clone(&df.filter);
1378-
let on_right = self
1379-
.on
1380-
.iter()
1381-
.map(|(_, right_expr)| Arc::clone(right_expr))
1382-
.collect::<Vec<_>>();
1383-
Some(Arc::clone(df.build_accumulator.get_or_init(|| {
1384-
Arc::new(SharedBuildAccumulator::new_from_partition_mode(
1385-
self.mode,
1386-
self.left.as_ref(),
1387-
self.right.as_ref(),
1388-
filter,
1389-
on_right,
1390-
repartition_random_state,
1391-
))
1392-
})))
1393-
})
1394-
})
1395-
.flatten()
1396-
.flatten();
1397-
13981397
// we have the batches and the hash map with their keys. We can how create a stream
13991398
// over the right that uses this information to issue new batches.
14001399
let right_stream = self.right.execute(partition, context)?;
@@ -2347,6 +2346,22 @@ mod tests {
23472346
right: Arc<dyn ExecutionPlan>,
23482347
on: JoinOn,
23492348
join_type: JoinType,
2349+
) -> Result<(HashJoinExec, Arc<DynamicFilterPhysicalExpr>)> {
2350+
hash_join_with_dynamic_filter_and_mode(
2351+
left,
2352+
right,
2353+
on,
2354+
join_type,
2355+
PartitionMode::CollectLeft,
2356+
)
2357+
}
2358+
2359+
fn hash_join_with_dynamic_filter_and_mode(
2360+
left: Arc<dyn ExecutionPlan>,
2361+
right: Arc<dyn ExecutionPlan>,
2362+
on: JoinOn,
2363+
join_type: JoinType,
2364+
mode: PartitionMode,
23502365
) -> Result<(HashJoinExec, Arc<DynamicFilterPhysicalExpr>)> {
23512366
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
23522367
let mut join = HashJoinExec::try_new(
@@ -2356,7 +2371,7 @@ mod tests {
23562371
None,
23572372
&join_type,
23582373
None,
2359-
PartitionMode::CollectLeft,
2374+
mode,
23602375
NullEquality::NullEqualsNothing,
23612376
false,
23622377
)?;
@@ -5628,6 +5643,158 @@ mod tests {
56285643
Ok(())
56295644
}
56305645

5646+
#[tokio::test]
5647+
async fn test_partitioned_dynamic_filter_reports_empty_canceled_partitions()
5648+
-> Result<()> {
5649+
let mut session_config = SessionConfig::default();
5650+
session_config
5651+
.options_mut()
5652+
.optimizer
5653+
.enable_dynamic_filter_pushdown = true;
5654+
let task_ctx =
5655+
Arc::new(TaskContext::default().with_session_config(session_config));
5656+
5657+
let child_left_schema = Arc::new(Schema::new(vec![
5658+
Field::new("child_left_payload", DataType::Int32, false),
5659+
Field::new("child_key", DataType::Int32, false),
5660+
Field::new("child_left_extra", DataType::Int32, false),
5661+
]));
5662+
let child_right_schema = Arc::new(Schema::new(vec![
5663+
Field::new("child_right_payload", DataType::Int32, false),
5664+
Field::new("child_right_key", DataType::Int32, false),
5665+
Field::new("child_right_extra", DataType::Int32, false),
5666+
]));
5667+
let parent_left_schema = Arc::new(Schema::new(vec![
5668+
Field::new("parent_payload", DataType::Int32, false),
5669+
Field::new("parent_key", DataType::Int32, false),
5670+
Field::new("parent_extra", DataType::Int32, false),
5671+
]));
5672+
5673+
let child_left: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
5674+
&[
5675+
vec![build_table_i32(
5676+
("child_left_payload", &vec![10]),
5677+
("child_key", &vec![0]),
5678+
("child_left_extra", &vec![100]),
5679+
)],
5680+
vec![build_table_i32(
5681+
("child_left_payload", &vec![11]),
5682+
("child_key", &vec![1]),
5683+
("child_left_extra", &vec![101]),
5684+
)],
5685+
vec![build_table_i32(
5686+
("child_left_payload", &vec![12]),
5687+
("child_key", &vec![2]),
5688+
("child_left_extra", &vec![102]),
5689+
)],
5690+
vec![build_table_i32(
5691+
("child_left_payload", &vec![13]),
5692+
("child_key", &vec![3]),
5693+
("child_left_extra", &vec![103]),
5694+
)],
5695+
],
5696+
Arc::clone(&child_left_schema),
5697+
None,
5698+
)?;
5699+
let child_right: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
5700+
&[
5701+
vec![build_table_i32(
5702+
("child_right_payload", &vec![20]),
5703+
("child_right_key", &vec![0]),
5704+
("child_right_extra", &vec![200]),
5705+
)],
5706+
vec![build_table_i32(
5707+
("child_right_payload", &vec![21]),
5708+
("child_right_key", &vec![1]),
5709+
("child_right_extra", &vec![201]),
5710+
)],
5711+
vec![build_table_i32(
5712+
("child_right_payload", &vec![22]),
5713+
("child_right_key", &vec![2]),
5714+
("child_right_extra", &vec![202]),
5715+
)],
5716+
vec![build_table_i32(
5717+
("child_right_payload", &vec![23]),
5718+
("child_right_key", &vec![3]),
5719+
("child_right_extra", &vec![203]),
5720+
)],
5721+
],
5722+
Arc::clone(&child_right_schema),
5723+
None,
5724+
)?;
5725+
let parent_left: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
5726+
&[
5727+
vec![build_table_i32(
5728+
("parent_payload", &vec![30]),
5729+
("parent_key", &vec![0]),
5730+
("parent_extra", &vec![300]),
5731+
)],
5732+
vec![RecordBatch::new_empty(Arc::clone(&parent_left_schema))],
5733+
vec![build_table_i32(
5734+
("parent_payload", &vec![32]),
5735+
("parent_key", &vec![2]),
5736+
("parent_extra", &vec![302]),
5737+
)],
5738+
vec![RecordBatch::new_empty(Arc::clone(&parent_left_schema))],
5739+
],
5740+
Arc::clone(&parent_left_schema),
5741+
None,
5742+
)?;
5743+
5744+
let child_on = vec![(
5745+
Arc::new(Column::new_with_schema("child_key", &child_left_schema)?) as _,
5746+
Arc::new(Column::new_with_schema(
5747+
"child_right_key",
5748+
&child_right_schema,
5749+
)?) as _,
5750+
)];
5751+
let (child_join, _child_dynamic_filter) = hash_join_with_dynamic_filter_and_mode(
5752+
child_left,
5753+
child_right,
5754+
child_on,
5755+
JoinType::Inner,
5756+
PartitionMode::Partitioned,
5757+
)?;
5758+
let child_join: Arc<dyn ExecutionPlan> = Arc::new(child_join);
5759+
5760+
let parent_on = vec![(
5761+
Arc::new(Column::new_with_schema("parent_key", &parent_left_schema)?) as _,
5762+
Arc::new(Column::new_with_schema("child_key", &child_join.schema())?) as _,
5763+
)];
5764+
let parent_join = HashJoinExec::try_new(
5765+
parent_left,
5766+
child_join,
5767+
parent_on,
5768+
None,
5769+
&JoinType::RightSemi,
5770+
None,
5771+
PartitionMode::Partitioned,
5772+
NullEquality::NullEqualsNothing,
5773+
false,
5774+
)?;
5775+
5776+
let batches = tokio::time::timeout(
5777+
std::time::Duration::from_secs(5),
5778+
crate::execution_plan::collect(Arc::new(parent_join), task_ctx),
5779+
)
5780+
.await
5781+
.expect("partitioned right-semi join should not hang")?;
5782+
5783+
assert_batches_sorted_eq!(
5784+
[
5785+
"+--------------------+-----------+------------------+---------------------+-----------------+-------------------+",
5786+
"| child_left_payload | child_key | child_left_extra | child_right_payload | child_right_key | child_right_extra |",
5787+
"+--------------------+-----------+------------------+---------------------+-----------------+-------------------+",
5788+
"| 10 | 0 | 100 | 20 | 0 | 200 |",
5789+
"| 12 | 2 | 102 | 22 | 2 | 202 |",
5790+
"+--------------------+-----------+------------------+---------------------+-----------------+-------------------+",
5791+
],
5792+
&batches
5793+
);
5794+
5795+
Ok(())
5796+
}
5797+
56315798
#[tokio::test]
56325799
async fn test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report()
56335800
-> Result<()> {

0 commit comments

Comments
 (0)