From d2ae4c7a8e19a4a22f090e0c74edbac55aca7264 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9F=B3=E7=9A=AE=E5=B9=BC=E9=B8=9F?= <2960474346@qq.com> Date: Thu, 21 May 2026 17:46:17 +0800 Subject: [PATCH 1/2] fix: defer tp prealloc until first alloc --- kvcached/kv_cache_manager.py | 25 ++++++- tests/test_tp_prealloc_startup.py | 116 ++++++++++++++++++++++++++++++ 2 files changed, 138 insertions(+), 3 deletions(-) create mode 100644 tests/test_tp_prealloc_startup.py diff --git a/kvcached/kv_cache_manager.py b/kvcached/kv_cache_manager.py index 1e18a924..ccadcf1d 100644 --- a/kvcached/kv_cache_manager.py +++ b/kvcached/kv_cache_manager.py @@ -140,6 +140,16 @@ def unmap_callback(world_size: int, offsets: List[int]) -> None: except Exception as e: logger.warning("Failed to set up broadcast callbacks: %s. Falling back to single-process mode.", e) + # In multi-process mode, vLLM reserves its null block from the first + # post-init alloc() call rather than via reserve_null_block=True. + # Starting the background prealloc thread before that first alloc can + # race the same multi-process map path and stall TP startup. + self._defer_prealloc_until_first_alloc = ( + not self.reserve_null_block + and (self.world_size > 1 or use_worker_ipc) + ) + self._prealloc_started = False + self.num_avail_blocks = 0 # Only count free blocks in avail_pages self.avail_pages: Dict[int, InternalPage] = {} self.full_pages: Dict[int, InternalPage] = {} @@ -159,6 +169,12 @@ def unmap_callback(world_size: int, offsets: List[int]) -> None: # pre-alloc thread) and finally set the event. threading.Thread(target=self._post_init, daemon=True).start() + def _start_prealloc_thread(self) -> None: + if self._prealloc_started: + return + self.page_allocator.start_prealloc_thread() + self._prealloc_started = True + def _post_init(self): if self.null_block is not None: return @@ -188,8 +204,8 @@ def _check_kv_tensors_created(): # KV tensors created now # Possibly reserve the first block as null block for padding tokens self._reserve_null_block() - - self.page_allocator.start_prealloc_thread() + if not self._defer_prealloc_until_first_alloc: + self._start_prealloc_thread() except Exception as e: logger.error( f"Error during KVCacheManager post-initialization: {e}") @@ -275,6 +291,9 @@ def _alloc(self, self.num_avail_blocks -= num_from_page remaining_need -= num_from_page + if self._defer_prealloc_until_first_alloc: + self._start_prealloc_thread() + return ret_index @synchronized @@ -460,7 +479,7 @@ def clear(self): self._reserve_null_block() # Restart the prealloc thread now that null block is safely reserved. - self.page_allocator.start_prealloc_thread() + self._start_prealloc_thread() # Private methods @synchronized diff --git a/tests/test_tp_prealloc_startup.py b/tests/test_tp_prealloc_startup.py new file mode 100644 index 00000000..7dcd1631 --- /dev/null +++ b/tests/test_tp_prealloc_startup.py @@ -0,0 +1,116 @@ +# SPDX-FileCopyrightText: Copyright contributors to the kvcached project +# SPDX-License-Identifier: Apache-2.0 + +import sys +import types + + +sys.modules.setdefault("torch", types.ModuleType("torch")) + +fake_vmm_ops = types.ModuleType("kvcached.vmm_ops") +fake_vmm_ops.PageAllocator = object +fake_vmm_ops.InternalPage = object +fake_vmm_ops.kv_tensors_created = lambda *args, **kwargs: True +fake_vmm_ops.map_to_kv_tensors = lambda *args, **kwargs: True +fake_vmm_ops.unmap_from_kv_tensors = lambda *args, **kwargs: True +sys.modules.setdefault("kvcached.vmm_ops", fake_vmm_ops) + +fake_interfaces = types.ModuleType("kvcached.integration.vllm.interfaces") +fake_interfaces.should_use_worker_ipc = lambda: False +sys.modules.setdefault("kvcached.integration", types.ModuleType("kvcached.integration")) +sys.modules.setdefault("kvcached.integration.vllm", types.ModuleType("kvcached.integration.vllm")) +sys.modules.setdefault("kvcached.integration.vllm.interfaces", fake_interfaces) + +from kvcached import kv_cache_manager as kcm + + +class FakeInternalPage: + def __init__(self, page_id: int, page_size: int): + self.page_id = page_id + self.page_size = page_size + self._free_blocks = [] + + def init(self, block_mem_size: int): + blocks_per_page = self.page_size // block_mem_size + self._free_blocks = list(range(self.page_id * blocks_per_page, + (self.page_id + 1) * blocks_per_page)) + + def num_free_blocks(self) -> int: + return len(self._free_blocks) + + def alloc(self, num_blocks: int = 1): + allocated = self._free_blocks[:num_blocks] + self._free_blocks = self._free_blocks[num_blocks:] + return allocated + + def full(self) -> bool: + return not self._free_blocks + + @staticmethod + def get_num_blocks(page_size: int, block_mem_size: int) -> int: + return page_size // block_mem_size + + +class FakePageAllocator: + def __init__(self, *args, **kwargs): + self.start_prealloc_calls = 0 + self._free_pages = [0, 1, 2] + + def set_should_use_worker_ipc_callback(self, callback): + self.should_use_worker_ipc_callback = callback + + def set_broadcast_map_callback(self, callback): + self.broadcast_map_callback = callback + + def set_broadcast_unmap_callback(self, callback): + self.broadcast_unmap_callback = callback + + def start_prealloc_thread(self): + self.start_prealloc_calls += 1 + + def alloc_page(self): + return FakeInternalPage(self._free_pages.pop(0), kcm.PAGE_SIZE) + + def get_num_free_pages(self): + return len(self._free_pages) + + def get_avail_physical_pages(self): + return len(self._free_pages) + + def get_num_reserved_pages(self): + return 0 + + +def _build_manager(monkeypatch, *, world_size: int): + monkeypatch.setattr(kcm, "PageAllocator", FakePageAllocator) + monkeypatch.setattr(kcm, "InternalPage", FakeInternalPage) + monkeypatch.setattr(kcm, "broadcast_kv_tensors_created", + lambda *args, **kwargs: True) + + manager = kcm.KVCacheManager( + num_blocks=1024, + block_size=16, + cell_size=1024, + num_layers=16, + world_size=world_size, + async_sched=True, + ) + assert manager._post_init_done.wait(timeout=1) + return manager + + +def test_multi_process_prealloc_waits_until_first_alloc(monkeypatch): + manager = _build_manager(monkeypatch, world_size=2) + + assert manager.page_allocator.start_prealloc_calls == 0 + + block_ids = manager.alloc(1) + + assert block_ids == [0] + assert manager.page_allocator.start_prealloc_calls == 1 + + +def test_single_process_keeps_eager_prealloc(monkeypatch): + manager = _build_manager(monkeypatch, world_size=1) + + assert manager.page_allocator.start_prealloc_calls == 1 \ No newline at end of file From fabc1ce02f39e87c0cc5fed6bbd16b78eda75442 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9F=B3=E7=9A=AE=E5=B9=BC=E9=B8=9F?= <2960474346@qq.com> Date: Thu, 21 May 2026 18:36:15 +0800 Subject: [PATCH 2/2] fix: harden tp prealloc startup follow-up --- kvcached/kv_cache_manager.py | 1 + tests/test_tp_prealloc_startup.py | 66 +++++++++++++++++++++++-------- 2 files changed, 51 insertions(+), 16 deletions(-) diff --git a/kvcached/kv_cache_manager.py b/kvcached/kv_cache_manager.py index ccadcf1d..028a4cbb 100644 --- a/kvcached/kv_cache_manager.py +++ b/kvcached/kv_cache_manager.py @@ -446,6 +446,7 @@ def clear(self): # causing the null-block reservation to get a non-zero block. self.page_allocator._stop_prealloc_thread( timeout=PREALLOC_THREAD_TIMEOUT) + self._prealloc_started = False # Clear reserved blocks self.free_reserved() diff --git a/tests/test_tp_prealloc_startup.py b/tests/test_tp_prealloc_startup.py index 7dcd1631..bb4af3ad 100644 --- a/tests/test_tp_prealloc_startup.py +++ b/tests/test_tp_prealloc_startup.py @@ -1,34 +1,42 @@ # SPDX-FileCopyrightText: Copyright contributors to the kvcached project # SPDX-License-Identifier: Apache-2.0 +import importlib import sys import types +from typing import Any +kcm: Any = None -sys.modules.setdefault("torch", types.ModuleType("torch")) -fake_vmm_ops = types.ModuleType("kvcached.vmm_ops") -fake_vmm_ops.PageAllocator = object -fake_vmm_ops.InternalPage = object -fake_vmm_ops.kv_tensors_created = lambda *args, **kwargs: True -fake_vmm_ops.map_to_kv_tensors = lambda *args, **kwargs: True -fake_vmm_ops.unmap_from_kv_tensors = lambda *args, **kwargs: True -sys.modules.setdefault("kvcached.vmm_ops", fake_vmm_ops) +def _load_kv_cache_manager(monkeypatch, *, use_worker_ipc: bool = False): + monkeypatch.setitem(sys.modules, "torch", types.ModuleType("torch")) -fake_interfaces = types.ModuleType("kvcached.integration.vllm.interfaces") -fake_interfaces.should_use_worker_ipc = lambda: False -sys.modules.setdefault("kvcached.integration", types.ModuleType("kvcached.integration")) -sys.modules.setdefault("kvcached.integration.vllm", types.ModuleType("kvcached.integration.vllm")) -sys.modules.setdefault("kvcached.integration.vllm.interfaces", fake_interfaces) + fake_vmm_ops: Any = types.ModuleType("kvcached.vmm_ops") + fake_vmm_ops.PageAllocator = object + fake_vmm_ops.InternalPage = object + fake_vmm_ops.kv_tensors_created = lambda *args, **kwargs: True + fake_vmm_ops.map_to_kv_tensors = lambda *args, **kwargs: True + fake_vmm_ops.unmap_from_kv_tensors = lambda *args, **kwargs: True + monkeypatch.setitem(sys.modules, "kvcached.vmm_ops", fake_vmm_ops) -from kvcached import kv_cache_manager as kcm + fake_interfaces: Any = types.ModuleType("kvcached.integration.vllm.interfaces") + fake_interfaces.should_use_worker_ipc = lambda: use_worker_ipc + monkeypatch.setitem( + sys.modules, + "kvcached.integration.vllm.interfaces", + fake_interfaces, + ) + + kcm = importlib.import_module("kvcached.kv_cache_manager") + return importlib.reload(kcm) class FakeInternalPage: def __init__(self, page_id: int, page_size: int): self.page_id = page_id self.page_size = page_size - self._free_blocks = [] + self._free_blocks: list[int] = [] def init(self, block_mem_size: int): blocks_per_page = self.page_size // block_mem_size @@ -54,6 +62,7 @@ def get_num_blocks(page_size: int, block_mem_size: int) -> int: class FakePageAllocator: def __init__(self, *args, **kwargs): self.start_prealloc_calls = 0 + self.stop_prealloc_calls = 0 self._free_pages = [0, 1, 2] def set_should_use_worker_ipc_callback(self, callback): @@ -68,9 +77,21 @@ def set_broadcast_unmap_callback(self, callback): def start_prealloc_thread(self): self.start_prealloc_calls += 1 + def _stop_prealloc_thread(self, timeout=None): + self.stop_prealloc_calls += 1 + def alloc_page(self): return FakeInternalPage(self._free_pages.pop(0), kcm.PAGE_SIZE) + def free_pages(self, page_ids): + self._free_pages.extend(sorted(page_ids)) + + def trim(self): + return None + + def reset_free_page_order(self): + self._free_pages = sorted(self._free_pages) + def get_num_free_pages(self): return len(self._free_pages) @@ -82,6 +103,8 @@ def get_num_reserved_pages(self): def _build_manager(monkeypatch, *, world_size: int): + global kcm + kcm = _load_kv_cache_manager(monkeypatch) monkeypatch.setattr(kcm, "PageAllocator", FakePageAllocator) monkeypatch.setattr(kcm, "InternalPage", FakeInternalPage) monkeypatch.setattr(kcm, "broadcast_kv_tensors_created", @@ -113,4 +136,15 @@ def test_multi_process_prealloc_waits_until_first_alloc(monkeypatch): def test_single_process_keeps_eager_prealloc(monkeypatch): manager = _build_manager(monkeypatch, world_size=1) - assert manager.page_allocator.start_prealloc_calls == 1 \ No newline at end of file + assert manager.page_allocator.start_prealloc_calls == 1 + + +def test_clear_restarts_prealloc_thread(monkeypatch): + manager = _build_manager(monkeypatch, world_size=1) + + assert manager.page_allocator.start_prealloc_calls == 1 + + manager.clear() + + assert manager.page_allocator.stop_prealloc_calls == 1 + assert manager.page_allocator.start_prealloc_calls == 2 \ No newline at end of file