From f9c92092dc25669e260d7f43576b96b70875d7f8 Mon Sep 17 00:00:00 2001 From: Ilya Egorov <0x42005e1f@gmail.com> Date: Sun, 22 Dec 2024 18:22:42 +0400 Subject: [PATCH 1/8] Refactor notifiers --- janus/__init__.py | 284 ++++++++++++++++++++++------------------------ 1 file changed, 136 insertions(+), 148 deletions(-) diff --git a/janus/__init__.py b/janus/__init__.py index 80a5bf7..0afdc1f 100644 --- a/janus/__init__.py +++ b/janus/__init__.py @@ -8,7 +8,7 @@ from queue import Empty as SyncQueueEmpty from queue import Full as SyncQueueFull from time import monotonic -from typing import Callable, Generic, Optional, Protocol, TypeVar +from typing import Callable, Generic, Literal, Optional, Protocol, TypeVar if sys.version_info >= (3, 13): from asyncio import QueueShutDown as AsyncQueueShutDown @@ -88,52 +88,122 @@ async def get(self) -> T: ... async def join(self) -> None: ... +class SyncCondition(threading.Condition): + def __init__(self, lock: threading.Lock) -> None: + super().__init__(lock) + self.waiting = 0 + + def wait(self, timeout: OptFloat = None) -> bool: + self.waiting += 1 + try: + return super().wait(timeout) + finally: + self.waiting -= 1 + + +class AsyncCondition(asyncio.Condition): + _loop: Optional[asyncio.AbstractEventLoop] = None + + def __init__( + self, + sync_lock: threading.Lock, + async_lock: asyncio.Lock, + pending: deque[asyncio.Future[None]] + ) -> None: + super().__init__(async_lock) + self.__mutex = sync_lock + self.__pending = pending + self.waiting = 0 + + async def wait(self) -> Literal[True]: + self.waiting += 1 + self.__mutex.release() + try: + return await super().wait() + finally: + self.__mutex.acquire() + self.waiting -= 1 + + async def __do_notifier(self, method: Callable[[], None]) -> None: + async with self: + method() + + def __setup_notifier( + self, loop: asyncio.AbstractEventLoop, method: Callable[[], None] + ) -> None: + task = loop.create_task(self.__do_notifier(method)) + task.add_done_callback(self.__pending.remove) + self.__pending.append(task) + + def __setup_notifier_threadsafe(self, method: Callable[[], None]): + loop = self._loop + if loop is None or loop.is_closed(): + # async API is not available, nothing to notify + return + loop.call_soon_threadsafe(self.__setup_notifier, loop, method) + + def notify_soon(self) -> None: + # Warning! + # The function should be called when sync_lock is locked, + # otherwise the code is not thread-safe + if self.waiting: + self.__setup_notifier_threadsafe(self.notify) + + def notify_all_soon(self) -> None: + # Warning! + # The function should be called when sync_lock is locked, + # otherwise the code is not thread-safe + if self.waiting: + self.__setup_notifier_threadsafe(self.notify_all) + + class Queue(Generic[T]): _loop: Optional[asyncio.AbstractEventLoop] = None def __init__(self, maxsize: int = 0) -> None: - if sys.version_info < (3, 10): - self._loop = asyncio.get_running_loop() - self._maxsize = maxsize self._is_shutdown = False self._init(maxsize) + self._pending: deque[asyncio.Future[None]] = deque() self._unfinished_tasks = 0 self._sync_mutex = threading.Lock() - self._sync_not_empty = threading.Condition(self._sync_mutex) - self._sync_not_empty_waiting = 0 - self._sync_not_full = threading.Condition(self._sync_mutex) - self._sync_not_full_waiting = 0 - self._sync_tasks_done = threading.Condition(self._sync_mutex) - self._sync_tasks_done_waiting = 0 + self._sync_not_empty = SyncCondition(self._sync_mutex) + self._sync_not_full = SyncCondition(self._sync_mutex) + self._sync_tasks_done = SyncCondition(self._sync_mutex) self._async_mutex = asyncio.Lock() if sys.version_info[:3] == (3, 10, 0): # Workaround for Python 3.10 bug, see #358: getattr(self._async_mutex, "_get_loop", lambda: None)() - self._async_not_empty = asyncio.Condition(self._async_mutex) - self._async_not_empty_waiting = 0 - self._async_not_full = asyncio.Condition(self._async_mutex) - self._async_not_full_waiting = 0 - self._async_tasks_done = asyncio.Condition(self._async_mutex) - self._async_tasks_done_waiting = 0 - - self._pending: deque[asyncio.Future[None]] = deque() + self._async_not_empty = AsyncCondition( + self._sync_mutex, self._async_mutex, self._pending + ) + self._async_not_full = AsyncCondition( + self._sync_mutex, self._async_mutex, self._pending + ) + self._async_tasks_done = AsyncCondition( + self._sync_mutex, self._async_mutex, self._pending + ) self._sync_queue = _SyncQueueProxy(self) self._async_queue = _AsyncQueueProxy(self) + if sys.version_info < (3, 10): + self._get_loop() + def _get_loop(self) -> asyncio.AbstractEventLoop: # Warning! # The function should be called when self._sync_mutex is locked, # otherwise the code is not thread-safe loop = asyncio.get_running_loop() - if self._loop is None: self._loop = loop + self._async_not_empty._loop = loop + self._async_not_full._loop = loop + self._async_tasks_done._loop = loop if loop is not self._loop: raise RuntimeError(f"{self!r} is bound to a different event loop") return loop @@ -159,19 +229,13 @@ def shutdown(self, immediate: bool = False) -> None: if self._unfinished_tasks > 0: self._unfinished_tasks -= 1 # release all blocked threads in `join()` - if self._sync_tasks_done_waiting: - self._sync_tasks_done.notify_all() - if self._async_tasks_done_waiting: - self._notify_async(self._async_tasks_done.notify_all) + self._sync_tasks_done.notify_all() + self._async_tasks_done.notify_all_soon() # All getters need to re-check queue-empty to raise ShutDown - if self._sync_not_empty_waiting: - self._sync_not_empty.notify_all() - if self._sync_not_full_waiting: - self._sync_not_full.notify_all() - if self._async_not_empty_waiting: - self._notify_async(self._async_not_empty.notify_all) - if self._async_not_full_waiting: - self._notify_async(self._async_not_full.notify_all) + self._sync_not_empty.notify_all() + self._sync_not_full.notify_all() + self._async_not_empty.notify_all_soon() + self._async_not_full.notify_all_soon() def close(self) -> None: """Close the queue. @@ -239,27 +303,6 @@ def _put_internal(self, item: T) -> None: self._put(item) self._unfinished_tasks += 1 - async def _do_async_notifier(self, method: Callable[[], None]) -> None: - async with self._async_mutex: - method() - - def _setup_async_notifier( - self, loop: asyncio.AbstractEventLoop, method: Callable[[], None] - ) -> None: - task = loop.create_task(self._do_async_notifier(method)) - task.add_done_callback(self._pending.remove) - self._pending.append(task) - - def _notify_async(self, method: Callable[[], None]) -> None: - # Warning! - # The function should be called when self._sync_mutex is locked, - # otherwise the code is not thread-safe - loop = self._loop - if loop is None or loop.is_closed(): - # async API is not available, nothing to notify - return - loop.call_soon_threadsafe(self._setup_async_notifier, loop, method) - class _SyncQueueProxy(SyncQueue[T]): """Create a queue object with a given maximum size. @@ -293,15 +336,13 @@ def task_done(self) -> None: placed in the queue. """ parent = self._parent - with parent._sync_tasks_done: + with parent._sync_mutex: unfinished = parent._unfinished_tasks - 1 if unfinished <= 0: if unfinished < 0: raise ValueError("task_done() called too many times") - if parent._sync_tasks_done_waiting: - parent._sync_tasks_done.notify_all() - if parent._async_tasks_done_waiting: - parent._notify_async(parent._async_tasks_done.notify_all) + parent._sync_tasks_done.notify_all() + parent._async_tasks_done.notify_all_soon() parent._unfinished_tasks = unfinished def join(self) -> None: @@ -314,13 +355,9 @@ def join(self) -> None: When the count of unfinished tasks drops to zero, join() unblocks. """ parent = self._parent - with parent._sync_tasks_done: + with parent._sync_mutex: while parent._unfinished_tasks: - parent._sync_tasks_done_waiting += 1 - try: - parent._sync_tasks_done.wait() - finally: - parent._sync_tasks_done_waiting -= 1 + parent._sync_tasks_done.wait() def qsize(self) -> int: """Return the approximate size of the queue (not reliable!).""" @@ -367,7 +404,7 @@ def put(self, item: T, block: bool = True, timeout: OptFloat = None) -> None: is ignored in that case). """ parent = self._parent - with parent._sync_not_full: + with parent._sync_mutex: if parent._is_shutdown: raise SyncQueueShutDown if parent._maxsize > 0: @@ -376,11 +413,7 @@ def put(self, item: T, block: bool = True, timeout: OptFloat = None) -> None: raise SyncQueueFull elif timeout is None: while parent._qsize() >= parent._maxsize: - parent._sync_not_full_waiting += 1 - try: - parent._sync_not_full.wait() - finally: - parent._sync_not_full_waiting -= 1 + parent._sync_not_full.wait() if parent._is_shutdown: raise SyncQueueShutDown elif timeout < 0: @@ -391,18 +424,12 @@ def put(self, item: T, block: bool = True, timeout: OptFloat = None) -> None: remaining = endtime - monotonic() if remaining <= 0.0: raise SyncQueueFull - parent._sync_not_full_waiting += 1 - try: - parent._sync_not_full.wait(remaining) - finally: - parent._sync_not_full_waiting -= 1 + parent._sync_not_full.wait(remaining) if parent._is_shutdown: raise SyncQueueShutDown parent._put_internal(item) - if parent._sync_not_empty_waiting: - parent._sync_not_empty.notify() - if parent._async_not_empty_waiting: - parent._notify_async(parent._async_not_empty.notify) + parent._sync_not_empty.notify() + parent._async_not_empty.notify_soon() def get(self, block: bool = True, timeout: OptFloat = None) -> T: """Remove and return an item from the queue. @@ -416,7 +443,7 @@ def get(self, block: bool = True, timeout: OptFloat = None) -> T: in that case). """ parent = self._parent - with parent._sync_not_empty: + with parent._sync_mutex: if parent._is_shutdown and not parent._qsize(): raise SyncQueueShutDown if not block: @@ -424,11 +451,7 @@ def get(self, block: bool = True, timeout: OptFloat = None) -> T: raise SyncQueueEmpty elif timeout is None: while not parent._qsize(): - parent._sync_not_empty_waiting += 1 - try: - parent._sync_not_empty.wait() - finally: - parent._sync_not_empty_waiting -= 1 + parent._sync_not_empty.wait() if parent._is_shutdown and not parent._qsize(): raise SyncQueueShutDown elif timeout < 0: @@ -439,18 +462,12 @@ def get(self, block: bool = True, timeout: OptFloat = None) -> T: remaining = endtime - monotonic() if remaining <= 0.0: raise SyncQueueEmpty - parent._sync_not_empty_waiting += 1 - try: - parent._sync_not_empty.wait(remaining) - finally: - parent._sync_not_empty_waiting -= 1 + parent._sync_not_empty.wait(remaining) if parent._is_shutdown and not parent._qsize(): raise SyncQueueShutDown item = parent._get() - if parent._sync_not_full_waiting: - parent._sync_not_full.notify() - if parent._async_not_full_waiting: - parent._notify_async(parent._async_not_full.notify) + parent._sync_not_full.notify() + parent._async_not_full.notify_soon() return item def put_nowait(self, item: T) -> None: @@ -541,27 +558,19 @@ async def put(self, item: T) -> None: This method is a coroutine. """ parent = self._parent - async with parent._async_not_full: + with parent._sync_mutex: + parent._get_loop() + async with parent._async_mutex: with parent._sync_mutex: if parent._is_shutdown: raise AsyncQueueShutDown - parent._get_loop() # check the event loop while 0 < parent._maxsize <= parent._qsize(): - parent._async_not_full_waiting += 1 - parent._sync_mutex.release() - try: - await parent._async_not_full.wait() - finally: - parent._sync_mutex.acquire() - parent._async_not_full_waiting -= 1 + await parent._async_not_full.wait() if parent._is_shutdown: raise AsyncQueueShutDown - parent._put_internal(item) - if parent._async_not_empty_waiting: - parent._async_not_empty.notify() - if parent._sync_not_empty_waiting: - parent._sync_not_empty.notify() + parent._async_not_empty.notify() + parent._sync_not_empty.notify() def put_nowait(self, item: T) -> None: """Put an item into the queue without blocking. @@ -570,18 +579,14 @@ def put_nowait(self, item: T) -> None: """ parent = self._parent with parent._sync_mutex: + parent._get_loop() if parent._is_shutdown: raise AsyncQueueShutDown - - parent._get_loop() if 0 < parent._maxsize <= parent._qsize(): raise AsyncQueueFull - parent._put_internal(item) - if parent._async_not_empty_waiting: - parent._notify_async(parent._async_not_empty.notify) - if parent._sync_not_empty_waiting: - parent._sync_not_empty.notify() + parent._async_not_empty.notify_soon() + parent._sync_not_empty.notify() async def get(self) -> T: """Remove and return an item from the queue. @@ -591,27 +596,19 @@ async def get(self) -> T: This method is a coroutine. """ parent = self._parent - async with parent._async_not_empty: + with parent._sync_mutex: + parent._get_loop() + async with parent._async_mutex: with parent._sync_mutex: if parent._is_shutdown and not parent._qsize(): raise AsyncQueueShutDown - parent._get_loop() # check the event loop while not parent._qsize(): - parent._async_not_empty_waiting += 1 - parent._sync_mutex.release() - try: - await parent._async_not_empty.wait() - finally: - parent._sync_mutex.acquire() - parent._async_not_empty_waiting -= 1 + await parent._async_not_empty.wait() if parent._is_shutdown and not parent._qsize(): raise AsyncQueueShutDown - item = parent._get() - if parent._async_not_full_waiting: - parent._async_not_full.notify() - if parent._sync_not_full_waiting: - parent._sync_not_full.notify() + parent._async_not_full.notify() + parent._sync_not_full.notify() return item def get_nowait(self) -> T: @@ -621,17 +618,14 @@ def get_nowait(self) -> T: """ parent = self._parent with parent._sync_mutex: + parent._get_loop() if parent._is_shutdown and not parent._qsize(): raise AsyncQueueShutDown if not parent._qsize(): raise AsyncQueueEmpty - - parent._get_loop() item = parent._get() - if parent._async_not_full_waiting: - parent._notify_async(parent._async_not_full.notify) - if parent._sync_not_full_waiting: - parent._sync_not_full.notify() + parent._async_not_full.notify_soon() + parent._sync_not_full.notify() return item def task_done(self) -> None: @@ -649,15 +643,14 @@ def task_done(self) -> None: the queue. """ parent = self._parent - with parent._sync_tasks_done: + with parent._sync_mutex: + parent._get_loop() if parent._unfinished_tasks <= 0: raise ValueError("task_done() called too many times") parent._unfinished_tasks -= 1 if parent._unfinished_tasks == 0: - if parent._async_tasks_done_waiting: - parent._notify_async(parent._async_tasks_done.notify_all) - if parent._sync_tasks_done_waiting: - parent._sync_tasks_done.notify_all() + parent._async_tasks_done.notify_all_soon() + parent._sync_tasks_done.notify_all() async def join(self) -> None: """Block until all items in the queue have been gotten and processed. @@ -668,17 +661,12 @@ async def join(self) -> None: When the count of unfinished tasks drops to zero, join() unblocks. """ parent = self._parent - async with parent._async_tasks_done: + with parent._sync_mutex: + parent._get_loop() + async with parent._async_mutex: with parent._sync_mutex: - parent._get_loop() # check the event loop while parent._unfinished_tasks: - parent._async_tasks_done_waiting += 1 - parent._sync_mutex.release() - try: - await parent._async_tasks_done.wait() - finally: - parent._sync_mutex.acquire() - parent._async_tasks_done_waiting -= 1 + await parent._async_tasks_done.wait() def shutdown(self, immediate: bool = False) -> None: """Shut-down the queue, making queue gets and puts raise an exception. From dbf69abf2cead762413ec60d7f5174a3e9ec9f63 Mon Sep 17 00:00:00 2001 From: Ilya Egorov <0x42005e1f@gmail.com> Date: Sun, 22 Dec 2024 18:23:20 +0400 Subject: [PATCH 2/8] Update test_mixed.py --- tests/test_mixed.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_mixed.py b/tests/test_mixed.py index a0c02da..a4f085e 100644 --- a/tests/test_mixed.py +++ b/tests/test_mixed.py @@ -310,7 +310,7 @@ async def test_put_notifies_sync_not_empty(self): for _ in range(4): executor.submit(q.sync_q.get) - while q._sync_not_empty_waiting != 4: + while q._sync_not_empty.waiting != 4: await asyncio.sleep(0.001) q.sync_q.put_nowait(1) @@ -328,7 +328,7 @@ async def test_put_notifies_async_not_empty(self): tasks = [loop.create_task(q.async_q.get()) for _ in range(4)] - while q._async_not_empty_waiting != 4: + while q._async_not_empty.waiting != 4: await asyncio.sleep(0) q.sync_q.put_nowait(1) @@ -351,7 +351,7 @@ async def test_get_notifies_sync_not_full(self): for _ in range(4): executor.submit(q.sync_q.put, object()) - while q._sync_not_full_waiting != 4: + while q._sync_not_full.waiting != 4: await asyncio.sleep(0.001) q.sync_q.get_nowait() @@ -371,7 +371,7 @@ async def test_get_notifies_async_not_full(self): tasks = [loop.create_task(q.async_q.put(object())) for _ in range(4)] - while q._async_not_full_waiting != 4: + while q._async_not_full.waiting != 4: await asyncio.sleep(0) q.sync_q.get_nowait() From 207f7d485ca504f2a05f7dc5c8cdd89a0b141ddf Mon Sep 17 00:00:00 2001 From: Ilya Egorov <0x42005e1f@gmail.com> Date: Sun, 22 Dec 2024 18:32:57 +0400 Subject: [PATCH 3/8] Add missing return type --- janus/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/janus/__init__.py b/janus/__init__.py index 0afdc1f..8217531 100644 --- a/janus/__init__.py +++ b/janus/__init__.py @@ -135,7 +135,7 @@ def __setup_notifier( task.add_done_callback(self.__pending.remove) self.__pending.append(task) - def __setup_notifier_threadsafe(self, method: Callable[[], None]): + def __setup_notifier_threadsafe(self, method: Callable[[], None]) -> None: loop = self._loop if loop is None or loop.is_closed(): # async API is not available, nothing to notify From ab99158990156196b961eef44ec8068a6b8c325a Mon Sep 17 00:00:00 2001 From: Ilya Egorov <0x42005e1f@gmail.com> Date: Wed, 25 Dec 2024 16:57:57 +0400 Subject: [PATCH 4/8] Replace inheritance with delegation --- janus/__init__.py | 52 +++++++++++++++++++++++++++++------------------ 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/janus/__init__.py b/janus/__init__.py index 8217531..827456d 100644 --- a/janus/__init__.py +++ b/janus/__init__.py @@ -88,20 +88,26 @@ async def get(self) -> T: ... async def join(self) -> None: ... -class SyncCondition(threading.Condition): +class SyncCondition: def __init__(self, lock: threading.Lock) -> None: - super().__init__(lock) + self._parent = threading.Condition(lock) self.waiting = 0 def wait(self, timeout: OptFloat = None) -> bool: self.waiting += 1 try: - return super().wait(timeout) + return self._parent.wait(timeout) finally: self.waiting -= 1 + def notify(self) -> None: + self._parent.notify() -class AsyncCondition(asyncio.Condition): + def notify_all(self) -> None: + self._parent.notify_all() + + +class AsyncCondition: _loop: Optional[asyncio.AbstractEventLoop] = None def __init__( @@ -110,51 +116,57 @@ def __init__( async_lock: asyncio.Lock, pending: deque[asyncio.Future[None]] ) -> None: - super().__init__(async_lock) - self.__mutex = sync_lock - self.__pending = pending + self._parent = asyncio.Condition(async_lock) + self._mutex = sync_lock + self._pending = pending self.waiting = 0 async def wait(self) -> Literal[True]: self.waiting += 1 - self.__mutex.release() + self._mutex.release() try: - return await super().wait() + return await self._parent.wait() finally: - self.__mutex.acquire() + self._mutex.acquire() self.waiting -= 1 - async def __do_notifier(self, method: Callable[[], None]) -> None: - async with self: + async def _do_notifier(self, method: Callable[[], None]) -> None: + async with self._parent: method() - def __setup_notifier( + def _setup_notifier( self, loop: asyncio.AbstractEventLoop, method: Callable[[], None] ) -> None: - task = loop.create_task(self.__do_notifier(method)) - task.add_done_callback(self.__pending.remove) - self.__pending.append(task) + task = loop.create_task(self._do_notifier(method)) + task.add_done_callback(self._pending.remove) + self._pending.append(task) - def __setup_notifier_threadsafe(self, method: Callable[[], None]) -> None: + def _setup_notifier_threadsafe(self, method: Callable[[], None]) -> None: loop = self._loop if loop is None or loop.is_closed(): # async API is not available, nothing to notify return - loop.call_soon_threadsafe(self.__setup_notifier, loop, method) + loop.call_soon_threadsafe(self._setup_notifier, loop, method) + + def notify(self) -> None: + self._parent.notify() + + def notify_all(self) -> None: + self._parent.notify_all() def notify_soon(self) -> None: # Warning! # The function should be called when sync_lock is locked, # otherwise the code is not thread-safe if self.waiting: - self.__setup_notifier_threadsafe(self.notify) + self._setup_notifier_threadsafe(self._parent.notify) def notify_all_soon(self) -> None: # Warning! # The function should be called when sync_lock is locked, # otherwise the code is not thread-safe if self.waiting: - self.__setup_notifier_threadsafe(self.notify_all) + self._setup_notifier_threadsafe(self._parent.notify_all) class Queue(Generic[T]): From 1d91757b73ef48ae7d556087478719910c64c35f Mon Sep 17 00:00:00 2001 From: Ilya Egorov <0x42005e1f@gmail.com> Date: Wed, 25 Dec 2024 17:01:48 +0400 Subject: [PATCH 5/8] Move _get_loop() to AsyncCondition --- janus/__init__.py | 43 +++++++++++++++++-------------------------- 1 file changed, 17 insertions(+), 26 deletions(-) diff --git a/janus/__init__.py b/janus/__init__.py index 827456d..f7cbc42 100644 --- a/janus/__init__.py +++ b/janus/__init__.py @@ -121,7 +121,13 @@ def __init__( self._pending = pending self.waiting = 0 + if sys.version_info < (3, 10): + self._get_loop() + async def wait(self) -> Literal[True]: + if sys.version_info >= (3, 10): + self._get_loop() + self.waiting += 1 self._mutex.release() try: @@ -130,6 +136,17 @@ async def wait(self) -> Literal[True]: self._mutex.acquire() self.waiting -= 1 + def _get_loop(self) -> asyncio.AbstractEventLoop: + # Warning! + # The function should be called when sync_mutex is locked, + # otherwise the code is not thread-safe + loop = asyncio.get_running_loop() + if self._loop is None: + self._loop = loop + if loop is not self._loop: + raise RuntimeError(f"{self!r} is bound to a different event loop") + return loop + async def _do_notifier(self, method: Callable[[], None]) -> None: async with self._parent: method() @@ -203,23 +220,6 @@ def __init__(self, maxsize: int = 0) -> None: self._sync_queue = _SyncQueueProxy(self) self._async_queue = _AsyncQueueProxy(self) - if sys.version_info < (3, 10): - self._get_loop() - - def _get_loop(self) -> asyncio.AbstractEventLoop: - # Warning! - # The function should be called when self._sync_mutex is locked, - # otherwise the code is not thread-safe - loop = asyncio.get_running_loop() - if self._loop is None: - self._loop = loop - self._async_not_empty._loop = loop - self._async_not_full._loop = loop - self._async_tasks_done._loop = loop - if loop is not self._loop: - raise RuntimeError(f"{self!r} is bound to a different event loop") - return loop - def shutdown(self, immediate: bool = False) -> None: """Shut-down the queue, making queue gets and puts raise an exception. @@ -570,8 +570,6 @@ async def put(self, item: T) -> None: This method is a coroutine. """ parent = self._parent - with parent._sync_mutex: - parent._get_loop() async with parent._async_mutex: with parent._sync_mutex: if parent._is_shutdown: @@ -591,7 +589,6 @@ def put_nowait(self, item: T) -> None: """ parent = self._parent with parent._sync_mutex: - parent._get_loop() if parent._is_shutdown: raise AsyncQueueShutDown if 0 < parent._maxsize <= parent._qsize(): @@ -608,8 +605,6 @@ async def get(self) -> T: This method is a coroutine. """ parent = self._parent - with parent._sync_mutex: - parent._get_loop() async with parent._async_mutex: with parent._sync_mutex: if parent._is_shutdown and not parent._qsize(): @@ -630,7 +625,6 @@ def get_nowait(self) -> T: """ parent = self._parent with parent._sync_mutex: - parent._get_loop() if parent._is_shutdown and not parent._qsize(): raise AsyncQueueShutDown if not parent._qsize(): @@ -656,7 +650,6 @@ def task_done(self) -> None: """ parent = self._parent with parent._sync_mutex: - parent._get_loop() if parent._unfinished_tasks <= 0: raise ValueError("task_done() called too many times") parent._unfinished_tasks -= 1 @@ -673,8 +666,6 @@ async def join(self) -> None: When the count of unfinished tasks drops to zero, join() unblocks. """ parent = self._parent - with parent._sync_mutex: - parent._get_loop() async with parent._async_mutex: with parent._sync_mutex: while parent._unfinished_tasks: From e11d263d98991daf795d0098cced13a0cf050559 Mon Sep 17 00:00:00 2001 From: Ilya Egorov <0x42005e1f@gmail.com> Date: Wed, 25 Dec 2024 17:02:06 +0400 Subject: [PATCH 6/8] Update test_mixed.py --- tests/test_mixed.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_mixed.py b/tests/test_mixed.py index a4f085e..4eb53d7 100644 --- a/tests/test_mixed.py +++ b/tests/test_mixed.py @@ -21,16 +21,16 @@ def test_ctor_noloop(self): async def test_get_loop_ok(self): q = janus.Queue() loop = asyncio.get_running_loop() - assert q._get_loop() is loop - assert q._loop is loop + assert q._async_not_empty._get_loop() is loop + assert q._async_not_empty._loop is loop @pytest.mark.asyncio async def test_get_loop_different_loop(self): q = janus.Queue() # emulate binding another loop - loop = q._loop = asyncio.new_event_loop() + loop = q._async_not_empty._loop = asyncio.new_event_loop() with pytest.raises(RuntimeError, match="is bound to a different event loop"): - q._get_loop() + q._async_not_empty._get_loop() loop.close() @pytest.mark.asyncio From 2d3c05cac5461466d94a85ed4855de5ebb9673a7 Mon Sep 17 00:00:00 2001 From: Ilya Egorov <0x42005e1f@gmail.com> Date: Wed, 25 Dec 2024 17:23:13 +0400 Subject: [PATCH 7/8] Remove unnecessary _loop --- janus/__init__.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/janus/__init__.py b/janus/__init__.py index f7cbc42..a8aa69f 100644 --- a/janus/__init__.py +++ b/janus/__init__.py @@ -187,8 +187,6 @@ def notify_all_soon(self) -> None: class Queue(Generic[T]): - _loop: Optional[asyncio.AbstractEventLoop] = None - def __init__(self, maxsize: int = 0) -> None: self._maxsize = maxsize self._is_shutdown = False From 405b1b3f65ddc1690ca67e8eda7a7823e709301e Mon Sep 17 00:00:00 2001 From: Ilya Egorov <0x42005e1f@gmail.com> Date: Wed, 25 Dec 2024 17:57:44 +0400 Subject: [PATCH 8/8] Remove unnecessary AsyncCondition.notify_all() --- janus/__init__.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/janus/__init__.py b/janus/__init__.py index a8aa69f..ee2354f 100644 --- a/janus/__init__.py +++ b/janus/__init__.py @@ -168,9 +168,6 @@ def _setup_notifier_threadsafe(self, method: Callable[[], None]) -> None: def notify(self) -> None: self._parent.notify() - def notify_all(self) -> None: - self._parent.notify_all() - def notify_soon(self) -> None: # Warning! # The function should be called when sync_lock is locked,