11defmodule MessageQueue.Adapters.RabbitMQ.Consumer do
2- @ moduledoc false
2+ @ moduledoc """
3+ A RabbitMQ consumer for MessageQueue.
4+
5+ ## Options
6+
7+ * `:queue` - Required. The name of the queue
8+ * `:prefetch_count` - Optional. Prefetch options used by the RabbitMQ client. By default is 1
9+ * `:queue_options` - Optional. Queue options used by RabbitMQ client. For example:
10+ %{queue_options: [
11+ durable: false, arguments: [
12+ {"x-dead-letter-exchange", :longstr, ""},
13+ {"x-dead-letter-routing-key", :longstr, "tasks.errors"}
14+ ]
15+ ]
16+
17+ `durable: true` will be added automatically.
18+ * `:bindings. Optional. a list of bindings for the `:queue`. This option
19+ allows you to bind the queue to one or more exchanges. Each binding is a tuple
20+ `{exchange_name, binding_options}` where so that the queue will be bound
21+ to `exchange_name` through `AMQP.Queue.bind/4` using `binding_options` as
22+ the options. Bindings are idempotent so you can bind the same queue to the
23+ same exchange multiple times.
24+ """
325
426 defmacro __using__ ( _opts ) do
527 quote do
@@ -20,11 +42,13 @@ defmodule MessageQueue.Adapters.RabbitMQ.Consumer do
2042 prefetch_count = Map . get ( options , :prefetch_count , 1 )
2143 queue = Map . get ( options , :queue )
2244 queue_options = Map . get ( options , :queue_options , [ ] )
45+ binding_options = Map . get ( options , :bindings , [ ] )
2346
2447 with { :ok , conn } <- Connection . open ( connection ) ,
2548 { :ok , channel } <- Channel . open ( conn ) ,
2649 :ok <- Basic . qos ( channel , prefetch_count: prefetch_count ) ,
2750 { :ok , _ } <- Queue . declare ( channel , queue , queue_options ++ [ durable: true ] ) ,
51+ :ok <- binding_if_needs ( channel , queue , binding_options ) ,
2852 { :ok , _ } <- Basic . consume ( channel , queue ) do
2953 Process . monitor ( channel . pid )
3054 { :noreply , % { channel: channel , options: options } }
@@ -89,6 +113,17 @@ defmodule MessageQueue.Adapters.RabbitMQ.Consumer do
89113 Basic . reject ( channel , tag , options )
90114 end
91115
116+ defp binding_if_needs ( _ , _ , [ ] ) , do: :ok
117+
118+ defp binding_if_needs ( channel , queue , bindings ) do
119+ Enum . reduce_while ( bindings , :ok , fn { exchange , options } , result ->
120+ case Queue . bind ( channel , queue , exchange , options ) do
121+ :ok -> { :cont , :ok }
122+ error -> { :halt , error }
123+ end
124+ end )
125+ end
126+
92127 defoverridable handle_message: 3
93128 end
94129 end
0 commit comments