Skip to content

Commit 6dd4f51

Browse files
authored
add brokerName in request protocol
1 parent 8cfa33a commit 6dd4f51

8 files changed

+41
-11
lines changed

consumer/consumer.go

+9-5
Original file line numberDiff line numberDiff line change
@@ -889,6 +889,7 @@ func (dc *defaultConsumer) pullInner(ctx context.Context, queue *primitive.Messa
889889
SubExpression: data.SubString,
890890
// TODO: add subversion
891891
ExpressionType: string(data.ExpType),
892+
BrokerName: queue.BrokerName,
892893
}
893894

894895
if data.ExpType == string(TAG) {
@@ -999,8 +1000,9 @@ func (dc *defaultConsumer) queryMaxOffset(mq *primitive.MessageQueue) (int64, er
9991000
}
10001001

10011002
request := &internal.GetMaxOffsetRequestHeader{
1002-
Topic: mq.Topic,
1003-
QueueId: mq.QueueId,
1003+
Topic: mq.Topic,
1004+
QueueId: mq.QueueId,
1005+
BrokerName: mq.BrokerName,
10041006
}
10051007

10061008
cmd := remote.NewRemotingCommand(internal.ReqGetMaxOffset, request, nil)
@@ -1029,9 +1031,10 @@ func (dc *defaultConsumer) searchOffsetByTimestamp(mq *primitive.MessageQueue, t
10291031
}
10301032

10311033
request := &internal.SearchOffsetRequestHeader{
1032-
Topic: mq.Topic,
1033-
QueueId: mq.QueueId,
1034-
Timestamp: timestamp,
1034+
Topic: mq.Topic,
1035+
QueueId: mq.QueueId,
1036+
Timestamp: timestamp,
1037+
BrokerName: mq.BrokerName,
10351038
}
10361039

10371040
cmd := remote.NewRemotingCommand(internal.ReqSearchOffsetByTimestamp, request, nil)
@@ -1128,6 +1131,7 @@ func buildSendToRetryRequest(mq *primitive.MessageQueue, msg *primitive.Message,
11281131
Properties: msg.MarshallProperties(),
11291132
ReconsumeTimes: int(reconsumeTimes),
11301133
MaxReconsumeTimes: int(maxReconsumeTimes),
1134+
BrokerName: mq.BrokerName,
11311135
}
11321136

11331137
return remote.NewRemotingCommand(internal.ReqSendMessage, req, msg.Body)

consumer/offset_store.go

+2
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,7 @@ func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq
390390
ConsumerGroup: group,
391391
Topic: mq.Topic,
392392
QueueId: mq.QueueId,
393+
BrokerName: mq.BrokerName,
393394
}
394395
cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil)
395396
res, err := r.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
@@ -429,6 +430,7 @@ func (r *remoteBrokerOffsetStore) updateConsumeOffsetToBroker(group string, mq p
429430
Topic: mq.Topic,
430431
QueueId: mq.QueueId,
431432
CommitOffset: off,
433+
BrokerName: mq.BrokerName,
432434
}
433435
cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, updateOffsetRequest, nil)
434436
return r.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second)

consumer/process_queue.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ func (pq *processQueue) cleanExpiredMsg(pc *pushConsumer) {
275275
rlog.LogKeyQueueOffset: msg.QueueOffset,
276276
})
277277
pq.mutex.RUnlock()
278-
if !pc.sendMessageBack("", msg, int(3+msg.ReconsumeTimes)) {
278+
if !pc.sendMessageBack(msg.Queue.BrokerName, msg, int(3+msg.ReconsumeTimes)) {
279279
rlog.Error("send message back to broker error when clean expired messages", map[string]interface{}{
280280
rlog.LogKeyConsumerGroup: pc.consumerGroup,
281281
})

consumer/pull_consumer.go

+2
Original file line numberDiff line numberDiff line change
@@ -644,6 +644,7 @@ func (pc *defaultPullConsumer) buildSendBackRequest(msg *primitive.MessageExt, d
644644
DelayLevel: delayLevel,
645645
OriginMsgId: msg.MsgId,
646646
MaxReconsumeTimes: pc.getMaxReconsumeTimes(),
647+
BrokerName: msg.Queue.BrokerName,
647648
}
648649

649650
return remote.NewRemotingCommand(internal.ReqConsumerSendMsgBack, req, nil)
@@ -746,6 +747,7 @@ func (pc *defaultPullConsumer) pullMessage(request *PullRequest) {
746747
SubExpression: sd.SubString,
747748
ExpressionType: string(TAG),
748749
SuspendTimeoutMillis: 20 * time.Second,
750+
BrokerName: request.mq.BrokerName,
749751
}
750752

751753
brokerResult := pc.defaultConsumer.tryFindBroker(request.mq)

consumer/push_consumer.go

+2
Original file line numberDiff line numberDiff line change
@@ -819,6 +819,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
819819
SubExpression: subExpression,
820820
ExpressionType: string(TAG),
821821
SuspendTimeoutMillis: 20 * time.Second,
822+
BrokerName: request.mq.BrokerName,
822823
}
823824
//
824825
//if data.ExpType == string(TAG) {
@@ -937,6 +938,7 @@ func (pc *pushConsumer) buildSendBackRequest(msg *primitive.MessageExt, delayLev
937938
DelayLevel: delayLevel,
938939
OriginMsgId: msg.MsgId,
939940
MaxReconsumeTimes: pc.getMaxReconsumeTimes(),
941+
BrokerName: msg.Queue.BrokerName,
940942
}
941943

942944
return remote.NewRemotingCommand(internal.ReqConsumerSendMsgBack, req, nil)

internal/request.go

+22-5
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ type SendMessageRequestHeader struct {
7272
Batch bool
7373
DefaultTopic string
7474
DefaultTopicQueueNums int
75+
BrokerName string
7576
}
7677

7778
func (request *SendMessageRequestHeader) Encode() map[string]string {
@@ -89,6 +90,7 @@ func (request *SendMessageRequestHeader) Encode() map[string]string {
8990
maps["defaultTopicQueueNums"] = "4"
9091
maps["batch"] = strconv.FormatBool(request.Batch)
9192
maps["properties"] = request.Properties
93+
maps["bname"] = request.BrokerName
9294

9395
return maps
9496
}
@@ -101,6 +103,7 @@ type EndTransactionRequestHeader struct {
101103
FromTransactionCheck bool
102104
MsgID string
103105
TransactionId string
106+
BrokerName string
104107
}
105108

106109
type SendMessageRequestV2Header struct {
@@ -122,6 +125,7 @@ func (request *SendMessageRequestV2Header) Encode() map[string]string {
122125
maps["k"] = strconv.FormatBool(request.UnitMode)
123126
maps["l"] = strconv.Itoa(request.MaxReconsumeTimes)
124127
maps["m"] = strconv.FormatBool(request.Batch)
128+
maps["n"] = request.BrokerName
125129
return maps
126130
}
127131

@@ -134,6 +138,7 @@ func (request *EndTransactionRequestHeader) Encode() map[string]string {
134138
maps["fromTransactionCheck"] = strconv.FormatBool(request.FromTransactionCheck)
135139
maps["msgId"] = request.MsgID
136140
maps["transactionId"] = request.TransactionId
141+
maps["bname"] = request.BrokerName
137142
return maps
138143
}
139144

@@ -185,6 +190,7 @@ type ConsumerSendMsgBackRequestHeader struct {
185190
OriginTopic string
186191
UnitMode bool
187192
MaxReconsumeTimes int32
193+
BrokerName string
188194
}
189195

190196
func (request *ConsumerSendMsgBackRequestHeader) Encode() map[string]string {
@@ -196,6 +202,7 @@ func (request *ConsumerSendMsgBackRequestHeader) Encode() map[string]string {
196202
maps["originTopic"] = request.OriginTopic
197203
maps["unitMode"] = strconv.FormatBool(request.UnitMode)
198204
maps["maxReconsumeTimes"] = strconv.Itoa(int(request.MaxReconsumeTimes))
205+
maps["bname"] = request.BrokerName
199206

200207
return maps
201208
}
@@ -212,6 +219,7 @@ type PullMessageRequestHeader struct {
212219
SubExpression string
213220
SubVersion int64
214221
ExpressionType string
222+
BrokerName string
215223
}
216224

217225
func (request *PullMessageRequestHeader) Encode() map[string]string {
@@ -227,6 +235,7 @@ func (request *PullMessageRequestHeader) Encode() map[string]string {
227235
maps["subscription"] = request.SubExpression
228236
maps["subVersion"] = fmt.Sprintf("%d", request.SubVersion)
229237
maps["expressionType"] = request.ExpressionType
238+
maps["bname"] = request.BrokerName
230239

231240
return maps
232241
}
@@ -242,42 +251,48 @@ func (request *GetConsumerListRequestHeader) Encode() map[string]string {
242251
}
243252

244253
type GetMaxOffsetRequestHeader struct {
245-
Topic string
246-
QueueId int
254+
Topic string
255+
QueueId int
256+
BrokerName string
247257
}
248258

249259
func (request *GetMaxOffsetRequestHeader) Encode() map[string]string {
250260
maps := make(map[string]string)
251261
maps["topic"] = request.Topic
252262
maps["queueId"] = strconv.Itoa(request.QueueId)
263+
maps["bname"] = request.BrokerName
253264
return maps
254265
}
255266

256267
type QueryConsumerOffsetRequestHeader struct {
257268
ConsumerGroup string
258269
Topic string
259270
QueueId int
271+
BrokerName string
260272
}
261273

262274
func (request *QueryConsumerOffsetRequestHeader) Encode() map[string]string {
263275
maps := make(map[string]string)
264276
maps["consumerGroup"] = request.ConsumerGroup
265277
maps["topic"] = request.Topic
266278
maps["queueId"] = strconv.Itoa(request.QueueId)
279+
maps["bname"] = request.BrokerName
267280
return maps
268281
}
269282

270283
type SearchOffsetRequestHeader struct {
271-
Topic string
272-
QueueId int
273-
Timestamp int64
284+
Topic string
285+
QueueId int
286+
Timestamp int64
287+
BrokerName string
274288
}
275289

276290
func (request *SearchOffsetRequestHeader) Encode() map[string]string {
277291
maps := make(map[string]string)
278292
maps["topic"] = request.Topic
279293
maps["queueId"] = strconv.Itoa(request.QueueId)
280294
maps["timestamp"] = strconv.FormatInt(request.Timestamp, 10)
295+
maps["bname"] = request.BrokerName
281296
return maps
282297
}
283298

@@ -286,6 +301,7 @@ type UpdateConsumerOffsetRequestHeader struct {
286301
Topic string
287302
QueueId int
288303
CommitOffset int64
304+
BrokerName string
289305
}
290306

291307
func (request *UpdateConsumerOffsetRequestHeader) Encode() map[string]string {
@@ -294,6 +310,7 @@ func (request *UpdateConsumerOffsetRequestHeader) Encode() map[string]string {
294310
maps["topic"] = request.Topic
295311
maps["queueId"] = strconv.Itoa(request.QueueId)
296312
maps["commitOffset"] = strconv.FormatInt(request.CommitOffset, 10)
313+
maps["bname"] = request.BrokerName
297314
return maps
298315
}
299316

internal/trace.go

+1
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,7 @@ func (td *traceDispatcher) buildSendRequest(mq *primitive.MessageQueue,
527527
BornTimestamp: time.Now().UnixNano() / int64(time.Millisecond),
528528
Flag: msg.Flag,
529529
Properties: msg.MarshallProperties(),
530+
BrokerName: mq.BrokerName,
530531
}
531532

532533
return remote.NewRemotingCommand(ReqSendMessage, req, msg.Body)

producer/producer.go

+2
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,7 @@ func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue,
542542
Batch: msg.Batch,
543543
DefaultTopic: p.options.CreateTopicKey,
544544
DefaultTopicQueueNums: p.options.DefaultTopicQueueNums,
545+
BrokerName: mq.BrokerName,
545546
}
546547

547548
msgType := msg.GetProperty(primitive.PropertyMsgType)
@@ -762,6 +763,7 @@ func (tp *transactionProducer) endTransaction(result primitive.SendResult, err e
762763
TranStateTableOffset: result.QueueOffset,
763764
MsgID: result.MsgID,
764765
CommitOrRollback: tp.transactionState(state),
766+
BrokerName: result.MessageQueue.BrokerName,
765767
}
766768

767769
req := remote.NewRemotingCommand(internal.ReqENDTransaction, requestHeader, nil)

0 commit comments

Comments
 (0)