Skip to content

Commit a8d6f7f

Browse files
committed
Refactor cast node and resolve column logic
Reuse existing candidate cast node instead of rebuilding it. Extract shared target-field resolution and same-type elision logic. Inline the one-off nested-struct validation wrapper and remove redundant column re-resolution inside create_cast_expr. Collapse cast and try-cast lowering in the planner behind a small helper function. Update the adapter test to ensure that name-based canonicalization occurs in resolve_physical_column, rather than again in create_cast_expr.
1 parent 29c454b commit a8d6f7f

4 files changed

Lines changed: 104 additions & 97 deletions

File tree

datafusion/physical-expr-adapter/src/schema_rewriter.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -498,13 +498,6 @@ impl DefaultPhysicalExprAdapterRewriter {
498498
logical_field.data_type()
499499
)))?;
500500

501-
let physical_column_index = self.physical_file_schema.index_of(column.name())?;
502-
let column = if column.index() == physical_column_index {
503-
column
504-
} else {
505-
Column::new_with_schema(column.name(), self.physical_file_schema.as_ref())?
506-
};
507-
508501
let cast_expr = Arc::new(CastExpr::new_with_target_field(
509502
Arc::new(column),
510503
Arc::new(logical_field.clone()),
@@ -1693,7 +1686,7 @@ mod tests {
16931686
}
16941687

16951688
#[test]
1696-
fn test_create_cast_expr_uses_name_lookup_not_column_index() {
1689+
fn test_resolve_physical_column_uses_name_lookup_not_column_index() {
16971690
// Physical schema has column `a` at index 1; index 0 is an incompatible type.
16981691
let physical_schema = Arc::new(Schema::new(vec![
16991692
Field::new("b", DataType::Binary, true),
@@ -1711,11 +1704,19 @@ mod tests {
17111704
};
17121705

17131706
// Deliberately provide the wrong index for column `a`.
1714-
// Regression: this must still resolve against physical field `a` by name.
1707+
// Regression: resolution must still find physical field `a` by name.
1708+
let (resolved_column, physical_field) = rewriter
1709+
.resolve_physical_column(&Column::new("a", 0))
1710+
.unwrap()
1711+
.expect("Expected resolved column");
1712+
assert_eq!(resolved_column.name(), "a");
1713+
assert_eq!(resolved_column.index(), 1);
1714+
assert_eq!(physical_field.data_type(), &DataType::Int32);
1715+
17151716
let transformed = rewriter
17161717
.create_cast_expr(
1717-
Column::new("a", 0),
1718-
&Arc::new(physical_schema.field_with_name("a").unwrap().clone()),
1718+
resolved_column,
1719+
&physical_field,
17191720
logical_schema.field_with_name("a").unwrap(),
17201721
)
17211722
.unwrap();

datafusion/physical-expr/src/expressions/cast.rs

Lines changed: 49 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,44 @@ const DEFAULT_SAFE_CAST_OPTIONS: CastOptions<'static> = CastOptions {
4545
format_options: DEFAULT_FORMAT_OPTIONS,
4646
};
4747

48-
/// Check if name-based struct casting is allowed by validating field compatibility.
49-
///
50-
/// This function applies the same validation rules as execution time to ensure
51-
/// planning-time validation matches runtime validation, enabling fail-fast behavior
52-
/// instead of deferring errors to execution. Handles structs at any nesting level
53-
/// (e.g., `List<Struct>`, `Dictionary<_, Struct>`).
54-
fn can_cast_named_struct_types(source: &DataType, target: &DataType) -> bool {
55-
validate_data_type_compatibility("", source, target).is_ok()
48+
pub(crate) fn is_default_target_field(target_field: &FieldRef) -> bool {
49+
target_field.name().is_empty()
50+
&& target_field.is_nullable()
51+
&& target_field.metadata().is_empty()
52+
}
53+
54+
pub(crate) fn resolve_target_field(
55+
expr: &Arc<dyn PhysicalExpr>,
56+
target_field: &FieldRef,
57+
input_schema: &Schema,
58+
force_nullable: bool,
59+
) -> Result<FieldRef> {
60+
if is_default_target_field(target_field) {
61+
let field = expr.return_field(input_schema)?;
62+
let field = field
63+
.as_ref()
64+
.clone()
65+
.with_data_type(target_field.data_type().clone());
66+
let field = if force_nullable {
67+
field.with_nullable(true)
68+
} else {
69+
field
70+
};
71+
Ok(Arc::new(field))
72+
} else {
73+
Ok(Arc::clone(target_field))
74+
}
75+
}
76+
77+
pub(crate) fn should_elide_same_type_cast(
78+
expr: &Arc<dyn PhysicalExpr>,
79+
target_field: &FieldRef,
80+
input_schema: &Schema,
81+
force_nullable: bool,
82+
) -> Result<bool> {
83+
Ok(is_default_target_field(target_field)
84+
|| resolve_target_field(expr, target_field, input_schema, force_nullable)?
85+
== expr.return_field(input_schema)?)
5686
}
5787

5888
/// CAST expression casts an expression to a specific data type and returns a runtime error on invalid cast
@@ -151,30 +181,12 @@ impl CastExpr {
151181
&self.cast_options
152182
}
153183

154-
fn is_default_target_field(&self) -> bool {
155-
self.target_field.name().is_empty()
156-
&& self.target_field.is_nullable()
157-
&& self.target_field.metadata().is_empty()
158-
}
159-
160184
fn resolved_target_field(&self, input_schema: &Schema) -> Result<FieldRef> {
161-
if self.is_default_target_field() {
162-
self.expr.return_field(input_schema).map(|field| {
163-
Arc::new(
164-
field
165-
.as_ref()
166-
.clone()
167-
.with_data_type(self.cast_type().clone()),
168-
)
169-
})
170-
} else {
171-
Ok(Arc::clone(&self.target_field))
172-
}
185+
resolve_target_field(&self.expr, &self.target_field, input_schema, false)
173186
}
174187

175188
fn preserves_child_field_semantics(&self, input_schema: &Schema) -> Result<bool> {
176-
Ok(self.resolved_target_field(input_schema)?
177-
== self.expr.return_field(input_schema)?)
189+
should_elide_same_type_cast(&self.expr, &self.target_field, input_schema, false)
178190
}
179191

180192
/// Check if casting from the specified source type to the target type is a
@@ -338,20 +350,15 @@ pub(crate) fn cast_with_target_field_and_options(
338350
) -> Result<Arc<dyn PhysicalExpr>> {
339351
let expr_type = expr.data_type(input_schema)?;
340352
let cast_type = target_field.data_type().clone();
341-
let candidate = CastExpr::new_with_target_field(
342-
Arc::clone(&expr),
343-
Arc::clone(&target_field),
344-
cast_options.clone(),
345-
);
346-
347-
if expr_type == cast_type
348-
&& candidate.preserves_child_field_semantics(input_schema)?
349-
{
350-
return Ok(expr);
351-
}
353+
let candidate =
354+
CastExpr::new_with_target_field(Arc::clone(&expr), target_field, cast_options);
352355

353356
if expr_type == cast_type {
354-
return Ok(Arc::new(candidate));
357+
return if candidate.preserves_child_field_semantics(input_schema)? {
358+
Ok(expr)
359+
} else {
360+
Ok(Arc::new(candidate))
361+
};
355362
}
356363

357364
let is_valid_cast = if requires_nested_struct_cast(&expr_type, &cast_type) {
@@ -360,19 +367,15 @@ pub(crate) fn cast_with_target_field_and_options(
360367
// applied at planning time (now) to fail fast, rather than deferring errors
361368
// to execution time. The name-based casting logic will be executed at runtime
362369
// via ColumnarValue::cast_to.
363-
can_cast_named_struct_types(&expr_type, &cast_type)
370+
validate_data_type_compatibility("", &expr_type, &cast_type).is_ok()
364371
} else {
365372
can_cast_types(&expr_type, &cast_type)
366373
};
367374

368375
if !is_valid_cast {
369376
not_impl_err!("Unsupported CAST from {expr_type} to {cast_type}")
370377
} else {
371-
Ok(Arc::new(CastExpr::new_with_target_field(
372-
expr,
373-
target_field,
374-
cast_options,
375-
)))
378+
Ok(Arc::new(candidate))
376379
}
377380
}
378381

datafusion/physical-expr/src/expressions/try_cast.rs

Lines changed: 4 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::hash::Hash;
2121
use std::sync::Arc;
2222

2323
use crate::PhysicalExpr;
24+
use crate::expressions::cast::{resolve_target_field, should_elide_same_type_cast};
2425
use arrow::compute;
2526
use arrow::compute::CastOptions;
2627
use arrow::datatypes::{DataType, FieldRef, Schema};
@@ -83,31 +84,12 @@ impl TryCastExpr {
8384
&self.target_field
8485
}
8586

86-
fn is_default_target_field(&self) -> bool {
87-
self.target_field.name().is_empty()
88-
&& self.target_field.is_nullable()
89-
&& self.target_field.metadata().is_empty()
90-
}
91-
9287
fn resolved_target_field(&self, input_schema: &Schema) -> Result<FieldRef> {
93-
if self.is_default_target_field() {
94-
self.expr.return_field(input_schema).map(|field| {
95-
Arc::new(
96-
field
97-
.as_ref()
98-
.clone()
99-
.with_data_type(self.cast_type().clone())
100-
.with_nullable(true),
101-
)
102-
})
103-
} else {
104-
Ok(Arc::clone(&self.target_field))
105-
}
88+
resolve_target_field(&self.expr, &self.target_field, input_schema, true)
10689
}
10790

10891
fn preserves_child_field_semantics(&self, input_schema: &Schema) -> Result<bool> {
109-
Ok(self.resolved_target_field(input_schema)?
110-
== self.expr.return_field(input_schema)?)
92+
should_elide_same_type_cast(&self.expr, &self.target_field, input_schema, true)
11193
}
11294
}
11395

@@ -187,8 +169,7 @@ pub(crate) fn try_cast_with_target_field(
187169
let candidate = TryCastExpr::new_with_target_field(Arc::clone(&expr), target_field);
188170

189171
if expr_type == cast_type
190-
&& (candidate.is_default_target_field()
191-
|| candidate.preserves_child_field_semantics(input_schema)?)
172+
&& candidate.preserves_child_field_semantics(input_schema)?
192173
{
193174
Ok(expr)
194175
} else if expr_type == cast_type || can_cast_types(&expr_type, &cast_type) {

datafusion/physical-expr/src/planner.rs

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::{
2323
expressions::{self, Column, Literal, binary, like, similar_to},
2424
};
2525

26-
use arrow::datatypes::Schema;
26+
use arrow::datatypes::{FieldRef, Schema};
2727
use datafusion_common::config::ConfigOptions;
2828
use datafusion_common::metadata::FieldMetadata;
2929
use datafusion_common::{
@@ -288,22 +288,29 @@ pub fn create_physical_expr(
288288
};
289289
Ok(expressions::case(expr, when_then_expr, else_expr)?)
290290
}
291-
Expr::Cast(Cast { expr, field }) => {
292-
let child = create_physical_expr(expr, input_dfschema, execution_props)?;
293-
expressions::cast_with_target_field_and_options(
294-
child,
295-
input_schema,
296-
Arc::clone(field),
297-
None,
298-
)
299-
}
300-
Expr::TryCast(TryCast { expr, field }) => {
301-
expressions::try_cast_with_target_field(
302-
create_physical_expr(expr, input_dfschema, execution_props)?,
303-
input_schema,
304-
Arc::clone(field),
305-
)
306-
}
291+
Expr::Cast(Cast { expr, field }) => create_physical_cast_like_expr(
292+
expr,
293+
field,
294+
input_dfschema,
295+
input_schema,
296+
execution_props,
297+
|child, input_schema, field| {
298+
expressions::cast_with_target_field_and_options(
299+
child,
300+
input_schema,
301+
field,
302+
None,
303+
)
304+
},
305+
),
306+
Expr::TryCast(TryCast { expr, field }) => create_physical_cast_like_expr(
307+
expr,
308+
field,
309+
input_dfschema,
310+
input_schema,
311+
execution_props,
312+
expressions::try_cast_with_target_field,
313+
),
307314
Expr::Not(expr) => {
308315
expressions::not(create_physical_expr(expr, input_dfschema, execution_props)?)
309316
}
@@ -396,6 +403,21 @@ pub fn create_physical_expr(
396403
}
397404
}
398405

406+
fn create_physical_cast_like_expr<F>(
407+
expr: &Expr,
408+
field: &FieldRef,
409+
input_dfschema: &DFSchema,
410+
input_schema: &Schema,
411+
execution_props: &ExecutionProps,
412+
lower: F,
413+
) -> Result<Arc<dyn PhysicalExpr>>
414+
where
415+
F: FnOnce(Arc<dyn PhysicalExpr>, &Schema, FieldRef) -> Result<Arc<dyn PhysicalExpr>>,
416+
{
417+
let child = create_physical_expr(expr, input_dfschema, execution_props)?;
418+
lower(child, input_schema, Arc::clone(field))
419+
}
420+
399421
/// Create vector of Physical Expression from a vector of logical expression
400422
pub fn create_physical_exprs<'a, I>(
401423
exprs: I,

0 commit comments

Comments
 (0)