From 94544fe3101059d8b8deae379384ebc512b66850 Mon Sep 17 00:00:00 2001 From: YuanTingHsieh Date: Wed, 29 Jan 2025 21:45:37 -0800 Subject: [PATCH] Fix SubprocessLauncher racing --- .../app_common/executors/launcher_executor.py | 9 ++- .../launchers/subprocess_launcher.py | 75 ++++++++++--------- 2 files changed, 46 insertions(+), 38 deletions(-) diff --git a/nvflare/app_common/executors/launcher_executor.py b/nvflare/app_common/executors/launcher_executor.py index b7540021f3..441153e5d5 100644 --- a/nvflare/app_common/executors/launcher_executor.py +++ b/nvflare/app_common/executors/launcher_executor.py @@ -129,6 +129,7 @@ def initialize(self, fl_ctx: FLContext) -> None: def finalize(self, fl_ctx: FLContext) -> None: self._execute_launcher_method_in_thread_executor(method_name="finalize", fl_ctx=fl_ctx) + self._thread_pool_executor.shutdown() def handle_event(self, event_type: str, fl_ctx: FLContext) -> None: if event_type == EventType.START_RUN: @@ -295,7 +296,7 @@ def _wait_external_setup(self, task_name: str, fl_ctx: FLContext, abort_signal: run_status = self.launcher.check_run_status(task_name, fl_ctx) if run_status != LauncherRunStatus.RUNNING: - self.log_info( + self.log_error( fl_ctx, f"External process has not called flare.init and run status becomes {run_status}." ) return False @@ -316,7 +317,7 @@ def _finalize_external_execution( fl_ctx=fl_ctx, ) if not self._received_result.is_set() and check_run_status != LauncherRunStatus.COMPLETE_SUCCESS: - self.log_warning(fl_ctx, f"Try to stop task ({task_name}) when launcher run status is {check_run_status}") + self.log_debug(fl_ctx, f"Try to stop task ({task_name}) when launcher run status is {check_run_status}") self.log_info(fl_ctx, f"Calling stop task ({task_name}).") stop_task_success = self._execute_launcher_method_in_thread_executor( @@ -407,11 +408,11 @@ def _monitor_launcher(self, fl_ctx: FLContext): self._launcher_finish = True self.log_info( fl_ctx, - f"launcher completed {task_name} with status {run_status} at time {self._launcher_finish_time}", + f"launcher completed with status {run_status} at time {self._launcher_finish_time}", ) if run_status == LauncherRunStatus.COMPLETE_FAILED: - msg = f"Launcher failed with at time {self._launcher_finish_time} " + msg = f"Launcher failed at time {self._launcher_finish_time} " self._abort_signal.trigger(msg) break diff --git a/nvflare/app_common/launchers/subprocess_launcher.py b/nvflare/app_common/launchers/subprocess_launcher.py index 547cea6a87..0944dbff39 100644 --- a/nvflare/app_common/launchers/subprocess_launcher.py +++ b/nvflare/app_common/launchers/subprocess_launcher.py @@ -15,7 +15,7 @@ import os import shlex import subprocess -from threading import Thread +from threading import Lock, Thread from typing import Optional from nvflare.apis.fl_constant import FLContextKey @@ -108,6 +108,7 @@ def __init__( self._launch_once = launch_once self._clean_up_script = clean_up_script self._shutdown_timeout = shutdown_timeout + self._lock = Lock() self.logger = get_obj_logger(self) def initialize(self, fl_ctx: FLContext): @@ -129,40 +130,46 @@ def stop_task(self, task_name: str, fl_ctx: FLContext, abort_signal: Signal) -> self._stop_external_process() def _start_external_process(self, fl_ctx: FLContext): - if self._process is None: - command = self._script - env = os.environ.copy() - env["CLIENT_API_TYPE"] = "EX_PROCESS_API" - - workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT) - job_id = fl_ctx.get_prop(FLContextKey.CURRENT_JOB_ID) - app_custom_folder = workspace.get_app_custom_dir(job_id) - add_custom_dir_to_path(app_custom_folder, env) - - command_seq = shlex.split(command) - self._process = subprocess.Popen( - command_seq, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=self._app_dir, env=env - ) - self._log_thread = Thread(target=log_subprocess_output, args=(self._process, self.logger)) - self._log_thread.start() + with self._lock: + if self._process is None: + command = self._script + env = os.environ.copy() + env["CLIENT_API_TYPE"] = "EX_PROCESS_API" + + workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT) + job_id = fl_ctx.get_prop(FLContextKey.CURRENT_JOB_ID) + app_custom_folder = workspace.get_app_custom_dir(job_id) + add_custom_dir_to_path(app_custom_folder, env) + + command_seq = shlex.split(command) + self._process = subprocess.Popen( + command_seq, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=self._app_dir, env=env + ) + self._log_thread = Thread(target=log_subprocess_output, args=(self._process, self.logger)) + self._log_thread.start() def _stop_external_process(self): - if self._process: - self._process.wait(self._shutdown_timeout) - self._process.terminate() - self._log_thread.join() - if self._clean_up_script: - command_seq = shlex.split(self._clean_up_script) - process = subprocess.Popen(command_seq, cwd=self._app_dir) - process.wait() - self._process = None + with self._lock: + if self._process: + try: + self._process.wait(self._shutdown_timeout) + except subprocess.TimeoutExpired: + pass + self._process.terminate() + self._log_thread.join() + if self._clean_up_script: + command_seq = shlex.split(self._clean_up_script) + process = subprocess.Popen(command_seq, cwd=self._app_dir) + process.wait() + self._process = None def check_run_status(self, task_name: str, fl_ctx: FLContext) -> str: - if self._process is None: - return LauncherRunStatus.NOT_RUNNING - return_code = self._process.poll() - if return_code is None: - return LauncherRunStatus.RUNNING - if return_code == 0: - return LauncherRunStatus.COMPLETE_SUCCESS - return LauncherRunStatus.COMPLETE_FAILED + with self._lock: + if self._process is None: + return LauncherRunStatus.NOT_RUNNING + return_code = self._process.poll() + if return_code is None: + return LauncherRunStatus.RUNNING + if return_code == 0: + return LauncherRunStatus.COMPLETE_SUCCESS + return LauncherRunStatus.COMPLETE_FAILED