Skip to content

Commit 3d2a584

Browse files
authored
Merge pull request #14 from ChannexIO/hotfixes/CNX-1692-CNX-1700_fix_catch_exit
CNX-1692 CNX-1700 Fix RabbitMQ catch exit error
2 parents 961f154 + c1b644a commit 3d2a584

File tree

5 files changed

+71
-31
lines changed

5 files changed

+71
-31
lines changed

lib/message_queue/adapters/rabbitmq/consumer.ex

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@ defmodule MessageQueue.Adapters.RabbitMQ.Consumer do
6464
Process.sleep(@reconnect_interval)
6565
{:noreply, state, {:continue, :connect}}
6666
end
67+
catch
68+
:exit, error ->
69+
Logger.error("RabbitMQ error: #{inspect(error)} Reconnecting later...")
70+
Process.sleep(@reconnect_interval)
71+
{:noreply, state, {:continue, :connect}}
6772
end
6873

6974
@impl true

lib/message_queue/adapters/rabbitmq/producer.ex

Lines changed: 55 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -44,34 +44,42 @@ defmodule MessageQueue.Adapters.RabbitMQ.Producer do
4444

4545
@impl true
4646
def handle_call({:publish, message, queue, options}, _, conn) do
47-
case Channel.open(conn) do
48-
{:ok, channel} ->
49-
with {:ok, exchange} <- get_exchange_name(queue, options),
50-
{:ok, routing_key} <- declare_and_bind_queue(exchange, channel, queue, options),
51-
:ok <- Confirm.select(channel),
52-
{:ok, encoded_message} <- Jason.encode(message),
53-
:ok <- Basic.publish(channel, exchange, routing_key, encoded_message, options),
54-
{:published, true} <- {:published, Confirm.wait_for_confirms(channel)} do
55-
spawn(fn -> close_channel(channel) end)
56-
{:reply, :ok, conn}
57-
else
58-
{:published, _} ->
59-
spawn(fn -> close_channel(channel) end)
60-
{:reply, {:error, :not_published}, conn}
47+
case prepare_publish(conn, queue, options) do
48+
{:ok, %{channel: channel, routing_key: routing_key, exchange: exchange}} ->
49+
result = publish_message(channel, message, exchange, routing_key, options)
50+
spawn(fn -> close_channel(channel) end)
6151

62-
error ->
63-
spawn(fn -> close_channel(channel) end)
64-
{:reply, error, conn}
65-
end
52+
{:reply, result, conn}
6653

67-
{:error, error} ->
54+
error ->
6855
{:reply, error, conn}
6956
end
7057
end
7158

72-
defp get_exchange_name(queues, option) when is_list(queues), do: {:ok, "amq.fanout"}
59+
defp prepare_publish(conn, queue, options) do
60+
with {:ok, channel} <- Channel.open(conn),
61+
{:ok, exchange} <- get_exchange_name(queue, options),
62+
{:ok, %{channel: channel, routing_key: routing_key}} <-
63+
declare_and_bind_queue(exchange, channel, queue, options) do
64+
{:ok, %{channel: channel, routing_key: routing_key, exchange: exchange}}
65+
end
66+
end
67+
68+
defp publish_message(channel, message, exchange, routing_key, options) do
69+
with :ok <- Confirm.select(channel),
70+
{:ok, encoded_message} <- Jason.encode(message),
71+
:ok <- Basic.publish(channel, exchange, routing_key, encoded_message, options),
72+
{:published, true} <- {:published, Confirm.wait_for_confirms(channel)} do
73+
:ok
74+
else
75+
{:published, _} -> {:error, :not_published}
76+
error -> error
77+
end
78+
end
79+
80+
defp get_exchange_name(queues, _option) when is_list(queues), do: {:ok, "amq.fanout"}
7381

74-
defp get_exchange_name("" = queue, options) do
82+
defp get_exchange_name("" = _queue, options) do
7583
if match?([_ | _], options[:headers]), do: {:ok, "amq.headers"}, else: {:ok, "amq.direct"}
7684
end
7785

@@ -82,7 +90,7 @@ defmodule MessageQueue.Adapters.RabbitMQ.Producer do
8290
defp declare_and_bind_queue(exchange, channel, queues, options) when is_list(queues) do
8391
Enum.reduce_while(queues, {:ok, ""}, fn queue, acc ->
8492
case declare_and_bind_queue(exchange, channel, queue, options) do
85-
{:ok, _routing_key} -> {:cont, acc}
93+
{:ok, _} -> {:cont, acc}
8694
error -> {:halt, error}
8795
end
8896
end)
@@ -94,16 +102,33 @@ defmodule MessageQueue.Adapters.RabbitMQ.Producer do
94102
with {:ok, %{queue: queue}} <- Queue.declare(channel, queue, [{:passive, true} | options]),
95103
routing_key <- Keyword.get(options, :routing_key, queue),
96104
:ok <- Queue.bind(channel, queue, exchange, routing_key: routing_key) do
97-
{:ok, routing_key}
105+
{:ok, %{routing_key: routing_key, channel: channel}}
106+
else
107+
error ->
108+
spawn(fn -> close_channel(channel) end)
109+
error
98110
end
99111
catch
100-
:exit, _ ->
101-
with {:ok, channel} <- Channel.open(channel.conn),
102-
{:ok, %{queue: queue}} <- Queue.declare(channel, queue, options),
103-
routing_key <- Keyword.get(options, :routing_key, queue),
104-
:ok <- Queue.bind(channel, queue, exchange, routing_key: routing_key) do
105-
{:ok, routing_key}
106-
end
112+
:exit, {{:shutdown, {:server_initiated_close, 404, "NOT_FOUND - no queue" <> _}}, _} ->
113+
redeclare_and_bind_queue(exchange, channel, queue, options)
114+
end
115+
116+
defp redeclare_and_bind_queue(exchange, channel, queue, options) do
117+
case Channel.open(channel.conn) do
118+
{:ok, channel} ->
119+
with {:ok, %{queue: queue}} <- Queue.declare(channel, queue, options),
120+
routing_key <- Keyword.get(options, :routing_key, queue),
121+
:ok <- Queue.bind(channel, queue, exchange, routing_key: routing_key) do
122+
{:ok, %{routing_key: routing_key, channel: channel}}
123+
else
124+
error ->
125+
spawn(fn -> close_channel(channel) end)
126+
error
127+
end
128+
129+
error ->
130+
error
131+
end
107132
end
108133

109134
defp close_channel(channel) do

lib/message_queue/adapters/rabbitmq/rpc_client.ex

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ defmodule MessageQueue.Adapters.RabbitMQ.RPCClient do
3838
Process.sleep(@reconnect_interval)
3939
{:noreply, state, {:continue, :connect}}
4040
end
41+
catch
42+
:exit, error ->
43+
Logger.error("RabbitMQ error: #{inspect(error)}. Reconnecting later...")
44+
Process.sleep(@reconnect_interval)
45+
{:noreply, state, {:continue, :connect}}
4146
end
4247

4348
@impl true

lib/message_queue/adapters/rabbitmq/rpc_server.ex

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ defmodule MessageQueue.Adapters.RabbitMQ.RPCServer do
3535
Process.sleep(@reconnect_interval)
3636
{:noreply, state, {:continue, :connect}}
3737
end
38+
catch
39+
:exit, error ->
40+
Logger.error("RabbitMQ error: #{inspect(error)}. Reconnecting later...")
41+
Process.sleep(@reconnect_interval)
42+
{:noreply, state, {:continue, :connect}}
3843
end
3944

4045
@impl true

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ defmodule MessageQueue.MixProject do
22
use Mix.Project
33

44
@name "MessageQueue"
5-
@version "0.1.7"
5+
@version "0.1.8"
66
@repo_url "https://github.com/ChannexIO/message_queue"
77

88
def project do

0 commit comments

Comments
 (0)