Conversation

Collaborator

@caspervonb caspervonb commented Sep 23, 2025

This adds a new JetStream package, nats-jetstream, built around the newer simplified interface. It implements the following (a rough usage sketch is included below the list):

  • ADR-7: NATS Server Error Codes (errors need work)
  • ADR-10: JetStream Extended Purge
  • ADR-13: Pull Subscribe Internals (needs review)
  • ADR-17: Ordered Consumer (not started)
  • ADR-22: JetStream Publish Retries on No Responders
  • ADR-28: JetStream RePublish
  • ADR-31: JetStream Direct Get
  • ADR-33: Metadata for Stream and Consumer
  • ADR-34: JetStream Consumers Multiple Filters
  • ADR-36: Subject Mapping Transforms in Streams
  • ADR-37: JetStream Simplification
  • ADR-42: Priority Groups for Pull Consumers (partially implemented - missing pinned and overflow modes)
  • ADR-44: Consumer Pause/Resume

Depends on #732
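
For orientation, here is a hypothetical usage sketch of the simplified (ADR-37-style) flow this package targets. Every name below other than the nats.jetstream package path (connect, jetstream.new, create_stream, create_consumer, fetch) is an assumption for illustration and may not match the final API.

# Hypothetical usage sketch only -- method names are assumptions, not the
# actual nats-jetstream API.
import asyncio

import nats                           # assumed: existing client package
import nats.jetstream as jetstream    # assumed: the package added in this PR


async def main() -> None:
    nc = await nats.connect("nats://127.0.0.1:4222")
    js = jetstream.new(nc)  # assumed constructor for a JetStream context

    stream = await js.create_stream(name="ORDERS", subjects=["orders.>"])
    consumer = await stream.create_consumer(durable_name="worker")

    await js.publish("orders.new", b"hello")

    batch = await consumer.fetch(max_messages=10)
    async for msg in batch:
        print(msg.subject, msg.data)
        await msg.ack()


asyncio.run(main())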

@caspervonb caspervonb force-pushed the add-nats-jetstream-package branch from cc17acd to b383f0b on September 23, 2025 10:39
Member

@aricart aricart left a comment

Not quite sure why we are going to look at the schemas and validate them... But LGTM

Collaborator Author

@caspervonb caspervonb left a comment

Some nits

Collaborator Author

@caspervonb caspervonb left a comment

More nits

self,
stream_name: str,
name: str,
durable_name: str | None = None,
Collaborator Author

Pack into kwargs

Suggested change
durable_name: str | None = None,

self,
stream_name: str,
name: str,
durable_name: str | None = None,
Collaborator Author

Pack into kwargs

Suggested change
durable_name: str | None = None,

Returns:
    A unique inbox subject string in the format "_INBOX.{uuid}"
"""
return f"_INBOX.{uuid.uuid4().hex}"
Member

what happened to the nuid?
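
For reference, a minimal sketch of what an NUID-based inbox could look like, assuming the NUID generator from the existing nats-py client is an acceptable dependency here:

# Sketch only: assumes nats.nuid.NUID (from the existing nats-py client)
# can be used by this package.
from nats.nuid import NUID

_nuid = NUID()


def new_inbox() -> str:
    # NUID.next() returns a bytearray; decode it into the subject token.
    return f"_INBOX.{_nuid.next().decode()}"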

Member

@wallyqs wallyqs left a comment

Do you have:

  • Clustering-based test cases
  • Cluster reconnect test cases that resume a fetch

?

@oliverlambson oliverlambson mentioned this pull request Sep 24, 2025
Member

@philpennock philpennock left a comment

Reviewed through the end of the src/nats/jetstream/api/ folder; I need to resume from its consumer/ sibling when I next look at this. Sending what I've looked at so far, though. (Will be around on Thursday to resume.)

while True:
    await asyncio.sleep(0.5)
    if nc.status != ClientStatus.CONNECTED:
        continue
Member

Since this is example code for people to start from, and people do Interesting Things when copy/pasting without full understanding, I think it's worth a small comment:

        if nc.status != ClientStatus.CONNECTED:
            # NB: this is only safe because of the sleep above; beware busy loops
            continue

Collaborator Author

Yeah example got stale, need new ones.

try:
    await js.publish("FOO.TEST1", f"msg {i}".encode())
except Exception as err:
    print("pub error: ", err)
Member

grumble (unix stdio pedant): add , file=sys.stderr, flush=True) to the print call (and import sys above)
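
Applying that nit to the quoted snippet would look roughly like this (a sketch of the example above, not the final code):

# At the top of the example:
import sys

# ...and in the publish loop:
try:
    await js.publish("FOO.TEST1", f"msg {i}".encode())
except Exception as err:
    print("pub error: ", err, file=sys.stderr, flush=True)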

@@ -0,0 +1,24 @@
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "$id": "https://nats.io/schemas/jetstream/api/v1/account_info_response.json",
Member

Uh, are we expecting every client library for each language to hold a copy of these? That seems ... sub-optimal. Shouldn't there be one primary authoritative location, designed to be pulled in via git submodules or whichever other repo-linking poison folks prefer?

(I'm assuming no point reviewing the schemas in this PR so skipping past them.)

Agreed, those schemas live and are maintained in jsm.go; it'd be hard to keep them up to date across many repos.

Collaborator Author

These are pulled from there, just with a bit of filtering of unwanted files. I'd use a submodule if we had nats-io/schema or something.

Member

I guess there is a task to fetch them from jsm.go?

Collaborator Author

Yeah tools/fetch_schemas.py

StreamState,
SubjectTransform,
Tier,
LostStreamData,
Member

sort

def check_response(
    data: Any, expected_type: type[ResponseT]
) -> tuple[bool, set[str] | None, set[str] | None]:
    # TODO check for missing and unknown keys
Member

For clarity: are you expecting this TODO to be done before this PR moves out of draft, or for some later milestone?
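
For what it's worth, assuming the response types are dataclasses, a minimal sketch of that TODO could compare the payload keys against the declared fields (this ignores optional fields and is only illustrative):

import dataclasses
from typing import Any


def check_response(
    data: Any, expected_type: type
) -> tuple[bool, set[str] | None, set[str] | None]:
    # Sketch: treat the dataclass field names as the expected key set.
    expected_keys = {f.name for f in dataclasses.fields(expected_type)}
    actual_keys = set(data) if isinstance(data, dict) else set()
    missing = expected_keys - actual_keys
    unknown = actual_keys - expected_keys
    return (not missing and not unknown), missing or None, unknown or None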


consumer_name = consumer_config.get("name")
if not consumer_name:
    raise ValueError("name is required")
Member

This would be more friendly if all 3 were collected, and then if not (a and b and c) used to return a ValueError listing all the missing fields at once.

(nb: this is an opinion and not a blocking issue, happy to be ignored on this)
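
Something like the following sketch; only "name" comes from the snippet above, the other field names are purely illustrative placeholders:

# Collect all missing required fields before raising, so the caller sees
# everything at once. "stream_name" and "deliver_subject" are placeholders.
required = ("name", "stream_name", "deliver_subject")
missing = [field for field in required if not consumer_config.get(field)]
if missing:
    raise ValueError(f"missing required fields: {', '.join(missing)}")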

Comment on lines 169 to 202
mirror = kwargs.get("mirror")
subjects = kwargs.get("subjects")
if mirror is not None and subjects:
    raise ValueError("Cannot specify both mirror and subjects")
Member

I look at logic like this and can't help but think "Isn't this schema enforcement? If we have all these .json schema files, why aren't the client functions (at least for infrequent ops like stream create) just validating the config against the schema file instead of implementing policy checks manually?"
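
As an illustration of that idea, a config could be checked against one of the bundled schema files with the third-party jsonschema package; both the package choice and the file path here are assumptions, not what this PR does:

import json
from pathlib import Path

from jsonschema import validate  # third-party; assumed to be acceptable

# Illustrative path; the schemas ship somewhere under the api package in this PR.
schema = json.loads(
    Path("schemas/jetstream/api/v1/stream_create_request.json").read_text()
)

stream_config = {"name": "ORDERS", "subjects": ["orders.>"]}
validate(instance=stream_config, schema=schema)  # raises ValidationError if invalid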

Comment on lines +287 to +318
response_type: type[ResponseT],
raise_on_error: Literal[True] = True,
Member

This, and the same in the next @overload, is missing the timeout parameter; the implementation is assignable to these function signatures so it should work, but it looks like this means that type-checkers should reject attempts to actually provide the timeout parameter. Have I misunderstood? Is this intentional? Or is it just a forgotten parameter for the signature?
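
If it is just a forgotten parameter, the overload would presumably grow the same keyword, roughly like this; the method name "request" and the leading parameters are assumed from context, only the timeout addition is the point:

from typing import Any, Literal, TypeVar, overload

ResponseT = TypeVar("ResponseT")


class _Api:
    # Sketch only: signature names are assumptions.
    @overload
    async def request(
        self,
        subject: str,
        data: Any,
        response_type: type[ResponseT],
        raise_on_error: Literal[True] = True,
        timeout: float = 5.0,
    ) -> ResponseT: ...

    async def request(self, subject, data, response_type,
                      raise_on_error=True, timeout=5.0):
        raise NotImplementedError  # real implementation elided in this sketch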

    raise_on_error: bool = True,
    timeout: float = 5.0,
) -> ResponseT | ErrorResponse:
    request_id = str(uuid.uuid4())[:8]
Member

This is reading 16 bytes from /dev/urandom every time, just to return the representation of the first 4 bytes. Aside from making me twitch (which is probably amusing 😉), that's also going to be a device read on every JSON request.

The secrets module was introduced in Python 3.6; secrets.token_hex(4) will be more efficient, but still hit /dev/urandom. But the only goal is a distinct ID, there's no defense against other clients knowing the ID, right? Presumably not using an incrementing integer to avoid threading issues, but should then be using a client-side cache to prevent re-use, so just rolling the dice here (but the die has a lot of sides). I'd be tempted to use an in-process PRNG seeded once and just roll with that.

For now, just use secrets.token_hex(4) please and remember to benchmark to see if this ever matters with lots of small JSON requests.
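
A minimal version of that suggestion:

import secrets

request_id = secrets.token_hex(4)  # 8 hex characters, e.g. "9f1c2ab4"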

Member

@philpennock philpennock left a comment

Reviewed the rest, except I skimmed the test files just enough to confirm they were limited to behavior tests, not reflective integrity tests.

"""Protocol for a batch of messages retrieved from a JetStream consumer."""

def __aiter__(self) -> AsyncIterator[Message]:
"""Return self as an async iterator."""
Member

Here and in MessageStream below, is this really self?
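
For context, the usual idiom under which "return self" is accurate looks like this; the names are illustrative, not the actual MessageBatch implementation:

class _ExampleBatch:
    """Illustrative only: __anext__ lives on the same object, so __aiter__
    returning self is correct for this shape of iterator."""

    def __init__(self, messages):
        self._messages = list(messages)

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self._messages:
            raise StopAsyncIteration
        return self._messages.pop(0)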

        raise StopAsyncIteration
    case _:
        self._error = Exception(
            f"Status {raw_msg.status.code}: {raw_msg.status.description or ''}"
Member

minor nit: can we use or '(missing description)' to make it clearer if this exception is ever seen that there was something expected that's missing?

Collaborator Author

I'm thinking we shouldn't swallow unexpected errors.

try:
    headers_bytes = base64.b64decode(message["hdrs"])
    headers_str = headers_bytes.decode("utf-8")
    if headers_str.startswith("NATS/1.0\r\n"):
Member

Should there be an exception raised if the headers don't start with this?
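
If so, a minimal sketch (assuming a plain ValueError is acceptable here) could be:

if not headers_str.startswith("NATS/1.0\r\n"):
    # Fail loudly instead of silently ignoring a malformed header block.
    raise ValueError(f"unexpected header preamble: {headers_str[:16]!r}")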

def main():
    """Main entry point."""
    # Get the root directory of our project
    root_dir = Path(__file__).parent.parent
Member

This is fragile to current location in the git repo. If you're going to invoke git anyway for this rare maintenance operation, then git rev-parse --show-toplevel is your friend here.
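
Roughly, the suggestion amounts to:

import subprocess
from pathlib import Path

# Ask git for the repository root instead of assuming the script's location.
root_dir = Path(
    subprocess.run(
        ["git", "rev-parse", "--show-toplevel"],
        capture_output=True,
        text=True,
        check=True,
    ).stdout.strip()
)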

@caspervonb caspervonb force-pushed the add-nats-client-package branch 4 times, most recently from 7acfd1f to 2e13fe5 on October 7, 2025 15:02
@caspervonb caspervonb force-pushed the add-nats-jetstream-package branch 4 times, most recently from 1da8646 to b6b8a1a on October 7, 2025 15:38
@caspervonb caspervonb force-pushed the add-nats-client-package branch 2 times, most recently from 3cd0ce1 to 171ad83 on October 8, 2025 20:16
- Add validation to reject both max_messages and max_bytes simultaneously
- Set batch size to 1,000,000 when using max_bytes for better throughput
- Add heartbeat timer pause/resume on disconnect/reconnect
- All three fixes address the non-compliance issues identified in ADR-37
- Remove test that used both max_messages and max_bytes together
- Add test validating rejection of both parameters
- Fix threshold_bytes test to use only max_bytes per ADR-37
Signed-off-by: Casper Beyer <[email protected]>