-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy path7c98402b.html
More file actions
425 lines (319 loc) · 46.1 KB
/
7c98402b.html
File metadata and controls
425 lines (319 loc) · 46.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width">
<meta name="theme-color" content="#222"><meta name="generator" content="Hexo 6.3.0">
<link rel="apple-touch-icon" sizes="180x180" href="/images/apple-touch-icon-next.png">
<link rel="icon" type="image/png" sizes="32x32" href="/images/favicon.png">
<link rel="icon" type="image/png" sizes="16x16" href="/images/favicon.png">
<link rel="mask-icon" href="/images/logo.svg" color="#222">
<link rel="stylesheet" href="/css/main.css">
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.4.0/css/all.min.css" integrity="sha256-HtsXJanqjKTc8vVQjO4YMhiqFoXkfBsjBWcX91T1jr8=" crossorigin="anonymous">
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/animate.css/3.1.1/animate.min.css" integrity="sha256-PR7ttpcvz8qrF57fur/yAx1qXMFJeJFiA6pSzWi0OIE=" crossorigin="anonymous">
<script class="next-config" data-name="main" type="application/json">{"hostname":"tallate.github.io","root":"/","images":"/images","scheme":"Gemini","darkmode":false,"version":"8.18.0","exturl":false,"sidebar":{"position":"left","display":"post","padding":18,"offset":12},"copycode":{"enable":false,"style":null},"fold":{"enable":false,"height":500},"bookmark":{"enable":false,"color":"#222","save":"auto"},"mediumzoom":false,"lazyload":false,"pangu":false,"comments":{"style":"tabs","active":null,"storage":true,"lazyload":false,"nav":null},"stickytabs":false,"motion":{"enable":true,"async":false,"transition":{"menu_item":"fadeInDown","post_block":"fadeIn","post_header":"fadeInDown","post_body":"fadeInDown","coll_header":"fadeInLeft","sidebar":"fadeInUp"}},"prism":false,"i18n":{"placeholder":"搜索...","empty":"没有找到任何搜索结果:${query}","hits_time":"找到 ${hits} 个搜索结果(用时 ${time} 毫秒)","hits":"找到 ${hits} 个搜索结果"}}</script><script src="/js/config.js"></script>
<meta name="description" content="重试和幂等Retry队列和offset在RocketMQ启动时,每个group层面都会再定义一个专用的重试topic,重试消息被插入了重试topic队列。重试队列存在的意义就是快速推进offset,重试topic的名字是%RETRY%+consumerGroup,因此重试topic是group维度的,所以默认情况下一个group的consumer会有2个订阅topic,2个topic同时进行队列的">
<meta property="og:type" content="article">
<meta property="og:title" content="RocketMQ 消息重试和幂等">
<meta property="og:url" content="https://tallate.github.io/7c98402b.html">
<meta property="og:site_name" content="Tallate">
<meta property="og:description" content="重试和幂等Retry队列和offset在RocketMQ启动时,每个group层面都会再定义一个专用的重试topic,重试消息被插入了重试topic队列。重试队列存在的意义就是快速推进offset,重试topic的名字是%RETRY%+consumerGroup,因此重试topic是group维度的,所以默认情况下一个group的consumer会有2个订阅topic,2个topic同时进行队列的">
<meta property="og:locale" content="zh_CN">
<meta property="og:image" content="https://tallate.github.io/imgs/RocketMQ/offset%E7%9A%84%E6%9B%B4%E6%96%B0.png">
<meta property="article:published_time" content="2020-08-12T04:23:48.000Z">
<meta property="article:modified_time" content="2025-07-06T17:56:20.899Z">
<meta property="article:author" content="tallate">
<meta property="article:tag" content="RocketMQ">
<meta name="twitter:card" content="summary">
<meta name="twitter:image" content="https://tallate.github.io/imgs/RocketMQ/offset%E7%9A%84%E6%9B%B4%E6%96%B0.png">
<link rel="canonical" href="https://tallate.github.io/7c98402b.html">
<script class="next-config" data-name="page" type="application/json">{"sidebar":"","isHome":false,"isPost":true,"lang":"zh-CN","comments":true,"permalink":"https://tallate.github.io/7c98402b.html","path":"/7c98402b.html","title":"RocketMQ 消息重试和幂等"}</script>
<script class="next-config" data-name="calendar" type="application/json">""</script>
<title>RocketMQ 消息重试和幂等 | Tallate</title>
<noscript>
<link rel="stylesheet" href="/css/noscript.css">
</noscript>
</head>
<body itemscope itemtype="http://schema.org/WebPage" class="use-motion">
<div class="headband"></div>
<main class="main">
<div class="column">
<header class="header" itemscope itemtype="http://schema.org/WPHeader"><div class="site-brand-container">
<div class="site-nav-toggle">
<div class="toggle" aria-label="切换导航栏" role="button">
<span class="toggle-line"></span>
<span class="toggle-line"></span>
<span class="toggle-line"></span>
</div>
</div>
<div class="site-meta">
<a href="/" class="brand" rel="start">
<i class="logo-line"></i>
<p class="site-title">Tallate</p>
<i class="logo-line"></i>
</a>
<p class="site-subtitle" itemprop="description">该吃吃该喝喝 啥事别往心里搁</p>
</div>
<div class="site-nav-right">
<div class="toggle popup-trigger" aria-label="搜索" role="button">
<i class="fa fa-search fa-fw fa-lg"></i>
</div>
</div>
</div>
<nav class="site-nav">
<ul class="main-menu menu"><li class="menu-item menu-item-home"><a href="/" rel="section"><i class="home fa-fw"></i>首页</a></li><li class="menu-item menu-item-about"><a href="/about/" rel="section"><i class="user fa-fw"></i>关于</a></li><li class="menu-item menu-item-tags"><a href="/tags/" rel="section"><i class="tags fa-fw"></i>标签<span class="badge">84</span></a></li><li class="menu-item menu-item-categories"><a href="/categories/" rel="section"><i class="th fa-fw"></i>分类<span class="badge">25</span></a></li><li class="menu-item menu-item-archives"><a href="/archives/" rel="section"><i class="archive fa-fw"></i>归档<span class="badge">192</span></a></li>
<li class="menu-item menu-item-search">
<a role="button" class="popup-trigger"><i class="fa fa-search fa-fw"></i>搜索
</a>
</li>
</ul>
</nav>
<div class="search-pop-overlay">
<div class="popup search-popup"><div class="search-header">
<span class="search-icon">
<i class="fa fa-search"></i>
</span>
<div class="search-input-container">
<input autocomplete="off" autocapitalize="off" maxlength="80"
placeholder="搜索..." spellcheck="false"
type="search" class="search-input">
</div>
<span class="popup-btn-close" role="button">
<i class="fa fa-times-circle"></i>
</span>
</div>
<div class="search-result-container no-result">
<div class="search-result-icon">
<i class="fa fa-spinner fa-pulse fa-5x"></i>
</div>
</div>
</div>
</div>
</header>
<aside class="sidebar">
<div class="sidebar-inner sidebar-nav-active sidebar-toc-active">
<ul class="sidebar-nav">
<li class="sidebar-nav-toc">
文章目录
</li>
<li class="sidebar-nav-overview">
站点概览
</li>
</ul>
<div class="sidebar-panel-container">
<!--noindex-->
<div class="post-toc-wrap sidebar-panel">
<div class="post-toc animated"><ol class="nav"><li class="nav-item nav-level-2"><a class="nav-link" href="#%E9%87%8D%E8%AF%95%E5%92%8C%E5%B9%82%E7%AD%89"><span class="nav-number">1.</span> <span class="nav-text">重试和幂等</span></a><ol class="nav-child"><li class="nav-item nav-level-3"><a class="nav-link" href="#Retry%E9%98%9F%E5%88%97%E5%92%8Coffset"><span class="nav-number">1.1.</span> <span class="nav-text">Retry队列和offset</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#Producer%E7%AB%AF%E9%87%8D%E8%AF%95"><span class="nav-number">1.2.</span> <span class="nav-text">Producer端重试</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#Consumer%E9%87%8D%E8%AF%95"><span class="nav-number">1.3.</span> <span class="nav-text">Consumer重试</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#%E6%B6%88%E8%B4%B9%E8%BF%9B%E5%BA%A6%E5%92%8C-offset"><span class="nav-number">1.4.</span> <span class="nav-text">消费进度和 offset</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#Consumer%E6%9B%B4%E6%96%B0offset%E5%88%B0Broker"><span class="nav-number">1.5.</span> <span class="nav-text">Consumer更新offset到Broker</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#Broker%E7%AB%AFoffset%E7%9A%84%E5%AD%98%E5%82%A8"><span class="nav-number">1.6.</span> <span class="nav-text">Broker端offset的存储</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#Consumer%E4%BB%8EBroker%E6%8B%89%E5%8F%96offset"><span class="nav-number">1.7.</span> <span class="nav-text">Consumer从Broker拉取offset</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#%E6%B6%88%E6%81%AF%E5%B9%82%E7%AD%89"><span class="nav-number">1.8.</span> <span class="nav-text">消息幂等</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#%E9%87%8D%E8%AF%95%E6%BA%90%E7%A0%81"><span class="nav-number">1.9.</span> <span class="nav-text">重试源码</span></a></li></ol></li></ol></div>
</div>
<!--/noindex-->
<div class="site-overview-wrap sidebar-panel">
<div class="site-author animated" itemprop="author" itemscope itemtype="http://schema.org/Person">
<p class="site-author-name" itemprop="name">tallate</p>
<div class="site-description" itemprop="description"></div>
</div>
<div class="site-state-wrap animated">
<nav class="site-state">
<div class="site-state-item site-state-posts">
<a href="/archives/">
<span class="site-state-item-count">192</span>
<span class="site-state-item-name">日志</span>
</a>
</div>
<div class="site-state-item site-state-categories">
<a href="/categories/">
<span class="site-state-item-count">25</span>
<span class="site-state-item-name">分类</span></a>
</div>
<div class="site-state-item site-state-tags">
<a href="/tags/">
<span class="site-state-item-count">84</span>
<span class="site-state-item-name">标签</span></a>
</div>
</nav>
</div>
</div>
</div>
</div>
</aside>
</div>
<div class="main-inner post posts-expand">
<div class="post-block">
<article itemscope itemtype="http://schema.org/Article" class="post-content" lang="zh-CN">
<link itemprop="mainEntityOfPage" href="https://tallate.github.io/7c98402b.html">
<span hidden itemprop="author" itemscope itemtype="http://schema.org/Person">
<meta itemprop="image" content="/images/avatar.gif">
<meta itemprop="name" content="tallate">
</span>
<span hidden itemprop="publisher" itemscope itemtype="http://schema.org/Organization">
<meta itemprop="name" content="Tallate">
<meta itemprop="description" content="">
</span>
<span hidden itemprop="post" itemscope itemtype="http://schema.org/CreativeWork">
<meta itemprop="name" content="RocketMQ 消息重试和幂等 | Tallate">
<meta itemprop="description" content="">
</span>
<header class="post-header">
<h1 class="post-title" itemprop="name headline">
RocketMQ 消息重试和幂等
</h1>
<div class="post-meta-container">
<div class="post-meta">
<span class="post-meta-item">
<span class="post-meta-item-icon">
<i class="far fa-calendar"></i>
</span>
<span class="post-meta-item-text">发表于</span>
<time title="创建时间:2020-08-12 12:23:48" itemprop="dateCreated datePublished" datetime="2020-08-12T12:23:48+08:00">2020-08-12</time>
</span>
<span class="post-meta-item">
<span class="post-meta-item-icon">
<i class="far fa-calendar-check"></i>
</span>
<span class="post-meta-item-text">更新于</span>
<time title="修改时间:2025-07-07 01:56:20" itemprop="dateModified" datetime="2025-07-07T01:56:20+08:00">2025-07-07</time>
</span>
<span class="post-meta-item">
<span class="post-meta-item-icon">
<i class="far fa-folder"></i>
</span>
<span class="post-meta-item-text">分类于</span>
<span itemprop="about" itemscope itemtype="http://schema.org/Thing">
<a href="/categories/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/" itemprop="url" rel="index"><span itemprop="name">消息队列</span></a>
</span>
</span>
</div>
</div>
</header>
<div class="post-body" itemprop="articleBody"><h2 id="重试和幂等"><a href="#重试和幂等" class="headerlink" title="重试和幂等"></a>重试和幂等</h2><h3 id="Retry队列和offset"><a href="#Retry队列和offset" class="headerlink" title="Retry队列和offset"></a>Retry队列和offset</h3><p>在RocketMQ启动时,每个group层面都会再定义一个专用的重试topic,重试消息被插入了重试topic队列。<br><strong>重试队列存在的意义就是快速推进offset</strong>,重试topic的名字是<code>%RETRY%+consumerGroup</code>,因此重试topic是group维度的,所以默认情况下一个group的consumer会有2个订阅topic,2个topic同时进行队列的rebalance。<br><strong>offset是按照MessageQueue的维度进行维护的</strong>。<br>消息重试有2种反馈方式:</p>
<ol>
<li>重试队列:客户端先通过Netty API发送消息到Broker,如果这时调用Netty发送异常则调用Producer发送到RetryTopic中。</li>
<li>死信队列:如果重试次数过多(默认16次)则会进入死信队列,死信队列的逻辑在Broker,Client不会将消息发送至死信队列Topic。</li>
</ol>
<h3 id="Producer端重试"><a href="#Producer端重试" class="headerlink" title="Producer端重试"></a>Producer端重试</h3><p>下面的代码同步发送消息,如果5秒内没有发送成功,则重试5次</p>
<figure class="highlight plaintext"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br></pre></td><td class="code"><pre><span class="line">DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");</span><br><span class="line">producer.setRetryTimesWhenSendFailed(5);</span><br><span class="line">producer.send(msg,5000L);</span><br></pre></td></tr></table></figure>
<p>Producer 的 send 方法本身支持<strong>内部重试</strong>:<br>同步发送代码:<code>org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(Message)</code>,注意传超时参数时取的<code>defaultMQProducer.getSendMsgTimeout()</code>。<br>异步发送:<code>org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(Message, SendCallback)</code><br>实际发送消息的代码位置(注意对sendResult的处理):<code>org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl</code><br>从源码中可以得到以下结论:</p>
<ul>
<li>至多重试 2 次。<br>同步发送为 2 次,异步发送为 0 次,也就是说,异步发送是不会重试的。</li>
<li>如果发送失败,则轮转到下一个 Broker。这个方法的总耗时时间不超过 sendMsgTimeout 设置的值,默认 10s。</li>
<li>如果本身向 broker 发送消息产生超时异常,就不会再重试。</li>
</ul>
<p>除了Producer客户端的自动重试外,应用程序在接收到SendResult后也可以自己尝试去重试。</p>
<h3 id="Consumer重试"><a href="#Consumer重试" class="headerlink" title="Consumer重试"></a>Consumer重试</h3><p>消费者消费消息后需要给Broker返回消费状态,比如并发消费者<code>MessageListenerConcurrently</code>会返回<code>ConsumeConcurrentlyStatus</code>:</p>
<ul>
<li>如果消费成功,返回<code>CONSUME_SUCCESS</code>;</li>
<li>如果消费出错,返回<code>RECONSUME_LATER</code>,一段时间后重试。</li>
</ul>
<p>状态的返回是由用户线程控制的,但还有第三种可能,就是超时了,因此Consumer端的重试包含以下两种情况:</p>
<ol>
<li>异常重试:Consumer端主动返回<code>RECONSUME_LATER</code>状态,Broker会在一段时间后重试;</li>
<li>超时重试:Consumer端处理时间过长,或者由于某些原因线程挂起,导致迟迟没有返回消费状态,Broker就会认为Consumer消费超时,此时会发起超时重试。</li>
</ol>
<p>如果Consumer端因为各种类型异常导致本次消费失败(如上所述的两种情况),为防止该消息丢失而需要将其重新回发给Broker端保存,保存这种因为异常无法正常消费而回发给MQ的消息队列称之为<strong>重试队列</strong>。RocketMQ会为每个消费组都设置一个Topic名称为<code>%RETRY%+consumerGroup</code>的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的)。</p>
<blockquote>
<p>不能保证消息消费失败加入重试队列后还能被同一消费者消费,可能会破坏消息的顺序性。</p>
</blockquote>
<p>由于有些原因导致Consumer端长时间的无法正常消费从Broker端Pull过来的业务消息,为了确保消息不会被无故的丢弃,那么超过配置的“最大重试消费次数”后就会移入到这个<strong>死信队列</strong>中,RocketMQ会为每个消费组都设置一个Topic命名为“%DLQ%+consumerGroup”的死信队列。一般在实际应用中,移入至死信队列的消息,<strong>需要人工干预处理</strong>;</p>
<p>另外还有两种需要注意的情况:</p>
<ul>
<li>只有消息模式为<code>MessageModel.CLUSTERING</code>集群模式时,Broker才会自动进行重试,而广播消息是不会重试的。</li>
<li>事务消息中的半事务消息通过 Broker 的回查机制重试,具体流程见下面的<strong>事务消息</strong>。</li>
</ul>
<h3 id="消费进度和-offset"><a href="#消费进度和-offset" class="headerlink" title="消费进度和 offset"></a>消费进度和 offset</h3><p><img src="/imgs/RocketMQ/offset%E7%9A%84%E6%9B%B4%E6%96%B0.png" alt="offset的更新" title="offset的更新"><br>消息消费完成后,需要将消费进度存储起来,即前面提到的offset。<br>consumerQueue类似一个无限长的数组,可以利用offset来直接定位。<br>offset的存储分为本地模式和远程模式:</p>
<ul>
<li>本地模式:广播模式下,同消费组的消费者相互独立,消费进度要单独存储,对应的数据结构是<code>LocalFileOffsetStore</code>;</li>
<li>远程模式:集群模式下,同一条消息只会被同一个消费组消费一次,消费进度会参与到负载均衡中,故消费进度是需要共享的,对应的数据结构是<code>RemoteBrokerOffsetStore</code>,下面对offset的讨论集中于远程模式。</li>
</ul>
<h3 id="Consumer更新offset到Broker"><a href="#Consumer更新offset到Broker" class="headerlink" title="Consumer更新offset到Broker"></a>Consumer更新offset到Broker</h3><ol>
<li>消费消息维护offset<br><code>org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult</code><br>处理失败的消息会由Consumer通过sendMessageBack发回Broker,并写入消费组对应的RetryTopic(<code>%RETRY%+consumerGroup</code>),这样能快速令offset前进。</li>
<li>定时任务<br>每隔一段时间将各个队列的消费进度存储到对应的broker上,该时间由<code>DefaultMQPushConsumer</code>的<code>persistConsumerOffsetInterval</code>属性控制,默认为5秒。<br><code>MQClientInstance#startScheduledTask</code> -> <code>MQClientInstance#persistAllConsumerOffset</code><br>启动一个定时任务提交offset。<br><code>RemoteBrokerOffsetStore#updateConsumeOffsetToBroker</code><br>将offset发送到Broker。</li>
</ol>
<h3 id="Broker端offset的存储"><a href="#Broker端offset的存储" class="headerlink" title="Broker端offset的存储"></a>Broker端offset的存储</h3><p><code>org.apache.rocketmq.broker.processor.ConsumerManageProcessor#updateConsumerOffset</code><br>Broker会将offset存储在内存的一个<code>offsetTable</code>中,由Broker端的<code>ConsumerOffsetManager</code>维护(注意<code>RemoteBrokerOffsetStore</code>是Consumer端的实现,负责把offset上报给Broker)。</p>
<h3 id="Consumer从Broker拉取offset"><a href="#Consumer从Broker拉取offset" class="headerlink" title="Consumer从Broker拉取offset"></a>Consumer从Broker拉取offset</h3><p><code>DefaultMQPushConsumerImpl#pullMessage</code><br>拉消息后触发offset的更新。<br><code>RemoteBrokerOffsetStore#readOffset</code><br>将offset保存到缓存<code>offsetTable</code>中。</p>
<h3 id="消息幂等"><a href="#消息幂等" class="headerlink" title="消息幂等"></a>消息幂等</h3><p>RocketMQ提供<strong>At least once</strong>的消息服务质量标准,表示一条消息至少被送达一次,也就是说,不允许丢消息,但允许有少量重复消息出现。</p>
<blockquote>
<p>另外两种服务质量标准是<strong>At most once</strong>和<strong>Exactly once</strong>。</p>
</blockquote>
<p>比如Producer发出了10个消息,如果Consumer接收中间两条消息时出错了,返回<code>RECONSUME_LATER</code>,则该两条消息会被加入到<code>RETRY</code>队列中重新消费。</p>
<p>解决消息重复消费问题的主要方法是<strong>幂等</strong>,一个幂等操作的特点是,<strong>其任意多次执行仅会产生一次影响</strong>,因此从对系统的影响结果来说:<strong>At least once + 幂等消费 = Exactly once</strong>。<br>实现幂等的方式有很多种,不过这些方案与消息队列本身已经没有多大关系了,因此这里仅仅简单描述一下这些实现方式:</p>
<ol>
<li>利用数据库的唯一约束实现幂等<br>为一个操作设置一个唯一键,比如一个账单每个用户只允许变更一次,则可以给转账流水表中的账单ID和账户ID创建一个唯一约束。</li>
<li>加上前置条件<br>限制数据更新前的状态,比如只有在余额为500的时候才允许更新。<br>也可以单独加上一个唯一ID,每次发消息时生成一个全局唯一ID,消费时检查这个唯一ID是否有被消费过。</li>
</ol>
<h3 id="重试源码"><a href="#重试源码" class="headerlink" title="重试源码"></a>重试源码</h3><p>1、Consumer端初始化重试队列信息<br>1.1、Consumer端启动后,在集群模式下自动订阅本消费组的重试topic(<code>%RETRY%+consumerGroup</code>)</p>
<figure class="highlight plaintext"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br></pre></td><td class="code"><pre><span class="line">// Consumer自动创建一个group=%RETRY%+ConsumerGroup,用于后续的消费重试</span><br><span class="line">switch (this.defaultMQPushConsumer.getMessageModel()) {</span><br><span class="line"> case BROADCASTING:</span><br><span class="line"> break;</span><br><span class="line"> case CLUSTERING:</span><br><span class="line"> final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());</span><br><span class="line"> SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),</span><br><span class="line"> retryTopic, SubscriptionData.SUB_ALL);</span><br><span class="line"> this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);</span><br><span class="line"> break;</span><br><span class="line"> default:</span><br><span class="line"> break;</span><br><span class="line">}</span><br></pre></td></tr></table></figure>
<p>2、Consumer端处理消费结果<code>ConsumeMessageConcurrentlyService#processConsumeResult</code><br>2.1、设置ack<br>如果ConsumeRequest封装的消息全消费成功,则设置ackIndex的值为消息总条数-1;反之(返回<code>RECONSUME_LATER</code>时)将ackIndex置为-1,表示整批消息都需要重试</p>
<figure class="highlight plaintext"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br></pre></td><td class="code"><pre><span class="line">switch (status) {</span><br><span class="line"> // 消费成功</span><br><span class="line"> case CONSUME_SUCCESS:</span><br><span class="line"> if (ackIndex >= consumeRequest.getMsgs().size()) {</span><br><span class="line"> ackIndex = consumeRequest.getMsgs().size() - 1;</span><br><span class="line"> }</span><br><span class="line"> int ok = ackIndex + 1;</span><br><span class="line"> int failed = consumeRequest.getMsgs().size() - ok;</span><br><span class="line"> this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);</span><br><span class="line"> this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);</span><br><span class="line"> break;</span><br><span class="line"> // 消费失败、重试</span><br><span class="line"> case RECONSUME_LATER:</span><br><span class="line"> ackIndex = -1;</span><br><span class="line"> this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),</span><br><span class="line"> consumeRequest.getMsgs().size());</span><br><span class="line"> break;</span><br><span class="line"> default:</span><br><span class="line"> break;</span><br><span 
class="line">}</span><br></pre></td></tr></table></figure>
<p>2.2、消费失败的消息触发重试<br>sendMessageBack将消费失败的msg发回broker,如果sendMessageBack也失败则保存到msgBackFailed</p>
<figure class="highlight plaintext"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br></pre></td><td class="code"><pre><span class="line">switch (this.defaultMQPushConsumer.getMessageModel()) {</span><br><span class="line"> ...</span><br><span class="line"> case CLUSTERING:</span><br><span class="line"> List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());</span><br><span class="line"> // ackIndex+1开始的是未成功消费的</span><br><span class="line"> for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {</span><br><span class="line"> MessageExt msg = consumeRequest.getMsgs().get(i);</span><br><span class="line"> boolean result = this.sendMessageBack(msg, context);</span><br><span class="line"> // 如果发送失败,则保存到msgBackFailed</span><br><span class="line"> if (!result) {</span><br><span class="line"> msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);</span><br><span class="line"> msgBackFailed.add(msg);</span><br><span class="line"> }</span><br><span class="line"> }</span><br><span class="line"></span><br><span class="line"> // 将sendMessageBack失败的消息从consumeRequest移除,并包装起来5s后转发给消费线程池继续消费</span><br><span class="line"> if (!msgBackFailed.isEmpty()) {</span><br><span 
class="line"> consumeRequest.getMsgs().removeAll(msgBackFailed);</span><br><span class="line"></span><br><span class="line"> this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());</span><br><span class="line"> }</span><br><span class="line"> break;</span><br><span class="line"> default:</span><br><span class="line"> break;</span><br><span class="line">}</span><br></pre></td></tr></table></figure>
<p>2.3、更新offset<br>本地消费成功后会将消费进度同步到本地的processQueue<br>sendMessageBack成功的消息会从本地processQueue中移除,并更新进度,这条消息的消费会交由消费集群中的一个节点去继续消费,取决于负载均衡将此消息对应的topic对应的重试队列retryQueue分配给哪个节点。</p>
<figure class="highlight plaintext"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br></pre></td><td class="code"><pre><span class="line">// 这里开始更新offset</span><br><span class="line">// 先从队列在consumer端的视图(一个treeMap)中移除</span><br><span class="line">// 这里返回的offset是经过删除后最小的偏移量</span><br><span class="line">long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());</span><br><span class="line">if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {</span><br><span class="line"> this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);</span><br><span class="line">}</span><br></pre></td></tr></table></figure>
<p>3、Broker端接收sendMessageBack消息<br>Broker端的处理主要是重试和延迟<br>3.1、设置topic<br>设置此条消息新的topic为%RETRY%消费组的名称,并且选择新topic的队列(默认为0,默认情况下RetryQueueNum为1)</p>
<figure class="highlight plaintext"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br></pre></td><td class="code"><pre><span class="line">String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());</span><br><span class="line">int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();</span><br><span class="line">int topicSysFlag = 0;</span><br><span class="line">if (requestHeader.isUnitMode()) {</span><br><span class="line"> topicSysFlag = TopicSysFlag.buildSysFlag(false, true);</span><br><span class="line">}</span><br></pre></td></tr></table></figure>
<p>3.2、将消息topic设置为重试topic<br>通过物理偏移量找到消息体</p>
<figure class="highlight plaintext"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br></pre></td><td class="code"><pre><span class="line">MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());</span><br><span class="line">if (null == msgExt) {</span><br><span class="line"> response.setCode(ResponseCode.SYSTEM_ERROR);</span><br><span class="line"> response.setRemark("look message by offset failed, " + requestHeader.getOffset());</span><br><span class="line"> return CompletableFuture.completedFuture(response);</span><br><span class="line">}</span><br></pre></td></tr></table></figure>
<p>给原始消息新增属性,key为RETRY_TOPIC,value为原始消息的实际topic<br>和Consumer端消费消息时的resetRetryTopic(msgs)相呼应</p>
<figure class="highlight plaintext"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br></pre></td><td class="code"><pre><span class="line">final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);</span><br><span class="line">if (null == retryTopic) {</span><br><span class="line"> MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());</span><br><span class="line">}</span><br><span class="line">msgExt.setWaitStoreMsgOK(false);</span><br></pre></td></tr></table></figure>
<p>3.3、获取延迟并判断是否进入死信队列<br>获取消息的延迟级别,默认此时的值为0</p>
<figure class="highlight plaintext"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br></pre></td><td class="code"><pre><span class="line">int delayLevel = requestHeader.getDelayLevel();</span><br><span class="line"></span><br><span class="line">int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();</span><br><span class="line">if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {</span><br><span class="line"> maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();</span><br><span class="line">}</span><br></pre></td></tr></table></figure>
<p>消息每消费失败一次都会增加ReconsumeTimes的值,当这个值达到了maxReconsumeTimes(默认为16),则将此消息送入死信队列,且此死信队列不可读,也就是说这条消息在没有人工干预的情况下再也不能被消费了。</p>
<figure class="highlight plaintext"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br></pre></td><td class="code"><pre><span class="line">if (msgExt.getReconsumeTimes() >= maxReconsumeTimes </span><br><span class="line"> || delayLevel < 0) {</span><br><span class="line"> newTopic = MixAll.getDLQTopic(requestHeader.getGroup());</span><br><span class="line"> queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;</span><br><span class="line"></span><br><span class="line"> topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,</span><br><span class="line"> DLQ_NUMS_PER_GROUP,</span><br><span class="line"> PermName.PERM_WRITE, 0);</span><br><span class="line"> if (null == topicConfig) {</span><br><span class="line"> response.setCode(ResponseCode.SYSTEM_ERROR);</span><br><span class="line"> response.setRemark("topic[" + newTopic + "] not exist");</span><br><span class="line"> return CompletableFuture.completedFuture(response);</span><br><span class="line"> }</span><br><span class="line">} else {</span><br><span class="line"> // 设置延迟级别为3,意味着要延迟10s再消费这条消息,消息重复消费需要借助延迟消费的功能实现</span><br><span class="line"> if (0 == delayLevel) {</span><br><span class="line"> delayLevel = 3 + msgExt.getReconsumeTimes();</span><br><span class="line"> }</span><br><span class="line"> 
msgExt.setDelayTimeLevel(delayLevel);</span><br><span class="line">}</span><br></pre></td></tr></table></figure>
<p>3.4、存储新创建的消息<br>这里有延迟消息的实现:如果delayLevel大于0,会将此消息的topic和queueID再进行一次转换,将此消息的newTopic、queueID存入到属性中(real_topic, real_qid),新的topic为SCHEDULE_TOPIC_XXXX,新的queue为根据delayLevel的等级去本地<code>delayTimeLevel</code>找到对应的队列;后续会有<code>ScheduleMessageService</code>做后续的逻辑。</p>
<figure class="highlight plaintext"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br></pre></td><td class="code"><pre><span class="line">CompletableFuture&lt;PutMessageResult&gt; putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);</span><br><span class="line">return putMessageResult.thenApply((r) -&gt; {</span><br><span class="line"> if (r != null) {</span><br><span class="line"> switch (r.getPutMessageStatus()) {</span><br><span class="line"> case PUT_OK:</span><br><span class="line"> String backTopic = msgExt.getTopic();</span><br><span class="line"> String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);</span><br><span class="line"> if (correctTopic != null) {</span><br><span class="line"> backTopic = correctTopic;</span><br><span class="line"> }</span><br><span class="line"> this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);</span><br><span class="line"> response.setCode(ResponseCode.SUCCESS);</span><br><span class="line"> response.setRemark(null);</span><br><span class="line"> return response;</span><br><span class="line"> default:</span><br><span class="line"> break;</span><br><span class="line"> 
}</span><br><span class="line"> response.setCode(ResponseCode.SYSTEM_ERROR);</span><br><span class="line"> response.setRemark(r.getPutMessageStatus().name());</span><br><span class="line"> return response;</span><br><span class="line"> }</span><br><span class="line"> response.setCode(ResponseCode.SYSTEM_ERROR);</span><br><span class="line"> response.setRemark("putMessageResult is null");</span><br><span class="line"> return response;</span><br><span class="line">});</span><br></pre></td></tr></table></figure>
<p>4、Broker端重试<br><code>ScheduleMessageService</code>是用来处理延迟消息的服务组件,<code>delayLevelTable</code>存储了不同延迟级别对应的延迟时间,可配置。</p>
<figure class="highlight plaintext"><table><tr><td class="gutter"><pre><span class="line">1</span><br></pre></td><td class="code"><pre><span class="line">private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";</span><br></pre></td></tr></table></figure>
<p>4.1、遍历消费队列</p>
<figure class="highlight plaintext"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br></pre></td><td class="code"><pre><span class="line">// 遍历delayLevelTable里所有级别队列</span><br><span class="line">for (Map.Entry&lt;Integer, Long&gt; entry : this.delayLevelTable.entrySet()) {</span><br><span class="line"> Integer level = entry.getKey();</span><br><span class="line"> Long timeDelay = entry.getValue();</span><br><span class="line"> Long offset = this.offsetTable.get(level);</span><br><span class="line"> if (null == offset) {</span><br><span class="line"> offset = 0L;</span><br><span class="line"> }</span><br><span class="line"></span><br><span class="line"> if (timeDelay != null) {</span><br><span class="line"> // 由一个timer来处理</span><br><span class="line"> this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);</span><br><span class="line"> }</span><br><span class="line">}</span><br></pre></td></tr></table></figure>
<p>4.2、重置延时消息<br>判断时间是否达到了延迟时间,达到了再将这些消息的原始topic和原始队列取出转发存储起来,待消费者消费。<br>4.3、设置重试消息<br>重试消息会被转变2次topic和queueID,导致在<code>ScheduleMessageService</code>转发存储的时候会将第一次转变的topic和queueID取出转发到<code>topic=%RETRY%+consumerGroup</code>、queueId=0的消息队列。<br>这个消息会被consumerGroup这个消费组消费,至于哪个节点消费则由负载均衡来决定。</p>
<script type="text/javascript" src="https://cdn.jsdelivr.net/npm/kity@2.0.4/dist/kity.min.js"></script><script type="text/javascript" src="https://cdn.jsdelivr.net/npm/kityminder-core@1.4.50/dist/kityminder.core.min.js"></script><script defer="true" type="text/javascript" src="https://cdn.jsdelivr.net/npm/hexo-simple-mindmap@0.8.0/dist/mindmap.min.js"></script><link rel="stylesheet" type="text/css" href="https://cdn.jsdelivr.net/npm/hexo-simple-mindmap@0.8.0/dist/mindmap.min.css">
</div>
<footer class="post-footer">
<div class="post-tags">
<a href="/tags/RocketMQ/" rel="tag"># RocketMQ</a>
</div>
<div class="post-nav">
<div class="post-nav-item">
<a href="/965af9c3.html" rel="prev" title="RocketMQ 延时消息">
<i class="fa fa-angle-left"></i> RocketMQ 延时消息
</a>
</div>
<div class="post-nav-item">
<a href="/91c9d48a.html" rel="next" title="RocketMQ 消息发送流程">
RocketMQ 消息发送流程 <i class="fa fa-angle-right"></i>
</a>
</div>
</div>
</footer>
</article>
</div>
</div>
</main>
<footer class="footer">
<div class="footer-inner">
<div class="copyright">
©
<span itemprop="copyrightYear">2025</span>
<span class="with-love">
<i class="fa fa-heart"></i>
</span>
<span class="author" itemprop="copyrightHolder">tallate</span>
</div>
<div class="powered-by">由 <a href="https://hexo.io/" rel="noopener" target="_blank">Hexo</a> & <a href="https://theme-next.js.org/" rel="noopener" target="_blank">NexT.Gemini</a> 强力驱动
</div>
</div>
</footer>
<div class="back-to-top" role="button" aria-label="返回顶部">
<i class="fa fa-arrow-up fa-lg"></i>
<span>0%</span>
</div>
<a href="https://github.com/tallate" class="github-corner" title="在 GitHub 上关注我" aria-label="在 GitHub 上关注我" rel="noopener" target="_blank"><svg width="80" height="80" viewBox="0 0 250 250" aria-hidden="true"><path d="M0,0 L115,115 L130,115 L142,142 L250,250 L250,0 Z"></path><path d="M128.3,109.0 C113.8,99.7 119.0,89.6 119.0,89.6 C122.0,82.7 120.5,78.6 120.5,78.6 C119.2,72.0 123.4,76.3 123.4,76.3 C127.3,80.9 125.5,87.3 125.5,87.3 C122.9,97.6 130.6,101.9 134.4,103.2" fill="currentColor" style="transform-origin: 130px 106px;" class="octo-arm"></path><path d="M115.0,115.0 C114.9,115.1 118.7,116.5 119.8,115.4 L133.7,101.6 C136.9,99.2 139.9,98.4 142.2,98.6 C133.8,88.0 127.5,74.4 143.8,58.0 C148.5,53.4 154.0,51.2 159.7,51.0 C160.3,49.4 163.2,43.6 171.4,40.1 C171.4,40.1 176.1,42.5 178.8,56.2 C183.1,58.6 187.2,61.8 190.9,65.4 C194.5,69.0 197.7,73.2 200.1,77.6 C213.8,80.2 216.3,84.9 216.3,84.9 C212.7,93.1 206.9,96.0 205.4,96.6 C205.1,102.4 203.0,107.8 198.3,112.5 C181.9,128.9 168.3,122.5 157.7,114.1 C157.9,116.9 156.7,120.9 152.7,124.9 L141.0,136.5 C139.8,137.7 141.6,141.9 141.8,141.8 Z" fill="currentColor" class="octo-body"></path></svg></a>
<noscript>
<div class="noscript-warning">Theme NexT works best with JavaScript enabled</div>
</noscript>
<script src="https://cdnjs.cloudflare.com/ajax/libs/animejs/3.2.1/anime.min.js" integrity="sha256-XL2inqUJaslATFnHdJOi9GfQ60on8Wx1C2H8DYiN1xY=" crossorigin="anonymous"></script>
<script src="/js/comments.js"></script><script src="/js/utils.js"></script><script src="/js/motion.js"></script><script src="/js/next-boot.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/hexo-generator-searchdb/1.4.1/search.js" integrity="sha256-1kfA5uHPf65M5cphT2dvymhkuyHPQp5A53EGZOnOLmc=" crossorigin="anonymous"></script>
<script src="/js/third-party/search/local-search.js"></script>
<script class="next-config" data-name="mermaid" type="application/json">{"enable":true,"version":"7.1.2","options":null,"js":{"url":"https://cdnjs.cloudflare.com/ajax/libs/mermaid/10.3.0/mermaid.min.js","integrity":"sha256-9y71g5Lz/KLsHjB8uXwnkuWDtAMDSzD/HdIbqhJfTAI="}}</script>
<script src="/js/third-party/tags/mermaid.js"></script>
</body>
</html>