Skip to content

Commit

Permalink
Merge branch 'NVIDIA:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
falibabaei authored Nov 15, 2024
2 parents a183869 + c133f37 commit 27d16b1
Show file tree
Hide file tree
Showing 15 changed files with 323 additions and 167 deletions.
1 change: 1 addition & 0 deletions nvflare/apis/fl_constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ class FLContextKey(object):
RECONNECTED_CLIENT_NAME = "_reconnected_client_name"
SITE_OBJ = "_site_obj_"
JOB_LAUNCHER = "_job_launcher"
SNAPSHOT = "job_snapshot"

CLIENT_REGISTER_DATA = "_client_register_data"
SECURITY_ITEMS = "_security_items"
Expand Down
66 changes: 66 additions & 0 deletions nvflare/app_common/job_launcher/client_process_launcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys

from nvflare.apis.fl_constant import FLContextKey, JobConstants
from nvflare.apis.workspace import Workspace
from nvflare.app_common.job_launcher.process_launcher import ProcessJobLauncher
from nvflare.private.fed.utils.fed_utils import add_custom_dir_to_path


class ClientProcessJobLauncher(ProcessJobLauncher):
    """Process-based job launcher for a client site.

    Provides the command line and environment used to start the client
    worker process (``nvflare.private.fed.app.client.worker_process``).
    """

    def get_command(self, launch_data, fl_ctx) -> (str, dict):
        """Build the client worker-process command and its environment.

        Args:
            launch_data: job launch data; the job ID is read from ``JobConstants.JOB_ID``.
            fl_ctx: FLContext holding the workspace object, parsed args, site object
                and server config.

        Returns:
            A tuple of (command string, environment dict).

        Raises:
            RuntimeError: if the server config is missing, its "service" entry is not
                a dict, or the "service" entry has no "target".
        """
        workspace_obj: Workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)
        args = fl_ctx.get_prop(FLContextKey.ARGS)
        client = fl_ctx.get_prop(FLContextKey.SITE_OBJ)
        job_id = launch_data.get(JobConstants.JOB_ID)

        server_config = fl_ctx.get_prop(FLContextKey.SERVER_CONFIG)
        if not server_config:
            raise RuntimeError(f"missing {FLContextKey.SERVER_CONFIG} in FL context")
        service = server_config[0].get("service", {})
        if not isinstance(service, dict):
            raise RuntimeError(f"expect server config data to be dict but got {type(service)}")

        # Fail with a clear message instead of a TypeError from str + None below.
        target = service.get("target")
        if not target:
            raise RuntimeError(f"missing 'target' in service of {FLContextKey.SERVER_CONFIG}")

        new_env = os.environ.copy()
        app_custom_folder = workspace_obj.get_app_custom_dir(job_id)
        if app_custom_folder != "":
            add_custom_dir_to_path(app_custom_folder, new_env)

        # Each --set option is prefixed with a single space, as before.
        command_options = "".join(" " + t for t in args.set)

        command = (
            f"{sys.executable} -m nvflare.private.fed.app.client.worker_process -m "
            + args.workspace
            + " -w "
            + workspace_obj.get_startup_kit_dir()
            + " -t "
            + client.token
            + " -d "
            + client.ssid
            + " -n "
            + job_id
            + " -c "
            + client.client_name
            + " -p "
            + str(client.cell.get_internal_listener_url())
            + " -g "
            + target
            + " -scheme "
            + service.get("scheme", "grpc")
            + " -s fed_client.json "
            + " --set"
            + command_options
            + " print_conf=True"
        )
        return command, new_env
64 changes: 18 additions & 46 deletions nvflare/app_common/job_launcher/process_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@
import os
import shlex
import subprocess
import sys
from abc import abstractmethod

from nvflare.apis.event_type import EventType
from nvflare.apis.fl_constant import FLContextKey
from nvflare.apis.fl_context import FLContext
from nvflare.apis.job_def import JobMetaKey
from nvflare.apis.job_launcher_spec import JobHandleSpec, JobLauncherSpec, JobReturnCode, add_launcher
from nvflare.apis.workspace import Workspace
from nvflare.private.fed.utils.fed_utils import add_custom_dir_to_path, extract_job_image
from nvflare.private.fed.utils.fed_utils import extract_job_image

JOB_RETURN_CODE_MAPPING = {0: JobReturnCode.SUCCESS, 1: JobReturnCode.EXECUTION_ERROR, 9: JobReturnCode.ABORTED}

Expand Down Expand Up @@ -64,51 +62,11 @@ def __init__(self):

def launch_job(self, job_meta: dict, fl_ctx: FLContext) -> JobHandleSpec:

new_env = os.environ.copy()
workspace_obj: Workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)
args = fl_ctx.get_prop(FLContextKey.ARGS)
client = fl_ctx.get_prop(FLContextKey.SITE_OBJ)
job_id = job_meta.get(JobMetaKey.JOB_ID)
server_config = fl_ctx.get_prop(FLContextKey.SERVER_CONFIG)
if not server_config:
raise RuntimeError(f"missing {FLContextKey.SERVER_CONFIG} in FL context")
service = server_config[0].get("service", {})
if not isinstance(service, dict):
raise RuntimeError(f"expect server config data to be dict but got {type(service)}")

app_custom_folder = workspace_obj.get_app_custom_dir(job_id)
if app_custom_folder != "":
add_custom_dir_to_path(app_custom_folder, new_env)

command_options = ""
for t in args.set:
command_options += " " + t
command = (
f"{sys.executable} -m nvflare.private.fed.app.client.worker_process -m "
+ args.workspace
+ " -w "
+ (workspace_obj.get_startup_kit_dir())
+ " -t "
+ client.token
+ " -d "
+ client.ssid
+ " -n "
+ job_id
+ " -c "
+ client.client_name
+ " -p "
+ str(client.cell.get_internal_listener_url())
+ " -g "
+ service.get("target")
+ " -scheme "
+ service.get("scheme", "grpc")
+ " -s fed_client.json "
" --set" + command_options + " print_conf=True"
)
command, new_env = self.get_command(job_meta, fl_ctx)
# use os.setsid to create new process group ID
process = subprocess.Popen(shlex.split(command, True), preexec_fn=os.setsid, env=new_env)

self.logger.info("Worker child process ID: {}".format(process.pid))
self.logger.info("Launch the job in process ID: {}".format(process.pid))

return ProcessHandle(process)

Expand All @@ -118,3 +76,17 @@ def handle_event(self, event_type: str, fl_ctx: FLContext):
job_image = extract_job_image(job_meta, fl_ctx.get_identity_name())
if not job_image:
add_launcher(self, fl_ctx)

@abstractmethod
def get_command(self, launch_data, fl_ctx) -> (str, dict):
    """Generate the command used to launch the job in a sub-process.

    Args:
        launch_data: job launcher data
        fl_ctx: FLContext

    Returns:
        launch command, environment dict
    """
71 changes: 71 additions & 0 deletions nvflare/app_common/job_launcher/server_process_launcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys

from nvflare.apis.fl_constant import FLContextKey, JobConstants
from nvflare.apis.workspace import Workspace
from nvflare.app_common.job_launcher.process_launcher import ProcessJobLauncher
from nvflare.private.fed.utils.fed_utils import add_custom_dir_to_path


class ServerProcessJobLauncher(ProcessJobLauncher):
    """Process-based job launcher for the server site.

    Provides the command line and environment used to start the server
    runner process (``nvflare.private.fed.app.server.runner_process``).
    """

    def get_command(self, launch_data, fl_ctx) -> (str, dict):
        """Build the server runner-process command and its environment.

        Args:
            launch_data: job launch data; the job ID is read from ``JobConstants.JOB_ID``.
            fl_ctx: FLContext holding the workspace object, parsed args, site object
                and the optional snapshot flag (``FLContextKey.SNAPSHOT``).

        Returns:
            A tuple of (command string, environment dict).
        """
        workspace_obj: Workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)
        args = fl_ctx.get_prop(FLContextKey.ARGS)
        server = fl_ctx.get_prop(FLContextKey.SITE_OBJ)
        job_id = launch_data.get(JobConstants.JOB_ID)
        restore_snapshot = fl_ctx.get_prop(FLContextKey.SNAPSHOT, False)

        app_root = workspace_obj.get_app_dir(job_id)
        cell = server.cell
        server_state = server.server_state

        # Start from the current environment and add the job's custom dir, if any.
        env = os.environ.copy()
        custom_dir = workspace_obj.get_app_custom_dir(job_id)
        if custom_dir != "":
            add_custom_dir_to_path(custom_dir, env)

        extra_options = "".join(" " + option for option in args.set)

        # Fragments are joined verbatim, so spacing lives in the literals.
        fragments = [
            sys.executable,
            " -m nvflare.private.fed.app.server.runner_process -m ",
            args.workspace,
            " -s fed_server.json -r ",
            app_root,
            " -n ",
            str(job_id),
            " -p ",
            str(cell.get_internal_listener_url()),
            " -u ",
            str(cell.get_root_url_for_child()),
            " --host ",
            str(server_state.host),
            " --port ",
            str(server_state.service_port),
            " --ssid ",
            str(server_state.ssid),
            " --ha_mode ",
            str(server.ha_mode),
            " --set",
            extra_options,
            " print_conf=True restore_snapshot=",
            str(restore_snapshot),
        ]
        return "".join(fragments), env
129 changes: 104 additions & 25 deletions nvflare/app_opt/job_launcher/k8s_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.
import logging
import time
from abc import abstractmethod
from enum import Enum

from kubernetes import config
Expand Down Expand Up @@ -82,7 +83,7 @@ def __init__(self, job_id: str, api_instance: core_v1_api, job_config: dict, nam
"imagePullPolicy": "Always",
}
]
self.container_args_python_args_list = ["-u", "-m", "nvflare.private.fed.app.client.worker_process"]
self.container_args_python_args_list = ["-u", "-m", job_config.get("command")]
self.container_args_module_args_dict = {
"-m": None,
"-w": None,
Expand Down Expand Up @@ -218,39 +219,19 @@ def __init__(

def launch_job(self, job_meta: dict, fl_ctx: FLContext) -> JobHandleSpec:

workspace_obj: Workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)
args = fl_ctx.get_prop(FLContextKey.ARGS)
client = fl_ctx.get_prop(FLContextKey.SITE_OBJ)
job_id = job_meta.get(JobConstants.JOB_ID)
server_config = fl_ctx.get_prop(FLContextKey.SERVER_CONFIG)
if not server_config:
raise RuntimeError(f"missing {FLContextKey.SERVER_CONFIG} in FL context")
service = server_config[0].get("service", {})
if not isinstance(service, dict):
raise RuntimeError(f"expect server config data to be dict but got {type(service)}")

self.logger.info(f"K8sJobLauncher start to launch job: {job_id} for client: {client.client_name}")
args = fl_ctx.get_prop(FLContextKey.ARGS)
job_image = extract_job_image(job_meta, fl_ctx.get_identity_name())
self.logger.info(f"launch job use image: {job_image}")
job_config = {
"name": job_id,
"image": job_image,
"container_name": f"container-{job_id}",
"command": self.get_command(),
"volume_mount_list": [{"name": self.workspace, "mountPath": self.mount_path}],
"volume_list": [{"name": self.workspace, "hostPath": {"path": self.root_hostpath, "type": "Directory"}}],
"module_args": {
"-m": args.workspace,
"-w": (workspace_obj.get_startup_kit_dir()),
"-t": client.token,
"-d": client.ssid,
"-n": job_id,
"-c": client.client_name,
"-p": "tcp://parent-pod:8004",
"-g": service.get("target"),
"-scheme": service.get("scheme", "grpc"),
"-s": "fed_client.json",
},
"set_list": args.set,
"module_args": self.get_module_args(job_id, fl_ctx),
"set_list": self.get_set_list(args, fl_ctx),
}

self.logger.info(f"launch job with k8s_launcher. Job_id:{job_id}")
Expand All @@ -273,3 +254,101 @@ def handle_event(self, event_type: str, fl_ctx: FLContext):
job_image = extract_job_image(job_meta, fl_ctx.get_identity_name())
if job_image:
add_launcher(self, fl_ctx)

@abstractmethod
def get_command(self):
    """Return the run command of the launcher.

    Returns:
        the command for the launcher process
    """

@abstractmethod
def get_module_args(self, job_id, fl_ctx: FLContext):
    """Return the args used to run the launcher.

    Args:
        job_id: run job_id
        fl_ctx: FLContext

    Returns:
        the module args; ``launch_job`` stores them under the "module_args"
        key of the job config
    """

@abstractmethod
def get_set_list(self, args, fl_ctx: FLContext):
    """Return the command set_list.

    Args:
        args: command args
        fl_ctx: FLContext

    Returns:
        set_list command options
    """


class ClientK8sJobLauncher(K8sJobLauncher):
    """K8s job launcher for a client site; runs the client worker process module."""

    def get_command(self):
        """Return the Python module that the job container runs."""
        return "nvflare.private.fed.app.client.worker_process"

    def get_module_args(self, job_id, fl_ctx: FLContext):
        """Build the option-to-value mapping passed to the client worker process.

        Args:
            job_id: run job_id
            fl_ctx: FLContext holding the workspace object, parsed args, site object
                and server config.

        Returns:
            dict mapping command-line option names to their values.

        Raises:
            RuntimeError: if the server config is missing or its "service" entry is
                not a dict.
        """
        workspace_obj: Workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)
        args = fl_ctx.get_prop(FLContextKey.ARGS)
        client = fl_ctx.get_prop(FLContextKey.SITE_OBJ)
        server_config = fl_ctx.get_prop(FLContextKey.SERVER_CONFIG)
        if not server_config:
            raise RuntimeError(f"missing {FLContextKey.SERVER_CONFIG} in FL context")
        service = server_config[0].get("service", {})
        if not isinstance(service, dict):
            raise RuntimeError(f"expect server config data to be dict but got {type(service)}")
        self.logger.info(f"K8sJobLauncher start to launch job: {job_id} for client: {client.client_name}")

        return {
            "-m": args.workspace,
            "-w": workspace_obj.get_startup_kit_dir(),
            "-t": client.token,
            "-d": client.ssid,
            "-n": job_id,
            "-c": client.client_name,
            "-p": str(client.cell.get_internal_listener_url()),
            "-g": service.get("target"),
            "-scheme": service.get("scheme", "grpc"),
            "-s": "fed_client.json",
        }

    def get_set_list(self, args, fl_ctx: FLContext):
        """Return the ``--set`` options for the client job.

        A new list is returned so the shared ``args.set`` is not modified;
        appending in place would add another "print_conf=True" on every launch.

        Args:
            args: command args; ``args.set`` supplies the base options.
            fl_ctx: FLContext (unused here).

        Returns:
            list of set options ending with "print_conf=True".
        """
        return [*args.set, "print_conf=True"]


class ServerK8sJobLauncher(K8sJobLauncher):
    """K8s job launcher for the server site; runs the server runner process module."""

    def get_command(self):
        """Return the Python module that the job container runs."""
        return "nvflare.private.fed.app.server.runner_process"

    def get_module_args(self, job_id, fl_ctx: FLContext):
        """Build the option-to-value mapping passed to the server runner process.

        Args:
            job_id: run job_id
            fl_ctx: FLContext holding the workspace object, parsed args and site object.

        Returns:
            dict mapping command-line option names to their values.
        """
        workspace_obj: Workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT)
        args = fl_ctx.get_prop(FLContextKey.ARGS)
        server = fl_ctx.get_prop(FLContextKey.SITE_OBJ)

        return {
            "-m": args.workspace,
            "-s": "fed_server.json",
            # The app dir is per job, so the job_id must be passed here,
            # matching the process-based server launcher.
            "-r": workspace_obj.get_app_dir(job_id),
            "-n": str(job_id),
            "-p": str(server.cell.get_internal_listener_url()),
            "-u": str(server.cell.get_root_url_for_child()),
            "--host": str(server.server_state.host),
            "--port": str(server.server_state.service_port),
            "--ssid": str(server.server_state.ssid),
            "--ha_mode": str(server.ha_mode),
        }

    def get_set_list(self, args, fl_ctx: FLContext):
        """Return the ``--set`` options for the server job.

        A new list is returned so the shared ``args.set`` is not modified;
        appending in place would add more entries on every launch.

        Args:
            args: command args; ``args.set`` supplies the base options.
            fl_ctx: FLContext; ``FLContextKey.SNAPSHOT`` is read (default False).

        Returns:
            list of set options ending with "print_conf=True" and
            "restore_snapshot=<value>".
        """
        restore_snapshot = fl_ctx.get_prop(FLContextKey.SNAPSHOT, False)
        return [*args.set, "print_conf=True", "restore_snapshot=" + str(restore_snapshot)]
Loading

0 comments on commit 27d16b1

Please sign in to comment.