Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 0 additions & 41 deletions tensorrt_llm/_torch/pyexecutor/py_executor.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import dataclasses
import datetime
import functools
import gc
import os
import pickle # nosec B403
import threading
import time
import traceback
import weakref
from contextlib import contextmanager
from typing import Dict, Iterable, List, Optional, Tuple, Union

Expand Down Expand Up @@ -59,10 +57,6 @@
# Format: "start1-stop1,start2-stop2,..." or single iterations "iter1,iter2,..."
PROFILE_START_STOP_ENV_VAR_NAME = "TLLM_PROFILE_START_STOP"

# Environment variable to enable garbage collection profiling.
# Set to "1" to enable recording of garbage collection events during profiling.
PROFILE_RECORD_GC_ENV_VAR_NAME = "TLLM_PROFILE_RECORD_GC"

# Environment variable to enable PyTorch profiler tracing.
# Set to a path to save detailed tracing of PyTorch operations.
PROFILE_TRACE_ENV_VAR_NAME = "TLLM_TORCH_PROFILE_TRACE"
Expand Down Expand Up @@ -97,40 +91,6 @@ def _load_iteration_indexes(env_var: str):
return frozenset(starts), frozenset(stops)


class _GCNvtxHandle:
pass


def _gc_nvtx_watcher() -> Optional["_GCNvtxHandle"]:
    """Register an NVTX range around every Python GC pass, if enabled.

    Reads the ``TLLM_PROFILE_RECORD_GC`` environment variable (via
    ``PROFILE_RECORD_GC_ENV_VAR_NAME``); any non-empty value enables the
    watcher. When enabled, a ``gc`` callback opens an NVTX range named
    "Python GC" at the start of each collection and closes it at the end, so
    GC pauses are visible in profiler timelines.

    Returns:
        A ``_GCNvtxHandle`` whose lifetime keeps the callback registered
        (a ``weakref.finalize`` removes the callback when the handle dies),
        or ``None`` when the feature is disabled.
    """
    enabled = os.environ.get(PROFILE_RECORD_GC_ENV_VAR_NAME, None)
    if not enabled:
        return None

    # NVTX range id of the collection currently in progress, if any.
    range_id: Optional[int] = None

    def gc_callback(phase, _):
        nonlocal range_id
        if phase == "start":
            # CPython runs GC callbacks in registration order and does not
            # nest collections, so a dangling id means something went wrong.
            assert range_id is None, "Unexpected state in GC callback: another GC while last GC not finished?"
            range_id = torch.cuda.nvtx.range_start("Python GC")
        elif phase == "stop":
            assert range_id is not None, "Unexpected state in GC callback: no active GC but got GC finished?"
            torch.cuda.nvtx.range_end(range_id)
            range_id = None

    gc.callbacks.append(gc_callback)

    def gc_cleanup(callback):
        # Best-effort removal: the callback may already be gone (e.g. if the
        # callbacks list was cleared elsewhere during interpreter shutdown).
        try:
            gc.callbacks.remove(callback)
        except ValueError:
            pass

    # Tie the callback's registration to the handle's lifetime: when the
    # handle is collected, the finalizer unregisters the callback.
    handle = _GCNvtxHandle()
    weakref.finalize(handle, gc_cleanup, gc_callback)
    return handle


@dataclasses.dataclass
class BatchState:
sample_state: SampleState
Expand Down Expand Up @@ -178,7 +138,6 @@ def __init__(self,
# profile config
self.profile_start_iters, self.profile_stop_iters = _load_iteration_indexes(
PROFILE_START_STOP_ENV_VAR_NAME)
self.gc_nvtx_watcher_handle = _gc_nvtx_watcher()

# related modules
self.resource_manager = resource_manager
Expand Down
80 changes: 80 additions & 0 deletions tensorrt_llm/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,18 @@ def nvtx_range_debug(msg: str,
return _null_context_manager()


def nvtx_mark_debug(msg: str,
                    color: str = "grey",
                    domain: str = "TensorRT-LLM",
                    category: Optional[str] = None) -> None:
    """
    Creates an NVTX marker for debugging purposes.

    The marker is emitted only when NVTX debugging is switched on, i.e. when
    either ``TLLM_LLMAPI_ENABLE_NVTX`` or ``TLLM_NVTX_DEBUG`` is set to "1";
    otherwise this is a no-op.
    """
    debug_enabled = any(
        os.getenv(flag, "0") == "1"
        for flag in ("TLLM_LLMAPI_ENABLE_NVTX", "TLLM_NVTX_DEBUG"))
    if debug_enabled:
        nvtx_mark(msg, color=color, domain=domain, category=category)


def nvtx_mark(msg: str,
color: str = "grey",
domain: str = "TensorRT-LLM",
Expand Down Expand Up @@ -1195,3 +1207,71 @@ def is_device_integrated() -> bool:
if not torch.cuda.is_available():
return False
return torch.cuda.get_device_properties().is_integrated


# Environment variable to enable garbage collection profiling.
# Any non-empty value enables recording of garbage collection events during
# profiling (the consumer only checks truthiness of the variable's value).
PROFILE_RECORD_GC_ENV_VAR_NAME = "TLLM_PROFILE_RECORD_GC"


class _GCNvtxHandle:
"""Handle object for GC NVTX watcher to keep it alive."""


# Module-level singleton holding the GC NVTX watcher handle; set once by
# _setup_gc_nvtx_profiling() and kept alive for the process lifetime so the
# weakref.finalize-based cleanup never fires early.
_gc_watcher_handle: Optional[_GCNvtxHandle] = None


def _setup_gc_nvtx_profiling() -> Optional[_GCNvtxHandle]:
    """
    Set up NVTX range markers for Python garbage collection events (singleton).
    This helps in profiling to visualize when GC occurs during execution.

    This function is called automatically at module import time. The environment
    variable TLLM_PROFILE_RECORD_GC must be set before importing this module.

    This is an internal function and should not be called directly by users.

    Returns:
        _GCNvtxHandle or None: A handle object that keeps the GC callback alive,
        or None if GC profiling is not enabled.
    """
    global _gc_watcher_handle

    # Return existing handle if already initialized
    if _gc_watcher_handle is not None:
        return _gc_watcher_handle

    # Any non-empty value enables the watcher (only truthiness is checked).
    enabled = os.environ.get(PROFILE_RECORD_GC_ENV_VAR_NAME, None)
    if not enabled:
        return None

    # NVTX range id of the collection currently in progress, if any.
    range_id: Optional[int] = None

    def gc_callback(phase, _):
        nonlocal range_id
        if phase == "start":
            # GC passes do not nest, so a leftover id indicates a bug.
            assert range_id is None, "Unexpected state in GC callback: another GC while last GC not finished?"
            range_id = torch.cuda.nvtx.range_start("Python GC")
        elif phase == "stop":
            assert range_id is not None, "Unexpected state in GC callback: no active GC but got GC finished?"
            torch.cuda.nvtx.range_end(range_id)
            range_id = None

    gc.callbacks.append(gc_callback)

    def gc_cleanup(callback):
        # Best-effort: the callbacks list may already have been emptied
        # during interpreter shutdown.
        try:
            gc.callbacks.remove(callback)
        except ValueError:
            pass

    # Tie callback unregistration to the handle's lifetime via a finalizer.
    handle = _GCNvtxHandle()
    weakref.finalize(handle, gc_cleanup, gc_callback)

    _gc_watcher_handle = handle
    return handle


# Initialize GC NVTX profiling singleton at module import time.
# No-op unless TLLM_PROFILE_RECORD_GC is set in the environment before this
# module is first imported; the returned handle (if any) is retained in the
# module-level singleton, so the result is intentionally discarded here.
_setup_gc_nvtx_profiling()
3 changes: 1 addition & 2 deletions tensorrt_llm/bench/benchmark/utils/general.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import annotations

import json
from importlib.metadata import version
from pathlib import Path
from random import choices, shuffle
from typing import Dict, List, Tuple, Union
Expand Down Expand Up @@ -170,7 +169,7 @@ def get_settings(params: dict, dataset_metadata: DatasetMetadata, model: str,

backend = params.get("backend", "pytorch")
return {
"sw_version": version("tensorrt_llm"),
"sw_version": "1.2",
"model_path": model_path,
"settings_config": {
"max_batch_size": int(max_batch_size),
Expand Down
3 changes: 3 additions & 0 deletions tensorrt_llm/executor/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ def _create_ray_executor(
is_llm_executor: bool,
tp_size: int,
):
logger.warning(f"Orchestrator is creating Ray executor")
from .ray_executor import RayExecutor

return RayExecutor(worker_kwargs,
Expand All @@ -386,6 +387,7 @@ def _create_rpc_executor(
):
"""Create RPC-based executor (GenerationExecutorRpcProxy)."""
from .rpc_proxy import GenerationExecutorRpcProxy
logger.warning(f"Orchestrator is creating RPC executor")
return GenerationExecutorRpcProxy(
worker_kwargs,
model_world_size=model_world_size,
Expand All @@ -408,6 +410,7 @@ def _create_ipc_executor(
use_worker: If True, creates GenerationExecutorWorker (single process).
If False, creates GenerationExecutorProxy (multi-process with IPC).
"""
logger.warning(f"Orchestrator is creating IPC executor")
if use_worker:
from .worker import GenerationExecutorWorker
return GenerationExecutorWorker(**worker_kwargs,
Expand Down
Loading