Skip to content

Commit

Permalink
In process Client API Executor Part 1 (#2248)
Browse files Browse the repository at this point in the history
* 1) fix issue with logging
2) fix example
code formatting
add queue.task_done()
1) add message bus
2) hide task func wrapper class
3) rename executor package
4) clean up some code
update meta info
remove used code
optimize import
fix message_bus
import order change
rename the executor from ClientAPIMemExecutor to InProcessClientAPIExecutor
1) remove thread_pool
2) further loose couple executor and client_api implementation
formating
add unit tests
avoid duplicated constant TASK_NAME definition
split PR into two parts (besides message bus)
this is part 1: only remove the example and job template changes

1. Replace MemPipe (Queues) with callback via EventManager
2. Simplified overall logics
3. notice the param convert doesn't quite work ( need to fix later)
4. removed some tests that now invalid. Will need to add more unit tests later

fix task_name is None bug

add few unit tests
code format
update to comform with new databus changes

* rebase

* conform with recemt changes

* clean up, support main func

* fix format

* update unit tests

* databus updates, enhance module parsing

* address comments

* add docstrings, address comments

---------

Co-authored-by: Sean Yang <[email protected]>
Co-authored-by: Yuan-Ting Hsieh (謝沅廷) <[email protected]>
  • Loading branch information
3 people authored Feb 26, 2024
1 parent 8312bc9 commit 81caf29
Show file tree
Hide file tree
Showing 31 changed files with 1,709 additions and 279 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def evaluate(input_weights):

# (4) receive FLModel from NVFlare
input_model = flare.receive()
client_id = flare.system_info().get("site_name", None)
client_id = flare.get_site_name()

# Based on different "task" we will do different things
# for "train" task (flare.is_train()) we use the received model to do training and/or evaluation
Expand All @@ -106,7 +106,7 @@ def evaluate(input_weights):
# for "submit_model" task (flare.is_submit_model()) we just need to send back the local model
# (5) performing train task on received model
if flare.is_train():
print(f"({client_id}) round={input_model.current_round}/{input_model.total_rounds-1}")
print(f"({client_id}) current_round={input_model.current_round}, total_rounds={input_model.total_rounds}")

# (5.1) loads model from NVFlare
net.load_state_dict(input_model.params)
Expand Down Expand Up @@ -167,7 +167,6 @@ def evaluate(input_weights):

# (5.5) send model back to NVFlare
flare.send(output_model)

# (6) performing evaluate task on received model
elif flare.is_evaluate():
accuracy = evaluate(input_model.params)
Expand Down
69 changes: 69 additions & 0 deletions job_templates/sag_pt_in_proc/config_fed_client.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
{
# version of the configuration
format_version = 2

fn_path = "train.main"
fn_args = {
batch_size = 6
dataset_path = "/tmp/nvflare/data/cifar10"
num_workers = 2
}

# Client Computing Executors.
executors = [
{
# tasks the executors are defined to handle
tasks = ["train"]

# This particular executor
executor {

path = "nvflare.app_opt.pt.in_process_client_api_executor.PTInProcessClientAPIExecutor"
args {
# if the task_fn_path is main, task_fn_args are passed as sys.argv
# if the task_fn_path is a function, task_fn_args are passed as the function args
# (Note: task_fn_path must be of the form {module}.{func_name})
task_fn_path = "{fn_path}"
task_fn_args = "{fn_args}"

# if the transfer_type is FULL, then it will be sent directly
# if the transfer_type is DIFF, then we will calculate the
# difference VS received parameters and send the difference
params_transfer_type = "DIFF"

# if train_with_evaluation is true, the executor will expect
# the custom code need to send back both the trained parameters and the evaluation metric
# otherwise only trained parameters are expected
train_with_evaluation = true

# time interval in seconds. Time interval to wait before check if the local task has submitted the result
# if the local task takes long time, you can increase this interval to larger number
# uncomment to overwrite the default, default is 0.5 seconds
result_pull_interval = 0.5

# time interval in seconds. Time interval to wait before check if the trainig code has log metric (such as
# Tensorboard log, MLFlow log or Weights & Biases logs. The result will be streanmed to the server side
# then to the corresponding tracking system
# if the log is not needed, you can set this to a larger number
# uncomment to overwrite the default, default is None, which disable the log streaming feature.
log_pull_interval = 0.1

}
}
}
],

# this defined an array of task data filters. If provided, it will control the data from server controller to client executor
task_data_filters = []

# this defined an array of task result filters. If provided, it will control the result from client executor to server controller
task_result_filters = []

components = [
{
"id": "event_to_fed",
"name": "ConvertToFedEvent",
"args": {"events_to_convert": ["analytix_log_stats"], "fed_event_prefix": "fed."}
}
]
}
127 changes: 127 additions & 0 deletions job_templates/sag_pt_in_proc/config_fed_server.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
{
# version of the configuration
format_version = 2

# task data filter: if filters are provided, the filter will filter the data flow out of server to client.
task_data_filters =[]

# task result filter: if filters are provided, the filter will filter the result flow out of client to server.
task_result_filters = []

# This assumes that there will be a "net.py" file with class name "Net".
# If your model code is not in "net.py" and class name is not "Net", please modify here
model_class_path = "net.Net"

# workflows: Array of workflows the control the Federated Learning workflow lifecycle.
# One can specify multiple workflows. The NVFLARE will run them in the order specified.
workflows = [
{
# 1st workflow"
id = "scatter_and_gather"

# name = ScatterAndGather, path is the class path of the ScatterAndGather controller.
path = "nvflare.app_common.workflows.scatter_and_gather.ScatterAndGather"
args {
# argument of the ScatterAndGather class.
# min number of clients required for ScatterAndGather controller to move to the next round
# during the workflow cycle. The controller will wait until the min_clients returned from clients
# before move to the next step.
min_clients = 2

# number of global round of the training.
num_rounds = 5

# starting round is 0-based
start_round = 0

# after received min number of clients' result,
# how much time should we wait further before move to the next step
wait_time_after_min_received = 0

# For ScatterAndGather, the server will aggregate the weights based on the client's result.
# the aggregator component id is named here. One can use the this ID to find the corresponding
# aggregator component listed below
aggregator_id = "aggregator"

# The Scatter and Gather controller use an persistor to load the model and save the model.
# The persistent component can be identified by component ID specified here.
persistor_id = "persistor"

# Shareable to a communication message, i.e. shared between clients and server.
# Shareable generator is a component that responsible to take the model convert to/from this communication message: Shareable.
# The component can be identified via "shareable_generator_id"
shareable_generator_id = "shareable_generator"

# train task name: client side needs to have an executor that handles this task
train_task_name = "train"

# train timeout in second. If zero, meaning no timeout.
train_timeout = 0
}
}
]

# List of components used in the server side workflow.
components = [
{
# This is the persistence component used in above workflow.
# PTFileModelPersistor is a Pytorch persistor which save/read the model to/from file.

id = "persistor"
path = "nvflare.app_opt.pt.file_model_persistor.PTFileModelPersistor"

# the persitor class take model class as argument
# This imply that the model is initialized from the server-side.
# The initialized model will be broadcast to all the clients to start the training.
args.model.path = "{model_class_path}"
},
{
# This is the generator that convert the model to shareable communication message structure used in workflow
id = "shareable_generator"
path = "nvflare.app_common.shareablegenerators.full_model_shareable_generator.FullModelShareableGenerator"
args = {}
},
{
# This is the aggregator that perform the weighted average aggregation.
# the aggregation is "in-time", so it doesn't wait for client results, but aggregates as soon as it received the data.
id = "aggregator"
path = "nvflare.app_common.aggregators.intime_accumulate_model_aggregator.InTimeAccumulateWeightedAggregator"
args.expected_data_kind = "WEIGHT_DIFF"
},
{
# This component is not directly used in Workflow.
# it select the best model based on the incoming global validation metrics.
id = "model_selector"
path = "nvflare.app_common.widgets.intime_model_selector.IntimeModelSelector"
# need to make sure this "key_metric" match what server side received
args.key_metric = "accuracy"
},
{
id = "receiver"
path = "nvflare.app_opt.tracking.tb.tb_receiver.TBAnalyticsReceiver"
args.events = ["fed.analytix_log_stats"]
},

{
id = "mlflow_receiver"
path = "nvflare.app_opt.tracking.mlflow.mlflow_receiver.MLflowReceiver"
args {
# tracking_uri = "http://0.0.0.0:5000"
tracking_uri = ""
kwargs {
experiment_name = "nvflare-sag-pt-experiment"
run_name = "nvflare-sag-pt-with-mlflow"
experiment_tags {
"mlflow.note.content": "## **NVFlare SAG PyTorch experiment with MLflow**"
}
run_tags {
"mlflow.note.content" = "## Federated Experiment tracking with MLflow \n### Example of using **[NVIDIA FLARE](https://nvflare.readthedocs.io/en/main/index.html)** to train an image classifier using federated averaging ([FedAvg]([FedAvg](https://arxiv.org/abs/1602.05629))) and [PyTorch](https://pytorch.org/) as the deep learning training framework. This example also highlights the NVFlare streaming capability from the clients to the server.\n\n> **_NOTE:_** \n This example uses the *[CIFAR-10](https://www.cs.toronto.edu/~kriz/cifar.html)* dataset and will load its data within the trainer code.\n"
}
}
artifact_location = "artifacts"
events = ["fed.analytix_log_stats"]
}
}
]

}
5 changes: 5 additions & 0 deletions job_templates/sag_pt_in_proc/info.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
description = "scatter & gather workflow using pytorch with in_process executor"
client_category = "client_api"
controller_type = "server"
}
11 changes: 11 additions & 0 deletions job_templates/sag_pt_in_proc/info.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Job Template Information Card

## sag_pt_in_proc
name = "sag_pt_in_proc"
description = "Scatter and Gather Workflow using pytorch with in_process executor"
class_name = "ScatterAndGather"
controller_type = "server"
executor_type = "in_process_client_api_executor"
contributor = "NVIDIA"
init_publish_date = "2024-02-8"
last_updated_date = "2024-02-8" # yyyy-mm-dd
10 changes: 10 additions & 0 deletions job_templates/sag_pt_in_proc/meta.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
name = "sag_pt_in_proc"
resource_spec = {}
deploy_map {
# change deploy map as needed.
app = ["@ALL"]
}
min_clients = 2
mandatory_clients = []
}
1 change: 1 addition & 0 deletions nvflare/apis/fl_constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ class FLMetaKey:
FILTER_HISTORY = "filter_history"
CONFIGS = "configs"
VALIDATE_TYPE = "validate_type"
START_ROUND = "start_round"
CURRENT_ROUND = "current_round"
TOTAL_ROUNDS = "total_rounds"
JOB_ID = "job_id"
Expand Down
5 changes: 3 additions & 2 deletions nvflare/app_common/executors/client_api_launcher_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import os
from typing import Optional

from nvflare.apis.fl_constant import FLMetaKey
from nvflare.apis.fl_context import FLContext
from nvflare.app_common.executors.launcher_executor import LauncherExecutor
from nvflare.client.config import ConfigKey, ExchangeFormat, TransferType, write_config_to_file
Expand Down Expand Up @@ -123,8 +124,8 @@ def prepare_config_for_launch(self, fl_ctx: FLContext):

config_data = {
ConfigKey.TASK_EXCHANGE: task_exchange_attributes,
ConfigKey.SITE_NAME: fl_ctx.get_identity_name(),
ConfigKey.JOB_ID: fl_ctx.get_job_id(),
FLMetaKey.SITE_NAME: fl_ctx.get_identity_name(),
FLMetaKey.JOB_ID: fl_ctx.get_job_id(),
}

config_file_path = self._get_external_config_file_path(fl_ctx)
Expand Down
79 changes: 79 additions & 0 deletions nvflare/app_common/executors/exec_task_fn_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sys
import traceback
from typing import Dict

from nvflare.fuel.utils.function_utils import find_task_fn, require_arguments


class ExecTaskFuncWrapper:
def __init__(self, task_fn_path: str, task_fn_args: Dict = None):
"""Wrapper for function given function path and args
Args:
task_fn_path (str): function path (ex: train.main, custom/train.main, custom.train.main).
task_fn_args (Dict, optional): function arguments to pass in.
"""
self.task_fn_path = task_fn_path
self.task_fn_args = task_fn_args
self.client_api = None
self.logger = logging.getLogger(self.__class__.__name__)

self.task_fn = find_task_fn(task_fn_path)
require_args, args_size, args_default_size = require_arguments(self.task_fn)
self.check_fn_inputs(task_fn_path, require_args, args_size, args_default_size)
self.task_fn_require_args = require_args

def run(self):
"""Call the task_fn with any required arguments."""
msg = f"\n start task run() with {self.task_fn_path}"
msg = msg if not self.task_fn_require_args else msg + f", {self.task_fn_args}"
self.logger.info(msg)
try:
if self.task_fn.__name__ == "main":
args_list = []
for k, v in self.task_fn_args.items():
args_list.extend(["--" + str(k), str(v)])

curr_argv = sys.argv
sys.argv = [self.task_fn_path.rsplit(".", 1)[0].replace(".", "/") + ".py"] + args_list
self.task_fn()
sys.argv = curr_argv
elif self.task_fn_require_args:
self.task_fn(**self.task_fn_args)
else:
self.task_fn()
except Exception as e:
msg = traceback.format_exc()
self.logger.error(msg)
if self.client_api:
self.client_api.exec_queue.ask_abort(msg)
raise e

def check_fn_inputs(self, task_fn_path, require_args: bool, required_args_size: int, args_default_size: int):
"""Check if the provided task_fn_args are compatible with the task_fn."""
if require_args:
if not self.task_fn_args:
raise ValueError(f"function '{task_fn_path}' requires arguments, but none provided")
elif len(self.task_fn_args) < required_args_size - args_default_size:
raise ValueError(
f"function '{task_fn_path}' requires {required_args_size} "
f"arguments, but {len(self.task_fn_args)} provided"
)
else:
if self.task_fn_args and self.task_fn.__name__ != "main":
msg = f"function '{task_fn_path}' does not require arguments, {self.task_fn_args} will be ignored"
self.logger.warning(msg)
Loading

0 comments on commit 81caf29

Please sign in to comment.