Skip to content

Commit 579aa74

Browse files
committed
fix: raise AmbiguousReference error for duplicate column names in subquery derived tables
1 parent 2c03881 commit 579aa74

4 files changed

Lines changed: 102 additions & 3 deletions

File tree

datafusion/common/src/column.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,39 @@ impl Column {
237237
.collect::<Vec<_>>();
238238
match qualified_fields.len() {
239239
0 => continue,
240-
1 => return Ok(Column::from(qualified_fields[0])),
240+
1 => {
241+
// Even a single structural match must be rejected when the
242+
// schema itself has flagged the name as ambiguous (e.g. a
243+
// derived-table subquery that contained two columns with
244+
// the same unqualified name).
245+
let is_ambiguous = schema_level
246+
.iter()
247+
.any(|s| s.ambiguous_names().contains(&self.name));
248+
if is_ambiguous {
249+
return _schema_err!(SchemaError::AmbiguousReference {
250+
field: Box::new(Column::new_unqualified(&self.name)),
251+
})
252+
.map_err(|err| {
253+
let mut diagnostic = Diagnostic::new_error(
254+
format!("column '{}' is ambiguous", &self.name),
255+
self.spans().first(),
256+
);
257+
let columns = schema_level
258+
.iter()
259+
.flat_map(|s| {
260+
s.columns_with_unqualified_name(&self.name)
261+
})
262+
.collect::<Vec<_>>();
263+
add_possible_columns_to_diag(
264+
&mut diagnostic,
265+
&Column::new_unqualified(&self.name),
266+
&columns,
267+
);
268+
err.with_diagnostic(diagnostic)
269+
});
270+
}
271+
return Ok(Column::from(qualified_fields[0]));
272+
}
241273
_ => {
242274
// More than one field in this schema has its name set to `self.name`.
243275
//

datafusion/common/src/dfschema.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,11 @@ pub struct DFSchema {
117117
field_qualifiers: Vec<Option<TableReference>>,
118118
/// Stores functional dependencies in the schema.
119119
functional_dependencies: FunctionalDependencies,
120+
/// Field names that are ambiguous in this schema because the underlying
121+
/// source (e.g. a derived-table subquery) contained multiple columns with
122+
/// the same unqualified name. Any attempt to reference these names without
123+
/// a qualifier should produce a [`SchemaError::AmbiguousReference`] error.
124+
ambiguous_names: HashSet<String>,
120125
}
121126

122127
impl DFSchema {
@@ -126,6 +131,7 @@ impl DFSchema {
126131
inner: Arc::new(Schema::new([])),
127132
field_qualifiers: vec![],
128133
functional_dependencies: FunctionalDependencies::empty(),
134+
ambiguous_names: HashSet::new(),
129135
}
130136
}
131137

@@ -157,6 +163,7 @@ impl DFSchema {
157163
inner: schema,
158164
field_qualifiers: qualifiers,
159165
functional_dependencies: FunctionalDependencies::empty(),
166+
ambiguous_names: HashSet::new(),
160167
};
161168
dfschema.check_names()?;
162169
Ok(dfschema)
@@ -173,6 +180,7 @@ impl DFSchema {
173180
inner: schema,
174181
field_qualifiers: vec![None; field_count],
175182
functional_dependencies: FunctionalDependencies::empty(),
183+
ambiguous_names: HashSet::new(),
176184
};
177185
dfschema.check_names()?;
178186
Ok(dfschema)
@@ -191,6 +199,7 @@ impl DFSchema {
191199
inner: schema.clone().into(),
192200
field_qualifiers: vec![Some(qualifier); schema.fields.len()],
193201
functional_dependencies: FunctionalDependencies::empty(),
202+
ambiguous_names: HashSet::new(),
194203
};
195204
schema.check_names()?;
196205
Ok(schema)
@@ -205,6 +214,7 @@ impl DFSchema {
205214
inner: Arc::clone(schema),
206215
field_qualifiers: qualifiers,
207216
functional_dependencies: FunctionalDependencies::empty(),
217+
ambiguous_names: HashSet::new(),
208218
};
209219
dfschema.check_names()?;
210220
Ok(dfschema)
@@ -226,6 +236,7 @@ impl DFSchema {
226236
inner: Arc::clone(&self.inner),
227237
field_qualifiers: qualifiers,
228238
functional_dependencies: self.functional_dependencies.clone(),
239+
ambiguous_names: self.ambiguous_names.clone(),
229240
})
230241
}
231242

@@ -275,6 +286,24 @@ impl DFSchema {
275286
}
276287
}
277288

289+
/// Sets the ambiguous field names for this schema, replacing any previously stored set.
290+
///
291+
/// Ambiguous names correspond to fields that originated from multiple
292+
/// source columns with the same unqualified name (e.g. both sides of a
293+
/// JOIN having an `age` column). Any attempt to resolve such a name
294+
/// without a table qualifier will produce a
295+
/// [`SchemaError::AmbiguousReference`] error.
296+
pub fn with_ambiguous_names(mut self, names: HashSet<String>) -> Self {
297+
self.ambiguous_names = names;
298+
self
299+
}
300+
301+
/// Returns the set of field names that are considered ambiguous in this
302+
/// schema. See [`Self::with_ambiguous_names`].
303+
pub fn ambiguous_names(&self) -> &HashSet<String> {
304+
&self.ambiguous_names
305+
}
306+
278307
/// Create a new schema that contains the fields from this schema followed by the fields
279308
/// from the supplied schema. An error will be returned if there are duplicate field names.
280309
pub fn join(&self, schema: &DFSchema) -> Result<Self> {
@@ -294,6 +323,7 @@ impl DFSchema {
294323
inner: Arc::new(new_schema_with_metadata),
295324
field_qualifiers: new_qualifiers,
296325
functional_dependencies: FunctionalDependencies::empty(),
326+
ambiguous_names: HashSet::new(),
297327
};
298328
new_self.check_names()?;
299329
Ok(new_self)
@@ -506,6 +536,14 @@ impl DFSchema {
506536
&self,
507537
name: &str,
508538
) -> Result<(Option<&TableReference>, &FieldRef)> {
539+
// If this field name was marked as ambiguous at schema creation time
540+
// (e.g. because a derived-table subquery produced duplicate column
541+
// names), refuse to resolve it without an explicit qualifier.
542+
if self.ambiguous_names.contains(name) {
543+
return _schema_err!(SchemaError::AmbiguousReference {
544+
field: Box::new(Column::new_unqualified(name.to_string()))
545+
});
546+
}
509547
let matches = self.qualified_fields_with_unqualified_name(name);
510548
match matches.len() {
511549
0 => Err(unqualified_field_not_found(name, self)),
@@ -845,6 +883,7 @@ impl DFSchema {
845883
field_qualifiers: vec![None; self.inner.fields.len()],
846884
inner: self.inner,
847885
functional_dependencies: self.functional_dependencies,
886+
ambiguous_names: self.ambiguous_names,
848887
}
849888
}
850889

@@ -855,6 +894,7 @@ impl DFSchema {
855894
field_qualifiers: vec![Some(qualifier); self.inner.fields.len()],
856895
inner: self.inner,
857896
functional_dependencies: self.functional_dependencies,
897+
ambiguous_names: self.ambiguous_names,
858898
}
859899
}
860900

@@ -1126,6 +1166,7 @@ impl TryFrom<SchemaRef> for DFSchema {
11261166
inner: schema,
11271167
field_qualifiers: vec![None; field_count],
11281168
functional_dependencies: FunctionalDependencies::empty(),
1169+
ambiguous_names: HashSet::new(),
11291170
};
11301171
// Without checking names, because schema here may have duplicate field names.
11311172
// For example, Partial AggregateMode will generate duplicate field names from
@@ -1187,6 +1228,7 @@ impl ToDFSchema for Vec<Field> {
11871228
inner: schema.into(),
11881229
field_qualifiers: vec![None; field_count],
11891230
functional_dependencies: FunctionalDependencies::empty(),
1231+
ambiguous_names: HashSet::new(),
11901232
};
11911233
Ok(dfschema)
11921234
}
@@ -1578,6 +1620,7 @@ mod tests {
15781620
inner: Arc::clone(&arrow_schema_ref),
15791621
field_qualifiers: vec![None; arrow_schema_ref.fields.len()],
15801622
functional_dependencies: FunctionalDependencies::empty(),
1623+
ambiguous_names: HashSet::new(),
15811624
};
15821625
let df_schema_ref = Arc::new(df_schema.clone());
15831626

@@ -1624,6 +1667,7 @@ mod tests {
16241667
inner: Arc::clone(&schema),
16251668
field_qualifiers: vec![None; schema.fields.len()],
16261669
functional_dependencies: FunctionalDependencies::empty(),
1670+
ambiguous_names: HashSet::new(),
16271671
};
16281672

16291673
assert_eq!(df_schema.inner.metadata(), schema.metadata())

datafusion/core/src/physical_planner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3562,7 +3562,7 @@ mod tests {
35623562
.expect_err("planning error")
35633563
.strip_backtrace();
35643564

3565-
insta::assert_snapshot!(e, @r#"Error during planning: Extension planner for NoOp created an ExecutionPlan with mismatched schema. LogicalPlan schema: DFSchema { inner: Schema { fields: [Field { name: "a", data_type: Int32 }], metadata: {} }, field_qualifiers: [None], functional_dependencies: FunctionalDependencies { deps: [] } }, ExecutionPlan schema: Schema { fields: [Field { name: "b", data_type: Int32 }], metadata: {} }"#);
3565+
insta::assert_snapshot!(e, @r#"Error during planning: Extension planner for NoOp created an ExecutionPlan with mismatched schema. LogicalPlan schema: DFSchema { inner: Schema { fields: [Field { name: "a", data_type: Int32 }], metadata: {} }, field_qualifiers: [None], functional_dependencies: FunctionalDependencies { deps: [] }, ambiguous_names: {} }, ExecutionPlan schema: Schema { fields: [Field { name: "b", data_type: Int32 }], metadata: {} }"#);
35663566
}
35673567

35683568
#[tokio::test]

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2406,6 +2406,28 @@ impl SubqueryAlias {
24062406
let aliases = unique_field_aliases(plan.schema().fields());
24072407
let is_projection_needed = aliases.iter().any(Option::is_some);
24082408

2409+
// Collect the set of unqualified field names that are ambiguous in this
2410+
// subquery alias's output schema. A name is ambiguous when two or more
2411+
// input columns share the same unqualified name (they come, say, from
2412+
// different sides of a JOIN). `unique_field_aliases` renames the
2413+
// duplicates to keep the Arrow schema free of duplicates, but we still
2414+
// need to reject unqualified references to those names from outer
2415+
// queries.
2416+
let ambiguous_names: HashSet<String> = aliases
2417+
.iter()
2418+
.zip(plan.schema().fields().iter())
2419+
.filter_map(|(alias, field)| {
2420+
// When a field was given a rename alias it means its original
2421+
// name already appeared in the schema → the original name is
2422+
// ambiguous.
2423+
if alias.is_some() {
2424+
Some(field.name().to_string())
2425+
} else {
2426+
None
2427+
}
2428+
})
2429+
.collect();
2430+
24092431
// Insert a projection node, if needed, to make sure aliases are applied.
24102432
let plan = if is_projection_needed {
24112433
let projection_expressions = aliases
@@ -2438,7 +2460,8 @@ impl SubqueryAlias {
24382460

24392461
let schema = DFSchemaRef::new(
24402462
DFSchema::try_from_qualified_schema(alias.clone(), schema)?
2441-
.with_functional_dependencies(func_dependencies)?,
2463+
.with_functional_dependencies(func_dependencies)?
2464+
.with_ambiguous_names(ambiguous_names),
24422465
);
24432466
Ok(SubqueryAlias {
24442467
input: plan,

0 commit comments

Comments
 (0)