Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix producer send msg timeout option does not take effect #1109

Merged
merged 3 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions consumer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,15 @@ func WithLimiter(limiter Limiter) Option {
}
}

// WithRemotingTimeout set remote client timeout options
func WithRemotingTimeout(connectionTimeout, readTimeout, writeTimeout time.Duration) Option {
return func(opts *consumerOptions) {
opts.ClientOptions.RemotingClientConfig.ConnectionTimeout = connectionTimeout
opts.ClientOptions.RemotingClientConfig.ReadTimeout = readTimeout
opts.ClientOptions.RemotingClientConfig.WriteTimeout = writeTimeout
}
}

func WithTls(useTls bool) Option {
return func(opts *consumerOptions) {
opts.ClientOptions.RemotingClientConfig.UseTls = useTls
Expand Down
15 changes: 15 additions & 0 deletions consumer/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package consumer
import (
"reflect"
"testing"
"time"
)

func getFieldString(obj interface{}, field string) string {
Expand All @@ -12,6 +13,20 @@ func getFieldString(obj interface{}, field string) string {
}).String()
}

func TestWithRemotingTimeout(t *testing.T) {
opt := defaultPushConsumerOptions()
WithRemotingTimeout(3*time.Second, 4*time.Second, 5*time.Second)(&opt)
if timeout := opt.RemotingClientConfig.ConnectionTimeout; timeout != 3*time.Second {
t.Errorf("consumer option WithRemotingTimeout connectionTimeout. want:%s, got=%s", 3*time.Second, timeout)
}
if timeout := opt.RemotingClientConfig.ReadTimeout; timeout != 4*time.Second {
t.Errorf("consumer option WithRemotingTimeout readTimeout. want:%s, got=%s", 4*time.Second, timeout)
}
if timeout := opt.RemotingClientConfig.WriteTimeout; timeout != 5*time.Second {
t.Errorf("consumer option WithRemotingTimeout writeTimeout. want:%s, got=%s", 5*time.Second, timeout)
}
}

func TestWithUnitName(t *testing.T) {
opt := defaultPushConsumerOptions()
unitName := "unsh"
Expand Down
26 changes: 15 additions & 11 deletions internal/remote/remote_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (c *remotingClient) InvokeSync(ctx context.Context, addr string, request *R
c.responseTable.Store(resp.Opaque, resp)
defer c.responseTable.Delete(request.Opaque)

err = c.sendRequest(conn, request)
err = c.sendRequest(ctx, conn, request)
if err != nil {
return nil, err
}
Expand All @@ -120,7 +120,7 @@ func (c *remotingClient) InvokeAsync(ctx context.Context, addr string, request *
resp := NewResponseFuture(ctx, request.Opaque, callback)
c.responseTable.Store(resp.Opaque, resp)

err = c.sendRequest(conn, request)
err = c.sendRequest(ctx, conn, request)
if err != nil {
c.responseTable.Delete(request.Opaque)
return err
Expand All @@ -146,11 +146,11 @@ func (c *remotingClient) InvokeOneWay(ctx context.Context, addr string, request
if err != nil {
return err
}
return c.sendRequest(conn, request)
return c.sendRequest(ctx, conn, request)
}

func (c *remotingClient) connect(ctx context.Context, addr string) (*tcpConnWrapper, error) {
//it needs additional locker.
// it needs additional locker.
c.connectionLocker.Lock()
defer c.connectionLocker.Unlock()
conn, ok := c.connectionTable.Load(addr)
Expand Down Expand Up @@ -246,7 +246,7 @@ func (c *remotingClient) processCMD(cmd *RemotingCommand, r *tcpConnWrapper) {
if res != nil {
res.Opaque = cmd.Opaque
res.Flag |= 1 << 0
err := c.sendRequest(r, res)
err := c.sendRequest(context.Background(), r, res)
if err != nil {
rlog.Warning("send response to broker error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
Expand Down Expand Up @@ -297,23 +297,27 @@ func (c *remotingClient) createScanner(r io.Reader) *bufio.Scanner {
return scanner
}

func (c *remotingClient) sendRequest(conn *tcpConnWrapper, request *RemotingCommand) error {
func (c *remotingClient) sendRequest(ctx context.Context, conn *tcpConnWrapper, request *RemotingCommand) error {
var err error
if c.interceptor != nil {
err = c.interceptor(context.Background(), request, nil, func(ctx context.Context, req, reply interface{}) error {
return c.doRequest(conn, request)
err = c.interceptor(ctx, request, nil, func(ctx context.Context, req, reply interface{}) error {
return c.doRequest(ctx, conn, request)
})
} else {
err = c.doRequest(conn, request)
err = c.doRequest(ctx, conn, request)
}
return err
}

func (c *remotingClient) doRequest(conn *tcpConnWrapper, request *RemotingCommand) error {
func (c *remotingClient) doRequest(ctx context.Context, conn *tcpConnWrapper, request *RemotingCommand) error {
conn.Lock()
defer conn.Unlock()

err := conn.Conn.SetWriteDeadline(time.Now().Add(c.config.WriteTimeout))
deadline, ok := ctx.Deadline()
if !ok {
deadline = time.Now().Add(c.config.WriteTimeout)
}
err := conn.Conn.SetWriteDeadline(deadline)
if err != nil {
rlog.Error("conn error, close connection", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
Expand Down
9 changes: 9 additions & 0 deletions producer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,15 @@ func WithCompressLevel(level int) Option {
}
}

// WithRemotingTimeout set remote client timeout options
func WithRemotingTimeout(connectionTimeout, readTimeout, writeTimeout time.Duration) Option {
return func(opts *producerOptions) {
opts.ClientOptions.RemotingClientConfig.ConnectionTimeout = connectionTimeout
opts.ClientOptions.RemotingClientConfig.ReadTimeout = readTimeout
opts.ClientOptions.RemotingClientConfig.WriteTimeout = writeTimeout
}
}

func WithTls(useTls bool) Option {
return func(opts *producerOptions) {
opts.ClientOptions.RemotingClientConfig.UseTls = useTls
Expand Down
15 changes: 15 additions & 0 deletions producer/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package producer
import (
"reflect"
"testing"
"time"
)

func getFieldString(obj interface{}, field string) string {
Expand All @@ -12,6 +13,20 @@ func getFieldString(obj interface{}, field string) string {
}).String()
}

func TestWithRemotingTimeout(t *testing.T) {
opt := defaultProducerOptions()
WithRemotingTimeout(3*time.Second, 4*time.Second, 5*time.Second)(&opt)
if timeout := opt.RemotingClientConfig.ConnectionTimeout; timeout != 3*time.Second {
t.Errorf("consumer option WithRemotingTimeout connectionTimeout. want:%s, got=%s", 3*time.Second, timeout)
}
if timeout := opt.RemotingClientConfig.ReadTimeout; timeout != 4*time.Second {
t.Errorf("consumer option WithRemotingTimeout readTimeout. want:%s, got=%s", 4*time.Second, timeout)
}
if timeout := opt.RemotingClientConfig.WriteTimeout; timeout != 5*time.Second {
t.Errorf("consumer option WithRemotingTimeout writeTimeout. want:%s, got=%s", 5*time.Second, timeout)
}
}

func TestWithUnitName(t *testing.T) {
opt := defaultProducerOptions()
unitName := "unsh"
Expand Down
11 changes: 6 additions & 5 deletions producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ import (
"sync/atomic"
"time"

"github.com/google/uuid"
"github.com/pkg/errors"

errors2 "github.com/apache/rocketmq-client-go/v2/errors"
"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/internal/remote"
"github.com/apache/rocketmq-client-go/v2/internal/utils"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/rlog"
"github.com/google/uuid"
"github.com/pkg/errors"
)

type defaultProducer struct {
Expand Down Expand Up @@ -355,7 +356,7 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
producerCtx.MQ = *mq
}

res, _err := p.client.InvokeSync(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second)
res, _err := p.client.InvokeSync(ctx, addr, p.buildSendRequest(mq, msg), p.options.SendMsgTimeout)
if _err != nil {
err = _err
continue
Expand Down Expand Up @@ -400,7 +401,7 @@ func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message,
return errors.Errorf("topic=%s route info not found", mq.Topic)
}

ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
ctx, cancel := context.WithTimeout(ctx, p.options.SendMsgTimeout)
err := p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), func(command *remote.RemotingCommand, err error) {
cancel()
if err != nil {
Expand Down Expand Up @@ -465,7 +466,7 @@ func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message
return fmt.Errorf("topic=%s route info not found", mq.Topic)
}

_err := p.client.InvokeOneWay(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second)
_err := p.client.InvokeOneWay(ctx, addr, p.buildSendRequest(mq, msg), p.options.SendMsgTimeout)
if _err != nil {
err = _err
continue
Expand Down
Loading