diff --git a/consumer/consumer.go b/consumer/consumer.go index 9e9bedb2..7d22b797 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -448,7 +448,7 @@ func (dc *defaultConsumer) doBalance() { "rebalanceResultSize": len(allocateResult), "rebalanceResultSet": allocateResult, }) - } + } } return true }) @@ -709,10 +709,12 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*primitiv mq := key.(primitive.MessageQueue) pq := value.(*processQueue) if mq.Topic == topic { + unlockMqs := make([]*primitive.MessageQueue, 0, 1) if !mqSet[mq] { pq.WithDropped(true) if dc.removeUnnecessaryMessageQueue(&mq, pq) { dc.processQueueTable.Delete(key) + unlockMqs = append(unlockMqs, &mq) changed = true rlog.Info("remove unnecessary mq when updateProcessQueueTable", map[string]interface{}{ rlog.LogKeyConsumerGroup: dc.consumerGroup, @@ -723,6 +725,7 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*primitiv pq.WithDropped(true) if dc.removeUnnecessaryMessageQueue(&mq, pq) { dc.processQueueTable.Delete(key) + unlockMqs = append(unlockMqs, &mq) changed = true rlog.Warning("remove unnecessary mq because pull was expired, prepare to fix it", map[string]interface{}{ rlog.LogKeyConsumerGroup: dc.consumerGroup, @@ -730,6 +733,22 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*primitiv }) } } + + if dc.consumeOrderly && len(unlockMqs) > 0 { + // 释放掉不再订阅的mq的锁,不再订阅的mq已经在上面被删除了 + go func() { + brokerResult := dc.client.GetNameSrv().FindBrokerAddressInSubscribe(mq.BrokerName, internal.MasterId, true) + + if brokerResult != nil { + body := &lockBatchRequestBody{ + ConsumerGroup: dc.consumerGroup, + ClientId: dc.client.ClientID(), + MQs: unlockMqs, + } + dc.doUnlock(brokerResult.BrokerAddr, body, false) + } + }() + } } return true })