Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for async jobs #438

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions schedule/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
[2] https://github.com/Rykian/clockwork
[3] https://adam.herokuapp.com/past/2010/6/30/replace_cron_with_clockwork/
"""
import asyncio
import inspect
from collections.abc import Hashable
import datetime
import functools
Expand Down Expand Up @@ -99,6 +101,12 @@ def run_pending(self) -> None:
for job in sorted(runnable_jobs):
self._run_job(job)


async def run_pending_async(self):
runnable_jobs = (job for job in self.jobs if job.should_run)
await asyncio.gather(*[self._run_job_async(job) for job in runnable_jobs])


def run_all(self, delay_seconds: int = 0) -> None:
"""
Run all jobs regardless if they are scheduled to run or not.
Expand All @@ -118,6 +126,16 @@ def run_all(self, delay_seconds: int = 0) -> None:
self._run_job(job)
time.sleep(delay_seconds)

async def run_all_async(self, delay_seconds=0):
logger.debug(
"Running *all* %i jobs with %is delay in between",
len(self.jobs),
delay_seconds,
)
for job in self.jobs[:]:
await self._run_job_async(job)
await asyncio.sleep(delay_seconds)

def get_jobs(self, tag: Optional[Hashable] = None) -> List["Job"]:
"""
Gets scheduled jobs marked with the given tag, or all jobs
Expand Down Expand Up @@ -170,6 +188,16 @@ def every(self, interval: int = 1) -> "Job":

def _run_job(self, job: "Job") -> None:
ret = job.run()
self._process_job_return_value(job, ret)

async def _run_job_async(self, job: "Job") -> None:
ret = job.run()
if inspect.isawaitable(ret):
ret = await ret

self._process_job_return_value(job, ret)

def _process_job_return_value(self, job: "Job", ret: any):
if isinstance(ret, CancelJob) or ret is CancelJob:
self.cancel_job(job)

Expand Down Expand Up @@ -653,6 +681,13 @@ def run_pending() -> None:
default_scheduler.run_pending()


async def run_pending_async() -> None:
"""Calls :meth:`run_pending <Scheduler.run_pending>` on the
:data:`default scheduler instance <default_scheduler>`.
"""
await default_scheduler.run_pending_async()


def run_all(delay_seconds: int = 0) -> None:
"""Calls :meth:`run_all <Scheduler.run_all>` on the
:data:`default scheduler instance <default_scheduler>`.
Expand Down