Skip to content

Commit cc8c073

Browse files
esskarbrmagadutra
authored andcommitted
feat: add support for csharp namespace in protobuf schemas
1 parent 2826d37 commit cc8c073

File tree

2 files changed

+64
-3
lines changed

2 files changed

+64
-3
lines changed

src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufTypeNameResolver.cs

+9-3
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,13 @@ public async Task<string> ResolveAsync(int id)
2020
var schemaString = (await _client.GetSchemaAsync(id, "serialized")).SchemaString;
2121

2222
var protoFields = FileDescriptorProto.Parser.ParseFrom(ByteString.FromBase64(schemaString));
23-
24-
return $"{protoFields.Package}.{protoFields.MessageType.FirstOrDefault()?.Name}";
23+
var messageType = protoFields.MessageType.FirstOrDefault()?.Name;
24+
var ns = protoFields.Options?.HasCsharpNamespace == true
25+
? protoFields.Options.CsharpNamespace
26+
: protoFields.Package;
27+
return BuildTypeName(messageType, ns);
2528
}
26-
}
29+
30+
private static string BuildTypeName(string messageType, string ns)
31+
=> string.IsNullOrEmpty(ns) ? messageType ?? string.Empty : $"{ns}.{messageType}";
32+
}

tests/KafkaFlow.UnitTests/Middlewares/Serialization/ConfluentProtobufTypeNameResolverTests.cs

+55
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Collections.Generic;
23
using System.Threading.Tasks;
34
using Confluent.SchemaRegistry;
45

@@ -44,4 +45,58 @@ public async Task ResolveAsync_ValidProtobufObject_ReturnsProtoFields()
4445
// Assert
4546
protoFields.Should().NotBeNull();
4647
}
48+
49+
[TestMethod]
50+
public async Task ResolveAsync_SchemaWithPackageOnly_ReturnsTypeNameWithPackageNamespace()
51+
{
52+
// Arrange
53+
// below schema-string is base64 encoded protobuf schema of 'syntax = \"proto3\";\npackage kafkaflow.test;\n\nmessage Person {\n string name = 1;\n}\n'
54+
var schemaString = "CgdkZWZhdWx0Eg5rYWZrYWZsb3cudGVzdCIUCgZQZXJzb24SCgoEbmFtZRgBKAliBnByb3RvMw==";
55+
var schemaId = 420;
56+
57+
_schemaRegistryClient.Setup(client => client.GetSchemaAsync(schemaId, "serialized"))
58+
.ReturnsAsync(new RegisteredSchema("test", 1, schemaId, schemaString, SchemaType.Protobuf, new List<SchemaReference>()));
59+
60+
// Act
61+
var actual = await _schemaRegistryTypeResolver.ResolveAsync(schemaId);
62+
63+
// Assert
64+
actual.Should().Be("kafkaflow.test.Person");
65+
}
66+
67+
[TestMethod]
68+
public async Task ResolveAsync_SchemaWithCsharpNamespace_ReturnsTypeNameWithCsharpNamespace()
69+
{
70+
// Arrange
71+
// below schema-string is base64 encoded protobuf schema of 'syntax = \"proto3\";\npackage kafkaflow.test;\n\nmessage Person {\n string name = 1;\n}\n'
72+
var schemaString = "CgdkZWZhdWx0Eg5rYWZrYWZsb3cudGVzdCIUCgZQZXJzb24SCgoEbmFtZRgBKAlCEaoCDkthZmthRmxvdy5UZXN0YgZwcm90bzM=";
73+
var schemaId = 420;
74+
75+
_schemaRegistryClient.Setup(client => client.GetSchemaAsync(schemaId, "serialized"))
76+
.ReturnsAsync(new RegisteredSchema("test", 1, schemaId, schemaString, SchemaType.Protobuf, new List<SchemaReference>()));
77+
78+
// Act
79+
var actual = await _schemaRegistryTypeResolver.ResolveAsync(schemaId);
80+
81+
// Assert
82+
Assert.AreEqual("KafkaFlow.Test.Person", actual);
83+
}
84+
85+
[TestMethod]
86+
public async Task ResolveAsync_SchemaWithoutPackageOrNamespace_ReturnsTypeNameWithoutNamespace()
87+
{
88+
// Arrange
89+
// below schema-string is base64 encoded protobuf schema of 'syntax = \"proto3\";\n\nmessage Person {\n string name = 1;\n}\n'
90+
var schemaString = "CgdkZWZhdWx0IhQKBlBlcnNvbhIKCgRuYW1lGAEoCWIGcHJvdG8z";
91+
var schemaId = 420;
92+
93+
_schemaRegistryClient.Setup(client => client.GetSchemaAsync(schemaId, "serialized"))
94+
.ReturnsAsync(new RegisteredSchema("test", 1, schemaId, schemaString, SchemaType.Protobuf, new List<SchemaReference>()));
95+
96+
// Act
97+
var actual = await _schemaRegistryTypeResolver.ResolveAsync(schemaId);
98+
99+
// Assert
100+
Assert.AreEqual("Person", actual);
101+
}
47102
}

0 commit comments

Comments
 (0)