Skip to content

Commit

Permalink
commit offset explicitly when AutoCommit is disabled and added Test C…
Browse files Browse the repository at this point in the history
…ode for Manual Commit (#25)

* commit offset explicitly then AutoCommit disabled

* rename method for readability

* Update subscriber.go

* Trigger CI

* speedup TestPublishSubscribe_AutoCommitDisabled and other tests

---------

Co-authored-by: tjnet <tanakajun374@gmail>
Co-authored-by: tjnet <tjnet>
Co-authored-by: Miłosz Smółka <[email protected]>
Co-authored-by: Robert Laszczak <[email protected]>
  • Loading branch information
4 people authored Jan 13, 2025
1 parent d852df8 commit 008c08b
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 12 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ toolchain go1.23.0

require (
github.com/IBM/sarama v1.43.3
github.com/ThreeDotsLabs/watermill v1.3.7
github.com/ThreeDotsLabs/watermill v1.4.3
github.com/dnwe/otelsarama v0.0.0-20240308230250-9388d9d40bc0
github.com/hashicorp/go-multierror v1.1.1
github.com/pkg/errors v0.9.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
github.com/IBM/sarama v1.43.3 h1:Yj6L2IaNvb2mRBop39N7mmJAHBVY3dTPncr3qGVkxPA=
github.com/IBM/sarama v1.43.3/go.mod h1:FVIRaLrhK3Cla/9FfRF5X9Zua2KpS3SYIXxhac1H+FQ=
github.com/ThreeDotsLabs/watermill v1.3.7 h1:NV0PSTmuACVEOV4dMxRnmGXrmbz8U83LENOvpHekN7o=
github.com/ThreeDotsLabs/watermill v1.3.7/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to=
github.com/ThreeDotsLabs/watermill v1.4.3 h1:cRT1v7jlAgoPyEknvz0IFp3EKdSBRD/0Qbtz6KhexG8=
github.com/ThreeDotsLabs/watermill v1.4.3/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
4 changes: 2 additions & 2 deletions pkg/kafka/pubsub_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestPublishSubscribe_stress(t *testing.T) {
Persistent: true,
},
createPubSub,
createPubSubWithConsumerGrup,
createPubSubWithConsumerGroup,
)
}

Expand All @@ -39,6 +39,6 @@ func TestPublishSubscribe_ordered_stress(t *testing.T) {
Persistent: true,
},
createPartitionedPubSub,
createPubSubWithConsumerGrup,
createPubSubWithConsumerGroup,
)
}
57 changes: 50 additions & 7 deletions pkg/kafka/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@ func kafkaBrokers() []string {
return []string{"localhost:9091", "localhost:9092", "localhost:9093"}
}

func newPubSub(t *testing.T, marshaler kafka.MarshalerUnmarshaler, consumerGroup string) (*kafka.Publisher, *kafka.Subscriber) {
logger := watermill.NewStdLogger(true, true)
func newPubSub(
t *testing.T,
marshaler kafka.MarshalerUnmarshaler,
consumerGroup string,
saramaOpts ...func(*sarama.Config),
) (*kafka.Publisher, *kafka.Subscriber) {
logger := watermill.NewStdLogger(false, false)

var err error
var publisher *kafka.Publisher
Expand Down Expand Up @@ -58,6 +63,10 @@ func newPubSub(t *testing.T, marshaler kafka.MarshalerUnmarshaler, consumerGroup
saramaConfig.Consumer.Group.Heartbeat.Interval = time.Millisecond * 500
saramaConfig.Consumer.Group.Rebalance.Timeout = time.Second * 3

for _, o := range saramaOpts {
o(saramaConfig)
}

var subscriber *kafka.Subscriber

retriesLeft = 5
Expand All @@ -69,7 +78,7 @@ func newPubSub(t *testing.T, marshaler kafka.MarshalerUnmarshaler, consumerGroup
OverwriteSaramaConfig: saramaConfig,
ConsumerGroup: consumerGroup,
InitializeTopicDetails: &sarama.TopicDetail{
NumPartitions: 8,
NumPartitions: 50,
ReplicationFactor: 1,
},
},
Expand All @@ -93,12 +102,12 @@ func generatePartitionKey(topic string, msg *message.Message) (string, error) {
return msg.Metadata.Get("partition_key"), nil
}

func createPubSubWithConsumerGrup(t *testing.T, consumerGroup string) (message.Publisher, message.Subscriber) {
func createPubSubWithConsumerGroup(t *testing.T, consumerGroup string) (message.Publisher, message.Subscriber) {
return newPubSub(t, kafka.DefaultMarshaler{}, consumerGroup)
}

func createPubSub(t *testing.T) (message.Publisher, message.Subscriber) {
return createPubSubWithConsumerGrup(t, "test")
return createPubSubWithConsumerGroup(t, "test")
}

func createPartitionedPubSub(t *testing.T) (message.Publisher, message.Subscriber) {
Expand All @@ -121,7 +130,7 @@ func TestPublishSubscribe(t *testing.T) {
t,
features,
createPubSub,
createPubSubWithConsumerGrup,
createPubSubWithConsumerGroup,
)
}

Expand All @@ -130,6 +139,8 @@ func TestPublishSubscribe_ordered(t *testing.T) {
t.Skip("skipping long tests")
}

t.Parallel()

tests.TestPubSub(
t,
tests.Features{
Expand All @@ -139,7 +150,7 @@ func TestPublishSubscribe_ordered(t *testing.T) {
Persistent: true,
},
createPartitionedPubSub,
createPubSubWithConsumerGrup,
createPubSubWithConsumerGroup,
)
}

Expand All @@ -148,6 +159,8 @@ func TestNoGroupSubscriber(t *testing.T) {
t.Skip("skipping long tests")
}

t.Parallel()

tests.TestPubSub(
t,
tests.Features{
Expand Down Expand Up @@ -212,6 +225,36 @@ func TestCtxValues(t *testing.T) {
require.NoError(t, pub.Close())
}

func TestPublishSubscribe_AutoCommitDisabled(t *testing.T) {
t.Parallel()

features := tests.Features{
ConsumerGroups: true,
ExactlyOnceDelivery: false,
GuaranteedOrder: false,
Persistent: true,
// Disabled AutoCommit slow down Pub/Sub because of making commits synchronously
ForceShort: true,
}

pubSubConstructorWithConsumerGroup := func(t *testing.T, consumerGroup string) (message.Publisher, message.Subscriber) {
return newPubSub(t, kafka.DefaultMarshaler{}, consumerGroup, func(config *sarama.Config) {
// commit messages manually
config.Consumer.Offsets.AutoCommit.Enable = false
})
}
pubSubConstructor := func(t *testing.T) (message.Publisher, message.Subscriber) {
return pubSubConstructorWithConsumerGroup(t, "test")
}

tests.TestPubSub(
t,
features,
pubSubConstructor,
pubSubConstructorWithConsumerGroup,
)
}

func readAfterRetries(messagesCh <-chan *message.Message, retriesN int, timeout time.Duration) (receivedMessage *message.Message, ok bool) {
retries := 0

Expand Down
6 changes: 6 additions & 0 deletions pkg/kafka/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ func (s *Subscriber) consumePartition(
func (s *Subscriber) createMessagesHandler(output chan *message.Message) messageHandler {
return messageHandler{
outputChannel: output,
saramaConfig: s.config.OverwriteSaramaConfig,
unmarshaler: s.config.Unmarshaler,
nackResendSleep: s.config.NackResendSleep,
logger: s.logger,
Expand Down Expand Up @@ -533,6 +534,7 @@ func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, cla
type messageHandler struct {
outputChannel chan<- *message.Message
unmarshaler Unmarshaler
saramaConfig *sarama.Config

nackResendSleep time.Duration

Expand Down Expand Up @@ -590,6 +592,10 @@ ResendLoop:
if sess != nil {
if sess.Context().Err() == nil {
sess.MarkMessage(kafkaMsg, "")
if !h.saramaConfig.Consumer.Offsets.AutoCommit.Enable {
// AutoCommit is disabled, so we should commit offset explicitly
sess.Commit()
}
} else {
logFields := receivedMsgLogFields.Add(
watermill.LogFields{
Expand Down

0 comments on commit 008c08b

Please sign in to comment.