From 91ffe5178afa9886841b5552386908aba4559949 Mon Sep 17 00:00:00 2001 From: wushengyu Date: Thu, 29 May 2025 18:30:14 +0800 Subject: [PATCH] add ReqUnRegisterClient, graceful Exit --- internal/client.go | 43 +++++++++++++++++++++++++++++++++++++++++-- internal/request.go | 15 +++++++++++++++ 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/internal/client.go b/internal/client.go index 6f27f5e8..28565a43 100644 --- a/internal/client.go +++ b/internal/client.go @@ -411,8 +411,8 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R } func (c *rmqClient) Start() { - //ctx, cancel := context.WithCancel(context.Background()) - //c.cancel = cancel + // ctx, cancel := context.WithCancel(context.Background()) + // c.cancel = cancel atomic.AddInt32(&c.instanceCount, 1) c.once.Do(func() { if !c.option.Credentials.IsEmpty() { @@ -687,6 +687,43 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() { }) } +func (c *rmqClient) unRegisterClientWithLock(producerGroup string, consumerGroup string) { + c.hbMutex.Lock() + defer c.hbMutex.Unlock() + c.GetNameSrv().(*namesrvs).brokerAddressesMap.Range(func(key, value interface{}) bool { + brokerName := key.(string) + data := value.(*BrokerData) + header := &UnregisterClientRequestHeader{ + clientID: c.ClientID(), + producerGroup: producerGroup, + consumerGroup: consumerGroup, + } + for id, addr := range data.BrokerAddresses { + cmd := remote.NewRemotingCommand(ReqUnRegisterClient, header, nil) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + response, err := c.remoteClient.InvokeSync(ctx, addr, cmd) + if err != nil { + cancel() + rlog.Warning("send unRegister client to broker error", map[string]interface{}{ + rlog.LogKeyUnderlayError: err, + }) + return true + } + cancel() + if response.Code != ResSuccess { + rlog.Warning("send unRegister client to broker failed", map[string]interface{}{ + "brokerName": brokerName, + "brokerId": id, + "brokerAddr": addr, + "responseCode": response.Code, + "remark": response.Remark, + }) + } + } + return true + }) +} + func (c *rmqClient) UpdateTopicRouteInfo() { allTopics := make(map[string]bool, 0) publishTopicSet := make(map[string]bool, 0) @@ -843,6 +880,7 @@ func (c *rmqClient) RegisterConsumer(group string, consumer InnerConsumer) error func (c *rmqClient) UnregisterConsumer(group string) { c.consumerMap.Delete(group) + c.unRegisterClientWithLock("", group) } func (c *rmqClient) RegisterProducer(group string, producer InnerProducer) error { @@ -859,6 +897,7 @@ func (c *rmqClient) RegisterProducer(group string, producer InnerProducer) error func (c *rmqClient) UnregisterProducer(group string) { c.producerMap.Delete(group) + c.unRegisterClientWithLock(group, "") } func (c *rmqClient) RebalanceImmediately() { diff --git a/internal/request.go b/internal/request.go index 93d7a5de..d786077a 100644 --- a/internal/request.go +++ b/internal/request.go @@ -35,6 +35,7 @@ const ( ReqGetMinOffset = int16(31) ReqViewMessageByID = int16(33) ReqHeartBeat = int16(34) + ReqUnRegisterClient = int16(35) ReqConsumerSendMsgBack = int16(36) ReqENDTransaction = int16(37) ReqGetConsumerListByGroup = int16(38) @@ -634,3 +635,17 @@ func (request *ReplyMessageRequestHeader) Decode(properties map[string]string) { request.storeTimestamp, _ = strconv.ParseInt(v, 10, 0) } } + +type UnregisterClientRequestHeader struct { + clientID string + producerGroup string + consumerGroup string +} + +func (request *UnregisterClientRequestHeader) Encode() map[string]string { + maps := make(map[string]string) + maps["clientId"] = request.clientID + maps["producerGroup"] = request.producerGroup + maps["consumerGroup"] = request.consumerGroup + return maps +}