Skip to content

Commit

Permalink
Merge pull request #17 from sevaorlov/fix-ctx-after-retry
Browse files Browse the repository at this point in the history
Set context for copied message on retry
  • Loading branch information
m110 authored Aug 28, 2024
2 parents 07a4f15 + b03a4f4 commit 07c306d
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 0 deletions.
69 changes: 69 additions & 0 deletions pkg/kafka/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,72 @@ func TestCtxValues(t *testing.T) {

require.NoError(t, pub.Close())
}

func readAfterRetries(messagesCh <-chan *message.Message, retriesN int, timeout time.Duration) (receivedMessage *message.Message, ok bool) {
retries := 0

MessagesLoop:
for retries <= retriesN {
select {
case msg, ok := <-messagesCh:
if !ok {
break MessagesLoop
}

if retries > 0 {
msg.Ack()
return msg, true
}

msg.Nack()
retries++
case <-time.After(timeout):
break MessagesLoop
}
}

return nil, false
}

func TestCtxValuesAfterRetry(t *testing.T) {
pub, sub := newPubSub(t, kafka.DefaultMarshaler{}, "")
topicName := "topic_" + watermill.NewUUID()

var messagesToPublish []*message.Message

id := watermill.NewUUID()
messagesToPublish = append(messagesToPublish, message.NewMessage(id, nil))

err := pub.Publish(topicName, messagesToPublish...)
require.NoError(t, err, "cannot publish message")

messages, err := sub.Subscribe(context.Background(), topicName)
require.NoError(t, err)

receivedMessage, ok := readAfterRetries(messages, 1, time.Second)

expectedPartitionsOffsets := map[int32]int64{}
partition, ok := kafka.MessagePartitionFromCtx(receivedMessage.Context())
assert.True(t, ok)

messagePartitionOffset, ok := kafka.MessagePartitionOffsetFromCtx(receivedMessage.Context())
assert.True(t, ok)

kafkaMsgTimestamp, ok := kafka.MessageTimestampFromCtx(receivedMessage.Context())
assert.True(t, ok)
assert.NotZero(t, kafkaMsgTimestamp)

if expectedPartitionsOffsets[partition] <= messagePartitionOffset {
// kafka partition offset is offset of the last message + 1
expectedPartitionsOffsets[partition] = messagePartitionOffset + 1
}
assert.NotEmpty(t, expectedPartitionsOffsets)

offsets, err := sub.PartitionOffset(topicName)
require.NoError(t, err)
assert.NotEmpty(t, offsets)

assert.EqualValues(t, expectedPartitionsOffsets, offsets)

require.NoError(t, pub.Close())
}
2 changes: 2 additions & 0 deletions pkg/kafka/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,8 @@ ResendLoop:

// reset acks, etc.
msg = msg.Copy()
msg.SetContext(ctx)

if h.nackResendSleep != NoSleep {
time.Sleep(h.nackResendSleep)
}
Expand Down

0 comments on commit 07c306d

Please sign in to comment.