Skip to content

Commit 185c9f9

Browse files
committed
Overhaul error handling
Signed-off-by: Casper Beyer <[email protected]>
1 parent ba1e69b commit 185c9f9

File tree

8 files changed

+544
-125
lines changed

8 files changed

+544
-125
lines changed

nats-jetstream/src/nats/jetstream/__init__.py

Lines changed: 103 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,18 @@
1212
from nats.client.protocol.message import parse_headers
1313
from nats.jetstream import api
1414
from nats.jetstream.consumer import Consumer, ConsumerInfo
15+
from nats.jetstream.errors import (
16+
ConsumerDeletedError,
17+
ConsumerNotFoundError,
18+
ErrorCode,
19+
JetStreamError,
20+
JetStreamNotEnabledError,
21+
JetStreamNotEnabledForAccountError,
22+
MaximumConsumersLimitError,
23+
MessageNotFoundError,
24+
StreamNameAlreadyInUseError,
25+
StreamNotFoundError,
26+
)
1527
from nats.jetstream.stream import (
1628
ClusterInfo,
1729
ExternalStreamSource,
@@ -421,6 +433,11 @@ async def create_stream(self, config: StreamConfig | None = None, /, **kwargs) -
421433
422434
Returns:
423435
The created Stream object
436+
437+
Raises:
438+
ValueError: If stream name is not provided
439+
StreamNameAlreadyInUseError: If a stream with this name already exists
440+
JetStreamError: For other JetStream API errors
424441
"""
425442
if config is None:
426443
# Create StreamConfig from kwargs with dict-to-dataclass conversion
@@ -437,15 +454,38 @@ async def create_stream(self, config: StreamConfig | None = None, /, **kwargs) -
437454
return Stream(self, config.name, info)
438455

439456
async def update_stream(self, **config) -> StreamInfo:
440-
"""Update an existing stream."""
457+
"""Update an existing stream.
458+
459+
Args:
460+
**config: Stream configuration parameters (must include 'name')
461+
462+
Returns:
463+
Updated StreamInfo
464+
465+
Raises:
466+
ValueError: If 'name' is not provided in config
467+
StreamNotFoundError: If the stream does not exist
468+
JetStreamError: For other JetStream API errors
469+
"""
441470
name = config.get("name")
442471
if name is None:
443472
raise ValueError("Stream name is required for update")
444473
response = await self._api.stream_update(name, **config)
445474
return StreamInfo.from_response(response, strict=self._strict)
446475

447476
async def delete_stream(self, name: str) -> bool:
448-
"""Delete a stream."""
477+
"""Delete a stream.
478+
479+
Args:
480+
name: Name of the stream to delete
481+
482+
Returns:
483+
True if the stream was deleted
484+
485+
Raises:
486+
StreamNotFoundError: If the stream does not exist
487+
JetStreamError: For other JetStream API errors
488+
"""
449489
response = await self._api.stream_delete(name)
450490
return response["success"]
451491

@@ -467,7 +507,18 @@ async def get_stream_info(
467507
return StreamInfo.from_response(response, strict=self._strict)
468508

469509
async def get_stream(self, name: str) -> Stream:
470-
"""Get a stream by name."""
510+
"""Get a stream by name.
511+
512+
Args:
513+
name: Name of the stream to get
514+
515+
Returns:
516+
The Stream object
517+
518+
Raises:
519+
StreamNotFoundError: If the stream does not exist
520+
JetStreamError: For other JetStream API errors
521+
"""
471522
info = await self.get_stream_info(name)
472523
return Stream(self, name, info)
473524

@@ -482,6 +533,11 @@ async def create_consumer(self, stream_name: str, name: str, durable_name: str |
482533
483534
Returns:
484535
The created consumer
536+
537+
Raises:
538+
StreamNotFoundError: If the stream does not exist
539+
MaximumConsumersLimitError: If maximum consumers limit is reached
540+
JetStreamError: For other JetStream API errors
485541
"""
486542
# Get the stream first
487543
stream = await self.get_stream(stream_name)
@@ -504,6 +560,11 @@ async def get_consumer(self, stream_name: str, consumer_name: str) -> Consumer:
504560
505561
Returns:
506562
The consumer
563+
564+
Raises:
565+
StreamNotFoundError: If the stream does not exist
566+
ConsumerNotFoundError: If the consumer does not exist
567+
JetStreamError: For other JetStream API errors
507568
"""
508569
stream = await self.get_stream(stream_name)
509570
return await stream.get_consumer(consumer_name)
@@ -517,6 +578,11 @@ async def delete_consumer(self, stream_name: str, consumer_name: str) -> bool:
517578
518579
Returns:
519580
True if the consumer was deleted
581+
582+
Raises:
583+
StreamNotFoundError: If the stream does not exist
584+
ConsumerNotFoundError: If the consumer does not exist
585+
JetStreamError: For other JetStream API errors
520586
"""
521587
stream = await self.get_stream(stream_name)
522588
return await stream.delete_consumer(consumer_name)
@@ -531,6 +597,11 @@ async def update_consumer(self, stream_name: str, consumer_name: str, **config)
531597
532598
Returns:
533599
The updated consumer
600+
601+
Raises:
602+
StreamNotFoundError: If the stream does not exist
603+
ConsumerNotFoundError: If the consumer does not exist
604+
JetStreamError: For other JetStream API errors
534605
"""
535606
stream = await self.get_stream(stream_name)
536607
return await stream.update_consumer(name=consumer_name, **config)
@@ -553,6 +624,9 @@ async def create_or_update_consumer(self, stream_name: str, **config) -> Consume
553624
554625
Raises:
555626
ValueError: If 'name' is not provided in config
627+
StreamNotFoundError: If the stream does not exist
628+
MaximumConsumersLimitError: If maximum consumers limit is reached
629+
JetStreamError: For other JetStream API errors
556630
"""
557631
stream = await self.get_stream(stream_name)
558632
return await stream.create_or_update_consumer(**config)
@@ -637,7 +711,16 @@ async def get_consumer_info(self, stream_name: str, consumer_name: str) -> Consu
637711
return ConsumerInfo.from_response(response, strict=self._strict)
638712

639713
async def account_info(self) -> AccountInfo:
640-
"""Get account information."""
714+
"""Get account information.
715+
716+
Returns:
717+
Account information including limits and usage
718+
719+
Raises:
720+
JetStreamNotEnabledError: If JetStream is not enabled on the server
721+
JetStreamNotEnabledForAccountError: If JetStream is not enabled for this account
722+
JetStreamError: For other JetStream API errors
723+
"""
641724
response = await self._api.account_info()
642725
return AccountInfo.from_response(response, strict=self._strict)
643726

@@ -653,6 +736,10 @@ async def get_message(self, stream: str, sequence: int) -> StreamMessage:
653736
654737
Returns:
655738
The stream message including subject, data, headers, etc.
739+
740+
Raises:
741+
MessageNotFoundError: If the message does not exist
742+
JetStreamError: For other JetStream API errors
656743
"""
657744
response = await self._api.stream_msg_get(stream, seq=sequence)
658745
message = response["message"]
@@ -742,6 +829,7 @@ def new(client: Client, prefix: str = "$JS.API", domain: str | None = None, stri
742829

743830

744831
__all__ = [
832+
# Core classes
745833
"JetStream",
746834
"Consumer",
747835
"ConsumerInfo",
@@ -764,4 +852,15 @@ def new(client: Client, prefix: str = "$JS.API", domain: str | None = None, stri
764852
"Tier",
765853
"APIStats",
766854
"PublishAck",
855+
# Errors
856+
"ErrorCode",
857+
"JetStreamError",
858+
"ConsumerDeletedError",
859+
"ConsumerNotFoundError",
860+
"JetStreamNotEnabledError",
861+
"JetStreamNotEnabledForAccountError",
862+
"MaximumConsumersLimitError",
863+
"MessageNotFoundError",
864+
"StreamNameAlreadyInUseError",
865+
"StreamNotFoundError",
767866
]

nats-jetstream/src/nats/jetstream/api/__init__.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from .client import Client, ConsumerDeletedError, Error
1+
from .client import Client
22

33
# Only the non-request/response types are exported
44
from .types import (
@@ -26,8 +26,6 @@
2626

2727
__all__ = [
2828
"Client",
29-
"ConsumerDeletedError",
30-
"Error",
3129
"AccountInfo",
3230
"AccountLimits",
3331
"ApiStats",

0 commit comments

Comments
 (0)