From 52ce5ce77c6368cdb4c5b287e4b8228a2dcfb901 Mon Sep 17 00:00:00 2001
From: AtlantaPepsi
Date: Fri, 9 Jan 2026 15:55:46 -0600
Subject: [PATCH 1/6] nicp2p (with questions)

---
 src/client/Presets/NicPeerToPeer.hpp | 365 +++++++++++++++++++++++++++
 src/client/Presets/Presets.hpp | 2 +
 src/header/TransferBench.hpp | 122 +++++++++
 3 files changed, 489 insertions(+)
 create mode 100644 src/client/Presets/NicPeerToPeer.hpp

diff --git a/src/client/Presets/NicPeerToPeer.hpp b/src/client/Presets/NicPeerToPeer.hpp
new file mode 100644
index 00000000..746aca1e
--- /dev/null
+++ b/src/client/Presets/NicPeerToPeer.hpp
@@ -0,0 +1,365 @@
+/*
+Copyright (c) Advanced Micro Devices, Inc. All rights reserved.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+*/
+
+MemType parseMemType(std::string const memTypeIdx) {
+  bool isCpu = false;
+  int memType = 2;
+  if (memTypeIdx.length() >= 1) {
+    char firstChar = std::toupper(memTypeIdx[0]);
+    if (firstChar != 'G' && firstChar != 'C') {
+      Utils::Print("WARNING: Invalid MEM_POLICY first character '%c', using default 'G'\n", memTypeIdx[0]);
+    }
+    isCpu = firstChar == 'C';
+  }
+
+  if (memTypeIdx.length() >= 2) {
+    if (std::isdigit(memTypeIdx[1])) {
+      int level = memTypeIdx[1] - '0';
+      if (level >= 0 && level <= 3) {
+        memType = level;
+      } else {
+        Utils::Print("WARNING: Invalid MEM_POLICY level '%c', must be 0-3, using default 2\n", memTypeIdx[1]);
+      }
+    } else {
+      Utils::Print("WARNING: Invalid MEM_POLICY second character '%c', using default 2\n", memTypeIdx[1]);
+    }
+  }
+
+  return Utils::GetMemType(memType, isCpu);
+}
+
+int NicPeerToPeerPreset(EnvVars& ev,
+                        size_t const numBytesPerTransfer,
+                        std::string const presetName)
+{
+  int numRanks = TransferBench::GetNumRanks();
+
+  int numDetectedNics = TransferBench::GetNumExecutors(EXE_NIC);
+
+  // Collect env vars for this preset
+  //int numCpuDevices = EnvVars::GetEnvVar("NUM_CPU_DEVICES", numDetectedCpus);
+  //int numGpuDevices = EnvVars::GetEnvVar("NUM_GPU_DEVICES", numDetectedGpus);
+  int numQueuePairs = EnvVars::GetEnvVar("NUM_QUEUE_PAIRS", 1);
+  int useRemoteRead = EnvVars::GetEnvVar("USE_REMOTE_READ", 0);
+  int showFullMatrix = EnvVars::GetEnvVar("OUTPUT_FORMAT", 1);
+  std::string nicFilter = EnvVars::GetEnvVar("NIC_FILTER", "");
+  std::string srcMemIdx = EnvVars::GetEnvVar("SRC_MEM", "G2");
+  std::string dstMemIdx = EnvVars::GetEnvVar("DST_MEM", "G2");
+  int rr = EnvVars::GetEnvVar("FAST_EXE", 0);
+
+  // Parse NIC_FILTER to build list of NIC indices to use
+  std::vector nicIndices;
+  if (nicFilter.empty()) {
+    // No filter specified,
use all detected NICs + for (int i = 0; i < numDetectedNics; i++) { + nicIndices.push_back(i); + } + } else { + // Parse comma-separated list of NIC indices or names + std::istringstream ss(nicFilter); + std::string token; + while (std::getline(ss, token, ',')) { + // Trim whitespace + token.erase(0, token.find_first_not_of(" \t")); + token.erase(token.find_last_not_of(" \t") + 1); + + // Check if token is a number (NIC index) + bool isNumber = !token.empty() && std::all_of(token.begin(), token.end(), ::isdigit); + + if (isNumber) { + int nicIdx = std::stoi(token); + if (nicIdx >= 0 && nicIdx < numDetectedNics) { + nicIndices.push_back(nicIdx); + } else { + Utils::Print("WARNING: NIC index %d out of range (0-%d), ignoring\n", nicIdx, numDetectedNics - 1); + } + } else { + // Try to match by NIC name + bool found = false; + for (int nicIdx = 0; nicIdx < numDetectedNics; nicIdx++) { + std::string nicName = TransferBench::GetExecutorName({EXE_NIC, nicIdx}); + if (nicName == token) { + nicIndices.push_back(nicIdx); + found = true; + break; + } + } + if (!found) { + Utils::Print("WARNING: NIC '%s' not found, ignoring\n", token.c_str()); + } + } + } + } + + // Parse Memtype for src/dst + MemType srcTypeActual = parseMemType(srcMemIdx); + MemType dstTypeActual = parseMemType(dstMemIdx); + + // Create a round-robin schedule for all-to-all communication + std::vector>> schedule; + if (rr) { + if (numRanks % 2 == 0) { + // Even number of ranks: use round-robin tournament scheduling + for (int round = 0; round < numRanks - 1; round++) { + std::vector> roundPairs; + for (int i = 0; i < numRanks / 2; i++) { + int rank1 = i; + int rank2 = numRanks - 1 - i; + if (round > 0) { + // Rotate all except the first rank + if (rank1 > 0) rank1 = ((rank1 - 1 + round) % (numRanks - 1)) + 1; + if (rank2 > 0) rank2 = ((rank2 - 1 + round) % (numRanks - 1)) + 1; + } + if (rank1 != rank2) { + roundPairs.push_back({rank1, rank2}); + } + } + schedule.push_back(roundPairs); + } + } else { + // Odd number of ranks: one rank sits out each round + for (int round = 0; round < numRanks; round++) { + std::vector> roundPairs; + for (int i = 0; i < numRanks / 2; i++) { + int rank1 = (round + i) % numRanks; + int rank2 = (round + numRanks - 1 - i) % numRanks; + if (rank1 != rank2) { + roundPairs.push_back({rank1, rank2}); + } + } + schedule.push_back(roundPairs); + } + } + } + + // Display EnvVars + if (Utils::RankDoesOutput()) { + ev.DisplayEnvVars(); + if (!ev.hideEnv) { + if (!ev.outputToCsv) printf("[P2P Network Related]\n"); + ev.Print("NUM_NIC_SE", numQueuePairs, "Using %d queue pairs per Transfer", numQueuePairs); + ev.Print("USE_REMOTE_READ", useRemoteRead, "Using %s as executor", useRemoteRead ? "DST" : "SRC"); + ev.Print("OUTPUT_FORMAT", showFullMatrix, "Printing results in %s format", showFullMatrix ? "full matrix" : "column"); + ev.Print("NIC_FILTER", nicFilter, "Selecting %d NICs", nicFilter.size()); + // TODO: Display filtered NICs? + ev.Print("FAST_EXE", rr, "Executing p2p node pairs in parallel"); + printf("\n"); + } + } + + // TODO: validate env vars + + TransferBench::ConfigOptions cfg = ev.ToConfigOptions(); + TransferBench::TestResults results; + + // Calculate total IB devices per rank + // TODO: assert same # of NIC all ranks + int const numNicsPerRank = nicIndices.size(); + int const numTotalNics = numNicsPerRank * numRanks; + + // Initialize output table + Utils::Print("Unidirectional copy peak bandwidth GB/s (Using Nearest NIC RDMA)\n"); + + int numRows = showFullMatrix ? 
3 + numTotalNics : 1 + numTotalNics * numTotalNics; + int numCols = showFullMatrix ? numRows : 7; + int precision = 2; + Utils::TableHelper table(numRows, numCols, precision); + // Device/Memory names for table + std::vector srcExes; + std::vector dstExes; + std::vector srcMems; + std::vector dstMems; + + // Query closest device to each NIC available, store device info to a map + std::vector avgBandwidth; + //std::vector minBandwidth; + //std::vector maxBandwidth; + //std::vector stdDev; + + // Loop over all possible src+NIC/dst+NIC pairs across all ranks and collect P2P results + for (int srcRank = 0; srcRank < numRanks; srcRank++) { + for (int srcNicIdx = 0; srcNicIdx < numNicsPerRank; srcNicIdx++) { + for (int dstRank = 0; dstRank < numRanks; dstRank++) { + for (int dstNicIdx = 0; dstNicIdx < numNicsPerRank; dstNicIdx++) { + std::vector transfers(1); + + int srcNic = nicIndices[srcNicIdx]; + int dstNic = nicIndices[dstNicIdx]; + + // Determine which GPU memory to use based on NIC proximity and its info + int srcGpuIndex = TransferBench::GetClosestGpuToNic(srcNic, srcRank); + int dstGpuIndex = TransferBench::GetClosestGpuToNic(dstNic, dstRank); + + // TODO: error msg + if (srcGpuIndex == -1 || dstGpuIndex == -1) ; + transfers[0].numBytes = numBytesPerTransfer; + transfers[0].srcs.push_back({srcTypeActual, srcGpuIndex, srcRank}); + transfers[0].dsts.push_back({dstTypeActual, dstGpuIndex, dstRank}); + transfers[0].exeDevice = {EXE_NIC, (useRemoteRead ? dstGpuIndex : srcGpuIndex), (useRemoteRead ? dstRank : srcRank)}; + transfers[0].exeSubIndex = (useRemoteRead ? srcGpuIndex : dstGpuIndex); + transfers[0].numSubExecs = numQueuePairs; + + if (!TransferBench::RunTransfers(cfg, transfers, results)) { + for (auto const& err : results.errResults) + Utils::Print("%s\n", err.errMsg.c_str()); + return 1; + } + avgBandwidth.push_back(results.tfrResults[0].avgBandwidthGbPerSec); + srcExes.push_back(TransferBench::GetExecutorName(results.tfrResults[0].exeDevice)); + dstExes.push_back(TransferBench::GetExecutorName(results.tfrResults[0].exeDstDevice)); + + srcMems.push_back(srcGpuIndex); + dstMems.push_back(dstGpuIndex); + } + } + } + } + + // Draw table outlines + table.DrawRowBorder(0); + table.DrawColBorder(0); + table.DrawColBorder(numCols); + table.DrawRowBorder(numRows); + + // Rendering table + if (showFullMatrix) { + table.Set(0, 0, useRemoteRead ? 
"SRC\\DST+EXE " : "SRC+EXE\\DST "); + table.DrawRowBorder(1); + table.DrawColBorder(1); + table.Set(1, 1, " NIC Device "); + table.Set(2, 2, " Mem Device "); + int rowIdx = 3; + int entryIdx = 0; + + for (int rank = 0; rank < numRanks; rank++) { + table.DrawRowBorder(rowIdx); + table.DrawColBorder(rowIdx); + table.Set(rowIdx, 0, " Rank %02d ", rank); + table.Set(0, rowIdx, " Rank %02d ", rank); + for (int nic = 0; nic < numNicsPerRank; nic++) { + table.Set(rowIdx, 1, " %s ", srcExes[entryIdx].c_str()); + table.Set(rowIdx, 2, " GPU %02d ", srcMems[entryIdx]); + table.Set(1, rowIdx, " %s ", dstExes[rowIdx - 3].c_str()); + table.Set(2, rowIdx, " GPU %02d ", dstMems[rowIdx - 3]); + int colIdx = 3; + for (int dstRank = 0; dstRank < numRanks; dstRank++) { + for (int dstNic = 0; dstNic < numNicsPerRank; dstNic++) { + table.Set(rowIdx, colIdx++ , " %.2f ", avgBandwidth[entryIdx++]); + } + } + rowIdx++; + } + } + } else { + table.Set(0, 0, " SRC Rank "); + table.Set(0, 1, " SRC NIC "); + table.Set(0, 2, " SRC MEM "); + table.Set(0, 3, " DST Rank "); + table.Set(0, 4, " DST NIC "); + table.Set(0, 5, " DST MEM "); + table.Set(0, 6, " bw (GB/s) "); + table.DrawColBorder(3); + table.DrawColBorder(6); + int rowIdx = 1; + + for (int src = 0; src < numRanks; src++) { + for (int i = 0; i < numNicsPerRank; i++) { + table.DrawRowBorder(rowIdx); + for (int dst = 0; dst < numRanks; dst++) { + for (int j = 0; j < numNicsPerRank; j++) { + table.Set(rowIdx, 0, " Rank %02d ", src); + table.Set(rowIdx, 1, " %s ", srcExes[rowIdx - 1].c_str()); + table.Set(rowIdx, 2, " GPU %02d ", srcMems[rowIdx - 1]); + table.Set(rowIdx, 3, " Rank %02d ", dst); + table.Set(rowIdx, 4, " %s ", dstExes[rowIdx - 1].c_str()); + table.Set(rowIdx, 5, " GPU %02d ", dstMems[rowIdx - 1]); + table.Set(rowIdx, 6, " %.2f ", avgBandwidth[rowIdx - 1]); + rowIdx++; + } + } + } + } + } + + table.PrintTable(ev.outputToCsv, ev.showBorders); + + // Ranking fastest/slowest connection + Utils::TableHelper summaryTable(11, 6, precision); + Utils::Print("Summary of top 10 fastest/slowest connection\n"); + + summaryTable.Set(0, 0, " Fastest Bandwidth (GB/s) "); + summaryTable.Set(0, 1, " Src "); + summaryTable.Set(0, 2, " Dst "); + summaryTable.Set(0, 3, " Slowest Bandwidth (GB/s) "); + summaryTable.Set(0, 4, " Src "); + summaryTable.Set(0, 5, " Dst "); + + for (int i = 0; i <= 11; i++) summaryTable.DrawRowBorder(i); + for (int i = 0; i <= 6; i++) summaryTable.DrawColBorder(i); + + std::vector idx(avgBandwidth.size()); + std::iota(idx.begin(), idx.end(), 0); + std::sort(idx.begin(), idx.end(), [&](size_t i1, size_t i2) {return avgBandwidth[i1] > avgBandwidth[i2];}); + for (int i = 0; i < 10; i++) { + int index = idx[i]; + int dstNicIdx = index % numNicsPerRank; + index /= numNicsPerRank; + + int dstRank = index % numRanks; + index /= numRanks; + + int srcNicIdx = index % numNicsPerRank; + index /= numNicsPerRank; + + int srcRank = index; + + summaryTable.Set(1 + i, 1, " R%02d:%s ", srcRank, srcExes[idx[i]].c_str()); + summaryTable.Set(1 + i, 2, " R%02d:%s ", dstRank, dstExes[idx[i]].c_str()); + summaryTable.Set(1 + i, 0, " %.2f ", avgBandwidth[idx[i]]); + + index = idx[idx.size() - 1 - i]; + dstNicIdx = index % numNicsPerRank; + index /= numNicsPerRank; + + dstRank = index % numRanks; + index /= numRanks; + + srcNicIdx = index % numNicsPerRank; + index /= numNicsPerRank; + + srcRank = index; + + summaryTable.Set(1 + i, 4, " R%02d:%s ", srcRank, srcExes[idx[idx.size() - 1 - i]].c_str()); + summaryTable.Set(1 + i, 5, " R%02d:%s ", dstRank, 
dstExes[idx[idx.size() - 1 - i]].c_str());
+    summaryTable.Set(1 + i, 3, " %.2f ", avgBandwidth[idx[idx.size() - 1 - i]]);
+  }
+  summaryTable.PrintTable(ev.outputToCsv, ev.showBorders);
+
+/*
+  if (!ev.outputToCsv && avgCount > 0) {
+    Utils::Print("\n");
+  }
+*/
+  return 0;
+}
+
+
diff --git a/src/client/Presets/Presets.hpp b/src/client/Presets/Presets.hpp
index 34361f9b..4156c34f 100644
--- a/src/client/Presets/Presets.hpp
+++ b/src/client/Presets/Presets.hpp
@@ -32,6 +32,7 @@ THE SOFTWARE.
 #include "AllToAllSweep.hpp"
 #include "HealthCheck.hpp"
 #include "NicRings.hpp"
+#include "NicPeerToPeer.hpp"
 #include "OneToAll.hpp"
 #include "PeerToPeer.hpp"
 #include "Scaling.hpp"
@@ -49,6 +50,7 @@ std::map> presetFuncMap =
   {"a2asweep", {AllToAllSweepPreset, "Test GFX-based all-to-all transfers swept across different CU and GFX unroll counts"}},
   {"healthcheck", {HealthCheckPreset, "Simple bandwidth health check (MI300X series only)"}},
   {"nicrings", {NicRingsPreset, "Tests NIC rings created across identical NIC indices across ranks"}},
+  {"nicp2p", {NicPeerToPeerPreset, "Multi-node peer-to-peer bandwidth test using Nearest NIC RDMA transfers"}},
   {"one2all", {OneToAllPreset, "Test all subsets of parallel transfers from one GPU to all others"}},
   {"p2p" , {PeerToPeerPreset, "Peer-to-peer device memory bandwidth test"}},
   {"rsweep", {SweepPreset, "Randomly sweep through sets of Transfers"}},
diff --git a/src/header/TransferBench.hpp b/src/header/TransferBench.hpp
index 39069305..3e8dd92c 100644
--- a/src/header/TransferBench.hpp
+++ b/src/header/TransferBench.hpp
@@ -503,6 +503,28 @@ namespace TransferBench
    */
   void GetClosestNicsToGpu(std::vector& nicIndices, int gpuIndex, int targetRank = -1);
+
+  /**
+   * Returns the index of a GPU closest to the given NIC
+   *
+   * @param[in] nicIndex Index of the NIC to query
+   * @param[in] targetRank Rank to query (-1 for local rank)
+   * @note This function is applicable when the IBV/RDMA executor is available
+   * @returns GPU index closest to IB Verbs capable NIC index nicIndex, or -1 if unable to detect
+   */
+  int GetClosestGpuToNic(int nicIndex, int targetRank);
+
+  /**
+   * Returns the indices of the GPUs closest to the given NIC
+   *
+   * @param[out] gpuIndices Vector that will contain GPU indices closest to given NIC
+   * @param[in] nicIndex Index of the NIC to query
+   * @param[in] targetRank Rank to query (-1 for local rank)
+   * @note This function is applicable when the IBV/RDMA executor is available
+   * @returns GPU indices closest to NIC nicIndex, or empty if unable to detect
+   */
+  void GetClosestGpusToNic(std::vector& gpuIndices, int nicIndex, int targetRank = -1);
+
   /**
    * @returns 0-indexed rank for this process
    */
@@ -915,6 +937,17 @@ namespace {
    */
   void GetClosestNicsToGpu(std::vector& nicIndices, int gpuIndex, int targetRank = -1) const;
+
+  /**
+   * Returns the indices of the GPUs closest to the given NIC
+   *
+   * @param[out] gpuIndices Vector that will contain GPU indices closest to given NIC
+   * @param[in] nicIndex Index of the NIC to query
+   * @param[in] targetRank Rank to query (-1 for local rank)
+   * @note This function is applicable when the IBV/RDMA executor is available
+   * @returns GPU indices closest to NIC nicIndex, or empty if unable to detect
+   */
+  void GetClosestGpusToNic(std::vector& gpuIndices, int nicIndex, int targetRank = -1) const;
+
   std::string GetHostname(int targetRank) const;
   std::string GetPpodId(int targetRank) const;
   int GetVpodId(int targetRank) const;
@@ -977,6 +1010,7 @@ namespace {
   std::map closestCpuNumaToNic;
   std::map
nicIsActive; std::map> closestNicsToGpu; + std::map> closestGpusToNic; std::map, std::string> executorName; }; @@ -5457,6 +5491,7 @@ static bool IsConfiguredGid(union ibv_gid const& gid) topo.closestCpuNumaToGpu.clear(); topo.closestCpuNumaToNic.clear(); topo.closestNicsToGpu.clear(); + topo.closestGpusToNic.clear(); memset(topo.hostname, 0, sizeof(topo.hostname)); gethostname(topo.hostname, 32); @@ -5640,6 +5675,54 @@ static bool IsConfiguredGid(union ibv_gid const& gid) assignedCount[closestIdx]++; } } + + // Compute the reverse mapping: closest GPU(s) for each NIC + // Build list of GPU bus addresses + std::vector gpuAddressList; + for (int gpuIdx = 0; gpuIdx < numGpus; gpuIdx++) { + char hipPciBusId[64]; + hipError_t err = hipDeviceGetPCIBusId(hipPciBusId, sizeof(hipPciBusId), gpuIdx); + if (err == hipSuccess) { + gpuAddressList.push_back(std::string(hipPciBusId)); + } else { + gpuAddressList.push_back(""); + } + } + + // Loop over each NIC to find the closest GPU(s) based on PCIe address + for (int nicIndex = 0; nicIndex < numNics; nicIndex++) { + if (!ibvDeviceList[nicIndex].hasActivePort || ibvDeviceList[nicIndex].busId.empty()) { + continue; + } + + // Find closest GPUs using LCA algorithm + std::set closestGpuIdxs = GetNearestDevicesInTree(ibvDeviceList[nicIndex].busId, gpuAddressList); + + if (closestGpuIdxs.empty()) { + // Fallback: use bus ID distance + int minDistance = std::numeric_limits::max(); + int closestIdx = -1; + + for (int gpuIdx = 0; gpuIdx < numGpus; gpuIdx++) { + if (gpuAddressList[gpuIdx].empty()) continue; + + int distance = GetBusIdDistance(ibvDeviceList[nicIndex].busId, gpuAddressList[gpuIdx]); + if (distance >= 0 && distance < minDistance) { + minDistance = distance; + closestIdx = gpuIdx; + } + } + + if (closestIdx != -1) { + topo.closestGpusToNic[nicIndex].push_back(closestIdx); + } + } else { + // Store all GPUs that are equally close + for (int idx : closestGpuIdxs) { + topo.closestGpusToNic[nicIndex].push_back(idx); + } + } + } #endif if (verbose) { @@ -5657,6 +5740,20 @@ static bool IsConfiguredGid(union ibv_gid const& gid) printf("\n"); } } +#ifdef NIC_EXEC_ENABLED + for (int nicIndex = 0; nicIndex < numNics; nicIndex++) { + printf("[INFO] Rank %03d: NIC [%02d/%02d] %s Closest GPUs:", rank, nicIndex, numNics, + ibvDeviceList[nicIndex].name.c_str()); + if (topo.closestGpusToNic[nicIndex].size() == 0) { + printf(" none"); + } else { + for (auto gpuIndex : topo.closestGpusToNic[nicIndex]) { + printf(" %d", gpuIndex); + } + } + printf("\n"); + } +#endif } } @@ -5766,6 +5863,7 @@ static bool IsConfiguredGid(union ibv_gid const& gid) SendMap(peerRank, topo.closestCpuNumaToNic); SendMap(peerRank, topo.nicIsActive); SendMap(peerRank, topo.closestNicsToGpu); + SendMap(peerRank, topo.closestGpusToNic); SendMap(peerRank, topo.executorName); }; @@ -5781,6 +5879,7 @@ static bool IsConfiguredGid(union ibv_gid const& gid) RecvMap(peerRank, topo.closestCpuNumaToNic); RecvMap(peerRank, topo.nicIsActive); RecvMap(peerRank, topo.closestNicsToGpu); + RecvMap(peerRank, topo.closestGpusToNic); RecvMap(peerRank, topo.executorName); } @@ -6049,6 +6148,16 @@ static bool IsConfiguredGid(union ibv_gid const& gid) nicIndices = rankInfo[targetRank].closestNicsToGpu.at(gpuIndex); } + void System::GetClosestGpusToNic(std::vector& gpuIndices, int nicIndex, int targetRank) const + { + gpuIndices.clear(); + if (targetRank < 0 || targetRank >= numRanks) targetRank = rank; + if (nicIndex < 0 || nicIndex >= GetNumExecutors(EXE_NIC, targetRank)) return; + if 
(rankInfo[targetRank].closestGpusToNic.count(nicIndex) > 0) { + gpuIndices = rankInfo[targetRank].closestGpusToNic.at(nicIndex); + } + } + std::string System::GetHostname(int targetRank) const { if (targetRank < 0 || targetRank >= numRanks) targetRank = rank; @@ -6129,6 +6238,19 @@ static bool IsConfiguredGid(union ibv_gid const& gid) System::Get().GetClosestNicsToGpu(nicIndices, gpuIndex, targetRank); } + int GetClosestGpuToNic(int nicIndex, int targetRank) + { + std::vector gpuIndices; + System::Get().GetClosestGpusToNic(gpuIndices, nicIndex, targetRank); + if (gpuIndices.size() == 0) return -1; + return gpuIndices[0]; + } + + void GetClosestGpusToNic(std::vector& gpuIndices, int nicIndex, int targetRank) + { + System::Get().GetClosestGpusToNic(gpuIndices, nicIndex, targetRank); + } + void GetClosestNicsToCpu(std::vector& nicIndices, int cpuIndex, int targetRank) { int numNics = GetNumExecutors(EXE_NIC, targetRank); From e49bf79a87b54b4ee62173cc9b19b2ea81307942 Mon Sep 17 00:00:00 2001 From: AtlantaPepsi Date: Mon, 12 Jan 2026 08:30:15 -0600 Subject: [PATCH 2/6] modifications --- src/client/Presets/NicPeerToPeer.hpp | 157 ++++++++++++++++++++------- 1 file changed, 119 insertions(+), 38 deletions(-) diff --git a/src/client/Presets/NicPeerToPeer.hpp b/src/client/Presets/NicPeerToPeer.hpp index 746aca1e..de5c91df 100644 --- a/src/client/Presets/NicPeerToPeer.hpp +++ b/src/client/Presets/NicPeerToPeer.hpp @@ -20,6 +20,7 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +// Helper functions MemType parseMemType(std::string const memTypeIdx) { bool isCpu = false; int memType = 2; @@ -47,9 +48,15 @@ MemType parseMemType(std::string const memTypeIdx) { return Utils::GetMemType(memType, isCpu); } +int GetClosestDeviceToNic(MemType memType, int nicIdx, int rank) { + return TransferBench::IsCpuMemType(memType) ? + TransferBench::GetClosestCpuNumaToNic(nicIdx, rank) : + TransferBench::GetClosestGpuToNic(nicIdx, rank); +} + int NicPeerToPeerPreset(EnvVars& ev, - size_t const numBytesPerTransfer, - std::string const presetName) + size_t const numBytesPerTransfer, + std::string const presetName) { int numRanks = TransferBench::GetNumRanks(); @@ -131,6 +138,7 @@ int NicPeerToPeerPreset(EnvVars& ev, } if (rank1 != rank2) { roundPairs.push_back({rank1, rank2}); + roundPairs.push_back({rank2, rank1}); } } schedule.push_back(roundPairs); @@ -144,11 +152,18 @@ int NicPeerToPeerPreset(EnvVars& ev, int rank2 = (round + numRanks - 1 - i) % numRanks; if (rank1 != rank2) { roundPairs.push_back({rank1, rank2}); + roundPairs.push_back({rank2, rank1}); } } schedule.push_back(roundPairs); } } + // Finally, a round where every rank does loopback + std::vector> selfRound; + for (int rank = 0; rank < numRanks; rank++) { + selfRound.push_back({rank, rank}); + } + schedule.push_back(selfRound); } // Display EnvVars @@ -161,12 +176,16 @@ int NicPeerToPeerPreset(EnvVars& ev, ev.Print("OUTPUT_FORMAT", showFullMatrix, "Printing results in %s format", showFullMatrix ? "full matrix" : "column"); ev.Print("NIC_FILTER", nicFilter, "Selecting %d NICs", nicFilter.size()); // TODO: Display filtered NICs? + // TODO: More detailed info about mem type? 
+ ev.Print("SRC_MEM", srcMemIdx, "Source memory type"); + ev.Print("DST_MEM", dstMemIdx, "Destination memory type"); ev.Print("FAST_EXE", rr, "Executing p2p node pairs in parallel"); printf("\n"); } } // TODO: validate env vars + // TODO: assert same RR schedule TransferBench::ConfigOptions cfg = ev.ToConfigOptions(); TransferBench::TestResults results; @@ -195,40 +214,106 @@ int NicPeerToPeerPreset(EnvVars& ev, //std::vector maxBandwidth; //std::vector stdDev; - // Loop over all possible src+NIC/dst+NIC pairs across all ranks and collect P2P results - for (int srcRank = 0; srcRank < numRanks; srcRank++) { - for (int srcNicIdx = 0; srcNicIdx < numNicsPerRank; srcNicIdx++) { - for (int dstRank = 0; dstRank < numRanks; dstRank++) { + // Transfer starts + if (rr) { + // Pre-allocate result vectors for all transfer combinations + int totalTransfers = numRanks * numNicsPerRank * numRanks * numNicsPerRank; + avgBandwidth.resize(totalTransfers); + srcExes.resize(totalTransfers); + dstExes.resize(totalTransfers); + srcMems.resize(totalTransfers); + dstMems.resize(totalTransfers); + for (auto const& roundPairs : schedule) { + for (int srcNicIdx = 0; srcNicIdx < numNicsPerRank; srcNicIdx++) { for (int dstNicIdx = 0; dstNicIdx < numNicsPerRank; dstNicIdx++) { - std::vector transfers(1); - - int srcNic = nicIndices[srcNicIdx]; - int dstNic = nicIndices[dstNicIdx]; - - // Determine which GPU memory to use based on NIC proximity and its info - int srcGpuIndex = TransferBench::GetClosestGpuToNic(srcNic, srcRank); - int dstGpuIndex = TransferBench::GetClosestGpuToNic(dstNic, dstRank); - - // TODO: error msg - if (srcGpuIndex == -1 || dstGpuIndex == -1) ; - transfers[0].numBytes = numBytesPerTransfer; - transfers[0].srcs.push_back({srcTypeActual, srcGpuIndex, srcRank}); - transfers[0].dsts.push_back({dstTypeActual, dstGpuIndex, dstRank}); - transfers[0].exeDevice = {EXE_NIC, (useRemoteRead ? dstGpuIndex : srcGpuIndex), (useRemoteRead ? dstRank : srcRank)}; - transfers[0].exeSubIndex = (useRemoteRead ? srcGpuIndex : dstGpuIndex); - transfers[0].numSubExecs = numQueuePairs; + std::vector transfers; + for (auto const& pair : roundPairs) { + Transfer transfer; + int srcRank = pair.first; + int dstRank = pair.second; + + int srcNic = nicIndices[srcNicIdx]; + int dstNic = nicIndices[dstNicIdx]; + + // Determine which GPU memory/CPU NUMA to use based on NIC proximity and its info + int srcMemIndex = GetClosestDeviceToNic(srcTypeActual, srcNic, srcRank); + int dstMemIndex = GetClosestDeviceToNic(dstTypeActual, dstNic, dstRank); + + // TODO: error msg + if (srcMemIndex == -1 || dstMemIndex == -1) ; + transfer.numBytes = numBytesPerTransfer; + transfer.srcs.push_back({srcTypeActual, srcMemIndex, srcRank}); + transfer.dsts.push_back({dstTypeActual, dstMemIndex, dstRank}); + transfer.exeDevice = {EXE_NIC, (useRemoteRead ? dstMemIndex : srcMemIndex), (useRemoteRead ? dstRank : srcRank)}; + transfer.exeSubIndex = (useRemoteRead ? 
srcMemIndex : dstMemIndex); + transfer.numSubExecs = numQueuePairs; + + transfers.push_back(transfer); + } if (!TransferBench::RunTransfers(cfg, transfers, results)) { for (auto const& err : results.errResults) Utils::Print("%s\n", err.errMsg.c_str()); return 1; } - avgBandwidth.push_back(results.tfrResults[0].avgBandwidthGbPerSec); - srcExes.push_back(TransferBench::GetExecutorName(results.tfrResults[0].exeDevice)); - dstExes.push_back(TransferBench::GetExecutorName(results.tfrResults[0].exeDstDevice)); - - srcMems.push_back(srcGpuIndex); - dstMems.push_back(dstGpuIndex); + + for (size_t i = 0; i < results.tfrResults.size(); i++) { + int srcRank = transfers[i].srcs[0].memRank; + int dstRank = transfers[i].dsts[0].memRank; + + // Calculate index in table-rendering order: srcRank x srcNicIdx x dstRank x dstNicIdx + int idx = srcRank * (numNicsPerRank * numRanks * numNicsPerRank) + + srcNicIdx * (numRanks * numNicsPerRank) + + dstRank * numNicsPerRank + + dstNicIdx; + + avgBandwidth[idx] = results.tfrResults[i].avgBandwidthGbPerSec; + srcExes[idx] = TransferBench::GetExecutorName(results.tfrResults[i].exeDevice); + dstExes[idx] = TransferBench::GetExecutorName(results.tfrResults[i].exeDstDevice); + // TODO: add mem device info in transfer result? + srcMems[idx] = transfers[i].srcs[0].memIndex; + dstMems[idx] = transfers[i].dsts[0].memIndex; + + } + } + } + } + } else { + // Loop over all possible src+NIC/dst+NIC pairs across all ranks and collect P2P results + for (int srcRank = 0; srcRank < numRanks; srcRank++) { + for (int srcNicIdx = 0; srcNicIdx < numNicsPerRank; srcNicIdx++) { + for (int dstRank = 0; dstRank < numRanks; dstRank++) { + for (int dstNicIdx = 0; dstNicIdx < numNicsPerRank; dstNicIdx++) { + std::vector transfers(1); + + int srcNic = nicIndices[srcNicIdx]; + int dstNic = nicIndices[dstNicIdx]; + + // Determine which GPU memory/CPU NUMA to use based on NIC proximity and its info + int srcMemIndex = GetClosestDeviceToNic(srcTypeActual, srcNic, srcRank); + int dstMemIndex = GetClosestDeviceToNic(dstTypeActual, dstNic, dstRank); + + // TODO: error msg + if (srcMemIndex == -1 || dstMemIndex == -1) ; + transfers[0].numBytes = numBytesPerTransfer; + transfers[0].srcs.push_back({srcTypeActual, srcMemIndex, srcRank}); + transfers[0].dsts.push_back({dstTypeActual, dstMemIndex, dstRank}); + transfers[0].exeDevice = {EXE_NIC, (useRemoteRead ? dstMemIndex : srcMemIndex), (useRemoteRead ? dstRank : srcRank)}; + transfers[0].exeSubIndex = (useRemoteRead ? srcMemIndex : dstMemIndex); + transfers[0].numSubExecs = numQueuePairs; + + if (!TransferBench::RunTransfers(cfg, transfers, results)) { + for (auto const& err : results.errResults) + Utils::Print("%s\n", err.errMsg.c_str()); + return 1; + } + avgBandwidth.push_back(results.tfrResults[0].avgBandwidthGbPerSec); + srcExes.push_back(TransferBench::GetExecutorName(results.tfrResults[0].exeDevice)); + dstExes.push_back(TransferBench::GetExecutorName(results.tfrResults[0].exeDstDevice)); + + srcMems.push_back(srcMemIndex); + dstMems.push_back(dstMemIndex); + } } } } @@ -257,9 +342,9 @@ int NicPeerToPeerPreset(EnvVars& ev, table.Set(0, rowIdx, " Rank %02d ", rank); for (int nic = 0; nic < numNicsPerRank; nic++) { table.Set(rowIdx, 1, " %s ", srcExes[entryIdx].c_str()); - table.Set(rowIdx, 2, " GPU %02d ", srcMems[entryIdx]); + table.Set(rowIdx, 2, " %cPU %02d ", TransferBench::IsCpuMemType(srcTypeActual) ? 
'C' : 'G', srcMems[entryIdx]); table.Set(1, rowIdx, " %s ", dstExes[rowIdx - 3].c_str()); - table.Set(2, rowIdx, " GPU %02d ", dstMems[rowIdx - 3]); + table.Set(2, rowIdx, " %cPU %02d ", TransferBench::IsCpuMemType(dstTypeActual) ? 'C' : 'G', dstMems[rowIdx - 3]); int colIdx = 3; for (int dstRank = 0; dstRank < numRanks; dstRank++) { for (int dstNic = 0; dstNic < numNicsPerRank; dstNic++) { @@ -288,10 +373,10 @@ int NicPeerToPeerPreset(EnvVars& ev, for (int j = 0; j < numNicsPerRank; j++) { table.Set(rowIdx, 0, " Rank %02d ", src); table.Set(rowIdx, 1, " %s ", srcExes[rowIdx - 1].c_str()); - table.Set(rowIdx, 2, " GPU %02d ", srcMems[rowIdx - 1]); + table.Set(rowIdx, 2, " %cPU %02d ", TransferBench::IsCpuMemType(srcTypeActual) ? 'C' : 'G', srcMems[rowIdx - 1]); table.Set(rowIdx, 3, " Rank %02d ", dst); table.Set(rowIdx, 4, " %s ", dstExes[rowIdx - 1].c_str()); - table.Set(rowIdx, 5, " GPU %02d ", dstMems[rowIdx - 1]); + table.Set(rowIdx, 5, " %cPU %02d ", TransferBench::IsCpuMemType(dstTypeActual) ? 'C' : 'G', dstMems[rowIdx - 1]); table.Set(rowIdx, 6, " %.2f ", avgBandwidth[rowIdx - 1]); rowIdx++; } @@ -303,6 +388,7 @@ int NicPeerToPeerPreset(EnvVars& ev, table.PrintTable(ev.outputToCsv, ev.showBorders); // Ranking fastest/slowest connection + // TODO: expand length of the list via user passed in value Utils::TableHelper summaryTable(11, 6, precision); Utils::Print("Summary of top 10 fastest/slowest connection\n"); @@ -354,11 +440,6 @@ int NicPeerToPeerPreset(EnvVars& ev, } summaryTable.PrintTable(ev.outputToCsv, ev.showBorders); -/* - if (!ev.outputToCsv && avgCount > 0) { - Utils::Print("\n"); - } -*/ return 0; } From 894aa5f7ebd50bde1619a61956c7808b14d1df37 Mon Sep 17 00:00:00 2001 From: AtlantaPepsi Date: Mon, 26 Jan 2026 09:54:28 -0600 Subject: [PATCH 3/6] fixed executor and scheduling bug, add real time msg and nic parallelism --- src/client/Presets/NicPeerToPeer.hpp | 360 ++++++++++++--------------- 1 file changed, 163 insertions(+), 197 deletions(-) diff --git a/src/client/Presets/NicPeerToPeer.hpp b/src/client/Presets/NicPeerToPeer.hpp index de5c91df..0d2320ba 100644 --- a/src/client/Presets/NicPeerToPeer.hpp +++ b/src/client/Presets/NicPeerToPeer.hpp @@ -21,6 +21,81 @@ THE SOFTWARE. */ // Helper functions + +// Returns a schedule of round robin pairing of N elements, +// each round containing n pairs, where n is between 1 to N/2 +void roundRobinGen(std::vector>>& schedule, + int N, int n = 0) { + if (n <= 0) n = N/2; + if (N <= 0 || n > N/2 || (N/2) % n != 0) // Assuming balanced load for each round + { + //... 
+ printf("cannot create round robin schedule, falling back to serial"); + } + + // Step 1: Generate standard round-robin tournament (maximum parallelism) + std::vector>> fullSchedule; + + if (N % 2 == 0) { + // Even number of items: use round-robin tournament scheduling + for (int round = 0; round < N - 1; round++) { + std::vector> roundPairs; + std::vector> roundPairsReversed; + for (int i = 0; i < N / 2; i++) { + int item1 = i; + int item2 = N - 1 - i; + if (round > 0) { + // Rotate all except the first item + if (item1 > 0) item1 = ((item1 - 1 + round) % (N - 1)) + 1; + if (item2 > 0) item2 = ((item2 - 1 + round) % (N - 1)) + 1; + } + if (item1 != item2) { + roundPairs.push_back({item1, item2}); + roundPairsReversed.push_back({item2, item1}); + } + } + fullSchedule.push_back(roundPairs); + fullSchedule.push_back(roundPairsReversed); + } + } else { + // Odd number of items: one item sits out each round + for (int round = 0; round < N; round++) { + std::vector> roundPairs; + std::vector> roundPairsReversed; + for (int i = 0; i < N / 2; i++) { + int item1 = (round + i) % N; + int item2 = (round + N - 1 - i) % N; + if (item1 != item2) { + roundPairs.push_back({item1, item2}); + roundPairsReversed.push_back({item2, item1}); + } + } + fullSchedule.push_back(roundPairs); + fullSchedule.push_back(roundPairsReversed); + } + } + + // A loopback round + std::vector> selfRound; + for (int i = 0; i < N; i++) { + selfRound.push_back({i, i}); + } + fullSchedule.push_back(selfRound); + + // Step 2: Split each full round into sub-rounds with at most n pairs + for (auto const& fullRound : fullSchedule) { + for (size_t start = 0; start < fullRound.size(); start += n) { + std::vector> subRound; + for (size_t i = start; i < start + n && i < fullRound.size(); i++) { + subRound.push_back(fullRound[i]); + } + if (!subRound.empty()) { + schedule.push_back(subRound); + } + } + } +} + MemType parseMemType(std::string const memTypeIdx) { bool isCpu = false; int memType = 2; @@ -59,8 +134,7 @@ int NicPeerToPeerPreset(EnvVars& ev, std::string const presetName) { int numRanks = TransferBench::GetNumRanks(); - - int numDetectedNics = TransferBench::GetNumExecutors(EXE_NIC); + int numNicsPerRank = TransferBench::GetNumExecutors(EXE_NIC); // Collect env vars for this preset //int numCpuDevices = EnvVars::GetEnvVar("NUM_CPU_DEVICES", numDetectedCpus); @@ -68,104 +142,14 @@ int NicPeerToPeerPreset(EnvVars& ev, int numQueuePairs = EnvVars::GetEnvVar("NUM_QUEUE_PAIRS", 1); int useRemoteRead = EnvVars::GetEnvVar("USE_REMOTE_READ", 0); int showFullMatrix = EnvVars::GetEnvVar("OUTPUT_FORMAT", 1); - std::string nicFilter = EnvVars::GetEnvVar("NIC_FILTER", ""); std::string srcMemIdx = EnvVars::GetEnvVar("SRC_MEM", "G2"); std::string dstMemIdx = EnvVars::GetEnvVar("DST_MEM", "G2"); int rr = EnvVars::GetEnvVar("FAST_EXE", 0); - // Parse NIC_FILTER to build list of NIC indices to use - std::vector nicIndices; - if (nicFilter.empty()) { - // No filter specified, use all detected NICs - for (int i = 0; i < numDetectedNics; i++) { - nicIndices.push_back(i); - } - } else { - // Parse comma-separated list of NIC indices or names - std::istringstream ss(nicFilter); - std::string token; - while (std::getline(ss, token, ',')) { - // Trim whitespace - token.erase(0, token.find_first_not_of(" \t")); - token.erase(token.find_last_not_of(" \t") + 1); - - // Check if token is a number (NIC index) - bool isNumber = !token.empty() && std::all_of(token.begin(), token.end(), ::isdigit); - - if (isNumber) { - int nicIdx = std::stoi(token); - if 
(nicIdx >= 0 && nicIdx < numDetectedNics) { - nicIndices.push_back(nicIdx); - } else { - Utils::Print("WARNING: NIC index %d out of range (0-%d), ignoring\n", nicIdx, numDetectedNics - 1); - } - } else { - // Try to match by NIC name - bool found = false; - for (int nicIdx = 0; nicIdx < numDetectedNics; nicIdx++) { - std::string nicName = TransferBench::GetExecutorName({EXE_NIC, nicIdx}); - if (nicName == token) { - nicIndices.push_back(nicIdx); - found = true; - break; - } - } - if (!found) { - Utils::Print("WARNING: NIC '%s' not found, ignoring\n", token.c_str()); - } - } - } - } - // Parse Memtype for src/dst MemType srcTypeActual = parseMemType(srcMemIdx); MemType dstTypeActual = parseMemType(dstMemIdx); - // Create a round-robin schedule for all-to-all communication - std::vector>> schedule; - if (rr) { - if (numRanks % 2 == 0) { - // Even number of ranks: use round-robin tournament scheduling - for (int round = 0; round < numRanks - 1; round++) { - std::vector> roundPairs; - for (int i = 0; i < numRanks / 2; i++) { - int rank1 = i; - int rank2 = numRanks - 1 - i; - if (round > 0) { - // Rotate all except the first rank - if (rank1 > 0) rank1 = ((rank1 - 1 + round) % (numRanks - 1)) + 1; - if (rank2 > 0) rank2 = ((rank2 - 1 + round) % (numRanks - 1)) + 1; - } - if (rank1 != rank2) { - roundPairs.push_back({rank1, rank2}); - roundPairs.push_back({rank2, rank1}); - } - } - schedule.push_back(roundPairs); - } - } else { - // Odd number of ranks: one rank sits out each round - for (int round = 0; round < numRanks; round++) { - std::vector> roundPairs; - for (int i = 0; i < numRanks / 2; i++) { - int rank1 = (round + i) % numRanks; - int rank2 = (round + numRanks - 1 - i) % numRanks; - if (rank1 != rank2) { - roundPairs.push_back({rank1, rank2}); - roundPairs.push_back({rank2, rank1}); - } - } - schedule.push_back(roundPairs); - } - } - // Finally, a round where every rank does loopback - std::vector> selfRound; - for (int rank = 0; rank < numRanks; rank++) { - selfRound.push_back({rank, rank}); - } - schedule.push_back(selfRound); - } - // Display EnvVars if (Utils::RankDoesOutput()) { ev.DisplayEnvVars(); @@ -174,7 +158,6 @@ int NicPeerToPeerPreset(EnvVars& ev, ev.Print("NUM_NIC_SE", numQueuePairs, "Using %d queue pairs per Transfer", numQueuePairs); ev.Print("USE_REMOTE_READ", useRemoteRead, "Using %s as executor", useRemoteRead ? "DST" : "SRC"); ev.Print("OUTPUT_FORMAT", showFullMatrix, "Printing results in %s format", showFullMatrix ? "full matrix" : "column"); - ev.Print("NIC_FILTER", nicFilter, "Selecting %d NICs", nicFilter.size()); // TODO: Display filtered NICs? // TODO: More detailed info about mem type? ev.Print("SRC_MEM", srcMemIdx, "Source memory type"); @@ -192,11 +175,10 @@ int NicPeerToPeerPreset(EnvVars& ev, // Calculate total IB devices per rank // TODO: assert same # of NIC all ranks - int const numNicsPerRank = nicIndices.size(); int const numTotalNics = numNicsPerRank * numRanks; // Initialize output table - Utils::Print("Unidirectional copy peak bandwidth GB/s (Using Nearest NIC RDMA)\n"); + Utils::Print("Unidirectional copy peak bandwidth GB/s (NIC RDMA Using Nearest Device)\n"); int numRows = showFullMatrix ? 3 + numTotalNics : 1 + numTotalNics * numTotalNics; int numCols = showFullMatrix ? 
numRows : 7; @@ -210,112 +192,98 @@ int NicPeerToPeerPreset(EnvVars& ev, // Query closest device to each NIC available, store device info to a map std::vector avgBandwidth; - //std::vector minBandwidth; - //std::vector maxBandwidth; - //std::vector stdDev; - - // Transfer starts - if (rr) { - // Pre-allocate result vectors for all transfer combinations - int totalTransfers = numRanks * numNicsPerRank * numRanks * numNicsPerRank; - avgBandwidth.resize(totalTransfers); - srcExes.resize(totalTransfers); - dstExes.resize(totalTransfers); - srcMems.resize(totalTransfers); - dstMems.resize(totalTransfers); - for (auto const& roundPairs : schedule) { - for (int srcNicIdx = 0; srcNicIdx < numNicsPerRank; srcNicIdx++) { - for (int dstNicIdx = 0; dstNicIdx < numNicsPerRank; dstNicIdx++) { - std::vector transfers; - for (auto const& pair : roundPairs) { - Transfer transfer; - int srcRank = pair.first; - int dstRank = pair.second; - - int srcNic = nicIndices[srcNicIdx]; - int dstNic = nicIndices[dstNicIdx]; - - // Determine which GPU memory/CPU NUMA to use based on NIC proximity and its info - int srcMemIndex = GetClosestDeviceToNic(srcTypeActual, srcNic, srcRank); - int dstMemIndex = GetClosestDeviceToNic(dstTypeActual, dstNic, dstRank); - - // TODO: error msg - if (srcMemIndex == -1 || dstMemIndex == -1) ; - transfer.numBytes = numBytesPerTransfer; - transfer.srcs.push_back({srcTypeActual, srcMemIndex, srcRank}); - transfer.dsts.push_back({dstTypeActual, dstMemIndex, dstRank}); - transfer.exeDevice = {EXE_NIC, (useRemoteRead ? dstMemIndex : srcMemIndex), (useRemoteRead ? dstRank : srcRank)}; - transfer.exeSubIndex = (useRemoteRead ? srcMemIndex : dstMemIndex); - transfer.numSubExecs = numQueuePairs; - - transfers.push_back(transfer); - } - - if (!TransferBench::RunTransfers(cfg, transfers, results)) { - for (auto const& err : results.errResults) - Utils::Print("%s\n", err.errMsg.c_str()); - return 1; - } - - for (size_t i = 0; i < results.tfrResults.size(); i++) { - int srcRank = transfers[i].srcs[0].memRank; - int dstRank = transfers[i].dsts[0].memRank; - // Calculate index in table-rendering order: srcRank x srcNicIdx x dstRank x dstNicIdx - int idx = srcRank * (numNicsPerRank * numRanks * numNicsPerRank) - + srcNicIdx * (numRanks * numNicsPerRank) - + dstRank * numNicsPerRank - + dstNicIdx; + // Create a round-robin schedule for all-to-all communication + std::vector>> schedule; + std::vector>> nicSchedule; - avgBandwidth[idx] = results.tfrResults[i].avgBandwidthGbPerSec; - srcExes[idx] = TransferBench::GetExecutorName(results.tfrResults[i].exeDevice); - dstExes[idx] = TransferBench::GetExecutorName(results.tfrResults[i].exeDstDevice); - // TODO: add mem device info in transfer result? 
- srcMems[idx] = transfers[i].srcs[0].memIndex; - dstMems[idx] = transfers[i].dsts[0].memIndex; + if (rr) { + roundRobinGen(schedule, numRanks); + roundRobinGen(nicSchedule, numNicsPerRank, rr); + } else { + roundRobinGen(schedule, numRanks, 1); + roundRobinGen(nicSchedule, numNicsPerRank, 1); + } - } + int totalTransfers = numRanks * numNicsPerRank * numRanks * numNicsPerRank; + int transfersPerIt = totalTransfers / (schedule.size() * nicSchedule.size()); + int counter = 0; + double durationSec = 0; + avgBandwidth.resize(totalTransfers); + srcExes.resize(totalTransfers); + dstExes.resize(totalTransfers); + srcMems.resize(totalTransfers); + dstMems.resize(totalTransfers); + + // Execute transfers: node-level rounds -> NIC-level rounds -> node pairs + for (auto const& roundPairs : schedule) { + for (auto const& nicRoundPairs : nicSchedule) { + std::vector transfers; + auto cpuStart = std::chrono::high_resolution_clock::now(); + + for (auto const& nodePair : roundPairs) { + int srcRank = nodePair.first; + int dstRank = nodePair.second; + + for (auto const& nicPair : nicRoundPairs) { + int srcNicIdx = nicPair.first; + int dstNicIdx = nicPair.second; + + Transfer transfer; + + // Determine which GPU memory/CPU NUMA to use based on NIC proximity and its info + int srcMemIndex = GetClosestDeviceToNic(srcTypeActual, srcNicIdx, srcRank); + int dstMemIndex = GetClosestDeviceToNic(dstTypeActual, dstNicIdx, dstRank); + + // TODO: error msg + if (srcMemIndex == -1 || dstMemIndex == -1) ; + transfer.numBytes = numBytesPerTransfer; + transfer.srcs.push_back({srcTypeActual, srcMemIndex, srcRank}); + transfer.dsts.push_back({dstTypeActual, dstMemIndex, dstRank}); + transfer.exeDevice = {EXE_NIC, (useRemoteRead ? dstNicIdx : srcNicIdx), (useRemoteRead ? dstRank : srcRank)}; + transfer.exeSubIndex = (useRemoteRead ? srcNicIdx : dstNicIdx); + transfer.numSubExecs = numQueuePairs; + + transfers.push_back(transfer); } } - } - } else { - // Loop over all possible src+NIC/dst+NIC pairs across all ranks and collect P2P results - for (int srcRank = 0; srcRank < numRanks; srcRank++) { - for (int srcNicIdx = 0; srcNicIdx < numNicsPerRank; srcNicIdx++) { - for (int dstRank = 0; dstRank < numRanks; dstRank++) { - for (int dstNicIdx = 0; dstNicIdx < numNicsPerRank; dstNicIdx++) { - std::vector transfers(1); - - int srcNic = nicIndices[srcNicIdx]; - int dstNic = nicIndices[dstNicIdx]; - - // Determine which GPU memory/CPU NUMA to use based on NIC proximity and its info - int srcMemIndex = GetClosestDeviceToNic(srcTypeActual, srcNic, srcRank); - int dstMemIndex = GetClosestDeviceToNic(dstTypeActual, dstNic, dstRank); - - // TODO: error msg - if (srcMemIndex == -1 || dstMemIndex == -1) ; - transfers[0].numBytes = numBytesPerTransfer; - transfers[0].srcs.push_back({srcTypeActual, srcMemIndex, srcRank}); - transfers[0].dsts.push_back({dstTypeActual, dstMemIndex, dstRank}); - transfers[0].exeDevice = {EXE_NIC, (useRemoteRead ? dstMemIndex : srcMemIndex), (useRemoteRead ? dstRank : srcRank)}; - transfers[0].exeSubIndex = (useRemoteRead ? 
srcMemIndex : dstMemIndex); - transfers[0].numSubExecs = numQueuePairs; - - if (!TransferBench::RunTransfers(cfg, transfers, results)) { - for (auto const& err : results.errResults) - Utils::Print("%s\n", err.errMsg.c_str()); - return 1; - } - avgBandwidth.push_back(results.tfrResults[0].avgBandwidthGbPerSec); - srcExes.push_back(TransferBench::GetExecutorName(results.tfrResults[0].exeDevice)); - dstExes.push_back(TransferBench::GetExecutorName(results.tfrResults[0].exeDstDevice)); - - srcMems.push_back(srcMemIndex); - dstMems.push_back(dstMemIndex); - } - } + + if (!TransferBench::RunTransfers(cfg, transfers, results)) { + for (auto const& err : results.errResults) + Utils::Print("%s\n", err.errMsg.c_str()); + return 1; + } + + counter++; + + // Store results with correct indexing + for (size_t i = 0; i < results.tfrResults.size(); i++) { + int srcRank = transfers[i].srcs[0].memRank; + int dstRank = transfers[i].dsts[0].memRank; + + auto srcExe = useRemoteRead ? results.tfrResults[i].exeDstDevice : results.tfrResults[i].exeDevice; + auto dstExe = useRemoteRead ? results.tfrResults[i].exeDevice : results.tfrResults[i].exeDstDevice; + int srcNicIdx = srcExe.exeIndex; + int dstNicIdx = dstExe.exeIndex; + + // Calculate index in table-rendering order: srcRank x srcNicIdx x dstRank x dstNicIdx + int idx = srcRank * (numNicsPerRank * numRanks * numNicsPerRank) + + srcNicIdx * (numRanks * numNicsPerRank) + + dstRank * numNicsPerRank + + dstNicIdx; + avgBandwidth[idx] = results.tfrResults[i].avgBandwidthGbPerSec; + srcExes[idx] = TransferBench::GetExecutorName(srcExe); + dstExes[idx] = TransferBench::GetExecutorName(dstExe); + // TODO: add mem device info in transfer result? + srcMems[idx] = transfers[i].srcs[0].memIndex; + dstMems[idx] = transfers[i].dsts[0].memIndex; } + + auto cpuDelta = std::chrono::high_resolution_clock::now() - cpuStart; + durationSec += std::chrono::duration_cast>(cpuDelta).count(); + fprintf(stderr, "Completed %d/%d pairs in %6.3fs, estimated remaining time %6.3fs.\n", + counter * transfersPerIt, totalTransfers, durationSec, + durationSec * (nicSchedule.size() * schedule.size() - counter) / counter ); } } @@ -442,5 +410,3 @@ int NicPeerToPeerPreset(EnvVars& ev, return 0; } - - From f786fba660f58ed4e01793be3cbd7126bcdcea50 Mon Sep 17 00:00:00 2001 From: Tim Hu Date: Wed, 28 Jan 2026 16:15:41 +0000 Subject: [PATCH 4/6] cleanup; adjusting EnvVar; modifying schedule --- src/client/Presets/NicPeerToPeer.hpp | 188 +++++++++++++-------------- 1 file changed, 94 insertions(+), 94 deletions(-) diff --git a/src/client/Presets/NicPeerToPeer.hpp b/src/client/Presets/NicPeerToPeer.hpp index 0d2320ba..1a670657 100644 --- a/src/client/Presets/NicPeerToPeer.hpp +++ b/src/client/Presets/NicPeerToPeer.hpp @@ -23,65 +23,84 @@ THE SOFTWARE. // Helper functions // Returns a schedule of round robin pairing of N elements, -// each round containing n pairs, where n is between 1 to N/2 -void roundRobinGen(std::vector>>& schedule, - int N, int n = 0) { - if (n <= 0) n = N/2; - if (N <= 0 || n > N/2 || (N/2) % n != 0) // Assuming balanced load for each round - { - //... 
- printf("cannot create round robin schedule, falling back to serial"); - } +// if parallel, each round contains N/2 pairs, otherwise serial +void RoundRobinSchedule(std::vector>>& schedule, + int N, int parallel = 0) { - // Step 1: Generate standard round-robin tournament (maximum parallelism) + // Generate standard round-robin tournament (maximum parallelism) std::vector>> fullSchedule; - if (N % 2 == 0) { - // Even number of items: use round-robin tournament scheduling - for (int round = 0; round < N - 1; round++) { - std::vector> roundPairs; - std::vector> roundPairsReversed; - for (int i = 0; i < N / 2; i++) { - int item1 = i; - int item2 = N - 1 - i; - if (round > 0) { - // Rotate all except the first item - if (item1 > 0) item1 = ((item1 - 1 + round) % (N - 1)) + 1; - if (item2 > 0) item2 = ((item2 - 1 + round) % (N - 1)) + 1; - } - if (item1 != item2) { - roundPairs.push_back({item1, item2}); - roundPairsReversed.push_back({item2, item1}); - } + // Pad odd number of ranks with a dummy round (N+1) + int paddedN = N + (N % 2 == 1); + // Round-robin tournament scheduling + for (int round = 0; round < paddedN - 1; round++) { + std::vector> roundPairs; + std::vector> roundPairsReversed; + for (int i = 0; i < paddedN / 2; i++) { + int item1 = i; + int item2 = paddedN - 1 - i; + if (round > 0) { + // Rotate all except the first item + if (item1 > 0) item1 = ((item1 - 1 + round) % (paddedN - 1)) + 1; + if (item2 > 0) item2 = ((item2 - 1 + round) % (paddedN - 1)) + 1; } - fullSchedule.push_back(roundPairs); - fullSchedule.push_back(roundPairsReversed); - } - } else { - // Odd number of items: one item sits out each round - for (int round = 0; round < N; round++) { - std::vector> roundPairs; - std::vector> roundPairsReversed; - for (int i = 0; i < N / 2; i++) { - int item1 = (round + i) % N; - int item2 = (round + N - 1 - i) % N; - if (item1 != item2) { + if (item1 != item2) { + // Ignore dummy round, its partner sits out this ronud + if (paddedN == N || (item1 != paddedN-1 && item2 != paddedN-1)){ roundPairs.push_back({item1, item2}); roundPairsReversed.push_back({item2, item1}); } } - fullSchedule.push_back(roundPairs); - fullSchedule.push_back(roundPairsReversed); } + fullSchedule.push_back(roundPairs); + fullSchedule.push_back(roundPairsReversed); } - // A loopback round + // A loopback round where all run in parallel std::vector> selfRound; for (int i = 0; i < N; i++) { selfRound.push_back({i, i}); } fullSchedule.push_back(selfRound); + if (parallel) { + schedule = std::move(fullSchedule); + } else { + // Serialize each round if needed + for (auto const& fullRound : fullSchedule) { + for (auto const& match : fullRound) { + std::vector> subRound; + subRound.push_back({match.first, match.second}); + schedule.push_back(subRound); + } + } + } +} + +// Returns a schedule for ordered 2-combination of N elements +// by pairing the list with its rotating self, +// each round contains n pairs, where 1 <= n <= N and N is divisible by n +// and an element cannot appear more than twice in a round, +void CombinationSchedule(std::vector>>& schedule, + int N, int n = 0) { + std::vector>> fullSchedule; + + if (n <= 0) n = N; + if (N <= 0 || n > N || N % n != 0) // Assuming balanced load for each round + { + n = 1; + Utils::Print("[WARN] cannot create round robin schedule, falling back to serial"); + } + + // Generate rounds of combination based on incrementing distance + for (int i = 1; i < N; i++) { + std::vector> round; + for (int j = 0; j < N; j++) { + round.push_back({j, (j+i)%N}); + } + 
fullSchedule.push_back(round); + } + // Step 2: Split each full round into sub-rounds with at most n pairs for (auto const& fullRound : fullSchedule) { for (size_t start = 0; start < fullRound.size(); start += n) { @@ -96,33 +115,6 @@ void roundRobinGen(std::vector>>& schedule, } } -MemType parseMemType(std::string const memTypeIdx) { - bool isCpu = false; - int memType = 2; - if (memTypeIdx.length() >= 1) { - char firstChar = std::toupper(memTypeIdx[0]); - if (firstChar == 'G' && firstChar == 'C') { - Utils::Print("WARNING: Invalid MEM_POLICY first character '%c', using default 'G'\n", memTypeIdx[0]); - } - isCpu = firstChar == 'C'; - } - - if (memTypeIdx.length() >= 2) { - if (std::isdigit(memTypeIdx[1])) { - int level = memTypeIdx[1] - '0'; - if (level >= 0 && level <= 3) { - memType = level; - } else { - Utils::Print("WARNING: Invalid MEM_POLICY level '%c', must be 0-3, using default 2\n", memTypeIdx[1]); - } - } else { - Utils::Print("WARNING: Invalid MEM_POLICY second character '%c', using default 2\n", memTypeIdx[1]); - } - } - - return Utils::GetMemType(memType, isCpu); -} - int GetClosestDeviceToNic(MemType memType, int nicIdx, int rank) { return TransferBench::IsCpuMemType(memType) ? TransferBench::GetClosestCpuNumaToNic(nicIdx, rank) : @@ -133,22 +125,34 @@ int NicPeerToPeerPreset(EnvVars& ev, size_t const numBytesPerTransfer, std::string const presetName) { + if (Utils::GetNumRankGroups() > 1) { + Utils::Print("[ERROR] NIC p2p preset can only be run across ranks that are homogenous\n"); + Utils::Print("[ERROR] Run ./TransferBench without any args to display topology information\n"); + Utils::Print("[ERROR] NIC_FILTER may also be used to limit NIC visibility\n"); + return 1; + } + int numRanks = TransferBench::GetNumRanks(); int numNicsPerRank = TransferBench::GetNumExecutors(EXE_NIC); // Collect env vars for this preset //int numCpuDevices = EnvVars::GetEnvVar("NUM_CPU_DEVICES", numDetectedCpus); //int numGpuDevices = EnvVars::GetEnvVar("NUM_GPU_DEVICES", numDetectedGpus); - int numQueuePairs = EnvVars::GetEnvVar("NUM_QUEUE_PAIRS", 1); - int useRemoteRead = EnvVars::GetEnvVar("USE_REMOTE_READ", 0); + int numQueuePairs = EnvVars::GetEnvVar("NUM_QUEUE_PAIRS", 1); + int useRemoteRead = EnvVars::GetEnvVar("USE_REMOTE_READ", 0); int showFullMatrix = EnvVars::GetEnvVar("OUTPUT_FORMAT", 1); - std::string srcMemIdx = EnvVars::GetEnvVar("SRC_MEM", "G2"); - std::string dstMemIdx = EnvVars::GetEnvVar("DST_MEM", "G2"); - int rr = EnvVars::GetEnvVar("FAST_EXE", 0); + int srcCpu = EnvVars::GetEnvVar("USE_CPU_SRC_MEM", 0); + int dstCpu = EnvVars::GetEnvVar("USE_CPU_DST_MEM", 0); + int srcMemType = EnvVars::GetEnvVar("SRC_MEM_TYPE", 2); + int dstMemType = EnvVars::GetEnvVar("DST_MEM_TYPE", 2); + int nodeParallel = EnvVars::GetEnvVar("PARALLEL_NODE", 1); + int nicParLevel = EnvVars::GetEnvVar("NIC_PARALLEL_LEVEL", numNicsPerRank); // Parse Memtype for src/dst - MemType srcTypeActual = parseMemType(srcMemIdx); - MemType dstTypeActual = parseMemType(dstMemIdx); + MemType srcTypeActual = Utils::GetMemType(srcMemType, srcCpu); + MemType dstTypeActual = Utils::GetMemType(dstMemType, dstCpu); + std::string srcTypeStr = Utils::GetMemTypeStr(srcMemType, srcCpu); + std::string dstTypeStr = Utils::GetMemTypeStr(dstMemType, dstCpu); // Display EnvVars if (Utils::RankDoesOutput()) { @@ -158,28 +162,26 @@ int NicPeerToPeerPreset(EnvVars& ev, ev.Print("NUM_NIC_SE", numQueuePairs, "Using %d queue pairs per Transfer", numQueuePairs); ev.Print("USE_REMOTE_READ", useRemoteRead, "Using %s as executor", 
useRemoteRead ? "DST" : "SRC"); ev.Print("OUTPUT_FORMAT", showFullMatrix, "Printing results in %s format", showFullMatrix ? "full matrix" : "column"); - // TODO: Display filtered NICs? - // TODO: More detailed info about mem type? - ev.Print("SRC_MEM", srcMemIdx, "Source memory type"); - ev.Print("DST_MEM", dstMemIdx, "Destination memory type"); - ev.Print("FAST_EXE", rr, "Executing p2p node pairs in parallel"); + ev.Print("USE_CPU_SRC_MEM", srcCpu, "Source memory is %s", srcCpu ? "CPU" : "GPU"); + ev.Print("USE_CPU_DST_MEM", dstCpu, "Destination memory is %s", dstCpu ? "CPU" : "GPU"); + ev.Print("SRC_MEM_TYPE", srcMemType, "Using %s memory (%s)", srcTypeStr.c_str(), Utils::GetAllMemTypeStr(srcCpu).c_str()); + ev.Print("DST_MEM_TYPE", dstMemType, "Using %s memory (%s)", dstTypeStr.c_str(), Utils::GetAllMemTypeStr(dstCpu).c_str()); + ev.Print("PARALLEL_NODE", nodeParallel, "Executing p2p node pairs in parallel: %s", nodeParallel ? "yes" : "no"); + ev.Print("NIC_PARALLEL_LEVEL", nicParLevel, "Between a pair of nodes, %d pairs of NIC-NIC transfers executed in parallel", nicParLevel); printf("\n"); } } // TODO: validate env vars - // TODO: assert same RR schedule TransferBench::ConfigOptions cfg = ev.ToConfigOptions(); TransferBench::TestResults results; - // Calculate total IB devices per rank - // TODO: assert same # of NIC all ranks - int const numTotalNics = numNicsPerRank * numRanks; // Initialize output table Utils::Print("Unidirectional copy peak bandwidth GB/s (NIC RDMA Using Nearest Device)\n"); + int const numTotalNics = numNicsPerRank * numRanks; int numRows = showFullMatrix ? 3 + numTotalNics : 1 + numTotalNics * numTotalNics; int numCols = showFullMatrix ? numRows : 7; int precision = 2; @@ -197,13 +199,8 @@ int NicPeerToPeerPreset(EnvVars& ev, std::vector>> schedule; std::vector>> nicSchedule; - if (rr) { - roundRobinGen(schedule, numRanks); - roundRobinGen(nicSchedule, numNicsPerRank, rr); - } else { - roundRobinGen(schedule, numRanks, 1); - roundRobinGen(nicSchedule, numNicsPerRank, 1); - } + RoundRobinSchedule(schedule, numRanks, nodeParallel); + CombinationSchedule(nicSchedule, numNicsPerRank, nicParLevel); int totalTransfers = numRanks * numNicsPerRank * numRanks * numNicsPerRank; int transfersPerIt = totalTransfers / (schedule.size() * nicSchedule.size()); @@ -235,8 +232,11 @@ int NicPeerToPeerPreset(EnvVars& ev, int srcMemIndex = GetClosestDeviceToNic(srcTypeActual, srcNicIdx, srcRank); int dstMemIndex = GetClosestDeviceToNic(dstTypeActual, dstNicIdx, dstRank); - // TODO: error msg - if (srcMemIndex == -1 || dstMemIndex == -1) ; + if (srcMemIndex == -1 || dstMemIndex == -1) { + Utils::Print("[ERROR] No proper GPU device can be found for transfer R%dN%d - R%dN%d\n", + srcRank, srcNicIdx, dstRank, dstNicIdx); + return 1; + } transfer.numBytes = numBytesPerTransfer; transfer.srcs.push_back({srcTypeActual, srcMemIndex, srcRank}); transfer.dsts.push_back({dstTypeActual, dstMemIndex, dstRank}); From 60198397ffcc599b208db5ba95bafbfe1b5896fa Mon Sep 17 00:00:00 2001 From: Tim <43156029+AtlantaPepsi@users.noreply.github.com> Date: Wed, 28 Jan 2026 12:25:41 -0600 Subject: [PATCH 5/6] Update NicPeerToPeer.hpp --- src/client/Presets/NicPeerToPeer.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/Presets/NicPeerToPeer.hpp b/src/client/Presets/NicPeerToPeer.hpp index 1a670657..6ba2f281 100644 --- a/src/client/Presets/NicPeerToPeer.hpp +++ b/src/client/Presets/NicPeerToPeer.hpp @@ -22,7 +22,7 @@ THE SOFTWARE. 
// Helper functions -// Returns a schedule of round robin pairing of N elements, +// Returns a schedule of round robin pairing of N elements, using Circle Method // if parallel, each round contains N/2 pairs, otherwise serial void RoundRobinSchedule(std::vector>>& schedule, int N, int parallel = 0) { From edab42292a1d7e3d8ccff284a74f45204c114e4a Mon Sep 17 00:00:00 2001 From: Tim <43156029+AtlantaPepsi@users.noreply.github.com> Date: Thu, 29 Jan 2026 14:55:26 -0600 Subject: [PATCH 6/6] Update NicPeerToPeer.hpp --- src/client/Presets/NicPeerToPeer.hpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/client/Presets/NicPeerToPeer.hpp b/src/client/Presets/NicPeerToPeer.hpp index 6ba2f281..db33a34b 100644 --- a/src/client/Presets/NicPeerToPeer.hpp +++ b/src/client/Presets/NicPeerToPeer.hpp @@ -93,7 +93,7 @@ void CombinationSchedule(std::vector>>& schedule } // Generate rounds of combination based on incrementing distance - for (int i = 1; i < N; i++) { + for (int i = 0; i < N; i++) { std::vector> round; for (int j = 0; j < N; j++) { round.push_back({j, (j+i)%N}); @@ -254,7 +254,7 @@ int NicPeerToPeerPreset(EnvVars& ev, return 1; } - counter++; + counter += transfers.size(); // Store results with correct indexing for (size_t i = 0; i < results.tfrResults.size(); i++) { @@ -282,8 +282,7 @@ int NicPeerToPeerPreset(EnvVars& ev, auto cpuDelta = std::chrono::high_resolution_clock::now() - cpuStart; durationSec += std::chrono::duration_cast>(cpuDelta).count(); fprintf(stderr, "Completed %d/%d pairs in %6.3fs, estimated remaining time %6.3fs.\n", - counter * transfersPerIt, totalTransfers, durationSec, - durationSec * (nicSchedule.size() * schedule.size() - counter) / counter ); + counter, totalTransfers, durationSec, durationSec * (totalTransfers - counter) / counter); } }
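
The scheduling logic added in PATCH 3/6 and reworked in PATCH 4/6 is the least obvious part of this series, so the standalone sketch below (not part of the patches; all names are illustrative and not TransferBench APIs) shows the Circle-Method round-robin pairing that RoundRobinSchedule builds: rank 0 stays fixed while the remaining ranks rotate, an odd rank count is padded with a dummy partner that sits out, each tournament round is emitted in both directions, and a final loopback round pairs every rank with itself. It compiles on its own and can be used to eyeball the rounds for small rank counts.

#include <cstdio>
#include <utility>
#include <vector>

// Circle Method round-robin: position 0 stays fixed, all other positions rotate.
static std::vector<std::vector<std::pair<int, int>>> CircleMethodSchedule(int N)
{
  std::vector<std::vector<std::pair<int, int>>> schedule;
  int paddedN = N + (N % 2);                     // pad odd N with a dummy rank
  for (int round = 0; round < paddedN - 1; ++round) {
    std::vector<std::pair<int, int>> fwd, rev;
    for (int i = 0; i < paddedN / 2; ++i) {
      int a = i, b = paddedN - 1 - i;
      if (round > 0) {                           // rotate everything except position 0
        if (a > 0) a = ((a - 1 + round) % (paddedN - 1)) + 1;
        if (b > 0) b = ((b - 1 + round) % (paddedN - 1)) + 1;
      }
      if (a < N && b < N) {                      // drop pairs involving the dummy rank
        fwd.push_back({a, b});
        rev.push_back({b, a});
      }
    }
    schedule.push_back(fwd);                     // src -> dst direction
    schedule.push_back(rev);                     // dst -> src direction
  }
  std::vector<std::pair<int, int>> loopback;     // final round: every rank to itself
  for (int i = 0; i < N; ++i) loopback.push_back({i, i});
  schedule.push_back(loopback);
  return schedule;
}

int main()
{
  for (int N : {4, 5}) {
    auto schedule = CircleMethodSchedule(N);
    printf("N=%d -> %zu rounds\n", N, schedule.size());
    for (size_t r = 0; r < schedule.size(); ++r) {
      printf("  round %2zu:", r);
      for (auto const& p : schedule[r]) printf(" %d->%d", p.first, p.second);
      printf("\n");
    }
  }
  return 0;
}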