diff --git a/README.md b/README.md
index fa1ff70..a94912d 100644
--- a/README.md
+++ b/README.md
@@ -82,6 +82,13 @@ with GlobalGPUController(gpu_ids=[0, 1], vram_to_keep="750MB", interval=90, busy
 - CLI + API parity: same controllers power both code paths.
 - Continuous docs + CI: mkdocs + mkdocstrings build in CI to keep guidance up to date.
 
+## For developers
+
+- Install dev extras: `pip install -e ".[dev]"` (add `.[rocm]` if you need ROCm SMI).
+- Fast CUDA checks: `pytest tests/cuda_controller tests/global_controller tests/utilities/test_platform_manager.py tests/test_cli_thresholds.py`.
+- ROCm-only tests carry `@pytest.mark.rocm`; run with `pytest --run-rocm tests/rocm_controller`.
+- Markers: `rocm` (needs ROCm stack) and `large_memory` (opt-in locally).
+
 ## Contributing
 
 Contributions are welcome—especially around ROCm support, platform fallbacks, and scheduler-specific recipes. Open an issue or PR if you hit edge cases on your cluster.
diff --git a/docs/getting-started.md b/docs/getting-started.md
index aefd689..f3949fe 100644
--- a/docs/getting-started.md
+++ b/docs/getting-started.md
@@ -39,6 +39,12 @@ understand the minimum knobs you need to keep a GPU occupied.
     pip install keep-gpu
     ```
 
+## For contributors
+
+- Install dev extras: `pip install -e ".[dev]"` (append `.[rocm]` if you need ROCm SMI).
+- Fast CUDA checks: `pytest tests/cuda_controller tests/global_controller tests/utilities/test_platform_manager.py tests/test_cli_thresholds.py`.
+- ROCm-only tests are marked `rocm`; run with `pytest --run-rocm tests/rocm_controller`.
+
 === "Editable dev install"
     ```bash
     git clone https://github.com/Wangmerlyn/KeepGPU.git
diff --git a/pyproject.toml b/pyproject.toml
index 8acbf93..a69fed6 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -139,3 +139,9 @@ exclude = ["build", "dist", ".venv"]
 known-first-party = ["keep_gpu"]
 combine-as-imports = true
 force-single-line = false
+
+[tool.pytest.ini_options]
+markers = [
+    "rocm: tests that require ROCm stack",
+    "large_memory: tests that use large VRAM",
+]
diff --git a/src/keep_gpu/global_gpu_controller/global_gpu_controller.py b/src/keep_gpu/global_gpu_controller/global_gpu_controller.py
index 89676ae..804f2de 100644
--- a/src/keep_gpu/global_gpu_controller/global_gpu_controller.py
+++ b/src/keep_gpu/global_gpu_controller/global_gpu_controller.py
@@ -30,25 +30,33 @@ def __init__(
                 CudaGPUController,
             )
 
-            if gpu_ids is None:
-                self.gpu_ids = list(range(torch.cuda.device_count()))
-            else:
-                self.gpu_ids = gpu_ids
+            controller_cls = CudaGPUController
+        elif self.computing_platform == ComputingPlatform.ROCM:
+            from keep_gpu.single_gpu_controller.rocm_gpu_controller import (
+                RocmGPUController,
+            )
 
-            self.controllers = [
-                CudaGPUController(
-                    rank=i,
-                    interval=interval,
-                    vram_to_keep=vram_to_keep,
-                    busy_threshold=busy_threshold,
-                )
-                for i in self.gpu_ids
-            ]
+            controller_cls = RocmGPUController
         else:
             raise NotImplementedError(
                 f"GlobalGPUController not implemented for platform {self.computing_platform}"
             )
 
+        if gpu_ids is None:
+            self.gpu_ids = list(range(torch.cuda.device_count()))
+        else:
+            self.gpu_ids = gpu_ids
+
+        self.controllers = [
+            controller_cls(
+                rank=i,
+                interval=interval,
+                vram_to_keep=vram_to_keep,
+                busy_threshold=busy_threshold,
+            )
+            for i in self.gpu_ids
+        ]
+
     def keep(self) -> None:
         for ctrl in self.controllers:
             ctrl.keep()
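Reviewer note: a minimal usage sketch of the dispatch introduced above. The `with` form and keyword arguments are trimmed from the README example in this diff, the import path follows the file location, and `run_training_job()` is a hypothetical placeholder.

```python
# Minimal sketch: after this refactor the same call is expected to work on CUDA
# and ROCm hosts, because __init__ picks controller_cls from the detected
# platform before building one controller per GPU id.
from keep_gpu.global_gpu_controller.global_gpu_controller import GlobalGPUController

with GlobalGPUController(gpu_ids=[0, 1], vram_to_keep="750MB", interval=90):
    run_training_job()  # hypothetical placeholder for the workload to protect
```
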
diff --git a/src/keep_gpu/single_gpu_controller/base_gpu_controller.py b/src/keep_gpu/single_gpu_controller/base_gpu_controller.py
index 87f629e..4adc5ff 100644
--- a/src/keep_gpu/single_gpu_controller/base_gpu_controller.py
+++ b/src/keep_gpu/single_gpu_controller/base_gpu_controller.py
@@ -1,12 +1,25 @@
+from typing import Union
+
+from keep_gpu.utilities.humanized_input import parse_size
+
+
 class BaseGPUController:
-    def __init__(self, vram_to_keep: int, interval: float):
+    def __init__(self, vram_to_keep: Union[int, str], interval: float):
         """
         Base class for GPU controllers.
 
         Args:
-            vram_to_keep (int): Amount of VRAM (in MB) to keep free.
-            interval (int): Time interval (in seconds) for checks or actions.
-        """
+            vram_to_keep (int or str): Amount of VRAM to keep busy. Accepts an
+                integer (tensor element count) or a human-readable string such as
+                "1GiB" (converted to an element count for float32 tensors).
+            interval (float): Time interval (in seconds) between keep-alive cycles.
+        """
+        if isinstance(vram_to_keep, str):
+            vram_to_keep = parse_size(vram_to_keep)
+        elif not isinstance(vram_to_keep, int):
+            raise TypeError(
+                f"vram_to_keep must be str or int, got {type(vram_to_keep)}"
+            )
 
         self.vram_to_keep = vram_to_keep
         self.interval = interval
diff --git a/src/keep_gpu/single_gpu_controller/cuda_gpu_controller.py b/src/keep_gpu/single_gpu_controller/cuda_gpu_controller.py
index 7ff794a..baf9579 100644
--- a/src/keep_gpu/single_gpu_controller/cuda_gpu_controller.py
+++ b/src/keep_gpu/single_gpu_controller/cuda_gpu_controller.py
@@ -61,14 +61,6 @@ def __init__(
             hogging the GPU.
         """
 
-        if isinstance(vram_to_keep, str):
-            vram_to_keep = self.parse_size(vram_to_keep)
-        elif isinstance(vram_to_keep, int):
-            vram_to_keep = vram_to_keep
-        else:
-            raise TypeError(
-                f"vram_to_keep must be str or int, got {type(vram_to_keep)}"
-            )
         super().__init__(vram_to_keep=vram_to_keep, interval=interval)
         self.rank = rank
         self.device = torch.device(f"cuda:{rank}")
@@ -185,7 +177,7 @@ def _run_mat_batch(self, matrix: torch.Tensor) -> None:
         toc = time.time()
 
         logger.debug(
-            "rank %s: mat ops batch done – avg %.2f ms",
+            "rank %s: mat ops batch done - avg %.2f ms",
            self.rank,
            (toc - tic) * 1000 / self.matmul_iterations,
        )
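Reviewer note: a small sketch of the behaviour the relocated validation should give, assuming `parse_size` converts human-readable sizes to a float32 element count as the new docstring states; instantiating `BaseGPUController` directly is only for illustration.

```python
# Sketch of the vram_to_keep handling now centralised in BaseGPUController:
# strings go through parse_size, ints pass through unchanged, anything else raises.
from keep_gpu.single_gpu_controller.base_gpu_controller import BaseGPUController

ctrl = BaseGPUController(vram_to_keep="1GiB", interval=1.0)
print(ctrl.vram_to_keep)  # element count derived by parse_size (assumed semantics)

ctrl = BaseGPUController(vram_to_keep=1_000_000, interval=1.0)
print(ctrl.vram_to_keep)  # 1000000, stored as given

try:
    BaseGPUController(vram_to_keep=1.5, interval=1.0)  # floats are rejected
except TypeError as exc:
    print(exc)
```
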
diff --git a/src/keep_gpu/single_gpu_controller/rocm_gpu_controller.py b/src/keep_gpu/single_gpu_controller/rocm_gpu_controller.py
index e69de29..2f65b80 100644
--- a/src/keep_gpu/single_gpu_controller/rocm_gpu_controller.py
+++ b/src/keep_gpu/single_gpu_controller/rocm_gpu_controller.py
@@ -0,0 +1,145 @@
+import threading
+import time
+from typing import Optional, Union
+
+import torch
+
+from keep_gpu.single_gpu_controller.base_gpu_controller import BaseGPUController
+from keep_gpu.utilities.logger import setup_logger
+
+logger = setup_logger(__name__)
+
+
+class RocmGPUController(BaseGPUController):
+    """
+    Keep a single ROCm GPU busy by running lightweight elementwise ops
+    in a background thread. Requires a ROCm-enabled torch build.
+    """
+
+    def __init__(
+        self,
+        *,
+        rank: int,
+        interval: float = 1.0,
+        vram_to_keep: Union[str, int] = "1000 MB",
+        busy_threshold: int = 10,
+        iterations: int = 5000,
+    ):
+        super().__init__(vram_to_keep=vram_to_keep, interval=interval)
+        self.rank = rank
+        self.device = torch.device(f"cuda:{rank}")
+        self.busy_threshold = busy_threshold
+        self.iterations = iterations
+        self._stop_evt: Optional[threading.Event] = None
+        self._thread: Optional[threading.Thread] = None
+
+        # Lazy rocm_smi import; keep handle for reuse
+        try:
+            import rocm_smi  # type: ignore
+
+            self._rocm_smi = rocm_smi
+        except Exception as exc:  # pragma: no cover - env-specific
+            logger.debug("rocm_smi not available: %s", exc)
+            self._rocm_smi = None
+
+    def keep(self) -> None:
+        if self._thread and self._thread.is_alive():
+            logger.warning("rank %s: keep thread already running", self.rank)
+            return
+        if self._rocm_smi:
+            try:
+                self._rocm_smi.rsmi_init()
+            except Exception as exc:  # pragma: no cover - env-specific
+                logger.debug("rsmi_init failed: %s", exc)
+
+        self._stop_evt = threading.Event()
+        self._thread = threading.Thread(
+            target=self._keep_loop,
+            name=f"gpu-keeper-rocm-{self.rank}",
+            daemon=True,
+        )
+        self._thread.start()
+        logger.info("rank %s: ROCm keep thread started", self.rank)
+
+    def release(self) -> None:
+        if not (self._thread and self._thread.is_alive()):
+            logger.warning("rank %s: keep thread not running", self.rank)
+            return
+        self._stop_evt.set()
+        self._thread.join()
+        torch.cuda.empty_cache()
+        if self._rocm_smi:
+            try:
+                self._rocm_smi.rsmi_shut_down()
+            except Exception as exc:  # pragma: no cover - best effort
+                logger.debug("rsmi_shut_down failed: %s", exc)
+        logger.info("rank %s: keep thread stopped & cache cleared", self.rank)
+
+    def __enter__(self):
+        self.keep()
+        return self
+
+    def __exit__(self, exc_type, exc, tb):
+        self.release()
+
+    def _query_utilization(self) -> Optional[int]:
+        if not self._rocm_smi:
+            return None
+        try:
+            util = self._rocm_smi.rsmi_dev_busy_percent_get(self.rank)
+            return int(util)
+        except Exception as exc:  # pragma: no cover - env-specific
+            logger.debug("ROCm utilization query failed: %s", exc)
+            return None
+
+    def _keep_loop(self) -> None:
+        torch.cuda.set_device(self.rank)
+        tensor = None
+        while not self._stop_evt.is_set():
+            try:
+                tensor = torch.rand(
+                    self.vram_to_keep,
+                    device=self.device,
+                    dtype=torch.float32,
+                    requires_grad=False,
+                )
+                break
+            except RuntimeError:
+                logger.exception("rank %s: failed to allocate tensor", self.rank)
+                time.sleep(self.interval)
+        if tensor is None:
+            logger.error("rank %s: failed to allocate tensor, exiting loop", self.rank)
+            raise RuntimeError("Failed to allocate tensor for ROCm GPU keeping")
+
+        while not self._stop_evt.is_set():
+            try:
+                util = self._query_utilization()
+                if util is not None and util > self.busy_threshold:
+                    logger.debug("rank %s: GPU busy (%d%%), sleeping", self.rank, util)
+                else:
+                    self._run_batch(tensor)
+                time.sleep(self.interval)
+            except RuntimeError as exc:
+                if "out of memory" in str(exc).lower():
+                    torch.cuda.empty_cache()
+                else:
+                    logger.exception("rank %s: runtime error in keep loop", self.rank)
+                time.sleep(self.interval)
+            except Exception:
+                logger.exception("rank %s: unexpected error", self.rank)
+                time.sleep(self.interval)
+
+    @torch.no_grad()
+    def _run_batch(self, tensor: torch.Tensor) -> None:
+        tic = time.time()
+        for _ in range(self.iterations):
+            torch.relu_(tensor)
+            if self._stop_evt.is_set():
+                break
+        torch.cuda.synchronize()
+        toc = time.time()
+        logger.debug(
+            "rank %s: elementwise batch done - avg %.2f ms",
+            self.rank,
+            (toc - tic) * 1000 / max(1, self.iterations),
+        )
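Reviewer note: a usage sketch for the new controller. It assumes a ROCm-enabled torch build, treats the "500MB" string as something `parse_size` accepts, and uses `wait_for_real_job()` as a hypothetical placeholder.

```python
# Sketch: keep ROCm GPU 0 warm until the real job is ready to start.
# __enter__ calls keep() and __exit__ calls release(), mirroring the CUDA controller.
from keep_gpu.single_gpu_controller.rocm_gpu_controller import RocmGPUController

controller = RocmGPUController(rank=0, vram_to_keep="500MB", interval=2.0, busy_threshold=10)
with controller:
    wait_for_real_job()  # hypothetical placeholder for the actual wait or workload
```
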
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..22d1680
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,31 @@
+import pytest
+import torch
+
+
+def pytest_addoption(parser):
+    parser.addoption(
+        "--run-rocm",
+        action="store_true",
+        default=False,
+        help="run tests marked as rocm (require ROCm stack)",
+    )
+
+
+def pytest_configure(config):
+    config.addinivalue_line("markers", "rocm: tests that require ROCm stack")
+    config.addinivalue_line("markers", "large_memory: tests that use large VRAM")
+
+
+def pytest_collection_modifyitems(config, items):
+    if config.getoption("--run-rocm"):
+        return
+
+    skip_rocm = pytest.mark.skip(reason="need --run-rocm option to run")
+    for item in items:
+        if "rocm" in item.keywords:
+            item.add_marker(skip_rocm)
+
+
+@pytest.fixture
+def rocm_available():
+    return bool(torch.cuda.is_available() and getattr(torch.version, "hip", None))
diff --git a/tests/rocm_controller/test_rocm_utilization.py b/tests/rocm_controller/test_rocm_utilization.py
new file mode 100644
index 0000000..4618398
--- /dev/null
+++ b/tests/rocm_controller/test_rocm_utilization.py
@@ -0,0 +1,30 @@
+import sys
+
+import pytest
+
+from keep_gpu.single_gpu_controller import rocm_gpu_controller as rgc
+
+
+@pytest.mark.rocm
+def test_query_rocm_utilization_with_mock(monkeypatch, rocm_available):
+    if not rocm_available:
+        pytest.skip("ROCm stack not available")
+
+    class DummyRocmSMI:
+        @staticmethod
+        def rsmi_init():
+            pass
+
+        @staticmethod
+        def rsmi_dev_busy_percent_get(index):
+            assert index == 1
+            return 42
+
+        @staticmethod
+        def rsmi_shut_down():
+            pass
+
+    # Patch sys.modules before constructing: the controller imports rocm_smi in __init__
+    monkeypatch.setitem(sys.modules, "rocm_smi", DummyRocmSMI)
+    controller = rgc.RocmGPUController(rank=1, vram_to_keep="1 MB")
+    assert controller._query_utilization() == 42
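Reviewer note: a sketch of how the new marker, CLI option, and fixture are meant to combine in future ROCm tests. The smoke test below is illustrative only and not part of this PR; the "64MB" size is assumed to be accepted by `parse_size`.

```python
# Sketch: tests touching the ROCm path carry the `rocm` marker, so a plain
# `pytest` run skips them and `pytest --run-rocm` opts back in; the
# rocm_available fixture guards against torch builds without HIP support.
import pytest

from keep_gpu.single_gpu_controller.rocm_gpu_controller import RocmGPUController


@pytest.mark.rocm
def test_keep_and_release_smoke(rocm_available):
    if not rocm_available:
        pytest.skip("ROCm stack not available")

    controller = RocmGPUController(rank=0, vram_to_keep="64MB", interval=0.1)
    controller.keep()
    controller.release()
```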