Skip to content

Commit 2a4301e

Browse files
committed
Nack rejected messages to MQTT 5.0 client
since MQTT 5.0 supports negative acknowledgements thanks to reason codes in the PUBACK packet. We could either choose reason code 128 or 131. The description code for 131 applies for rejected messages, hence this commit uses 131: > The PUBLISH is valid but the receiver is not willing to accept it.
1 parent d7c700a commit 2a4301e

File tree

8 files changed

+207
-107
lines changed

8 files changed

+207
-107
lines changed

deps/rabbit/src/rabbit_confirms.erl

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,3 @@ next_smallest(S, U) when is_map_key(S, U) ->
144144
next_smallest(S, U) ->
145145
%% TODO: this is potentially infinitely recursive if called incorrectly
146146
next_smallest(S+1, U).
147-
148-
149-
150-
-ifdef(TEST).
151-
-include_lib("eunit/include/eunit.hrl").
152-
-endif.

deps/rabbit/test/rabbit_confirms_SUITE.erl

Lines changed: 6 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
11
-module(rabbit_confirms_SUITE).
22

3-
-compile(export_all).
4-
5-
-export([
6-
]).
7-
8-
-include_lib("common_test/include/ct.hrl").
3+
-compile([export_all, nowarn_export_all]).
94
-include_lib("eunit/include/eunit.hrl").
105

116
%%%===================================================================
@@ -17,40 +12,13 @@ all() ->
1712
{group, tests}
1813
].
1914

20-
21-
all_tests() ->
22-
[
23-
confirm,
24-
reject,
25-
remove_queue
26-
].
27-
2815
groups() ->
2916
[
30-
{tests, [], all_tests()}
31-
].
32-
33-
init_per_suite(Config) ->
34-
Config.
35-
36-
end_per_suite(_Config) ->
37-
ok.
38-
39-
init_per_group(_Group, Config) ->
40-
Config.
41-
42-
end_per_group(_Group, _Config) ->
43-
ok.
44-
45-
init_per_testcase(_TestCase, Config) ->
46-
Config.
47-
48-
end_per_testcase(_TestCase, _Config) ->
49-
ok.
50-
51-
%%%===================================================================
52-
%%% Test cases
53-
%%%===================================================================
17+
{tests, [shuffle],
18+
[confirm,
19+
reject,
20+
remove_queue
21+
]}].
5422

5523
confirm(_Config) ->
5624
XName = rabbit_misc:r(<<"/">>, exchange, <<"X">>),
@@ -93,7 +61,6 @@ confirm(_Config) ->
9361
?assertEqual(0, rabbit_confirms:size(U7)),
9462
?assertEqual(undefined, rabbit_confirms:smallest(U7)),
9563

96-
9764
U8 = rabbit_confirms:insert(2, [QName], XName, U1),
9865
{[{1, XName}, {2, XName}], _U9} = rabbit_confirms:confirm([1, 2], QName, U8),
9966
ok.
@@ -126,7 +93,6 @@ reject(_Config) ->
12693
{error, not_found} = rabbit_confirms:reject(2, U5),
12794
?assertEqual(1, rabbit_confirms:size(U5)),
12895
?assertEqual(1, rabbit_confirms:smallest(U5)),
129-
13096
ok.
13197

13298
remove_queue(_Config) ->
@@ -147,8 +113,4 @@ remove_queue(_Config) ->
147113
U5 = rabbit_confirms:insert(1, [QName], XName, U0),
148114
U6 = rabbit_confirms:insert(2, [QName], XName, U5),
149115
{[{1, XName}, {2, XName}], _U} = rabbit_confirms:remove_queue(QName, U6),
150-
151116
ok.
152-
153-
154-
%% Utility

deps/rabbitmq_mqtt/BUILD.bazel

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,14 @@ rabbitmq_suite(
281281
],
282282
)
283283

284+
rabbitmq_suite(
285+
name = "rabbit_mqtt_confirms_SUITE",
286+
size = "small",
287+
deps = [
288+
"//deps/rabbit_common:erlang_app",
289+
],
290+
)
291+
284292
assert_suites()
285293

286294
alias(

deps/rabbitmq_mqtt/app.bzl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,3 +331,11 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
331331
erlc_opts = "//:test_erlc_opts",
332332
deps = ["//deps/amqp_client:erlang_app"],
333333
)
334+
erlang_bytecode(
335+
name = "rabbit_mqtt_confirms_SUITE_beam_files",
336+
testonly = True,
337+
srcs = ["test/rabbit_mqtt_confirms_SUITE.erl"],
338+
outs = ["test/rabbit_mqtt_confirms_SUITE.beam"],
339+
app_name = "rabbitmq_mqtt",
340+
erlc_opts = "//:test_erlc_opts",
341+
)

deps/rabbitmq_mqtt/src/rabbit_mqtt_confirms.erl

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -49,22 +49,21 @@ insert(PktId, QNames, State)
4949
-spec confirm([packet_id()], queue_name(), state()) ->
5050
{[packet_id()], state()}.
5151
confirm(PktIds, QName, State0) ->
52-
{L0, State} = lists:foldl(fun(PktId, Acc) ->
53-
confirm_one(PktId, QName, Acc)
54-
end, {[], State0}, PktIds),
55-
L = lists:reverse(L0),
56-
{L, State}.
52+
lists:foldl(fun(PktId, Acc) ->
53+
confirm_one(PktId, QName, Acc)
54+
end, {[], State0}, PktIds).
5755

58-
-spec reject(packet_id(), state()) ->
59-
{ok, state()} | {error, not_found}.
60-
reject(PktId, State0)
61-
when is_integer(PktId) ->
62-
case maps:take(PktId, State0) of
63-
{_, State} ->
64-
{ok, State};
65-
error ->
66-
{error, not_found}
67-
end.
56+
-spec reject([packet_id()], state()) ->
57+
{[packet_id()], state()}.
58+
reject(PktIds, State0) ->
59+
lists:foldl(fun(PktId, Acc = {Rejs, S0}) ->
60+
case maps:take(PktId, S0) of
61+
{_, S} ->
62+
{[PktId | Rejs], S};
63+
error ->
64+
Acc
65+
end
66+
end, {[], State0}, PktIds).
6867

6968
%% idempotent
7069
-spec remove_queue(queue_name(), state()) ->
@@ -77,7 +76,7 @@ remove_queue(QName, State) ->
7776
(_, _, PktIds) ->
7877
PktIds
7978
end, [], State),
80-
confirm(lists:sort(PktIds), QName, State).
79+
confirm(PktIds, QName, State).
8180

8281
%% INTERNAL
8382

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1686,17 +1686,15 @@ process_routing_confirm(#delivery{confirm = true,
16861686
U = rabbit_mqtt_confirms:insert(PktId, QNames, U0),
16871687
State#state{unacked_client_pubs = U}.
16881688

1689-
-spec send_puback(list(packet_id()), state()) -> ok.
1690-
send_puback(PktIds0, State)
1689+
-spec send_puback(packet_id() | list(packet_id()), reason_code(), state()) -> ok.
1690+
send_puback(PktIds0, ReasonCode, State)
16911691
when is_list(PktIds0) ->
16921692
%% Classic queues confirm messages unordered.
16931693
%% Let's sort them here assuming most MQTT clients send with an increasing packet identifier.
16941694
PktIds = lists:usort(PktIds0),
16951695
lists:foreach(fun(Id) ->
1696-
send_puback(Id, ?RC_SUCCESS, State)
1697-
end, PktIds).
1698-
1699-
-spec send_puback(packet_id(), reason_code(), state()) -> ok.
1696+
send_puback(Id, ReasonCode, State)
1697+
end, PktIds);
17001698
send_puback(PktId, ReasonCode, State = #state{cfg = #cfg{proto_ver = ProtoVer}}) ->
17011699
rabbit_global_counters:messages_confirmed(ProtoVer, 1),
17021700
Packet = #mqtt_packet{fixed = #mqtt_packet_fixed{type = ?PUBACK},
@@ -1971,7 +1969,7 @@ handle_down({{'DOWN', QName}, _MRef, process, QPid, Reason},
19711969
QStates = rabbit_queue_type:remove(QRef, QStates1),
19721970
State = State0#state{queue_states = QStates,
19731971
unacked_client_pubs = U},
1974-
send_puback(ConfirmPktIds, State),
1972+
send_puback(ConfirmPktIds, ?RC_SUCCESS, State),
19751973
{ok, State}
19761974
end.
19771975

@@ -2001,7 +1999,7 @@ handle_queue_event({queue_event, QName, Evt},
20011999
QStates = rabbit_queue_type:remove(QName, QStates0),
20022000
State = State1#state{queue_states = QStates,
20032001
unacked_client_pubs = U},
2004-
send_puback(ConfirmPktIds, State),
2002+
send_puback(ConfirmPktIds, ?RC_SUCCESS, State),
20052003
{ok, State};
20062004
{protocol_error, _Type, _Reason, _ReasonArgs} = Error ->
20072005
{error, Error, State0}
@@ -2013,19 +2011,21 @@ handle_queue_actions(Actions, #state{} = State0) ->
20132011
deliver_to_client(Msgs, Ack, S);
20142012
({settled, QName, PktIds}, S = #state{unacked_client_pubs = U0}) ->
20152013
{ConfirmPktIds, U} = rabbit_mqtt_confirms:confirm(PktIds, QName, U0),
2016-
send_puback(ConfirmPktIds, S),
2017-
S#state{unacked_client_pubs = U};
2018-
({rejected, _QName, PktIds}, S = #state{unacked_client_pubs = U0}) ->
2019-
%% Negative acks are supported in MQTT v5 only.
2020-
%% Therefore, in MQTT v3 and v4 we ignore rejected messages.
2021-
U = lists:foldl(
2022-
fun(PktId, Acc0) ->
2023-
case rabbit_mqtt_confirms:reject(PktId, Acc0) of
2024-
{ok, Acc} -> Acc;
2025-
{error, not_found} -> Acc0
2026-
end
2027-
end, U0, PktIds),
2014+
send_puback(ConfirmPktIds, ?RC_SUCCESS, S),
20282015
S#state{unacked_client_pubs = U};
2016+
({rejected, _QName, PktIds}, S0 = #state{unacked_client_pubs = U0,
2017+
cfg = #cfg{proto_ver = ProtoVer}}) ->
2018+
{RejectPktIds, U} = rabbit_mqtt_confirms:reject(PktIds, U0),
2019+
S = S0#state{unacked_client_pubs = U},
2020+
%% Negative acks are supported only in MQTT v5. In MQTT v3 and v4 we ignore
2021+
%% rejected messages since we can only (but must not) send a positive ack.
2022+
case ProtoVer of
2023+
?MQTT_PROTO_V5 ->
2024+
send_puback(RejectPktIds, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, S);
2025+
_ ->
2026+
ok
2027+
end,
2028+
S;
20292029
({block, QName}, S = #state{queues_soft_limit_exceeded = QSLE}) ->
20302030
S#state{queues_soft_limit_exceeded = sets:add_element(QName, QSLE)};
20312031
({unblock, QName}, S = #state{queues_soft_limit_exceeded = QSLE}) ->
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
-module(rabbit_mqtt_confirms_SUITE).
2+
3+
-compile([export_all, nowarn_export_all]).
4+
-include_lib("eunit/include/eunit.hrl").
5+
6+
%%%===================================================================
7+
%%% Common Test callbacks
8+
%%%===================================================================
9+
10+
all() ->
11+
[
12+
{group, tests}
13+
].
14+
15+
groups() ->
16+
[
17+
{tests, [shuffle],
18+
[confirm,
19+
reject,
20+
remove_queue
21+
]}].
22+
23+
-define(MOD, rabbit_mqtt_confirms).
24+
25+
confirm(_Config) ->
26+
QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>),
27+
QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>),
28+
U0 = ?MOD:init(),
29+
?assertEqual(0, ?MOD:size(U0)),
30+
31+
U1 = ?MOD:insert(1, [QName], U0),
32+
?assertEqual(1, ?MOD:size(U1)),
33+
?assert(?MOD:contains(1, U1)),
34+
35+
{[1], U2} = ?MOD:confirm([1], QName, U1),
36+
?assertEqual(0, ?MOD:size(U2)),
37+
?assertNot(?MOD:contains(1, U2)),
38+
39+
U3 = ?MOD:insert(2, [QName], U1),
40+
?assertEqual(2, ?MOD:size(U3)),
41+
?assert(?MOD:contains(1, U3)),
42+
?assert(?MOD:contains(2, U3)),
43+
44+
{[1], U4} = ?MOD:confirm([1], QName, U3),
45+
?assertEqual(1, ?MOD:size(U4)),
46+
?assertNot(?MOD:contains(1, U4)),
47+
?assert(?MOD:contains(2, U4)),
48+
49+
U5 = ?MOD:insert(2, [QName, QName2], U1),
50+
?assertEqual(2, ?MOD:size(U5)),
51+
?assert(?MOD:contains(1, U5)),
52+
?assert(?MOD:contains(2, U5)),
53+
54+
{[1], U6} = ?MOD:confirm([1, 2], QName, U5),
55+
{[2], U7} = ?MOD:confirm([2], QName2, U6),
56+
?assertEqual(0, ?MOD:size(U7)),
57+
58+
U8 = ?MOD:insert(2, [QName], U1),
59+
{Confirmed, _U9} = ?MOD:confirm([1, 2], QName, U8),
60+
?assertEqual([1, 2], lists:sort(Confirmed)),
61+
ok.
62+
63+
64+
reject(_Config) ->
65+
QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>),
66+
QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>),
67+
U0 = ?MOD:init(),
68+
?assertEqual(0, ?MOD:size(U0)),
69+
70+
U1 = ?MOD:insert(1, [QName], U0),
71+
?assert(?MOD:contains(1, U1)),
72+
73+
{[1], U2} = ?MOD:reject([1], U1),
74+
{[], U2} = ?MOD:reject([1], U2),
75+
?assertEqual(0, ?MOD:size(U2)),
76+
?assertNot(?MOD:contains(1, U2)),
77+
78+
U3 = ?MOD:insert(2, [QName, QName2], U1),
79+
80+
{[1], U4} = ?MOD:reject([1], U3),
81+
{[], U4} = ?MOD:reject([1], U4),
82+
?assertEqual(1, ?MOD:size(U4)),
83+
84+
{[2], U5} = ?MOD:reject([2], U3),
85+
{[], U5} = ?MOD:reject([2], U5),
86+
?assertEqual(1, ?MOD:size(U5)),
87+
?assert(?MOD:contains(1, U5)),
88+
ok.
89+
90+
remove_queue(_Config) ->
91+
QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>),
92+
QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>),
93+
U0 = ?MOD:init(),
94+
95+
U1 = ?MOD:insert(1, [QName, QName2], U0),
96+
U2 = ?MOD:insert(2, [QName2], U1),
97+
{[2], U3} = ?MOD:remove_queue(QName2, U2),
98+
?assertEqual(1, ?MOD:size(U3)),
99+
?assert(?MOD:contains(1, U3)),
100+
{[1], U4} = ?MOD:remove_queue(QName, U3),
101+
?assertEqual(0, ?MOD:size(U4)),
102+
?assertNot(?MOD:contains(1, U4)),
103+
104+
U5 = ?MOD:insert(1, [QName], U0),
105+
U6 = ?MOD:insert(2, [QName], U5),
106+
{Confirmed, U7} = ?MOD:remove_queue(QName, U6),
107+
?assertEqual([1, 2], lists:sort(Confirmed)),
108+
?assertEqual(0, ?MOD:size(U7)),
109+
ok.

0 commit comments

Comments
 (0)