Skip to content

Commit a3fc895

Browse files
committed
Rename reply_to to reply for compatability with nats.aio
Signed-off-by: Casper Beyer <[email protected]>
1 parent 71eb800 commit a3fc895

File tree

6 files changed

+47
-49
lines changed

6 files changed

+47
-49
lines changed

nats-client/src/nats/client/__init__.py

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -297,15 +297,13 @@ async def _read_loop(self) -> None:
297297
break
298298

299299
match msg:
300-
case ("MSG", subject, sid, reply_to, payload):
301-
logger.debug(
302-
"<<- MSG %s %s %s %s", subject, sid, reply_to if reply_to else "", len(payload)
303-
)
304-
await self._handle_msg(subject, sid, reply_to, payload)
305-
case ("HMSG", subject, sid, reply_to, headers, payload, status_code, status_description):
306-
logger.debug("<<- HMSG %s %s %s %s %s", subject, sid, reply_to, len(headers), len(payload))
300+
case ("MSG", subject, sid, reply, payload):
301+
logger.debug("<<- MSG %s %s %s %s", subject, sid, reply if reply else "", len(payload))
302+
await self._handle_msg(subject, sid, reply, payload)
303+
case ("HMSG", subject, sid, reply, headers, payload, status_code, status_description):
304+
logger.debug("<<- HMSG %s %s %s %s %s", subject, sid, reply, len(headers), len(payload))
307305
await self._handle_hmsg(
308-
subject, sid, reply_to, headers, payload, status_code, status_description
306+
subject, sid, reply, headers, payload, status_code, status_description
309307
)
310308
case ("PING",):
311309
logger.debug("<<- PING")
@@ -401,11 +399,11 @@ async def _write_loop(self) -> None:
401399
logger.exception("Error during final flush")
402400
return
403401

404-
async def _handle_msg(self, subject: str, sid: str, reply_to: str | None, payload: bytes) -> None:
402+
async def _handle_msg(self, subject: str, sid: str, reply: str | None, payload: bytes) -> None:
405403
"""Handle MSG from server."""
406404
if sid in self._subscriptions:
407405
subscription = self._subscriptions[sid]
408-
msg = Message(subject=subject, data=payload, reply_to=reply_to)
406+
msg = Message(subject=subject, data=payload, reply=reply)
409407

410408
for callback in subscription._callbacks:
411409
try:
@@ -422,7 +420,7 @@ async def _handle_hmsg(
422420
self,
423421
subject: str,
424422
sid: str,
425-
reply_to: str,
423+
reply: str,
426424
headers: dict[str, list[str]],
427425
payload: bytes,
428426
status_code: str | None = None,
@@ -438,7 +436,7 @@ async def _handle_hmsg(
438436
msg = Message(
439437
subject=subject,
440438
data=payload,
441-
reply_to=reply_to,
439+
reply=reply,
442440
headers=Headers(headers) if headers else None, # type: ignore[arg-type]
443441
status=status,
444442
)
@@ -699,7 +697,7 @@ async def publish(
699697
subject: str,
700698
payload: bytes,
701699
*,
702-
reply_to: str | None = None,
700+
reply: str | None = None,
703701
headers: Headers | dict[str, str | list[str]] | None = None,
704702
) -> None:
705703
"""Publish a message to a subject."""
@@ -712,14 +710,14 @@ async def publish(
712710
command_parts = encode_hpub(
713711
subject,
714712
payload,
715-
reply_to=reply_to,
713+
reply=reply,
716714
headers=headers_dict, # type: ignore[arg-type]
717715
)
718716
else:
719717
command_parts = encode_pub(
720718
subject,
721719
payload,
722-
reply_to=reply_to,
720+
reply=reply,
723721
)
724722

725723
message_data = b"".join(command_parts)
@@ -853,7 +851,7 @@ async def request(
853851

854852
sub = await self.subscribe(inbox)
855853
try:
856-
await self.publish(subject, payload, reply_to=inbox, headers=headers)
854+
await self.publish(subject, payload, reply=inbox, headers=headers)
857855

858856
try:
859857
response = await asyncio.wait_for(sub.next(), timeout)

nats-client/src/nats/client/message.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,13 +135,13 @@ class Message:
135135
Attributes:
136136
subject: The subject the message was published to
137137
data: The message payload as bytes
138-
reply_to: Optional reply subject for request-reply messaging
138+
reply: Optional reply subject for request-reply messaging
139139
headers: Optional message headers
140140
status: Optional NATS status information
141141
"""
142142

143143
subject: str
144144
data: bytes
145-
reply_to: str | None = None
145+
reply: str | None = None
146146
headers: Headers | None = None
147147
status: Status | None = None

nats-client/src/nats/client/protocol/command.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,20 @@ def encode_pub(
3030
subject: str,
3131
payload: bytes,
3232
*,
33-
reply_to: str | None = None,
33+
reply: str | None = None,
3434
) -> list[bytes]:
3535
"""Encode PUB command.
3636
3737
Args:
3838
subject: Subject to publish to
3939
payload: Message payload
40-
reply_to: Optional reply subject
40+
reply: Optional reply subject
4141
4242
Returns:
4343
List of byte strings to write in sequence
4444
"""
4545
# PUB format: PUB <subject> [reply-to] <#bytes>
46-
command = f"PUB {subject} {reply_to} {len(payload)}\r\n" if reply_to else f"PUB {subject} {len(payload)}\r\n"
46+
command = f"PUB {subject} {reply} {len(payload)}\r\n" if reply else f"PUB {subject} {len(payload)}\r\n"
4747

4848
return [command.encode(), payload, b"\r\n"]
4949

@@ -52,15 +52,15 @@ def encode_hpub(
5252
subject: str,
5353
payload: bytes,
5454
*,
55-
reply_to: str | None = None,
55+
reply: str | None = None,
5656
headers: dict[str, str | list[str]],
5757
) -> list[bytes]:
5858
"""Encode HPUB command.
5959
6060
Args:
6161
subject: Subject to publish to
6262
payload: Message payload
63-
reply_to: Optional reply subject
63+
reply: Optional reply subject
6464
headers: Headers to include with the message
6565
6666
Returns:
@@ -75,8 +75,8 @@ def encode_hpub(
7575
header_data = ("\r\n".join(header_lines) + "\r\n\r\n").encode()
7676

7777
# HPUB format: HPUB <subject> [reply-to] <#header bytes> <#total bytes>
78-
if reply_to:
79-
command = f"HPUB {subject} {reply_to} {len(header_data)} {len(header_data) + len(payload)}\r\n"
78+
if reply:
79+
command = f"HPUB {subject} {reply} {len(header_data)} {len(header_data) + len(payload)}\r\n"
8080
else:
8181
command = f"HPUB {subject} {len(header_data)} {len(header_data) + len(payload)}\r\n"
8282

nats-client/src/nats/client/protocol/message.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class Msg(NamedTuple):
6666
op: Literal["MSG"]
6767
subject: str
6868
sid: str
69-
reply_to: str | None
69+
reply: str | None
7070
payload: bytes
7171

7272

@@ -76,7 +76,7 @@ class HMsg(NamedTuple):
7676
op: Literal["HMSG"]
7777
subject: str
7878
sid: str
79-
reply_to: str | None
79+
reply: str | None
8080
headers: dict[str, list[str]]
8181
payload: bytes
8282
status_code: str | None
@@ -200,11 +200,11 @@ async def parse_msg(reader: Reader, args: list[bytes]) -> Msg:
200200

201201
if len(args) == MIN_MSG_ARGS:
202202
# No reply subject
203-
reply_to_bytes = None
203+
reply_bytes = None
204204
size = int(args[2])
205205
else:
206206
# With reply subject
207-
reply_to_bytes = args[2]
207+
reply_bytes = args[2]
208208
size = int(args[3])
209209

210210
# Check payload size limit
@@ -219,9 +219,9 @@ async def parse_msg(reader: Reader, args: list[bytes]) -> Msg:
219219
# Only convert to strings at the last moment
220220
subject = subject_bytes.decode()
221221
sid = sid_bytes.decode()
222-
reply_to = reply_to_bytes.decode() if reply_to_bytes is not None else None
222+
reply = reply_bytes.decode() if reply_bytes is not None else None
223223

224-
return Msg("MSG", subject, sid, reply_to, payload)
224+
return Msg("MSG", subject, sid, reply, payload)
225225

226226

227227
async def parse_hmsg(reader: Reader, args: list[bytes]) -> HMsg:
@@ -247,12 +247,12 @@ async def parse_hmsg(reader: Reader, args: list[bytes]) -> HMsg:
247247

248248
if len(args) == MIN_HMSG_ARGS:
249249
# No reply subject
250-
reply_to_bytes = None
250+
reply_bytes = None
251251
header_size = int(args[2])
252252
total_size = int(args[3])
253253
else:
254254
# With reply subject
255-
reply_to_bytes = args[2]
255+
reply_bytes = args[2]
256256
header_size = int(args[3])
257257
total_size = int(args[4])
258258

@@ -281,9 +281,9 @@ async def parse_hmsg(reader: Reader, args: list[bytes]) -> HMsg:
281281
# Convert remaining bytes to strings only at the final step
282282
subject = subject_bytes.decode()
283283
sid = sid_bytes.decode()
284-
reply_to = reply_to_bytes.decode() if reply_to_bytes is not None else None
284+
reply = reply_bytes.decode() if reply_bytes is not None else None
285285

286-
return HMsg("HMSG", subject, sid, reply_to, headers, payload, status_code, status_description)
286+
return HMsg("HMSG", subject, sid, reply, headers, payload, status_code, status_description)
287287

288288

289289
async def parse_info(args: list[bytes]) -> Info:

nats-client/tests/test_client.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ async def handle_request():
8686
subscription = await client.subscribe(test_subject)
8787
await client.flush()
8888
message = await subscription.next(timeout=1.0)
89-
await client.publish(message.reply_to, reply_payload)
89+
await client.publish(message.reply, reply_payload)
9090

9191
responder_task = asyncio.create_task(handle_request())
9292
await client.flush()
@@ -587,18 +587,18 @@ async def test_custom_inbox_prefix(server):
587587
reply_payload = b"Reply data"
588588

589589
# Track the inbox subject used in the request
590-
received_reply_to = None
590+
received_reply = None
591591

592592
# Setup responder that captures the reply-to subject
593593
subscription = await client.subscribe(test_subject)
594594
await client.flush()
595595

596596
async def handle_request():
597-
nonlocal received_reply_to
597+
nonlocal received_reply
598598
message = await subscription.next(timeout=2.0)
599-
received_reply_to = message.reply_to
600-
assert received_reply_to is not None
601-
await client.publish(received_reply_to, reply_payload)
599+
received_reply = message.reply
600+
assert received_reply is not None
601+
await client.publish(received_reply, reply_payload)
602602

603603
responder_task = asyncio.create_task(handle_request())
604604

@@ -610,9 +610,9 @@ async def handle_request():
610610
await responder_task
611611

612612
# Verify that the inbox used the custom prefix
613-
assert received_reply_to is not None
614-
assert received_reply_to.startswith(custom_prefix), (
615-
f"Expected inbox to start with '{custom_prefix}', got '{received_reply_to}'"
613+
assert received_reply is not None
614+
assert received_reply.startswith(custom_prefix), (
615+
f"Expected inbox to start with '{custom_prefix}', got '{received_reply}'"
616616
)
617617

618618
finally:

nats-client/tests/test_protocol.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ async def test_parse_msg():
3333
msg = await parse_msg(reader, [b"foo.bar", b"1", b"5"])
3434
assert msg.subject == "foo.bar"
3535
assert msg.sid == "1"
36-
assert msg.reply_to is None
36+
assert msg.reply is None
3737
assert msg.payload == b"hello"
3838

3939
# Test valid MSG with reply
@@ -44,7 +44,7 @@ async def test_parse_msg():
4444
msg = await parse_msg(reader, [b"foo.bar", b"1", b"reply.to", b"5"])
4545
assert msg.subject == "foo.bar"
4646
assert msg.sid == "1"
47-
assert msg.reply_to == "reply.to"
47+
assert msg.reply == "reply.to"
4848
assert msg.payload == b"hello"
4949

5050
# Test invalid size
@@ -78,7 +78,7 @@ async def test_parse_hmsg():
7878
msg = await parse_hmsg(reader, [b"foo.bar", b"1", b"reply.to", str(header_size).encode(), str(total_size).encode()])
7979
assert msg.subject == "foo.bar"
8080
assert msg.sid == "1"
81-
assert msg.reply_to == "reply.to"
81+
assert msg.reply == "reply.to"
8282
assert msg.payload == b"hello"
8383
assert msg.headers == {}
8484

@@ -171,7 +171,7 @@ def test_encode_pub():
171171
assert command == [b"PUB foo.bar 5\r\n", b"hello", b"\r\n"]
172172

173173
# Test with reply
174-
command = encode_pub("foo.bar", b"hello", reply_to="reply.to")
174+
command = encode_pub("foo.bar", b"hello", reply="reply.to")
175175
assert command == [b"PUB foo.bar reply.to 5\r\n", b"hello", b"\r\n"]
176176

177177

@@ -189,7 +189,7 @@ def test_encode_hpub():
189189
assert command[3] == b"\r\n"
190190

191191
# Test with reply
192-
command = encode_hpub("foo.bar", payload, reply_to="reply.to", headers=headers)
192+
command = encode_hpub("foo.bar", payload, reply="reply.to", headers=headers)
193193
assert len(command) == 4
194194
assert command[0].startswith(b"HPUB foo.bar reply.to")
195195
assert command[1].startswith(b"NATS/1.0\r\n")

0 commit comments

Comments
 (0)