Skip to content

Commit 402dda1

Browse files
Merge pull request #9161 from rabbitmq/mergify/bp/v3.12.x/pr-9158
QQ: fix bug when subscribing using an already existing consumer tag (backport #9158)
2 parents 97894d8 + 0390886 commit 402dda1

File tree

4 files changed

+48
-9
lines changed

4 files changed

+48
-9
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,7 @@ apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
408408
credit = Credit,
409409
delivery_count = DeliveryCount,
410410
next_msg_id = NextMsgId} = Consumer,
411+
411412
%% reply with a consumer summary
412413
Reply = {ok, #{next_msg_id => NextMsgId,
413414
credit => Credit,

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
rabbit_queue_type:action().
4747
-type actions() :: [action()].
4848

49-
-record(consumer, {last_msg_id :: seq() | -1,
49+
-record(consumer, {last_msg_id :: seq() | -1 | undefined,
5050
ack = false :: boolean(),
5151
delivery_count = 0 :: non_neg_integer()}).
5252

@@ -359,8 +359,20 @@ checkout(ConsumerTag, NumUnsettled, CreditMode, Meta,
359359
%% this is the pre 3.11.1 / 3.10.9
360360
%% reply format
361361
-1;
362-
{ok, #{next_msg_id := NextMsgId}} ->
363-
NextMsgId - 1
362+
{ok, #{num_checked_out := NumChecked,
363+
next_msg_id := NextMsgId}} ->
364+
case NumChecked > 0 of
365+
true ->
366+
%% we cannot know if the pending messages
367+
%% have been delivered to the client or they
368+
%% are on their way to the current process.
369+
%% We set `undefined' to signal this uncertainty
370+
%% and will just accept the next arriving message
371+
%% irrespective of message id
372+
undefined;
373+
false ->
374+
NextMsgId - 1
375+
end
364376
end,
365377
SDels = maps:update_with(
366378
ConsumerTag, fun (C) -> C#consumer{ack = Ack} end,
@@ -713,7 +725,11 @@ handle_delivery(QName, Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs},
713725
%% TODO: remove potential default allocation
714726
case Consumer of
715727
#consumer{last_msg_id = Prev} = C
716-
when FstId =:= Prev+1 ->
728+
when Prev =:= undefined orelse FstId =:= Prev+1 ->
729+
%% Prev =:= undefined is a special case where a checkout was done
730+
%% for a previously cancelled consumer that still had pending messages
731+
%% In this case we can't reliably know what the next expected message
732+
%% id should be so have to accept whatever message comes next
717733
maybe_auto_ack(Ack, Del,
718734
State0#state{consumer_deliveries =
719735
update_consumer(Tag, LastId,

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1726,7 +1726,7 @@ confirm_availability_on_leader_change(Config) ->
17261726

17271727
flush(T) ->
17281728
receive X ->
1729-
ct:pal("flushed ~w", [X]),
1729+
ct:pal("flushed ~p", [X]),
17301730
flush(T)
17311731
after T ->
17321732
ok
@@ -3057,12 +3057,13 @@ cancel_and_consume_with_same_tag(Config) ->
30573057
DeclareRslt0 = declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
30583058
?assertMatch(ExpectedDeclareRslt0, DeclareRslt0),
30593059

3060-
ok = publish(Ch, QQ),
3060+
ok = publish(Ch, QQ, <<"msg1">>),
30613061

30623062
ok = subscribe(Ch, QQ, false),
30633063

30643064
DeliveryTag = receive
3065-
{#'basic.deliver'{delivery_tag = D}, _} ->
3065+
{#'basic.deliver'{delivery_tag = D},
3066+
#amqp_msg{payload = <<"msg1">>}} ->
30663067
D
30673068
after 5000 ->
30683069
flush(100),
@@ -3073,10 +3074,11 @@ cancel_and_consume_with_same_tag(Config) ->
30733074

30743075
ok = subscribe(Ch, QQ, false),
30753076

3076-
ok = publish(Ch, QQ),
3077+
ok = publish(Ch, QQ, <<"msg2">>),
30773078

30783079
receive
3079-
{#'basic.deliver'{delivery_tag = _}, _} ->
3080+
{#'basic.deliver'{delivery_tag = _},
3081+
#amqp_msg{payload = <<"msg2">>}} ->
30803082
ok
30813083
after 5000 ->
30823084
flush(100),

deps/rabbit/test/rabbit_fifo_SUITE.erl

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1990,6 +1990,26 @@ chunk_disk_msgs_test(_Config) ->
19901990
rabbit_fifo:chunk_disk_msgs(TwoBigMsgs, 0, [[]])),
19911991
ok.
19921992

1993+
checkout_metadata_test(Config) ->
1994+
Cid = {<<"cid">>, self()},
1995+
{State00, _} = enq(Config, 1, 1, first, test_init(test)),
1996+
{State0, _} = enq(Config, 2, 2, second, State00),
1997+
%% NB: the consumer meta data is taken _before_ it runs a checkout
1998+
%% so in this case num_checked_out will be 0
1999+
{State1, {ok, #{next_msg_id := 0,
2000+
num_checked_out := 0}}, _} =
2001+
apply(meta(Config, ?LINE),
2002+
rabbit_fifo:make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
2003+
State0),
2004+
{State2, _, _} = apply(meta(Config, ?LINE),
2005+
rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
2006+
{_State3, {ok, #{next_msg_id := 1,
2007+
num_checked_out := 1}}, _} =
2008+
apply(meta(Config, ?LINE),
2009+
rabbit_fifo:make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
2010+
State2),
2011+
ok.
2012+
19932013
%% Utility
19942014

19952015
init(Conf) -> rabbit_fifo:init(Conf).

0 commit comments

Comments
 (0)