#!/usr/bin/env python
"""=cut
=head1 NAME
celery_tasks - Munin plugin to monitor the number of Celery tasks with specified names.
=head1 REQUIREMENTS
- Python
- celery (http://celeryproject.org/)
- celerymon (http://github.com/ask/celerymon)
Note: don't forget to enable sending of events on the celery daemon - run it with the --events option.
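For example (the exact invocation depends on your Celery version; this assumes the standalone celeryd daemon):
celeryd --events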
=head1 CONFIGURATION
Default configuration:
None
You must set the name of at least one task you want to monitor (multiple names are separated by a comma).
For example:
[celery_tasks]
env.tasks myapp.tasks.SendEmailTask,myapp2.tasks.FetchUserDataTask
This would monitor the number of tasks for the tasks named "myapp.tasks.SendEmailTask" and "myapp2.tasks.FetchUserDataTask".
=head1 MAGIC MARKERS
#%# family=manual
#%# capabilities=autoconf
=head1 AUTHOR
Tomaz Muraus (http://github.com/Kami/munin-celery)
=head1 LICENSE
GPLv2
=cut"""
import os
import sys
import urllib

try:
    import json
except ImportError:
    import simplejson as json

API_URL = 'http://localhost:8989'

URL_ENDPOINTS = {
    'workers': '/api/worker/',
    'worker_tasks': '/api/worker/%s/tasks',
    'tasks': '/api/task/',
    'task_names': '/api/task/name/',
    'task_details': '/api/task/name/%s',
}

TASK_STATES = (
    'PENDING',
    'RECEIVED',
    'STARTED',
    'SUCCESS',
    'FAILURE',
    'REVOKED',
    'RETRY'
)
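

# Query the celerymon HTTP API and return the decoded JSON response.
# Prints an error and exits if celerymon is not reachable.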
def get_data(what, api_url, *args):
    try:
        request = urllib.urlopen('%s%s' % (api_url,
                                           URL_ENDPOINTS[what] % (args)))
        response = request.read()
        return json.loads(response)
    except IOError:
        print 'Could not connect to the celerymon webserver'
        sys.exit(-1)
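

# Make sure the celerymon webserver is reachable before doing anything else.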
def check_web_server_status(api_url):
    try:
        request = urllib.urlopen(api_url)
        response = request.read()
    except IOError:
        print 'Could not connect to the celerymon webserver'
        sys.exit(-1)
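

# Munin field names may not contain dots, so replace them with underscores.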
def clean_task_name(task_name):
    return task_name.replace('.', '_')


# Config
def print_config(task_names):
    print 'graph_title Celery tasks'
    print 'graph_args --lower-limit 0'
    print 'graph_scale no'
    print 'graph_vlabel tasks per ${graph_period}'
    print 'graph_category cloud'

    for name in task_names:
        print '%s.label %s' % (clean_task_name(name), name)
        print '%s.type DERIVE' % (clean_task_name(name))
        print '%s.min 0' % (clean_task_name(name))
        print '%s.info number of %s tasks' % (clean_task_name(name), name)


# Values
def print_values(task_names=None, api_url=None):
    for task_name in task_names:
        count = len(get_data('task_details', api_url, task_name))
        print '%s.value %d' % (clean_task_name(task_name), count)
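

# Munin plugin protocol: "config" prints the graph definition, "autoconf"
# reports whether the plugin can run, and a plain invocation prints the
# current values.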
if __name__ == '__main__':
    task_names = os.environ.get('tasks', None)
    api_url = os.environ.get('api_url', API_URL)
    check_web_server_status(api_url)

    if not task_names:
        print 'You need to define at least one task name'
        sys.exit(-1)

    task_names = task_names.split(',')

    if len(sys.argv) > 1:
        if sys.argv[1] == 'config':
            print_config(task_names)
        elif sys.argv[1] == 'autoconf':
            print 'yes'
    else:
        print_values(task_names, api_url)
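
# Example usage (a sketch, assuming a standard Munin layout and celerymon
# listening on the default http://localhost:8989; adjust paths for your
# distribution):
#
#   /etc/munin/plugin-conf.d/celery_tasks:
#       [celery_tasks]
#       env.tasks myapp.tasks.SendEmailTask,myapp2.tasks.FetchUserDataTask
#
#   ln -s /path/to/celery_tasks /etc/munin/plugins/celery_tasks
#   munin-run celery_tasks autoconf
#   munin-run celery_tasks config
#   munin-run celery_tasks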