diff --git a/lib/hutch/broker.rb b/lib/hutch/broker.rb index ec24ee7..0ea2bd4 100644 --- a/lib/hutch/broker.rb +++ b/lib/hutch/broker.rb @@ -173,12 +173,16 @@ def tracing_enabled? # Create / get a durable queue and apply namespace if it exists. def queue(name, options = {}) with_bunny_precondition_handler('queue') do - namespace = @config[:namespace].to_s.downcase.gsub(/[^-_:\.\w]/, "") - queue_name = namespace.present? ? "#{namespace}:#{name}" : name - channel.queue(queue_name, **options) + channel.queue(name, **options) end end + def namespaced_queue_name(name) + namespace = @config[:namespace].to_s.downcase.gsub(/[^-_:\.\w]/, "") + name = name.prepend(namespace + ":") if namespace.present? + name + end + # Return a mapping of queue names to the routing keys they're bound to. def bindings results = Hash.new { |hash, key| hash[key] = [] } diff --git a/lib/hutch/consumer.rb b/lib/hutch/consumer.rb index 5810380..09bebeb 100644 --- a/lib/hutch/consumer.rb +++ b/lib/hutch/consumer.rb @@ -43,7 +43,8 @@ def consume(*routing_keys) attr_reader :queue_mode, :queue_type, :initial_group_size # Explicitly set the queue name - def queue_name(name) + def queue_name(name, exact: false) + @queue_name_exact = exact @queue_name = name end @@ -91,6 +92,11 @@ def get_queue_name queue_name.downcase end + # Ask for the queue name to be used verbatim (ignore namespace) + def get_queue_name_exact? + @queue_name_exact + end + # Returns consumer custom arguments. def get_arguments all_arguments = @arguments || {} diff --git a/lib/hutch/worker.rb b/lib/hutch/worker.rb index 1368839..137f17c 100644 --- a/lib/hutch/worker.rb +++ b/lib/hutch/worker.rb @@ -45,9 +45,11 @@ def setup_queues # Bind a consumer's routing keys to its queue, and set up a subscription to # receive messages sent to the queue. def setup_queue(consumer) - logger.info "setting up queue: #{consumer.get_queue_name}" + queue_name = consumer.get_queue_name + queue_name = @broker.namespaced_queue_name(queue_name) unless consumer.get_queue_name_exact? + logger.info "setting up queue: #{queue_name}" - queue = @broker.queue(consumer.get_queue_name, consumer.get_options) + queue = @broker.queue(queue_name, consumer.get_options) @broker.bind_queue(queue, consumer.routing_keys) queue.subscribe(consumer_tag: unique_consumer_tag, manual_ack: true) do |*args|