Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle Retry #26

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 22 additions & 14 deletions celery_singleton/singleton.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ class Singleton(BaseTask):
raise_on_duplicate = None
lock_expiry = None

def __init__(self, *args, **kwargs):
self._unlock_to_super_retry = False
super(Singleton, self).__init__(*args, **kwargs)

@property
def _raise_on_duplicate(self):
if self.raise_on_duplicate is not None:
Expand Down Expand Up @@ -78,17 +82,8 @@ def generate_lock(self, task_name, task_args=None, task_kwargs=None):
key_prefix=self.singleton_config.key_prefix,
)

def apply_async(
self,
args=None,
kwargs=None,
task_id=None,
producer=None,
link=None,
link_error=None,
shadow=None,
**options
):
def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
link=None, link_error=None, shadow=None, **options):
args = args or []
kwargs = kwargs or {}
task_id = task_id or uuid()
Expand Down Expand Up @@ -120,14 +115,15 @@ def apply_async(

def lock_and_run(self, lock, *args, task_id=None, **kwargs):
lock_aquired = self.aquire_lock(lock, task_id)
if lock_aquired:
if lock_aquired or self._unlock_to_super_retry:
try:
return super(Singleton, self).apply_async(
*args, task_id=task_id, **kwargs
)
except Exception:
# Clear the lock if apply_async fails
self.unlock(lock)
if lock_aquired:
self.unlock(lock)
raise

def release_lock(self, task_args=None, task_kwargs=None):
Expand All @@ -140,7 +136,9 @@ def unlock(self, lock):
def on_duplicate(self, existing_task_id):
if self._raise_on_duplicate:
raise DuplicateTaskError(
"Attempted to queue a duplicate of task ID {}".format(existing_task_id),
"Attempted to queue a duplicate of task ID {}".format(
existing_task_id
),
task_id=existing_task_id,
)
return self.AsyncResult(existing_task_id)
Expand All @@ -150,3 +148,13 @@ def on_failure(self, exc, task_id, args, kwargs, einfo):

def on_success(self, retval, task_id, args, kwargs):
self.release_lock(task_args=args, task_kwargs=kwargs)

def retry(self, args=None, kwargs=None, exc=None, throw=True,
eta=None, countdown=None, max_retries=None, **options):
self._unlock_to_super_retry = True
retry_task = super(Singleton, self).retry(
args, kwargs, exc, throw, eta, countdown, max_retries, **options
)
self._unlock_to_super_retry = False

return retry_task