diff --git a/install_nixl.py b/install_nixl.py new file mode 100644 index 0000000000..1c17638ac5 --- /dev/null +++ b/install_nixl.py @@ -0,0 +1,360 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright 2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import glob + +# install_prerequisites.py +import os +import shutil +import subprocess +import sys + +# --- Configuration --- +WHEELS_CACHE_HOME = os.environ.get("WHEELS_CACHE_HOME", "/tmp/wheels_cache") +ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) +UCX_DIR = os.path.join("/tmp", "ucx_source") +LIBFABRIC_DIR = os.path.join("/tmp", "libfabric_source") +NIXL_DIR = os.path.join("/tmp", "nixl_source") +UCX_INSTALL_DIR = os.path.join("/tmp", "ucx_install") +LIBFABRIC_INSTALL_DIR = os.path.join("/tmp", "libfabric_install") + +# --- Repository and Version Configuration --- +UCX_REPO_URL = "https://github.com/openucx/ucx.git" +UCX_BRANCH = "v1.19.x" +LIBFABRIC_REPO_URL = "https://github.com/ofiwg/libfabric.git" +LIBFABRIC_REF = "v1.21.0" # Using a recent stable tag +NIXL_REPO_URL = "https://github.com/intel-staging/nixl.git" +NIXL_BRANCH = "libfabric" + + +# --- Helper Functions --- +def run_command(command, cwd=".", env=None): + """Helper function to run a shell command and check for errors.""" + print(f"--> Running command: {' '.join(command)} in '{cwd}'", flush=True) + subprocess.check_call(command, cwd=cwd, env=env) + + +def is_pip_package_installed(package_name): + """Checks if a package is installed via pip without raising an exception.""" + result = subprocess.run( + [sys.executable, "-m", "pip", "show", package_name], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + return result.returncode == 0 + + +def find_nixl_wheel_in_cache(cache_dir): + """Finds a nixl wheel file in the specified cache directory.""" + # The repaired wheel will have a 'manylinux' tag, but this glob still works. + search_pattern = os.path.join(cache_dir, "nixl-*.whl") + wheels = glob.glob(search_pattern) + if wheels: + # Sort to get the most recent/highest version if multiple exist + wheels.sort() + return wheels[-1] + return None + + +def install_system_dependencies(): + """Installs required system packages using apt-get if run as root.""" + if os.geteuid() != 0: + print("\n---", flush=True) + print( + "WARNING: Not running as root. Skipping system dependency installation.", + flush=True, + ) + print( + "Please ensure the following packages are installed on your system:", + flush=True, + ) + print( + " patchelf build-essential git cmake ninja-build autotools-dev automake meson libtool libtool-bin", + flush=True, + ) + print("---\n", flush=True) + return + + print("--- Running as root. Installing system dependencies... ---", flush=True) + apt_packages = [ + "patchelf", # <-- Add patchelf here + "build-essential", + "git", + "cmake", + "ninja-build", + "autotools-dev", + "automake", + "meson", + "libtool", + "libtool-bin", + "libhwloc-dev", + "zip", + ] + run_command(["apt-get", "update"]) + run_command(["apt-get", "install", "-y"] + apt_packages) + print("--- System dependencies installed successfully. ---\n", flush=True) + + +def build_and_install_prerequisites(args): + """Builds UCX and NIXL from source, creating a self-contained wheel.""" + + # ... (initial checks and setup are unchanged) ... + if not args.force_reinstall and is_pip_package_installed("nixl"): + print("--> NIXL is already installed. Nothing to do.", flush=True) + return + + cached_wheel = find_nixl_wheel_in_cache(WHEELS_CACHE_HOME) + if not args.force_reinstall and cached_wheel: + print( + f"\n--> Found self-contained wheel: {os.path.basename(cached_wheel)}.", + flush=True, + ) + print("--> Installing from cache, skipping all source builds.", flush=True) + install_command = [sys.executable, "-m", "pip", "install", cached_wheel] + run_command(install_command) + print("\n--- Installation from cache complete. ---", flush=True) + return + + print( + "\n--> No installed package or cached wheel found. Starting full build process...", + flush=True, + ) + print("\n--> Installing auditwheel...", flush=True) + run_command([sys.executable, "-m", "pip", "install", "auditwheel"]) + install_system_dependencies() + ucx_install_path = os.path.abspath(UCX_INSTALL_DIR) + print(f"--> Using wheel cache directory: {WHEELS_CACHE_HOME}", flush=True) + os.makedirs(WHEELS_CACHE_HOME, exist_ok=True) + + # -- Step 1: Build UCX from source -- + print("\n[1/3] Configuring and building UCX from source...", flush=True) + if not os.path.exists(UCX_DIR): + run_command(["git", "clone", UCX_REPO_URL, UCX_DIR]) + ucx_source_path = os.path.abspath(UCX_DIR) + run_command(["git", "checkout", "v1.19.x"], cwd=ucx_source_path) + run_command(["./autogen.sh"], cwd=ucx_source_path) + configure_command = [ + "./configure", + f"--prefix={ucx_install_path}", + "--enable-shared", + "--disable-static", + "--disable-doxygen-doc", + "--enable-optimizations", + "--enable-cma", + "--enable-devel-headers", + "--with-verbs", + "--enable-mt", + ] + run_command(configure_command, cwd=ucx_source_path) + run_command(["make", "-j", str(os.cpu_count() or 1)], cwd=ucx_source_path) + run_command(["make", "install"], cwd=ucx_source_path) + print("--- UCX build and install complete ---", flush=True) + + # -- Step 2: Build Libfabric from source -- + print( + f"\n[2/4] Configuring and building Libfabric (ref: {LIBFABRIC_REF}) from source...", + flush=True, + ) + if not os.path.exists(LIBFABRIC_DIR): + run_command(["git", "clone", LIBFABRIC_REPO_URL, LIBFABRIC_DIR]) + run_command(["git", "checkout", LIBFABRIC_REF], cwd=LIBFABRIC_DIR) + run_command(["./autogen.sh"], cwd=LIBFABRIC_DIR) + configure_command_lf = [ + "./configure", + f"--prefix={LIBFABRIC_INSTALL_DIR}", + "--enable-verbs", + "--enable-shm", + "--enable-sockets", + "--enable-tcp", + "--with-synapseai=/usr/include/habanalabs", # As requested + ] + run_command(configure_command_lf, cwd=LIBFABRIC_DIR) + run_command(["make", "-j", str(os.cpu_count() or 1)], cwd=LIBFABRIC_DIR) + run_command(["make", "install"], cwd=LIBFABRIC_DIR) + print("--- Libfabric build and install complete ---", flush=True) + + # -- Step 3: Build NIXL wheel from source -- + print( + f"\n[3/4] Building NIXL (branch: {NIXL_BRANCH}) wheel from source...", + flush=True, + ) + if not os.path.exists(NIXL_DIR): + run_command(["git", "clone", "--branch", NIXL_BRANCH, NIXL_REPO_URL, NIXL_DIR]) + + build_env = os.environ.copy() + # Configure environment to find both UCX and Libfabric + ucx_install_path = os.path.abspath(UCX_INSTALL_DIR) + lf_install_path = os.path.abspath(LIBFABRIC_INSTALL_DIR) + + ucx_pkg_path = os.path.join(ucx_install_path, "lib", "pkgconfig") + lf_pkg_path = os.path.join(lf_install_path, "lib", "pkgconfig") + build_env["PKG_CONFIG_PATH"] = f"{ucx_pkg_path}:{lf_pkg_path}".strip(":") + + ucx_lib_path = os.path.join(ucx_install_path, "lib") + ucx_plugin_path = os.path.join(ucx_lib_path, "ucx") + lf_lib_path = os.path.join(lf_install_path, "lib") + build_env[ + "LD_LIBRARY_PATH" + ] = f"{ucx_lib_path}:{ucx_plugin_path}:{lf_lib_path}".strip(":") + + print(f"--> Using PKG_CONFIG_PATH: {build_env['PKG_CONFIG_PATH']}", flush=True) + print(f"--> Using LD_LIBRARY_PATH: {build_env['LD_LIBRARY_PATH']}", flush=True) + + temp_wheel_dir = os.path.join(ROOT_DIR, "temp_wheelhouse") + # Define the build command for nixl wheel with specific meson arguments + wheel_build_cmd = [ + sys.executable, + "-m", + "pip", + "wheel", + ".", + "--no-deps", + f"--wheel-dir={temp_wheel_dir}", + # Pass meson arguments via pip's config-settings + "--config-settings=setup-args=-Ddisable_gds_backend=true", + f"--config-settings=setup-args=-Dlibfabric_path={lf_install_path}", + f"--config-settings=setup-args=-Ducx_path={ucx_install_path}", + ] + + run_command(wheel_build_cmd, cwd=os.path.abspath(NIXL_DIR), env=build_env) + + # -- Step 4: Repair wheel, then replace libfabric -- + # auditwheel may bundle an incompatible libfabric, so we need to replace it + print( + "\n[4/4] Repairing wheel with auditwheel and correcting libfabric...", + flush=True, + ) + unrepaired_wheel = find_nixl_wheel_in_cache(temp_wheel_dir) + if not unrepaired_wheel: + raise RuntimeError("Failed to find the NIXL wheel after building it.") + + # First, run auditwheel to bundle all other dependencies + run_command( + [ + sys.executable, + "-m", + "auditwheel", + "repair", + "--exclude", + "libplugin_UCX.so", + unrepaired_wheel, + f"--wheel-dir={WHEELS_CACHE_HOME}", + ], + env=build_env, + ) + + repaired_wheel = find_nixl_wheel_in_cache(WHEELS_CACHE_HOME) + if not repaired_wheel: + raise RuntimeError("Failed to find repaired wheel from auditwheel.") + + # Now, unpack the repaired wheel to perform surgery on it + wheel_unpack_dir = os.path.join(temp_wheel_dir, "wheel_unpack") + if os.path.exists(wheel_unpack_dir): + shutil.rmtree(wheel_unpack_dir) + os.makedirs(wheel_unpack_dir) + run_command(["unzip", "-q", repaired_wheel, "-d", wheel_unpack_dir]) + + # Find the main NIXL extension file to inspect its dependencies + nixl_extension_search = glob.glob(os.path.join(wheel_unpack_dir, "nixl", "*.so")) + if not nixl_extension_search: + raise RuntimeError("Could not find main NIXL .so extension file.") + # nixl_extension_file = nixl_extension_search[0] + + # Find the .libs directory + libs_dir_search = glob.glob(os.path.join(wheel_unpack_dir, "*.libs")) + if not libs_dir_search: + raise RuntimeError("Could not find .libs directory in unpacked wheel.") + libs_dir = libs_dir_search[0] + + # Find the incorrect libfabric that auditwheel bundled + incorrect_lib_basename = None + for lib in os.listdir(libs_dir): + if "libfabric" in lib: + incorrect_lib_basename = lib + break + + # Only perform replacement if we found a library to replace + if incorrect_lib_basename: + incorrect_lib_path = os.path.join(libs_dir, incorrect_lib_basename) + print( + f"--> Found and deleting incorrect bundled library: {incorrect_lib_basename}", + flush=True, + ) + os.remove(incorrect_lib_path) + + # Find the correct, pre-built libfabric library + lf_lib_path = os.path.join(lf_install_path, "lib") + libfabric_so_files = glob.glob(os.path.join(lf_lib_path, "libfabric.so.1.*")) + if not libfabric_so_files: + raise RuntimeError(f"Could not find libfabric.so.1.* in {lf_lib_path}") + correct_libfabric_src = max(libfabric_so_files, key=len) + correct_libfabric_basename = os.path.basename(correct_libfabric_src) + + # Copy it into the wheel's .libs directory + print( + f"--> Copying correct library '{correct_libfabric_basename}' into wheel", + flush=True, + ) + shutil.copy2(correct_libfabric_src, os.path.join(libs_dir, incorrect_lib_path)) + + # Use patchelf to update the dependency link in the main NIXL extension + # print(f"--> Patching NIXL extension to link against '{correct_libfabric_basename}'", flush=True) + # run_command(['patchelf', '--replace-needed', incorrect_lib_basename, correct_libfabric_basename, nixl_extension_file]) + else: + print( + "--> Warning: Did not find a bundled libfabric to remove. It might have been excluded.", + flush=True, + ) + + # Repack the corrected wheel, overwriting the one from auditwheel + print( + f"--> Repacking corrected wheel to '{os.path.basename(repaired_wheel)}'", + flush=True, + ) + run_command(["zip", "-r", repaired_wheel, "."], cwd=wheel_unpack_dir) + + # --- Cleanup --- + shutil.rmtree(temp_wheel_dir) + + # --- Final Installation --- + newly_built_wheel = find_nixl_wheel_in_cache(WHEELS_CACHE_HOME) + if not newly_built_wheel: + raise RuntimeError("Failed to find the repaired NIXL wheel.") + + print( + f"--> Successfully built self-contained wheel: {os.path.basename(newly_built_wheel)}. Now installing...", + flush=True, + ) + install_command = [sys.executable, "-m", "pip", "install", newly_built_wheel] + if args.force_reinstall: + install_command.insert(-1, "--force-reinstall") + + run_command(install_command) + print("--- NIXL installation complete ---", flush=True) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Build and install UCX and NIXL dependencies." + ) + parser.add_argument( + "--force-reinstall", + action="store_true", + help="Force rebuild and reinstall of UCX and NIXL even if they are already installed.", + ) + args = parser.parse_args() + build_and_install_prerequisites(args) diff --git a/meson.build b/meson.build index 8a7be09c91..767217d15b 100644 --- a/meson.build +++ b/meson.build @@ -104,6 +104,56 @@ else warning('CUDA not found. UCX backend will be built without CUDA support, and some plugins will be disabled.') endif +# SynapseAI (Habana Gaudi) dependency detection +synapse_inc_path = get_option('synapsepath_inc') +synapse_lib_path = get_option('synapsepath_lib') + +if synapse_lib_path == '' + #use default path + # Try to find both libSynapse and hl-thunk libraries + synapse_lib = cpp.find_library('Synapse', + dirs: ['/usr/lib/habanalabs', '/usr/local/lib/habanalabs'], + required: false) + hlthunk_lib = cpp.find_library('hl-thunk', + dirs: ['/usr/lib/habanalabs', '/usr/local/lib/habanalabs'], + required: false) +else + synapse_lib = cpp.find_library('Synapse', + dirs: [synapse_lib_path], + required: false) + hlthunk_lib = cpp.find_library('hl-thunk', + dirs: [synapse_lib_path], + required: false) +endif + +if synapse_inc_path == '' + #use default path + synapse_inc_path = '/usr/include/habanalabs' +endif + +# SynapseAI support requires both libraries +synapseai_dep = dependency('', required: false) # Initialize as not found +if synapse_lib.found() and hlthunk_lib.found() + synapseai_dep = declare_dependency(dependencies: [synapse_lib, hlthunk_lib]) +elif hlthunk_lib.found() + # Fallback to just hl-thunk if libSynapse not available + synapseai_dep = hlthunk_lib +endif + +if synapseai_dep.found() + # Create proper dependency with include paths (including DRM path for habanalabs headers) + synapseai_dep = declare_dependency( + dependencies: synapseai_dep, + include_directories: [ + include_directories('/usr/include/drm'), + include_directories(synapse_inc_path) + ] + ) + message('Found SynapseAI support for Habana Gaudi devices') +else + warning('SynapseAI not found. Habana Gaudi device support will be disabled.') +endif + # DOCA doca_gpunetio_dep = dependency('doca-gpunetio', required : false) diff --git a/meson_options.txt b/meson_options.txt index a316184f8d..9e4a5b4e6a 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -24,6 +24,8 @@ option('gds_path', type: 'string', value: '/usr/local/cuda/', description: 'Path option('cudapath_inc', type: 'string', value: '', description: 'Include path for CUDA') option('cudapath_lib', type: 'string', value: '', description: 'Library path for CUDA') option('cudapath_stub', type: 'string', value: '', description: 'Extra Stub path for CUDA') +option('synapsepath_inc', type: 'string', value: '', description: 'Include path for Intel Gaudi/ HPU') +option('synapsepath_lib', type: 'string', value: '', description: 'Library path for Intel Gaudi/ HPU') option('static_plugins', type: 'string', value: '', description: 'Plugins to be built-in, comma-separated') option('build_docs', type: 'boolean', value: false, description: 'Build Doxygen documentation') option('log_level', type: 'combo', choices: ['trace', 'debug', 'info', 'warning', 'error', 'fatal', 'auto'], value: 'auto', description: 'Log Level (auto: auto-detect based on build type: trace for debug builds, info for release builds)') diff --git a/src/plugins/libfabric/README.md b/src/plugins/libfabric/README.md index 2846bfedb2..70dc8e13bf 100644 --- a/src/plugins/libfabric/README.md +++ b/src/plugins/libfabric/README.md @@ -21,6 +21,7 @@ EFA Specific **Topology-Aware Optimization**: Hardware-aware GPU-to-EFA and NUMA - **Libfabric** - Many system will have installed libfabric already. If not, custom libfabric installation is available via https://ofiwg.github.io/libfabric/ - Minimum required version: v2.3.0rc2 - For EFA enabled AWS instances, it is recommanded to install through AWS EFA installer: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/efa-start.html - Minimum required version: 1.43.2 + - **Note:** HMEM support for some GPU implementations (SynapseAI, etc.) requires libfabric v1.16.x or newer - **hwloc** - hwloc is used to understand the underlying architecture to optimize application performance. Suggested version: 2.10.0 or newer diff --git a/src/plugins/libfabric/libfabric_backend.cpp b/src/plugins/libfabric/libfabric_backend.cpp index 1f9ff20e30..76ddcf7fbb 100644 --- a/src/plugins/libfabric/libfabric_backend.cpp +++ b/src/plugins/libfabric/libfabric_backend.cpp @@ -263,6 +263,21 @@ nixlLibfabricEngine::nixlLibfabricEngine(const nixlBackendInitParams *init_param NIXL_DEBUG << "Using default striping threshold: " << striping_threshold_ << " bytes"; } + // Parse default HMEM interface parameter + // Auto-detect from topology if not specified + std::string hmem_iface_str; + if (getInitParam("default_hmem_iface", hmem_iface_str) == NIXL_SUCCESS) { + default_hmem_iface_ = hmem_iface_str; + NIXL_DEBUG << "Using custom default HMEM interface from backend params: " << default_hmem_iface_; + } else { + // Auto-detect device type from topology + // Note: topology discovery happens in rail_manager constructor + // For now, leave empty to use GDR fallback by default + // SynapseAI will be auto-detected per-registration via /dev/accel check + default_hmem_iface_ = ""; + NIXL_DEBUG << "No default HMEM interface specified, will auto-detect per-registration"; + } + // Initialize Rail Manager which will discover the topology and create all rails. try { NIXL_DEBUG << "Rail Manager created with " << rail_manager.getNumDataRails() @@ -721,7 +736,7 @@ nixl_mem_list_t nixlLibfabricEngine::getSupportedMems() const { nixl_mem_list_t mems; mems.push_back(DRAM_SEG); -#ifdef HAVE_CUDA +#if defined(HAVE_CUDA) || defined(HAVE_SYNAPSEAI) mems.push_back(VRAM_SEG); #endif return mems; @@ -737,9 +752,10 @@ nixlLibfabricEngine::registerMem(const nixlBlobDesc &mem, priv->length_ = mem.len; priv->gpu_device_id_ = mem.devId; // Store GPU device ID -#ifdef HAVE_CUDA - // Handle CUDA memory registration with GPU Direct RDMA support if (nixl_mem == VRAM_SEG) { +#ifdef HAVE_CUDA + // Handle CUDA memory registration with GPU Direct RDMA support + // For multi-GPU support, skip CUDA address workaround if (cuda_addr_wa_) { bool need_restart; @@ -763,28 +779,76 @@ nixlLibfabricEngine::registerMem(const nixlBlobDesc &mem, } NIXL_DEBUG << "Set CUDA device context to GPU " << mem.devId; } - } #endif +#ifdef HAVE_SYNAPSEAI + // Handle SynapseAI memory registration + NIXL_DEBUG << "Registering SynapseAI device memory for device " << mem.devId; + // SynapseAI-specific setup would go here if needed +#endif + } + // Initialize vectors to accommodate all possible rails (for indexing consistency) priv->rail_mr_list_.resize(rail_manager.getNumDataRails(), nullptr); priv->rail_key_list_.resize(rail_manager.getNumDataRails(), 0); -#ifdef HAVE_CUDA - // Set CUDA context before libfabric operations for VRAM if (nixl_mem == VRAM_SEG) { +#ifdef HAVE_CUDA + // Set CUDA context before libfabric operations for VRAM vramApplyCtx(); - } #endif +#ifdef HAVE_SYNAPSEAI + // SynapseAI context application would go here if needed +#endif + } + + // Determine HMEM interface hint based on priority: + // 1. Environment variables (highest priority) + // 2. Per-registration hints via metaInfo blob + // 3. Backend-wide defaults from custom params + // 4. Auto-detection (fallback - empty string) + std::string hmem_hint; + + // Priority 1: Check environment variables + const char* env_hmem = getenv("HMEM_IFACE"); + if (env_hmem && env_hmem[0] != '\0') { + hmem_hint = env_hmem; + NIXL_DEBUG << "Using HMEM interface from environment variable: " << hmem_hint; + } + // Priority 2: Check per-registration hint from metaInfo + else if (!mem.metaInfo.empty()) { + hmem_hint = std::string(mem.metaInfo.begin(), mem.metaInfo.end()); + NIXL_DEBUG << "Using HMEM interface from metaInfo hint: " << hmem_hint; + } + // Priority 3: Use backend-wide default + else if (!default_hmem_iface_.empty()) { + hmem_hint = default_hmem_iface_; + NIXL_DEBUG << "Using HMEM interface from backend default: " << hmem_hint; + } + // Priority 4: Auto-detect from system topology + else { + // Auto-detect device type based on topology discovery + // Intel HPU requires FI_HMEM_SYNAPSEAI (no GDR support exists) + // NVIDIA GPU can use GDR fallback (empty hint) + if (nixl_mem == VRAM_SEG && rail_manager.getNumIntelHpus() > 0) { + hmem_hint = "SYNAPSEAI"; + NIXL_DEBUG << "Auto-detected Intel HPU system, using HMEM interface: SYNAPSEAI"; + } else { + // Leave empty for GDR fallback (CUDA) or DRAM + NIXL_DEBUG << "Auto-detection: using GDR fallback (empty hint)"; + } + } // Use Rail Manager for centralized memory registration with GPU Direct RDMA support NIXL_TRACE << "Registering memory: addr=" << (void *)mem.addr << " len=" << mem.len - << " mem_type=" << nixl_mem << " devId=" << mem.devId; + << " mem_type=" << nixl_mem << " devId=" << mem.devId + << " hmem_hint=" << (hmem_hint.empty() ? "auto" : hmem_hint); nixl_status_t status = rail_manager.registerMemory((void *)mem.addr, mem.len, nixl_mem, mem.devId, + hmem_hint, priv->rail_mr_list_, priv->rail_key_list_, priv->selected_rails_); @@ -1017,7 +1081,8 @@ nixlLibfabricEngine::postXfer(const nixl_xfer_op_t &operation, int gpu_id = local[desc_idx].devId; NIXL_DEBUG << "Processing descriptor " << desc_idx << " GPU " << gpu_id - << " addr: " << transfer_addr << " size: " << transfer_size; + << " local_addr: " << transfer_addr << " size: " << transfer_size + << " remote_addr: " << (void *)remote[desc_idx].addr; NIXL_DEBUG << "DEBUG: remote_agent='" << remote_agent << "' localAgent='" << localAgent << "'"; @@ -1053,11 +1118,14 @@ nixlLibfabricEngine::postXfer(const nixl_xfer_op_t &operation, } // Prepare and submit transfer for remote agents + // Use descriptor's specific target address + uint64_t remote_target_addr = remote[desc_idx].addr; + nixl_status_t status = rail_manager.prepareAndSubmitTransfer( op_type, transfer_addr, transfer_size, - remote_md->remote_buf_addr_, + remote_target_addr, local_md->selected_rails_, local_md->rail_mr_list_, remote_md->rail_remote_key_list_, @@ -1271,20 +1339,42 @@ nixlLibfabricEngine::cmThread() { NIXL_DEBUG << "ConnectionManagement thread started successfully"; NIXL_DEBUG << "Initial receives already posted in main thread, entering progress loop"; - // Main progress loop - continuously process completions on all rails - while (!cm_thread_stop_.load()) { + NIXL_DEBUG << "CM: Thread started"; + + // Adaptive backoff state (per-thread) + static thread_local int backoff_us = 50; // start at 50 µs + static thread_local const int backoff_us_max = 2000; // cap at 2 ms + + // Prefer blocking progress if supported (verbs with FI_WAIT_FD) + const bool blocking_supported = (rail_manager.getNumControlRails() > 0) && + rail_manager.getControlRail(0).blocking_cq_sread_supported; + + while (!cm_thread_stop_.load(std::memory_order_relaxed)) { + nixl_status_t status; + if (blocking_supported) { + // With blocking control CQ progress, rely on rail_manager to block up to its timeout + status = rail_manager.progressAllControlRails(true); // blocking=true inside rail path + } else { + // Non-blocking path: progress and adaptively back off on idle + status = rail_manager.progressAllControlRails(false); + } - nixl_status_t status = rail_manager.progressAllControlRails(); if (status == NIXL_SUCCESS) { - NIXL_DEBUG << "Processed completions on control rails"; - } else if (status != NIXL_IN_PROG && status != NIXL_SUCCESS) { - NIXL_ERROR << "Failed to process completions on control rails"; - return NIXL_ERR_BACKEND; + // Work was done reset backoff + backoff_us = 50; + // Optionally continue immediately to drain more completions + continue; } - // Sleep briefly to avoid spinning too aggressively when blocking cq read is not used - if (!rail_manager.getControlRail(0).blocking_cq_sread_supported) { - std::this_thread::sleep_for(std::chrono::nanoseconds(10)); + if (status == NIXL_IN_PROG) { + // No completions available sleep adaptively + std::this_thread::sleep_for(std::chrono::microseconds(backoff_us)); + backoff_us = std::min(backoff_us * 2, backoff_us_max); + continue; } + + // Unexpected error log and exit + NIXL_ERROR << "CM: Failed to process completions on control rails, status=" << status; + return NIXL_ERR_BACKEND; } NIXL_DEBUG << "ConnectionManagement thread exiting cleanly"; return NIXL_SUCCESS; @@ -1298,24 +1388,33 @@ nixlLibfabricEngine::cmThread() { nixl_status_t nixlLibfabricEngine::progressThread() { NIXL_DEBUG << "Progress thread started successfully for data rails only"; - // Main progress loop - continuously process completions only on data rails - while (!progress_thread_stop_.load()) { - // Process completions only on data rails (non-blocking) - bool any_completions = false; - nixl_status_t status = rail_manager.progressActiveDataRails(); + + // Adaptive backoff layered over configured delay + static thread_local int backoff_us = static_cast(progress_thread_delay_.count()); + static thread_local const int backoff_us_min = 50; // floor at 50 µs + static thread_local const int backoff_us_max = 5000; // cap at 5 ms + if (backoff_us <= 0) backoff_us = backoff_us_min; + + while (!progress_thread_stop_.load(std::memory_order_relaxed)) { + nixl_status_t status = rail_manager.progressActiveDataRails(); // non-blocking if (status == NIXL_SUCCESS) { - any_completions = true; - NIXL_DEBUG << "Processed completions on data rails"; - } else if (status != NIXL_IN_PROG && status != NIXL_SUCCESS) { - NIXL_ERROR << "Failed to process completions on data rails"; - // Don't return error, continue for robustness + // Completions processed reset backoff and continue draining + backoff_us = std::max(backoff_us_min, static_cast(progress_thread_delay_.count())); + continue; } - if (!any_completions) { - std::this_thread::sleep_for(progress_thread_delay_); + if (status == NIXL_IN_PROG) { + // Idle sleep adaptively, increasing up to cap + std::this_thread::sleep_for(std::chrono::microseconds(backoff_us)); + backoff_us = std::min(backoff_us * 2, backoff_us_max); + continue; } + // Error log and keep going for robustness (do not kill the PT) + NIXL_ERROR << "PT: Failed to process completions on data rails, status=" << status; + std::this_thread::sleep_for(std::chrono::microseconds(backoff_us_min)); } - NIXL_DEBUG << "Progress thread exiting cleanly"; + NIXL_DEBUG << "PT: Thread exiting"; return NIXL_SUCCESS; + } void diff --git a/src/plugins/libfabric/libfabric_backend.h b/src/plugins/libfabric/libfabric_backend.h index 9d97d4df20..fcee2544c4 100644 --- a/src/plugins/libfabric/libfabric_backend.h +++ b/src/plugins/libfabric/libfabric_backend.h @@ -184,6 +184,9 @@ class nixlLibfabricEngine : public nixlBackendEngine { mutable size_t total_transfer_size_; + // HMEM interface management + std::string default_hmem_iface_; // Backend-wide default HMEM interface from custom params (default: "cuda") + // Map of agent name to connection info // > mutable std::unordered_map> connections_; diff --git a/src/plugins/libfabric/meson.build b/src/plugins/libfabric/meson.build index c48d13806b..8f66f066e9 100644 --- a/src/plugins/libfabric/meson.build +++ b/src/plugins/libfabric/meson.build @@ -33,6 +33,12 @@ if cuda_dep.found() compile_flags += ['-DHAVE_CUDA'] endif +# Add SynapseAI support if available (dependency is globally defined) +if synapseai_dep.found() + libfabric_plugin_deps += [synapseai_dep] + compile_flags += ['-DHAVE_SYNAPSEAI'] +endif + # Build as static or shared library based on configuration if 'LIBFABRIC' in static_plugins libfabric_backend_lib = static_library( diff --git a/src/utils/libfabric/libfabric_common.cpp b/src/utils/libfabric/libfabric_common.cpp index ec8f85cc14..f17e0eee97 100644 --- a/src/utils/libfabric/libfabric_common.cpp +++ b/src/utils/libfabric/libfabric_common.cpp @@ -23,12 +23,101 @@ #include #include #include +#include +#include #include #include namespace LibfabricUtils { +// Provider-specific configurations +static const ProviderConfig PROVIDER_CONFIGS[] = { + { + "efa", + FI_MSG | FI_RMA | FI_LOCAL_COMM | FI_REMOTE_COMM, + FI_CONTEXT | FI_CONTEXT2, + 0, // let provider choose + FI_RM_UNSPEC, + FI_THREAD_SAFE + }, + { + "verbs", // Matches both "verbs" and "verbs;ofi_rxm" + FI_MSG | FI_RMA | FI_READ | FI_WRITE | FI_RECV | FI_SEND | FI_REMOTE_READ | FI_REMOTE_WRITE | FI_MULTI_RECV | FI_LOCAL_COMM | FI_REMOTE_COMM | FI_HMEM, + 0, // no mode flags required + FI_MR_LOCAL | FI_MR_VIRT_ADDR | FI_MR_ALLOCATED | FI_MR_PROV_KEY | FI_MR_HMEM, + FI_RM_ENABLED, + FI_THREAD_SAFE + }, + { + "tcp", + FI_MSG | FI_RMA | FI_LOCAL_COMM | FI_REMOTE_COMM, + FI_CONTEXT | FI_CONTEXT2, + 0, // basic MR mode, overridden in rail.cpp + FI_RM_UNSPEC, + FI_THREAD_UNSPEC + }, + { + "sockets", + FI_MSG | FI_RMA | FI_LOCAL_COMM | FI_REMOTE_COMM, + 0, + 0, // let provider choose + FI_RM_UNSPEC, + FI_THREAD_UNSPEC // default threading + } +}; + +static const size_t NUM_PROVIDER_CONFIGS = sizeof(PROVIDER_CONFIGS) / sizeof(PROVIDER_CONFIGS[0]); + +void +configureHintsForProvider(struct fi_info* hints, const std::string& provider_name) { + const ProviderConfig* config = nullptr; + + // Find matching config + // Match order: 1) exact match, 2) prefix match for composite providers (e.g., "verbs;ofi_rxm") + for (size_t i = 0; i < NUM_PROVIDER_CONFIGS; ++i) { + const std::string& config_name = PROVIDER_CONFIGS[i].name; + + // Exact match + if (provider_name == config_name) { + config = &PROVIDER_CONFIGS[i]; + break; + } + + // Composite provider match (e.g., "verbs;ofi_rxm" matches "verbs") + // Check if provider_name starts with config_name followed by ";" + if (provider_name.rfind(config_name + ";", 0) == 0) { + config = &PROVIDER_CONFIGS[i]; + break; + } + } + + if (!config) { + // Default configuration + NIXL_DEBUG << "No specific config for provider '" << provider_name << "', using defaults"; + hints->caps = FI_MSG | FI_RMA | FI_LOCAL_COMM | FI_REMOTE_COMM; + hints->mode = 0; + hints->ep_attr->type = FI_EP_RDM; + return; + } + + // Apply provider-specific configuration + hints->caps = config->caps; + hints->mode = config->mode; + hints->ep_attr->type = FI_EP_RDM; + + if (config->resource_mgmt != FI_RM_UNSPEC) { + hints->domain_attr->resource_mgmt = config->resource_mgmt; + } + + if (config->mr_mode != 0) { + hints->domain_attr->mr_mode = config->mr_mode; + } + + if (config->threading != FI_THREAD_UNSPEC) { + hints->domain_attr->threading = config->threading; + } +} std::pair> getAvailableNetworkDevices() { @@ -43,16 +132,24 @@ getAvailableNetworkDevices() { return {"none", {}}; } - hints->caps = 0; - hints->caps = FI_MSG | FI_RMA; // Basic messaging and RMA + // Check if FI_PROVIDER environment variable is set + const char* env_provider = getenv("FI_PROVIDER"); + std::string provider = env_provider && env_provider[0] != '\0' ? env_provider : ""; - hints->caps |= FI_LOCAL_COMM | FI_REMOTE_COMM; - hints->mode = FI_CONTEXT | FI_CONTEXT2; - hints->ep_attr->type = FI_EP_RDM; + if (!provider.empty()) { + hints->fabric_attr->prov_name = strdup(env_provider); + NIXL_INFO << "Using provider from FI_PROVIDER environment: " << env_provider; + // Configure hints based on provider + configureHintsForProvider(hints, provider); + } else { + // Auto-detect: start with default configuration + configureHintsForProvider(hints, ""); + } - int ret = fi_getinfo(FI_VERSION(1, 9), NULL, NULL, 0, hints, &info); + // Use FI_VERSION(1, 18) for DMABUF and HMEM support + int ret = fi_getinfo(FI_VERSION(1, 18), NULL, NULL, 0, hints, &info); if (ret) { - NIXL_ERROR << "fi_getinfo failed " << fi_strerror(-ret); + NIXL_ERROR << "fi_getinfo failed: " << fi_strerror(-ret); fi_freeinfo(hints); return {"none", {}}; } @@ -85,8 +182,23 @@ getAvailableNetworkDevices() { } } + // Provider selection priority: + // 1. EFA (AWS Elastic Fabric Adapter) + // 2. verbs;ofi_rxm (explicit verbs with RXM) + // 3. verbs (plain verbs) + // 4. sockets (TCP fallback) + if (provider_device_map.find("efa") != provider_device_map.end()) { return {"efa", provider_device_map["efa"]}; + } else if (provider_device_map.find("verbs;ofi_rxm") != provider_device_map.end()) { + // Explicit verbs with RXM + NIXL_INFO << "Using verbs with RXM for RDM endpoint support"; + return {"verbs;ofi_rxm", provider_device_map["verbs;ofi_rxm"]}; + } else if (provider_device_map.find("verbs") != provider_device_map.end()) { + // Plain verbs - might not support RDM, but try it + NIXL_WARN << "Using plain verbs provider - may not support RDM endpoints. " + << "Consider setting FI_PROVIDER=verbs;ofi_rxm for RDM support"; + return {"verbs", provider_device_map["verbs"]}; } else if (provider_device_map.find("sockets") != provider_device_map.end()) { return {"sockets", {provider_device_map["sockets"][0]}}; } diff --git a/src/utils/libfabric/libfabric_common.h b/src/utils/libfabric/libfabric_common.h index 4c149b445a..1e82753a02 100644 --- a/src/utils/libfabric/libfabric_common.h +++ b/src/utils/libfabric/libfabric_common.h @@ -31,6 +31,7 @@ #include #include #include +#include // Libfabric configuration constants #define NIXL_LIBFABRIC_DEFAULT_CONTROL_RAILS 1 @@ -142,6 +143,16 @@ struct BinaryNotification { } }; +// Provider configuration structure +struct ProviderConfig { + std::string name; + uint64_t caps; + uint64_t mode; + uint64_t mr_mode; + fi_resource_mgmt resource_mgmt; + fi_threading threading; +}; + // Global XFER_ID management namespace LibfabricUtils { // Get next unique XFER_ID @@ -163,6 +174,9 @@ getAvailableNetworkDevices(); // String utilities std::string hexdump(const void *data); +// Provider configuration helper +void +configureHintsForProvider(struct fi_info* hints, const std::string& provider_name); } // namespace LibfabricUtils #endif // NIXL_SRC_UTILS_LIBFABRIC_LIBFABRIC_COMMON_H diff --git a/src/utils/libfabric/libfabric_rail.cpp b/src/utils/libfabric/libfabric_rail.cpp index 97b3ebf2aa..cb71a1cd76 100644 --- a/src/utils/libfabric/libfabric_rail.cpp +++ b/src/utils/libfabric/libfabric_rail.cpp @@ -21,9 +21,23 @@ #include "serdes/serdes.h" #include "libfabric_common.h" +#include #include #include #include +#include + +#ifdef HAVE_SYNAPSEAI +#include +#include +#include + +// Static SynapseAI library handles +void* nixlLibfabricRail::synapseai_handle_ = nullptr; +void* nixlLibfabricRail::hlthunk_handle_ = nullptr; +std::mutex nixlLibfabricRail::synapseai_init_mutex_; +nixlLibfabricRail::SynapseAIOps nixlLibfabricRail::synapseai_ops_ = {}; +#endif // RequestPool Base Class Implementation @@ -410,27 +424,30 @@ nixlLibfabricRail::nixlLibfabricRail(const std::string &device, NIXL_ERROR << "fi_allocinfo failed for rail " << rail_id; throw std::runtime_error("Failed to allocate fi_info for rail " + std::to_string(rail_id)); } - hints->caps = 0; - hints->caps = FI_MSG | FI_RMA; - hints->caps |= FI_LOCAL_COMM | FI_REMOTE_COMM; - hints->mode = FI_CONTEXT | FI_CONTEXT2; - hints->ep_attr->type = FI_EP_RDM; - // Configure memory registration mode based on provider capabilities + + // Configure hints based on provider + LibfabricUtils::configureHintsForProvider(hints, provider); + + // Override mr_mode for TCP/sockets (they don't support advanced MR features) if (provider == "tcp" || provider == "sockets") { - // TCP provider doesn't support FI_MR_PROV_KEY or FI_MR_VIRT_ADDR, use basic mode hints->domain_attr->mr_mode = FI_MR_LOCAL | FI_MR_ALLOCATED; hints->domain_attr->mr_key_size = 0; // Let provider decide } else { - // EFA and other providers support advanced memory registration - hints->domain_attr->mr_mode = - FI_MR_LOCAL | FI_MR_HMEM | FI_MR_VIRT_ADDR | FI_MR_ALLOCATED | FI_MR_PROV_KEY; + // Add HMEM support for other providers (EFA, verbs) + if (hints->domain_attr->mr_mode != 0) { + hints->domain_attr->mr_mode |= FI_MR_HMEM; + } else { + hints->domain_attr->mr_mode = + FI_MR_LOCAL | FI_MR_HMEM | FI_MR_VIRT_ADDR | FI_MR_ALLOCATED | FI_MR_PROV_KEY; + } hints->domain_attr->mr_key_size = 2; } + hints->domain_attr->name = strdup(device_name.c_str()); - hints->domain_attr->threading = FI_THREAD_SAFE; try { // Get fabric info for this specific device - int ret = fi_getinfo(FI_VERSION(1, 9), NULL, NULL, 0, hints, &info); + // Use FI_VERSION(1, 18) for DMABUF and HMEM support + int ret = fi_getinfo(FI_VERSION(1, 18), NULL, NULL, 0, hints, &info); if (ret) { NIXL_ERROR << "fi_getinfo failed for rail " << rail_id << ": " << fi_strerror(-ret); throw std::runtime_error("fi_getinfo failed for rail " + std::to_string(rail_id)); @@ -512,17 +529,19 @@ nixlLibfabricRail::nixlLibfabricRail(const std::string &device, } // Disable shared memory transfers for EFA provider to fix same-agent transfers - bool optval = false; - ret = fi_setopt(&endpoint->fid, - FI_OPT_ENDPOINT, - FI_OPT_SHARED_MEMORY_PERMITTED, - &optval, - sizeof(optval)); - if (ret && ret != -FI_ENOSYS) { - NIXL_WARN << "fi_setopt FI_OPT_SHARED_MEMORY_PERMITTED failed for rail " << rail_id - << ": " << fi_strerror(-ret) << " - continuing anyway"; - } else if (ret == 0) { - NIXL_DEBUG << "Successfully disabled shared memory transfers for rail " << rail_id; + if (provider_name.find("efa") == 0) { + bool optval = false; + ret = fi_setopt(&endpoint->fid, + FI_OPT_ENDPOINT, + FI_OPT_SHARED_MEMORY_PERMITTED, + &optval, + sizeof(optval)); + if (ret && ret != -FI_ENOSYS) { + NIXL_WARN << "fi_setopt FI_OPT_SHARED_MEMORY_PERMITTED failed for rail " << rail_id + << ": " << fi_strerror(-ret) << " - continuing anyway"; + } else if (ret == 0) { + NIXL_DEBUG << "Successfully disabled shared memory transfers for rail " << rail_id; + } } // Enable endpoint for this rail @@ -682,8 +701,9 @@ nixlLibfabricRail::setXferIdCallback(std::function callback) { nixl_status_t nixlLibfabricRail::progressCompletionQueue(bool use_blocking) const { // Completion processing - struct fi_cq_data_entry completion; - memset(&completion, 0, sizeof(completion)); + // Batch read to amortize lock and syscall overhead + struct fi_cq_data_entry entries[32]; + memset(entries, 0, sizeof(entries)); int ret; @@ -693,10 +713,10 @@ nixlLibfabricRail::progressCompletionQueue(bool use_blocking) const { if (use_blocking && blocking_cq_sread_supported) { // Blocking read using fi_cq_sread (used by CM thread) - ret = fi_cq_sread(cq, &completion, 1, nullptr, NIXL_LIBFABRIC_CQ_SREAD_TIMEOUT_SEC); + ret = fi_cq_sread(cq, entries, 1, nullptr, NIXL_LIBFABRIC_CQ_SREAD_TIMEOUT_SEC); } else { // Non-blocking read (used by progress thread or fallback) - ret = fi_cq_read(cq, &completion, 1); + ret = fi_cq_read(cq, entries, 32); } if (ret < 0 && ret != -FI_EAGAIN) { @@ -720,24 +740,25 @@ nixlLibfabricRail::progressCompletionQueue(bool use_blocking) const { } // CQ lock released here - completion is now local data - if (ret == -FI_EAGAIN) { + if (ret == -FI_EAGAIN || ret == 0) { return NIXL_IN_PROG; // No completions available } - if (ret == 1) { - NIXL_TRACE << "Completion received on rail " << rail_id << " flags: " << std::hex - << completion.flags << " data: " << completion.data - << " context: " << completion.op_context << std::dec; - // Process completion using local data. Callbacks have their own thread safety - nixl_status_t status = processCompletionQueueEntry(&completion); - if (status != NIXL_SUCCESS) { - NIXL_ERROR << "Failed to process completion on rail " << rail_id; - return status; + if (ret > 0) { + bool ok = true; + for (int i = 0; i < ret; ++i) { + NIXL_TRACE << "Completion received on rail " << rail_id << " flags=" << std::hex + << entries[i].flags << " data=" << entries[i].data + << " context=" << entries[i].op_context << std::dec; + nixl_status_t status = processCompletionQueueEntry(&entries[i]); + if (status != NIXL_SUCCESS) { + NIXL_ERROR << "Failed to process completion on rail " << rail_id; + ok = false; + break; + } } - - NIXL_DEBUG << "Completion processed on rail " << rail_id; - return NIXL_SUCCESS; + return ok ? NIXL_SUCCESS : NIXL_ERR_BACKEND; } return NIXL_ERR_BACKEND; // Unexpected case @@ -1059,7 +1080,7 @@ nixlLibfabricRail::postSend(uint64_t immediate_data, if (ret == -FI_EAGAIN) { // Resource temporarily unavailable - retry indefinitely for all providers - attempt++; + ++attempt; // Log every N attempts to avoid log spam if (attempt % NIXL_LIBFABRIC_LOG_INTERVAL_ATTEMPTS == 0) { @@ -1070,17 +1091,17 @@ nixlLibfabricRail::postSend(uint64_t immediate_data, << ", retrying (attempt " << attempt << ")"; } - // Exponential backoff with cap to avoid overwhelming the system - int delay_us = std::min(NIXL_LIBFABRIC_BASE_RETRY_DELAY_US * (1 + attempt / 10), - NIXL_LIBFABRIC_MAX_RETRY_DELAY_US); - - // Progress completion queue to drain pending completions before retry - nixl_status_t progress_status = progressCompletionQueue(false); - if (progress_status == NIXL_SUCCESS) { - NIXL_TRACE << "Progressed completions on rail " << rail_id << " before retry"; + // Progress CQ a few times before backing off + if (attempt <= 8) { + (void)progressCompletionQueue(false); + } else { + int delay_us = std::min(1000 * (attempt / 10 + 1), 100000); // 1ms..100ms + if (blocking_cq_sread_supported) + (void)progressCompletionQueue(true); + else + std::this_thread::sleep_for(std::chrono::microseconds(delay_us)); } - usleep(delay_us); continue; } else { // Other error - don't retry, fail immediately @@ -1139,7 +1160,7 @@ nixlLibfabricRail::postWrite(const void *local_buffer, if (ret == -FI_EAGAIN) { // Resource temporarily unavailable - retry indefinitely for all providers - attempt++; + ++attempt; // Log every N attempts to avoid log spam if (attempt % NIXL_LIBFABRIC_LOG_INTERVAL_ATTEMPTS == 0) { @@ -1150,17 +1171,16 @@ nixlLibfabricRail::postWrite(const void *local_buffer, << ", retrying (attempt " << attempt << ")"; } - // Exponential backoff with cap to avoid overwhelming the system - int delay_us = std::min(NIXL_LIBFABRIC_BASE_RETRY_DELAY_US * (1 + attempt / 10), - NIXL_LIBFABRIC_MAX_RETRY_DELAY_US); - - // Progress completion queue to drain pending completions before retry - nixl_status_t progress_status = progressCompletionQueue(false); - if (progress_status == NIXL_SUCCESS) { - NIXL_TRACE << "Progressed completions on rail " << rail_id << " before retry"; + // Progress CQ a few times before backing off + if (attempt <= 8) { + (void)progressCompletionQueue(false); + } else { + int delay_us = std::min(1000 * (attempt / 10 + 1), 100000); // 1ms..100ms + if (blocking_cq_sread_supported) + (void)progressCompletionQueue(true); + else + std::this_thread::sleep_for(std::chrono::microseconds(delay_us)); } - - usleep(delay_us); continue; } else { // Other error - don't retry, fail immediately @@ -1227,17 +1247,16 @@ nixlLibfabricRail::postRead(void *local_buffer, << ", retrying (attempt " << attempt << ")"; } - // Exponential backoff with cap to avoid overwhelming the system - int delay_us = std::min(NIXL_LIBFABRIC_BASE_RETRY_DELAY_US * (1 + attempt / 10), - NIXL_LIBFABRIC_MAX_RETRY_DELAY_US); - - // Progress completion queue to drain pending completions before retry - nixl_status_t progress_status = progressCompletionQueue(false); - if (progress_status == NIXL_SUCCESS) { - NIXL_TRACE << "Progressed completions on rail " << rail_id << " before retry"; + // Progress CQ a few times before backing off + if (attempt <= 8) { + (void)progressCompletionQueue(false); + } else { + int delay_us = std::min(1000 * (attempt / 10 + 1), 100000); // 1ms..100ms + if (blocking_cq_sread_supported) + (void)progressCompletionQueue(true); + else + std::this_thread::sleep_for(std::chrono::microseconds(delay_us)); } - - usleep(delay_us); continue; } else { // Other error - don't retry, fail immediately @@ -1251,9 +1270,33 @@ nixlLibfabricRail::postRead(void *local_buffer, // Memory Registration Methods +uint64_t +nixlLibfabricRail::getMemoryRegistrationAccessFlags() const { + // Start with base flags needed for RDMA operations + uint64_t access_flags = FI_REMOTE_READ | FI_REMOTE_WRITE | FI_SEND | FI_RECV; + + // TCP/sockets providers need additional basic flags + if (provider_name == "tcp" || provider_name == "sockets") { + access_flags |= FI_READ | FI_WRITE; + } + + // Query provider capabilities and add conditionally + if (info && info->domain_attr) { + if (info->caps & FI_READ) access_flags |= FI_READ; + if (info->caps & FI_WRITE) access_flags |= FI_WRITE; + if (info->caps & FI_RMA) { + access_flags |= FI_READ | FI_WRITE; + } + } + + return access_flags; +} + nixl_status_t nixlLibfabricRail::registerMemory(void *buffer, size_t length, + const std::string &hmem_hint, + int device_id, struct fid_mr **mr_out, uint64_t *key_out) const { if (!buffer || !mr_out || !key_out) { @@ -1265,42 +1308,109 @@ nixlLibfabricRail::registerMemory(void *buffer, return NIXL_ERR_BACKEND; } - // Determine access flags based on provider capabilities - uint64_t provider_access_flags; - if (provider_name == "tcp" || provider_name == "sockets") { - // TCP provider has more limited memory registration capabilities - // Use basic flags that are commonly supported - provider_access_flags = FI_READ | FI_WRITE | FI_REMOTE_READ | FI_REMOTE_WRITE; - } else { - // EFA and other providers use standard remote access flags - provider_access_flags = FI_REMOTE_WRITE | FI_REMOTE_READ; - } + // Get access flags based on provider capabilities + uint64_t provider_access_flags = getMemoryRegistrationAccessFlags(); struct fid_mr *mr; + int ret; - // For TCP providers, use a unique key to avoid conflicts - // TCP provider assigns key 0 by default, but we need unique keys for multiple registrations - uint64_t requested_key = 0; - if (provider_name == "tcp" || provider_name == "sockets") { - // Generate a unique key based on buffer address to avoid collisions - // Use the lower bits of the buffer address as a simple unique identifier - requested_key = reinterpret_cast(buffer) & 0xFFFFFFFF; + // Determine registration method based on hint: + // - Empty hint: Use GDR method (fi_mr_reg) - Default path + // - With hint: Use FI_HMEM method (fi_mr_regattr) - Required for SynapseAI, optional for CUDA + + std::string hint_lower = hmem_hint; + std::transform(hint_lower.begin(), hint_lower.end(), hint_lower.begin(), ::tolower); - NIXL_DEBUG << "TCP provider: using requested key " << requested_key << " for buffer " - << buffer << " on rail " << rail_id; + // Validate hint and check if explicit FI_HMEM registration is requested + bool use_hmem = false; + if (!hint_lower.empty()) { + if (hint_lower == "cuda" || hint_lower == "synapseai") { + use_hmem = true; + } else { + NIXL_WARN << "Unknown HMEM hint '" << hmem_hint << "' on rail " << rail_id + << ", falling back to GDR method. Valid hints: CUDA, SYNAPSEAI"; + } } - NIXL_TRACE << "Memory Registration: rail=" << rail_id << " provider=" << provider_name - << " buffer=" << buffer << " length=" << length << " access_flags=0x" << std::hex - << provider_access_flags << std::dec << " requested_key=" << requested_key; + if (use_hmem) { + // === FI_HMEM Path === + NIXL_DEBUG << "Using FI_HMEM registration method on rail " << rail_id + << " (hint=" << hmem_hint << ", device_id=" << device_id << ")"; + + // Use fi_mr_regattr for HMEM device memory registration + struct fi_mr_attr mr_attr = {}; + struct iovec iov = {}; + + iov.iov_base = buffer; + iov.iov_len = length; + + mr_attr.mr_iov = &iov; + mr_attr.iov_count = 1; + mr_attr.access = provider_access_flags; + + // Map hint to FI_HMEM interface and set device ID + if (hint_lower == "cuda") { + mr_attr.iface = FI_HMEM_CUDA; + mr_attr.device.cuda = device_id; // Critical for multi-GPU + NIXL_DEBUG << "Using CUDA HMEM interface for memory registration on rail " << rail_id + << " device_id=" << device_id; + + NIXL_TRACE << "HMEM Registration: rail=" << rail_id << " provider=" << provider_name + << " buffer=" << buffer << " length=" << length << " iface=" << mr_attr.iface + << " device_id=" << device_id + << " access_flags=0x" << std::hex << provider_access_flags << std::dec; + + ret = fi_mr_regattr(domain, &mr_attr, 0, &mr); + if (ret) { + NIXL_ERROR << "fi_mr_regattr (HMEM) failed on rail " << rail_id << ": " << fi_strerror(-ret) + << " (buffer=" << buffer << ", length=" << length + << ", hint=" << hmem_hint << ", iface=" << mr_attr.iface + << ", device_id=" << device_id << ")"; + return NIXL_ERR_BACKEND; + } + } else if (hint_lower == "synapseai") { +#ifdef HAVE_SYNAPSEAI + // Use DMABUF path for SynapseAI + NIXL_DEBUG << "Using SynapseAI DMABUF registration on rail " << rail_id + << " device_id=" << device_id; + + nixl_status_t status = registerSynapseAIMemoryDmabuf(buffer, length, device_id, provider_access_flags, &mr); + if (status != NIXL_SUCCESS) { + return status; + } +#else + NIXL_ERROR << "SynapseAI support not enabled (HAVE_SYNAPSEAI not defined)"; + return NIXL_ERR_NOT_SUPPORTED; +#endif + } + } else { + // === GDR Path (Default) === + // Uses standard fi_mr_reg() which relies on GPU Direct RDMA kernel modules + // (nvidia-peermem) to enable direct NIC-to-GPU memory access. + + NIXL_DEBUG << "Using GDR registration method on rail " << rail_id + << " (standard fi_mr_reg, relies on nvidia-peermem kernel module)"; + + // For TCP providers, use a unique key to avoid conflicts + uint64_t requested_key = 0; + if (provider_name == "tcp" || provider_name == "sockets") { + // Generate a unique key based on buffer address to avoid collisions + requested_key = reinterpret_cast(buffer) & 0xFFFFFFFF; + NIXL_DEBUG << "TCP provider: using requested key " << requested_key << " for buffer " + << buffer << " on rail " << rail_id; + } - int ret = - fi_mr_reg(domain, buffer, length, provider_access_flags, 0, requested_key, 0, &mr, NULL); - if (ret) { - NIXL_ERROR << "fi_mr_reg failed on rail " << rail_id << ": " << fi_strerror(-ret) - << " (buffer=" << buffer << ", length=" << length - << ", requested_key=" << requested_key << ")"; - return NIXL_ERR_BACKEND; + NIXL_TRACE << "GDR Memory Registration: rail=" << rail_id << " provider=" << provider_name + << " buffer=" << buffer << " length=" << length << " access_flags=0x" << std::hex + << provider_access_flags << std::dec << " requested_key=" << requested_key; + + ret = fi_mr_reg(domain, buffer, length, provider_access_flags, 0, requested_key, 0, &mr, NULL); + if (ret) { + NIXL_ERROR << "fi_mr_reg failed on rail " << rail_id << ": " << fi_strerror(-ret) + << " (buffer=" << buffer << ", length=" << length + << ", requested_key=" << requested_key << ")"; + return NIXL_ERR_BACKEND; + } } *mr_out = mr; @@ -1314,6 +1424,137 @@ nixlLibfabricRail::registerMemory(void *buffer, return NIXL_SUCCESS; } +#ifdef HAVE_SYNAPSEAI +nixl_status_t +nixlLibfabricRail::registerSynapseAIMemoryDmabuf(void *buffer, size_t length, int device_id, uint64_t provider_access_flags, struct fid_mr **mr_out) const { + synDeviceId syn_device_id = static_cast(device_id); + synDeviceInfoV2 device_info; + + // Thread-safe initialization of static handles + std::lock_guard lock(synapseai_init_mutex_); + + // Load SynapseAI library functions (shared across instances) + if (!synapseai_handle_) { + synapseai_handle_ = dlopen("libSynapse.so", RTLD_NOW); + if (!synapseai_handle_) { + NIXL_ERROR << "Failed to dlopen libSynapse.so: " << dlerror(); + return NIXL_ERR_BACKEND; + } + + synapseai_ops_.synDeviceGetInfoV2 = + (synStatus (*)(const synDeviceId, synDeviceInfoV2 *))dlsym(synapseai_handle_, "synDeviceGetInfoV2"); + if (!synapseai_ops_.synDeviceGetInfoV2) { + NIXL_ERROR << "Failed to find synDeviceGetInfoV2: " << dlerror(); + return NIXL_ERR_BACKEND; + } + } + + if (!hlthunk_handle_) { + hlthunk_handle_ = dlopen("libhl-thunk.so", RTLD_NOW); + if (!hlthunk_handle_) { + NIXL_ERROR << "Failed to dlopen libhl-thunk.so: " << dlerror(); + return NIXL_ERR_BACKEND; + } + + synapseai_ops_.hlthunk_device_mapped_memory_export_dmabuf_fd = + (int (*)(int, uint64_t, uint64_t, uint64_t, uint32_t))dlsym(hlthunk_handle_, "hlthunk_device_mapped_memory_export_dmabuf_fd"); + if (!synapseai_ops_.hlthunk_device_mapped_memory_export_dmabuf_fd) { + NIXL_ERROR << "Failed to find hlthunk_device_mapped_memory_export_dmabuf_fd: " << dlerror(); + return NIXL_ERR_BACKEND; + } + } + + // Get device info + if (synapseai_ops_.synDeviceGetInfoV2(syn_device_id, &device_info) != synSuccess) { + NIXL_ERROR << "SynapseAI device " << device_id << " not available"; + return NIXL_ERR_BACKEND; + } + + NIXL_DEBUG << "Using SynapseAI device ID: " << device_id << " on rail " << rail_id; + + // Calculate aligned buffer size + const size_t ACCEL_PAGE_SIZE = 4096; + size_t modi_memlen = length; + + // Validate memory is within device range + uint64_t hbm_base = device_info.globalHbmBaseAddress; + uint64_t hbm_size = device_info.dramSize; + uint64_t buffer_addr = reinterpret_cast(buffer); + + if (buffer_addr < hbm_base || buffer_addr >= (hbm_base + hbm_size)) { + NIXL_ERROR << "Memory address 0x" << std::hex << buffer_addr + << " is not within HPU device memory range [0x" << hbm_base + << " - 0x" << (hbm_base + hbm_size) << "]" << std::dec; + return NIXL_ERR_INVALID_PARAM; + } + + // Align device offset to page size + uint64_t device_offset = buffer_addr - hbm_base; + uint64_t modi_mem_addr = buffer_addr; + if (buffer_addr % ACCEL_PAGE_SIZE) { + modi_mem_addr = (buffer_addr / ACCEL_PAGE_SIZE) * ACCEL_PAGE_SIZE; + device_offset -= buffer_addr - modi_mem_addr; + modi_memlen += ACCEL_PAGE_SIZE; + } + modi_memlen = (modi_memlen + ACCEL_PAGE_SIZE - 1) & ~(ACCEL_PAGE_SIZE - 1); + + NIXL_DEBUG << "Exporting dmabuf: fd=" << device_info.fd + << " base=0x" << std::hex << hbm_base + << " size=" << std::dec << modi_memlen + << " buffer=0x" << std::hex << buffer_addr + << " aligned=0x" << modi_mem_addr + << " offset=0x" << device_offset << std::dec; + + // Export dmabuf fd + int dmabuf_fd = synapseai_ops_.hlthunk_device_mapped_memory_export_dmabuf_fd( + device_info.fd, + hbm_base, + modi_memlen, + device_offset, + (O_RDWR | O_CLOEXEC) + ); + + if (dmabuf_fd < 0) { + NIXL_ERROR << "hlthunk_device_mapped_memory_export_dmabuf_fd failed: " << strerror(-dmabuf_fd); + return NIXL_ERR_BACKEND; + } + + NIXL_DEBUG << "Got dmabuf_fd: " << dmabuf_fd << " for device memory on rail " << rail_id; + + // Set up dmabuf structure + struct fi_mr_dmabuf dmabuf = {}; + dmabuf.fd = dmabuf_fd; + dmabuf.offset = 0; + dmabuf.len = modi_memlen; + dmabuf.base_addr = reinterpret_cast(modi_mem_addr); + + // Set up memory registration attributes + struct fi_mr_attr mr_attr = {}; + mr_attr.dmabuf = &dmabuf; + mr_attr.iov_count = 1; + mr_attr.access = provider_access_flags; + mr_attr.iface = FI_HMEM_SYNAPSEAI; + mr_attr.device.synapseai = static_cast(device_id); + + NIXL_DEBUG << "Registering SynapseAI memory with dmabuf fd: " << dmabuf_fd << " on rail " << rail_id; + + // Register memory with dmabuf + int ret = fi_mr_regattr(domain, &mr_attr, FI_MR_DMABUF, mr_out); + + // Cleanup fd after registration + close(dmabuf_fd); + + if (ret) { + NIXL_ERROR << "fi_mr_regattr (DMABUF) failed on rail " << rail_id << ": " << fi_strerror(-ret); + *mr_out = nullptr; + return NIXL_ERR_BACKEND; + } + + NIXL_INFO << "Successfully registered SynapseAI memory via dmabuf on rail " << rail_id; + return NIXL_SUCCESS; +} +#endif + nixl_status_t nixlLibfabricRail::deregisterMemory(struct fid_mr *mr) const { if (!mr) { diff --git a/src/utils/libfabric/libfabric_rail.h b/src/utils/libfabric/libfabric_rail.h index 7e5fdfdd9b..db23bbab85 100644 --- a/src/utils/libfabric/libfabric_rail.h +++ b/src/utils/libfabric/libfabric_rail.h @@ -30,6 +30,11 @@ #include "backend/backend_aux.h" #include "libfabric/libfabric_common.h" +#ifdef HAVE_SYNAPSEAI +#include +#include +#endif + // Forward declarations class nixlLibfabricConnection; @@ -274,9 +279,17 @@ class nixlLibfabricRail { isProperlyInitialized() const; // Memory registration methods - /** Register memory buffer with libfabric */ + /** Register memory buffer with libfabric with HMEM support + * @param buffer Memory buffer to register + * @param length Buffer length in bytes + * @param hmem_hint HMEM interface hint ("cuda", "synapseai", or empty for auto-detection) + * @param device_id Device ID for GPU memory (used when hmem_hint is specified, -1 for host memory) + * @param mr_out Output memory registration handle + * @param key_out Output remote access key + * @return NIXL_SUCCESS on success, error code on failure + */ nixl_status_t - registerMemory(void *buffer, size_t length, struct fid_mr **mr_out, uint64_t *key_out) const; + registerMemory(void *buffer, size_t length, const std::string &hmem_hint, int device_id, struct fid_mr **mr_out, uint64_t *key_out) const; /** Deregister memory from libfabric */ nixl_status_t @@ -370,6 +383,23 @@ class nixlLibfabricRail { nixlLibfabricReq * findRequestFromContext(void *context) const; +#ifdef HAVE_SYNAPSEAI + // SynapseAI DMABUF registration helper + nixl_status_t + registerSynapseAIMemoryDmabuf(void *buffer, size_t length, int device_id, uint64_t provider_access_flags, struct fid_mr **mr_out) const; + + // Static SynapseAI library handles (shared across all rails) + static void *synapseai_handle_; + static void *hlthunk_handle_; + static std::mutex synapseai_init_mutex_; + + struct SynapseAIOps { + synStatus (*synDeviceGetInfoV2)(const synDeviceId deviceId, synDeviceInfoV2 *pDeviceInfo); + int (*hlthunk_device_mapped_memory_export_dmabuf_fd)(int fd, uint64_t addr, uint64_t size, uint64_t offset, uint32_t flags); + }; + static SynapseAIOps synapseai_ops_; +#endif + private: // Core libfabric resources struct fi_info *info; // from rail_infos[rail_id] @@ -404,6 +434,10 @@ class nixlLibfabricRail { processRecvCompletion(struct fi_cq_data_entry *comp) const; nixl_status_t processRemoteWriteCompletion(struct fi_cq_data_entry *comp) const; + + // Memory registration helper + uint64_t + getMemoryRegistrationAccessFlags() const; }; diff --git a/src/utils/libfabric/libfabric_rail_manager.cpp b/src/utils/libfabric/libfabric_rail_manager.cpp index bb26e4b80f..49762b0afc 100644 --- a/src/utils/libfabric/libfabric_rail_manager.cpp +++ b/src/utils/libfabric/libfabric_rail_manager.cpp @@ -73,14 +73,14 @@ nixlLibfabricRailManager::~nixlLibfabricRailManager() { } nixl_status_t -nixlLibfabricRailManager::createDataRails(const std::vector &efa_devices, +nixlLibfabricRailManager::createDataRails(const std::vector &fabric_devices, const std::string &provider_name) { - num_data_rails_ = efa_devices.size(); + num_data_rails_ = fabric_devices.size(); // Pre-allocate to ensure contiguous memory allocation data_rails_.reserve(num_data_rails_); - // Build EFA device to rail index mapping for O(1) lookup - efa_device_to_rail_map.reserve(num_data_rails_); + // Build fabric device to rail index mapping for O(1) lookup + device_to_rail_map.reserve(num_data_rails_); try { data_rails_.clear(); @@ -88,12 +88,12 @@ nixlLibfabricRailManager::createDataRails(const std::vector &efa_de for (size_t i = 0; i < num_data_rails_; ++i) { data_rails_.emplace_back(std::make_unique( - efa_devices[i], provider_name, static_cast(i))); + fabric_devices[i], provider_name, static_cast(i))); - // Initialize EFA device mapping - efa_device_to_rail_map[efa_devices[i]] = i; + // Initialize fabric device mapping + device_to_rail_map[fabric_devices[i]] = i; - NIXL_DEBUG << "Created data rail " << i << " (device: " << efa_devices[i] + NIXL_DEBUG << "Created data rail " << i << " (device: " << fabric_devices[i] << ", provider: " << provider_name << ")"; } } @@ -105,7 +105,7 @@ nixlLibfabricRailManager::createDataRails(const std::vector &efa_de } nixl_status_t -nixlLibfabricRailManager::createControlRails(const std::vector &efa_devices, +nixlLibfabricRailManager::createControlRails(const std::vector &fabric_devices, const std::string &provider_name, size_t num_control_rails) { // Pre-allocate to ensure contiguous memory allocation @@ -118,8 +118,8 @@ nixlLibfabricRailManager::createControlRails(const std::vector &efa for (size_t i = 0; i < num_control_rails_; ++i) { control_rails_.emplace_back(std::make_unique( - efa_devices[i], provider_name, static_cast(i))); - NIXL_DEBUG << "Created control rail " << i << " (device: " << efa_devices[i] + fabric_devices[i], provider_name, static_cast(i))); + NIXL_DEBUG << "Created control rail " << i << " (device: " << fabric_devices[i] << ", provider: " << provider_name << ")"; } } @@ -314,39 +314,39 @@ nixlLibfabricRailManager::selectRailsForMemory(void *mem_addr, nixl_mem_t mem_type, int gpu_id) const { if (mem_type == VRAM_SEG) { -#ifdef HAVE_CUDA +#if defined(HAVE_CUDA) || defined(HAVE_SYNAPSEAI) if (gpu_id < 0) { NIXL_ERROR << "Invalid GPU ID " << gpu_id << " for VRAM memory " << mem_addr; return {}; // Return empty vector to indicate failure } - std::vector gpu_efa_devices = topology->getEfaDevicesForGpu(gpu_id); - if (gpu_efa_devices.empty()) { - NIXL_ERROR << "No EFA devices found for GPU " << gpu_id; + std::vector gpu_nics = topology->getNicsForGpu(gpu_id); + if (gpu_nics.empty()) { + NIXL_ERROR << "No NICs found for GPU " << gpu_id; return {}; // Return empty vector to indicate failure } std::vector gpu_rails; - for (const std::string &efa_device : gpu_efa_devices) { - auto it = efa_device_to_rail_map.find(efa_device); - if (it != efa_device_to_rail_map.end()) { + for (const std::string &device_name : gpu_nics) { + auto it = device_to_rail_map.find(device_name); + if (it != device_to_rail_map.end()) { // Bounds check: ensure rail index is valid if (it->second < data_rails_.size()) { gpu_rails.push_back(it->second); NIXL_DEBUG << "VRAM memory " << mem_addr << " on GPU " << gpu_id - << " mapped to rail " << it->second << " (EFA device: " << efa_device + << " mapped to rail " << it->second << " (fabric device: " << device_name << ")"; } else { - NIXL_WARN << "EFA device " << efa_device << " maps to rail " << it->second + NIXL_WARN << "Fabric device " << device_name << " maps to rail " << it->second << " but only " << data_rails_.size() << " rails available"; } } else { - NIXL_WARN << "EFA device " << efa_device << " not found in rail mapping for GPU " + NIXL_WARN << "Fabric device " << device_name << " not found in rail mapping for GPU " << gpu_id; } } if (gpu_rails.empty()) { NIXL_ERROR << "No valid rail mapping found for GPU " << gpu_id << " (checked " - << gpu_efa_devices.size() << " EFA devices)"; + << gpu_nics.size() << " NICs)"; return {}; } @@ -354,7 +354,7 @@ nixlLibfabricRailManager::selectRailsForMemory(void *mem_addr, << gpu_rails.size() << " rails total"; return gpu_rails; #else - NIXL_ERROR << "VRAM memory type not supported without CUDA"; + NIXL_ERROR << "VRAM memory type not supported without CUDA/SYNAPSEAI"; return {}; #endif } @@ -381,6 +381,7 @@ nixlLibfabricRailManager::registerMemory(void *buffer, size_t length, nixl_mem_t mem_type, int gpu_id, + const std::string &hmem_hint, std::vector &mr_list_out, std::vector &key_list_out, std::vector &selected_rails_out) { @@ -401,7 +402,7 @@ nixlLibfabricRailManager::registerMemory(void *buffer, key_list_out.resize(data_rails_.size(), 0); selected_rails_out = selected_rails; // Return which rails were selected - // Register memory on each selected rail + // Register memory on each selected rail with HMEM hint for (size_t i = 0; i < selected_rails.size(); ++i) { size_t rail_idx = selected_rails[i]; if (rail_idx >= data_rails_.size()) { @@ -419,7 +420,7 @@ nixlLibfabricRailManager::registerMemory(void *buffer, struct fid_mr *mr; uint64_t key; - nixl_status_t status = data_rails_[rail_idx]->registerMemory(buffer, length, &mr, &key); + nixl_status_t status = data_rails_[rail_idx]->registerMemory(buffer, length, hmem_hint, gpu_id, &mr, &key); if (status != NIXL_SUCCESS) { NIXL_ERROR << "Failed to register memory on rail " << rail_idx; // Cleanup already registered MRs @@ -657,11 +658,11 @@ nixlLibfabricRailManager::progressActiveDataRails() { } nixl_status_t -nixlLibfabricRailManager::progressAllControlRails() { +nixlLibfabricRailManager::progressAllControlRails(bool blocking) { bool any_completions = false; for (size_t rail_id = 0; rail_id < num_control_rails_; ++rail_id) { - nixl_status_t status = - control_rails_[rail_id]->progressCompletionQueue(true); // Blocking for control rails + nixl_status_t status = control_rails_[rail_id]->progressCompletionQueue( + blocking); // Blocking for control rails if (status == NIXL_SUCCESS) { any_completions = true; NIXL_DEBUG << "Processed completion on control rail " << rail_id; @@ -892,3 +893,19 @@ nixlLibfabricRailManager::getActiveRailCount() const { std::lock_guard lock(active_rails_mutex_); return active_rails_.size(); } + +int +nixlLibfabricRailManager::getNumNvidiaGpus() const { + if (topology) { + return topology->getNumNvidiaGpus(); + } + return 0; +} + +int +nixlLibfabricRailManager::getNumIntelHpus() const { + if (topology) { + return topology->getNumIntelHpus(); + } + return 0; +} diff --git a/src/utils/libfabric/libfabric_rail_manager.h b/src/utils/libfabric/libfabric_rail_manager.h index 5e93645ccc..6983571cab 100644 --- a/src/utils/libfabric/libfabric_rail_manager.h +++ b/src/utils/libfabric/libfabric_rail_manager.h @@ -48,22 +48,22 @@ class nixlLibfabricRailManager { ~nixlLibfabricRailManager(); // Rail management - /** Create data rails for high-bandwidth transfers (one per EFA device) - * @param efa_devices List of EFA device names to create rails on - * @param provider_name Provider name ("efa" or "efa-direct") + /** Create data rails for high-bandwidth transfers (one per fabric device) + * @param fabric_devices List of fabric device names to create rails on + * @param provider_name Provider name (e.g., "efa", "verbs", "sockets") * @return NIXL_SUCCESS on success, error code on failure */ nixl_status_t - createDataRails(const std::vector &efa_devices, const std::string &provider_name); + createDataRails(const std::vector &fabric_devices, const std::string &provider_name); /** Create control rails for connection management and notifications - * @param efa_devices List of EFA device names - * @param provider_name Provider name ("efa" or "efa-direct") + * @param fabric_devices List of fabric device names + * @param provider_name Provider name (e.g., "efa", "verbs", "sockets") * @param num_control_rails Number of control rails to create * @return NIXL_SUCCESS on success, error code on failure */ nixl_status_t - createControlRails(const std::vector &efa_devices, + createControlRails(const std::vector &fabric_devices, const std::string &provider_name, size_t num_control_rails); @@ -110,6 +110,7 @@ class nixlLibfabricRailManager { * @param length Buffer size in bytes * @param mem_type Memory type (DRAM_SEG or VRAM_SEG) * @param gpu_id GPU device ID (used for VRAM_SEG, ignored for DRAM_SEG) + * @param hmem_hint HMEM interface hint ("cuda", "synapseai", "ze", or empty for auto-detection) * @param mr_list_out Memory registration handles, indexed by rail ID * @param key_list_out Remote access keys, indexed by rail ID * @param selected_rails_out List of rail IDs where memory was registered @@ -120,6 +121,7 @@ class nixlLibfabricRailManager { size_t length, nixl_mem_t mem_type, int gpu_id, + const std::string &hmem_hint, std::vector &mr_list_out, std::vector &key_list_out, std::vector &selected_rails_out); @@ -220,7 +222,7 @@ class nixlLibfabricRailManager { * @return NIXL_SUCCESS if completions processed, NIXL_IN_PROG if none, error on failure */ nixl_status_t - progressAllControlRails(); + progressAllControlRails(bool blocking); /** Validate that all rails are properly initialized * @return NIXL_SUCCESS if all rails initialized, error code otherwise */ @@ -244,6 +246,15 @@ class nixlLibfabricRailManager { size_t getActiveRailCount() const; + // Topology Information APIs + /** Get number of NVIDIA GPUs in the system */ + int + getNumNvidiaGpus() const; + + /** Get number of Intel HPUs in the system */ + int + getNumIntelHpus() const; + // Memory Descriptor APIs /** Get memory descriptor for specified rail and MR */ struct fid_mr * @@ -302,8 +313,8 @@ class nixlLibfabricRailManager { std::unique_ptr topology; - // EFA device to rail mapping - std::unordered_map efa_device_to_rail_map; + // Fabric device to rail mapping + std::unordered_map device_to_rail_map; // Active Rail Tracking System std::unordered_set active_rails_; diff --git a/src/utils/libfabric/libfabric_topology.cpp b/src/utils/libfabric/libfabric_topology.cpp index 19c13c0865..6eb6cad136 100644 --- a/src/utils/libfabric/libfabric_topology.cpp +++ b/src/utils/libfabric/libfabric_topology.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -34,6 +35,8 @@ nixlLibfabricTopology::nixlLibfabricTopology() : num_gpus(0), + num_nvidia_gpus(0), + num_intel_hpus(0), num_numa_nodes(0), num_devices(0), topology_discovered(false), @@ -65,18 +68,18 @@ nixlLibfabricTopology::discoverTopology() { NIXL_ERROR << "Failed to initialize hwloc topology"; return status; } - // Discover EFA devices using libfabric - status = discoverEfaDevices(); + // Discover fabric devices using libfabric + status = discoverDevices(); if (status != NIXL_SUCCESS) { return status; } - // For EFA devices, build PCIe to Libfabric device mapping and full topology - if (provider_name == "efa") { + // For RDMA providers (EFA, verbs, etc.), build PCIe to Libfabric device mapping and full topology + if (isRdmaProvider()) { // Build PCIe to Libfabric device mapping status = buildPcieToLibfabricMapping(); if (status != NIXL_SUCCESS) { - NIXL_ERROR << "Failed to build PCIe to Libfabric mapping - this is required for EFA " - "topology discovery"; + NIXL_ERROR << "Failed to build PCIe to Libfabric mapping - this is required for " + << provider_name << " topology discovery"; return status; } // Discover hardware topology using hwloc @@ -85,10 +88,10 @@ nixlLibfabricTopology::discoverTopology() { NIXL_ERROR << "Failed to discover hwloc topology"; return status; } - // Build GPU to EFA mapping based on PCIe topology - status = buildGpuToEfaMapping(); + // Build GPU to NIC mapping based on PCIe topology + status = buildGpuToNicMapping(); if (status != NIXL_SUCCESS) { - NIXL_ERROR << "Failed to build GPU to EFA mapping"; + NIXL_ERROR << "Failed to build GPU to NIC mapping for " << provider_name; return status; } } else { @@ -98,6 +101,8 @@ nixlLibfabricTopology::discoverTopology() { // Set basic values without hwloc discovery num_gpus = 0; // TCP doesn't need GPU topology + num_nvidia_gpus = 0; + num_intel_hpus = 0; num_numa_nodes = 1; // Simple fallback // For TCP/sockets devices, no GPU-mapping required. @@ -108,8 +113,18 @@ nixlLibfabricTopology::discoverTopology() { return NIXL_SUCCESS; } +bool +nixlLibfabricTopology::isRdmaProvider() const { + // Check for exact match or composite provider (e.g., "verbs;ofi_rxm") + return (provider_name == "efa" || + provider_name == "verbs" || + provider_name.rfind("verbs;", 0) == 0 || // verbs;ofi_rxm, verbs;* + provider_name == "psm2" || + provider_name == "cxi"); +} + nixl_status_t -nixlLibfabricTopology::discoverEfaDevices() { +nixlLibfabricTopology::discoverDevices() { // Use the utility function from libfabric_common auto network_device = LibfabricUtils::getAvailableNetworkDevices(); provider_name = network_device.first; @@ -117,30 +132,34 @@ nixlLibfabricTopology::discoverEfaDevices() { num_devices = all_devices.size(); - // Set device type based on discovered provider + // Log discovered provider and device count if (provider_name == "efa") { - NIXL_INFO << "Discovered " << num_devices << " EFA-Direct devices"; + NIXL_INFO << "Discovered " << num_devices << " EFA devices"; + } else if (provider_name == "verbs") { + NIXL_INFO << "Discovered " << num_devices << " verbs devices (RDMA)"; } else if (provider_name == "sockets") { NIXL_INFO << "Discovered " << num_devices << " socket devices (TCP fallback)"; } else if (provider_name == "none" || all_devices.empty()) { NIXL_WARN << "No network devices found"; return NIXL_ERR_BACKEND; + } else { + NIXL_INFO << "Discovered " << num_devices << " " << provider_name << " devices"; } for (size_t i = 0; i < all_devices.size(); ++i) { - NIXL_TRACE << "Network device " << i << ": " << all_devices[i] + NIXL_TRACE << "Device " << i << ": " << all_devices[i] << " (provider: " << provider_name << ")"; } return NIXL_SUCCESS; } std::vector -nixlLibfabricTopology::getEfaDevicesForGpu(int gpu_id) const { - auto it = gpu_to_efa_devices.find(gpu_id); - if (it != gpu_to_efa_devices.end()) { +nixlLibfabricTopology::getNicsForGpu(int gpu_id) const { + auto it = gpu_to_nics.find(gpu_id); + if (it != gpu_to_nics.end()) { return it->second; } - NIXL_WARN << "No EFA devices found for GPU " << gpu_id << ", returning all devices"; + NIXL_WARN << "No NICs found for GPU " << gpu_id << ", returning all devices"; return all_devices; } @@ -150,23 +169,25 @@ nixlLibfabricTopology::isValidGpuId(int gpu_id) const { } bool -nixlLibfabricTopology::isValidDevice(const std::string &efa_device) const { - return std::find(all_devices.begin(), all_devices.end(), efa_device) != all_devices.end(); +nixlLibfabricTopology::isValidDevice(const std::string &device_name) const { + return std::find(all_devices.begin(), all_devices.end(), device_name) != all_devices.end(); } void nixlLibfabricTopology::printTopologyInfo() const { NIXL_TRACE << "=== Libfabric Topology Information ==="; NIXL_TRACE << "Topology discovered: " << (topology_discovered ? "Yes" : "No"); - NIXL_TRACE << "Number of GPUs: " << num_gpus; + NIXL_TRACE << "Provider: " << provider_name; + NIXL_TRACE << "Number of GPUs: " << num_gpus << " (" << num_nvidia_gpus << " NVIDIA, " + << num_intel_hpus << " Intel HPU)"; NIXL_TRACE << "Number of NUMA nodes: " << num_numa_nodes; - NIXL_TRACE << "Number of EFA devices: " << num_devices; - NIXL_TRACE << "EFA devices: "; + NIXL_TRACE << "Number of devices: " << num_devices; + NIXL_TRACE << "Available devices: "; for (size_t i = 0; i < all_devices.size(); ++i) { NIXL_TRACE << " [" << i << "] " << all_devices[i]; } - NIXL_TRACE << "GPU → EFA mapping:"; - for (const auto &pair : gpu_to_efa_devices) { + NIXL_TRACE << "GPU → NIC mapping:"; + for (const auto &pair : gpu_to_nics) { std::stringstream ss; ss << " GPU " << pair.first << " → ["; for (size_t i = 0; i < pair.second.size(); ++i) { @@ -176,7 +197,7 @@ nixlLibfabricTopology::printTopologyInfo() const { ss << "]"; NIXL_TRACE << ss.str(); } - NIXL_TRACE << "Host memory (DRAM) will use all available EFA devices for maximum bandwidth"; + NIXL_TRACE << "Host memory (DRAM) will use all available devices for maximum bandwidth"; NIXL_TRACE << "====================================="; } @@ -184,9 +205,10 @@ std::string nixlLibfabricTopology::getTopologyString() const { std::stringstream ss; ss << "Libfabric Topology: "; + ss << "Provider=" << provider_name << ", "; ss << "GPUs=" << num_gpus << ", "; ss << "NUMA=" << num_numa_nodes << ", "; - ss << "EFA=" << num_devices << ", "; + ss << "Devices=" << num_devices << ", "; ss << "Discovered=" << (topology_discovered ? "Yes" : "No"); return ss.str(); } @@ -215,7 +237,7 @@ nixlLibfabricTopology::initHwlocTopology() { return NIXL_ERR_BACKEND; } - // Enable I/O device discovery - this is the key to seeing EFA devices! + // Enable I/O device discovery - this is the key to seeing PCIe NICs! #if (HWLOC_API_VERSION >= 0x00020000) enum hwloc_type_filter_e filter = HWLOC_TYPE_FILTER_KEEP_ALL; ret = hwloc_topology_set_io_types_filter(hwloc_topology, filter); @@ -272,15 +294,15 @@ nixlLibfabricTopology::discoverHwlocTopology() { NIXL_ERROR << "hwloc topology not initialized"; return NIXL_ERR_BACKEND; } - // Discover GPUs and EFA devices using hwloc + // Discover GPUs and fabric devices using hwloc nixl_status_t status = discoverGpusWithHwloc(); if (status != NIXL_SUCCESS) { NIXL_ERROR << "Failed to discover GPUs with hwloc"; return status; } - status = discoverEfaDevicesWithHwloc(); + status = discoverDevicesWithHwloc(); if (status != NIXL_SUCCESS) { - NIXL_ERROR << "Failed to discover EFA devices with hwloc"; + NIXL_ERROR << "Failed to discover devices with hwloc"; return status; } // Discover NUMA topology @@ -295,7 +317,8 @@ nixlLibfabricTopology::discoverHwlocTopology() { nixl_status_t nixlLibfabricTopology::discoverGpusWithHwloc() { - num_gpus = 0; + num_nvidia_gpus = 0; + num_intel_hpus = 0; // Find all PCI devices and log detailed information hwloc_obj_t pci_obj = nullptr; while ((pci_obj = hwloc_get_next_pcidev(hwloc_topology, pci_obj)) != nullptr) { @@ -306,17 +329,30 @@ nixlLibfabricTopology::discoverGpusWithHwloc() { uint16_t device_id = pci_obj->attr->pcidev.device_id; uint16_t class_id = pci_obj->attr->pcidev.class_id; - NIXL_TRACE << "Found NVIDIA GPU " << num_gpus << ": " << pcie_addr << " (vendor=0x" + NIXL_TRACE << "Found NVIDIA GPU " << num_nvidia_gpus << ": " << pcie_addr << " (vendor=0x" << std::hex << vendor_id << ", device=0x" << device_id << ", class=0x" << class_id << std::dec << ")"; + num_nvidia_gpus++; + } else if (isIntelHpu(pci_obj)) { + std::string pcie_addr = getPcieAddressFromHwlocObj(pci_obj); + // Get device and vendor info + uint16_t vendor_id = pci_obj->attr->pcidev.vendor_id; + uint16_t device_id = pci_obj->attr->pcidev.device_id; + uint16_t class_id = pci_obj->attr->pcidev.class_id; - num_gpus++; + NIXL_TRACE << "Found Intel HPU " << num_intel_hpus << ": " << pcie_addr << " (vendor=0x" + << std::hex << vendor_id << ", device=0x" << device_id << ", class=0x" + << class_id << std::dec << ")"; + num_intel_hpus++; } } - NIXL_TRACE << "Discovered " << num_gpus << " NVIDIA GPUs via hwloc"; + num_gpus = num_nvidia_gpus + num_intel_hpus; + NIXL_TRACE << "Discovered " << num_gpus << " GPUs via hwloc (" << num_nvidia_gpus + << " NVIDIA, " << num_intel_hpus << " Intel HPU)"; // If we found more than 8 GPUs on P5en, investigate further + // FIXME: add Habana related messages if (num_gpus > 8) { NIXL_WARN << "Found " << num_gpus << " NVIDIA GPUs, but P5en should have 8. Investigating..."; @@ -340,24 +376,47 @@ nixlLibfabricTopology::discoverGpusWithHwloc() { } nixl_status_t -nixlLibfabricTopology::discoverEfaDevicesWithHwloc() { - // EFA devices are already discovered via libfabric +nixlLibfabricTopology::discoverDevicesWithHwloc() { + // Fabric devices are already discovered via libfabric // This method validates the hwloc discovery matches libfabric discovery - int hwloc_efa_count = 0; - hwloc_obj_t pci_obj = nullptr; - while ((pci_obj = hwloc_get_next_pcidev(hwloc_topology, pci_obj)) != nullptr) { - if (isEfaDevice(pci_obj)) { - hwloc_efa_count++; - NIXL_TRACE << "Found EFA device via hwloc: " << getPcieAddressFromHwlocObj(pci_obj); + // Only validate for providers with specific hwloc checks + if (provider_name == "efa") { + int hwloc_device_count = 0; + hwloc_obj_t pci_obj = nullptr; + while ((pci_obj = hwloc_get_next_pcidev(hwloc_topology, pci_obj)) != nullptr) { + if (isEfaDevice(pci_obj)) { + hwloc_device_count++; + NIXL_TRACE << "Found EFA device via hwloc: " << getPcieAddressFromHwlocObj(pci_obj); + } } - } - NIXL_TRACE << "hwloc found " << hwloc_efa_count << " EFA devices, libfabric found " - << num_devices; + NIXL_TRACE << "hwloc found " << hwloc_device_count << " EFA devices, libfabric found " + << num_devices; - if (hwloc_efa_count != num_devices) { - NIXL_WARN << "Mismatch between hwloc (" << hwloc_efa_count << ") and libfabric (" - << num_devices << ") EFA device counts"; + if (hwloc_device_count != num_devices) { + NIXL_WARN << "Mismatch between hwloc (" << hwloc_device_count << ") and libfabric (" + << num_devices << ") EFA device counts"; + } + } else if (provider_name == "verbs") { + int hwloc_device_count = 0; + hwloc_obj_t pci_obj = nullptr; + while ((pci_obj = hwloc_get_next_pcidev(hwloc_topology, pci_obj)) != nullptr) { + if (isMellanoxNic(pci_obj)) { + hwloc_device_count++; + NIXL_TRACE << "Found Mellanox NIC via hwloc: " << getPcieAddressFromHwlocObj(pci_obj); + } + } + + NIXL_TRACE << "hwloc found " << hwloc_device_count << " Mellanox NICs, libfabric found " + << num_devices; + + if (hwloc_device_count != num_devices) { + NIXL_WARN << "Mismatch between hwloc (" << hwloc_device_count << ") and libfabric (" + << num_devices << ") Mellanox NIC counts"; + } + } else { + // For other providers (sockets, psm2, etc.), skip hwloc validation + NIXL_TRACE << "Skipping hwloc device validation for provider: " << provider_name; } return NIXL_SUCCESS; @@ -368,7 +427,7 @@ nixlLibfabricTopology::buildPcieToLibfabricMapping() { pcie_to_libfabric_map.clear(); libfabric_to_pcie_map.clear(); - // Get EFA device info with PCIe addresses from libfabric + // Get fabric device info with PCIe addresses from libfabric struct fi_info *hints, *info; hints = fi_allocinfo(); @@ -380,8 +439,10 @@ nixlLibfabricTopology::buildPcieToLibfabricMapping() { // Configure hints for the discovered provider // This ensures consistency between device discovery and PCIe mapping hints->fabric_attr->prov_name = strdup(provider_name.c_str()); + LibfabricUtils::configureHintsForProvider(hints, provider_name); - int ret = fi_getinfo(FI_VERSION(1, 9), NULL, NULL, 0, hints, &info); + // Use FI_VERSION(1, 18) for DMABUF and HMEM support + int ret = fi_getinfo(FI_VERSION(1, 18), NULL, NULL, 0, hints, &info); if (ret) { NIXL_ERROR << "fi_getinfo failed for PCIe mapping with provider " << provider_name << ": " << fi_strerror(-ret); @@ -389,31 +450,81 @@ nixlLibfabricTopology::buildPcieToLibfabricMapping() { return NIXL_ERR_BACKEND; } + int device_count = 0; + int mapped_count = 0; for (struct fi_info *cur = info; cur; cur = cur->next) { - if (cur->domain_attr && cur->domain_attr->name && cur->nic && cur->nic->bus_attr) { - std::string libfabric_name = cur->domain_attr->name; - // Extract PCIe address from bus_attr if available - if (cur->nic->bus_attr->bus_type == FI_BUS_PCI && - cur->nic->bus_attr->attr.pci.domain_id != FI_ADDR_UNSPEC) { - char pcie_addr[32]; - snprintf(pcie_addr, - sizeof(pcie_addr), - "%x:%02x:%02x.%x", - cur->nic->bus_attr->attr.pci.domain_id, - cur->nic->bus_attr->attr.pci.bus_id, - cur->nic->bus_attr->attr.pci.device_id, - cur->nic->bus_attr->attr.pci.function_id); - - std::string pcie_address = pcie_addr; - pcie_to_libfabric_map[pcie_address] = libfabric_name; - libfabric_to_pcie_map[libfabric_name] = pcie_address; - - NIXL_TRACE << "Mapped PCIe " << pcie_address << " → Libfabric " << libfabric_name - << " (provider: " << provider_name << ")"; + device_count++; + if (!cur->domain_attr || !cur->domain_attr->name) { + NIXL_DEBUG << "Device " << device_count << ": missing domain_attr or name"; + continue; + } + + std::string libfabric_name = cur->domain_attr->name; + NIXL_DEBUG << "Processing device: " << libfabric_name; + + if (!cur->nic) { + NIXL_DEBUG << " Device " << libfabric_name << ": nic is NULL"; + continue; + } + + if (!cur->nic->bus_attr) { + NIXL_DEBUG << " Device " << libfabric_name << ": bus_attr is NULL (likely virtual device, bonded NIC, etc.)"; + continue; + } + + NIXL_DEBUG << " Device " << libfabric_name << ": bus_type=" << cur->nic->bus_attr->bus_type; + + if (cur->nic->bus_attr->bus_type != FI_BUS_PCI) { + NIXL_DEBUG << " Device " << libfabric_name << ": not a PCI device, trying sysfs fallback"; + + // Fallback: Try to get PCIe address from sysfs for bonded/virtual devices + std::string sysfs_path = "/sys/class/infiniband/" + libfabric_name + "/device"; + char resolved_path[PATH_MAX]; + if (realpath(sysfs_path.c_str(), resolved_path)) { + // Parse PCIe address from path like: /sys/devices/pci0000:6d/0000:6d:02.0/0000:6e:00.0 + std::string path_str(resolved_path); + size_t last_slash = path_str.rfind('/'); + if (last_slash != std::string::npos) { + std::string pcie_addr = path_str.substr(last_slash + 1); + // Verify format: domain:bus:device.function (e.g., 0000:6e:00.0) + if (pcie_addr.length() >= 7 && pcie_addr.find(':') != std::string::npos) { + pcie_to_libfabric_map[pcie_addr].push_back(libfabric_name); + libfabric_to_pcie_map[libfabric_name] = pcie_addr; + mapped_count++; + NIXL_DEBUG << " Successfully mapped PCIe " << pcie_addr << " → " << libfabric_name << " (via sysfs)"; + continue; + } + } } + NIXL_DEBUG << " Device " << libfabric_name << ": sysfs fallback failed"; + continue; } + + if (cur->nic->bus_attr->attr.pci.domain_id == FI_ADDR_UNSPEC) { + NIXL_DEBUG << " Device " << libfabric_name << ": PCIe domain_id is FI_ADDR_UNSPEC"; + continue; + } + + // Extract PCIe address from bus_attr if available + char pcie_addr[32]; + snprintf(pcie_addr, + sizeof(pcie_addr), + "%x:%02x:%02x.%x", + cur->nic->bus_attr->attr.pci.domain_id, + cur->nic->bus_attr->attr.pci.bus_id, + cur->nic->bus_attr->attr.pci.device_id, + cur->nic->bus_attr->attr.pci.function_id); + + std::string pcie_address = pcie_addr; + pcie_to_libfabric_map[pcie_address].push_back(libfabric_name); + libfabric_to_pcie_map[libfabric_name] = pcie_address; + mapped_count++; + + NIXL_DEBUG << " Successfully mapped PCIe " << pcie_address << " → Libfabric " << libfabric_name; } + NIXL_DEBUG << "PCIe mapping: processed " << device_count << " devices, successfully mapped " << mapped_count; + fi_freeinfo(info); fi_freeinfo(hints); NIXL_TRACE << "Built PCIe to Libfabric mapping for " << pcie_to_libfabric_map.size() @@ -422,16 +533,16 @@ nixlLibfabricTopology::buildPcieToLibfabricMapping() { } nixl_status_t -nixlLibfabricTopology::buildGpuToEfaMapping() { - gpu_to_efa_devices.clear(); - // Implement NIXL's topology-aware GPU-EFA grouping algorithm +nixlLibfabricTopology::buildGpuToNicMapping() { + gpu_to_nics.clear(); + // Implement NIXL's topology-aware GPU-NIC grouping algorithm nixl_status_t status = buildTopologyAwareGrouping(); if (status != NIXL_SUCCESS) { NIXL_WARN << "Topology-aware grouping failed, using fallback to use all available devices"; return buildFallbackMapping(); } - NIXL_TRACE << "Built GPU→EFA mapping for " << gpu_to_efa_devices.size() + NIXL_TRACE << "Built GPU→NIC mapping for " << gpu_to_nics.size() << " GPUs using topology-aware algorithm"; return NIXL_SUCCESS; @@ -442,10 +553,32 @@ nixlLibfabricTopology::buildTopologyAwareGrouping() { // Step 1: Build NIC info structures by correlating libfabric with hwloc std::vector discovered_nics; std::vector discovered_gpus; + + NIXL_DEBUG << "Starting NIC discovery: pcie_to_libfabric_map has " << pcie_to_libfabric_map.size() << " PCIe addresses"; + // Discover NICs by correlating libfabric devices with hwloc objects for (const auto &pair : pcie_to_libfabric_map) { const std::string &pcie_addr = pair.first; - const std::string &libfabric_name = pair.second; + const std::vector &libfabric_devices = pair.second; + + NIXL_DEBUG << "Processing PCIe address " << pcie_addr << " with " << libfabric_devices.size() << " libfabric device(s)"; + + // Deduplicate device names (libfabric may return the same device multiple times) + std::set seen; + std::vector unique_devices; + for (const auto &dev : libfabric_devices) { + if (seen.insert(dev).second) { + unique_devices.push_back(dev); + } + } + + if (unique_devices.size() < libfabric_devices.size()) { + NIXL_DEBUG << " Deduplicated " << libfabric_devices.size() << " → " << unique_devices.size() << " devices"; + } + + // Process all unique libfabric devices that share this PCIe address + for (const std::string &libfabric_name : unique_devices) { + NIXL_DEBUG << " Processing device: " << libfabric_name; // Parse PCIe address uint16_t domain_id; @@ -460,6 +593,9 @@ nixlLibfabricTopology::buildTopologyAwareGrouping() { continue; } + NIXL_DEBUG << "Parsed PCIe address: domain=" << domain_id << ", bus=" << (int)bus_id + << ", device=" << (int)device_id << ", function=" << (int)function_id; + // Find corresponding hwloc object hwloc_obj_t hwloc_node = hwloc_get_pcidev_by_busid(hwloc_topology, domain_id, bus_id, device_id, function_id); @@ -473,15 +609,24 @@ nixlLibfabricTopology::buildTopologyAwareGrouping() { nic.device_id = device_id; nic.function_id = function_id; discovered_nics.push_back(nic); - NIXL_TRACE << "Correlated NIC: " << pcie_addr << " → " << libfabric_name; + NIXL_DEBUG << " Successfully correlated NIC: " << pcie_addr << " → " << libfabric_name; } else { - NIXL_WARN << "Could not find hwloc object for PCIe address: " << pcie_addr; + NIXL_WARN << " Could not find hwloc object for PCIe address: " << pcie_addr; } - } + } // end for each libfabric device + } // end for each PCIe address + + NIXL_DEBUG << "NIC discovery complete: found " << discovered_nics.size() << " NICs"; + // Step 2: Discover GPUs + NIXL_DEBUG << "Starting GPU discovery"; hwloc_obj_t pci_obj = nullptr; + int pci_device_count = 0; + int gpu_count = 0; while ((pci_obj = hwloc_get_next_pcidev(hwloc_topology, pci_obj)) != nullptr) { - if (isNvidiaGpu(pci_obj)) { + pci_device_count++; + if (isNvidiaGpu(pci_obj) || isIntelHpu(pci_obj)) { + gpu_count++; GpuInfo gpu; gpu.hwloc_node = pci_obj; gpu.domain_id = pci_obj->attr->pcidev.domain; @@ -489,8 +634,11 @@ nixlLibfabricTopology::buildTopologyAwareGrouping() { gpu.device_id = pci_obj->attr->pcidev.dev; gpu.function_id = pci_obj->attr->pcidev.func; discovered_gpus.push_back(gpu); + NIXL_DEBUG << "Found GPU at " << std::hex << gpu.domain_id << ":" + << (int)gpu.bus_id << ":" << (int)gpu.device_id << "." << (int)gpu.function_id << std::dec; } } + NIXL_DEBUG << "GPU discovery complete: scanned " << pci_device_count << " PCI devices, found " << discovered_gpus.size() << " GPUs"; NIXL_TRACE << "Discovered " << discovered_nics.size() << " NICs and " << discovered_gpus.size() << " GPUs for grouping"; @@ -505,13 +653,13 @@ nixlLibfabricTopology::buildTopologyAwareGrouping() { if (status != NIXL_SUCCESS) { return status; } - // Step 4: Convert groups to GPU→EFA mapping + // Step 4: Convert groups to GPU→NIC mapping for (size_t group_idx = 0; group_idx < nic_groups.size(); ++group_idx) { const auto &group = nic_groups[group_idx]; if (group.has_gpu) { - std::vector gpu_efa_devices; + std::vector gpu_nics; for (const auto &nic : group.nics) { - gpu_efa_devices.push_back(nic.libfabric_name); + gpu_nics.push_back(nic.libfabric_name); } // Find GPU index in our discovered GPUs list int gpu_index = -1; @@ -527,26 +675,53 @@ nixlLibfabricTopology::buildTopologyAwareGrouping() { } if (gpu_index >= 0) { - gpu_to_efa_devices[gpu_index] = gpu_efa_devices; + gpu_to_nics[gpu_index] = gpu_nics; NIXL_TRACE << "GPU " << gpu_index << " (" << std::hex << group.closest_gpu.domain_id << ":" << static_cast(group.closest_gpu.bus_id) << ":" << static_cast(group.closest_gpu.device_id) << "." << static_cast(group.closest_gpu.function_id) << std::dec << ") → " - << gpu_efa_devices.size() << " EFA devices"; + << gpu_nics.size() << " NICs"; + } + } + } + + // Step 5: Handle virtual devices - if all NICs share the same PCIe address, + // assign them to all GPUs instead of just the closest one + if (!discovered_nics.empty() && pcie_to_libfabric_map.size() == 1) { + // All NICs share a single PCIe address - this is a virtual device + const std::string &vdev_pcie_addr = pcie_to_libfabric_map.begin()->first; + const std::vector &vdev_devices = pcie_to_libfabric_map.begin()->second; + + // Deduplicate device names - libfabric may report the same virtual device multiple times + std::vector unique_devices; + std::set seen; + for (const auto &dev : vdev_devices) { + if (seen.insert(dev).second) { + unique_devices.push_back(dev); } } + + NIXL_INFO << "Detected virtual device at PCIe " << vdev_pcie_addr + << " with " << vdev_devices.size() << " instances (" << unique_devices.size() << " unique)"; + NIXL_INFO << "Assigning virtual device to all " << discovered_gpus.size() << " GPUs (if bond, lower layer handles load balancing)"; + + // Assign unique virtual device instances to all GPUs + for (size_t gpu_idx = 0; gpu_idx < discovered_gpus.size(); ++gpu_idx) { + gpu_to_nics[static_cast(gpu_idx)] = unique_devices; + } } + return NIXL_SUCCESS; } nixl_status_t nixlLibfabricTopology::buildFallbackMapping() { // Fallback: if specific mapping failed, use simple approach - gpu_to_efa_devices.clear(); + gpu_to_nics.clear(); // Give all devices to all GPUs (not optimal but functional) for (int gpu_id = 0; gpu_id < num_gpus; ++gpu_id) { - gpu_to_efa_devices[gpu_id] = all_devices; + gpu_to_nics[gpu_id] = all_devices; } return NIXL_SUCCESS; } @@ -570,6 +745,21 @@ nixlLibfabricTopology::getPcieAddressFromHwlocObj(hwloc_obj_t obj) const { return std::string(pcie_addr); } +bool +nixlLibfabricTopology::isIntelHpu(hwloc_obj_t obj) const { + if (!obj || obj->type != HWLOC_OBJ_PCI_DEVICE) { + return false; + } + // Intel Habana vendor ID is 0x1da3 + if (obj->attr->pcidev.vendor_id != 0x1da3) { + return false; + } + // Gaudi devices use class 0x1200 (Processing Accelerators) + // Accept this class specifically for Habana devices + uint16_t class_id = obj->attr->pcidev.class_id; + return (class_id == 0x1200); +} + bool nixlLibfabricTopology::isNvidiaGpu(hwloc_obj_t obj) const { if (!obj || obj->type != HWLOC_OBJ_PCI_DEVICE) { @@ -598,6 +788,20 @@ nixlLibfabricTopology::isEfaDevice(hwloc_obj_t obj) const { (obj->attr->pcidev.device_id & 0xfff0) == 0xefa0; } +bool +nixlLibfabricTopology::isMellanoxNic(hwloc_obj_t obj) const { + if (!obj || obj->type != HWLOC_OBJ_PCI_DEVICE) { + return false; + } + + // Mellanox/NVIDIA vendor ID is 0x15b3 + // Class 0x0200 is Network controller (Ethernet) + // Class 0x0207 is InfiniBand controller + uint16_t class_id = obj->attr->pcidev.class_id; + return obj->attr->pcidev.vendor_id == 0x15b3 && + (class_id == 0x0200 || class_id == 0x0207); +} + nixl_status_t nixlLibfabricTopology::groupNicsWithGpus(const std::vector &discovered_nics, const std::vector &discovered_gpus, diff --git a/src/utils/libfabric/libfabric_topology.h b/src/utils/libfabric/libfabric_topology.h index f85bc74e9a..eafc356b58 100644 --- a/src/utils/libfabric/libfabric_topology.h +++ b/src/utils/libfabric/libfabric_topology.h @@ -24,16 +24,16 @@ #include /** - * @brief Topology discovery and management for AWS instances with EFA devices + * @brief Topology discovery and management for libfabric devices * - * Automatically discovers system topology using hwloc and maps GPUs to EFA devices - * based on PCIe proximity for optimal performance. Falls back to TCP/sockets - * when EFA devices are not available. + * Automatically discovers system topology using hwloc and maps GPUs to NICs + * based on PCIe proximity for optimal performance. Supports EFA, verbs, and other + * RDMA providers. Falls back to TCP/sockets when RDMA devices are not available. */ class nixlLibfabricTopology { private: - // GPU to EFA device mapping: GPU 0→[efa0,efa1], GPU 1→[efa2,efa3], etc. - std::map> gpu_to_efa_devices; + // GPU to NIC mapping for RDMA providers: GPU 0→[rdmap0s6-rdm,rdmap1s6-rdm], GPU 1→[rdmap2s6-rdm,rdmap3s6-rdm], etc. + std::map> gpu_to_nics; // All available network devices discovered on this system std::vector all_devices; @@ -42,7 +42,9 @@ class nixlLibfabricTopology { std::string provider_name; // System information - int num_gpus; + int num_gpus; // Total GPUs (NVIDIA + Intel HPU) + int num_nvidia_gpus; // NVIDIA GPU count + int num_intel_hpus; // Intel Habana HPU count int num_numa_nodes; int num_devices; @@ -53,14 +55,17 @@ class nixlLibfabricTopology { hwloc_topology_t hwloc_topology; // PCIe to Libfabric device mapping - std::map pcie_to_libfabric_map; + // One PCIe address can have multiple libfabric devices (bonded case) + std::map> pcie_to_libfabric_map; std::map libfabric_to_pcie_map; // Helper methods nixl_status_t - discoverEfaDevices(); + discoverDevices(); nixl_status_t discoverTopology(); + bool + isRdmaProvider() const; // hwloc-based discovery methods nixl_status_t @@ -72,9 +77,9 @@ class nixlLibfabricTopology { nixl_status_t discoverGpusWithHwloc(); nixl_status_t - discoverEfaDevicesWithHwloc(); + discoverDevicesWithHwloc(); nixl_status_t - buildGpuToEfaMapping(); + buildGpuToNicMapping(); void cleanupHwlocTopology(); @@ -117,9 +122,13 @@ class nixlLibfabricTopology { std::string getPcieAddressFromHwlocObj(hwloc_obj_t obj) const; bool + isIntelHpu(hwloc_obj_t obj) const; + bool isNvidiaGpu(hwloc_obj_t obj) const; bool isEfaDevice(hwloc_obj_t obj) const; + bool + isMellanoxNic(hwloc_obj_t obj) const; public: nixlLibfabricTopology(); // Automatically discovers topology @@ -127,7 +136,7 @@ class nixlLibfabricTopology { // GPU-based queries (main interface) std::vector - getEfaDevicesForGpu(int gpu_id) const; + getNicsForGpu(int gpu_id) const; // System information int @@ -135,6 +144,16 @@ class nixlLibfabricTopology { return num_gpus; } + int + getNumNvidiaGpus() const { + return num_nvidia_gpus; + } + + int + getNumIntelHpus() const { + return num_intel_hpus; + } + const std::vector & getAllDevices() const { return all_devices; @@ -154,7 +173,7 @@ class nixlLibfabricTopology { bool isValidGpuId(int gpu_id) const; bool - isValidDevice(const std::string &efa_device) const; + isValidDevice(const std::string &device_name) const; // Debug/info void diff --git a/src/utils/libfabric/meson.build b/src/utils/libfabric/meson.build index 39fa98bca3..18e62293ca 100644 --- a/src/utils/libfabric/meson.build +++ b/src/utils/libfabric/meson.build @@ -49,6 +49,12 @@ if cuda_dep.found() libfabric_utils_cpp_args += ['-DHAVE_CUDA'] endif +# Add SynapseAI support if available (dependency is globally defined) +if synapseai_dep.found() + libfabric_utils_deps += [synapseai_dep] + libfabric_utils_cpp_args += ['-DHAVE_SYNAPSEAI'] +endif + # Create static library libfabric_utils_lib = static_library( 'nixl_libfabric_utils', diff --git a/test/meson.build b/test/meson.build index aa9fd3acb6..0b0ec96f9d 100644 --- a/test/meson.build +++ b/test/meson.build @@ -13,6 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +subdir('utils') + +# Re-export dependency to project root +test_synapseai_utils_dep = test_synapseai_utils_dep + subdir('nixl') subdir('unit') subdir('gtest') diff --git a/test/nixl/meson.build b/test/nixl/meson.build index bc6a9c9ffa..f8ac290c2f 100644 --- a/test/nixl/meson.build +++ b/test/nixl/meson.build @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. + desc_example = executable('desc_example', 'desc_example.cpp', dependencies: [nixl_dep, nixl_infra], @@ -27,10 +28,20 @@ agent_example = executable('agent_example', link_with: [serdes_lib], install: true) -nixl_test_app = executable('nixl_test', 'nixl_test.cpp', - dependencies: [nixl_dep, nixl_infra, stream_interface, thread_dep], - include_directories: [nixl_inc_dirs, utils_inc_dirs, '../../src/utils/serdes'], - link_with: [serdes_lib], install: true) +if synapseai_dep.found() + compile_flags += ['-DHAVE_SYNAPSEAI'] + nixl_test_app = executable('nixl_test', 'nixl_test.cpp', + cpp_args: compile_flags, + dependencies: [nixl_dep, nixl_infra, stream_interface, thread_dep, test_synapseai_utils_dep], + include_directories: [nixl_inc_dirs, utils_inc_dirs, '../../src/utils/serdes'], + link_with: [serdes_lib], install: true) +else + nixl_test_app = executable('nixl_test', 'nixl_test.cpp', + dependencies: [nixl_dep, nixl_infra, stream_interface, thread_dep], + include_directories: [nixl_inc_dirs, utils_inc_dirs, '../../src/utils/serdes'], + link_with: [serdes_lib], install: true) +endif + plugin_test = executable('test_plugin', 'test_plugin.cpp', diff --git a/test/nixl/nixl_test.cpp b/test/nixl/nixl_test.cpp index 03095c2cf3..8a8ed01364 100644 --- a/test/nixl/nixl_test.cpp +++ b/test/nixl/nixl_test.cpp @@ -27,7 +27,9 @@ #include "serdes/serdes.h" #include #include - +#ifdef HAVE_SYNAPSEAI +#include "synapseai_utils.h" +#endif #define NUM_TRANSFERS 2 #define NUM_THREADS 4 #define SIZE 1024 @@ -48,30 +50,39 @@ struct SharedNotificationState { static const std::string target("target"); static const std::string initiator("initiator"); -static std::vector> initMem(nixlAgent &agent, - nixl_reg_dlist_t &dram, - nixl_opt_args_t *extra_params, - uint8_t val) { +static std::vector> +initMem(nixlAgent &agent, nixl_reg_dlist_t &mem_dlist, nixl_opt_args_t *extra_params, uint8_t val) { std::vector> addrs; for (int i = 0; i < NUM_TRANSFERS; i++) { auto addr = std::make_unique(SIZE); - std::fill_n(addr.get(), SIZE, val); - std::cout << "Allocating : " << (void *)addr.get() << ", " - << "Setting to 0x" << std::hex << (unsigned)val << std::dec << std::endl; - dram.addDesc(nixlBlobDesc((uintptr_t)(addr.get()), SIZE, 0, "")); +#ifdef HAVE_SYNAPSEAI + auto device_buffer = Synapseaiutils::allocate_synapse_memory(SIZE, addr.get()); + + std::cout << "Allocating : " << addr.get() << ", " << "Setting to 0x" << std::hex + << (unsigned)val << std::dec << std::endl; + mem_dlist.addDesc(nixlBlobDesc( + (uintptr_t)(device_buffer), SIZE, Synapseaiutils::get_device_handle(), "")); +#else + mem_dlist.addDesc(nixlBlobDesc((uintptr_t)(addr.get()), SIZE, 0, "")); +#endif addrs.push_back(std::move(addr)); } - agent.registerMem(dram, extra_params); + agent.registerMem(mem_dlist, extra_params); return addrs; } -static void targetThread(nixlAgent &agent, nixl_opt_args_t *extra_params, int thread_id) { - nixl_reg_dlist_t dram_for_ucx(DRAM_SEG); - auto addrs = initMem(agent, dram_for_ucx, extra_params, 0); +static void +targetThread(nixlAgent &agent, nixl_opt_args_t *extra_params, int thread_id, std::string backend) { +#ifdef HAVE_SYNAPSEAI + nixl_reg_dlist_t mem_dlist(VRAM_SEG); +#else + nixl_reg_dlist_t mem_dlist(DRAM_SEG); +#endif + auto addrs = initMem(agent, mem_dlist, extra_params, 0); nixl_blob_t tgt_metadata; agent.getLocalMD(tgt_metadata); @@ -79,47 +90,68 @@ static void targetThread(nixlAgent &agent, nixl_opt_args_t *extra_params, int th std::cout << "Thread " << thread_id << " Start Control Path metadata exchanges\n"; std::cout << "Thread " << thread_id << " Desc List from Target to Initiator\n"; - dram_for_ucx.print(); + mem_dlist.print(); /** Only send desc list */ nixlSerDes serdes; - assert(dram_for_ucx.trim().serialize(&serdes) == NIXL_SUCCESS); + assert(mem_dlist.trim().serialize(&serdes) == NIXL_SUCCESS); std::cout << "Thread " << thread_id << " Wait for initiator and then send xfer descs\n"; std::string message = serdes.exportStr(); - while (agent.genNotif(initiator, message, extra_params) != NIXL_SUCCESS); - std::cout << "Thread " << thread_id << " End Control Path metadata exchanges\n"; + + while (agent.genNotif(initiator, message, extra_params) != NIXL_SUCCESS) + ; std::cout << "Thread " << thread_id << " Start Data Path Exchanges\n"; std::cout << "Thread " << thread_id << " Waiting to receive Data from Initiator\n"; bool rc = false; for (int n_tries = 0; !rc && n_tries < 100; n_tries++) { - //Only works with progress thread now, as backend is protected + // Only works with progress thread now, as backend is protected /** Sanity Check */ +#ifdef HAVE_SYNAPSEAI + for (int i = 0; i < mem_dlist.descCount(); ++i) { + nixlBlobDesc desc = mem_dlist[i]; + Synapseaiutils::copy_from_device_buffer((uint64_t)desc.addr, addrs[i].get(), desc.len); + } +#endif rc = std::all_of(addrs.begin(), addrs.end(), [](auto &addr) { - return std::all_of(addr.get(), addr.get() + SIZE, [](int x) { - return x == MEM_VAL; - }); + return std::all_of(addr.get(), addr.get() + SIZE, [](int x) { return x == MEM_VAL; }); }); - if (!rc) - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + if (!rc) std::this_thread::sleep_for(std::chrono::milliseconds(10)); } if (!rc) - std::cerr << "Thread " << thread_id << " UCX Transfer failed, buffers are different\n"; + std::cerr << "Thread " << thread_id << " " << backend + << " Transfer failed, buffers are different\n"; else - std::cout << "Thread " << thread_id << " Transfer completed and Buffers match with Initiator\n" - << "Thread " << thread_id << " UCX Transfer Success!!!\n"; + std::cout << "Thread " << thread_id + << " Transfer completed and Buffers match with Initiator\n" + << "Thread " << thread_id << " " << backend << " Transfer Success!!!\n"; std::cout << "Thread " << thread_id << " Cleanup..\n"; - agent.deregisterMem(dram_for_ucx, extra_params); + agent.deregisterMem(mem_dlist, extra_params); +#ifdef HAVE_SYNAPSEAI + for (int i = 0; i < mem_dlist.descCount(); ++i) { + nixlBlobDesc desc = mem_dlist[i]; + Synapseaiutils::free_synapse_memory((uint64_t)desc.addr); + } +#endif } -static void initiatorThread(nixlAgent &agent, nixl_opt_args_t *extra_params, - const std::string &target_ip, int target_port, int thread_id, - SharedNotificationState &shared_state) { - nixl_reg_dlist_t dram_for_ucx(DRAM_SEG); - auto addrs = initMem(agent, dram_for_ucx, extra_params, MEM_VAL); +static void +initiatorThread(nixlAgent &agent, + nixl_opt_args_t *extra_params, + const std::string &target_ip, + int target_port, + int thread_id, + SharedNotificationState &shared_state, + std::string backend) { +#ifdef HAVE_SYNAPSEAI + nixl_reg_dlist_t mem_dlist(VRAM_SEG); +#else + nixl_reg_dlist_t mem_dlist(DRAM_SEG); +#endif + auto addrs = initMem(agent, mem_dlist, extra_params, MEM_VAL); std::cout << "Thread " << thread_id << " Start Control Path metadata exchanges\n"; std::cout << "Thread " << thread_id << " Exchange metadata with Target\n"; @@ -163,13 +195,15 @@ static void initiatorThread(nixlAgent &agent, nixl_opt_args_t *extra_params, } std::cout << "Thread " << thread_id << " Verify Deserialized Target's Desc List at Initiator\n"; - nixl_xfer_dlist_t dram_target_ucx(&remote_serdes); - nixl_xfer_dlist_t dram_initiator_ucx = dram_for_ucx.trim(); - dram_target_ucx.print(); + + nixl_xfer_dlist_t xfer_target_dlist(&remote_serdes); + nixl_xfer_dlist_t xfer_initiator_dlist = mem_dlist.trim(); + xfer_target_dlist.print(); std::cout << "Thread " << thread_id << " End Control Path metadata exchanges\n"; std::cout << "Thread " << thread_id << " Start Data Path Exchanges\n\n"; - std::cout << "Thread " << thread_id << " Create transfer request with UCX backend\n"; + std::cout << "Thread " << thread_id << " Create transfer request with " << backend + << " backend\n"; // Need to do this in a loop with NIXL_ERR_NOT_FOUND // UCX AM with desc list is faster than listener thread can recv/load MD with sockets @@ -177,8 +211,8 @@ static void initiatorThread(nixlAgent &agent, nixl_opt_args_t *extra_params, nixlXferReqH *treq; nixl_status_t ret = NIXL_SUCCESS; do { - ret = agent.createXferReq(NIXL_WRITE, dram_initiator_ucx, dram_target_ucx, - target, treq, extra_params); + ret = agent.createXferReq( + NIXL_WRITE, xfer_initiator_dlist, xfer_target_dlist, target, treq, extra_params); } while (ret == NIXL_ERR_NOT_FOUND); if (ret != NIXL_SUCCESS) { @@ -186,7 +220,7 @@ static void initiatorThread(nixlAgent &agent, nixl_opt_args_t *extra_params, exit(-1); } - std::cout << "Thread " << thread_id << " Post the request with UCX backend\n"; + std::cout << "Thread " << thread_id << " Post the request with " << backend << " backend\n"; ret = agent.postXferReq(treq); std::cout << "Thread " << thread_id << " Initiator posted Data Path transfer\n"; std::cout << "Thread " << thread_id << " Waiting for completion\n"; @@ -195,88 +229,133 @@ static void initiatorThread(nixlAgent &agent, nixl_opt_args_t *extra_params, ret = agent.getXferStatus(treq); assert(ret >= 0); } - std::cout << "Thread " << thread_id << " Completed Sending Data using UCX backend\n"; + std::cout << "Thread " << thread_id << " Completed Sending Data using " << backend + << " backend\n"; agent.releaseXferReq(treq); agent.invalidateLocalMD(&md_extra_params); std::cout << "Thread " << thread_id << " Cleanup..\n"; - agent.deregisterMem(dram_for_ucx, extra_params); + agent.deregisterMem(mem_dlist, extra_params); +#ifdef HAVE_SYNAPSEAI + for (int i = 0; i < mem_dlist.descCount(); ++i) { + nixlBlobDesc desc = mem_dlist[i]; + Synapseaiutils::free_synapse_memory((uint64_t)desc.addr); + } +#endif } -static void runTarget(const std::string &ip, int port, nixl_thread_sync_t sync_mode) { +static void +runTarget(const std::string &ip, int port, nixl_thread_sync_t sync_mode, std::string backend) { nixlAgentConfig cfg(true, true, port, sync_mode, 1, 0, 100000, false); + std::cout << "Starting Agent for target\n"; nixlAgent agent(target, cfg); nixl_b_params_t params = { - { "num_workers", "4" }, + {"num_workers", "4"}, }; - nixlBackendH *ucx; - agent.createBackend("UCX", params, ucx); + nixlBackendH *nixl_backend; + agent.createBackend(backend, params, nixl_backend); + +#ifdef HAVE_SYNAPSEAI + Synapseaiutils::init_synapse_device(); +#endif nixl_opt_args_t extra_params; - extra_params.backends.push_back(ucx); + extra_params.backends.push_back(nixl_backend); std::vector threads; for (int i = 0; i < NUM_THREADS; i++) - threads.emplace_back(targetThread, std::ref(agent), &extra_params, i); + threads.emplace_back(targetThread, std::ref(agent), &extra_params, i, backend); for (auto &thread : threads) thread.join(); +#ifdef HAVE_SYNAPSEAI + Synapseaiutils::deinit_synapse_device(); +#endif } -static void runInitiator(const std::string &target_ip, int target_port, nixl_thread_sync_t sync_mode) { +static void +runInitiator(const std::string &target_ip, + int target_port, + nixl_thread_sync_t sync_mode, + std::string backend) { nixlAgentConfig cfg(true, true, 0, sync_mode, 1, 0, 100000, false); std::cout << "Starting Agent for initiator\n"; nixlAgent agent(initiator, cfg); nixl_b_params_t params = { - { "num_workers", "4" }, + {"num_workers", "4"}, }; - nixlBackendH *ucx; - agent.createBackend("UCX", params, ucx); + nixlBackendH *nixl_backend; + agent.createBackend(backend, params, nixl_backend); + +#ifdef HAVE_SYNAPSEAI + Synapseaiutils::init_synapse_device(); +#endif nixl_opt_args_t extra_params; - extra_params.backends.push_back(ucx); + extra_params.backends.push_back(nixl_backend); SharedNotificationState shared_state; std::vector threads; for (int i = 0; i < NUM_THREADS; i++) - threads.emplace_back(initiatorThread, std::ref(agent), &extra_params, - target_ip, target_port, i, std::ref(shared_state)); + threads.emplace_back(initiatorThread, + std::ref(agent), + &extra_params, + target_ip, + target_port, + i, + std::ref(shared_state), + backend); for (auto &thread : threads) thread.join(); + +#ifdef HAVE_SYNAPSEAI + Synapseaiutils::deinit_synapse_device(); +#endif } -int main(int argc, char *argv[]) { +int +main(int argc, char *argv[]) { /** Argument Parsing */ if (argc < 4) { - std::cout <<"Enter the required arguments\n" << std::endl; - std::cout <<" " <<" " - << std::endl; + std::cout << "Enter the required arguments\n" << std::endl; + std::cout << " " << " " << std::endl; exit(-1); } std::string role = std::string(argv[1]); - const char *target_ip = argv[2]; - int target_port = std::stoi(argv[3]); + const char *target_ip = argv[2]; + int target_port = std::stoi(argv[3]); std::transform(role.begin(), role.end(), role.begin(), ::tolower); if (!role.compare(initiator) && !role.compare(target)) { - std::cerr << "Invalid role. Use 'initiator' or 'target'." - << "Currently "<< role < +#include +#include +#include +#include + +#include "libfabric_backend.h" +#include "common/nixl_log.h" + +using namespace std; + +nixlLibfabricEngine * +createEngine(std::string name, bool p_thread) { + nixlBackendInitParams init; + nixl_b_params_t custom_params; + + init.enableProgTh = p_thread; + init.pthrDelay = 100; + init.localAgent = name; + init.customParams = &custom_params; + init.type = "LIBFABRIC"; + + auto engine = new nixlLibfabricEngine(&init); + assert(!engine->getInitErr()); + if (engine->getInitErr()) { + std::cout << "Failed to initialize libfabric engine" << std::endl; + exit(1); + } + + return engine; +} + +void +releaseEngine(nixlLibfabricEngine *engine) { + delete engine; +} + +void +allocateAndRegister(nixlLibfabricEngine *engine, + int dev_id, + nixl_mem_t mem_type, + void *&addr, + size_t len, + nixlBackendMD *&md) { + nixlBlobDesc desc; + + // Allocate buffer + addr = calloc(1, len); + assert(addr != nullptr); + + desc.addr = (uintptr_t)addr; + desc.len = len; + desc.devId = dev_id; + + int ret = engine->registerMem(desc, mem_type, md); + assert(ret == NIXL_SUCCESS); +} + +void +deallocateAndDeregister(nixlLibfabricEngine *engine, + int dev_id, + nixl_mem_t mem_type, + void *&addr, + nixlBackendMD *&md) { + engine->deregisterMem(md); + free(addr); +} + +void +loadRemote(nixlLibfabricEngine *engine, + int dev_id, + std::string agent, + nixl_mem_t mem_type, + void *addr, + size_t len, + nixlBackendMD *&lmd, + nixlBackendMD *&rmd) { + nixlBlobDesc info; + info.addr = (uintptr_t)addr; + info.len = len; + info.devId = dev_id; + engine->getPublicData(lmd, info.metaInfo); + + assert(info.metaInfo.size() > 0); + + int ret = engine->loadRemoteMD(info, mem_type, agent, rmd); + assert(NIXL_SUCCESS == ret); +} + +void +populateDescs(nixl_meta_dlist_t &descs, int dev_id, void *addr, int desc_cnt, size_t desc_size, + nixlBackendMD *&md) { + for (int i = 0; i < desc_cnt; i++) { + nixlMetaDesc req; + req.addr = (uintptr_t)(((char *)addr) + i * desc_size); // Different offset per descriptor + req.len = desc_size; + req.devId = dev_id; + req.metadataP = md; + descs.addDesc(req); + } +} + +void +performTransfer(nixlLibfabricEngine *engine1, + nixlLibfabricEngine *engine2, + nixl_meta_dlist_t &req_src_descs, + nixl_meta_dlist_t &req_dst_descs, + void *addr1, + void *addr2, + size_t total_len, + nixl_xfer_op_t op) { + + std::string remote_agent("Agent2"); + if (engine1 == engine2) + remote_agent = "Agent1"; + + std::cout << "\t" << (op == NIXL_READ ? "READ" : "WRITE") << " from " << addr1 << " to " + << addr2 << " (" << total_len << " bytes, " << req_src_descs.descCount() + << " descriptors)\n"; + + nixl_opt_b_args_t opt_args; + opt_args.hasNotif = false; + + // Prepare and post transfer + nixlBackendReqH *handle = nullptr; + nixl_status_t ret = engine1->prepXfer(op, req_src_descs, req_dst_descs, remote_agent, handle, &opt_args); + assert(ret == NIXL_SUCCESS); + + ret = engine1->postXfer(op, req_src_descs, req_dst_descs, remote_agent, handle, &opt_args); + assert(ret == NIXL_SUCCESS || ret == NIXL_IN_PROG); + + if (ret == NIXL_SUCCESS) { + cout << "\t\tTransfer completed immediately\n"; + } else { + cout << "\t\tWaiting for transfer completion...\n"; + while (ret == NIXL_IN_PROG) { + ret = engine1->checkXfer(handle); + // checkXfer() already progresses rails when progress thread is disabled + assert(ret == NIXL_SUCCESS || ret == NIXL_IN_PROG); + } + } + + engine1->releaseReqH(handle); + cout << "\t\tTransfer complete\n"; +} + +void +test_multi_descriptor_offsets(bool p_thread) { + std::cout << "\n\n"; + std::cout << "****************************************************\n"; + std::cout << " Multi-descriptor offset test (Integration)\n"; + std::cout << " P-Thread=" << (p_thread ? "ON" : "OFF") << "\n"; + std::cout << "****************************************************\n"; + std::cout << "\n"; + + std::string agent1("Agent1"); + std::string agent2("Agent2"); + + // Create engines + nixlLibfabricEngine *engine1 = createEngine(agent1, p_thread); + nixlLibfabricEngine *engine2 = createEngine(agent2, p_thread); + + // Test parameters + const size_t TOTAL_SIZE = 1024 * 1024; // 1MB total + const size_t DESC_SIZE = 64 * 1024; // 64KB per descriptor + const int DESC_COUNT = TOTAL_SIZE / DESC_SIZE; // 16 descriptors + + std::cout << "Test configuration:\n"; + std::cout << " Total buffer size: " << TOTAL_SIZE << " bytes\n"; + std::cout << " Descriptor size: " << DESC_SIZE << " bytes\n"; + std::cout << " Descriptor count: " << DESC_COUNT << "\n\n"; + + // Allocate and register buffers + void *send_buf = nullptr; + void *recv_buf = nullptr; + nixlBackendMD *send_md = nullptr; + nixlBackendMD *recv_md = nullptr; + + allocateAndRegister(engine1, 0, DRAM_SEG, send_buf, TOTAL_SIZE, send_md); + allocateAndRegister(engine2, 0, DRAM_SEG, recv_buf, TOTAL_SIZE, recv_md); + + // Fill send buffer with unique pattern for each descriptor's region + for (int i = 0; i < DESC_COUNT; i++) { + size_t offset = i * DESC_SIZE; + uint8_t pattern = static_cast(i); + for (size_t j = 0; j < DESC_SIZE; j++) { + ((uint8_t *)send_buf)[offset + j] = pattern; + } + } + + // Zero receive buffer + memset(recv_buf, 0, TOTAL_SIZE); + + // Exchange connection info + std::string conn1, conn2; + engine1->getConnInfo(conn1); + engine2->getConnInfo(conn2); + + engine1->loadRemoteConnInfo(agent2, conn2); + engine2->loadRemoteConnInfo(agent1, conn1); + + std::cout << "Establishing connections...\n"; + engine1->connect(agent2); + engine2->connect(agent1); + + // Wait for async connection establishment to complete + // The CM thread handles connection progress + sleep(2); + std::cout << "Connections established\n\n"; + + // Load remote metadata + nixlBackendMD *recv_rmd = nullptr; + + loadRemote(engine1, 0, agent2, DRAM_SEG, recv_buf, TOTAL_SIZE, recv_md, recv_rmd); + + // Create descriptor lists with different offsets + nixl_meta_dlist_t src_descs(DRAM_SEG); + nixl_meta_dlist_t dst_descs(DRAM_SEG); + + populateDescs(src_descs, 0, send_buf, DESC_COUNT, DESC_SIZE, send_md); + populateDescs(dst_descs, 0, recv_buf, DESC_COUNT, DESC_SIZE, recv_rmd); + + std::cout << "Created " << src_descs.descCount() << " source descriptors\n"; + std::cout << "Created " << dst_descs.descCount() << " destination descriptors\n\n"; + + // Perform transfer + performTransfer(engine1, engine2, src_descs, dst_descs, send_buf, recv_buf, TOTAL_SIZE, NIXL_WRITE); + + // Verify data correctness for each descriptor's region + std::cout << "\nData verification:\n"; + bool all_correct = true; + + for (int i = 0; i < DESC_COUNT; i++) { + size_t offset = i * DESC_SIZE; + uint8_t expected_pattern = static_cast(i); + bool desc_correct = true; + + for (size_t j = 0; j < DESC_SIZE; j++) { + if (((uint8_t *)recv_buf)[offset + j] != expected_pattern) { + std::cerr << " ERROR: Descriptor " << i << " at offset " << offset + j + << " has wrong data: expected " << (int)expected_pattern << ", got " + << (int)((uint8_t *)recv_buf)[offset + j] << "\n"; + desc_correct = false; + all_correct = false; + break; // Only report first mismatch per descriptor + } + } + + if (desc_correct) { + std::cout << " Descriptor " << i << " (offset " << offset << "): OK (pattern " + << (int)expected_pattern << ")\n"; + } + } + + if (all_correct) { + std::cout << "\n✓ ALL DESCRIPTORS VERIFIED SUCCESSFULLY\n"; + std::cout << " Each descriptor transferred data from its correct offset\n"; + } else { + std::cerr << "\n✗ DATA CORRUPTION DETECTED\n"; + std::cerr << " Some descriptors received data from wrong offsets\n"; + std::cerr << " This indicates the descriptor offset bug is present!\n"; + exit(1); + } + + // Cleanup + engine1->disconnect(agent2); + engine2->disconnect(agent1); + + deallocateAndDeregister(engine1, 0, DRAM_SEG, send_buf, send_md); + deallocateAndDeregister(engine2, 0, DRAM_SEG, recv_buf, recv_md); + + releaseEngine(engine1); + releaseEngine(engine2); + + std::cout << "\nTest completed successfully!\n"; +} + +int +main(int argc, char **argv) { + bool p_thread = false; + + if (argc > 1 && std::string(argv[1]) == "--pthread") { + p_thread = true; + } + + test_multi_descriptor_offsets(p_thread); + + return 0; +} diff --git a/test/unit/plugins/libfabric/meson.build b/test/unit/plugins/libfabric/meson.build new file mode 100644 index 0000000000..0abcefe4bb --- /dev/null +++ b/test/unit/plugins/libfabric/meson.build @@ -0,0 +1,69 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# Libfabric plugin integration tests + +libfabric_backend_dep = declare_dependency( + link_with: libfabric_backend_lib, + include_directories: [nixl_inc_dirs, '../../../../src/plugins/libfabric'] +) + +# Collect compile flags +compile_flags = [] +additional_deps = [] + +if cuda_dep.found() + additional_deps += [cuda_dep] + compile_flags += ['-DHAVE_CUDA'] +endif + +if synapseai_dep.found() + additional_deps += [synapseai_dep] + compile_flags += ['-DHAVE_SYNAPSEAI'] +endif + +libfabric_test_deps = [ + nixl_dep, + nixl_infra, + nixl_common_deps, + libfabric_utils_dep, + libfabric_backend_dep, + libfabric_dep, + thread_dep, +] + additional_deps + +# Integration test: Multi-descriptor offset handling with actual backend +test_backend_integration = executable( + 'test_libfabric_backend_integration', + 'libfabric_backend_integration_test.cpp', + dependencies: libfabric_test_deps, + cpp_args: compile_flags, + include_directories: [nixl_inc_dirs, utils_inc_dirs, '../../../../src/plugins/libfabric'], + install: false, +) + +test('libfabric_backend_integration', + test_backend_integration, + suite: ['unit', 'plugins', 'libfabric', 'integration'], + timeout: 120, +) + +# Future integration tests can be added here following the same pattern: +# - Multi-GPU transfers +# - Different memory types (DRAM, VRAM) +# - Notification flow +# - Connection management diff --git a/test/unit/plugins/meson.build b/test/unit/plugins/meson.build index af5aa09378..0f30c29c3a 100644 --- a/test/unit/plugins/meson.build +++ b/test/unit/plugins/meson.build @@ -1,4 +1,6 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2025 Amazon.com, Inc. and affiliates. +# SPDX-FileCopyrightText: Copyright (c) 2025 Intel Corporation. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -47,3 +49,8 @@ hf3fs_lib_found = cc.find_library(hf3fs_lib_file, dirs: [hf3fs_lib_path], requir if hf3fs_lib_found.found() subdir('hf3fs') endif + +# Libfabric plugin tests +if libfabric_dep.found() + subdir('libfabric') +endif diff --git a/test/unit/utils/libfabric/libfabric_topology_test.cpp b/test/unit/utils/libfabric/libfabric_topology_test.cpp index 1352cf588e..33ae616703 100644 --- a/test/unit/utils/libfabric/libfabric_topology_test.cpp +++ b/test/unit/utils/libfabric/libfabric_topology_test.cpp @@ -44,14 +44,14 @@ main() { NIXL_INFO << "3. Testing GPU-specific queries (detected " << num_gpus << " GPUs)..."; int test_gpus = std::min(num_gpus, 3); // Test up to 3 GPUs or all available for (int gpu_id = 0; gpu_id < test_gpus; ++gpu_id) { - auto gpu_devices = topology.getEfaDevicesForGpu(gpu_id); + auto gpu_devices = topology.getNicsForGpu(gpu_id); std::string device_list; for (const auto &device : gpu_devices) { if (!device_list.empty()) device_list += " "; device_list += device; } NIXL_INFO << " GPU " << gpu_id << " mapped to " << gpu_devices.size() - << " EFA devices: " << device_list; + << " devices: " << device_list; } } else { NIXL_INFO << "3. Skipping GPU-specific tests (no GPUs detected)"; diff --git a/test/utils/meson.build b/test/utils/meson.build new file mode 100644 index 0000000000..702cdd06a0 --- /dev/null +++ b/test/utils/meson.build @@ -0,0 +1,34 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright 2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +utils_inc = include_directories('.') + +synapseai_utils_lib = static_library( + 'synapseai_utils', + ['synapseai_utils.cpp'], + include_directories: utils_inc, + dependencies: [ synapseai_dep ], +) + +synapseai_utils_dep = declare_dependency( + link_with: synapseai_utils_lib, + include_directories: utils_inc, + dependencies: [ synapseai_dep ], +) + +# Export to parent (test/) scope +test_synapseai_utils_dep = synapseai_utils_dep + diff --git a/test/utils/synapseai_utils.cpp b/test/utils/synapseai_utils.cpp new file mode 100644 index 0000000000..2909888a07 --- /dev/null +++ b/test/utils/synapseai_utils.cpp @@ -0,0 +1,110 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright 2025 Intel Corporation + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include "synapseai_utils.h" + +static bool device_initialized = false; +static std::mutex mtx; +static synDeviceId deviceHandle; +static synStreamHandle stream; + +namespace Synapseaiutils { +static void +check(int ret, const char *msg) { + if (ret) { + fprintf(stderr, "%s: %s(%d)\n", msg, "failed", -ret); + exit(1); + } +} + +int +init_synapse_device() { + std::lock_guard lock(mtx); + auto env = std::getenv("HLS_MODULE_ID"); + int module_id = 0; + if (env != nullptr) { + module_id = std::stoi(env); + } + if (device_initialized) return 0; + check(synInitialize(), "synInitialize"); + check(synDeviceAcquireByModuleId(&deviceHandle, module_id), "synDeviceAcquire"); + device_initialized = true; + check(synStreamCreateGeneric(&stream, deviceHandle, 0), "synStreamCreateGeneric"); + return 0; +} + +synDeviceId +get_device_handle() { + return deviceHandle; +} + +uint64_t +allocate_synapse_memory(size_t len, void *host_buffer) { + uint64_t device_buffer; + std::lock_guard lock(mtx); + if (!device_initialized) { + fprintf(stderr, "%s\n", "device nor initialized"); + exit(1); + } + + check(synDeviceMalloc(deviceHandle, len, 0x0, 0, &device_buffer), "synDeviceMalloc"); + check(synHostMap(deviceHandle, len, host_buffer), "synHostMap"); + check(synMemCopyAsync(stream, (uint64_t)host_buffer, len, device_buffer, HOST_TO_DRAM), + "synMemCopyAsync"); + check(synStreamSynchronize(stream), "synStreamSynchronize"); + check(synHostUnmap(deviceHandle, host_buffer), "synHostUnmap"); + return device_buffer; +} + +void +free_synapse_memory(uint64_t ptr) { + std::lock_guard lock(mtx); + if (!device_initialized) fprintf(stderr, "%s\n", "device nor initialized"); + // cleanup Synapse resources + check(synDeviceFree(deviceHandle, ptr, 0), "synDeviceFree"); +} + +void +deinit_synapse_device() { + std::lock_guard lock(mtx); + if (!device_initialized) { + fprintf(stderr, "%s\n", "device nor initialized"); + exit(1); + } + check(synStreamDestroy(stream), "synStreamDestroy"); + check(synDeviceRelease(deviceHandle), "synDeviceRelease"); + check(synDestroy(), "synDestroy"); + device_initialized = false; +} + +void +copy_from_device_buffer(uint64_t device_buffer, void *host_buffer, size_t len) { + std::lock_guard lock(mtx); + if (!device_initialized) { + fprintf(stderr, "%s\n", "device nor initialized"); + exit(1); + } + check(synHostMap(deviceHandle, len, host_buffer), "synHostMap"); + check(synMemCopyAsync(stream, device_buffer, len, (uint64_t)host_buffer, DRAM_TO_HOST), + "synMemCopyAsync"); + check(synStreamSynchronize(stream), "synStreamSynchronize"); + check(synHostUnmap(deviceHandle, host_buffer), "synHostUnmap"); +} +} // namespace Synapseaiutils diff --git a/test/utils/synapseai_utils.h b/test/utils/synapseai_utils.h new file mode 100644 index 0000000000..d9cffa6616 --- /dev/null +++ b/test/utils/synapseai_utils.h @@ -0,0 +1,34 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright 2025 Intel Corporation + * + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "synapse_api.h" + +namespace Synapseaiutils { +int +init_synapse_device(); +synDeviceId +get_device_handle(); +uint64_t +allocate_synapse_memory(size_t len, void *host_buffer); +void +free_synapse_memory(uint64_t ptr); +void +deinit_synapse_device(); +void +copy_from_device_buffer(uint64_t device_buffer, void *host_buffer, size_t len); +} // namespace Synapseaiutils