Skip to content

Commit 7a09e27

Browse files
authored
fix: union should return an error instead of panicking when input schemas have different lengths (#19922)
## Which issue does this PR close? None ## Rationale for this change Currently, when a user manually constructs a UnionExec whose inputs have schemas with different numbers of fields, it panics instead of returning an error. ## What changes are included in this PR? If the inputs to UnionExec have schemas with different numbers of fields, return an error instead of panicking. ## Are these changes tested? Yes, one test case was added. ## Are there any user-facing changes?
1 parent f0e38df commit 7a09e27

1 file changed

Lines changed: 44 additions & 1 deletion

File tree

datafusion/physical-plan/src/union.rs

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -593,8 +593,20 @@ fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> Result<SchemaRef> {
593593
}
594594

595595
let first_schema = inputs[0].schema();
596+
let first_field_count = first_schema.fields().len();
597+
598+
// validate that all inputs have the same number of fields
599+
for (idx, input) in inputs.iter().enumerate().skip(1) {
600+
let field_count = input.schema().fields().len();
601+
if field_count != first_field_count {
602+
return exec_err!(
603+
"UnionExec/InterleaveExec requires all inputs to have the same number of fields. \
604+
Input 0 has {first_field_count} fields, but input {idx} has {field_count} fields"
605+
);
606+
}
607+
}
596608

597-
let fields = (0..first_schema.fields().len())
609+
let fields = (0..first_field_count)
598610
.map(|i| {
599611
// We take the name from the left side of the union to match how names are coerced during logical planning,
600612
// which also uses the left side names.
@@ -763,6 +775,18 @@ mod tests {
763775
Ok(schema)
764776
}
765777

778+
fn create_test_schema2() -> Result<SchemaRef> {
779+
let a = Field::new("a", DataType::Int32, true);
780+
let b = Field::new("b", DataType::Int32, true);
781+
let c = Field::new("c", DataType::Int32, true);
782+
let d = Field::new("d", DataType::Int32, true);
783+
let e = Field::new("e", DataType::Int32, true);
784+
let f = Field::new("f", DataType::Int32, true);
785+
let schema = Arc::new(Schema::new(vec![a, b, c, d, e, f]));
786+
787+
Ok(schema)
788+
}
789+
766790
#[tokio::test]
767791
async fn test_union_partitions() -> Result<()> {
768792
let task_ctx = Arc::new(TaskContext::default());
@@ -1052,4 +1076,23 @@ mod tests {
10521076

10531077
Ok(())
10541078
}
1079+
1080+
#[test]
1081+
fn test_union_schema_mismatch() {
1082+
// Test that UnionExec properly rejects inputs with different field counts
1083+
let schema = create_test_schema().unwrap();
1084+
let schema2 = create_test_schema2().unwrap();
1085+
let memory_exec1 =
1086+
Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema), None).unwrap());
1087+
let memory_exec2 =
1088+
Arc::new(TestMemoryExec::try_new(&[], Arc::clone(&schema2), None).unwrap());
1089+
1090+
let result = UnionExec::try_new(vec![memory_exec1, memory_exec2]);
1091+
assert!(result.is_err());
1092+
assert!(
1093+
result.unwrap_err().to_string().contains(
1094+
"UnionExec/InterleaveExec requires all inputs to have the same number of fields"
1095+
)
1096+
);
1097+
}
10551098
}

0 commit comments

Comments
 (0)