diff --git a/misc/python/materialize/checks/all_checks/webhook.py b/misc/python/materialize/checks/all_checks/webhook.py index e1ea0eedc5154..2ae4b805e571c 100644 --- a/misc/python/materialize/checks/all_checks/webhook.py +++ b/misc/python/materialize/checks/all_checks/webhook.py @@ -29,7 +29,9 @@ def initialize(self) -> Testdrive: > CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON INCLUDE HEADERS; - > CREATE SOURCE webhook_bytes IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT BYTES; + >[version<12800] CREATE SOURCE webhook_bytes FROM WEBHOOK BODY FORMAT BYTES; + + >[version>=12800] CREATE TABLE webhook_bytes FROM WEBHOOK BODY FORMAT BYTES; $ webhook-append database=materialize schema=public name=webhook_text fooƤ @@ -114,8 +116,11 @@ def validate(self) -> Testdrive: > SHOW CREATE SOURCE webhook_json materialize.public.webhook_json "CREATE SOURCE \\"materialize\\".\\"public\\".\\"webhook_json\\" IN CLUSTER \\"webhook_cluster\\" FROM WEBHOOK BODY FORMAT JSON INCLUDE HEADERS" - > SHOW CREATE SOURCE webhook_bytes + >[version<12800] SHOW CREATE SOURCE webhook_bytes materialize.public.webhook_bytes "CREATE SOURCE \\"materialize\\".\\"public\\".\\"webhook_bytes\\" IN CLUSTER \\"webhook_cluster\\" FROM WEBHOOK BODY FORMAT BYTES" + + >[version>=12800] SHOW CREATE SOURCE webhook_bytes + materialize.public.webhook_bytes "CREATE TABLE \\"materialize\\".\\"public\\".\\"webhook_bytes\\" FROM WEBHOOK BODY FORMAT BYTES" """ ) ) diff --git a/src/sql-parser/src/ast/defs/statement.rs b/src/sql-parser/src/ast/defs/statement.rs index 4452d9f1d3543..bc6b94fee343f 100644 --- a/src/sql-parser/src/ast/defs/statement.rs +++ b/src/sql-parser/src/ast/defs/statement.rs @@ -733,10 +733,11 @@ impl AstDisplay for ValidateConnectionStatement { } impl_display_t!(ValidateConnectionStatement); -/// `CREATE SOURCE FROM WEBHOOK` +/// `CREATE (SOURCE | TABLE) FROM WEBHOOK` #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct CreateWebhookSourceStatement { pub name: UnresolvedItemName, + pub is_table: bool, pub if_not_exists: bool, pub body_format: Format, pub include_headers: CreateWebhookSourceIncludeHeaders, @@ -746,15 +747,25 @@ pub struct CreateWebhookSourceStatement { impl AstDisplay for CreateWebhookSourceStatement { fn fmt(&self, f: &mut AstFormatter) { - f.write_str("CREATE SOURCE "); + f.write_str("CREATE "); + + if self.is_table { + f.write_str("TABLE "); + } else { + f.write_str("SOURCE "); + } + if self.if_not_exists { f.write_str("IF NOT EXISTS "); } f.write_node(&self.name); - if let Some(cluster_name) = &self.in_cluster { - f.write_str(" IN CLUSTER "); - f.write_node(cluster_name); + // CREATE TABLE ... FROM WEBHOOK does not support specifying a cluster. + if !self.is_table { + if let Some(cluster_name) = &self.in_cluster { + f.write_str(" IN CLUSTER "); + f.write_node(cluster_name); + } } f.write_str(" FROM WEBHOOK "); diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index b1207db6d650c..7f52e53dd78b7 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -1871,7 +1871,9 @@ impl<'a> Parser<'a> { || self.peek_keywords(&[TEMP, TABLE]) || self.peek_keywords(&[TEMPORARY, TABLE]) { - if self.peek_keywords_lookahead(&[FROM, SOURCE]) { + if self.peek_keywords_lookahead(&[FROM, SOURCE]) + || self.peek_keywords_lookahead(&[FROM, WEBHOOK]) + { self.parse_create_table_from_source() .map_parser_err(StatementKind::CreateTableFromSource) } else { @@ -2811,8 +2813,8 @@ impl<'a> Parser<'a> { self.expect_keyword(FROM)?; // Webhook Source, which works differently than all other sources. - if self.peek_keyword(WEBHOOK) { - return self.parse_create_webhook_source(name, if_not_exists, in_cluster); + if self.parse_keyword(WEBHOOK) { + return self.parse_create_webhook_source(name, if_not_exists, in_cluster, false); } let connection = self.parse_create_source_connection()?; @@ -2981,10 +2983,8 @@ impl<'a> Parser<'a> { name: UnresolvedItemName, if_not_exists: bool, in_cluster: Option, + is_table: bool, ) -> Result, ParserError> { - // Consume this keyword because we only peeked it earlier. - self.expect_keyword(WEBHOOK)?; - self.expect_keywords(&[BODY, FORMAT])?; // Note: we don't use `parse_format()` here because we support fewer formats than other @@ -3055,6 +3055,7 @@ impl<'a> Parser<'a> { Ok(Statement::CreateWebhookSource( CreateWebhookSourceStatement { name, + is_table, if_not_exists, body_format, include_headers, @@ -4767,6 +4768,12 @@ impl<'a> Parser<'a> { self.expect_keyword(TABLE)?; let if_not_exists = self.parse_if_not_exists()?; let table_name = self.parse_item_name()?; + + if self.parse_keywords(&[FROM, WEBHOOK]) { + // Webhook Source, which works differently than all other sources. + return self.parse_create_webhook_source(table_name, if_not_exists, None, true); + } + let (columns, constraints) = self.parse_table_from_source_columns()?; self.expect_keywords(&[FROM, SOURCE])?; diff --git a/src/sql-parser/tests/testdata/create b/src/sql-parser/tests/testdata/create index 07676aecbdb8f..105fa9eb8f282 100644 --- a/src/sql-parser/tests/testdata/create +++ b/src/sql-parser/tests/testdata/create @@ -260,21 +260,21 @@ CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT J ---- CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON INCLUDE HEADERS => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: Some([]) }, validate_using: None, in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), is_table: false, if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: Some([]) }, validate_using: None, in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON ARRAY INCLUDE HEADERS ---- CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON ARRAY INCLUDE HEADERS => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), if_not_exists: false, body_format: Json { array: true }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: Some([]) }, validate_using: None, in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), is_table: false, if_not_exists: false, body_format: Json { array: true }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: Some([]) }, validate_using: None, in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON INCLUDE HEADERS ( 'x-signature' ) ---- CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON INCLUDE HEADERS ('x-signature') => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: Some([CreateWebhookSourceFilterHeader { block: false, header_name: "x-signature" }]) }, validate_using: None, in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), is_table: false, if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: Some([CreateWebhookSourceFilterHeader { block: false, header_name: "x-signature" }]) }, validate_using: None, in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK @@ -283,7 +283,7 @@ CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK ---- CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON INCLUDE HEADERS ('x-signature', 'event-timestamp') => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: Some([CreateWebhookSourceFilterHeader { block: false, header_name: "x-signature" }, CreateWebhookSourceFilterHeader { block: false, header_name: "event-timestamp" }]) }, validate_using: None, in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), is_table: false, if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: Some([CreateWebhookSourceFilterHeader { block: false, header_name: "x-signature" }, CreateWebhookSourceFilterHeader { block: false, header_name: "event-timestamp" }]) }, validate_using: None, in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK @@ -292,7 +292,7 @@ CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK ---- CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON INCLUDE HEADERS ('x-signature', NOT 'event-timestamp', 'x-another-one') => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: Some([CreateWebhookSourceFilterHeader { block: false, header_name: "x-signature" }, CreateWebhookSourceFilterHeader { block: true, header_name: "event-timestamp" }, CreateWebhookSourceFilterHeader { block: false, header_name: "x-another-one" }]) }, validate_using: None, in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), is_table: false, if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: Some([CreateWebhookSourceFilterHeader { block: false, header_name: "x-signature" }, CreateWebhookSourceFilterHeader { block: true, header_name: "event-timestamp" }, CreateWebhookSourceFilterHeader { block: false, header_name: "x-another-one" }]) }, validate_using: None, in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK @@ -302,7 +302,7 @@ CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK ---- CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON INCLUDE HEADERS ('x-signature', 'x-another-one', NOT 'x-auth', NOT 'x-authorization') => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: Some([CreateWebhookSourceFilterHeader { block: false, header_name: "x-signature" }, CreateWebhookSourceFilterHeader { block: false, header_name: "x-another-one" }, CreateWebhookSourceFilterHeader { block: true, header_name: "x-auth" }, CreateWebhookSourceFilterHeader { block: true, header_name: "x-authorization" }]) }, validate_using: None, in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), is_table: false, if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: Some([CreateWebhookSourceFilterHeader { block: false, header_name: "x-signature" }, CreateWebhookSourceFilterHeader { block: false, header_name: "x-another-one" }, CreateWebhookSourceFilterHeader { block: true, header_name: "x-auth" }, CreateWebhookSourceFilterHeader { block: true, header_name: "x-authorization" }]) }, validate_using: None, in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK @@ -313,7 +313,7 @@ CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK ---- CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON INCLUDE HEADER 'x-timestamp' AS x_timestamp INCLUDE HEADER 'hash' AS hash BYTES INCLUDE HEADERS (NOT 'x-signature', 'x-another-one') => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [CreateWebhookSourceMapHeader { header_name: "x-timestamp", column_name: Ident("x_timestamp"), use_bytes: false }, CreateWebhookSourceMapHeader { header_name: "hash", column_name: Ident("hash"), use_bytes: true }], column: Some([CreateWebhookSourceFilterHeader { block: true, header_name: "x-signature" }, CreateWebhookSourceFilterHeader { block: false, header_name: "x-another-one" }]) }, validate_using: None, in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), is_table: false, if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [CreateWebhookSourceMapHeader { header_name: "x-timestamp", column_name: Ident("x_timestamp"), use_bytes: false }, CreateWebhookSourceMapHeader { header_name: "hash", column_name: Ident("hash"), use_bytes: true }], column: Some([CreateWebhookSourceFilterHeader { block: true, header_name: "x-signature" }, CreateWebhookSourceFilterHeader { block: false, header_name: "x-another-one" }]) }, validate_using: None, in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK @@ -323,7 +323,7 @@ CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK ---- CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON INCLUDE HEADER 'x-signature' AS x_signature INCLUDE HEADER 'x-bytes' AS bytes BYTES => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [CreateWebhookSourceMapHeader { header_name: "x-signature", column_name: Ident("x_signature"), use_bytes: false }, CreateWebhookSourceMapHeader { header_name: "x-bytes", column_name: Ident("bytes"), use_bytes: true }], column: None }, validate_using: None, in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), is_table: false, if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [CreateWebhookSourceMapHeader { header_name: "x-signature", column_name: Ident("x_signature"), use_bytes: false }, CreateWebhookSourceMapHeader { header_name: "x-bytes", column_name: Ident("bytes"), use_bytes: true }], column: None }, validate_using: None, in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK @@ -332,7 +332,7 @@ CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK ---- CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON INCLUDE HEADER 'x-case-sensitive' AS "caseSensitive" BYTES => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [CreateWebhookSourceMapHeader { header_name: "x-case-sensitive", column_name: Ident("caseSensitive"), use_bytes: true }], column: None }, validate_using: None, in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), is_table: false, if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [CreateWebhookSourceMapHeader { header_name: "x-case-sensitive", column_name: Ident("caseSensitive"), use_bytes: true }], column: None }, validate_using: None, in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK @@ -348,21 +348,21 @@ CREATE SOURCE IF NOT EXISTS webhook_text IN CLUSTER webhook_cluster FROM WEBHOOK ---- CREATE SOURCE IF NOT EXISTS webhook_text IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT TEXT => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_text")]), if_not_exists: true, body_format: Text, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: None, in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_text")]), is_table: false, if_not_exists: true, body_format: Text, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: None, in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_json_no_headers IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON ---- CREATE SOURCE webhook_json_no_headers IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json_no_headers")]), if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: None, in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json_no_headers")]), is_table: false, if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: None, in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_bytes IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT BYTES ---- CREATE SOURCE webhook_bytes IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT BYTES => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_bytes")]), if_not_exists: false, body_format: Bytes, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: None, in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_bytes")]), is_table: false, if_not_exists: false, body_format: Bytes, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: None, in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_proto IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT PROTOBUF INCLUDE HEADERS @@ -383,14 +383,14 @@ CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT J ---- CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON CHECK (headers['signature'] = 'test') => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: None, using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Value(String("test"))) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), is_table: false, if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: None, using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Value(String("test"))) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON CHECK ( headers['signature'] = hmac(sha256, 'body=' || body) ) ---- CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON CHECK (headers['signature'] = hmac(sha256, 'body=' || body)) => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: None, using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Function(Function { name: Name(UnresolvedItemName([Ident("hmac")])), args: Args { args: [Identifier([Ident("sha256")]), Op { op: Op { namespace: None, op: "||" }, expr1: Value(String("body=")), expr2: Some(Identifier([Ident("body")])) }], order_by: [] }, filter: None, over: None, distinct: false })) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), is_table: false, if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: None, using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Function(Function { name: Name(UnresolvedItemName([Ident("hmac")])), args: Args { args: [Identifier([Ident("sha256")]), Op { op: Op { namespace: None, op: "||" }, expr1: Value(String("body=")), expr2: Some(Identifier([Ident("body")])) }], order_by: [] }, filter: None, over: None, distinct: false })) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK @@ -402,7 +402,7 @@ CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK ---- CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON CHECK (WITH (SECRET test_key) headers['signature'] = 'test') => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("test_key")])), alias: None, use_bytes: false }], headers: [], bodies: [] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Value(String("test"))) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), is_table: false, if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("test_key")])), alias: None, use_bytes: false }], headers: [], bodies: [] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Value(String("test"))) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK @@ -417,7 +417,7 @@ CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK ---- CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON CHECK (WITH (SECRET test_key, SECRET other_key) headers['signature'] = 'test') => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("test_key")])), alias: None, use_bytes: false }, CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("other_key")])), alias: None, use_bytes: false }], headers: [], bodies: [] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Value(String("test"))) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), is_table: false, if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("test_key")])), alias: None, use_bytes: false }, CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("other_key")])), alias: None, use_bytes: false }], headers: [], bodies: [] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Value(String("test"))) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK @@ -432,7 +432,7 @@ CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK ---- CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON CHECK (WITH (SECRET test_key AS foo, SECRET other_key) headers['signature'] = 'test') => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("test_key")])), alias: Some(Ident("foo")), use_bytes: false }, CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("other_key")])), alias: None, use_bytes: false }], headers: [], bodies: [] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Value(String("test"))) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), is_table: false, if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("test_key")])), alias: Some(Ident("foo")), use_bytes: false }, CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("other_key")])), alias: None, use_bytes: false }], headers: [], bodies: [] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Value(String("test"))) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK @@ -462,7 +462,7 @@ CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK ---- CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON CHECK (WITH (SECRET test_key AS bar, SECRET other_key) headers['signature'] = 'test') => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("test_key")])), alias: Some(Ident("bar")), use_bytes: false }, CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("other_key")])), alias: None, use_bytes: false }], headers: [], bodies: [] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Value(String("test"))) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), is_table: false, if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("test_key")])), alias: Some(Ident("bar")), use_bytes: false }, CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("other_key")])), alias: None, use_bytes: false }], headers: [], bodies: [] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Value(String("test"))) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK @@ -474,7 +474,7 @@ CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK ---- CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON CHECK (WITH (SECRET bytes_key BYTES) headers['signature'] = bytes_key) => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("bytes_key")])), alias: None, use_bytes: true }], headers: [], bodies: [] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Identifier([Ident("bytes_key")])) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), is_table: false, if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("bytes_key")])), alias: None, use_bytes: true }], headers: [], bodies: [] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Identifier([Ident("bytes_key")])) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK @@ -486,7 +486,7 @@ CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK ---- CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON CHECK (WITH (SECRET bytes_key AS bytes) headers['signature'] = bytes_key) => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("bytes_key")])), alias: Some(Ident("bytes")), use_bytes: false }], headers: [], bodies: [] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Identifier([Ident("bytes_key")])) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), is_table: false, if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("bytes_key")])), alias: Some(Ident("bytes")), use_bytes: false }], headers: [], bodies: [] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Identifier([Ident("bytes_key")])) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK @@ -498,7 +498,7 @@ CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK ---- CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON CHECK (WITH (SECRET bytes_key AS bytes BYTES) headers['signature'] = bytes_key) => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("bytes_key")])), alias: Some(Ident("bytes")), use_bytes: true }], headers: [], bodies: [] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Identifier([Ident("bytes_key")])) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), is_table: false, if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("bytes_key")])), alias: Some(Ident("bytes")), use_bytes: true }], headers: [], bodies: [] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Identifier([Ident("bytes_key")])) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK @@ -510,7 +510,7 @@ CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK ---- CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON CHECK (WITH (SECRET secret_key, SECRET other_key AS foo BYTES) headers['signature'] = bytes_key) => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("secret_key")])), alias: None, use_bytes: false }, CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("other_key")])), alias: Some(Ident("foo")), use_bytes: true }], headers: [], bodies: [] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Identifier([Ident("bytes_key")])) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_json")]), is_table: false, if_not_exists: false, body_format: Json { array: false }, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("secret_key")])), alias: None, use_bytes: false }, CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("other_key")])), alias: Some(Ident("foo")), use_bytes: true }], headers: [], bodies: [] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Identifier([Ident("bytes_key")])) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK @@ -550,21 +550,21 @@ CREATE SOURCE webhook_no_cluster FROM WEBHOOK BODY FORMAT TEXT ---- CREATE SOURCE webhook_no_cluster FROM WEBHOOK BODY FORMAT TEXT => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_no_cluster")]), if_not_exists: false, body_format: Text, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: None, in_cluster: None }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_no_cluster")]), is_table: false, if_not_exists: false, body_format: Text, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: None, in_cluster: None }) parse-statement CREATE SOURCE webhook_include_headers_no_cluster FROM WEBHOOK BODY FORMAT TEXT INCLUDE HEADERS ---- CREATE SOURCE webhook_include_headers_no_cluster FROM WEBHOOK BODY FORMAT TEXT INCLUDE HEADERS => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_include_headers_no_cluster")]), if_not_exists: false, body_format: Text, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: Some([]) }, validate_using: None, in_cluster: None }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_include_headers_no_cluster")]), is_table: false, if_not_exists: false, body_format: Text, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: Some([]) }, validate_using: None, in_cluster: None }) parse-statement CREATE SOURCE webhook_validation_no_cluster FROM WEBHOOK BODY FORMAT TEXT CHECK ( headers['signature'] = 'test' ) ---- CREATE SOURCE webhook_validation_no_cluster FROM WEBHOOK BODY FORMAT TEXT CHECK (headers['signature'] = 'test') => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_validation_no_cluster")]), if_not_exists: false, body_format: Text, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: None, using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Value(String("test"))) } }), in_cluster: None }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_validation_no_cluster")]), is_table: false, if_not_exists: false, body_format: Text, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: None, using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Value(String("test"))) } }), in_cluster: None }) parse-statement CREATE SOURCE webhook_with_headers_and_body IN CLUSTER webhook_cluster FROM WEBHOOK @@ -576,7 +576,7 @@ CREATE SOURCE webhook_with_headers_and_body IN CLUSTER webhook_cluster FROM WEBH ---- CREATE SOURCE webhook_with_headers_and_body IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT TEXT CHECK (WITH (HEADERS, BODY) headers['signature'] = body) => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_with_headers_and_body")]), if_not_exists: false, body_format: Text, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [], headers: [CreateWebhookSourceHeader { alias: None, use_bytes: false }], bodies: [CreateWebhookSourceBody { alias: None, use_bytes: false }] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Identifier([Ident("body")])) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_with_headers_and_body")]), is_table: false, if_not_exists: false, body_format: Text, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [], headers: [CreateWebhookSourceHeader { alias: None, use_bytes: false }], bodies: [CreateWebhookSourceBody { alias: None, use_bytes: false }] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Identifier([Ident("body")])) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_with_headers IN CLUSTER webhook_cluster FROM WEBHOOK @@ -588,7 +588,7 @@ CREATE SOURCE webhook_with_headers IN CLUSTER webhook_cluster FROM WEBHOOK ---- CREATE SOURCE webhook_with_headers IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT TEXT CHECK (WITH (HEADERS AS h1) headers['signature'] = body) => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_with_headers")]), if_not_exists: false, body_format: Text, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [], headers: [CreateWebhookSourceHeader { alias: Some(Ident("h1")), use_bytes: false }], bodies: [] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Identifier([Ident("body")])) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_with_headers")]), is_table: false, if_not_exists: false, body_format: Text, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [], headers: [CreateWebhookSourceHeader { alias: Some(Ident("h1")), use_bytes: false }], bodies: [] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Identifier([Ident("body")])) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_with_headers IN CLUSTER webhook_cluster FROM WEBHOOK @@ -600,7 +600,7 @@ CREATE SOURCE webhook_with_headers IN CLUSTER webhook_cluster FROM WEBHOOK ---- CREATE SOURCE webhook_with_headers IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT TEXT CHECK (WITH (HEADERS AS h1, SECRET my_secret) headers['signature'] = body) => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_with_headers")]), if_not_exists: false, body_format: Text, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("my_secret")])), alias: None, use_bytes: false }], headers: [CreateWebhookSourceHeader { alias: Some(Ident("h1")), use_bytes: false }], bodies: [] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Identifier([Ident("body")])) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_with_headers")]), is_table: false, if_not_exists: false, body_format: Text, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("my_secret")])), alias: None, use_bytes: false }], headers: [CreateWebhookSourceHeader { alias: Some(Ident("h1")), use_bytes: false }], bodies: [] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Identifier([Ident("body")])) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_with_headers IN CLUSTER webhook_cluster FROM WEBHOOK @@ -612,7 +612,7 @@ CREATE SOURCE webhook_with_headers IN CLUSTER webhook_cluster FROM WEBHOOK ---- CREATE SOURCE webhook_with_headers IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT TEXT CHECK (WITH (BODY, BODY AS b2 BYTES) headers['signature'] = body) => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_with_headers")]), if_not_exists: false, body_format: Text, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [], headers: [], bodies: [CreateWebhookSourceBody { alias: None, use_bytes: false }, CreateWebhookSourceBody { alias: Some(Ident("b2")), use_bytes: true }] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Identifier([Ident("body")])) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_with_headers")]), is_table: false, if_not_exists: false, body_format: Text, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [], headers: [], bodies: [CreateWebhookSourceBody { alias: None, use_bytes: false }, CreateWebhookSourceBody { alias: Some(Ident("b2")), use_bytes: true }] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Identifier([Ident("body")])) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_with_headers_thrice IN CLUSTER webhook_cluster FROM WEBHOOK @@ -624,7 +624,7 @@ CREATE SOURCE webhook_with_headers_thrice IN CLUSTER webhook_cluster FROM WEBHOO ---- CREATE SOURCE webhook_with_headers_thrice IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT TEXT CHECK (WITH (HEADERS AS headers_bytes BYTES, HEADERS AS other_headers, HEADERS) headers['signature'] = body) => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_with_headers_thrice")]), if_not_exists: false, body_format: Text, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [], headers: [CreateWebhookSourceHeader { alias: Some(Ident("headers_bytes")), use_bytes: true }, CreateWebhookSourceHeader { alias: Some(Ident("other_headers")), use_bytes: false }, CreateWebhookSourceHeader { alias: None, use_bytes: false }], bodies: [] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Identifier([Ident("body")])) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_with_headers_thrice")]), is_table: false, if_not_exists: false, body_format: Text, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [], headers: [CreateWebhookSourceHeader { alias: Some(Ident("headers_bytes")), use_bytes: true }, CreateWebhookSourceHeader { alias: Some(Ident("other_headers")), use_bytes: false }, CreateWebhookSourceHeader { alias: None, use_bytes: false }], bodies: [] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Identifier([Ident("body")])) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_with_headers IN CLUSTER webhook_cluster FROM WEBHOOK @@ -636,7 +636,7 @@ CREATE SOURCE webhook_with_headers IN CLUSTER webhook_cluster FROM WEBHOOK ---- CREATE SOURCE webhook_with_headers IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT TEXT CHECK (WITH (BODY AS b2 BYTES, SECRET kool_secret BYTES) headers['signature'] = body) => -CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_with_headers")]), if_not_exists: false, body_format: Text, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("kool_secret")])), alias: None, use_bytes: true }], headers: [], bodies: [CreateWebhookSourceBody { alias: Some(Ident("b2")), use_bytes: true }] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Identifier([Ident("body")])) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_with_headers")]), is_table: false, if_not_exists: false, body_format: Text, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("kool_secret")])), alias: None, use_bytes: true }], headers: [], bodies: [CreateWebhookSourceBody { alias: Some(Ident("b2")), use_bytes: true }] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Identifier([Ident("body")])) } }), in_cluster: Some(Unresolved(Ident("webhook_cluster"))) }) parse-statement CREATE SOURCE webhook_invalid_with IN CLUSTER webhook_cluster FROM WEBHOOK @@ -650,6 +650,25 @@ error: Expected right parenthesis, found BODY WITH (SECRET kool_secret BODY) ^ +parse-statement +CREATE TABLE my_webhook FROM WEBHOOK BODY FORMAT TEXT +---- +CREATE TABLE my_webhook FROM WEBHOOK BODY FORMAT TEXT +=> +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("my_webhook")]), is_table: true, if_not_exists: false, body_format: Text, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: None, in_cluster: None }) + +parse-statement +CREATE TABLE webhook_with_headers FROM WEBHOOK + BODY FORMAT TEXT + CHECK ( + WITH (SECRET kool_secret BYTES, BODY AS b2 BYTES) + headers['signature'] = body + ) +---- +CREATE TABLE webhook_with_headers FROM WEBHOOK BODY FORMAT TEXT CHECK (WITH (BODY AS b2 BYTES, SECRET kool_secret BYTES) headers['signature'] = body) +=> +CreateWebhookSource(CreateWebhookSourceStatement { name: UnresolvedItemName([Ident("webhook_with_headers")]), is_table: true, if_not_exists: false, body_format: Text, include_headers: CreateWebhookSourceIncludeHeaders { mappings: [], column: None }, validate_using: Some(CreateWebhookSourceCheck { options: Some(CreateWebhookSourceCheckOptions { secrets: [CreateWebhookSourceSecret { secret: Name(UnresolvedItemName([Ident("kool_secret")])), alias: None, use_bytes: true }], headers: [], bodies: [CreateWebhookSourceBody { alias: Some(Ident("b2")), use_bytes: true }] }), using: Op { op: Op { namespace: None, op: "=" }, expr1: Subscript { expr: Identifier([Ident("headers")]), positions: [SubscriptPosition { start: Some(Value(String("signature"))), end: None, explicit_slice: false }] }, expr2: Some(Identifier([Ident("body")])) } }), in_cluster: None }) + parse-statement CREATE DATABASE IF NOT EXISTS db ---- diff --git a/src/sql/src/normalize.rs b/src/sql/src/normalize.rs index 417be4fc8b2a6..2013e9d220261 100644 --- a/src/sql/src/normalize.rs +++ b/src/sql/src/normalize.rs @@ -348,6 +348,7 @@ pub fn create_statement( Statement::CreateWebhookSource(CreateWebhookSourceStatement { name, + is_table: _, if_not_exists, include_headers: _, body_format: _, diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index 99887e5338d4e..5a2238a986524 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -159,7 +159,8 @@ use crate::plan::{ }; use crate::session::vars::{ self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY, - ENABLE_KAFKA_SINK_HEADERS, ENABLE_KAFKA_SINK_PARTITION_BY, ENABLE_REFRESH_EVERY_MVS, + ENABLE_CREATE_TABLE_FROM_SOURCE, ENABLE_KAFKA_SINK_HEADERS, ENABLE_KAFKA_SINK_PARTITION_BY, + ENABLE_REFRESH_EVERY_MVS, }; use crate::{names, parse}; @@ -491,6 +492,10 @@ pub fn plan_create_webhook_source( scx: &StatementContext, mut stmt: CreateWebhookSourceStatement, ) -> Result { + if stmt.is_table { + scx.require_feature_flag(&ENABLE_CREATE_TABLE_FROM_SOURCE)?; + } + // We will rewrite the cluster if one is not provided, so we must use the `in_cluster` value // we plan to normalize when we canonicalize the create statement. let in_cluster = source_sink_cluster_config(scx, "source", &mut stmt.in_cluster)?; @@ -505,6 +510,8 @@ pub fn plan_create_webhook_source( validate_using, // We resolved `in_cluster` above, so we want to ignore it here. in_cluster: _, + // Whether or not we created a webhook as a source or table is irrelevant here. + is_table: _, } = stmt; let validate_using = validate_using diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs index 55971bb8aed12..6b40c5948b40f 100644 --- a/src/sql/src/session/vars.rs +++ b/src/sql/src/session/vars.rs @@ -1213,7 +1213,6 @@ impl SystemVars { &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL, &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER, &USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION, - &ENABLE_CREATE_TABLE_FROM_SOURCE, &FORCE_SOURCE_TABLE_SYNTAX, &OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD, ]; @@ -2189,10 +2188,6 @@ impl SystemVars { *self.expect_value(&USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION) } - pub fn enable_create_table_from_source(&self) -> bool { - *self.expect_value(&ENABLE_CREATE_TABLE_FROM_SOURCE) - } - pub fn force_source_table_syntax(&self) -> bool { *self.expect_value(&FORCE_SOURCE_TABLE_SYNTAX) } diff --git a/src/sql/src/session/vars/definitions.rs b/src/sql/src/session/vars/definitions.rs index af1a0429f4115..fbbcbf3c05e55 100644 --- a/src/sql/src/session/vars/definitions.rs +++ b/src/sql/src/session/vars/definitions.rs @@ -1525,13 +1525,6 @@ pub static NETWORK_POLICY: VarDefinition = VarDefinition::new_lazy( true, ); -pub static ENABLE_CREATE_TABLE_FROM_SOURCE: VarDefinition = VarDefinition::new( - "enable_create_table_from_source", - value!(bool; false), - "Whether to allow CREATE TABLE .. FROM SOURCE syntax.", - true, -); - pub static FORCE_SOURCE_TABLE_SYNTAX: VarDefinition = VarDefinition::new( "force_source_table_syntax", value!(bool; false), @@ -2203,6 +2196,12 @@ feature_flags!( default: false, enable_for_item_parsing: true, }, + { + name: enable_create_table_from_source, + desc: "Whether to allow CREATE TABLE .. FROM SOURCE syntax.", + default: false, + enable_for_item_parsing: true, + }, ); impl From<&super::SystemVars> for OptimizerFeatures { diff --git a/test/testdrive/session.td b/test/testdrive/session.td index fb62983975e7d..860a3c8e9d377 100644 --- a/test/testdrive/session.td +++ b/test/testdrive/session.td @@ -83,7 +83,6 @@ transaction_isolation "strict serializable" "Sets the curre unsafe_new_transaction_wall_time "" "Sets the wall time for all new explicit or implicit transactions to control the value of `now()`. If not set, uses the system's clock." welcome_message on "Whether to send a notice with a welcome message after a successful connection (Materialize)." enable_consolidate_after_union_negate on "consolidation after Unions that have a Negated input (Materialize)." -enable_create_table_from_source on "Whether to allow CREATE TABLE .. FROM SOURCE syntax." force_source_table_syntax off "Force use of new source model (CREATE TABLE .. FROM SOURCE) and migrate existing sources" > SET application_name = 'foo' diff --git a/test/testdrive/webhook.td b/test/testdrive/webhook.td index a8d83404ddaf3..36632500fe2c4 100644 --- a/test/testdrive/webhook.td +++ b/test/testdrive/webhook.td @@ -540,3 +540,11 @@ SHOW cluster; > SELECT c.name = '${current-cluster}' FROM mz_clusters c JOIN mz_sources s ON c.id = s.cluster_id WHERE s.name = 'webhook_in_current'; true + +> CREATE TABLE webhook_as_table FROM WEBHOOK BODY FORMAT TEXT; + +$ webhook-append name=webhook_as_table +aaa_1 + +> SELECT * FROM webhook_as_table; +aaa_1