-
Notifications
You must be signed in to change notification settings - Fork 38
/
Copy pathsingleton.py
166 lines (142 loc) · 5.02 KB
/
singleton.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
from celery import Task as BaseTask
from kombu.utils.uuid import uuid
import inspect
from .backends import get_backend
from .config import Config
from .exceptions import DuplicateTaskError
from . import util
from . import inspect_celery
def clear_locks(app):
    """Clear the singleton locks belonging to ``app``.

    Builds a ``Config`` from ``app``, resolves the backend for that config
    and calls the backend's ``clear`` with the configured key prefix.
    """
    singleton_config = Config(app)
    get_backend(singleton_config).clear(singleton_config.key_prefix)
class Singleton(BaseTask):
    """Task base class that avoids queueing duplicates of the same task.

    ``apply_async`` derives a lock from the task name and its arguments and
    tries to take it in the singleton backend before publishing.  If the
    lock is already held, the call is treated as a duplicate and handed to
    ``on_duplicate``.  ``on_success`` and ``on_failure`` release the lock.
    """

    abstract = True

    # Lazily created by the ``singleton_backend`` / ``singleton_config``
    # properties and cached on first use.
    _singleton_backend = None
    _singleton_config = None

    # Name (or list of names) of ``run`` parameters used to build the lock.
    # When unset, all positional and keyword arguments are used.
    unique_on = None

    # Per-task override; ``None`` defers to the value on the config.
    raise_on_duplicate = None

    # Per-task override passed to the backend as ``expiry``; ``None`` defers
    # to the value on the config.
    lock_expiry = None

    # When true, ``apply_async`` calls ``challenge_lock`` before locking.
    challenge_lock_on_startup = False

    # App object handed to ``inspect_celery`` by ``challenge_lock``.
    # NOTE(review): defaults to None; presumably has to be set when
    # ``challenge_lock_on_startup`` is enabled -- confirm against
    # ``inspect_celery``.
    celery_app = None

    @property
    def _raise_on_duplicate(self):
        """Return whether a duplicate should raise instead of being returned.

        The task-level ``raise_on_duplicate`` wins when it is not ``None``;
        otherwise the config value is used, with ``False`` as the fallback.
        """
        if self.raise_on_duplicate is not None:
            return self.raise_on_duplicate
        return self.singleton_config.raise_on_duplicate or False

    @property
    def singleton_config(self):
        """Return the cached ``Config``, creating it from the task's app."""
        if self._singleton_config:
            return self._singleton_config
        self._singleton_config = Config(self._get_app())
        return self._singleton_config

    @property
    def singleton_backend(self):
        """Return the cached backend, resolving it from the config once."""
        if self._singleton_backend:
            return self._singleton_backend
        self._singleton_backend = get_backend(self.singleton_config)
        return self._singleton_backend

    def aquire_lock(self, lock, task_id):
        """Ask the backend to take ``lock`` on behalf of ``task_id``.

        The expiry is the task-level ``lock_expiry`` when it is not
        ``None``, otherwise the config's ``lock_expiry``.

        Returns:
            Whatever the backend's ``lock`` call returns; callers treat a
            truthy value as "lock acquired".
        """
        expiry = (
            self.lock_expiry
            if self.lock_expiry is not None
            else self.singleton_config.lock_expiry
        )
        return self.singleton_backend.lock(lock, task_id, expiry=expiry)

    def get_existing_task_id(self, lock):
        """Return the value the backend currently stores for ``lock``."""
        return self.singleton_backend.get(lock)

    def generate_lock(self, task_name, task_args=None, task_kwargs=None):
        """Build the lock key for a call of ``task_name``.

        Args:
            task_name: Name of the task the lock is for.
            task_args: Positional arguments of the call (``None`` means none).
            task_kwargs: Keyword arguments of the call (``None`` means none).

        Returns:
            The result of ``util.generate_lock`` for the selected arguments
            and the configured key prefix.

        Raises:
            TypeError: If ``unique_on`` is set and the arguments do not fit
                the signature of ``run``.
            KeyError: If ``unique_on`` names something that is not a
                parameter of ``run``.
        """
        unique_on = self.unique_on
        task_args = task_args or []
        task_kwargs = task_kwargs or {}
        if unique_on:
            if isinstance(unique_on, str):
                unique_on = [unique_on]
            bound = inspect.signature(self.run).bind(*task_args, **task_kwargs)
            # ``bind`` only records arguments that were actually passed.
            # Fill in defaults so ``unique_on`` may name a parameter the
            # caller left at its default instead of failing with KeyError.
            bound.apply_defaults()
            unique_args = []
            unique_kwargs = {key: bound.arguments[key] for key in unique_on}
        else:
            unique_args = task_args
            unique_kwargs = task_kwargs
        return util.generate_lock(
            task_name,
            unique_args,
            unique_kwargs,
            key_prefix=self.singleton_config.key_prefix,
        )

    def apply_async(
        self,
        args=None,
        kwargs=None,
        task_id=None,
        producer=None,
        link=None,
        link_error=None,
        shadow=None,
        **options
    ):
        """Queue the task unless an identical one already holds the lock.

        All parameters are forwarded to the parent ``apply_async``.  A
        ``task_id`` is generated when none is given so that it can be
        stored with the lock.

        Returns:
            The parent ``apply_async`` result when the lock was acquired,
            otherwise the result of ``on_duplicate`` for the task id that
            holds the lock.
        """
        args = args or []
        kwargs = kwargs or {}
        task_id = task_id or uuid()
        lock = self.generate_lock(self.name, args, kwargs)
        run_args = dict(
            lock=lock,
            args=args,
            kwargs=kwargs,
            task_id=task_id,
            producer=producer,
            link=link,
            link_error=link_error,
            shadow=shadow,
            **options
        )
        if self.challenge_lock_on_startup:
            self.challenge_lock(lock, args, kwargs)
        # Locking and looking up the holder are two separate backend calls,
        # so the lock can disappear in between.  Keep alternating until
        # either we get the lock or we see who holds it.
        while True:
            task = self.lock_and_run(**run_args)
            if task:
                return task
            existing_task_id = self.get_existing_task_id(lock)
            if existing_task_id:
                return self.on_duplicate(existing_task_id)

    def lock_and_run(self, lock, *args, task_id=None, **kwargs):
        """Take ``lock`` and, if successful, publish the task.

        Returns:
            The parent ``apply_async`` result, or ``None`` when the lock
            could not be acquired.

        Raises:
            Exception: Anything raised by the parent ``apply_async`` is
                re-raised after the lock has been released.
        """
        if not self.aquire_lock(lock, task_id):
            return None
        try:
            return super().apply_async(*args, task_id=task_id, **kwargs)
        except Exception:
            # Clear the lock if apply_async fails
            self.unlock(lock)
            raise

    def challenge_lock(self, lock, args, kwargs):
        """Release ``lock`` when the task recorded for it cannot be found.

        The lock is left alone when the backend has no value for it, when
        ``inspect_celery.are_worker_active`` is falsy, or when
        ``inspect_celery.get_task_by_id`` finds the recorded task id.
        ``args`` and ``kwargs`` are accepted but not used.
        """
        current_lock_id = self.singleton_backend.get(lock)
        if not current_lock_id:
            return
        if not inspect_celery.are_worker_active(self.celery_app):
            return
        task_found = inspect_celery.get_task_by_id(self.celery_app, current_lock_id)
        if task_found:
            return
        self.unlock(lock)

    def release_lock(self, task_args=None, task_kwargs=None):
        """Recompute the lock for the given arguments and unlock it."""
        lock = self.generate_lock(self.name, task_args, task_kwargs)
        self.unlock(lock)

    def unlock(self, lock):
        """Ask the backend to unlock ``lock``."""
        self.singleton_backend.unlock(lock)

    def on_duplicate(self, existing_task_id):
        """Handle an attempt to queue a task whose lock is already held.

        Returns:
            ``self.AsyncResult(existing_task_id)`` when duplicates are not
            configured to raise.

        Raises:
            DuplicateTaskError: When ``_raise_on_duplicate`` is true.
        """
        if self._raise_on_duplicate:
            raise DuplicateTaskError(
                "Attempted to queue a duplicate of task ID {}".format(existing_task_id),
                task_id=existing_task_id,
            )
        return self.AsyncResult(existing_task_id)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Release the lock for this call's arguments after a failure."""
        self.release_lock(task_args=args, task_kwargs=kwargs)

    def on_success(self, retval, task_id, args, kwargs):
        """Release the lock for this call's arguments after a success."""
        self.release_lock(task_args=args, task_kwargs=kwargs)