From 5dfe9b06de814ae9f99a2d164eec47e96e0bd44c Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Wed, 22 Jan 2025 22:04:23 +0000 Subject: [PATCH 1/3] schema_registry_decode: use hamba/avro as an alternative impl I went back and forth on patching goavro, but this just seems simpler and is much faster too. The tree/schema walking is a little complex but here we are. Hopefully we can eventually just remove the code for goavro at some point, but we have other code we'd need to switch around. --- .../processors/schema_registry_decode.adoc | 37 +- go.mod | 2 +- .../processor_schema_registry_decode.go | 70 +++- .../processor_schema_registry_decode_test.go | 138 ++++++-- .../{serde_avro.go => serde_goavro.go} | 4 +- ...erde_avro_test.go => serde_goavro_test.go} | 4 +- internal/impl/confluent/serde_hamba_avro.go | 201 +++++++++++ .../impl/confluent/serde_hamba_avro_test.go | 322 ++++++++++++++++++ internal/impl/confluent/serde_json_test.go | 2 +- .../impl/confluent/serde_protobuf_test.go | 4 +- 10 files changed, 726 insertions(+), 58 deletions(-) rename internal/impl/confluent/{serde_avro.go => serde_goavro.go} (95%) rename internal/impl/confluent/{serde_avro_test.go => serde_goavro_test.go} (97%) create mode 100644 internal/impl/confluent/serde_hamba_avro.go create mode 100644 internal/impl/confluent/serde_hamba_avro_test.go diff --git a/docs/modules/components/pages/processors/schema_registry_decode.adoc b/docs/modules/components/pages/processors/schema_registry_decode.adoc index f9f03cc8bf..c422de05d1 100644 --- a/docs/modules/components/pages/processors/schema_registry_decode.adoc +++ b/docs/modules/components/pages/processors/schema_registry_decode.adoc @@ -36,6 +36,9 @@ Common:: # Common config fields, showing default values label: "" schema_registry_decode: + avro: + raw_unions: false # No default (optional) + preserve_logical_types: false url: "" # No default (required) ``` @@ -48,7 +51,9 @@ Advanced:: # All config fields, showing default values label: "" 
schema_registry_decode: - avro_raw_json: false + avro: + raw_unions: false # No default (optional) + preserve_logical_types: false url: "" # No default (required) oauth: enabled: false @@ -103,9 +108,35 @@ This processor decodes protobuf messages to JSON documents, you can read more ab == Fields -=== `avro_raw_json` +=== `avro` -Whether Avro messages should be decoded into normal JSON ("json that meets the expectations of regular internet json") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[Avro JSON^]. If `true` the schema returned from the subject should be decoded as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard json^] instead of as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec[avro json^]. There is a https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249[comment in goavro^], the https://github.com/linkedin/goavro[underlining library used for avro serialization^], that explains in more detail the difference between the standard json and avro json. +Sorry! This field is missing documentation. + + +*Type*: `object` + + +=== `avro.raw_unions` + +Whether Avro messages should be decoded into normal JSON ("json that meets the expectations of regular internet json") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[JSON as specified in the Avro Spec^]. + +For example, if there is a union schema `["null", "string", "Foo"]` where `Foo` is a record name, then with raw_unions as false (the default) you get: +- `null` as `null`; +- the string `"a"` as `\{"string": "a"}`; and +- a `Foo` instance as `\{"Foo": {...}}`, where `{...}` indicates the JSON encoding of a `Foo` instance. 
+ +When raw_unions is set to true then the above union schema is decoded as the following: +- `null` as `null`; +- the string `"a"` as `"a"`; and +- a `Foo` instance as `{...}`, where `{...}` indicates the JSON encoding of a `Foo` instance. + + +*Type*: `bool` + + +=== `avro.preserve_logical_types` + +Whether logical types should be preserved or transformed back into their primitive type. For example, decimals are decoded as raw bytes and timestamps are decoded as plain integers. *Type*: `bool` diff --git a/go.mod b/go.mod index e327845f46..73fefe3e59 100644 --- a/go.mod +++ b/go.mod @@ -74,6 +74,7 @@ require ( github.com/golang-jwt/jwt/v5 v5.2.1 github.com/googleapis/go-sql-spanner v1.8.0 github.com/gosimple/slug v1.14.0 + github.com/hamba/avro/v2 v2.22.2-0.20240625062549-66aad10411d9 github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c github.com/jackc/pgx/v4 v4.18.3 github.com/jackc/pgx/v5 v5.6.0 @@ -173,7 +174,6 @@ require ( github.com/containerd/platforms v0.2.1 // indirect github.com/envoyproxy/go-control-plane v0.13.0 // indirect github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect - github.com/hamba/avro/v2 v2.22.2-0.20240625062549-66aad10411d9 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/jzelinskie/stringz v0.0.3 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect diff --git a/internal/impl/confluent/processor_schema_registry_decode.go b/internal/impl/confluent/processor_schema_registry_decode.go index 110b7c4deb..b8795b79a1 100644 --- a/internal/impl/confluent/processor_schema_registry_decode.go +++ b/internal/impl/confluent/processor_schema_registry_decode.go @@ -63,8 +63,29 @@ This processor decodes protobuf messages to JSON documents, you can read more ab `). Field(service.NewBoolField("avro_raw_json"). 
Description("Whether Avro messages should be decoded into normal JSON (\"json that meets the expectations of regular internet json\") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[Avro JSON^]. If `true` the schema returned from the subject should be decoded as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard json^] instead of as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec[avro json^]. There is a https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249[comment in goavro^], the https://github.com/linkedin/goavro[underlining library used for avro serialization^], that explains in more detail the difference between the standard json and avro json."). - Advanced().Default(false)). - Field(service.NewURLField("url").Description("The base URL of the schema registry service.")) + Advanced().Default(false).Deprecated()). + Fields( + service.NewObjectField( + "avro", + service.NewBoolField("raw_unions").Description(`Wheather avro messages should be decoded into normal JSON ("json that meets the expectations of regular internet json") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[JSON as specified in the Avro Spec^]. + +For example, if there is a union schema `+"`"+`["null", "string", "Foo"]`+"`"+` where `+"`Foo`"+` is a record name, would with raw_unions as false (the default) you get: +- `+"`null` as `null`"+`; +- the string `+"`\"a\"` as `\\{\"string\": \"a\"}`"+`; and +- a `+"`Foo` instance as `\\{\"Foo\": {...}}`, where `{...}` indicates the JSON encoding of a `Foo`"+` instance. + +When raw_unions is set to true then the above union schema is decoded as the following: +- `+"`null` as `null`"+`; +- the string `+"`\"a\"` as `\"a\"`"+`; and +- a `+"`Foo` instance as `{...}`, where `{...}` indicates the JSON encoding of a `Foo`"+` instance. 
+`).Optional(), + service.NewBoolField("preserve_logical_types").Description(`Wheather logical types should be preserved or transformed back into their primative type. For example, decimals are decoded as raw bytes and timestamps are decoded as plain integers.`).Default(false), + ), + ). + Field(service.NewURLField("url").Description("The base URL of the schema registry service.")). + LintRule(`root = match { + this.exists("avro.raw_unions") && this.exists("avro_raw_json") => ["fields avro.raw_unions and avro_raw_json cannot be set simultaneously"], + }`) for _, f := range service.NewHTTPRequestAuthSignerFields() { spec = spec.Field(f.Version("4.7.0")) @@ -86,9 +107,16 @@ func init() { //------------------------------------------------------------------------------ +type decodingConfig struct { + avro struct { + useHamba bool + rawUnions bool + } +} + type schemaRegistryDecoder struct { - avroRawJSON bool - client *sr.Client + cfg decodingConfig + client *sr.Client schemas map[int]*cachedSchemaDecoder cacheMut sync.RWMutex @@ -112,26 +140,38 @@ func newSchemaRegistryDecoderFromConfig(conf *service.ParsedConfig, mgr *service if err != nil { return nil, err } - avroRawJSON, err := conf.FieldBool("avro_raw_json") + var cfg decodingConfig + cfg.avro.rawUnions, err = conf.FieldBool("avro_raw_json") + if err != nil { + return nil, err + } + + cfg.avro.useHamba, err = conf.FieldBool("avro", "preserve_logical_types") if err != nil { return nil, err } - return newSchemaRegistryDecoder(urlStr, authSigner, tlsConf, avroRawJSON, mgr) + if conf.Contains("avro", "raw_unions") { + cfg.avro.rawUnions, err = conf.FieldBool("avro", "raw_unions") + if err != nil { + return nil, err + } + } + return newSchemaRegistryDecoder(urlStr, authSigner, tlsConf, cfg, mgr) } func newSchemaRegistryDecoder( urlStr string, reqSigner func(f fs.FS, req *http.Request) error, tlsConf *tls.Config, - avroRawJSON bool, + cfg decodingConfig, mgr *service.Resources, ) (*schemaRegistryDecoder, error) { s := 
&schemaRegistryDecoder{ - avroRawJSON: avroRawJSON, - schemas: map[int]*cachedSchemaDecoder{}, - shutSig: shutdown.NewSignaller(), - logger: mgr.Logger(), - mgr: mgr, + cfg: cfg, + schemas: map[int]*cachedSchemaDecoder{}, + shutSig: shutdown.NewSignaller(), + logger: mgr.Logger(), + mgr: mgr, } var err error if s.client, err = sr.NewClient(urlStr, reqSigner, tlsConf, mgr); err != nil { @@ -265,7 +305,11 @@ func (s *schemaRegistryDecoder) getDecoder(id int) (schemaDecoder, error) { case franz_sr.TypeJSON: decoder, err = s.getJSONDecoder(ctx, resPayload) default: - decoder, err = s.getAvroDecoder(ctx, resPayload) + if s.cfg.avro.useHamba { + decoder, err = s.getHambaAvroDecoder(ctx, resPayload) + } else { + decoder, err = s.getGoAvroDecoder(ctx, resPayload) + } } if err != nil { return nil, err diff --git a/internal/impl/confluent/processor_schema_registry_decode_test.go b/internal/impl/confluent/processor_schema_registry_decode_test.go index 03ea165128..56665397c6 100644 --- a/internal/impl/confluent/processor_schema_registry_decode_test.go +++ b/internal/impl/confluent/processor_schema_registry_decode_test.go @@ -37,6 +37,7 @@ func TestSchemaRegistryDecoderConfigParse(t *testing.T) { config string errContains string expectedBaseURL string + hambaEnabled bool }{ { name: "bad url", @@ -63,6 +64,27 @@ basic_auth: `, expectedBaseURL: "http://example.com/v1", }, + { + name: "hamba enabled", + config: ` +url: http://example.com/v1 +avro: + raw_unions: false + preserve_logical_types: true +`, + expectedBaseURL: "http://example.com/v1", + hambaEnabled: true, + }, + { + name: "hamba enabled with removing unions", + config: ` +url: http://example.com/v1 +avro: + preserve_logical_types: true +`, + expectedBaseURL: "http://example.com/v1", + hambaEnabled: true, + }, } spec := schemaRegistryDecoderConfig() @@ -73,6 +95,10 @@ basic_auth: require.NoError(t, err) e, err := newSchemaRegistryDecoderFromConfig(conf, service.MockResources()) + if e != nil { + assert.Equal(t, 
test.hambaEnabled, e.cfg.avro.useHamba) + } + if err == nil { _ = e.Close(context.Background()) } @@ -218,12 +244,11 @@ func mustJBytes(t testing.TB, obj any) []byte { } func TestSchemaRegistryDecodeAvro(t *testing.T) { - returnedSchema3 := false + returnedSchema3Count := 0 urlStr := runSchemaRegistryServer(t, func(path string) ([]byte, error) { switch path { case "/schemas/ids/3": - assert.False(t, returnedSchema3) - returnedSchema3 = true + returnedSchema3Count++ return mustJBytes(t, map[string]any{ "schema": testSchema, }), nil @@ -237,13 +262,11 @@ func TestSchemaRegistryDecodeAvro(t *testing.T) { return nil, nil }) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources()) - require.NoError(t, err) - tests := []struct { name string input string output string + hambaOutput string errContains string }{ { @@ -262,9 +285,10 @@ func TestSchemaRegistryDecodeAvro(t *testing.T) { output: `{"Name":"foo","MaybeHobby":null,"Address": null}`, }, { - name: "successful message with logical types", - input: "\x00\x00\x00\x00\x04\x02\x90\xaf\xce!\x02\x80\x80揪\x97\t\x02\x80\x80\xde\xf2\xdf\xff\xdf\xdc\x01\x02\x02!", - output: `{"int_time_millis":{"int.time-millis":35245000},"long_time_micros":{"long.time-micros":20192000000000},"long_timestamp_micros":{"long.timestamp-micros":62135596800000000},"pos_0_33333333":{"bytes.decimal":"!"}}`, + name: "successful message with logical types", + input: "\x00\x00\x00\x00\x04\x02\x90\xaf\xce!\x02\x80\x80揪\x97\t\x02\x80\x80\xde\xf2\xdf\xff\xdf\xdc\x01\x02\x02!", + output: `{"int_time_millis":{"int.time-millis":35245000},"long_time_micros":{"long.time-micros":20192000000000},"long_timestamp_micros":{"long.timestamp-micros":62135596800000000},"pos_0_33333333":{"bytes.decimal":"!"}}`, + hambaOutput: 
`{"int_time_millis":{"int.time-millis":"9h47m25s"},"long_time_micros":{"long.time-micros":"5608h53m20s"},"long_timestamp_micros":{"long.timestamp-micros":"3939-01-01T00:00:00Z"},"pos_0_33333333":{"bytes.decimal":"0.33"}}`, }, { name: "non-empty magic byte", @@ -283,9 +307,21 @@ func TestSchemaRegistryDecodeAvro(t *testing.T) { }, } + cfg := decodingConfig{} + cfg.avro.rawUnions = false + goAvroDecoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, cfg, service.MockResources()) + require.NoError(t, err) + cfg.avro.useHamba = true + hambaDecoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, cfg, service.MockResources()) + require.NoError(t, err) + for _, test := range tests { test := test - t.Run(test.name, func(t *testing.T) { + fn := func(t *testing.T, useHamba bool) { + decoder := goAvroDecoder + if useHamba { + decoder = hambaDecoder + } outMsgs, err := decoder.Process(context.Background(), service.NewMessage([]byte(test.input))) if test.errContains != "" { require.Error(t, err) @@ -298,16 +334,27 @@ func TestSchemaRegistryDecodeAvro(t *testing.T) { require.NoError(t, err) jdopts := jsondiff.DefaultJSONOptions() - diff, explanation := jsondiff.Compare(b, []byte(test.output), &jdopts) + output := test.output + if useHamba && test.hambaOutput != "" { + output = test.hambaOutput + } + diff, explanation := jsondiff.Compare(b, []byte(output), &jdopts) + assert.JSONEq(t, output, string(b)) assert.Equalf(t, jsondiff.FullMatch.String(), diff.String(), "%s: %s", test.name, explanation) } - }) + } + t.Run("hamba/"+test.name, func(t *testing.T) { fn(t, true) }) + t.Run("goavro/"+test.name, func(t *testing.T) { fn(t, false) }) } - require.NoError(t, decoder.Close(context.Background())) - decoder.cacheMut.Lock() - assert.Empty(t, decoder.schemas) - decoder.cacheMut.Unlock() + for _, decoder := range []*schemaRegistryDecoder{goAvroDecoder, hambaDecoder} { + require.NoError(t, decoder.Close(context.Background())) + decoder.cacheMut.Lock() + 
assert.Empty(t, decoder.schemas) + decoder.cacheMut.Unlock() + } + + assert.Equal(t, 2, returnedSchema3Count) } func TestSchemaRegistryDecodeAvroRawJson(t *testing.T) { @@ -325,12 +372,11 @@ func TestSchemaRegistryDecodeAvroRawJson(t *testing.T) { }) require.NoError(t, err) - returnedSchema3 := false + returnedSchema3Count := 0 urlStr := runSchemaRegistryServer(t, func(path string) ([]byte, error) { switch path { case "/schemas/ids/3": - assert.False(t, returnedSchema3) - returnedSchema3 = true + returnedSchema3Count++ return payload3, nil case "/schemas/ids/4": return payload4, nil @@ -340,13 +386,11 @@ func TestSchemaRegistryDecodeAvroRawJson(t *testing.T) { return nil, nil }) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources()) - require.NoError(t, err) - tests := []struct { name string input string output string + hambaOutput string errContains string }{ { @@ -365,9 +409,10 @@ func TestSchemaRegistryDecodeAvroRawJson(t *testing.T) { output: `{"Name":"foo","MaybeHobby":null,"Address": null}`, }, { - name: "successful message with logical types", - input: "\x00\x00\x00\x00\x04\x02\x90\xaf\xce!\x02\x80\x80揪\x97\t\x02\x80\x80\xde\xf2\xdf\xff\xdf\xdc\x01\x02\x02!", - output: `{"int_time_millis":35245000,"long_time_micros":20192000000000,"long_timestamp_micros":62135596800000000,"pos_0_33333333":"!"}`, + name: "successful message with logical types", + input: "\x00\x00\x00\x00\x04\x02\x90\xaf\xce!\x02\x80\x80揪\x97\t\x02\x80\x80\xde\xf2\xdf\xff\xdf\xdc\x01\x02\x02!", + output: `{"int_time_millis":35245000,"long_time_micros":20192000000000,"long_timestamp_micros":62135596800000000,"pos_0_33333333":"!"}`, + hambaOutput: `{"int_time_millis":"9h47m25s","long_time_micros":"5608h53m20s","long_timestamp_micros":"3939-01-01T00:00:00Z","pos_0_33333333":"0.33"}`, }, { name: "non-empty magic byte", @@ -385,10 +430,21 @@ func TestSchemaRegistryDecodeAvroRawJson(t *testing.T) { errContains: "schema 5 not found by registry: nope", }, } 
+ cfg := decodingConfig{} + cfg.avro.rawUnions = true + goAvroDecoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, cfg, service.MockResources()) + require.NoError(t, err) + cfg.avro.useHamba = true + hambaDecoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, cfg, service.MockResources()) + require.NoError(t, err) for _, test := range tests { test := test - t.Run(test.name, func(t *testing.T) { + fn := func(t *testing.T, useHamba bool) { + decoder := goAvroDecoder + if useHamba { + decoder = hambaDecoder + } outMsgs, err := decoder.Process(context.Background(), service.NewMessage([]byte(test.input))) if test.errContains != "" { require.Error(t, err) @@ -400,17 +456,29 @@ func TestSchemaRegistryDecodeAvroRawJson(t *testing.T) { b, err := outMsgs[0].AsBytes() require.NoError(t, err) + output := test.output + if useHamba && test.hambaOutput != "" { + output = test.hambaOutput + } + assert.JSONEq(t, output, string(b)) jdopts := jsondiff.DefaultJSONOptions() - diff, explanation := jsondiff.Compare(b, []byte(test.output), &jdopts) + diff, explanation := jsondiff.Compare(b, []byte(output), &jdopts) assert.Equalf(t, jsondiff.FullMatch.String(), diff.String(), "%s: %s", test.name, explanation) } - }) + + } + t.Run("hamba/"+test.name, func(t *testing.T) { fn(t, true) }) + t.Run("goavro/"+test.name, func(t *testing.T) { fn(t, false) }) } - require.NoError(t, decoder.Close(context.Background())) - decoder.cacheMut.Lock() - assert.Empty(t, decoder.schemas) - decoder.cacheMut.Unlock() + for _, decoder := range []*schemaRegistryDecoder{goAvroDecoder, hambaDecoder} { + require.NoError(t, decoder.Close(context.Background())) + decoder.cacheMut.Lock() + assert.Empty(t, decoder.schemas) + decoder.cacheMut.Unlock() + } + + assert.Equal(t, 2, returnedSchema3Count) } func TestSchemaRegistryDecodeClearExpired(t *testing.T) { @@ -418,7 +486,7 @@ func TestSchemaRegistryDecodeClearExpired(t *testing.T) { return nil, fmt.Errorf("nope") }) - decoder, err := 
newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, decodingConfig{}, service.MockResources()) require.NoError(t, err) require.NoError(t, decoder.Close(context.Background())) @@ -465,7 +533,7 @@ func TestSchemaRegistryDecodeProtobuf(t *testing.T) { return nil, nil }) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, decodingConfig{}, service.MockResources()) require.NoError(t, err) tests := []struct { @@ -528,7 +596,7 @@ func TestSchemaRegistryDecodeJson(t *testing.T) { return nil, nil }) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, false, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, decodingConfig{}, service.MockResources()) require.NoError(t, err) tests := []struct { diff --git a/internal/impl/confluent/serde_avro.go b/internal/impl/confluent/serde_goavro.go similarity index 95% rename from internal/impl/confluent/serde_avro.go rename to internal/impl/confluent/serde_goavro.go index 0a8d43d0c5..9a562ee2f5 100644 --- a/internal/impl/confluent/serde_avro.go +++ b/internal/impl/confluent/serde_goavro.go @@ -100,14 +100,14 @@ func (s *schemaRegistryEncoder) getAvroEncoder(ctx context.Context, schema franz }, nil } -func (s *schemaRegistryDecoder) getAvroDecoder(ctx context.Context, schema franz_sr.Schema) (schemaDecoder, error) { +func (s *schemaRegistryDecoder) getGoAvroDecoder(ctx context.Context, schema franz_sr.Schema) (schemaDecoder, error) { schemaSpec, err := resolveAvroReferences(ctx, s.client, schema) if err != nil { return nil, err } var codec *goavro.Codec - if s.avroRawJSON { + if s.cfg.avro.rawUnions { codec, err = goavro.NewCodecForStandardJSONFull(schemaSpec) } else { codec, err = goavro.NewCodec(schemaSpec) diff --git a/internal/impl/confluent/serde_avro_test.go 
b/internal/impl/confluent/serde_goavro_test.go similarity index 97% rename from internal/impl/confluent/serde_avro_test.go rename to internal/impl/confluent/serde_goavro_test.go index cde3a55a09..c14edb6e53 100644 --- a/internal/impl/confluent/serde_avro_test.go +++ b/internal/impl/confluent/serde_goavro_test.go @@ -107,7 +107,9 @@ func TestAvroReferences(t *testing.T) { encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources()) + cfg := decodingConfig{} + cfg.avro.rawUnions = true + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, cfg, service.MockResources()) require.NoError(t, err) t.Cleanup(func() { diff --git a/internal/impl/confluent/serde_hamba_avro.go b/internal/impl/confluent/serde_hamba_avro.go new file mode 100644 index 0000000000..9b2383f47d --- /dev/null +++ b/internal/impl/confluent/serde_hamba_avro.go @@ -0,0 +1,201 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package confluent + +import ( + "context" + "encoding/json" + "fmt" + "math/big" + "slices" + "strings" + "time" + + franz_sr "github.com/twmb/franz-go/pkg/sr" + + "github.com/hamba/avro/v2" + "github.com/redpanda-data/benthos/v4/public/service" + + "github.com/redpanda-data/connect/v4/internal/impl/confluent/sr" +) + +func resolveHambaAvroReferences(ctx context.Context, client *sr.Client, schema franz_sr.Schema) ([]franz_sr.Schema, error) { + if len(schema.References) == 0 { + return []franz_sr.Schema{schema}, nil + } + schemas := []franz_sr.Schema{schema} + if err := client.WalkReferences(ctx, schema.References, func(ctx context.Context, name string, schema franz_sr.Schema) error { + schemas = append(schemas, schema) + return nil + }); err != nil { + return nil, fmt.Errorf("unable to walk schema references: %w", err) + } + // We walk the above schemas in top down order, however when resolving references we need to add schemas to our codec + // in a bottom up manner. + slices.Reverse(schemas) + return schemas, nil +} + +func (s *schemaRegistryDecoder) getHambaAvroDecoder(ctx context.Context, schema franz_sr.Schema) (schemaDecoder, error) { + schemaSpecs, err := resolveHambaAvroReferences(ctx, s.client, schema) + if err != nil { + return nil, err + } + cache := &avro.SchemaCache{} + var codec avro.Schema + for _, schema := range schemaSpecs { + codec, err = avro.ParseWithCache(schema.Schema, "", cache) + if err != nil { + return nil, fmt.Errorf("unable to parse schema %w", err) + } + } + + decoder := func(m *service.Message) error { + b, err := m.AsBytes() + if err != nil { + return fmt.Errorf("unable to extract bytes from message: %w", err) + } + var native any + if err := avro.Unmarshal(codec, b, &native); err != nil { + return fmt.Errorf("unable to unmarshal avro: %w", err) + } + var w avroSchemaWalker + w.unnestUnions = s.cfg.avro.rawUnions + if native, err = w.walk(native, codec); err != nil { + return fmt.Errorf("unable to transform avro data into expected 
format: %w", err) + } + m.SetStructuredMut(native) + return nil + } + + return decoder, nil +} + +type avroSchemaWalker struct { + unnestUnions bool +} + +func (w *avroSchemaWalker) walk(root any, schema avro.Schema) (any, error) { + switch s := schema.(type) { + case *avro.RecordSchema: + v, ok := root.(map[string]any) + if !ok { + return nil, fmt.Errorf("expected map for RecordSchema got: %T", root) + } + return w.walkRecord(v, s) + case *avro.MapSchema: + v, ok := root.(map[string]any) + if !ok { + return nil, fmt.Errorf("expected map for MapSchema got: %T", root) + } + return w.walkMap(v, s) + case *avro.ArraySchema: + v, ok := root.([]any) + if !ok { + return nil, fmt.Errorf("expected slice for ArraySchema got: %T", root) + } + return w.walkSlice(v, s) + case *avro.RefSchema: + return w.walk(root, s.Schema()) + case *avro.UnionSchema: + if root == nil { + return nil, nil + } + u, ok := root.(map[string]any) + if !ok { + return nil, fmt.Errorf("expected map for UnionSchema got: %T", root) + } + if len(u) != 1 { + return nil, fmt.Errorf("expected map with size 1 for UnionSchema got: %v", len(u)) + } + for k, v := range u { + t, _ := s.Types().Get(k) + if t == nil { + names := []string{} + for _, t := range s.Types() { + names = append(names, string(t.Type())) + } + return nil, fmt.Errorf("unknown union variant %q, expected one of [%s]", k, strings.Join(names, ", ")) + } + if w.unnestUnions { + return w.walk(v, t) + } + var err error + u[k], err = w.walk(v, t) + return u, err + } + return nil, fmt.Errorf("impossible empty map, got size: %v", len(u)) + case avro.LogicalTypeSchema: + l := s.Logical() + if l == nil { + return root, nil + } + switch l.Type() { + case avro.Decimal: + v, ok := root.(*big.Rat) + if !ok { + return nil, fmt.Errorf("expected *big.Rat for DecimalLogicalType got: %T", root) + } + ls, ok := l.(*avro.DecimalLogicalSchema) + if !ok { + return nil, fmt.Errorf("expected *avro.LogicalTypeSchema for DecimalLogicalType got: %T", l) + } + return 
json.Number(v.FloatString(ls.Scale())), nil + case avro.Duration, avro.TimeMicros, avro.TimeMillis: + v, ok := root.(time.Duration) + if !ok { + return nil, fmt.Errorf("expected time.Duration for %v got: %T", l.Type(), root) + } + return v.String(), nil + } + return root, nil + default: + return root, nil + } +} + +func (w *avroSchemaWalker) walkRecord(record map[string]any, schema *avro.RecordSchema) (map[string]any, error) { + var err error + for _, f := range schema.Fields() { + v, ok := record[f.Name()] + if !ok { + return nil, fmt.Errorf("unexpected missing field from avro record: %q", f.Name()) + } + if record[f.Name()], err = w.walk(v, f.Type()); err != nil { + return nil, err + } + } + return record, nil +} + +func (w *avroSchemaWalker) walkMap(dict map[string]any, schema *avro.MapSchema) (map[string]any, error) { + var err error + for k, v := range dict { + if dict[k], err = w.walk(v, schema.Values()); err != nil { + return nil, err + } + } + return dict, nil +} + +func (w *avroSchemaWalker) walkSlice(slice []any, schema *avro.ArraySchema) ([]any, error) { + var err error + for i, v := range slice { + if slice[i], err = w.walk(v, schema.Items()); err != nil { + return nil, err + } + } + return slice, nil +} diff --git a/internal/impl/confluent/serde_hamba_avro_test.go b/internal/impl/confluent/serde_hamba_avro_test.go new file mode 100644 index 0000000000..fedfe75d15 --- /dev/null +++ b/internal/impl/confluent/serde_hamba_avro_test.go @@ -0,0 +1,322 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package confluent + +import ( + "context" + "encoding/base64" + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/redpanda-data/benthos/v4/public/service" +) + +func TestHambaAvroReferences(t *testing.T) { + tCtx, done := context.WithTimeout(context.Background(), time.Second*10) + defer done() + + rootSchema := `[ + "benthos.namespace.com.foo", + "benthos.namespace.com.bar" +]` + + fooSchema := `{ + "namespace": "benthos.namespace.com", + "type": "record", + "name": "foo", + "fields": [ + { "name": "Woof", "type": "string"} + ] +}` + + barSchema := `{ + "namespace": "benthos.namespace.com", + "type": "record", + "name": "bar", + "fields": [ + { "name": "Moo", "type": "string"} + ] +}` + + urlStr := runSchemaRegistryServer(t, func(path string) ([]byte, error) { + switch path { + case "/subjects/root/versions/latest", "/schemas/ids/1": + return mustJBytes(t, map[string]any{ + "id": 1, + "version": 10, + "schema": rootSchema, + "schemaType": "AVRO", + "references": []any{ + map[string]any{"name": "benthos.namespace.com.foo", "subject": "foo", "version": 10}, + map[string]any{"name": "benthos.namespace.com.bar", "subject": "bar", "version": 20}, + }, + }), nil + case "/subjects/foo/versions/10", "/schemas/ids/2": + return mustJBytes(t, map[string]any{ + "id": 2, "version": 10, "schemaType": "AVRO", + "schema": fooSchema, + }), nil + case "/subjects/bar/versions/20", "/schemas/ids/3": + return mustJBytes(t, map[string]any{ + "id": 3, "version": 20, "schemaType": "AVRO", + "schema": barSchema, + }), nil + } + return nil, nil + }) + + subj, err := service.NewInterpolatedString("root") + require.NoError(t, err) + + tests := []struct { + name string + input string + output string + errContains []string + }{ + { + name: "a foo", + input: `{"Woof":"hhnnnnnnroooo"}`, + output: 
`{"Woof":"hhnnnnnnroooo"}`, + }, + { + name: "a bar", + input: `{"Moo":"mmuuuuuueew"}`, + output: `{"Moo":"mmuuuuuueew"}`, + }, + } + + for _, test := range tests { + test := test + t.Run(test.name, func(t *testing.T) { + encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources()) + require.NoError(t, err) + + cfg := decodingConfig{} + cfg.avro.useHamba = true + cfg.avro.rawUnions = true + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, cfg, service.MockResources()) + require.NoError(t, err) + + t.Cleanup(func() { + _ = encoder.Close(tCtx) + _ = decoder.Close(tCtx) + }) + + inMsg := service.NewMessage([]byte(test.input)) + + encodedMsgs, err := encoder.ProcessBatch(tCtx, service.MessageBatch{inMsg}) + require.NoError(t, err) + require.Len(t, encodedMsgs, 1) + require.Len(t, encodedMsgs[0], 1) + + encodedMsg := encodedMsgs[0][0] + + if len(test.errContains) > 0 { + require.Error(t, encodedMsg.GetError()) + for _, errStr := range test.errContains { + assert.Contains(t, encodedMsg.GetError().Error(), errStr) + } + return + } + + b, err := encodedMsg.AsBytes() + require.NoError(t, err) + + require.NoError(t, encodedMsg.GetError()) + require.NotEqual(t, test.input, string(b)) + + var n any + require.Error(t, json.Unmarshal(b, &n), "message contents should no longer be valid JSON") + + decodedMsgs, err := decoder.Process(tCtx, encodedMsg) + require.NoError(t, err) + require.Len(t, decodedMsgs, 1) + + decodedMsg := decodedMsgs[0] + + b, err = decodedMsg.AsBytes() + require.NoError(t, err) + + require.NoError(t, decodedMsg.GetError()) + require.JSONEq(t, test.output, string(b)) + }) + } +} + +func TestHambaDecodeAvroUnions(t *testing.T) { + tCtx, done := context.WithTimeout(context.Background(), time.Second*10) + defer done() + + rootSchema := `{ + "type": "record", + "name": "TestRecord", + "namespace": "com.example.test", + "fields": [ + { "name": "booleanField", "type": "boolean" 
}, + { "name": "intField", "type": "int" }, + { "name": "longField", "type": "long" }, + { "name": "floatField", "type": "float" }, + { "name": "doubleField", "type": "double" }, + { "name": "bytesField", "type": "bytes" }, + { "name": "stringField", "type": "string" }, + { + "name": "arrayField", + "type": { "type": "array", "items": "int" } + }, + { + "name": "mapField", + "type": { "type": "map", "values": "string" } + }, + { + "name": "unionField", + "type": ["null", "string", "int"] + }, + { + "name": "fixedField", + "type": { "type": "fixed", "name": "FixedType", "size": 16 } + }, + { + "name": "enumField", + "type": { "type": "enum", "name": "EnumType", "symbols": ["A", "B", "C"] } + }, + { + "name": "recordField", + "type": { + "type": "record", + "name": "NestedRecord", + "fields": [ + { "name": "nestedIntField", "type": "int" }, + { "name": "nestedStringField", "type": "string" } + ] + } + }, + { + "name": "dateField", + "type": { "type": "int", "logicalType": "date" } + }, + { + "name": "timestampMillisField", + "type": { "type": "long", "logicalType": "timestamp-millis" } + }, + { + "name": "timestampMicrosField", + "type": { "type": "long", "logicalType": "timestamp-micros" } + }, + { + "name": "timeMillisField", + "type": { "type": "int", "logicalType": "time-millis" } + }, + { + "name": "timeMicrosField", + "type": { "type": "long", "logicalType": "time-micros" } + }, + { + "name": "decimalBytesField", + "type": { + "type": "bytes", + "logicalType": "decimal", + "precision": 10, + "scale": 2 + } + }, + { + "name": "decimalFixedField", + "type": { + "type": "fixed", + "name": "DecimalFixed", + "size": 8, + "logicalType": "decimal", + "precision": 16, + "scale": 4 + } + }, + { + "name": "uuidField", + "type": { "type": "string", "logicalType": "uuid" } + } + ] +}` + + urlStr := runSchemaRegistryServer(t, func(path string) ([]byte, error) { + switch path { + case "/subjects/root/versions/latest", "/schemas/ids/1": + return mustJBytes(t, map[string]any{ 
+ "id": 1, + "version": 10, + "schema": rootSchema, + "schemaType": "AVRO", + }), nil + } + return nil, nil + }) + + tests := []struct { + name string + input string + output string + unnestUnions bool + }{ + { + name: "all types nested union", + input: "AZ340UnQtM3BiI/ihdEBfH9KP+XDphvske0/HlUnSXt2V81gmbTQunPejR5yaHhxZHRsd2VscGRqdHgatLHK9QuZ0cXsD+eJsMYNv8a+2Qzc986nBuz76MAG97W//AmGzbjxDuGnvP4NptCcvQvqveF2n/uQ1gbf9eJMABQCcBZrY2hreHN6d29sawJhGm9hcnhscGZteG5rcWUCch5odmpycWR4cGliZGhzaG0CcxpiZ2lzcmR6eWFtcnlpAnQeanN2Y252bWpsbWJzaGlrAnYeZHlrdml1b2l3Z2N1c2RhAmgUcnl2aW96aWxqaQJ4GnZkaG5icnRkbXRxbWQCaRJkY29lYm1lY3MCbB5maHl6eWV1YnBiaHh5cmoABMHn5qsNBrOnYNqXtmbEr69wkjaZ1ALbv/9dGGJsbmxnbnJvYmx1Y4AqstKWqoEy4qfwyModq8jmuAKgqpm+8/zjpswBCv76A5LP83wuR7QwOQAUcmNwdmp5eG5ueA==", + unnestUnions: false, + output: `{"arrayField":[1599687770,-2127082573,-1818624628,-1704448416,846847470,873275126,-1338502524,1998000963,-1877445105,1540592659,124530549,-895622864,-80502128],"booleanField":true,"bytesField":"VSdJe3ZXzWCZtNC6c96N","dateField":"1977-05-12T00:00:00Z","decimalBytesField":"-43953964.01","decimalFixedField":"-90179493988032.6912","doubleField":0.9240627803866316,"enumField":"B","fixedField":[6,179,167,96,218,151,182,102,196,175,175,112,146,54,153,212],"floatField":0.79100776,"intField":-77217295,"longField":7531641714966637864,"mapField":{"a":"oarxlpfmxnkqe","h":"ryviozilji","i":"dcoebmecs","l":"fhyzyeubpbhxyrj","p":"kchkxszwolk","r":"hvjrqdxpibdhshm","s":"bgisrdzyamryi","t":"jsvcnvmjlmbshik","v":"dykviuoiwgcusda","x":"vdhnbrtdmtqmd"},"recordField":{"nestedIntField":-98562030,"nestedStringField":"blnlgnrobluc"},"stringField":"rhxqdtlwelpdjtx","timeMicrosField":"149890h11m19.879705216s","timeMillisField":"-91h6m35.926s","timestampMicrosField":"1970-01-06T21:10:24.735729Z","timestampMillisField":"1997-03-24T02:51:42.617Z","unionField":{"int":-1790761441},"uuidField":"rcpvjyxnnx"}`, + }, + { + name: "all types raw union", + input: 
"AZ340UnQtM3BiI/ihdEBfH9KP+XDphvske0/HlUnSXt2V81gmbTQunPejR5yaHhxZHRsd2VscGRqdHgatLHK9QuZ0cXsD+eJsMYNv8a+2Qzc986nBuz76MAG97W//AmGzbjxDuGnvP4NptCcvQvqveF2n/uQ1gbf9eJMABQCcBZrY2hreHN6d29sawJhGm9hcnhscGZteG5rcWUCch5odmpycWR4cGliZGhzaG0CcxpiZ2lzcmR6eWFtcnlpAnQeanN2Y252bWpsbWJzaGlrAnYeZHlrdml1b2l3Z2N1c2RhAmgUcnl2aW96aWxqaQJ4GnZkaG5icnRkbXRxbWQCaRJkY29lYm1lY3MCbB5maHl6eWV1YnBiaHh5cmoABMHn5qsNBrOnYNqXtmbEr69wkjaZ1ALbv/9dGGJsbmxnbnJvYmx1Y4AqstKWqoEy4qfwyModq8jmuAKgqpm+8/zjpswBCv76A5LP83wuR7QwOQAUcmNwdmp5eG5ueA==", + unnestUnions: true, + output: `{"arrayField":[1599687770,-2127082573,-1818624628,-1704448416,846847470,873275126,-1338502524,1998000963,-1877445105,1540592659,124530549,-895622864,-80502128],"booleanField":true,"bytesField":"VSdJe3ZXzWCZtNC6c96N","dateField":"1977-05-12T00:00:00Z","decimalBytesField":"-43953964.01","decimalFixedField":"-90179493988032.6912","doubleField":0.9240627803866316,"enumField":"B","fixedField":[6,179,167,96,218,151,182,102,196,175,175,112,146,54,153,212],"floatField":0.79100776,"intField":-77217295,"longField":7531641714966637864,"mapField":{"a":"oarxlpfmxnkqe","h":"ryviozilji","i":"dcoebmecs","l":"fhyzyeubpbhxyrj","p":"kchkxszwolk","r":"hvjrqdxpibdhshm","s":"bgisrdzyamryi","t":"jsvcnvmjlmbshik","v":"dykviuoiwgcusda","x":"vdhnbrtdmtqmd"},"recordField":{"nestedIntField":-98562030,"nestedStringField":"blnlgnrobluc"},"stringField":"rhxqdtlwelpdjtx","timeMicrosField":"149890h11m19.879705216s","timeMillisField":"-91h6m35.926s","timestampMicrosField":"1970-01-06T21:10:24.735729Z","timestampMillisField":"1997-03-24T02:51:42.617Z","unionField":-1790761441,"uuidField":"rcpvjyxnnx"}`, + }, + } + + for _, test := range tests { + test := test + t.Run(test.name, func(t *testing.T) { + cfg := decodingConfig{} + cfg.avro.useHamba = true + cfg.avro.rawUnions = test.unnestUnions + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, cfg, service.MockResources()) + require.NoError(t, err) + + t.Cleanup(func() { + _ = decoder.Close(tCtx) + 
}) + + b, err := base64.StdEncoding.DecodeString(test.input) + require.NoError(t, err) + // Prepend magic bytes + b = append([]byte{0, 0, 0, 0, 1}, b...) + inMsg := service.NewMessage(b) + + decodedMsgs, err := decoder.Process(tCtx, inMsg) + require.NoError(t, err) + require.Len(t, decodedMsgs, 1) + + decodedMsg := decodedMsgs[0] + + b, err = decodedMsg.AsBytes() + require.NoError(t, err) + + require.NoError(t, decodedMsg.GetError()) + require.JSONEq(t, test.output, string(b)) + }) + } +} diff --git a/internal/impl/confluent/serde_json_test.go b/internal/impl/confluent/serde_json_test.go index f1fd391cb2..7fc780ccf2 100644 --- a/internal/impl/confluent/serde_json_test.go +++ b/internal/impl/confluent/serde_json_test.go @@ -110,7 +110,7 @@ func TestResolveJsonSchema(t *testing.T) { encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, decodingConfig{}, service.MockResources()) require.NoError(t, err) t.Cleanup(func() { diff --git a/internal/impl/confluent/serde_protobuf_test.go b/internal/impl/confluent/serde_protobuf_test.go index ab406c2aef..3d771dc2c5 100644 --- a/internal/impl/confluent/serde_protobuf_test.go +++ b/internal/impl/confluent/serde_protobuf_test.go @@ -96,7 +96,7 @@ message bar { encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, true, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, decodingConfig{}, service.MockResources()) require.NoError(t, err) t.Cleanup(func() { @@ -226,7 +226,7 @@ message bar { encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, 
subj, true, time.Minute*10, time.Minute, service.MockResources()) require.NoError(t, err) - decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, true, service.MockResources()) + decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, decodingConfig{}, service.MockResources()) require.NoError(t, err) t.Cleanup(func() { From cf8e023a350160a1bd4edb8d4fd67562da8112a4 Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Wed, 22 Jan 2025 22:10:08 +0000 Subject: [PATCH 2/3] update changelog --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 74deeaf4b8..875acc43c4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,11 @@ All notable changes to this project will be documented in this file. - New `mysql_cdc` input supporting change data capture (CDC) from MySQL. (@rockwotj, @le-vlad) - Field `instance_id` added to `kafka`, `kafka_franz`, `ockam_kafka`, `redpanda`, `redpanda_common`, and `redpanda_migrator` inputs. (@rockwotj) - Fields `rebalance_timeout`, `session_timeout` and `heartbeat_interval` added to the `kafka_franz`, `redpanda`, `redpanda_common`, `redpanda_migrator` and `ockam_kafka` inputs. (@rockwotj) +- Field `avro.preserve_logical_types` for processor `schema_registry_decode` was added to preserve logical types instead of decoding them as their primative representation. (@rockwotj) + +### Changed + +- Field `avro_raw_json` was deprecated in favor of `avro.raw_unions` for processor `schema_registry_decode`. 
(@rockwotj) ## 4.45.1 - 2025-01-17 From baafe333ceef5104f2d3fee9515d2b032bdc872c Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Thu, 23 Jan 2025 03:25:02 +0000 Subject: [PATCH 3/3] review feedback --- CHANGELOG.md | 2 +- .../processors/schema_registry_decode.adoc | 16 +++++++------- .../processor_schema_registry_decode.go | 21 ++++++++----------- .../processor_schema_registry_decode_test.go | 4 ++-- .../impl/confluent/serde_hamba_avro_test.go | 4 ++-- 5 files changed, 22 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 875acc43c4..7f56b91086 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ All notable changes to this project will be documented in this file. - New `mysql_cdc` input supporting change data capture (CDC) from MySQL. (@rockwotj, @le-vlad) - Field `instance_id` added to `kafka`, `kafka_franz`, `ockam_kafka`, `redpanda`, `redpanda_common`, and `redpanda_migrator` inputs. (@rockwotj) - Fields `rebalance_timeout`, `session_timeout` and `heartbeat_interval` added to the `kafka_franz`, `redpanda`, `redpanda_common`, `redpanda_migrator` and `ockam_kafka` inputs. (@rockwotj) -- Field `avro.preserve_logical_types` for processor `schema_registry_decode` was added to preserve logical types instead of decoding them as their primative representation. (@rockwotj) +- Field `avro.preserve_logical_types` for processor `schema_registry_decode` was added to preserve logical types instead of decoding them as their primitive representation. 
(@rockwotj) ### Changed diff --git a/docs/modules/components/pages/processors/schema_registry_decode.adoc b/docs/modules/components/pages/processors/schema_registry_decode.adoc index c422de05d1..732657b190 100644 --- a/docs/modules/components/pages/processors/schema_registry_decode.adoc +++ b/docs/modules/components/pages/processors/schema_registry_decode.adoc @@ -96,8 +96,8 @@ This processor creates documents formatted as https://avro.apache.org/docs/curre For example, the union schema `["null","string","Foo"]`, where `Foo` is a record name, would encode: - `null` as `null`; -- the string `"a"` as `\{"string": "a"}`; and -- a `Foo` instance as `\{"Foo": {...}}`, where `{...}` indicates the JSON encoding of a `Foo` instance. +- the string `"a"` as `{"string": "a"}`; and +- a `Foo` instance as `{"Foo": {...}}`, where `{...}` indicates the JSON encoding of a `Foo` instance. However, it is possible to instead create documents in https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard/raw JSON format^] by setting the field <<avro_raw_json, `avro_raw_json`>> to `true`. @@ -110,7 +110,7 @@ This processor decodes protobuf messages to JSON documents, you can read more ab === `avro` -Sorry! This field is missing documentation. +Configuration for how to decode schemas that are of type AVRO. *Type*: `object` @@ -118,12 +118,12 @@ Sorry! This field is missing documentation. === `avro.raw_unions` -Wheather avro messages should be decoded into normal JSON ("json that meets the expectations of regular internet json") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[JSON as specified in the Avro Spec^]. +Whether avro messages should be decoded into normal JSON ("json that meets the expectations of regular internet json") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[JSON as specified in the Avro Spec^].
-For example, if there is a union schema `["null", "string", "Foo"]` where `Foo` is a record name, would with raw_unions as false (the default) you get: +For example, if there is a union schema `["null", "string", "Foo"]` where `Foo` is a record name, with raw_unions as false (the default) you get: - `null` as `null`; -- the string `"a"` as `\{"string": "a"}`; and -- a `Foo` instance as `\{"Foo": {...}}`, where `{...}` indicates the JSON encoding of a `Foo` instance. +- the string `"a"` as `{"string": "a"}`; and +- a `Foo` instance as `{"Foo": {...}}`, where `{...}` indicates the JSON encoding of a `Foo` instance. When raw_unions is set to true then the above union schema is decoded as the following: - `null` as `null`; @@ -136,7 +136,7 @@ When raw_unions is set to true then the above union schema is decoded as the fol === `avro.preserve_logical_types` -Wheather logical types should be preserved or transformed back into their primative type. For example, decimals are decoded as raw bytes and timestamps are decoded as plain integers. +Whether logical types should be preserved or transformed back into their primitive type. By default, decimals are decoded as raw bytes and timestamps are decoded as plain integers. Setting this field to true keeps decimal types as numbers in bloblang and timestamps as time values. 
*Type*: `bool` diff --git a/internal/impl/confluent/processor_schema_registry_decode.go b/internal/impl/confluent/processor_schema_registry_decode.go index b8795b79a1..74442ec264 100644 --- a/internal/impl/confluent/processor_schema_registry_decode.go +++ b/internal/impl/confluent/processor_schema_registry_decode.go @@ -52,8 +52,8 @@ This processor creates documents formatted as https://avro.apache.org/docs/curre For example, the union schema ` + "`[\"null\",\"string\",\"Foo\"]`, where `Foo`" + ` is a record name, would encode: - ` + "`null` as `null`" + `; -- the string ` + "`\"a\"` as `\\{\"string\": \"a\"}`" + `; and -- a ` + "`Foo` instance as `\\{\"Foo\": {...}}`, where `{...}` indicates the JSON encoding of a `Foo`" + ` instance. +- the string ` + "`\"a\"` as `{\"string\": \"a\"}`" + `; and +- a ` + "`Foo` instance as `{\"Foo\": {...}}`, where `{...}` indicates the JSON encoding of a `Foo`" + ` instance. However, it is possible to instead create documents in https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard/raw JSON format^] by setting the field ` + "<<avro_raw_json, `avro_raw_json`>> to `true`" + `. @@ -67,25 +67,22 @@ This processor decodes protobuf messages to JSON documents, you can read more ab Fields( service.NewObjectField( "avro", - service.NewBoolField("raw_unions").Description(`Wheather avro messages should be decoded into normal JSON ("json that meets the expectations of regular internet json") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[JSON as specified in the Avro Spec^]. + service.NewBoolField("raw_unions").Description(`Whether avro messages should be decoded into normal JSON ("json that meets the expectations of regular internet json") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[JSON as specified in the Avro Spec^].
-For example, if there is a union schema `+"`"+`["null", "string", "Foo"]`+"`"+` where `+"`Foo`"+` is a record name, would with raw_unions as false (the default) you get: +For example, if there is a union schema `+"`"+`["null", "string", "Foo"]`+"`"+` where `+"`Foo`"+` is a record name, with raw_unions as false (the default) you get: - `+"`null` as `null`"+`; -- the string `+"`\"a\"` as `\\{\"string\": \"a\"}`"+`; and -- a `+"`Foo` instance as `\\{\"Foo\": {...}}`, where `{...}` indicates the JSON encoding of a `Foo`"+` instance. +- the string `+"`\"a\"` as `{\"string\": \"a\"}`"+`; and +- a `+"`Foo` instance as `{\"Foo\": {...}}`, where `{...}` indicates the JSON encoding of a `Foo`"+` instance. When raw_unions is set to true then the above union schema is decoded as the following: - `+"`null` as `null`"+`; - the string `+"`\"a\"` as `\"a\"`"+`; and - a `+"`Foo` instance as `{...}`, where `{...}` indicates the JSON encoding of a `Foo`"+` instance. `).Optional(), - service.NewBoolField("preserve_logical_types").Description(`Wheather logical types should be preserved or transformed back into their primative type. For example, decimals are decoded as raw bytes and timestamps are decoded as plain integers.`).Default(false), - ), + service.NewBoolField("preserve_logical_types").Description(`Whether logical types should be preserved or transformed back into their primitive type. By default, decimals are decoded as raw bytes and timestamps are decoded as plain integers. Setting this field to true keeps decimal types as numbers in bloblang and timestamps as time values.`).Default(false), + ).Description("Configuration for how to decode schemas that are of type AVRO."), ). - Field(service.NewURLField("url").Description("The base URL of the schema registry service.")). 
- LintRule(`root = match { - this.exists("avro.raw_unions") && this.exists("avro_raw_json") => ["fields avro.raw_unions and avro_raw_json cannot be set simultaneously"], - }`) + Field(service.NewURLField("url").Description("The base URL of the schema registry service.")) for _, f := range service.NewHTTPRequestAuthSignerFields() { spec = spec.Field(f.Version("4.7.0")) diff --git a/internal/impl/confluent/processor_schema_registry_decode_test.go b/internal/impl/confluent/processor_schema_registry_decode_test.go index 56665397c6..52928a7a5f 100644 --- a/internal/impl/confluent/processor_schema_registry_decode_test.go +++ b/internal/impl/confluent/processor_schema_registry_decode_test.go @@ -288,7 +288,7 @@ func TestSchemaRegistryDecodeAvro(t *testing.T) { name: "successful message with logical types", input: "\x00\x00\x00\x00\x04\x02\x90\xaf\xce!\x02\x80\x80揪\x97\t\x02\x80\x80\xde\xf2\xdf\xff\xdf\xdc\x01\x02\x02!", output: `{"int_time_millis":{"int.time-millis":35245000},"long_time_micros":{"long.time-micros":20192000000000},"long_timestamp_micros":{"long.timestamp-micros":62135596800000000},"pos_0_33333333":{"bytes.decimal":"!"}}`, - hambaOutput: `{"int_time_millis":{"int.time-millis":"9h47m25s"},"long_time_micros":{"long.time-micros":"5608h53m20s"},"long_timestamp_micros":{"long.timestamp-micros":"3939-01-01T00:00:00Z"},"pos_0_33333333":{"bytes.decimal":"0.33"}}`, + hambaOutput: `{"int_time_millis":{"int.time-millis":"9h47m25s"},"long_time_micros":{"long.time-micros":"5608h53m20s"},"long_timestamp_micros":{"long.timestamp-micros":"3939-01-01T00:00:00Z"},"pos_0_33333333":{"bytes.decimal":0.33}}`, }, { name: "non-empty magic byte", @@ -412,7 +412,7 @@ func TestSchemaRegistryDecodeAvroRawJson(t *testing.T) { name: "successful message with logical types", input: "\x00\x00\x00\x00\x04\x02\x90\xaf\xce!\x02\x80\x80揪\x97\t\x02\x80\x80\xde\xf2\xdf\xff\xdf\xdc\x01\x02\x02!", output: 
`{"int_time_millis":35245000,"long_time_micros":20192000000000,"long_timestamp_micros":62135596800000000,"pos_0_33333333":"!"}`, - hambaOutput: `{"int_time_millis":"9h47m25s","long_time_micros":"5608h53m20s","long_timestamp_micros":"3939-01-01T00:00:00Z","pos_0_33333333":"0.33"}`, + hambaOutput: `{"int_time_millis":"9h47m25s","long_time_micros":"5608h53m20s","long_timestamp_micros":"3939-01-01T00:00:00Z","pos_0_33333333":0.33}`, }, { name: "non-empty magic byte", diff --git a/internal/impl/confluent/serde_hamba_avro_test.go b/internal/impl/confluent/serde_hamba_avro_test.go index fedfe75d15..b0dc15459a 100644 --- a/internal/impl/confluent/serde_hamba_avro_test.go +++ b/internal/impl/confluent/serde_hamba_avro_test.go @@ -277,13 +277,13 @@ func TestHambaDecodeAvroUnions(t *testing.T) { name: "all types nested union", input: "AZ340UnQtM3BiI/ihdEBfH9KP+XDphvske0/HlUnSXt2V81gmbTQunPejR5yaHhxZHRsd2VscGRqdHgatLHK9QuZ0cXsD+eJsMYNv8a+2Qzc986nBuz76MAG97W//AmGzbjxDuGnvP4NptCcvQvqveF2n/uQ1gbf9eJMABQCcBZrY2hreHN6d29sawJhGm9hcnhscGZteG5rcWUCch5odmpycWR4cGliZGhzaG0CcxpiZ2lzcmR6eWFtcnlpAnQeanN2Y252bWpsbWJzaGlrAnYeZHlrdml1b2l3Z2N1c2RhAmgUcnl2aW96aWxqaQJ4GnZkaG5icnRkbXRxbWQCaRJkY29lYm1lY3MCbB5maHl6eWV1YnBiaHh5cmoABMHn5qsNBrOnYNqXtmbEr69wkjaZ1ALbv/9dGGJsbmxnbnJvYmx1Y4AqstKWqoEy4qfwyModq8jmuAKgqpm+8/zjpswBCv76A5LP83wuR7QwOQAUcmNwdmp5eG5ueA==", unnestUnions: false, - output: 
`{"arrayField":[1599687770,-2127082573,-1818624628,-1704448416,846847470,873275126,-1338502524,1998000963,-1877445105,1540592659,124530549,-895622864,-80502128],"booleanField":true,"bytesField":"VSdJe3ZXzWCZtNC6c96N","dateField":"1977-05-12T00:00:00Z","decimalBytesField":"-43953964.01","decimalFixedField":"-90179493988032.6912","doubleField":0.9240627803866316,"enumField":"B","fixedField":[6,179,167,96,218,151,182,102,196,175,175,112,146,54,153,212],"floatField":0.79100776,"intField":-77217295,"longField":7531641714966637864,"mapField":{"a":"oarxlpfmxnkqe","h":"ryviozilji","i":"dcoebmecs","l":"fhyzyeubpbhxyrj","p":"kchkxszwolk","r":"hvjrqdxpibdhshm","s":"bgisrdzyamryi","t":"jsvcnvmjlmbshik","v":"dykviuoiwgcusda","x":"vdhnbrtdmtqmd"},"recordField":{"nestedIntField":-98562030,"nestedStringField":"blnlgnrobluc"},"stringField":"rhxqdtlwelpdjtx","timeMicrosField":"149890h11m19.879705216s","timeMillisField":"-91h6m35.926s","timestampMicrosField":"1970-01-06T21:10:24.735729Z","timestampMillisField":"1997-03-24T02:51:42.617Z","unionField":{"int":-1790761441},"uuidField":"rcpvjyxnnx"}`, + output: 
`{"arrayField":[1599687770,-2127082573,-1818624628,-1704448416,846847470,873275126,-1338502524,1998000963,-1877445105,1540592659,124530549,-895622864,-80502128],"booleanField":true,"bytesField":"VSdJe3ZXzWCZtNC6c96N","dateField":"1977-05-12T00:00:00Z","decimalBytesField":-43953964.01,"decimalFixedField":-90179493988032.6912,"doubleField":0.9240627803866316,"enumField":"B","fixedField":[6,179,167,96,218,151,182,102,196,175,175,112,146,54,153,212],"floatField":0.79100776,"intField":-77217295,"longField":7531641714966637864,"mapField":{"a":"oarxlpfmxnkqe","h":"ryviozilji","i":"dcoebmecs","l":"fhyzyeubpbhxyrj","p":"kchkxszwolk","r":"hvjrqdxpibdhshm","s":"bgisrdzyamryi","t":"jsvcnvmjlmbshik","v":"dykviuoiwgcusda","x":"vdhnbrtdmtqmd"},"recordField":{"nestedIntField":-98562030,"nestedStringField":"blnlgnrobluc"},"stringField":"rhxqdtlwelpdjtx","timeMicrosField":"149890h11m19.879705216s","timeMillisField":"-91h6m35.926s","timestampMicrosField":"1970-01-06T21:10:24.735729Z","timestampMillisField":"1997-03-24T02:51:42.617Z","unionField":{"int":-1790761441},"uuidField":"rcpvjyxnnx"}`, }, { name: "all types raw union", input: "AZ340UnQtM3BiI/ihdEBfH9KP+XDphvske0/HlUnSXt2V81gmbTQunPejR5yaHhxZHRsd2VscGRqdHgatLHK9QuZ0cXsD+eJsMYNv8a+2Qzc986nBuz76MAG97W//AmGzbjxDuGnvP4NptCcvQvqveF2n/uQ1gbf9eJMABQCcBZrY2hreHN6d29sawJhGm9hcnhscGZteG5rcWUCch5odmpycWR4cGliZGhzaG0CcxpiZ2lzcmR6eWFtcnlpAnQeanN2Y252bWpsbWJzaGlrAnYeZHlrdml1b2l3Z2N1c2RhAmgUcnl2aW96aWxqaQJ4GnZkaG5icnRkbXRxbWQCaRJkY29lYm1lY3MCbB5maHl6eWV1YnBiaHh5cmoABMHn5qsNBrOnYNqXtmbEr69wkjaZ1ALbv/9dGGJsbmxnbnJvYmx1Y4AqstKWqoEy4qfwyModq8jmuAKgqpm+8/zjpswBCv76A5LP83wuR7QwOQAUcmNwdmp5eG5ueA==", unnestUnions: true, - output: 
`{"arrayField":[1599687770,-2127082573,-1818624628,-1704448416,846847470,873275126,-1338502524,1998000963,-1877445105,1540592659,124530549,-895622864,-80502128],"booleanField":true,"bytesField":"VSdJe3ZXzWCZtNC6c96N","dateField":"1977-05-12T00:00:00Z","decimalBytesField":"-43953964.01","decimalFixedField":"-90179493988032.6912","doubleField":0.9240627803866316,"enumField":"B","fixedField":[6,179,167,96,218,151,182,102,196,175,175,112,146,54,153,212],"floatField":0.79100776,"intField":-77217295,"longField":7531641714966637864,"mapField":{"a":"oarxlpfmxnkqe","h":"ryviozilji","i":"dcoebmecs","l":"fhyzyeubpbhxyrj","p":"kchkxszwolk","r":"hvjrqdxpibdhshm","s":"bgisrdzyamryi","t":"jsvcnvmjlmbshik","v":"dykviuoiwgcusda","x":"vdhnbrtdmtqmd"},"recordField":{"nestedIntField":-98562030,"nestedStringField":"blnlgnrobluc"},"stringField":"rhxqdtlwelpdjtx","timeMicrosField":"149890h11m19.879705216s","timeMillisField":"-91h6m35.926s","timestampMicrosField":"1970-01-06T21:10:24.735729Z","timestampMillisField":"1997-03-24T02:51:42.617Z","unionField":-1790761441,"uuidField":"rcpvjyxnnx"}`, + output: 
`{"arrayField":[1599687770,-2127082573,-1818624628,-1704448416,846847470,873275126,-1338502524,1998000963,-1877445105,1540592659,124530549,-895622864,-80502128],"booleanField":true,"bytesField":"VSdJe3ZXzWCZtNC6c96N","dateField":"1977-05-12T00:00:00Z","decimalBytesField":-43953964.01,"decimalFixedField":-90179493988032.6912,"doubleField":0.9240627803866316,"enumField":"B","fixedField":[6,179,167,96,218,151,182,102,196,175,175,112,146,54,153,212],"floatField":0.79100776,"intField":-77217295,"longField":7531641714966637864,"mapField":{"a":"oarxlpfmxnkqe","h":"ryviozilji","i":"dcoebmecs","l":"fhyzyeubpbhxyrj","p":"kchkxszwolk","r":"hvjrqdxpibdhshm","s":"bgisrdzyamryi","t":"jsvcnvmjlmbshik","v":"dykviuoiwgcusda","x":"vdhnbrtdmtqmd"},"recordField":{"nestedIntField":-98562030,"nestedStringField":"blnlgnrobluc"},"stringField":"rhxqdtlwelpdjtx","timeMicrosField":"149890h11m19.879705216s","timeMillisField":"-91h6m35.926s","timestampMicrosField":"1970-01-06T21:10:24.735729Z","timestampMillisField":"1997-03-24T02:51:42.617Z","unionField":-1790761441,"uuidField":"rcpvjyxnnx"}`, }, }