From 1ada32618f948431daf2bf97de15dfb18013292b Mon Sep 17 00:00:00 2001 From: Fielding Johnston Date: Sun, 22 Dec 2024 16:34:03 -0600 Subject: [PATCH] allow for watching future values with watcher Include StopIterSentinel oops --- nats/js/kv.py | 22 ++++++++++++++++------ nats/js/object_store.py | 14 ++++++++------ 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/nats/js/kv.py b/nats/js/kv.py index 071205aa..b95e09bb 100644 --- a/nats/js/kv.py +++ b/nats/js/kv.py @@ -35,6 +35,12 @@ logger = logging.getLogger(__name__) +class StopIterSentinel: + """A sentinel class used to indicate that iteration should stop.""" + + pass + + class KeyValue: """ KeyValue uses the JetStream KeyValue functionality. @@ -278,6 +284,9 @@ async def purge_deletes(self, olderthan: int = 30 * 60) -> bool: watcher = await self.watchall() delete_markers = [] async for update in watcher: + if update is None: + break + if update.operation == KV_DEL or update.operation == KV_PURGE: delete_markers.append(update) @@ -300,11 +309,11 @@ async def status(self) -> BucketStatus: return KeyValue.BucketStatus(stream_info=info, bucket=self._name) class KeyWatcher: + STOP_ITER = StopIterSentinel() def __init__(self, js): self._js = js - self._updates: asyncio.Queue[KeyValue.Entry - | None] = asyncio.Queue(maxsize=256) + self._updates: asyncio.Queue[KeyValue.Entry | None | StopIterSentinel] = asyncio.Queue(maxsize=256) self._sub = None self._pending: Optional[int] = None @@ -317,6 +326,7 @@ async def stop(self): stop will stop this watcher. """ await self._sub.unsubscribe() + await self._updates.put(KeyValue.KeyWatcher.STOP_ITER) async def updates(self, timeout=5.0): """ @@ -331,10 +341,10 @@ def __aiter__(self): return self async def __anext__(self): - entry = await self._updates.get() - if not entry: - raise StopAsyncIteration - else: + while True: + entry = await self._updates.get() + if isinstance(entry, StopIterSentinel): + raise StopAsyncIteration return entry async def watchall(self, **kwargs) -> KeyWatcher: diff --git a/nats/js/object_store.py b/nats/js/object_store.py index 441e72d3..90b570a0 100644 --- a/nats/js/object_store.py +++ b/nats/js/object_store.py @@ -33,7 +33,7 @@ ObjectDeletedError, ObjectNotFoundError, ) -from nats.js.kv import MSG_ROLLUP_SUBJECT +from nats.js.kv import MSG_ROLLUP_SUBJECT, StopIterSentinel VALID_BUCKET_RE = re.compile(r"^[a-zA-Z0-9_-]+$") VALID_KEY_RE = re.compile(r"^[-/_=\.a-zA-Z0-9]+$") @@ -427,10 +427,11 @@ async def update_meta( await self._js.purge_stream(self._stream, subject=meta_subj) class ObjectWatcher: + STOP_ITER = StopIterSentinel() def __init__(self, js): self._js = js - self._updates = asyncio.Queue(maxsize=256) + self._updates: asyncio.Queue[Union[api.ObjectInfo, None, StopIterSentinel]] = asyncio.Queue(maxsize=256) self._sub = None self._pending: Optional[int] = None @@ -457,10 +458,11 @@ def __aiter__(self): return self async def __anext__(self): - entry = await self._updates.get() - if not entry: - raise StopAsyncIteration - else: + while True: + entry = await self._updates.get() + + if isinstance(entry, StopIterSentinel): + raise StopAsyncIteration return entry async def watch(