diff --git a/kvcached/integration/sglang/patches.py b/kvcached/integration/sglang/patches.py
index bc02b218..354b8451 100644
--- a/kvcached/integration/sglang/patches.py
+++ b/kvcached/integration/sglang/patches.py
@@ -8,7 +8,7 @@
 import inspect
 import math
 import types
-from typing import Any, List, Optional, Tuple, Union, cast
+from typing import Any, Callable, List, Optional, Tuple, Union, cast
 
 from kvcached.integration.patch_base import BasePatch, enable_kvcached
 from kvcached.integration.version_utils import VersionAwarePatch, version_range
@@ -1213,41 +1213,88 @@ def apply(self, sched_mod: types.ModuleType) -> bool:
 
     @version_range(SGLANG_ALL_RANGE)
     def patch_scheduler_memory_leak(self, sched_mod: types.ModuleType) -> bool:
-        """Patch scheduler to suppress memory leak check when kvcached is enabled"""
+        """Patch scheduler to suppress memory leak check when kvcached is enabled.
+
+        kvcached maps physical KV pages lazily, so SGLang's static-pool
+        invariant (total == available + in-use) does not hold and its leak
+        detector would raise spuriously. We neutralize the leak *raisers*.
+
+        Older SGLang keeps the whole check in a single Scheduler method whose
+        source mentions ``token_to_kv_pool_allocator``. Newer SGLang
+        (>=0.5.11) moved it into ``SchedulerRuntimeCheckerMixin`` and split it
+        across several small methods (e.g. ``_check_req_pool`` raises directly,
+        ``_report_leak`` is the generic choke point for *token/KV* pool leaks).
+
+        We suppress only the leak checks for pools kvcached actually manages
+        (the KV / token pools). A check that is specific to
+        ``req_to_token_pool`` is deliberately left intact -- kvcached does not
+        manage the request pool, its invariant still holds, and silencing it
+        would hide a genuine request-pool leak. The old single-method layout
+        (which names ``token_to_kv_pool_allocator``) and the new generic
+        reporter (which names no pool, and is only ever called for the token
+        pools) are both suppressed; only the req-pool-specific check still runs.
+
+        Args:
+            sched_mod: Module from which the target Scheduler class is resolved.
+
+        Returns:
+            True if at least one leak-check method is, or already was, wrapped;
+            False if the Scheduler class or a matching method is not found.
+        """
         Scheduler = self._get_target_class(sched_mod)
         if Scheduler is None:
             return False
 
-        target_method_name: Union[str, None] = None
+        target_method_names: List[str] = []
         for name, fn in inspect.getmembers(Scheduler, predicate=inspect.isfunction):
             try:
                 src = inspect.getsource(fn)
             except Exception:
                 continue
-            if "token_to_kv_pool_allocator memory leak detected!" in src or (
-                "memory leak detected" in src and "token_to_kv_pool_allocator" in src
-            ):
-                target_method_name = name
-                break
+            if "memory leak detected" not in src:
+                continue
+            # Skip a check that is specific to the request pool, which kvcached
+            # does not manage. The generic reporter names no pool (so it is not
+            # excluded) and the legacy combined check names the KV allocator.
+            if "req_to_token_pool" in src and "token_to_kv_pool" not in src:
+                continue
+            target_method_names.append(name)
 
-        if target_method_name is None:
+        if not target_method_names:
             self.logger.debug("No memory leak detection method found in Scheduler")
             return False
 
-        original = getattr(Scheduler, target_method_name)
-        if self._is_already_patched(original):
-            self.logger.debug("Scheduler memory leak check already patched")
-            return True
+        # Imported locally so this change needs no edit to the module imports.
+        import functools
+
+        def _make_wrapped(original: Callable[..., Any]) -> Callable[..., Any]:
+            # functools.wraps sets __wrapped__, which inspect.getsource()
+            # follows, so a repeated apply still matches the original source
+            # in the scan above and reaches the "already patched" branch.
+            @functools.wraps(original)
+            def _wrapped(self, *args: Any, **kwargs: Any):
+                # Disable memory leak detection when ENABLE_KVCACHED is set
+                if enable_kvcached():
+                    return
+                return original(self, *args, **kwargs)
+
+            return _wrapped
+
+        patched_any = False
+        for target_method_name in target_method_names:
+            original = getattr(Scheduler, target_method_name)
+            if self._is_already_patched(original):
+                self.logger.debug(
+                    f"Scheduler.{target_method_name} leak check already patched")
+                patched_any = True
+                continue
 
-        def _wrapped(self, *args: Any, **kwargs: Any):
-            # Disable memory leak detection when ENABLE_KVCACHED is set
-            if enable_kvcached():
-                return
-            return original(self, *args, **kwargs)
+            wrapped = _make_wrapped(original)
+            self._mark_as_patched(wrapped)
+            setattr(Scheduler, target_method_name, wrapped)
+            patched_any = True
 
-        self._mark_as_patched(_wrapped)
-        setattr(Scheduler, target_method_name, _wrapped)
-        return True
+        return patched_any
 
 
 class RadixCacheLimitPatch(VersionAwarePatch, BasePatch):
diff --git a/kvcached/integration/version_utils.py b/kvcached/integration/version_utils.py
index 0de036a2..ed65fc1e 100644
--- a/kvcached/integration/version_utils.py
+++ b/kvcached/integration/version_utils.py
@@ -154,6 +154,16 @@ def detect_version(self, library_name: str, force_refresh: bool = False) -> Opti
         except Exception as e:
             self.logger.warning(f"Error detecting version for {library_name}: {e}")
 
+        # Fallback to installed-package metadata. Some builds (e.g. source
+        # builds of SGLang) don't expose a module-level __version__, but the
+        # distribution metadata still carries the version.
+        if detected_version is None:
+            try:
+                import importlib.metadata as _md
+                detected_version = _md.version(library_name)
+            except Exception:
+                pass
+
         self._version_cache[library_name] = detected_version
         return detected_version
 