Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions lib/hutch/broker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,16 @@ def tracing_enabled?
# Create / get a durable queue and apply namespace if it exists.
def queue(name, options = {})
with_bunny_precondition_handler('queue') do
namespace = @config[:namespace].to_s.downcase.gsub(/[^-_:\.\w]/, "")
queue_name = namespace.present? ? "#{namespace}:#{name}" : name
channel.queue(queue_name, **options)
channel.queue(name, **options)
end
end

def namespaced_queue_name(name)
namespace = @config[:namespace].to_s.downcase.gsub(/[^-_:\.\w]/, "")
name = name.prepend(namespace + ":") if namespace.present?
name
end

# Return a mapping of queue names to the routing keys they're bound to.
def bindings
results = Hash.new { |hash, key| hash[key] = [] }
Expand Down
8 changes: 7 additions & 1 deletion lib/hutch/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ def consume(*routing_keys)
attr_reader :queue_mode, :queue_type, :initial_group_size

# Explicitly set the queue name
def queue_name(name)
def queue_name(name, exact: false)
@queue_name_exact = exact
@queue_name = name
end

Expand Down Expand Up @@ -91,6 +92,11 @@ def get_queue_name
queue_name.downcase
end

# Ask for the queue name to be used verbatim (ignore namespace)
def get_queue_name_exact?
@queue_name_exact
end

# Returns consumer custom arguments.
def get_arguments
all_arguments = @arguments || {}
Expand Down
6 changes: 4 additions & 2 deletions lib/hutch/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ def setup_queues
# Bind a consumer's routing keys to its queue, and set up a subscription to
# receive messages sent to the queue.
def setup_queue(consumer)
logger.info "setting up queue: #{consumer.get_queue_name}"
queue_name = consumer.get_queue_name
queue_name = @broker.namespaced_queue_name(queue_name) unless consumer.get_queue_name_exact?
logger.info "setting up queue: #{queue_name}"

queue = @broker.queue(consumer.get_queue_name, consumer.get_options)
queue = @broker.queue(queue_name, consumer.get_options)
@broker.bind_queue(queue, consumer.routing_keys)

queue.subscribe(consumer_tag: unique_consumer_tag, manual_ack: true) do |*args|
Expand Down