@@ -18,12 +18,18 @@ import (
18
18
"github.com/stretchr/testify/require"
19
19
"github.com/zillow/zfmt"
20
20
"github.com/zillow/zkafka/v2"
21
+ "go.opentelemetry.io/otel/propagation"
22
+ "go.opentelemetry.io/otel/trace/noop"
21
23
"golang.org/x/sync/errgroup"
22
24
)
23
25
24
26
// TestKafkaClientsCanReadOwnWritesAndBehaveProperlyAfterRestart will test that a kafka consumer can properly read messages
25
27
// written by the kafka producer. It will additionally, confirm that when a group is restarted that it starts off where
26
- // it left off (addressing an off by 1 bug seen with an earlier version)
28
+ // it left off (addressing an off by 1 bug seen with an earlier version).
29
+ // Finally it'll confirm that otel propagation can be used to manipulate written contents of kafka.Message
30
+ // And that options exist to create a zkafka.Writer which disables that propagation. This is done by including a propagator
31
+ // which always adds the header "kobe"="bryant". Writer1 uses this propagator, writer2 disables it. Assertions are made
32
+ // on the presence of the header.
27
33
//
28
34
// The following steps are followed
29
35
// 1. Create a new consumer group that is reading from the topic
@@ -43,14 +49,34 @@ func TestKafkaClientsCanReadOwnWritesAndBehaveProperlyAfterRestart(t *testing.T)
43
49
44
50
groupID := uuid .NewString ()
45
51
46
- client := zkafka .NewClient (zkafka.Config {BootstrapServers : []string {bootstrapServer }}, zkafka .LoggerOption (stdLogger {}))
52
+ client := zkafka .NewClient (
53
+ zkafka.Config {BootstrapServers : []string {bootstrapServer }},
54
+ zkafka .LoggerOption (stdLogger {}),
55
+ zkafka .WithClientTextMapPropagator (fakeTextMapPropagator {
56
+ inject : func (ctx context.Context , carrier propagation.TextMapCarrier ) {
57
+ carrier .Set ("kobe" , "bryant" )
58
+ },
59
+ }),
60
+ zkafka .WithClientTracerProviderOption (noop .NewTracerProvider ()),
61
+ )
47
62
defer func () { require .NoError (t , client .Close ()) }()
48
63
49
- writer , err := client .Writer (ctx , zkafka.ProducerTopicConfig {
50
- ClientID : fmt .Sprintf ("writer-%s-%s" , t .Name (), uuid .NewString ()),
64
+ writer1 , err := client .Writer (ctx , zkafka.ProducerTopicConfig {
65
+ ClientID : fmt .Sprintf ("writer1-%s-%s" , t .Name (), uuid .NewString ()),
66
+ Topic : topic ,
67
+ Formatter : zfmt .JSONFmt ,
68
+ })
69
+ require .NoError (t , err )
70
+
71
+ writer2 , err := client .Writer (ctx , zkafka.ProducerTopicConfig {
72
+ ClientID : fmt .Sprintf ("writer2-%s-%s" , t .Name (), uuid .NewString ()),
51
73
Topic : topic ,
52
74
Formatter : zfmt .JSONFmt ,
75
+ }, func (settings * zkafka.WriterSettings ) {
76
+ settings .DisableTracePropagation = true
53
77
})
78
+ require .NoError (t , err )
79
+
54
80
consumerTopicConfig := zkafka.ConsumerTopicConfig {
55
81
ClientID : fmt .Sprintf ("reader-%s-%s" , t .Name (), uuid .NewString ()),
56
82
Topic : topic ,
@@ -111,11 +137,11 @@ func TestKafkaClientsCanReadOwnWritesAndBehaveProperlyAfterRestart(t *testing.T)
111
137
}, readResponses )
112
138
113
139
// write msg1, and msg2
114
- resWrite1 , err := writer .Write (ctx , msg1 )
140
+ resWrite1 , err := writer1 .Write (ctx , msg1 )
115
141
require .NoError (t , err )
116
142
117
143
msg2 := Msg {Val : "2" }
118
- resWrite2 , err := writer .Write (ctx , msg2 )
144
+ resWrite2 , err := writer2 .Write (ctx , msg2 )
119
145
require .NoError (t , err )
120
146
121
147
// reader will send on channel the messages it has read (should just be msg1)
@@ -125,6 +151,7 @@ func TestKafkaClientsCanReadOwnWritesAndBehaveProperlyAfterRestart(t *testing.T)
125
151
require .NoError (t , resp .err )
126
152
require .NotNil (t , rmsg1 , "expected written message to be read" )
127
153
require .Equal (t , int (rmsg1 .Offset ), int (resWrite1 .Offset ), "expected read offset to match written" )
154
+ require .Equal (t , "bryant" , string (rmsg1 .Headers ["kobe" ]))
128
155
129
156
gotMsg1 := Msg {}
130
157
err = resp .msg .Decode (& gotMsg1 )
@@ -151,6 +178,7 @@ func TestKafkaClientsCanReadOwnWritesAndBehaveProperlyAfterRestart(t *testing.T)
151
178
152
179
// assert offset is for second message written (no replay of old message)
153
180
require .Equal (t , int (rmsg2 .Offset ), int (resWrite2 .Offset ), "expected read offset to match written" )
181
+ require .Empty (t , rmsg2 .Headers ["kobe" ])
154
182
155
183
gotMsg2 := Msg {}
156
184
err = rmsg2 .Decode (& gotMsg2 )
@@ -1285,7 +1313,7 @@ func Test_DeadletterClientDoesntCollideWithProducer(t *testing.T) {
1285
1313
if msgCount .Load ()% 2 == 0 {
1286
1314
return errors .New ("random error occurred" )
1287
1315
}
1288
- _ , err := processorWriter .WriteRaw (ctx , nil , msg .Value ())
1316
+ _ , err := processorWriter .WriteRaw (ctx , nil , msg .Value (), nil )
1289
1317
1290
1318
return err
1291
1319
})
@@ -1458,3 +1486,23 @@ type partition struct {
1458
1486
offset int64
1459
1487
topic string
1460
1488
}
1489
+
1490
+ type fakeTextMapPropagator struct {
1491
+ inject func (ctx context.Context , carrier propagation.TextMapCarrier )
1492
+ }
1493
+
1494
+ func (f fakeTextMapPropagator ) Inject (ctx context.Context , carrier propagation.TextMapCarrier ) {
1495
+ if f .inject != nil {
1496
+ f .inject (ctx , carrier )
1497
+ }
1498
+ }
1499
+
1500
+ func (f fakeTextMapPropagator ) Extract (ctx context.Context , carrier propagation.TextMapCarrier ) context.Context {
1501
+ return ctx
1502
+ }
1503
+
1504
+ func (f fakeTextMapPropagator ) Fields () []string {
1505
+ return nil
1506
+ }
1507
+
1508
+ var _ propagation.TextMapPropagator = (* fakeTextMapPropagator )(nil )
0 commit comments