From a6e7aab5b04c01074fb71468cdcea22e89bb3291 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Wed, 3 Sep 2025 15:08:54 -0700 Subject: [PATCH 1/2] regression_db ghstack-source-id: f8ffcac2b5f313e2ce8605d3689f0e557efc8eaf Pull-Request: https://github.com/pytorch/test-infra/pull/7089 --- .../.gitignore | 3 + .../common/benchmark_time_series_api_model.py | 62 +++ .../common/config.py | 91 ++++ .../common/config_model.py | 190 ++++++++ .../lambda_function.py | 443 ++++++++++++++++++ .../requirements.txt | 5 + .../schema.sql | 28 ++ 7 files changed, 822 insertions(+) create mode 100644 aws/lambda/benchmark_regression_summary_report/.gitignore create mode 100644 aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py create mode 100644 aws/lambda/benchmark_regression_summary_report/common/config.py create mode 100644 aws/lambda/benchmark_regression_summary_report/common/config_model.py create mode 100644 aws/lambda/benchmark_regression_summary_report/lambda_function.py create mode 100644 aws/lambda/benchmark_regression_summary_report/requirements.txt create mode 100644 clickhouse_db_schema/benchmark_regression_summary_report/schema.sql diff --git a/aws/lambda/benchmark_regression_summary_report/.gitignore b/aws/lambda/benchmark_regression_summary_report/.gitignore new file mode 100644 index 0000000000..bd92f6376a --- /dev/null +++ b/aws/lambda/benchmark_regression_summary_report/.gitignore @@ -0,0 +1,3 @@ +*.zip +deployment/ +venv/ diff --git a/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py b/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py new file mode 100644 index 0000000000..552b8cefbd --- /dev/null +++ b/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py @@ -0,0 +1,62 @@ +from dataclasses import dataclass, field +from typing import Optional, List, Dict, Any +import requests + +# The data class to provide api response model from get_time_series api + + +@dataclass +class TimeRange: + start: str + end: str + + +@dataclass +class BenchmarkTimeSeriesItem: + group_info: Dict[str, Any] + num_of_dp: int + data: List[Dict[str, Any]] = field(default_factory=list) + + +@dataclass +class BenchmarkTimeSeriesApiData: + time_series: List[BenchmarkTimeSeriesItem] + time_range: TimeRange + + +@dataclass +class BenchmarkTimeSeriesApiResponse: + data: BenchmarkTimeSeriesApiData + + @classmethod + def from_request( + cls, url: str, query: dict, timeout: int = 180 + ) -> "BenchmarkTimeSeriesApiResponse": + """ + Send a POST request and parse into BenchmarkTimeSeriesApiResponse. 
+ + Args: + url: API endpoint + query: JSON payload to send as the request body + timeout: max seconds to wait for connect + response (default: 180) + Returns: + BenchmarkTimeSeriesApiResponse + Raises: + requests.exceptions.RequestException if network/timeout/HTTP error + RuntimeError if the API returns an "error" field or malformed data + """ + resp = requests.post(url, json=query, timeout=timeout) + resp.raise_for_status() + payload = resp.json() + + if "error" in payload: + raise RuntimeError(f"API error: {payload['error']}") + try: + tr = TimeRange(**payload["data"]["time_range"]) + ts = [ + BenchmarkTimeSeriesItem(**item) + for item in payload["data"]["time_series"] + ] + except Exception as e: + raise RuntimeError(f"Malformed API payload: {e}") + return cls(data=BenchmarkTimeSeriesApiData(time_series=ts, time_range=tr)) diff --git a/aws/lambda/benchmark_regression_summary_report/common/config.py b/aws/lambda/benchmark_regression_summary_report/common/config.py new file mode 100644 index 0000000000..a68dc7355a --- /dev/null +++ b/aws/lambda/benchmark_regression_summary_report/common/config.py @@ -0,0 +1,91 @@ +from common.config_model import ( + BenchmarkApiSource, + BenchmarkConfig, + BenchmarkRegressionConfigBook, + DayRangeWindow, + Frequency, + RegressionPolicy, + Policy, + RangeConfig, +) + +# Compiler benchmark regression config +# todo(elainewy): eventually each team should configure their own benchmark regression config, currenlty place here for lambda + + +COMPILER_BENCHMARK_CONFIG = BenchmarkConfig( + name="Compiler Benchmark Regression", + id="compiler_regression", + source=BenchmarkApiSource( + api_query_url="https://hud.pytorch.org/api/benchmark/get_time_series", + type="benchmark_time_series_api", + # currently we only detect the regression for h100 with dtype bfloat16, and mode inference + # we can extend this to other devices, dtypes and mode in the future + api_endpoint_params_template=""" + { + "name": "compiler_precompute", + "query_params": { + "commits": [], + "compilers": [], + "arch": "h100", + "device": "cuda", + "dtype": "bfloat16", + "granularity": "hour", + "mode": "inference", + "startTime": "{{ startTime }}", + "stopTime": "{{ stopTime }}", + "suites": ["torchbench", "huggingface", "timm_models"], + "workflowId": 0, + "branches": ["main"] + } + } + """, + ), + # baseline: a 7-day window (aggregated with "max" per metric); comparison: the most recent 2 days + policy=Policy( + frequency=Frequency(value=1, unit="days"), + range=RangeConfig( + baseline=DayRangeWindow(value=7), + comparison=DayRangeWindow(value=2), + ), + metrics={ + "passrate": RegressionPolicy( + name="passrate", + condition="greater_equal", + threshold=0.9, + baseline_aggregation="max", + ), + "geomean": RegressionPolicy( + name="geomean", + condition="greater_equal", + threshold=0.95, + baseline_aggregation="max", + ), + "compression_ratio": RegressionPolicy( + name="compression_ratio", + condition="greater_equal", + threshold=0.9, + baseline_aggregation="max", + ), + }, + notification_config={ + "type": "github", + "repo": "pytorch/test-infra", + "issue": "7081", + }, + ), +) + +BENCHMARK_REGRESSION_CONFIG = BenchmarkRegressionConfigBook( + configs={ + "compiler_regression": COMPILER_BENCHMARK_CONFIG, + } +) + + +def get_benchmark_regression_config(config_id: str) -> BenchmarkConfig: + """Get benchmark regression config by config id""" + try: + return BENCHMARK_REGRESSION_CONFIG[config_id] + except KeyError: + raise ValueError(f"Invalid config id: {config_id}") diff --git a/aws/lambda/benchmark_regression_summary_report/common/config_model.py
b/aws/lambda/benchmark_regression_summary_report/common/config_model.py new file mode 100644 index 0000000000..f452e84da2 --- /dev/null +++ b/aws/lambda/benchmark_regression_summary_report/common/config_model.py @@ -0,0 +1,190 @@ +from __future__ import annotations +from dataclasses import dataclass, field +from typing import Any, Dict, Literal, Optional +from datetime import timedelta +from jinja2 import Environment, Template, meta +import json + + +# -------- Frequency -------- +@dataclass(frozen=True) +class Frequency: + """ + The frequency of how often the report should be generated. + The minimum frequency we support is 1 day. + Attributes: + value: Number of units (e.g., 7 for 7 days). + unit: Unit of time, either "days" or "weeks". + + Methods: + to_timedelta: Convert frequency into a datetime.timedelta. + get_text: return the frequency in text format + """ + + value: int + unit: Literal["days", "weeks"] + + def to_timedelta(self) -> timedelta: + """Convert frequency N days or M weeks into a datetime.timedelta.""" + if self.unit == "days": + return timedelta(days=self.value) + elif self.unit == "weeks": + return timedelta(weeks=self.value) + else: + raise ValueError(f"Unsupported unit: {self.unit}") + + def get_text(self): + return f"{self.value} {self.unit}" + + +# -------- Source -------- +_JINJA_ENV = Environment(autoescape=False) + + +@dataclass +class BenchmarkApiSource: + """ + Defines the source of the benchmark data we want to query + api_query_url: the url of the api to query + api_endpoint_params_template: the jinjia2 template of the api endpoint's query params + default_ctx: the default context to use when rendering the api_endpoint_params_template + """ + + api_query_url: str + api_endpoint_params_template: str + type: Literal["benchmark_time_series_api", "other"] = "benchmark_time_series_api" + default_ctx: Dict[str, Any] = field(default_factory=dict) + + def required_template_vars(self) -> set[str]: + ast = _JINJA_ENV.parse(self.api_endpoint_params_template) + return set(meta.find_undeclared_variables(ast)) + + def render(self, ctx: Dict[str, Any], strict: bool = True) -> dict: + """Render with caller-supplied context (no special casing for start/end).""" + merged = {**self.default_ctx, **ctx} + + if strict: + required = self.required_template_vars() + missing = required - merged.keys() + if missing: + raise ValueError(f"Missing required vars: {missing}") + rendered = Template(self.api_endpoint_params_template).render(**merged) + return json.loads(rendered) + + +# -------- Policy: range windows -------- +@dataclass +class DayRangeWindow: + value: int + # raw indicates fetch from the source data + source: Literal["raw"] = "raw" + + +@dataclass +class RangeConfig: + """ + Defines the range of baseline and comparison windows for a given policy. + - baseline: the baseline window that build the baseline value + - comparison: the comparison window that we fetch data from to compare against the baseline value + """ + + baseline: DayRangeWindow + comparison: DayRangeWindow + + def total_timedelta(self) -> timedelta: + return timedelta(days=self.baseline.value + self.comparison.value) + + def comparison_timedelta(self) -> timedelta: + return timedelta(days=self.comparison.value) + + def baseline_timedelta(self) -> timedelta: + return timedelta(days=self.baseline.value) + + +# -------- Policy: metrics -------- +@dataclass +class RegressionPolicy: + """ + Defines the policy for a given metric. 
+ - the new value must be {x} baseline * threshold: + - "greater_than": higher is better; new value must be strictly greater than baseline * threshold + - "less_than": lower is better; new value must be strictly lower than baseline * threshold + - "equal_to": new value should be ~= baseline * threshold within rel_tol + - "greater_equal": higher is better; new value must be greater than or equal to baseline * threshold + - "less_equal": lower is better; new value must be less than or equal to baseline * threshold + """ + + name: str + condition: Literal[ + "greater_than", "less_than", "equal_to", "greater_equal", "less_equal" + ] + threshold: float + baseline_aggregation: Literal[ + "avg", "max", "min", "p50", "p90", "p95", "latest", "earliest" + ] = "max" + rel_tol: float = 1e-3 # used only for "equal_to" + + def is_violation(self, value: float, baseline: float) -> bool: + target = baseline * self.threshold + + if self.condition == "greater_than": + # value must be strictly greater than target + return value <= target + + if self.condition == "greater_equal": + # value must be greater or equal to target + return value < target + + if self.condition == "less_than": + # value must be strictly less than target + return value >= target + + if self.condition == "less_equal": + # value must be less or equal to target + return value > target + + if self.condition == "equal_to": + # |value - target| should be within rel_tol * max(1, |target|) + denom = max(1.0, abs(target)) + return abs(value - target) > self.rel_tol * denom + + raise ValueError(f"Unknown condition: {self.condition}") + + +@dataclass +class Policy: + frequency: Frequency + range: RangeConfig + metrics: Dict[str, RegressionPolicy] + + # TODO(elainewy): add notification config + notification_config: Optional[Dict[str, Any]] = None + + +# -------- Top-level benchmark regression config -------- +@dataclass +class BenchmarkConfig: + """ + Represents a single benchmark regression configuration. + - BenchmarkConfig defines the benchmark regression config for a given benchmark.
+ - source: defines the source of the benchmark data we want to query + - policy: defines the policy for the benchmark regressions, including frequency to generate the report, range of the baseline and new values, and regression thresholds for metrics + - name: the name of the benchmark + - id: the id of the benchmark, this must be unique for each benchmark, and cannot be changed once set + """ + + name: str + id: str + source: BenchmarkApiSource + policy: Policy + + +@dataclass +class BenchmarkRegressionConfigBook: + configs: Dict[str, BenchmarkConfig] = field(default_factory=dict) + + def __getitem__(self, key: str) -> BenchmarkConfig: + config = self.configs.get(key, None) + if not config: + raise KeyError(f"Config {key} not found") + return config diff --git a/aws/lambda/benchmark_regression_summary_report/lambda_function.py b/aws/lambda/benchmark_regression_summary_report/lambda_function.py new file mode 100644 index 0000000000..e13dbf749d --- /dev/null +++ b/aws/lambda/benchmark_regression_summary_report/lambda_function.py @@ -0,0 +1,443 @@ +#!/usr/bin/env python +import argparse +from concurrent.futures import ThreadPoolExecutor, as_completed +import json +import logging +import os +import threading +import requests +import datetime as dt +from typing import Any, Optional +import clickhouse_connect +from common.benchmark_time_series_api_model import ( + BenchmarkTimeSeriesApiResponse, +) +from common.config_model import ( + BenchmarkApiSource, + BenchmarkConfig, + Frequency, +) +from common.config import get_benchmark_regression_config +from dateutil.parser import isoparse + +logging.basicConfig( + level=logging.INFO, +) +logger = logging.getLogger() +logger.setLevel("INFO") + +ENVS = { + "GITHUB_ACCESS_TOKEN": os.getenv("GITHUB_ACCESS_TOKEN", ""), + "CLICKHOUSE_ENDPOINT": os.getenv("CLICKHOUSE_ENDPOINT", ""), + "CLICKHOUSE_PASSWORD": os.getenv("CLICKHOUSE_PASSWORD", ""), + "CLICKHOUSE_USERNAME": os.getenv("CLICKHOUSE_USERNAME", ""), +} + +# TODO(elainewy): change this to benchmark.benchmark_regression_report once the table is created +BENCHMARK_REGRESSION_REPORT_TABLE = "fortesting.benchmark_regression_report" +BENCHMARK_REGRESSION_TRACKING_CONFIG_IDS = ["compiler_regression"] + + +def truncate_to_hour(ts: dt.datetime) -> dt.datetime: + return ts.replace(minute=0, second=0, microsecond=0) + + +def get_clickhouse_client( + host: str, user: str, password: str +) -> clickhouse_connect.driver.client.Client: + # for local testing only, disable SSL verification + return clickhouse_connect.get_client( + host=host, user=user, password=password, secure=True, verify=False + ) + + return clickhouse_connect.get_client( + host=host, user=user, password=password, secure=True + ) + + +def get_clickhouse_client_environment() -> clickhouse_connect.driver.client.Client: + for name, env_val in ENVS.items(): + if not env_val: + raise ValueError(f"Missing environment variable {name}") + return get_clickhouse_client( + host=ENVS["CLICKHOUSE_ENDPOINT"], + user=ENVS["CLICKHOUSE_USERNAME"], + password=ENVS["CLICKHOUSE_PASSWORD"], + ) + + +BENCHMARK_REGRESSION_SUMMARY_REPORT_TABLE = ( + "fortesting.benchmark_regression_summary_report" +) + + +class BenchmarkSummaryProcessor: + def __init__( + self, + is_dry_run: bool = False, + ) -> None: + self.is_dry_run = is_dry_run + + def process( + self, + config_id: str, + end_time: dt.datetime, + cc: Optional[clickhouse_connect.driver.client.Client] = None, + args: Optional[argparse.Namespace] = None, + ): + def log_info(msg: str): + logger.info("[%s] %s", 
config_id, msg) + + def log_error(msg: str): + logger.error("[%s] %s", config_id, msg) + + # ensure each thread has its own clickhouse client. clickhouse client + # is not thread-safe. + if cc is None: + tlocal = threading.local() + if not hasattr(tlocal, "cc") or tlocal.cc is None: + if args: + tlocal.cc = get_clickhouse_client( + args.clickhouse_endpoint, + args.clickhouse_username, + args.clickhouse_password, + ) + else: + tlocal.cc = get_clickhouse_client_environment() + cc = tlocal.cc + try: + config = get_benchmark_regression_config(config_id) + log_info(f"found config for config_id {config_id}") + except ValueError as e: + log_error(f"Skip process, invalid config: {e}") + return + except Exception as e: + log_error(f"Unexpected error from get_benchmark_regression_config: {e}") + return + + # check if the current time is > policy's time_delta + previous record_ts from summary_table + report_freq = config.policy.frequency + should_generate = self._should_generate_report( + cc, end_time, config_id, report_freq + ) + if not should_generate: + log_info( + f"Skip generate report for time {end_time} with frequency {report_freq.get_text()}: a recent report already exists", + ) + return + else: + log_info( + f"Plan to generate report for time {end_time} with frequency {report_freq.get_text()}..." + ) + latest, ls, le = self.get_latest(config, end_time) + if not latest: + log_info( + f"no latest data found for time range [{ls},{le}] with frequency {report_freq.get_text()}..." + ) + return + + baseline, bs, be = self.get_baseline(config, end_time) + if not baseline: + log_info( + f"no baseline data found for time range [{bs},{be}] with frequency {report_freq.get_text()}..." + ) + return + + def get_latest(self, config: BenchmarkConfig, end_time: dt.datetime): + data_range = config.policy.range + latest_s = end_time - data_range.comparison_timedelta() + latest_e = end_time + latest_data = self._fetch_from_benchmark_ts_api( + config_id=config.id, + start_time=latest_s, + end_time=latest_e, + source=config.source, + ) + logger.info( + "[%s] found %s # of data, with time range %s", + config.id, + len(latest_data.time_series), + latest_data.time_range, + ) + if not latest_data.time_range or not latest_data.time_range.end: + return None, latest_s, latest_e + if not self.should_use_data(config.id, latest_data.time_range.end, end_time): + return None, latest_s, latest_e + return latest_data, latest_s, latest_e + + def get_baseline(self, config: BenchmarkConfig, end_time: dt.datetime): + data_range = config.policy.range + baseline_s = end_time - data_range.total_timedelta() + baseline_e = end_time - data_range.comparison_timedelta() + # fetch baseline from api + raw_data = self._fetch_from_benchmark_ts_api( + config_id=config.id, + start_time=baseline_s, + end_time=baseline_e, + source=config.source, + ) + + logger.info( + "found %s # of data, with time range %s", + len(raw_data.time_series), + raw_data.time_range, + ) + if not self.should_use_data(config.id, raw_data.time_range.end, baseline_e): + logger.info( + "[%s][get_baseline] Skip generate report, no data found during [%s,%s]", + config.id, + baseline_s.isoformat(), + baseline_e.isoformat(), + ) + return None, baseline_s, baseline_e + return raw_data, baseline_s, baseline_e + + def should_use_data( + self, + config_id: str, + latest_ts_str: str, + end_time: dt.datetime, + min_delta: dt.timedelta = dt.timedelta(days=2), + ) -> bool: + if not latest_ts_str: + return False + latest_dt = isoparse(latest_ts_str) + cutoff = end_time - min_delta + + if latest_dt >= cutoff:
+ return True + logger.info( + "[%s] expect latest data to be after %s, but got %s", + config_id, + cutoff, + latest_dt, + ) + return False + + def _fetch_from_benchmark_ts_api( + self, + config_id: str, + end_time: dt.datetime, + start_time: dt.datetime, + source: BenchmarkApiSource, + ): + str_end_time = end_time.strftime("%Y-%m-%dT%H:%M:%S") + str_start_time = start_time.strftime("%Y-%m-%dT%H:%M:%S") + query = source.render( + ctx={ + "startTime": str_start_time, + "stopTime": str_end_time, + } + ) + url = source.api_query_url + + logger.info("[%s] trying to call %s, with query\n %s", config_id, url, query) + try: + resp: BenchmarkTimeSeriesApiResponse = ( + BenchmarkTimeSeriesApiResponse.from_request(url, query) + ) + return resp.data + except requests.exceptions.HTTPError as e: + logger.error("Server error message: %s", e.response.json().get("error")) + raise + except Exception as e: + raise RuntimeError(f"[{config_id}] Fetch failed: {e}") + + def _should_generate_report( + self, + cc: clickhouse_connect.driver.client.Client, + end_time: dt.datetime, + config_id: str, + f: Frequency, + ) -> bool: + def _get_latest_record_ts( + cc: clickhouse_connect.driver.Client, + config_id: str, + ) -> Optional[dt.datetime]: + table = BENCHMARK_REGRESSION_REPORT_TABLE + res = cc.query( + f""" + SELECT max(last_record_ts) + FROM {table} + WHERE report_id = {{config_id:String}} + """, + parameters={"config_id": config_id}, + ) + if not res.result_rows or res.result_rows[0][0] is None: + return None + latest: dt.datetime = res.result_rows[0][ + 0 + ] # typically tz-aware UTC from clickhouse_connect + # If not tz-aware, force UTC: + if latest.tzinfo is None: + latest = latest.replace(tzinfo=dt.timezone.utc) + return latest + + freq_delta = f.to_timedelta() + latest_record_ts = _get_latest_record_ts(cc, config_id) + + # No report exists yet, generate + if not latest_record_ts: + return True + + end_utc = ( + end_time if end_time.tzinfo else end_time.replace(tzinfo=dt.timezone.utc) + ) + end_utc = end_utc.astimezone(dt.timezone.utc) + cutoff = end_utc - freq_delta + return latest_record_ts < cutoff + + +class WorkerPoolHandler: + """ + WorkerPoolHandler runs workers in parallel to generate benchmark regression reports + and writes the results to the target destination.
+ + """ + + def __init__( + self, + benchmark_summary_processor: BenchmarkSummaryProcessor, + max_workers: int = 6, + ): + self.benchmark_summary_processor = benchmark_summary_processor + self.max_workers = max_workers + + def start( + self, + config_ids: list[str], + args: Optional[argparse.Namespace] = None, + ) -> None: + logger.info( + "[WorkerPoolHandler] start to process benchmark " + "summary data with required config: %s", + config_ids, + ) + end_time = dt.datetime.now(dt.timezone.utc).replace( + minute=0, second=0, microsecond=0 + ) + logger.info("current time with hour granularity(utc) %s", end_time) + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + futures = [] + for config_id in config_ids: + future = executor.submit( + self.benchmark_summary_processor.process, + config_id, + end_time, + cc=None, + args=args, + ) + futures.append(future) + results = [] + errors = [] + + # handle results from parallel processing + for future in as_completed(futures): + try: + result = future.result() + # This will raise an exception if one occurred + results.append(result) + except Exception as e: + logger.warning(f"Error processing future: {e}") + errors.append({"error": str(e)}) + + +def main( + args: Optional[argparse.Namespace] = None, + github_access_token: str = "", + is_dry_run: bool = False, +): + """ + Main method to run in both local environment and lambda handler. + 1. generate intervals[start_time,end_time] using latest timestamp from source table and target table + 2. call WorkerPoolHandler to geneterate and write histogram data for each interval in parallel + """ + if not github_access_token: + raise ValueError("Missing environment variable GITHUB_ACCESS_TOKEN") + + # get time intervals. + logger.info("[Main] start work ....") + + # get jobs in queue from clickhouse for list of time intervals, in parallel + handler = WorkerPoolHandler( + BenchmarkSummaryProcessor(is_dry_run=is_dry_run), + ) + handler.start(BENCHMARK_REGRESSION_TRACKING_CONFIG_IDS, args) + logger.info(" [Main] Done. work completed.") + + +def lambda_handler(event: Any, context: Any) -> None: + """ + Main method to run in aws lambda environment + """ + main( + None, + github_access_token=ENVS["GITHUB_ACCESS_TOKEN"], + ) + return + + +def parse_args() -> argparse.Namespace: + """ + Parse command line args, this is mainly used for local test environment. + """ + parser = argparse.ArgumentParser() + parser.add_argument( + "--clickhouse-endpoint", + default=ENVS["CLICKHOUSE_ENDPOINT"], + type=str, + help="the clickhouse endpoint, the clickhouse_endpoint " + + "name is https://{clickhouse_endpoint}:{port} for full url ", + ) + parser.add_argument( + "--clickhouse-username", + type=str, + default=ENVS["CLICKHOUSE_USERNAME"], + help="the clickhouse username", + ) + parser.add_argument( + "--clickhouse-password", + type=str, + default=ENVS["CLICKHOUSE_PASSWORD"], + help="the clickhouse password for the user name", + ) + parser.add_argument( + "--github-access-token", + type=str, + default=ENVS["GITHUB_ACCESS_TOKEN"], + help="the github access token to access github api", + ) + parser.add_argument( + "--not-dry-run", + action="store_true", + help="when set, writing results to destination from local " + + "environment. 
By default, we run in dry-run mode for local " + + "environment", + ) + args, _ = parser.parse_known_args() + return args + + +def local_run() -> None: + """ + method to run in local test environment + """ + + args = parse_args() + + logger.info("args: %s", args) + + # update environment variables for input parameters + + # always run in dry-run mode in local environment, unless it's disabled. + is_dry_run = not args.not_dry_run + + main( + args, + args.github_access_token, + is_dry_run=is_dry_run, + ) + + +if __name__ == "__main__": + local_run() diff --git a/aws/lambda/benchmark_regression_summary_report/requirements.txt b/aws/lambda/benchmark_regression_summary_report/requirements.txt new file mode 100644 index 0000000000..87c33c2e7f --- /dev/null +++ b/aws/lambda/benchmark_regression_summary_report/requirements.txt @@ -0,0 +1,5 @@ +clickhouse_connect==0.8.5 +boto3==1.35.33 +PyGithub==1.59.0 +python-dateutil==2.8.2 +PyYAML==6.0.1 diff --git a/clickhouse_db_schema/benchmark_regression_summary_report/schema.sql b/clickhouse_db_schema/benchmark_regression_summary_report/schema.sql new file mode 100644 index 0000000000..df85548710 --- /dev/null +++ b/clickhouse_db_schema/benchmark_regression_summary_report/schema.sql @@ -0,0 +1,28 @@ +CREATE TABLE benchmark.benchmark_regression_report +( + `id` UUID DEFAULT generateUUIDv4(), + `report_id` String, -- unique id for the report config + `created_at` DateTime64(0, 'UTC') DEFAULT now(), + `last_record_ts` DateTime64(0, 'UTC'), + `last_record_commit` String, + `type` String, -- e.g. 'daily','weekly' + `status` String, -- e.g. 'no_regression',"regression",'failure' + `regression_count` UInt32 DEFAULT 0, + `insufficient_data_count` UInt32 DEFAULT 0, + `total_count` UInt32 DEFAULT 0, + `report` String DEFAULT '{}' +) +ENGINE = SharedMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}') +PARTITION BY toYYYYMM(report_date) +ORDER BY +( + report_id, + type, + status, + last_record_ts, + last_record_commit, + created_at, + id +) +TTL created_at + toIntervalYear(10) +SETTINGS index_granularity = 8192; From 5baf0fde8dc35f175de311d83b91bfec0217bf60 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Wed, 3 Sep 2025 15:47:55 -0700 Subject: [PATCH 2/2] fix bug2 --- .../common/benchmark_time_series_api_model.py | 4 ++- .../common/config.py | 7 ++++-- .../common/config_model.py | 14 +++++++---- .../lambda_function.py | 25 +++++++++---------- 4 files changed, 29 insertions(+), 21 deletions(-) diff --git a/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py b/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py index 552b8cefbd..fe7705a6ea 100644 --- a/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py +++ b/aws/lambda/benchmark_regression_summary_report/common/benchmark_time_series_api_model.py @@ -1,7 +1,9 @@ from dataclasses import dataclass, field -from typing import Optional, List, Dict, Any +from typing import Any, Dict, List + import requests + # The data class to provide api response model from get_time_series api diff --git a/aws/lambda/benchmark_regression_summary_report/common/config.py b/aws/lambda/benchmark_regression_summary_report/common/config.py index a68dc7355a..ef0586758f 100644 --- a/aws/lambda/benchmark_regression_summary_report/common/config.py +++ b/aws/lambda/benchmark_regression_summary_report/common/config.py @@ -4,13 +4,16 @@ BenchmarkRegressionConfigBook, DayRangeWindow, Frequency, - RegressionPolicy, Policy, RangeConfig, + 
RegressionPolicy, ) + # Compiler benchmark regression config -# todo(elainewy): eventually each team should configure their own benchmark regression config, currenlty place here for lambda +# todo(elainewy): eventually each team should configure +# their own benchmark regression config, currenlty place +# here for lambda COMPILER_BENCHMARK_CONFIG = BenchmarkConfig( diff --git a/aws/lambda/benchmark_regression_summary_report/common/config_model.py b/aws/lambda/benchmark_regression_summary_report/common/config_model.py index f452e84da2..7779f17f2d 100644 --- a/aws/lambda/benchmark_regression_summary_report/common/config_model.py +++ b/aws/lambda/benchmark_regression_summary_report/common/config_model.py @@ -1,9 +1,11 @@ from __future__ import annotations + +import json from dataclasses import dataclass, field -from typing import Any, Dict, Literal, Optional from datetime import timedelta -from jinja2 import Environment, Template, meta -import json +from typing import Any, Dict, Literal, Optional + +from jinja2 import Environment, meta, Template # -------- Frequency -------- @@ -168,7 +170,9 @@ class BenchmarkConfig: Represents a single benchmark regression configuration. - BenchmarkConfig defines the benchmark regression config for a given benchmark. - source: defines the source of the benchmark data we want to query - - policy: defines the policy for the benchmark regressions, including frequency to generate the report, range of the baseline and new values, and regression thresholds for metrics + - policy: defines the policy for the benchmark regressions, including frequency to + generate the report, range of the baseline and new values, and regression thresholds + for metrics - name: the name of the benchmark - id: the id of the benchmark, this must be unique for each benchmark, and cannot be changed once set """ @@ -184,7 +188,7 @@ class BenchmarkRegressionConfigBook: configs: Dict[str, BenchmarkConfig] = field(default_factory=dict) def __getitem__(self, key: str) -> BenchmarkConfig: - config = self.configs.get(key, None) + config = self.configs.get(key) if not config: raise KeyError(f"Config {key} not found") return config diff --git a/aws/lambda/benchmark_regression_summary_report/lambda_function.py b/aws/lambda/benchmark_regression_summary_report/lambda_function.py index e13dbf749d..9ae8789328 100644 --- a/aws/lambda/benchmark_regression_summary_report/lambda_function.py +++ b/aws/lambda/benchmark_regression_summary_report/lambda_function.py @@ -1,25 +1,20 @@ #!/usr/bin/env python import argparse -from concurrent.futures import ThreadPoolExecutor, as_completed -import json +import datetime as dt import logging import os import threading -import requests -import datetime as dt +from concurrent.futures import as_completed, ThreadPoolExecutor from typing import Any, Optional + import clickhouse_connect -from common.benchmark_time_series_api_model import ( - BenchmarkTimeSeriesApiResponse, -) -from common.config_model import ( - BenchmarkApiSource, - BenchmarkConfig, - Frequency, -) +import requests +from common.benchmark_time_series_api_model import BenchmarkTimeSeriesApiResponse from common.config import get_benchmark_regression_config +from common.config_model import BenchmarkApiSource, BenchmarkConfig, Frequency from dateutil.parser import isoparse + logging.basicConfig( level=logging.INFO, ) @@ -197,8 +192,12 @@ def should_use_data( config_id: str, latest_ts_str: str, end_time: dt.datetime, - min_delta: dt.timedelta = dt.timedelta(days=2), + min_delta: Optional[dt.timedelta] = None, ) 
-> bool: + # set default + if not min_delta: + min_delta = dt.timedelta(days=2) + if not latest_ts_str: return False latest_dt = isoparse(latest_ts_str)
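
A minimal usage sketch of the config model introduced in this patch (not part of the diff itself): it renders the compiler source template for a comparison window and applies the passrate policy. The baseline and latest values below are made-up numbers for illustration only.

# Illustrative sketch only -- exercises common/config.py and common/config_model.py
# from this patch with hypothetical numbers; it is not shipped with the lambda.
import datetime as dt

from common.config import get_benchmark_regression_config

config = get_benchmark_regression_config("compiler_regression")

# Render the query params for the comparison window that ends now (UTC, hour granularity).
end = dt.datetime.now(dt.timezone.utc).replace(minute=0, second=0, microsecond=0)
start = end - config.policy.range.comparison_timedelta()  # last 2 days
query = config.source.render(
    ctx={
        "startTime": start.strftime("%Y-%m-%dT%H:%M:%S"),
        "stopTime": end.strftime("%Y-%m-%dT%H:%M:%S"),
    }
)
print(query["query_params"]["startTime"], query["query_params"]["stopTime"])

# Apply the passrate policy: condition "greater_equal" with threshold 0.9, so a
# regression is flagged when the new value drops below 0.9 * aggregated baseline.
passrate = config.policy.metrics["passrate"]
baseline = 0.95  # hypothetical "max" aggregate over the 7-day baseline window
latest = 0.84    # hypothetical latest value from the comparison window
print(passrate.is_violation(latest, baseline))  # True, since 0.84 < 0.95 * 0.9 = 0.855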