diff --git a/README.md b/README.md
index 1eabd4a3..34b907cc 100644
--- a/README.md
+++ b/README.md
@@ -1130,7 +1130,8 @@ from metamon.backend.team_prediction.usage_stats import get_usage_stats
 from datetime import date
 usage_stats = get_usage_stats("gen1ou",
     start_date=date(2017, 12, 1),
-    end_date=date(2018, 3, 30)
+    end_date=date(2018, 3, 30),
+    rank=1500, # falls back to nearest lower (glicko) rank where data is available
 )
 alakazam_info: dict = usage_stats["Alakazam"] # non alphanum chars and case are flexible
 ```
diff --git a/metamon/backend/team_prediction/predictor.py b/metamon/backend/team_prediction/predictor.py
index fa0fc61b..482b9c34 100644
--- a/metamon/backend/team_prediction/predictor.py
+++ b/metamon/backend/team_prediction/predictor.py
@@ -17,14 +17,20 @@
 )
 from metamon.backend.team_prediction.usage_stats import (
     PreloadedSmogonUsageStats,
+    DEFAULT_USAGE_RANK,
 )
 from metamon.backend.replay_parser.str_parsing import pokemon_name
 from metamon.backend.team_prediction.team import TeamSet, PokemonSet, Roster
 
 
 class TeamPredictor(ABC):
-    def __init__(self, replay_stats_dir: Optional[str] = None):
+    def __init__(
+        self,
+        replay_stats_dir: Optional[str] = None,
+        usage_stats_rank: int = DEFAULT_USAGE_RANK,
+    ):
         self.replay_stats_dir = replay_stats_dir
+        self.usage_stats_rank = usage_stats_rank
 
     def bin_usage_stats_dates(
         self, date: datetime.date
@@ -50,6 +56,7 @@ def get_legacy_team_builder(self, format: str, date: datetime.date) -> TeamBuild
             format=format,
             start_date=start_date,
             end_date=end_date,
+            rank=self.usage_stats_rank,
         )
 
     def get_usage_stats(
@@ -209,9 +216,13 @@ def __init__(
         top_k_scored_teams: int = 10,
         top_k_scored_movesets: int = 3,
         replay_stats_dir: Optional[str] = None,
+        usage_stats_rank: int = DEFAULT_USAGE_RANK,
     ):
         assert not isinstance(top_k_consistent_teams, str)
-        super().__init__(replay_stats_dir)
+        super().__init__(
+            replay_stats_dir,
+            usage_stats_rank=usage_stats_rank,
+        )
         self.stat_format = None
         self.top_k_consistent_teams = top_k_consistent_teams
         self.top_k_consistent_movesets = top_k_consistent_movesets
diff --git a/metamon/backend/team_prediction/usage_stats/__init__.py b/metamon/backend/team_prediction/usage_stats/__init__.py
index 3eeaf626..d82b2b7e 100644
--- a/metamon/backend/team_prediction/usage_stats/__init__.py
+++ b/metamon/backend/team_prediction/usage_stats/__init__.py
@@ -1,2 +1,7 @@
-from .stat_reader import get_usage_stats, PreloadedSmogonUsageStats
+from .stat_reader import (
+    get_usage_stats,
+    PreloadedSmogonUsageStats,
+    DEFAULT_USAGE_RANK,
+    list_available_usage_ranks,
+)
 from .legacy_team_builder import TeamBuilder, PokemonStatsLookupError
diff --git a/metamon/backend/team_prediction/usage_stats/create_usage_jsons.py b/metamon/backend/team_prediction/usage_stats/create_usage_jsons.py
index de1bf1de..2d22c162 100644
--- a/metamon/backend/team_prediction/usage_stats/create_usage_jsons.py
+++ b/metamon/backend/team_prediction/usage_stats/create_usage_jsons.py
@@ -1,6 +1,7 @@
 import os
 import json
 import argparse
+from collections import defaultdict
 from tqdm import tqdm
 
 from metamon.backend.team_prediction.usage_stats.format_rules import (
@@ -20,54 +21,69 @@ def main(args):
     for gen in range(1, 10):
         for year in range(2014, 2026):
             for month in range(1, 13):
+                date = f"{year}-{month:02d}"
                 stat_dir = os.path.join(args.smogon_stat_dir)
-                valid_movesets = []
+                valid_movesets_by_rank = defaultdict(list)
                 for format in VALID_TIERS:
                     format_name = f"gen{gen}{format.name.lower()}"
-                    stat = SmogonStat(
+
+                    ranks = SmogonStat.available_ranks(
                         format_name,
                         raw_stats_dir=stat_dir,
-                        date=f"{year}-{month:02d}",
+                        date=date,
                     )
-                    if stat.movesets:
-                        # if we find data for this, save it
+
+                    for rank in ranks:
+                        stat = SmogonStat(
+                            format_name,
+                            raw_stats_dir=stat_dir,
+                            date=date,
+                            rank=rank,
+                            verbose=False,
+                        )
+                        if not stat.movesets:
+                            continue
+
                         path = os.path.join(
                             args.save_dir,
                             "movesets_data",
                             f"gen{gen}",
                             f"{format.name.lower()}",
-                            f"{year}-{month:02d}.json",
+                            str(rank),
+                            f"{date}.json",
                         )
                         os.makedirs(os.path.dirname(path), exist_ok=True)
                         with open(path, "w") as f:
                             json.dump(stat.movesets, f)
-                        valid_movesets.append(stat.movesets)
-                        check_cheatsheet = {}
-                        for mon in stat.movesets.keys():
-                            checks = stat.movesets[mon]["checks"]
-                            check_cheatsheet[mon] = checks
+                        check_cheatsheet = {
+                            mon: stat.movesets[mon]["checks"]
+                            for mon in stat.movesets.keys()
+                        }
                         path = os.path.join(
                             args.save_dir,
                             "checks_data",
                             f"gen{gen}",
                             f"{format.name.lower()}",
+                            str(rank),
-                            f"{year}-{month:02d}.json",
+                            f"{date}.json",
                         )
                         os.makedirs(os.path.dirname(path), exist_ok=True)
                         with open(path, "w") as f:
                             json.dump(check_cheatsheet, f)
+
+                        valid_movesets_by_rank[rank].append(stat.movesets)
 
                     pbar.update(1)
-                if valid_movesets:
-                    # merge all the tiers. used to lookup rare Pokémon choices, i.e. fooling around
-                    # with low-tier Pokémon in OverUsed
-                    inclusive_movesets = merge_movesets(valid_movesets)
+
+                for rank, tier_movesets in valid_movesets_by_rank.items():
+                    inclusive_movesets = merge_movesets(tier_movesets)
                     path = os.path.join(
                         args.save_dir,
                         "movesets_data",
                         f"gen{gen}",
                         "all_tiers",
-                        f"{year}-{month:02d}.json",
+                        str(rank),
+                        f"{date}.json",
                     )
                     os.makedirs(os.path.dirname(path), exist_ok=True)
                     with open(path, "w") as f:
diff --git a/metamon/backend/team_prediction/usage_stats/legacy_team_builder.py b/metamon/backend/team_prediction/usage_stats/legacy_team_builder.py
index 00310671..0ec1a618 100644
--- a/metamon/backend/team_prediction/usage_stats/legacy_team_builder.py
+++ b/metamon/backend/team_prediction/usage_stats/legacy_team_builder.py
@@ -7,7 +7,10 @@
 import numpy as np
 
 import metamon
-from metamon.backend.team_prediction.usage_stats import get_usage_stats
+from metamon.backend.team_prediction.usage_stats import (
+    get_usage_stats,
+    DEFAULT_USAGE_RANK,
+)
 from metamon.backend.team_prediction.usage_stats.constants import (
     HIDDEN_POWER_IVS,
     HIDDEN_POWER_DVS,
@@ -43,12 +46,18 @@ def __init__(
         format: str,
         start_date: datetime.date,
         end_date: datetime.date,
+        rank: int = DEFAULT_USAGE_RANK,
         verbose: bool = False,
         remove_banned: bool = False,
     ):
         self.format = format
         self.gen = metamon.backend.format_to_gen(format)
-        self.stat = get_usage_stats(format, start_date, end_date)
+        self.stat = get_usage_stats(
+            format,
+            start_date,
+            end_date,
+            rank=rank,
+        )
         if remove_banned:
             self.stat.remove_banned_pm()
         self.verbose = verbose
diff --git a/metamon/backend/team_prediction/usage_stats/stat_reader.py b/metamon/backend/team_prediction/usage_stats/stat_reader.py
index e6fc86a9..e1fd6c5f 100644
--- a/metamon/backend/team_prediction/usage_stats/stat_reader.py
+++ b/metamon/backend/team_prediction/usage_stats/stat_reader.py
@@ -1,11 +1,10 @@
 import os
-import copy
 import re
 import json
 import datetime
 import functools
 import warnings
-from typing import Optional
+from typing import Optional, List
 
 from termcolor import colored
 import metamon
@@ -29,6 +28,59 @@
 EARLIEST_USAGE_STATS_DATE = datetime.date(2014, 1, 1)
 LATEST_USAGE_STATS_DATE = datetime.date(2025, 12, 1)
 
+DEFAULT_USAGE_RANK = 1500
+
+
+def rank_from_moveset_filename(fmt: str, filename: str) -> 
Optional[int]: + """ + Extract the baseline/rank from a Smogon moveset filename. + Examples: gen1ou-0.txt, gen1ou-1500.txt, gen1ou-1630.txt, gen1ou-1760.txt + Returns rank as int or None if not applicable. + """ + if not filename.startswith(fmt): + return None + if filename.endswith(".txt.gz") or filename.endswith(".gz"): + return None + if not filename.endswith(".txt"): + return None + + stem = filename[:-4] + if stem == fmt: + # Smogon convention: no explicit baseline means 1500. + return 1500 + + m = re.match(rf"^{re.escape(fmt)}-(\d+(?:\.\d+)?)$", stem) + if not m: + return None + return int(float(m.group(1))) + + +def list_available_ranks_in_moveset_dir(moveset_dir: str, fmt: str) -> List[int]: + if not os.path.isdir(moveset_dir): + return [] + ranks = set() + for fn in os.listdir(moveset_dir): + r = rank_from_moveset_filename(fmt, fn) + if r is not None: + ranks.add(r) + return sorted(ranks) + + +def list_available_usage_ranks(format: str) -> List[int]: + """ + List available baseline/rank subdirectories in the processed usage-stats dataset + for a given format (e.g., gen4ou). + """ + gen, tier = int(format[3]), format[4:] + usage_stats_path = metamon.data.download.download_usage_stats(gen) + base = os.path.join(usage_stats_path, "movesets_data", f"gen{gen}", f"{tier}") + if not os.path.isdir(base): + return [] + ranks = [] + for d in os.listdir(base): + if os.path.isdir(os.path.join(base, d)) and re.fullmatch(r"\d+", d): + ranks.append(int(d)) + return sorted(ranks) def parse_pokemon_moveset(file_path): @@ -255,7 +307,7 @@ def __init__( format: str, raw_stats_dir: str, date=None, - rank=None, + rank: Optional[int] = None, verbose: bool = True, ) -> None: if date and type(date) == str: @@ -273,11 +325,21 @@ def __init__( self._movesets = {} self._inclusive = {} self._usage = None + self._available_ranks: List[int] = [] self._load() self._name_conversion = { pokemon_name(pokemon): pokemon for pokemon in self._movesets.keys() } + @staticmethod + def available_ranks(format: str, raw_stats_dir: str, date: str) -> List[int]: + moveset_dir = os.path.join(raw_stats_dir, date, "moveset") + return list_available_ranks_in_moveset_dir(moveset_dir, format) + + @property + def available_ranks_loaded(self) -> List[int]: + return list(self._available_ranks) + def _load(self): moveset_paths = [] for data_path in self.data_paths: @@ -286,21 +348,49 @@ def _load(self): moveset_paths.append(moveset_path) if len(moveset_paths) == 0: - print(f"No moveset data found for {self.format} in {self.data_paths}") + if self.verbose: + print(f"No moveset data found for {self.format} in {self.data_paths}") self._movesets = {} + self._available_ranks = [] return _movesets = [] + ranks_seen = set() for moveset_path in moveset_paths: - format_data = [ - x for x in os.listdir(moveset_path) if x.startswith(self.format + "-") - ] - if self.rank is not None: - format_data = [x for x in format_data if self.rank in x] - _movesets += [ - parse_pokemon_moveset(os.path.join(moveset_path, x)) - for x in format_data - ] + files_by_rank = {} + for fn in os.listdir(moveset_path): + r = rank_from_moveset_filename(self.format, fn) + if r is None: + continue + files_by_rank.setdefault(r, []).append(fn) + + ranks_seen.update(files_by_rank.keys()) + + if not files_by_rank: + continue + + if self.rank is None: + available = sorted(files_by_rank.keys()) + raise ValueError( + f"SmogonStat requires a baseline/rank for {self.format}. 
" + f"Available ranks in {moveset_path}: {available}" + ) + filenames = files_by_rank.get(self.rank, []) + + for fn in filenames: + fp = os.path.join(moveset_path, fn) + try: + _movesets.append(parse_pokemon_moveset(fp)) + except Exception as e: + if self.verbose: + warnings.warn(colored(f"Failed parsing {fp}: {e}", "red")) + + self._available_ranks = sorted(ranks_seen) + + if not _movesets: + self._movesets = {} + return + self._movesets = { pokemon_name(k): v for k, v in merge_movesets(_movesets).items() } @@ -360,7 +450,12 @@ def usage(self): def load_between_dates( - dir_path: str, start_year: int, start_month: int, end_year: int, end_month: int + dir_path: str, + start_year: int, + start_month: int, + end_year: int, + end_month: int, + warn_if_empty: bool = True, ) -> dict: start_date = datetime.date(start_year, start_month, 1) end_date = datetime.date(end_year, end_month, 1) @@ -377,6 +472,8 @@ def load_between_dates( selected_data = [] for json_file in os.listdir(dir_path): + if not json_file.endswith(".json"): + continue year, month = json_file.replace(".json", "").split("-") date = datetime.date(year=int(year), month=int(month), day=1) if not start_date <= date <= end_date: @@ -384,8 +481,7 @@ def load_between_dates( with open(os.path.join(dir_path, json_file), "r") as file: data = json.load(file) selected_data.append(data) - if not selected_data: - breakpoint() + if not selected_data and warn_if_empty: warnings.warn( colored( f"No Showdown usage stats found in {dir_path} between {start_date} and {end_date}", @@ -401,10 +497,13 @@ def __init__( format, start_date: datetime.date, end_date: datetime.date, + rank: int = DEFAULT_USAGE_RANK, + load_nearest_lower_rank: bool = True, + search_lower_ranks_on_miss: bool = True, verbose: bool = True, ): self.format = format.strip().lower() - self.rank = None + self.rank = int(rank) self.start_date = start_date self.end_date = end_date self.verbose = verbose @@ -412,12 +511,62 @@ def __init__( gen, tier = int(self.format[3]), self.format[4:] self.gen = gen usage_stats_path = metamon.data.download.download_usage_stats(gen) - movesets_path = os.path.join( + movesets_base = os.path.join( usage_stats_path, "movesets_data", f"gen{gen}", f"{tier}" ) - inclusive_path = os.path.join( + inclusive_base = os.path.join( usage_stats_path, "movesets_data", f"gen{gen}", "all_tiers" ) + movesets_path = os.path.join(movesets_base, str(self.rank)) + inclusive_path = os.path.join(inclusive_base, str(self.rank)) + + def _avail_ranks(base: str) -> list[int]: + if not os.path.isdir(base): + return [] + ranks = [] + for d in os.listdir(base): + if os.path.isdir(os.path.join(base, d)) and re.fullmatch(r"\d+", d): + ranks.append(int(d)) + return sorted(ranks) + + def _nearest_lower_rank(target: int, candidates: list[int]) -> Optional[int]: + lower = [r for r in candidates if r < target] + return max(lower) if lower else None + + if not os.path.isdir(movesets_path): + avail = _avail_ranks(movesets_base) + fallback_rank = ( + _nearest_lower_rank(self.rank, avail) + if load_nearest_lower_rank + else None + ) + if fallback_rank is not None: + if self.verbose: + warnings.warn( + colored( + f"Requested rank={self.rank} not found for {self.format}. " + f"Falling back to nearest rank={fallback_rank}.", + "yellow", + ) + ) + self.rank = fallback_rank + movesets_path = os.path.join(movesets_base, str(self.rank)) + inclusive_path = os.path.join(inclusive_base, str(self.rank)) + else: + raise FileNotFoundError( + f"Movesets data not found for {self.format} at rank={self.rank}. 
" + f"Available ranks: {avail}. " + f"Run `python -m metamon download usage-stats` to get the latest data." + ) + + if not os.path.isdir(inclusive_path): + avail = _avail_ranks(inclusive_base) + raise FileNotFoundError( + f"All-tiers movesets not found for gen{gen} at rank={self.rank}. " + f"Available ranks: {avail}. " + f"Run `python -m metamon download usage-stats` to get the latest data." + ) + # data is split by year and month if not os.path.exists(movesets_path) or not os.path.exists(inclusive_path): raise FileNotFoundError( @@ -430,6 +579,11 @@ def __init__( end_year=end_date.year, end_month=end_date.month, ) + if not self._movesets: + raise FileNotFoundError( + f"No usage stats found for {self.format} at rank={self.rank} " + f"between {start_date} and {end_date} in {movesets_path}." + ) self._inclusive = load_between_dates( inclusive_path, start_year=EARLIEST_USAGE_STATS_DATE.year, @@ -437,6 +591,38 @@ def __init__( end_year=LATEST_USAGE_STATS_DATE.year, end_month=LATEST_USAGE_STATS_DATE.month, ) + self._lower_rank_fallbacks: list[tuple[int, dict, dict]] = [] + if search_lower_ranks_on_miss: + avail = _avail_ranks(movesets_base) + lower_ranks = [r for r in avail if r < self.rank] + lower_ranks.sort(reverse=True) + for r in lower_ranks: + lower_movesets_path = os.path.join(movesets_base, str(r)) + lower_inclusive_path = os.path.join(inclusive_base, str(r)) + if not os.path.isdir(lower_movesets_path) or not os.path.isdir( + lower_inclusive_path + ): + continue + lower_movesets = load_between_dates( + lower_movesets_path, + start_year=start_date.year, + start_month=start_date.month, + end_year=end_date.year, + end_month=end_date.month, + warn_if_empty=False, + ) + lower_inclusive = load_between_dates( + lower_inclusive_path, + start_year=EARLIEST_USAGE_STATS_DATE.year, + start_month=EARLIEST_USAGE_STATS_DATE.month, + end_year=LATEST_USAGE_STATS_DATE.year, + end_month=LATEST_USAGE_STATS_DATE.month, + warn_if_empty=False, + ) + if lower_movesets or lower_inclusive: + self._lower_rank_fallbacks.append( + (r, lower_movesets, lower_inclusive) + ) def _load(self): pass @@ -446,18 +632,38 @@ def _inclusive_search(self, key): key_id = pokemon_name(key) recent = self._movesets.get(key_id, {}) alltime = self._inclusive.get(key_id, {}) - if not (recent or alltime): + if not (recent or alltime or self._lower_rank_fallbacks): return None - if recent and alltime: - # use the alltime stats to selectively get keys that exist - # in recent but are unhelpful for team prediction. - no_info = {"Nothing": 1.0} - for key, value in recent.items(): + no_info = {"Nothing": 1.0} + + def _apply_field_fallback(primary: dict, fallback: dict) -> dict: + if not fallback: + return primary + if not primary: + return fallback + for field, value in fallback.items(): if value == no_info: - if alltime.get(key, {}) != no_info: - recent[key] = alltime[key] - return recent if recent else alltime + continue + if field not in primary or primary.get(field) == no_info: + primary[field] = value + return primary + + # Start with tier stats for the requested rank; do not use all_tiers yet. + primary = recent if recent else {} + + # First, walk downward through lower-rank tier stats. + for _, lower_recent, _ in self._lower_rank_fallbacks: + primary = _apply_field_fallback(primary, lower_recent.get(key_id, {})) + + # If still missing, fall back to all_tiers for the requested rank. + primary = _apply_field_fallback(primary, alltime) + + # Finally, use lower-rank all_tiers as a last resort. 
+ for _, _, lower_alltime in self._lower_rank_fallbacks: + primary = _apply_field_fallback(primary, lower_alltime.get(key_id, {})) + + return primary if primary else None def __getitem__(self, key): entry = Dex.from_gen(self.gen).get_pokedex_entry(key) @@ -475,6 +681,9 @@ def get_usage_stats( format, start_date: Optional[datetime.date] = None, end_date: Optional[datetime.date] = None, + rank: int = DEFAULT_USAGE_RANK, + load_nearest_lower_rank: bool = True, + search_lower_ranks_on_miss: bool = True, ) -> PreloadedSmogonUsageStats: if start_date is None or start_date < EARLIEST_USAGE_STATS_DATE: start_date = EARLIEST_USAGE_STATS_DATE @@ -486,20 +695,47 @@ def get_usage_stats( else: # force to start of months to prevent cache miss (we only have monthly stats anyway) end_date = datetime.date(end_date.year, end_date.month, 1) - return _cached_smogon_stats(format, start_date, end_date) + return _cached_smogon_stats( + format, + start_date, + end_date, + int(rank), + load_nearest_lower_rank, + search_lower_ranks_on_miss, + ) @functools.lru_cache(maxsize=64) -def _cached_smogon_stats(format, start_date: datetime.date, end_date: datetime.date): - print(f"Loading usage stats for {format} between {start_date} and {end_date}") - return PreloadedSmogonUsageStats( - format=format, start_date=start_date, end_date=end_date, verbose=False +def _cached_smogon_stats( + format, + start_date: datetime.date, + end_date: datetime.date, + rank: int, + load_nearest_lower_rank: bool, + search_lower_ranks_on_miss: bool, +): + print( + f"Loading usage stats for {format} between {start_date} and {end_date} (rank={rank})" + ) + stats = PreloadedSmogonUsageStats( + format=format, + start_date=start_date, + end_date=end_date, + rank=rank, + load_nearest_lower_rank=load_nearest_lower_rank, + search_lower_ranks_on_miss=search_lower_ranks_on_miss, + verbose=False, ) + if stats.rank != rank: + print(f" -> Fell back to rank={stats.rank}") + return stats if __name__ == "__main__": stats = get_usage_stats( - "gen9ou", datetime.date(2023, 1, 1), datetime.date(2025, 6, 1) + "gen9ou", + datetime.date(2023, 1, 1), + datetime.date(2025, 6, 1), ) print(len(stats.usage)) for mon in sorted( diff --git a/metamon/backend/team_prediction/usage_stats/stat_scraper.py b/metamon/backend/team_prediction/usage_stats/stat_scraper.py index 64a5f89a..b09f1467 100644 --- a/metamon/backend/team_prediction/usage_stats/stat_scraper.py +++ b/metamon/backend/team_prediction/usage_stats/stat_scraper.py @@ -1,126 +1,217 @@ -import os -import argparse -import asyncio -import aiohttp -import aiofiles -from bs4 import BeautifulSoup -from urllib.parse import urljoin - -base_url = "https://www.smogon.com/stats/" - -parser = argparse.ArgumentParser( - description="Gathers tier usage statistics from Smogon by month across a range of years" -) -parser.add_argument( - "--start_date", - type=int, - default=2015, - help="Start date for scraping (YYYY) (inclusive)", -) -parser.add_argument( - "--end_date", - type=int, - default=2024, - help="End year for scraping (YYYY) (exclusive)", -) -parser.add_argument( - "--save_dir", - type=str, - default="./stats", - help="Local directory to save the scraped files", -) -args = parser.parse_args() - - -def ensure_dir(file_path): - if not os.path.exists(file_path): - os.makedirs(file_path) - - -async def save_text_file(session, url, local_path): - # Check if the file already exists - if os.path.isfile(local_path): - print(f"File already exists: {local_path}") - return - async with session.get(url) as response: - if 
response.status == 200: - text = await response.text() - async with aiofiles.open(local_path, "w", encoding="utf-8") as file: - await file.write(text) - - -async def scrape_base(session, url, local_dir, start_date, end_date): - async with session.get(url) as response: - text = await response.text() - soup = BeautifulSoup(text, "html.parser") - - tasks = [] - for link in soup.find_all("a"): - href = link.get("href") - if href and not href.startswith("?") and href != "../": - href_date = int(href[:4]) - href_full = urljoin(url, href) - local_path = os.path.join(local_dir, href) - - if ( - href.endswith("/") - and href_date >= start_date - and href_date < end_date - ): # It's a directory - ensure_dir(local_path) - task = asyncio.create_task(scrape(session, href_full, local_path)) - tasks.append(task) - - await asyncio.gather(*tasks) - - -async def scrape(session, url, local_dir): - try: - async with session.get(url) as response: - text = await response.text() - soup = BeautifulSoup(text, "html.parser") - - tasks = [] - for link in soup.find_all("a"): - href = link.get("href") - if "chaos" in href or "monotype" in href or "metagame" in href: - continue - if href and not href.startswith("?"): - href_full = urljoin(url, href) - local_path = os.path.join(local_dir, href) - - if href.endswith("/") and href != "../": # It's a directory - ensure_dir(local_path) - task = asyncio.create_task( - scrape(session, href_full, local_path) - ) - tasks.append(task) - elif href.endswith(".txt") or href.endswith( - ".json" - ): # It's a txt file - print(f"Downloading {href_full} to {local_path}") - task = asyncio.create_task( - save_text_file(session, href_full, local_path) - ) - tasks.append(task) - - await asyncio.gather(*tasks) - except Exception as e: - print(f"Error on url {url}: {e}") - - -ensure_dir(args.save_dir) - - -async def main(): - async with aiohttp.ClientSession() as session: - await scrape_base( - session, - base_url, - args.save_dir, - start_date=args.start_date, - end_date=args.end_date, - ) - - -asyncio.run(main()) +import os +import re +import argparse +import asyncio +import aiohttp +import aiofiles +from bs4 import BeautifulSoup +from urllib.parse import urljoin + +base_url = "https://www.smogon.com/stats/" + +parser = argparse.ArgumentParser( + description="Gathers tier usage statistics from Smogon by month across a range of years" +) +parser.add_argument( + "--start_date", + type=int, + default=2015, + help="Start date for scraping (YYYY) (inclusive)", +) +parser.add_argument( + "--end_date", + type=int, + default=2024, + help="End year for scraping (YYYY) (exclusive)", +) +parser.add_argument( + "--save_dir", + type=str, + default="./stats", + help="Local directory to save the scraped files", +) +parser.add_argument( + "--baselines", + type=str, + default="", + help=( + "Comma-separated baselines to keep (e.g. '0,1500,1695,1825'). " + "Empty = keep all." 
+ ), +) +parser.add_argument( + "--min_baseline", + type=float, + default=None, + help="If set, only download files with baseline >= this value.", +) +parser.add_argument( + "--include_chaos", + action="store_true", + help="Download chaos/ JSON files (includes info.cutoff metadata).", +) +parser.add_argument( + "--max_concurrency", + type=int, + default=8, + help="Maximum number of concurrent HTTP requests.", +) +parser.add_argument( + "--max_retries", + type=int, + default=5, + help="Maximum number of retries for a failed request.", +) +parser.add_argument( + "--backoff_base", + type=float, + default=0.5, + help="Base seconds for exponential backoff (retry delay = base * 2^attempt).", +) +args = parser.parse_args() + + +BASELINE_RE = re.compile(r"-(\d+(?:\.\d+)?)\.(txt|json)$") + +SKIP_DIRS = {"monotype", "metagame", "leads"} +if not args.include_chaos: + SKIP_DIRS.add("chaos") + +allowed_baselines = None +if args.baselines.strip(): + allowed_baselines = { + float(x.strip()) for x in args.baselines.split(",") if x.strip() + } + + +def extract_baseline(href: str): + m = BASELINE_RE.search(href) + if m: + return float(m.group(1)) + if href.endswith(".txt") or href.endswith(".json"): + # Smogon convention: no explicit baseline means 1500. + return 1500.0 + return None + + +def ensure_dir(file_path): + if not os.path.exists(file_path): + os.makedirs(file_path) + + +async def save_text_file(session, url, local_path): + # Check if the file already exists + if os.path.isfile(local_path): + print(f"File already exists: {local_path}") + return + async with session.get(url) as response: + if response.status == 200: + text = await response.text() + async with aiofiles.open(local_path, "w", encoding="utf-8") as file: + await file.write(text) + + +async def _fetch_with_retries(session, url): + for attempt in range(args.max_retries + 1): + try: + async with session.get(url) as response: + if response.status != 200: + raise RuntimeError(f"HTTP {response.status}") + return await response.text() + except Exception: + if attempt >= args.max_retries: + raise + await asyncio.sleep(args.backoff_base * (2**attempt)) + + +async def scrape_base(session, url, local_dir, start_date, end_date, sem): + async with sem: + text = await _fetch_with_retries(session, url) + soup = BeautifulSoup(text, "html.parser") + tasks = [] + for link in soup.find_all("a"): + href = link.get("href") + if href and not href.startswith("?") and href != "../": + href_date = int(href[:4]) + href_full = urljoin(url, href) + local_path = os.path.join(local_dir, href) + + if ( + href.endswith("/") and href_date >= start_date and href_date < end_date + ): # It's a directory + ensure_dir(local_path) + task = asyncio.create_task(scrape(session, href_full, local_path, sem)) + tasks.append(task) + + await asyncio.gather(*tasks) + + +async def scrape(session, url, local_dir, sem): + try: + async with sem: + text = await _fetch_with_retries(session, url) + soup = BeautifulSoup(text, "html.parser") + + tasks = [] + for link in soup.find_all("a"): + href = link.get("href") + if not href or href.startswith("?"): + continue + if href.endswith("/"): + if href == "../": + continue + dirname = href.rstrip("/") + if dirname in SKIP_DIRS: + continue + href_full = urljoin(url, href) + local_path = os.path.join(local_dir, href) + ensure_dir(local_path) + task = asyncio.create_task(scrape(session, href_full, local_path, sem)) + tasks.append(task) + continue + + if href.endswith(".txt") or href.endswith(".json"): + baseline = extract_baseline(href) + if ( + 
allowed_baselines is not None + and baseline is not None + and baseline not in allowed_baselines + ): + continue + if ( + args.min_baseline is not None + and baseline is not None + and baseline < args.min_baseline + ): + continue + href_full = urljoin(url, href) + local_path = os.path.join(local_dir, href) + print(f"Downloading {href_full} to {local_path}") + task = asyncio.create_task( + save_text_file(session, href_full, local_path) + ) + tasks.append(task) + + await asyncio.gather(*tasks) + except Exception as e: + print(f"Error on url {url}: {e}") + + +ensure_dir(args.save_dir) + + +async def main(): + connector = aiohttp.TCPConnector(limit=args.max_concurrency) + sem = asyncio.Semaphore(args.max_concurrency) + async with aiohttp.ClientSession(connector=connector) as session: + await scrape_base( + session, + base_url, + args.save_dir, + start_date=args.start_date, + end_date=args.end_date, + sem=sem, + ) + + +asyncio.run(main()) diff --git a/metamon/backend/team_prediction/validate.py b/metamon/backend/team_prediction/validate.py index 3475d51d..c992d8b4 100644 --- a/metamon/backend/team_prediction/validate.py +++ b/metamon/backend/team_prediction/validate.py @@ -1,9 +1,12 @@ -import subprocess -import random -from typing import List, Tuple -import gc import argparse +import atexit +import json import os +from pathlib import Path +import random +import shutil +import subprocess +from typing import List, Optional import tqdm from poke_env.teambuilder import ConstantTeambuilder @@ -19,13 +22,153 @@ ) from metamon.tokenizer import get_tokenizer +_REPO_ROOT = Path(__file__).resolve().parents[3] +_PERSISTENT_VALIDATOR = None +_PERSISTENT_VALIDATOR_DISABLED = False + + +def _candidate_node_cwds(repo_root: Path) -> List[Path]: + return [repo_root, repo_root / "server" / "pokemon-showdown"] + + +def _resolve_node_cwd(repo_root: Path) -> Path: + for cwd in _candidate_node_cwds(repo_root): + if (cwd / "node_modules" / "pokemon-showdown").exists(): + return cwd + if (cwd / "node_modules" / ".bin" / "pokemon-showdown").exists(): + return cwd + return repo_root + + +def _find_showdown_bin(repo_root: Path) -> Optional[str]: + local_bins = [ + repo_root / "node_modules" / ".bin" / "pokemon-showdown", + repo_root + / "server" + / "pokemon-showdown" + / "node_modules" + / ".bin" + / "pokemon-showdown", + ] + for bin_path in local_bins: + if bin_path.exists(): + return str(bin_path) + return shutil.which("pokemon-showdown") + + +def _resolve_showdown_validate_cmd( + format_id: str, cmd: Optional[List[str]] +) -> List[str]: + if cmd is not None: + return cmd + [format_id] + showdown_bin = _find_showdown_bin(_REPO_ROOT) + if showdown_bin: + return [showdown_bin, "validate-team", format_id] + return ["npx", "pokemon-showdown", "validate-team", format_id] + + +class PersistentShowdownValidator: + def __init__(self, repo_root: Path): + self._script_path = repo_root / "tools" / "persistent_showdown_validator.js" + if not self._script_path.exists(): + raise FileNotFoundError(f"Missing validator script at {self._script_path}") + self._cwd = _resolve_node_cwd(repo_root) + self._proc = self._start_process() + if not self._ping(): + self.close() + raise RuntimeError("Persistent validator failed to start") + + def _start_process(self) -> subprocess.Popen: + return subprocess.Popen( + ["node", str(self._script_path)], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + text=True, + cwd=str(self._cwd), + bufsize=1, + ) + + def _send(self, payload: dict) -> Optional[dict]: + if 
self._proc.poll() is not None: + return None + if self._proc.stdin is None or self._proc.stdout is None: + return None + try: + self._proc.stdin.write(json.dumps(payload) + "\n") + self._proc.stdin.flush() + except BrokenPipeError: + return None + line = self._proc.stdout.readline() + if not line: + return None + try: + return json.loads(line) + except json.JSONDecodeError: + return None + + def _ping(self) -> bool: + response = self._send({"format": "gen1ou", "team": ""}) + return response is not None + + def validate(self, team_str: str, format_id: str) -> tuple[bool, List[str]]: + response = self._send({"format": format_id, "team": team_str}) + if response is None: + raise RuntimeError("Validator process is not responding") + ok = bool(response.get("ok")) + errors = response.get("errors") or [] + return ok, [str(err) for err in errors] + + def close(self) -> None: + if self._proc is None: + return + if self._proc.poll() is None: + try: + if self._proc.stdin is not None: + self._proc.stdin.close() + self._proc.terminate() + self._proc.wait(timeout=1) + except subprocess.TimeoutExpired: + self._proc.kill() + self._proc = None + + +def _get_persistent_validator() -> Optional[PersistentShowdownValidator]: + global _PERSISTENT_VALIDATOR, _PERSISTENT_VALIDATOR_DISABLED + if _PERSISTENT_VALIDATOR_DISABLED: + return None + if _PERSISTENT_VALIDATOR is None: + try: + _PERSISTENT_VALIDATOR = PersistentShowdownValidator(_REPO_ROOT) + atexit.register(_PERSISTENT_VALIDATOR.close) + except Exception as exc: # pragma: no cover - best-effort optimization + print(f"Persistent validator unavailable, falling back to CLI: {exc}") + _PERSISTENT_VALIDATOR_DISABLED = True + return None + return _PERSISTENT_VALIDATOR + def validate_showdown_team( team_str: str, format_id: str = "gen1ou", - cmd: List[str] = ["npx", "pokemon-showdown", "validate-team"], -) -> Tuple[bool, List[str]]: - full_cmd = cmd + [format_id] + cmd: Optional[List[str]] = None, +) -> bool: + validator = _get_persistent_validator() + if validator is not None: + global _PERSISTENT_VALIDATOR_DISABLED + try: + ok, errors = validator.validate(team_str, format_id) + except Exception as exc: # pragma: no cover - best-effort optimization + validator.close() + _PERSISTENT_VALIDATOR_DISABLED = True + print(f"Persistent validator failed, falling back to CLI: {exc}") + else: + if ok: + return True + print(errors) + return False + + full_cmd = _resolve_showdown_validate_cmd(format_id, cmd) proc = subprocess.run(full_cmd, input=team_str, text=True, capture_output=True) diff --git a/metamon/backend/team_prediction/vocabulary.py b/metamon/backend/team_prediction/vocabulary.py index c101aa5e..60a6f744 100644 --- a/metamon/backend/team_prediction/vocabulary.py +++ b/metamon/backend/team_prediction/vocabulary.py @@ -42,7 +42,9 @@ def create_vocabularies(scan_dataset: bool = False): for tier in ["ou", "uu", "ubers", "nu"]: format = f"gen{gen}{tier}" stat = get_usage_stats( - format, start_date=date(2015, 1, 1), end_date=date(2025, 1, 1) + format, + start_date=date(2015, 1, 1), + end_date=date(2025, 1, 1), ) for pokemon_name, data in stat._inclusive.items(): diff --git a/metamon/data/download.py b/metamon/data/download.py index 87bf1fab..1948aa12 100644 --- a/metamon/data/download.py +++ b/metamon/data/download.py @@ -27,7 +27,7 @@ LATEST_RAW_REPLAY_REVISION = "v5" LATEST_PARSED_REPLAY_REVISION = "v5" LATEST_TEAMS_REVISION = "v4" -LATEST_USAGE_STATS_REVISION = "v3" +LATEST_USAGE_STATS_REVISION = "v4" def _update_version_reference(key: str, name: str, version: 
str): diff --git a/tools/analyze_moveset_trends.py b/tools/analyze_moveset_trends.py index 8fa7c633..e491aff9 100644 --- a/tools/analyze_moveset_trends.py +++ b/tools/analyze_moveset_trends.py @@ -15,7 +15,10 @@ import matplotlib.pyplot as plt import matplotlib.cm as cm -from metamon.backend.team_prediction.usage_stats import get_usage_stats +from metamon.backend.team_prediction.usage_stats import ( + get_usage_stats, + DEFAULT_USAGE_RANK, +) def get_monthly_dates(start_year: int, end_year: int) -> list[datetime.date]: @@ -33,8 +36,14 @@ def main(args): top_k = args.top_k start_year = args.start_year end_year = args.end_year + rank = args.rank + output_file = ( + args.output or f"{pokemon_name}_{format_name}_rank{rank}_move_trends.png" + ) - print(f"Analyzing {pokemon_name} in {format_name} ({start_year}-{end_year})") + print( + f"Analyzing {pokemon_name} in {format_name} ({start_year}-{end_year}, rank={rank})" + ) # Step 1: Load all-time stats to find top K moves print("Loading all-time stats to determine top moves...") @@ -42,6 +51,7 @@ def main(args): format_name, start_date=datetime.date(start_year, 1, 1), end_date=datetime.date(end_year, 12, 1), + rank=rank, ) try: @@ -73,6 +83,7 @@ def main(args): format_name, start_date=date, end_date=date, + rank=rank, ) pokemon_monthly = monthly_stats[pokemon_name] moves = pokemon_monthly.get("moves", {}) @@ -115,7 +126,7 @@ def main(args): alpha=0.9, ) - title = f"{pokemon_name.upper()} Move Trends in {format_name.upper()} ({start_year}-{end_year})" + title = f"{pokemon_name.upper()} Move Trends in {format_name.upper()} ({start_year}-{end_year}, rank={rank})" ax.set_title(title, fontsize=18, fontweight="bold", family="monospace", pad=15) ax.set_xlabel("Date", fontsize=14, fontweight="bold") @@ -139,7 +150,6 @@ def main(args): plt.tight_layout() # Save plot - output_file = f"{pokemon_name}_{format_name}_move_trends.png" plt.savefig(output_file, dpi=150, bbox_inches="tight", facecolor="white") print(f"Saved plot to {output_file}") @@ -178,5 +188,18 @@ def main(args): default=2025, help="End year for analysis (default: 2025)", ) + parser.add_argument( + "--rank", + type=int, + default=DEFAULT_USAGE_RANK, + help=f"Usage stats rank/baseline (default: {DEFAULT_USAGE_RANK})", + ) + parser.add_argument( + "--output", + "-o", + type=str, + default=None, + help="Output filename for the plot (default: {pokemon}_{format}_rank{rank}_move_trends.png)", + ) args = parser.parse_args() main(args) diff --git a/tools/patch_pokeagent_gen9ou_trajs.py b/tools/patch_pokeagent_gen9ou_trajs.py index 7f005e48..4174b960 100644 --- a/tools/patch_pokeagent_gen9ou_trajs.py +++ b/tools/patch_pokeagent_gen9ou_trajs.py @@ -23,7 +23,9 @@ USAGE_STATS = get_usage_stats( - "gen9ou", start_date=date(2022, 1, 1), end_date=date(2025, 5, 31) + "gen9ou", + start_date=date(2022, 1, 1), + end_date=date(2025, 5, 31), ) diff --git a/tools/persistent_showdown_validator.js b/tools/persistent_showdown_validator.js new file mode 100644 index 00000000..e83c73cb --- /dev/null +++ b/tools/persistent_showdown_validator.js @@ -0,0 +1,110 @@ +#!/usr/bin/env node + +/** + * Persistent Pokemon Showdown team validator. 
+ * + * Protocol: + * - Read JSON lines on stdin: {"format": "gen1ou", "team": ""} + * - Write JSON lines on stdout: {"ok": true/false, "errors": ["..."]} + */ + +const readline = require("readline"); + +let TeamValidator; +let Teams; + +try { + ({ TeamValidator, Teams } = require("pokemon-showdown")); +} catch (errPrimary) { + try { + ({ TeamValidator } = require("pokemon-showdown/dist/sim/team-validator")); + } catch (errSecondary) { + const message = + "Unable to load pokemon-showdown. Install it locally with npm install."; + console.error(message); + console.error(String(errPrimary)); + console.error(String(errSecondary)); + process.exit(1); + } +} + +if (!Teams) { + try { + const teamsModule = require("pokemon-showdown/dist/sim/teams"); + Teams = teamsModule.Teams || teamsModule; + } catch (err) { + console.error("Unable to load pokemon-showdown Teams module."); + console.error(String(err)); + process.exit(1); + } +} + +const validatorsByFormat = new Map(); + +function getValidator(format) { + if (!validatorsByFormat.has(format)) { + validatorsByFormat.set(format, TeamValidator.get(format)); + } + return validatorsByFormat.get(format); +} + +function normalizeErrors(result) { + if (!result) return []; + if (Array.isArray(result)) return result.map((e) => String(e)); + return [String(result)]; +} + +function respond(payload) { + process.stdout.write(`${JSON.stringify(payload)}\n`); +} + +const rl = readline.createInterface({ + input: process.stdin, + crlfDelay: Infinity, +}); + +rl.on("line", (line) => { + const trimmed = line.trim(); + if (!trimmed) return; + + let req; + try { + req = JSON.parse(trimmed); + } catch (err) { + respond({ ok: false, errors: ["Invalid JSON input."] }); + return; + } + + const format = req.format; + const team = req.team; + + if (!format || typeof team !== "string") { + respond({ ok: false, errors: ["Input must include format and team string."] }); + return; + } + + let validator; + try { + validator = getValidator(format); + } catch (err) { + respond({ ok: false, errors: [String(err)] }); + return; + } + + try { + const parsedTeam = Teams.import(team); + if (!parsedTeam) { + respond({ ok: false, errors: ["Invalid team data"] }); + return; + } + const result = validator.validateTeam(parsedTeam); + const errors = normalizeErrors(result); + respond({ ok: errors.length === 0, errors }); + } catch (err) { + respond({ ok: false, errors: [String(err)] }); + } +}); + +rl.on("close", () => { + process.exit(0); +});
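
For quick reference, here is a minimal usage sketch of the rank-aware usage-stats API introduced in this patch. It assumes the processed usage-stats dataset (revision v4) has already been downloaded via `metamon.data.download` and that gen1ou data exists for the requested window; the 1695 baseline and the date range are illustrative values rather than part of the patch.

```python
# Minimal sketch (assumptions: usage-stats data already downloaded; gen1ou
# stats exist for this window; the 1695 baseline is illustrative).
from datetime import date

from metamon.backend.team_prediction.usage_stats import (
    DEFAULT_USAGE_RANK,
    get_usage_stats,
    list_available_usage_ranks,
)

# Baselines that were actually processed into movesets_data/gen1/ou/<rank>/
print(list_available_usage_ranks("gen1ou"), "default:", DEFAULT_USAGE_RANK)

# Request a higher baseline. If that rank directory was never processed, the
# loader warns and falls back to the nearest lower baseline
# (load_nearest_lower_rank); per-Pokemon lookups that come back empty at the
# loaded baseline fall through to lower baselines and then to the all_tiers
# stats (search_lower_ranks_on_miss).
stats = get_usage_stats(
    "gen1ou",
    start_date=date(2017, 12, 1),
    end_date=date(2018, 3, 30),
    rank=1695,
    load_nearest_lower_rank=True,
    search_lower_ranks_on_miss=True,
)
print(stats.rank)  # the baseline that was actually loaded
alakazam = stats["Alakazam"]  # name lookup is flexible about case/punctuation
if alakazam:
    print(list(alakazam.get("moves", {}))[:4])  # a few of its recorded moves
```

The nearest-lower fallback mirrors the README note above: higher Glicko baselines are published for fewer format/month combinations, so dropping to the closest lower baseline keeps team prediction working where only the broader cutoffs exist.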