diff --git a/src/plugins/libfabric/libfabric_backend.cpp b/src/plugins/libfabric/libfabric_backend.cpp index 8bbd3264e5..76ddcf7fbb 100644 --- a/src/plugins/libfabric/libfabric_backend.cpp +++ b/src/plugins/libfabric/libfabric_backend.cpp @@ -1339,20 +1339,42 @@ nixlLibfabricEngine::cmThread() { NIXL_DEBUG << "ConnectionManagement thread started successfully"; NIXL_DEBUG << "Initial receives already posted in main thread, entering progress loop"; - // Main progress loop - continuously process completions on all rails - while (!cm_thread_stop_.load()) { + NIXL_DEBUG << "CM: Thread started"; + + // Adaptive backoff state (per-thread) + static thread_local int backoff_us = 50; // start at 50 µs + static thread_local const int backoff_us_max = 2000; // cap at 2 ms + + // Prefer blocking progress if supported (verbs with FI_WAIT_FD) + const bool blocking_supported = (rail_manager.getNumControlRails() > 0) && + rail_manager.getControlRail(0).blocking_cq_sread_supported; + + while (!cm_thread_stop_.load(std::memory_order_relaxed)) { + nixl_status_t status; + if (blocking_supported) { + // With blocking control CQ progress, rely on rail_manager to block up to its timeout + status = rail_manager.progressAllControlRails(true); // blocking=true inside rail path + } else { + // Non-blocking path: progress and adaptively back off on idle + status = rail_manager.progressAllControlRails(false); + } - nixl_status_t status = rail_manager.progressAllControlRails(); if (status == NIXL_SUCCESS) { - NIXL_DEBUG << "Processed completions on control rails"; - } else if (status != NIXL_IN_PROG && status != NIXL_SUCCESS) { - NIXL_ERROR << "Failed to process completions on control rails"; - return NIXL_ERR_BACKEND; + // Work was done reset backoff + backoff_us = 50; + // Optionally continue immediately to drain more completions + continue; } - // Sleep briefly to avoid spinning too aggressively when blocking cq read is not used - if (!rail_manager.getControlRail(0).blocking_cq_sread_supported) { - std::this_thread::sleep_for(std::chrono::nanoseconds(10)); + if (status == NIXL_IN_PROG) { + // No completions available sleep adaptively + std::this_thread::sleep_for(std::chrono::microseconds(backoff_us)); + backoff_us = std::min(backoff_us * 2, backoff_us_max); + continue; } + + // Unexpected error log and exit + NIXL_ERROR << "CM: Failed to process completions on control rails, status=" << status; + return NIXL_ERR_BACKEND; } NIXL_DEBUG << "ConnectionManagement thread exiting cleanly"; return NIXL_SUCCESS; @@ -1366,24 +1388,33 @@ nixlLibfabricEngine::cmThread() { nixl_status_t nixlLibfabricEngine::progressThread() { NIXL_DEBUG << "Progress thread started successfully for data rails only"; - // Main progress loop - continuously process completions only on data rails - while (!progress_thread_stop_.load()) { - // Process completions only on data rails (non-blocking) - bool any_completions = false; - nixl_status_t status = rail_manager.progressActiveDataRails(); + + // Adaptive backoff layered over configured delay + static thread_local int backoff_us = static_cast(progress_thread_delay_.count()); + static thread_local const int backoff_us_min = 50; // floor at 50 µs + static thread_local const int backoff_us_max = 5000; // cap at 5 ms + if (backoff_us <= 0) backoff_us = backoff_us_min; + + while (!progress_thread_stop_.load(std::memory_order_relaxed)) { + nixl_status_t status = rail_manager.progressActiveDataRails(); // non-blocking if (status == NIXL_SUCCESS) { - any_completions = true; - NIXL_DEBUG << "Processed completions on data rails"; - } else if (status != NIXL_IN_PROG && status != NIXL_SUCCESS) { - NIXL_ERROR << "Failed to process completions on data rails"; - // Don't return error, continue for robustness + // Completions processed reset backoff and continue draining + backoff_us = std::max(backoff_us_min, static_cast(progress_thread_delay_.count())); + continue; } - if (!any_completions) { - std::this_thread::sleep_for(progress_thread_delay_); + if (status == NIXL_IN_PROG) { + // Idle sleep adaptively, increasing up to cap + std::this_thread::sleep_for(std::chrono::microseconds(backoff_us)); + backoff_us = std::min(backoff_us * 2, backoff_us_max); + continue; } + // Error log and keep going for robustness (do not kill the PT) + NIXL_ERROR << "PT: Failed to process completions on data rails, status=" << status; + std::this_thread::sleep_for(std::chrono::microseconds(backoff_us_min)); } - NIXL_DEBUG << "Progress thread exiting cleanly"; + NIXL_DEBUG << "PT: Thread exiting"; return NIXL_SUCCESS; + } void diff --git a/src/utils/libfabric/libfabric_rail.cpp b/src/utils/libfabric/libfabric_rail.cpp index 2a165d7f9d..cb71a1cd76 100644 --- a/src/utils/libfabric/libfabric_rail.cpp +++ b/src/utils/libfabric/libfabric_rail.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #ifdef HAVE_SYNAPSEAI #include @@ -700,8 +701,9 @@ nixlLibfabricRail::setXferIdCallback(std::function callback) { nixl_status_t nixlLibfabricRail::progressCompletionQueue(bool use_blocking) const { // Completion processing - struct fi_cq_data_entry completion; - memset(&completion, 0, sizeof(completion)); + // Batch read to amortize lock and syscall overhead + struct fi_cq_data_entry entries[32]; + memset(entries, 0, sizeof(entries)); int ret; @@ -711,10 +713,10 @@ nixlLibfabricRail::progressCompletionQueue(bool use_blocking) const { if (use_blocking && blocking_cq_sread_supported) { // Blocking read using fi_cq_sread (used by CM thread) - ret = fi_cq_sread(cq, &completion, 1, nullptr, NIXL_LIBFABRIC_CQ_SREAD_TIMEOUT_SEC); + ret = fi_cq_sread(cq, entries, 1, nullptr, NIXL_LIBFABRIC_CQ_SREAD_TIMEOUT_SEC); } else { // Non-blocking read (used by progress thread or fallback) - ret = fi_cq_read(cq, &completion, 1); + ret = fi_cq_read(cq, entries, 32); } if (ret < 0 && ret != -FI_EAGAIN) { @@ -738,24 +740,25 @@ nixlLibfabricRail::progressCompletionQueue(bool use_blocking) const { } // CQ lock released here - completion is now local data - if (ret == -FI_EAGAIN) { + if (ret == -FI_EAGAIN || ret == 0) { return NIXL_IN_PROG; // No completions available } - if (ret == 1) { - NIXL_TRACE << "Completion received on rail " << rail_id << " flags: " << std::hex - << completion.flags << " data: " << completion.data - << " context: " << completion.op_context << std::dec; - // Process completion using local data. Callbacks have their own thread safety - nixl_status_t status = processCompletionQueueEntry(&completion); - if (status != NIXL_SUCCESS) { - NIXL_ERROR << "Failed to process completion on rail " << rail_id; - return status; + if (ret > 0) { + bool ok = true; + for (int i = 0; i < ret; ++i) { + NIXL_TRACE << "Completion received on rail " << rail_id << " flags=" << std::hex + << entries[i].flags << " data=" << entries[i].data + << " context=" << entries[i].op_context << std::dec; + nixl_status_t status = processCompletionQueueEntry(&entries[i]); + if (status != NIXL_SUCCESS) { + NIXL_ERROR << "Failed to process completion on rail " << rail_id; + ok = false; + break; + } } - - NIXL_DEBUG << "Completion processed on rail " << rail_id; - return NIXL_SUCCESS; + return ok ? NIXL_SUCCESS : NIXL_ERR_BACKEND; } return NIXL_ERR_BACKEND; // Unexpected case @@ -1077,7 +1080,7 @@ nixlLibfabricRail::postSend(uint64_t immediate_data, if (ret == -FI_EAGAIN) { // Resource temporarily unavailable - retry indefinitely for all providers - attempt++; + ++attempt; // Log every N attempts to avoid log spam if (attempt % NIXL_LIBFABRIC_LOG_INTERVAL_ATTEMPTS == 0) { @@ -1088,17 +1091,17 @@ nixlLibfabricRail::postSend(uint64_t immediate_data, << ", retrying (attempt " << attempt << ")"; } - // Exponential backoff with cap to avoid overwhelming the system - int delay_us = std::min(NIXL_LIBFABRIC_BASE_RETRY_DELAY_US * (1 + attempt / 10), - NIXL_LIBFABRIC_MAX_RETRY_DELAY_US); - - // Progress completion queue to drain pending completions before retry - nixl_status_t progress_status = progressCompletionQueue(false); - if (progress_status == NIXL_SUCCESS) { - NIXL_TRACE << "Progressed completions on rail " << rail_id << " before retry"; + // Progress CQ a few times before backing off + if (attempt <= 8) { + (void)progressCompletionQueue(false); + } else { + int delay_us = std::min(1000 * (attempt / 10 + 1), 100000); // 1ms..100ms + if (blocking_cq_sread_supported) + (void)progressCompletionQueue(true); + else + std::this_thread::sleep_for(std::chrono::microseconds(delay_us)); } - usleep(delay_us); continue; } else { // Other error - don't retry, fail immediately @@ -1157,7 +1160,7 @@ nixlLibfabricRail::postWrite(const void *local_buffer, if (ret == -FI_EAGAIN) { // Resource temporarily unavailable - retry indefinitely for all providers - attempt++; + ++attempt; // Log every N attempts to avoid log spam if (attempt % NIXL_LIBFABRIC_LOG_INTERVAL_ATTEMPTS == 0) { @@ -1168,17 +1171,16 @@ nixlLibfabricRail::postWrite(const void *local_buffer, << ", retrying (attempt " << attempt << ")"; } - // Exponential backoff with cap to avoid overwhelming the system - int delay_us = std::min(NIXL_LIBFABRIC_BASE_RETRY_DELAY_US * (1 + attempt / 10), - NIXL_LIBFABRIC_MAX_RETRY_DELAY_US); - - // Progress completion queue to drain pending completions before retry - nixl_status_t progress_status = progressCompletionQueue(false); - if (progress_status == NIXL_SUCCESS) { - NIXL_TRACE << "Progressed completions on rail " << rail_id << " before retry"; + // Progress CQ a few times before backing off + if (attempt <= 8) { + (void)progressCompletionQueue(false); + } else { + int delay_us = std::min(1000 * (attempt / 10 + 1), 100000); // 1ms..100ms + if (blocking_cq_sread_supported) + (void)progressCompletionQueue(true); + else + std::this_thread::sleep_for(std::chrono::microseconds(delay_us)); } - - usleep(delay_us); continue; } else { // Other error - don't retry, fail immediately @@ -1245,17 +1247,16 @@ nixlLibfabricRail::postRead(void *local_buffer, << ", retrying (attempt " << attempt << ")"; } - // Exponential backoff with cap to avoid overwhelming the system - int delay_us = std::min(NIXL_LIBFABRIC_BASE_RETRY_DELAY_US * (1 + attempt / 10), - NIXL_LIBFABRIC_MAX_RETRY_DELAY_US); - - // Progress completion queue to drain pending completions before retry - nixl_status_t progress_status = progressCompletionQueue(false); - if (progress_status == NIXL_SUCCESS) { - NIXL_TRACE << "Progressed completions on rail " << rail_id << " before retry"; + // Progress CQ a few times before backing off + if (attempt <= 8) { + (void)progressCompletionQueue(false); + } else { + int delay_us = std::min(1000 * (attempt / 10 + 1), 100000); // 1ms..100ms + if (blocking_cq_sread_supported) + (void)progressCompletionQueue(true); + else + std::this_thread::sleep_for(std::chrono::microseconds(delay_us)); } - - usleep(delay_us); continue; } else { // Other error - don't retry, fail immediately diff --git a/src/utils/libfabric/libfabric_rail_manager.cpp b/src/utils/libfabric/libfabric_rail_manager.cpp index e2d639c6f5..49762b0afc 100644 --- a/src/utils/libfabric/libfabric_rail_manager.cpp +++ b/src/utils/libfabric/libfabric_rail_manager.cpp @@ -658,11 +658,11 @@ nixlLibfabricRailManager::progressActiveDataRails() { } nixl_status_t -nixlLibfabricRailManager::progressAllControlRails() { +nixlLibfabricRailManager::progressAllControlRails(bool blocking) { bool any_completions = false; for (size_t rail_id = 0; rail_id < num_control_rails_; ++rail_id) { - nixl_status_t status = - control_rails_[rail_id]->progressCompletionQueue(true); // Blocking for control rails + nixl_status_t status = control_rails_[rail_id]->progressCompletionQueue( + blocking); // Blocking for control rails if (status == NIXL_SUCCESS) { any_completions = true; NIXL_DEBUG << "Processed completion on control rail " << rail_id; diff --git a/src/utils/libfabric/libfabric_rail_manager.h b/src/utils/libfabric/libfabric_rail_manager.h index fe68e962d0..6983571cab 100644 --- a/src/utils/libfabric/libfabric_rail_manager.h +++ b/src/utils/libfabric/libfabric_rail_manager.h @@ -222,7 +222,7 @@ class nixlLibfabricRailManager { * @return NIXL_SUCCESS if completions processed, NIXL_IN_PROG if none, error on failure */ nixl_status_t - progressAllControlRails(); + progressAllControlRails(bool blocking); /** Validate that all rails are properly initialized * @return NIXL_SUCCESS if all rails initialized, error code otherwise */