[rocm, test] feat: add ROCm controller and test tooling #56
`keep_gpu/single_gpu_controller/rocm_gpu_controller.py` (new file, +145 lines)

```python
import threading
import time
from typing import Optional

import torch

from keep_gpu.single_gpu_controller.base_gpu_controller import BaseGPUController
from keep_gpu.utilities.humanized_input import parse_size
from keep_gpu.utilities.logger import setup_logger

logger = setup_logger(__name__)


def _query_rocm_utilization(index: int) -> Optional[int]:
    """
    Best-effort utilization query via rocm-smi.
    Returns None if rocm-smi is unavailable or fails.
    """
    try:
        import rocm_smi  # type: ignore

        rocm_smi.rsmi_init()
        # rsmi_dev_busy_percent_get returns percent (0-100)
        util = rocm_smi.rsmi_dev_busy_percent_get(index)
        rocm_smi.rsmi_shut_down()
        return int(util)
    except Exception as exc:  # pragma: no cover - depends on ROCm availability
        logger.debug("ROCm utilization query failed: %s", exc)
        return None


class RocmGPUController(BaseGPUController):
    """
    Keep a single ROCm GPU busy by running lightweight elementwise ops
    in a background thread.
    """

    def __init__(
        self,
        *,
        rank: int,
        interval: float = 1.0,
        vram_to_keep: str | int = "1000 MB",
        busy_threshold: int = 10,
        iterations: int = 5000,
    ):
        if isinstance(vram_to_keep, str):
            vram_to_keep = self.parse_size(vram_to_keep)
        elif not isinstance(vram_to_keep, int):
            raise TypeError(
                f"vram_to_keep must be str or int, got {type(vram_to_keep)}"
            )

        super().__init__(vram_to_keep=vram_to_keep, interval=interval)
        self.rank = rank
        # ROCm builds of PyTorch expose HIP devices through the torch.cuda namespace.
        self.device = torch.device(f"cuda:{rank}")
        self.interval = interval
        self.busy_threshold = busy_threshold
        self.iterations = iterations

        self._stop_evt: Optional[threading.Event] = None
        self._thread: Optional[threading.Thread] = None

    @staticmethod
    def parse_size(text: str) -> int:
        return parse_size(text)

    def keep(self) -> None:
        if self._thread and self._thread.is_alive():
            logger.warning("rank %s: keep thread already running", self.rank)
            return

        self._stop_evt = threading.Event()
        self._thread = threading.Thread(
            target=self._keep_loop,
            name=f"gpu-keeper-rocm-{self.rank}",
            daemon=True,
        )
        self._thread.start()
        logger.info("rank %s: ROCm keep thread started", self.rank)

    def release(self) -> None:
        if not (self._thread and self._thread.is_alive()):
            logger.warning("rank %s: keep thread not running", self.rank)
            return
        self._stop_evt.set()
        self._thread.join()
        torch.cuda.empty_cache()
        logger.info("rank %s: keep thread stopped & cache cleared", self.rank)

    def __enter__(self):
        self.keep()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.release()

    def _keep_loop(self) -> None:
        torch.cuda.set_device(self.rank)
        tensor = None
        while not self._stop_evt.is_set():
            try:
                # vram_to_keep is stored in bytes; a float32 element occupies 4 bytes,
                # so allocate vram_to_keep // 4 elements to hold roughly that much VRAM.
                tensor = torch.rand(
                    self.vram_to_keep // 4,
                    device=self.device,
                    dtype=torch.float32,
                    requires_grad=False,
                )
                break
            except RuntimeError as exc:
                logger.error("rank %s: failed to allocate tensor: %s", self.rank, exc)
                time.sleep(self.interval)
        if tensor is None:
            logger.error("rank %s: failed to allocate tensor, exiting loop", self.rank)
            raise RuntimeError("Failed to allocate tensor for ROCm GPU keeping")

        while not self._stop_evt.is_set():
            try:
                util = _query_rocm_utilization(self.rank)
                if util is not None and util > self.busy_threshold:
                    logger.debug("rank %s: GPU busy (%d%%), sleeping", self.rank, util)
                else:
                    self._run_batch(tensor)
                time.sleep(self.interval)
            except RuntimeError as exc:
                if "out of memory" in str(exc).lower():
                    torch.cuda.empty_cache()
                else:
                    logger.error("rank %s: runtime error in keep loop: %s", self.rank, exc)
                time.sleep(self.interval)
            except Exception:
                logger.exception("rank %s: unexpected error", self.rank)
                time.sleep(self.interval)

    @torch.no_grad()
    def _run_batch(self, tensor: torch.Tensor) -> None:
        tic = time.time()
        for _ in range(self.iterations):
            torch.relu_(tensor)
            if self._stop_evt.is_set():
                break
        torch.cuda.synchronize()
        toc = time.time()
        logger.debug(
            "rank %s: elementwise batch done – avg %.2f ms",
            self.rank,
            (toc - tic) * 1000 / max(1, self.iterations),
        )
```
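For orientation, a minimal usage sketch of the controller above, assuming a ROCm-enabled PyTorch install. The constructor arguments and the context-manager protocol come from the file itself; the `do_real_work()` call is a placeholder.

```python
# Hypothetical usage of RocmGPUController; do_real_work() is a placeholder.
from keep_gpu.single_gpu_controller.rocm_gpu_controller import RocmGPUController


def do_real_work() -> None:
    ...  # whatever actually needs the GPU later


# Keep device 0 occupied until the block exits; the background thread backs off
# whenever rocm-smi reports utilization above busy_threshold.
with RocmGPUController(rank=0, vram_to_keep="512 MB", interval=2.0, busy_threshold=10):
    do_real_work()
# __exit__ calls release(): the keeper thread is joined and the cache is emptied.
```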
`conftest.py` (new file, +31 lines): pytest options and markers for the ROCm tests

```python
import pytest
import torch


def pytest_addoption(parser):
    parser.addoption(
        "--run-rocm",
        action="store_true",
        default=False,
        help="run tests marked as rocm (require ROCm stack)",
    )


def pytest_configure(config):
    config.addinivalue_line("markers", "rocm: tests that require ROCm stack")
    config.addinivalue_line("markers", "large_memory: tests that use large VRAM")


def pytest_collection_modifyitems(config, items):
    if config.getoption("--run-rocm"):
        return

    skip_rocm = pytest.mark.skip(reason="need --run-rocm option to run")
    for item in items:
        if "rocm" in item.keywords:
            item.add_marker(skip_rocm)


@pytest.fixture
def rocm_available():
    return bool(torch.cuda.is_available() and getattr(torch.version, "hip", None))
```
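With this conftest in place, tests carrying the `rocm` marker are skipped by default and run only when the suite is invoked with `pytest --run-rocm`. The `large_memory` marker is registered here but no collection hook filters it in this diff; presumably tests opt in as sketched below (the test name and body are illustrative) and can be deselected with pytest's standard `-m "not large_memory"` expression.

```python
import pytest


@pytest.mark.large_memory  # registered above; deselect with: pytest -m "not large_memory"
def test_allocates_lots_of_vram():
    ...  # placeholder for a test that needs several GB of VRAM
```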
ROCm controller test (new file, +32 lines)

```python
import sys

import pytest

from keep_gpu.single_gpu_controller import rocm_gpu_controller as rgc


@pytest.mark.rocm
def test_query_rocm_utilization_with_mock(monkeypatch, rocm_available):
    if not rocm_available:
        pytest.skip("ROCm stack not available")

    class DummyRocmSMI:
        calls = 0

        @classmethod
        def rsmi_init(cls):
            cls.calls += 1

        @staticmethod
        def rsmi_dev_busy_percent_get(index):
            assert index == 1
            return 42

        @classmethod
        def rsmi_shut_down(cls):
            cls.calls += 1

    monkeypatch.setitem(sys.modules, "rocm_smi", DummyRocmSMI)
    util = rgc._query_rocm_utilization(1)
    assert util == 42
    assert DummyRocmSMI.calls == 2  # init + shutdown
```
Reviewer comment: This `elif` block for ROCm is very similar to the existing `if` block for CUDA, leading to code duplication. To improve maintainability, you could refactor this: the logic for setting `self.gpu_ids` is identical and can be moved out of the conditional blocks. Then you can use the conditional to select the appropriate controller class (`CudaGPUController` or `RocmGPUController`) and have a single list comprehension to create the `self.controllers` list. This would make the code DRY (Don't Repeat Yourself) and easier to extend.
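To make the suggestion concrete, a sketch of the refactor under stated assumptions: the multi-GPU wrapper it targets is not part of this diff, so the helper name, the `cuda_gpu_controller` import path, and the constructor keyword arguments are hypothetical; only the two controller classes and the `gpu_ids` / `controllers` names come from the comment.

```python
# Sketch of the reviewer's suggestion. The wrapper it would live in and the
# CUDA import path are assumptions; only CudaGPUController, RocmGPUController,
# gpu_ids and controllers are named in the comment itself.
import torch

from keep_gpu.single_gpu_controller.rocm_gpu_controller import RocmGPUController
# assumed location of the existing CUDA controller:
from keep_gpu.single_gpu_controller.cuda_gpu_controller import CudaGPUController


def build_controllers(gpu_ids=None, **controller_kwargs):
    # gpu_ids handling is identical for both backends, so it sits outside the conditional
    gpu_ids = list(range(torch.cuda.device_count())) if gpu_ids is None else list(gpu_ids)

    # pick the backend once; ROCm builds of PyTorch expose torch.version.hip
    controller_cls = (
        RocmGPUController if getattr(torch.version, "hip", None) else CudaGPUController
    )

    # a single comprehension replaces the duplicated if/elif bodies
    controllers = [controller_cls(rank=rank, **controller_kwargs) for rank in gpu_ids]
    return gpu_ids, controllers
```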