Skip to content

feat: add phase baseline handshake#956

Open
ajcasagrande wants to merge 3 commits into
mainfrom
ajc/phase-baseline-handshake
Open

feat: add phase baseline handshake#956
ajcasagrande wants to merge 3 commits into
mainfrom
ajc/phase-baseline-handshake

Conversation

@ajcasagrande
Copy link
Copy Markdown
Contributor

@ajcasagrande ajcasagrande commented May 18, 2026

Summary by CodeRabbit

  • New Features

    • Phase baseline handshake added to establish clean pre/post-phase telemetry reference points and gate phases (start/end) with a configurable timeout; failed collector responses are logged but won’t block phases
    • Records manager now defers finalization with a credits-complete fallback timeout
  • Configuration

    • New env vars: AIPERF_BASELINE_GATE_TIMEOUT_S, AIPERF_BASELINE_GATE_ENABLED, AIPERF_RECORD_CREDITS_COMPLETE_FALLBACK_TIMEOUT
    • Server metrics config: AIPERF_SERVER_METRICS_COLLECTION_INTERVAL replaces prior flush-period setting
  • Documentation

    • New architecture and integration docs for phase baseline handshake and baseline collector pattern

Review Change Stack

ajcasagrande and others added 2 commits May 16, 2026 17:07
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
Signed-off-by: Anthony Casagrande <[email protected]>
@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 18, 2026

Try out this PR

Quick install:

pip install --upgrade --force-reinstall git+https://github.com/ai-dynamo/aiperf.git@4134e4659e783f0eed8414001af8b262e7b86dd4

Recommended with virtual environment (using uv):

uv venv --python 3.12 && source .venv/bin/activate
uv pip install --upgrade --force-reinstall git+https://github.com/ai-dynamo/aiperf.git@4134e4659e783f0eed8414001af8b262e7b86dd4

Last updated for commit: 4134e46Browse code

@github-actions github-actions Bot added the feat label May 18, 2026
@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 18, 2026

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 18, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: ef8a1865-cec7-40ef-89cd-76fa9a1dd3cd

📥 Commits

Reviewing files that changed from the base of the PR and between 6bc1a6d and 4134e46.

📒 Files selected for processing (4)
  • docs/environment-variables.md
  • src/aiperf/common/environment.py
  • tests/component_integration/conftest.py
  • tests/integration/conftest.py
💤 Files with no reviewable changes (2)
  • tests/integration/conftest.py
  • tests/component_integration/conftest.py
✅ Files skipped from review due to trivial changes (1)
  • docs/environment-variables.md
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/aiperf/common/environment.py

Walkthrough

This PR implements a phase baseline handshake mechanism that synchronizes baseline collection at phase boundaries. Services advertise baseline-collection or result-producer capabilities; the system broadcasts requests to collectors via BaselineCoordinator, waits for acknowledgements with timeout, and gates phase transitions. Result producers signal completion through ResultJoinCoordinator to trigger coordinated shutdown. Timing managers wire the client-side PhaseGateClient into phase runners to enforce START/END gating.

Changes

Phase Baseline Handshake & Coordination

Layer / File(s) Summary
Baseline type system and enums
src/aiperf/common/enums/baseline_enums.py, src/aiperf/common/enums/enums.py, src/aiperf/common/enums/__init__.py
BaselineKind enum (START/END), ServiceCapability enum (BASELINE_COLLECTOR/RESULT_PRODUCER), and helpers make_result_producer_capability() and parse_result_producer_capability() to encode/decode per-domain result producer tags.
Environment configuration for baseline gating and credits
src/aiperf/common/environment.py
New BASELINE settings subsystem with GATE_TIMEOUT_S (default 5.0s) and GATE_ENABLED (default true); extends RECORD with CREDITS_COMPLETE_FALLBACK_TIMEOUT; removes SERVER_METRICS.COLLECTION_FLUSH_PERIOD.
Message contracts and module exports
src/aiperf/common/messages/base_messages.py, src/aiperf/common/messages/baseline_messages.py, src/aiperf/common/messages/command_messages.py, src/aiperf/common/messages/__init__.py
Adds request_ns validation (ge=0); defines PhaseBaselineRequestMessage and PhaseBaselineAckMessage for baseline handshake; defines PhaseStartGateCommand, PhaseEndGateCommand, PhaseGateGrantedResponse for phase gating; updates exports.
Base service capability infrastructure
src/aiperf/common/base_component_service.py, src/aiperf/common/mixins/baseline_collector_mixin.py, src/aiperf/common/mixins/__init__.py
BaseComponentService.extra_capabilities advertises service capabilities in registration; BaselineCollectorMixin registers ServiceCapability.BASELINE_COLLECTOR and defines abstract collect_baseline() hook; mixin handler publishes ack messages with success/error status.
Baseline coordinator (server-side)
src/aiperf/controller/baseline_coordinator.py
BaselineCoordinator tracks registered collectors, broadcasts PhaseBaselineRequestMessage, waits for per-service acks with timeout, logs unacked collectors on timeout, and allows phase progression regardless.
Result join coordinator for shutdown readiness
src/aiperf/controller/result_join_coordinator.py
ResultJoinCoordinator tracks per-domain result producer completion across services; computes pending_domains and exposes ready and pending_domains_changed() for shutdown triggering.
Phase gate client (client-side)
src/aiperf/timing/phase/phase_gate.py
PhaseGateClient sends PhaseStartGateCommand before phase and PhaseEndGateCommand after phase with configurable timeout; returns immediately if disabled.
SystemController baseline and result coordination
src/aiperf/controller/system_controller.py
Initializes BaselineCoordinator and ResultJoinCoordinator; registers services by capability; delegates gate commands to baseline coordinator; marks domains complete as results arrive; uses ResultJoinCoordinator.ready and pending_domains_changed() to trigger unified shutdown.
GPU telemetry baseline collection
src/aiperf/gpu_telemetry/manager.py, src/aiperf/gpu_telemetry/pynvml_collector.py
Mixes BaselineCollectorMixin into GPUTelemetryManager; advertises BASELINE_COLLECTOR and telemetry result producer capabilities; implements collect_baseline() to gather metrics only during profiling phase; removes baseline capture during configure; stops collectors on PROFILE_COMPLETE without final scrape.
Server metrics baseline collection
src/aiperf/server_metrics/manager.py
Mixes BaselineCollectorMixin into ServerMetricsManager; advertises BASELINE_COLLECTOR and server_metrics result producer capabilities; implements concurrent collect_baseline() during profiling; removes eager baseline during configure; stops collectors on PROFILE_COMPLETE without scrape.
Records manager as result producer with fallback
src/aiperf/records/records_manager.py
Advertises profile result producer capability; implements _maybe_handle_all_records_received() with timeout-based fallback when CREDITS_COMPLETE is delayed; removes server-metrics flush wait.
Phase runner phase gating
src/aiperf/timing/phase/runner.py
Accepts optional phase_gate: PhaseGateClient; generates per-run phase_id (UUID) and uses phase name; calls before_phase() before lifecycle START and after_phase() after lifecycle END; handles cancellation path with END gate execution.
Timing manager and orchestrator gating integration
src/aiperf/timing/manager.py, src/aiperf/timing/phase_orchestrator.py
Creates PhaseGateClient from Environment.BASELINE settings in TimingManager; passes gate to PhaseOrchestrator; orchestrator forwards to each PhaseRunner.
Reference documentation and environment docs
docs/architecture.md, docs/dev/patterns.md, docs/environment-variables.md, docs/index.yml, docs/reference/phase-baseline-handshake.md, docs/server-metrics/server-metrics.md
Adds architecture and pattern documentation for phase baseline handshake; documents BASELINE and CREDITS_COMPLETE_FALLBACK_TIMEOUT environment variables; removes COLLECTION_FLUSH_PERIOD from docs; adds navigation entry; documents COLLECTION_INTERVAL for server metrics.
Test fixture cleanup: remove server metrics flush period workarounds
tests/component_integration/conftest.py, tests/integration/conftest.py
Removes Environment import and no_server_metrics_flush_period fixture; removes COLLECTION_FLUSH_PERIOD env var override from integration test setup.
Unit tests for enums, messages, and capabilities
tests/unit/common/enums/test_baseline_enums.py, tests/unit/common/messages/test_baseline_messages.py, tests/unit/common/messages/test_register_service_capabilities.py
Tests enum string values, case-insensitive parsing, capability formatting/parsing, message round-trip JSON serialization, gate command metadata, and RegisterServiceCommand.capabilities defaults and round-trip.
Unit tests for BaselineCollectorMixin
tests/unit/common/mixins/test_baseline_collector_mixin.py
Tests mixin advertises BASELINE_COLLECTOR capability; handler invokes collect_baseline() and publishes ack with success/error; error formatting and exception handling.
Unit tests for baseline/result coordinators and SystemController integration
tests/unit/common/test_environment_baseline.py, tests/unit/controller/test_baseline_coordinator.py, tests/unit/controller/test_result_join_coordinator.py, tests/unit/controller/test_system_controller_baseline.py
Tests BASELINE env defaults/overrides; coordinator registration, gating, ack handling, timeout behavior; domain/service lifecycle and pending changes; SystemController baseline registration and shutdown coordination with telemetry/server-metrics disable scenarios.
Unit tests for GPU telemetry and server metrics baseline integration
tests/unit/gpu_telemetry/test_baseline_integration.py, tests/unit/gpu_telemetry/test_telemetry_manager.py, tests/unit/server_metrics/test_baseline_integration.py, tests/unit/server_metrics/test_server_metrics_manager.py
Tests mixin inheritance, capability advertisement, collect_baseline() implementation; verifies no baseline capture during configure; verifies profile completion stops without final scrape; tests scrape failure propagation.
Unit tests for PhaseGateClient
tests/unit/timing/phase/test_phase_gate_client.py
Tests before_phase() sends PhaseStartGateCommand; after_phase() sends PhaseEndGateCommand; disabled client short-circuits without sending.
Unit tests for phase runner gating and ordering
tests/unit/timing/phase/test_runner_gates.py, tests/unit/timing/phase/test_runner_gate_ordering.py
Tests PhaseRunner.__init__ accepts phase_gate kwarg with None default; before_phase fires before publish_phase_start; after_phase fires after publish_phase_complete; disabled gate does not send but publish still occurs.
Unit tests for records manager result producer and fallback
tests/unit/records/test_records_manager.py
Tests profile result producer capability advertisement; credits-complete fallback timing; defensive finalization without server-metrics flush wait.

🎯 4 (Complex) | ⏱️ ~60 minutes

🐰 A baseline caught for every phase,
Handlers gather in their place,
Gate and wait with gentle grace,
Credits flow and shutdown's embrace. 🎯✨

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🧹 Nitpick comments (9)
tests/unit/common/mixins/test_baseline_collector_mixin.py (1)

65-75: ⚡ Quick win

Strengthen failure-path contract assertions.

This test checks the error payload, but it should also assert the ack envelope (service_id, phase_id, kind) and single publish to lock the handshake contract on failures too.

Suggested test hardening
 `@pytest.mark.asyncio`
 async def test_handler_acks_failure_when_collect_raises() -> None:
     bus = _StubBus()
     svc = _StubCollector(bus, fail=True)
     await svc._on_phase_baseline_request(
         PhaseBaselineRequestMessage(
             phase_id="p1", phase_name="profiling", kind=BaselineKind.END
         )
     )
+    assert len(bus.published) == 1
     ack = bus.published[0]
     assert ack.success is False
+    assert ack.service_id == "svc-stub"
+    assert ack.phase_id == "p1"
+    assert ack.kind == BaselineKind.END
     assert "simulated DCGM failure" in ack.error
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/common/mixins/test_baseline_collector_mixin.py` around lines 65 -
75, Update the test_handler_acks_failure_when_collect_raises to also assert the
ack envelope and ensure exactly one publish: after calling
_StubCollector._on_phase_baseline_request and obtaining ack = bus.published[0],
assert ack.service_id (or the expected service identifier from _StubCollector),
ack.phase_id == "p1", and ack.kind == BaselineKind.END, and additionally assert
len(bus.published) == 1 to guarantee a single publish; keep the existing
assertions that ack.success is False and that "simulated DCGM failure" appears
in ack.error.
tests/unit/common/enums/test_baseline_enums.py (1)

11-40: ⚡ Quick win

Parametrize repeated enum/parser cases.

These are clean tests already, but the repeated single-assert patterns are a good fit for @pytest.mark.parametrize to reduce duplication and improve coverage extensibility.

As per coding guidelines: "Use @pytest.mark.parametrize for data-driven tests".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/common/enums/test_baseline_enums.py` around lines 11 - 40, Several
tests duplicate single-assert patterns; convert them to data-driven
parameterized tests using pytest.mark.parametrize to reduce duplication. Replace
the repeated tests for BaselineKind values and case-insensitive parsing with a
single parameterized test that iterates over (input, expected) tuples and
references BaselineKind and the existing test functions (e.g.,
test_baseline_kind_values_lowercase, test_baseline_kind_case_insensitive). Do
the same for ServiceCapability and the capability helpers by parameterizing
expected string values for ServiceCapability.BASELINE_COLLECTOR and
ServiceCapability.RESULT_PRODUCER, and parameterize
make_result_producer_capability and parse_result_producer_capability cases
(including negative cases that should return None) so each assertion becomes an
iteration in a single param test rather than separate test functions.
src/aiperf/controller/baseline_coordinator.py (1)

82-93: 💤 Low value

Exceptions in gathered futures are silently ignored.

With return_exceptions=True, if a future raises an exception (e.g., from set_exception or internal asyncio errors), it appears in results but the loop only checks for PhaseBaselineAckMessage. These exceptions would be silently dropped.

♻️ Suggested improvement to log unexpected exceptions
         for ack in results:
             if isinstance(ack, PhaseBaselineAckMessage) and not ack.success:
                 _logger.warning(
                     f"Baseline {kind} for phase '{phase_name}' "
                     f"(id={phase_id[:8]}) collector {ack.service_id!r} "
                     f"reported failure: {ack.error}"
                 )
+            elif isinstance(ack, BaseException):
+                _logger.warning(
+                    f"Baseline {kind} for phase '{phase_name}' "
+                    f"(id={phase_id[:8]}) unexpected exception: {ack!r}"
+                )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/aiperf/controller/baseline_coordinator.py` around lines 82 - 93, The loop
over results from asyncio.gather in baseline_coordinator.py currently only
handles PhaseBaselineAckMessage and silently drops exceptions because
return_exceptions=True; update the loop that processes results (the variable
results from asyncio.wait_for / asyncio.gather) to detect
non-PhaseBaselineAckMessage items that are Exception instances and log them
(include context such as phase_name, phase_id[:8], kind and the
exception/traceback) so raised exceptions from futures are not ignored; keep the
existing handling for PhaseBaselineAckMessage.success failures and add a branch
to log unexpected exceptions from the gathered futures.
tests/unit/records/test_records_manager.py (1)

347-351: ⚡ Quick win

Add type hints to the new helper function.

Line 347 introduces close_background_task without parameter/return annotations; please type it for consistency with project rules.

Proposed patch
+from collections.abc import Coroutine
+from typing import Any
 ...
-    def close_background_task(coro):
+    def close_background_task(coro: Coroutine[Any, Any, Any]) -> MagicMock:
         coro.close()
         return MagicMock()

As per coding guidelines, "Add type hints on ALL functions (params and return)".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/records/test_records_manager.py` around lines 347 - 351, The
helper close_background_task lacks type annotations; update it to have param and
return types (e.g., annotate coro as Coroutine[Any, Any, Any] and the function
return as MagicMock) and add the needed imports (typing: Any, Coroutine and
unittest.mock: MagicMock) so the signature becomes typed and consistent with
project rules, and keep manager.execute_async =
MagicMock(side_effect=close_background_task) unchanged.
tests/unit/common/test_environment_baseline.py (2)

6-8: ⚡ Quick win

Avoid asserting defaults via the global Environment singleton.

Using Environment at Line 7 can make this test order-dependent if another test initializes env-backed settings earlier with overrides. Prefer constructing a fresh _Environment() here too, so this test always validates defaults deterministically.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/common/test_environment_baseline.py` around lines 6 - 8, The test
test_baseline_defaults currently reads defaults from the global Environment
singleton which can be mutated by other tests; instead instantiate a fresh
_Environment() and assert against its BASELINE values (e.g., create env =
_Environment() and use env.BASELINE.GATE_TIMEOUT_S and
env.BASELINE.GATE_ENABLED) so the test validates deterministic defaults
independent of global state.

11-13: ⚡ Quick win

Add the fixture type hint and keep imports at module scope.

At Line 11, type monkeypatch (e.g., pytest.MonkeyPatch). Also move the _Environment import from Line 12 to the file top for consistency with test style.

As per coding guidelines, "Add type hints on ALL functions (params and return)" and "Put imports at file top, fixtures for setup, one focus per test."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/common/test_environment_baseline.py` around lines 11 - 13, Add a
proper fixture type hint and move imports to module scope: change the test
function signature test_baseline_env_override to accept a typed fixture (e.g.,
monkeypatch: pytest.MonkeyPatch) and keep the return annotation (-> None), and
move the from aiperf.common.environment import _Environment import to the top of
the test module so imports are module-scoped; update any other local imports in
this test file similarly to follow the project style.
tests/unit/controller/test_system_controller_baseline.py (1)

41-42: ⚡ Quick win

Type the _no_op_publish parameter.

At Line 41, add a type for msg to satisfy the repo-wide function typing rule.

As per coding guidelines, "Add type hints on ALL functions (params and return)".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/controller/test_system_controller_baseline.py` around lines 41 -
42, The helper function _no_op_publish is missing a parameter type for msg;
update its signature to include an explicit type (e.g., msg: Any) so it conforms
to the repo rule requiring type hints on all functions, and add the
corresponding import from typing (Any) at the top of the test module if not
already present; keep the existing async and return type (-> None) intact.
tests/unit/gpu_telemetry/test_telemetry_manager.py (1)

1250-1250: ⚡ Quick win

Add return type hints to the new async test functions.

Line 1250 and Line 1263 are missing explicit return annotations (-> None), which this repo requires for all functions.

Suggested patch
-    async def test_collect_baseline_reports_scrape_failures(self):
+    async def test_collect_baseline_reports_scrape_failures(self) -> None:
@@
-    async def test_profile_complete_stops_without_final_scrape(self):
+    async def test_profile_complete_stops_without_final_scrape(self) -> None:

As per coding guidelines: "Add type hints on ALL functions (params and return)" for **/*.py.

Also applies to: 1263-1263

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/gpu_telemetry/test_telemetry_manager.py` at line 1250, Two async
test functions are missing explicit return type hints; add "-> None" to the
async def signature for test_collect_baseline_reports_scrape_failures and the
other async test declared at the nearby location (the function starting at line
1263) so both signatures read "async def <name>(...) -> None:". Update the
function declarations (not body) for these tests to include the return
annotation to satisfy the repository's typing requirement.
tests/unit/server_metrics/test_server_metrics_manager.py (1)

466-466: ⚡ Quick win

Add explicit -> None return annotations on updated async tests.

Line 466, Line 535, and Line 559 define new/renamed test functions without return type hints.

Suggested patch
-    async def test_configure_does_not_capture_baseline(
+    async def test_configure_does_not_capture_baseline(
         self,
         cli_config: CLIConfig,
         cfg_with_endpoint: CLIConfig,
-    ):
+    ) -> None:
@@
-    async def test_profile_complete_stops_without_final_scrape(
+    async def test_profile_complete_stops_without_final_scrape(
         self,
         cli_config: CLIConfig,
         cfg_with_endpoint: CLIConfig,
-    ):
+    ) -> None:
@@
-    async def test_collect_baseline_reports_scrape_failures(
+    async def test_collect_baseline_reports_scrape_failures(
         self,
         cli_config: CLIConfig,
         cfg_with_endpoint: CLIConfig,
-    ):
+    ) -> None:

As per coding guidelines: "Add type hints on ALL functions (params and return)" for **/*.py.

Also applies to: 535-535, 559-559

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/server_metrics/test_server_metrics_manager.py` at line 466, The
async test functions (e.g., async def test_configure_does_not_capture_baseline)
are missing explicit return type annotations; update each updated/renamed async
test definition to include an explicit "-> None" return type (for the functions
at the mentioned locations, and the other two new/renamed tests referenced) so
their signatures read "async def <function_name>(...) -> None:" to satisfy the
project's type-hinting guideline.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/aiperf/common/enums/baseline_enums.py`:
- Around line 33-36: make_result_producer_capability currently allows an empty
domain which yields an invalid tag that parse_result_producer_capability cannot
round-trip; validate the domain in make_result_producer_capability (ensure
domain is a non-empty string) and reject empty input by raising a ValueError (or
similar explicit exception) with a clear message; update the function that
builds the tag (make_result_producer_capability) to check domain before
returning f"{_RESULT_PRODUCER_PREFIX}{domain}" and reference
parse_result_producer_capability in the error message if helpful.

In `@src/aiperf/common/messages/baseline_messages.py`:
- Around line 40-46: Add a Pydantic validation to enforce consistency between
the success and error fields on the baseline ACK model: ensure that when success
is True then error is None/empty, and when success is False then error is a
non-empty string. Locate the model containing the Field definitions for success
and error (the Baseline ACK dataclass/model in this file) and add a validator or
root_validator that raises a ValueError with a clear message when these
conditions are violated so the wire contract remains unambiguous.

In `@src/aiperf/common/mixins/baseline_collector_mixin.py`:
- Around line 27-37: The class BaselineCollectorMixin declares an
`@abstractmethod` (collect_baseline) but doesn't inherit from ABC, so Python won't
enforce implementation in subclasses; update BaselineCollectorMixin to inherit
from abc.ABC (or make it subclass of ABC) so the abstract hook on
collect_baseline is enforced and instantiating subclasses without an override
will raise TypeError, leaving the method name collect_baseline and the mixin
class name unchanged.

In `@tests/unit/common/enums/test_baseline_enums.py`:
- Around line 11-40: Add explicit return type hints "-> None" to every test
function in this file: test_baseline_kind_values_lowercase,
test_baseline_kind_case_insensitive,
test_service_capability_baseline_collector_value,
test_service_capability_result_producer_value,
test_make_result_producer_capability_includes_domain,
test_parse_result_producer_capability_returns_domain, and
test_parse_result_producer_capability_ignores_other_capabilities; update each
def signature to include "-> None" to comply with the repo-wide "Type hints on
ALL functions" rule.

In `@tests/unit/common/messages/test_register_service_capabilities.py`:
- Around line 32-34: The test test_capabilities_accepts_tuple_of_strings
currently asserts string-equality and can pass if values were coerced to enums;
change the assertion on cmd.capabilities (from _make(...)) to verify the
normalized type by asserting it is a tuple of Capability enum members
corresponding to the input strings (i.e., the Capability enum instances for
"baseline_collector"), rather than comparing to the raw string tuple.

---

Nitpick comments:
In `@src/aiperf/controller/baseline_coordinator.py`:
- Around line 82-93: The loop over results from asyncio.gather in
baseline_coordinator.py currently only handles PhaseBaselineAckMessage and
silently drops exceptions because return_exceptions=True; update the loop that
processes results (the variable results from asyncio.wait_for / asyncio.gather)
to detect non-PhaseBaselineAckMessage items that are Exception instances and log
them (include context such as phase_name, phase_id[:8], kind and the
exception/traceback) so raised exceptions from futures are not ignored; keep the
existing handling for PhaseBaselineAckMessage.success failures and add a branch
to log unexpected exceptions from the gathered futures.

In `@tests/unit/common/enums/test_baseline_enums.py`:
- Around line 11-40: Several tests duplicate single-assert patterns; convert
them to data-driven parameterized tests using pytest.mark.parametrize to reduce
duplication. Replace the repeated tests for BaselineKind values and
case-insensitive parsing with a single parameterized test that iterates over
(input, expected) tuples and references BaselineKind and the existing test
functions (e.g., test_baseline_kind_values_lowercase,
test_baseline_kind_case_insensitive). Do the same for ServiceCapability and the
capability helpers by parameterizing expected string values for
ServiceCapability.BASELINE_COLLECTOR and ServiceCapability.RESULT_PRODUCER, and
parameterize make_result_producer_capability and
parse_result_producer_capability cases (including negative cases that should
return None) so each assertion becomes an iteration in a single param test
rather than separate test functions.

In `@tests/unit/common/mixins/test_baseline_collector_mixin.py`:
- Around line 65-75: Update the test_handler_acks_failure_when_collect_raises to
also assert the ack envelope and ensure exactly one publish: after calling
_StubCollector._on_phase_baseline_request and obtaining ack = bus.published[0],
assert ack.service_id (or the expected service identifier from _StubCollector),
ack.phase_id == "p1", and ack.kind == BaselineKind.END, and additionally assert
len(bus.published) == 1 to guarantee a single publish; keep the existing
assertions that ack.success is False and that "simulated DCGM failure" appears
in ack.error.

In `@tests/unit/common/test_environment_baseline.py`:
- Around line 6-8: The test test_baseline_defaults currently reads defaults from
the global Environment singleton which can be mutated by other tests; instead
instantiate a fresh _Environment() and assert against its BASELINE values (e.g.,
create env = _Environment() and use env.BASELINE.GATE_TIMEOUT_S and
env.BASELINE.GATE_ENABLED) so the test validates deterministic defaults
independent of global state.
- Around line 11-13: Add a proper fixture type hint and move imports to module
scope: change the test function signature test_baseline_env_override to accept a
typed fixture (e.g., monkeypatch: pytest.MonkeyPatch) and keep the return
annotation (-> None), and move the from aiperf.common.environment import
_Environment import to the top of the test module so imports are module-scoped;
update any other local imports in this test file similarly to follow the project
style.

In `@tests/unit/controller/test_system_controller_baseline.py`:
- Around line 41-42: The helper function _no_op_publish is missing a parameter
type for msg; update its signature to include an explicit type (e.g., msg: Any)
so it conforms to the repo rule requiring type hints on all functions, and add
the corresponding import from typing (Any) at the top of the test module if not
already present; keep the existing async and return type (-> None) intact.

In `@tests/unit/gpu_telemetry/test_telemetry_manager.py`:
- Line 1250: Two async test functions are missing explicit return type hints;
add "-> None" to the async def signature for
test_collect_baseline_reports_scrape_failures and the other async test declared
at the nearby location (the function starting at line 1263) so both signatures
read "async def <name>(...) -> None:". Update the function declarations (not
body) for these tests to include the return annotation to satisfy the
repository's typing requirement.

In `@tests/unit/records/test_records_manager.py`:
- Around line 347-351: The helper close_background_task lacks type annotations;
update it to have param and return types (e.g., annotate coro as Coroutine[Any,
Any, Any] and the function return as MagicMock) and add the needed imports
(typing: Any, Coroutine and unittest.mock: MagicMock) so the signature becomes
typed and consistent with project rules, and keep manager.execute_async =
MagicMock(side_effect=close_background_task) unchanged.

In `@tests/unit/server_metrics/test_server_metrics_manager.py`:
- Line 466: The async test functions (e.g., async def
test_configure_does_not_capture_baseline) are missing explicit return type
annotations; update each updated/renamed async test definition to include an
explicit "-> None" return type (for the functions at the mentioned locations,
and the other two new/renamed tests referenced) so their signatures read "async
def <function_name>(...) -> None:" to satisfy the project's type-hinting
guideline.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: e5388858-cca6-4bbd-93a1-43cc76a9b315

📥 Commits

Reviewing files that changed from the base of the PR and between 318824a and 6bc1a6d.

📒 Files selected for processing (46)
  • docs/architecture.md
  • docs/dev/patterns.md
  • docs/environment-variables.md
  • docs/index.yml
  • docs/reference/phase-baseline-handshake.md
  • docs/server-metrics/server-metrics.md
  • src/aiperf/common/base_component_service.py
  • src/aiperf/common/enums/__init__.py
  • src/aiperf/common/enums/baseline_enums.py
  • src/aiperf/common/enums/enums.py
  • src/aiperf/common/environment.py
  • src/aiperf/common/messages/__init__.py
  • src/aiperf/common/messages/base_messages.py
  • src/aiperf/common/messages/baseline_messages.py
  • src/aiperf/common/messages/command_messages.py
  • src/aiperf/common/mixins/__init__.py
  • src/aiperf/common/mixins/baseline_collector_mixin.py
  • src/aiperf/controller/baseline_coordinator.py
  • src/aiperf/controller/result_join_coordinator.py
  • src/aiperf/controller/system_controller.py
  • src/aiperf/gpu_telemetry/manager.py
  • src/aiperf/gpu_telemetry/pynvml_collector.py
  • src/aiperf/records/records_manager.py
  • src/aiperf/server_metrics/manager.py
  • src/aiperf/timing/manager.py
  • src/aiperf/timing/phase/phase_gate.py
  • src/aiperf/timing/phase/runner.py
  • src/aiperf/timing/phase_orchestrator.py
  • tests/component_integration/conftest.py
  • tests/integration/conftest.py
  • tests/unit/common/enums/test_baseline_enums.py
  • tests/unit/common/messages/test_baseline_messages.py
  • tests/unit/common/messages/test_register_service_capabilities.py
  • tests/unit/common/mixins/test_baseline_collector_mixin.py
  • tests/unit/common/test_environment_baseline.py
  • tests/unit/controller/test_baseline_coordinator.py
  • tests/unit/controller/test_result_join_coordinator.py
  • tests/unit/controller/test_system_controller_baseline.py
  • tests/unit/gpu_telemetry/test_baseline_integration.py
  • tests/unit/gpu_telemetry/test_telemetry_manager.py
  • tests/unit/records/test_records_manager.py
  • tests/unit/server_metrics/test_baseline_integration.py
  • tests/unit/server_metrics/test_server_metrics_manager.py
  • tests/unit/timing/phase/test_phase_gate_client.py
  • tests/unit/timing/phase/test_runner_gate_ordering.py
  • tests/unit/timing/phase/test_runner_gates.py
💤 Files with no reviewable changes (3)
  • docs/server-metrics/server-metrics.md
  • tests/integration/conftest.py
  • tests/component_integration/conftest.py

Comment on lines +33 to +36
def make_result_producer_capability(domain: str) -> str:
"""Build a result-producer capability tag for a result domain."""

return f"{_RESULT_PRODUCER_PREFIX}{domain}"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Reject empty result-producer domains at construction.

make_result_producer_capability("") currently produces a tag that parse_result_producer_capability(...) cannot round-trip (it returns None). Guarding this early avoids silent invalid capabilities.

Suggested fix
 def make_result_producer_capability(domain: str) -> str:
     """Build a result-producer capability tag for a result domain."""
-
-    return f"{_RESULT_PRODUCER_PREFIX}{domain}"
+    if not domain:
+        raise ValueError("domain must be non-empty")
+    return f"{_RESULT_PRODUCER_PREFIX}{domain}"
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/aiperf/common/enums/baseline_enums.py` around lines 33 - 36,
make_result_producer_capability currently allows an empty domain which yields an
invalid tag that parse_result_producer_capability cannot round-trip; validate
the domain in make_result_producer_capability (ensure domain is a non-empty
string) and reject empty input by raising a ValueError (or similar explicit
exception) with a clear message; update the function that builds the tag
(make_result_producer_capability) to check domain before returning
f"{_RESULT_PRODUCER_PREFIX}{domain}" and reference
parse_result_producer_capability in the error message if helpful.

Comment on lines +40 to +46
success: bool = Field(
...,
description="False if collect_baseline() raised; coordinator still counts as ack.",
)
error: str | None = Field(
default=None, description="Error string when success=False."
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Enforce success/error consistency on baseline acks.

The model currently allows success=True with a non-null error, and success=False without an error. Enforcing this keeps the wire contract unambiguous.

Suggested fix
-from pydantic import Field
+from pydantic import Field, model_validator
@@
 class PhaseBaselineAckMessage(BaseServiceMessage):
@@
     error: str | None = Field(
         default=None, description="Error string when success=False."
     )
+
+    `@model_validator`(mode="after")
+    def validate_ack_fields(self) -> "PhaseBaselineAckMessage":
+        if self.success and self.error is not None:
+            raise ValueError("error must be None when success=True")
+        if not self.success and not self.error:
+            raise ValueError("error is required when success=False")
+        return self
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/aiperf/common/messages/baseline_messages.py` around lines 40 - 46, Add a
Pydantic validation to enforce consistency between the success and error fields
on the baseline ACK model: ensure that when success is True then error is
None/empty, and when success is False then error is a non-empty string. Locate
the model containing the Field definitions for success and error (the Baseline
ACK dataclass/model in this file) and add a validator or root_validator that
raises a ValueError with a clear message when these conditions are violated so
the wire contract remains unambiguous.

Comment on lines +27 to +37
class BaselineCollectorMixin:
"""Mix into a BaseComponentService to participate in the phase baseline handshake."""

extra_capabilities: ClassVar[tuple[str, ...]] = (
ServiceCapability.BASELINE_COLLECTOR,
)

@abstractmethod
async def collect_baseline(
self, kind: BaselineKind, phase_id: str, phase_name: str
) -> None:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "1) Inspect BaselineCollectorMixin declaration:"
rg -n --type=py 'class\s+BaselineCollectorMixin' src/aiperf/common/mixins/baseline_collector_mixin.py
rg -n --type=py '`@abstractmethod`|def collect_baseline' src/aiperf/common/mixins/baseline_collector_mixin.py

echo
echo "2) List classes inheriting BaselineCollectorMixin and whether they override collect_baseline:"
python - <<'PY'
import ast
from pathlib import Path

def base_name(node):
    if isinstance(node, ast.Name):
        return node.id
    if isinstance(node, ast.Attribute):
        parts = []
        cur = node
        while isinstance(cur, ast.Attribute):
            parts.append(cur.attr)
            cur = cur.value
        if isinstance(cur, ast.Name):
            parts.append(cur.id)
        return ".".join(reversed(parts))
    return None

for path in Path("src").rglob("*.py"):
    text = path.read_text(encoding="utf-8")
    try:
        tree = ast.parse(text)
    except SyntaxError:
        continue
    for n in [x for x in ast.walk(tree) if isinstance(x, ast.ClassDef)]:
        bases = {base_name(b) for b in n.bases}
        if "BaselineCollectorMixin" in bases or "aiperf.common.mixins.baseline_collector_mixin.BaselineCollectorMixin" in bases:
            has_override = any(
                isinstance(m, (ast.FunctionDef, ast.AsyncFunctionDef)) and m.name == "collect_baseline"
                for m in n.body
            )
            print(f"{path}:{n.lineno} class {n.name} collect_baseline_override={has_override}")
PY

Repository: ai-dynamo/aiperf

Length of output: 487


🏁 Script executed:

head -40 src/aiperf/common/mixins/baseline_collector_mixin.py

Repository: ai-dynamo/aiperf

Length of output: 1507


🏁 Script executed:

rg -n 'class.*BaselineCollectorMixin' --type py -A 10 src/

Repository: ai-dynamo/aiperf

Length of output: 3095


Enforce the abstract hook by inheriting ABC.

Line 34 uses @abstractmethod, but Line 27 does not inherit ABC. This means subclasses can be instantiated without implementing collect_baseline, and baseline requests may silently succeed. While both known subclasses (GPUTelemetryManager and ServerMetricsManager) correctly override the method, Python does not enforce the contract without ABC inheritance.

Proposed fix
-from abc import abstractmethod
+from abc import ABC, abstractmethod
@@
-class BaselineCollectorMixin:
+class BaselineCollectorMixin(ABC):
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/aiperf/common/mixins/baseline_collector_mixin.py` around lines 27 - 37,
The class BaselineCollectorMixin declares an `@abstractmethod` (collect_baseline)
but doesn't inherit from ABC, so Python won't enforce implementation in
subclasses; update BaselineCollectorMixin to inherit from abc.ABC (or make it
subclass of ABC) so the abstract hook on collect_baseline is enforced and
instantiating subclasses without an override will raise TypeError, leaving the
method name collect_baseline and the mixin class name unchanged.

Comment on lines +11 to +40
def test_baseline_kind_values_lowercase():
assert BaselineKind.START == "start"
assert BaselineKind.END == "end"


def test_baseline_kind_case_insensitive():
assert BaselineKind("START") == BaselineKind.START
assert BaselineKind("End") == BaselineKind.END


def test_service_capability_baseline_collector_value():
assert ServiceCapability.BASELINE_COLLECTOR == "baseline_collector"


def test_service_capability_result_producer_value():
assert ServiceCapability.RESULT_PRODUCER == "result_producer"


def test_make_result_producer_capability_includes_domain():
assert make_result_producer_capability("profile") == "result_producer:profile"


def test_parse_result_producer_capability_returns_domain():
assert parse_result_producer_capability("result_producer:telemetry") == "telemetry"


def test_parse_result_producer_capability_ignores_other_capabilities():
assert parse_result_producer_capability("baseline_collector") is None
assert parse_result_producer_capability("result_producer") is None
assert parse_result_producer_capability("result_producer:") is None
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Add return type hints to test functions.

These tests should declare -> None for consistency with the repo-wide typing rule.

Proposed diff
-def test_baseline_kind_values_lowercase():
+def test_baseline_kind_values_lowercase() -> None:
@@
-def test_baseline_kind_case_insensitive():
+def test_baseline_kind_case_insensitive() -> None:
@@
-def test_service_capability_baseline_collector_value():
+def test_service_capability_baseline_collector_value() -> None:
@@
-def test_service_capability_result_producer_value():
+def test_service_capability_result_producer_value() -> None:
@@
-def test_make_result_producer_capability_includes_domain():
+def test_make_result_producer_capability_includes_domain() -> None:
@@
-def test_parse_result_producer_capability_returns_domain():
+def test_parse_result_producer_capability_returns_domain() -> None:
@@
-def test_parse_result_producer_capability_ignores_other_capabilities():
+def test_parse_result_producer_capability_ignores_other_capabilities() -> None:

As per coding guidelines: "Type hints on ALL functions (params and return)".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/common/enums/test_baseline_enums.py` around lines 11 - 40, Add
explicit return type hints "-> None" to every test function in this file:
test_baseline_kind_values_lowercase, test_baseline_kind_case_insensitive,
test_service_capability_baseline_collector_value,
test_service_capability_result_producer_value,
test_make_result_producer_capability_includes_domain,
test_parse_result_producer_capability_returns_domain, and
test_parse_result_producer_capability_ignores_other_capabilities; update each
def signature to include "-> None" to comply with the repo-wide "Type hints on
ALL functions" rule.

Comment on lines +32 to +34
def test_capabilities_accepts_tuple_of_strings() -> None:
cmd = _make(capabilities=("baseline_collector",))
assert cmd.capabilities == ("baseline_collector",)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Assert normalized capability type, not just string-equivalent value.

This currently passes even if capabilities are coerced to enum instances because string enums compare equal to strings. Consider asserting the normalized enum tuple to lock the contract.

Proposed diff
 def test_capabilities_accepts_tuple_of_strings() -> None:
     cmd = _make(capabilities=("baseline_collector",))
-    assert cmd.capabilities == ("baseline_collector",)
+    assert cmd.capabilities == (ServiceCapability.BASELINE_COLLECTOR,)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/common/messages/test_register_service_capabilities.py` around
lines 32 - 34, The test test_capabilities_accepts_tuple_of_strings currently
asserts string-equality and can pass if values were coerced to enums; change the
assertion on cmd.capabilities (from _make(...)) to verify the normalized type by
asserting it is a tuple of Capability enum members corresponding to the input
strings (i.e., the Capability enum instances for "baseline_collector"), rather
than comparing to the raw string tuple.

self.info(
f"GPU Telemetry: Capturing {kind} baseline for phase '{phase_name}'..."
)
await self._collect_once(label=str(kind))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The START gate can call collect_baseline() before this service's PROFILE_START handler has initialized collectors, because TimingManager starts phases in a background task as soon as it handles PROFILE_START, so this scrape can run on uninitialized GPU collectors. Fix: initialize reachable GPU collectors during configure before advertising telemetry enabled, or delay TimingManager phase execution until all services complete PROFILE_START.

END gate to `_on_return_wait_complete`. All other phases wait synchronously,
cancel the progress task, and fire the END gate inline.
"""
if self._config.seamless and not is_final_phase:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The seamless non-final path returns after sending completes and defers the END gate to a background task, so the next phase can issue credits before the previous phase's END baseline and contaminate the next START baseline with prior-phase in-flight work. Fix: when phase gating is enabled, wait for returns and the END gate before starting the next phase, or disable baseline gates for seamless overlapping phases.

self._should_wait_for_telemetry = message.enabled

if not message.enabled:
self._result_join_coordinator.unregister("telemetry", message.service_id)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Disabled telemetry and server-metrics services are removed from ResultJoinCoordinator but remain registered in BaselineCoordinator, so after their delayed shutdown later phase gates wait for the full timeout for services that will never ack. Fix: unregister disabled, stopped, or erroring baseline collectors from BaselineCoordinator and resolve any in-flight wait for that service.

@codecov
Copy link
Copy Markdown

codecov Bot commented May 19, 2026

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants