Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 3386659

Browse files
committedFeb 17, 2025··
Avoid double deserialization with Kafka
Fixes #1246
1 parent 667b668 commit 3386659

File tree

3 files changed

+52
-0
lines changed

3 files changed

+52
-0
lines changed
 

‎src/Motor.Extensions.Hosting.Kafka/KafkaClientExtensions.cs

+8
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
using System;
2+
using System.Text;
3+
using System.Text.Json;
4+
using System.Text.Json.Nodes;
25
using CloudNative.CloudEvents;
36
using CloudNative.CloudEvents.Kafka;
47
using Confluent.Kafka;
@@ -25,6 +28,11 @@ public static MotorCloudEvent<byte[]> ToMotorCloudEvent(this Message<string?, by
2528
{
2629
throw new ArgumentException("Source property of CloudEvent is null");
2730
}
31+
32+
if (cloudEvent.Data is JsonElement element)
33+
{
34+
cloudEvent.Data = Encoding.UTF8.GetBytes(element.GetRawText());
35+
}
2836
var motorCloudEvent = new MotorCloudEvent<byte[]>(applicationNameService, (byte[])cloudEvent.Data,
2937
cloudEvent.Type, cloudEvent.Source, cloudEvent.Id, cloudEvent.Time, cloudEvent.DataContentType);
3038
foreach (var (key, value) in cloudEvent.GetPopulatedAttributes())

‎src/Motor.Extensions.Hosting.Kafka/Motor.Extensions.Hosting.Kafka.csproj

+4
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,8 @@
2222

2323
<Import Project="$(MSBuildThisFileDirectory)../../shared.csproj" />
2424

25+
<ItemGroup>
26+
<InternalsVisibleTo Include="Motor.Extensions.Hosting.Kafka_UnitTest" />
27+
</ItemGroup>
28+
2529
</Project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
using System;
2+
using System.Globalization;
3+
using System.Text;
4+
using CloudNative.CloudEvents.SystemTextJson;
5+
using Confluent.Kafka;
6+
using Moq;
7+
using Motor.Extensions.Hosting.CloudEvents;
8+
using Motor.Extensions.Hosting.Kafka;
9+
using Xunit;
10+
11+
namespace Motor.Extensions.Hosting.Kafka_UnitTest;
12+
13+
public class KafkaClientExtensionsRegressionTests
14+
{
15+
[Fact]
16+
public void KafkaClientExtensionsRegression_ToMotorCloudEvent_DataIsByteArrayt ()
17+
{
18+
var applicationNameServiceMock = Mock.Of<IApplicationNameService>();
19+
var actualContent = "{\"key\":\"value\"}";
20+
var message = new Message<string, byte[]>
21+
{
22+
Headers = new Headers
23+
{
24+
{ "content-type", "application/cloudevents+json"u8.ToArray() }
25+
},
26+
Value = Encoding.UTF8.GetBytes($@"
27+
{{
28+
""specversion"": ""1.0"",
29+
""type"": ""imaginary.type"",
30+
""source"": ""/imaginary-source"",
31+
""id"": ""{Guid.NewGuid()}"",
32+
""time"": ""{DateTimeOffset.UnixEpoch.ToString("o", CultureInfo.InvariantCulture)}"",
33+
""datacontenttype"": ""application/json"",
34+
""data"": {actualContent}
35+
}}")
36+
};
37+
var cloudEvent = message.ToMotorCloudEvent(applicationNameServiceMock, new JsonEventFormatter());
38+
Assert.Equal(Encoding.UTF8.GetBytes(actualContent), cloudEvent.Data);
39+
}
40+
}

0 commit comments

Comments
 (0)
Please sign in to comment.