[FIXED] Improve KeyValue and ObjectStore watchers: Fix watching past history replay #644

Open · fielding wants to merge 1 commit into main from fix/watcher-async-iteration
Conversation

fielding

Overview

This PR enhances the KeyValue and ObjectStore watchers in nats.py by introducing a few key improvements:

1. StopIterSentinel for Watchers

  • Introduces a sentinel object (StopIterSentinel) that is enqueued on the watcher’s queue to signal termination (e.g., when stop() is called).
  • This allows the watchers to end iteration gracefully (raising StopAsyncIteration) without conflating a None marker with an end-of-stream condition.

2. Ensure Watchers Do Not Break on None

  • Previously, watchers could treat a None in the queue as a termination signal. But for the JetStream KV watchers, None is also used internally to indicate “no more pending updates” at the end of the initial replay.
  • This PR changes the watchers to continue yielding future values after the initial replay, terminating only when they receive the StopIterSentinel.

3. Minor Cleanups in Watchers

  • Unified KeyWatcher and ObjectWatcher so that both push the StopIterSentinel onto their _updates queue and raise StopAsyncIteration inside __anext__() when the sentinel is dequeued (see the sketch after this list).
  • This keeps the watchers consistent with ADR-8’s specification for KeyValue stores and object watchers, which keep running after the initial flush.
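
For illustration, here is a minimal sketch of that pattern. It is not the exact nats.py implementation; the Watcher class below is a simplified stand-in, though the _updates queue and the __anext__() behavior mirror the description above.

import asyncio

class StopIterSentinel:
    """Marker enqueued on the updates queue to signal the end of iteration."""

class Watcher:
    def __init__(self):
        self._updates: asyncio.Queue = asyncio.Queue()

    async def stop(self):
        # Enqueue the sentinel so any pending __anext__ unblocks
        # and terminates the async iteration.
        await self._updates.put(StopIterSentinel)

    def __aiter__(self):
        return self

    async def __anext__(self):
        entry = await self._updates.get()
        if entry is StopIterSentinel:
            # Only the sentinel ends iteration; a plain None is passed
            # through to the caller as the end-of-replay marker.
            raise StopAsyncIteration
        return entry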

Note

  • All tests now pass except for one assertion in test_kv_simple, which currently fails with assert 2 == 1. That test (from the official suite) checks behavior around purge vs. the expected message count, which can differ between server versions. If the failure is not due to a version mismatch, I will fix it or close this PR.

Linked Issues

This PR might relate to or address:

(References and discussion around ephemeral watchers, None handling, or improvements to KV watchers.)


Impact and Compatibility

  • Backward Compatibility: The watchers remain backward compatible. These changes fix edge cases around iteration and do not break the existing watcher contract as specified by the tests.
  • StopIterSentinel: This sentinel is used only internally; existing user code should not be affected beyond the improved reliability.
  • Tests: The changes pass all existing tests, except for the test_kv_simple mismatch noted above.

Usage Example

Below is an example of using a KV watcher now that watchers keep running beyond the initial replay:

import asyncio
import nats

async def main():
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()

    # Acquire the KV bucket and start a watcher.
    kv = await js.key_value("MY_BUCKET")
    watcher = await kv.watch("some.prefix.*", include_history=False)

    async for entry in watcher:
        if entry is None:
            # None marks the end of the initial replay; keep iterating
            # to receive live updates (or do any one-time setup here).
            continue
        print("Received Key:", entry.key, "Value:", entry.value)

asyncio.run(main())

Contributing Statement
• This contribution is my original work.
• I license the work to the NATS project under the project’s Apache 2.0 license.

Thanks for your consideration! If any further adjustments or clarifications are needed, please let me know. I tried to adhere to the ADR and existing code practices as closely as possible.

Cheers,
Fielding

@fielding marked this pull request as draft on December 22, 2024 17:54
@fielding (Author)

Sorry, I meant to open this as a draft. I will address the build failures and then mark it as ready.

@fielding marked this pull request as ready for review on December 22, 2024 22:32
Commit: "Include StopIterSentinel" (commit message: "oops")
@fielding force-pushed the fix/watcher-async-iteration branch from fc0d9d5 to 1ada326 on December 22, 2024 22:34