Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions src/utils/libfabric/libfabric_rail.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,6 @@ nixlLibfabricRail::progressCompletionQueue(bool use_blocking) const {
NIXL_TRACE << "Completion received on rail " << rail_id << " flags: " << std::hex
<< completion.flags << " data: " << completion.data
<< " context: " << completion.op_context << std::dec;

// Process completion using local data. Callbacks have their own thread safety
nixl_status_t status = processCompletionQueueEntry(&completion);
if (status != NIXL_SUCCESS) {
Expand All @@ -771,6 +770,29 @@ nixlLibfabricRail::processCompletionQueueEntry(struct fi_cq_data_entry *comp) co
<< " FI_WRITE: " << (flags & FI_WRITE)
<< " FI_REMOTE_WRITE: " << (flags & FI_REMOTE_WRITE) << std::dec;

nixlLibfabricReq *req = findRequestFromContext(comp->op_context);
if (req && req->in_use) {
req->cq_ts = std::chrono::steady_clock::now();
using us = std::chrono::microseconds;
auto submit_us =
std::chrono::duration_cast<us>(
req->submit_end_ts - req->submit_start_ts).count();

auto total_us =
std::chrono::duration_cast<us>(
req->cq_ts - req->submit_start_ts).count();

auto inflight_us = total_us - submit_us;

NIXL_INFO
<< "NIXL_XFER "
<< "rail=" << rail_id
<< " id=" << req->xfer_id
<< " op=" << req->operation_type
<< " submit_us=" << submit_us
<< " inflight_us=" << inflight_us
<< " total_us=" << total_us;
}
if (flags & FI_SEND) {
// Local send completions (fi_senddata) - use context
return processLocalSendCompletion(comp);
Expand Down Expand Up @@ -851,7 +873,6 @@ nixlLibfabricRail::processLocalSendCompletion(struct fi_cq_data_entry *comp) con
<< " on rail " << rail_id;
return NIXL_ERR_BACKEND;
}

return NIXL_SUCCESS;
}

Expand All @@ -878,7 +899,6 @@ nixlLibfabricRail::processLocalTransferCompletion(struct fi_cq_data_entry *comp,
<< comp->op_context << " on rail " << rail_id;
return NIXL_ERR_BACKEND;
}

return NIXL_SUCCESS;
}

Expand Down Expand Up @@ -1124,7 +1144,10 @@ nixlLibfabricRail::postWrite(const void *local_buffer,
NIXL_ERROR << "Invalid request for write on rail " << rail_id;
return NIXL_ERR_INVALID_PARAM;
}

req->submit_start_ts = std::chrono::steady_clock::now();
req->xfer_id = g_nixl_xfer_id.fetch_add(1, std::memory_order_relaxed);
req->bytes = length;
req->rail_id = rail_id;
NIXL_TRACE << "Posting RDMA write on endpoint: " << std::hex << endpoint
<< " local_buffer: " << local_buffer << " length: " << length
<< " immediate_data: " << immediate_data << " dest_addr: " << dest_addr
Expand Down Expand Up @@ -1152,6 +1175,7 @@ nixlLibfabricRail::postWrite(const void *local_buffer,
NIXL_TRACE << "RDMA write posted successfully"
<< (attempt > 0 ? " after " + std::to_string(attempt + 1) + " attempts" :
"");
req->submit_end_ts = std::chrono::steady_clock::now();
return NIXL_SUCCESS;
}

Expand Down
11 changes: 11 additions & 0 deletions src/utils/libfabric/libfabric_rail.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <mutex>
#include <ostream>
#include <stack>
#include <chrono>
#include <atomic>

#include "nixl.h"
#include "backend/backend_aux.h"
Expand All @@ -38,6 +40,7 @@
// Forward declarations
class nixlLibfabricConnection;


/**
* @brief Request structure for libfabric operations
*
Expand All @@ -61,6 +64,11 @@ struct nixlLibfabricReq {
uint64_t remote_addr; ///< Remote memory address for transfers
struct fid_mr *local_mr; ///< Local memory registration for transfers
uint64_t remote_key; ///< Remote access key for transfers
uint64_t transfer_id;
size_t bytes;
std::chrono::steady_clock::time_point submit_start_ts;
std::chrono::steady_clock::time_point submit_end_ts;
std::chrono::steady_clock::time_point cq_ts;

/** Default constructor initializing all fields */
nixlLibfabricReq()
Expand All @@ -82,6 +90,9 @@ struct nixlLibfabricReq {
}
};

// global monotonically increasing transfer id
inline std::atomic<uint64_t> g_nixl_xfer_id{0};

/** Thread-safe request pool with O(1) allocation/release */
class RequestPool {
public:
Expand Down