Skip to content

Commit 0390886

Browse files
kjnilsson and mergify[bot]
authored and committed
QQ: fix bug when subscribing using an already existing consumer tag
When subscribing using a consumer tag that is already in the quorum queue's state (but perhaps with a cancelled status) and that has pending messages, the next_msg_id, which is used to initialise the queue type consumer state, did not take the in-flight message ids into account. This resulted in some messages occasionally not being delivered to the client; these messages would thus appear stuck as awaiting acknowledgement for the consumer. When a new checkout operation detects that there are in-flight messages, we set the last_msg_id to `undefined` and just accept the next message that arrives, irrespective of its message id. This isn't 100% foolproof, as there may be cases where messages are lost between queue and channel and we'd fail to trigger the fallback query for missing messages. It is, however, much better than what we have at the moment. NB: really the ideal solution would be to make checkout operations async so that any in-flight messages are delivered before the checkout result. That is a much bigger change for another day. (cherry picked from commit 49108a6)
1 parent 97894d8 commit 0390886

File tree

4 files changed

+48
-9
lines changed

4 files changed

+48
-9
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,7 @@ apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
408408
credit = Credit,
409409
delivery_count = DeliveryCount,
410410
next_msg_id = NextMsgId} = Consumer,
411+
411412
%% reply with a consumer summary
412413
Reply = {ok, #{next_msg_id => NextMsgId,
413414
credit => Credit,

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
rabbit_queue_type:action().
4747
-type actions() :: [action()].
4848

49-
-record(consumer, {last_msg_id :: seq() | -1,
49+
-record(consumer, {last_msg_id :: seq() | -1 | undefined,
5050
ack = false :: boolean(),
5151
delivery_count = 0 :: non_neg_integer()}).
5252

@@ -359,8 +359,20 @@ checkout(ConsumerTag, NumUnsettled, CreditMode, Meta,
359359
%% this is the pre 3.11.1 / 3.10.9
360360
%% reply format
361361
-1;
362-
{ok, #{next_msg_id := NextMsgId}} ->
363-
NextMsgId - 1
362+
{ok, #{num_checked_out := NumChecked,
363+
next_msg_id := NextMsgId}} ->
364+
case NumChecked > 0 of
365+
true ->
366+
%% we cannot know if the pending messages
367+
%% have been delivered to the client or they
368+
%% are on their way to the current process.
369+
%% We set `undefined' to signal this uncertainty
370+
%% and will just accept the next arriving message
371+
%% irrespective of message id
372+
undefined;
373+
false ->
374+
NextMsgId - 1
375+
end
364376
end,
365377
SDels = maps:update_with(
366378
ConsumerTag, fun (C) -> C#consumer{ack = Ack} end,
@@ -713,7 +725,11 @@ handle_delivery(QName, Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs},
713725
%% TODO: remove potential default allocation
714726
case Consumer of
715727
#consumer{last_msg_id = Prev} = C
716-
when FstId =:= Prev+1 ->
728+
when Prev =:= undefined orelse FstId =:= Prev+1 ->
729+
%% Prev =:= undefined is a special case where a checkout was done
730+
%% for a previously cancelled consumer that still had pending messages
731+
%% In this case we can't reliably know what the next expected message
732+
%% id should be so have to accept whatever message comes next
717733
maybe_auto_ack(Ack, Del,
718734
State0#state{consumer_deliveries =
719735
update_consumer(Tag, LastId,

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1726,7 +1726,7 @@ confirm_availability_on_leader_change(Config) ->
17261726

17271727
flush(T) ->
17281728
receive X ->
1729-
ct:pal("flushed ~w", [X]),
1729+
ct:pal("flushed ~p", [X]),
17301730
flush(T)
17311731
after T ->
17321732
ok
@@ -3057,12 +3057,13 @@ cancel_and_consume_with_same_tag(Config) ->
30573057
DeclareRslt0 = declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
30583058
?assertMatch(ExpectedDeclareRslt0, DeclareRslt0),
30593059

3060-
ok = publish(Ch, QQ),
3060+
ok = publish(Ch, QQ, <<"msg1">>),
30613061

30623062
ok = subscribe(Ch, QQ, false),
30633063

30643064
DeliveryTag = receive
3065-
{#'basic.deliver'{delivery_tag = D}, _} ->
3065+
{#'basic.deliver'{delivery_tag = D},
3066+
#amqp_msg{payload = <<"msg1">>}} ->
30663067
D
30673068
after 5000 ->
30683069
flush(100),
@@ -3073,10 +3074,11 @@ cancel_and_consume_with_same_tag(Config) ->
30733074

30743075
ok = subscribe(Ch, QQ, false),
30753076

3076-
ok = publish(Ch, QQ),
3077+
ok = publish(Ch, QQ, <<"msg2">>),
30773078

30783079
receive
3079-
{#'basic.deliver'{delivery_tag = _}, _} ->
3080+
{#'basic.deliver'{delivery_tag = _},
3081+
#amqp_msg{payload = <<"msg2">>}} ->
30803082
ok
30813083
after 5000 ->
30823084
flush(100),

deps/rabbit/test/rabbit_fifo_SUITE.erl

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1990,6 +1990,26 @@ chunk_disk_msgs_test(_Config) ->
19901990
rabbit_fifo:chunk_disk_msgs(TwoBigMsgs, 0, [[]])),
19911991
ok.
19921992

1993+
checkout_metadata_test(Config) ->
1994+
Cid = {<<"cid">>, self()},
1995+
{State00, _} = enq(Config, 1, 1, first, test_init(test)),
1996+
{State0, _} = enq(Config, 2, 2, second, State00),
1997+
%% NB: the consumer meta data is taken _before_ it runs a checkout
1998+
%% so in this case num_checked_out will be 0
1999+
{State1, {ok, #{next_msg_id := 0,
2000+
num_checked_out := 0}}, _} =
2001+
apply(meta(Config, ?LINE),
2002+
rabbit_fifo:make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
2003+
State0),
2004+
{State2, _, _} = apply(meta(Config, ?LINE),
2005+
rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
2006+
{_State3, {ok, #{next_msg_id := 1,
2007+
num_checked_out := 1}}, _} =
2008+
apply(meta(Config, ?LINE),
2009+
rabbit_fifo:make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
2010+
State2),
2011+
ok.
2012+
19932013
%% Utility
19942014

19952015
init(Conf) -> rabbit_fifo:init(Conf).

0 commit comments

Comments
 (0)