From b8ad2b58dd0db6b0e66197e6cbfa6760a4585030 Mon Sep 17 00:00:00 2001 From: Zhihong Zhang Date: Tue, 28 Jan 2025 14:50:42 -0500 Subject: [PATCH] Fixed the subprocess read deadlock --- .../launchers/subprocess_launcher.py | 58 ++++++++++++++++++- 1 file changed, 55 insertions(+), 3 deletions(-) diff --git a/nvflare/app_common/launchers/subprocess_launcher.py b/nvflare/app_common/launchers/subprocess_launcher.py index faf6151f38..8ba13f984f 100644 --- a/nvflare/app_common/launchers/subprocess_launcher.py +++ b/nvflare/app_common/launchers/subprocess_launcher.py @@ -18,18 +18,70 @@ from threading import Thread from typing import Optional +from nvflare.utils.job_launcher_utils import add_custom_dir_to_path + from nvflare.apis.fl_constant import FLContextKey from nvflare.apis.fl_context import FLContext from nvflare.apis.shareable import Shareable from nvflare.apis.signal import Signal from nvflare.app_common.abstract.launcher import Launcher, LauncherRunStatus from nvflare.fuel.utils.log_utils import get_obj_logger -from nvflare.utils.job_launcher_utils import add_custom_dir_to_path + + +def get_line(buffer: bytearray): + """Read a line from the binary buffer. It treats all combinations of \n and \r as line breaks. + + Args: + buffer: A binary buffer + + Returns: + (line, remaining): Return the first line as str and the remaining buffer. + line is None if no newline found + + """ + size = len(buffer) + r = buffer.find(b"\r") + if r < 0: + r = size + 1 + n = buffer.find(b"\n") + if n < 0: + n = size + 1 + index = min(r, n) + + if index >= size: + return None, buffer + + # if \r and \n are adjacent, treat them as one + if abs(r - n) == 1: + index = index + 1 + + line = buffer[:index].decode().rstrip() + if index >= size - 1: + remaining = bytearray() + else: + remaining = buffer[index + 1 :] + return line, remaining def log_subprocess_output(process, logger): - for c in iter(process.stdout.readline, b""): - logger.info(c.decode().rstrip()) + + buffer = bytearray() + while True: + chunk = process.stdout.read1(4096) + if not chunk: + break + buffer = buffer + chunk + + while True: + line, buffer = get_line(buffer) + if line is None: + break + + if line: + logger.info(line) + + if buffer: + logger.info(buffer.decode()) class SubprocessLauncher(Launcher):