From 20c9c47d9c3f9f96a8b75474dbc0bc6cc53aa9c0 Mon Sep 17 00:00:00 2001 From: Alexander Dicke Date: Sun, 26 May 2024 11:45:30 +0200 Subject: [PATCH] feat(python): adds optional logging to workers re #2571 --- python/bullmq/types/worker_options.py | 7 +++++++ python/bullmq/worker.py | 10 ++++++++++ 2 files changed, 17 insertions(+) diff --git a/python/bullmq/types/worker_options.py b/python/bullmq/types/worker_options.py index 3efd285618..3dad32a807 100644 --- a/python/bullmq/types/worker_options.py +++ b/python/bullmq/types/worker_options.py @@ -55,3 +55,10 @@ class WorkerOptions(TypedDict, total=False): """ Options for connecting to a Redis instance. """ + + enable_logging: bool + """ + Whether to enable basic logging for the worker. + + @default False + """ \ No newline at end of file diff --git a/python/bullmq/worker.py b/python/bullmq/worker.py index 803196e5d1..74b2f3f271 100644 --- a/python/bullmq/worker.py +++ b/python/bullmq/worker.py @@ -1,3 +1,4 @@ +import logging from typing import Callable from uuid import uuid4 from bullmq.custom_errors import WaitingChildrenError @@ -19,6 +20,9 @@ # Obviously we can still process much faster than 1 job per millisecond but delays and rate limits will never work with more accuracy than 1ms. minimum_block_timeout = 0.001 +logger = logging.getLogger(__name__) + + class Worker(EventEmitter): def __init__(self, name: str, processor: Callable[[Job, str], asyncio.Future], opts: WorkerOptions = {}): super().__init__() @@ -56,6 +60,12 @@ def __init__(self, name: str, processor: Callable[[Job, str], asyncio.Future], o if opts.get("autorun", True): asyncio.ensure_future(self.run()) + if opts.get("enable_logging", False): + self.on("completed", + lambda job, result: logger.info(f"Worker {self.name}: Finished job {job.id}")) + self.on("failed", + lambda job, err: logger.error(f"Worker {self.name}: Job {job.id} failed: `{repr(err)}`")) + async def run(self): if self.running: raise Exception("Worker is already running")