Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ Metrics collection and storage configuration. Controls metrics storage allocatio
| `AIPERF_METRICS_OSL_MISMATCH_PCT_THRESHOLD` | `5.0` | ≥ 0.0, ≤ 100.0 | Percentage difference threshold for flagging discrepancies between requested and actual output sequence length (default: 5%) |
| `AIPERF_METRICS_OSL_MISMATCH_MAX_TOKEN_THRESHOLD` | `50` | ≥ 1 | Maximum absolute token threshold for OSL mismatch. The effective threshold is min(requested_osl * pct_threshold, this value). Makes threshold tighter for large OSL values (default: 50 tokens) |
| `AIPERF_METRICS_TDIGEST_COMPRESSION` | `500` | ≥ 20, ≤ 10000 | t-digest sketch compression for list-valued record metric aggregation. Higher = more centroids, tighter percentile accuracy, larger sketch. Default 500 measured to keep worst-case relative percentile error under 0.05% on 50M-sample workloads (40x under the 0.5% claimed accuracy band) at ~4 KB sketch size. |
| `AIPERF_METRICS_LIST_BACKEND` | `'ragged'` | — | Storage backend for list-valued RECORD metrics (today: only inter_chunk_latency). 'ragged' (default) keeps every value, enabling exact percentiles and ICL-aware throughput / tokens-in-flight sweep curves. 'tdigest' uses a bounded-memory crick.TDigest sketch (~4 KB regardless of sample count) — percentiles are approximate (≤0.05% relative error at default compression), and ICL-aware sweep curves silently fall back to their non-ICL equivalents that use only request-level (start_ns, generation_start_ns, end_ns) timing. Choose tdigest when records-manager pod memory at 1M+ request scale is the binding constraint. |

## MLFLOW

Expand Down
191 changes: 191 additions & 0 deletions src/aiperf/common/accumulator_protocols.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, ClassVar, Protocol, runtime_checkable

import numpy as np
from numpy.typing import NDArray

from aiperf.common.enums.metric_enums import MetricValueTypeVarT

if TYPE_CHECKING:
from aiperf.common.models.error_models import ErrorDetailsCount
from aiperf.common.models.record_models import MetricResult
from aiperf.common.types import MetricTagT
from aiperf.exporters.exporter_config import FileExportInfo
from aiperf.plugin.enums import AccumulatorType


@runtime_checkable
class AccumulatorResult(Protocol):
    """Structural type for the typed value an accumulator's ``summarize()`` produces.

    Any object that provides both serializers below satisfies the protocol.
    Because the class is ``runtime_checkable``, an ``isinstance`` check only
    confirms that the methods exist; it does not validate their signatures.
    """

    def to_json(self) -> Any:
        """Return this result as a JSON-compatible structure."""
        ...

    def to_csv(self) -> list[dict[str, Any]]:
        """Return this result as a list of CSV-compatible row dicts."""
        ...


@runtime_checkable
class MetricSeriesProtocol(Protocol[MetricValueTypeVarT]):
    """Common read interface for run-level record metric series.

    Any in-memory accumulator that can report a running sum, a record count,
    and a finalized ``MetricResult`` summary satisfies this protocol. The
    per-tag dispatch path in MetricsAccumulator and the ColumnStore-backed
    series wrappers rely on it so derived metrics can read values without
    knowing the storage shape behind them (numpy column, ragged CSR, growable
    array, etc.).
    """

    @property
    def sum(self) -> MetricValueTypeVarT:
        """Return the accumulated sum of every value observed so far."""

    # NOTE(review): PR feedback flagged that requiring ``__len__`` here may
    # break existing custom aggregators that implement only ``sum`` and
    # ``to_result`` if ``MetricAggregator`` remains a back-compat alias of this
    # protocol. The alias is not visible from this block - confirm before
    # relying on ``isinstance`` checks against legacy aggregators.
    def __len__(self) -> int:
        """Return how many values have been observed."""

    def to_result(self, tag: MetricTagT, header: str, unit: str) -> MetricResult:
        """Summarize the accumulated values as a ``MetricResult``."""


@dataclass(frozen=True, slots=True)
class ExportContext:
"""Context passed to domain-specific export_results() methods.

Bundles the profiling time window and error summary so that export_results
signatures stay stable as new fields are added.
"""

start_ns: int | None = None
"""Inclusive start of the export time window (ns since epoch), or None for unbounded."""

end_ns: int | None = None
"""Exclusive end of the export time window (ns since epoch), or None for unbounded."""

error_summary: list[ErrorDetailsCount] | None = None
"""De-duplicated profile-run error counts to surface in the export, if any."""

cancelled: bool = False
"""True when the profile run was cancelled — exporters may emit partial artifacts."""


@dataclass(slots=True)
class SummaryContext:
"""Typed cross-accumulator communication context for dependency-ordered summarization.

NOT a Pydantic model — this is never serialized over the wire. It is created
by RecordsManager._process_results() and passed through the topological-sort
pipeline so each accumulator can read outputs from its declared dependencies.
"""

accumulators: dict[AccumulatorType, Any] = field(default_factory=dict)
"""Live accumulator instances keyed by AccumulatorType — analyzers use this to query peer state."""

accumulator_outputs: dict[str, Any] = field(default_factory=dict)
"""Already-computed summary payloads keyed by accumulator name — populated as topo-order completes."""

start_ns: int = 0
"""Inclusive start of the summarization window (ns since epoch); 0 means full range."""

end_ns: int = 0
"""Exclusive end of the summarization window (ns since epoch); 0 means full range."""

cancelled: bool = False
"""True when the profile run was cancelled — analyzers may short-circuit."""

def get_accumulator(self, accumulator_type: AccumulatorType) -> Any | None:
"""Look up an accumulator by its type. Returns None if not present."""
return self.accumulators.get(accumulator_type)

def get_output(self, accumulator_type: str) -> Any | None:
"""Look up a previously-computed accumulator output. Returns None if not yet available."""
return self.accumulator_outputs.get(accumulator_type)


@runtime_checkable
class AccumulatorProtocol(Protocol):
    """Structural type for accumulators: record ingestion, time-range queries, and summaries.

    Accumulators act as the primary data stores of the records pipeline. Each
    one owns exactly one record type and is fully self-contained, with no
    dependencies on other accumulators; derived computations belong on
    AnalyzerProtocol instead.
    """

    async def process_record(self, record: Any) -> None:
        """Store one record in this accumulator's internal storage."""
        ...

    def query_time_range(self, start_ns: int, end_ns: int) -> NDArray[np.bool_]:
        """Return a boolean mask that is True for records inside [start_ns, end_ns).

        The mask has one entry per record held by the accumulator, so callers
        can take ``mask.sum()`` for a count or ``np.where(mask)[0]`` for the
        matching indices.
        """
        ...

    async def summarize(self, ctx: SummaryContext | None = None) -> AccumulatorResult:
        """Compute the aggregated metric results and return them.

        Args:
            ctx: Optional SummaryContext used to read dependency outputs.
                It is None when called for realtime metrics, which have no
                cross-processor dependencies.
        """
        ...

    async def export_results(self, ctx: ExportContext) -> Any:
        """Export this accumulator's final results.

        Invoked once after profiling completes. Every accumulator returns its
        own typed result (AccumulatorMetricsSummary, TelemetryExportData,
        ServerMetricsResults), which the unified results message consumes
        through typed fields.

        Args:
            ctx: ExportContext holding the profiling time window, the error
                summary, and the cancelled flag.
        """
        ...


@runtime_checkable
class AnalyzerProtocol(Protocol):
    """Structural type for processors that derive results instead of ingesting records.

    An analyzer names the accumulators it needs in ``required_accumulators``
    and the outputs it depends on in ``summary_dependencies``. It is given
    accumulator references at construction and a SummaryContext when
    ``summarize`` is called.
    """

    required_accumulators: ClassVar[set[AccumulatorType]]
    summary_dependencies: ClassVar[list[AccumulatorType]]

    async def summarize(self, ctx: SummaryContext) -> Any:
        """Produce derived results from the declared accumulator dependencies."""
        ...


@runtime_checkable
class StreamExporterProtocol(Protocol):
    """Structural type for processors that stream every record to an external sink.

    JSONL files are one example of such a sink. Stream exporters carry no
    summarization dependencies and are flushed once all accumulators have
    completed.
    """

    async def process_record(self, record: Any) -> None:
        """Write one record to the export sink."""
        ...

    async def finalize(self) -> None:
        """Flush buffered data; invoked once after every record has been processed."""
        ...

    def get_export_info(self) -> FileExportInfo:
        """Return metadata describing the file this exporter writes to."""
        ...
4 changes: 4 additions & 0 deletions src/aiperf/common/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,10 @@ class _MetricsSettings(BaseSettings):
default=500,
description="t-digest sketch compression for list-valued record metric aggregation. Higher = more centroids, tighter percentile accuracy, larger sketch. Default 500 measured to keep worst-case relative percentile error under 0.05% on 50M-sample workloads (40x under the 0.5% claimed accuracy band) at ~4 KB sketch size.",
)
LIST_BACKEND: Literal["ragged", "tdigest"] = Field(
default="ragged",
description="Storage backend for list-valued RECORD metrics (today: only inter_chunk_latency). 'ragged' (default) keeps every value, enabling exact percentiles and ICL-aware throughput / tokens-in-flight sweep curves. 'tdigest' uses a bounded-memory crick.TDigest sketch (~4 KB regardless of sample count) — percentiles are approximate (≤0.05% relative error at default compression), and ICL-aware sweep curves silently fall back to their non-ICL equivalents that use only request-level (start_ns, generation_start_ns, end_ns) timing. Choose tdigest when records-manager pod memory at 1M+ request scale is the binding constraint.",
)


class _OTelSettings(BaseSettings):
Expand Down
2 changes: 2 additions & 0 deletions src/aiperf/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
MetricDictValueTypeVarT,
MetricRecordDict,
MetricResultsDict,
MetricSeriesProtocol,
)
from aiperf.metrics.metric_registry import MetricRegistry

Expand All @@ -30,5 +31,6 @@
"MetricRecordDict",
"MetricRegistry",
"MetricResultsDict",
"MetricSeriesProtocol",
"RecordMetricT",
]
74 changes: 74 additions & 0 deletions src/aiperf/metrics/_column_store_handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Per-tag setter closure factories for ``ColumnStore.ingest``.

These closures are resolved on first sighting of each metric tag (via Python
type dispatch) and cached in ``ColumnStore._tag_handlers``. Subsequent records
skip the isinstance ladder and the ``_ensure_*_column`` lookups entirely.

Profiling at 50k records (24 numeric tags + ICL) showed this hoist drops
``ColumnStore.ingest`` wall by ~30% and total ingest function calls by 40%.
The handlers are invalidated by ``_grow()`` because numeric arrays get
reallocated; closures captured the old array references and would write to
garbage. List backends and string lists are unaffected (in-place growth) but
clearing all handlers on grow is simpler and grow runs ~log2(N) times.
"""

from __future__ import annotations

from collections.abc import Callable
from typing import Any

import numpy as np
from numpy.typing import NDArray

from aiperf.metrics.list_metric_aggregation import TDigestListMetricAggregator
from aiperf.metrics.ragged_series import RaggedSeries


def make_numeric_handler(
    col: NDArray[np.float64],
    tag: str,
    sums: dict[str, float],
    counts: dict[str, int],
) -> Callable[[int, Any], None]:
    """Build the per-tag setter that records one numeric metric value.

    The returned closure writes ``value`` into ``col[idx]`` and advances the
    O(1) running sum/count side-channel held in ``sums[tag]`` and
    ``counts[tag]``.

    Non-finite values (NaN, +Inf, -Inf) are skipped entirely: a single such
    value added to the running sum would make ``sums[tag]`` non-finite for the
    rest of the run and corrupt every statistic derived from it. A skipped
    value leaves ``col[idx]``, ``sums[tag]`` and ``counts[tag]`` untouched.

    No explicit ``float()`` cast is applied: numpy's ``__setitem__`` coerces a
    Python ``int`` to ``float64`` on assignment, and adding an ``int`` to the
    float running sum promotes it the same way.

    Args:
        col: Numeric column the value is written into, indexed by record.
        tag: Metric tag used as the key into ``sums`` and ``counts``.
        sums: Running per-tag sums; ``sums[tag]`` must already exist.
        counts: Running per-tag counts; ``counts[tag]`` must already exist.

    Returns:
        A ``handler(idx, value)`` callable that returns None.
    """
    # Function-scope import: executed once per handler creation, not per record.
    # NOTE(review): PR feedback mentions project helpers in aiperf.common.finite;
    # swap them in if the project prefers them over math.isfinite - confirm.
    from math import isfinite

    def handler(idx: int, value: Any) -> None:
        # Guard first so a rejected value mutates nothing.
        # NOTE(review): assumes an unwritten col[idx] already reads as
        # "missing" to downstream consumers - verify against ColumnStore.
        if not isfinite(value):
            return
        col[idx] = value
        sums[tag] = sums[tag] + value
        counts[tag] = counts[tag] + 1

    return handler


def make_string_handler(
    col: list[str | None],
) -> Callable[[int, Any], None]:
    """Build the per-tag setter that stores a string metric value at ``idx``.

    The closure keeps a reference to ``col`` itself; that reference stays
    valid across capacity growth because ``list.extend`` grows the list in
    place.
    """

    def _store(idx: int, value: Any) -> None:
        col[idx] = value

    return _store


def make_list_handler(
    backend: RaggedSeries | TDigestListMetricAggregator,
) -> Callable[[int, Any], None]:
    """Build the per-tag setter that forwards a list-valued metric to ``backend``.

    Each call delegates to ``backend.add_for_record(idx, value)``. The captured
    backend reference remains stable across ``ColumnStore._grow`` because list
    backends own their own growth.
    """

    def _forward(idx: int, value: Any) -> None:
        backend.add_for_record(idx, value)

    return _forward
Loading
Loading