You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
consumer := client.ConsumerGroup("test_group_1")
defer consumer.Close()
err := consumer.ConsumeFunc(context.Background(), []string{topic}, func(ctx context.Context, message *sarama.ConsumerMessage) error {
	// process message
	// if a non-nil error is returned, the message will be committed and the error will be logged
	return nil
})
Create consumer with retry
import (
retry "github.com/cenkalti/backoff/v4"
)
consumer := client.ConsumerGroup("test_group_1")
defer consumer.Close()
ctx := context.Background()
err := consumer.ConsumeFunc(ctx, []string{topic}, func(ctx context.Context, message *sarama.ConsumerMessage) error {
	// process message
	// if a non-nil error is returned, the message will be retried
	// if the retries end with an error, the message will be committed and the error will be logged
	return nil
}, WithRetryBackOff(retry.WithContext(retry.NewExponentialBackOff(), ctx)))
Create consumer with error handler
consumer := client.ConsumerGroup("test_group_1")
defer consumer.Close()
ctx := context.Background()
err := consumer.ConsumeFunc(ctx, []string{topic}, func(ctx context.Context, message *sarama.ConsumerMessage) error {
	// process message
	// if a non-nil error is returned, ErrHandlerFunc will be called and the error will not be logged
	return nil
}, WithErrHandler(func(ctx context.Context, message *sarama.ConsumerMessage, err error) {
	zap.NewNop().Error("error on consumer message", zap.Error(err))
	// or save a message into storage
}))
Create consumer with retry and error handler
import (
retry "github.com/cenkalti/backoff/v4"
)
consumer := client.ConsumerGroup("test_group_1")
defer consumer.Close()
ctx := context.Background()
err := consumer.ConsumeFunc(ctx, []string{topic}, func(ctx context.Context, message *sarama.ConsumerMessage) error {
	// process message
	// if a non-nil error is returned, the message will be retried
	// if the retries end with an error, ErrHandlerFunc will be called and the error will not be logged
	return nil
},
	WithRetryBackOff(retry.WithContext(retry.NewExponentialBackOff(), ctx)),
	WithErrHandler(func(ctx context.Context, message *sarama.ConsumerMessage, err error) {
		zap.NewNop().Error("error on consumer message", zap.Error(err))
		// or save a message into storage
	}),
)