From f80848ac0759cff81b3dc6fcd3d80d2e85289118 Mon Sep 17 00:00:00 2001 From: Ivan Zaitsev Date: Wed, 17 Sep 2025 20:05:34 -0700 Subject: [PATCH 1/2] [autorevert] Add hud-like HTML grid rendering for the logged state and `autorevert-checker` subcommand --- aws/lambda/pytorch-auto-revert/README.md | 15 +- .../pytorch_auto_revert/__main__.py | 51 +- .../pytorch_auto_revert/hud_renderer.py | 560 +++++++++++------- .../pytorch_auto_revert/run_state_logger.py | 65 +- .../signal_extraction_datasource.py | 28 +- .../testers/autorevert_v2.py | 14 +- .../pytorch_auto_revert/testers/hud.py | 140 ++--- 7 files changed, 517 insertions(+), 356 deletions(-) diff --git a/aws/lambda/pytorch-auto-revert/README.md b/aws/lambda/pytorch-auto-revert/README.md index e1de8f77be..a68fb14014 100644 --- a/aws/lambda/pytorch-auto-revert/README.md +++ b/aws/lambda/pytorch-auto-revert/README.md @@ -89,4 +89,17 @@ The tool also supports: Run with `--help` for more information: ```bash python -m pytorch_auto_revert --help -``` \ No newline at end of file +``` + +## Rendering HUD HTML + +- Add `--hud-html` (optionally with a filepath) to `autorevert-checker` to dump the + run state as a HUD-style HTML grid alongside the regular ClickHouse logging: + ```bash + python -m pytorch_auto_revert autorevert-checker pull trunk --hud-html results.html + ``` + +- Render historical runs from ClickHouse by timestamp with the `hud` subcommand: + ```bash + python -m pytorch_auto_revert hud "2025-09-17 20:29:15" --repo-full-name pytorch/pytorch --hud-html hud.html + ``` diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/__main__.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/__main__.py index a3d6bf0c3f..30a89acf6d 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/__main__.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/__main__.py @@ -10,7 +10,7 @@ from .clickhouse_client_helper import CHCliFactory from .github_client_helper import GHClientFactory from .testers.autorevert_v2 import autorevert_v2 -from .testers.hud import run_hud +from .testers.hud import render_hud_html_from_clickhouse from .testers.restart_checker import workflow_restart_checker from .utils import RestartAction, RevertAction @@ -113,6 +113,15 @@ def get_opts() -> argparse.Namespace: "Revert mode: skip, log (no side effects), run-notify (side effect), or run-revert (side effect)." ), ) + workflow_parser.add_argument( + "--hud-html", + nargs="?", + const="hud.html", + default=None, + help=( + "If set, write the run state to HUD HTML at the given path (defaults to hud.html when flag provided)." + ), + ) # workflow-restart-checker subcommand workflow_restart_parser = subparsers.add_parser( @@ -135,35 +144,27 @@ def get_opts() -> argparse.Namespace: # hud subcommand: generate local HTML report for signals/detections hud_parser = subparsers.add_parser( - "hud", help="Generate local HUD-like HTML with extracted signals" + "hud", help="Render HUD HTML from a logged autorevert run state" ) hud_parser.add_argument( - "workflows", - nargs="+", - help="Workflow name(s) to analyze - e.g. trunk pull inductor", - ) - hud_parser.add_argument( - "--hours", type=int, default=24, help="Lookback window in hours (default: 24)" + "timestamp", + help="Run timestamp in UTC (e.g. '2025-09-17 20:29:15') matching misc.autorevert_state.ts", ) hud_parser.add_argument( "--repo-full-name", - default=os.environ.get("REPO_FULL_NAME", "pytorch/pytorch"), - help="Full repo name to filter by (owner/repo).", + dest="repo_full_name", + default=None, + help=( + "Optional repo filter (owner/repo). Required if multiple runs share the same timestamp." + ), ) hud_parser.add_argument( - "--out", + "--hud-html", + nargs="?", + const="hud.html", default="hud.html", help="Output HTML file path (default: hud.html)", ) - hud_parser.add_argument( - "--ignore-newer-than", - dest="ignore_newer_than", - default=None, - help=( - "Commit SHA (short or long) — drop all commits that are newer than " - "this SHA from signal detection and HUD rendering" - ), - ) return parser.parse_args() @@ -204,6 +205,7 @@ def main(*args, **kwargs) -> None: repo_full_name=os.environ.get("REPO_FULL_NAME", "pytorch/pytorch"), restart_action=(RestartAction.LOG if opts.dry_run else RestartAction.RUN), revert_action=RevertAction.LOG, + out_hud=None, ) elif opts.subcommand == "autorevert-checker": # New default behavior under the same subcommand @@ -213,17 +215,16 @@ def main(*args, **kwargs) -> None: repo_full_name=opts.repo_full_name, restart_action=(RestartAction.LOG if opts.dry_run else opts.restart_action), revert_action=(RevertAction.LOG if opts.dry_run else opts.revert_action), + out_hud=opts.hud_html, ) elif opts.subcommand == "workflow-restart-checker": workflow_restart_checker(opts.workflow, commit=opts.commit, days=opts.days) elif opts.subcommand == "hud": # Delegate to testers.hud module - run_hud( - opts.workflows, - hours=opts.hours, + render_hud_html_from_clickhouse( + opts.timestamp, repo_full_name=opts.repo_full_name, - out=opts.out, - ignore_newer_than=getattr(opts, "ignore_newer_than", None), + out_path=opts.hud_html, ) diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/hud_renderer.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/hud_renderer.py index f931b9f1cc..e248ef750b 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/hud_renderer.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/hud_renderer.py @@ -1,159 +1,13 @@ from __future__ import annotations -from dataclasses import dataclass -from typing import Dict, List, Optional, Set, Tuple - -from .signal import ( - AutorevertPattern, - Ineligible, - RestartCommits, - Signal, - SignalCommit, - SignalEvent, - SignalStatus, -) - - -@dataclass -class Column: - workflow_name: str - key: str # signal key - signal: Signal - - -@dataclass -class GridModel: - commits: List[str] # newest -> older - columns: List[Column] - # per-cell highlight classes: (workflow, key, sha) -> {classes} - cell_highlights: Dict[Tuple[str, str, str], Set[str]] - # (workflow, key) -> human note for detection result - column_notes: Dict[Tuple[str, str], str] - # (workflow, key) -> outcome tag: 'revert' | 'restart' | 'ineligible' - column_outcomes: Dict[Tuple[str, str], str] - - -def collect_commit_order(signals: List[Signal]) -> List[str]: - seen: Set[str] = set() - ordered: List[str] = [] - for sig in signals: - for c in sig.commits: - if c.head_sha not in seen: - seen.add(c.head_sha) - ordered.append(c.head_sha) - return ordered - - -def build_grid_model(signals: List[Signal]) -> GridModel: - commits = collect_commit_order(signals) - columns = [ - Column(workflow_name=s.workflow_name, key=s.key, signal=s) for s in signals - ] - - cell_highlights: Dict[Tuple[str, str, str], Set[str]] = {} - column_notes: Dict[Tuple[str, str], str] = {} - column_outcomes: Dict[Tuple[str, str], str] = {} - - # run detection and capture highlights/notes - for s in signals: - res = s.process_valid_autorevert_pattern() - note: Optional[str] = None - if isinstance(res, AutorevertPattern): - # highlight cells for this signal - for sha in res.newer_failing_commits: - cell_highlights.setdefault((s.workflow_name, s.key, sha), set()).add( - "hl-newer-fail" - ) - cell_highlights.setdefault( - (s.workflow_name, s.key, res.suspected_commit), set() - ).add("hl-suspected") - cell_highlights.setdefault( - (s.workflow_name, s.key, res.older_successful_commit), set() - ).add("hl-baseline") - note = ( - f"Pattern: newer fail {len(res.newer_failing_commits)};" - f" suspect {res.suspected_commit[:7]}" - f" vs baseline {res.older_successful_commit[:7]}" - ) - column_outcomes[(s.workflow_name, s.key)] = "revert" - elif isinstance(res, RestartCommits): - for sha in res.commit_shas: - cell_highlights.setdefault((s.workflow_name, s.key, sha), set()).add( - "hl-restart" - ) - if res.commit_shas: - note = f"Suggest restart: {', '.join(sorted(s[:7] for s in res.commit_shas))}" - column_outcomes[(s.workflow_name, s.key)] = "restart" - elif isinstance(res, Ineligible): - msg = f"Ineligible: {res.reason.value}" - if res.message: - msg += f" — {res.message}" - note = msg - column_outcomes[(s.workflow_name, s.key)] = "ineligible" - if note: - column_notes[(s.workflow_name, s.key)] = note - - return GridModel( - commits=commits, - columns=columns, - cell_highlights=cell_highlights, - column_notes=column_notes, - column_outcomes=column_outcomes, - ) - - -def _status_icon(status: SignalStatus) -> str: - if status == SignalStatus.FAILURE: - return "❌" # cross mark - if status == SignalStatus.SUCCESS: - return "✅" # check mark button - return "🟡" # large yellow circle (pending) - +import re +from datetime import datetime +from typing import Any, Dict, List, Mapping, Optional, Sequence, Union -def _event_title(e: SignalEvent) -> str: - return f"{e.name}\n{e.status.value}\nstart={e.started_at.isoformat()}" - - -def _parse_run_id(event_name: str) -> Optional[int]: - # event name format: "wf= kind= id= run= attempt=" - try: - for part in event_name.split(): - if part.startswith("run="): - return int(part.split("=", 1)[1]) - except Exception: - return None - return None +from .signal import SignalStatus -def _commit_min_started_at( - sha: str, sig_map: Dict[Tuple[str, str], Dict[str, SignalCommit]] -) -> Optional[str]: - """Return minimal started_at (YYYY-mm-dd HH:MM) across all events for this commit, if any.""" - tmin: Optional[str] = None - for m in sig_map.values(): - commit = m.get(sha) - if not commit or not commit.events: - continue - # events are sorted oldest first inside SignalCommit - ts = commit.events[0].started_at.strftime("%Y-%m-%d %H:%M") - if tmin is None or ts < tmin: - tmin = ts - return tmin - - -def render_html( - model: GridModel, title: str = "Signal HUD", repo_full_name: str = "pytorch/pytorch" -) -> str: - # Build fast lookup: (workflow,key)-> {sha: SignalCommit} - sig_map: Dict[Tuple[str, str], Dict[str, SignalCommit]] = {} - for col in model.columns: - m: Dict[str, SignalCommit] = {} - for c in col.signal.commits: - m[c.head_sha] = c - sig_map[(col.workflow_name, col.key)] = m - - # HTML + CSS - css = """ +HUD_CSS = """ body { font-family: -apple-system, BlinkMacSystemFont, Segoe UI, Roboto, Arial, sans-serif; margin: 16px; @@ -194,32 +48,300 @@ def render_html( td.cell.hl-baseline { background: #e6f7ff; } td.cell.hl-newer-fail { background: #fdecea; } td.cell.hl-restart { outline: 2px dashed #888; outline-offset: -2px; } - """ +""" + +HUD_JS = ( + "" +) + + +def _status_icon(status: Union[SignalStatus, str]) -> str: + value = status.value if isinstance(status, SignalStatus) else str(status).lower() + if value == SignalStatus.FAILURE.value: + return "❌" # cross mark + if value == SignalStatus.SUCCESS.value: + return "✅" # check mark button + return "🟡" # large yellow circle (pending) + + +def _parse_run_id(event_name: str) -> Optional[int]: + # event name format: "wf= kind= id= run= attempt=" + try: + for part in event_name.split(): + if part.startswith("run="): + return int(part.split("=", 1)[1]) + except Exception: + return None + return None + + +def _format_commit_label_from_state(sha: str, commit_times: Mapping[str, Any]) -> str: + raw = commit_times.get(sha) + if raw is None: + return sha + text = str(raw) + try: + dt = datetime.fromisoformat(text.replace("Z", "+00:00")) + text = dt.strftime("%Y-%m-%d %H:%M") + except Exception: + # keep original text if parsing fails + pass + return f"{sha} {text}".strip() + + +def _event_title_from_dict(event: Mapping[str, Any]) -> str: + parts: List[str] = [] + name = event.get("name") + if name: + parts.append(str(name)) + status = event.get("status") + if status: + parts.append(str(status)) + start = event.get("started_at") + if start: + parts.append(f"start={start}") + end = event.get("ended_at") + if end: + parts.append(f"end={end}") + return "\n".join(parts) + + +def _legacy_outcomes_from_columns( + columns: Sequence[Mapping[str, Any]], commits: Sequence[str] +) -> Dict[str, Dict[str, Any]]: + mapping: Dict[str, Dict[str, Any]] = {} + commit_index = {sha: idx for idx, sha in enumerate(commits)} + + def _resolve(prefix: Optional[str]) -> Optional[str]: + if not prefix: + return None + prefix = prefix.strip() + if not prefix: + return None + for sha in commits: + if sha.startswith(prefix): + return sha + return prefix + + def _sig_key(col: Mapping[str, Any]) -> str: + workflow = str(col.get("workflow", "")) + key = str(col.get("key", "")) + return f"{workflow}:{key}" if key else workflow + + for col in columns: + sig = _sig_key(col) + outcome = str(col.get("outcome", "ineligible")) + highlights: Mapping[str, Sequence[str]] = col.get("highlights", {}) or {} + note = str(col.get("note", "")) + cells: Mapping[str, Sequence[Mapping[str, Any]]] = col.get("cells", {}) or {} + + def _has_status(sha: str, status: str, _cells=cells) -> bool: + return any(ev.get("status") == status for ev in _cells.get(sha, []) or []) + + if outcome == "revert": + suspected = next( + ( + sha + for sha, classes in highlights.items() + if "hl-suspected" in classes + ), + None, + ) + baseline = next( + ( + sha + for sha, classes in highlights.items() + if "hl-baseline" in classes + ), + None, + ) + newer = [ + sha for sha, classes in highlights.items() if "hl-newer-fail" in classes + ] + if not suspected: + m = re.search(r"suspect\s+([0-9a-fA-F]{6,40})", note) + suspected = _resolve(m.group(1) if m else None) + if not baseline: + m = re.search(r"baseline\s+([0-9a-fA-F]{6,40})", note) + baseline = _resolve(m.group(1) if m else None) + + failed_commits = [sha for sha in commits if _has_status(sha, "failure")] + if not suspected and failed_commits: + # suspect is the oldest failing commit (last in list since commits newest->older) + suspected = failed_commits[-1] + if not newer and failed_commits: + newer = [sha for sha in failed_commits if sha != suspected] + if not baseline and suspected and suspected in commit_index: + for sha in commits[commit_index[suspected] + 1 :]: + if _has_status(sha, "success"): + baseline = sha + break + newer = [sha for sha in newer if sha] + newer.sort(key=lambda sha: commit_index.get(sha, float("inf"))) + mapping[sig] = { + "type": "AutorevertPattern", + "data": { + "suspected_commit": suspected, + "older_successful_commit": baseline, + "newer_failing_commits": newer, + }, + } + elif outcome == "restart": + restart_shas = sorted( + sha for sha, classes in highlights.items() if "hl-restart" in classes + ) + if not restart_shas: + restart_shas = [ + _resolve(match) + for match in re.findall(r"([0-9a-fA-F]{6,40})", note) + ] + restart_shas = [sha for sha in restart_shas if sha] + if not restart_shas: + # fall back to commits that had failures but not marked success + restart_shas = [sha for sha in commits if _has_status(sha, "failure")] + seen: List[str] = [] + for sha in restart_shas: + if sha and sha not in seen: + seen.append(sha) + restart_shas = sorted( + seen, key=lambda sha: commit_index.get(sha, float("inf")) + ) + mapping[sig] = { + "type": "RestartCommits", + "data": {"commit_shas": restart_shas}, + } + else: + ineligible = col.get("ineligible", {}) or {} + reason = ineligible.get("reason") + message = ineligible.get("message") + if not reason and note: + m = re.search(r"Ineligible:\s*([^\u2014]+)", note) + if m: + reason = m.group(1).strip() + if not message and "—" in note: + message = note.split("—", 1)[1].strip() + mapping[sig] = { + "type": "Ineligible", + "data": { + "reason": reason, + "message": message, + }, + } + return mapping + + +def _highlights_from_outcome(outcome: Mapping[str, Any]) -> Dict[str, List[str]]: + res: Dict[str, List[str]] = {} + if not outcome: + return res + outcome_type = outcome.get("type") + data = ( + outcome.get("data", {}) if isinstance(outcome.get("data"), Mapping) else outcome + ) + if outcome_type == "AutorevertPattern": + for sha in data.get("newer_failing_commits", []) or []: + if sha: + res.setdefault(sha, []).append("hl-newer-fail") + suspected = data.get("suspected_commit") + if suspected: + res.setdefault(suspected, []).append("hl-suspected") + baseline = data.get("older_successful_commit") + if baseline: + res.setdefault(baseline, []).append("hl-baseline") + elif outcome_type == "RestartCommits": + for sha in data.get("commit_shas", []) or []: + if sha: + res.setdefault(sha, []).append("hl-restart") + return res + + +def _note_from_outcome(outcome: Optional[Mapping[str, Any]]) -> str: + if not outcome: + return "" + outcome_type = outcome.get("type") + data = ( + outcome.get("data", {}) if isinstance(outcome.get("data"), Mapping) else outcome + ) + if outcome_type == "AutorevertPattern": + newer = data.get("newer_failing_commits", []) or [] + suspected = data.get("suspected_commit") or "?" + baseline = data.get("older_successful_commit") or "?" + return ( + f"Pattern: newer fail {len(newer)}; suspect {suspected[:7]}" + f" vs baseline {baseline[:7]}" + ) + if outcome_type == "RestartCommits": + commits = data.get("commit_shas", []) or [] + if commits: + short = ", ".join(sorted(sha[:7] for sha in commits if sha)) + else: + short = "" + return f"Suggest restart: {short}" + if outcome_type == "Ineligible": + reason = data.get("reason") or "" + message = data.get("message") or "" + base = f"Ineligible: {reason}" if reason else "Ineligible" + if message: + base += f" — {message}" + return base + return "" + + +def render_html_from_state( + state: Mapping[str, Any], title: Optional[str] = None +) -> str: + commits: Sequence[str] = state.get("commits", []) or [] + commit_times: Mapping[str, Any] = state.get("commit_times", {}) or {} + columns: Sequence[Mapping[str, Any]] = state.get("columns", []) or [] + meta: Mapping[str, Any] = state.get("meta", {}) or {} + + raw_outcomes = ( + state.get("outcomes") if isinstance(state.get("outcomes"), dict) else None + ) + if raw_outcomes: + outcome_map = {str(k): v for k, v in raw_outcomes.items()} + else: + outcome_map = _legacy_outcomes_from_columns(columns, commits) + + highlight_lookup: Dict[str, Dict[str, List[str]]] = { + key: _highlights_from_outcome(value) for key, value in outcome_map.items() + } + + repo_full_name = str(meta.get("repo") or "pytorch/pytorch") + workflows_meta = meta.get("workflows", []) or [] + if isinstance(workflows_meta, str): + workflows_label = workflows_meta + else: + workflows_label = ", ".join(str(w) for w in workflows_meta) + lookback = meta.get("lookback_hours") + if title is None: + hours_part = "" + if isinstance(lookback, (int, float)): + hours_part = f" ({int(lookback)}h)" + display_label = workflows_label or repo_full_name + title = f"Signal HUD: {display_label}{hours_part}" html_parts: List[str] = [] html_parts.append("") html_parts.append( '{}'.format(title) ) - html_parts.append(f"") - # lightweight JS for single-open expander behavior - html_parts.append( - "" - ) + html_parts.append(f"") + html_parts.append(HUD_JS) html_parts.append("") html_parts.append(f"

{title}

") html_parts.append( @@ -230,15 +352,15 @@ def render_html( "" ) - # Table header html_parts.append("") html_parts.append("") - # Row 1: titles html_parts.append("") html_parts.append('') - for col in model.columns: - label = f"{col.workflow_name}:{col.key}" - note = model.column_notes.get((col.workflow_name, col.key)) + for col in columns: + workflow = str(col.get("workflow", "")) + key = str(col.get("key", "")) + label = f"{workflow}:{key}" if key else workflow + note = str(col.get("note", "")) title_attr = (note + "\n" if note else "") + label safe_title = title_attr.replace('"', "'") html_parts.append( @@ -246,13 +368,17 @@ def render_html( f'title="{safe_title}">{label}' ) html_parts.append("") - # Row 2: outcomes + html_parts.append("") html_parts.append('') - for idx, col in enumerate(model.columns): - key = (col.workflow_name, col.key) - outcome = model.column_outcomes.get(key, "ineligible") - note = model.column_notes.get(key, "") + for idx, col in enumerate(columns): + outcome = str(col.get("outcome", "ineligible")) + workflow = str(col.get("workflow", "")) + key = str(col.get("key", "")) + sig_key = f"{workflow}:{key}" if key else workflow + note = _note_from_outcome(outcome_map.get(sig_key)) + if not note: + note = str(col.get("note", "")) rid = f"oc-{idx}" if outcome == "revert": badge = 'REV' @@ -260,69 +386,59 @@ def render_html( badge = 'RST' else: badge = 'N/A' + header_label = f"{workflow}:{key}" if key else workflow + safe_note = note.replace('"', "'") html_parts.append( - f'" + f'" ) html_parts.append("") html_parts.append("") - # Rows html_parts.append("") - # Build fast lookup for body rendering - # (workflow,key) -> {sha -> commit} - for sha in model.commits: + for sha in commits: html_parts.append("") - tmin = _commit_min_started_at(sha, sig_map) - label = f"{sha} {tmin or ''}".strip() + label = _format_commit_label_from_state(sha, commit_times) html_parts.append(f'') - for col in model.columns: - commit = sig_map[(col.workflow_name, col.key)].get(sha) - if not commit or not commit.events: - # still apply cell-level highlight (e.g., suspected baseline without explicit events) - cell_cls = " ".join( - sorted( - model.cell_highlights.get( - (col.workflow_name, col.key, sha), set() - ) - ) - ) - html_parts.append(f'') + for col in columns: + cells_map = col.get("cells", {}) or {} + events = cells_map.get(sha, []) or [] + workflow = str(col.get("workflow", "")) + key = str(col.get("key", "")) + sig_key = f"{workflow}:{key}" if key else workflow + highlights_map = highlight_lookup.get(sig_key, {}) + cell_classes = " ".join(sorted(highlights_map.get(sha, []))) + if not events: + html_parts.append(f'') continue + cell_parts: List[str] = [] - for e in commit.events: - icon = _status_icon(e.status) - title_attr = _event_title(e).replace('"', "'") - run_id = _parse_run_id(e.name) + for event in events: + status = event.get("status", "") + icon = _status_icon(status) + title_attr = _event_title_from_dict(event).replace('"', "'") + run_id = _parse_run_id(str(event.get("name", ""))) if run_id is not None: url = f"https://github.com/{repo_full_name}/actions/runs/{run_id}" cell_parts.append( - f'{icon}' + f'{icon}' ) else: cell_parts.append( f'{icon}' ) - cell_cls = " ".join( - sorted( - model.cell_highlights.get((col.workflow_name, col.key, sha), set()) - ) - ) html_parts.append( - f"" + f"" ) html_parts.append("") html_parts.append("") html_parts.append("
Commit (min started_at)
Outcome' - f"{badge}" - f'
×' - f"
{col.workflow_name}:{col.key}
" - f"
{note}
" - f"
" - f"
{badge}' + f'
×" + f"
{header_label}
" + f"
{safe_note}
" + "
" + "
{label}{''.join(cell_parts)}{''.join(cell_parts)}
") - - # Notes removed as they are available via per-signal outcome expanders - html_parts.append("") return "".join(html_parts) diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/run_state_logger.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/run_state_logger.py index 29f127f5ab..f64e8cdffe 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/run_state_logger.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/run_state_logger.py @@ -1,7 +1,7 @@ from __future__ import annotations import json -from typing import Dict, Iterable, List, Tuple, Union +from typing import Any, Dict, Iterable, List, Tuple, Union from .clickhouse_client_helper import CHCliFactory from .signal import AutorevertPattern, Ineligible, RestartCommits, Signal @@ -26,8 +26,8 @@ def _build_state_json( repo: str, ctx: RunContext, pairs: Iterable[Tuple[Signal, SignalProcOutcome]], - ) -> str: - """Build a compact JSON string describing the run’s HUD-like grid and outcomes.""" + ) -> Dict[str, Any]: + """Build a dictionary describing the run’s HUD-like grid and outcomes.""" pairs_list = list(pairs) signals: List[Signal] = [s for s, _ in pairs_list] @@ -59,31 +59,42 @@ def _build_state_json( # Build columns with outcomes, notes, and per-commit events cols = [] + outcome_map: Dict[str, Dict[str, Any]] = {} for sig, outcome in pairs_list: if isinstance(outcome, AutorevertPattern): oc = "revert" - note = ( - f"Pattern: newer fail {len(outcome.newer_failing_commits)}; " - f"suspect {outcome.suspected_commit[:7]} vs baseline {outcome.older_successful_commit[:7]}" - ) ineligible = None + serialized = { + "type": "AutorevertPattern", + "data": { + "workflow_name": outcome.workflow_name, + "suspected_commit": outcome.suspected_commit, + "older_successful_commit": outcome.older_successful_commit, + "newer_failing_commits": list(outcome.newer_failing_commits), + }, + } elif isinstance(outcome, RestartCommits): oc = "restart" - if outcome.commit_shas: - short = ", ".join(sorted(s[:7] for s in outcome.commit_shas)) - note = f"Suggest restart: {short}" - else: - note = "Suggest restart: " ineligible = None + serialized = { + "type": "RestartCommits", + "data": { + "commit_shas": sorted(outcome.commit_shas), + }, + } else: oc = "ineligible" - note = f"Ineligible: {outcome.reason.value}" - if outcome.message: - note += f" — {outcome.message}" ineligible = { "reason": outcome.reason.value, "message": outcome.message, } + serialized = { + "type": "Ineligible", + "data": { + "reason": outcome.reason.value, + "message": outcome.message, + }, + } # Per-commit events for this signal cells: Dict[str, List[Dict]] = {} @@ -105,17 +116,21 @@ def _build_state_json( "workflow": sig.workflow_name, "key": sig.key, "outcome": oc, - "note": note, "cells": cells, } if ineligible is not None: col["ineligible"] = ineligible cols.append(col) - doc = { + sig_key = f"{sig.workflow_name}:{sig.key}" + outcome_map[sig_key] = serialized + + doc: Dict[str, Any] = { + "version": 2, "commits": commits, "commit_times": commit_times, "columns": cols, + "outcomes": outcome_map, "meta": { "repo": repo, "workflows": ctx.workflows, @@ -125,7 +140,7 @@ def _build_state_json( "revert_action": str(ctx.revert_action), }, } - return json.dumps(doc, separators=(",", ":")) + return doc def insert_state( self, @@ -133,11 +148,14 @@ def insert_state( ctx: RunContext, pairs: Iterable[Tuple[Signal, SignalProcOutcome]], params: str = "", - ) -> None: - """Insert one state row into misc.autorevert_state for this run context.""" - state_json = self._build_state_json( - repo=ctx.repo_full_name, ctx=ctx, pairs=list(pairs) - ) + ) -> str: + """Insert one state row into misc.autorevert_state for this run context. + + Returns the serialized JSON state that was stored, so callers can reuse it + for local rendering/debugging without rebuilding the structure. + """ + doc = self._build_state_json(repo=ctx.repo_full_name, ctx=ctx, pairs=pairs) + state_json = json.dumps(doc, separators=(",", ":")) cols = [ "ts", "repo", @@ -165,3 +183,4 @@ def insert_state( CHCliFactory().client.insert( table="autorevert_state", data=data, column_names=cols, database="misc" ) + return state_json diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction_datasource.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction_datasource.py index d87aa1f2b0..d3fe6e7d6c 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction_datasource.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction_datasource.py @@ -3,7 +3,7 @@ import logging import time from datetime import datetime, timedelta -from typing import Any, Dict, Iterable, List +from typing import Any, Dict, Iterable, List, Optional from .clickhouse_client_helper import CHCliFactory from .signal_extraction_types import ( @@ -211,3 +211,29 @@ def fetch_tests_for_job_ids( dt, ) return rows + + def fetch_autorevert_state_rows( + self, *, ts: str, repo_full_name: Optional[str] = None + ) -> List[Dict[str, Any]]: + """Fetch run state rows from misc.autorevert_state for a given timestamp.""" + + query = ( + "SELECT repo, workflows, state FROM misc.autorevert_state " + "WHERE ts = parseDateTimeBestEffort({ts:String})" + ) + params: Dict[str, Any] = {"ts": ts} + if repo_full_name: + query += " AND repo = {repo:String}" + params["repo"] = repo_full_name + + res = CHCliFactory().client.query(query, parameters=params) + rows: List[Dict[str, Any]] = [] + for repo, workflows, state_json in res.result_rows: + rows.append( + { + "repo": repo, + "workflows": workflows, + "state": state_json, + } + ) + return rows diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/testers/autorevert_v2.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/testers/autorevert_v2.py index 291d508333..26b721c31e 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/testers/autorevert_v2.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/testers/autorevert_v2.py @@ -1,6 +1,6 @@ import logging from datetime import datetime, timezone -from typing import Iterable, List, Tuple +from typing import Iterable, List, Optional, Tuple from ..run_state_logger import RunStateLogger from ..signal import Signal @@ -8,6 +8,7 @@ from ..signal_extraction import SignalExtractor from ..signal_extraction_types import RunContext from ..utils import RestartAction, RevertAction +from .hud import write_hud_html def autorevert_v2( @@ -17,7 +18,8 @@ def autorevert_v2( repo_full_name: str = "pytorch/pytorch", restart_action: RestartAction = RestartAction.RUN, revert_action: RevertAction = RevertAction.LOG, -) -> Tuple[List[Signal], List[Tuple[Signal, SignalProcOutcome]]]: + out_hud: Optional[str] = None, +) -> Tuple[List[Signal], List[Tuple[Signal, SignalProcOutcome]], str]: """Run the Signals-based autorevert flow end-to-end. - Extracts signals for the specified workflows and window @@ -25,7 +27,7 @@ def autorevert_v2( - Persists a single HUD-like state row for auditability Returns: - (signals, pairs) for diagnostics and potential external rendering + (signals, pairs, state_json) for diagnostics and potential external rendering """ workflows = list(workflows) # Use timezone-aware UTC to match ClickHouse DateTime semantics @@ -75,7 +77,9 @@ def autorevert_v2( logging.info("[v2] Executed action groups: %d", executed_count) # Persist full run state via separate logger - RunStateLogger().insert_state(ctx=run_ctx, pairs=pairs) + state_json = RunStateLogger().insert_state(ctx=run_ctx, pairs=pairs) + if out_hud: + write_hud_html(state_json, out_hud) logging.info("[v2] State logged") - return signals, pairs + return signals, pairs, state_json diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/testers/hud.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/testers/hud.py index e3e6f480ac..6fef676439 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/testers/hud.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/testers/hud.py @@ -1,94 +1,76 @@ +import json import logging -from typing import Iterable, Optional +from typing import Any, Mapping, Optional, Union -from ..hud_renderer import build_grid_model, render_html -from ..signal_extraction import SignalExtractor +from ..hud_renderer import render_html_from_state +from ..signal_extraction_datasource import SignalExtractionDatasource -def run_hud( - workflows: Iterable[str], - *, - hours: int = 24, - repo_full_name: str = "pytorch/pytorch", - out: str = "hud.html", - ignore_newer_than: Optional[str] = None, -) -> str: - """ - Extracts signals for the given workflows, optionally truncates commit history - to ignore commits newer than a specific SHA, builds a HUD model, and writes - an HTML report to `out`. +RunStatePayload = Union[str, Mapping[str, Any]] + - Returns the output filepath. - """ +def _ensure_state_dict(state: RunStatePayload) -> Mapping[str, Any]: + if isinstance(state, str): + return json.loads(state) + return state + +def write_hud_html(state: RunStatePayload, out_path: str) -> str: + """Render the given run-state JSON (string or mapping) to HUD HTML.""" + state_dict = _ensure_state_dict(state) + meta = state_dict.get("meta", {}) + workflows = meta.get("workflows") or [] + lookback = meta.get("lookback_hours") logging.info( - "[hud] Start: workflows=%s hours=%s repo=%s", - ",".join(workflows), - hours, - repo_full_name, + "[hud] Rendering HTML for repo=%s workflows=%s lookback=%s → %s", + meta.get("repo"), + ",".join(workflows) if isinstance(workflows, list) else workflows, + lookback, + out_path, ) + html = render_html_from_state(state_dict) + with open(out_path, "w", encoding="utf-8") as f: + f.write(html) + logging.info("HUD written to %s", out_path) + return out_path - extractor = SignalExtractor( - workflows=workflows, - lookback_hours=hours, - repo_full_name=repo_full_name, - ) - logging.info("[hud] Extracting signals ...") - signals = extractor.extract() - logging.info("[hud] Extracted %d signals", len(signals)) - # Optionally cut off newest commits above a given commit SHA prefix - if ignore_newer_than: - cut_prefix = str(ignore_newer_than).strip() - if cut_prefix: - total_trimmed = 0 - found_any = False - for s in signals: - # commits are ordered newest -> older; find first index that matches prefix - idx = next( - ( - i - for i, c in enumerate(s.commits) - if c.head_sha.startswith(cut_prefix) - ), - None, - ) - if idx is None: - continue - found_any = True - trimmed = idx # number of newer commits dropped - if trimmed > 0: - total_trimmed += trimmed - s.commits = s.commits[idx:] - if not found_any: - logging.warning( - "[hud] ignore-newer-than='%s' did not match any commit in extracted signals", - cut_prefix, - ) - else: - logging.info( - "[hud] Applied ignore-newer-than=%s; dropped %d newer commit entries across signals", - cut_prefix, - total_trimmed, - ) +def render_hud_html_from_clickhouse( + timestamp: str, + *, + repo_full_name: Optional[str] = None, + out_path: str, +) -> str: + """Fetch a logged autorevert state from ClickHouse by timestamp and render HUD HTML.""" - logging.info("[hud] Building grid model ...") - model = build_grid_model(signals) logging.info( - "[hud] Model: %d commits, %d columns", - len(model.commits), - len(model.columns), + "[hud] Fetching run state ts=%s repo=%s", + timestamp, + repo_full_name or "", ) - - logging.info("[hud] Rendering HTML ...") - html = render_html( - model, - title=f"Signal HUD: {', '.join(workflows)} ({hours}h)", - repo_full_name=repo_full_name, + rows = SignalExtractionDatasource().fetch_autorevert_state_rows( + ts=timestamp, repo_full_name=repo_full_name ) - with open(out, "w", encoding="utf-8") as f: - f.write(html) - logging.info("[hud] HUD written to %s", out) - logging.info("HUD written to %s", out) + if not rows: + raise RuntimeError( + "No autorevert_state row found for ts=" + + timestamp + + (" repo=" + repo_full_name if repo_full_name else "") + ) + if len(rows) > 1: + raise RuntimeError( + "Multiple autorevert_state rows found for ts=" + + timestamp + + "; pass --repo-full-name to disambiguate" + ) - return out + row = rows[0] + repo = row["repo"] + workflows = row["workflows"] + state_json = row["state"] + if isinstance(workflows, str): + workflows_display = workflows + else: + workflows_display = ",".join(workflows or []) + logging.info("[hud] Loaded state for repo=%s workflows=%s", repo, workflows_display) + return write_hud_html(state_json, out_path) From 0d9f8809fc2a8066cefb739e07cc66594ef91418 Mon Sep 17 00:00:00 2001 From: Ivan Zaitsev Date: Thu, 18 Sep 2025 08:53:40 -0700 Subject: [PATCH 2/2] avoid calling html generation on the main entry point --- .../pytorch-auto-revert/pytorch_auto_revert/__main__.py | 8 ++++---- .../pytorch_auto_revert/testers/autorevert_v2.py | 6 +----- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/__main__.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/__main__.py index 30a89acf6d..7af8150c75 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/__main__.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/__main__.py @@ -10,7 +10,7 @@ from .clickhouse_client_helper import CHCliFactory from .github_client_helper import GHClientFactory from .testers.autorevert_v2 import autorevert_v2 -from .testers.hud import render_hud_html_from_clickhouse +from .testers.hud import render_hud_html_from_clickhouse, write_hud_html from .testers.restart_checker import workflow_restart_checker from .utils import RestartAction, RevertAction @@ -205,18 +205,18 @@ def main(*args, **kwargs) -> None: repo_full_name=os.environ.get("REPO_FULL_NAME", "pytorch/pytorch"), restart_action=(RestartAction.LOG if opts.dry_run else RestartAction.RUN), revert_action=RevertAction.LOG, - out_hud=None, ) elif opts.subcommand == "autorevert-checker": # New default behavior under the same subcommand - autorevert_v2( + _signals, _pairs, state_json = autorevert_v2( opts.workflows, hours=opts.hours, repo_full_name=opts.repo_full_name, restart_action=(RestartAction.LOG if opts.dry_run else opts.restart_action), revert_action=(RevertAction.LOG if opts.dry_run else opts.revert_action), - out_hud=opts.hud_html, ) + if opts.hud_html: + write_hud_html(state_json, opts.hud_html) elif opts.subcommand == "workflow-restart-checker": workflow_restart_checker(opts.workflow, commit=opts.commit, days=opts.days) elif opts.subcommand == "hud": diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/testers/autorevert_v2.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/testers/autorevert_v2.py index 26b721c31e..f8b42dbfb0 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/testers/autorevert_v2.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/testers/autorevert_v2.py @@ -1,6 +1,6 @@ import logging from datetime import datetime, timezone -from typing import Iterable, List, Optional, Tuple +from typing import Iterable, List, Tuple from ..run_state_logger import RunStateLogger from ..signal import Signal @@ -8,7 +8,6 @@ from ..signal_extraction import SignalExtractor from ..signal_extraction_types import RunContext from ..utils import RestartAction, RevertAction -from .hud import write_hud_html def autorevert_v2( @@ -18,7 +17,6 @@ def autorevert_v2( repo_full_name: str = "pytorch/pytorch", restart_action: RestartAction = RestartAction.RUN, revert_action: RevertAction = RevertAction.LOG, - out_hud: Optional[str] = None, ) -> Tuple[List[Signal], List[Tuple[Signal, SignalProcOutcome]], str]: """Run the Signals-based autorevert flow end-to-end. @@ -78,8 +76,6 @@ def autorevert_v2( # Persist full run state via separate logger state_json = RunStateLogger().insert_state(ctx=run_ctx, pairs=pairs) - if out_hud: - write_hud_html(state_json, out_hud) logging.info("[v2] State logged") return signals, pairs, state_json