Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: add support for CREATE TABLE ... FROM WEBHOOK #30821

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions misc/python/materialize/checks/all_checks/webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ def initialize(self) -> Testdrive:

> CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON INCLUDE HEADERS;

> CREATE SOURCE webhook_bytes IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT BYTES;
>[version<12800] CREATE SOURCE webhook_bytes FROM WEBHOOK BODY FORMAT BYTES;

>[version>=12800] CREATE TABLE webhook_bytes FROM WEBHOOK BODY FORMAT BYTES;

$ webhook-append database=materialize schema=public name=webhook_text
fooä
Expand Down Expand Up @@ -114,8 +116,11 @@ def validate(self) -> Testdrive:
> SHOW CREATE SOURCE webhook_json
materialize.public.webhook_json "CREATE SOURCE \\"materialize\\".\\"public\\".\\"webhook_json\\" IN CLUSTER \\"webhook_cluster\\" FROM WEBHOOK BODY FORMAT JSON INCLUDE HEADERS"

> SHOW CREATE SOURCE webhook_bytes
>[version<12800] SHOW CREATE SOURCE webhook_bytes
materialize.public.webhook_bytes "CREATE SOURCE \\"materialize\\".\\"public\\".\\"webhook_bytes\\" IN CLUSTER \\"webhook_cluster\\" FROM WEBHOOK BODY FORMAT BYTES"

>[version>=12800] SHOW CREATE SOURCE webhook_bytes
materialize.public.webhook_bytes "CREATE TABLE \\"materialize\\".\\"public\\".\\"webhook_bytes\\" FROM WEBHOOK BODY FORMAT BYTES"
"""
)
)
21 changes: 16 additions & 5 deletions src/sql-parser/src/ast/defs/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -733,10 +733,11 @@ impl<T: AstInfo> AstDisplay for ValidateConnectionStatement<T> {
}
impl_display_t!(ValidateConnectionStatement);

/// `CREATE SOURCE <name> FROM WEBHOOK`
/// `CREATE (SOURCE | TABLE) <name> FROM WEBHOOK`
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateWebhookSourceStatement<T: AstInfo> {
pub name: UnresolvedItemName,
pub is_table: bool,
pub if_not_exists: bool,
pub body_format: Format<T>,
pub include_headers: CreateWebhookSourceIncludeHeaders,
Expand All @@ -746,15 +747,25 @@ pub struct CreateWebhookSourceStatement<T: AstInfo> {

impl<T: AstInfo> AstDisplay for CreateWebhookSourceStatement<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_str("CREATE SOURCE ");
f.write_str("CREATE ");

if self.is_table {
f.write_str("TABLE ");
} else {
f.write_str("SOURCE ");
}

if self.if_not_exists {
f.write_str("IF NOT EXISTS ");
}
f.write_node(&self.name);

if let Some(cluster_name) = &self.in_cluster {
f.write_str(" IN CLUSTER ");
f.write_node(cluster_name);
// CREATE TABLE ... FROM WEBHOOK does not support specifying a cluster.
if !self.is_table {
if let Some(cluster_name) = &self.in_cluster {
f.write_str(" IN CLUSTER ");
f.write_node(cluster_name);
}
}

f.write_str(" FROM WEBHOOK ");
Expand Down
19 changes: 13 additions & 6 deletions src/sql-parser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1871,7 +1871,9 @@ impl<'a> Parser<'a> {
|| self.peek_keywords(&[TEMP, TABLE])
|| self.peek_keywords(&[TEMPORARY, TABLE])
{
if self.peek_keywords_lookahead(&[FROM, SOURCE]) {
if self.peek_keywords_lookahead(&[FROM, SOURCE])
|| self.peek_keywords_lookahead(&[FROM, WEBHOOK])
{
self.parse_create_table_from_source()
.map_parser_err(StatementKind::CreateTableFromSource)
} else {
Expand Down Expand Up @@ -2811,8 +2813,8 @@ impl<'a> Parser<'a> {
self.expect_keyword(FROM)?;

// Webhook Source, which works differently than all other sources.
if self.peek_keyword(WEBHOOK) {
return self.parse_create_webhook_source(name, if_not_exists, in_cluster);
if self.parse_keyword(WEBHOOK) {
return self.parse_create_webhook_source(name, if_not_exists, in_cluster, false);
}

let connection = self.parse_create_source_connection()?;
Expand Down Expand Up @@ -2981,10 +2983,8 @@ impl<'a> Parser<'a> {
name: UnresolvedItemName,
if_not_exists: bool,
in_cluster: Option<RawClusterName>,
is_table: bool,
) -> Result<Statement<Raw>, ParserError> {
// Consume this keyword because we only peeked it earlier.
self.expect_keyword(WEBHOOK)?;

self.expect_keywords(&[BODY, FORMAT])?;

// Note: we don't use `parse_format()` here because we support fewer formats than other
Expand Down Expand Up @@ -3055,6 +3055,7 @@ impl<'a> Parser<'a> {
Ok(Statement::CreateWebhookSource(
CreateWebhookSourceStatement {
name,
is_table,
if_not_exists,
body_format,
include_headers,
Expand Down Expand Up @@ -4767,6 +4768,12 @@ impl<'a> Parser<'a> {
self.expect_keyword(TABLE)?;
let if_not_exists = self.parse_if_not_exists()?;
let table_name = self.parse_item_name()?;

if self.parse_keywords(&[FROM, WEBHOOK]) {
// Webhook Source, which works differently than all other sources.
return self.parse_create_webhook_source(table_name, if_not_exists, None, true);
}

let (columns, constraints) = self.parse_table_from_source_columns()?;

self.expect_keywords(&[FROM, SOURCE])?;
Expand Down
Loading
Loading