Skip to content

Commit a8b4a34

Browse files
committed
snowpipe: support processors when evolving the schema
1 parent 874ee25 commit a8b4a34

File tree

3 files changed

+69
-12
lines changed

3 files changed

+69
-12
lines changed

docs/modules/components/pages/outputs/snowflake_streaming.adoc

+13-1
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ output:
6262
this == "timestamp" => "TIMESTAMP"
6363
_ => "VARIANT"
6464
}
65+
processors: [] # No default (optional)
6566
batching:
6667
count: 0
6768
byte_size: 0
@@ -103,6 +104,7 @@ output:
103104
this == "timestamp" => "TIMESTAMP"
104105
_ => "VARIANT"
105106
}
107+
processors: [] # No default (optional)
106108
build_options:
107109
parallelism: 1
108110
chunk_size: 50000
@@ -463,13 +465,23 @@ Whether schema evolution is enabled.
463465
464466
The mapping function from Redpanda Connect type to column type in Snowflake. Overriding this can allow for customization of the datatype if there is specific information that you know about the data types in use. This mapping should result in the `root` variable being assigned a string with the data type for the new column in Snowflake.
465467
466-
The input to this mapping is an object with the value and the name of the new column, for example: `{"value": 42.3, "name":"new_data_field"}"
468+
The input to this mapping is either the output of `processors` if specified, otherwise it is an object with the value and the name of the new column, the original message and table being written too. The metadata is unchanged from the original message that caused the schema to change. For example: `{"value": 42.3, "name":"new_data_field", "message": {"existing_data_field": 42, "new_data_field": "foo"}, "db": MY_DATABASE", "schema": "MY_SCHEMA", "table": "MY_TABLE"}
467469
468470
469471
*Type*: `string`
470472
471473
*Default*: `"root = match this.value.type() {\n this == \"string\" =\u003e \"STRING\"\n this == \"bytes\" =\u003e \"BINARY\"\n this == \"number\" =\u003e \"DOUBLE\"\n this == \"bool\" =\u003e \"BOOLEAN\"\n this == \"timestamp\" =\u003e \"TIMESTAMP\"\n _ =\u003e \"VARIANT\"\n}"`
472474
475+
=== `schema_evolution.processors`
476+
477+
A series of processors to execute when new columns are added to the table. Specifying this can support running side effects when the schema evolves or enriching the message with additional message to guide the schema changes. For example, one could read the schema the message was produced with from the schema registry and use that to decide which type the new column in Snowflake should be.
478+
479+
The input to these processors is an object with the value and the name of the new column, the original message and table being written too. The metadata is unchanged from the original message that caused the schema to change. For example: `{"value": 42.3, "name":"new_data_field", "message": {"existing_data_field": 42, "new_data_field": "foo"}, "db": MY_DATABASE", "schema": "MY_SCHEMA", "table": "MY_TABLE"}`
480+
481+
482+
*Type*: `array`
483+
484+
473485
=== `build_options`
474486
475487
Options to optimize the time to build output data that is sent to Snowflake. The metric to watch to see if you need to change this is `snowflake_build_output_latency_ns`.

internal/impl/snowflake/output_snowflake_streaming.go

+22-5
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ const (
4949
ssoFieldSchemaEvolution = "schema_evolution"
5050
ssoFieldSchemaEvolutionEnabled = "enabled"
5151
ssoFieldSchemaEvolutionNewColumnTypeMapping = "new_column_type_mapping"
52+
ssoFieldSchemaEvolutionProcessors = "processors"
5253

5354
defaultSchemaEvolutionNewColumnMapping = `root = match this.value.type() {
5455
this == "string" => "STRING"
@@ -117,7 +118,11 @@ ALTER TABLE t1 ADD COLUMN a2 NUMBER;
117118
service.NewBloblangField(ssoFieldSchemaEvolutionNewColumnTypeMapping).Description(`
118119
The mapping function from Redpanda Connect type to column type in Snowflake. Overriding this can allow for customization of the datatype if there is specific information that you know about the data types in use. This mapping should result in the `+"`root`"+` variable being assigned a string with the data type for the new column in Snowflake.
119120
120-
The input to this mapping is an object with the value and the name of the new column, for example: `+"`"+`{"value": 42.3, "name":"new_data_field"}`+`"`).Default(defaultSchemaEvolutionNewColumnMapping),
121+
The input to this mapping is either the output of `+"`processors`"+` if specified, otherwise it is an object with the value and the name of the new column, the original message and table being written too. The metadata is unchanged from the original message that caused the schema to change. For example: `+"`"+`{"value": 42.3, "name":"new_data_field", "message": {"existing_data_field": 42, "new_data_field": "foo"}, "db": MY_DATABASE", "schema": "MY_SCHEMA", "table": "MY_TABLE"}`).Default(defaultSchemaEvolutionNewColumnMapping),
122+
service.NewProcessorListField(ssoFieldSchemaEvolutionProcessors).Description(`
123+
A series of processors to execute when new columns are added to the table. Specifying this can support running side effects when the schema evolves or enriching the message with additional message to guide the schema changes. For example, one could read the schema the message was produced with from the schema registry and use that to decide which type the new column in Snowflake should be.
124+
125+
The input to these processors is an object with the value and the name of the new column, the original message and table being written too. The metadata is unchanged from the original message that caused the schema to change. For example: `+"`"+`{"value": 42.3, "name":"new_data_field", "message": {"existing_data_field": 42, "new_data_field": "foo"}, "db": MY_DATABASE", "schema": "MY_SCHEMA", "table": "MY_TABLE"}`+"`").Optional(),
121126
).Description(`Options to control schema evolution within the pipeline as new columns are added to the pipeline.`).Optional(),
122127
service.NewIntField(ssoFieldBuildParallelism).Description("The maximum amount of parallelism to use when building the output for Snowflake. The metric to watch to see if you need to change this is `snowflake_build_output_latency_ns`.").Optional().Advanced().Deprecated(),
123128
service.NewObjectField(ssoFieldBuildOpts,
@@ -398,14 +403,25 @@ func newSnowflakeStreamer(
398403
}
399404
}
400405
var schemaEvolutionMapping *bloblang.Executor
406+
var schemaEvolutionProcessors []*service.OwnedProcessor
401407
if conf.Contains(ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionEnabled) {
402-
enabled, err := conf.FieldBool(ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionEnabled)
403-
if err == nil && enabled {
404-
schemaEvolutionMapping, err = conf.FieldBloblang(ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionNewColumnTypeMapping)
405-
}
408+
seConf := conf.Namespace(ssoFieldSchemaEvolution)
409+
enabled, err := seConf.FieldBool(ssoFieldSchemaEvolutionEnabled)
406410
if err != nil {
407411
return nil, err
408412
}
413+
if enabled {
414+
schemaEvolutionMapping, err = seConf.FieldBloblang(ssoFieldSchemaEvolutionNewColumnTypeMapping)
415+
if err != nil {
416+
return nil, err
417+
}
418+
}
419+
if seConf.Contains(ssoFieldSchemaEvolutionProcessors) {
420+
schemaEvolutionProcessors, err = seConf.FieldProcessorList(ssoFieldSchemaEvolutionProcessors)
421+
if err != nil {
422+
return nil, err
423+
}
424+
}
409425
}
410426

411427
var buildOpts streaming.BuildOptions
@@ -512,6 +528,7 @@ func newSnowflakeStreamer(
512528
if schemaEvolutionMapping != nil {
513529
schemaEvolver = &snowpipeSchemaEvolver{
514530
schemaEvolutionMapping: schemaEvolutionMapping,
531+
pipeline: schemaEvolutionProcessors,
515532
restClient: restClient,
516533
logger: mgr.Logger(),
517534
db: db,

internal/impl/snowflake/schema_evolution.go

+34-6
Original file line numberDiff line numberDiff line change
@@ -67,18 +67,46 @@ func asSchemaMigrationError(err error) (*schemaMigrationNeededError, bool) {
6767

6868
type snowpipeSchemaEvolver struct {
6969
schemaEvolutionMapping *bloblang.Executor
70+
pipeline []*service.OwnedProcessor
7071
logger *service.Logger
7172
// The evolver does not close nor own this rest client.
7273
restClient *streaming.SnowflakeRestClient
7374
db, schema, table, role string
7475
}
7576

76-
func (o *snowpipeSchemaEvolver) ComputeMissingColumnType(col *streaming.MissingColumnError) (string, error) {
77-
msg := service.NewMessage(nil)
77+
func (o *snowpipeSchemaEvolver) ComputeMissingColumnType(ctx context.Context, col *streaming.MissingColumnError) (string, error) {
78+
msg := col.Message().Copy()
79+
original, err := msg.AsStructuredMut()
80+
if err != nil {
81+
// This should never happen, we had to get the data as structured to be able to know it was a missing column type
82+
return "", fmt.Errorf("unable to extract JSON data from message that caused schema evolution: %w", err)
83+
}
84+
msg.SetError(nil) // Clear error
7885
msg.SetStructuredMut(map[string]any{
79-
"name": col.RawName(),
80-
"value": col.Value(),
86+
"name": col.RawName(),
87+
"value": col.Value(),
88+
"message": original,
89+
"db": o.db,
90+
"schema": o.schema,
91+
"table": o.table,
8192
})
93+
if len(o.pipeline) > 0 {
94+
batches, err := service.ExecuteProcessors(ctx, o.pipeline, service.MessageBatch{msg})
95+
if err != nil {
96+
return "", fmt.Errorf("failure to execute %s.%s prior to schema evolution: %w", ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionProcessors, err)
97+
}
98+
if len(batches) != 1 {
99+
return "", fmt.Errorf("expected a single batch output from %s.%s, got: %d", ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionProcessors, len(batches))
100+
}
101+
batch := batches[0]
102+
if len(batch) != 1 {
103+
return "", fmt.Errorf("expected a single message output from %s.%s, got: %d", ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionProcessors, len(batch))
104+
}
105+
msg = batch[0]
106+
if err := msg.GetError(); err != nil {
107+
return "", fmt.Errorf("message failure executing %s.%s prior to schema evolution: %w", ssoFieldSchemaEvolution, ssoFieldSchemaEvolutionProcessors, err)
108+
}
109+
}
82110
out, err := msg.BloblangQuery(o.schemaEvolutionMapping)
83111
if err != nil {
84112
return "", fmt.Errorf("unable to compute new column type for %s: %w", col.ColumnName(), err)
@@ -95,7 +123,7 @@ func (o *snowpipeSchemaEvolver) ComputeMissingColumnType(col *streaming.MissingC
95123
}
96124

97125
func (o *snowpipeSchemaEvolver) MigrateMissingColumn(ctx context.Context, col *streaming.MissingColumnError) error {
98-
columnType, err := o.ComputeMissingColumnType(col)
126+
columnType, err := o.ComputeMissingColumnType(ctx, col)
99127
if err != nil {
100128
return err
101129
}
@@ -154,7 +182,7 @@ func (o *snowpipeSchemaEvolver) CreateOutputTable(ctx context.Context, batch ser
154182
columns := []string{}
155183
for k, v := range row {
156184
col := streaming.NewMissingColumnError(msg, k, v)
157-
colType, err := o.ComputeMissingColumnType(col)
185+
colType, err := o.ComputeMissingColumnType(ctx, col)
158186
if err != nil {
159187
return err
160188
}

0 commit comments

Comments
 (0)