Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,18 @@ The Timing Manager uses a **credit-based flow control system** to control when r
- Scales to large numbers of workers without bottlenecks
- Efficient message routing minimizes overhead

### Phase baseline handshake

Before each phase issues its first credit, TimingManager invokes a synchronous gate
on SystemController. SystemController's `BaselineCoordinator` broadcasts a
`PhaseBaselineRequestMessage` to all services that advertised
`ServiceCapability.BASELINE_COLLECTOR` and waits for acks (with a configurable
timeout). The gate releases when all collectors ack or the timeout fires. A
symmetric END gate fires after credits drain. This guarantees telemetry and
server-metrics scrapes capture clean pre/post-phase reference points without
requiring TimingManager to know about any specific collector. Failed acks count
as acks (logged), so a dead collector cannot block the phase.

### Data Flow & Messaging

This section describes the end-to-end message flow during a benchmark run, showing how data moves between components through the ZMQ message bus.
Expand Down
28 changes: 28 additions & 0 deletions docs/dev/patterns.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,34 @@ service:
**Config types:**
- `CLIConfig`: unified CLI input DTO carrying both benchmark params (endpoints, loadgen) and service-runtime knobs (ZMQ ports, logging level)

## Baseline Collector Pattern

To take a clean reading at phase boundaries, mix in `BaselineCollectorMixin`:

```python
from aiperf.common.base_component_service import BaseComponentService
from aiperf.common.enums import BaselineKind
from aiperf.common.mixins.baseline_collector_mixin import BaselineCollectorMixin


class MyMonitor(BaselineCollectorMixin, BaseComponentService):
"""Captures a system snapshot before profiling and after credits drain."""

async def collect_baseline(
self, kind: BaselineKind, phase_id: str, phase_name: str
) -> None:
if phase_name != "profiling":
return
snapshot = await self._take_snapshot()
self._snapshots.append((kind, phase_id, snapshot))
```

The mixin auto-advertises `ServiceCapability.BASELINE_COLLECTOR` via its
`extra_capabilities` ClassVar, which SystemController's `BaselineCoordinator`
keys off to fan out gate requests. Per-collector exceptions are caught and
surfaced via the ack message's `success=False` field rather than blocking the
gate.

## Model Pattern

Use `AIPerfBaseModel` for data, `BaseConfig` for configuration:
Expand Down
11 changes: 10 additions & 1 deletion docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ API server settings. Controls the host and port of the API server.
| `AIPERF_API_SERVER_CORS_ORIGINS` | `[]` | — | List of CORS origins to allow (empty = no CORS, ['*'] = all origins) |
| `AIPERF_API_SERVER_SHUTDOWN_TIMEOUT` | `5.0` | ≥ 1.0, ≤ 300.0 | Timeout in seconds for graceful API server shutdown before force-cancelling |

## BASELINE

Phase baseline handshake settings. Controls the gate that blocks TimingManager between phases until registered baseline collectors finish their point-in-time scrape.

| Environment Variable | Default | Constraints | Description |
|----------------------|---------|-------------|-------------|
| `AIPERF_BASELINE_GATE_TIMEOUT_S` | `5.0` | > 0.0 | Per-gate timeout (seconds). If registered baseline collectors do not all ack within this window, the gate releases with a warning and the phase proceeds without waiting for stragglers. |
| `AIPERF_BASELINE_GATE_ENABLED` | `True` | — | Master switch for the phase baseline handshake. When False, PhaseGateClient short-circuits to no-op and PhaseRunner does not wait between phases. Useful for replay/debug runs. |

## COMPRESSION

Compression settings for streaming file transfers. Controls chunk size and compression levels for zstd and gzip encodings used in dataset and results file transfers.
Expand Down Expand Up @@ -161,6 +170,7 @@ Record processing and export configuration. Controls batch sizes, processor scal
| `AIPERF_RECORD_PROCESSOR_SCALE_FACTOR` | `4` | ≥ 1, ≤ 100 | Scale factor for number of record processors to spawn based on worker count. Formula: 1 record processor for every X workers |
| `AIPERF_RECORD_PROGRESS_REPORT_INTERVAL` | `2.0` | ≥ 0.1, ≤ 600.0 | Interval in seconds between records progress report messages |
| `AIPERF_RECORD_PROCESS_RECORDS_TIMEOUT` | `300.0` | ≥ 1.0, ≤ 100000.0 | Timeout in seconds for processing record results |
| `AIPERF_RECORD_CREDITS_COMPLETE_FALLBACK_TIMEOUT` | `10.0` | ≥ 0.0, ≤ 300.0 | Maximum seconds RecordsManager waits for CreditsComplete after all profiling records are ready before finalizing defensively |

## SEARCHPLANNER

Expand All @@ -180,7 +190,6 @@ Server metrics collection configuration. Controls server metrics collection freq

| Environment Variable | Default | Constraints | Description |
|----------------------|---------|-------------|-------------|
| `AIPERF_SERVER_METRICS_COLLECTION_FLUSH_PERIOD` | `2.0` | ≥ 0.0, ≤ 30.0 | Time in seconds to continue collecting metrics after profiling completes, allowing server-side metrics to flush/finalize before shutting down (default: 2.0s) |
| `AIPERF_SERVER_METRICS_COLLECTION_INTERVAL` | `0.333` | ≥ 0.001, ≤ 300.0 | Server metrics collection interval in seconds (default: 333ms, ~3Hz) |
| `AIPERF_SERVER_METRICS_EXPORT_BATCH_SIZE` | `100` | ≥ 1, ≤ 1000000 | Batch size for server metrics jsonl writer export results processor |
| `AIPERF_SERVER_METRICS_REACHABILITY_TIMEOUT` | `10` | ≥ 1, ≤ 300 | Timeout in seconds for checking server metrics endpoint reachability during init |
Expand Down
2 changes: 2 additions & 0 deletions docs/index.yml
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ navigation:
path: reference/tokenizer-auto-detection.md
- page: Conversation Context Mode
path: reference/conversation-context-mode.md
- page: Phase Baseline Handshake
path: reference/phase-baseline-handshake.md
- page: List-Metric Aggregation
path: reference/list-metric-aggregation.md
- page: Vendor Usage Field Reference
Expand Down
125 changes: 125 additions & 0 deletions docs/reference/phase-baseline-handshake.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# Phase Baseline Handshake

The phase baseline handshake captures point-in-time baseline readings at phase boundaries without coupling `TimingManager` to specific collectors. `PhaseRunner` pauses at each boundary through `PhaseGateClient`; `SystemController` fans the request out through `BaselineCoordinator`; registered baseline collectors scrape once and ACK; then the gate releases and the benchmark continues.

## Component map

```mermaid
flowchart LR
subgraph TimingManager
PR[PhaseRunner]
PGC[PhaseGateClient]
PR -->|before_phase / after_phase| PGC
end

subgraph Controller
SC[SystemController]
BC[BaselineCoordinator]
SC --> BC
end

subgraph Collectors[Baseline collector services]
GTM[GPUTelemetryManager]
SMM[ServerMetricsManager]
BCM[BaselineCollectorMixin]
GTM --> BCM
SMM --> BCM
end

PGC -->|PhaseStartGateCommand / PhaseEndGateCommand| SC
BC -->|PhaseBaselineRequestMessage| BCM
BCM -->|PhaseBaselineAckMessage| SC
SC -->|PhaseGateGrantedResponse| PGC
```

## Per-phase message sequence

```mermaid
sequenceDiagram
autonumber
participant Runner as PhaseRunner
participant Gate as PhaseGateClient
participant SC as SystemController
participant Coord as BaselineCoordinator
participant Collector as Baseline collectors

Runner->>Gate: before_phase(phase_id, phase_name)
Gate->>SC: PhaseStartGateCommand
SC->>Coord: gate_phase(kind=START)
Coord->>Collector: PhaseBaselineRequestMessage(kind=START)
Collector->>Collector: collect_baseline(START, phase_id, phase_name)
Collector-->>SC: PhaseBaselineAckMessage(success=True)
SC->>Coord: handle_ack(ack)
Coord-->>SC: all registered collectors acked
SC-->>Gate: PhaseGateGrantedResponse
Gate-->>Runner: start gate released

Runner->>Runner: issue credits and wait for returns

Runner->>Gate: after_phase(phase_id, phase_name)
Gate->>SC: PhaseEndGateCommand
SC->>Coord: gate_phase(kind=END)
Coord->>Collector: PhaseBaselineRequestMessage(kind=END)
Collector->>Collector: collect_baseline(END, phase_id, phase_name)
Collector-->>SC: PhaseBaselineAckMessage(success=True)
SC->>Coord: handle_ack(ack)
Coord-->>SC: all registered collectors acked
SC-->>Gate: PhaseGateGrantedResponse
Gate-->>Runner: end gate released
```

## Credit ordering at phase boundaries

```mermaid
sequenceDiagram
autonumber
participant Runner as PhaseRunner
participant Gate as PhaseGateClient
participant Controller as SystemController
participant Coord as BaselineCoordinator
participant Collectors as Baseline collectors
participant Issuer as CreditIssuer
participant Workers as Workers

Runner->>Gate: before_phase(phase_id, phase_name)
Gate->>Controller: PhaseStartGateCommand
Controller->>Coord: gate_phase(kind=START)
Coord->>Collectors: PhaseBaselineRequestMessage(kind=START)
Collectors-->>Controller: PhaseBaselineAckMessage(success=True)
Controller-->>Gate: PhaseGateGrantedResponse
Gate-->>Runner: START gate released
Runner->>Issuer: start strategy.execute_phase()
Issuer->>Workers: publish credits for this phase
Workers-->>Runner: return credit results
Runner->>Runner: wait for sends complete, then returns drain
Runner->>Gate: after_phase(phase_id, phase_name)
Gate->>Controller: PhaseEndGateCommand
Controller->>Coord: gate_phase(kind=END)
Coord->>Collectors: PhaseBaselineRequestMessage(kind=END)
Collectors-->>Controller: PhaseBaselineAckMessage(success=True)
Controller-->>Gate: PhaseGateGrantedResponse
Gate-->>Runner: END gate released
Runner->>Runner: phase transition may complete
```

## TimingManager phase flow

```mermaid
flowchart TD
A[PhaseRunner starts phase] --> B[Generate phase_id and phase_name]
B --> C[PhaseGateClient.before_phase]
C --> D[SystemController handles PHASE_START_GATE]
D --> E[BaselineCoordinator broadcasts START request]
E --> F[Collectors scrape START baseline]
F --> G[Collectors publish START ACKs]
G --> H[SystemController returns PhaseGateGrantedResponse]
H --> I[PhaseRunner issues credits]
I --> J[PhaseRunner waits for phase completion and returns]
J --> K[PhaseGateClient.after_phase]
K --> L[SystemController handles PHASE_END_GATE]
L --> M[BaselineCoordinator broadcasts END request]
M --> N[Collectors scrape END baseline]
N --> O[Collectors publish END ACKs]
O --> P[SystemController returns PhaseGateGrantedResponse]
P --> Q[PhaseRunner completes phase transition]
```
1 change: 0 additions & 1 deletion docs/server-metrics/server-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ WARNING Disabling server metrics collection for http://127.0.0.1:60000/metrics:
| Environment Variable | Default | Description |
|---------------------|---------|-------------|
| `AIPERF_SERVER_METRICS_COLLECTION_INTERVAL` | 0.333s | Collection frequency (333ms, ~3Hz) |
| `AIPERF_SERVER_METRICS_COLLECTION_FLUSH_PERIOD` | 2.0s | Wait time for final metrics after benchmark |
| `AIPERF_SERVER_METRICS_REACHABILITY_TIMEOUT` | 10s | Timeout for endpoint reachability tests |
| `AIPERF_SERVER_METRICS_EXPORT_BATCH_SIZE` | 100 | Batch size for JSONL writer |
| `AIPERF_SERVER_METRICS_SHUTDOWN_DELAY` | 5.0s | Shutdown delay for command response transmission |
Expand Down
6 changes: 5 additions & 1 deletion src/aiperf/common/base_component_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import asyncio
import uuid
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, ClassVar

from aiperf.common.base_service import BaseService
from aiperf.common.enums import CommandType, LifecycleState
Expand Down Expand Up @@ -40,6 +40,9 @@ class BaseComponentService(BaseService):
publishing the current state of the service to the system controller.
"""

extra_capabilities: ClassVar[tuple[str, ...]] = ()
"""Capabilities advertised by this service in RegisterServiceCommand. Mixins extend this."""

def __init__(
self,
run: "BenchmarkRun",
Expand Down Expand Up @@ -77,6 +80,7 @@ async def _register_service_on_start(self) -> None:
# Target the system controller directly to avoid broadcasting to all services.
target_service_type=ServiceType.SYSTEM_CONTROLLER,
state=self.state,
capabilities=tuple(self.extra_capabilities),
)
max_attempts = Environment.SERVICE.REGISTRATION_MAX_ATTEMPTS
registration_interval = Environment.SERVICE.REGISTRATION_INTERVAL
Expand Down
10 changes: 10 additions & 0 deletions src/aiperf/common/enums/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
BasePydanticEnumInfo,
CaseInsensitiveStrEnum,
)
from aiperf.common.enums.baseline_enums import (
BaselineKind,
ServiceCapability,
make_result_producer_capability,
parse_result_producer_capability,
)
from aiperf.common.enums.enums import (
AIPerfLogLevel,
AudioFormat,
Expand Down Expand Up @@ -86,6 +92,7 @@
"BaseMetricUnitInfo",
"BasePydanticBackedStrEnum",
"BasePydanticEnumInfo",
"BaselineKind",
"CaseInsensitiveStrEnum",
"CommAddress",
"CommandResponseStatus",
Expand Down Expand Up @@ -141,6 +148,7 @@
"SSEFieldType",
"ServerMetricsDiscoveryMode",
"ServerMetricsFormat",
"ServiceCapability",
"ServiceRegistrationStatus",
"SweepMode",
"SystemState",
Expand All @@ -151,4 +159,6 @@
"VideoJobStatus",
"VideoSynthType",
"WorkerStatus",
"make_result_producer_capability",
"parse_result_producer_capability",
]
45 changes: 45 additions & 0 deletions src/aiperf/common/enums/baseline_enums.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Enums for the phase baseline handshake.

BaselineKind tags whether a baseline reading is taken before a phase starts
issuing credits (START) or after credits have drained (END).

ServiceCapability is a generic capability tag advertised by services in their
RegisterServiceCommand; SystemController dispatches based on membership
(e.g. BASELINE_COLLECTOR services join the BaselineCoordinator's registered set).
"""

from aiperf.common.enums.base_enums import CaseInsensitiveStrEnum


class BaselineKind(CaseInsensitiveStrEnum):
"""Direction of a baseline reading relative to a phase."""

START = "start"
END = "end"


class ServiceCapability(CaseInsensitiveStrEnum):
"""Capability tags a service may advertise at registration time."""

BASELINE_COLLECTOR = "baseline_collector"
RESULT_PRODUCER = "result_producer"


_RESULT_PRODUCER_PREFIX = f"{ServiceCapability.RESULT_PRODUCER}:"


def make_result_producer_capability(domain: str) -> str:
"""Build a result-producer capability tag for a result domain."""

return f"{_RESULT_PRODUCER_PREFIX}{domain}"
Comment on lines +33 to +36
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Reject empty result-producer domains at construction.

make_result_producer_capability("") currently produces a tag that parse_result_producer_capability(...) cannot round-trip (it returns None). Guarding this early avoids silent invalid capabilities.

Suggested fix
 def make_result_producer_capability(domain: str) -> str:
     """Build a result-producer capability tag for a result domain."""
-
-    return f"{_RESULT_PRODUCER_PREFIX}{domain}"
+    if not domain:
+        raise ValueError("domain must be non-empty")
+    return f"{_RESULT_PRODUCER_PREFIX}{domain}"
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/aiperf/common/enums/baseline_enums.py` around lines 33 - 36,
make_result_producer_capability currently allows an empty domain which yields an
invalid tag that parse_result_producer_capability cannot round-trip; validate
the domain in make_result_producer_capability (ensure domain is a non-empty
string) and reject empty input by raising a ValueError (or similar explicit
exception) with a clear message; update the function that builds the tag
(make_result_producer_capability) to check domain before returning
f"{_RESULT_PRODUCER_PREFIX}{domain}" and reference
parse_result_producer_capability in the error message if helpful.



def parse_result_producer_capability(capability: str) -> str | None:
"""Return the result domain if capability is a result-producer tag."""

if not capability.startswith(_RESULT_PRODUCER_PREFIX):
return None
domain = capability.removeprefix(_RESULT_PRODUCER_PREFIX)
return domain or None
4 changes: 4 additions & 0 deletions src/aiperf/common/enums/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ class CommandType(CaseInsensitiveStrEnum):
PROFILE_COMPLETE = "profile_complete"
PROFILE_CONFIGURE = "profile_configure"
PROFILE_START = "profile_start"
PHASE_END_GATE = "phase_end_gate"
PHASE_START_GATE = "phase_start_gate"
REGISTER_SERVICE = "register_service"
SHUTDOWN = "shutdown"
SHUTDOWN_WORKERS = "shutdown_workers"
Expand Down Expand Up @@ -348,6 +350,8 @@ class MessageType(CaseInsensitiveStrEnum):
INFERENCE_RESULTS = "inference_results"
METRIC_RECORDS = "metric_records"
PARSED_INFERENCE_RESULTS = "parsed_inference_results"
PHASE_BASELINE_ACK = "phase_baseline_ack"
PHASE_BASELINE_REQUEST = "phase_baseline_request"
PROCESSING_STATS = "processing_stats"
PROCESS_RECORDS_RESULT = "process_records_result"
PROCESS_TELEMETRY_RESULT = "process_telemetry_result"
Expand Down
Loading
Loading