Intercept p2p_nccl_connector #100

Open
leoda1 wants to merge 4 commits into flagos-ai:main from leoda1:fix_ds32_bug

Conversation


@leoda1 leoda1 commented Mar 20, 2026

PR Category

PR Type

Description

Related Issues

Changes

Testing

Checklist

  • I have run the existing tests and they pass
  • I have added tests for my changes (if applicable)
  • I have updated the documentation (if applicable)

Copilot AI review requested due to automatic review settings March 20, 2026 10:02
@github-actions github-actions bot added the core label Mar 20, 2026

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: c613682c17

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +167 to +168
else:
    layer[block_ids[:num_block], ...] = kv_cache

P1 Badge Slice KV cache to overlap before partial injection

When block counts differ, this branch only truncates block_ids but still assigns the full kv_cache. If recv_tensor returns more blocks than were allocated locally (num_block > len(block_ids)), the left-hand side selects fewer rows than the right-hand side provides, causing a runtime shape-mismatch error in the mismatch path that is supposed to be recoverable. Compute an overlap length and slice both the indices and kv_cache (the FlashAttention mismatch branch has the same problem).
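A minimal sketch of the overlap slicing the review suggests (shapes are illustrative, and `inject_kv_partial` is a hypothetical helper rather than code from the PR):

```python
import torch

def inject_kv_partial(layer: torch.Tensor,
                      block_ids: torch.Tensor,
                      kv_cache: torch.Tensor) -> int:
    """Inject received KV blocks, tolerating a block-count mismatch."""
    num_block = kv_cache.shape[0]
    # Overlap between locally allocated blocks and received blocks.
    n = min(len(block_ids), num_block)
    # Slice BOTH sides so the assignment shapes always match,
    # even when recv returned more blocks than were allocated.
    layer[block_ids[:n], ...] = kv_cache[:n]
    return n

layer = torch.zeros(8, 4)
block_ids = torch.tensor([1, 2], dtype=torch.long)
kv_cache = torch.ones(3, 4)  # more received blocks than allocated locally
n = inject_kv_partial(layer, block_ids, kv_cache)
```

With the one-sided truncation from the diff, this call would raise a shape-mismatch error; slicing both sides turns it into a recoverable partial injection.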


Comment on lines +193 to +197
metadata: KVConnectorMetadata = self._get_connector_metadata()
assert isinstance(metadata, P2pNcclConnectorMetadata)

if metadata is None:
    return

P2 Badge Return on missing metadata before asserting type

This asserts P2pNcclConnectorMetadata before checking whether metadata is None, so any step where _get_connector_metadata() has no payload will raise AssertionError instead of no-oping. That makes idle/no-transfer iterations fragile and also renders the subsequent if metadata is None branch unreachable.
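The safe ordering can be sketched in isolation (the class below is a stub standing in for the real metadata type):

```python
class P2pNcclConnectorMetadata:
    """Stub standing in for the real connector metadata class."""
    pass

def handle_step(metadata):
    # Check the no-payload case first so idle/no-transfer steps no-op...
    if metadata is None:
        return "no-op"
    # ...then narrow the type; this assert only fires on a genuine bug.
    assert isinstance(metadata, P2pNcclConnectorMetadata)
    return "transfer"

idle = handle_step(None)
busy = handle_step(P2pNcclConnectorMetadata())
```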



Copilot AI left a comment


Pull request overview

Adds a vllm-fl override implementation of the vLLM P2pNcclConnector and registers it at plugin initialization time, so KV-transfer over NCCL can be intercepted/customized by this plugin.

Changes:

  • Introduce P2pNcclConnector / metadata types under vllm_fl.distributed.kv_transfer.
  • Implement worker-side KV recv/injection and producer-side KV extract/send logic using P2pNcclEngine.
  • Override vLLM’s connector registration to point "P2pNcclConnector" at the plugin implementation.

Reviewed changes

Copilot reviewed 2 out of 3 changed files in this pull request and generated 7 comments.

File Description
vllm_fl/distributed/kv_transfer/p2p_nccl_connector.py New connector implementation providing send/recv + metadata build logic for KV transfer.
vllm_fl/distributed/kv_transfer/__init__.py Package init file to expose the new kv_transfer module path.
vllm_fl/__init__.py Registers the plugin connector by overriding the vLLM connector factory registry entry.


)
except Exception as e:
    logger.error(f"Register GlmMoeDsa model error: {str(e)}")

Copilot AI Mar 20, 2026

There’s trailing whitespace on this blank line; please remove it to avoid noisy diffs / formatting issues.

Comment on lines +118 to +120
from vllm.distributed.kv_transfer.kv_connector.factory import KVConnectorFactory
KVConnectorFactory._registry.pop("P2pNcclConnector", None)
KVConnectorFactory.register_connector(
Copilot AI Mar 20, 2026

This directly mutates KVConnectorFactory._registry, which is a private implementation detail of vLLM and may break across vLLM versions. Prefer using a public unregister/override API if available, or update register_connector(...) usage so it can safely overwrite an existing registration without reaching into _registry.
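One overwrite-safe pattern, sketched against a hypothetical factory (vLLM's actual `KVConnectorFactory` internals may differ across versions, which is exactly why the fallback is isolated in one place):

```python
# Hypothetical stand-in for a connector factory; the real vLLM class may differ.
class ConnectorFactory:
    _registry: dict = {}

    @classmethod
    def register_connector(cls, name: str, path: str) -> None:
        if name in cls._registry:
            raise ValueError(f"{name} already registered")
        cls._registry[name] = path

def override_connector(factory, name: str, path: str) -> None:
    # Prefer the public registration API; only reach into the private
    # registry if the factory refuses to overwrite an existing entry.
    try:
        factory.register_connector(name, path)
    except ValueError:
        factory._registry.pop(name, None)
        factory.register_connector(name, path)

ConnectorFactory.register_connector("P2pNcclConnector", "vllm.builtin.path")
override_connector(ConnectorFactory, "P2pNcclConnector", "vllm_fl.plugin.path")
```

Confining the `_registry` access to a single helper keeps the version-fragile code in one auditable spot.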

Comment on lines +43 to +51
def make_meta(
    request_id: str, token_ids: list[int], block_ids: list[int], block_size: int
) -> "ReqMeta":
    block_ids_tensor = torch.tensor(block_ids)
    return ReqMeta(
        request_id=request_id,
        block_ids=block_ids_tensor,
        num_tokens=len(token_ids),
    )
Copilot AI Mar 20, 2026

ReqMeta.make_meta() takes block_size but never uses it, and torch.tensor(block_ids) will infer dtype (empty list becomes float). Consider removing block_size or using it, and explicitly create block_ids_tensor with an integer index dtype (e.g., torch.long).
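A sketch of the explicit-dtype fix (`make_block_ids_tensor` is a hypothetical helper illustrating the point, not the PR's code):

```python
import torch

def make_block_ids_tensor(block_ids: list[int]) -> torch.Tensor:
    # Explicit integer dtype: torch.tensor([]) would otherwise infer
    # float32, which cannot be used as an index tensor later on.
    return torch.tensor(block_ids, dtype=torch.long)

empty = make_block_ids_tensor([])
full = make_block_ids_tensor([3, 1])
inferred = torch.tensor([])  # demonstrates the float inference pitfall
```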

Comment on lines +165 to +175
if len(block_ids) == num_block:
    layer[block_ids, ...] = kv_cache
else:
    layer[block_ids[:num_block], ...] = kv_cache
    logger.warning(
        "🚧kv_cache does not match, block_ids:%d, "
        "num_block:%d, request_id:%s",
        len(block_ids),
        num_block,
        request_id,
    )
Copilot AI Mar 20, 2026

The mismatch handling here assumes len(block_ids) >= num_block and will raise a runtime error if kv_cache contains more blocks than block_ids (shape mismatch on assignment). Use n = min(len(block_ids), num_block) and slice both block_ids and kv_cache to n before assigning (and log the truncation).

Comment on lines +179 to +190
self.check_tensors_except_dim(layer, kv_cache, 1)
if len(block_ids) == num_block:
    layer[:, block_ids, ...] = kv_cache
else:
    layer[:, block_ids[:num_block], ...] = kv_cache
    logger.warning(
        "🚧kv_cache does not match, block_ids:%d, "
        "num_block:%d, request_id:%s",
        len(block_ids),
        num_block,
        request_id,
    )
Copilot AI Mar 20, 2026

Same issue as the MLA/FlashInfer branch: if kv_cache.shape[1] > len(block_ids), the current assignment will fail due to shape mismatch. Slice both the indices and kv_cache to the shared minimum length before assigning.

Comment on lines +194 to +197
assert isinstance(metadata, P2pNcclConnectorMetadata)

if metadata is None:
    return
Copilot AI Mar 20, 2026

metadata is asserted to be a P2pNcclConnectorMetadata before checking for None, which makes the if metadata is None: branch unreachable. Either check for None before the isinstance assert, or drop the None check entirely.

Suggested change

Before:
    assert isinstance(metadata, P2pNcclConnectorMetadata)

    if metadata is None:
        return

After:
    if metadata is None:
        return

    assert isinstance(metadata, P2pNcclConnectorMetadata)

Comment on lines +267 to +294
def extract_kv_from_layer(
    layer: torch.Tensor,
    block_ids: torch.Tensor,
) -> torch.Tensor:
    """
    Extract KV cache slices from a given attention layer tensor.

    This function handles multiple backend layouts:
    - MLA (Multi-head Latent Attention) or FlashInfer: KV tensors are
      indexed along the first dimension.
    - FlashAttention: KV tensors are indexed along the second
      dimension.

    Args:
        layer (torch.Tensor): The KV cache from the attention layer.
        block_ids (torch.Tensor): Indices of blocks to extract.

    Returns:
        torch.Tensor: A tensor containing the extracted KV slices.
            Returns None if the layout is unsupported.
    """
    if layer.ndim == 3 or layer.shape[1] == 2:  # MLA or FlashInfer
        return layer[block_ids, ...]

    if layer.shape[0] == 2:  # FlashAttention
        return layer[:, block_ids, ...]

    return None
Copilot AI Mar 20, 2026

extract_kv_from_layer is annotated to return torch.Tensor, but it can return None for unsupported layouts. Update the return annotation to torch.Tensor | None (and consider reflecting that in the docstring’s Returns section).



@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 26ba2bc6ae


    return None

connector_metadata = self._get_connector_metadata()
assert isinstance(connector_metadata, P2pNcclConnectorMetadata)

P1 Badge Guard save path when connector metadata is missing

save_kv_layer asserts P2pNcclConnectorMetadata immediately after _get_connector_metadata() without handling the no-metadata case, so producer-side idle steps can raise AssertionError instead of no-oping. The same file already checks for None in start_load_kv, which indicates _get_connector_metadata() can legitimately be absent for a step; this assertion can therefore crash normal iterations where no KV transfer is scheduled.


    continue

remote_address, message = router_socket.recv_multipart()
data = msgpack.loads(message)

P2 Badge Handle malformed register frames without killing discovery

The discovery listener parses every incoming frame with msgpack.loads but does not catch decode errors, so a single malformed or partial registration packet will terminate the background thread. Once that thread dies, instance heartbeats stop being processed and entries age out via TTL, after which routing degrades to persistent 503 responses even though workers may still be running.
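A sketch of the decode guard the review asks for; `json` stands in for `msgpack` so the snippet is self-contained, and `handle_frames` is a hypothetical helper, not the engine's actual listener loop:

```python
import json
import logging

logger = logging.getLogger(__name__)

def handle_frames(frames):
    """Process registration frames, surviving malformed packets."""
    handled = []
    for message in frames:
        try:
            data = json.loads(message)  # msgpack.loads in the real listener
        except ValueError:
            # Log and continue so one bad packet cannot kill the
            # discovery thread and starve heartbeat processing.
            logger.warning("dropping malformed register frame")
            continue
        handled.append(data)
    return handled

out = handle_frames([b'{"role": "worker"}', b"\xff\xfe", b'{"role": "router"}'])
```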


@cyber-pioneer (Collaborator)

add usage doc both in PR description and repo

auth_header = request.headers.get("Authorization")
is_chat_request = _is_chat_request_path(request.path)
include_reasoning = bool(original_request_data.get("include_reasoning", True))
thinking_mode = "chat"

Check warning

Code scanning / CodeQL

Variable defined multiple times

This assignment to 'thinking_mode' is unnecessary as it is redefined before this value is used.
Copilot AI review requested due to automatic review settings March 24, 2026 13:08

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: b50c95615f


import os
import logging
from vllm_fl.utils import get_op_config as _get_op_config
# from vllm_fl.utils import get_op_config as _get_op_config

P1 Badge Restore get_op_config import used by register()

This line comments out the _get_op_config import, but register() still unconditionally calls _get_op_config() later in the same file. As soon as vllm_fl.register() is invoked, it will raise NameError and abort plugin/platform initialization instead of returning the platform class.


)

self.p2p_nccl_engine = (
    P2pNcclEngine(

P1 Badge Instantiate P2pFlagcxEngine in intercepted connector

The connector registered as P2pNcclConnector still constructs P2pNcclEngine here, so the newly added P2pFlagcxEngine is never used. In deployments expecting FlagCX (or lacking NCCL), this intercepted path will still initialize NCCL and fail KV transfer setup, which breaks the intended backend swap.



Copilot AI left a comment


Pull request overview

Copilot reviewed 4 out of 5 changed files in this pull request and generated 6 comments.

Comments suppressed due to low confidence (1)

vllm_fl/__init__.py:41

  • The import of _get_op_config was commented out, but register() still calls _get_op_config() below. This will raise a NameError when the platform plugin is loaded. Either restore the import or remove/guard the call (e.g., only call if the import succeeds).
import os
import logging
# from vllm_fl.utils import get_op_config as _get_op_config


logger = logging.getLogger(__name__)


def __getattr__(name):
    if name == "distributed":
        import importlib
        module = importlib.import_module(f".{name}", __name__)
        globals()[name] = module
        return module
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")


def _patch_transformers_compat():
    """Patch transformers compatibility for ALLOWED_LAYER_TYPES and tokenizer."""
    import transformers.configuration_utils as cfg
    if not hasattr(cfg, "ALLOWED_LAYER_TYPES"):
        cfg.ALLOWED_LAYER_TYPES = getattr(
            cfg, "ALLOWED_ATTENTION_LAYER_TYPES", ()
        )


def register():
    """Register the FL platform."""
    _patch_transformers_compat()

    # Model-specific platform patches
    from vllm_fl.patches.glm_moe_dsa import apply_platform_patches as glm5_platform
    glm5_platform()

    multiproc_method = os.environ.get("VLLM_WORKER_MULTIPROC_METHOD")
    if multiproc_method is None:
        os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"
    _get_op_config()
    return "vllm_fl.platform.PlatformFL"
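One way to avoid the `NameError` is to guard the optional import; a minimal sketch of that pattern, mirroring the names in this file (not the actual fix shipped in the PR):

```python
# Guard the optional import so register() degrades gracefully instead of
# raising NameError when vllm_fl.utils (or the whole package) is absent.
try:
    from vllm_fl.utils import get_op_config as _get_op_config
except ImportError:
    _get_op_config = None

def register():
    """Register the FL platform, skipping op-config loading if unavailable."""
    if _get_op_config is not None:
        _get_op_config()
    return "vllm_fl.platform.PlatformFL"

platform_path = register()
```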


Comment on lines +38 to +44
from plugin.interservice.flagcx_wrapper import (
    FLAGCXLibrary,
    buffer_type,
    flagcxComm_t,
    flagcxDataTypeEnum,
    flagcxUniqueId,
)
Copilot AI Mar 24, 2026

This module unconditionally imports plugin.interservice.flagcx_wrapper at import time. If FLAGCX_PATH is unset/invalid (or the wrapper isn’t installed), importing p2p_flagcx_engine will raise ImportError before any helpful message can be emitted. Consider lazily importing the wrapper inside __init__ (after validating FLAGCX_PATH/library_path) and raising a clear ValueError with setup instructions when the wrapper can’t be loaded.

Suggested change

Before:
    from plugin.interservice.flagcx_wrapper import (
        FLAGCXLibrary,
        buffer_type,
        flagcxComm_t,
        flagcxDataTypeEnum,
        flagcxUniqueId,
    )

After:
    try:
        from plugin.interservice.flagcx_wrapper import (
            FLAGCXLibrary,
            buffer_type,
            flagcxComm_t,
            flagcxDataTypeEnum,
            flagcxUniqueId,
        )
    except ImportError as exc:
        raise ValueError(
            "Failed to import 'plugin.interservice.flagcx_wrapper'. "
            "Ensure that the FlagCX Python wrapper is installed and that "
            "FLAGCX_PATH is set to a directory containing the wrapper (or that "
            "the package is otherwise available on PYTHONPATH)."
        ) from exc

Comment on lines +363 to +374
flagcx_stream = self.flagcx.adaptor_stream_copy(stream)
self.flagcx.flagcxSend(
    buffer_type(tensor.data_ptr()),
    tensor.numel(),
    flagcxDataTypeEnum.from_torch(tensor.dtype),
    dst,
    comm,
    flagcx_stream,
)
self.flagcx.adaptor_stream_free(flagcx_stream)
stream.synchronize()

Copilot AI Mar 24, 2026

stream.synchronize() here forces a device-wide wait for every send, which defeats the point of async transfers/overlap (e.g., PUT_ASYNC) and can significantly reduce throughput. Prefer letting the caller manage synchronization (or only synchronizing when strictly necessary for correctness).

Comment on lines +383 to +393
flagcx_stream = self.flagcx.adaptor_stream_copy(stream)
self.flagcx.flagcxRecv(
    buffer_type(tensor.data_ptr()),
    tensor.numel(),
    flagcxDataTypeEnum.from_torch(tensor.dtype),
    src,
    comm,
    flagcx_stream,
)
self.flagcx.adaptor_stream_free(flagcx_stream)
stream.synchronize()
Copilot AI Mar 24, 2026

stream.synchronize() here makes every recv blocking, preventing overlap with compute and potentially stalling the listener thread under load. Prefer leaving synchronization to higher-level logic (or using stream/event-based coordination) so transfers can remain asynchronous.

Comment on lines +451 to +472
global count

original_request_data = await request.get_json()
auth_header = request.headers.get("Authorization")
is_chat_request = _is_chat_request_path(request.path)
include_reasoning = bool(original_request_data.get("include_reasoning", True))
thinking_mode = "chat"

if is_chat_request:
    try:
        request_data, thinking_mode = _build_completion_request_from_chat(
            original_request_data
        )
    except ValueError as exc:
        return _error_response(str(exc), 400)
else:
    request_data = original_request_data
upstream_path = "/v1/completions" if is_chat_request else request.path

pair_index = count
count += 1
Copilot AI Mar 24, 2026

count is a module-level global used to choose instance pairs, but it’s incremented without any lock (pair_index = count; count += 1). With concurrent requests (Quart handles multiple in-flight requests), this can lead to lost updates/duplicate indices and skewed routing. Use a lock/Condition, an itertools.count() iterator protected by a lock, or derive the index from a per-request UUID/hash instead.
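A lock-guarded counter along the lines suggested (a self-contained sketch, not the proxy's actual code; `next_pair_index` is a hypothetical helper):

```python
import itertools
import threading

_pair_counter = itertools.count()
_pair_lock = threading.Lock()

def next_pair_index() -> int:
    # The lock makes read-and-advance atomic, so concurrent requests
    # can never observe the same index or lose an increment.
    with _pair_lock:
        return next(_pair_counter)

# Exercise the helper from many threads to show indices stay unique.
indices = []
threads = [threading.Thread(target=lambda: indices.append(next_pair_index()))
           for _ in range(50)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```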

Comment on lines +95 to +104
self.p2p_nccl_engine = (
    P2pNcclEngine(
        local_rank=self._local_rank,
        config=self._kv_transfer_config,
        hostname="",
        port_offset=self._rank,
    )
    if role == KVConnectorRole.WORKER
    else None
)
Copilot AI Mar 24, 2026

This connector is being registered to override vLLM's P2pNcclConnector, but it still instantiates P2pNcclEngine (NCCL). As a result, the newly added P2pFlagcxEngine is never used and the PR does not actually switch the transfer backend. If the intent is to intercept NCCL with FlagCX, instantiate P2pFlagcxEngine here (or otherwise wire it into the connector).

from dataclasses import dataclass
from typing import TYPE_CHECKING, Any

import regex as re
Copilot AI Mar 24, 2026

regex is a third-party dependency, but this project’s declared dependencies don’t include it (pyproject.toml only lists pyyaml). Importing this connector will fail in a minimal install. Use the standard-library re module here, or add regex as an explicit dependency.

Suggested change

Before:
    import regex as re

After:
    import re
