Skip to content

Commit e8a520e

Browse files
authored
Merge pull request #16 from ChannexIO/features/single_connection
Implement single connection experiment
2 parents 0ef44b9 + cd49d86 commit e8a520e

File tree

12 files changed

+150
-30
lines changed

12 files changed

+150
-30
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ An adapter is a set of instructions for how to communicate with a specific servi
2323

2424
MessageQueue provides adapters for use RabbitMQ and for testing. To use these adapters, declare them in the environment configuration.
2525

26-
You can create new adapters for any environment by implementing the `MessageQueue.Adapters.Producer`, `MessageQueue.Adapters.Consumer`, `MessageQueue.Adapters.RPCServer` or `MessageQueue.Adapters.RPCCclient` behaviour.
26+
You can create new adapters for any environment by implementing the `MessageQueue.Adapters.Producer`, `MessageQueue.Adapters.Consumer`, `MessageQueue.Adapters.RPCServer`, `MessageQueue.Adapters.RPCCclient` or `MessageQueue.Adapters.Connection` behaviour.
2727

2828
```elixir
2929
config :message_queue,
@@ -41,6 +41,7 @@ MessageQueue.CustomLocalAdapter.Producer
4141
MessageQueue.CustomLocalAdapter.Consumer
4242
MessageQueue.CustomLocalAdapter.RPCServer
4343
MessageQueue.CustomLocalAdapter.RPCClient
44+
MessageQueue.CustomLocalAdapter.Connection
4445
```
4546

4647
### Consumer

lib/message_queue.ex

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
defmodule MessageQueue do
22
@moduledoc false
33

4+
defdelegate publish(message, queue, options \\ []), to: MessageQueue.Producer
5+
defdelegate rpc_call(module, function, args), to: MessageQueue.RPCClient, as: :call
6+
defdelegate get_connection, to: MessageQueue.Connection, as: :get
7+
48
@spec producer() :: module()
59
def producer, do: adapter(Producer)
610

@@ -13,6 +17,9 @@ defmodule MessageQueue do
1317
@spec rpc_client() :: module()
1418
def rpc_client, do: adapter(RPCClient)
1519

20+
@spec connection() :: module()
21+
def connection, do: adapter(Connection)
22+
1623
@spec rpc_queue() :: binary()
1724
def rpc_queue do
1825
app_name = Application.get_env(:message_queue, :app_name)
@@ -25,15 +32,10 @@ defmodule MessageQueue do
2532
end
2633

2734
@spec connection() :: keyword()
28-
def connection do
35+
def connection_details do
2936
Application.get_env(:message_queue, :connection)
3037
end
3138

32-
@spec publish(term(), String.t() | list(String.t()), keyword()) :: :ok | term()
33-
def publish(message, queue, options) do
34-
producer().publish(message, queue, options)
35-
end
36-
3739
defp adapter(module) do
3840
case Application.get_env(:message_queue, :adapter) do
3941
:rabbitmq -> MessageQueue.Adapters.RabbitMQ
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
defmodule MessageQueue.Adapters.Connection do
2+
@moduledoc """
3+
Behaviour for creating MessageQueue connections
4+
"""
5+
6+
@callback get() :: {:ok | AMQP.Connection.t() | term()} | {:error, :not_connected}
7+
end
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
defmodule MessageQueue.Adapters.RabbitMQ.Connection do
2+
@moduledoc false
3+
4+
alias AMQP.Connection
5+
use GenServer
6+
require Logger
7+
8+
@reconnect_interval 10_000
9+
10+
def start_link(_) do
11+
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
12+
end
13+
14+
def get do
15+
GenServer.call(__MODULE__, :get)
16+
end
17+
18+
# Callbacks
19+
20+
@impl true
21+
def init(_) do
22+
{:ok, %{connection: nil}, {:continue, :connect}}
23+
end
24+
25+
@impl true
26+
def handle_call(:get, _from, %{connection: conn} = state) do
27+
{:reply, {:ok, conn}, state}
28+
end
29+
30+
@impl true
31+
def handle_continue(:connect, state) do
32+
case MessageQueue.connection_details() do
33+
nil -> {:noreply, state}
34+
connection_details -> connect(connection_details, state)
35+
end
36+
end
37+
38+
@impl true
39+
def handle_info({:DOWN, _, :process, _pid, reason}, _) do
40+
{:stop, {:connection_lost, reason}, nil}
41+
end
42+
43+
defp connect(connection_details, state) do
44+
with {:ok, conn} <- Connection.open(connection_details) do
45+
Process.monitor(conn.pid)
46+
{:noreply, %{connection: conn}}
47+
else
48+
_error ->
49+
Logger.error("Failed to connect RabbitMQ. Reconnecting later...")
50+
Process.sleep(@reconnect_interval)
51+
{:noreply, state, {:continue, :connect}}
52+
end
53+
catch
54+
:exit, error ->
55+
Logger.error("RabbitMQ error: #{inspect(error)} Reconnecting later...")
56+
Process.sleep(@reconnect_interval)
57+
{:noreply, state, {:continue, :connect}}
58+
end
59+
end

lib/message_queue/adapters/rabbitmq/consumer.ex

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,13 @@ defmodule MessageQueue.Adapters.RabbitMQ.Consumer do
4242

4343
@impl true
4444
def handle_continue(:connect, %{options: options} = state) do
45-
connection = MessageQueue.connection()
4645
prefetch_count = Map.get(options, :prefetch_count, 1)
4746
queue = Map.get(options, :queue)
4847
queue_options = Map.get(options, :queue_options, [])
4948
binding_options = Map.get(options, :bindings, [])
5049
after_connect = Map.get(options, :after_connect, fn _channel -> :ok end)
5150

52-
with {:ok, conn} <- Connection.open(connection),
51+
with {:ok, conn} <- MessageQueue.get_connection(),
5352
{:ok, channel} <- Channel.open(conn),
5453
:ok <- call_after_connect(after_connect, channel),
5554
:ok <- Basic.qos(channel, prefetch_count: prefetch_count),
@@ -59,7 +58,7 @@ defmodule MessageQueue.Adapters.RabbitMQ.Consumer do
5958
Process.monitor(channel.pid)
6059
{:noreply, %{channel: channel, options: options}}
6160
else
62-
{:error, _} ->
61+
_error ->
6362
Logger.error("Failed to connect RabbitMQ. Reconnecting later...")
6463
Process.sleep(@reconnect_interval)
6564
{:noreply, state, {:continue, :connect}}
@@ -81,6 +80,11 @@ defmodule MessageQueue.Adapters.RabbitMQ.Consumer do
8180
{:stop, :normal, nil}
8281
end
8382

83+
@impl true
84+
def handle_info({_ref, {:ok, _connection}}, state) do
85+
{:noreply, state}
86+
end
87+
8488
@impl true
8589
def handle_info({:basic_consume_ok, %{consumer_tag: _}}, state) do
8690
{:noreply, state, :hibernate}

lib/message_queue/adapters/rabbitmq/producer.ex

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,12 @@ defmodule MessageQueue.Adapters.RabbitMQ.Producer do
1717

1818
@impl true
1919
def init(state) do
20-
Process.flag(:trap_exit, true)
2120
{:ok, state, {:continue, :connect}}
2221
end
2322

2423
@impl true
2524
def handle_continue(:connect, state) do
26-
connection = MessageQueue.connection()
27-
28-
case Connection.open(connection) do
25+
case MessageQueue.get_connection() do
2926
{:ok, conn} ->
3027
Process.monitor(conn.pid)
3128
{:noreply, conn}
@@ -42,6 +39,11 @@ defmodule MessageQueue.Adapters.RabbitMQ.Producer do
4239
{:stop, {:connection_lost, reason}, nil}
4340
end
4441

42+
@impl true
43+
def handle_info({_ref, {:ok, _connection}}, state) do
44+
{:noreply, state}
45+
end
46+
4547
@impl true
4648
def handle_call({:publish, message, queue, options}, _, conn) do
4749
case prepare_publish(conn, queue, options) do

lib/message_queue/adapters/rabbitmq/rpc_client.ex

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,19 @@ defmodule MessageQueue.Adapters.RabbitMQ.RPCClient do
1919

2020
@impl true
2121
def init(state) do
22-
Process.flag(:trap_exit, true)
2322
{:ok, state, {:continue, :connect}}
2423
end
2524

2625
@impl true
2726
def handle_continue(:connect, state) do
28-
connection = MessageQueue.connection()
29-
30-
with {:ok, conn} <- Connection.open(connection),
27+
with {:ok, conn} <- MessageQueue.get_connection(),
3128
{:ok, channel} <- Channel.open(conn),
3229
{:ok, %{queue: queue}} <- Queue.declare(channel, "", exclusive: true),
3330
{:ok, _} <- Basic.consume(channel, queue, nil, no_ack: true) do
31+
Process.monitor(channel.pid)
3432
{:noreply, %{channel: channel, queue: queue, calls: %{}}, :hibernate}
3533
else
36-
{:error, _} ->
34+
_error ->
3735
Logger.error("Failed to connect RabbitMQ. Reconnecting later...")
3836
Process.sleep(@reconnect_interval)
3937
{:noreply, state, {:continue, :connect}}
@@ -46,8 +44,13 @@ defmodule MessageQueue.Adapters.RabbitMQ.RPCClient do
4644
end
4745

4846
@impl true
49-
def handle_info({:EXIT, _, :normal}, _state) do
50-
{:stop, :normal, nil}
47+
def handle_info({:DOWN, _, :process, _pid, reason}, _) do
48+
{:stop, {:connection_lost, reason}, nil}
49+
end
50+
51+
@impl true
52+
def handle_info({_ref, {:ok, _connection}}, state) do
53+
{:noreply, state}
5154
end
5255

5356
@impl true

lib/message_queue/adapters/rabbitmq/rpc_server.ex

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,22 @@ defmodule MessageQueue.Adapters.RabbitMQ.RPCServer do
1414

1515
@impl true
1616
def init(state) do
17-
Process.flag(:trap_exit, true)
1817
{:ok, state, {:continue, :connect}}
1918
end
2019

2120
@impl true
2221
def handle_continue(:connect, state) do
23-
connection = MessageQueue.connection()
2422
rpc_queue = MessageQueue.rpc_queue()
2523

26-
with {:ok, conn} <- Connection.open(connection),
24+
with {:ok, conn} <- MessageQueue.get_connection(),
2725
{:ok, channel} <- Channel.open(conn),
2826
{:ok, _} <- Queue.declare(channel, rpc_queue),
2927
:ok <- Basic.qos(channel, prefetch_count: 1),
3028
{:ok, _} <- Basic.consume(channel, rpc_queue) do
29+
Process.monitor(channel.pid)
3130
{:noreply, channel, :hibernate}
3231
else
33-
{:error, _} ->
32+
_error ->
3433
Logger.error("Failed to connect RabbitMQ. Reconnecting later...")
3534
Process.sleep(@reconnect_interval)
3635
{:noreply, state, {:continue, :connect}}
@@ -43,8 +42,13 @@ defmodule MessageQueue.Adapters.RabbitMQ.RPCServer do
4342
end
4443

4544
@impl true
46-
def handle_info({:EXIT, _, :normal}, _channel) do
47-
{:stop, :normal, nil}
45+
def handle_info({:DOWN, _, :process, _pid, reason}, _) do
46+
{:stop, {:connection_lost, reason}, nil}
47+
end
48+
49+
@impl true
50+
def handle_info({_ref, {:ok, _connection}}, state) do
51+
{:noreply, state}
4852
end
4953

5054
@impl true
@@ -72,7 +76,7 @@ defmodule MessageQueue.Adapters.RabbitMQ.RPCServer do
7276

7377
@impl true
7478
def terminate(_, channel) do
75-
if is_pid(channel.pid) and Process.alive?(channel.pid) do
79+
if not is_nil(channel) and is_pid(channel.pid) and Process.alive?(channel.pid) do
7680
Channel.close(channel)
7781
end
7882

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
defmodule MessageQueue.Adapters.Sandbox.Connection do
2+
@moduledoc false
3+
4+
use GenServer
5+
6+
def start_link(_) do
7+
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
8+
end
9+
10+
@impl true
11+
def init(state), do: {:ok, state}
12+
13+
def get, do: {:ok, self()}
14+
end

lib/message_queue/application.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@ defmodule MessageQueue.Application do
1010
)
1111

1212
children = [
13+
MessageQueue.Connection,
1314
MessageQueue.Producer,
1415
MessageQueue.RPCClient,
1516
MessageQueue.RPCServer
1617
]
1718

18-
opts = [strategy: :one_for_one, name: __MODULE__]
19+
opts = [strategy: :one_for_one, name: __MODULE__, max_restarts: 100]
1920
Supervisor.start_link(children, opts)
2021
end
2122
end

0 commit comments

Comments
 (0)