Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schema_registry_decode: support avro for decoding #3137

Merged
merged 3 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ All notable changes to this project will be documented in this file.
- New `mysql_cdc` input supporting change data capture (CDC) from MySQL. (@rockwotj, @le-vlad)
- Field `instance_id` added to `kafka`, `kafka_franz`, `ockam_kafka`, `redpanda`, `redpanda_common`, and `redpanda_migrator` inputs. (@rockwotj)
- Fields `rebalance_timeout`, `session_timeout` and `heartbeat_interval` added to the `kafka_franz`, `redpanda`, `redpanda_common`, `redpanda_migrator` and `ockam_kafka` inputs. (@rockwotj)
- Field `avro.preserve_logical_types` for processor `schema_registry_decode` was added to preserve logical types instead of decoding them as their primitive representation. (@rockwotj)

### Changed

- Field `avro_raw_json` was deprecated in favor of `avro.raw_unions` for processor `schema_registry_decode`. (@rockwotj)

## 4.45.1 - 2025-01-17

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ Common::
# Common config fields, showing default values
label: ""
schema_registry_decode:
avro:
raw_unions: false # No default (optional)
preserve_logical_types: false
url: "" # No default (required)
```

Expand All @@ -48,7 +51,9 @@ Advanced::
# All config fields, showing default values
label: ""
schema_registry_decode:
avro_raw_json: false
avro:
raw_unions: false # No default (optional)
preserve_logical_types: false
url: "" # No default (required)
oauth:
enabled: false
Expand Down Expand Up @@ -91,8 +96,8 @@ This processor creates documents formatted as https://avro.apache.org/docs/curre
For example, the union schema `["null","string","Foo"]`, where `Foo` is a record name, would encode:

- `null` as `null`;
- the string `"a"` as `\{"string": "a"}`; and
- a `Foo` instance as `\{"Foo": {...}}`, where `{...}` indicates the JSON encoding of a `Foo` instance.
- the string `"a"` as `{"string": "a"}`; and
- a `Foo` instance as `{"Foo": {...}}`, where `{...}` indicates the JSON encoding of a `Foo` instance.

However, it is possible to instead create documents in https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard/raw JSON format^] by setting the field <<avro_raw_json, `avro_raw_json`>> to `true`.

Expand All @@ -103,9 +108,35 @@ This processor decodes protobuf messages to JSON documents, you can read more ab

== Fields

=== `avro_raw_json`
=== `avro`

Whether Avro messages should be decoded into normal JSON ("json that meets the expectations of regular internet json") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[Avro JSON^]. If `true` the schema returned from the subject should be decoded as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard json^] instead of as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec[avro json^]. There is a https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249[comment in goavro^], the https://github.com/linkedin/goavro[underlining library used for avro serialization^], that explains in more detail the difference between the standard json and avro json.
Configuration for how to decode schemas that are of type AVRO.


*Type*: `object`


=== `avro.raw_unions`

Whether avro messages should be decoded into normal JSON ("json that meets the expectations of regular internet json") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[JSON as specified in the Avro Spec^].

For example, if there is a union schema `["null", "string", "Foo"]` where `Foo` is a record name, with raw_unions as false (the default) you get:
- `null` as `null`;
- the string `"a"` as `{"string": "a"}`; and
- a `Foo` instance as `{"Foo": {...}}`, where `{...}` indicates the JSON encoding of a `Foo` instance.

When raw_unions is set to true then the above union schema is decoded as the following:
- `null` as `null`;
- the string `"a"` as `"a"`; and
- a `Foo` instance as `{...}`, where `{...}` indicates the JSON encoding of a `Foo` instance.


*Type*: `bool`


=== `avro.preserve_logical_types`

Whether logical types should be preserved or transformed back into their primitive type. By default, decimals are decoded as raw bytes and timestamps are decoded as plain integers. Setting this field to true keeps decimal types as numbers in bloblang and timestamps as time values.


*Type*: `bool`
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ require (
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/googleapis/go-sql-spanner v1.8.0
github.com/gosimple/slug v1.14.0
github.com/hamba/avro/v2 v2.22.2-0.20240625062549-66aad10411d9
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c
github.com/jackc/pgx/v4 v4.18.3
github.com/jackc/pgx/v5 v5.6.0
Expand Down Expand Up @@ -173,7 +174,6 @@ require (
github.com/containerd/platforms v0.2.1 // indirect
github.com/envoyproxy/go-control-plane v0.13.0 // indirect
github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect
github.com/hamba/avro/v2 v2.22.2-0.20240625062549-66aad10411d9 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/jzelinskie/stringz v0.0.3 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand Down
69 changes: 55 additions & 14 deletions internal/impl/confluent/processor_schema_registry_decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ This processor creates documents formatted as https://avro.apache.org/docs/curre
For example, the union schema ` + "`[\"null\",\"string\",\"Foo\"]`, where `Foo`" + ` is a record name, would encode:

- ` + "`null` as `null`" + `;
- the string ` + "`\"a\"` as `\\{\"string\": \"a\"}`" + `; and
- a ` + "`Foo` instance as `\\{\"Foo\": {...}}`, where `{...}` indicates the JSON encoding of a `Foo`" + ` instance.
- the string ` + "`\"a\"` as `{\"string\": \"a\"}`" + `; and
- a ` + "`Foo` instance as `{\"Foo\": {...}}`, where `{...}` indicates the JSON encoding of a `Foo`" + ` instance.

However, it is possible to instead create documents in https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard/raw JSON format^] by setting the field ` + "<<avro_raw_json, `avro_raw_json`>> to `true`" + `.

Expand All @@ -63,7 +63,25 @@ This processor decodes protobuf messages to JSON documents, you can read more ab
`).
Field(service.NewBoolField("avro_raw_json").
Description("Whether Avro messages should be decoded into normal JSON (\"json that meets the expectations of regular internet json\") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[Avro JSON^]. If `true` the schema returned from the subject should be decoded as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard json^] instead of as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec[avro json^]. There is a https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249[comment in goavro^], the https://github.com/linkedin/goavro[underlining library used for avro serialization^], that explains in more detail the difference between the standard json and avro json.").
Advanced().Default(false)).
Advanced().Default(false).Deprecated()).
Fields(
service.NewObjectField(
mihaitodor marked this conversation as resolved.
Show resolved Hide resolved
"avro",
service.NewBoolField("raw_unions").Description(`Whether avro messages should be decoded into normal JSON ("json that meets the expectations of regular internet json") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[JSON as specified in the Avro Spec^].

For example, if there is a union schema `+"`"+`["null", "string", "Foo"]`+"`"+` where `+"`Foo`"+` is a record name, with raw_unions as false (the default) you get:
- `+"`null` as `null`"+`;
- the string `+"`\"a\"` as `{\"string\": \"a\"}`"+`; and
- a `+"`Foo` instance as `{\"Foo\": {...}}`, where `{...}` indicates the JSON encoding of a `Foo`"+` instance.

When raw_unions is set to true then the above union schema is decoded as the following:
- `+"`null` as `null`"+`;
- the string `+"`\"a\"` as `\"a\"`"+`; and
- a `+"`Foo` instance as `{...}`, where `{...}` indicates the JSON encoding of a `Foo`"+` instance.
`).Optional(),
service.NewBoolField("preserve_logical_types").Description(`Whether logical types should be preserved or transformed back into their primitive type. By default, decimals are decoded as raw bytes and timestamps are decoded as plain integers. Setting this field to true keeps decimal types as numbers in bloblang and timestamps as time values.`).Default(false),
).Description("Configuration for how to decode schemas that are of type AVRO."),
).
Field(service.NewURLField("url").Description("The base URL of the schema registry service."))

for _, f := range service.NewHTTPRequestAuthSignerFields() {
Expand All @@ -86,9 +104,16 @@ func init() {

//------------------------------------------------------------------------------

type decodingConfig struct {
avro struct {
useHamba bool
rawUnions bool
}
}

type schemaRegistryDecoder struct {
avroRawJSON bool
client *sr.Client
cfg decodingConfig
client *sr.Client

schemas map[int]*cachedSchemaDecoder
cacheMut sync.RWMutex
Expand All @@ -112,26 +137,38 @@ func newSchemaRegistryDecoderFromConfig(conf *service.ParsedConfig, mgr *service
if err != nil {
return nil, err
}
avroRawJSON, err := conf.FieldBool("avro_raw_json")
var cfg decodingConfig
cfg.avro.rawUnions, err = conf.FieldBool("avro_raw_json")
if err != nil {
return nil, err
}

cfg.avro.useHamba, err = conf.FieldBool("avro", "preserve_logical_types")
if err != nil {
return nil, err
}
return newSchemaRegistryDecoder(urlStr, authSigner, tlsConf, avroRawJSON, mgr)
if conf.Contains("avro", "raw_unions") {
cfg.avro.rawUnions, err = conf.FieldBool("avro", "raw_unions")
if err != nil {
return nil, err
}
}
return newSchemaRegistryDecoder(urlStr, authSigner, tlsConf, cfg, mgr)
}

func newSchemaRegistryDecoder(
urlStr string,
reqSigner func(f fs.FS, req *http.Request) error,
tlsConf *tls.Config,
avroRawJSON bool,
cfg decodingConfig,
mgr *service.Resources,
) (*schemaRegistryDecoder, error) {
s := &schemaRegistryDecoder{
avroRawJSON: avroRawJSON,
schemas: map[int]*cachedSchemaDecoder{},
shutSig: shutdown.NewSignaller(),
logger: mgr.Logger(),
mgr: mgr,
cfg: cfg,
schemas: map[int]*cachedSchemaDecoder{},
shutSig: shutdown.NewSignaller(),
logger: mgr.Logger(),
mgr: mgr,
}
var err error
if s.client, err = sr.NewClient(urlStr, reqSigner, tlsConf, mgr); err != nil {
Expand Down Expand Up @@ -265,7 +302,11 @@ func (s *schemaRegistryDecoder) getDecoder(id int) (schemaDecoder, error) {
case franz_sr.TypeJSON:
decoder, err = s.getJSONDecoder(ctx, resPayload)
default:
decoder, err = s.getAvroDecoder(ctx, resPayload)
if s.cfg.avro.useHamba {
decoder, err = s.getHambaAvroDecoder(ctx, resPayload)
} else {
decoder, err = s.getGoAvroDecoder(ctx, resPayload)
}
}
if err != nil {
return nil, err
Expand Down
Loading
Loading