Skip to content

Commit eec8df0

Browse files
postgres: add subscription DDL and predefined-role grant/revoke parsing
1 parent a281171 commit eec8df0

5 files changed

Lines changed: 591 additions & 7 deletions

File tree

src/ast/mod.rs

Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2513,6 +2513,8 @@ pub enum CommentObject {
25132513
Schema,
25142514
/// A sequence.
25152515
Sequence,
2516+
/// A subscription.
2517+
Subscription,
25162518
/// A table.
25172519
Table,
25182520
/// A type.
@@ -2538,6 +2540,7 @@ impl fmt::Display for CommentObject {
25382540
CommentObject::Role => f.write_str("ROLE"),
25392541
CommentObject::Schema => f.write_str("SCHEMA"),
25402542
CommentObject::Sequence => f.write_str("SEQUENCE"),
2543+
CommentObject::Subscription => f.write_str("SUBSCRIPTION"),
25412544
CommentObject::Table => f.write_str("TABLE"),
25422545
CommentObject::Type => f.write_str("TYPE"),
25432546
CommentObject::User => f.write_str("USER"),
@@ -3737,6 +3740,12 @@ pub enum Statement {
37373740
/// A `CREATE SERVER` statement.
37383741
CreateServer(CreateServerStatement),
37393742
/// ```sql
3743+
/// CREATE SUBSCRIPTION
3744+
/// ```
3745+
///
3746+
/// Note: this is a PostgreSQL-specific statement.
3747+
CreateSubscription(CreateSubscription),
3748+
/// ```sql
37403749
/// CREATE POLICY
37413750
/// ```
37423751
/// See [PostgreSQL](https://www.postgresql.org/docs/current/sql-createpolicy.html)
@@ -3835,6 +3844,12 @@ pub enum Statement {
38353844
operation: AlterRoleOperation,
38363845
},
38373846
/// ```sql
3847+
/// ALTER SUBSCRIPTION
3848+
/// ```
3849+
///
3850+
/// Note: this is a PostgreSQL-specific statement.
3851+
AlterSubscription(AlterSubscription),
3852+
/// ```sql
38383853
/// ALTER POLICY <NAME> ON <TABLE NAME> [<OPERATION>]
38393854
/// ```
38403855
/// (Postgresql-specific)
@@ -5542,6 +5557,9 @@ impl fmt::Display for Statement {
55425557
Statement::CreateServer(stmt) => {
55435558
write!(f, "{stmt}")
55445559
}
5560+
Statement::CreateSubscription(stmt) => {
5561+
write!(f, "{stmt}")
5562+
}
55455563
Statement::CreatePolicy(policy) => write!(f, "{policy}"),
55465564
Statement::CreateConnector(create_connector) => create_connector.fmt(f),
55475565
Statement::CreateOperator(create_operator) => create_operator.fmt(f),
@@ -5583,6 +5601,9 @@ impl fmt::Display for Statement {
55835601
Statement::AlterRole { name, operation } => {
55845602
write!(f, "ALTER ROLE {name} {operation}")
55855603
}
5604+
Statement::AlterSubscription(alter_subscription) => {
5605+
write!(f, "{alter_subscription}")
5606+
}
55865607
Statement::AlterPolicy(alter_policy) => write!(f, "{alter_policy}"),
55875608
Statement::AlterConnector {
55885609
name,
@@ -6989,6 +7010,11 @@ pub enum Action {
69897010
BindServiceEndpoint,
69907011
/// Connect permission.
69917012
Connect,
7013+
/// Custom privilege name (primarily PostgreSQL).
7014+
Custom {
7015+
/// The custom privilege identifier.
7016+
name: Ident,
7017+
},
69927018
/// Create action, optionally specifying an object type.
69937019
Create {
69947020
/// Optional object type to create.
@@ -7103,6 +7129,7 @@ impl fmt::Display for Action {
71037129
Action::Audit => f.write_str("AUDIT")?,
71047130
Action::BindServiceEndpoint => f.write_str("BIND SERVICE ENDPOINT")?,
71057131
Action::Connect => f.write_str("CONNECT")?,
7132+
Action::Custom { name } => write!(f, "{name}")?,
71067133
Action::Create { obj_type } => {
71077134
f.write_str("CREATE")?;
71087135
if let Some(obj_type) = obj_type {
@@ -8477,6 +8504,8 @@ pub enum ObjectType {
84778504
Role,
84788505
/// A sequence.
84798506
Sequence,
8507+
/// A subscription.
8508+
Subscription,
84808509
/// A stage.
84818510
Stage,
84828511
/// A type definition.
@@ -8499,6 +8528,7 @@ impl fmt::Display for ObjectType {
84998528
ObjectType::Database => "DATABASE",
85008529
ObjectType::Role => "ROLE",
85018530
ObjectType::Sequence => "SEQUENCE",
8531+
ObjectType::Subscription => "SUBSCRIPTION",
85028532
ObjectType::Stage => "STAGE",
85038533
ObjectType::Type => "TYPE",
85048534
ObjectType::User => "USER",
@@ -9011,6 +9041,206 @@ impl fmt::Display for CreateServerOption {
90119041
}
90129042
}
90139043

9044+
/// A subscription option used by `CREATE/ALTER SUBSCRIPTION`.
9045+
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
9046+
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
9047+
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
9048+
pub struct SubscriptionOption {
9049+
/// Subscription parameter name.
9050+
pub name: Ident,
9051+
/// Optional parameter value.
9052+
pub value: Option<Expr>,
9053+
}
9054+
9055+
impl fmt::Display for SubscriptionOption {
9056+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
9057+
if let Some(value) = &self.value {
9058+
write!(f, "{} = {}", self.name, value)
9059+
} else {
9060+
write!(f, "{}", self.name)
9061+
}
9062+
}
9063+
}
9064+
9065+
/// A `CREATE SUBSCRIPTION` statement.
9066+
///
9067+
/// [PostgreSQL Documentation](https://www.postgresql.org/docs/current/sql-createsubscription.html)
9068+
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
9069+
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
9070+
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
9071+
pub struct CreateSubscription {
9072+
/// Subscription name.
9073+
pub name: ObjectName,
9074+
/// Connection string.
9075+
pub connection: String,
9076+
/// Publication names.
9077+
pub publications: Vec<Ident>,
9078+
/// Optional subscription parameters from `WITH (...)`.
9079+
pub with_options: Vec<SubscriptionOption>,
9080+
}
9081+
9082+
impl fmt::Display for CreateSubscription {
9083+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
9084+
write!(
9085+
f,
9086+
"CREATE SUBSCRIPTION {} CONNECTION '{}' PUBLICATION {}",
9087+
self.name,
9088+
value::escape_single_quote_string(&self.connection),
9089+
display_comma_separated(&self.publications)
9090+
)?;
9091+
9092+
if !self.with_options.is_empty() {
9093+
write!(f, " WITH ({})", display_comma_separated(&self.with_options))?;
9094+
}
9095+
9096+
Ok(())
9097+
}
9098+
}
9099+
9100+
/// An `ALTER SUBSCRIPTION` statement.
9101+
///
9102+
/// [PostgreSQL Documentation](https://www.postgresql.org/docs/current/sql-altersubscription.html)
9103+
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
9104+
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
9105+
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
9106+
pub struct AlterSubscription {
9107+
/// Subscription name.
9108+
pub name: ObjectName,
9109+
/// Operation to perform.
9110+
pub operation: AlterSubscriptionOperation,
9111+
}
9112+
9113+
impl fmt::Display for AlterSubscription {
9114+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
9115+
write!(f, "ALTER SUBSCRIPTION {} {}", self.name, self.operation)
9116+
}
9117+
}
9118+
9119+
/// Operations supported by `ALTER SUBSCRIPTION`.
9120+
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
9121+
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
9122+
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
9123+
pub enum AlterSubscriptionOperation {
9124+
/// Update the subscription connection string.
9125+
Connection {
9126+
/// New connection string.
9127+
connection: String,
9128+
},
9129+
/// Replace subscription publications.
9130+
SetPublication {
9131+
/// Publication names.
9132+
publications: Vec<Ident>,
9133+
/// Optional `WITH (...)` parameters.
9134+
with_options: Vec<SubscriptionOption>,
9135+
},
9136+
/// Add publications to the subscription.
9137+
AddPublication {
9138+
/// Publication names.
9139+
publications: Vec<Ident>,
9140+
/// Optional `WITH (...)` parameters.
9141+
with_options: Vec<SubscriptionOption>,
9142+
},
9143+
/// Drop publications from the subscription.
9144+
DropPublication {
9145+
/// Publication names.
9146+
publications: Vec<Ident>,
9147+
/// Optional `WITH (...)` parameters.
9148+
with_options: Vec<SubscriptionOption>,
9149+
},
9150+
/// Refresh subscription publications.
9151+
RefreshPublication {
9152+
/// Optional `WITH (...)` parameters.
9153+
with_options: Vec<SubscriptionOption>,
9154+
},
9155+
/// Enable the subscription.
9156+
Enable,
9157+
/// Disable the subscription.
9158+
Disable,
9159+
/// Set subscription parameters.
9160+
SetOptions {
9161+
/// Parameters within `SET (...)`.
9162+
options: Vec<SubscriptionOption>,
9163+
},
9164+
/// Change subscription owner.
9165+
OwnerTo {
9166+
/// New owner.
9167+
owner: Owner,
9168+
},
9169+
/// Rename the subscription.
9170+
RenameTo {
9171+
/// New subscription name.
9172+
new_name: ObjectName,
9173+
},
9174+
}
9175+
9176+
impl fmt::Display for AlterSubscriptionOperation {
9177+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
9178+
fn write_with_options(
9179+
f: &mut fmt::Formatter<'_>,
9180+
with_options: &[SubscriptionOption],
9181+
) -> fmt::Result {
9182+
if !with_options.is_empty() {
9183+
write!(f, " WITH ({})", display_comma_separated(with_options))?;
9184+
}
9185+
Ok(())
9186+
}
9187+
9188+
match self {
9189+
AlterSubscriptionOperation::Connection { connection } => {
9190+
write!(
9191+
f,
9192+
"CONNECTION '{}'",
9193+
value::escape_single_quote_string(connection)
9194+
)
9195+
}
9196+
AlterSubscriptionOperation::SetPublication {
9197+
publications,
9198+
with_options,
9199+
} => {
9200+
write!(
9201+
f,
9202+
"SET PUBLICATION {}",
9203+
display_comma_separated(publications)
9204+
)?;
9205+
write_with_options(f, with_options)
9206+
}
9207+
AlterSubscriptionOperation::AddPublication {
9208+
publications,
9209+
with_options,
9210+
} => {
9211+
write!(
9212+
f,
9213+
"ADD PUBLICATION {}",
9214+
display_comma_separated(publications)
9215+
)?;
9216+
write_with_options(f, with_options)
9217+
}
9218+
AlterSubscriptionOperation::DropPublication {
9219+
publications,
9220+
with_options,
9221+
} => {
9222+
write!(
9223+
f,
9224+
"DROP PUBLICATION {}",
9225+
display_comma_separated(publications)
9226+
)?;
9227+
write_with_options(f, with_options)
9228+
}
9229+
AlterSubscriptionOperation::RefreshPublication { with_options } => {
9230+
write!(f, "REFRESH PUBLICATION")?;
9231+
write_with_options(f, with_options)
9232+
}
9233+
AlterSubscriptionOperation::Enable => write!(f, "ENABLE"),
9234+
AlterSubscriptionOperation::Disable => write!(f, "DISABLE"),
9235+
AlterSubscriptionOperation::SetOptions { options } => {
9236+
write!(f, "SET ({})", display_comma_separated(options))
9237+
}
9238+
AlterSubscriptionOperation::OwnerTo { owner } => write!(f, "OWNER TO {owner}"),
9239+
AlterSubscriptionOperation::RenameTo { new_name } => write!(f, "RENAME TO {new_name}"),
9240+
}
9241+
}
9242+
}
9243+
90149244
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
90159245
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
90169246
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
@@ -11955,6 +12185,8 @@ impl fmt::Display for VacuumStatement {
1195512185
pub enum Reset {
1195612186
/// Resets all session parameters to their default values.
1195712187
ALL,
12188+
/// Resets session authorization to the session user.
12189+
SessionAuthorization,
1195812190

1195912191
/// Resets a specific session parameter to its default value.
1196012192
ConfigurationParameter(ObjectName),
@@ -12037,6 +12269,7 @@ impl fmt::Display for ResetStatement {
1203712269
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1203812270
match &self.reset {
1203912271
Reset::ALL => write!(f, "RESET ALL"),
12272+
Reset::SessionAuthorization => write!(f, "RESET SESSION AUTHORIZATION"),
1204012273
Reset::ConfigurationParameter(param) => write!(f, "RESET {}", param),
1204112274
}
1204212275
}
@@ -12180,6 +12413,12 @@ impl From<CreateServerStatement> for Statement {
1218012413
}
1218112414
}
1218212415

12416+
impl From<CreateSubscription> for Statement {
12417+
fn from(c: CreateSubscription) -> Self {
12418+
Self::CreateSubscription(c)
12419+
}
12420+
}
12421+
1218312422
impl From<CreateConnector> for Statement {
1218412423
fn from(c: CreateConnector) -> Self {
1218512424
Self::CreateConnector(c)
@@ -12216,6 +12455,12 @@ impl From<AlterFunction> for Statement {
1221612455
}
1221712456
}
1221812457

12458+
impl From<AlterSubscription> for Statement {
12459+
fn from(a: AlterSubscription) -> Self {
12460+
Self::AlterSubscription(a)
12461+
}
12462+
}
12463+
1221912464
impl From<AlterType> for Statement {
1222012465
fn from(a: AlterType) -> Self {
1222112466
Self::AlterType(a)

src/ast/spans.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,7 @@ impl Spanned for Statement {
393393
Statement::DropOperatorClass(drop_operator_class) => drop_operator_class.span(),
394394
Statement::CreateSecret { .. } => Span::empty(),
395395
Statement::CreateServer { .. } => Span::empty(),
396+
Statement::CreateSubscription { .. } => Span::empty(),
396397
Statement::CreateConnector { .. } => Span::empty(),
397398
Statement::CreateOperator(create_operator) => create_operator.span(),
398399
Statement::CreateOperatorFamily(create_operator_family) => {
@@ -420,6 +421,7 @@ impl Spanned for Statement {
420421
Statement::AlterOperatorFamily { .. } => Span::empty(),
421422
Statement::AlterOperatorClass { .. } => Span::empty(),
422423
Statement::AlterRole { .. } => Span::empty(),
424+
Statement::AlterSubscription { .. } => Span::empty(),
423425
Statement::AlterSession { .. } => Span::empty(),
424426
Statement::AttachDatabase { .. } => Span::empty(),
425427
Statement::AttachDuckDBDatabase { .. } => Span::empty(),

src/keywords.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -818,6 +818,7 @@ define_keywords!(
818818
PROGRAM,
819819
PROJECTION,
820820
PUBLIC,
821+
PUBLICATION,
821822
PURCHASE,
822823
PURGE,
823824
QUALIFY,
@@ -1006,6 +1007,7 @@ define_keywords!(
10061007
STRUCT,
10071008
SUBMULTISET,
10081009
SUBSCRIPT,
1010+
SUBSCRIPTION,
10091011
SUBSTR,
10101012
SUBSTRING,
10111013
SUBSTRING_REGEX,

0 commit comments

Comments
 (0)