Skip to content
Open
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ dev = [
"pre-commit>=4.2.0",
"pytest-asyncio",
"pytest-cov",
"pytest-rerunfailures>=15.0",
"pytest>=7.0.0",
"pytest-xdist>=3.8.0",
"ruff>=0.14.0,<0.15.0",
Expand Down
7 changes: 5 additions & 2 deletions src/aiperf/common/base_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from abc import ABC
from typing import TYPE_CHECKING

from aiperf.common.constants import IS_WINDOWS
from aiperf.common.enums import CommandType, LifecycleState
from aiperf.common.exceptions import ServiceError
from aiperf.common.hooks import on_command
Expand Down Expand Up @@ -166,6 +167,8 @@ async def _kill(self) -> None:
# graceful stop has already failed; the lifecycle task may be wedged
# inside a C extension (zmq, uvloop, orjson) where CancelledError
# cannot interrupt. Replace this only if we add a robust abort path
# for blocked extension calls.
os.kill(os.getpid(), signal.SIGKILL)
# for blocked extension calls. Windows has no SIGKILL —
# ``signal.SIGKILL`` raises AttributeError on Windows; SIGTERM is the
# closest cross-platform unconditional kill there.
os.kill(os.getpid(), signal.SIGTERM if IS_WINDOWS else signal.SIGKILL)
raise asyncio.CancelledError(f"Killed {self}")
131 changes: 112 additions & 19 deletions src/aiperf/common/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
# SPDX-License-Identifier: Apache-2.0

import asyncio
import atexit
import contextlib
import multiprocessing
import os
import signal
import sys
import tempfile
import uuid
import warnings
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -48,14 +50,26 @@ def bootstrap_and_run_service(
log_queue: Optional multiprocessing queue for child process logging.
kwargs: Additional keyword arguments to pass to the service constructor.
"""
is_child_process = multiprocessing.parent_process() is not None

# Release inherited terminal/pipe FDs in spawned children BEFORE anything
# else runs in this process. See _redirect_stdio_to_devnull for the
# per-platform reasoning. Doing it later (e.g. inside the async event
# loop) is too late on Python 3.13: by that point asyncio/logging have
# already grabbed C-level references to the inherited fd 1/2, and a
# later dup2-to-NUL no longer releases the parent's pipe handles fully —
# the parent's `process.communicate()` never sees EOF and hangs.
if (IS_MACOS or IS_WINDOWS) and is_child_process:
_redirect_stdio_to_devnull()

# Ignore SIGINT and SIGTERM in child processes. SIGINT is ignored so only
# the parent handles Ctrl+C. SIGTERM is ignored because graceful shutdown is
# handled via the message bus (ShutdownCommand); process.terminate() is only
# called after the message bus path has already timed out, and the manager
# falls through to SIGKILL after the join timeout anyway. Ignoring SIGTERM
# prevents SIGSEGV crashes that occur when SIGTERM arrives while C extension
# code (uvloop, zmq, aiohttp, orjson) is executing.
if multiprocessing.parent_process() is not None:
if is_child_process:
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)

Expand All @@ -75,7 +89,6 @@ async def _run_service():
# Disable health server in child processes to prevent port conflicts.
# Multiple child processes on the same host cannot bind to the same port.
# The main process (SystemController) handles health probes for local mode.
is_child_process = multiprocessing.parent_process() is not None
if is_child_process:
Environment.SERVICE.HEALTH_ENABLED = False

Expand Down Expand Up @@ -116,14 +129,6 @@ async def _run_service():

setup_child_process_logging(log_queue, service.service_id, run)

# NOTE: Prevent child processes from accessing parent's terminal on macOS.
# This solves the macOS terminal corruption issue with Textual UI where child
# processes inherit terminal file descriptors and interfere with Textual's
# terminal management, causing ASCII garbage and freezing when mouse events occur.
# Only apply this in spawned child processes, NOT in the main process where Textual runs.
if IS_MACOS and is_child_process:
_redirect_stdio_to_devnull()

# Initialize global RandomGenerator for reproducible random number generation
from aiperf.common import random_generator as rng

Expand All @@ -144,6 +149,7 @@ async def _run_service():
_exit_if_service_failed(service)

_configure_event_loop_policy_for_platform()
_request_high_resolution_timer_on_windows()

with contextlib.suppress(asyncio.CancelledError):
if not Environment.SERVICE.DISABLE_UVLOOP:
Expand Down Expand Up @@ -171,6 +177,34 @@ def _configure_event_loop_policy_for_platform() -> None:
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())


def _request_high_resolution_timer_on_windows() -> None:
"""Bump Windows system timer resolution from 15.6ms to 1ms.

asyncio.sleep on Windows is floored by the OS scheduling timer
interrupt rate, which defaults to 15.625ms. The aiperf scheduler
issues credits at sub-15ms intervals for >60 QPS, so without this
bump credit issuance clumps to the 15.6ms boundary and constant-rate
/ Poisson pacing breaks (CV blows past test thresholds).

``winmm.timeBeginPeriod(1)`` requests 1ms timer resolution. On
Windows 10+ this is scoped per-process — no impact on other apps'
battery life. We never call ``timeEndPeriod`` because the timer
bump should hold for the whole aiperf run; Windows restores the
default automatically when the process exits.

No-op on every non-Windows platform.
"""
if not IS_WINDOWS:
return
import ctypes

# winmm is part of Windows and always present, but guard defensively:
# if it ever fails, aiperf still runs — high-QPS tests may just
# produce noisier intervals.
with contextlib.suppress(OSError, AttributeError):
ctypes.WinDLL("winmm").timeBeginPeriod(1)


def _exit_if_service_failed(service) -> None:
"""Surface accumulated service failures as a non-zero SystemExit.

Expand All @@ -189,10 +223,21 @@ def _exit_if_service_failed(service) -> None:


def _redirect_stdio_to_devnull() -> None:
"""Redirect stdin/stdout/stderr to /dev/null for macOS child processes.

Prevents child processes from accessing the parent's terminal, which causes
Textual UI corruption (ASCII garbage and freezes from inherited terminal FDs).
"""Redirect stdin/stdout/stderr to NUL/devnull in spawned child processes.

macOS: avoid Textual UI terminal corruption — children inheriting the
parent's terminal FDs interfere with Textual's terminal management,
causing ASCII garbage and freezes on mouse events.

Windows: when aiperf is launched as a subprocess with stdout/stderr =
``subprocess.PIPE`` (e.g. from the integration test runner), Windows marks
those pipe handles inheritable. ``multiprocessing.spawn`` then propagates
them into every grandchild service. At shutdown the grandchildren still
hold those pipe handles, which causes either ``process.communicate()`` to
hang forever waiting for EOF, or a ``STATUS_ACCESS_VIOLATION`` (0xC0000005)
during ``DLL_PROCESS_DETACH``. Releasing the inherited pipe FDs to NUL
early makes shutdown clean. Service log output is already routed through
the multiprocessing log_queue, so this loses nothing.
"""
# Redirect at the OS level so spawned grandchild processes (e.g.
# ProcessPoolExecutor workers via 'spawn' context) inherit safe FDs
Expand All @@ -208,17 +253,65 @@ def _redirect_stdio_to_devnull() -> None:
# os.open on /dev/null hits a kernel fast path (no disk I/O), so
# the blocking calls are safe here.
devnull_fd = os.open(os.devnull, os.O_RDWR)
for fd in (0, 1, 2):
os.dup2(devnull_fd, fd)
os.dup2(devnull_fd, 0)
os.dup2(devnull_fd, 1)
os.close(devnull_fd)

# stderr: redirect to a per-process file rather than NUL. Releases the
# inherited stderr pipe handle from the parent (same shutdown rationale
# as fd 1), AND preserves uncaught Python tracebacks for postmortem —
# otherwise child crashes are invisible because Python's default
# ``sys.excepthook`` writes to stderr.
#
# Filename includes PID + a UUID suffix so a recycled PID (common on
# Windows) cannot O_TRUNC over a previous process's crash log. An atexit
# handler removes the file on clean exit if it's still empty — that keeps
# %TEMP% from accumulating zero-byte ``aiperf_child_*_stderr.log`` files
# over many runs while still preserving crash evidence (non-empty files
# are left in place for the user to inspect).
err_path = (
f"{tempfile.gettempdir()}{os.sep}"
f"aiperf_child_{os.getpid()}_{uuid.uuid4().hex[:8]}_stderr.log"
)
# 0o600 mode: owner-only read/write. The crash log can contain Python
# tracebacks with snippets of the user's config (model names, endpoint
# URLs, request data) and lives in a shared %TEMP%/`/tmp` directory.
# Restrictive permissions prevent other local users from harvesting it.
err_fd = os.open(err_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
os.dup2(err_fd, 2)
os.close(err_fd)
atexit.register(_remove_if_empty, err_path)

# Recreate Python-level streams from the redirected OS FDs.
# closefd=False keeps FD ownership at the OS level so that if these
# stream objects are garbage-collected (e.g. replaced by test frameworks),
# the underlying FDs 0/1/2 stay open and the /dev/null redirect holds.
sys.stdin = os.fdopen(0, "r", closefd=False)
sys.stdout = os.fdopen(1, "w", closefd=False)
sys.stderr = os.fdopen(2, "w", closefd=False)
#
# encoding="utf-8" is critical on Windows: without it, os.fdopen picks
# the system default (cp1252) which can't encode common Unicode chars
# (box-drawing arrows, emoji, etc.) used in aiperf's TRACE-level log
# messages. The first such write triggers UnicodeEncodeError, which
# Python's logging then re-emits as another UnicodeEncodeError on top,
# cascading into a flood that wedges the child before it can register.
# errors="replace" guards against any non-UTF8 binary slipping through.
sys.stdin = os.fdopen(0, "r", encoding="utf-8", errors="replace", closefd=False)
sys.stdout = os.fdopen(1, "w", encoding="utf-8", errors="replace", closefd=False)
sys.stderr = os.fdopen(2, "w", encoding="utf-8", errors="replace", closefd=False)


def _remove_if_empty(path: str) -> None:
"""Delete ``path`` on interpreter exit only if it has zero bytes.

Used by ``_redirect_stdio_to_devnull`` to clean up the per-process stderr
file when the process exited cleanly with no uncaught traceback. Files
with content (i.e. real crashes) are preserved for postmortem.
"""
try:
if os.path.getsize(path) == 0:
os.unlink(path)
except OSError:
# File already gone, or directory not writable — both fine to ignore.
pass


def _start_yappi_profiling() -> None:
Expand Down
8 changes: 6 additions & 2 deletions src/aiperf/common/tokenizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,14 @@ def resolve_alias(name: str) -> AliasResolutionResult:
Returns:
AliasResolutionResult with resolved name and any suggestions.
"""
# Check if this looks like a local path
# Check if this looks like a local path. Use ``path.anchor`` instead of
# ``path.is_absolute()`` because the latter requires a drive letter on
# Windows (``WindowsPath("/home/user/foo").is_absolute() == False``).
# Anchor is truthy for any path with a drive AND/OR root, so a POSIX-style
# absolute path passed on Windows is correctly recognized as local.
path = Path(name)
is_local_path = (
path.is_absolute()
bool(path.anchor)
or name.startswith("./")
or name.startswith("../")
or path.is_dir()
Expand Down
14 changes: 14 additions & 0 deletions src/aiperf/config/flags/_converter_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,20 @@ def _apply_verbosity_and_ui(
elif not ui_set and not is_tty():
runtime_dict["ui"] = UIType.NONE

# Dashboard requires a TTY: Textual issues console-setup syscalls that
# block forever on Windows when stdout is a pipe (e.g. subprocess.PIPE
# from a test harness, ``aiperf ... | tee``, CI capture). Downgrade to
# SIMPLE rather than hanging. Applies to every platform — Linux/macOS
# don't hang, but a non-TTY dashboard renders nothing useful there either.
if runtime_dict.get("ui") == UIType.DASHBOARD and not is_tty():
import logging as _logging

_logging.getLogger(__name__).warning(
"--ui dashboard requires an interactive terminal; "
"stdout is not a TTY, falling back to --ui simple"
)
runtime_dict["ui"] = UIType.SIMPLE


def _build_communication(cli: CLIConfig) -> dict[str, Any] | None:
from aiperf.common.enums import CommunicationType
Expand Down
19 changes: 16 additions & 3 deletions src/aiperf/controller/multiprocess_service_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ class MultiProcessRunInfo(BaseModel):

model_config = ConfigDict(arbitrary_types_allowed=True)

process: Process | None = Field(default=None)
process: Process | None = Field(
default=None,
description="The multiprocessing Process handle for the spawned service.",
)
service_type: ServiceTypeT = Field(
...,
description="Type of service running in the process",
Expand Down Expand Up @@ -143,8 +146,18 @@ async def wait_for_all_services_registration(
"""
self.debug("Waiting for all required services to register...")

# Get the set of required service types for checking completion
required_types = set(self.required_services.keys())
# Wait for every service we've actually spawned, not just the ones in
# required_services. Optional services (GPU telemetry, server metrics,
# API) are started via run_service() and tracked in multi_process_info
# but never added to required_services. On slow targets (Windows VDI
# multiprocessing.spawn) those optional services register hundreds of
# ms after the core ones — if we only waited on required_services, the
# ProfileConfigureCommand would broadcast before the optionals had
# subscribed, leaving them un-configured and their data missing from
# the final export.
required_types = set(
info.service_type for info in self.multi_process_info
) or set(self.required_services.keys())

# TODO: Can this be done better by using asyncio.Event()?

Expand Down
29 changes: 20 additions & 9 deletions src/aiperf/controller/system_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ def _print_cancel_warning(self) -> None:
console.print()
console.print(
Panel(
"[bold yellow]⚠️ BENCHMARK CANCELLED[/bold yellow]\n\n"
"[bold yellow]BENCHMARK CANCELLED[/bold yellow]\n\n"
"Stopping credit issuance and cancelling in-flight requests...\n"
"Results will be written to files.\n\n"
"[dim]Press Ctrl+C again to force quit immediately[/dim]\n"
Expand All @@ -809,7 +809,7 @@ def _print_force_quit_warning(self) -> None:
console.print()
console.print(
Panel(
"[bold red]🛑 FORCE QUIT[/bold red]\n\n"
"[bold red]FORCE QUIT[/bold red]\n\n"
"Terminating all processes immediately.\n"
"Results may be incomplete or not written to files.",
border_style="red",
Expand Down Expand Up @@ -923,14 +923,25 @@ async def _stop_system_controller(self) -> None:
await self.ui.wait_for_tasks()
await asyncio.sleep(0.1) # Give time for screen clear to finish

if not self._exit_errors:
await self._print_post_benchmark_info_and_metrics()
else:
self._print_exit_errors_and_log_file()
# Post-shutdown reporting must never prevent reaching os._exit(): by
# this point services/comms/UI are already stopped, so any unhandled
# raise here leaves the parent process alive with no work to do, and
# an integration runner waiting on process.communicate() blocks until
# its timeout. The concrete failure that motivated this guard was a
# UnicodeEncodeError from a Rich console.print() of a non-cp1252 char
# on Windows PIPE'd stdout — but any rendering bug has the same blast
# radius, so we catch broadly.
try:
if not self._exit_errors:
await self._print_post_benchmark_info_and_metrics()
else:
self._print_exit_errors_and_log_file()

if Environment.DEV.MODE:
# Print a warning message to the console if developer mode is enabled, on exit after results
print_developer_mode_warning()
if Environment.DEV.MODE:
# Print a warning message to the console if developer mode is enabled, on exit after results
print_developer_mode_warning()
except Exception as e: # noqa: BLE001 - last-chance guard; any raise here hangs the process tree
self.error(f"Post-shutdown reporting failed (continuing to exit): {e!r}")

# Clean up the global log queue to prevent semaphore leaks
await cleanup_global_log_queue()
Expand Down
7 changes: 7 additions & 0 deletions src/aiperf/dataset/loader/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ def _is_url(self, content: str) -> bool:
Raises:
ValueError: If URL has only scheme or only netloc (invalid).
"""
# A real URL contains the "://" separator between scheme and authority.
# Without it, urlparse would mis-classify Windows drive-letter paths
# like "C:\Users\foo" as having scheme="c" and crash here. Filter them
# out cheaply before urlparse runs.
if "://" not in content:
return False

url = urlparse(content)

# Valid URL with both scheme and netloc
Expand Down
12 changes: 6 additions & 6 deletions src/aiperf/exporters/console_osl_mismatch_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ def _create_warning_text(
[bold]Why:[/bold] Server hit EOS token before reaching requested output length.

[bold]Fix Options:[/bold]
[green]--extra-inputs ignore_eos:true[/green] Generate until max_tokens (vLLM, TensorRT-LLM)
[green]--extra-inputs min_tokens:<N>[/green] Set minimum output length (vLLM, TensorRT-LLM, SGLang)
[green]--use-server-token-count[/green] Use server-reported token counts if tokenizer mismatch suspected
- [green]--extra-inputs ignore_eos:true[/green] - Generate until max_tokens (vLLM, TensorRT-LLM)
- [green]--extra-inputs min_tokens:<N>[/green] - Set minimum output length (vLLM, TensorRT-LLM, SGLang)
- [green]--use-server-token-count[/green] - Use server-reported token counts if tokenizer mismatch suspected

[bold]Diagnostics:[/bold]
Review [cyan]profile_export.jsonl[/cyan] [cyan]osl_mismatch_diff_pct[/cyan] for per-request values
Adjust: [green]AIPERF_METRICS_OSL_MISMATCH_PCT_THRESHOLD={self._pct_threshold:g}[/green]
Adjust: [green]AIPERF_METRICS_OSL_MISMATCH_MAX_TOKEN_THRESHOLD={self._max_token_threshold}[/green]\
- Review [cyan]profile_export.jsonl[/cyan] -> [cyan]osl_mismatch_diff_pct[/cyan] for per-request values
- Adjust: [green]AIPERF_METRICS_OSL_MISMATCH_PCT_THRESHOLD={self._pct_threshold:g}[/green]
- Adjust: [green]AIPERF_METRICS_OSL_MISMATCH_MAX_TOKEN_THRESHOLD={self._max_token_threshold}[/green]\
"""
Loading
Loading