Skip to content

Conversation

@caspervonb
Copy link
Collaborator

@caspervonb caspervonb commented Nov 27, 2025

Replace the modular parse() function with chunk-based inline parsing in _read_loop. Reading 64KB chunks and parsing multiple messages per buffer avoids function call overhead on the hot path.

Handler signatures now accept bytes instead of str to defer decoding until necessary.

Subscriber throughput improves ~63% (295k to 480k msg/sec with pub/sub on CPython).

Replace the modular parse() function with chunk-based inline parsing in
_read_loop. Reading 64KB chunks and parsing multiple messages per buffer avoids
function call overhead on the hot path.

Handler signatures now accept bytes instead of str to defer decoding until
necessary.

Subscriber throughput improves ~63% (295k to 480k msg/sec).

Signed-off-by: Casper Beyer <[email protected]>
@caspervonb caspervonb marked this pull request as ready for review December 1, 2025 09:46
@caspervonb caspervonb requested a review from Copilot December 1, 2025 09:46
Copilot finished reviewing on behalf of caspervonb December 1, 2025 09:50
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR replaces the modular parse() function with chunk-based inline parsing in the _read_loop method to improve subscriber throughput by ~63% (295k to 480k msg/sec). The changes introduce a 64KB buffer-based approach that parses multiple messages per buffer to reduce function call overhead.

Key changes:

  • Inline protocol parsing using 64KB chunks instead of calling parse() for each message
  • Handler signatures now accept bytes instead of str for subject, SID, and reply fields to defer decoding
  • Manual buffer management with rollback logic for incomplete messages

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +848 to 853
subject=subject.decode(),
data=payload,
reply=reply,
reply=reply.decode() if reply else None,
headers=Headers(headers) if headers else None, # type: ignore[arg-type]
status=status,
)
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing error handling for decode() calls on subject and reply. Both can raise UnicodeDecodeError.

Add error handling:

try:
    message = Message(
        subject=subject.decode(),
        data=payload,
        reply=reply.decode() if reply else None,
        headers=Headers(headers) if headers else None,
        status=status,
    )
except UnicodeDecodeError as e:
    logger.error("Invalid UTF-8 in HMSG fields: %s", e)
    return

Copilot uses AI. Check for mistakes.
Comment on lines +689 to +691
crlf = data.find(CRLF, pos)
if crlf == -1:
incomplete = data[pos:]
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When an unknown protocol is encountered, the code skips to the next line and continues parsing. However, if CRLF is not found in the remaining buffer, the incomplete variable is set to data[pos:], but the next iteration will attempt to parse this data as a complete message starting from position 0. This could lead to repeatedly trying to parse invalid protocol lines.

Consider logging the unknown protocol before skipping:

# Unknown protocol, skip to next line
logger.warning("Unknown protocol at position %d: %r", pos, data[pos:min(pos+20, data_len)])
crlf = data.find(CRLF, pos)
if crlf == -1:
    incomplete = data[pos:]
    break
pos = crlf + 2
Suggested change
crlf = data.find(CRLF, pos)
if crlf == -1:
incomplete = data[pos:]
logger.warning("Unknown protocol at position %d: %r", pos, data[pos:min(pos+20, data_len)])
crlf = data.find(CRLF, pos)
if crlf == -1:
# Discard invalid data to avoid infinite loop
logger.error("No CRLF found after unknown protocol at position %d, discarding %d bytes", pos, data_len - pos)
incomplete = b""

Copilot uses AI. Check for mistakes.
Comment on lines +460 to +462
pos = pos - len(subject) - 5
incomplete = data[pos:]
break
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The position rollback calculation appears incorrect. When the subject parsing fails, the code rolls back by len(subject) + 5, but at this point subject hasn't been assigned yet (the space1 wasn't found). This will cause an UnboundLocalError.

The rollback should simply be:

if space1 == -1:
    incomplete = data[pos - 4:]
    break

Copilot uses AI. Check for mistakes.

pending_messages, pending_bytes = subscription.pending

subject_str = subject.decode()
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing error handling for decode() call. If the subject contains invalid UTF-8, this will raise UnicodeDecodeError.

Add error handling:

try:
    subject_str = subject.decode()
except UnicodeDecodeError:
    logger.error("Invalid UTF-8 in subject for slow consumer warning")
    subject_str = repr(subject)
Suggested change
subject_str = subject.decode()
try:
subject_str = subject.decode()
except UnicodeDecodeError:
logger.error("Invalid UTF-8 in subject for slow consumer warning")
subject_str = repr(subject)

Copilot uses AI. Check for mistakes.
Comment on lines +428 to +693
# Parse all messages in buffer
while pos < data_len:
remaining = data_len - pos

if remaining < 1:
incomplete = data[pos:]
break

first_char = data[pos]

# 'M' = 0x4d (MSG) - most common, check first
if first_char == 0x4D:
if remaining < 4:
incomplete = data[pos:]
break

if data[pos : pos + 4] == b"MSG ":
pos += 4

# Find first space (after subject)
space1 = data.find(b" ", pos, data_len)
if space1 == -1:
pos -= 4
incomplete = data[pos:]
break

subject = data[pos:space1]
pos = space1 + 1

# Find second space (after SID)
space2 = data.find(b" ", pos, data_len)
if space2 == -1:
pos = pos - len(subject) - 5
incomplete = data[pos:]
break

sid = data[pos:space2]
pos = space2 + 1

# Find CRLF (end of header)
crlf = data.find(CRLF, pos, data_len)
if crlf == -1:
pos = pos - len(subject) - len(sid) - 6
incomplete = data[pos:]
break

# Parse [reply] size
header_rest = data[pos:crlf]
space3_rel = header_rest.find(b" ")

if space3_rel != -1:
reply = header_rest[:space3_rel]
size_bytes = header_rest[space3_rel + 1 :]
else:
reply = None
size_bytes = header_rest

# Parse payload size
try:
payload_size = int(size_bytes)
except ValueError:
pos = crlf + 2
continue

# Check if we have complete payload
payload_start = crlf + 2
payload_end = payload_start + payload_size

if payload_end + 2 > data_len:
pos = pos - len(subject) - len(sid) - len(header_rest) - 10
incomplete = data[pos:]
break

# Verify trailing CRLF
if data[payload_end : payload_end + 2] != CRLF:
pos = payload_end + 2
continue

payload = data[payload_start:payload_end]

if logger.isEnabledFor(logging.DEBUG):
logger.debug(
"<<- MSG %s %s %s %s", subject, sid, reply if reply else b"", len(payload)
)

await self._handle_msg(subject, sid, reply, payload)

pos = payload_end + 2
continue

# 'P' = 0x50 (PING or PONG)
elif first_char == 0x50:
if remaining < 6:
incomplete = data[pos:]
break

second_char = data[pos + 1]

if second_char == 0x49: # 'I' -> PING
if data[pos : pos + 6] == b"PING\r\n":
if logger.isEnabledFor(logging.DEBUG):
logger.debug("<<- PING")
await self._handle_ping()
pos += 6
continue
elif second_char == 0x4F: # 'O' -> PONG
if data[pos : pos + 6] == b"PONG\r\n":
if logger.isEnabledFor(logging.DEBUG):
logger.debug("<<- PONG")
await self._handle_pong()
pos += 6
continue

# 'H' = 0x48 (HMSG - headers message)
elif first_char == 0x48:
if remaining < 5:
incomplete = data[pos:]
break

if data[pos : pos + 5] == b"HMSG ":
pos += 5

# Find first space (after subject)
space1 = data.find(b" ", pos, data_len)
if space1 == -1:
pos -= 5
incomplete = data[pos:]
break

subject = data[pos:space1]
pos = space1 + 1

# Find second space (after SID)
space2 = data.find(b" ", pos, data_len)
if space2 == -1:
pos = pos - len(subject) - 6
incomplete = data[pos:]
break

sid = data[pos:space2]
pos = space2 + 1

# Find CRLF (end of header line)
crlf = data.find(CRLF, pos, data_len)
if crlf == -1:
pos = pos - len(subject) - len(sid) - 7
incomplete = data[pos:]
break

# Parse [reply] hdr_size total_size
header_rest = data[pos:crlf]
parts = header_rest.split(b" ")

if len(parts) == 2:
reply = None
hdr_size = int(parts[0])
total_size = int(parts[1])
elif len(parts) == 3:
reply = parts[0]
hdr_size = int(parts[1])
total_size = int(parts[2])
else:
pos = crlf + 2
continue

# Check if we have complete message
msg_start = crlf + 2
msg_end = msg_start + total_size

if msg_end + 2 > data_len:
pos = pos - len(subject) - len(sid) - len(header_rest) - 11
incomplete = data[pos:]
break

# Verify trailing CRLF
if data[msg_end : msg_end + 2] != CRLF:
pos = msg_end + 2
continue

header_data = data[msg_start : msg_start + hdr_size]
payload = data[msg_start + hdr_size : msg_end]

# Parse headers
headers, status_code, status_description = parse_headers(header_data)

if logger.isEnabledFor(logging.DEBUG):
logger.debug(
"<<- HMSG %s %s %s %s %s",
subject,
sid,
reply if reply else b"",
len(headers),
len(payload),
)

await self._handle_hmsg(
subject, sid, reply, headers, payload, status_code, status_description
)

pos = msg_end + 2
continue

# 'I' = 0x49 (INFO)
elif first_char == 0x49:
if remaining < 5:
incomplete = data[pos:]
break

if data[pos : pos + 5] == b"INFO ":
crlf = data.find(CRLF, pos)
if crlf == -1:
incomplete = data[pos:]
break

info_json_bytes = data[pos + 5 : crlf]

if logger.isEnabledFor(logging.DEBUG):
logger.debug("<<- INFO %s...", info_json_bytes[:80])

try:
info = json.loads(info_json_bytes)
await self._handle_info(info)
except json.JSONDecodeError:
logger.error("Failed to parse INFO JSON: %s", info_json_bytes)

pos = crlf + 2
continue

# '+' = 0x2b (+OK)
elif first_char == 0x2B:
if remaining < 5:
incomplete = data[pos:]
break

if data[pos : pos + 5] == b"+OK\r\n":
if logger.isEnabledFor(logging.DEBUG):
logger.debug("<<- +OK")
pos += 5
continue

# '-' = 0x2d (-ERR)
elif first_char == 0x2D:
if remaining < 5:
incomplete = data[pos:]
break

if data[pos : pos + 5] == b"-ERR ":
crlf = data.find(CRLF, pos)
if crlf == -1:
incomplete = data[pos:]
break

error_msg = data[pos + 5 : crlf].decode("utf-8", errors="replace")

logger.error("<<- -ERR '%s'", error_msg)
await self._handle_error(error_msg)

pos = crlf + 2
continue

# Unknown protocol, skip to next line
crlf = data.find(CRLF, pos)
if crlf == -1:
incomplete = data[pos:]
break
pos = crlf + 2
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new inline parsing logic lacks dedicated unit tests. The existing tests in test_protocol.py test the old modular parse() function which is no longer used in the read loop. The inline parser should have comprehensive tests covering:

  • Incomplete messages spanning multiple chunks
  • All message types (MSG, HMSG, PING, PONG, INFO, +OK, -ERR)
  • Invalid payload sizes and error conditions
  • Malformed messages and protocol violations
  • Position rollback scenarios
  • Buffer boundary conditions

Consider adding integration tests that exercise the inline parser through the _read_loop or extracting the parsing logic into testable functions.

Copilot uses AI. Check for mistakes.

if sid in self._subscriptions:
subscription = self._subscriptions[sid]
sid_str = sid.decode()
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential crash due to missing error handling. The decode() call can raise UnicodeDecodeError if the SID contains invalid UTF-8 bytes. This would crash the read loop.

Add error handling:

try:
    sid_str = sid.decode()
except UnicodeDecodeError:
    logger.error("Invalid UTF-8 in SID: %r", sid)
    return
Suggested change
sid_str = sid.decode()
try:
sid_str = sid.decode()
except UnicodeDecodeError:
logger.error("Invalid UTF-8 in SID: %r", sid)
return

Copilot uses AI. Check for mistakes.
Comment on lines +497 to +499
pos = pos - len(subject) - len(sid) - len(header_rest) - 10
incomplete = data[pos:]
break
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar rollback calculation issue as lines 460-462 and 470-472. The formula pos - len(subject) - len(sid) - len(header_rest) - 10 is fragile and may not accurately restore to the message start. A miscalculation could cause parsing errors or skip messages.

Use a saved starting position instead:

msg_start_pos = pos - 4  # Save position before "MSG "
# ... parsing logic ...
if payload_end + 2 > data_len:
    incomplete = data[msg_start_pos:]
    break

Copilot uses AI. Check for mistakes.
Comment on lines +438 to +439
# 'M' = 0x4d (MSG) - most common, check first
if first_char == 0x4D:
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Magic number 0x4D (ASCII 'M') is used without explanation. While there's a comment, consider defining these as constants at the top of the function for better maintainability:

MSG_CHAR = 0x4D  # 'M'
PING_PONG_CHAR = 0x50  # 'P'
HMSG_CHAR = 0x48  # 'H'
INFO_CHAR = 0x49  # 'I'
OK_CHAR = 0x2B  # '+'
ERR_CHAR = 0x2D  # '-'
PING_I_CHAR = 0x49  # 'I'
PONG_O_CHAR = 0x4F  # 'O'

Copilot uses AI. Check for mistakes.

pending_messages, pending_bytes = subscription.pending

subject_str = subject.decode()
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing error handling for decode() call. If the subject contains invalid UTF-8, this will raise UnicodeDecodeError.

Add error handling:

try:
    subject_str = subject.decode()
except UnicodeDecodeError:
    logger.error("Invalid UTF-8 in subject for slow consumer warning")
    subject_str = repr(subject)
Suggested change
subject_str = subject.decode()
try:
subject_str = subject.decode()
except UnicodeDecodeError:
logger.error("Invalid UTF-8 in subject for slow consumer warning")
subject_str = repr(subject)

Copilot uses AI. Check for mistakes.

if sid in self._subscriptions:
subscription = self._subscriptions[sid]
sid_str = sid.decode()
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to line 784, missing error handling for decode(). If the SID contains invalid UTF-8, this will crash.

Add error handling:

try:
    sid_str = sid.decode()
except UnicodeDecodeError:
    logger.error("Invalid UTF-8 in SID: %r", sid)
    return
Suggested change
sid_str = sid.decode()
try:
sid_str = sid.decode()
except UnicodeDecodeError:
logger.error("Invalid UTF-8 in SID: %r", sid)
return

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants