Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 54 additions & 21 deletions kvcached/integration/sglang/patches.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import inspect
import math
import types
from typing import Any, List, Optional, Tuple, Union, cast
from typing import Any, Callable, List, Optional, Tuple, Union, cast

from kvcached.integration.patch_base import BasePatch, enable_kvcached
from kvcached.integration.version_utils import VersionAwarePatch, version_range
Expand Down Expand Up @@ -1213,41 +1213,74 @@ def apply(self, sched_mod: types.ModuleType) -> bool:

@version_range(SGLANG_ALL_RANGE)
def patch_scheduler_memory_leak(self, sched_mod: types.ModuleType) -> bool:
"""Patch scheduler to suppress memory leak check when kvcached is enabled"""
"""Patch scheduler to suppress memory leak check when kvcached is enabled.

kvcached maps physical KV pages lazily, so SGLang's static-pool
invariant (total == available + in-use) does not hold and its leak
detector would raise spuriously. We neutralize the leak *raisers*.

Older SGLang keeps the whole check in a single Scheduler method whose
source mentions ``token_to_kv_pool_allocator``. Newer SGLang
(>=0.5.11) moved it into ``SchedulerRuntimeCheckerMixin`` and split it
across several small methods (e.g. ``_check_req_pool`` raises directly,
``_report_leak`` is the generic choke point for *token/KV* pool leaks).

We suppress only the leak checks for pools kvcached actually manages
(the KV / token pools). A check that is specific to
``req_to_token_pool`` is deliberately left intact -- kvcached does not
manage the request pool, its invariant still holds, and silencing it
would hide a genuine request-pool leak. The old single-method layout
(which names ``token_to_kv_pool_allocator``) and the new generic
reporter (which names no pool, and is only ever called for the token
pools) are both kept; only the req-pool-specific check is skipped.
"""
Scheduler = self._get_target_class(sched_mod)
if Scheduler is None:
return False

target_method_name: Union[str, None] = None
target_method_names: List[str] = []
for name, fn in inspect.getmembers(Scheduler, predicate=inspect.isfunction):
try:
src = inspect.getsource(fn)
except Exception:
continue
if "token_to_kv_pool_allocator memory leak detected!" in src or (
"memory leak detected" in src and "token_to_kv_pool_allocator" in src
):
target_method_name = name
break
if "memory leak detected" not in src:
continue
# Skip a check that is specific to the request pool, which kvcached
# does not manage. The generic reporter names no pool (so it is not
# excluded) and the legacy combined check names the KV allocator.
if "req_to_token_pool" in src and "token_to_kv_pool" not in src:
continue
target_method_names.append(name)

if target_method_name is None:
if not target_method_names:
self.logger.debug("No memory leak detection method found in Scheduler")
return False

original = getattr(Scheduler, target_method_name)
if self._is_already_patched(original):
self.logger.debug("Scheduler memory leak check already patched")
return True
def _make_wrapped(original: Callable[..., Any]) -> Callable[..., Any]:
def _wrapped(self, *args: Any, **kwargs: Any):
# Disable memory leak detection when ENABLE_KVCACHED is set
if enable_kvcached():
return
return original(self, *args, **kwargs)

return _wrapped

patched_any = False
for target_method_name in target_method_names:
original = getattr(Scheduler, target_method_name)
if self._is_already_patched(original):
self.logger.debug(
f"Scheduler.{target_method_name} leak check already patched")
patched_any = True
continue

def _wrapped(self, *args: Any, **kwargs: Any):
# Disable memory leak detection when ENABLE_KVCACHED is set
if enable_kvcached():
return
return original(self, *args, **kwargs)
wrapped = _make_wrapped(original)
self._mark_as_patched(wrapped)
setattr(Scheduler, target_method_name, wrapped)
patched_any = True

self._mark_as_patched(_wrapped)
setattr(Scheduler, target_method_name, _wrapped)
return True
return patched_any


class RadixCacheLimitPatch(VersionAwarePatch, BasePatch):
Expand Down
10 changes: 10 additions & 0 deletions kvcached/integration/version_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,16 @@ def detect_version(self, library_name: str, force_refresh: bool = False) -> Opti
except Exception as e:
self.logger.warning(f"Error detecting version for {library_name}: {e}")

# Fallback to installed-package metadata. Some builds (e.g. source
# builds of SGLang) don't expose a module-level __version__, but the
# distribution metadata still carries the version.
if detected_version is None:
try:
import importlib.metadata as _md
detected_version = _md.version(library_name)
except Exception:
pass

self._version_cache[library_name] = detected_version
return detected_version

Expand Down
Loading