Skip to content

Commit 1b44875

Browse files
committed
Added redis stream option for job delivery
1 parent 1315583 commit 1b44875

4 files changed

Lines changed: 53 additions & 13 deletions

File tree

arq/cli.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from .typing import WorkerSettingsType
1717

1818
burst_help = 'Batch mode: exit once no jobs are found in any queue.'
19+
stream_help = 'Stream mode: use redis streams for job delivery. Does not support batch mode.'
1920
health_check_help = 'Health Check: run a health check and exit.'
2021
watch_help = 'Watch a directory and reload the worker upon changes.'
2122
verbose_help = 'Enable verbose output.'
@@ -26,11 +27,14 @@
2627
@click.version_option(VERSION, '-V', '--version', prog_name='arq')
2728
@click.argument('worker-settings', type=str, required=True)
2829
@click.option('--burst/--no-burst', default=None, help=burst_help)
30+
@click.option('--stream/--no-stream', default=None, help=stream_help)
2931
@click.option('--check', is_flag=True, help=health_check_help)
3032
@click.option('--watch', type=click.Path(exists=True, dir_okay=True, file_okay=False), help=watch_help)
3133
@click.option('-v', '--verbose', is_flag=True, help=verbose_help)
3234
@click.option('--custom-log-dict', type=str, help=logdict_help)
33-
def cli(*, worker_settings: str, burst: bool, check: bool, watch: str, verbose: bool, custom_log_dict: str) -> None:
35+
def cli(
36+
*, worker_settings: str, burst: bool, stream: bool, check: bool, watch: str, verbose: bool, custom_log_dict: str
37+
) -> None:
3438
"""
3539
Job queues in python with asyncio and redis.
3640
@@ -48,6 +52,8 @@ def cli(*, worker_settings: str, burst: bool, check: bool, watch: str, verbose:
4852
exit(check_health(worker_settings_))
4953
else:
5054
kwargs = {} if burst is None else {'burst': burst}
55+
if stream:
56+
kwargs['stream'] = stream
5157
if watch:
5258
asyncio.run(watch_reload(watch, worker_settings_))
5359
else:

arq/connections.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from redis.asyncio.sentinel import Sentinel
1414
from redis.exceptions import RedisError, WatchError
1515

16-
from .constants import default_queue_name, expires_extra_ms, job_key_prefix, result_key_prefix
16+
from .constants import default_queue_name, expires_extra_ms, job_key_prefix, result_key_prefix, stream_prefix
1717
from .jobs import Deserializer, Job, JobDef, JobResult, Serializer, deserialize_job, serialize_job
1818
from .utils import timestamp_ms, to_ms, to_unix_ms
1919

@@ -120,6 +120,7 @@ async def enqueue_job(
120120
self,
121121
function: str,
122122
*args: Any,
123+
_use_stream: bool = False,
123124
_job_id: Optional[str] = None,
124125
_queue_name: Optional[str] = None,
125126
_defer_until: Optional[datetime] = None,
@@ -133,6 +134,7 @@ async def enqueue_job(
133134
134135
:param function: Name of the function to call
135136
:param args: args to pass to the function
137+
:param _use_stream: queue the job through redis streams. Stream mode must be enabled in worker.
136138
:param _job_id: ID of the job, can be used to enforce job uniqueness
137139
:param _queue_name: queue of the job, can be used to create job in different queue
138140
:param _defer_until: datetime at which to run the job
@@ -171,6 +173,8 @@ async def enqueue_job(
171173

172174
job = serialize_job(function, args, kwargs, _job_try, enqueue_time_ms, serializer=self.job_serializer)
173175
pipe.multi()
176+
if _use_stream:
177+
pipe.xadd(stream_prefix + _queue_name, {job_key_prefix: job})
174178
pipe.psetex(job_key, expires_ms, job)
175179
pipe.zadd(_queue_name, {job_id: score})
176180
try:

arq/constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
default_queue_name = 'arq:queue'
2+
default_worker_name = 'arq:worker'
3+
default_worker_group = 'arq:workers'
4+
stream_prefix = 'arq:stream:'
25
job_key_prefix = 'arq:job:'
36
in_progress_key_prefix = 'arq:in-progress:'
47
result_key_prefix = 'arq:result:'

arq/worker.py

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,16 @@
2020
abort_job_max_age,
2121
abort_jobs_ss,
2222
default_queue_name,
23+
default_worker_group,
24+
default_worker_name,
2325
expires_extra_ms,
2426
health_check_key_suffix,
2527
in_progress_key_prefix,
2628
job_key_prefix,
2729
keep_cronjob_progress,
2830
result_key_prefix,
2931
retry_key_prefix,
32+
stream_prefix,
3033
)
3134
from .utils import (
3235
args_to_string,
@@ -144,10 +147,13 @@ class Worker:
144147
:param functions: list of functions to register, can either be raw coroutine functions or the
145148
result of :func:`arq.worker.func`.
146149
:param queue_name: queue name to get jobs from
150+
:param worker_name: unique name to identify this worker
151+
:param worker_group: worker group that this worker belongs to
147152
:param cron_jobs: list of cron jobs to run, use :func:`arq.cron.cron` to create them
148153
:param redis_settings: settings for creating a redis connection
149154
:param redis_pool: existing redis pool, generally None
150155
:param burst: whether to stop the worker once all jobs have been run
156+
:param stream: whether to constantly listen for new jobs from a redis stream
151157
:param on_startup: coroutine function to run at startup
152158
:param on_shutdown: coroutine function to run at shutdown
153159
:param on_job_start: coroutine function to run on job start
@@ -188,10 +194,13 @@ def __init__(
188194
functions: Sequence[Union[Function, 'WorkerCoroutine']] = (),
189195
*,
190196
queue_name: Optional[str] = default_queue_name,
197+
worker_name: Optional[str] = None,
198+
worker_group: Optional[str] = None,
191199
cron_jobs: Optional[Sequence[CronJob]] = None,
192200
redis_settings: Optional[RedisSettings] = None,
193201
redis_pool: Optional[ArqRedis] = None,
194202
burst: bool = False,
203+
stream: bool = False,
195204
on_startup: Optional['StartupShutdown'] = None,
196205
on_shutdown: Optional['StartupShutdown'] = None,
197206
on_job_start: Optional['StartupShutdown'] = None,
@@ -234,6 +243,10 @@ def __init__(
234243
if len(self.functions) == 0:
235244
raise RuntimeError('at least one function or cron_job must be registered')
236245
self.burst = burst
246+
self.stream = stream
247+
if stream is True:
248+
self.worker_name = worker_name if worker_name is not None else default_worker_name
249+
self.worker_group = worker_group if worker_group is not None else default_worker_group
237250
self.on_startup = on_startup
238251
self.on_shutdown = on_shutdown
239252
self.on_job_start = on_job_start
@@ -357,17 +370,31 @@ async def main(self) -> None:
357370
if self.on_startup:
358371
await self.on_startup(self.ctx)
359372

360-
async for _ in poll(self.poll_delay_s):
361-
await self._poll_iteration()
362-
363-
if self.burst:
364-
if 0 <= self.max_burst_jobs <= self._jobs_started():
365-
await asyncio.gather(*self.tasks.values())
366-
return None
367-
queued_jobs = await self.pool.zcard(self.queue_name)
368-
if queued_jobs == 0:
369-
await asyncio.gather(*self.tasks.values())
370-
return None
373+
if self.stream is False:
374+
async for _ in poll(self.poll_delay_s):
375+
await self._poll_iteration()
376+
377+
if self.burst:
378+
if 0 <= self.max_burst_jobs <= self._jobs_started():
379+
await asyncio.gather(*self.tasks.values())
380+
return None
381+
queued_jobs = await self.pool.zcard(self.queue_name)
382+
if queued_jobs == 0:
383+
await asyncio.gather(*self.tasks.values())
384+
return None
385+
else:
386+
stream_name = stream_prefix + self.queue_name
387+
388+
with contextlib.suppress(ResponseError):
389+
await self.pool.xgroup_create(stream_name, self.worker_group, '$', mkstream=True)
390+
logger.info('Stream consumer group created with name: %s', self.worker_group)
391+
392+
while True:
393+
if event := await self.pool.xreadgroup(
394+
consumername=self.worker_name, groupname=self.worker_group, streams={stream_name: '>'}, block=0
395+
):
396+
await self._poll_iteration()
397+
await self.pool.xack(stream_name, self.worker_group, event[0][1][0][0]) # type: ignore[no-untyped-call]
371398

372399
async def _poll_iteration(self) -> None:
373400
"""

0 commit comments

Comments (0)