From e375c1e0229f2d78f13a56619d2f68f77743c23c Mon Sep 17 00:00:00 2001 From: Zhihong Zhang <100308595+nvidianz@users.noreply.github.com> Date: Mon, 27 Jan 2025 16:37:01 -0500 Subject: [PATCH] [2.5] Fix subprocess read deadlock (#3182) ### Description This PR reads the subprocess stdout as binary data. It addresses the subprocess launcher deadlock issue when the output is not line-oriented or binary. ### Types of changes - [x] Non-breaking change (fix or new feature that would not break existing functionality). - [ ] Breaking change (fix or new feature that would cause existing functionality to change). - [ ] New tests added to cover the changes. - [ ] Quick tests passed locally by running `./runtest.sh`. - [ ] In-line docstrings updated. - [ ] Documentation updated. --- .../launchers/subprocess_launcher.py | 55 ++++++++++++++++++- 1 file changed, 53 insertions(+), 2 deletions(-) diff --git a/nvflare/app_common/launchers/subprocess_launcher.py b/nvflare/app_common/launchers/subprocess_launcher.py index 8b66e674c7..0444eaebd6 100644 --- a/nvflare/app_common/launchers/subprocess_launcher.py +++ b/nvflare/app_common/launchers/subprocess_launcher.py @@ -27,9 +27,60 @@ from nvflare.private.fed.utils.fed_utils import add_custom_dir_to_path +def get_line(buffer: bytearray): + """Read a line from the binary buffer. It treats all combinations of \n and \r as line breaks. + + Args: + buffer: A binary buffer + + Returns: + (line, remaining): Return the first line as str and the remaining buffer. + line is None if no newline found + + """ + size = len(buffer) + r = buffer.find(b"\r") + if r < 0: + r = size + 1 + n = buffer.find(b"\n") + if n < 0: + n = size + 1 + index = min(r, n) + + if index >= size: + return None, buffer + + # if \r and \n are adjacent, treat them as one + if abs(r - n) == 1: + index = index + 1 + + line = buffer[:index].decode().rstrip() + if index >= size - 1: + remaining = bytearray() + else: + remaining = buffer[index + 1 :] + return line, remaining + + def log_subprocess_output(process, logger): - for c in iter(process.stdout.readline, b""): - logger.info(c.decode().rstrip()) + + buffer = bytearray() + while True: + chunk = process.stdout.read1(4096) + if not chunk: + break + buffer = buffer + chunk + + while True: + line, buffer = get_line(buffer) + if line is None: + break + + if line: + logger.info(line) + + if buffer: + logger.info(buffer.decode()) class SubprocessLauncher(Launcher):