-
Notifications
You must be signed in to change notification settings - Fork 182
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* WIP: implement the ProcessJobLaunch. * WIP: working ProcessJobLauncher impelentation. * Added k8s_launcher implementation. * Added logger for k8s_launcher.py * Added empty launcher_map check. * Added more config for K8sJobLauncher. * renamed RunProcessKey.JOB_LAUNCHER. * Separated out the JobHandleSpec. * Support for the launcher deploy image. * Changed the _get_job_launcher logic. * add more handled for the deployment_map change. * Added logging for job launcher. * Fixed extract_job_image usage. * Added job_meta for launch_job. * codestyle fix. * refactoried. * extract to use constants. * Change the JobLauncherSpec API signiture. * Added can_launch() to JobLauncherSpec. * refactor. * removed duplicate const. * removed no use import. * changed to raise NotImplementedError(). * Added job launcher support for server side. * refactored. * Changed to use event to get the job launcher. * updated K8sJobLauncher. * codestyle fix. * removed no use import. * JobReturnCode standard. * fixed the _get_job_launcher() condition logic. * refactored. * refactored. * Added ClientK8sJobLauncher. * Added ServerK8sJobLauncher. * Fixed missing args. * Fixed get_set_list(). * refactored. * fixed the missing client_name in the workspace object. * codestyle fix. * removed no use constant. * use a new fl_ctx for get_job_launcher to solve the thread safe issue. * changed to use with engine.new_context() as job_launcher_ctx: pattern. --------- Co-authored-by: Yuan-Ting Hsieh (謝沅廷) <[email protected]>
- Loading branch information
1 parent
3f7b12b
commit c25c914
Showing
13 changed files
with
315 additions
and
159 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
66 changes: 66 additions & 0 deletions
66
nvflare/app_common/job_launcher/client_process_launcher.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
import os | ||
import sys | ||
|
||
from nvflare.apis.fl_constant import FLContextKey, JobConstants | ||
from nvflare.apis.workspace import Workspace | ||
from nvflare.app_common.job_launcher.process_launcher import ProcessJobLauncher | ||
from nvflare.private.fed.utils.fed_utils import add_custom_dir_to_path | ||
|
||
|
||
class ClientProcessJobLauncher(ProcessJobLauncher): | ||
def get_command(self, launch_data, fl_ctx) -> (str, dict): | ||
new_env = os.environ.copy() | ||
workspace_obj: Workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT) | ||
args = fl_ctx.get_prop(FLContextKey.ARGS) | ||
client = fl_ctx.get_prop(FLContextKey.SITE_OBJ) | ||
job_id = launch_data.get(JobConstants.JOB_ID) | ||
server_config = fl_ctx.get_prop(FLContextKey.SERVER_CONFIG) | ||
if not server_config: | ||
raise RuntimeError(f"missing {FLContextKey.SERVER_CONFIG} in FL context") | ||
service = server_config[0].get("service", {}) | ||
if not isinstance(service, dict): | ||
raise RuntimeError(f"expect server config data to be dict but got {type(service)}") | ||
|
||
app_custom_folder = workspace_obj.get_app_custom_dir(job_id) | ||
if app_custom_folder != "": | ||
add_custom_dir_to_path(app_custom_folder, new_env) | ||
|
||
command_options = "" | ||
for t in args.set: | ||
command_options += " " + t | ||
command = ( | ||
f"{sys.executable} -m nvflare.private.fed.app.client.worker_process -m " | ||
+ args.workspace | ||
+ " -w " | ||
+ (workspace_obj.get_startup_kit_dir()) | ||
+ " -t " | ||
+ client.token | ||
+ " -d " | ||
+ client.ssid | ||
+ " -n " | ||
+ job_id | ||
+ " -c " | ||
+ client.client_name | ||
+ " -p " | ||
+ str(client.cell.get_internal_listener_url()) | ||
+ " -g " | ||
+ service.get("target") | ||
+ " -scheme " | ||
+ service.get("scheme", "grpc") | ||
+ " -s fed_client.json " | ||
" --set" + command_options + " print_conf=True" | ||
) | ||
return command, new_env |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
71 changes: 71 additions & 0 deletions
71
nvflare/app_common/job_launcher/server_process_launcher.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
import os | ||
import sys | ||
|
||
from nvflare.apis.fl_constant import FLContextKey, JobConstants | ||
from nvflare.apis.workspace import Workspace | ||
from nvflare.app_common.job_launcher.process_launcher import ProcessJobLauncher | ||
from nvflare.private.fed.utils.fed_utils import add_custom_dir_to_path | ||
|
||
|
||
class ServerProcessJobLauncher(ProcessJobLauncher): | ||
def get_command(self, launch_data, fl_ctx) -> (str, dict): | ||
new_env = os.environ.copy() | ||
|
||
workspace_obj: Workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT) | ||
args = fl_ctx.get_prop(FLContextKey.ARGS) | ||
server = fl_ctx.get_prop(FLContextKey.SITE_OBJ) | ||
job_id = launch_data.get(JobConstants.JOB_ID) | ||
restore_snapshot = fl_ctx.get_prop(FLContextKey.SNAPSHOT, False) | ||
|
||
app_root = workspace_obj.get_app_dir(job_id) | ||
cell = server.cell | ||
server_state = server.server_state | ||
|
||
app_custom_folder = workspace_obj.get_app_custom_dir(job_id) | ||
if app_custom_folder != "": | ||
add_custom_dir_to_path(app_custom_folder, new_env) | ||
|
||
command_options = "" | ||
for t in args.set: | ||
command_options += " " + t | ||
|
||
command = ( | ||
sys.executable | ||
+ " -m nvflare.private.fed.app.server.runner_process -m " | ||
+ args.workspace | ||
+ " -s fed_server.json -r " | ||
+ app_root | ||
+ " -n " | ||
+ str(job_id) | ||
+ " -p " | ||
+ str(cell.get_internal_listener_url()) | ||
+ " -u " | ||
+ str(cell.get_root_url_for_child()) | ||
+ " --host " | ||
+ str(server_state.host) | ||
+ " --port " | ||
+ str(server_state.service_port) | ||
+ " --ssid " | ||
+ str(server_state.ssid) | ||
+ " --ha_mode " | ||
+ str(server.ha_mode) | ||
+ " --set" | ||
+ command_options | ||
+ " print_conf=True restore_snapshot=" | ||
+ str(restore_snapshot) | ||
) | ||
|
||
return command, new_env |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.