From 305823e8a078d13d3a7433161f1b7c4360f96c60 Mon Sep 17 00:00:00 2001 From: sangcho Date: Wed, 14 Feb 2024 12:24:48 +0000 Subject: [PATCH 1/2] . --- .bazeliskrc | 1 + WORKSPACE | 6 +++--- pygloo/BUILD | 21 +++++++++++++++++++++ setup.py | 6 +++++- 4 files changed, 30 insertions(+), 4 deletions(-) create mode 100644 .bazeliskrc diff --git a/.bazeliskrc b/.bazeliskrc new file mode 100644 index 0000000..bd4a535 --- /dev/null +++ b/.bazeliskrc @@ -0,0 +1 @@ +USE_BAZEL_VERSION=5.4.1 diff --git a/WORKSPACE b/WORKSPACE index cdd3513..a0acc5e 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -67,9 +67,9 @@ http_archive( http_archive( name = "hiredis", build_file_content = all_content, - strip_prefix = "hiredis-1.0.0", - urls = ["https://github.com/redis/hiredis/archive/v1.0.0.tar.gz"], - sha256 = "2a0b5fe5119ec973a0c1966bfc4bd7ed39dbce1cb6d749064af9121fe971936f", + strip_prefix = "hiredis-1.2.0", + urls = ["https://github.com/redis/hiredis/archive/v1.2.0.tar.gz"], + sha256 = "82ad632d31ee05da13b537c124f819eb88e18851d9cb0c30ae0552084811588c", ) # gloo source code repository diff --git a/pygloo/BUILD b/pygloo/BUILD index 20c5151..3370786 100644 --- a/pygloo/BUILD +++ b/pygloo/BUILD @@ -1,5 +1,6 @@ load("@rules_foreign_cc//tools/build_defs:cmake.bzl", "cmake_external") load("@rules_foreign_cc//tools/build_defs:make.bzl", "make") +load("@rules_cc//cc:defs.bzl", "cc_library") load("@pybind11_bazel//:build_defs.bzl", "pybind_library") load("@pybind11_bazel//:build_defs.bzl", "pybind_extension") @@ -27,6 +28,26 @@ make( static_libraries = ["libhiredis.a"], ) +# cc_library( +# name = "hiredis", +# srcs = glob( +# [ +# "*.c", +# "*.h", +# ], +# exclude = +# [ +# "test.c", +# ], +# ), +# hdrs = glob([ +# "*.h", +# "adapters/*.h", +# ]), +# include_prefix = "hiredis", +# visibility = ["//visibility:public"], +# ) + cmake_external( name = "gloo", # Values to be passed as -Dkey=value on the CMake command line; diff --git a/setup.py b/setup.py index 42c4070..868c0dc 100644 --- a/setup.py +++ b/setup.py @@ -44,6 +44,7 @@ def bazel_invoke(invoker, cmdline, *args, **kwargs): candidates.append(os.path.join(home, ".bazel", "bin", "bazel")) result = None for i, cmd in enumerate(candidates): + print("SANG-TODO cmd", cmd) try: result = invoker([cmd] + cmdline, *args, **kwargs) break @@ -78,14 +79,17 @@ def build(): raise RuntimeError(msg) bazel_env = dict(os.environ, PYTHON3_BIN_PATH=sys.executable) + print("SANG-TODO bazel_env", bazel_env) version_info = bazel_invoke(subprocess.check_output, ["--version"]) + print("SANG-TODO bazel version: ", version_info) bazel_version_str = version_info.rstrip().decode("utf-8").split(" ", 1)[1] bazel_version_split = bazel_version_str.split(".") bazel_version_digits = [ "".join(takewhile(str.isdigit, s)) for s in bazel_version_split ] bazel_version = tuple(map(int, bazel_version_digits)) + print("SANG-TODO bazel version", bazel_version) if bazel_version < SUPPORTED_BAZEL: logger.warning("Expected Bazel version {} but found {}".format( ".".join(map(str, SUPPORTED_BAZEL)), bazel_version_str)) @@ -93,7 +97,7 @@ def build(): bazel_targets = ["//pygloo:all"] return bazel_invoke( subprocess.check_call, - ["build", "--verbose_failures", "--"] + bazel_targets, + ["build", "--verbose_failures", "--subcommands", "--"] + bazel_targets, env=bazel_env) From ff45ad9b89bbc930d45e96b31fdd7a1fc4d734e2 Mon Sep 17 00:00:00 2001 From: sangcho Date: Tue, 20 Feb 2024 07:54:02 +0000 Subject: [PATCH 2/2] done --- pygloo/BUILD | 20 ---------- pygloo/include/collective.h | 12 ++++++ pygloo/include/future.h | 34 ++++++++++++++++ pygloo/main.cc | 15 +++++++ pygloo/src/future.cc | 24 +++++++++++ pygloo/src/irecv.cc | 58 +++++++++++++++++++++++++++ pygloo/src/isend.cc | 58 +++++++++++++++++++++++++++ pygloo/src/recv.cc | 28 +++++++------ pygloo/src/send.cc | 28 +++++++------ setup.py | 7 ++++ tests/test_isend_irecv.py | 80 +++++++++++++++++++++++++++++++++++++ tests/test_send_recv.py | 2 +- 12 files changed, 321 insertions(+), 45 deletions(-) create mode 100644 pygloo/include/future.h create mode 100644 pygloo/src/future.cc create mode 100644 pygloo/src/irecv.cc create mode 100644 pygloo/src/isend.cc create mode 100644 tests/test_isend_irecv.py diff --git a/pygloo/BUILD b/pygloo/BUILD index 3370786..cb8d083 100644 --- a/pygloo/BUILD +++ b/pygloo/BUILD @@ -28,26 +28,6 @@ make( static_libraries = ["libhiredis.a"], ) -# cc_library( -# name = "hiredis", -# srcs = glob( -# [ -# "*.c", -# "*.h", -# ], -# exclude = -# [ -# "test.c", -# ], -# ), -# hdrs = glob([ -# "*.h", -# "adapters/*.h", -# ]), -# include_prefix = "hiredis", -# visibility = ["//visibility:public"], -# ) - cmake_external( name = "gloo", # Values to be passed as -Dkey=value on the CMake command line; diff --git a/pygloo/include/collective.h b/pygloo/include/collective.h index daa7a5a..0a03798 100644 --- a/pygloo/include/collective.h +++ b/pygloo/include/collective.h @@ -1,3 +1,5 @@ +#include + #include #include #include @@ -95,10 +97,20 @@ void gather_wrapper(const std::shared_ptr &context, glooDataType_t datatype, int root = 0, uint32_t tag = 0); void send_wrapper(const std::shared_ptr &context, + intptr_t sendbuf, size_t size, glooDataType_t datatype, + int peer, uint32_t tag = 0, + std::chrono::milliseconds timeout_ms = std::chrono::milliseconds(0)); + +std::shared_ptr isend_wrapper(const std::shared_ptr &context, intptr_t sendbuf, size_t size, glooDataType_t datatype, int peer, uint32_t tag = 0); void recv_wrapper(const std::shared_ptr &context, + intptr_t recvbuf, size_t size, glooDataType_t datatype, + int peer, uint32_t tag = 0, + std::chrono::milliseconds timeout_ms = std::chrono::milliseconds(0)); + +std::shared_ptr irecv_wrapper(const std::shared_ptr &context, intptr_t recvbuf, size_t size, glooDataType_t datatype, int peer, uint32_t tag = 0); diff --git a/pygloo/include/future.h b/pygloo/include/future.h new file mode 100644 index 0000000..f3fd128 --- /dev/null +++ b/pygloo/include/future.h @@ -0,0 +1,34 @@ +#include +#include +#include + +namespace pygloo { +namespace future { + +using UnboundBuffer = gloo::transport::UnboundBuffer; + +enum class Op : std::uint8_t { + SEND = 0, + RECV, + UNUSED, +}; + +class Future { +public: + Future(std::unique_ptr gloo_buffer, Op op); + ~Future(); + + // Not a threadsafe. + bool Wait(std::chrono::milliseconds timeout); + +private: + /// Disable copy constructor because it needs to accept unique_ptr. + Future(const Future& other) = delete; + + /// Private Attributes. + std::unique_ptr gloo_buffer_; + Op op_; +}; + +} // namespace future +} // namespace pygloo diff --git a/pygloo/main.cc b/pygloo/main.cc index 8946434..cd5ee8a 100644 --- a/pygloo/main.cc +++ b/pygloo/main.cc @@ -1,3 +1,4 @@ +#include #include #include @@ -89,10 +90,21 @@ PYBIND11_MODULE(pygloo, m) { pybind11::arg("root") = 0, pybind11::arg("tag") = 0); m.def("send", &pygloo::send_wrapper, pybind11::arg("context") = nullptr, + pybind11::arg("sendbuf") = nullptr, pybind11::arg("size") = nullptr, + pybind11::arg("datatype") = nullptr, pybind11::arg("peer") = nullptr, + pybind11::arg("tag") = 0, pybind11::arg("timeout_ms") = 0); + + m.def("isend", &pygloo::isend_wrapper, pybind11::arg("context") = nullptr, pybind11::arg("sendbuf") = nullptr, pybind11::arg("size") = nullptr, pybind11::arg("datatype") = nullptr, pybind11::arg("peer") = nullptr, pybind11::arg("tag") = 0); + m.def("recv", &pygloo::recv_wrapper, pybind11::arg("context") = nullptr, + pybind11::arg("recvbuf") = nullptr, pybind11::arg("size") = nullptr, + pybind11::arg("datatype") = nullptr, pybind11::arg("peer") = nullptr, + pybind11::arg("tag") = 0, pybind11::arg("timeout_ms") = 0.0); + + m.def("irecv", &pygloo::irecv_wrapper, pybind11::arg("context") = nullptr, pybind11::arg("recvbuf") = nullptr, pybind11::arg("size") = nullptr, pybind11::arg("datatype") = nullptr, pybind11::arg("peer") = nullptr, pybind11::arg("tag") = 0); @@ -127,6 +139,9 @@ PYBIND11_MODULE(pygloo, m) { .def("setTimeout", &gloo::Context::setTimeout) .def("getTimeout", &gloo::Context::getTimeout); + pybind11::class_>(m, "Future") + .def("Wait", &pygloo::future::Future::Wait); + pygloo::transport::def_transport_module(m); pygloo::rendezvous::def_rendezvous_module(m); } diff --git a/pygloo/src/future.cc b/pygloo/src/future.cc new file mode 100644 index 0000000..5a6f979 --- /dev/null +++ b/pygloo/src/future.cc @@ -0,0 +1,24 @@ +#include +#include +#include "gloo/transport/unbound_buffer.h" + +namespace pygloo { +namespace future { + +Future::Future(std::unique_ptr gloo_buffer, Op op) : gloo_buffer_(std::move(gloo_buffer)), op_(op) {} + +Future::~Future() {} + +bool Future::Wait(std::chrono::milliseconds timeout) { + if (op_ == Op::SEND) { + return gloo_buffer_->waitSend(timeout); + } else if (op_ == Op::RECV) { + return gloo_buffer_->waitRecv(timeout); + } else { + // this should never happen. + assert(false); + } +} + +} // namespace future +} // namespace pygloo diff --git a/pygloo/src/irecv.cc b/pygloo/src/irecv.cc new file mode 100644 index 0000000..e7879d7 --- /dev/null +++ b/pygloo/src/irecv.cc @@ -0,0 +1,58 @@ +#include +#include + +namespace pygloo { + +template +std::shared_ptr irecv(const std::shared_ptr &context, intptr_t recvbuf, + size_t size, int peer, uint32_t tag) { + if (context->rank == peer) + throw std::runtime_error( + "peer equals to current rank. Please specify other peer values."); + + auto outputBuffer = context->createUnboundBuffer( + reinterpret_cast(recvbuf), size * sizeof(T)); + + constexpr uint8_t kSendRecvSlotPrefix = 0x09; + gloo::Slot slot = gloo::Slot::build(kSendRecvSlotPrefix, tag); + + outputBuffer->recv(peer, slot); + return std::make_shared(std::move(outputBuffer), future::Op::RECV); +} + +std::shared_ptr irecv_wrapper(const std::shared_ptr &context, + intptr_t recvbuf, size_t size, glooDataType_t datatype, + int peer, uint32_t tag) { + switch (datatype) { + case glooDataType_t::glooInt8: + return irecv(context, recvbuf, size, peer, tag); + break; + case glooDataType_t::glooUint8: + return irecv(context, recvbuf, size, peer, tag); + break; + case glooDataType_t::glooInt32: + return irecv(context, recvbuf, size, peer, tag); + break; + case glooDataType_t::glooUint32: + return irecv(context, recvbuf, size, peer, tag); + break; + case glooDataType_t::glooInt64: + return irecv(context, recvbuf, size, peer, tag); + break; + case glooDataType_t::glooUint64: + return irecv(context, recvbuf, size, peer, tag); + break; + case glooDataType_t::glooFloat16: + return irecv(context, recvbuf, size, peer, tag); + break; + case glooDataType_t::glooFloat32: + return irecv(context, recvbuf, size, peer, tag); + break; + case glooDataType_t::glooFloat64: + return irecv(context, recvbuf, size, peer, tag); + break; + default: + throw std::runtime_error("Unhandled dataType"); + } +} +} // namespace pygloo diff --git a/pygloo/src/isend.cc b/pygloo/src/isend.cc new file mode 100644 index 0000000..c3b0577 --- /dev/null +++ b/pygloo/src/isend.cc @@ -0,0 +1,58 @@ +#include +#include +#include +namespace pygloo { + +template +std::shared_ptr isend(const std::shared_ptr &context, intptr_t sendbuf, + size_t size, int peer, uint32_t tag) { + if (context->rank == peer) + throw std::runtime_error( + "peer equals to current rank. Please specify other peer values."); + + auto inputBuffer = context->createUnboundBuffer( + reinterpret_cast(sendbuf), size * sizeof(T)); + + constexpr uint8_t kSendRecvSlotPrefix = 0x09; + gloo::Slot slot = gloo::Slot::build(kSendRecvSlotPrefix, tag); + + inputBuffer->send(peer, slot); + return std::make_shared(std::move(inputBuffer), future::Op::SEND); +} + +std::shared_ptr isend_wrapper(const std::shared_ptr &context, + intptr_t sendbuf, size_t size, glooDataType_t datatype, + int peer, uint32_t tag) { + switch (datatype) { + case glooDataType_t::glooInt8: + return isend(context, sendbuf, size, peer, tag); + break; + case glooDataType_t::glooUint8: + return isend(context, sendbuf, size, peer, tag); + break; + case glooDataType_t::glooInt32: + return isend(context, sendbuf, size, peer, tag); + break; + case glooDataType_t::glooUint32: + return isend(context, sendbuf, size, peer, tag); + break; + case glooDataType_t::glooInt64: + return isend(context, sendbuf, size, peer, tag); + break; + case glooDataType_t::glooUint64: + return isend(context, sendbuf, size, peer, tag); + break; + case glooDataType_t::glooFloat16: + return isend(context, sendbuf, size, peer, tag); + break; + case glooDataType_t::glooFloat32: + return isend(context, sendbuf, size, peer, tag); + break; + case glooDataType_t::glooFloat64: + return isend(context, sendbuf, size, peer, tag); + break; + default: + throw std::runtime_error("Unhandled dataType"); + } +} +} // namespace pygloo diff --git a/pygloo/src/recv.cc b/pygloo/src/recv.cc index 20c3d60..a59d29f 100644 --- a/pygloo/src/recv.cc +++ b/pygloo/src/recv.cc @@ -5,7 +5,7 @@ namespace pygloo { template void recv(const std::shared_ptr &context, intptr_t recvbuf, - size_t size, int peer, uint32_t tag) { + size_t size, int peer, uint32_t tag, std::chrono::milliseconds timeout_ms) { if (context->rank == peer) throw std::runtime_error( "peer equals to current rank. Please specify other peer values."); @@ -17,39 +17,43 @@ void recv(const std::shared_ptr &context, intptr_t recvbuf, gloo::Slot slot = gloo::Slot::build(kSendRecvSlotPrefix, tag); outputBuffer->recv(peer, slot); - outputBuffer->waitRecv(context->getTimeout()); + if (timeout_ms == std::chrono::milliseconds(0)) { + timeout_ms = context->getTimeout(); + } + outputBuffer->waitRecv(timeout_ms); } void recv_wrapper(const std::shared_ptr &context, intptr_t recvbuf, size_t size, glooDataType_t datatype, - int peer, uint32_t tag) { + int peer, uint32_t tag, + std::chrono::milliseconds timeout_ms) { switch (datatype) { case glooDataType_t::glooInt8: - recv(context, recvbuf, size, peer, tag); + recv(context, recvbuf, size, peer, tag, timeout_ms); break; case glooDataType_t::glooUint8: - recv(context, recvbuf, size, peer, tag); + recv(context, recvbuf, size, peer, tag, timeout_ms); break; case glooDataType_t::glooInt32: - recv(context, recvbuf, size, peer, tag); + recv(context, recvbuf, size, peer, tag, timeout_ms); break; case glooDataType_t::glooUint32: - recv(context, recvbuf, size, peer, tag); + recv(context, recvbuf, size, peer, tag, timeout_ms); break; case glooDataType_t::glooInt64: - recv(context, recvbuf, size, peer, tag); + recv(context, recvbuf, size, peer, tag, timeout_ms); break; case glooDataType_t::glooUint64: - recv(context, recvbuf, size, peer, tag); + recv(context, recvbuf, size, peer, tag, timeout_ms); break; case glooDataType_t::glooFloat16: - recv(context, recvbuf, size, peer, tag); + recv(context, recvbuf, size, peer, tag, timeout_ms); break; case glooDataType_t::glooFloat32: - recv(context, recvbuf, size, peer, tag); + recv(context, recvbuf, size, peer, tag, timeout_ms); break; case glooDataType_t::glooFloat64: - recv(context, recvbuf, size, peer, tag); + recv(context, recvbuf, size, peer, tag, timeout_ms); break; default: throw std::runtime_error("Unhandled dataType"); diff --git a/pygloo/src/send.cc b/pygloo/src/send.cc index bee9977..4c29998 100644 --- a/pygloo/src/send.cc +++ b/pygloo/src/send.cc @@ -5,7 +5,7 @@ namespace pygloo { template void send(const std::shared_ptr &context, intptr_t sendbuf, - size_t size, int peer, uint32_t tag) { + size_t size, int peer, uint32_t tag, std::chrono::milliseconds timeout_ms) { if (context->rank == peer) throw std::runtime_error( "peer equals to current rank. Please specify other peer values."); @@ -17,39 +17,43 @@ void send(const std::shared_ptr &context, intptr_t sendbuf, gloo::Slot slot = gloo::Slot::build(kSendRecvSlotPrefix, tag); inputBuffer->send(peer, slot); - inputBuffer->waitSend(context->getTimeout()); + + if (timeout_ms == std::chrono::milliseconds(0)) { + timeout_ms = context->getTimeout(); + } + inputBuffer->waitSend(timeout_ms); } void send_wrapper(const std::shared_ptr &context, intptr_t sendbuf, size_t size, glooDataType_t datatype, - int peer, uint32_t tag) { + int peer, uint32_t tag, std::chrono::milliseconds timeout_ms) { switch (datatype) { case glooDataType_t::glooInt8: - send(context, sendbuf, size, peer, tag); + send(context, sendbuf, size, peer, tag, timeout_ms); break; case glooDataType_t::glooUint8: - send(context, sendbuf, size, peer, tag); + send(context, sendbuf, size, peer, tag, timeout_ms); break; case glooDataType_t::glooInt32: - send(context, sendbuf, size, peer, tag); + send(context, sendbuf, size, peer, tag, timeout_ms); break; case glooDataType_t::glooUint32: - send(context, sendbuf, size, peer, tag); + send(context, sendbuf, size, peer, tag, timeout_ms); break; case glooDataType_t::glooInt64: - send(context, sendbuf, size, peer, tag); + send(context, sendbuf, size, peer, tag, timeout_ms); break; case glooDataType_t::glooUint64: - send(context, sendbuf, size, peer, tag); + send(context, sendbuf, size, peer, tag, timeout_ms); break; case glooDataType_t::glooFloat16: - send(context, sendbuf, size, peer, tag); + send(context, sendbuf, size, peer, tag, timeout_ms); break; case glooDataType_t::glooFloat32: - send(context, sendbuf, size, peer, tag); + send(context, sendbuf, size, peer, tag, timeout_ms); break; case glooDataType_t::glooFloat64: - send(context, sendbuf, size, peer, tag); + send(context, sendbuf, size, peer, tag, timeout_ms); break; default: throw std::runtime_error("Unhandled dataType"); diff --git a/setup.py b/setup.py index 868c0dc..956a6f1 100644 --- a/setup.py +++ b/setup.py @@ -12,6 +12,7 @@ import tempfile import zipfile import time +from pathlib import Path from itertools import chain from itertools import takewhile @@ -78,6 +79,12 @@ def build(): ", ".join(".".join(map(str, v)) for v in SUPPORTED_PYTHONS)) raise RuntimeError(msg) + # Delete the build file if it already exists. Otherwise, bazel will + # use the old shared object built. + build_path = Path("build") + if build_path.exists(): + shutil.rmtree(build_path) + bazel_env = dict(os.environ, PYTHON3_BIN_PATH=sys.executable) print("SANG-TODO bazel_env", bazel_env) diff --git a/tests/test_isend_irecv.py b/tests/test_isend_irecv.py new file mode 100644 index 0000000..44bee9a --- /dev/null +++ b/tests/test_isend_irecv.py @@ -0,0 +1,80 @@ +import pygloo +import numpy as np +import os +import ray +import time +import shutil +import torch +import datetime + +@ray.remote(num_cpus=1) +def test_send_recv(rank, world_size, fileStore_path): + ''' + rank # Rank of this process within list of participating processes + world_size # Number of participating processes + ''' + if rank==0: + if os.path.exists(fileStore_path): + shutil.rmtree(fileStore_path) + os.makedirs(fileStore_path) + else: time.sleep(0.5) + + context = pygloo.rendezvous.Context(rank, world_size) + + attr = pygloo.transport.tcp.attr("localhost") + # Perform rendezvous for TCP pairs + dev = pygloo.transport.tcp.CreateDevice(attr) + + fileStore = pygloo.rendezvous.FileStore(fileStore_path) + store = pygloo.rendezvous.PrefixStore(str(world_size), fileStore) + + context.connectFullMesh(store, dev) + + if rank == 0: + futures = [] + for tag in range(30): + sendbuf = np.array([[1,2,3],[1,2,3]], dtype=np.float32) + sendptr = sendbuf.ctypes.data + + # sendbuf = torch.Tensor([[1,2,3],[1,2,3]]).float() + # sendptr = sendbuf.data_ptr() + + data_size = sendbuf.size if isinstance(sendbuf, np.ndarray) else sendbuf.numpy().size + datatype = pygloo.glooDataType_t.glooFloat32 + peer = 1 + print(f"rank {rank} sends {sendbuf}. tag {tag}") + fut = pygloo.isend(context, sendptr, data_size, datatype, peer, tag) + futures.append(fut) + + for fut in futures: + fut.Wait(datetime.timedelta(seconds=30)) + print("send finished.") + + elif rank == 1: + futures = [] + for tag in range(30): + recvbuf = np.zeros((2,3), dtype=np.float32) + recvptr = recvbuf.ctypes.data + + data_size = recvbuf.size if isinstance(recvbuf, np.ndarray) else recvbuf.numpy().size + datatype = pygloo.glooDataType_t.glooFloat32 + peer = 0 + + fut = pygloo.irecv(context, recvptr, data_size, datatype, peer, tag) + futures.append(fut) + print(f"rank {rank} receives {recvbuf}. tag: {tag}") + + for fut in futures: + fut.Wait(datetime.timedelta(seconds=30)) + print("Receive done.") + else: + raise Exception("Only support 2 process to test send function and recv function") + + +if __name__ == "__main__": + ray.init(num_cpus=6) + world_size = 2 + fileStore_path = f"{ray.worker._global_node.get_session_dir_path()}" + "/collective/gloo/rendezvous" + + fns = [test_send_recv.remote(i, world_size, fileStore_path) for i in range(world_size)] + ray.get(fns) diff --git a/tests/test_send_recv.py b/tests/test_send_recv.py index 77c0758..acccb17 100644 --- a/tests/test_send_recv.py +++ b/tests/test_send_recv.py @@ -39,7 +39,7 @@ def test_send_recv(rank, world_size, fileStore_path): data_size = sendbuf.size if isinstance(sendbuf, np.ndarray) else sendbuf.numpy().size datatype = pygloo.glooDataType_t.glooFloat32 peer = 1 - pygloo.send(context, sendptr, data_size, datatype, peer) + pygloo.send(context, sendptr, data_size, datatype, peer, 0, 1l) print(f"rank {rank} sends {sendbuf}") elif rank == 1: