From f1afcb3967283992bc7a3cc91fcfb95a7eb5f269 Mon Sep 17 00:00:00 2001
From: Tyler Rockwood
Date: Thu, 23 Jan 2025 17:52:49 +0000
Subject: [PATCH 1/7] snowpipe: add message to the error

---
 internal/impl/snowflake/schema_evolution.go   |  2 +-
 internal/impl/snowflake/streaming/parquet.go  |  4 ++--
 .../impl/snowflake/streaming/schema_errors.go | 18 ++++++++++++++++--
 3 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/internal/impl/snowflake/schema_evolution.go b/internal/impl/snowflake/schema_evolution.go
index 5e7cad7f80..bd27fea425 100644
--- a/internal/impl/snowflake/schema_evolution.go
+++ b/internal/impl/snowflake/schema_evolution.go
@@ -153,7 +153,7 @@ func (o *snowpipeSchemaEvolver) CreateOutputTable(ctx context.Context, batch ser
 	}
 	columns := []string{}
 	for k, v := range row {
-		col := streaming.NewMissingColumnError(k, v)
+		col := streaming.NewMissingColumnError(msg, k, v)
 		colType, err := o.ComputeMissingColumnType(col)
 		if err != nil {
 			return err
diff --git a/internal/impl/snowflake/streaming/parquet.go b/internal/impl/snowflake/streaming/parquet.go
index a32cee4475..b21c341013 100644
--- a/internal/impl/snowflake/streaming/parquet.go
+++ b/internal/impl/snowflake/streaming/parquet.go
@@ -39,7 +39,7 @@ func messageToRow(msg *service.Message, out []any, nameToPosition map[string]int
 		idx, ok := nameToPosition[normalizeColumnName(k)]
 		if !ok {
 			if !allowExtraProperties && v != nil {
-				missingColumns = append(missingColumns, &MissingColumnError{columnName: k, val: v})
+				missingColumns = append(missingColumns, NewMissingColumnError(msg, k, v))
 			}
 			continue
 		}
@@ -94,7 +94,7 @@ func constructRowGroup(
 			err = t.converter.ValidateAndConvert(s, v, b)
 			if err != nil {
 				if errors.Is(err, errNullValue) {
-					return nil, nil, &NonNullColumnError{t.column.Name}
+					return nil, nil, &NonNullColumnError{msg, t.column.Name}
 				}
 				// There is no special typed error for a validation error; there really isn't
 				// anything we can do about it.
diff --git a/internal/impl/snowflake/streaming/schema_errors.go b/internal/impl/snowflake/streaming/schema_errors.go
index 595484808f..1568afd04e 100644
--- a/internal/impl/snowflake/streaming/schema_errors.go
+++ b/internal/impl/snowflake/streaming/schema_errors.go
@@ -13,6 +13,8 @@ package streaming
 import (
 	"errors"
 	"fmt"
+
+	"github.com/redpanda-data/benthos/v4/public/service"
 )
 
 // SchemaMismatchError occurs when the user provided data has data that
@@ -47,6 +49,7 @@ var _ SchemaMismatchError = &NonNullColumnError{}

 // NonNullColumnError occurs when a column with a NOT NULL constraint
 // gets a value with a `NULL` value.
 type NonNullColumnError struct {
+	message    *service.Message
 	columnName string
 }
 
@@ -61,6 +64,11 @@ func (e *NonNullColumnError) Value() any {
 	return nil
 }
 
+// Message returns the message that caused this error
+func (e *NonNullColumnError) Message() *service.Message {
+	return e.message
+}
+
 // Error implements the error interface
 func (e *NonNullColumnError) Error() string {
 	return fmt.Sprintf("column %q has a NOT NULL constraint and received a nil value", e.columnName)
@@ -72,13 +80,19 @@ var _ SchemaMismatchError = &MissingColumnError{}
 
 // MissingColumnError occurs when a column that is not in the table is
 // found on a record
 type MissingColumnError struct {
+	message    *service.Message
 	columnName string
 	val        any
 }
 
 // NewMissingColumnError creates a new MissingColumnError object
-func NewMissingColumnError(rawName string, val any) *MissingColumnError {
-	return &MissingColumnError{rawName, val}
+func NewMissingColumnError(message *service.Message, rawName string, val any) *MissingColumnError {
+	return &MissingColumnError{message, rawName, val}
+}
+
+// Message returns the message that caused this error
+func (e *MissingColumnError) Message() *service.Message {
+	return e.message
 }
 
 // ColumnName returns the column name of the data that was not in the table

From 8dec78c9b7f84e1a9c94ae65b82b92ce9c14a9a4 Mon Sep 17 00:00:00 2001
From: Tyler Rockwood
Date: Thu, 23 Jan 2025 18:31:05 +0000
Subject: [PATCH 2/7] snowpipe: support processors when evolving the schema

---
 .../pages/outputs/snowflake_streaming.adoc    | 14 ++++++-
 .../snowflake/output_snowflake_streaming.go   | 27 ++++++++++---
 internal/impl/snowflake/schema_evolution.go   | 40 ++++++++++++++++---
 3 files changed, 69 insertions(+), 12 deletions(-)

diff --git a/docs/modules/components/pages/outputs/snowflake_streaming.adoc b/docs/modules/components/pages/outputs/snowflake_streaming.adoc
index 1f058aaa8b..3eb33d2249 100644
--- a/docs/modules/components/pages/outputs/snowflake_streaming.adoc
+++ b/docs/modules/components/pages/outputs/snowflake_streaming.adoc
@@ -62,6 +62,7 @@ output:
             this == "timestamp" => "TIMESTAMP"
             _ => "VARIANT"
           }
+      processors: [] # No default (optional)
   batching:
     count: 0
     byte_size: 0
@@ -103,6 +104,7 @@ output:
             this == "timestamp" => "TIMESTAMP"
             _ => "VARIANT"
           }
+      processors: [] # No default (optional)
   build_options:
     parallelism: 1
     chunk_size: 50000
@@ -463,13 +465,23 @@ Whether schema evolution is enabled.
 
 The mapping function from Redpanda Connect type to column type in Snowflake. Overriding this can allow for customization of the datatype if there is specific information that you know about the data types in use. This mapping should result in the `root` variable being assigned a string with the data type for the new column in Snowflake.
 
-The input to this mapping is an object with the value and the name of the new column, for example: `{"value": 42.3, "name":"new_data_field"}"
+ The input to this mapping is either the output of `processors` if specified, otherwise it is an object with the value and the name of the new column, the original message and table being written to. The metadata is unchanged from the original message that caused the schema to change. For example: `{"value": 42.3, "name":"new_data_field", "message": {"existing_data_field": 42, "new_data_field": "foo"}, "db": "MY_DATABASE", "schema": "MY_SCHEMA", "table": "MY_TABLE"}`
 
 
 *Type*: `string`
 
 *Default*: `"root = match this.value.type() {\n this == \"string\" =\u003e \"STRING\"\n this == \"bytes\" =\u003e \"BINARY\"\n this == \"number\" =\u003e \"DOUBLE\"\n this == \"bool\" =\u003e \"BOOLEAN\"\n this == \"timestamp\" =\u003e \"TIMESTAMP\"\n _ =\u003e \"VARIANT\"\n}"`
 
+=== `schema_evolution.processors`
+
+A series of processors to execute when new columns are added to the table. Specifying this can support running side effects when the schema evolves or enriching the message with additional message to guide the schema changes. For example, one could read the schema the message was produced with from the schema registry and use that to decide which type the new column in Snowflake should be.
+
+ The input to these processors is an object with the value and the name of the new column, the original message and table being written to. The metadata is unchanged from the original message that caused the schema to change. For example: `{"value": 42.3, "name":"new_data_field", "message": {"existing_data_field": 42, "new_data_field": "foo"}, "db": "MY_DATABASE", "schema": "MY_SCHEMA", "table": "MY_TABLE"}`
+
+
+*Type*: `array`
+
+
 === `build_options`
 
 Options to optimize the time to build output data that is sent to Snowflake. The metric to watch to see if you need to change this is `snowflake_build_output_latency_ns`.
diff --git a/internal/impl/snowflake/output_snowflake_streaming.go b/internal/impl/snowflake/output_snowflake_streaming.go
index 43341a7ecd..d70d8115a9 100644
--- a/internal/impl/snowflake/output_snowflake_streaming.go
+++ b/internal/impl/snowflake/output_snowflake_streaming.go
@@ -49,6 +49,7 @@ const (
 	ssoFieldSchemaEvolution                     = "schema_evolution"
 	ssoFieldSchemaEvolutionEnabled              = "enabled"
 	ssoFieldSchemaEvolutionNewColumnTypeMapping = "new_column_type_mapping"
+	ssoFieldSchemaEvolutionProcessors           = "processors"
 
 	defaultSchemaEvolutionNewColumnMapping = `root = match this.value.type() {
   this == "string" => "STRING"
@@ -117,7 +118,11 @@ ALTER TABLE t1 ADD COLUMN a2 NUMBER;
 		service.NewBloblangField(ssoFieldSchemaEvolutionNewColumnTypeMapping).Description(`
 The mapping function from Redpanda Connect type to column type in Snowflake. Overriding this can allow for customization of the datatype if there is specific information that you know about the data types in use. This mapping should result in the `+"`root`"+` variable being assigned a string with the data type for the new column in Snowflake.
 
-The input to this mapping is an object with the value and the name of the new column, for example: `+"`"+`{"value": 42.3, "name":"new_data_field"}`+`"`).Default(defaultSchemaEvolutionNewColumnMapping),
+ The input to this mapping is either the output of `+"`processors`"+` if specified, otherwise it is an object with the value and the name of the new column, the original message and table being written to. The metadata is unchanged from the original message that caused the schema to change. For example: `+"`"+`{"value": 42.3, "name":"new_data_field", "message": {"existing_data_field": 42, "new_data_field": "foo"}, "db": "MY_DATABASE", "schema": "MY_SCHEMA", "table": "MY_TABLE"}`+"`").Default(defaultSchemaEvolutionNewColumnMapping),
+		service.NewProcessorListField(ssoFieldSchemaEvolutionProcessors).Description(`
+A series of processors to execute when new columns are added to the table. Specifying this can support running side effects when the schema evolves or enriching the message with additional message to guide the schema changes. For example, one could read the schema the message was produced with from the schema registry and use that to decide which type the new column in Snowflake should be.
+
+ The input to these processors is an object with the value and the name of the new column, the original message and table being written to. The metadata is unchanged from the original message that caused the schema to change. For example: `+"`"+`{"value": 42.3, "name":"new_data_field", "message": {"existing_data_field": 42, "new_data_field": "foo"}, "db": "MY_DATABASE", "schema": "MY_SCHEMA", "table": "MY_TABLE"}`+"`").Optional(),
 	).Description(`Options to control schema evolution within the pipeline as new columns are added to the table.`).Optional(),
 	service.NewIntField(ssoFieldBuildParallelism).Description("The maximum amount of parallelism to use when building the output for Snowflake. The metric to watch to see if you need to change this is `snowflake_build_output_latency_ns`.").Optional().Advanced().Deprecated(),
 	service.NewObjectField(ssoFieldBuildOpts,
@@ -398,14 +403,25 @@ func newSnowflakeStreamer(
 		}
 	}
 	var schemaEvolutionMapping *bloblang.Executor
+	var schemaEvolutionProcessors []*service.OwnedProcessor
 	if conf.Contains(ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionEnabled) {
-		enabled, err := conf.FieldBool(ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionEnabled)
-		if err == nil && enabled {
-			schemaEvolutionMapping, err = conf.FieldBloblang(ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionNewColumnTypeMapping)
-		}
+		seConf := conf.Namespace(ssoFieldSchemaEvolution)
+		enabled, err := seConf.FieldBool(ssoFieldSchemaEvolutionEnabled)
 		if err != nil {
 			return nil, err
 		}
+		if enabled {
+			schemaEvolutionMapping, err = seConf.FieldBloblang(ssoFieldSchemaEvolutionNewColumnTypeMapping)
+			if err != nil {
+				return nil, err
+			}
+		}
+		if seConf.Contains(ssoFieldSchemaEvolutionProcessors) {
+			schemaEvolutionProcessors, err = seConf.FieldProcessorList(ssoFieldSchemaEvolutionProcessors)
+			if err != nil {
+				return nil, err
+			}
+		}
 	}
 
 	var buildOpts streaming.BuildOptions
@@ -512,6 +528,7 @@ func newSnowflakeStreamer(
 		if schemaEvolutionMapping != nil {
 			schemaEvolver = &snowpipeSchemaEvolver{
 				schemaEvolutionMapping: schemaEvolutionMapping,
+				pipeline:               schemaEvolutionProcessors,
 				restClient:             restClient,
 				logger:                 mgr.Logger(),
 				db:                     db,
diff --git a/internal/impl/snowflake/schema_evolution.go b/internal/impl/snowflake/schema_evolution.go
index bd27fea425..0d36bb1153 100644
--- a/internal/impl/snowflake/schema_evolution.go
+++ b/internal/impl/snowflake/schema_evolution.go
@@ -67,18 +67,46 @@
 type snowpipeSchemaEvolver struct {
 	schemaEvolutionMapping *bloblang.Executor
+	pipeline               []*service.OwnedProcessor
 	logger                 *service.Logger
 	// The evolver does not close nor own this rest client.
 	restClient *streaming.SnowflakeRestClient
 
 	db, schema, table, role string
 }
 
-func (o *snowpipeSchemaEvolver) ComputeMissingColumnType(col *streaming.MissingColumnError) (string, error) {
-	msg := service.NewMessage(nil)
+func (o *snowpipeSchemaEvolver) ComputeMissingColumnType(ctx context.Context, col *streaming.MissingColumnError) (string, error) {
+	msg := col.Message().Copy()
+	original, err := msg.AsStructuredMut()
+	if err != nil {
+		// This should never happen, as we had to read the data as structured to know a column was missing in the first place
+		return "", fmt.Errorf("unable to extract JSON data from message that caused schema evolution: %w", err)
+	}
+	msg.SetError(nil) // Clear error
 	msg.SetStructuredMut(map[string]any{
-		"name":  col.RawName(),
-		"value": col.Value(),
+		"name":    col.RawName(),
+		"value":   col.Value(),
+		"message": original,
+		"db":      o.db,
+		"schema":  o.schema,
+		"table":   o.table,
 	})
+	if len(o.pipeline) > 0 {
+		batches, err := service.ExecuteProcessors(ctx, o.pipeline, service.MessageBatch{msg})
+		if err != nil {
+			return "", fmt.Errorf("failure to execute %s.%s prior to schema evolution: %w", ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionProcessors, err)
+		}
+		if len(batches) != 1 {
+			return "", fmt.Errorf("expected a single batch output from %s.%s, got: %d", ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionProcessors, len(batches))
+		}
+		batch := batches[0]
+		if len(batch) != 1 {
+			return "", fmt.Errorf("expected a single message output from %s.%s, got: %d", ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionProcessors, len(batch))
+		}
+		msg = batch[0]
+		if err := msg.GetError(); err != nil {
+			return "", fmt.Errorf("message failure executing %s.%s prior to schema evolution: %w", ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionProcessors, err)
+		}
+	}
 	out, err := msg.BloblangQuery(o.schemaEvolutionMapping)
 	if err != nil {
 		return "", fmt.Errorf("unable to compute new column type for %s: %w", col.ColumnName(), err)
@@ -95,7 +123,7 @@ func (o *snowpipeSchemaEvolver) ComputeMissingColumnC
 }
 
 func (o *snowpipeSchemaEvolver) MigrateMissingColumn(ctx context.Context, col *streaming.MissingColumnError) error {
-	columnType, err := o.ComputeMissingColumnType(col)
+	columnType, err := o.ComputeMissingColumnType(ctx, col)
 	if err != nil {
 		return err
 	}
@@ -154,7 +182,7 @@ func (o *snowpipeSchemaEvolver) CreateOutputTable(ctx context.Context, batch ser
 	columns := []string{}
 	for k, v := range row {
 		col := streaming.NewMissingColumnError(msg, k, v)
-		colType, err := o.ComputeMissingColumnType(col)
+		colType, err := o.ComputeMissingColumnType(ctx, col)
 		if err != nil {
 			return err
 		}

From 46c67fc7903a1d846ca8c20398126a667e3ad4f8 Mon Sep 17 00:00:00 2001
From: Tyler Rockwood
Date: Thu, 23 Jan 2025 18:38:15 +0000
Subject: [PATCH 3/7] update changelog

---
 CHANGELOG.md | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 016f488dda..98f8819091 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,13 +12,11 @@ All notable changes to this project will be documented in this file.
 
 - Fields `rebalance_timeout`, `session_timeout` and `heartbeat_interval` added to the `kafka_franz`, `redpanda`, `redpanda_common`, `redpanda_migrator` and `ockam_kafka` inputs. (@rockwotj)
 - Field `avro.preserve_logical_types` for processor `schema_registry_decode` was added to preserve logical types instead of decoding them as their primitive representation.
(@rockwotj) - Processor `schema_registry_decode` now adds metadata `schema_id` for the schema's ID in the schema registry. (@rockwotj) +- Field `schema_evolution.processors` added to `snowpipe_streaming` to support side effects or enrichment during schema evolution. (@rockwotj) ### Changed - Field `avro_raw_json` was deprecated in favor of `avro.raw_unions` for processor `schema_registry_decode`. (@rockwotj) - -### Changed - - The `snowpipe_streaming` output now has better error handling for authentication failures when uploading to cloud storage. (@rockwotj) ## 4.45.1 - 2025-01-17 From 8f3805a7f7f08b7427f410d7afaad4cc55002ce0 Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Thu, 23 Jan 2025 20:40:04 +0000 Subject: [PATCH 4/7] snowpipe: add test for custom processors in schema evolution --- internal/impl/snowflake/integration_test.go | 51 +++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/internal/impl/snowflake/integration_test.go b/internal/impl/snowflake/integration_test.go index 813ca2b0ab..1012ac7210 100644 --- a/internal/impl/snowflake/integration_test.go +++ b/internal/impl/snowflake/integration_test.go @@ -14,6 +14,7 @@ import ( "context" "errors" "os" + "strings" "sync" "testing" @@ -84,6 +85,10 @@ func RunSQLQuery(t *testing.T, stream *service.Stream, sql string) [][]string { require.True(t, ok) client, ok := resource.(*streaming.SnowflakeRestClient) require.True(t, ok) + sql = strings.NewReplacer( + "$DATABASE", EnvOrDefault("SNOWFLAKE_DB", "BABY_DATABASE"), + "$SCHEMA", "PUBLIC", + ).Replace(sql) resp, err := client.RunSQL(context.Background(), streaming.RunSQLRequest{ Statement: sql, Database: EnvOrDefault("SNOWFLAKE_DB", "BABY_DATABASE"), @@ -271,3 +276,49 @@ snowflake_streaming: {"zoom", "4", "foo", "foo"}, }, rows) } + +func TestIntegrationSchemaEvolutionPipeline(t *testing.T) { + integration.CheckSkip(t) + produce, stream := SetupSnowflakeStream(t, ` +label: snowpipe_streaming +snowflake_streaming: + account: "${SNOWFLAKE_ACCOUNT:WQKFXQQ-WI77362}" + user: "${SNOWFLAKE_USER:ROCKWOODREDPANDA}" + role: ACCOUNTADMIN + database: "${SNOWFLAKE_DB:BABY_DATABASE}" + schema: PUBLIC + table: integration_test_pipeline + init_statement: | + DROP TABLE IF EXISTS integration_test_pipeline; + private_key_file: "${SNOWFLAKE_PRIVATE_KEY:./streaming/resources/rsa_key.p8}" + max_in_flight: 4 + channel_name: "${!this.channel}" + schema_evolution: + enabled: true + processors: + - mapping: | + root = match { + this.name == "token" => "NUMBER" + _ => "variant" + } + new_column_type_mapping: | + root = content() +`) + RunStreamInBackground(t, stream) + require.NoError(t, produce(context.Background(), Batch([]map[string]any{ + {"foo": "bar", "token": 1, "channel": "foo"}, + {"foo": "baz", "token": 2, "channel": "foo"}, + {"foo": "qux", "token": 3, "channel": "foo"}, + {"foo": "zoom", "token": 4, "channel": "foo"}, + }))) + rows := RunSQLQuery( + t, + stream, + `SELECT column_name, data_type, numeric_precision, numeric_scale FROM $DATABASE.information_schema.columns WHERE table_name = 'INTEGRATION_TEST_PIPELINE' AND table_schema = '$SCHEMA' ORDER BY column_name`, + ) + require.Equal(t, [][]string{ + {"CHANNEL", "VARIANT", "", ""}, + {"FOO", "VARIANT", "", ""}, + {"TOKEN", "NUMBER", "38", "0"}, + }, rows) +} From aefa809e426671f6840bdf9b3f7ad81639852fb2 Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Thu, 23 Jan 2025 20:47:24 +0000 Subject: [PATCH 5/7] snowflake: mark custom schema evolution as advanced --- .../components/pages/outputs/snowflake_streaming.adoc | 10 ---------- 
 internal/impl/snowflake/output_snowflake_streaming.go |  4 ++--
 2 files changed, 2 insertions(+), 12 deletions(-)

diff --git a/docs/modules/components/pages/outputs/snowflake_streaming.adoc b/docs/modules/components/pages/outputs/snowflake_streaming.adoc
index 3eb33d2249..b2a376c4c4 100644
--- a/docs/modules/components/pages/outputs/snowflake_streaming.adoc
+++ b/docs/modules/components/pages/outputs/snowflake_streaming.adoc
@@ -53,16 +53,6 @@ output:
       CREATE TABLE IF NOT EXISTS mytable (amount NUMBER);
     schema_evolution:
       enabled: false # No default (required)
-      new_column_type_mapping: |-
-        root = match this.value.type() {
-          this == "string" => "STRING"
-          this == "bytes" => "BINARY"
-          this == "number" => "DOUBLE"
-          this == "bool" => "BOOLEAN"
-          this == "timestamp" => "TIMESTAMP"
-          _ => "VARIANT"
-        }
-      processors: [] # No default (optional)
   batching:
     count: 0
     byte_size: 0
diff --git a/internal/impl/snowflake/output_snowflake_streaming.go b/internal/impl/snowflake/output_snowflake_streaming.go
index d70d8115a9..5af11fffa7 100644
--- a/internal/impl/snowflake/output_snowflake_streaming.go
+++ b/internal/impl/snowflake/output_snowflake_streaming.go
@@ -118,11 +118,11 @@ ALTER TABLE t1 ADD COLUMN a2 NUMBER;
 		service.NewBloblangField(ssoFieldSchemaEvolutionNewColumnTypeMapping).Description(`
 The mapping function from Redpanda Connect type to column type in Snowflake. Overriding this can allow for customization of the datatype if there is specific information that you know about the data types in use. This mapping should result in the `+"`root`"+` variable being assigned a string with the data type for the new column in Snowflake.
 
- The input to this mapping is either the output of `+"`processors`"+` if specified, otherwise it is an object with the value and the name of the new column, the original message and table being written to. The metadata is unchanged from the original message that caused the schema to change. For example: `+"`"+`{"value": 42.3, "name":"new_data_field", "message": {"existing_data_field": 42, "new_data_field": "foo"}, "db": "MY_DATABASE", "schema": "MY_SCHEMA", "table": "MY_TABLE"}`+"`").Default(defaultSchemaEvolutionNewColumnMapping),
+ The input to this mapping is either the output of `+"`processors`"+` if specified, otherwise it is an object with the value and the name of the new column, the original message and table being written to. The metadata is unchanged from the original message that caused the schema to change. For example: `+"`"+`{"value": 42.3, "name":"new_data_field", "message": {"existing_data_field": 42, "new_data_field": "foo"}, "db": "MY_DATABASE", "schema": "MY_SCHEMA", "table": "MY_TABLE"}`+"`").Default(defaultSchemaEvolutionNewColumnMapping).Advanced(),
 	service.NewProcessorListField(ssoFieldSchemaEvolutionProcessors).Description(`
 A series of processors to execute when new columns are added to the table. Specifying this can support running side effects when the schema evolves or enriching the message with additional message to guide the schema changes. For example, one could read the schema the message was produced with from the schema registry and use that to decide which type the new column in Snowflake should be.
 
- The input to these processors is an object with the value and the name of the new column, the original message and table being written to. The metadata is unchanged from the original message that caused the schema to change. For example: `+"`"+`{"value": 42.3, "name":"new_data_field", "message": {"existing_data_field": 42, "new_data_field": "foo"}, "db": "MY_DATABASE", "schema": "MY_SCHEMA", "table": "MY_TABLE"}`+"`").Optional(),
+ The input to these processors is an object with the value and the name of the new column, the original message and table being written to. The metadata is unchanged from the original message that caused the schema to change. For example: `+"`"+`{"value": 42.3, "name":"new_data_field", "message": {"existing_data_field": 42, "new_data_field": "foo"}, "db": "MY_DATABASE", "schema": "MY_SCHEMA", "table": "MY_TABLE"}`+"`").Optional().Advanced(),
 	).Description(`Options to control schema evolution within the pipeline as new columns are added to the table.`).Optional(),
 	service.NewIntField(ssoFieldBuildParallelism).Description("The maximum amount of parallelism to use when building the output for Snowflake. The metric to watch to see if you need to change this is `snowflake_build_output_latency_ns`.").Optional().Advanced().Deprecated(),
 	service.NewObjectField(ssoFieldBuildOpts,

From 545acc8ceebf54039ea46914ebd19c6c634e8f5e Mon Sep 17 00:00:00 2001
From: Tyler Rockwood
Date: Fri, 24 Jan 2025 03:40:02 +0000
Subject: [PATCH 6/7] snowpipe: deprecate the blobl mapping

They can just add a processor that is a mapping, and this is generally
cleaner/simpler.
---
 .../pages/outputs/snowflake_streaming.adoc    | 39 ++++++------
 internal/impl/snowflake/integration_test.go   |  2 -
 .../snowflake/output_snowflake_streaming.go   | 23 ++++----
 internal/impl/snowflake/schema_evolution.go   | 57 ++++++++++++------
 4 files changed, 68 insertions(+), 53 deletions(-)

diff --git a/docs/modules/components/pages/outputs/snowflake_streaming.adoc b/docs/modules/components/pages/outputs/snowflake_streaming.adoc
index b2a376c4c4..7feb5879e1 100644
--- a/docs/modules/components/pages/outputs/snowflake_streaming.adoc
+++ b/docs/modules/components/pages/outputs/snowflake_streaming.adoc
@@ -85,15 +85,6 @@ output:
       CREATE TABLE IF NOT EXISTS mytable (amount NUMBER);
     schema_evolution:
       enabled: false # No default (required)
-      new_column_type_mapping: |-
-        root = match this.value.type() {
-          this == "string" => "STRING"
-          this == "bytes" => "BINARY"
-          this == "number" => "DOUBLE"
-          this == "bool" => "BOOLEAN"
-          this == "timestamp" => "TIMESTAMP"
-          _ => "VARIANT"
-        }
       processors: [] # No default (optional)
   build_options:
     parallelism: 1
     chunk_size: 50000
@@ -451,27 +442,31 @@ Whether schema evolution is enabled.
 
 *Type*: `bool`
 
-=== `schema_evolution.new_column_type_mapping`
-
-The mapping function from Redpanda Connect type to column type in Snowflake. Overriding this can allow for customization of the datatype if there is specific information that you know about the data types in use. This mapping should result in the `root` variable being assigned a string with the data type for the new column in Snowflake.
-
- The input to this mapping is either the output of `processors` if specified, otherwise it is an object with the value and the name of the new column, the original message and table being written to. The metadata is unchanged from the original message that caused the schema to change. For example: `{"value": 42.3, "name":"new_data_field", "message": {"existing_data_field": 42, "new_data_field": "foo"}, "db": "MY_DATABASE", "schema": "MY_SCHEMA", "table": "MY_TABLE"}`
-
-
-*Type*: `string`
-
-*Default*: `"root = match this.value.type() {\n this == \"string\" =\u003e \"STRING\"\n this == \"bytes\" =\u003e \"BINARY\"\n this == \"number\" =\u003e \"DOUBLE\"\n this == \"bool\" =\u003e \"BOOLEAN\"\n this == \"timestamp\" =\u003e \"TIMESTAMP\"\n _ =\u003e \"VARIANT\"\n}"`
-
 === `schema_evolution.processors`
 
 A series of processors to execute when new columns are added to the table. Specifying this can support running side effects when the schema evolves or enriching the message with additional data to guide the schema changes. For example, one could read the schema the message was produced with from the schema registry and use that to decide which type the new column in Snowflake should be.
 
  The input to these processors is an object with the value and the name of the new column, the original message and table being written to. The metadata is unchanged from the original message that caused the schema to change. For example: `{"value": 42.3, "name":"new_data_field", "message": {"existing_data_field": 42, "new_data_field": "foo"}, "db": "MY_DATABASE", "schema": "MY_SCHEMA", "table": "MY_TABLE"}`. The output of this series of processors should be a single message, where the content of the message is a string indicating the column data type to use (FLOAT, VARIANT, NUMBER(38, 0), etc.). An ALTER TABLE statement will then be executed on the table in Snowflake to add the column with the corresponding data type.
 
 
 *Type*: `array`
 
```yml
# Examples

processors:
  - mapping: |-
      root = match this.value.type() {
        this == "string" => "STRING"
        this == "bytes" => "BINARY"
        this == "number" => "DOUBLE"
        this == "bool" => "BOOLEAN"
        this == "timestamp" => "TIMESTAMP"
        _ => "VARIANT"
      }
```
 
 === `build_options`
 
 Options to optimize the time to build output data that is sent to Snowflake. The metric to watch to see if you need to change this is `snowflake_build_output_latency_ns`.
diff --git a/internal/impl/snowflake/integration_test.go b/internal/impl/snowflake/integration_test.go
index 1012ac7210..5dd2758e45 100644
--- a/internal/impl/snowflake/integration_test.go
+++ b/internal/impl/snowflake/integration_test.go
@@ -301,8 +301,6 @@ snowflake_streaming:
             this.name == "token" => "NUMBER"
             _ => "variant"
           }
-      new_column_type_mapping: |
-        root = content()
 `)
 	RunStreamInBackground(t, stream)
 	require.NoError(t, produce(context.Background(), Batch([]map[string]any{
diff --git a/internal/impl/snowflake/output_snowflake_streaming.go b/internal/impl/snowflake/output_snowflake_streaming.go
index 5af11fffa7..63d097d478 100644
--- a/internal/impl/snowflake/output_snowflake_streaming.go
+++ b/internal/impl/snowflake/output_snowflake_streaming.go
@@ -118,11 +118,13 @@ ALTER TABLE t1 ADD COLUMN a2 NUMBER;
 		service.NewBloblangField(ssoFieldSchemaEvolutionNewColumnTypeMapping).Description(`
 The mapping function from Redpanda Connect type to column type in Snowflake. Overriding this can allow for customization of the datatype if there is specific information that you know about the data types in use. This mapping should result in the `+"`root`"+` variable being assigned a string with the data type for the new column in Snowflake.
 
- The input to this mapping is either the output of `+"`processors`"+` if specified, otherwise it is an object with the value and the name of the new column, the original message and table being written to. The metadata is unchanged from the original message that caused the schema to change. For example: `+"`"+`{"value": 42.3, "name":"new_data_field", "message": {"existing_data_field": 42, "new_data_field": "foo"}, "db": "MY_DATABASE", "schema": "MY_SCHEMA", "table": "MY_TABLE"}`+"`").Default(defaultSchemaEvolutionNewColumnMapping).Advanced(),
+ The input to this mapping is either the output of `+"`processors`"+` if specified, otherwise it is an object with the value and the name of the new column, the original message and table being written to. The metadata is unchanged from the original message that caused the schema to change. For example: `+"`"+`{"value": 42.3, "name":"new_data_field", "message": {"existing_data_field": 42, "new_data_field": "foo"}, "db": "MY_DATABASE", "schema": "MY_SCHEMA", "table": "MY_TABLE"}`+"`").Optional().Deprecated(),
 	service.NewProcessorListField(ssoFieldSchemaEvolutionProcessors).Description(`
-A series of processors to execute when new columns are added to the table. Specifying this can support running side effects when the schema evolves or enriching the message with additional message to guide the schema changes. For example, one could read the schema the message was produced with from the schema registry and use that to decide which type the new column in Snowflake should be.
+A series of processors to execute when new columns are added to the table. Specifying this can support running side effects when the schema evolves or enriching the message with additional data to guide the schema changes. For example, one could read the schema the message was produced with from the schema registry and use that to decide which type the new column in Snowflake should be.
 
- The input to these processors is an object with the value and the name of the new column, the original message and table being written to. The metadata is unchanged from the original message that caused the schema to change. For example: `+"`"+`{"value": 42.3, "name":"new_data_field", "message": {"existing_data_field": 42, "new_data_field": "foo"}, "db": "MY_DATABASE", "schema": "MY_SCHEMA", "table": "MY_TABLE"}`+"`").Optional().Advanced(),
+ The input to these processors is an object with the value and the name of the new column, the original message and table being written to. The metadata is unchanged from the original message that caused the schema to change. For example: `+"`"+`{"value": 42.3, "name":"new_data_field", "message": {"existing_data_field": 42, "new_data_field": "foo"}, "db": "MY_DATABASE", "schema": "MY_SCHEMA", "table": "MY_TABLE"}`+"`. The output of this series of processors should be a single message, where the content of the message is a string indicating the column data type to use (FLOAT, VARIANT, NUMBER(38, 0), etc.). An ALTER TABLE statement will then be executed on the table in Snowflake to add the column with the corresponding data type.").Optional().Advanced().Example([]map[string]any{
+		{"mapping": defaultSchemaEvolutionNewColumnMapping},
+	}),
 	).Description(`Options to control schema evolution within the pipeline as new columns are added to the table.`).Optional(),
 	service.NewIntField(ssoFieldBuildParallelism).Description("The maximum amount of parallelism to use when building the output for Snowflake. The metric to watch to see if you need to change this is `snowflake_build_output_latency_ns`.").Optional().Advanced().Deprecated(),
 	service.NewObjectField(ssoFieldBuildOpts,
@@ -402,22 +404,23 @@ func newSnowflakeStreamer(
 			return nil, err
 		}
 	}
-	var schemaEvolutionMapping *bloblang.Executor
+	var schemaEvolutionEnabled bool
 	var schemaEvolutionProcessors []*service.OwnedProcessor
+	var schemaEvolutionMapping *bloblang.Executor
 	if conf.Contains(ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionEnabled) {
 		seConf := conf.Namespace(ssoFieldSchemaEvolution)
-		enabled, err := seConf.FieldBool(ssoFieldSchemaEvolutionEnabled)
+		schemaEvolutionEnabled, err = seConf.FieldBool(ssoFieldSchemaEvolutionEnabled)
 		if err != nil {
 			return nil, err
 		}
-		if enabled {
-			schemaEvolutionMapping, err = seConf.FieldBloblang(ssoFieldSchemaEvolutionNewColumnTypeMapping)
+		if seConf.Contains(ssoFieldSchemaEvolutionProcessors) {
+			schemaEvolutionProcessors, err = seConf.FieldProcessorList(ssoFieldSchemaEvolutionProcessors)
 			if err != nil {
 				return nil, err
 			}
 		}
-		if seConf.Contains(ssoFieldSchemaEvolutionProcessors) {
-			schemaEvolutionProcessors, err = seConf.FieldProcessorList(ssoFieldSchemaEvolutionProcessors)
+		if seConf.Contains(ssoFieldSchemaEvolutionNewColumnTypeMapping) {
+			schemaEvolutionMapping, err = seConf.FieldBloblang(ssoFieldSchemaEvolutionNewColumnTypeMapping)
 			if err != nil {
 				return nil, err
 			}
 		}
@@ -525,7 +528,7 @@ func newSnowflakeStreamer(
 	mgr.SetGeneric(SnowflakeClientResourceForTesting, restClient)
 	makeImpl := func(table string) (*snowpipeSchemaEvolver, service.BatchOutput) {
 		var schemaEvolver *snowpipeSchemaEvolver
-		if schemaEvolutionMapping != nil {
+		if schemaEvolutionEnabled {
 			schemaEvolver = &snowpipeSchemaEvolver{
 				schemaEvolutionMapping: schemaEvolutionMapping,
 				pipeline:               schemaEvolutionProcessors,
diff --git a/internal/impl/snowflake/schema_evolution.go b/internal/impl/snowflake/schema_evolution.go
index 0d36bb1153..97e068111d 100644
--- a/internal/impl/snowflake/schema_evolution.go
+++ b/internal/impl/snowflake/schema_evolution.go
@@ -10,10 +10,12 @@ package snowflake
 
 import (
 	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"regexp"
 	"strings"
+	"time"
"github.com/redpanda-data/benthos/v4/public/bloblang" "github.com/redpanda-data/benthos/v4/public/service" @@ -75,6 +77,23 @@ type snowpipeSchemaEvolver struct { } func (o *snowpipeSchemaEvolver) ComputeMissingColumnType(ctx context.Context, col *streaming.MissingColumnError) (string, error) { + if len(o.pipeline) == 0 && o.schemaEvolutionMapping == nil { + // The default mapping if not specified by a user + switch col.Value().(type) { + case []byte: + return "BINARY", nil + case string: + return "STRING", nil + case bool: + return "BOOLEAN", nil + case time.Time: + return "TIMESTAMP", nil + case json.Number, int, int64, int32, int16, int8, uint, uint64, uint32, uint16, uint8, float32, float64: + return "DOUBLE", nil + default: + return "VARIANT", nil + } + } msg := col.Message().Copy() original, err := msg.AsStructuredMut() if err != nil { @@ -90,28 +109,28 @@ func (o *snowpipeSchemaEvolver) ComputeMissingColumnType(ctx context.Context, co "schema": o.schema, "table": o.table, }) - if len(o.pipeline) > 0 { - batches, err := service.ExecuteProcessors(ctx, o.pipeline, service.MessageBatch{msg}) + batches, err := service.ExecuteProcessors(ctx, o.pipeline, service.MessageBatch{msg}) + if err != nil { + return "", fmt.Errorf("failure to execute %s.%s prior to schema evolution: %w", ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionProcessors, err) + } + if len(batches) != 1 { + return "", fmt.Errorf("expected a single batch output from %s.%s, got: %d", ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionProcessors, len(batches)) + } + batch := batches[0] + if len(batch) != 1 { + return "", fmt.Errorf("expected a single message output from %s.%s, got: %d", ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionProcessors, len(batch)) + } + msg = batch[0] + if err := msg.GetError(); err != nil { + return "", fmt.Errorf("message failure executing %s.%s prior to schema evolution: %w", ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionProcessors, err) + } + if o.schemaEvolutionMapping != nil { + msg, err = msg.BloblangQuery(o.schemaEvolutionMapping) if err != nil { - return "", fmt.Errorf("failure to execute %s.%s prior to schema evolution: %w", ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionProcessors, err) - } - if len(batches) != 1 { - return "", fmt.Errorf("expected a single batch output from %s.%s, got: %d", ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionProcessors, len(batches)) - } - batch := batches[0] - if len(batch) != 1 { - return "", fmt.Errorf("expected a single message output from %s.%s, got: %d", ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionProcessors, len(batch)) + return "", fmt.Errorf("unable to compute new column type for %s: %w", col.ColumnName(), err) } - msg = batch[0] - if err := msg.GetError(); err != nil { - return "", fmt.Errorf("message failure executing %s.%s prior to schema evolution: %w", ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionProcessors, err) - } - } - out, err := msg.BloblangQuery(o.schemaEvolutionMapping) - if err != nil { - return "", fmt.Errorf("unable to compute new column type for %s: %w", col.ColumnName(), err) } - v, err := out.AsBytes() + v, err := msg.AsBytes() if err != nil { return "", fmt.Errorf("unable to extract result from new column type mapping for %s: %w", col.ColumnName(), err) } From a99a4b0b8b819bd39c738df37fd23b584c90d9ba Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Fri, 24 Jan 2025 03:44:02 +0000 Subject: [PATCH 7/7] update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 
98f8819091..b3a7d5d054 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ All notable changes to this project will be documented in this file. - Field `avro_raw_json` was deprecated in favor of `avro.raw_unions` for processor `schema_registry_decode`. (@rockwotj) - The `snowpipe_streaming` output now has better error handling for authentication failures when uploading to cloud storage. (@rockwotj) +- Field `schema_evolution.new_column_type_mapping` for `snowpipe_streaming` is deprecated and can be replaced with `schema_evolution.processors`. (@rockwotj) ## 4.45.1 - 2025-01-17
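
---

A minimal configuration sketch tying the series together, mirroring the integration test above: new columns named `token` are created as `NUMBER`, and every other new column falls back to `VARIANT`. The account, user, database, table, and key-file values are placeholders, not real defaults:

```yml
output:
  snowflake_streaming:
    account: MY_ACCOUNT            # placeholder account identifier
    user: MY_USER                  # placeholder user
    role: ACCOUNTADMIN
    database: MY_DATABASE          # placeholder database
    schema: PUBLIC
    table: my_table                # placeholder table
    private_key_file: ./rsa_key.p8 # placeholder key path
    schema_evolution:
      enabled: true
      processors:
        # Input: {"value": ..., "name": ..., "message": {...}, "db": ..., "schema": ..., "table": ...}
        # Output: the message content must be the Snowflake column type used in the ALTER TABLE statement.
        - mapping: |
            root = match {
              this.name == "token" => "NUMBER"
              _ => "VARIANT"
            }
```

With this configuration, the first record that introduces a previously unseen `token` field should trigger an `ALTER TABLE ... ADD COLUMN` with type `NUMBER`, while any other new field is added as `VARIANT`, which is the behavior the integration test asserts.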