Skip to content

Commit 84b43c8

Browse files
committed
Preserve shared dynamic filter errors
1 parent 0bec631 commit 84b43c8

1 file changed

Lines changed: 4 additions & 6 deletions

File tree

datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::joins::hash_join::partitioned_hash_eval::{
3333
use arrow::array::ArrayRef;
3434
use arrow::datatypes::{DataType, Field, Schema};
3535
use datafusion_common::config::ConfigOptions;
36-
use datafusion_common::{DataFusionError, Result, ScalarValue};
36+
use datafusion_common::{DataFusionError, Result, ScalarValue, SharedResult};
3737
use datafusion_expr::Operator;
3838
use datafusion_functions::core::r#struct as struct_func;
3939
use datafusion_physical_expr::expressions::{
@@ -275,7 +275,7 @@ enum AccumulatedBuildData {
275275
enum CompletionState {
276276
Pending,
277277
Finalizing,
278-
Ready(std::result::Result<(), String>),
278+
Ready(SharedResult<()>),
279279
}
280280

281281
struct AccumulatorState {
@@ -507,9 +507,7 @@ impl SharedBuildAccumulator {
507507
}
508508

509509
fn finish(&self, finalize_input: FinalizeInput) {
510-
let result = self
511-
.build_filter(finalize_input)
512-
.map_err(|err| err.to_string());
510+
let result = self.build_filter(finalize_input).map_err(Arc::new);
513511
self.dynamic_filter.mark_complete();
514512

515513
let mut guard = self.inner.lock();
@@ -525,7 +523,7 @@ impl SharedBuildAccumulator {
525523
match &guard.completion {
526524
CompletionState::Ready(Ok(())) => return Ok(()),
527525
CompletionState::Ready(Err(err)) => {
528-
return Err(DataFusionError::Execution(err.clone()));
526+
return Err(DataFusionError::Shared(Arc::clone(err)));
529527
}
530528
CompletionState::Pending | CompletionState::Finalizing => {
531529
self.completion_notify.notified()

0 commit comments

Comments
 (0)