diff --git a/kafka/error.go b/kafka/error.go index 06c94bb4e..a75877293 100644 --- a/kafka/error.go +++ b/kafka/error.go @@ -130,7 +130,7 @@ func (e Error) IsRetriable() bool { // IsTimeout returns true if the error is a timeout error. // A timeout error indicates that the operation timed out locally. func (e Error) IsTimeout() bool { - return e.code == ErrTimedOut || e.code == ErrTimedOutQueue + return e.code == ErrTimedOut || e.code == ErrTimedOutQueue || e.code == ErrMsgTimedOut } // TxnRequiresAbort returns true if the error is an abortable transaction error diff --git a/kafka/error_test.go b/kafka/error_test.go index a2756bf56..50a8ee32b 100644 --- a/kafka/error_test.go +++ b/kafka/error_test.go @@ -37,6 +37,15 @@ func TestFatalError(t *testing.T) { t.Logf("%v", normalErr) } +// TestIsTimeoutError tests timeout errors +func TestIsTimeoutError(t *testing.T) { + err := newErrorFromString(ErrMsgTimedOut, "Testing timeout error") + if !err.IsTimeout() { + t.Errorf("Expected IsTimeout() to return true for %v", err) + } + t.Logf("%v", err) +} + // TestFatalErrorClient tests fatal errors using a client instance func TestFatalErrorClient(t *testing.T) { p, err := NewProducer(&ConfigMap{})