Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ Metrics representing state of the whole Sidekiq installation (queues, processes,
- Number of jobs in retry set: `sidekiq_jobs_retry_count`
- Number of jobs in dead set (“morgue”): `sidekiq_jobs_dead_count`
- Active processes count: `sidekiq_active_processes`
- Active servers count: `sidekiq_active_workers_count`
- Active worker threads count: `sidekiq_active_workers_count`
- Busy worker threads count: `sidekiq_busy_workers_count`
- Available worker threads count: `sidekiq_available_workers_count`
- Active worker threads count per queue: `sidekiq_active_workers_count_per_queue` (segmented by queue)
- Busy worker threads count per queue: `sidekiq_busy_workers_count_per_queue` (segmented by queue)
- Available worker threads count per queue: `sidekiq_available_workers_count_per_queue` (segmented by queue)

By default, all Sidekiq worker processes (servers) collect global metrics about the whole Sidekiq installation. This can be overridden by setting the `collect_cluster_metrics` config key to `true` for non-Sidekiq processes or to `false` for Sidekiq processes (e.g. by setting the `YABEDA_SIDEKIQ_COLLECT_CLUSTER_METRICS` env variable to `no`; see other methods in the [anyway_config] docs).

Expand Down
58 changes: 47 additions & 11 deletions lib/yabeda/sidekiq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,30 @@ def self.config
if config.collect_cluster_metrics # defaults to +::Sidekiq.server?+
retry_count_tags = config.retries_segmented_by_queue ? %i[queue] : []

gauge :jobs_waiting_count, tags: %i[queue], aggregation: :most_recent, comment: "The number of jobs waiting to process in sidekiq."
gauge :active_workers_count, tags: [], aggregation: :most_recent,
comment: "The number of currently running machines with sidekiq workers."
gauge :jobs_scheduled_count, tags: [], aggregation: :most_recent, comment: "The number of jobs scheduled for later execution."
gauge :jobs_retry_count, tags: retry_count_tags, aggregation: :most_recent, comment: "The number of failed jobs waiting to be retried"
gauge :jobs_dead_count, tags: [], aggregation: :most_recent, comment: "The number of jobs exceeded their retry count."
gauge :active_processes, tags: [], aggregation: :most_recent, comment: "The number of active Sidekiq worker processes."
gauge :queue_latency, tags: %i[queue], aggregation: :most_recent,
comment: "The queue latency, the difference in seconds since the oldest job in the queue was enqueued"
gauge :jobs_waiting_count, tags: %i[queue], aggregation: :most_recent,
comment: "The number of jobs waiting to process in sidekiq."
gauge :active_workers_count, tags: [], aggregation: :most_recent,
comment: "Total number of sidekiq workers threads."
gauge :busy_workers_count, tags: [], aggregation: :most_recent,
comment: "Number of busy sidekiq workers threads."
# Idle capacity across the cluster: summed (concurrency - busy) over all processes.
gauge :available_workers_count, tags: [], aggregation: :most_recent,
comment: "Number of available sidekiq workers threads."
gauge :active_workers_count_per_queue, tags: %i[queue], aggregation: :most_recent,
comment: "Total number of sidekiq workers threads that can process a given queue."
gauge :busy_workers_count_per_queue, tags: %i[queue], aggregation: :most_recent,
comment: "Number of busy sidekiq workers threads that can process a given queue."
# Idle capacity per queue: summed (concurrency - busy) over the processes that list the queue.
gauge :available_workers_count_per_queue, tags: %i[queue], aggregation: :most_recent,
comment: "Number of available sidekiq workers threads that can process a given queue."
gauge :jobs_scheduled_count, tags: [], aggregation: :most_recent,
comment: "The number of jobs scheduled for later execution."
gauge :jobs_retry_count, tags: retry_count_tags, aggregation: :most_recent,
comment: "The number of failed jobs waiting to be retried"
gauge :jobs_dead_count, tags: [], aggregation: :most_recent,
comment: "The number of jobs exceeded their retry count."
gauge :active_processes, tags: [], aggregation: :most_recent,
comment: "The number of active Sidekiq worker processes."
gauge :queue_latency, tags: %i[queue], aggregation: :most_recent,
comment: "The queue latency, the difference in seconds since the oldest job in the queue was enqueued"
end

collect do
Expand All @@ -70,15 +85,36 @@ def self.config
next unless config.collect_cluster_metrics

stats = ::Sidekiq::Stats.new

stats.queues.each do |k, v|
sidekiq_jobs_waiting_count.set({ queue: k }, v)
end
sidekiq_active_workers_count.set({}, stats.workers_size)
sidekiq_jobs_scheduled_count.set({}, stats.scheduled_size)
sidekiq_jobs_dead_count.set({}, stats.dead_size)
sidekiq_active_processes.set({}, stats.processes_size)

process_data = { active: 0, busy: 0, available: 0 }
queue_data = {}
::Sidekiq::ProcessSet.new.each do |process|
process_data[:active] += process['concurrency']
process_data[:busy] += process['busy']
process_data[:available] += process['concurrency'] - process['busy']
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the point of calculating available threads number here? It can be calculated in the monitoring system as well

process['queues'].each do |queue|
queue_data[queue] ||= { active: 0, busy: 0, available: 0 }
queue_data[queue][:active] += process['concurrency']
queue_data[queue][:busy] += process['busy']
queue_data[queue][:available] += process['concurrency'] - process['busy']
end
end
sidekiq_active_workers_count.set({}, process_data[:active])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is process_data[:active] the same as stats.workers_size? (it is better to keep backward compatibility)

sidekiq_busy_workers_count.set({}, process_data[:busy])
sidekiq_available_workers_count.set({}, process_data[:available])
queue_data.each do |queue, data|
labels = { queue: queue }
sidekiq_active_workers_count_per_queue.set(labels, data[:active])
sidekiq_busy_workers_count_per_queue.set(labels, data[:busy])
sidekiq_available_workers_count_per_queue.set(labels, data[:available])
end

::Sidekiq::Queue.all.each do |queue|
sidekiq_queue_latency.set({ queue: queue.name }, queue.latency)
end
Expand Down
35 changes: 32 additions & 3 deletions spec/yabeda/sidekiq_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,13 @@
OpenStruct.new({ name: "mailers", latency: 0 }),
],
)
allow(Sidekiq::ProcessSet).to receive(:new).and_return(
[
OpenStruct.new({ concurrency: 2, busy: 1, queues: ['default', 'low'] }),
OpenStruct.new({ concurrency: 2, busy: 0, queues: ['default', 'low'] }),
OpenStruct.new({ concurrency: 6, busy: 3, queues: ['medium'] }),
]
)
end

it "collects queue latencies" do
Expand All @@ -253,9 +260,31 @@

it "collects named queues stats", :aggregate_failures do
expect { Yabeda.collect! }.to \
update_yabeda_gauge(Yabeda.sidekiq.jobs_retry_count).with(1).and \
update_yabeda_gauge(Yabeda.sidekiq.jobs_dead_count).with(3).and \
update_yabeda_gauge(Yabeda.sidekiq.jobs_scheduled_count).with(2)
update_yabeda_gauge(Yabeda.sidekiq.jobs_retry_count).with({} => 1).and \
update_yabeda_gauge(Yabeda.sidekiq.jobs_dead_count).with({} => 3).and \
update_yabeda_gauge(Yabeda.sidekiq.jobs_scheduled_count).with({} => 2)
end

it "collects process metrics" do
expect { Yabeda.collect! }.to \
update_yabeda_gauge(Yabeda.sidekiq.active_workers_count).with({} => 10).and \
update_yabeda_gauge(Yabeda.sidekiq.busy_workers_count).with({} => 4).and \
update_yabeda_gauge(Yabeda.sidekiq.available_workers_count).with({} => 6).and \
update_yabeda_gauge(Yabeda.sidekiq.active_workers_count_per_queue).with(
{ queue: 'default' } => 4,
{ queue: 'low' } => 4,
{ queue: 'medium' } => 6
).and \
update_yabeda_gauge(Yabeda.sidekiq.busy_workers_count_per_queue).with(
{ queue: 'default' } => 1,
{ queue: 'low' } => 1,
{ queue: 'medium' } => 3
).and \
update_yabeda_gauge(Yabeda.sidekiq.available_workers_count_per_queue).with(
{ queue: 'default' } => 3,
{ queue: 'low' } => 3,
{ queue: 'medium' } => 3
)
end

it "measures maximum runtime of currently running jobs", sidekiq: :inline do
Expand Down
Loading