Skip to content

Commit

Permalink
feat: set crrst flag to ensure kitex client won't reuse bad connections
Browse files Browse the repository at this point in the history
  • Loading branch information
jayantxie committed Dec 27, 2024
1 parent c50055a commit 3578c90
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
13 changes: 9 additions & 4 deletions pkg/remote/trans/default_server_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error)
recvMsg.SetPayloadCodec(t.opt.PayloadCodec)
ctx, err = t.transPipe.Read(ctx, conn, recvMsg)
if err != nil {
t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true)
t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true, true)
// t.OnError(ctx, err, conn) will be executed at outer function when transServer close the conn
return err
}
Expand All @@ -187,7 +187,7 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error)
var methodInfo serviceinfo.MethodInfo
if methodInfo, err = GetMethodInfo(ri, svcInfo); err != nil {
// it won't be err, because the method has been checked in decode, err check here just do defensive inspection
t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true)
t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true, true)
// for proxy case, need read actual remoteAddr, error print must exec after writeErrorReplyIfNeeded,
// t.OnError(ctx, err, conn) will be executed at outer function when transServer close the conn
return err
Expand All @@ -203,7 +203,7 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error)
// error cannot be wrapped to print here, so it must exec before NewTransError
t.OnError(ctx, err, conn)
err = remote.NewTransError(remote.InternalError, err)
if closeConn := t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, false); closeConn {
if closeConn := t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, false, false); closeConn {
return err
}
// connection don't need to be closed when the error is return by the server handler
Expand Down Expand Up @@ -272,7 +272,7 @@ func (t *svrTransHandler) SetPipeline(p *remote.TransPipeline) {
}

func (t *svrTransHandler) writeErrorReplyIfNeeded(
ctx context.Context, recvMsg remote.Message, conn net.Conn, err error, ri rpcinfo.RPCInfo, doOnMessage bool,
ctx context.Context, recvMsg remote.Message, conn net.Conn, err error, ri rpcinfo.RPCInfo, doOnMessage bool, connReset bool,
) (shouldCloseConn bool) {
if cn, ok := conn.(remote.IsActive); ok && !cn.IsActive() {
// conn is closed, no need reply
Expand All @@ -297,6 +297,11 @@ func (t *svrTransHandler) writeErrorReplyIfNeeded(
// if error happen before normal OnMessage, exec it to transfer header trans info into rpcinfo
t.transPipe.OnMessage(ctx, recvMsg, errMsg)
}
if connReset {
// if connection needs to be closed, set ConnResetTag to response header
// to ensure the peer won't reuse the connection.
rpcinfo.AsMutableEndpointInfo(ri.From()).SetTag(rpcinfo.ConnResetTag, "1")
}
ctx, err = t.transPipe.Write(ctx, conn, errMsg)
if err != nil {
klog.CtxErrorf(ctx, "KITEX: write error reply failed, remote=%s, error=%s", conn.RemoteAddr(), err.Error())
Expand Down
6 changes: 6 additions & 0 deletions pkg/transmeta/ttheader.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ func (ch *clientTTHeaderHandler) ReadMeta(ctx context.Context, msg remote.Messag
if setter, ok := ri.Invocation().(rpcinfo.InvocationSetter); ok && bizErr != nil {
setter.SetBizStatusErr(bizErr)
}
if val, ok := strInfo[transmeta.HeaderConnectionReadyToReset]; ok {
rpcinfo.AsMutableEndpointInfo(ri.To()).SetTag(rpcinfo.ConnResetTag, val)
}
return ctx, nil
}

Expand Down Expand Up @@ -190,6 +193,9 @@ func (sh *serverTTHeaderHandler) WriteMeta(ctx context.Context, msg remote.Messa
strInfo[bizExtra], _ = utils.Map2JSONStr(bizErr.BizExtra())
}
}
if val, ok := ri.From().Tag(rpcinfo.ConnResetTag); ok {
strInfo[transmeta.HeaderConnectionReadyToReset] = val
}

return ctx, nil
}
Expand Down

0 comments on commit 3578c90

Please sign in to comment.