Skip to content

Commit 35160e8

Browse files
committed
feat(ast): add CreatePublication and CreateSubscription AST nodes
Add PublicationTarget enum (AllTables, Tables, TablesInSchema), CreatePublication struct, and CreateSubscription struct to ddl.rs. Add Statement variants, Display arms, and re-exports in mod.rs. Add Span::empty() arms in spans.rs. Add PUBLICATION and SUBSCRIPTION to keywords.rs.
1 parent 18b7650 commit 35160e8

6 files changed

Lines changed: 292 additions & 2 deletions

File tree

src/ast/ddl.rs

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6136,3 +6136,111 @@ impl From<CreateTextSearchTemplate> for crate::ast::Statement {
61366136
crate::ast::Statement::CreateTextSearchTemplate(v)
61376137
}
61386138
}
6139+
6140+
/// The target of a `CREATE PUBLICATION` statement: which rows to publish.
6141+
///
6142+
/// See <https://www.postgresql.org/docs/current/sql-createpublication.html>
6143+
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
6144+
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
6145+
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
6146+
pub enum PublicationTarget {
6147+
/// `FOR ALL TABLES`
6148+
AllTables,
6149+
/// `FOR TABLE table [, ...]`
6150+
Tables(Vec<ObjectName>),
6151+
/// `FOR TABLES IN SCHEMA schema [, ...]`
6152+
TablesInSchema(Vec<Ident>),
6153+
}
6154+
6155+
impl fmt::Display for PublicationTarget {
6156+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
6157+
match self {
6158+
PublicationTarget::AllTables => write!(f, "FOR ALL TABLES"),
6159+
PublicationTarget::Tables(tables) => {
6160+
write!(f, "FOR TABLE {}", display_comma_separated(tables))
6161+
}
6162+
PublicationTarget::TablesInSchema(schemas) => {
6163+
write!(
6164+
f,
6165+
"FOR TABLES IN SCHEMA {}",
6166+
display_comma_separated(schemas)
6167+
)
6168+
}
6169+
}
6170+
}
6171+
}
6172+
6173+
/// A `CREATE PUBLICATION` statement.
6174+
///
6175+
/// Note: this is a PostgreSQL-specific statement.
6176+
/// <https://www.postgresql.org/docs/current/sql-createpublication.html>
6177+
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
6178+
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
6179+
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
6180+
pub struct CreatePublication {
6181+
/// The publication name.
6182+
pub name: Ident,
6183+
/// Optional target specification (`FOR ALL TABLES`, `FOR TABLE ...`, or `FOR TABLES IN SCHEMA ...`).
6184+
pub target: Option<PublicationTarget>,
6185+
/// Optional `WITH (key = value, ...)` clause.
6186+
pub with_options: Vec<SqlOption>,
6187+
}
6188+
6189+
impl fmt::Display for CreatePublication {
6190+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
6191+
write!(f, "CREATE PUBLICATION {}", self.name)?;
6192+
if let Some(target) = &self.target {
6193+
write!(f, " {target}")?;
6194+
}
6195+
if !self.with_options.is_empty() {
6196+
write!(f, " WITH ({})", display_comma_separated(&self.with_options))?;
6197+
}
6198+
Ok(())
6199+
}
6200+
}
6201+
6202+
impl From<CreatePublication> for crate::ast::Statement {
6203+
fn from(v: CreatePublication) -> Self {
6204+
crate::ast::Statement::CreatePublication(v)
6205+
}
6206+
}
6207+
6208+
/// A `CREATE SUBSCRIPTION` statement.
6209+
///
6210+
/// Note: this is a PostgreSQL-specific statement.
6211+
/// <https://www.postgresql.org/docs/current/sql-createsubscription.html>
6212+
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
6213+
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
6214+
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
6215+
pub struct CreateSubscription {
6216+
/// The subscription name.
6217+
pub name: Ident,
6218+
/// The `CONNECTION 'conninfo'` string.
6219+
pub connection: Value,
6220+
/// The `PUBLICATION publication_name [, ...]` list.
6221+
pub publications: Vec<Ident>,
6222+
/// Optional `WITH (key = value, ...)` clause.
6223+
pub with_options: Vec<SqlOption>,
6224+
}
6225+
6226+
impl fmt::Display for CreateSubscription {
6227+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
6228+
write!(
6229+
f,
6230+
"CREATE SUBSCRIPTION {name} CONNECTION {connection} PUBLICATION {publications}",
6231+
name = self.name,
6232+
connection = self.connection,
6233+
publications = display_comma_separated(&self.publications),
6234+
)?;
6235+
if !self.with_options.is_empty() {
6236+
write!(f, " WITH ({})", display_comma_separated(&self.with_options))?;
6237+
}
6238+
Ok(())
6239+
}
6240+
}
6241+
6242+
impl From<CreateSubscription> for crate::ast::Statement {
6243+
fn from(v: CreateSubscription) -> Self {
6244+
crate::ast::Statement::CreateSubscription(v)
6245+
}
6246+
}

src/ast/mod.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,9 @@ pub use self::ddl::{
7272
CreateAggregateOption, CreateCollation, CreateCollationDefinition, CreateConnector,
7373
CreateDomain, CreateExtension, CreateForeignDataWrapper, CreateForeignTable, CreateFunction,
7474
CreateIndex, CreateOperator, CreateOperatorClass, CreateOperatorFamily, CreatePolicy,
75-
CreatePolicyCommand, CreatePolicyType, CreateTable, CreateTextSearchConfiguration,
76-
CreateTextSearchDictionary, CreateTextSearchParser, CreateTextSearchTemplate, CreateTrigger,
75+
CreatePolicyCommand, CreatePolicyType, CreatePublication, CreateSubscription, CreateTable,
76+
CreateTextSearchConfiguration, CreateTextSearchDictionary, CreateTextSearchParser,
77+
CreateTextSearchTemplate, CreateTrigger, PublicationTarget,
7778
CreateView, Deduplicate, DeferrableInitial, DistStyle, DropBehavior, DropExtension,
7879
DropFunction, DropOperator, DropOperatorClass, DropOperatorFamily, DropOperatorSignature,
7980
DropPolicy, DropTrigger, FdwRoutineClause, ForValues, FunctionReturnType, GeneratedAs,
@@ -4017,6 +4018,18 @@ pub enum Statement {
40174018
/// <https://www.postgresql.org/docs/current/sql-createtstemplate.html>
40184019
CreateTextSearchTemplate(CreateTextSearchTemplate),
40194020
/// ```sql
4021+
/// CREATE PUBLICATION name [ FOR ALL TABLES | FOR TABLE table [, ...] | FOR TABLES IN SCHEMA schema [, ...] ] [ WITH ( option = value [, ...] ) ]
4022+
/// ```
4023+
/// Note: this is a PostgreSQL-specific statement.
4024+
/// <https://www.postgresql.org/docs/current/sql-createpublication.html>
4025+
CreatePublication(CreatePublication),
4026+
/// ```sql
4027+
/// CREATE SUBSCRIPTION name CONNECTION 'conninfo' PUBLICATION publication_name [, ...] [ WITH ( option = value [, ...] ) ]
4028+
/// ```
4029+
/// Note: this is a PostgreSQL-specific statement.
4030+
/// <https://www.postgresql.org/docs/current/sql-createsubscription.html>
4031+
CreateSubscription(CreateSubscription),
4032+
/// ```sql
40204033
/// DROP EXTENSION [ IF EXISTS ] name [, ...] [ CASCADE | RESTRICT ]
40214034
/// ```
40224035
/// Note: this is a PostgreSQL-specific statement.
@@ -5503,6 +5516,8 @@ impl fmt::Display for Statement {
55035516
Statement::CreateTextSearchDictionary(v) => write!(f, "{v}"),
55045517
Statement::CreateTextSearchParser(v) => write!(f, "{v}"),
55055518
Statement::CreateTextSearchTemplate(v) => write!(f, "{v}"),
5519+
Statement::CreatePublication(v) => write!(f, "{v}"),
5520+
Statement::CreateSubscription(v) => write!(f, "{v}"),
55065521
Statement::DropExtension(drop_extension) => write!(f, "{drop_extension}"),
55075522
Statement::DropOperator(drop_operator) => write!(f, "{drop_operator}"),
55085523
Statement::DropOperatorFamily(drop_operator_family) => {

src/ast/spans.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,8 @@ impl Spanned for Statement {
388388
Statement::CreateTextSearchDictionary(_) => Span::empty(),
389389
Statement::CreateTextSearchParser(_) => Span::empty(),
390390
Statement::CreateTextSearchTemplate(_) => Span::empty(),
391+
Statement::CreatePublication(_) => Span::empty(),
392+
Statement::CreateSubscription(_) => Span::empty(),
391393
Statement::DropExtension(drop_extension) => drop_extension.span(),
392394
Statement::DropOperator(drop_operator) => drop_operator.span(),
393395
Statement::DropOperatorFamily(drop_operator_family) => drop_operator_family.span(),

src/keywords.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -822,6 +822,7 @@ define_keywords!(
822822
PROGRAM,
823823
PROJECTION,
824824
PUBLIC,
825+
PUBLICATION,
825826
PURCHASE,
826827
PURGE,
827828
QUALIFY,
@@ -1010,6 +1011,7 @@ define_keywords!(
10101011
STRUCT,
10111012
SUBMULTISET,
10121013
SUBSCRIPT,
1014+
SUBSCRIPTION,
10131015
SUBSTR,
10141016
SUBSTRING,
10151017
SUBSTRING_REGEX,

src/parser/mod.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5228,6 +5228,10 @@ impl<'a> Parser<'a> {
52285228
}
52295229
} else if self.parse_keywords(&[Keyword::TEXT, Keyword::SEARCH]) {
52305230
self.parse_create_text_search()
5231+
} else if self.parse_keyword(Keyword::PUBLICATION) {
5232+
self.parse_create_publication().map(Into::into)
5233+
} else if self.parse_keyword(Keyword::SUBSCRIPTION) {
5234+
self.parse_create_subscription().map(Into::into)
52315235
} else {
52325236
self.expected_ref("an object type after CREATE", self.peek_token_ref())
52335237
}
@@ -20083,6 +20087,59 @@ impl<'a> Parser<'a> {
2008320087
})
2008420088
}
2008520089

20090+
/// Parse a `CREATE PUBLICATION` statement.
20091+
///
20092+
/// See <https://www.postgresql.org/docs/current/sql-createpublication.html>
20093+
pub fn parse_create_publication(&mut self) -> Result<CreatePublication, ParserError> {
20094+
let name = self.parse_identifier()?;
20095+
20096+
let target = if self.parse_keyword(Keyword::FOR) {
20097+
if self.parse_keywords(&[Keyword::ALL, Keyword::TABLES]) {
20098+
Some(PublicationTarget::AllTables)
20099+
} else if self.parse_keyword(Keyword::TABLE) {
20100+
let tables = self.parse_comma_separated(|p| p.parse_object_name(false))?;
20101+
Some(PublicationTarget::Tables(tables))
20102+
} else if self.parse_keywords(&[Keyword::TABLES, Keyword::IN, Keyword::SCHEMA]) {
20103+
let schemas = self.parse_comma_separated(|p| p.parse_identifier())?;
20104+
Some(PublicationTarget::TablesInSchema(schemas))
20105+
} else {
20106+
return self.expected_ref(
20107+
"ALL TABLES, TABLE, or TABLES IN SCHEMA after FOR",
20108+
self.peek_token_ref(),
20109+
);
20110+
}
20111+
} else {
20112+
None
20113+
};
20114+
20115+
let with_options = self.parse_options(Keyword::WITH)?;
20116+
20117+
Ok(CreatePublication {
20118+
name,
20119+
target,
20120+
with_options,
20121+
})
20122+
}
20123+
20124+
/// Parse a `CREATE SUBSCRIPTION` statement.
20125+
///
20126+
/// See <https://www.postgresql.org/docs/current/sql-createsubscription.html>
20127+
pub fn parse_create_subscription(&mut self) -> Result<CreateSubscription, ParserError> {
20128+
let name = self.parse_identifier()?;
20129+
self.expect_keyword_is(Keyword::CONNECTION)?;
20130+
let connection = self.parse_value()?.value;
20131+
self.expect_keyword_is(Keyword::PUBLICATION)?;
20132+
let publications = self.parse_comma_separated(|p| p.parse_identifier())?;
20133+
let with_options = self.parse_options(Keyword::WITH)?;
20134+
20135+
Ok(CreateSubscription {
20136+
name,
20137+
connection,
20138+
publications,
20139+
with_options,
20140+
})
20141+
}
20142+
2008620143
/// The index of the first unprocessed token.
2008720144
pub fn index(&self) -> usize {
2008820145
self.index

tests/sqlparser_postgres.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9615,3 +9615,109 @@ fn parse_create_aggregate_with_moving_aggregate_options() {
96159615
_ => panic!("Expected CreateAggregate, got: {stmt:?}"),
96169616
}
96179617
}
9618+
9619+
#[test]
9620+
fn parse_create_publication_basic() {
9621+
let sql = "CREATE PUBLICATION mypub FOR TABLE public.t";
9622+
let Statement::CreatePublication(stmt) = pg().verified_stmt(sql) else {
9623+
unreachable!()
9624+
};
9625+
assert_eq!(stmt.name.value, "mypub");
9626+
assert!(stmt.with_options.is_empty());
9627+
match stmt.target.unwrap() {
9628+
PublicationTarget::Tables(tables) => {
9629+
assert_eq!(tables.len(), 1);
9630+
assert_eq!(tables[0].to_string(), "public.t");
9631+
}
9632+
other => panic!("unexpected target: {other:?}"),
9633+
}
9634+
}
9635+
9636+
#[test]
9637+
fn parse_create_publication_for_all_tables() {
9638+
let sql = "CREATE PUBLICATION mypub FOR ALL TABLES";
9639+
let Statement::CreatePublication(stmt) = pg().verified_stmt(sql) else {
9640+
unreachable!()
9641+
};
9642+
assert_eq!(stmt.name.value, "mypub");
9643+
assert!(matches!(stmt.target, Some(PublicationTarget::AllTables)));
9644+
assert!(stmt.with_options.is_empty());
9645+
}
9646+
9647+
#[test]
9648+
fn parse_create_publication_for_tables_in_schema() {
9649+
let sql = "CREATE PUBLICATION mypub FOR TABLES IN SCHEMA myschema";
9650+
let Statement::CreatePublication(stmt) = pg().verified_stmt(sql) else {
9651+
unreachable!()
9652+
};
9653+
assert_eq!(stmt.name.value, "mypub");
9654+
match stmt.target.unwrap() {
9655+
PublicationTarget::TablesInSchema(schemas) => {
9656+
assert_eq!(schemas.len(), 1);
9657+
assert_eq!(schemas[0].value, "myschema");
9658+
}
9659+
other => panic!("unexpected target: {other:?}"),
9660+
}
9661+
}
9662+
9663+
#[test]
9664+
fn parse_create_publication_with_options() {
9665+
let sql = "CREATE PUBLICATION mypub FOR ALL TABLES WITH (publish = 'insert, update')";
9666+
let Statement::CreatePublication(stmt) = pg().verified_stmt(sql) else {
9667+
unreachable!()
9668+
};
9669+
assert_eq!(stmt.name.value, "mypub");
9670+
assert!(matches!(stmt.target, Some(PublicationTarget::AllTables)));
9671+
assert_eq!(stmt.with_options.len(), 1);
9672+
match &stmt.with_options[0] {
9673+
SqlOption::KeyValue { key, value } => {
9674+
assert_eq!(key.value, "publish");
9675+
assert_eq!(value.to_string(), "'insert, update'");
9676+
}
9677+
other => panic!("unexpected option: {other:?}"),
9678+
}
9679+
}
9680+
9681+
#[test]
9682+
fn parse_create_subscription_basic() {
9683+
let sql = "CREATE SUBSCRIPTION mysub CONNECTION 'host=localhost' PUBLICATION mypub";
9684+
let Statement::CreateSubscription(stmt) = pg().verified_stmt(sql) else {
9685+
unreachable!()
9686+
};
9687+
assert_eq!(stmt.name.value, "mysub");
9688+
assert_eq!(stmt.connection.to_string(), "'host=localhost'");
9689+
assert_eq!(stmt.publications.len(), 1);
9690+
assert_eq!(stmt.publications[0].value, "mypub");
9691+
assert!(stmt.with_options.is_empty());
9692+
}
9693+
9694+
#[test]
9695+
fn parse_create_subscription_with_options() {
9696+
let sql = "CREATE SUBSCRIPTION mysub CONNECTION 'host=localhost dbname=mydb' PUBLICATION mypub, otherpub WITH (copy_data = true, slot_name = 'myslot')";
9697+
let Statement::CreateSubscription(stmt) = pg().verified_stmt(sql) else {
9698+
unreachable!()
9699+
};
9700+
assert_eq!(stmt.name.value, "mysub");
9701+
assert_eq!(
9702+
stmt.connection.to_string(),
9703+
"'host=localhost dbname=mydb'"
9704+
);
9705+
assert_eq!(stmt.publications.len(), 2);
9706+
assert_eq!(stmt.publications[0].value, "mypub");
9707+
assert_eq!(stmt.publications[1].value, "otherpub");
9708+
assert_eq!(stmt.with_options.len(), 2);
9709+
match &stmt.with_options[0] {
9710+
SqlOption::KeyValue { key, value } => {
9711+
assert_eq!(key.value, "copy_data");
9712+
assert_eq!(value.to_string(), "true");
9713+
}
9714+
other => panic!("unexpected option: {other:?}"),
9715+
}
9716+
match &stmt.with_options[1] {
9717+
SqlOption::KeyValue { key, value } => {
9718+
assert_eq!(key.value, "slot_name");
9719+
assert_eq!(value.to_string(), "'myslot'");
9720+
}
9721+
other => panic!("unexpected option: {other:?}"),
9722+
}
9723+
}

0 commit comments

Comments
 (0)