Commit 6047f16
Merge branch 'NVIDIA:main' into support_relay
2 parents: 3d79c7a + f8dd354
32 files changed: +574 / -200 lines

examples/advanced/kaplan-meier-he/km_job.py (+1)

@@ -63,6 +63,7 @@ def main():
     runner = ScriptRunner(
         script=train_script,
         script_args=script_args,
+        framework="raw",
         params_exchange_format="raw",
         launch_external_process=False,
     )

examples/advanced/streaming/README.md (+69 / -38)
@@ -1,68 +1,99 @@
-# Object Streaming Examples
+# Object Streaming
 
 ## Overview
-The examples here demonstrate how to use object streamers to send large file/objects memory efficiently.
+The examples here demonstrate how to use object streamers to send large objects in a memory-efficient manner.
 
-The object streamer uses less memory because it sends files by chunks (default chunk size is 1MB) and
-it sends containers entry by entry.
+By default, large objects are sent and received in full, so extra memory must be allocated to hold the received message.
+This works fine when the message is small, but can become a limit when the model is large, e.g. for large language models.
 
-For example, if you have a dict with 10 1GB entries, it will take 10GB extra space to send the dict without
-streaming. It only requires extra 1GB to serialize the entry using streaming.
+To save memory, we can stream the message send / receive: when sending a large object (e.g. a dict),
+the streamer sends the container entry by entry (e.g. one dict item at a time); further, if we save the object to a file,
+the streamer can send the file in chunks (default chunk size is 1MB).
 
+Thus, the memory demand is reduced to the size of the largest entry for container streaming, while nearly no extra memory is needed for file
+streaming. For example, sending a dict with 10 1GB entries without streaming takes 10GB of extra space.
+With container streaming, it requires only an extra 1GB; and if the dict is saved to a file before sending, only 1MB of extra space is needed.
+
+All examples are run with the NVFlare Simulator via the [JobAPI](https://nvflare.readthedocs.io/en/main/programming_guide/fed_job_api.html).
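As an aside, the chunked file transfer described above can be illustrated with a small standalone sketch (plain Python, not NVFlare code; all names here are hypothetical):

```python
# Why chunked file streaming keeps memory bounded: read a file in fixed-size
# chunks so that at most one chunk is held in memory at a time.
import os
import tempfile

CHUNK_SIZE = 1024 * 1024  # 1MB, matching the default chunk size described above


def stream_file(path, chunk_size=CHUNK_SIZE):
    """Yield a file's bytes chunk by chunk, holding only one chunk in memory."""
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk


# Demo: a 3.5MB file streams as three full chunks plus one partial chunk.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(os.urandom(3 * CHUNK_SIZE + 512 * 1024))
    path = tmp.name
sizes = [len(c) for c in stream_file(path)]
print(sizes)  # [1048576, 1048576, 1048576, 524288]
os.remove(path)
```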
 ## Concepts
 
 ### Object Streamer
-
-ObjectStreamer is a base class to stream an object piece by piece. The `StreamableEngine` built in the NVFlare can
+ObjectStreamer is the base class for streaming an object piece by piece. The `StreamableEngine` built into NVFlare can
 stream any implementation of ObjectStreamer.
 
-Following implementations are included in NVFlare,
+The following implementations are included in NVFlare:
 
-* `FileStreamer`: It can be used to stream a file
-* `ContainerStreamer`: This class can stream a container entry by entry. Currently, dict, list and set are supported
+* `ContainerStreamer`: This class is used to stream a container entry by entry. Currently, dict, list, and set are supported
+* `FileStreamer`: This class is used to stream a file
 
-The container streamer can only stream the top level entries. All the sub entries of a top entry are sent at once with
-the top entry.
+Note that the container streamer splits the stream by the top-level entries. All the sub-entries of a top entry are
+sent as a whole, so the memory needed is determined by the largest top-level entry.
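The entry-by-entry behavior can be sketched in plain Python (an illustration of the idea only, not the actual `ContainerStreamer` implementation):

```python
# Container streaming serializes a dict one top-level entry at a time, so peak
# serialization memory is bounded by the largest entry, not the whole dict.
import pickle


def stream_container(container):
    """Yield one serialized (key, value) pair per top-level entry."""
    for key, value in container.items():
        # Only one entry is serialized and held at a time; all sub-entries
        # of this entry travel together as a whole.
        yield pickle.dumps((key, value))


def receive_container(entries):
    """Rebuild the dict entry by entry on the receiving side."""
    received = {}
    for blob in entries:
        key, value = pickle.loads(blob)
        received[key] = value
    return received


model = {"layer1": [0.1] * 4, "layer2": [0.2] * 8}
assert receive_container(stream_container(model)) == model
```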
 
 ### Object Retriever
-
-`ObjectRetriever` is designed to request an object to be streamed from a remote site. It automatically sets up the streaming
+Building upon the streamers, `ObjectRetriever` is designed for easier integration with existing code: it requests an object to be streamed from a remote site. It automatically sets up the streaming
 on both ends and handles the coordination.
 
-Currently, following implementations are available,
-
-* `FileRetriever`: It's used to retrieve a file from remote site using FileStreamer.
-* `ContainerRetriever`: This class can be used to retrieve a container from remote site using ContainerStreamer.
+Similarly, the following implementations are available:
 
-To use ContainerRetriever, the container must be given a name and added on the sending site,
+* `ContainerRetriever`: This class is used to retrieve a container from a remote site using `ContainerStreamer`.
+* `FileRetriever`: This class is used to retrieve a file from a remote site using `FileStreamer`.
 
+Note that to use `ContainerRetriever`, the container must be given a name and added on the sending site:
 ```
 ContainerRetriever.add_container("model", model_dict)
 ```
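The naming contract behind `add_container` can be sketched in plain Python (an illustration only, not the NVFlare implementation):

```python
# The sending site registers containers under a name so a remote request like
# "retrieve 'model'" can look up the right object to stream back.
class ContainerRegistry:
    def __init__(self):
        self._containers = {}

    def add_container(self, name, container):
        """Register a container under a name on the sending site."""
        self._containers[name] = container

    def get_container(self, name):
        """Look up a registered container when a retrieval request arrives."""
        return self._containers.get(name)


registry = ContainerRegistry()
model_dict = {"layer1": [0.1, 0.2], "layer2": [0.3]}
registry.add_container("model", model_dict)
assert registry.get_container("model") is model_dict
```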
 
-## Example Jobs
+## Simple Examples
+First, we demonstrate how to use the streamer directly, without a retriever:
+```commandline
+python simple_file_streaming_job.py
+```
+Note that in this example, the file streaming is relatively "standalone": the `FileReceiver` and `FileSender`
+are used directly as components, and no training workflow is involved. Since NVFlare requires an executor, a dummy
+executor is used here.
+
+Although file streaming is simple, it is not very practical for real-world applications, because in most cases,
+rather than being standalone, we need to send an object when it is generated at a certain point in the workflow.
+In such cases, a retriever is more convenient:
+```commandline
+python simple_dict_streaming_job.py
+```
+In this second example, the `ContainerRetriever` is set up on both server and client, and automatically handles the streaming.
+It couples closely with the workflow, making it easier to define what to send and where to retrieve it.
+
+## Full-scale Examples and Comparisons
+The two simple examples above illustrated the basic usage of streaming with small random messages. In the following,
+we demonstrate how to use the streamer with a retriever in a workflow with a real large language model object,
+and compare the memory usage with and without streaming. To track the memory usage, we use a simple script, `utils/log_memory.sh`.
+Note that the tracked usage is not fully accurate, but it is sufficient to give a rough idea.
+
+All three settings (regular, container streaming, and file streaming) are integrated in the same script to avoid extra variability.
+To run the examples:
+```commandline
+bash regular_transmission.sh
+```
+```commandline
+bash container_stream.sh
+```
+```commandline
+bash file_stream.sh
+```
 
-### file_streaming job
+We then compare the peak memory usage of the three settings. The results are shown below;
+note that these numbers come from one experiment on one machine, and can vary considerably depending on the system and environment.
+
+| Setting | Peak Memory Usage (MB) | Job Finishing Time (s) |
+| --- | --- | --- |
+| Regular Transmission | 42,427 | 47 |
+| Container Streaming | 23,265 | 50 |
+| File Streaming | 19,176 | 170 |
+
+As shown, streaming significantly reduces memory usage, especially file streaming,
+although file streaming takes much longer to finish the job.
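The `utils/log_memory.sh` script itself is not shown in this diff; a minimal logger in the same spirit might look like the following sketch (hypothetical, not the actual script, which presumably tracks the training processes rather than the shell itself):

```shell
#!/usr/bin/env bash
# Hypothetical sketch of a periodic memory logger: append a timestamped
# memory sample once per second. Here we sample this shell's resident set
# size via ps; redirect stdout to a file, as the run scripts above do.
for _ in 1 2 3; do
    echo "$(date +%T) rss_kb=$(ps -o rss= -p $$ | tr -d ' ')"
    sleep 1
done
```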
 
-This job uses the FileStreamer object to send a large file from server to client.
 
-It demonstrates following mechanisms:
-1. It uses components to handle the file transferring. No training workflow is used.
-Since executor is required by NVFlare, a dummy executor is created.
-2. It shows how to use the streamer directly without an object retriever.
 
-The job creates a temporary file to test. You can run the job in POC or using simulator as follows,
 
-```
-nvflare simulator -n 1 -t 1 jobs/file_streaming
-```
-### dict_streaming job
 
-This job demonstrate how to send a dict from server to client using object retriever.
 
-It creates a task called "retrieve_dict" to tell client to get ready for the streaming.
 
-The example can be run in simulator like this,
-```
-nvflare simulator -n 1 -t 1 jobs/dict_streaming
-```
(new file) (+3)

@@ -0,0 +1,3 @@
+pkill -9 python
+bash utils/log_memory.sh >>/tmp/nvflare/workspace/container.txt &
+python streaming_job.py --retriever_mode container
(new file) (+3)

@@ -0,0 +1,3 @@
+pkill -9 python
+bash utils/log_memory.sh >>/tmp/nvflare/workspace/file.txt &
+python streaming_job.py --retriever_mode file

examples/advanced/streaming/jobs/dict_streaming/app/config/config_fed_client.json

-23
This file was deleted.

examples/advanced/streaming/jobs/dict_streaming/app/config/config_fed_server.json

-20
This file was deleted.

examples/advanced/streaming/jobs/dict_streaming/app/custom/__init__.py

-13
This file was deleted.

examples/advanced/streaming/jobs/dict_streaming/meta.json

-10
This file was deleted.

examples/advanced/streaming/jobs/file_streaming/app/config/config_fed_client.json

-23
This file was deleted.

examples/advanced/streaming/jobs/file_streaming/app/config/config_fed_server.json

-19
This file was deleted.

examples/advanced/streaming/jobs/file_streaming/app/custom/__init__.py

-13
This file was deleted.

examples/advanced/streaming/jobs/file_streaming/meta.json

-10
This file was deleted.
(new file) (+4)

@@ -0,0 +1,4 @@
+pkill -9 python
+mkdir /tmp/nvflare/workspace/
+bash utils/log_memory.sh >>/tmp/nvflare/workspace/regular.txt &
+python streaming_job.py
(new file) (+53)

@@ -0,0 +1,53 @@
+# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from src.simple_streaming_controller import SimpleStreamingController
+from src.simple_streaming_executor import SimpleStreamingExecutor
+
+from nvflare import FedJob
+from nvflare.app_common.streamers.container_retriever import ContainerRetriever
+
+
+def main():
+    # Create the FedJob
+    job = FedJob(name="simple_dict_streaming", min_clients=1)
+
+    # Define the dict_retriever component and send it to both server and clients
+    dict_retriever = ContainerRetriever()
+    job.to_server(dict_retriever, id="dict_retriever")
+    job.to_clients(dict_retriever, id="dict_retriever")
+
+    # Define the controller workflow and send it to the server
+    controller = SimpleStreamingController(dict_retriever_id="dict_retriever")
+    job.to_server(controller)
+
+    # Define the executor and send it to the clients
+    executor = SimpleStreamingExecutor(dict_retriever_id="dict_retriever")
+    job.to_clients(executor, tasks=["*"])
+
+    # Export the job
+    job_dir = "/tmp/nvflare/workspace/jobs/simple_dict_streaming"
+    print("job_dir=", job_dir)
+    job.export_job(job_dir)
+
+    # Run the job with the simulator
+    work_dir = "/tmp/nvflare/workspace/works/simple_dict_streaming"
+    print("workspace_dir=", work_dir)
+
+    job.simulator_run(work_dir, n_clients=1, threads=1)
+
+
+if __name__ == "__main__":
+    main()
