feat(metrics): column-store + ragged-series + accumulator protocol foundation #969
ajcasagrande wants to merge 2 commits into
This is PR-6 of a multi-PR breakdown of the metrics-accumulator
foundation work from ajc/inferencex-agentx-mvp. The goal is to land
the storage / protocol layer that the upcoming MetricsAccumulator
(PR-7) plugs into, without dragging the accumulator itself in yet.
Net-new files (copied verbatim from source branch):
- src/aiperf/common/accumulator_protocols.py (191 lines)
  MetricSeriesProtocol, AccumulatorProtocol, AnalyzerProtocol,
  StreamExporterProtocol, AccumulatorResult, ExportContext,
  SummaryContext. Only runtime import is MetricValueTypeVarT from
  aiperf.common.enums.metric_enums; everything else is gated behind
  TYPE_CHECKING so plugin-load doesn't trip a circular import.
- src/aiperf/metrics/ragged_series.py (107 lines)
  CSR-style storage for list-valued per-record metrics
  (inter_chunk_latency today). Exposes add_for_record(idx, values),
  grouped_cumsum(), get_values_for_mask(); declares
  SUPPORTS_PER_RECORD_REPLAY = True so the future sweep helpers can
  gate ICL-aware curves on backend capability.
- src/aiperf/metrics/column_store.py (503 lines)
  Session-indexed NaN-sparse columnar storage. Holds numeric,
  string, list, and 4 flavors of metadata columns (numeric, string,
  bool with uint8 sentinel, categorical with int32-intern). Per-tag
  setter closures cache the type dispatch on the first sighting of a
  tag and skip the isinstance ladder on subsequent records. The list
  backend class is picked from Environment.METRICS.LIST_BACKEND
  ("ragged" default, "tdigest" for bounded memory).
- src/aiperf/metrics/_column_store_handlers.py (74 lines)
  Closure factories for ColumnStore.ingest dispatch.
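To make the CSR-style layout described above concrete, here is a minimal pure-Python sketch. It is not the PR's `RaggedSeries` (the review excerpts suggest that class is NumPy-backed); `TinyRagged`, `values_for`, and the `_spans` field are hypothetical names, and only `add_for_record` and `SUPPORTS_PER_RECORD_REPLAY` are taken from the description.

```python
class TinyRagged:
    """Hypothetical CSR-like store: one flat values list plus per-record spans."""

    SUPPORTS_PER_RECORD_REPLAY = True

    def __init__(self) -> None:
        self._values: list[float] = []  # all observations, stored back to back
        self._spans: dict[int, tuple[int, int]] = {}  # idx -> (start, length)

    def add_for_record(self, idx: int, values: list[float]) -> None:
        if idx < 0:
            raise ValueError("idx must be >= 0")
        if not values:
            return
        # Remember where this record's slice starts, then append its values.
        self._spans[idx] = (len(self._values), len(values))
        self._values.extend(values)

    def values_for(self, idx: int) -> list[float]:
        # Records that never reported values yield an empty list.
        start, length = self._spans.get(idx, (0, 0))
        return self._values[start : start + length]

    def __len__(self) -> int:
        return len(self._values)


series = TinyRagged()
series.add_for_record(0, [1.0, 2.0])
series.add_for_record(2, [5.0])
```

Because each record's slice can be recovered from the flat buffer, a store shaped like this can replay values per record, which is what the `SUPPORTS_PER_RECORD_REPLAY = True` flag advertises.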
Rewires (minimal, protocol-conformance only):
- src/aiperf/metrics/list_metric_aggregation.py
  Subsumes the existing TDigestListMetricAggregator (#865) under
  MetricSeriesProtocol: adds __len__, add_for_record(idx, values)
  record-keyed alias (idx ignored, since t-digest is a global sketch),
  SUPPORTS_PER_RECORD_REPLAY = False flag, __all__, and updates
  docstring references from MetricAggregator -> MetricSeriesProtocol.
- src/aiperf/metrics/metric_dicts.py
  Replaces inline Protocol definition with an import of
  MetricSeriesProtocol from accumulator_protocols and a back-compat
  alias (MetricAggregator = MetricSeriesProtocol). Adds __all__ for
  explicit re-export. Existing callsites (derived_sum_metric.py,
  test_list_metric_aggregation.py) keep working through the alias.
- src/aiperf/metrics/__init__.py
  Re-exports MetricSeriesProtocol.
- src/aiperf/common/environment.py
  Adds METRICS.LIST_BACKEND: Literal["ragged", "tdigest"] field
  (default "ragged"), consumed by ColumnStore._resolve_list_backend_class().
  docs/environment-variables.md auto-regenerated.
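The rewires above mean a caller can feed either list backend through the same `add_for_record(idx, values)` call and consult `SUPPORTS_PER_RECORD_REPLAY` before attempting per-record work. A minimal sketch of that idea follows; `KeepAllBackend`, `GlobalSketchBackend`, and `feed` are hypothetical stand-ins, and the "sketch" backend only tracks a count and a sum rather than a real t-digest.

```python
class KeepAllBackend:
    """Hypothetical backend that retains every value per record."""

    SUPPORTS_PER_RECORD_REPLAY = True

    def __init__(self) -> None:
        self._by_record: dict[int, list[float]] = {}

    def add_for_record(self, idx: int, values: list[float]) -> None:
        self._by_record.setdefault(idx, []).extend(values)

    def __len__(self) -> int:
        return sum(len(v) for v in self._by_record.values())


class GlobalSketchBackend:
    """Hypothetical bounded-memory backend; a stand-in for a t-digest."""

    SUPPORTS_PER_RECORD_REPLAY = False

    def __init__(self) -> None:
        self._count = 0
        self._sum = 0.0

    def add_for_record(self, idx: int, values: list[float]) -> None:
        # idx is accepted for interface parity but ignored: the summary is global.
        self._count += len(values)
        self._sum += sum(values)

    def __len__(self) -> int:
        return self._count


def feed(backend, records) -> bool:
    """Ingest (idx, values) pairs and report whether replay is available."""
    for idx, values in records:
        backend.add_for_record(idx, values)
    return backend.SUPPORTS_PER_RECORD_REPLAY


records = [(0, [1.0, 2.0]), (1, [3.0])]
keep_all, sketch = KeepAllBackend(), GlobalSketchBackend()
can_replay_keep_all = feed(keep_all, records)
can_replay_sketch = feed(sketch, records)
```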
Test files ported alongside the new modules:
- tests/unit/common/test_accumulator_protocols.py (252 lines)
- tests/unit/metrics/test_column_store.py (440 lines)
- tests/unit/metrics/test_ragged_series.py (125 lines)
All three exercise only the protocol / column-store / ragged-series
surface — no MetricsAccumulator dependencies.
Deliberately deferred to downstream PRs:
- metric_result_from_array() and observation_duration() / window_*
  additions on metric_dicts.py: the source branch adds these to
  feed the accumulator's vectorized result builder, but they have
  no on-main callers. Skipped to keep this PR scoped to the
  protocol + storage surface.
- metrics/__init__.py re-export of metric_result_from_array (same
  reason).
- accumulator.py / accumulator_models.py / accumulator_sweeps.py /
  derived_latency.py / base_aggregate_metric.py aggregation_kind /
  base_metric.py console_group / base_usage_record_metric.py: all
  pulled in by PR-7.
- AggregationKind and EFFECTIVE/ACTIVE MetricConsoleGroup additions
  on metric_enums.py: PR-7 (only used by accumulator).
Preserves #881 schema 1.1: not touching metrics_json_exporter.py or
export_models.py in this PR.
Preserves #865 t-digest behavior: TDigestListMetricAggregator now
slots in as a ColumnStore list backend through add_for_record and
still passes its existing test suite unchanged.
Verification:
- uv run pytest tests/unit/ -n auto -> 12741 passed, 79 skipped
- pre-commit run on all staged files -> all hooks pass
- tools/check_ergonomics.py baseline regenerated to admit
column_store.py (503 lines, just over the 500 cap). The 503-line
figure is a clean storage surface; splitting it would fragment
cohesive column-type concerns.
No surprises: the warned-about circular import via
aiperf.common.enums never tripped — accumulator_protocols only
runtime-imports MetricValueTypeVarT from metric_enums (which is a
leaf module relative to plugin load), and ColumnStore's
Environment.METRICS.LIST_BACKEND lookup is already deferred inside a
factory function for monkey-patch friendliness.
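The "deferred inside a factory function" point can be illustrated with a small sketch. It reads the setting at call time rather than at import time, so a test can change it after the module is loaded. This is an assumption-laden stand-in: the PR reads `Environment.METRICS.LIST_BACKEND`, whereas this sketch reads `os.environ` directly, and `resolve_list_backend_name` is a hypothetical helper.

```python
import os

_ALLOWED_BACKENDS = ("ragged", "tdigest")


def resolve_list_backend_name() -> str:
    """Look up the backend name on every call, not once at import time."""
    name = os.environ.get("AIPERF_METRICS_LIST_BACKEND", "ragged")
    if name not in _ALLOWED_BACKENDS:
        raise ValueError(f"unknown list backend: {name!r}")
    return name


# Because the lookup is deferred, changing the setting later takes effect.
os.environ.pop("AIPERF_METRICS_LIST_BACKEND", None)
default_name = resolve_list_backend_name()
os.environ["AIPERF_METRICS_LIST_BACKEND"] = "tdigest"
overridden_name = resolve_list_backend_name()
```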
Signed-off-by: Anthony Casagrande <acasagrande@nvidia.com>
Try out this PR
Quick install: pip install --upgrade --force-reinstall git+https://github.com/ai-dynamo/aiperf.git@ea01d70ddd2aafc48c3aaf37b677076d2b60fc70
Recommended with virtual environment (using uv): uv venv --python 3.12 && source .venv/bin/activate
uv pip install --upgrade --force-reinstall git+https://github.com/ai-dynamo/aiperf.git@ea01d70ddd2aafc48c3aaf37b677076d2b60fc70
Last updated for commit:
Walkthrough
This PR introduces a columnar storage system for per-record metrics in aiperf. It defines protocol contracts for the metrics pipeline, implements pluggable list-value backends (ragged arrays and t-digest sketches), creates ColumnStore for structured metric ingestion with metadata tracking, and integrates the new MetricSeriesProtocol across the codebase.
Changes
Metrics Storage Pipeline
Estimated Code Review Effort
🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 6
🧹 Nitpick comments (1)
tests/unit/common/test_accumulator_protocols.py (1)
128-130: ⚡ Quick win
Align test names with the repository convention.
Several names (for example test_to_json, test_to_csv, test_time_range) do not follow the required test_<function>_<scenario>_<expected> format.
As per coding guidelines:
tests/**/*.py: Test naming convention: test_<function>_<scenario>_<expected>.
Also applies to: 155-252
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/unit/common/test_accumulator_protocols.py` around lines 128 - 130, Rename test functions to follow the repository convention test_<function>_<scenario>_<expected>; specifically update test_protocol_isinstance_check and other tests in this file (examples: test_to_json, test_to_csv, test_time_range and tests in the 155-252 range) to descriptive names that include the function under test, the scenario, and the expected outcome (e.g., test_protocol_isinstance_with_matching_protocol_returns_true); ensure any parametrization or references (fixtures, marks) use the new names.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/aiperf/metrics/_column_store_handlers.py`:
- Around line 44-47: The numeric handler function (handler) currently assigns
value into col and updates sums[tag] and counts[tag] without validating numeric
finiteness; guard against NaN/Inf by checking the value with the
aiperf.common.finite utilities (e.g., is_finite or similar) before mutating
col/sums/counts, and if the value is not finite treat it as None/skip updating
sums and counts (but still set col[idx] appropriately if required by your data
model); update the logic around col[idx] = value, sums[tag] = sums[tag] + value,
and counts[tag] = counts[tag] + 1 to perform the finite check and only modify
sums/counts when the check passes.
In `@src/aiperf/metrics/column_store.py`:
- Around line 282-307: The ingest method currently allows negative idx values
which trigger Python negative indexing and can corrupt data; add a defensive
check at the top of ingest (before _grow) to raise a ValueError if idx < 0, e.g.
"if idx < 0: raise ValueError(...)", and apply the same guard to the other write
API in this file (the analogous method around lines 339-363) so both methods
validate idx, avoiding silent negative-index writes; update/add tests to assert
that negative idx raises.
- Around line 86-93: The constructor ( __init__ ) currently allows
initial_capacity == 0 which makes _grow() stuck because 0 * 2 == 0; validate
initial_capacity > 0 at start of __init__ (raise ValueError or coerce to 1) and
ensure _capacity is set to at least 1 so subsequent _grow() doubles progress;
apply the same validation/fix to the other initialization block in this module
that also assigns _capacity (the other __init__/constructor around the similar
capacity setup).
In `@src/aiperf/metrics/ragged_series.py`:
- Around line 30-32: The constructor currently allows initial_capacity or
offsets_capacity to be 0 which causes _grow_offsets() (and the similar grow
routine around lines 99-104) to loop forever because new_cap stays 0; fix by
validating inputs in __init__ (e.g., raise ValueError if initial_capacity <= 0
or offsets_capacity <= 0) and harden the grow logic in _grow_offsets() and the
other grow method to ensure growth always increases capacity (compute new_cap =
max(1, old_cap * 2) or if old_cap == 0 set new_cap = 1) so a zero capacity
cannot produce an infinite loop.
- Around line 58-66: The extend method currently allows negative idx which
causes negative-index writes into _offsets; add an explicit check at the top of
extend (in the extend(self, idx: int, values: list[float]) method) to raise a
ValueError if idx < 0 before calling _grow_offsets or writing to self._offsets,
so negative indices are rejected and no silent corruption of _offsets/_values
can occur.
In `@tests/unit/common/test_accumulator_protocols.py`:
- Around line 63-64: Replace string-typed keys with AccumulatorType instances in
the protocol contract tests: update required_accumulators, summary_dependencies,
and any usages asserting SummaryContext.accumulators to use values
typed/constructed as AccumulatorType rather than plain str so tests validate the
actual contract; search for occurrences of required_accumulators and
summary_dependencies in the test file (including lines ~201-203) and change
their elements to AccumulatorType values consistent with the production
enum/type, and adjust any assertions that compare keys to expect AccumulatorType
instances.
---
Nitpick comments:
In `@tests/unit/common/test_accumulator_protocols.py`:
- Around line 128-130: Rename test functions to follow the repository convention
test_<function>_<scenario>_<expected>; specifically update
test_protocol_isinstance_check and other tests in this file (examples:
test_to_json, test_to_csv, test_time_range and tests in the 155-252 range) to
descriptive names that include the function under test, the scenario, and the
expected outcome (e.g.,
test_protocol_isinstance_with_matching_protocol_returns_true); ensure any
parametrization or references (fixtures, marks) use the new names.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 92a02970-ded5-4d52-a593-cc196d241536
📒 Files selected for processing (13)
- docs/environment-variables.md
- src/aiperf/common/accumulator_protocols.py
- src/aiperf/common/environment.py
- src/aiperf/metrics/__init__.py
- src/aiperf/metrics/_column_store_handlers.py
- src/aiperf/metrics/column_store.py
- src/aiperf/metrics/list_metric_aggregation.py
- src/aiperf/metrics/metric_dicts.py
- src/aiperf/metrics/ragged_series.py
- tests/unit/common/test_accumulator_protocols.py
- tests/unit/metrics/test_column_store.py
- tests/unit/metrics/test_ragged_series.py
- tools/ergonomics_baseline.json
def handler(idx: int, value: Any) -> None:
    col[idx] = value
    sums[tag] = sums[tag] + value
    counts[tag] = counts[tag] + 1
Guard numeric ingest against non-finite values.
This path updates running numeric aggregates directly; accepting NaN/Inf will contaminate sums and derived stats.
Proposed fix
+from aiperf.common.finite import is_finite_value
 ...
 def handler(idx: int, value: Any) -> None:
+    if not is_finite_value(value):
+        return
     col[idx] = value
     sums[tag] = sums[tag] + value
     counts[tag] = counts[tag] + 1
As per coding guidelines: “Numeric metric values crossing a serialization boundary or feeding a numerical algorithm must be finite or explicitly None - use aiperf.common.finite utilities”.
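As a rough illustration of why the guard matters, the sketch below shows a closure-based handler that skips non-finite values so a single NaN cannot poison the running sum. It uses the standard library's `math.isfinite` as a stand-in, since the exact API of `aiperf.common.finite` is not shown in this thread; `make_numeric_handler` and the `"latency"` tag are hypothetical.

```python
import math


def make_numeric_handler(col, sums, counts, tag):
    """Build a per-tag setter that ignores NaN and infinite values."""

    def handler(idx: int, value: float) -> None:
        if not math.isfinite(value):
            return  # leave the slot unset and the aggregates untouched
        col[idx] = value
        sums[tag] += value
        counts[tag] += 1

    return handler


col = [None] * 3
sums = {"latency": 0.0}
counts = {"latency": 0}
handler = make_numeric_handler(col, sums, counts, "latency")

handler(0, 1.5)
handler(1, float("nan"))  # skipped; without the guard the sum would become NaN
handler(2, 2.5)
```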
def __init__(
    self,
    initial_capacity: int = 1024,
    *,
    list_backend_cls: type[ListMetricBackendT] | None = None,
) -> None:
    self._capacity = initial_capacity
    self._count = 0
Require initial_capacity > 0 to avoid non-terminating growth.
With initial_capacity == 0, _grow() never progresses (0 * 2 == 0), causing an infinite loop.
Proposed fix
 def __init__(
     self,
     initial_capacity: int = 1024,
     *,
     list_backend_cls: type[ListMetricBackendT] | None = None,
 ) -> None:
+    if initial_capacity <= 0:
+        raise ValueError("initial_capacity must be > 0")
     self._capacity = initial_capacity
Also applies to: 393-402
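A doubling loop can be hardened so that a zero starting capacity still makes progress. The helper below is a hypothetical sketch of that idea, not the PR's `_grow()` implementation, which is not shown in full here.

```python
def next_capacity(current: int, needed_idx: int) -> int:
    """Double the capacity until slot `needed_idx` fits.

    Starting from at least 1 guarantees the loop terminates even if
    `current` is 0, because 0 * 2 would otherwise stay 0 forever.
    """
    new_cap = max(1, current)
    while new_cap <= needed_idx:
        new_cap *= 2
    return new_cap
```

Validating `initial_capacity > 0` in the constructor, as the review proposes, and clamping inside the growth routine are complementary: the first reports the misuse early, the second keeps the loop safe regardless.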
def ingest(
    self,
    idx: int,
    *,
    record_metrics: dict[str, Any],
    start_ns: float,
    end_ns: float,
    generation_start_ns: float | None,
) -> None:
    """Write a record's data to slot `idx` (= session_num).

    Grows capacity if idx >= _capacity. Dispatches metric values via cached
    per-tag setter closures — the isinstance ladder and ``_ensure_*_column``
    lookups run only on the first record per tag. Profiling at 50k records
    shows this hoists ~30% of ingest wall time vs the per-record dispatch.
    """
    if idx >= self._capacity:
        self._grow(idx)

    if idx >= self._count:
        self._count = idx + 1

    self.start_ns[idx] = start_ns
    self.end_ns[idx] = end_ns
    if generation_start_ns is not None:
        self.generation_start_ns[idx] = generation_start_ns
Reject negative record indices in write APIs.
idx < 0 currently writes via Python negative indexing, which can silently overwrite the last slots and corrupt stored records/metadata.
Proposed fix
 def ingest(
     self,
     idx: int,
@@
 ) -> None:
+    if idx < 0:
+        raise ValueError("idx must be >= 0")
     if idx >= self._capacity:
         self._grow(idx)
@@
 def ingest_metadata(
     self,
     idx: int,
@@
 ) -> None:
+    if idx < 0:
+        raise ValueError("idx must be >= 0")
     if idx >= self._capacity:
         self._grow(idx)
Also applies to: 339-363
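The hazard is easy to reproduce with a plain list, which follows the same negative-indexing rule as the array buffers discussed above. This sketch is illustrative only; `checked_write` is a hypothetical helper, not part of `ColumnStore`.

```python
store = [0.0] * 4
store[3] = 42.0

# A negative index passes any `idx >= capacity` check and wraps around,
# silently overwriting the last slot.
store[-1] = 7.0


def checked_write(buf: list, idx: int, value: float) -> None:
    """Reject negative indices before touching the buffer."""
    if idx < 0:
        raise ValueError("idx must be >= 0")
    buf[idx] = value
```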
def __init__(
    self, initial_capacity: int = 1024, offsets_capacity: int = 256
) -> None:
Validate capacities to prevent infinite growth loops.
initial_capacity/offsets_capacity can be 0, which makes _grow_offsets() loop forever (new_cap *= 2 stays 0).
Proposed fix
 def __init__(
     self, initial_capacity: int = 1024, offsets_capacity: int = 256
 ) -> None:
+    if initial_capacity <= 0:
+        raise ValueError("initial_capacity must be > 0")
+    if offsets_capacity <= 0:
+        raise ValueError("offsets_capacity must be > 0")
     self._values = GrowableArray(
         initial_capacity=initial_capacity, dtype=np.float64
     )
Also applies to: 99-104
def extend(self, idx: int, values: list[float]) -> None:
    """Append values for session_num ``idx``."""
    n = len(values)
    if n == 0:
        return
    if idx >= self._offsets_capacity:
        self._grow_offsets(idx)
    self._offsets[idx] = len(self._values)
    val_arr = np.asarray(values, dtype=np.float64)
Reject negative record indices.
idx < 0 currently writes through negative indexing (_offsets[-1]), which can silently corrupt data layout.
Proposed fix
 def extend(self, idx: int, values: list[float]) -> None:
     """Append values for session_num ``idx``."""
+    if idx < 0:
+        raise ValueError("idx must be >= 0")
     n = len(values)
required_accumulators: ClassVar[set[str]] = set()
summary_dependencies: ClassVar[list[str]] = []
Use AccumulatorType-typed keys in protocol contract tests.
These tests currently exercise SummaryContext.accumulators and analyzer dependency fields with str values, which weakens coverage of the declared AccumulatorType contract and can miss contract regressions.
Also applies to: 201-203
def sum(self) -> MetricValueTypeVarT:
    """Return the accumulated sum of all observed values."""

def __len__(self) -> int:
Adding __len__ to the protocol while MetricAggregator is kept as a back-compat alias breaks existing custom aggregators that only implement the old sum/to_result runtime contract. Fix: keep MetricAggregator on the old protocol shape or avoid the stricter protocol for legacy isinstance checks.
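The concern can be reproduced with `typing.runtime_checkable`, whose `isinstance` check requires every protocol member to be present on the object. In the sketch below, `OldShape`, `NewShape`, and `LegacyAggregator` are hypothetical names; the member set (`sum`, `to_result`, `__len__`) is taken from the comment above, and the real protocol may declare more.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class OldShape(Protocol):
    def sum(self) -> float: ...
    def to_result(self) -> dict: ...


@runtime_checkable
class NewShape(Protocol):
    def sum(self) -> float: ...
    def to_result(self) -> dict: ...
    def __len__(self) -> int: ...


class LegacyAggregator:
    """A custom aggregator written against the old contract only."""

    def sum(self) -> float:
        return 0.0

    def to_result(self) -> dict:
        return {}


legacy = LegacyAggregator()
passes_old = isinstance(legacy, OldShape)
passes_new = isinstance(legacy, NewShape)  # fails: no __len__ defined
```

If the alias points at the stricter protocol, any `isinstance(obj, MetricAggregator)` check behaves like `passes_new` for such legacy classes.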
    return np.zeros(0, dtype=np.bool_)
rec_start = self.start_ns[: self._count]
rec_end = self.end_ns[: self._count]
return (rec_start <= end_ns) & (rec_end >= start_ns)
query_time_range violates the new protocol's [start_ns, end_ns) contract by using inclusive overlap semantics, which can double-count records across adjacent windows. Fix: align the mask with half-open membership semantics or update the protocol and all callers to explicitly require overlap semantics.
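A small worked example shows the double-counting. It compares the inclusive-overlap test from the excerpt with one possible half-open rule, membership by record start time. That rule is an assumption made for illustration; the protocol's actual membership definition is not quoted in this thread, and the record and window values are made up.

```python
def overlaps_inclusive(rec_start, rec_end, start_ns, end_ns) -> bool:
    """Overlap test mirroring the excerpt: both bounds are inclusive."""
    return rec_start <= end_ns and rec_end >= start_ns


def starts_in_half_open(rec_start, start_ns, end_ns) -> bool:
    """Assumed half-open rule: a record belongs to [start_ns, end_ns) by its start."""
    return start_ns <= rec_start < end_ns


records = [(0, 10), (10, 20), (25, 30)]  # (start_ns, end_ns) per record
windows = [(0, 10), (10, 20), (20, 30)]  # adjacent, non-overlapping windows

inclusive_hits = sum(
    overlaps_inclusive(rs, re, ws, we) for rs, re in records for ws, we in windows
)
half_open_hits = sum(
    starts_in_half_open(rs, ws, we) for rs, _ in records for ws, we in windows
)
```

With three records, the half-open rule assigns each record to exactly one window, while the inclusive overlap test matches records that touch a window boundary more than once.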
Codecov Report
❌ Patch coverage is
Summary
Foundational layer for a column-store-backed metrics accumulator pipeline. This PR ships only the protocol surface and the two list backends — no accumulator orchestration, no derived-latency math, no exporter rewires. Those land in follow-up PRs.
What it does
- `MetricSeriesProtocol` (formerly `MetricAggregator`) as the runtime-checkable contract that every per-metric series backend must satisfy. The protocol moves from `aiperf.metrics.metric_dicts` to `aiperf.common.accumulator_protocols`; the old name remains as a back-compat alias.
- `ColumnStore`: a per-record-indexed store that holds one backend instance per metric and dispatches uniformly to either backend via `add_for_record(idx, values)`.
- `RaggedSeries` (new): list-of-lists storage retaining every per-record observation; supports per-record replay.
- `TDigestListMetricAggregator` (already on `main` from feat(metrics): t-digest aggregator for InterChunkLatencyMetric #865): slotted in as a column-store backend after gaining `add_for_record` and `SUPPORTS_PER_RECORD_REPLAY = False` to satisfy the protocol.
- `AIPERF_METRICS_LIST_BACKEND={ragged,tdigest}` (`Environment.METRICS.LIST_BACKEND`, default `ragged`).
What's deliberately deferred (later PRs)
- `MetricsAccumulator` orchestration class
- `accumulator_models.py`, `accumulator_sweeps.py`, `derived_latency.py`, `base_usage_record_metric.py`
- `base_aggregate_metric.py` aggregation_kind, `base_metric.py` console_group + classmethod conversions
- `metric_result_from_array()`, `observation_duration()`, `window_start_ns`, `window_end_ns` in `metric_dicts.py`: these have no main-side callers and only get used by deferred modules; included in PR-7
Preserved invariants (verified)
- `exporters/metrics_json_exporter.py` and `common/models/export_models.py` are untouched (zero diff). `tests/unit/exporters/test_metrics_json_exporter.py` passes unchanged.
Files
Total: 13 files, +1747 / -20.
Notes
Test plan
What downstream PRs need from this one
Once this lands, the follow-up `MetricsAccumulator` PR can:
Summary by CodeRabbit
New Features
Documentation