Skip to content

Commit

Permalink
[2.5] Fix subprocess read deadlock (#3182)
Browse files Browse the repository at this point in the history
### Description

This PR reads the subprocess stdout as binary data. It addresses the
subprocess launcher deadlock issue when the output is not line-oriented
or binary.

### Types of changes
<!--- Put an `x` in all the boxes that apply, and remove the not
applicable items -->
- [x] Non-breaking change (fix or new feature that would not break
existing functionality).
- [ ] Breaking change (fix or new feature that would cause existing
functionality to change).
- [ ] New tests added to cover the changes.
- [ ] Quick tests passed locally by running `./runtest.sh`.
- [ ] In-line docstrings updated.
- [ ] Documentation updated.
  • Loading branch information
nvidianz authored Jan 27, 2025
1 parent 54cbec1 commit e375c1e
Showing 1 changed file with 53 additions and 2 deletions.
55 changes: 53 additions & 2 deletions nvflare/app_common/launchers/subprocess_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,60 @@
from nvflare.private.fed.utils.fed_utils import add_custom_dir_to_path


def get_line(buffer: bytearray):
"""Read a line from the binary buffer. It treats all combinations of \n and \r as line breaks.
Args:
buffer: A binary buffer
Returns:
(line, remaining): Return the first line as str and the remaining buffer.
line is None if no newline found
"""
size = len(buffer)
r = buffer.find(b"\r")
if r < 0:
r = size + 1
n = buffer.find(b"\n")
if n < 0:
n = size + 1
index = min(r, n)

if index >= size:
return None, buffer

# if \r and \n are adjacent, treat them as one
if abs(r - n) == 1:
index = index + 1

line = buffer[:index].decode().rstrip()
if index >= size - 1:
remaining = bytearray()
else:
remaining = buffer[index + 1 :]
return line, remaining


def log_subprocess_output(process, logger):
for c in iter(process.stdout.readline, b""):
logger.info(c.decode().rstrip())

buffer = bytearray()
while True:
chunk = process.stdout.read1(4096)
if not chunk:
break
buffer = buffer + chunk

while True:
line, buffer = get_line(buffer)
if line is None:
break

if line:
logger.info(line)

if buffer:
logger.info(buffer.decode())


class SubprocessLauncher(Launcher):
Expand Down

0 comments on commit e375c1e

Please sign in to comment.