Skip to content

Commit e30b6bf

Browse files
simonvandel authored and ayman-sigma committed
Support remaining pipe operators (apache#1879)
1 parent 01daed7 commit e30b6bf

File tree

3 files changed

+739
-0
lines changed

3 files changed

+739
-0
lines changed

src/ast/query.rs

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2697,6 +2697,79 @@ pub enum PipeOperator {
26972697
/// Syntax: `|> TABLESAMPLE SYSTEM (10 PERCENT)`
26982698
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#tablesample_pipe_operator>
26992699
TableSample { sample: Box<TableSample> },
2700+
/// Renames columns in the input table.
2701+
///
2702+
/// Syntax: `|> RENAME old_name AS new_name, ...`
2703+
///
2704+
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#rename_pipe_operator>
2705+
Rename { mappings: Vec<IdentWithAlias> },
2706+
/// Combines the input table with one or more tables using UNION.
2707+
///
2708+
/// Syntax: `|> UNION [ALL|DISTINCT] (<query>), (<query>), ...`
2709+
///
2710+
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#union_pipe_operator>
2711+
Union {
2712+
set_quantifier: SetQuantifier,
2713+
queries: Vec<Query>,
2714+
},
2715+
/// Returns only the rows that are present in both the input table and the specified tables.
2716+
///
2717+
/// Syntax: `|> INTERSECT DISTINCT (<query>), (<query>), ...`
2718+
///
2719+
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#intersect_pipe_operator>
2720+
Intersect {
2721+
set_quantifier: SetQuantifier,
2722+
queries: Vec<Query>,
2723+
},
2724+
/// Returns only the rows that are present in the input table but not in the specified tables.
2725+
///
2726+
/// Syntax: `|> EXCEPT DISTINCT (<query>), (<query>), ...`
2727+
///
2728+
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#except_pipe_operator>
2729+
Except {
2730+
set_quantifier: SetQuantifier,
2731+
queries: Vec<Query>,
2732+
},
2733+
/// Calls a table function or procedure that returns a table.
2734+
///
2735+
/// Syntax: `|> CALL function_name(args) [[AS] alias]`
2736+
///
2737+
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#call_pipe_operator>
2738+
Call {
2739+
function: Function,
2740+
alias: Option<Ident>,
2741+
},
2742+
/// Pivots data from rows to columns.
2743+
///
2744+
/// Syntax: `|> PIVOT(aggregate_function(column) FOR pivot_column IN (value1, value2, ...)) [[AS] alias]`
2745+
///
2746+
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#pivot_pipe_operator>
2747+
Pivot {
2748+
aggregate_functions: Vec<ExprWithAlias>,
2749+
value_column: Vec<Ident>,
2750+
value_source: PivotValueSource,
2751+
alias: Option<Ident>,
2752+
},
2753+
/// The `UNPIVOT` pipe operator transforms columns into rows.
2754+
///
2755+
/// Syntax:
2756+
/// ```sql
2757+
/// |> UNPIVOT(value_column FOR name_column IN (column1, column2, ...)) [[AS] alias]
2758+
/// ```
2759+
///
2760+
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#unpivot_pipe_operator>
2761+
Unpivot {
2762+
value_column: Ident,
2763+
name_column: Ident,
2764+
unpivot_columns: Vec<Ident>,
2765+
alias: Option<Ident>,
2766+
},
2767+
/// Joins the input table with another table.
2768+
///
2769+
/// Syntax: `|> [JOIN_TYPE] JOIN <table> [alias] ON <condition>` or `|> [JOIN_TYPE] JOIN <table> [alias] USING (<columns>)`
2770+
///
2771+
/// See more at <https://cloud.google.com/bigquery/docs/reference/standard-sql/pipe-syntax#join_pipe_operator>
2772+
Join(Join),
27002773
}
27012774

27022775
impl fmt::Display for PipeOperator {
@@ -2752,7 +2825,87 @@ impl fmt::Display for PipeOperator {
27522825
PipeOperator::TableSample { sample } => {
27532826
write!(f, "{sample}")
27542827
}
2828+
PipeOperator::Rename { mappings } => {
2829+
write!(f, "RENAME {}", display_comma_separated(mappings))
2830+
}
2831+
PipeOperator::Union {
2832+
set_quantifier,
2833+
queries,
2834+
} => Self::fmt_set_operation(f, "UNION", set_quantifier, queries),
2835+
PipeOperator::Intersect {
2836+
set_quantifier,
2837+
queries,
2838+
} => Self::fmt_set_operation(f, "INTERSECT", set_quantifier, queries),
2839+
PipeOperator::Except {
2840+
set_quantifier,
2841+
queries,
2842+
} => Self::fmt_set_operation(f, "EXCEPT", set_quantifier, queries),
2843+
PipeOperator::Call { function, alias } => {
2844+
write!(f, "CALL {function}")?;
2845+
Self::fmt_optional_alias(f, alias)
2846+
}
2847+
PipeOperator::Pivot {
2848+
aggregate_functions,
2849+
value_column,
2850+
value_source,
2851+
alias,
2852+
} => {
2853+
write!(
2854+
f,
2855+
"PIVOT({} FOR {} IN ({}))",
2856+
display_comma_separated(aggregate_functions),
2857+
Expr::CompoundIdentifier(value_column.to_vec()),
2858+
value_source
2859+
)?;
2860+
Self::fmt_optional_alias(f, alias)
2861+
}
2862+
PipeOperator::Unpivot {
2863+
value_column,
2864+
name_column,
2865+
unpivot_columns,
2866+
alias,
2867+
} => {
2868+
write!(
2869+
f,
2870+
"UNPIVOT({} FOR {} IN ({}))",
2871+
value_column,
2872+
name_column,
2873+
display_comma_separated(unpivot_columns)
2874+
)?;
2875+
Self::fmt_optional_alias(f, alias)
2876+
}
2877+
PipeOperator::Join(join) => write!(f, "{join}"),
2878+
}
2879+
}
2880+
}
2881+
2882+
impl PipeOperator {
2883+
/// Writes an optional alias suffix for a pipe operator.
///
/// Emits ` AS <alias>` (with a leading space) to `f` when `alias` is `Some`,
/// and writes nothing when it is `None`. Used by the `CALL`, `PIVOT` and
/// `UNPIVOT` arms of the `Display` implementation.
///
/// Returns any error produced by the underlying formatter.
2884+
fn fmt_optional_alias(f: &mut fmt::Formatter<'_>, alias: &Option<Ident>) -> fmt::Result {
2885+
if let Some(alias) = alias {
2886+
write!(f, " AS {alias}")?;
27552887
}
2888+
Ok(())
2889+
}
2890+
2891+
/// Writes a set-operation pipe operator (`UNION`, `INTERSECT`, `EXCEPT`).
///
/// The output is `operation`, then the quantifier preceded by a space unless
/// it is `SetQuantifier::None`, then a single space, then every query wrapped
/// in parentheses and joined by `display_comma_separated`.
///
/// Returns any error produced by the underlying formatter.
2892+
fn fmt_set_operation(
2893+
f: &mut fmt::Formatter<'_>,
2894+
operation: &str,
2895+
set_quantifier: &SetQuantifier,
2896+
queries: &[Query],
2897+
) -> fmt::Result {
2898+
// Operator keyword first, exactly as passed by the caller.
write!(f, "{operation}")?;
2899+
match set_quantifier {
2900+
// No quantifier: skip it so no extra space is written.
SetQuantifier::None => {}
2901+
_ => {
2902+
write!(f, " {set_quantifier}")?;
2903+
}
2904+
}
2905+
// Separator between the keyword/quantifier and the query list.
write!(f, " ")?;
2906+
// Each query is rendered to an owned `String` so the parentheses can be
// added before the list is handed to `display_comma_separated`.
let parenthesized_queries: Vec<String> =
2907+
queries.iter().map(|query| format!("({query})")).collect();
2908+
write!(f, "{}", display_comma_separated(&parenthesized_queries))
27562909
}
27572910
}
27582911

src/parser/mod.rs

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10019,6 +10019,48 @@ impl<'a> Parser<'a> {
1001910019
Ok(IdentWithAlias { ident, alias })
1002010020
}
1002110021

10022+
/// Parse `identifier [AS] identifier` where the AS keyword is optional.
///
/// Only the `AS` keyword is optional: the second identifier (the alias) is
/// always parsed, so input without one yields an error. Returns the pair as
/// an `IdentWithAlias`; errors from `parse_identifier` are propagated.
10023+
fn parse_identifier_with_optional_alias(&mut self) -> Result<IdentWithAlias, ParserError> {
10024+
let ident = self.parse_identifier()?;
10025+
// Consume `AS` if present; whether it was there does not affect the result,
// hence the underscore-prefixed binding.
let _after_as = self.parse_keyword(Keyword::AS);
10026+
let alias = self.parse_identifier()?;
10027+
Ok(IdentWithAlias { ident, alias })
10028+
}
10029+
10030+
/// Parse comma-separated list of parenthesized queries for pipe operators.
///
/// Each list element must be `(` followed by a query and a closing `)`.
/// Used by the `UNION`, `INTERSECT` and `EXCEPT` pipe operators. Any error
/// from the token checks or from `parse_query` is propagated.
10031+
fn parse_pipe_operator_queries(&mut self) -> Result<Vec<Query>, ParserError> {
10032+
self.parse_comma_separated(|parser| {
10033+
parser.expect_token(&Token::LParen)?;
10034+
let query = parser.parse_query()?;
10035+
parser.expect_token(&Token::RParen)?;
10036+
// Dereference to store the `Query` by value in the returned `Vec`
// (presumably `parse_query` returns a `Box<Query>` — confirm).
Ok(*query)
10037+
})
10038+
}
10039+
10040+
/// Parse set quantifier for pipe operators that require DISTINCT. E.g. INTERSECT and EXCEPT
///
/// `operator_name` is used only to build the error message. Returns the parsed
/// quantifier when it is `SetQuantifier::Distinct` or
/// `SetQuantifier::DistinctByName`; any other quantifier produces a
/// `ParserError::ParserError` stating that the operator requires DISTINCT.
10041+
fn parse_distinct_required_set_quantifier(
10042+
&mut self,
10043+
operator_name: &str,
10044+
) -> Result<SetQuantifier, ParserError> {
10045+
// NOTE(review): `SetOperator::Intersect` is passed regardless of
// `operator_name`, so EXCEPT is parsed with the INTERSECT operator hint —
// confirm `parse_set_quantifier` treats the two operators identically.
let quantifier = self.parse_set_quantifier(&Some(SetOperator::Intersect));
10046+
match quantifier {
10047+
SetQuantifier::Distinct | SetQuantifier::DistinctByName => Ok(quantifier),
10048+
_ => Err(ParserError::ParserError(format!(
10049+
"{operator_name} pipe operator requires DISTINCT modifier",
10050+
))),
10051+
}
10052+
}
10053+
10054+
/// Parse optional identifier alias (with or without AS keyword)
///
/// When `AS` is present an identifier is mandatory and a failure to parse it
/// is returned as an error. Without `AS`, an identifier is attempted through
/// `maybe_parse` and its result is returned as-is. Used for the trailing
/// alias of the `CALL`, `PIVOT` and `UNPIVOT` pipe operators.
10055+
fn parse_identifier_optional_alias(&mut self) -> Result<Option<Ident>, ParserError> {
10056+
if self.parse_keyword(Keyword::AS) {
10057+
// Explicit alias: `AS` was consumed, so the identifier must follow.
Ok(Some(self.parse_identifier()?))
10058+
} else {
10059+
// Check if the next token is an identifier (implicit alias).
// Presumably `maybe_parse` yields `None` and leaves the parser position
// unchanged when no identifier follows — confirm against `maybe_parse`.
10060+
self.maybe_parse(|parser| parser.parse_identifier())
10061+
}
10062+
}
10063+
1002210064
/// Optionally parses an alias for a select list item
1002310065
fn maybe_parse_select_item_alias(&mut self) -> Result<Option<Ident>, ParserError> {
1002410066
fn validator(explicit: bool, kw: &Keyword, parser: &mut Parser) -> bool {
@@ -11165,6 +11207,19 @@ impl<'a> Parser<'a> {
1116511207
Keyword::AGGREGATE,
1116611208
Keyword::ORDER,
1116711209
Keyword::TABLESAMPLE,
11210+
Keyword::RENAME,
11211+
Keyword::UNION,
11212+
Keyword::INTERSECT,
11213+
Keyword::EXCEPT,
11214+
Keyword::CALL,
11215+
Keyword::PIVOT,
11216+
Keyword::UNPIVOT,
11217+
Keyword::JOIN,
11218+
Keyword::INNER,
11219+
Keyword::LEFT,
11220+
Keyword::RIGHT,
11221+
Keyword::FULL,
11222+
Keyword::CROSS,
1116811223
])?;
1116911224
match kw {
1117011225
Keyword::SELECT => {
@@ -11231,6 +11286,121 @@ impl<'a> Parser<'a> {
1123111286
let sample = self.parse_table_sample(TableSampleModifier::TableSample)?;
1123211287
pipe_operators.push(PipeOperator::TableSample { sample });
1123311288
}
11289+
Keyword::RENAME => {
11290+
let mappings =
11291+
self.parse_comma_separated(Parser::parse_identifier_with_optional_alias)?;
11292+
pipe_operators.push(PipeOperator::Rename { mappings });
11293+
}
11294+
Keyword::UNION => {
11295+
let set_quantifier = self.parse_set_quantifier(&Some(SetOperator::Union));
11296+
let queries = self.parse_pipe_operator_queries()?;
11297+
pipe_operators.push(PipeOperator::Union {
11298+
set_quantifier,
11299+
queries,
11300+
});
11301+
}
11302+
Keyword::INTERSECT => {
11303+
let set_quantifier =
11304+
self.parse_distinct_required_set_quantifier("INTERSECT")?;
11305+
let queries = self.parse_pipe_operator_queries()?;
11306+
pipe_operators.push(PipeOperator::Intersect {
11307+
set_quantifier,
11308+
queries,
11309+
});
11310+
}
11311+
Keyword::EXCEPT => {
11312+
let set_quantifier = self.parse_distinct_required_set_quantifier("EXCEPT")?;
11313+
let queries = self.parse_pipe_operator_queries()?;
11314+
pipe_operators.push(PipeOperator::Except {
11315+
set_quantifier,
11316+
queries,
11317+
});
11318+
}
11319+
Keyword::CALL => {
11320+
let function_name = self.parse_object_name(false)?;
11321+
let function_expr = self.parse_function(function_name)?;
11322+
if let Expr::Function(function) = function_expr {
11323+
let alias = self.parse_identifier_optional_alias()?;
11324+
pipe_operators.push(PipeOperator::Call { function, alias });
11325+
} else {
11326+
return Err(ParserError::ParserError(
11327+
"Expected function call after CALL".to_string(),
11328+
));
11329+
}
11330+
}
11331+
Keyword::PIVOT => {
11332+
self.expect_token(&Token::LParen)?;
11333+
let aggregate_functions =
11334+
self.parse_comma_separated(Self::parse_aliased_function_call)?;
11335+
self.expect_keyword_is(Keyword::FOR)?;
11336+
let value_column = self.parse_period_separated(|p| p.parse_identifier())?;
11337+
self.expect_keyword_is(Keyword::IN)?;
11338+
11339+
self.expect_token(&Token::LParen)?;
11340+
let value_source = if self.parse_keyword(Keyword::ANY) {
11341+
let order_by = if self.parse_keywords(&[Keyword::ORDER, Keyword::BY]) {
11342+
self.parse_comma_separated(Parser::parse_order_by_expr)?
11343+
} else {
11344+
vec![]
11345+
};
11346+
PivotValueSource::Any(order_by)
11347+
} else if self.peek_sub_query() {
11348+
PivotValueSource::Subquery(self.parse_query()?)
11349+
} else {
11350+
PivotValueSource::List(
11351+
self.parse_comma_separated(Self::parse_expr_with_alias)?,
11352+
)
11353+
};
11354+
self.expect_token(&Token::RParen)?;
11355+
self.expect_token(&Token::RParen)?;
11356+
11357+
let alias = self.parse_identifier_optional_alias()?;
11358+
11359+
pipe_operators.push(PipeOperator::Pivot {
11360+
aggregate_functions,
11361+
value_column,
11362+
value_source,
11363+
alias,
11364+
});
11365+
}
11366+
Keyword::UNPIVOT => {
11367+
self.expect_token(&Token::LParen)?;
11368+
let value_column = self.parse_identifier()?;
11369+
self.expect_keyword(Keyword::FOR)?;
11370+
let name_column = self.parse_identifier()?;
11371+
self.expect_keyword(Keyword::IN)?;
11372+
11373+
self.expect_token(&Token::LParen)?;
11374+
let unpivot_columns = self.parse_comma_separated(Parser::parse_identifier)?;
11375+
self.expect_token(&Token::RParen)?;
11376+
11377+
self.expect_token(&Token::RParen)?;
11378+
11379+
let alias = self.parse_identifier_optional_alias()?;
11380+
11381+
pipe_operators.push(PipeOperator::Unpivot {
11382+
value_column,
11383+
name_column,
11384+
unpivot_columns,
11385+
alias,
11386+
});
11387+
}
11388+
Keyword::JOIN
11389+
| Keyword::INNER
11390+
| Keyword::LEFT
11391+
| Keyword::RIGHT
11392+
| Keyword::FULL
11393+
| Keyword::CROSS => {
11394+
self.prev_token();
11395+
let mut joins = self.parse_joins()?;
11396+
if joins.len() != 1 {
11397+
return Err(ParserError::ParserError(
11398+
"Join pipe operator must have a single join".to_string(),
11399+
));
11400+
}
11401+
let join = joins.swap_remove(0);
11402+
pipe_operators.push(PipeOperator::Join(join))
11403+
}
1123411404
unhandled => {
1123511405
return Err(ParserError::ParserError(format!(
1123611406
"`expect_one_of_keywords` further up allowed unhandled keyword: {unhandled:?}"

0 commit comments

Comments
 (0)