review feedback
rockwotj committed Jan 23, 2025
1 parent cf8e023 commit baafe33
Showing 5 changed files with 22 additions and 25 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -10,7 +10,7 @@ All notable changes to this project will be documented in this file.
 - New `mysql_cdc` input supporting change data capture (CDC) from MySQL. (@rockwotj, @le-vlad)
 - Field `instance_id` added to `kafka`, `kafka_franz`, `ockam_kafka`, `redpanda`, `redpanda_common`, and `redpanda_migrator` inputs. (@rockwotj)
 - Fields `rebalance_timeout`, `session_timeout` and `heartbeat_interval` added to the `kafka_franz`, `redpanda`, `redpanda_common`, `redpanda_migrator` and `ockam_kafka` inputs. (@rockwotj)
-- Field `avro.preserve_logical_types` for processor `schema_registry_decode` was added to preserve logical types instead of decoding them as their primative representation. (@rockwotj)
+- Field `avro.preserve_logical_types` for processor `schema_registry_decode` was added to preserve logical types instead of decoding them as their primitive representation. (@rockwotj)

 ### Changed

@@ -96,8 +96,8 @@ This processor creates documents formatted as https://avro.apache.org/docs/curre
 For example, the union schema `["null","string","Foo"]`, where `Foo` is a record name, would encode:
 - `null` as `null`;
-- the string `"a"` as `\{"string": "a"}`; and
-- a `Foo` instance as `\{"Foo": {...}}`, where `{...}` indicates the JSON encoding of a `Foo` instance.
+- the string `"a"` as `{"string": "a"}`; and
+- a `Foo` instance as `{"Foo": {...}}`, where `{...}` indicates the JSON encoding of a `Foo` instance.
 However, it is possible to instead create documents in https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard/raw JSON format^] by setting the field <<avro_raw_json, `avro_raw_json`>> to `true`.
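As a rough illustration of the two union encodings this hunk documents, the Go sketch below wraps a non-null union value in a single-key object named after its branch, or leaves it bare in raw mode. `encodeUnion` and `mustJSON` are hypothetical helpers written for this note; they are not part of the processor or of goavro.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encodeUnion is a hypothetical helper, not the processor's implementation.
// With raw=false it mimics the Avro JSON encoding: a non-null union value is
// wrapped in an object keyed by the branch name. With raw=true, or for null,
// the value is returned unchanged.
func encodeUnion(branch string, value any, raw bool) any {
	if value == nil || raw {
		return value
	}
	return map[string]any{branch: value}
}

// mustJSON marshals v and panics on error, which keeps the example short.
func mustJSON(v any) string {
	b, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	fmt.Println(mustJSON(encodeUnion("string", "a", false))) // {"string":"a"}
	fmt.Println(mustJSON(encodeUnion("string", "a", true)))  // "a"
	fmt.Println(mustJSON(encodeUnion("null", nil, false)))   // null
}
```

A real decoder needs the schema to know which values are unions; the sketch takes the branch name as an argument to sidestep that.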
@@ -110,20 +110,20 @@ This processor decodes protobuf messages to JSON documents, you can read more ab
 === `avro`
-Sorry! This field is missing documentation.
+Configuration for how to decode schemas that are of type AVRO.
 *Type*: `object`
 === `avro.raw_unions`
-Wheather avro messages should be decoded into normal JSON ("json that meets the expectations of regular internet json") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[JSON as specified in the Avro Spec^].
+Whether avro messages should be decoded into normal JSON ("json that meets the expectations of regular internet json") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[JSON as specified in the Avro Spec^].
-For example, if there is a union schema `["null", "string", "Foo"]` where `Foo` is a record name, would with raw_unions as false (the default) you get:
+For example, if there is a union schema `["null", "string", "Foo"]` where `Foo` is a record name, with raw_unions as false (the default) you get:
 - `null` as `null`;
-- the string `"a"` as `\{"string": "a"}`; and
-- a `Foo` instance as `\{"Foo": {...}}`, where `{...}` indicates the JSON encoding of a `Foo` instance.
+- the string `"a"` as `{"string": "a"}`; and
+- a `Foo` instance as `{"Foo": {...}}`, where `{...}` indicates the JSON encoding of a `Foo` instance.
 When raw_unions is set to true then the above union schema is decoded as the following:
 - `null` as `null`;
@@ -136,7 +136,7 @@ When raw_unions is set to true then the above union schema is decoded as the fol
 === `avro.preserve_logical_types`
-Wheather logical types should be preserved or transformed back into their primative type. For example, decimals are decoded as raw bytes and timestamps are decoded as plain integers.
+Whether logical types should be preserved or transformed back into their primitive type. By default, decimals are decoded as raw bytes and timestamps are decoded as plain integers. Setting this field to true keeps decimal types as numbers in bloblang and timestamps as time values.
 *Type*: `bool`
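To make the difference between primitive and logical values concrete, here is a minimal Go sketch that converts two of the primitive values from this commit's "successful message with logical types" test case into their logical forms. It relies on the Avro rule that a `decimal` is stored as a big-endian two's-complement unscaled integer. The helper names are hypothetical, and the scale of 2 is an assumption inferred from the expected `0.33` output, since the test schema is not shown here.

```go
package main

import (
	"fmt"
	"math/big"
	"time"
)

// decimalFromBytes is a hypothetical helper: it reads b as a big-endian
// two's-complement unscaled integer and divides it by 10^scale.
func decimalFromBytes(b []byte, scale int) *big.Rat {
	unscaled := new(big.Int).SetBytes(b) // SetBytes reads b as unsigned
	if len(b) > 0 && b[0]&0x80 != 0 {
		// High bit set means negative: subtract 2^(8*len(b)).
		unscaled.Sub(unscaled, new(big.Int).Lsh(big.NewInt(1), uint(8*len(b))))
	}
	denom := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(scale)), nil)
	return new(big.Rat).SetFrac(unscaled, denom)
}

// timeMillis is a hypothetical helper: it converts an Avro time-millis int
// into a time.Duration.
func timeMillis(ms int32) time.Duration {
	return time.Duration(ms) * time.Millisecond
}

func main() {
	// The primitive decimal payload in the test is the single byte "!" (0x21 = 33).
	// Scale 2 is assumed, not taken from the schema.
	fmt.Println(decimalFromBytes([]byte("!"), 2).FloatString(2)) // 0.33
	// The primitive time-millis value in the test is 35245000.
	fmt.Println(timeMillis(35245000)) // 9h47m25s
}
```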
21 changes: 9 additions & 12 deletions internal/impl/confluent/processor_schema_registry_decode.go
@@ -52,8 +52,8 @@ This processor creates documents formatted as https://avro.apache.org/docs/curre
 For example, the union schema ` + "`[\"null\",\"string\",\"Foo\"]`, where `Foo`" + ` is a record name, would encode:
 - ` + "`null` as `null`" + `;
-- the string ` + "`\"a\"` as `\\{\"string\": \"a\"}`" + `; and
-- a ` + "`Foo` instance as `\\{\"Foo\": {...}}`, where `{...}` indicates the JSON encoding of a `Foo`" + ` instance.
+- the string ` + "`\"a\"` as `{\"string\": \"a\"}`" + `; and
+- a ` + "`Foo` instance as `{\"Foo\": {...}}`, where `{...}` indicates the JSON encoding of a `Foo`" + ` instance.
 However, it is possible to instead create documents in https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard/raw JSON format^] by setting the field ` + "<<avro_raw_json, `avro_raw_json`>> to `true`" + `.
@@ -67,25 +67,22 @@ This processor decodes protobuf messages to JSON documents, you can read more ab
 Fields(
 service.NewObjectField(
 "avro",
-service.NewBoolField("raw_unions").Description(`Wheather avro messages should be decoded into normal JSON ("json that meets the expectations of regular internet json") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[JSON as specified in the Avro Spec^].
+service.NewBoolField("raw_unions").Description(`Whether avro messages should be decoded into normal JSON ("json that meets the expectations of regular internet json") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[JSON as specified in the Avro Spec^].
-For example, if there is a union schema `+"`"+`["null", "string", "Foo"]`+"`"+` where `+"`Foo`"+` is a record name, would with raw_unions as false (the default) you get:
+For example, if there is a union schema `+"`"+`["null", "string", "Foo"]`+"`"+` where `+"`Foo`"+` is a record name, with raw_unions as false (the default) you get:
 - `+"`null` as `null`"+`;
-- the string `+"`\"a\"` as `\\{\"string\": \"a\"}`"+`; and
-- a `+"`Foo` instance as `\\{\"Foo\": {...}}`, where `{...}` indicates the JSON encoding of a `Foo`"+` instance.
+- the string `+"`\"a\"` as `{\"string\": \"a\"}`"+`; and
+- a `+"`Foo` instance as `{\"Foo\": {...}}`, where `{...}` indicates the JSON encoding of a `Foo`"+` instance.
 When raw_unions is set to true then the above union schema is decoded as the following:
 - `+"`null` as `null`"+`;
 - the string `+"`\"a\"` as `\"a\"`"+`; and
 - a `+"`Foo` instance as `{...}`, where `{...}` indicates the JSON encoding of a `Foo`"+` instance.
 `).Optional(),
-service.NewBoolField("preserve_logical_types").Description(`Wheather logical types should be preserved or transformed back into their primative type. For example, decimals are decoded as raw bytes and timestamps are decoded as plain integers.`).Default(false),
-),
+service.NewBoolField("preserve_logical_types").Description(`Whether logical types should be preserved or transformed back into their primitive type. By default, decimals are decoded as raw bytes and timestamps are decoded as plain integers. Setting this field to true keeps decimal types as numbers in bloblang and timestamps as time values.`).Default(false),
+).Description("Configuration for how to decode schemas that are of type AVRO."),
 ).
-Field(service.NewURLField("url").Description("The base URL of the schema registry service.")).
-LintRule(`root = match {
-this.exists("avro.raw_unions") && this.exists("avro_raw_json") => ["fields avro.raw_unions and avro_raw_json cannot be set simultaneously"],
-}`)
+Field(service.NewURLField("url").Description("The base URL of the schema registry service."))

 for _, f := range service.NewHTTPRequestAuthSignerFields() {
 spec = spec.Field(f.Version("4.7.0"))
@@ -288,7 +288,7 @@ func TestSchemaRegistryDecodeAvro(t *testing.T) {
name: "successful message with logical types",
input: "\x00\x00\x00\x00\x04\x02\x90\xaf\xce!\x02\x80\x80\x97\t\x02\x80\x80\xde\xf2\xdf\xff\xdf\xdc\x01\x02\x02!",
output: `{"int_time_millis":{"int.time-millis":35245000},"long_time_micros":{"long.time-micros":20192000000000},"long_timestamp_micros":{"long.timestamp-micros":62135596800000000},"pos_0_33333333":{"bytes.decimal":"!"}}`,
-hambaOutput: `{"int_time_millis":{"int.time-millis":"9h47m25s"},"long_time_micros":{"long.time-micros":"5608h53m20s"},"long_timestamp_micros":{"long.timestamp-micros":"3939-01-01T00:00:00Z"},"pos_0_33333333":{"bytes.decimal":"0.33"}}`,
+hambaOutput: `{"int_time_millis":{"int.time-millis":"9h47m25s"},"long_time_micros":{"long.time-micros":"5608h53m20s"},"long_timestamp_micros":{"long.timestamp-micros":"3939-01-01T00:00:00Z"},"pos_0_33333333":{"bytes.decimal":0.33}}`,
},
{
name: "non-empty magic byte",
@@ -412,7 +412,7 @@ func TestSchemaRegistryDecodeAvroRawJson(t *testing.T) {
name: "successful message with logical types",
input: "\x00\x00\x00\x00\x04\x02\x90\xaf\xce!\x02\x80\x80\x97\t\x02\x80\x80\xde\xf2\xdf\xff\xdf\xdc\x01\x02\x02!",
output: `{"int_time_millis":35245000,"long_time_micros":20192000000000,"long_timestamp_micros":62135596800000000,"pos_0_33333333":"!"}`,
-hambaOutput: `{"int_time_millis":"9h47m25s","long_time_micros":"5608h53m20s","long_timestamp_micros":"3939-01-01T00:00:00Z","pos_0_33333333":"0.33"}`,
+hambaOutput: `{"int_time_millis":"9h47m25s","long_time_micros":"5608h53m20s","long_timestamp_micros":"3939-01-01T00:00:00Z","pos_0_33333333":0.33}`,
},
{
name: "non-empty magic byte",
4 changes: 2 additions & 2 deletions internal/impl/confluent/serde_hamba_avro_test.go
@@ -277,13 +277,13 @@ func TestHambaDecodeAvroUnions(t *testing.T) {
name: "all types nested union",
input: "AZ340UnQtM3BiI/ihdEBfH9KP+XDphvske0/HlUnSXt2V81gmbTQunPejR5yaHhxZHRsd2VscGRqdHgatLHK9QuZ0cXsD+eJsMYNv8a+2Qzc986nBuz76MAG97W//AmGzbjxDuGnvP4NptCcvQvqveF2n/uQ1gbf9eJMABQCcBZrY2hreHN6d29sawJhGm9hcnhscGZteG5rcWUCch5odmpycWR4cGliZGhzaG0CcxpiZ2lzcmR6eWFtcnlpAnQeanN2Y252bWpsbWJzaGlrAnYeZHlrdml1b2l3Z2N1c2RhAmgUcnl2aW96aWxqaQJ4GnZkaG5icnRkbXRxbWQCaRJkY29lYm1lY3MCbB5maHl6eWV1YnBiaHh5cmoABMHn5qsNBrOnYNqXtmbEr69wkjaZ1ALbv/9dGGJsbmxnbnJvYmx1Y4AqstKWqoEy4qfwyModq8jmuAKgqpm+8/zjpswBCv76A5LP83wuR7QwOQAUcmNwdmp5eG5ueA==",
unnestUnions: false,
-output: `{"arrayField":[1599687770,-2127082573,-1818624628,-1704448416,846847470,873275126,-1338502524,1998000963,-1877445105,1540592659,124530549,-895622864,-80502128],"booleanField":true,"bytesField":"VSdJe3ZXzWCZtNC6c96N","dateField":"1977-05-12T00:00:00Z","decimalBytesField":"-43953964.01","decimalFixedField":"-90179493988032.6912","doubleField":0.9240627803866316,"enumField":"B","fixedField":[6,179,167,96,218,151,182,102,196,175,175,112,146,54,153,212],"floatField":0.79100776,"intField":-77217295,"longField":7531641714966637864,"mapField":{"a":"oarxlpfmxnkqe","h":"ryviozilji","i":"dcoebmecs","l":"fhyzyeubpbhxyrj","p":"kchkxszwolk","r":"hvjrqdxpibdhshm","s":"bgisrdzyamryi","t":"jsvcnvmjlmbshik","v":"dykviuoiwgcusda","x":"vdhnbrtdmtqmd"},"recordField":{"nestedIntField":-98562030,"nestedStringField":"blnlgnrobluc"},"stringField":"rhxqdtlwelpdjtx","timeMicrosField":"149890h11m19.879705216s","timeMillisField":"-91h6m35.926s","timestampMicrosField":"1970-01-06T21:10:24.735729Z","timestampMillisField":"1997-03-24T02:51:42.617Z","unionField":{"int":-1790761441},"uuidField":"rcpvjyxnnx"}`,
+output: `{"arrayField":[1599687770,-2127082573,-1818624628,-1704448416,846847470,873275126,-1338502524,1998000963,-1877445105,1540592659,124530549,-895622864,-80502128],"booleanField":true,"bytesField":"VSdJe3ZXzWCZtNC6c96N","dateField":"1977-05-12T00:00:00Z","decimalBytesField":-43953964.01,"decimalFixedField":-90179493988032.6912,"doubleField":0.9240627803866316,"enumField":"B","fixedField":[6,179,167,96,218,151,182,102,196,175,175,112,146,54,153,212],"floatField":0.79100776,"intField":-77217295,"longField":7531641714966637864,"mapField":{"a":"oarxlpfmxnkqe","h":"ryviozilji","i":"dcoebmecs","l":"fhyzyeubpbhxyrj","p":"kchkxszwolk","r":"hvjrqdxpibdhshm","s":"bgisrdzyamryi","t":"jsvcnvmjlmbshik","v":"dykviuoiwgcusda","x":"vdhnbrtdmtqmd"},"recordField":{"nestedIntField":-98562030,"nestedStringField":"blnlgnrobluc"},"stringField":"rhxqdtlwelpdjtx","timeMicrosField":"149890h11m19.879705216s","timeMillisField":"-91h6m35.926s","timestampMicrosField":"1970-01-06T21:10:24.735729Z","timestampMillisField":"1997-03-24T02:51:42.617Z","unionField":{"int":-1790761441},"uuidField":"rcpvjyxnnx"}`,
},
{
name: "all types raw union",
input: "AZ340UnQtM3BiI/ihdEBfH9KP+XDphvske0/HlUnSXt2V81gmbTQunPejR5yaHhxZHRsd2VscGRqdHgatLHK9QuZ0cXsD+eJsMYNv8a+2Qzc986nBuz76MAG97W//AmGzbjxDuGnvP4NptCcvQvqveF2n/uQ1gbf9eJMABQCcBZrY2hreHN6d29sawJhGm9hcnhscGZteG5rcWUCch5odmpycWR4cGliZGhzaG0CcxpiZ2lzcmR6eWFtcnlpAnQeanN2Y252bWpsbWJzaGlrAnYeZHlrdml1b2l3Z2N1c2RhAmgUcnl2aW96aWxqaQJ4GnZkaG5icnRkbXRxbWQCaRJkY29lYm1lY3MCbB5maHl6eWV1YnBiaHh5cmoABMHn5qsNBrOnYNqXtmbEr69wkjaZ1ALbv/9dGGJsbmxnbnJvYmx1Y4AqstKWqoEy4qfwyModq8jmuAKgqpm+8/zjpswBCv76A5LP83wuR7QwOQAUcmNwdmp5eG5ueA==",
unnestUnions: true,
-output: `{"arrayField":[1599687770,-2127082573,-1818624628,-1704448416,846847470,873275126,-1338502524,1998000963,-1877445105,1540592659,124530549,-895622864,-80502128],"booleanField":true,"bytesField":"VSdJe3ZXzWCZtNC6c96N","dateField":"1977-05-12T00:00:00Z","decimalBytesField":"-43953964.01","decimalFixedField":"-90179493988032.6912","doubleField":0.9240627803866316,"enumField":"B","fixedField":[6,179,167,96,218,151,182,102,196,175,175,112,146,54,153,212],"floatField":0.79100776,"intField":-77217295,"longField":7531641714966637864,"mapField":{"a":"oarxlpfmxnkqe","h":"ryviozilji","i":"dcoebmecs","l":"fhyzyeubpbhxyrj","p":"kchkxszwolk","r":"hvjrqdxpibdhshm","s":"bgisrdzyamryi","t":"jsvcnvmjlmbshik","v":"dykviuoiwgcusda","x":"vdhnbrtdmtqmd"},"recordField":{"nestedIntField":-98562030,"nestedStringField":"blnlgnrobluc"},"stringField":"rhxqdtlwelpdjtx","timeMicrosField":"149890h11m19.879705216s","timeMillisField":"-91h6m35.926s","timestampMicrosField":"1970-01-06T21:10:24.735729Z","timestampMillisField":"1997-03-24T02:51:42.617Z","unionField":-1790761441,"uuidField":"rcpvjyxnnx"}`,
+output: `{"arrayField":[1599687770,-2127082573,-1818624628,-1704448416,846847470,873275126,-1338502524,1998000963,-1877445105,1540592659,124530549,-895622864,-80502128],"booleanField":true,"bytesField":"VSdJe3ZXzWCZtNC6c96N","dateField":"1977-05-12T00:00:00Z","decimalBytesField":-43953964.01,"decimalFixedField":-90179493988032.6912,"doubleField":0.9240627803866316,"enumField":"B","fixedField":[6,179,167,96,218,151,182,102,196,175,175,112,146,54,153,212],"floatField":0.79100776,"intField":-77217295,"longField":7531641714966637864,"mapField":{"a":"oarxlpfmxnkqe","h":"ryviozilji","i":"dcoebmecs","l":"fhyzyeubpbhxyrj","p":"kchkxszwolk","r":"hvjrqdxpibdhshm","s":"bgisrdzyamryi","t":"jsvcnvmjlmbshik","v":"dykviuoiwgcusda","x":"vdhnbrtdmtqmd"},"recordField":{"nestedIntField":-98562030,"nestedStringField":"blnlgnrobluc"},"stringField":"rhxqdtlwelpdjtx","timeMicrosField":"149890h11m19.879705216s","timeMillisField":"-91h6m35.926s","timestampMicrosField":"1970-01-06T21:10:24.735729Z","timestampMillisField":"1997-03-24T02:51:42.617Z","unionField":-1790761441,"uuidField":"rcpvjyxnnx"}`,
},
}
