Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add ability for clients to send error log to server #3057

Merged
merged 11 commits into from
Nov 21, 2024
2 changes: 1 addition & 1 deletion docs/resources/log.config
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ args=(sys.stdout,)
class=FileHandler
level=ERROR
formatter=fullFormatter
args=('error.log', 'a')
args=('error_log.txt', 'a')

[formatter_fullFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
1 change: 1 addition & 0 deletions nvflare/apis/fl_constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ class WorkspaceConstants:
DEFAULT_LOGGING_CONFIG = LOGGING_CONFIG + ".default"
AUDIT_LOG = "audit.log"
LOG_FILE_NAME = "log.txt"
ERROR_LOG_FILE_NAME = "error_log.txt"
STATS_POOL_SUMMARY_FILE_NAME = "stats_pool_summary.json"
STATS_POOL_RECORDS_FILE_NAME = "stats_pool_records.csv"

Expand Down
3 changes: 3 additions & 0 deletions nvflare/apis/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ def get_app_dir(self, job_id: str) -> str:
def get_app_log_file_path(self, job_id: str) -> str:
    """Return the path of the log file located in the run directory of a job.

    Args:
        job_id: ID of the job whose run directory is looked up.

    Returns:
        The run directory of the job joined with WorkspaceConstants.LOG_FILE_NAME.
    """
    run_dir = self.get_run_dir(job_id)
    return os.path.join(run_dir, WorkspaceConstants.LOG_FILE_NAME)

def get_app_error_log_file_path(self, job_id: str) -> str:
    """Return the path of the error log file located in the run directory of a job.

    Args:
        job_id: ID of the job whose run directory is looked up.

    Returns:
        The run directory of the job joined with WorkspaceConstants.ERROR_LOG_FILE_NAME.
    """
    run_dir = self.get_run_dir(job_id)
    return os.path.join(run_dir, WorkspaceConstants.ERROR_LOG_FILE_NAME)

def get_app_config_dir(self, job_id: str) -> str:
    """Return the config directory inside the app directory of a job.

    Args:
        job_id: ID of the job whose app directory is looked up.

    Returns:
        The app directory of the job joined with this object's ``config_folder``.
    """
    app_dir = self.get_app_dir(job_id)
    return os.path.join(app_dir, self.config_folder)

Expand Down
1 change: 1 addition & 0 deletions nvflare/fuel/f3/cellnet/defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ class CellChannelTopic:
EXECUTE_RESULT = "execute_result"
FIRE_EVENT = "fire_event"
REPORT_JOB_FAILURE = "report_job_failure"
TRANSMIT_ERROR_LOG = "transmit_error_log"
nvkevlu marked this conversation as resolved.
Show resolved Hide resolved

SIMULATOR_WORKER_INIT = "simulator_worker_init"

Expand Down
2 changes: 1 addition & 1 deletion nvflare/private/fed/app/simulator/log.config
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ args=(sys.stdout,)
class=FileHandler
level=ERROR
formatter=fullFormatter
args=('error.log', 'a')
args=('error_log.txt', 'a')

[formatter_fullFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
27 changes: 27 additions & 0 deletions nvflare/private/fed/client/client_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import logging
import os
import threading
import time
from abc import ABC, abstractmethod
Expand All @@ -22,6 +23,7 @@
from nvflare.apis.fl_context import FLContext
from nvflare.apis.job_launcher_spec import JobLauncherSpec
from nvflare.apis.resource_manager_spec import ResourceManagerSpec
from nvflare.apis.workspace import Workspace
from nvflare.fuel.common.exit_codes import PROCESS_EXIT_REASON, ProcessExitCode
from nvflare.fuel.f3.cellnet.core_cell import FQCN
from nvflare.fuel.f3.cellnet.defs import MessageHeaderKey, ReturnCode
Expand Down Expand Up @@ -393,6 +395,31 @@ def _wait_child_process_finish(self, client, job_id, allocated_resource, token,
return_code = get_return_code(job_handle, job_id, workspace, self.logger)

self.logger.info(f"run ({job_id}): child worker process finished with RC {return_code}")

should_report_error_log = (
False # set this to True to report error log to server, todo: get value from client config
YuanTingHsieh marked this conversation as resolved.
Show resolved Hide resolved
)
if should_report_error_log:
error_log_contents = None
workspace_object = Workspace(root_dir=workspace, site_name=client.client_name)
error_log_path = workspace_object.get_app_error_log_file_path(job_id=job_id)
if os.path.exists(error_log_path):
with open(error_log_path, "r") as f:
error_log_contents = f.read()
if error_log_contents:
request = new_cell_message(
headers={},
payload=error_log_contents,
)
self.client.cell.fire_and_forget(
targets=[FQCN.ROOT_SERVER],
channel=CellChannel.SERVER_MAIN,
topic=CellChannelTopic.TRANSMIT_ERROR_LOG,
message=request,
optional=True,
)
self.logger.info("Reported contents of error log to server!")

if return_code in [ProcessExitCode.UNSAFE_COMPONENT, ProcessExitCode.CONFIG_ERROR]:
request = new_cell_message(
headers={},
Expand Down
11 changes: 11 additions & 0 deletions nvflare/private/fed/server/fed_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,12 @@ def _register_cellnet_cbs(self):
cb=self.client_heartbeat,
)

self.cell.register_request_cb(
channel=CellChannel.SERVER_MAIN,
topic=CellChannelTopic.TRANSMIT_ERROR_LOG,
cb=self.process_error_log,
)

self.cell.register_request_cb(
channel=CellChannel.SERVER_MAIN,
topic=CellChannelTopic.REPORT_JOB_FAILURE,
Expand Down Expand Up @@ -665,6 +671,11 @@ def quit_client(self, request: Message) -> Message:
headers = {CellMessageHeaderKeys.MESSAGE: "Removed client"}
return self._generate_reply(headers=headers, payload=None, fl_ctx=fl_ctx)

def process_error_log(self, request: Message):
    """Print the error log content carried by a message, tagged with its origin.

    Args:
        request: message whose payload holds the error log content; its
            ORIGIN header is read to identify the sender.
    """
    log_text = request.payload
    sender = request.get_header(key=MessageHeaderKey.ORIGIN)
    # Output goes to stdout via print(); nothing is returned to the sender.
    print(f"Error log from {sender}: {log_text}")
YuanTingHsieh marked this conversation as resolved.
Show resolved Hide resolved

def process_job_failure(self, request: Message):
payload = request.payload
client = request.get_header(key=MessageHeaderKey.ORIGIN)
Expand Down
4 changes: 2 additions & 2 deletions nvflare/private/fed/utils/fed_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def add_logfile_handler(log_file: str):
The purpose for this is to handle dynamic log file locations.

If a handler named errorFileHandler is found, it will be used as a template to
create a new handler for writing to the error.log file at the same directory as log_file.
create a new handler for writing to the error log file at the same directory as log_file.
The original errorFileHandler will be removed and replaced by the new handler.

Each log file will be rotated when it reaches 20MB.
Expand All @@ -90,7 +90,7 @@ def add_logfile_handler(log_file: str):
if not configured_error_handler:
return

error_log_file = os.path.join(os.path.dirname(log_file), "error.log")
error_log_file = os.path.join(os.path.dirname(log_file), WorkspaceConstants.ERROR_LOG_FILE_NAME)
error_file_handler = RotatingFileHandler(error_log_file, maxBytes=20 * 1024 * 1024, backupCount=10)
error_file_handler.setLevel(configured_error_handler.level)
error_file_handler.setFormatter(configured_error_handler.formatter)
Expand Down
Loading