Skip to content

Commit 3b12cf0

Browse files
Merge pull request #7987 from rabbitmq/mergify/bp/v3.12.x/pr-7981
Allow setting consumer timeout via queue policy/arg and as consumer argument (backport #7981)
2 parents 04668ee + a52215c commit 3b12cf0

File tree

4 files changed

+137
-50
lines changed

4 files changed

+137
-50
lines changed

deps/rabbit/src/rabbit_channel.erl

Lines changed: 54 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2804,34 +2804,67 @@ get_operation_timeout_and_deadline() ->
28042804
Deadline = now_millis() + Timeout,
28052805
{Timeout, Deadline}.
28062806

2807-
evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel,
2808-
consumer_timeout = Timeout},
2809-
unacked_message_q = UAMQ}) ->
2810-
Now = os:system_time(millisecond),
2807+
get_queue_consumer_timeout(_PA = #pending_ack{queue = QName},
2808+
_State = #ch{cfg = #conf{consumer_timeout = GCT}}) ->
2809+
case rabbit_amqqueue:lookup(QName) of
2810+
{ok, Q} -> %% should we account for different queue states here?
2811+
case rabbit_queue_type_util:args_policy_lookup(<<"consumer-timeout">>,
2812+
fun (X, Y) -> erlang:min(X, Y) end, Q) of
2813+
undefined -> GCT;
2814+
Val -> Val
2815+
end;
2816+
_ ->
2817+
GCT
2818+
end.
2819+
2820+
get_consumer_timeout(PA = #pending_ack{tag = CTag},
2821+
State = #ch{consumer_mapping = CMap}) ->
2822+
case maps:find(CTag, CMap) of
2823+
{ok, {_, {_, _, _, Args}}} ->
2824+
case rabbit_misc:table_lookup(Args, <<"x-consumer-timeout">>) of
2825+
{long, Timeout} -> Timeout;
2826+
_ -> get_queue_consumer_timeout(PA, State)
2827+
end;
2828+
_ ->
2829+
get_queue_consumer_timeout(PA, State)
2830+
end.
2831+
2832+
evaluate_consumer_timeout(State = #ch{unacked_message_q = UAMQ}) ->
28112833
case ?QUEUE:get(UAMQ, empty) of
2812-
#pending_ack{delivery_tag = ConsumerTag,
2813-
delivered_at = Time}
2814-
when is_integer(Timeout)
2815-
andalso Time < Now - Timeout ->
2816-
rabbit_log_channel:warning("Consumer ~ts on channel ~w has timed out "
2817-
"waiting for delivery acknowledgement. Timeout used: ~tp ms. "
2818-
"This timeout value can be configured, see consumers doc guide to learn more",
2819-
[rabbit_data_coercion:to_binary(ConsumerTag),
2820-
Channel, Timeout]),
2821-
Ex = rabbit_misc:amqp_error(precondition_failed,
2822-
"delivery acknowledgement on channel ~w timed out. "
2823-
"Timeout value used: ~tp ms. "
2824-
"This timeout value can be configured, see consumers doc guide to learn more",
2825-
[Channel, Timeout], none),
2826-
handle_exception(Ex, State0);
2834+
empty ->
2835+
{noreply, State};
2836+
PA -> evaluate_consumer_timeout1(PA, State)
2837+
end.
2838+
2839+
evaluate_consumer_timeout1(PA = #pending_ack{delivered_at = Time},
2840+
State) ->
2841+
Now = os:system_time(millisecond),
2842+
case get_consumer_timeout(PA, State) of
2843+
Timeout when is_integer(Timeout)
2844+
andalso Time < Now - Timeout ->
2845+
handle_consumer_timed_out(Timeout, PA, State);
28272846
_ ->
2828-
{noreply, State0}
2847+
{noreply, State}
28292848
end.
28302849

2850+
handle_consumer_timed_out(Timeout,#pending_ack{delivery_tag = DeliveryTag},
2851+
State = #ch{cfg = #conf{channel = Channel}}) ->
2852+
rabbit_log_channel:warning("Consumer ~ts on channel ~w has timed out "
2853+
"waiting for delivery acknowledgement. Timeout used: ~tp ms. "
2854+
"This timeout value can be configured, see consumers doc guide to learn more",
2855+
[rabbit_data_coercion:to_binary(DeliveryTag),
2856+
Channel, Timeout]),
2857+
Ex = rabbit_misc:amqp_error(precondition_failed,
2858+
"delivery acknowledgement on channel ~w timed out. "
2859+
"Timeout value used: ~tp ms. "
2860+
"This timeout value can be configured, see consumers doc guide to learn more",
2861+
[Channel, Timeout], none),
2862+
handle_exception(Ex, State).
2863+
28312864
handle_queue_actions(Actions, #ch{} = State0) ->
28322865
WriterPid = State0#ch.cfg#conf.writer_pid,
28332866
lists:foldl(
2834-
fun
2867+
fun
28352868
({settled, QRef, MsgSeqNos}, S0) ->
28362869
confirm(MsgSeqNos, QRef, S0);
28372870
({rejected, _QRef, MsgSeqNos}, S0) ->

deps/rabbit/src/rabbit_networking.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ listener_of_protocol(Protocol) ->
262262

263263
-spec stop_ranch_listener_of_protocol(atom()) -> ok | {error, not_found}.
264264
stop_ranch_listener_of_protocol(Protocol) ->
265-
case rabbit_networking:ranch_ref_of_protocol(Protocol) of
265+
case ranch_ref_of_protocol(Protocol) of
266266
undefined -> ok;
267267
Ref ->
268268
rabbit_log:debug("Stopping Ranch listener for protocol ~ts", [Protocol]),
@@ -523,7 +523,7 @@ emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
523523
emit_connection_info_local(Items, Ref, AggregatorPid) ->
524524
rabbit_control_misc:emitting_map_with_exit_handler(
525525
AggregatorPid, Ref, fun(Q) -> connection_info(Q, Items) end,
526-
connections_local()).
526+
connections_local() ++ local_non_amqp_connections()).
527527

528528
-spec close_connection(pid(), string()) -> 'ok'.
529529

deps/rabbit/src/rabbit_policies.erl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ register() ->
2828
%% such as rabbit_mirror_queue_misc
2929
[rabbit_registry:register(Class, Name, ?MODULE) ||
3030
{Class, Name} <- [{policy_validator, <<"alternate-exchange">>},
31+
{policy_validator, <<"consumer-timeout">>},
3132
{policy_validator, <<"dead-letter-exchange">>},
3233
{policy_validator, <<"dead-letter-routing-key">>},
3334
{policy_validator, <<"dead-letter-strategy">>},
@@ -74,6 +75,12 @@ validate_policy0(<<"alternate-exchange">>, Value)
7475
validate_policy0(<<"alternate-exchange">>, Value) ->
7576
{error, "~tp is not a valid alternate exchange name", [Value]};
7677

78+
validate_policy0(<<"consumer-timeout">>, Value)
79+
when is_integer(Value), Value >= 0 ->
80+
ok;
81+
validate_policy0(<<"consumer-timeout">>, Value) ->
82+
{error, "~tp is not a valid consumer timeout", [Value]};
83+
7784
validate_policy0(<<"dead-letter-exchange">>, Value)
7885
when is_binary(Value) ->
7986
ok;

deps/rabbit/test/consumer_timeout_SUITE.erl

Lines changed: 74 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -13,27 +13,58 @@
1313

1414
-compile(export_all).
1515

16-
-define(TIMEOUT, 30000).
16+
-define(CONSUMER_TIMEOUT, 3000).
17+
-define(RECEIVE_TIMEOUT, 5000).
18+
19+
-define(GROUP_CONFIG,
20+
#{global_consumer_timeout => [{rabbit, [{consumer_timeout, ?CONSUMER_TIMEOUT}]},
21+
{queue_policy, []},
22+
{queue_arguments, []},
23+
{consumer_arguments, []}],
24+
queue_policy_consumer_timeout => [{rabbit, []},
25+
{queue_policy, [{<<"consumer-timeout">>, ?CONSUMER_TIMEOUT}]},
26+
{queue_arguments, []},
27+
{consumer_arguments, []}],
28+
queue_argument_consumer_timeout => [{rabbit, []},
29+
{queue_policy, []},
30+
{queue_arguments, [{<<"x-consumer-timeout">>, long, ?CONSUMER_TIMEOUT}]},
31+
{consumer_arguments, []}],
32+
consumer_argument_consumer_timeout => [{rabbit, []},
33+
{queue_policy, []},
34+
{queue_arguments, []},
35+
{consumer_arguments, [{<<"x-consumer-timeout">>, long, ?CONSUMER_TIMEOUT}]}]}).
1736

1837
-import(quorum_queue_utils, [wait_for_messages/2]).
1938

2039
all() ->
2140
[
22-
{group, parallel_tests}
41+
{group, global_consumer_timeout},
42+
{group, queue_policy_consumer_timeout},
43+
{group, queue_argument_consumer_timeout},
44+
{group, consumer_argument_consumer_timeout}
2345
].
2446

2547
groups() ->
26-
AllTests = [consumer_timeout,
27-
consumer_timeout_basic_get,
28-
consumer_timeout_no_basic_cancel_capability
29-
],
30-
[
31-
{parallel_tests, [],
32-
[
48+
ConsumerTests = [consumer_timeout,
49+
consumer_timeout_no_basic_cancel_capability],
50+
AllTests = ConsumerTests ++ [consumer_timeout_basic_get],
51+
52+
ConsumerTestsParallel = [
53+
{classic_queue, [parallel], ConsumerTests},
54+
{mirrored_queue, [parallel], ConsumerTests},
55+
{quorum_queue, [parallel], ConsumerTests}
56+
],
57+
58+
AllTestsParallel = [
3359
{classic_queue, [parallel], AllTests},
3460
{mirrored_queue, [parallel], AllTests},
3561
{quorum_queue, [parallel], AllTests}
36-
]}
62+
],
63+
[
64+
{global_consumer_timeout, [], AllTestsParallel},
65+
{queue_policy_consumer_timeout, [], AllTestsParallel},
66+
{queue_argument_consumer_timeout, [], AllTestsParallel},
67+
{consumer_argument_consumer_timeout, [], ConsumerTestsParallel}
3768
].
3869

3970
suite() ->
@@ -55,33 +86,36 @@ end_per_suite(Config) ->
5586
init_per_group(classic_queue, Config) ->
5687
rabbit_ct_helpers:set_config(
5788
Config,
58-
[{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
89+
[{policy_type, <<"classic_queues">>},
90+
{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
5991
{queue_durable, true}]);
6092
init_per_group(quorum_queue, Config) ->
6193
rabbit_ct_helpers:set_config(
6294
Config,
63-
[{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
95+
[{policy_type, <<"quorum_queues">>},
96+
{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
6497
{queue_durable, true}]);
6598
init_per_group(mirrored_queue, Config) ->
6699
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
67100
<<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
68101
Config1 = rabbit_ct_helpers:set_config(
69-
Config, [{is_mirrored, true},
102+
Config, [{policy_type, <<"classic_queues">>},
103+
{is_mirrored, true},
70104
{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
71105
{queue_durable, true}]),
72106
rabbit_ct_helpers:run_steps(Config1, []);
73107
init_per_group(Group, Config0) ->
74108
case lists:member({group, Group}, all()) of
75109
true ->
110+
GroupConfig = maps:get(Group, ?GROUP_CONFIG),
76111
ClusterSize = 3,
77112
Config = rabbit_ct_helpers:merge_app_env(
78113
Config0, {rabbit, [{channel_tick_interval, 1000},
79-
{quorum_tick_interval, 1000},
80-
{consumer_timeout, 5000}]}),
114+
{quorum_tick_interval, 1000}] ++ ?config(rabbit, GroupConfig)}),
81115
Config1 = rabbit_ct_helpers:set_config(
82116
Config, [ {rmq_nodename_suffix, Group},
83117
{rmq_nodes_count, ClusterSize}
84-
]),
118+
] ++ GroupConfig),
85119
rabbit_ct_helpers:run_steps(Config1,
86120
rabbit_ct_broker_helpers:setup_steps() ++
87121
rabbit_ct_client_helpers:setup_steps());
@@ -92,6 +126,11 @@ init_per_group(Group, Config0) ->
92126
end_per_group(Group, Config) ->
93127
case lists:member({group, Group}, all()) of
94128
true ->
129+
case ?config(queue_policy, Config) of
130+
[] -> ok;
131+
_Policy ->
132+
rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"consumer_timeout_queue_test_policy">>)
133+
end,
95134
rabbit_ct_helpers:run_steps(Config,
96135
rabbit_ct_client_helpers:teardown_steps() ++
97136
rabbit_ct_broker_helpers:teardown_steps());
@@ -119,12 +158,12 @@ consumer_timeout(Config) ->
119158
declare_queue(Ch, Config, QName),
120159
publish(Ch, QName, [<<"msg1">>]),
121160
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
122-
subscribe(Ch, QName, false),
161+
subscribe(Ch, QName, false, ?config(consumer_arguments, Config)),
123162
erlang:monitor(process, Conn),
124163
erlang:monitor(process, Ch),
125164
receive
126165
{'DOWN', _, process, Ch, _} -> ok
127-
after 30000 ->
166+
after ?RECEIVE_TIMEOUT ->
128167
flush(1),
129168
exit(channel_exit_expected)
130169
end,
@@ -149,7 +188,7 @@ consumer_timeout_basic_get(Config) ->
149188
erlang:monitor(process, Ch),
150189
receive
151190
{'DOWN', _, process, Ch, _} -> ok
152-
after 30000 ->
191+
after ?RECEIVE_TIMEOUT ->
153192
flush(1),
154193
exit(channel_exit_expected)
155194
end,
@@ -187,18 +226,18 @@ consumer_timeout_no_basic_cancel_capability(Config) ->
187226
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
188227
erlang:monitor(process, Conn),
189228
erlang:monitor(process, Ch),
190-
subscribe(Ch, QName, false),
229+
subscribe(Ch, QName, false, ?config(consumer_arguments, Config)),
191230
receive
192231
{#'basic.deliver'{delivery_tag = _,
193232
redelivered = false}, _} ->
194233
%% do nothing with the delivery should trigger timeout
195234
ok
196-
after 5000 ->
235+
after ?RECEIVE_TIMEOUT ->
197236
exit(deliver_timeout)
198237
end,
199238
receive
200239
{'DOWN', _, process, Ch, _} -> ok
201-
after 30000 ->
240+
after ?RECEIVE_TIMEOUT ->
202241
flush(1),
203242
exit(channel_exit_expected)
204243
end,
@@ -217,8 +256,14 @@ consumer_timeout_no_basic_cancel_capability(Config) ->
217256
declare_queue(Ch, Config, QName) ->
218257
Args = ?config(queue_args, Config),
219258
Durable = ?config(queue_durable, Config),
259+
case ?config(queue_policy, Config) of
260+
[] -> ok;
261+
Policy ->
262+
rabbit_ct_broker_helpers:set_policy(Config, 0, <<"consumer_timeout_queue_test_policy">>,
263+
<<".*">>, ?config(policy_type, Config), Policy)
264+
end,
220265
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName,
221-
arguments = Args,
266+
arguments = Args ++ ?config(queue_arguments, Config),
222267
durable = Durable}).
223268
publish(Ch, QName, Payloads) ->
224269
[amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload})
@@ -235,13 +280,15 @@ consume(Ch, QName, NoAck, Payloads) ->
235280
DTag
236281
end || Payload <- Payloads].
237282

238-
subscribe(Ch, Queue, NoAck) ->
239-
subscribe(Ch, Queue, NoAck, <<"ctag">>).
283+
subscribe(Ch, Queue, NoAck, Args) ->
284+
subscribe(Ch, Queue, NoAck, <<"ctag">>, Args).
240285

241-
subscribe(Ch, Queue, NoAck, Ctag) ->
286+
subscribe(Ch, Queue, NoAck, Ctag, Args) ->
242287
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
243288
no_ack = NoAck,
244-
consumer_tag = Ctag},
289+
consumer_tag = Ctag,
290+
arguments = Args
291+
},
245292
self()),
246293
receive
247294
#'basic.consume_ok'{consumer_tag = Ctag} ->

0 commit comments

Comments
 (0)