Skip to content

Commit bdff0f7

Browse files
committed
export RQ status as prometheus metrics
fixes: rq#503
1 parent 2eba43f commit bdff0f7

File tree

8 files changed

+269
-2
lines changed

8 files changed

+269
-2
lines changed

.github/workflows/test.yml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,15 @@ jobs:
3737
3838
- name: Run Test
3939
run: |
40-
`which django-admin` test django_rq --settings=django_rq.tests.settings --pythonpath=.
40+
`which django-admin` test django_rq --settings=django_rq.tests.settings --pythonpath=. -v2
41+
42+
- name: Install optional dependencies
43+
run: |
44+
pip install prometheus_client
45+
46+
- name: Run Test with optional dependencies
47+
run: |
48+
`which django-admin` test django_rq --settings=django_rq.tests.settings --pythonpath=. -v2
4149
4250
mypy:
4351
runs-on: ubuntu-latest

README.rst

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,33 @@ Additionally, these statistics are also accessible from the command line.
368368
369369
.. image:: demo-django-rq-cli-dashboard.gif
370370

371+
Configuring Prometheus
372+
----------------------
373+
374+
``django_rq`` also provides a Prometheus-compatible view, which can be enabled
375+
by installing ``prometheus_client`` or installing the extra "prometheus-metrics"
376+
(``pip install django-rq[prometheus-metrics]``). The metrics are exposed at
377+
``/django-rq/metrics/`` and the following is an example of the metrics that
378+
are exported::
379+
380+
# HELP rq_workers RQ workers
381+
# TYPE rq_workers gauge
382+
# HELP rq_job_successful_total RQ successful job count
383+
# TYPE rq_job_successful_total counter
384+
# HELP rq_job_failed_total RQ failed job count
385+
# TYPE rq_job_failed_total counter
386+
# HELP rq_working_seconds_total RQ total working time
387+
# TYPE rq_working_seconds_total counter
388+
# HELP rq_jobs RQ jobs by status
389+
# TYPE rq_jobs gauge
390+
rq_jobs{queue="default",status="queued"} 0.0
391+
rq_jobs{queue="default",status="started"} 0.0
392+
rq_jobs{queue="default",status="finished"} 0.0
393+
rq_jobs{queue="default",status="failed"} 0.0
394+
rq_jobs{queue="default",status="deferred"} 0.0
395+
rq_jobs{queue="default",status="scheduled"} 0.0
396+
397+
371398
Configuring Sentry
372399
-------------------
373400
Sentry

django_rq/metrics_collector.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
from rq.job import JobStatus
2+
3+
from .queues import filter_connection_params, get_connection, get_queue, get_unique_connection_configs
4+
from .workers import get_worker_class
5+
6+
try:
    from prometheus_client import Summary
    from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily

    class RQCollector:
        """Prometheus collector that reports RQ worker and job statistics.

        The class is only defined when ``prometheus_client`` can be imported;
        otherwise the module-level name ``RQCollector`` is set to ``None``.
        """

        # Records how long each ``collect`` run takes.
        summary = Summary('rq_request_processing_seconds_total', 'Time spent collecting RQ data')

        def collect(self):
            """Yield the worker and job metric families for all configured queues.

            Workers are listed once per distinct connection configuration,
            while job counts are reported for every queue in ``QUEUES``.
            """
            from .settings import QUEUES

            with self.summary.time():
                workers_gauge = GaugeMetricFamily(
                    'rq_workers', 'RQ workers', labels=['name', 'state', 'queues']
                )
                successful_counter = CounterMetricFamily(
                    'rq_job_successful_total', 'RQ successful job count', labels=['name', 'queues']
                )
                failed_counter = CounterMetricFamily(
                    'rq_job_failed_total', 'RQ failed job count', labels=['name', 'queues']
                )
                working_time_counter = CounterMetricFamily(
                    'rq_working_seconds_total', 'RQ total working time', labels=['name', 'queues']
                )
                jobs_gauge = GaugeMetricFamily('rq_jobs', 'RQ jobs by status', labels=['queue', 'status'])

                worker_cls = get_worker_class()
                known_configs = get_unique_connection_configs()
                seen_connections = {}

                for queue_name, config in QUEUES.items():
                    config_index = known_configs.index(filter_connection_params(config))
                    connection = seen_connections.get(config_index)

                    if connection is None:
                        # First queue using this connection configuration: open the
                        # connection and report its workers exactly once.
                        connection = seen_connections[config_index] = get_connection(queue_name)

                        for worker in worker_cls.all(connection):
                            worker_name = worker.name
                            queue_list = ','.join(worker.queue_names())
                            worker_labels = [worker_name, queue_list]

                            workers_gauge.add_metric([worker_name, worker.get_state(), queue_list], 1)
                            successful_counter.add_metric(worker_labels, worker.successful_job_count)
                            failed_counter.add_metric(worker_labels, worker.failed_job_count)
                            working_time_counter.add_metric(worker_labels, worker.total_working_time)

                    queue = get_queue(queue_name, connection=connection)
                    # Each entry pairs a job status label with the object whose
                    # ``count`` supplies the number of jobs in that status.
                    count_sources = (
                        (JobStatus.QUEUED, queue),
                        (JobStatus.STARTED, queue.started_job_registry),
                        (JobStatus.FINISHED, queue.finished_job_registry),
                        (JobStatus.FAILED, queue.failed_job_registry),
                        (JobStatus.DEFERRED, queue.deferred_job_registry),
                        (JobStatus.SCHEDULED, queue.scheduled_job_registry),
                    )
                    for status, source in count_sources:
                        jobs_gauge.add_metric([queue_name, status], source.count)

                yield workers_gauge
                yield successful_counter
                yield failed_counter
                yield working_time_counter
                yield jobs_gauge

except ImportError:
    RQCollector = None  # type: ignore[assignment, misc]
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import os
2+
from unittest import skipIf
3+
from unittest.mock import patch
4+
5+
from django.contrib.auth.models import User
6+
from django.test import TestCase, override_settings
7+
from django.test.client import Client
8+
from django.urls import NoReverseMatch, reverse
9+
10+
from django_rq import get_queue
11+
from django_rq.workers import get_worker
12+
13+
from .fixtures import access_self, failing_job
14+
15+
try:
16+
import prometheus_client
17+
except ImportError:
18+
prometheus_client = None
19+
20+
# Single-queue configuration that the tests patch into ``django_rq.settings.QUEUES``.
# The Redis host can be overridden through the ``REDIS_HOST`` environment variable.
RQ_QUEUES = {
    'default': dict(
        HOST=os.getenv('REDIS_HOST', 'localhost'),
        PORT=6379,
        DB=0,
    ),
}
27+
28+
29+
@skipIf(prometheus_client is None, 'prometheus_client is required')
@override_settings(RQ={'AUTOCOMMIT': True})
class PrometheusTest(TestCase):
    """Tests for the ``rq_metrics`` view; skipped when ``prometheus_client`` is missing."""

    def setUp(self):
        """Log in a staff user and flush Redis so every test starts from empty queues."""
        self.user = User.objects.create_user('foo', password='pass')
        self.user.is_staff = True
        self.user.is_active = True
        self.user.save()
        self.client = Client()
        self.client.force_login(self.user)
        get_queue('default').connection.flushall()

    def assertMetricsContain(self, lines):
        """Assert that every string in the set ``lines`` is a line of the metrics response."""
        response = self.client.get(reverse('rq_metrics'))
        self.assertEqual(response.status_code, 200)
        # ``<=`` on sets is a subset check: extra lines in the response are allowed.
        self.assertLessEqual(
            lines, set(response.content.decode('utf-8').splitlines())
        )

    @patch('django_rq.settings.QUEUES', RQ_QUEUES)
    def test_metrics_default(self):
        """With no jobs enqueued, every job status of the default queue reports zero."""
        self.assertMetricsContain(
            {
                '# HELP rq_jobs RQ jobs by status',
                'rq_jobs{queue="default",status="queued"} 0.0',
                'rq_jobs{queue="default",status="started"} 0.0',
                'rq_jobs{queue="default",status="finished"} 0.0',
                'rq_jobs{queue="default",status="failed"} 0.0',
                'rq_jobs{queue="default",status="deferred"} 0.0',
                'rq_jobs{queue="default",status="scheduled"} 0.0',
            }
        )

    @patch('django_rq.settings.QUEUES', RQ_QUEUES)
    def test_metrics_with_jobs(self):
        """Check job and worker metrics before, during and after a worker drains the queue."""
        queue = get_queue('default')
        # One failing job followed by ten succeeding ones: eleven queued jobs in total.
        queue.enqueue(failing_job)

        for _ in range(10):
            queue.enqueue(access_self)

        worker = get_worker('default', name='test_worker')
        worker.register_birth()

        # override worker registration to effectively simulate non burst mode
        # Keep the real ``register_death`` so the worker can be cleaned up in ``finally``.
        register_death = worker.register_death
        worker.register_birth = worker.register_death = lambda: None  # type: ignore[method-assign]

        try:
            # Before any work is done: all jobs queued, worker registered with state "?".
            self.assertMetricsContain(
                {
                    # job information
                    '# HELP rq_jobs RQ jobs by status',
                    'rq_jobs{queue="default",status="queued"} 11.0',
                    'rq_jobs{queue="default",status="started"} 0.0',
                    'rq_jobs{queue="default",status="finished"} 0.0',
                    'rq_jobs{queue="default",status="failed"} 0.0',
                    'rq_jobs{queue="default",status="deferred"} 0.0',
                    'rq_jobs{queue="default",status="scheduled"} 0.0',
                    # worker information
                    '# HELP rq_workers RQ workers',
                    'rq_workers{name="test_worker",queues="default",state="?"} 1.0',
                    '# HELP rq_job_successful_total RQ successful job count',
                    'rq_job_successful_total{name="test_worker",queues="default"} 0.0',
                    '# HELP rq_job_failed_total RQ failed job count',
                    'rq_job_failed_total{name="test_worker",queues="default"} 0.0',
                    '# HELP rq_working_seconds_total RQ total working time',
                    'rq_working_seconds_total{name="test_worker",queues="default"} 0.0',
                }
            )

            # Process four jobs: the failing job plus three successful ones.
            worker.work(burst=True, max_jobs=4)
            self.assertMetricsContain(
                {
                    # job information
                    'rq_jobs{queue="default",status="queued"} 7.0',
                    'rq_jobs{queue="default",status="finished"} 3.0',
                    'rq_jobs{queue="default",status="failed"} 1.0',
                    # worker information
                    'rq_workers{name="test_worker",queues="default",state="idle"} 1.0',
                    'rq_job_successful_total{name="test_worker",queues="default"} 3.0',
                    'rq_job_failed_total{name="test_worker",queues="default"} 1.0',
                }
            )

            # Drain the remaining seven jobs.
            worker.work(burst=True)
            self.assertMetricsContain(
                {
                    # job information
                    'rq_jobs{queue="default",status="queued"} 0.0',
                    'rq_jobs{queue="default",status="finished"} 10.0',
                    'rq_jobs{queue="default",status="failed"} 1.0',
                    # worker information
                    'rq_workers{name="test_worker",queues="default",state="idle"} 1.0',
                    'rq_job_successful_total{name="test_worker",queues="default"} 10.0',
                    'rq_job_failed_total{name="test_worker",queues="default"} 1.0',
                }
            )
        finally:
            # Call the original (un-patched) method saved above.
            register_death()
129+
130+
131+
@skipIf(prometheus_client is not None, 'prometheus_client is installed')
class NoPrometheusTest(TestCase):
    """URL configuration checks that only run when ``prometheus_client`` is not installed."""

    def test_no_metrics_without_prometheus_client(self):
        """Reversing ``rq_metrics`` must fail because the route is not registered."""
        self.assertRaises(NoReverseMatch, reverse, 'rq_metrics')

django_rq/urls.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
from django.urls import re_path
22

33
from . import views
4+
from .metrics_collector import RQCollector
5+
6+
# The metrics route is only registered when the Prometheus collector is available.
metrics_view = []
if RQCollector:  # type: ignore[truthy-function]
    metrics_view.append(re_path(r'^metrics/?$', views.prometheus_metrics, name='rq_metrics'))
49

510
urlpatterns = [
611
re_path(r'^$', views.stats, name='rq_home'),
712
re_path(r'^stats.json/(?P<token>[\w]+)?/?$', views.stats_json, name='rq_home_json'),
13+
*metrics_view,
814
re_path(r'^queues/(?P<queue_index>[\d]+)/$', views.jobs, name='rq_jobs'),
915
re_path(r'^workers/(?P<queue_index>[\d]+)/$', views.workers, name='rq_workers'),
1016
re_path(r'^workers/(?P<queue_index>[\d]+)/(?P<key>[-\w\.\:\$]+)/$', views.worker_details, name='rq_worker_details'),

django_rq/views.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from django.contrib import admin, messages
77
from django.contrib.admin.views.decorators import staff_member_required
8-
from django.http import Http404, JsonResponse
8+
from django.http import Http404, HttpResponse, JsonResponse
99
from django.shortcuts import redirect, render
1010
from django.urls import reverse
1111
from django.views.decorators.cache import never_cache
@@ -28,6 +28,15 @@
2828
from .settings import API_TOKEN, QUEUES_MAP
2929
from .utils import get_executions, get_jobs, get_scheduler_statistics, get_statistics, stop_jobs
3030

31+
try:
32+
import prometheus_client
33+
34+
from .metrics_collector import RQCollector
35+
except ImportError:
36+
prometheus_client = RQCollector = None # type: ignore[assignment, misc]
37+
38+
registry = None
39+
3140

3241
@never_cache
3342
@staff_member_required
@@ -49,6 +58,25 @@ def stats_json(request, token=None):
4958
)
5059

5160

61+
@never_cache
@staff_member_required
def prometheus_metrics(request):
    """Serve RQ statistics in a Prometheus exposition format.

    The response is never cached and is restricted to staff members. The
    collector registry is created on first use and kept in the module-level
    ``registry`` for subsequent requests.

    Args:
        request: The incoming ``HttpRequest``. Its ``Accept`` header selects
            the output encoder, and any ``name[]`` query parameters limit the
            response to the named metrics.

    Returns:
        An ``HttpResponse`` carrying the encoded metrics, with the content
        type chosen by the encoder negotiation.

    Raises:
        Http404: If ``RQCollector`` is unavailable (``prometheus_client`` could
            not be imported).
    """
    global registry

    if not RQCollector:  # type: ignore[truthy-function]
        raise Http404

    if registry is None:
        registry = prometheus_client.CollectorRegistry(auto_describe=True)
        registry.register(RQCollector())

    encoder, content_type = prometheus_client.exposition.choose_encoder(request.META.get('HTTP_ACCEPT', ''))

    # Keep the restricted view in a request-local variable. Rebinding the global
    # ``registry`` here would make the restriction stick for every later request
    # and would replace the full registry with the restricted view object.
    exposed_registry = registry
    if 'name[]' in request.GET:
        exposed_registry = registry.restricted_registry(request.GET.getlist('name[]'))

    return HttpResponse(encoder(exposed_registry), headers={'Content-Type': content_type})
78+
79+
5280
@never_cache
5381
@staff_member_required
5482
def jobs(request, queue_index):

setup.cfg

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ warn_unreachable = true
1414
[mypy-django_redis.*]
1515
ignore_missing_imports = true
1616

17+
[mypy-prometheus_client.*]
18+
ignore_missing_imports = true
19+
1720
[mypy-redis_cache.*]
1821
ignore_missing_imports = true
1922

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
},
2020
install_requires=['django>=3.2', 'rq>=2', 'redis>=3.5'],
2121
extras_require={
22+
'prometheus-metrics': ['prometheus_client>=0.4.0'],
2223
'Sentry': ['sentry-sdk>=1.0.0'],
2324
'testing': [],
2425
},

0 commit comments

Comments
 (0)