Skip to content

Commit 639971a

Browse files
authored
Refactor schema rewriter: remove lifetimes, extract column/cast helpers, add mismatch coverage (#20166)
## Which issue does this PR close?

* Closes #20161.

## Rationale for this change

This change is a focused refactor of the `PhysicalExprAdapter` schema rewriter to improve readability and maintainability while preserving behavior.

Key motivations:

* Reduce complexity from explicit lifetimes by storing schema references as `SchemaRef`.
* Make column/index/type handling easier to follow by extracting helper functions.
* Strengthen the test suite to ensure refactors do not alter adapter output.

## What changes are included in this PR?

* Refactored `DefaultPhysicalExprAdapterRewriter` to own `SchemaRef` values instead of borrowing `&Schema`.
  * Simplifies construction and avoids lifetime plumbing.
* Simplified column rewrite logic by:
  * Early-exiting when both the physical index and data type already match.
  * Extracting `resolve_column` to handle physical index/name resolution.
  * Extracting `create_cast_column_expr` to validate cast compatibility (including nested structs) and build `CastColumnExpr`.
* Minor cleanups in struct compatibility validation and field selection to ensure the cast checks are performed against the *actual* physical field resolved by the final column index.
* Test updates and additions:
  * Simplified construction of expected struct `Field`s in tests for clarity.
  * Added `test_rewrite_column_index_and_type_mismatch` to validate the combined case where the logical column index differs from the physical schema *and* the data type requires casting.

## Are these changes tested?

Yes.

* Existing unit tests continue to pass.
* Added a new unit test to cover the index-and-type mismatch scenario for column rewriting, asserting:
  * The inner `Column` points to the correct physical index.
  * The resulting expression is a `CastColumnExpr` producing the expected logical type.

## Are there any user-facing changes?

No.

* This is a refactor/cleanup intended to preserve existing behavior.
* No public API changes, no behavioral changes expected in query results.
## LLM-generated code disclosure

This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested.
1 parent c92ba29 commit 639971a

1 file changed

Lines changed: 127 additions & 50 deletions

File tree

datafusion/physical-expr-adapter/src/schema_rewriter.rs

Lines changed: 127 additions & 50 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,7 @@ use std::sync::Arc;
2626

2727
use arrow::array::RecordBatch;
2828
use arrow::compute::can_cast_types;
29-
use arrow::datatypes::{DataType, Schema, SchemaRef};
29+
use arrow::datatypes::{DataType, Field, SchemaRef};
3030
use datafusion_common::{
3131
Result, ScalarValue, exec_err,
3232
nested_struct::validate_struct_compatibility,
@@ -260,20 +260,20 @@ impl DefaultPhysicalExprAdapter {
260260
impl PhysicalExprAdapter for DefaultPhysicalExprAdapter {
261261
fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
262262
let rewriter = DefaultPhysicalExprAdapterRewriter {
263-
logical_file_schema: &self.logical_file_schema,
264-
physical_file_schema: &self.physical_file_schema,
263+
logical_file_schema: Arc::clone(&self.logical_file_schema),
264+
physical_file_schema: Arc::clone(&self.physical_file_schema),
265265
};
266266
expr.transform(|expr| rewriter.rewrite_expr(Arc::clone(&expr)))
267267
.data()
268268
}
269269
}
270270

271-
struct DefaultPhysicalExprAdapterRewriter<'a> {
272-
logical_file_schema: &'a Schema,
273-
physical_file_schema: &'a Schema,
271+
struct DefaultPhysicalExprAdapterRewriter {
272+
logical_file_schema: SchemaRef,
273+
physical_file_schema: SchemaRef,
274274
}
275275

276-
impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
276+
impl DefaultPhysicalExprAdapterRewriter {
277277
fn rewrite_expr(
278278
&self,
279279
expr: Arc<dyn PhysicalExpr>,
@@ -421,18 +421,13 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
421421
};
422422
let physical_field = self.physical_file_schema.field(physical_column_index);
423423

424-
let column = match (
425-
column.index() == physical_column_index,
426-
logical_field.data_type() == physical_field.data_type(),
427-
) {
428-
// If the column index matches and the data types match, we can use the column as is
429-
(true, true) => return Ok(Transformed::no(expr)),
430-
// If the indexes or data types do not match, we need to create a new column expression
431-
(true, _) => column.clone(),
432-
(false, _) => {
433-
Column::new_with_schema(logical_field.name(), self.physical_file_schema)?
434-
}
435-
};
424+
if column.index() == physical_column_index
425+
&& logical_field.data_type() == physical_field.data_type()
426+
{
427+
return Ok(Transformed::no(expr));
428+
}
429+
430+
let column = self.resolve_column(column, physical_column_index)?;
436431

437432
if logical_field.data_type() == physical_field.data_type() {
438433
// If the data types match, we can use the column as is
@@ -443,24 +438,60 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
443438
// TODO: add optimization to move the cast from the column to literal expressions in the case of `col = 123`
444439
// since that's much cheaper to evaluate.
445440
// See https://github.com/apache/datafusion/issues/15780#issuecomment-2824716928
446-
//
441+
self.create_cast_column_expr(column, logical_field)
442+
}
443+
444+
/// Resolves a column expression, handling index and type mismatches.
445+
///
446+
/// Returns the appropriate Column expression when the column's index or data type
447+
/// don't match the physical schema. Assumes that the early-exit case (both index
448+
/// and type match) has already been checked by the caller.
449+
fn resolve_column(
450+
&self,
451+
column: &Column,
452+
physical_column_index: usize,
453+
) -> Result<Column> {
454+
if column.index() == physical_column_index {
455+
Ok(column.clone())
456+
} else {
457+
Column::new_with_schema(column.name(), self.physical_file_schema.as_ref())
458+
}
459+
}
460+
461+
/// Validates type compatibility and creates a CastColumnExpr if needed.
462+
///
463+
/// Checks whether the physical field can be cast to the logical field type,
464+
/// handling both struct and scalar types. Returns a CastColumnExpr with the
465+
/// appropriate configuration.
466+
fn create_cast_column_expr(
467+
&self,
468+
column: Column,
469+
logical_field: &Field,
470+
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
471+
let actual_physical_field = self.physical_file_schema.field(column.index());
472+
447473
// For struct types, use validate_struct_compatibility which handles:
448474
// - Missing fields in source (filled with nulls)
449475
// - Extra fields in source (ignored)
450476
// - Recursive validation of nested structs
451477
// For non-struct types, use Arrow's can_cast_types
452-
match (physical_field.data_type(), logical_field.data_type()) {
478+
match (actual_physical_field.data_type(), logical_field.data_type()) {
453479
(DataType::Struct(physical_fields), DataType::Struct(logical_fields)) => {
454-
validate_struct_compatibility(physical_fields, logical_fields)?;
480+
validate_struct_compatibility(
481+
physical_fields.as_ref(),
482+
logical_fields.as_ref(),
483+
)?;
455484
}
456485
_ => {
457-
let is_compatible =
458-
can_cast_types(physical_field.data_type(), logical_field.data_type());
486+
let is_compatible = can_cast_types(
487+
actual_physical_field.data_type(),
488+
logical_field.data_type(),
489+
);
459490
if !is_compatible {
460491
return exec_err!(
461492
"Cannot cast column '{}' from '{}' (physical data type) to '{}' (logical data type)",
462493
column.name(),
463-
physical_field.data_type(),
494+
actual_physical_field.data_type(),
464495
logical_field.data_type()
465496
);
466497
}
@@ -469,7 +500,7 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
469500

470501
let cast_expr = Arc::new(CastColumnExpr::new(
471502
Arc::new(column),
472-
Arc::new(physical_field.clone()),
503+
Arc::new(actual_physical_field.clone()),
473504
Arc::new(logical_field.clone()),
474505
None,
475506
));
@@ -777,30 +808,32 @@ mod tests {
777808

778809
let result = adapter.rewrite(column_expr).unwrap();
779810

811+
let physical_struct_fields: Fields = vec![
812+
Field::new("id", DataType::Int32, false),
813+
Field::new("name", DataType::Utf8, true),
814+
]
815+
.into();
816+
let physical_field = Arc::new(Field::new(
817+
"data",
818+
DataType::Struct(physical_struct_fields),
819+
false,
820+
));
821+
822+
let logical_struct_fields: Fields = vec![
823+
Field::new("id", DataType::Int64, false),
824+
Field::new("name", DataType::Utf8View, true),
825+
]
826+
.into();
827+
let logical_field = Arc::new(Field::new(
828+
"data",
829+
DataType::Struct(logical_struct_fields),
830+
false,
831+
));
832+
780833
let expected = Arc::new(CastColumnExpr::new(
781834
Arc::new(Column::new("data", 0)),
782-
Arc::new(Field::new(
783-
"data",
784-
DataType::Struct(
785-
vec![
786-
Field::new("id", DataType::Int32, false),
787-
Field::new("name", DataType::Utf8, true),
788-
]
789-
.into(),
790-
),
791-
false,
792-
)),
793-
Arc::new(Field::new(
794-
"data",
795-
DataType::Struct(
796-
vec![
797-
Field::new("id", DataType::Int64, false),
798-
Field::new("name", DataType::Utf8View, true),
799-
]
800-
.into(),
801-
),
802-
false,
803-
)),
835+
physical_field,
836+
logical_field,
804837
None,
805838
)) as Arc<dyn PhysicalExpr>;
806839

@@ -1193,8 +1226,8 @@ mod tests {
11931226
)]);
11941227

11951228
let rewriter = DefaultPhysicalExprAdapterRewriter {
1196-
logical_file_schema: &logical_schema,
1197-
physical_file_schema: &physical_schema,
1229+
logical_file_schema: Arc::new(logical_schema),
1230+
physical_file_schema: Arc::new(physical_schema),
11981231
};
11991232

12001233
// Test that when a field exists in physical schema, it returns None
@@ -1415,4 +1448,48 @@ mod tests {
14151448
assert!(format!("{:?}", adapter1).contains("BatchAdapter"));
14161449
assert!(format!("{:?}", adapter2).contains("BatchAdapter"));
14171450
}
1451+
1452+
#[test]
1453+
fn test_rewrite_column_index_and_type_mismatch() {
1454+
let physical_schema = Schema::new(vec![
1455+
Field::new("b", DataType::Utf8, true),
1456+
Field::new("a", DataType::Int32, false), // Index 1
1457+
]);
1458+
1459+
let logical_schema = Schema::new(vec![
1460+
Field::new("a", DataType::Int64, false), // Index 0, Different Type
1461+
Field::new("b", DataType::Utf8, true),
1462+
]);
1463+
1464+
let factory = DefaultPhysicalExprAdapterFactory;
1465+
let adapter = factory
1466+
.create(Arc::new(logical_schema), Arc::new(physical_schema))
1467+
.unwrap();
1468+
1469+
// Logical column "a" is at index 0
1470+
let column_expr = Arc::new(Column::new("a", 0));
1471+
1472+
let result = adapter.rewrite(column_expr).unwrap();
1473+
1474+
// Should be a CastColumnExpr
1475+
let cast_expr = result
1476+
.as_any()
1477+
.downcast_ref::<CastColumnExpr>()
1478+
.expect("Expected CastColumnExpr");
1479+
1480+
// Verify the inner column points to the correct physical index (1)
1481+
let inner_col = cast_expr
1482+
.expr()
1483+
.as_any()
1484+
.downcast_ref::<Column>()
1485+
.expect("Expected inner Column");
1486+
assert_eq!(inner_col.name(), "a");
1487+
assert_eq!(inner_col.index(), 1); // Physical index is 1
1488+
1489+
// Verify cast types
1490+
assert_eq!(
1491+
cast_expr.data_type(&Schema::empty()).unwrap(),
1492+
DataType::Int64
1493+
);
1494+
}
14181495
}

0 commit comments

Comments
 (0)