Skip to content

Commit 123f4a7

Browse files
authored
Merge pull request #3142 from rockwotj/schema_id
2 parents b20ec06 + 04890af commit 123f4a7

File tree

4 files changed

+57
-20
lines changed

4 files changed

+57
-20
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file.
1111
- Field `instance_id` added to `kafka`, `kafka_franz`, `ockam_kafka`, `redpanda`, `redpanda_common`, and `redpanda_migrator` inputs. (@rockwotj)
1212
- Fields `rebalance_timeout`, `session_timeout` and `heartbeat_interval` added to the `kafka_franz`, `redpanda`, `redpanda_common`, `redpanda_migrator` and `ockam_kafka` inputs. (@rockwotj)
1313
- Field `avro.preserve_logical_types` for processor `schema_registry_decode` was added to preserve logical types instead of decoding them as their primitive representation. (@rockwotj)
14+
- Processor `schema_registry_decode` now adds metadata `schema_id` for the schema's ID in the schema registry. (@rockwotj)
1415

1516
### Changed
1617

docs/modules/components/pages/processors/schema_registry_decode.adoc

+6
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,12 @@ However, it is possible to instead create documents in https://pkg.go.dev/github
105105
106106
This processor decodes protobuf messages to JSON documents, you can read more about JSON mapping of protobuf messages here: https://developers.google.com/protocol-buffers/docs/proto3#json
107107
108+
== Metadata
109+
110+
This processor also adds the following metadata to each outgoing message:
111+
112+
schema_id: the ID of the schema in the schema registry that was associated with the message.
113+
108114
109115
== Fields
110116

internal/impl/confluent/processor_schema_registry_decode.go

+7
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ However, it is possible to instead create documents in https://pkg.go.dev/github
6060
== Protobuf format
6161
6262
This processor decodes protobuf messages to JSON documents, you can read more about JSON mapping of protobuf messages here: https://developers.google.com/protocol-buffers/docs/proto3#json
63+
64+
== Metadata
65+
66+
This processor also adds the following metadata to each outgoing message:
67+
68+
schema_id: the ID of the schema in the schema registry that was associated with the message.
6369
`).
6470
Field(service.NewBoolField("avro_raw_json").
6571
Description("Whether Avro messages should be decoded into normal JSON (\"json that meets the expectations of regular internet json\") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[Avro JSON^]. If `true` the schema returned from the subject should be decoded as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard json^] instead of as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec[avro json^]. There is a https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249[comment in goavro^], the https://github.com/linkedin/goavro[underlining library used for avro serialization^], that explains in more detail the difference between the standard json and avro json.").
@@ -209,6 +215,7 @@ func (s *schemaRegistryDecoder) Process(ctx context.Context, msg *service.Messag
209215
if err := decoder(msg); err != nil {
210216
return nil, err
211217
}
218+
msg.MetaSetMut("schema_id", id)
212219

213220
return service.MessageBatch{msg}, nil
214221
}

internal/impl/confluent/processor_schema_registry_decode_test.go

+43-20
Original file line numberDiff line numberDiff line change
@@ -263,28 +263,33 @@ func TestSchemaRegistryDecodeAvro(t *testing.T) {
263263
})
264264

265265
tests := []struct {
266+
schemaID int
266267
name string
267268
input string
268269
output string
269270
hambaOutput string
270271
errContains string
271272
}{
272273
{
273-
name: "successful message",
274-
input: "\x00\x00\x00\x00\x03\x06foo\x02\x02\x06foo\x06bar\x02\x0edancing",
275-
output: `{"Address":{"my.namespace.com.address":{"City":{"string":"foo"},"State":"bar"}},"MaybeHobby":{"string":"dancing"},"Name":"foo"}`,
274+
schemaID: 3,
275+
name: "successful message",
276+
input: "\x00\x00\x00\x00\x03\x06foo\x02\x02\x06foo\x06bar\x02\x0edancing",
277+
output: `{"Address":{"my.namespace.com.address":{"City":{"string":"foo"},"State":"bar"}},"MaybeHobby":{"string":"dancing"},"Name":"foo"}`,
276278
},
277279
{
278-
name: "successful message with null hobby",
279-
input: "\x00\x00\x00\x00\x03\x06foo\x02\x02\x06foo\x06bar\x00",
280-
output: `{"Address":{"my.namespace.com.address":{"City":{"string":"foo"},"State":"bar"}},"MaybeHobby":null,"Name":"foo"}`,
280+
schemaID: 3,
281+
name: "successful message with null hobby",
282+
input: "\x00\x00\x00\x00\x03\x06foo\x02\x02\x06foo\x06bar\x00",
283+
output: `{"Address":{"my.namespace.com.address":{"City":{"string":"foo"},"State":"bar"}},"MaybeHobby":null,"Name":"foo"}`,
281284
},
282285
{
283-
name: "successful message no address and null hobby",
284-
input: "\x00\x00\x00\x00\x03\x06foo\x00\x00",
285-
output: `{"Name":"foo","MaybeHobby":null,"Address": null}`,
286+
schemaID: 3,
287+
name: "successful message no address and null hobby",
288+
input: "\x00\x00\x00\x00\x03\x06foo\x00\x00",
289+
output: `{"Name":"foo","MaybeHobby":null,"Address": null}`,
286290
},
287291
{
292+
schemaID: 4,
288293
name: "successful message with logical types",
289294
input: "\x00\x00\x00\x00\x04\x02\x90\xaf\xce!\x02\x80\x80\x97\t\x02\x80\x80\xde\xf2\xdf\xff\xdf\xdc\x01\x02\x02!",
290295
output: `{"int_time_millis":{"int.time-millis":35245000},"long_time_micros":{"long.time-micros":20192000000000},"long_timestamp_micros":{"long.timestamp-micros":62135596800000000},"pos_0_33333333":{"bytes.decimal":"!"}}`,
@@ -341,6 +346,10 @@ func TestSchemaRegistryDecodeAvro(t *testing.T) {
341346
diff, explanation := jsondiff.Compare(b, []byte(output), &jdopts)
342347
assert.JSONEq(t, output, string(b))
343348
assert.Equalf(t, jsondiff.FullMatch.String(), diff.String(), "%s: %s", test.name, explanation)
349+
350+
v, ok := outMsgs[0].MetaGetMut("schema_id")
351+
assert.True(t, ok)
352+
assert.Equal(t, test.schemaID, v)
344353
}
345354
}
346355
t.Run("hamba/"+test.name, func(t *testing.T) { fn(t, true) })
@@ -387,28 +396,33 @@ func TestSchemaRegistryDecodeAvroRawJson(t *testing.T) {
387396
})
388397

389398
tests := []struct {
399+
schemaID int
390400
name string
391401
input string
392402
output string
393403
hambaOutput string
394404
errContains string
395405
}{
396406
{
397-
name: "successful message",
398-
input: "\x00\x00\x00\x00\x03\x06foo\x02\x02\x06foo\x06bar\x02\x0edancing",
399-
output: `{"Address":{"City":"foo","State":"bar"},"Name":"foo","MaybeHobby":"dancing"}`,
407+
schemaID: 3,
408+
name: "successful message",
409+
input: "\x00\x00\x00\x00\x03\x06foo\x02\x02\x06foo\x06bar\x02\x0edancing",
410+
output: `{"Address":{"City":"foo","State":"bar"},"Name":"foo","MaybeHobby":"dancing"}`,
400411
},
401412
{
402-
name: "successful message with null hobby",
403-
input: "\x00\x00\x00\x00\x03\x06foo\x02\x02\x06foo\x06bar\x00",
404-
output: `{"Address":{"City":"foo","State":"bar"},"MaybeHobby":null,"Name":"foo"}`,
413+
schemaID: 3,
414+
name: "successful message with null hobby",
415+
input: "\x00\x00\x00\x00\x03\x06foo\x02\x02\x06foo\x06bar\x00",
416+
output: `{"Address":{"City":"foo","State":"bar"},"MaybeHobby":null,"Name":"foo"}`,
405417
},
406418
{
407-
name: "successful message no address and null hobby",
408-
input: "\x00\x00\x00\x00\x03\x06foo\x00\x00",
409-
output: `{"Name":"foo","MaybeHobby":null,"Address": null}`,
419+
schemaID: 3,
420+
name: "successful message no address and null hobby",
421+
input: "\x00\x00\x00\x00\x03\x06foo\x00\x00",
422+
output: `{"Name":"foo","MaybeHobby":null,"Address": null}`,
410423
},
411424
{
425+
schemaID: 4,
412426
name: "successful message with logical types",
413427
input: "\x00\x00\x00\x00\x04\x02\x90\xaf\xce!\x02\x80\x80\x97\t\x02\x80\x80\xde\xf2\xdf\xff\xdf\xdc\x01\x02\x02!",
414428
output: `{"int_time_millis":35245000,"long_time_micros":20192000000000,"long_timestamp_micros":62135596800000000,"pos_0_33333333":"!"}`,
@@ -464,8 +478,11 @@ func TestSchemaRegistryDecodeAvroRawJson(t *testing.T) {
464478
jdopts := jsondiff.DefaultJSONOptions()
465479
diff, explanation := jsondiff.Compare(b, []byte(output), &jdopts)
466480
assert.Equalf(t, jsondiff.FullMatch.String(), diff.String(), "%s: %s", test.name, explanation)
467-
}
468481

482+
v, ok := outMsgs[0].MetaGetMut("schema_id")
483+
assert.True(t, ok)
484+
assert.Equal(t, test.schemaID, v)
485+
}
469486
}
470487
t.Run("hamba/"+test.name, func(t *testing.T) { fn(t, true) })
471488
t.Run("goavro/"+test.name, func(t *testing.T) { fn(t, false) })
@@ -567,8 +584,11 @@ func TestSchemaRegistryDecodeProtobuf(t *testing.T) {
567584

568585
b, err := outMsgs[0].AsBytes()
569586
require.NoError(t, err)
570-
571587
assert.JSONEq(t, test.output, string(b), "%s: %s", test.name)
588+
589+
v, ok := outMsgs[0].MetaGetMut("schema_id")
590+
assert.True(t, ok)
591+
assert.Equal(t, 1, v)
572592
}
573593
})
574594
}
@@ -639,6 +659,9 @@ func TestSchemaRegistryDecodeJson(t *testing.T) {
639659
jdopts := jsondiff.DefaultJSONOptions()
640660
diff, explanation := jsondiff.Compare(b, []byte(test.output), &jdopts)
641661
assert.Equalf(t, jsondiff.FullMatch.String(), diff.String(), "%s: %s", test.name, explanation)
662+
v, ok := outMsgs[0].MetaGetMut("schema_id")
663+
assert.True(t, ok)
664+
assert.Equal(t, 3, v)
642665
}
643666
})
644667
}

0 commit comments

Comments
 (0)