diff --git a/README.md b/README.md index 16c66d1..b15b017 100644 --- a/README.md +++ b/README.md @@ -58,7 +58,12 @@ Metrics representing state of the whole Sidekiq installation (queues, processes, - Number of jobs in retry set: `sidekiq_jobs_retry_count` - Number of jobs in dead set (“morgue”): `sidekiq_jobs_dead_count` - Active processes count: `sidekiq_active_processes` - - Active servers count: `sidekiq_active_workers_count` + - Active worker threads count: `sidekiq_active_workers_count` + - Busy worker threads count: `sidekiq_busy_workers_count` + - Available worker threads count: `sidekiq_available_workers_count` + - Active worker threads count per queue: `sidekiq_active_workers_count_per_queue` (segmented by queue) + - Busy worker threads count per queue: `sidekiq_busy_workers_count_per_queue` (segmented by queue) + - Available worker threads count per queue: `sidekiq_available_workers_count_per_queue` (segmented by queue) By default all sidekiq worker processes (servers) collects global metrics about whole Sidekiq installation. This can be overridden by setting `collect_cluster_metrics` config key to `true` for non-Sidekiq processes or to `false` for Sidekiq processes (e.g. by setting `YABEDA_SIDEKIQ_COLLECT_CLUSTER_METRICS` env variable to `no`, see other methods in [anyway_config] docs). diff --git a/lib/yabeda/sidekiq.rb b/lib/yabeda/sidekiq.rb index 6d425a2..c32d501 100644 --- a/lib/yabeda/sidekiq.rb +++ b/lib/yabeda/sidekiq.rb @@ -53,15 +53,30 @@ def self.config if config.collect_cluster_metrics # defaults to +::Sidekiq.server?+ retry_count_tags = config.retries_segmented_by_queue ? %i[queue] : [] - gauge :jobs_waiting_count, tags: %i[queue], aggregation: :most_recent, comment: "The number of jobs waiting to process in sidekiq." - gauge :active_workers_count, tags: [], aggregation: :most_recent, - comment: "The number of currently running machines with sidekiq workers." - gauge :jobs_scheduled_count, tags: [], aggregation: :most_recent, comment: "The number of jobs scheduled for later execution." - gauge :jobs_retry_count, tags: retry_count_tags, aggregation: :most_recent, comment: "The number of failed jobs waiting to be retried" - gauge :jobs_dead_count, tags: [], aggregation: :most_recent, comment: "The number of jobs exceeded their retry count." - gauge :active_processes, tags: [], aggregation: :most_recent, comment: "The number of active Sidekiq worker processes." - gauge :queue_latency, tags: %i[queue], aggregation: :most_recent, - comment: "The queue latency, the difference in seconds since the oldest job in the queue was enqueued" + gauge :jobs_waiting_count, tags: %i[queue], aggregation: :most_recent, + comment: "The number of jobs waiting to process in sidekiq." + gauge :active_workers_count, tags: [], aggregation: :most_recent, + comment: "Total number of sidekiq workers threads." + gauge :busy_workers_count, tags: [], aggregation: :most_recent, + comment: "Number of busy sidekiq workers threads." + gauge :available_workers_count, tags: [], aggregation: :most_recent, + comment: "Number of busy sidekiq workers threads." + gauge :active_workers_count_per_queue, tags: %i[queue], aggregation: :most_recent, + comment: "Total number of sidekiq workers threads that can process a given queue." + gauge :busy_workers_count_per_queue, tags: %i[queue], aggregation: :most_recent, + comment: "Number of busy sidekiq workers threads that can process a given queue." + gauge :available_workers_count_per_queue, tags: %i[queue], aggregation: :most_recent, + comment: "Number of busy sidekiq workers threads that can process a given queue." + gauge :jobs_scheduled_count, tags: [], aggregation: :most_recent, + comment: "The number of jobs scheduled for later execution." + gauge :jobs_retry_count, tags: retry_count_tags, aggregation: :most_recent, + comment: "The number of failed jobs waiting to be retried" + gauge :jobs_dead_count, tags: [], aggregation: :most_recent, + comment: "The number of jobs exceeded their retry count." + gauge :active_processes, tags: [], aggregation: :most_recent, + comment: "The number of active Sidekiq worker processes." + gauge :queue_latency, tags: %i[queue], aggregation: :most_recent, + comment: "The queue latency, the difference in seconds since the oldest job in the queue was enqueued" end collect do @@ -70,15 +85,36 @@ def self.config next unless config.collect_cluster_metrics stats = ::Sidekiq::Stats.new - stats.queues.each do |k, v| sidekiq_jobs_waiting_count.set({ queue: k }, v) end - sidekiq_active_workers_count.set({}, stats.workers_size) sidekiq_jobs_scheduled_count.set({}, stats.scheduled_size) sidekiq_jobs_dead_count.set({}, stats.dead_size) sidekiq_active_processes.set({}, stats.processes_size) + process_data = { active: 0, busy: 0, available: 0 } + queue_data = {} + ::Sidekiq::ProcessSet.new.each do |process| + process_data[:active] += process['concurrency'] + process_data[:busy] += process['busy'] + process_data[:available] += process['concurrency'] - process['busy'] + process['queues'].each do |queue| + queue_data[queue] ||= { active: 0, busy: 0, available: 0 } + queue_data[queue][:active] += process['concurrency'] + queue_data[queue][:busy] += process['busy'] + queue_data[queue][:available] += process['concurrency'] - process['busy'] + end + end + sidekiq_active_workers_count.set({}, process_data[:active]) + sidekiq_busy_workers_count.set({}, process_data[:busy]) + sidekiq_available_workers_count.set({}, process_data[:available]) + queue_data.each do |queue, data| + labels = { queue: queue } + sidekiq_active_workers_count_per_queue.set(labels, data[:active]) + sidekiq_busy_workers_count_per_queue.set(labels, data[:busy]) + sidekiq_available_workers_count_per_queue.set(labels, data[:available]) + end + ::Sidekiq::Queue.all.each do |queue| sidekiq_queue_latency.set({ queue: queue.name }, queue.latency) end diff --git a/spec/yabeda/sidekiq_spec.rb b/spec/yabeda/sidekiq_spec.rb index 5c4f741..8c0490c 100644 --- a/spec/yabeda/sidekiq_spec.rb +++ b/spec/yabeda/sidekiq_spec.rb @@ -233,6 +233,13 @@ OpenStruct.new({ name: "mailers", latency: 0 }), ], ) + allow(Sidekiq::ProcessSet).to receive(:new).and_return( + [ + OpenStruct.new({ concurrency: 2, busy: 1, queues: ['default', 'low'] }), + OpenStruct.new({ concurrency: 2, busy: 0, queues: ['default', 'low'] }), + OpenStruct.new({ concurrency: 6, busy: 3, queues: ['medium'] }), + ] + ) end it "collects queue latencies" do @@ -253,9 +260,31 @@ it "collects named queues stats", :aggregate_failures do expect { Yabeda.collect! }.to \ - update_yabeda_gauge(Yabeda.sidekiq.jobs_retry_count).with(1).and \ - update_yabeda_gauge(Yabeda.sidekiq.jobs_dead_count).with(3).and \ - update_yabeda_gauge(Yabeda.sidekiq.jobs_scheduled_count).with(2) + update_yabeda_gauge(Yabeda.sidekiq.jobs_retry_count).with({} => 1).and \ + update_yabeda_gauge(Yabeda.sidekiq.jobs_dead_count).with({} => 3).and \ + update_yabeda_gauge(Yabeda.sidekiq.jobs_scheduled_count).with({} => 2) + end + + it "collects process metrics" do + expect { Yabeda.collect! }.to \ + update_yabeda_gauge(Yabeda.sidekiq.active_workers_count).with({} => 10).and \ + update_yabeda_gauge(Yabeda.sidekiq.busy_workers_count).with({} => 4).and \ + update_yabeda_gauge(Yabeda.sidekiq.available_workers_count).with({} => 6).and \ + update_yabeda_gauge(Yabeda.sidekiq.active_workers_count_per_queue).with( + { queue: 'default' } => 4, + { queue: 'low' } => 4, + { queue: 'medium' } => 6 + ).and \ + update_yabeda_gauge(Yabeda.sidekiq.busy_workers_count_per_queue).with( + { queue: 'default' } => 1, + { queue: 'low' } => 1, + { queue: 'medium' } => 3 + ).and \ + update_yabeda_gauge(Yabeda.sidekiq.available_workers_count_per_queue).with( + { queue: 'default' } => 3, + { queue: 'low' } => 3, + { queue: 'medium' } => 3 + ) end it "measures maximum runtime of currently running jobs", sidekiq: :inline do