Skip to content

Commit c96fb3a

Browse files
committed
AMQP 0-9-1: Handle per-queue-type disk alarms
This covers both network and direct connections for 0-9-1. We store a set of the queue types which have been published into on both a channel and connection level since blocking is done on the connection level but only the channel knows what queue types have been published. Then when the published queue types or the set of alarms changes, the connection evaluates whether it is affected by the alarm. If not it may publish but once a channel publishes to an alarmed queue type the connection then blocks until the channel exits or the alarm clears.
1 parent 4776581 commit c96fb3a

File tree

3 files changed

+107
-20
lines changed

3 files changed

+107
-20
lines changed

deps/amqp_client/src/amqp_gen_connection.erl

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
%% connection.block, connection.unblock handler
3333
block_handler,
3434
blocked_by = sets:new([{version, 2}]),
35+
queue_types_published = sets:new([{version, 2}]),
3536
closing = false %% #closing{} | false
3637
}).
3738

@@ -214,18 +215,30 @@ handle_cast({register_blocked_handler, HandlerPid},
214215
{noreply, State1};
215216
handle_cast({conserve_resources, Source, Conserve},
216217
#state{blocked_by = BlockedBy} = State) ->
217-
WasNotBlocked = sets:is_empty(BlockedBy),
218+
WasBlocked = should_block(State),
218219
BlockedBy1 = case Conserve of
219220
true ->
220221
sets:add_element(Source, BlockedBy);
221222
false ->
222223
sets:del_element(Source, BlockedBy)
223224
end,
224225
State1 = State#state{blocked_by = BlockedBy1},
225-
case sets:is_empty(BlockedBy1) of
226+
case should_block(State1) of
226227
true ->
227228
handle_method(#'connection.unblocked'{}, State1);
228-
false when WasNotBlocked ->
229+
false when not WasBlocked ->
230+
handle_method(#'connection.blocked'{}, State1);
231+
false ->
232+
{noreply, State1}
233+
end;
234+
handle_cast({channel_published_to_queue_type, _ChPid, QT},
235+
#state{queue_types_published = QTs} = State) ->
236+
WasBlocked = should_block(State),
237+
State1 = State#state{queue_types_published = sets:add_element(QT, QTs)},
238+
case should_block(State1) of
239+
true ->
240+
handle_method(#'connection.unblocked'{}, State1);
241+
false when not WasBlocked ->
229242
handle_method(#'connection.blocked'{}, State1);
230243
false ->
231244
{noreply, State1}
@@ -274,6 +287,13 @@ i(Item, #state{module = Mod, module_state = MState}) -> Mod:i(Item, MState).
274287
register_blocked_handler(Pid, HandlerPid) ->
275288
gen_server:cast(Pid, {register_blocked_handler, HandlerPid}).
276289

290+
should_block(#state{blocked_by = BlockedBy, queue_types_published = QTs}) ->
291+
lists:any(fun ({disk, QT}) ->
292+
sets:is_element(QT, QTs);
293+
(_Resource) ->
294+
true
295+
end, sets:to_list(BlockedBy)).
296+
277297
%%---------------------------------------------------------------------------
278298
%% Command handling
279299
%%---------------------------------------------------------------------------

deps/rabbit/src/rabbit_channel.erl

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,9 @@
170170
interceptor_state,
171171
queue_states,
172172
tick_timer,
173-
publishing_mode = false :: boolean()
173+
publishing_mode = false :: boolean(),
174+
queue_types_published = sets:new([{version, 2}]) ::
175+
sets:set(rabbit_queue_type:queue_type())
174176
}).
175177

176178
-define(QUEUE, lqueue).
@@ -2097,9 +2099,10 @@ deliver_to_queues(XName,
20972099
ok = process_routing_mandatory(Mandatory, RoutedToQueues, Message, XName, State0),
20982100
MsgSeqNo = maps:get(correlation, Options, undefined),
20992101
State1 = process_routing_confirm(MsgSeqNo, QueueNames, XName, State0),
2102+
State2 = notify_published_queue_types(Qs, State1),
21002103
%% Actions must be processed after registering confirms as actions may
21012104
%% contain rejections of publishes
2102-
State = handle_queue_actions(Actions, State1#ch{queue_states = QueueStates}),
2105+
State = handle_queue_actions(Actions, State2#ch{queue_states = QueueStates}),
21032106
case rabbit_event:stats_level(State, #ch.stats_timer) of
21042107
fine ->
21052108
?INCR_STATS(exchange_stats, XName, 1, publish),
@@ -2165,6 +2168,27 @@ process_routing_confirm(MsgSeqNo, QRefs, XName, State)
21652168
State#ch{unconfirmed =
21662169
rabbit_confirms:insert(MsgSeqNo, QRefs, XName, State#ch.unconfirmed)}.
21672170

2171+
notify_published_queue_types(Qs, #ch{cfg = #conf{conn_pid = ConnPid},
2172+
queue_types_published = QTs0} = State0) ->
2173+
QTs = lists:foldl(
2174+
fun(Q0, Acc) ->
2175+
Q = case Q0 of
2176+
{Q1, _RouteInfo} -> Q1;
2177+
_ -> Q0
2178+
end,
2179+
QT = amqqueue:get_type(Q),
2180+
case sets:is_element(QT, Acc) of
2181+
true ->
2182+
Acc;
2183+
false ->
2184+
gen_server:cast(
2185+
ConnPid,
2186+
{channel_published_to_queue_type, self(), QT}),
2187+
sets:add_element(QT, Acc)
2188+
end
2189+
end, QTs0, Qs),
2190+
State0#ch{queue_types_published = QTs}.
2191+
21682192
confirm(MsgSeqNos, QRef, State = #ch{unconfirmed = UC}) ->
21692193
%% NOTE: if queue name does not exist here it's likely that the ref also
21702194
%% does not exist in unconfirmed messages.

deps/rabbit/src/rabbit_reader.erl

Lines changed: 58 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,14 @@
113113
%% a set of the reasons why we are
114114
%% blocked: {resource, memory}, {resource, disk}.
115115
%% More reasons can be added in the future.
116-
blocked_by,
116+
blocked_by = sets:new([{version, 2}]) ::
117+
sets:set(flow | {resource,
118+
rabbit_alarm:resource_alarm_source()}),
119+
%% the set of queue types which have been published to
120+
%% by channels on this connection, used for per-queue
121+
%% type disk alarm blocking
122+
queue_types_published = #{} :: #{ChannelPid :: pid() =>
123+
sets:set(rabbit_queue_type:queue_type())},
117124
%% true if received any publishes, false otherwise
118125
%% note that this will also be true when connection is
119126
%% already blocked
@@ -335,7 +342,6 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) ->
335342
throttle = #throttle{
336343
last_blocked_at = never,
337344
should_block = false,
338-
blocked_by = sets:new([{version, 2}]),
339345
connection_blocked_message_sent = false
340346
},
341347
proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock)},
@@ -677,6 +683,14 @@ handle_other({'$gen_cast', {force_event_refresh, Ref}}, State)
677683
handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) ->
678684
%% Ignore, we will emit a created event once we start running.
679685
State;
686+
handle_other({'$gen_cast', {channel_published_to_queue_type, ChPid, QT}},
687+
#v1{throttle = Throttle0} = State0) ->
688+
QTs = maps:update_with(
689+
ChPid, fun(ChQTs) -> sets:add_element(QT, ChQTs) end,
690+
sets:from_list([QT], [{version, 2}]),
691+
Throttle0#throttle.queue_types_published),
692+
Throttle = Throttle0#throttle{queue_types_published = QTs},
693+
control_throttle(State0#v1{throttle = Throttle});
680694
handle_other(ensure_stats, State) ->
681695
ensure_stats_timer(State);
682696
handle_other(emit_stats, State) ->
@@ -1007,14 +1021,21 @@ is_over_node_channel_limit() ->
10071021
end
10081022
end.
10091023

1010-
channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) ->
1024+
channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount,
1025+
throttle = Throttle0} = State) ->
10111026
case get({ch_pid, ChPid}) of
1012-
undefined -> {undefined, State};
1013-
{Channel, MRef} -> credit_flow:peer_down(ChPid),
1014-
erase({channel, Channel}),
1015-
erase({ch_pid, ChPid}),
1016-
erlang:demonitor(MRef, [flush]),
1017-
{Channel, State#v1{channel_count = ChannelCount - 1}}
1027+
undefined ->
1028+
{undefined, State};
1029+
{Channel, MRef} ->
1030+
credit_flow:peer_down(ChPid),
1031+
erase({channel, Channel}),
1032+
erase({ch_pid, ChPid}),
1033+
erlang:demonitor(MRef, [flush]),
1034+
QT = maps:remove(ChPid,
1035+
Throttle0#throttle.queue_types_published),
1036+
Throttle = Throttle0#throttle{queue_types_published = QT},
1037+
{Channel, State#v1{channel_count = ChannelCount - 1,
1038+
throttle = Throttle}}
10181039
end.
10191040

10201041
all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
@@ -1738,22 +1759,44 @@ update_last_blocked_at(Throttle) ->
17381759
connection_blocked_message_sent(
17391760
#throttle{connection_blocked_message_sent = BS}) -> BS.
17401761

1741-
should_send_blocked(Throttle = #throttle{blocked_by = Reasons}) ->
1762+
should_send_blocked(Throttle) ->
17421763
should_block(Throttle)
17431764
andalso
1744-
sets:size(sets:del_element(flow, Reasons)) =/= 0
1765+
do_throttle_reasons_apply(Throttle)
17451766
andalso
17461767
not connection_blocked_message_sent(Throttle).
17471768

1748-
should_send_unblocked(Throttle = #throttle{blocked_by = Reasons}) ->
1769+
should_send_unblocked(Throttle) ->
17491770
connection_blocked_message_sent(Throttle)
17501771
andalso
1751-
sets:size(sets:del_element(flow, Reasons)) == 0.
1772+
not do_throttle_reasons_apply(Throttle).
1773+
1774+
do_throttle_reasons_apply(#throttle{blocked_by = Reasons} = Throttle) ->
1775+
lists:any(
1776+
fun ({resource, disk}) ->
1777+
true;
1778+
({resource, memory}) ->
1779+
true;
1780+
({resource, {disk, QT}}) ->
1781+
has_published_to_queue_type(QT, Throttle);
1782+
(_) ->
1783+
%% Flow control should not send connection.blocked
1784+
false
1785+
end, sets:to_list(Reasons)).
1786+
1787+
has_published_to_queue_type(QT, #throttle{queue_types_published = QTs}) ->
1788+
rabbit_misc:maps_any(
1789+
fun(_ChPid, ChQT) -> sets:is_element(QT, ChQT) end, QTs).
17521790

17531791
%% Returns true if we have a reason to block
17541792
%% this connection.
1755-
has_reasons_to_block(#throttle{blocked_by = Reasons}) ->
1756-
sets:size(Reasons) > 0.
1793+
has_reasons_to_block(#throttle{blocked_by = Reasons} = Throttle) ->
1794+
lists:any(
1795+
fun ({resource, {disk, QType}}) ->
1796+
has_published_to_queue_type(QType, Throttle);
1797+
(_) ->
1798+
true
1799+
end, sets:to_list(Reasons)).
17571800

17581801
is_blocked_by_flow(#throttle{blocked_by = Reasons}) ->
17591802
sets:is_element(flow, Reasons).

0 commit comments

Comments
 (0)