Merge pull request #3141 from rockwotj/schema_evolution
snowpipe: schema evolution processors
rockwotj authored Jan 24, 2025
2 parents 123f4a7 + a99a4b0 commit dc9ac0a
Showing 7 changed files with 172 additions and 46 deletions.
5 changes: 2 additions & 3 deletions CHANGELOG.md
@@ -12,14 +12,13 @@ All notable changes to this project will be documented in this file.
- Fields `rebalance_timeout`, `session_timeout` and `heartbeat_interval` added to the `kafka_franz`, `redpanda`, `redpanda_common`, `redpanda_migrator` and `ockam_kafka` inputs. (@rockwotj)
- Field `avro.preserve_logical_types` for processor `schema_registry_decode` was added to preserve logical types instead of decoding them as their primitive representation. (@rockwotj)
- Processor `schema_registry_decode` now adds metadata `schema_id` for the schema's ID in the schema registry. (@rockwotj)
- Field `schema_evolution.processors` added to `snowpipe_streaming` to support side effects or enrichment during schema evolution. (@rockwotj)

### Changed

- Field `avro_raw_json` was deprecated in favor of `avro.raw_unions` for processor `schema_registry_decode`. (@rockwotj)

### Changed

- The `snowpipe_streaming` output now has better error handling for authentication failures when uploading to cloud storage. (@rockwotj)
- Field `schema_evolution.new_column_type_mapping` for `snowpipe_streaming` is deprecated and can be replaced with `schema_evolution.processors`. (@rockwotj)

## 4.45.1 - 2025-01-17

43 changes: 20 additions & 23 deletions docs/modules/components/pages/outputs/snowflake_streaming.adoc
@@ -53,15 +53,6 @@ output:
CREATE TABLE IF NOT EXISTS mytable (amount NUMBER);
schema_evolution:
enabled: false # No default (required)
new_column_type_mapping: |-
root = match this.value.type() {
this == "string" => "STRING"
this == "bytes" => "BINARY"
this == "number" => "DOUBLE"
this == "bool" => "BOOLEAN"
this == "timestamp" => "TIMESTAMP"
_ => "VARIANT"
}
batching:
count: 0
byte_size: 0
@@ -94,15 +85,7 @@ output:
CREATE TABLE IF NOT EXISTS mytable (amount NUMBER);
schema_evolution:
enabled: false # No default (required)
new_column_type_mapping: |-
root = match this.value.type() {
this == "string" => "STRING"
this == "bytes" => "BINARY"
this == "number" => "DOUBLE"
this == "bool" => "BOOLEAN"
this == "timestamp" => "TIMESTAMP"
_ => "VARIANT"
}
processors: [] # No default (optional)
build_options:
parallelism: 1
chunk_size: 50000
@@ -459,16 +442,30 @@ Whether schema evolution is enabled.
*Type*: `bool`
=== `schema_evolution.new_column_type_mapping`
=== `schema_evolution.processors`
The mapping function from Redpanda Connect type to column type in Snowflake. Overriding this can allow for customization of the datatype if there is specific information that you know about the data types in use. This mapping should result in the `root` variable being assigned a string with the data type for the new column in Snowflake.
A series of processors to execute when new columns are added to the table. Specifying this allows for running side effects when the schema evolves, or for enriching the message with additional data to guide the schema changes. For example, one could read the schema that the message was produced with from the schema registry and use it to decide which type the new column in Snowflake should have.
The input to this mapping is an object with the value and the name of the new column, for example: `{"value": 42.3, "name":"new_data_field"}`
The input to these processors is an object with the value and the name of the new column, the original message, and the table being written to. The metadata is unchanged from the original message that caused the schema to change. For example: `{"value": 42.3, "name":"new_data_field", "message": {"existing_data_field": 42, "new_data_field": "foo"}, "db": "MY_DATABASE", "schema": "MY_SCHEMA", "table": "MY_TABLE"}`. The output of this series of processors must be a single message whose contents are a string indicating the column data type to use (`FLOAT`, `VARIANT`, `NUMBER(38, 0)`, etc.). An `ALTER TABLE` statement is then executed on the table in Snowflake to add the column with the corresponding data type.
*Type*: `string`
*Type*: `array`
*Default*: `"root = match this.value.type() {\n this == \"string\" =\u003e \"STRING\"\n this == \"bytes\" =\u003e \"BINARY\"\n this == \"number\" =\u003e \"DOUBLE\"\n this == \"bool\" =\u003e \"BOOLEAN\"\n this == \"timestamp\" =\u003e \"TIMESTAMP\"\n _ =\u003e \"VARIANT\"\n}"`
```yml
# Examples
processors:
- mapping: |-
root = match this.value.type() {
this == "string" => "STRING"
this == "bytes" => "BINARY"
this == "number" => "DOUBLE"
this == "bool" => "BOOLEAN"
this == "timestamp" => "TIMESTAMP"
_ => "VARIANT"
}
```
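
As a sketch of how this richer input can be used, the configuration below (mirroring the integration test added in this commit) picks the column type by the new column's name and falls back to `VARIANT`; the column name `token` is purely illustrative:

```yml
schema_evolution:
  enabled: true
  processors:
    - mapping: |-
        # Choose the Snowflake type from the new column's name.
        root = match {
          this.name == "token" => "NUMBER"
          _ => "VARIANT"
        }
```

Whatever string the final message carries is used as the column type in the `ALTER TABLE ... ADD COLUMN` statement that the output then runs.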
=== `build_options`
49 changes: 49 additions & 0 deletions internal/impl/snowflake/integration_test.go
@@ -14,6 +14,7 @@ import (
"context"
"errors"
"os"
"strings"
"sync"
"testing"

@@ -84,6 +85,10 @@ func RunSQLQuery(t *testing.T, stream *service.Stream, sql string) [][]string {
require.True(t, ok)
client, ok := resource.(*streaming.SnowflakeRestClient)
require.True(t, ok)
sql = strings.NewReplacer(
"$DATABASE", EnvOrDefault("SNOWFLAKE_DB", "BABY_DATABASE"),
"$SCHEMA", "PUBLIC",
).Replace(sql)
resp, err := client.RunSQL(context.Background(), streaming.RunSQLRequest{
Statement: sql,
Database: EnvOrDefault("SNOWFLAKE_DB", "BABY_DATABASE"),
@@ -271,3 +276,47 @@ snowflake_streaming:
{"zoom", "4", "foo", "foo"},
}, rows)
}

func TestIntegrationSchemaEvolutionPipeline(t *testing.T) {
integration.CheckSkip(t)
produce, stream := SetupSnowflakeStream(t, `
label: snowpipe_streaming
snowflake_streaming:
account: "${SNOWFLAKE_ACCOUNT:WQKFXQQ-WI77362}"
user: "${SNOWFLAKE_USER:ROCKWOODREDPANDA}"
role: ACCOUNTADMIN
database: "${SNOWFLAKE_DB:BABY_DATABASE}"
schema: PUBLIC
table: integration_test_pipeline
init_statement: |
DROP TABLE IF EXISTS integration_test_pipeline;
private_key_file: "${SNOWFLAKE_PRIVATE_KEY:./streaming/resources/rsa_key.p8}"
max_in_flight: 4
channel_name: "${!this.channel}"
schema_evolution:
enabled: true
processors:
- mapping: |
root = match {
this.name == "token" => "NUMBER"
_ => "variant"
}
`)
RunStreamInBackground(t, stream)
require.NoError(t, produce(context.Background(), Batch([]map[string]any{
{"foo": "bar", "token": 1, "channel": "foo"},
{"foo": "baz", "token": 2, "channel": "foo"},
{"foo": "qux", "token": 3, "channel": "foo"},
{"foo": "zoom", "token": 4, "channel": "foo"},
})))
rows := RunSQLQuery(
t,
stream,
`SELECT column_name, data_type, numeric_precision, numeric_scale FROM $DATABASE.information_schema.columns WHERE table_name = 'INTEGRATION_TEST_PIPELINE' AND table_schema = '$SCHEMA' ORDER BY column_name`,
)
require.Equal(t, [][]string{
{"CHANNEL", "VARIANT", "", ""},
{"FOO", "VARIANT", "", ""},
{"TOKEN", "NUMBER", "38", "0"},
}, rows)
}
32 changes: 26 additions & 6 deletions internal/impl/snowflake/output_snowflake_streaming.go
@@ -49,6 +49,7 @@ const (
ssoFieldSchemaEvolution = "schema_evolution"
ssoFieldSchemaEvolutionEnabled = "enabled"
ssoFieldSchemaEvolutionNewColumnTypeMapping = "new_column_type_mapping"
ssoFieldSchemaEvolutionProcessors = "processors"

defaultSchemaEvolutionNewColumnMapping = `root = match this.value.type() {
this == "string" => "STRING"
@@ -117,7 +118,13 @@ ALTER TABLE t1 ADD COLUMN a2 NUMBER;
service.NewBloblangField(ssoFieldSchemaEvolutionNewColumnTypeMapping).Description(`
The mapping function from Redpanda Connect type to column type in Snowflake. Overriding this can allow for customization of the datatype if there is specific information that you know about the data types in use. This mapping should result in the `+"`root`"+` variable being assigned a string with the data type for the new column in Snowflake.
The input to this mapping is an object with the value and the name of the new column, for example: `+"`"+`{"value": 42.3, "name":"new_data_field"}`+"`").Default(defaultSchemaEvolutionNewColumnMapping),
The input to this mapping is either the output of `+"`processors`"+` if specified, otherwise it is an object with the value and the name of the new column, the original message, and the table being written to. The metadata is unchanged from the original message that caused the schema to change. For example: `+"`"+`{"value": 42.3, "name":"new_data_field", "message": {"existing_data_field": 42, "new_data_field": "foo"}, "db": "MY_DATABASE", "schema": "MY_SCHEMA", "table": "MY_TABLE"}`+"`").Optional().Deprecated(),
service.NewProcessorListField(ssoFieldSchemaEvolutionProcessors).Description(`
A series of processors to execute when new columns are added to the table. Specifying this allows for running side effects when the schema evolves, or for enriching the message with additional data to guide the schema changes. For example, one could read the schema that the message was produced with from the schema registry and use it to decide which type the new column in Snowflake should have.
The input to these processors is an object with the value and the name of the new column, the original message, and the table being written to. The metadata is unchanged from the original message that caused the schema to change. For example: `+"`"+`{"value": 42.3, "name":"new_data_field", "message": {"existing_data_field": 42, "new_data_field": "foo"}, "db": "MY_DATABASE", "schema": "MY_SCHEMA", "table": "MY_TABLE"}`+"`. The output of this series of processors must be a single message whose contents are a string indicating the column data type to use (FLOAT, VARIANT, NUMBER(38, 0), etc.). An ALTER TABLE statement will then be executed on the table in Snowflake to add the column with the corresponding data type.").Optional().Advanced().Example([]map[string]any{
{"mapping": defaultSchemaEvolutionNewColumnMapping},
}),
).Description(`Options to control schema evolution within the pipeline as new columns are added to the pipeline.`).Optional(),
service.NewIntField(ssoFieldBuildParallelism).Description("The maximum amount of parallelism to use when building the output for Snowflake. The metric to watch to see if you need to change this is `snowflake_build_output_latency_ns`.").Optional().Advanced().Deprecated(),
service.NewObjectField(ssoFieldBuildOpts,
@@ -397,15 +404,27 @@ func newSnowflakeStreamer(
return nil, err
}
}
var schemaEvolutionEnabled bool
var schemaEvolutionProcessors []*service.OwnedProcessor
var schemaEvolutionMapping *bloblang.Executor
if conf.Contains(ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionEnabled) {
enabled, err := conf.FieldBool(ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionEnabled)
if err == nil && enabled {
schemaEvolutionMapping, err = conf.FieldBloblang(ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionNewColumnTypeMapping)
}
seConf := conf.Namespace(ssoFieldSchemaEvolution)
schemaEvolutionEnabled, err = seConf.FieldBool(ssoFieldSchemaEvolutionEnabled)
if err != nil {
return nil, err
}
if seConf.Contains(ssoFieldSchemaEvolutionProcessors) {
schemaEvolutionProcessors, err = seConf.FieldProcessorList(ssoFieldSchemaEvolutionProcessors)
if err != nil {
return nil, err
}
}
if seConf.Contains(ssoFieldSchemaEvolutionNewColumnTypeMapping) {
schemaEvolutionMapping, err = seConf.FieldBloblang(ssoFieldSchemaEvolutionNewColumnTypeMapping)
if err != nil {
return nil, err
}
}
}

var buildOpts streaming.BuildOptions
@@ -509,9 +528,10 @@ func newSnowflakeStreamer(
mgr.SetGeneric(SnowflakeClientResourceForTesting, restClient)
makeImpl := func(table string) (*snowpipeSchemaEvolver, service.BatchOutput) {
var schemaEvolver *snowpipeSchemaEvolver
if schemaEvolutionMapping != nil {
if schemaEvolutionEnabled {
schemaEvolver = &snowpipeSchemaEvolver{
schemaEvolutionMapping: schemaEvolutionMapping,
pipeline: schemaEvolutionProcessors,
restClient: restClient,
logger: mgr.Logger(),
db: db,
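
Note how the wiring above keeps the deprecated field working during a migration: when both are set, `new_column_type_mapping` is applied to the output of `schema_evolution.processors` rather than to the raw column object. A minimal transitional sketch (values illustrative only):

```yml
schema_evolution:
  enabled: true
  # Runs first and receives {"name", "value", "message", "db", "schema", "table"}.
  processors:
    - mapping: 'root = this'
  # Deprecated; consumes whatever the processors emit.
  new_column_type_mapping: |-
    root = match this.value.type() {
      this == "number" => "DOUBLE"
      _ => "VARIANT"
    }
```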
67 changes: 57 additions & 10 deletions internal/impl/snowflake/schema_evolution.go
@@ -10,10 +10,12 @@ package snowflake

import (
"context"
"encoding/json"
"errors"
"fmt"
"regexp"
"strings"
"time"

"github.com/redpanda-data/benthos/v4/public/bloblang"
"github.com/redpanda-data/benthos/v4/public/service"
@@ -67,23 +69,68 @@ func asSchemaMigrationError(err error) (*schemaMigrationNeededError, bool) {

type snowpipeSchemaEvolver struct {
schemaEvolutionMapping *bloblang.Executor
pipeline []*service.OwnedProcessor
logger *service.Logger
// The evolver neither closes nor owns this rest client.
restClient *streaming.SnowflakeRestClient
db, schema, table, role string
}

func (o *snowpipeSchemaEvolver) ComputeMissingColumnType(col *streaming.MissingColumnError) (string, error) {
msg := service.NewMessage(nil)
func (o *snowpipeSchemaEvolver) ComputeMissingColumnType(ctx context.Context, col *streaming.MissingColumnError) (string, error) {
if len(o.pipeline) == 0 && o.schemaEvolutionMapping == nil {
// The default mapping if not specified by a user
switch col.Value().(type) {
case []byte:
return "BINARY", nil
case string:
return "STRING", nil
case bool:
return "BOOLEAN", nil
case time.Time:
return "TIMESTAMP", nil
case json.Number, int, int64, int32, int16, int8, uint, uint64, uint32, uint16, uint8, float32, float64:
return "DOUBLE", nil
default:
return "VARIANT", nil
}
}
msg := col.Message().Copy()
original, err := msg.AsStructuredMut()
if err != nil {
// This should never happen; we had to get the data as structured to know it was a missing column type
return "", fmt.Errorf("unable to extract JSON data from message that caused schema evolution: %w", err)
}
msg.SetError(nil) // Clear error
msg.SetStructuredMut(map[string]any{
"name": col.RawName(),
"value": col.Value(),
"name": col.RawName(),
"value": col.Value(),
"message": original,
"db": o.db,
"schema": o.schema,
"table": o.table,
})
out, err := msg.BloblangQuery(o.schemaEvolutionMapping)
batches, err := service.ExecuteProcessors(ctx, o.pipeline, service.MessageBatch{msg})
if err != nil {
return "", fmt.Errorf("unable to compute new column type for %s: %w", col.ColumnName(), err)
return "", fmt.Errorf("failure to execute %s.%s prior to schema evolution: %w", ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionProcessors, err)
}
if len(batches) != 1 {
return "", fmt.Errorf("expected a single batch output from %s.%s, got: %d", ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionProcessors, len(batches))
}
batch := batches[0]
if len(batch) != 1 {
return "", fmt.Errorf("expected a single message output from %s.%s, got: %d", ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionProcessors, len(batch))
}
msg = batch[0]
if err := msg.GetError(); err != nil {
return "", fmt.Errorf("message failure executing %s.%s prior to schema evolution: %w", ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionProcessors, err)
}
if o.schemaEvolutionMapping != nil {
msg, err = msg.BloblangQuery(o.schemaEvolutionMapping)
if err != nil {
return "", fmt.Errorf("unable to compute new column type for %s: %w", col.ColumnName(), err)
}
}
v, err := out.AsBytes()
v, err := msg.AsBytes()
if err != nil {
return "", fmt.Errorf("unable to extract result from new column type mapping for %s: %w", col.ColumnName(), err)
}
@@ -95,7 +142,7 @@ func (o *snowpipeSchemaEvolver) ComputeMissingColumnType(col *streaming.MissingColumnError) (string, error) {
}

func (o *snowpipeSchemaEvolver) MigrateMissingColumn(ctx context.Context, col *streaming.MissingColumnError) error {
columnType, err := o.ComputeMissingColumnType(col)
columnType, err := o.ComputeMissingColumnType(ctx, col)
if err != nil {
return err
}
@@ -153,8 +200,8 @@ func (o *snowpipeSchemaEvolver) CreateOutputTable(ctx context.Context, batch service.MessageBatch) error {
}
columns := []string{}
for k, v := range row {
col := streaming.NewMissingColumnError(k, v)
colType, err := o.ComputeMissingColumnType(col)
col := streaming.NewMissingColumnError(msg, k, v)
colType, err := o.ComputeMissingColumnType(ctx, col)
if err != nil {
return err
}
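
When neither `processors` nor the deprecated mapping is configured, the switch at the top of `ComputeMissingColumnType` infers the column type directly from the Go value. A standalone sketch of that inference (trimmed to a few representative numeric types; same semantics as the switch above is assumed):

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// defaultColumnType mirrors the built-in inference used when no schema
// evolution processors or mapping are configured.
func defaultColumnType(v any) string {
	switch v.(type) {
	case []byte:
		return "BINARY"
	case string:
		return "STRING"
	case bool:
		return "BOOLEAN"
	case time.Time:
		return "TIMESTAMP"
	case json.Number, int, int64, float64:
		return "DOUBLE"
	default:
		return "VARIANT" // objects, arrays, nulls, and anything else
	}
}

func main() {
	samples := []any{[]byte{1}, "hello", true, time.Now(), json.Number("42.3"), map[string]any{}}
	for _, v := range samples {
		fmt.Printf("%T -> %s\n", v, defaultColumnType(v))
	}
}
```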
4 changes: 2 additions & 2 deletions internal/impl/snowflake/streaming/parquet.go
@@ -39,7 +39,7 @@ func messageToRow(msg *service.Message, out []any, nameToPosition map[string]int
idx, ok := nameToPosition[normalizeColumnName(k)]
if !ok {
if !allowExtraProperties && v != nil {
missingColumns = append(missingColumns, &MissingColumnError{columnName: k, val: v})
missingColumns = append(missingColumns, NewMissingColumnError(msg, k, v))
}
continue
}
@@ -94,7 +94,7 @@ func constructRowGroup(
err = t.converter.ValidateAndConvert(s, v, b)
if err != nil {
if errors.Is(err, errNullValue) {
return nil, nil, &NonNullColumnError{t.column.Name}
return nil, nil, &NonNullColumnError{msg, t.column.Name}
}
// There is no special typed error for a validation error, there really isn't
// anything we can do about it.
18 changes: 16 additions & 2 deletions internal/impl/snowflake/streaming/schema_errors.go
@@ -13,6 +13,8 @@ package streaming
import (
"errors"
"fmt"

"github.com/redpanda-data/benthos/v4/public/service"
)

// SchemaMismatchError occurs when the user provided data has data that
@@ -47,6 +49,7 @@ var _ SchemaMismatchError = &NonNullColumnError{}
// NonNullColumnError occurs when a column with a NOT NULL constraint
// gets a value with a `NULL` value.
type NonNullColumnError struct {
message *service.Message
columnName string
}

@@ -61,6 +64,11 @@ func (e *NonNullColumnError) Value() any {
return nil
}

// Message returns the message that caused this error
func (e *NonNullColumnError) Message() *service.Message {
return e.message
}

// Error implements the error interface
func (e *NonNullColumnError) Error() string {
return fmt.Sprintf("column %q has a NOT NULL constraint and recieved a nil value", e.columnName)
@@ -72,13 +80,19 @@ var _ SchemaMismatchError = &MissingColumnError{}
// MissingColumnError occurs when a column that is not in the table is
// found on a record
type MissingColumnError struct {
message *service.Message
columnName string
val any
}

// NewMissingColumnError creates a new MissingColumnError object
func NewMissingColumnError(rawName string, val any) *MissingColumnError {
return &MissingColumnError{rawName, val}
func NewMissingColumnError(message *service.Message, rawName string, val any) *MissingColumnError {
return &MissingColumnError{message, rawName, val}
}

// Message returns the message that caused this error
func (e *MissingColumnError) Message() *service.Message {
return e.message
}

// ColumnName returns the column name of the data that was not in the table
Expand Down

0 comments on commit dc9ac0a

Please sign in to comment.