Skip to content

internal review #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 33 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
652da01
Xccl process group for Pytorch
Chao1Han Aug 29, 2024
0cb0016
Merge remote-tracking branch 'upstream/main' into xccl-bak
Chao1Han Sep 20, 2024
a71d69a
Align latest
Chao1Han Sep 20, 2024
af6f03c
hidden env
Chao1Han Sep 24, 2024
88bea25
refine findccl code
Chao1Han Sep 29, 2024
f6ea934
Add comments for build xccl
Chao1Han Sep 30, 2024
1226e3b
refine workxccl
Chao1Han Sep 30, 2024
d62e0be
refine timeout
Chao1Han Sep 30, 2024
714de2a
rm head
Chao1Han Sep 30, 2024
0923781
update
Chao1Han Sep 30, 2024
31d092d
minor fix
Chao1Han Oct 9, 2024
cbea299
rm duplicate code and refine cmake
Chao1Han Oct 9, 2024
ef261c6
update cmake
Chao1Han Oct 10, 2024
6c648cd
hidden xccl specific
Chao1Han Sep 24, 2024
e621fe6
fix ci fail
Chao1Han Oct 11, 2024
3f225d9
rm vir fun and modify tensor check
Chao1Han Oct 12, 2024
1138a4a
Merge branch 'xccl-bak' into xccl-bak2
Chao1Han Oct 12, 2024
8e5e78a
refine collective, getcomm
Chao1Han Oct 12, 2024
1267963
accept comments
Chao1Han Oct 12, 2024
3d55b85
rm attr
Chao1Han Oct 12, 2024
f69059a
add default ccl root dir
Chao1Han Oct 12, 2024
bed720c
update
Chao1Han Oct 12, 2024
fd44abe
update
Chao1Han Oct 12, 2024
d12b922
code refine
zhangxiaoli73 Oct 13, 2024
b57e812
minor fix
Chao1Han Oct 14, 2024
5968f0f
update
Chao1Han Oct 15, 2024
edba8aa
update
Chao1Han Oct 16, 2024
56a5e7f
Refine specific code
Chao1Han Oct 17, 2024
a062f9f
accept comments
Chao1Han Oct 17, 2024
ae90994
Merge branch 'xccl-bak' into xccl-bak2
Chao1Han Oct 17, 2024
ab04fc0
rm header and refine profilehead
Chao1Han Oct 17, 2024
4ee49fb
add get_device_count
Chao1Han Oct 17, 2024
1a2c9c2
add backendSupportsSequenceNumbers
Chao1Han Oct 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,12 @@ ncclRedOpRAII getNcclReduceOp(
NCCL_MINOR));
break;
case ReduceOp::BAND:
C10_THROW_ERROR(ValueError, "Cannot use ReduceOp.BAND with NCCL");
break;
case ReduceOp::BOR:
C10_THROW_ERROR(ValueError, "Cannot use ReduceOp.BOR with NCCL");
break;
case ReduceOp::BXOR:
C10_THROW_ERROR(ValueError, "Cannot use ReduceOp.BXOR with NCCL");
C10_THROW_ERROR(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't change NCCL now.

ValueError,
"Cannot use ReduceOp." + reduce_op_to_string(reduceOp) +
" with NCCL");
break;
default:
C10_THROW_ERROR(ValueError, "Unhandled ReduceOp");
Expand Down
21 changes: 9 additions & 12 deletions torch/csrc/distributed/c10d/ProcessGroupXCCL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,15 @@ ccl::reduction getXcclReduceOp(const ReduceOp& reduceOp, at::Tensor& input) {
return xcclOps.at(reduceOp);
} catch (const std::out_of_range&) {
switch (reduceOp) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to switch

case ReduceOp::AVG:
C10_THROW_ERROR(ValueError, "Cannot use ReduceOp AVG with XCCL");
break;
case ReduceOp::BAND:
C10_THROW_ERROR(ValueError, "Cannot use ReduceOp.BAND with XCCL");
break;
case ReduceOp::BOR:
C10_THROW_ERROR(ValueError, "Cannot use ReduceOp.BOR with XCCL");
break;
case ReduceOp::BXOR:
C10_THROW_ERROR(ValueError, "Cannot use ReduceOp.BXOR with XCCL");
case c10d::ReduceOp::BAND:
case c10d::ReduceOp::BOR:
case c10d::ReduceOp::BXOR:
case c10d::ReduceOp::AVG:
case c10d::ReduceOp::PREMUL_SUM:
C10_THROW_ERROR(
ValueError,
"Cannot use ReduceOp." + reduce_op_to_string(reduceOp) +
" with XCCL");
break;
default:
C10_THROW_ERROR(ValueError, "Unhandled ReduceOp");
Expand Down Expand Up @@ -133,7 +131,6 @@ void ProcessGroupXCCL::WorkXCCL::synchronizeInternal(
auto currentTimepoint = std::chrono::steady_clock::now();
auto timeElapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
currentTimepoint - workStartTime_);
std::chrono::milliseconds opTimeout = std::chrono::milliseconds(60000);
if (timeElapsed >= timeout) {
std::string exceptionMsg = c10::str(
"Work ran for ",
Expand Down
57 changes: 28 additions & 29 deletions torch/csrc/distributed/c10d/ProcessGroupXCCL.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,34 @@ class TORCH_API ProcessGroupXCCL : public Backend {
c10::intrusive_ptr<Store> store_;
std::mutex mutex_;
bool blockingWait_ = false;

private:
XCCL_KVS kvs;
std::mutex kvs_mutex;
// Lazily create (or join) the ccl key-value store (KVS) used to bootstrap
// communicators. Rank 0 publishes its KVS address through the c10d store
// under "xccl_kvs"; every other rank reads that address and attaches to it.
// The handle is cached in `kvs` (guarded by `kvs_mutex`), so only the first
// call per process does any bootstrap work.
XCCL_KVS get_kvs(int rank, c10d::Store& store) {
  std::lock_guard<std::mutex> guard(kvs_mutex);
  if (kvs) {
    // Already bootstrapped by an earlier call.
    return kvs;
  }
  const std::string store_key = "xccl_kvs";
  if (rank == 0) {
    // Rank 0 broadcast the bootstrap network information to other ranks.
    kvs = ccl::create_main_kvs();
    ccl::kvs::address_type address = kvs->get_address();
    store.set(store_key, std::vector<uint8_t>(address.begin(), address.end()));
  } else {
    // Other ranks fetch rank 0's published address and connect to its KVS.
    auto serialized = store.get(store_key);
    if (serialized.size() != ccl::kvs::address_max_size) {
      throw std::runtime_error("Unexpected ccl kvs addr from the store\n");
    }
    ccl::kvs::address_type address;
    std::copy_n(
        serialized.begin(), ccl::kvs::address_max_size, address.begin());
    kvs = ccl::create_kvs(address);
  }
  return kvs;
}
};

namespace {
Expand Down Expand Up @@ -297,35 +325,6 @@ bool with_mpirun() {
: false;
}

// Process-wide cached ccl key-value store handle and the mutex that
// guards its one-time initialization in get_kvs().
XCCL_KVS kvs;
std::mutex kvs_mutex;
// Returns the shared ccl KVS, creating it on first call. Rank 0 creates
// the main KVS and publishes its address through the c10d store under
// "xccl_kvs"; all other ranks read that address and attach to it.
XCCL_KVS get_kvs(int rank, c10d::Store& store) {
  std::lock_guard<std::mutex> lock(kvs_mutex);
  // Fast path: a previous call already performed the bootstrap.
  if (kvs)
    return kvs;
  std::string storeKey = "xccl_kvs";

  // Rank 0 broadcast the bootstrap network information to other ranks
  if (rank == 0) {
    kvs = ccl::create_main_kvs();
    ccl::kvs::address_type main_addr = kvs->get_address();
    // Serialize the opaque address into a byte vector for the store.
    auto ccl_kvs_addr =
        std::vector<uint8_t>(main_addr.begin(), main_addr.end());
    store.set(storeKey, ccl_kvs_addr);
  } else {
    auto ccl_kvs_addr = store.get(storeKey);
    // A size mismatch means the store entry is corrupt or from an
    // incompatible peer; fail loudly rather than attach to garbage.
    if (ccl_kvs_addr.size() != ccl::kvs::address_max_size) {
      throw std::runtime_error("Unexpected ccl kvs addr from the store\n");
    }
    ccl::kvs::address_type main_addr;
    std::copy_n(
        ccl_kvs_addr.begin(), ccl::kvs::address_max_size, main_addr.begin());
    kvs = ccl::create_kvs(main_addr);
  }

  return kvs;
}

} // namespace
} // namespace c10d

Expand Down
25 changes: 25 additions & 0 deletions torch/csrc/distributed/c10d/Utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,31 @@ size_t computeLengthsAndOffsets(
return offset;
}

/// Map a c10d::ReduceOp to its human-readable name, e.g. for building
/// error messages such as "Cannot use ReduceOp.BAND with NCCL".
/// Any op not listed below maps to "UNKNOWN" rather than failing.
inline std::string reduce_op_to_string(c10d::ReduceOp op) {
  switch (op) {
    case c10d::ReduceOp::SUM: return "SUM";
    case c10d::ReduceOp::PRODUCT: return "PRODUCT";
    case c10d::ReduceOp::MIN: return "MIN";
    case c10d::ReduceOp::MAX: return "MAX";
    case c10d::ReduceOp::BAND: return "BAND";
    case c10d::ReduceOp::BOR: return "BOR";
    case c10d::ReduceOp::BXOR: return "BXOR";
    case c10d::ReduceOp::AVG: return "AVG";
    case c10d::ReduceOp::PREMUL_SUM: return "PREMUL_SUM";
    default: return "UNKNOWN";
  }
}

using RankType = uint32_t;
using SizeType = uint64_t;

Expand Down
Loading