Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Improve KeyValue and ObjectStore watchers: Fix watching past history replay #644

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions nats/js/kv.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@
logger = logging.getLogger(__name__)


class StopIterSentinel:
"""A sentinel class used to indicate that iteration should stop."""

pass


class KeyValue:
"""
KeyValue uses the JetStream KeyValue functionality.
Expand Down Expand Up @@ -278,6 +284,9 @@ async def purge_deletes(self, olderthan: int = 30 * 60) -> bool:
watcher = await self.watchall()
delete_markers = []
async for update in watcher:
if update is None:
break

if update.operation == KV_DEL or update.operation == KV_PURGE:
delete_markers.append(update)

Expand All @@ -300,11 +309,11 @@ async def status(self) -> BucketStatus:
return KeyValue.BucketStatus(stream_info=info, bucket=self._name)

class KeyWatcher:
STOP_ITER = StopIterSentinel()

def __init__(self, js):
self._js = js
self._updates: asyncio.Queue[KeyValue.Entry
| None] = asyncio.Queue(maxsize=256)
self._updates: asyncio.Queue[KeyValue.Entry | None | StopIterSentinel] = asyncio.Queue(maxsize=256)
self._sub = None
self._pending: Optional[int] = None

Expand All @@ -317,6 +326,7 @@ async def stop(self):
stop will stop this watcher.
"""
await self._sub.unsubscribe()
await self._updates.put(KeyValue.KeyWatcher.STOP_ITER)

async def updates(self, timeout=5.0):
"""
Expand All @@ -331,10 +341,10 @@ def __aiter__(self):
return self

async def __anext__(self):
entry = await self._updates.get()
if not entry:
raise StopAsyncIteration
else:
while True:
entry = await self._updates.get()
if isinstance(entry, StopIterSentinel):
raise StopAsyncIteration
return entry

async def watchall(self, **kwargs) -> KeyWatcher:
Expand Down
14 changes: 8 additions & 6 deletions nats/js/object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
ObjectDeletedError,
ObjectNotFoundError,
)
from nats.js.kv import MSG_ROLLUP_SUBJECT
from nats.js.kv import MSG_ROLLUP_SUBJECT, StopIterSentinel

VALID_BUCKET_RE = re.compile(r"^[a-zA-Z0-9_-]+$")
VALID_KEY_RE = re.compile(r"^[-/_=\.a-zA-Z0-9]+$")
Expand Down Expand Up @@ -427,10 +427,11 @@ async def update_meta(
await self._js.purge_stream(self._stream, subject=meta_subj)

class ObjectWatcher:
STOP_ITER = StopIterSentinel()

def __init__(self, js):
self._js = js
self._updates = asyncio.Queue(maxsize=256)
self._updates: asyncio.Queue[Union[api.ObjectInfo, None, StopIterSentinel]] = asyncio.Queue(maxsize=256)
self._sub = None
self._pending: Optional[int] = None

Expand All @@ -457,10 +458,11 @@ def __aiter__(self):
return self

async def __anext__(self):
entry = await self._updates.get()
if not entry:
raise StopAsyncIteration
else:
while True:
entry = await self._updates.get()

if isinstance(entry, StopIterSentinel):
raise StopAsyncIteration
return entry

async def watch(
Expand Down