Add P2P distributed optimization to advanced examples #3189

Open · wants to merge 16 commits into base: main
1 change: 1 addition & 0 deletions .gitignore
@@ -91,6 +91,7 @@ venv/
ENV/
env.bak/
venv.bak/
.mise.toml

# Spyder project settings
.spyderproject
4 changes: 4 additions & 0 deletions examples/advanced/README.md
@@ -44,6 +44,10 @@ Please also install "./requirements.txt" in each example folder.
* [Swarm Learning](./swarm_learning/README.md)
* Example of swarm learning with NVIDIA FLARE using PyTorch with the CIFAR-10 dataset.

## Distributed Optimization / P2P Algorithms
* [Distributed Optimization](./distributed_optimization/README.md)
* Example of using the low-level NVFlare APIs to implement and run P2P distributed optimization algorithms.

## Vertical Federated Learning
* [Vertical Federated Learning](./vertical_federated_learning/README.md)
* Example of running split learning using the CIFAR-10 dataset.
2 changes: 2 additions & 0 deletions examples/advanced/distributed_optimization/.gitignore
@@ -0,0 +1,2 @@
tmp
data
2 changes: 2 additions & 0 deletions examples/advanced/distributed_optimization/1-consensus/README.md
@@ -0,0 +1,2 @@
# Consensus algorithm
In this example, we show how to run the consensus algorithm. You can find a detailed walkthrough in the [tutorial](tutorial.ipynb), or simply run the provided [script](launcher.py) via `python launcher.py`.
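For intuition, the update each client performs is plain average consensus: at every iteration it replaces its local value with a weighted average of its neighbors' values. Below is a minimal standalone sketch of that update (NumPy only; the 3-client network and mixing matrix `W` are illustrative, not part of the NVFlare API):

```python
import numpy as np

# Illustrative doubly-stochastic mixing matrix for a 3-client network:
# each row holds the weights a client assigns to itself and its neighbors.
W = np.array([
    [0.50, 0.25, 0.25],
    [0.25, 0.50, 0.25],
    [0.25, 0.25, 0.50],
])
x = np.array([2.0, 8.0, 5.0])  # initial local values, one per client

for _ in range(50):
    x = W @ x  # every client averages its neighbors' current values

print(x)  # all entries approach the initial average, 5.0
```

With doubly stochastic weights over a connected network, all local values converge to the average of the initial values.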
63 changes: 63 additions & 0 deletions examples/advanced/distributed_optimization/1-consensus/launcher.py
@@ -0,0 +1,63 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random

import matplotlib.pyplot as plt
import torch

from nvflare.app_opt.p2p.controllers import P2PAlgorithmController
from nvflare.app_opt.p2p.executors import ConsensusExecutor
from nvflare.app_opt.p2p.types import Config
from nvflare.app_opt.p2p.utils.config_generator import generate_random_network
from nvflare.job_config.api import FedJob


class CustomConsensusExecutor(ConsensusExecutor):
    def __init__(self):
        super().__init__(initial_value=random.randint(0, 10))


if __name__ == "__main__":
    # Create job
    job = FedJob(name="consensus")

    # generate random config
    num_clients = 6
    network, _ = generate_random_network(num_clients=num_clients)
    config = Config(network=network, extra={"iterations": 50})

    # send controller to server
    controller = P2PAlgorithmController(config=config)
    job.to_server(controller)

    # Add clients
    for i in range(num_clients):
        executor = CustomConsensusExecutor()
        job.to(executor, f"site-{i + 1}")

    # run
    job.export_job("./tmp/job_configs")
    job.simulator_run("./tmp/runs/consensus")

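    # each executor is expected to have saved its local value trajectory
    # as value_sequence.pt inside its simulated run directory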
    history = {
        f"site-{i + 1}": torch.load(f"tmp/runs/consensus/site-{i + 1}/value_sequence.pt")
        for i in range(num_clients)
    }
    plt.figure()
    for i in range(num_clients):
        plt.plot(history[f"site-{i + 1}"], label=f"site-{i + 1}")
    plt.legend()
    plt.title("Evolution of local values")
    plt.show()
766 changes: 766 additions & 0 deletions examples/advanced/distributed_optimization/1-consensus/tutorial.ipynb

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions examples/advanced/distributed_optimization/2-two_moons/README.md
@@ -0,0 +1,28 @@
# Distributed classification - two moons dataset

In this example we consider the simple [two moons](https://scikit-learn.org/dev/modules/generated/sklearn.datasets.make_moons.html) classification problem and compare different distributed optimization algorithms:
- Distributed gradient descent
- Gradient tracking
- GTAdam

We run all the algorithms with 6 clients, for 1000 iterations, and with a stepsize of 0.01. These shared parameters can be changed in `config.py`.

The models and datasets are defined in `utils.py` and are the same for all algorithms.
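For reference, here is a standalone NumPy sketch of the textbook per-client updates behind the first two algorithms, on a toy scalar quadratic problem with an illustrative fully connected mixing matrix `W` (this is not the NVFlare implementation):

```python
import numpy as np

n, alpha, iters = 3, 0.1, 200
W = np.full((n, n), 1 / n)           # illustrative mixing matrix (fully connected)
targets = np.array([1.0, 2.0, 6.0])  # local losses f_i(x) = 0.5 * (x - t_i)^2


def grad(x):
    # stacked gradients of the local losses, one entry per client
    return x - targets


# Distributed gradient descent: average with neighbors, then step
# along the local gradient.
x = np.zeros(n)
for _ in range(iters):
    x = W @ x - alpha * grad(x)
print("DGD:", x)  # near, but not exactly at, the global minimizer

# Gradient tracking: additionally mix a running estimate y of the
# average gradient and step along y instead of the raw local gradient.
x = np.zeros(n)
y = grad(x)
for _ in range(iters):
    x_new = W @ x - alpha * y
    y = W @ y + grad(x_new) - grad(x)
    x = x_new
print("GT:", x)  # all entries converge to the global minimizer, 3.0
```

Roughly speaking, GTAdam keeps the same tracked direction `y` but passes it through Adam-style adaptive momentum before taking the step, which is where the `beta1`, `beta2`, and `epsilon` parameters in `launcher_gtadam.py` come in.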

## Distributed gradient descent
```
python launcher_dgd.py
```
![dgd](dgd_results.png)

## Gradient tracking
```
python launcher_gt.py
```
![gt](gt_results.png)

## GTAdam
```
python launcher_gtadam.py
```
![gtadam](gtadam_results.png)
16 changes: 16 additions & 0 deletions examples/advanced/distributed_optimization/2-two_moons/config.py
@@ -0,0 +1,16 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
NUM_CLIENTS = 6
ITERATIONS = 1000
STEPSIZE = 0.01
Binary files (not rendered); presumably the three result images referenced in the README above: dgd_results.png, gt_results.png, gtadam_results.png.
63 changes: 63 additions & 0 deletions examples/advanced/distributed_optimization/2-two_moons/launcher_dgd.py
@@ -0,0 +1,63 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch
from config import ITERATIONS, NUM_CLIENTS, STEPSIZE
from utils import NeuralNetwork, get_dataloaders, plot_results

from nvflare.app_opt.p2p.controllers import P2PAlgorithmController
from nvflare.app_opt.p2p.executors import DGDExecutor
from nvflare.app_opt.p2p.types import Config
from nvflare.app_opt.p2p.utils.config_generator import generate_random_network
from nvflare.job_config.api import FedJob


class CustomDGDExecutor(DGDExecutor):
    def __init__(self, data_seed: int | None = None):
        self._data_seed = data_seed
        train_dataloader, test_dataloader = get_dataloaders(data_seed)
        super().__init__(
            model=NeuralNetwork(),
            loss=torch.nn.CrossEntropyLoss(),
            train_dataloader=train_dataloader,
            test_dataloader=test_dataloader,
        )


if __name__ == "__main__":
    # Create job
    job_name = "dgd"
    job = FedJob(name=job_name)

    # generate random config
    network, _ = generate_random_network(num_clients=NUM_CLIENTS)
    config = Config(
        network=network,
        extra={"iterations": ITERATIONS, "stepsize": STEPSIZE},
    )

    # send controller to server
    controller = P2PAlgorithmController(config=config)
    job.to_server(controller)

    # Add clients
    for i in range(NUM_CLIENTS):
        executor = CustomDGDExecutor(data_seed=i)
        job.to(executor, f"site-{i + 1}")

    # run
    job.export_job("./tmp/job_configs")
    job.simulator_run(f"./tmp/runs/{job_name}")

    # plot and save results
    plot_results(job_name, NUM_CLIENTS)
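`utils.py` is not part of this diff excerpt; based on how the launchers call it, it provides a `NeuralNetwork` model, a `get_dataloaders(seed)` helper, and a `plot_results(job_name, num_clients)` helper. A hypothetical sketch of the first two, consistent with those call sites and the two-moons dataset mentioned in the README (everything inside is illustrative, and `plot_results` is omitted since its data format is not shown):

```python
import torch
from sklearn.datasets import make_moons
from torch.utils.data import DataLoader, TensorDataset


class NeuralNetwork(torch.nn.Module):
    """Small MLP for 2D two-moons inputs, with 2 output classes."""

    def __init__(self, hidden: int = 32):
        super().__init__()
        self.net = torch.nn.Sequential(
            torch.nn.Linear(2, hidden),
            torch.nn.ReLU(),
            torch.nn.Linear(hidden, 2),
        )

    def forward(self, x):
        return self.net(x)


def get_dataloaders(seed: int | None = None):
    """Return (train, test) dataloaders over a client-specific two-moons split."""
    X, y = make_moons(n_samples=1000, noise=0.2, random_state=seed)
    X = torch.tensor(X, dtype=torch.float32)
    y = torch.tensor(y)  # integer class labels, as CrossEntropyLoss expects
    split = int(0.8 * len(X))
    train = DataLoader(TensorDataset(X[:split], y[:split]), batch_size=32, shuffle=True)
    test = DataLoader(TensorDataset(X[split:], y[split:]), batch_size=32)
    return train, test
```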
61 changes: 61 additions & 0 deletions examples/advanced/distributed_optimization/2-two_moons/launcher_gt.py
@@ -0,0 +1,61 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch
from config import ITERATIONS, NUM_CLIENTS, STEPSIZE
from utils import NeuralNetwork, get_dataloaders, plot_results

from nvflare.app_opt.p2p.controllers import P2PAlgorithmController
from nvflare.app_opt.p2p.executors import GTExecutor
from nvflare.app_opt.p2p.types import Config
from nvflare.app_opt.p2p.utils.config_generator import generate_random_network
from nvflare.job_config.api import FedJob


class CustomGTExecutor(GTExecutor):
    def __init__(self, data_seed: int | None = None):
        self._data_seed = data_seed
        train_dataloader, test_dataloader = get_dataloaders(data_seed)
        super().__init__(
            model=NeuralNetwork(),
            loss=torch.nn.CrossEntropyLoss(),
            train_dataloader=train_dataloader,
            test_dataloader=test_dataloader,
        )


if __name__ == "__main__":
    # Create job
    job_name = "gt"
    job = FedJob(name=job_name)

    # generate random config
    network, _ = generate_random_network(num_clients=NUM_CLIENTS)
    config = Config(
        network=network, extra={"iterations": ITERATIONS, "stepsize": STEPSIZE}
    )

    # send controller to server
    controller = P2PAlgorithmController(config=config)
    job.to_server(controller)

    # Add clients
    for i in range(NUM_CLIENTS):
        executor = CustomGTExecutor(data_seed=i)
        job.to(executor, f"site-{i + 1}")

    # run
    job.export_job("./tmp/job_configs")
    job.simulator_run(f"./tmp/runs/{job_name}")

    plot_results(job_name, NUM_CLIENTS)
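This launcher is identical to the DGD one except for the executor class (`GTExecutor` instead of `DGDExecutor`), showing that switching between the P2P algorithms only requires swapping the executor.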
67 changes: 67 additions & 0 deletions examples/advanced/distributed_optimization/2-two_moons/launcher_gtadam.py
@@ -0,0 +1,67 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch
from config import ITERATIONS, NUM_CLIENTS, STEPSIZE
from utils import NeuralNetwork, get_dataloaders, plot_results

from nvflare.app_opt.p2p.controllers import P2PAlgorithmController
from nvflare.app_opt.p2p.executors import GTADAMExecutor
from nvflare.app_opt.p2p.types import Config
from nvflare.app_opt.p2p.utils.config_generator import generate_random_network
from nvflare.job_config.api import FedJob


class CustomGTADAMExecutor(GTADAMExecutor):
    def __init__(self, data_seed: int | None = None):
        self._data_seed = data_seed
        train_dataloader, test_dataloader = get_dataloaders(data_seed)
        super().__init__(
            model=NeuralNetwork(),
            loss=torch.nn.CrossEntropyLoss(),
            train_dataloader=train_dataloader,
            test_dataloader=test_dataloader,
        )


if __name__ == "__main__":
    # Create job
    job_name = "gtadam"
    job = FedJob(name=job_name)

    # generate random config
    network, _ = generate_random_network(num_clients=NUM_CLIENTS)
    config = Config(
        network=network,
        extra={
            "iterations": ITERATIONS,
            "stepsize": STEPSIZE,
            "beta1": 0.9,
            "beta2": 0.999,
            "epsilon": 1e-8,
        },
    )

    # send controller to server
    controller = P2PAlgorithmController(config=config)
    job.to_server(controller)

    # Add clients
    for i in range(NUM_CLIENTS):
        executor = CustomGTADAMExecutor(data_seed=i)
        job.to(executor, f"site-{i + 1}")

    # run
    job.export_job("./tmp/job_configs")
    job.simulator_run(f"./tmp/runs/{job_name}")

    plot_results(job_name, NUM_CLIENTS)
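The extra `beta1`, `beta2`, and `epsilon` entries are the usual Adam moment-decay and numerical-stability constants; the values used here (0.9, 0.999, 1e-8) are the standard Adam defaults.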