diff --git a/consumer/option.go b/consumer/option.go index 24acf7c3..2e08163d 100644 --- a/consumer/option.go +++ b/consumer/option.go @@ -382,6 +382,15 @@ func WithLimiter(limiter Limiter) Option { } } +// WithRemotingTimeout set remote client timeout options +func WithRemotingTimeout(connectionTimeout, readTimeout, writeTimeout time.Duration) Option { + return func(opts *consumerOptions) { + opts.ClientOptions.RemotingClientConfig.ConnectionTimeout = connectionTimeout + opts.ClientOptions.RemotingClientConfig.ReadTimeout = readTimeout + opts.ClientOptions.RemotingClientConfig.WriteTimeout = writeTimeout + } +} + func WithTls(useTls bool) Option { return func(opts *consumerOptions) { opts.ClientOptions.RemotingClientConfig.UseTls = useTls diff --git a/consumer/option_test.go b/consumer/option_test.go index ab99b632..4db5b93b 100644 --- a/consumer/option_test.go +++ b/consumer/option_test.go @@ -3,6 +3,7 @@ package consumer import ( "reflect" "testing" + "time" ) func getFieldString(obj interface{}, field string) string { @@ -12,6 +13,20 @@ func getFieldString(obj interface{}, field string) string { }).String() } +func TestWithRemotingTimeout(t *testing.T) { + opt := defaultPushConsumerOptions() + WithRemotingTimeout(3*time.Second, 4*time.Second, 5*time.Second)(&opt) + if timeout := opt.RemotingClientConfig.ConnectionTimeout; timeout != 3*time.Second { + t.Errorf("consumer option WithRemotingTimeout connectionTimeout. want:%s, got=%s", 3*time.Second, timeout) + } + if timeout := opt.RemotingClientConfig.ReadTimeout; timeout != 4*time.Second { + t.Errorf("consumer option WithRemotingTimeout readTimeout. want:%s, got=%s", 4*time.Second, timeout) + } + if timeout := opt.RemotingClientConfig.WriteTimeout; timeout != 5*time.Second { + t.Errorf("consumer option WithRemotingTimeout writeTimeout. want:%s, got=%s", 5*time.Second, timeout) + } +} + func TestWithUnitName(t *testing.T) { opt := defaultPushConsumerOptions() unitName := "unsh" diff --git a/producer/option.go b/producer/option.go index 6e43cc25..72af3c6a 100644 --- a/producer/option.go +++ b/producer/option.go @@ -179,6 +179,15 @@ func WithCompressLevel(level int) Option { } } +// WithRemotingTimeout set remote client timeout options +func WithRemotingTimeout(connectionTimeout, readTimeout, writeTimeout time.Duration) Option { + return func(opts *producerOptions) { + opts.ClientOptions.RemotingClientConfig.ConnectionTimeout = connectionTimeout + opts.ClientOptions.RemotingClientConfig.ReadTimeout = readTimeout + opts.ClientOptions.RemotingClientConfig.WriteTimeout = writeTimeout + } +} + func WithTls(useTls bool) Option { return func(opts *producerOptions) { opts.ClientOptions.RemotingClientConfig.UseTls = useTls diff --git a/producer/option_test.go b/producer/option_test.go index 723da031..9b6ee133 100644 --- a/producer/option_test.go +++ b/producer/option_test.go @@ -3,6 +3,7 @@ package producer import ( "reflect" "testing" + "time" ) func getFieldString(obj interface{}, field string) string { @@ -12,6 +13,20 @@ func getFieldString(obj interface{}, field string) string { }).String() } +func TestWithRemotingTimeout(t *testing.T) { + opt := defaultProducerOptions() + WithRemotingTimeout(3*time.Second, 4*time.Second, 5*time.Second)(&opt) + if timeout := opt.RemotingClientConfig.ConnectionTimeout; timeout != 3*time.Second { + t.Errorf("consumer option WithRemotingTimeout connectionTimeout. want:%s, got=%s", 3*time.Second, timeout) + } + if timeout := opt.RemotingClientConfig.ReadTimeout; timeout != 4*time.Second { + t.Errorf("consumer option WithRemotingTimeout readTimeout. want:%s, got=%s", 4*time.Second, timeout) + } + if timeout := opt.RemotingClientConfig.WriteTimeout; timeout != 5*time.Second { + t.Errorf("consumer option WithRemotingTimeout writeTimeout. want:%s, got=%s", 5*time.Second, timeout) + } +} + func TestWithUnitName(t *testing.T) { opt := defaultProducerOptions() unitName := "unsh" diff --git a/producer/producer.go b/producer/producer.go index 13650a4c..70e8d013 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -26,14 +26,15 @@ import ( "sync/atomic" "time" + "github.com/google/uuid" + "github.com/pkg/errors" + errors2 "github.com/apache/rocketmq-client-go/v2/errors" "github.com/apache/rocketmq-client-go/v2/internal" "github.com/apache/rocketmq-client-go/v2/internal/remote" "github.com/apache/rocketmq-client-go/v2/internal/utils" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/rlog" - "github.com/google/uuid" - "github.com/pkg/errors" ) type defaultProducer struct {