Skip to content

Commit 2e96b16

Browse files
authored
Merge pull request #15 from ChannexIO/features/CNX-1786_live_feed_producer
CNX-1786 Implement logic for creating custom exchange
2 parents 3d2a584 + 88833f6 commit 2e96b16

File tree

2 files changed

+45
-12
lines changed

2 files changed

+45
-12
lines changed

lib/message_queue/adapters/rabbitmq/producer.ex

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -77,26 +77,57 @@ defmodule MessageQueue.Adapters.RabbitMQ.Producer do
7777
end
7878
end
7979

80-
defp get_exchange_name(queues, _option) when is_list(queues), do: {:ok, "amq.fanout"}
80+
defp get_exchange_type(queue, options) do
81+
{_exchange_name, exchange_type} = get_exchange(queue, options)
82+
{:ok, exchange_type}
83+
end
84+
85+
defp get_exchange_name(queue, options) do
86+
{exchange_name, _exchange_type} = get_exchange(queue, options)
87+
{:ok, exchange_name}
88+
end
8189

82-
defp get_exchange_name("" = _queue, options) do
83-
if match?([_ | _], options[:headers]), do: {:ok, "amq.headers"}, else: {:ok, "amq.direct"}
90+
defp get_exchange(queues, options) when is_list(queues) do
91+
exchange_type = options[:exchange_type] || :fanout
92+
exchange_name = options[:exchange] || "amq.#{exchange_type}"
93+
{exchange_name, exchange_type}
8494
end
8595

86-
defp get_exchange_name(_queue, _options), do: {:ok, "amq.direct"}
96+
defp get_exchange("" = _queue, options) do
97+
default_type = if match?([_ | _], options[:headers]), do: :headers, else: :direct
98+
exchange_type = options[:exchange_type] || default_type
99+
exchange_name = options[:exchange] || "amq.#{exchange_type}"
100+
{exchange_name, exchange_type}
101+
end
102+
103+
defp get_exchange(_queue, options) do
104+
exchange_type = options[:exchange_type] || :direct
105+
exchange_name = options[:exchange] || "amq.#{exchange_type}"
106+
{exchange_name, exchange_type}
107+
end
87108

88-
defp declare_and_bind_queue("amq.headers", _channel, _queue, _options), do: {:ok, ""}
109+
defp declare_and_bind_queue("amq.headers", channel, _queue, _options) do
110+
{:ok, %{routing_key: "", channel: channel}}
111+
end
112+
113+
defp declare_and_bind_queue(exchange, channel, queues, options) do
114+
if match?([_ | _], options[:headers]) do
115+
{:ok, %{routing_key: "", channel: channel}}
116+
else
117+
declare_and_bind(exchange, channel, queues, options)
118+
end
119+
end
89120

90-
defp declare_and_bind_queue(exchange, channel, queues, options) when is_list(queues) do
91-
Enum.reduce_while(queues, {:ok, ""}, fn queue, acc ->
92-
case declare_and_bind_queue(exchange, channel, queue, options) do
93-
{:ok, _} -> {:cont, acc}
121+
defp declare_and_bind(exchange, channel, queues, options) when is_list(queues) do
122+
Enum.reduce_while(queues, {:ok, %{routing_key: "", channel: channel}}, fn queue, acc ->
123+
case declare_and_bind(exchange, channel, queue, options) do
124+
{:ok, _} = result -> {:cont, result}
94125
error -> {:halt, error}
95126
end
96127
end)
97128
end
98129

99-
defp declare_and_bind_queue(exchange, channel, queue, options) do
130+
defp declare_and_bind(exchange, channel, queue, options) do
100131
options = Keyword.put_new(options, :durable, true)
101132

102133
with {:ok, %{queue: queue}} <- Queue.declare(channel, queue, [{:passive, true} | options]),
@@ -116,7 +147,9 @@ defmodule MessageQueue.Adapters.RabbitMQ.Producer do
116147
defp redeclare_and_bind_queue(exchange, channel, queue, options) do
117148
case Channel.open(channel.conn) do
118149
{:ok, channel} ->
119-
with {:ok, %{queue: queue}} <- Queue.declare(channel, queue, options),
150+
with {:ok, exchange_type} <- get_exchange_type(queue, options),
151+
:ok <- Exchange.declare(channel, exchange, exchange_type, options),
152+
{:ok, %{queue: queue}} <- Queue.declare(channel, queue, options),
120153
routing_key <- Keyword.get(options, :routing_key, queue),
121154
:ok <- Queue.bind(channel, queue, exchange, routing_key: routing_key) do
122155
{:ok, %{routing_key: routing_key, channel: channel}}

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ defmodule MessageQueue.MixProject do
22
use Mix.Project
33

44
@name "MessageQueue"
5-
@version "0.1.8"
5+
@version "0.1.9"
66
@repo_url "https://github.com/ChannexIO/message_queue"
77

88
def project do

0 commit comments

Comments
 (0)