diff --git a/doc/source/reference/collective-communication.rst b/doc/source/reference/collective-communication.rst
new file mode 100644
index 00000000..ff013899
--- /dev/null
+++ b/doc/source/reference/collective-communication.rst
@@ -0,0 +1,19 @@
+.. _ref_collective_communication:
+
+========================
+Collective communication
+========================
+
+.. autosummary::
+   :toctree: generated/
+
+   xoscar.collective.init_process_group
+   xoscar.collective.new_group
+   xoscar.collective.reduce
+   xoscar.collective.allreduce
+   xoscar.collective.gather
+   xoscar.collective.allgather
+   xoscar.collective.scatter
+   xoscar.collective.reduce_scatter
+   xoscar.collective.alltoall
+   xoscar.collective.broadcast
diff --git a/doc/source/reference/index.rst b/doc/source/reference/index.rst
index 9d520a91..0ab66e15 100644
--- a/doc/source/reference/index.rst
+++ b/doc/source/reference/index.rst
@@ -9,3 +9,4 @@ API Reference
 
    actor-pool
    actor
+   collective-communication
diff --git a/doc/source/user_guide/collective-communication.rst b/doc/source/user_guide/collective-communication.rst
new file mode 100644
index 00000000..c0b85766
--- /dev/null
+++ b/doc/source/user_guide/collective-communication.rst
@@ -0,0 +1,173 @@
+.. _collective-communication:
+
+========================
+Collective communication
+========================
+
+Collective communication is a global communication operation in which all processes in a process
+group participate.
+
+xoscar supports collective communication among actors. It uses the Gloo backend on CPU and the
+NCCL backend on GPU. You can choose the backend by setting the ``backend`` parameter of
+``init_process_group`` when establishing the process group.
+
+To perform collective communication, you need to create an actor that invokes the collective
+communication interfaces. First, initialize the process group. Once the process group is
+initialized, you can create smaller process groups within it for collective communication,
+as sketched below.
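+
+In outline, each participating actor binds its own address, joins the global process group,
+optionally forms a sub-group, and then invokes a collective operation. The following is a minimal
+sketch of such an actor (class and method names here are illustrative, not part of the API;
+complete, runnable examples follow below):
+
+.. code-block:: python
+
+    import os
+
+    import numpy as np
+
+    from xoscar import Actor
+    from xoscar.collective import allreduce, init_process_group, new_group
+    from xoscar.collective.common import RANK_ADDRESS_ENV_KEY
+
+    class SketchActor(Actor):
+        def __init__(self, rank: int, world_size: int):
+            self._rank = rank
+            self._world_size = world_size
+
+        async def setup(self):
+            # 1. Expose this actor's address, then join the global process group.
+            os.environ[RANK_ADDRESS_ENV_KEY] = self.address
+            await init_process_group(self._rank, self._world_size)
+
+        async def sum_ranks(self):
+            # 2. Form a sub-group, then run a collective inside it (here: an
+            #    all-reduce with the default reduce operation).
+            sendbuf = np.array([self._rank], dtype=np.int64)
+            recvbuf = np.zeros_like(sendbuf)
+            group = await new_group([0, 1])
+            if group is not None:
+                await allreduce(sendbuf, recvbuf, group_name=group)
+            return recvbuf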
+
+.. seealso::
+   :ref:`ref_collective_communication`
+
+
+Collective communication on CPU
+-------------------------------
+xoscar uses Gloo as the backend on CPU. Here is an example of how to perform a broadcast
+operation on CPU:
+
+.. code-block:: python
+
+    import asyncio
+    import os
+
+    import numpy as np
+
+    from xoscar import Actor, create_actor_pool, get_pool_config
+    from xoscar.collective import broadcast, init_process_group, new_group
+    from xoscar.collective.common import (
+        RANK_ADDRESS_ENV_KEY,
+        RENDEZVOUS_MASTER_IP_ENV_KEY,
+        RENDEZVOUS_MASTER_PORT_ENV_KEY,
+    )
+    from xoscar.context import get_context
+
+
+    class WorkerActor(Actor):
+        def __init__(self, rank, world, *args, **kwargs):
+            self._rank = rank
+            self._world = world
+
+        async def init_process_group(self):
+            # Each rank registers its own address before joining the group.
+            os.environ[RANK_ADDRESS_ENV_KEY] = self.address
+            return await init_process_group(self._rank, self._world)
+
+        async def test_broadcast(self):
+            root = 1
+            _group = [0, 1]
+            sendbuf = np.zeros((2, 3), dtype=np.int64)
+            if self._rank == _group[root]:
+                sendbuf = sendbuf + self._rank
+            recvbuf = np.zeros_like(sendbuf, dtype=np.int64)
+            group = await new_group(_group)
+            if group is not None:
+                await broadcast(sendbuf, recvbuf, root=root, group_name=group)
+            print(np.equal(recvbuf, np.zeros_like(recvbuf) + _group[root]))
+
+
+    async def main():
+        pool = await create_actor_pool(
+            "127.0.0.1",
+            n_process=2,
+            envs=[
+                {
+                    RENDEZVOUS_MASTER_IP_ENV_KEY: "127.0.0.1",
+                    RENDEZVOUS_MASTER_PORT_ENV_KEY: "25001",
+                }
+            ]
+            * 2,
+        )
+        main_addr = pool.external_address
+        config = (await get_pool_config(main_addr)).as_dict()
+        all_addrs = list(config["mapping"].keys())
+        all_addrs.remove(main_addr)
+
+        async with pool:
+            ctx = get_context()
+            r0 = await ctx.create_actor(WorkerActor, 0, 2, address=all_addrs[0])
+            r1 = await ctx.create_actor(WorkerActor, 1, 2, address=all_addrs[1])
+            await asyncio.gather(r0.init_process_group(), r1.init_process_group())
+            await asyncio.gather(r0.test_broadcast(), r1.test_broadcast())
+
+
+    if __name__ == "__main__":
+        asyncio.run(main())
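+
+The reduction collectives take an ``op`` argument from
+``xoscar.collective.common.CollectiveReduceOp`` that selects the reduction applied across ranks.
+As an illustrative variation of the example above (this assumes ``MAX`` is one of the members of
+``CollectiveReduceOp``; see the API reference for the exact set), each rank contributes its own
+rank id and receives the maximum:
+
+.. code-block:: python
+
+    from xoscar.collective import allreduce
+    from xoscar.collective.common import CollectiveReduceOp
+
+    class WorkerActor(Actor):
+        # __init__ and init_process_group unchanged from the example above
+
+        async def test_allreduce_max(self):
+            _group = [0, 1]
+            # Every rank contributes its own rank id; the result is the maximum rank.
+            sendbuf = np.zeros((2, 3), dtype=np.int64) + self._rank
+            recvbuf = np.zeros_like(sendbuf)
+            group = await new_group(_group)
+            if group is not None:
+                await allreduce(
+                    sendbuf, recvbuf, op=CollectiveReduceOp.MAX, group_name=group
+                )
+            print(np.equal(recvbuf, np.zeros_like(recvbuf) + max(_group)))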
+
+Collective communication on GPU
+-------------------------------
+xoscar uses NCCL as the backend on GPU, and the NCCL backend depends on CuPy. Therefore, before
+using collective communication with xoscar on GPU, you need to install the version of CuPy that
+matches your NVIDIA driver. Refer to https://docs.cupy.dev/en/stable/install.html#installing-cupy
+for installation steps and compatibility information. Here is an example of how to perform a
+broadcast operation on GPU (two GPUs are needed for this example):
+
+.. code-block:: python
+
+    import asyncio
+    import os
+
+    import numpy as np
+
+    from xoscar import Actor, create_actor_pool, get_pool_config
+    from xoscar.collective import broadcast, init_process_group, new_group
+    from xoscar.collective.common import (
+        COLLECTIVE_DEVICE_ID_ENV_KEY,
+        RANK_ADDRESS_ENV_KEY,
+        RENDEZVOUS_MASTER_IP_ENV_KEY,
+        RENDEZVOUS_MASTER_PORT_ENV_KEY,
+    )
+    from xoscar.context import get_context
+
+
+    class WorkerActor(Actor):
+        def __init__(self, rank, world, *args, **kwargs):
+            self._rank = rank
+            self._world = world
+
+        async def init_process_group(self):
+            os.environ[RANK_ADDRESS_ENV_KEY] = self.address
+            return await init_process_group(self._rank, self._world, "nccl")
+
+        async def test_broadcast(self):
+            import cupy as cp
+
+            root = 1
+            _group = [0, 1]
+            sendbuf = cp.zeros((2, 3), dtype=np.int64)
+            if self._rank == _group[root]:
+                sendbuf = sendbuf + self._rank
+            recvbuf = cp.zeros_like(sendbuf, dtype=np.int64)
+            group = await new_group(_group)
+            if group is not None:
+                await broadcast(sendbuf, recvbuf, root=root, group_name=group)
+            cp.testing.assert_array_equal(recvbuf, cp.zeros_like(recvbuf) + _group[root])
+
+
+    async def main():
+        pool = await create_actor_pool(
+            "127.0.0.1",
+            n_process=2,
+            envs=[
+                {
+                    RENDEZVOUS_MASTER_IP_ENV_KEY: "127.0.0.1",
+                    RENDEZVOUS_MASTER_PORT_ENV_KEY: "25001",
+                    COLLECTIVE_DEVICE_ID_ENV_KEY: "0",
+                },
+                {
+                    RENDEZVOUS_MASTER_IP_ENV_KEY: "127.0.0.1",
+                    RENDEZVOUS_MASTER_PORT_ENV_KEY: "25001",
+                    COLLECTIVE_DEVICE_ID_ENV_KEY: "1",
+                },
+            ],
+        )
+        main_addr = pool.external_address
+        config = (await get_pool_config(main_addr)).as_dict()
+        all_addrs = list(config["mapping"].keys())
+        all_addrs.remove(main_addr)
+
+        async with pool:
+            ctx = get_context()
+            r0 = await ctx.create_actor(WorkerActor, 0, 2, address=all_addrs[0])
+            r1 = await ctx.create_actor(WorkerActor, 1, 2, address=all_addrs[1])
+            await asyncio.gather(r0.init_process_group(), r1.init_process_group())
+            await asyncio.gather(r0.test_broadcast(), r1.test_broadcast())
+
+
+    if __name__ == "__main__":
+        asyncio.run(main())
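+
+Each worker has to be bound to its own GPU. The example above does this through the
+``COLLECTIVE_DEVICE_ID_ENV_KEY`` entry in ``envs``; alternatively, a worker can pass its GPU id
+directly through the ``device_id`` parameter of ``init_process_group`` (see the API reference).
+A sketch of ``init_process_group`` written that way, assuming rank ``i`` should be bound to
+GPU ``i``:
+
+.. code-block:: python
+
+    async def init_process_group(self):
+        os.environ[RANK_ADDRESS_ENV_KEY] = self.address
+        # Bind this worker to a GPU explicitly instead of relying on the
+        # COLLECTIVE_DEVICE_ID_ENV_KEY environment variable.
+        return await init_process_group(
+            self._rank, self._world, backend="nccl", device_id=self._rank
+        )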
diff --git a/doc/source/user_guide/index.rst b/doc/source/user_guide/index.rst
index 1cbcda7a..deedab78 100644
--- a/doc/source/user_guide/index.rst
+++ b/doc/source/user_guide/index.rst
@@ -8,4 +8,5 @@ User Guide
    :maxdepth: 2
 
    actor
-   actor-pool
\ No newline at end of file
+   actor-pool
+   collective-communication
\ No newline at end of file
diff --git a/python/xoscar/collective/core.py b/python/xoscar/collective/core.py
index a2f50782..92480a6c 100644
--- a/python/xoscar/collective/core.py
+++ b/python/xoscar/collective/core.py
@@ -332,8 +332,7 @@ async def init_process_group(
     address: Optional[str] = None,
 ):
     """
-    Initializes the default distributed process group, and this will also
-    initialize the distributed package.
+    Initializes the default distributed process group.
 
     Args:
         rank (int): Rank of the current process (it should be a
             number between 0 and ``world_size``-1).
 
         world_size (int): Number of processes participating in the job.
 
         backend (str optional): The backend to use. Depending on
             build-time configurations, valid values include ``gloo`` and
             ``nccl``. If the backend is not provided, then a ``gloo``
             backend will be created.
 
-        device_id(int, optional): GPU ID the actor will bind, default ``None``
-            If it is None and backend is gloo, it will try to get it from the environment variable COLLECTIVE_DEVICE_ID_ENV_KEY.
-            If the environment variable is not set either, it will return an error.
+        device_id (int, optional): GPU id that the actor will be bound to, default ``None``.
+            If it is ``None`` and the backend is ``nccl``, it will be read from the environment
+            variable ``COLLECTIVE_DEVICE_ID_ENV_KEY``; if that is not set either, an error is raised.
 
         address(str, optional): actor address. default ``None``
     """
@@ -394,12 +393,10 @@
 
     This function requires that all processes in the main group (i.e. all
     processes that are part of the distributed job) enter this function, even
-    if they are not going to be members of the group. Additionally, groups
-    should be created in the same order in all processes.
+    if they are not going to be members of the group.
 
     Args:
-        ranks (list[int]): List of ranks of group members. If ``None``, will be
-            set to all ranks. Default is ``None``.
+        ranks (list[int]): List of ranks of group members.
 
         pg_options (ProcessGroupOptions, optional): process group options
             specifying what additional options need to be passed in during
@@ -476,11 +473,9 @@
     the final result.
 
     Args:
-        send_data (Any): Input of the collective. The function
-            operates in-place.
+        send_data (Any): Input of the collective.
 
-        recv_data (Any): Output of the collective. The function
-            operates in-place.
+        recv_data (Any): Output of the collective.
 
         op (xoscar.collective.common.CollectiveReduceOp): One of the values from
             ``xoscar.collective.common.CollectiveReduceOp``
@@ -632,6 +627,7 @@
 ):
     """
     Reduces, then scatters a list of numpy or cupy data to all processes in a group.
+    It can only be used on Linux.
 
     Args:
         send_data (Any): Input data.
@@ -706,7 +702,7 @@
     stream: Optional[Any] = None,
 ):
     """
-    Broadcasts the tensor to the whole group.
+    Broadcasts the numpy or cupy data to the whole group.
 
     data must have the same number of elements in all processes
     participating in the collective.