diff --git a/pkg/logutil/log.go b/pkg/logutil/log.go index 1f7afe61d79..138db763b9c 100644 --- a/pkg/logutil/log.go +++ b/pkg/logutil/log.go @@ -223,14 +223,11 @@ func initMySQLLogger() error { // initSaramaLogger hacks logger used in sarama lib func initSaramaLogger(level zapcore.Level) error { - // only available less than info level - if !zapcore.InfoLevel.Enabled(level) { - logger, err := zap.NewStdLogAt(log.L().With(zap.String("component", "sarama")), level) - if err != nil { - return errors.Trace(err) - } - sarama.Logger = logger + logger, err := zap.NewStdLogAt(log.L().With(zap.String("component", "sarama")), level) + if err != nil { + return errors.Trace(err) } + sarama.Logger = logger return nil } diff --git a/pkg/sink/kafka/sarama.go b/pkg/sink/kafka/sarama.go index df8877412b3..98030d158d2 100644 --- a/pkg/sink/kafka/sarama.go +++ b/pkg/sink/kafka/sarama.go @@ -58,8 +58,10 @@ func NewSaramaConfig(ctx context.Context, o *Options) (*sarama.Config, error) { // For kafka cluster with a bad network condition, producer should not try to // waster too much time on sending a message, get response no matter success // or fail as soon as possible is preferred. - config.Producer.Retry.Max = 3 - config.Producer.Retry.Backoff = 100 * time.Millisecond + // According to the https://github.com/IBM/sarama/issues/2619, + // sarama may send message out of order even set the `config.Net.MaxOpenRequest` to 1, + // when the kafka cluster is unhealthy and trigger the internal retry mechanism. + config.Producer.Retry.Max = 0 // make sure sarama producer flush messages as soon as possible. config.Producer.Flush.Bytes = 0 @@ -67,6 +69,7 @@ func NewSaramaConfig(ctx context.Context, o *Options) (*sarama.Config, error) { config.Producer.Flush.Frequency = time.Duration(0) config.Producer.Flush.MaxMessages = o.MaxMessages + config.Net.MaxOpenRequests = 1 config.Net.DialTimeout = o.DialTimeout config.Net.WriteTimeout = o.WriteTimeout config.Net.ReadTimeout = o.ReadTimeout