Skip to content

Commit

Permalink
allow for watching future values with watcher
Browse files Browse the repository at this point in the history
Include StopIterSentinel

oops
  • Loading branch information
fielding committed Dec 22, 2024
1 parent 7e7883e commit 1ada326
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 12 deletions.
22 changes: 16 additions & 6 deletions nats/js/kv.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@
logger = logging.getLogger(__name__)


class StopIterSentinel:
"""A sentinel class used to indicate that iteration should stop."""

pass


class KeyValue:
"""
KeyValue uses the JetStream KeyValue functionality.
Expand Down Expand Up @@ -278,6 +284,9 @@ async def purge_deletes(self, olderthan: int = 30 * 60) -> bool:
watcher = await self.watchall()
delete_markers = []
async for update in watcher:
if update is None:
break

if update.operation == KV_DEL or update.operation == KV_PURGE:
delete_markers.append(update)

Expand All @@ -300,11 +309,11 @@ async def status(self) -> BucketStatus:
return KeyValue.BucketStatus(stream_info=info, bucket=self._name)

class KeyWatcher:
STOP_ITER = StopIterSentinel()

def __init__(self, js):
self._js = js
self._updates: asyncio.Queue[KeyValue.Entry
| None] = asyncio.Queue(maxsize=256)
self._updates: asyncio.Queue[KeyValue.Entry | None | StopIterSentinel] = asyncio.Queue(maxsize=256)
self._sub = None
self._pending: Optional[int] = None

Expand All @@ -317,6 +326,7 @@ async def stop(self):
stop will stop this watcher.
"""
await self._sub.unsubscribe()
await self._updates.put(KeyValue.KeyWatcher.STOP_ITER)

async def updates(self, timeout=5.0):
"""
Expand All @@ -331,10 +341,10 @@ def __aiter__(self):
return self

async def __anext__(self):
entry = await self._updates.get()
if not entry:
raise StopAsyncIteration
else:
while True:
entry = await self._updates.get()
if isinstance(entry, StopIterSentinel):
raise StopAsyncIteration
return entry

async def watchall(self, **kwargs) -> KeyWatcher:
Expand Down
14 changes: 8 additions & 6 deletions nats/js/object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
ObjectDeletedError,
ObjectNotFoundError,
)
from nats.js.kv import MSG_ROLLUP_SUBJECT
from nats.js.kv import MSG_ROLLUP_SUBJECT, StopIterSentinel

VALID_BUCKET_RE = re.compile(r"^[a-zA-Z0-9_-]+$")
VALID_KEY_RE = re.compile(r"^[-/_=\.a-zA-Z0-9]+$")
Expand Down Expand Up @@ -427,10 +427,11 @@ async def update_meta(
await self._js.purge_stream(self._stream, subject=meta_subj)

class ObjectWatcher:
STOP_ITER = StopIterSentinel()

def __init__(self, js):
self._js = js
self._updates = asyncio.Queue(maxsize=256)
self._updates: asyncio.Queue[Union[api.ObjectInfo, None, StopIterSentinel]] = asyncio.Queue(maxsize=256)
self._sub = None
self._pending: Optional[int] = None

Expand All @@ -457,10 +458,11 @@ def __aiter__(self):
return self

async def __anext__(self):
entry = await self._updates.get()
if not entry:
raise StopAsyncIteration
else:
while True:
entry = await self._updates.get()

if isinstance(entry, StopIterSentinel):
raise StopAsyncIteration
return entry

async def watch(
Expand Down

0 comments on commit 1ada326

Please sign in to comment.