Merged
Commits (38)
97b8bdc
add dygraph parallel run interface
chenwhql Aug 7, 2020
00b56d5
polish implement & unified env property name
chenwhql Aug 7, 2020
17f7fe9
add print config arg
chenwhql Aug 10, 2020
07c86aa
refactor init_parallel_env function
chenwhql Aug 11, 2020
4c955a1
Compatible with multiprocessing and launch modes
chenwhql Aug 13, 2020
523e007
set default trainer start port
chenwhql Aug 14, 2020
8101b03
support run in python 2
chenwhql Aug 15, 2020
d3b9a06
polish python2 support code
chenwhql Aug 17, 2020
48c46ff
remove python2 support
chenwhql Aug 17, 2020
b06d400
refine launch import
chenwhql Aug 19, 2020
e1df353
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
chenwhql Aug 19, 2020
2c7b3fd
polish dome design details
chenwhql Aug 19, 2020
39fddff
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
chenwhql Aug 19, 2020
d26f495
refactor api implemention & path
chenwhql Aug 20, 2020
bf985cc
use new method _set_expected_place
chenwhql Aug 20, 2020
7939384
add spawn unittest framework & mnist test
chenwhql Aug 24, 2020
95c0367
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
chenwhql Aug 24, 2020
04580d8
add more unittests & doc
chenwhql Aug 24, 2020
131afd4
fix unittest failed
chenwhql Aug 25, 2020
e170f10
polish english doc
chenwhql Aug 25, 2020
0ef215d
self review and polish details
chenwhql Aug 25, 2020
b27cfee
refactor code by reviewer's comments
chenwhql Aug 25, 2020
f50f343
fix unittest failed
chenwhql Aug 26, 2020
11221a8
fix parallel_env unittest
chenwhql Aug 26, 2020
0980c23
fix several typos
chenwhql Aug 26, 2020
af50518
fix error introduced when fixing typos
chenwhql Aug 27, 2020
a378140
add unpublic note for start_processes
chenwhql Aug 27, 2020
cca82b6
polish details by xiaoguang's comment
chenwhql Aug 27, 2020
82223a6
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
chenwhql Aug 27, 2020
d39331c
verify correctly when spawn nprocs=-1
chenwhql Aug 27, 2020
10df04c
resolve collective api conflict
chenwhql Aug 27, 2020
3a2d7e8
refactor spawn & init_parallel_env design
chenwhql Aug 27, 2020
0582c4b
polish doc details
chenwhql Aug 27, 2020
9ceaeff
open spawn unittests
chenwhql Aug 27, 2020
4b7d810
try to fix doc compile error
chenwhql Aug 27, 2020
4261e22
try to fix unknown doc format error
chenwhql Aug 27, 2020
cad6872
add skip unittest when not gpu
chenwhql Aug 28, 2020
377c919
resolve develop conflict
chenwhql Aug 28, 2020
3 changes: 0 additions & 3 deletions python/paddle/__init__.py
@@ -231,9 +231,6 @@
from .framework import no_grad #DEFINE_ALIAS
from .framework import save #DEFINE_ALIAS
from .framework import load #DEFINE_ALIAS
from .framework import prepare_context #DEFINE_ALIAS
from .framework import ParallelEnv #DEFINE_ALIAS
from .framework import DataParallel #DEFINE_ALIAS

from .framework import NoamDecay #DEFINE_ALIAS
from .framework import PiecewiseDecay #DEFINE_ALIAS
18 changes: 18 additions & 0 deletions python/paddle/distributed/__init__.py
@@ -11,3 +11,21 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# start multiprocess apis
__all__ = ["spawn", "start_processes"]

# dygraph parallel apis
__all__ += [
"prepare_context", "init_parallel_env", "ParallelEnv", "DataParallel"
]

from . import start_processes
from .start_processes import spawn
from .start_processes import start_processes

from . import parallel
from .parallel import init_parallel_env
from paddle.fluid.dygraph.parallel import prepare_context #DEFINE_ALIAS
from paddle.fluid.dygraph.parallel import ParallelEnv #DEFINE_ALIAS
from paddle.fluid.dygraph.parallel import DataParallel #DEFINE_ALIAS
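A quick sanity sketch of the public surface exposed by the new python/paddle/distributed/__init__.py above; nothing here goes beyond the names listed in __all__ in this diff:

import paddle.distributed as dist

# multiprocess start APIs
assert callable(dist.spawn)
assert callable(dist.start_processes)

# dygraph parallel APIs (prepare_context, ParallelEnv and DataParallel are
# re-exported aliases of the fluid.dygraph.parallel implementations)
assert callable(dist.init_parallel_env)
assert callable(dist.prepare_context)
print(dist.ParallelEnv, dist.DataParallel)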
15 changes: 10 additions & 5 deletions python/paddle/distributed/launch.py
@@ -44,11 +44,9 @@
import six
import copy
from argparse import ArgumentParser, REMAINDER
import paddle
import paddle.fluid as fluid

from paddle.distributed.utils import *
import paddle.distributed.cloud_utils as cloud_utils
from paddle.distributed import cloud_utils


def _print_arguments(args):
@@ -167,7 +165,8 @@ def get_cluster_from_args(args, selected_gpus):

def get_gpus(selected_gpus):
if selected_gpus is None:
gpus_num = fluid.core.get_cuda_device_count()
from paddle.fluid import core
gpus_num = core.get_cuda_device_count()
selected_gpus = [str(x) for x in range(0, gpus_num)]
else:
cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
@@ -190,7 +189,7 @@ def get_gpus(selected_gpus):
return selected_gpus


def launch(args):
def get_cluster_and_pod(args):
# parse arguments, used for cloud-single-machine and local
selected_gpus = get_gpus(args.selected_gpus)
trainers_num = cloud_utils.get_trainers_num()
@@ -209,6 +208,12 @@ def launch(args):
cluster, pod = get_cluster_from_args(args, selected_gpus)
logger.info("get cluster from args:{}".format(cluster))

return cluster, pod


def launch(args):
cluster, pod = get_cluster_and_pod(args)

procs = start_local_trainers(
cluster,
pod,
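Splitting get_cluster_and_pod() out of launch() means the cluster/pod layout can be resolved without actually starting trainer processes, which is how parallel.py below reuses it. A minimal sketch, assuming an args object that carries the same fields as the ParallelEnvArgs helper defined in parallel.py; the concrete values are illustrative only:

from argparse import Namespace
from paddle.distributed.launch import get_cluster_and_pod

# Hypothetical single-node, two-GPU configuration; the field names mirror
# ParallelEnvArgs in parallel.py below.
args = Namespace(
    cluster_node_ips="127.0.0.1",
    node_ip="127.0.0.1",
    use_paddlecloud=False,
    started_port=6170,
    print_config=True,
    selected_gpus="0,1")

cluster, pod = get_cluster_and_pod(args)
print(cluster.trainers_nranks())      # expected: 2 trainers
print(cluster.trainers_endpoints())   # one endpoint per trainer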
297 changes: 297 additions & 0 deletions python/paddle/distributed/parallel.py
@@ -0,0 +1,297 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import six

from paddle import compat as cpt
from paddle.distributed.launch import get_cluster_and_pod, _print_arguments

# deprecated module import
from paddle.fluid import core
from paddle.fluid.framework import _set_expected_place
from paddle.fluid.dygraph import parallel_helper
from paddle.fluid.dygraph.parallel import ParallelEnv

__all__ = ["init_parallel_env"]

ParallelStrategy = core.ParallelStrategy


# NOTE(chenweihang): The existence of this class leads to
# the maintenance of two arguments. When the launch.py arguments
# is updated, the arguments here also need to be updated,
# but I have not thought of a better way here
class ParallelEnvArgs(object):
def __init__(self):
# Paddle cluster node IPs, such as 192.168.0.16,192.168.0.17.
self.cluster_node_ips = None

# The current node ip.
self.node_ip = None

# Whether to use the PaddleCloud platform to run the multi-process job.
# If False, there is no need to set this argument.
self.use_paddlecloud = None

# The trainer's started port on a single node
self.started_port = None

# Print the config or not
self.print_config = True

# For GPU training, the training processes run on the selected_gpus,
# and each process is bound to a single GPU. If it is not set, this
# module will use all the GPU cards for training.
self.selected_gpus = None


def _update_env_vars(rank, options):
# 1. input check
if not isinstance(rank, six.integer_types):
raise TypeError("input `rank` type error, expected type is integer, "
"but received type is %s." % type(rank))
if rank < 0:
raise ValueError("input `rank` should be greater than 0, "
"but received %d." % rank)

# 2. check and prepare environment variables
# The necessary environment variables include:
# - PADDLE_TRAINER_ID
# - PADDLE_TRAINERS_NUM
# - PADDLE_CURRENT_ENDPOINT
# - PADDLE_TRAINER_ENDPOINTS

# get args from kwargs
args = ParallelEnvArgs()
# set default `node_ip` and `cluster_node_ips`
args.cluster_node_ips = options.get('cluster_node_ips', None)
args.node_ip = options.get('node_ip', None)
if args.cluster_node_ips is not None and args.node_ip is None:
raise ValueError("please input current node ip, "
"cannot only give `cluster_node_ips`.")
default_node_ip = os.environ.get("PADDLE_MASTER_IPADDR", None)
default_node_ip = "127.0.0.1" if default_node_ip is None else default_node_ip
if args.node_ip is None:
args.node_ip = default_node_ip
if args.cluster_node_ips is None:
args.cluster_node_ips = default_node_ip

# NOTE(chenweihang): `started_port` should be set before calling
# `get_cluster_and_pod`, and every process must keep the same
# started_port, see [ why need set default master info before run? ]
args.started_port = options.get('started_port', None)
if args.started_port is None:
default_port = os.environ.get("PADDLE_MASTER_PORT", None)
if default_port is None:
raise RuntimeError(
"Data parallel training start failed. If you start data parallel "
"training by `paddle.distributed.launch` module, Please ensure "
"that one of the following rules is met:\n"
" 1. Do not set `paddle.distributed.init_parallel_env` argument "
"`rank` or set it to be -1;\n"
" 2. Set `paddle.distributed.init_parallel_env` start port for "
"parallel training by `started_port=**`, e.g. started_port=6170."
)
args.started_port = int(default_port)

args.use_paddlecloud = options.get('use_paddlecloud', False)
args.print_config = options.get('print_config', True)

# set default `selected_gpus`
# TODO(chenweihang): if the number of `selected_gpus` given by users
# is not equal to spawn's nprocs, an error will occur; because we
# removed the `proc num` argument of `init_parallel_env`, there is
# no good way to check for this when that error occurs, so users are
# not recommended to use this parameter, and it is best to remove it
args.selected_gpus = options.get('selected_gpus', None)
if args.selected_gpus is None:
args.selected_gpus = os.environ.get("PADDLE_CUDA_VISIBLE_DEVICES", None)
if args.selected_gpus is None:
raise ValueError(
"Data parallel training start failed. If you start data parallel "
"training by `paddle.distributed.launch` module, Please ensure "
"that one of the following rules is met:\n"
" 1. Do not set `paddle.distributed.init_parallel_env` argument "
"`rank` or set it to be -1;\n"
" 2. Set `paddle.distributed.init_parallel_env` selected gpus of "
"parallel training by `selected_gpus=**`, e.g. selected_gpus='0,1,2,3'."
)

# reuse code of launch.py
cluster, pod = get_cluster_and_pod(args)

# remove useless env vars
os.environ.pop("http_proxy", None)
os.environ.pop("https_proxy", None)

# update env vars
trainer = pod.get_trainer(rank)
if trainer is None:
raise RuntimeError(
"The expected trainer is not exists, its trainer rank is %d." %
rank)
proc_env = {
"FLAGS_selected_gpus": "%s" % ",".join([str(g) for g in trainer.gpus]),
"PADDLE_TRAINER_ID": "%d" % trainer.rank,
"PADDLE_CURRENT_ENDPOINT": "%s" % trainer.endpoint,
"PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(),
"PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints())
}
# no copy, each process will hold env vars itself
os.environ.update(proc_env)

# print config
if args.print_config and rank == 0:
_print_arguments(args)


def _check_env_vars():
def _check_var_exists(var_name):
var = os.environ.get(var_name, None)
if var is None:
raise ValueError("paddle.distributed initialize error,"
"environment variable %s is needed, but not set.",
var_name)

_check_var_exists("FLAGS_selected_gpus")
_check_var_exists("PADDLE_TRAINER_ID")
_check_var_exists("PADDLE_CURRENT_ENDPOINT")
_check_var_exists("PADDLE_TRAINERS_NUM")
_check_var_exists("PADDLE_TRAINER_ENDPOINTS")


def init_parallel_env(rank=-1, backend='nccl', **options):
Review comment (Member): the implementation of this function should be unified with paddle.distributed.launch and fleetrun in 2.0rc.

Reply (Contributor Author): thx, of course!

"""
Initialize the parallel training environment in dynamic graph mode.

Args:
rank(int, optional): Rank of the current process. Default value is -1.
When it is the default value -1, you should use the ``paddle.distributed.launch``
module to start training, and the environment variables for parallel training
are configured by the ``paddle.distributed.launch`` module.
backend(str, optional): The backend for communication between multiple devices.
Currently only ``nccl`` is supported. Default value is ``nccl`` .
**options(dict, optional): Other initial parallel execution environment configuration options.

The following options are currently supported:

- cluster_node_ips: Paddle cluster node IPs, such as "192.168.0.16,192.168.0.17". Default: "127.0.0.1".

- node_ip: The current node ip, such as "192.168.0.16". Default: "127.0.0.1".

- started_port: The trainer's started port on a single node, such as 6170. Default: None.

- selected_gpus: The training process will run on the selected_gpus, such as "0,1,2,3". Default: None.

- print_config: Print current parallel training config. Default: True.

- use_paddlecloud: Whether to use the PaddleCloud platform to run your multi-process job. Default: False.

Returns:
ParallelStrategy

Examples:
.. code-block:: python

import paddle
import paddle.nn as nn
import paddle.optimizer as opt
import paddle.distributed as dist

class LinearNet(nn.Layer):
def __init__(self):
super(LinearNet, self).__init__()
self._linear1 = nn.Linear(10, 10)
self._linear2 = nn.Linear(10, 1)

def forward(self, x):
return self._linear2(self._linear1(x))

def train(rank):
# 1. enable dynamic mode
paddle.disable_static()

# 2. initialize parallel environment
strategy = dist.init_parallel_env(rank)

# 3. create data parallel layer & optimizer
layer = LinearNet()
dp_layer = dist.DataParallel(layer, strategy)

loss_fn = nn.MSELoss()
sgd = opt.SGD(
learning_rate=0.001, parameter_list=dp_layer.parameters())

# 4. run layer
inputs = paddle.randn([10, 10], 'float32')
outputs = dp_layer(inputs)
labels = paddle.randn([10, 1], 'float32')
loss = loss_fn(outputs, labels)

loss = dp_layer.scale_loss(loss)
loss.backward()
dp_layer.apply_collective_grads()

sgd.minimize(loss)
dp_layer.clear_gradients()

if __name__ == '__main__':
dist.spawn(train, args=(), nprocs=2)
"""

# 1. input check
if not isinstance(backend, six.string_types):
raise TypeError("input `backend` type error, expected type is str, "
"but received type is %s." % type(backend))
if cpt.to_text(backend) != 'nccl':
raise ValueError(
"backend `%s` is not supported, now only supports `nccl` backend." %
backend)

# 2. update or check env
# NOTE(chenweihang): if rank is the default value, users should configure
# the parallel environment via the `paddle.distributed.launch` module,
# so here we only check the environment variables
if rank != -1:
_update_env_vars(rank, options)
else:
_check_env_vars()

# 3. init ParallelStrategy
strategy = ParallelStrategy()
if cpt.to_text(backend) == 'nccl':
strategy.nranks = ParallelEnv().nranks
strategy.local_rank = ParallelEnv().local_rank
strategy.trainer_endpoints = ParallelEnv().trainer_endpoints
strategy.current_endpoint = ParallelEnv().current_endpoint
if strategy.nranks < 2:
return
# NOTE(chenweihang): [ why config global place here? ]
# dygraph mode will be set as the default mode, and users will not
# call `dygraph.guard` or `enable_dygraph` directly; if they want to
# switch the default place, they need to call a function to change it,
# so here we just set the correct place for users
place = core.CUDAPlace(ParallelEnv().dev_id)
_set_expected_place(place)

# init nccl context
parallel_helper._set_parallel_ctx(
core.NCCLParallelContext(strategy, place))
parallel_helper._init_parallel_ctx()

return strategy
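For the default rank=-1 path, init_parallel_env() only verifies that the environment has already been configured. A minimal sketch of that path for a single-GPU trainer; the variables below are the ones _check_env_vars() requires and are normally exported by the paddle.distributed.launch module, so setting them by hand here is for illustration only:

import os
import paddle
import paddle.distributed as dist

# Normally exported by `python -m paddle.distributed.launch train.py`;
# set manually here only to show what _check_env_vars() expects.
os.environ["FLAGS_selected_gpus"] = "0"
os.environ["PADDLE_TRAINER_ID"] = "0"
os.environ["PADDLE_TRAINERS_NUM"] = "1"
os.environ["PADDLE_CURRENT_ENDPOINT"] = "127.0.0.1:6170"
os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:6170"

paddle.disable_static()
# rank defaults to -1, so the env vars above are checked, not rewritten;
# with a single trainer nranks < 2 and the function returns before the
# NCCL context is initialized.
strategy = dist.init_parallel_env()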