diff --git a/schedule/__init__.py b/schedule/__init__.py index 3f7267da..c28993b8 100644 --- a/schedule/__init__.py +++ b/schedule/__init__.py @@ -37,6 +37,8 @@ [2] https://github.com/Rykian/clockwork [3] https://adam.herokuapp.com/past/2010/6/30/replace_cron_with_clockwork/ """ +import asyncio +import inspect from collections.abc import Hashable import datetime import functools @@ -87,7 +89,7 @@ def __init__(self) -> None: def run_pending(self) -> None: """ - Run all jobs that are scheduled to run. + Run all sync jobs that are scheduled to run. Please note that it is *intended behavior that run_pending() does not run missed jobs*. For example, if you've registered a job @@ -99,9 +101,22 @@ def run_pending(self) -> None: for job in sorted(runnable_jobs): self._run_job(job) + async def run_pending_async(self): + """ + Run all sync and async jobs that are scheduled to run. + + Please note that it is *intended behavior that run_pending_async() + does not run missed jobs*. For example, if you've registered a job + that should run every minute and you only call run_pending() + in one hour increments then your job won't be run 60 times in + between but only once. + """ + runnable_jobs = (job for job in self.jobs if job.should_run) + await asyncio.gather(*[self._run_job_async(job) for job in runnable_jobs]) + def run_all(self, delay_seconds: int = 0) -> None: """ - Run all jobs regardless if they are scheduled to run or not. + Run all sync jobs regardless if they are scheduled to run or not. A delay of `delay` seconds is added between each job. This helps distribute system load generated by the jobs more evenly @@ -118,6 +133,25 @@ def run_all(self, delay_seconds: int = 0) -> None: self._run_job(job) time.sleep(delay_seconds) + async def run_all_async(self, delay_seconds=0): + """ + Run all sync and async jobs regardless if they are scheduled to run or not. + + A delay of `delay` seconds is added between each job. This helps + distribute system load generated by the jobs more evenly + over time. + + :param delay_seconds: A delay added between every executed job + """ + logger.debug( + "Running *all* %i jobs with %is delay in between", + len(self.jobs), + delay_seconds, + ) + for job in self.jobs[:]: + await self._run_job_async(job) + await asyncio.sleep(delay_seconds) + def get_jobs(self, tag: Optional[Hashable] = None) -> List["Job"]: """ Gets scheduled jobs marked with the given tag, or all jobs @@ -173,6 +207,17 @@ def _run_job(self, job: "Job") -> None: if isinstance(ret, CancelJob) or ret is CancelJob: self.cancel_job(job) + async def _run_job_async(self, job: "Job") -> None: + ret = job.run() + if inspect.isawaitable(ret): + ret = await ret + + self._process_job_return_value(job, ret) + + def _process_job_return_value(self, job: "Job", ret: any): + if isinstance(ret, CancelJob) or ret is CancelJob: + self.cancel_job(job) + def get_next_run( self, tag: Optional[Hashable] = None ) -> Optional[datetime.datetime]: @@ -855,6 +900,13 @@ def run_pending() -> None: default_scheduler.run_pending() +async def run_pending_async() -> None: + """Calls :meth:`run_pending_async ` on the + :data:`default scheduler instance `. + """ + await default_scheduler.run_pending_async() + + def run_all(delay_seconds: int = 0) -> None: """Calls :meth:`run_all ` on the :data:`default scheduler instance `.