Skip to content

Commit ba1e69b

Browse files
committed
Add create_or_update_consumer to JetStream and Stream
Signed-off-by: Casper Beyer <[email protected]>
1 parent 1950cc3 commit ba1e69b

File tree

3 files changed

+112
-0
lines changed

3 files changed

+112
-0
lines changed

nats-jetstream/src/nats/jetstream/__init__.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,28 @@ async def update_consumer(self, stream_name: str, consumer_name: str, **config)
535535
stream = await self.get_stream(stream_name)
536536
return await stream.update_consumer(name=consumer_name, **config)
537537

538+
async def create_or_update_consumer(self, stream_name: str, **config) -> Consumer:
539+
"""Create or update a consumer.
540+
541+
This method will either create a consumer if it does not exist or update
542+
an existing consumer (if possible). This is an idempotent operation.
543+
544+
Note: Some consumer configuration fields cannot be updated after creation
545+
(e.g., max_waiting must be set during creation).
546+
547+
Args:
548+
stream_name: Name of the stream
549+
**config: Consumer configuration (must include 'name')
550+
551+
Returns:
552+
The created or updated consumer
553+
554+
Raises:
555+
ValueError: If 'name' is not provided in config
556+
"""
557+
stream = await self.get_stream(stream_name)
558+
return await stream.create_or_update_consumer(**config)
559+
538560
async def consumer_names(self, stream_name: str) -> AsyncIterator[str]:
539561
"""Get an async iterator over all consumer names for a stream.
540562

nats-jetstream/src/nats/jetstream/stream.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1390,6 +1390,46 @@ async def update_consumer(self, config: ConsumerConfig | None = None, /, **kwarg
13901390
# Delegate to _upsert_consumer with "update" action
13911391
return await self._upsert_consumer(action=CONSUMER_ACTION_UPDATE, **config_dict)
13921392

1393+
@overload
1394+
async def create_or_update_consumer(self, config: ConsumerConfig, /) -> Consumer:
1395+
"""Create or update a consumer from a ConsumerConfig object."""
1396+
...
1397+
1398+
@overload
1399+
async def create_or_update_consumer(self, *, name: str, **config) -> Consumer:
1400+
"""Create or update a consumer with keyword arguments."""
1401+
...
1402+
1403+
async def create_or_update_consumer(self, config: ConsumerConfig | None = None, /, **kwargs) -> Consumer:
1404+
"""Create or update a consumer for this stream.
1405+
1406+
This method will either create a consumer if it does not exist or update
1407+
an existing consumer (if possible). This is an idempotent operation.
1408+
1409+
Note: Some consumer configuration fields cannot be updated after creation
1410+
(e.g., max_waiting must be set during creation).
1411+
1412+
Args:
1413+
config: A ConsumerConfig object (positional-only)
1414+
**kwargs: Consumer configuration parameters as keyword arguments
1415+
1416+
Returns:
1417+
The created or updated consumer
1418+
"""
1419+
if config is None:
1420+
# Create ConsumerConfig from kwargs
1421+
config = ConsumerConfig.from_kwargs(**kwargs)
1422+
1423+
# Validate consumer name
1424+
if config.name is None:
1425+
raise ValueError("ConsumerConfig must have a name")
1426+
1427+
# Convert ConsumerConfig to API request format
1428+
config_dict = config.to_request()
1429+
1430+
# Delegate to _upsert_consumer with empty action (create-or-update)
1431+
return await self._upsert_consumer(action=CONSUMER_ACTION_CREATE_OR_UPDATE, **config_dict)
1432+
13931433
async def pause_consumer(self, consumer_name: str, pause_until: float) -> None:
13941434
"""Pause a consumer until a specific time.
13951435

nats-jetstream/tests/test_jetstream.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,56 @@ async def test_update_nonexistent_consumer_fails(jetstream: JetStream):
519519
await jetstream.update_consumer(stream_name="test_stream", consumer_name="nonexistent", max_deliver=20)
520520

521521

522+
@pytest.mark.asyncio
523+
async def test_create_or_update_consumer_creates_new(jetstream: JetStream):
524+
"""Test that create_or_update_consumer creates a new consumer when it doesn't exist."""
525+
# Create a stream
526+
await jetstream.create_stream(name="test_stream", subjects=["FOO.*"])
527+
528+
# Create a consumer using create_or_update
529+
consumer = await jetstream.create_or_update_consumer(
530+
stream_name="test_stream", name="test_consumer", durable_name="test_consumer", max_deliver=10
531+
)
532+
533+
assert consumer.info.config.name == "test_consumer"
534+
assert consumer.info.config.max_deliver == 10
535+
536+
537+
@pytest.mark.asyncio
538+
async def test_create_or_update_consumer_updates_existing(jetstream: JetStream):
539+
"""Test that create_or_update_consumer updates an existing consumer."""
540+
# Create a stream and consumer
541+
await jetstream.create_stream(name="test_stream", subjects=["FOO.*"])
542+
await jetstream.create_consumer(
543+
stream_name="test_stream", name="test_consumer", durable_name="test_consumer", max_deliver=10
544+
)
545+
546+
# Update the consumer using create_or_update
547+
updated_consumer = await jetstream.create_or_update_consumer(
548+
stream_name="test_stream", name="test_consumer", durable_name="test_consumer", max_deliver=20
549+
)
550+
551+
assert updated_consumer.info.config.max_deliver == 20
552+
553+
554+
@pytest.mark.asyncio
555+
async def test_create_or_update_consumer_via_stream(jetstream: JetStream):
556+
"""Test create_or_update_consumer via Stream object."""
557+
# Create a stream
558+
stream = await jetstream.create_stream(name="test_stream", subjects=["FOO.*"])
559+
560+
# Create a consumer using create_or_update
561+
consumer = await stream.create_or_update_consumer(name="test_consumer", max_deliver=10)
562+
563+
assert consumer.info.config.name == "test_consumer"
564+
assert consumer.info.config.max_deliver == 10
565+
566+
# Update the same consumer
567+
updated_consumer = await stream.create_or_update_consumer(name="test_consumer", max_deliver=20)
568+
569+
assert updated_consumer.info.config.max_deliver == 20
570+
571+
522572
@pytest.mark.asyncio
523573
async def test_delete_nonexistent_consumer_fails(jetstream: JetStream):
524574
"""Test that deleting a non-existent consumer fails."""

0 commit comments

Comments
 (0)