Skip to content

Commit

Permalink
Fix SubprocessLauncher racing
Browse files Browse the repository at this point in the history
  • Loading branch information
YuanTingHsieh committed Jan 30, 2025
1 parent 3822436 commit 94544fe
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 38 deletions.
9 changes: 5 additions & 4 deletions nvflare/app_common/executors/launcher_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def initialize(self, fl_ctx: FLContext) -> None:

def finalize(self, fl_ctx: FLContext) -> None:
self._execute_launcher_method_in_thread_executor(method_name="finalize", fl_ctx=fl_ctx)
self._thread_pool_executor.shutdown()

def handle_event(self, event_type: str, fl_ctx: FLContext) -> None:
if event_type == EventType.START_RUN:
Expand Down Expand Up @@ -295,7 +296,7 @@ def _wait_external_setup(self, task_name: str, fl_ctx: FLContext, abort_signal:

run_status = self.launcher.check_run_status(task_name, fl_ctx)
if run_status != LauncherRunStatus.RUNNING:
self.log_info(
self.log_error(
fl_ctx, f"External process has not called flare.init and run status becomes {run_status}."
)
return False
Expand All @@ -316,7 +317,7 @@ def _finalize_external_execution(
fl_ctx=fl_ctx,
)
if not self._received_result.is_set() and check_run_status != LauncherRunStatus.COMPLETE_SUCCESS:
self.log_warning(fl_ctx, f"Try to stop task ({task_name}) when launcher run status is {check_run_status}")
self.log_debug(fl_ctx, f"Try to stop task ({task_name}) when launcher run status is {check_run_status}")

self.log_info(fl_ctx, f"Calling stop task ({task_name}).")
stop_task_success = self._execute_launcher_method_in_thread_executor(
Expand Down Expand Up @@ -407,11 +408,11 @@ def _monitor_launcher(self, fl_ctx: FLContext):
self._launcher_finish = True
self.log_info(
fl_ctx,
f"launcher completed {task_name} with status {run_status} at time {self._launcher_finish_time}",
f"launcher completed with status {run_status} at time {self._launcher_finish_time}",
)

if run_status == LauncherRunStatus.COMPLETE_FAILED:
msg = f"Launcher failed with at time {self._launcher_finish_time} "
msg = f"Launcher failed at time {self._launcher_finish_time} "
self._abort_signal.trigger(msg)
break

Expand Down
75 changes: 41 additions & 34 deletions nvflare/app_common/launchers/subprocess_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import os
import shlex
import subprocess
from threading import Thread
from threading import Lock, Thread
from typing import Optional

from nvflare.apis.fl_constant import FLContextKey
Expand Down Expand Up @@ -108,6 +108,7 @@ def __init__(
self._launch_once = launch_once
self._clean_up_script = clean_up_script
self._shutdown_timeout = shutdown_timeout
self._lock = Lock()
self.logger = get_obj_logger(self)

def initialize(self, fl_ctx: FLContext):
Expand All @@ -129,40 +130,46 @@ def stop_task(self, task_name: str, fl_ctx: FLContext, abort_signal: Signal) ->
self._stop_external_process()

def _start_external_process(self, fl_ctx: FLContext):
if self._process is None:
command = self._script
env = os.environ.copy()
env["CLIENT_API_TYPE"] = "EX_PROCESS_API"

workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)
job_id = fl_ctx.get_prop(FLContextKey.CURRENT_JOB_ID)
app_custom_folder = workspace.get_app_custom_dir(job_id)
add_custom_dir_to_path(app_custom_folder, env)

command_seq = shlex.split(command)
self._process = subprocess.Popen(
command_seq, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=self._app_dir, env=env
)
self._log_thread = Thread(target=log_subprocess_output, args=(self._process, self.logger))
self._log_thread.start()
with self._lock:
if self._process is None:
command = self._script
env = os.environ.copy()
env["CLIENT_API_TYPE"] = "EX_PROCESS_API"

workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)
job_id = fl_ctx.get_prop(FLContextKey.CURRENT_JOB_ID)
app_custom_folder = workspace.get_app_custom_dir(job_id)
add_custom_dir_to_path(app_custom_folder, env)

command_seq = shlex.split(command)
self._process = subprocess.Popen(
command_seq, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=self._app_dir, env=env
)
self._log_thread = Thread(target=log_subprocess_output, args=(self._process, self.logger))
self._log_thread.start()

def _stop_external_process(self):
if self._process:
self._process.wait(self._shutdown_timeout)
self._process.terminate()
self._log_thread.join()
if self._clean_up_script:
command_seq = shlex.split(self._clean_up_script)
process = subprocess.Popen(command_seq, cwd=self._app_dir)
process.wait()
self._process = None
with self._lock:
if self._process:
try:
self._process.wait(self._shutdown_timeout)
except subprocess.TimeoutExpired:
pass
self._process.terminate()
self._log_thread.join()
if self._clean_up_script:
command_seq = shlex.split(self._clean_up_script)
process = subprocess.Popen(command_seq, cwd=self._app_dir)
process.wait()
self._process = None

def check_run_status(self, task_name: str, fl_ctx: FLContext) -> str:
if self._process is None:
return LauncherRunStatus.NOT_RUNNING
return_code = self._process.poll()
if return_code is None:
return LauncherRunStatus.RUNNING
if return_code == 0:
return LauncherRunStatus.COMPLETE_SUCCESS
return LauncherRunStatus.COMPLETE_FAILED
with self._lock:
if self._process is None:
return LauncherRunStatus.NOT_RUNNING
return_code = self._process.poll()
if return_code is None:
return LauncherRunStatus.RUNNING
if return_code == 0:
return LauncherRunStatus.COMPLETE_SUCCESS
return LauncherRunStatus.COMPLETE_FAILED

0 comments on commit 94544fe

Please sign in to comment.