Skip to content

Commit cdfbf85

Browse files
committed
js: support consumer with name
Signed-off-by: Waldemar Quevedo <[email protected]>
1 parent 517f480 commit cdfbf85

File tree

3 files changed

+184
-65
lines changed

3 files changed

+184
-65
lines changed

nats/js/api.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,7 @@ class ConsumerConfig(Base):
371371
References:
372372
* `Consumers <https://docs.nats.io/jetstream/concepts/consumers>`_
373373
"""
374+
name: Optional[str] = None
374375
durable_name: Optional[str] = None
375376
description: Optional[str] = None
376377
deliver_policy: Optional[DeliverPolicy] = DeliverPolicy.ALL

nats/js/manager.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -193,18 +193,23 @@ async def add_consumer(
193193
req_data = json.dumps(req).encode()
194194

195195
resp = None
196-
if durable_name is not None:
197-
resp = await self._api_request(
198-
f"{self._prefix}.CONSUMER.DURABLE.CREATE.{stream}.{durable_name}",
199-
req_data,
200-
timeout=timeout
201-
)
196+
subject = ''
197+
version = self._nc.connected_server_version
198+
consumer_name_supported = version.major >= 2 and version.minor >= 9
199+
if consumer_name_supported and config.name:
200+
# NOTE: Only supported after nats-server v2.9.0
201+
if config.filter_subject and config.filter_subject != ">":
202+
subject = f"{self._prefix}.CONSUMER.CREATE.{stream}.{config.name}.{config.filter_subject}"
203+
else:
204+
subject = f"{self._prefix}.CONSUMER.CREATE.{stream}.{config.name}"
205+
elif durable_name:
206+
# NOTE: Legacy approach to create consumers. After nats-server v2.9
207+
# name option can be used instead.
208+
subject = f"{self._prefix}.CONSUMER.DURABLE.CREATE.{stream}.{durable_name}"
202209
else:
203-
resp = await self._api_request(
204-
f"{self._prefix}.CONSUMER.CREATE.{stream}",
205-
req_data,
206-
timeout=timeout
207-
)
210+
subject = f"{self._prefix}.CONSUMER.CREATE.{stream}"
211+
212+
resp = await self._api_request(subject, req_data, timeout=timeout)
208213
return api.ConsumerInfo.from_response(resp)
209214

210215
async def delete_consumer(self, stream: str, consumer: str) -> bool:

tests/test_js.py

Lines changed: 167 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -980,6 +980,119 @@ async def test_number_of_consumer_replicas(self):
980980

981981
await nc.close()
982982

983+
@async_test
984+
async def test_consumer_with_name(self):
985+
nc = NATS()
986+
await nc.connect()
987+
js = nc.jetstream()
988+
jsm = nc.jsm()
989+
990+
tsub = await nc.subscribe("$JS.API.CONSUMER.>")
991+
992+
# Create stream.
993+
await jsm.add_stream(name="ctests", subjects=["a", "b", "c.>"])
994+
await js.publish("a", b'hello world!')
995+
await js.publish("b", b'hello world!!')
996+
await js.publish("c.d", b'hello world!!!')
997+
await js.publish("c.d.e", b'hello world!!!!')
998+
999+
# Create ephemeral pull consumer with a name.
1000+
stream_name = "ctests"
1001+
consumer_name = "ephemeral"
1002+
cinfo = await jsm.add_consumer(
1003+
stream_name,
1004+
name=consumer_name,
1005+
ack_policy="explicit",
1006+
)
1007+
assert cinfo.config.name == consumer_name
1008+
1009+
msg = await tsub.next_msg()
1010+
assert msg.subject == '$JS.API.CONSUMER.CREATE.ctests.ephemeral'
1011+
1012+
sub = await js.pull_subscribe_bind(consumer_name, stream_name)
1013+
msgs = await sub.fetch(1)
1014+
assert msgs[0].data == b'hello world!'
1015+
ok = await msgs[0].ack_sync()
1016+
assert ok
1017+
1018+
msg = await tsub.next_msg()
1019+
assert msg.subject == '$JS.API.CONSUMER.MSG.NEXT.ctests.ephemeral'
1020+
1021+
# Create durable pull consumer with a name.
1022+
consumer_name = "durable"
1023+
cinfo = await jsm.add_consumer(
1024+
stream_name,
1025+
name=consumer_name,
1026+
durable_name=consumer_name,
1027+
ack_policy="explicit",
1028+
)
1029+
assert cinfo.config.name == consumer_name
1030+
msg = await tsub.next_msg()
1031+
assert msg.subject == '$JS.API.CONSUMER.CREATE.ctests.durable'
1032+
1033+
sub = await js.pull_subscribe_bind(consumer_name, stream_name)
1034+
msgs = await sub.fetch(1)
1035+
assert msgs[0].data == b'hello world!'
1036+
ok = await msgs[0].ack_sync()
1037+
assert ok
1038+
msg = await tsub.next_msg()
1039+
assert msg.subject == '$JS.API.CONSUMER.MSG.NEXT.ctests.durable'
1040+
1041+
# Create durable pull consumer with a name and a filter_subject
1042+
consumer_name = "durable2"
1043+
cinfo = await jsm.add_consumer(
1044+
stream_name,
1045+
name=consumer_name,
1046+
durable_name=consumer_name,
1047+
filter_subject="b",
1048+
ack_policy="explicit",
1049+
)
1050+
assert cinfo.config.name == consumer_name
1051+
msg = await tsub.next_msg()
1052+
assert msg.subject == '$JS.API.CONSUMER.CREATE.ctests.durable2.b'
1053+
1054+
sub = await js.pull_subscribe_bind(consumer_name, stream_name)
1055+
msgs = await sub.fetch(1)
1056+
assert msgs[0].data == b'hello world!!'
1057+
ok = await msgs[0].ack_sync()
1058+
assert ok
1059+
msg = await tsub.next_msg()
1060+
assert msg.subject == '$JS.API.CONSUMER.MSG.NEXT.ctests.durable2'
1061+
1062+
# Create durable pull consumer with a name and a filter_subject
1063+
consumer_name = "durable3"
1064+
cinfo = await jsm.add_consumer(
1065+
stream_name,
1066+
name=consumer_name,
1067+
durable_name=consumer_name,
1068+
filter_subject=">",
1069+
ack_policy="explicit",
1070+
)
1071+
assert cinfo.config.name == consumer_name
1072+
msg = await tsub.next_msg()
1073+
assert msg.subject == '$JS.API.CONSUMER.CREATE.ctests.durable3'
1074+
1075+
sub = await js.pull_subscribe_bind(consumer_name, stream_name)
1076+
msgs = await sub.fetch(1)
1077+
assert msgs[0].data == b'hello world!'
1078+
ok = await msgs[0].ack_sync()
1079+
assert ok
1080+
msg = await tsub.next_msg()
1081+
assert msg.subject == '$JS.API.CONSUMER.MSG.NEXT.ctests.durable3'
1082+
1083+
# name and durable must match if both present.
1084+
with pytest.raises(BadRequestError) as err:
1085+
await jsm.add_consumer(
1086+
stream_name,
1087+
name="name1",
1088+
durable_name="name2",
1089+
ack_policy="explicit",
1090+
)
1091+
assert err.value.err_code == 10017
1092+
assert err.value.description == 'consumer name in subject does not match durable name in request'
1093+
1094+
await nc.close()
1095+
9831096

9841097
class SubscribeTest(SingleJetStreamServerTestCase):
9851098

@@ -2634,60 +2747,60 @@ async def test_account_limits(self):
26342747

26352748
# Check unmarshalling response with Tiers:
26362749
blob = """{
2637-
"type": "io.nats.jetstream.api.v1.account_info_response",
2638-
"memory": 0,
2639-
"storage": 6829550,
2640-
"streams": 1,
2641-
"consumers": 0,
2642-
"limits": {
2643-
"max_memory": 0,
2644-
"max_storage": 0,
2645-
"max_streams": 0,
2646-
"max_consumers": 0,
2647-
"max_ack_pending": 0,
2648-
"memory_max_stream_bytes": 0,
2649-
"storage_max_stream_bytes": 0,
2650-
"max_bytes_required": false
2651-
},
2652-
"domain": "ngs",
2653-
"api": {
2654-
"total": 6,
2655-
"errors": 0
2656-
},
2657-
"tiers": {
2658-
"R1": {
2659-
"memory": 0,
2660-
"storage": 6829550,
2661-
"streams": 1,
2662-
"consumers": 0,
2663-
"limits": {
2664-
"max_memory": 0,
2665-
"max_storage": 2000000000000,
2666-
"max_streams": 100,
2667-
"max_consumers": 1000,
2668-
"max_ack_pending": -1,
2669-
"memory_max_stream_bytes": -1,
2670-
"storage_max_stream_bytes": -1,
2671-
"max_bytes_required": true
2672-
}
2673-
},
2674-
"R3": {
2675-
"memory": 0,
2676-
"storage": 0,
2677-
"streams": 0,
2678-
"consumers": 0,
2679-
"limits": {
2680-
"max_memory": 0,
2681-
"max_storage": 500000000000,
2682-
"max_streams": 25,
2683-
"max_consumers": 250,
2684-
"max_ack_pending": -1,
2685-
"memory_max_stream_bytes": -1,
2686-
"storage_max_stream_bytes": -1,
2687-
"max_bytes_required": true
2688-
}
2689-
}
2690-
}}
2750+
"type": "io.nats.jetstream.api.v1.account_info_response",
2751+
"memory": 0,
2752+
"storage": 6829550,
2753+
"streams": 1,
2754+
"consumers": 0,
2755+
"limits": {
2756+
"max_memory": 0,
2757+
"max_storage": 0,
2758+
"max_streams": 0,
2759+
"max_consumers": 0,
2760+
"max_ack_pending": 0,
2761+
"memory_max_stream_bytes": 0,
2762+
"storage_max_stream_bytes": 0,
2763+
"max_bytes_required": false
2764+
},
2765+
"domain": "ngs",
2766+
"api": {
2767+
"total": 6,
2768+
"errors": 0
2769+
},
2770+
"tiers": {
2771+
"R1": {
2772+
"memory": 0,
2773+
"storage": 6829550,
2774+
"streams": 1,
2775+
"consumers": 0,
2776+
"limits": {
2777+
"max_memory": 0,
2778+
"max_storage": 2000000000000,
2779+
"max_streams": 100,
2780+
"max_consumers": 1000,
2781+
"max_ack_pending": -1,
2782+
"memory_max_stream_bytes": -1,
2783+
"storage_max_stream_bytes": -1,
2784+
"max_bytes_required": true
2785+
}
2786+
},
2787+
"R3": {
2788+
"memory": 0,
2789+
"storage": 0,
2790+
"streams": 0,
2791+
"consumers": 0,
2792+
"limits": {
2793+
"max_memory": 0,
2794+
"max_storage": 500000000000,
2795+
"max_streams": 25,
2796+
"max_consumers": 250,
2797+
"max_ack_pending": -1,
2798+
"memory_max_stream_bytes": -1,
2799+
"storage_max_stream_bytes": -1,
2800+
"max_bytes_required": true
2801+
}
2802+
}
2803+
}}
26912804
"""
26922805

26932806
expected = nats.js.api.AccountInfo(

0 commit comments

Comments
 (0)