Skip to content

Commit

Permalink
Enhanced the client job status (#2247)
Browse files Browse the repository at this point in the history
* Enhanced the client job status report.

* Made notify_timeout configurable.

* codestyle fix.

* Updated changes from PR review.

* codestyle fix.
  • Loading branch information
yhwen authored Jan 4, 2024
1 parent 51022c7 commit 0a0d3ab
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 21 deletions.
2 changes: 2 additions & 0 deletions nvflare/private/defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ class TrainingTopic(object):
CANCEL_RESOURCE = "scheduler.cancel_resource"
START_JOB = "train.start_job"
GET_SCOPES = "train.get_scopes"
NOTIFY_JOB_STATUS = "train.notify_job_status"


class RequestHeader(object):

JOB_ID = "job_id"
JOB_STATUS = "job_status"
TOPIC = "topic"
JOB_META = "job_meta"
APP_NAME = "app_name"
Expand Down
27 changes: 25 additions & 2 deletions nvflare/private/fed/client/client_app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
import logging
import os

from nvflare.apis.fl_constant import FLContextKey
from nvflare.apis.fl_constant import FLContextKey, SystemConfigs
from nvflare.apis.fl_context import FLContext
from nvflare.apis.workspace import Workspace
from nvflare.private.defs import EngineConstant
from nvflare.fuel.utils.config_service import ConfigService
from nvflare.private.admin_defs import Message
from nvflare.private.defs import CellChannel, EngineConstant, RequestHeader, TrainingTopic, new_cell_message
from nvflare.private.fed.app.fl_conf import create_privacy_manager
from nvflare.private.fed.client.client_json_config import ClientJsonConfigurator
from nvflare.private.fed.client.client_run_manager import ClientRunManager
Expand Down Expand Up @@ -53,8 +55,16 @@ def start_run(self, app_root, args, config_folder, federated_client, secure_trai
self.sync_up_parents_process(federated_client)

federated_client.start_overseer_agent()
notify_timeout = ConfigService.get_float_var(
name="notify_timeout", conf=SystemConfigs.APPLICATION_CONF, default=5.0
)
self.notify_job_status(federated_client, args.job_id, ClientStatus.STARTED, timeout=notify_timeout)
federated_client.status = ClientStatus.STARTED

self.client_runner.run(app_root, args)

self.notify_job_status(federated_client, args.job_id, ClientStatus.STOPPED, timeout=notify_timeout)
federated_client.status = ClientStatus.STOPPED
federated_client.stop_cell()

@staticmethod
Expand Down Expand Up @@ -127,6 +137,19 @@ def sync_up_parents_process(self, federated_client):
with run_manager.new_context() as fl_ctx:
run_manager.get_all_clients_from_server(fl_ctx)

def notify_job_status(self, federated_client, job_id, status, timeout=5.0):
    """Send a job status notification request through the client's cell.

    Builds a ``TrainingTopic.NOTIFY_JOB_STATUS`` message with an empty body,
    stores the job id (converted to ``str``) and the status in its headers,
    and sends it as a request addressed to ``federated_client.client_name``
    on ``CellChannel.CLIENT_MAIN``. The reply of the request is not inspected.

    Args:
        federated_client: object exposing ``cell`` and ``client_name``
        job_id: id of the job whose status is being reported
        status: the status value to report
        timeout: forwarded to ``send_request`` (presumably seconds -- confirm
            against the cell API)
    """
    status_msg = Message(topic=TrainingTopic.NOTIFY_JOB_STATUS, body="")
    header_values = (
        (RequestHeader.JOB_ID, str(job_id)),
        (RequestHeader.JOB_STATUS, status),
    )
    for header_name, header_value in header_values:
        status_msg.set_header(header_name, header_value)

    cell_request = new_cell_message({}, status_msg)
    federated_client.cell.send_request(
        target=federated_client.client_name,
        channel=CellChannel.CLIENT_MAIN,
        topic=status_msg.topic,
        request=cell_request,
        timeout=timeout,
    )

def close(self):
if self.command_agent:
self.command_agent.shutdown()
Expand Down
3 changes: 3 additions & 0 deletions nvflare/private/fed/client/client_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ def start_app(

return "Start the client app..."

def notify_job_status(self, job_id: str, job_status):
    """Forward a job's new status to this engine's client executor.

    Args:
        job_id: id of the job whose status changed
        job_status: the new status value
    """
    executor = self.client_executor
    executor.notify_job_status(job_id, job_status)

def get_client_name(self):
    """Return the ``client_name`` attribute of the client held by this engine."""
    client = self.client
    return client.client_name

Expand Down
13 changes: 13 additions & 0 deletions nvflare/private/fed/client/client_engine_internal_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,19 @@ def start_app(
"""
pass

@abstractmethod
def notify_job_status(self, job_id: str, job_status):
    """Inform the engine of a client job's new status.

    Args:
        job_id: id of the job whose status changed
        job_status: the job's new client-side status

    """

@abstractmethod
def abort_app(self, job_id: str) -> str:
"""Aborts the app execution for the specified run.
Expand Down
28 changes: 9 additions & 19 deletions nvflare/private/fed/client/client_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def start_app(
RunProcessKey.LISTEN_PORT: listen_port,
RunProcessKey.CONNECTION: None,
RunProcessKey.CHILD_PROCESS: process,
RunProcessKey.STATUS: ClientStatus.STARTED,
RunProcessKey.STATUS: ClientStatus.STARTING,
}

thread = threading.Thread(
Expand All @@ -215,6 +215,12 @@ def start_app(
)
thread.start()

def notify_job_status(self, job_id, job_status):
    """Record a new status on the run-process entry stored for ``job_id``.

    The lookup and update happen while holding ``self.lock``. When
    ``self.run_processes`` has no entry (or only a falsy one) for ``job_id``,
    nothing is changed.

    Args:
        job_id: key into ``self.run_processes``
        job_status: value stored under ``RunProcessKey.STATUS``
    """
    with self.lock:
        entry = self.run_processes.get(job_id)
        if not entry:
            return
        entry[RunProcessKey.STATUS] = job_status

def check_status(self, job_id):
"""Checks the status of the running client.
Expand All @@ -226,24 +232,8 @@ def check_status(self, job_id):
"""
try:
with self.lock:
data = {}
fqcn = FQCN.join([self.client.client_name, job_id])
request = new_cell_message({}, data)
return_data = self.client.cell.send_request(
target=fqcn,
channel=CellChannel.CLIENT_COMMAND,
topic=AdminCommandNames.CHECK_STATUS,
request=request,
optional=True,
)
return_code = return_data.get_header(MessageHeaderKey.RETURN_CODE)
if return_code == ReturnCode.OK:
status_message = return_data.payload
self.logger.debug("check status from process listener......")
return status_message
else:
process_status = ClientStatus.NOT_STARTED
return get_status_message(process_status)
process_status = self.run_processes.get(job_id, {}).get(RunProcessKey.STATUS, ClientStatus.NOT_STARTED)
return get_status_message(process_status)
except Exception as e:
self.logger.error(f"check_status execution exception: {secure_format_exception(e)}.")
secure_log_traceback()
Expand Down
2 changes: 2 additions & 0 deletions nvflare/private/fed/client/client_req_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
ClientStatusProcessor,
DeleteRunNumberProcessor,
DeployProcessor,
NotifyJobStatusProcessor,
RestartClientProcessor,
ScopeInfoProcessor,
ShutdownClientProcessor,
Expand Down Expand Up @@ -50,6 +51,7 @@ class ClientRequestProcessors:
CancelResourceProcessor(),
ReportResourcesProcessor(),
ReportEnvProcessor(),
NotifyJobStatusProcessor(),
]

@staticmethod
Expand Down
14 changes: 14 additions & 0 deletions nvflare/private/fed/client/training_cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,17 @@ def process(self, req: Message, app_ctx) -> Message:
result = {ScopeInfoKey.SCOPE_NAMES: scope_names, ScopeInfoKey.DEFAULT_SCOPE: default_scope_name}
result = json.dumps(result)
return ok_reply(topic=f"reply_{req.topic}", body=result)


class NotifyJobStatusProcessor(RequestProcessor):
    """Request processor for ``TrainingTopic.NOTIFY_JOB_STATUS`` messages."""

    def get_topics(self) -> [str]:
        """Return the single topic handled by this processor."""
        handled_topics = [TrainingTopic.NOTIFY_JOB_STATUS]
        return handled_topics

    def process(self, req: Message, app_ctx) -> Message:
        """Hand the job id and status from the request headers to the engine.

        Args:
            req: request carrying ``RequestHeader.JOB_ID`` and
                ``RequestHeader.JOB_STATUS`` headers
            app_ctx: the application context; must be a ``ClientEngineInternalSpec``

        Returns:
            An ok reply on topic ``reply_<request topic>`` whose body echoes
            the notified status.

        Raises:
            TypeError: if ``app_ctx`` is not a ``ClientEngineInternalSpec``
        """
        if not isinstance(app_ctx, ClientEngineInternalSpec):
            raise TypeError("engine must be ClientEngineInternalSpec, but got {}".format(type(app_ctx)))

        reported_job = req.get_header(RequestHeader.JOB_ID)
        job_status = req.get_header(RequestHeader.JOB_STATUS)
        app_ctx.notify_job_status(reported_job, job_status)

        reply_topic = f"reply_{req.topic}"
        return ok_reply(topic=reply_topic, body=f"notify status: {job_status}")

0 comments on commit 0a0d3ab

Please sign in to comment.