Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: set crrst flag on response header to ensure kitex client won't reuse bad connections #1653

Merged
merged 1 commit into from
Jan 24, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions pkg/remote/trans/default_server_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error)
recvMsg.SetPayloadCodec(t.opt.PayloadCodec)
ctx, err = t.transPipe.Read(ctx, conn, recvMsg)
if err != nil {
t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true)
t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true, true)
// t.OnError(ctx, err, conn) will be executed at outer function when transServer close the conn
return err
}
Expand All @@ -203,7 +203,7 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error)
var methodInfo serviceinfo.MethodInfo
if methodInfo, err = GetMethodInfo(ri, svcInfo); err != nil {
// it won't be error, because the method has been checked in decode, err check here just do defensive inspection
t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true)
t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true, true)
// for proxy case, need read actual remoteAddr, error print must exec after writeErrorReplyIfNeeded,
// t.OnError(ctx, err, conn) will be executed at outer function when transServer close the conn
return err
Expand All @@ -219,7 +219,7 @@ func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error)
// error cannot be wrapped to print here, so it must exec before NewTransError
t.OnError(ctx, err, conn)
err = remote.NewTransError(remote.InternalError, err)
if closeConn := t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, false); closeConn {
if closeConn := t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, false, false); closeConn {
return err
}
// connection don't need to be closed when the error is return by the server handler
Expand Down Expand Up @@ -288,7 +288,7 @@ func (t *svrTransHandler) SetPipeline(p *remote.TransPipeline) {
}

func (t *svrTransHandler) writeErrorReplyIfNeeded(
ctx context.Context, recvMsg remote.Message, conn net.Conn, err error, ri rpcinfo.RPCInfo, doOnMessage bool,
ctx context.Context, recvMsg remote.Message, conn net.Conn, err error, ri rpcinfo.RPCInfo, doOnMessage, connReset bool,
) (shouldCloseConn bool) {
if cn, ok := conn.(remote.IsActive); ok && !cn.IsActive() {
// conn is closed, no need reply
Expand All @@ -313,6 +313,13 @@ func (t *svrTransHandler) writeErrorReplyIfNeeded(
// if error happen before normal OnMessage, exec it to transfer header trans info into rpcinfo
t.transPipe.OnMessage(ctx, recvMsg, errMsg)
}
if connReset {
// if connection needs to be closed, set ConnResetTag to response header
// to ensure the client won't reuse the connection.
if ei := rpcinfo.AsTaggable(ri.To()); ei != nil {
ei.SetTag(rpcinfo.ConnResetTag, "1")
}
}
ctx, err = t.transPipe.Write(ctx, conn, errMsg)
if err != nil {
klog.CtxErrorf(ctx, "KITEX: write error reply failed, remote=%s, error=%s", conn.RemoteAddr(), err.Error())
Expand Down
Loading