Skip to content

Commit 95e7fac

Browse files
Fix filter_subject overriding filter_subjects (#711)
Signed-off-by: Mark Jan van Kampen <[email protected]>
1 parent 9834125 commit 95e7fac

File tree

2 files changed

+65
-5
lines changed

2 files changed

+65
-5
lines changed

nats/src/nats/js/client.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -437,8 +437,9 @@ async def cb(msg):
437437
deliver = self._nc.new_inbox()
438438
config.deliver_subject = deliver
439439

440-
# Auto created consumers use the filter subject.
441-
config.filter_subject = subject
440+
# Auto created consumers use the filter subject, unless filter_subjects is set.
441+
if not config.filter_subjects:
442+
config.filter_subject = subject
442443

443444
# Heartbeats / FlowControl
444445
config.flow_control = flow_control
@@ -593,9 +594,10 @@ async def main():
593594
if config is None:
594595
config = api.ConsumerConfig()
595596

596-
# Auto created consumers use the filter subject.
597-
# config.name = durable
598-
config.filter_subject = subject
597+
# Auto created consumers use the filter subject, unless filter_subjects is set.
598+
if not config.filter_subjects:
599+
config.filter_subject = subject
600+
599601
if durable:
600602
config.name = durable
601603
config.durable_name = durable

nats/tests/test_js.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1136,6 +1136,33 @@ async def test_fetch_heartbeats(self):
11361136

11371137
await nc.close()
11381138

1139+
@async_long_test
1140+
async def test_subscribe_filter_subjects(self):
1141+
nc = NATS()
1142+
await nc.connect()
1143+
1144+
js = nc.jetstream()
1145+
1146+
await js.add_stream(name="events", subjects=["events.>"])
1147+
1148+
sub = await js.pull_subscribe(
1149+
"events.>",
1150+
"filter",
1151+
config=nats.js.api.ConsumerConfig(
1152+
filter_subjects=["events.1", "events.2"],
1153+
),
1154+
)
1155+
for i in range(0, 15):
1156+
await js.publish("events.%d" % i, b"i:%d" % i)
1157+
msgs = await sub.fetch(20, timeout=5)
1158+
assert len(msgs) == 2
1159+
for msg in msgs:
1160+
await msg.ack_sync()
1161+
info = await js.consumer_info("events", "filter")
1162+
assert info.num_pending == 0
1163+
1164+
await nc.close()
1165+
11391166

11401167
class JSMTest(SingleJetStreamServerTestCase):
11411168
@async_test
@@ -2093,6 +2120,37 @@ async def cb_d(msg):
20932120
await js.delete_stream("pconfig")
20942121
await nc.close()
20952122

2123+
@async_long_test
2124+
async def test_subscribe_filter_subjects(self):
2125+
nc = NATS()
2126+
await nc.connect()
2127+
2128+
js = nc.jetstream()
2129+
2130+
await js.add_stream(name="events", subjects=["events.>"])
2131+
a = []
2132+
2133+
def cb(msg):
2134+
a.append(msg)
2135+
2136+
sub = await js.subscribe(
2137+
"events.>",
2138+
"filter",
2139+
cb=cb,
2140+
config=nats.js.api.ConsumerConfig(
2141+
filter_subjects=["events.1", "events.2"],
2142+
),
2143+
)
2144+
for i in range(0, 15):
2145+
await js.publish("events.%d" % i, b"i:%d" % i)
2146+
await asyncio.sleep(1)
2147+
assert len(a) == 2
2148+
2149+
info = await sub.consumer_info()
2150+
assert info.num_pending == 0
2151+
2152+
await nc.close()
2153+
20962154

20972155
class AckPolicyTest(SingleJetStreamServerTestCase):
20982156
@async_test

0 commit comments

Comments
 (0)