Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions miles/utils/debug_utils/periodic_py_spy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import logging
import os
import shutil
import subprocess
import threading
import time

import psutil

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Name of the environment variable that enables the periodic dump. Its value is
# parsed as a float and used as the sleep time (seconds) between dump rounds;
# a value <= 0, an empty value, or an unset variable leaves the feature off.
DUMP_INTERVAL_ENV = "MILES_DEBUG_PYSPY_DUMP_INTERVAL"
# A process is selected for dumping when any of these substrings occurs in its
# space-joined command line.
_PROCESS_KEYWORDS = ("python", "ray::", "sglang::")
# Timeout, in seconds, applied to each individual `py-spy dump` invocation.
_PER_PROCESS_DUMP_TIMEOUT_SECONDS = 60

# Set to True once the background dump thread has been launched, so that
# repeated calls to maybe_start_periodic_pyspy_dump() do not start another one.
_started = False


def maybe_start_periodic_pyspy_dump() -> None:
    """Start a daemon thread that periodically dumps py-spy stacks, if enabled.

    The feature is opt-in. It is enabled when the environment variable named by
    ``DUMP_INTERVAL_ENV`` holds a positive, finite number, which is used as the
    number of seconds between dump rounds.

    * Unset, empty, zero or negative value: silently does nothing.
    * Value that is not a number, or is NaN/infinite: logs an error and does
      nothing. A misconfigured debug knob must not raise into the caller, and a
      NaN/infinite interval would make ``time.sleep`` fail inside the thread.
    * ``py-spy`` not found on ``PATH``: logs an error and does nothing.

    At most one thread is started per process; once it has been started, later
    calls return immediately.
    """
    global _started

    if _started:
        return

    raw_interval = os.environ.get(DUMP_INTERVAL_ENV, "0") or "0"
    try:
        interval = float(raw_interval)
    except ValueError:
        logger.error(
            "%s=%r is not a number; skipping periodic dump",
            DUMP_INTERVAL_ENV,
            raw_interval,
        )
        return

    if interval <= 0:
        # Explicitly disabled (or unset): nothing to do, nothing to report.
        return
    if not interval < float("inf"):
        # Both NaN and +inf land here: every comparison with NaN is False, and
        # inf is not strictly less than inf.
        logger.error(
            "%s=%r is not a finite number; skipping periodic dump",
            DUMP_INTERVAL_ENV,
            raw_interval,
        )
        return

    if shutil.which("py-spy") is None:
        logger.error(
            "%s=%s set but py-spy not found on PATH; skipping periodic dump",
            DUMP_INTERVAL_ENV,
            interval,
        )
        return

    _started = True
    # Daemon thread: it must never keep the interpreter alive at shutdown.
    thread = threading.Thread(
        target=_dump_loop,
        args=(interval,),
        daemon=True,
        name="debug-pyspy-dump",
    )
    thread.start()
    logger.info("Started periodic py-spy dump (interval=%ss)", interval)


def _dump_loop(interval: float) -> None:
    """Run forever: wait ``interval`` seconds, then dump all target processes.

    Any ``Exception`` raised by a dump round is logged with its traceback and
    the loop carries on with the next round. This function never returns.

    Args:
        interval: Seconds to sleep before each dump round.
    """

    def run_one_round() -> None:
        # One failed round must not end the loop, so it is logged and swallowed.
        try:
            _dump_all_processes()
        except Exception:
            logger.exception("Periodic py-spy dump iteration failed")

    while True:
        # The wait comes first, so the first dump happens one interval after
        # the loop starts rather than immediately.
        time.sleep(interval)
        run_one_round()


def _dump_all_processes() -> None:
    """Print a header line, then a py-spy dump for every target process.

    The header carries the current Unix timestamp (whole seconds) and the
    number of processes selected for this round.
    """
    processes = _collect_target_processes()
    header = f"===== [debug-pyspy] ts={int(time.time())} processes={len(processes)} ====="
    print(header, flush=True)

    for target_pid, target_cmdline in processes:
        _dump_one_process(pid=target_pid, cmdline=target_cmdline)


def _collect_target_processes() -> list[tuple[int, str]]:
    """Return ``(pid, cmdline)`` pairs for processes worth dumping.

    A process is selected when its space-joined command line contains any of
    the substrings in ``_PROCESS_KEYWORDS``. The returned command line is cut
    to its first 160 characters; matching is done on the full string.
    Processes whose command line cannot be read are skipped.
    """
    selected: list[tuple[int, str]] = []
    for process in psutil.process_iter(["pid", "cmdline"]):
        try:
            argv = process.info["cmdline"]
            joined = " ".join(argv) if argv else ""
        except (psutil.Error, OSError):
            continue

        for needle in _PROCESS_KEYWORDS:
            if needle in joined:
                selected.append((process.info["pid"], joined[:160]))
                # One matching keyword is enough; never add a process twice.
                break
    return selected


def _dump_one_process(pid: int, cmdline: str) -> None:
    """Print one py-spy stack dump for ``pid`` to stdout, best-effort.

    ``py-spy dump --native`` is tried first. If that attempt fails or times
    out, one more attempt is made without ``--native``. If both attempts fail,
    a single ``DUMP FAILED`` line with the reason of the last attempt is
    printed instead. Failures of py-spy itself never raise out of this
    function, so one bad process cannot abort the rest of a dump round.

    Args:
        pid: Process ID passed to ``py-spy dump --pid``.
        cmdline: Label printed next to the dump to identify the process.
    """
    failure_detail = ""
    for extra_args in (["--native"], []):
        # An argument list without a shell: nothing needs quoting, and on
        # timeout the kill goes to py-spy itself instead of an intermediate
        # shell process.
        argv = ["py-spy", "dump", *extra_args, "--pid", str(pid)]
        try:
            result = subprocess.run(
                argv,
                capture_output=True,
                text=True,
                timeout=_PER_PROCESS_DUMP_TIMEOUT_SECONDS,
                check=True,
            )
        except subprocess.TimeoutExpired as e:
            # The captured stderr of a timed-out run is typically empty or
            # bytes, so state the reason explicitly instead of echoing it.
            failure_detail = f"timed out after {e.timeout}s"
        except subprocess.CalledProcessError as e:
            failure_detail = (e.stderr or "").strip() or f"exit code {e.returncode}"
        except OSError as e:
            # For example, py-spy is no longer on PATH or is not executable.
            failure_detail = str(e)
        else:
            print(f"----- [debug-pyspy] pid={pid} cmd={cmdline}\n{result.stdout}", flush=True)
            return

    # Both attempts failed; report only the reason of the last one.
    print(f"----- [debug-pyspy] pid={pid} cmd={cmdline} DUMP FAILED: {failure_detail[:200]}", flush=True)
2 changes: 2 additions & 0 deletions train.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from miles.utils.arguments import parse_args
from miles.utils.async_utils import eager_create_task
from miles.utils.control_server.server import start_control_server
from miles.utils.debug_utils.periodic_py_spy import maybe_start_periodic_pyspy_dump
from miles.utils.logging_utils import configure_logger
from miles.utils.mini_ft_controller import maybe_start_mini_ft_controller
from miles.utils.misc import should_run_periodic_action
Expand All @@ -18,6 +19,7 @@

async def train(args):
configure_logger(args, source=MainProcessIdentity())
maybe_start_periodic_pyspy_dump()
# allocate the GPUs
pgs = create_placement_groups(args)
init_tracking(args)
Expand Down
2 changes: 2 additions & 0 deletions train_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from miles.utils.arguments import parse_args
from miles.utils.async_utils import eager_create_task
from miles.utils.control_server.server import start_control_server
from miles.utils.debug_utils.periodic_py_spy import maybe_start_periodic_pyspy_dump
from miles.utils.logging_utils import configure_logger
from miles.utils.mini_ft_controller import maybe_start_mini_ft_controller
from miles.utils.misc import should_run_periodic_action
Expand All @@ -18,6 +19,7 @@
async def train(args):
assert not args.colocate, "Colocation is not supported for async training."
configure_logger(args, source=MainProcessIdentity())
maybe_start_periodic_pyspy_dump()
# allocate the GPUs
pgs = create_placement_groups(args)
init_tracking(args)
Expand Down
Loading