Skip to content

Commit 87735f1

Browse files
committed
Use asyncio.QueueShutdown when available
Signed-off-by: Waldemar Quevedo <[email protected]>
1 parent adc5c10 commit 87735f1

File tree

3 files changed

+52
-24
lines changed

3 files changed

+52
-24
lines changed

nats/src/nats/aio/client.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1797,15 +1797,9 @@ async def _process_msg(
17971797
if sub._jsi:
17981798
await sub._jsi.check_for_sequence_mismatch(msg)
17991799

1800-
# Send sentinel after reaching max messages for non-callback subscriptions.
1801-
if max_msgs_reached and not sub._cb and sub._active_consumers is not None and sub._active_consumers > 0:
1802-
# Send one sentinel per active consumer to unblock them all.
1803-
for _ in range(sub._active_consumers):
1804-
try:
1805-
sub._pending_queue.put_nowait(None)
1806-
except Exception:
1807-
# Queue might be full or closed, that's ok
1808-
break
1800+
# Unblock waiting consumers after reaching max messages for non-callback subscriptions.
1801+
if max_msgs_reached and not sub._cb:
1802+
sub._shutdown_queue()
18091803

18101804
def _build_message(
18111805
self,

nats/src/nats/aio/subscription.py

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from __future__ import annotations
1616

1717
import asyncio
18+
import sys
1819
from typing import (
1920
TYPE_CHECKING,
2021
AsyncIterator,
@@ -31,6 +32,16 @@
3132
if TYPE_CHECKING:
3233
from nats.js import JetStreamContext
3334

35+
# Python 3.13+ has QueueShutDown exception for cleaner queue termination.
36+
_HAS_QUEUE_SHUTDOWN = sys.version_info >= (3, 13)
37+
if _HAS_QUEUE_SHUTDOWN:
38+
from asyncio import QueueShutDown
39+
else:
40+
# For older Python versions, we'll use a custom exception
41+
class QueueShutDown(Exception):
42+
pass
43+
44+
3445
DEFAULT_SUB_PENDING_MSGS_LIMIT = 512 * 1024
3546
DEFAULT_SUB_PENDING_BYTES_LIMIT = 128 * 1024 * 1024
3647

@@ -84,8 +95,10 @@ def __init__(
8495
self._pending_msgs_limit = pending_msgs_limit
8596
self._pending_bytes_limit = pending_bytes_limit
8697
self._pending_queue: asyncio.Queue[Msg] = asyncio.Queue(maxsize=pending_msgs_limit)
87-
# Track active consumers (both async generators and next_msg calls) for non-callback subscriptions.
88-
if cb is None:
98+
99+
# For Python < 3.13, we need to track active consumers for sentinel-based termination
100+
# For Python 3.13+, we use QueueShutDown which doesn't require tracking.
101+
if not _HAS_QUEUE_SHUTDOWN and cb is None:
89102
self._active_consumers = 0 # Counter of active consumers waiting for messages
90103
else:
91104
self._active_consumers = None
@@ -135,8 +148,10 @@ async def _message_generator(self) -> AsyncIterator[Msg]:
135148
Async generator that yields messages directly from the subscription queue.
136149
"""
137150
yielded_count = 0
151+
138152
if self._active_consumers is not None:
139153
self._active_consumers += 1
154+
140155
try:
141156
while True:
142157
# Check if subscription was cancelled/closed.
@@ -151,6 +166,8 @@ async def _message_generator(self) -> AsyncIterator[Msg]:
151166
msg = await self._pending_queue.get()
152167
except asyncio.CancelledError:
153168
break
169+
except QueueShutDown:
170+
break
154171

155172
# Check for sentinel value which signals generator to stop.
156173
if msg is None:
@@ -224,7 +241,6 @@ async def next_msg(self, timeout: Optional[float] = 1.0) -> Msg:
224241
if self._cb:
225242
raise errors.Error("nats: next_msg cannot be used in async subscriptions")
226243

227-
# Track this next_msg call
228244
if self._active_consumers is not None:
229245
self._active_consumers += 1
230246

@@ -243,8 +259,11 @@ async def next_msg(self, timeout: Optional[float] = 1.0) -> Msg:
243259
if self._conn.is_closed:
244260
raise errors.ConnectionClosedError
245261
raise
262+
except QueueShutDown:
263+
if self._conn.is_closed:
264+
raise errors.ConnectionClosedError
265+
raise errors.TimeoutError
246266
finally:
247-
# Untrack this next_msg call.
248267
if self._active_consumers is not None:
249268
self._active_consumers -= 1
250269

@@ -345,22 +364,36 @@ async def unsubscribe(self, limit: int = 0):
345364
if not self._conn.is_reconnecting:
346365
await self._conn._send_unsubscribe(self._id, limit=limit)
347366

367+
def _shutdown_queue(self) -> None:
368+
"""
369+
Shutdown the subscription queue gracefully.
370+
371+
For Python 3.13+, uses queue.shutdown() for clean termination.
372+
For older Python versions, sends sentinel values to unblock consumers.
373+
"""
374+
try:
375+
if _HAS_QUEUE_SHUTDOWN:
376+
# Python 3.13+: Use queue shutdown for graceful termination.
377+
self._pending_queue.shutdown()
378+
elif self._active_consumers is not None:
379+
# Python < 3.13: Send sentinels for each active consumer, or at least one
380+
# to ensure any future consumers will be unblocked
381+
sentinels_to_send = max(1, self._active_consumers)
382+
for _ in range(sentinels_to_send):
383+
self._pending_queue.put_nowait(None)
384+
except Exception:
385+
pass
386+
348387
def _stop_processing(self) -> None:
349388
"""
350389
Stops the subscription from processing new messages.
351390
"""
352391
if self._wait_for_msgs_task and not self._wait_for_msgs_task.done():
353392
self._wait_for_msgs_task.cancel()
354393

355-
# Send sentinels to unblock waiting consumers
356-
try:
357-
if self._pending_queue and self._active_consumers is not None and self._active_consumers > 0:
358-
# Send one sentinel for each active consumer (both generators and next_msg calls)
359-
for _ in range(self._active_consumers):
360-
self._pending_queue.put_nowait(None)
361-
except Exception:
362-
# Queue might be closed or full, that's ok
363-
pass
394+
# Unblock waiting consumers
395+
if self._pending_queue:
396+
self._shutdown_queue()
364397

365398
async def _wait_for_msgs(self, error_cb) -> None:
366399
"""
@@ -401,3 +434,5 @@ async def _wait_for_msgs(self, error_cb) -> None:
401434
self._stop_processing()
402435
except asyncio.CancelledError:
403436
break
437+
except QueueShutDown:
438+
break

nats/src/nats/js/client.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -883,15 +883,14 @@ def __init__(
883883
self._cb = sub._cb
884884
self._future = sub._future
885885
self._closed = sub._closed
886-
self._active_generators = sub._active_generators
886+
self._active_consumers = sub._active_consumers
887887

888888
# Per subscription message processor.
889889
self._pending_msgs_limit = sub._pending_msgs_limit
890890
self._pending_bytes_limit = sub._pending_bytes_limit
891891
self._pending_queue = sub._pending_queue
892892
self._pending_size = sub._pending_size
893893
self._wait_for_msgs_task = sub._wait_for_msgs_task
894-
self._pending_next_msgs_calls = sub._pending_next_msgs_calls
895894

896895
async def consumer_info(self) -> api.ConsumerInfo:
897896
"""

0 commit comments

Comments
 (0)