diff --git a/src/connectors/acl_connector.py b/src/connectors/acl_connector.py index 9786bbf..9653541 100644 --- a/src/connectors/acl_connector.py +++ b/src/connectors/acl_connector.py @@ -85,13 +85,21 @@ def source_name(self) -> str: # ── Conference-sync mode: fetch everything ──────────────────────────────── - def fetch_all(self, min_year: int = 2020) -> list[Paper]: + def fetch_all( + self, min_year: int = 2020, skip_keys: set[str] | None = None + ) -> list[Paper]: """Download anthology+abstracts.bib.gz and return ALL papers for target venues. The export is ~37 MB compressed and includes titles, abstracts, authors, and BibTeX metadata for all 90,000+ ACL Anthology papers. Only papers from self._sync_venues and year >= min_year are returned. + + ``skip_keys`` holds "{venue}:{year}" blocks already archived (settled + past years); matching records are skipped so the merge takes them from + the complete archive instead. (The export is one download regardless, so + this only trims parsing, not the fetch.) 
""" + skip_keys = skip_keys or set() log.info("[acl] downloading full anthology export from %s …", _EXPORT_URL) req = urllib.request.Request( _EXPORT_URL, @@ -121,6 +129,10 @@ def fetch_all(self, min_year: int = 2020) -> list[Paper]: if venue_key not in self._sync_venues: continue + venue_name, _ = _VENUE_META.get(venue_key, (venue_key.upper(), "")) + if f"{venue_name}:{year}" in skip_keys: + continue + p = self._export_record_to_paper(acl_id, record, venue_key) if p and p.title and p.id not in seen: seen.add(p.id) diff --git a/src/connectors/cvf_connector.py b/src/connectors/cvf_connector.py index fc5c30c..a2a7636 100644 --- a/src/connectors/cvf_connector.py +++ b/src/connectors/cvf_connector.py @@ -146,8 +146,13 @@ def __init__(self, conferences: list[str] | None = None) -> None: def source_name(self) -> str: return "cvf" - def fetch_all(self) -> list[Paper]: - """Fetch ALL papers from configured CVF conferences.""" + def fetch_all(self, skip_keys: set[str] | None = None) -> list[Paper]: + """Fetch ALL papers from configured CVF conferences. + + ``skip_keys`` holds "{venue}:{year}" blocks already archived (settled + past years); those conferences are skipped to avoid re-downloading them. 
+ """ + skip_keys = skip_keys or set() all_papers: list[Paper] = [] seen: set[str] = set() for conf_key in self._conferences: @@ -155,6 +160,9 @@ def fetch_all(self) -> list[Paper]: log.warning("[cvf] unknown conference key: %s", conf_key) continue path, venue, rank, year = _CONFERENCES[conf_key] + if f"{venue}:{year}" in skip_keys: + log.info("[cvf] %s (%s %d) archived — skipping", conf_key, venue, year) + continue try: papers = self._fetch_conference(path, venue, rank, year) log.info("[cvf] %s → %d papers", conf_key, len(papers)) diff --git a/src/connectors/openalex_connector.py b/src/connectors/openalex_connector.py index cfbcef4..2503877 100644 --- a/src/connectors/openalex_connector.py +++ b/src/connectors/openalex_connector.py @@ -224,8 +224,14 @@ def source_name(self) -> str: # ── Full bulk fetch ─────────────────────────────────────────────────────── - def fetch_all(self) -> list[Paper]: - """Fetch ALL papers for configured concept groups.""" + def fetch_all(self, from_created_date: str | None = None) -> list[Paper]: + """Fetch ALL papers for configured concept groups. + + When ``from_created_date`` (YYYY-MM-DD) is set, only works OpenAlex + indexed on or after that date are returned — the incremental path that + avoids re-paginating entire concepts (hundreds of thousands of works) + every run. 
+ """ all_papers: list[Paper] = [] seen: set[str] = set() for group in self._groups: @@ -234,7 +240,7 @@ def fetch_all(self) -> list[Paper]: continue log.info("[openalex] fetching %s (concepts: %s) …", group, concept_ids) try: - papers = self._fetch_concept_group(concept_ids) + papers = self._fetch_concept_group(concept_ids, from_created_date) log.info("[openalex] %s → %d papers", group, len(papers)) for p in papers: if p.id not in seen: @@ -252,6 +258,7 @@ def fetch_journals( from_year: int | None = None, to_year: int | None = None, max_per_source: int | None = None, + from_created_date: str | None = None, ) -> list[Paper]: """Bulk-fetch every paper from the top CS journals, by venue. @@ -259,6 +266,11 @@ def fetch_journals( coverage of a given journal is systematic — unlike fetch_all(), which ranks by citations across a whole concept. Sets source_type='journal' and stamps the canonical venue/rank. Requires no API key. + + When ``from_created_date`` (YYYY-MM-DD) is given, only works *indexed by + OpenAlex on or after that date* are returned — an incremental sync that + skips papers already pulled in a previous run instead of re-downloading + the whole publication-year window every time. 
""" target = sources or _JOURNAL_SOURCES from_y = from_year if from_year is not None else self._from_year @@ -270,7 +282,9 @@ def fetch_journals( for source_id, (short, rank) in target.items(): try: - papers = self._fetch_journal_source(source_id, year_flt, max_per_source) + papers = self._fetch_journal_source( + source_id, year_flt, max_per_source, from_created_date + ) log.info("[openalex] journal %s (%s) → %d papers", short, source_id, len(papers)) for p in papers: if p.id in seen: @@ -288,12 +302,18 @@ def fetch_journals( return all_papers def _fetch_journal_source( - self, source_id: str, year_flt: str, max_per_source: int | None + self, + source_id: str, + year_flt: str, + max_per_source: int | None, + from_created_date: str | None = None, ) -> list[Paper]: base_filter = ( f"primary_location.source.id:{source_id}," f"publication_year:{year_flt},type:article" ) + if from_created_date: + base_filter += f",from_created_date:{from_created_date}" papers: list[Paper] = [] cursor = "*" while cursor: @@ -341,9 +361,13 @@ def fetch(self, query: str, max_results: int = 50) -> list[Paper]: # ── internals ───────────────────────────────────────────────────────────── - def _fetch_concept_group(self, concept_ids: list[str]) -> list[Paper]: + def _fetch_concept_group( + self, concept_ids: list[str], from_created_date: str | None = None + ) -> list[Paper]: concept_filter = "|".join(concept_ids) base_filter = f"concepts.id:{concept_filter},type:article,publication_year:>={self._from_year}" + if from_created_date: + base_filter += f",from_created_date:{from_created_date}" papers: list[Paper] = [] cursor = "*" diff --git a/src/connectors/openreview_connector.py b/src/connectors/openreview_connector.py index 967a600..7941cf6 100644 --- a/src/connectors/openreview_connector.py +++ b/src/connectors/openreview_connector.py @@ -92,11 +92,21 @@ def source_name(self) -> str: # ── Called by conference-sync (fetch everything) ────────────────────────── - def fetch_all(self) -> 
list[Paper]: - """Fetch ALL accepted papers from every configured venue.""" + def fetch_all(self, skip_keys: set[str] | None = None) -> list[Paper]: + """Fetch ALL accepted papers from every configured venue. + + ``skip_keys`` holds "{venue}:{year}" blocks already in the complete + archive (settled past years); those venue/years are skipped to avoid + re-downloading immutable proceedings every sync. + """ + skip_keys = skip_keys or set() all_papers: list[Paper] = [] seen: set[str] = set() for venue_id in self._venues: + name, _rank, year = _VENUES.get(venue_id, ("Unknown", "", 0)) + if f"{name}:{year}" in skip_keys: + log.info("[openreview] %s (%s %s) archived — skipping", venue_id, name, year) + continue try: papers = self._fetch_venue_all(venue_id) log.info("[openreview] %s → %d papers", venue_id, len(papers)) diff --git a/src/connectors/pmlr_connector.py b/src/connectors/pmlr_connector.py index fbbed1c..c80f866 100644 --- a/src/connectors/pmlr_connector.py +++ b/src/connectors/pmlr_connector.py @@ -118,11 +118,19 @@ def __init__(self, volumes: dict[str, tuple[str, str, int]] | None = None) -> No def source_name(self) -> str: return "pmlr" - def fetch_all(self) -> list[Paper]: - """Fetch ALL papers from every configured PMLR volume.""" + def fetch_all(self, skip_keys: set[str] | None = None) -> list[Paper]: + """Fetch ALL papers from every configured PMLR volume. + + ``skip_keys`` holds "{venue}:{year}" blocks already archived (settled + past years); those volumes are skipped to avoid re-downloading them. 
+ """ + skip_keys = skip_keys or set() all_papers: list[Paper] = [] seen: set[str] = set() for vol, (venue, rank, year) in self._volumes.items(): + if f"{venue}:{year}" in skip_keys: + log.info("[pmlr] v%s (%s %d) archived — skipping", vol, venue, year) + continue try: papers = self._fetch_volume(vol, venue, rank, year) log.info("[pmlr] v%s (%s %d) → %d papers", vol, venue, year, len(papers)) diff --git a/src/connectors/semantic_scholar_connector.py b/src/connectors/semantic_scholar_connector.py index 8b2e8b9..9c375c0 100644 --- a/src/connectors/semantic_scholar_connector.py +++ b/src/connectors/semantic_scholar_connector.py @@ -124,19 +124,29 @@ def __init__(self, venues: list[str] | None = None) -> None: def source_name(self) -> str: return "semantic_scholar" - def fetch_all(self, venues: dict[str, list[int]] | None = None) -> list[Paper]: + def fetch_all( + self, + venues: dict[str, list[int]] | None = None, + skip_keys: set[str] | None = None, + ) -> list[Paper]: """Bulk-fetch ALL papers for ICLR/NeurIPS/COLM using the S2 bulk endpoint. Uses cursor-based pagination — no result cap beyond API limits. - Falls back gracefully per venue/year on failure. + Falls back gracefully per venue/year on failure. ``skip_keys`` holds + "{venue}:{year}" blocks already archived (settled past years), which are + skipped to avoid re-fetching immutable proceedings. 
""" target = venues or _BULK_VENUES + skip_keys = skip_keys or set() all_papers: list[Paper] = [] seen: set[str] = set() for venue_key, years in target.items(): venue_name, rank = _VENUES.get(venue_key, (venue_key, "")) for year in years: + if f"{venue_name}:{year}" in skip_keys: + log.info("[s2] bulk %s %d archived — skipping", venue_key, year) + continue try: papers = self._bulk_fetch_venue_year(venue_key, venue_name, rank, year) log.info("[s2] bulk %s %d → %d papers", venue_key, year, len(papers)) diff --git a/src/pipeline.py b/src/pipeline.py index 019e232..96342a2 100644 --- a/src/pipeline.py +++ b/src/pipeline.py @@ -47,6 +47,7 @@ from src.normalization.schema import Paper from src.scoring.scorer import PaperScorer from src.sitegen.generator import SiteGenerator +from src.storage import railway_store from src.tagging.tagger import PaperTagger logging.basicConfig( @@ -123,10 +124,88 @@ def _enrich_affiliations_from_s2(papers: list[Paper], batch_size: int = 500) -> _SITE_DATA = Path(__file__).parent.parent / "site" / "data" +# Incremental journal-sync watermark: the date of the last successful journal +# fetch. Stored next to the data so it is committed alongside journals_db.json. +_JOURNAL_STATE = _SITE_DATA / "journal_sync_state.json" +# Re-scan this many days before the watermark so works OpenAlex indexes late +# (created_date trails publication) are not missed. Dedup absorbs any overlap. +_JOURNAL_LOOKBACK_DAYS = 7 + # Venues treated as arXiv / unclassified (not conference proceedings) _ARXIV_VENUES = {None, "", "arXiv", "Unknown"} +def _load_journal_watermark() -> str | None: + """Return the YYYY-MM-DD created-date floor for an incremental fetch. + + The stored date is shifted back by ``_JOURNAL_LOOKBACK_DAYS``. Returns None + when no prior sync is recorded, signalling a full backfill. 
+ """ + if not _JOURNAL_STATE.exists(): + return None + try: + with open(_JOURNAL_STATE, encoding="utf-8") as fh: + last = json.load(fh).get("last_synced") + floor = date.fromisoformat(last) - timedelta(days=_JOURNAL_LOOKBACK_DAYS) + return floor.isoformat() + except Exception as exc: + log.warning("Could not read journal watermark (%s) — full fetch", exc) + return None + + +def _save_journal_watermark() -> None: + """Stamp today as the last successful journal sync.""" + try: + _JOURNAL_STATE.parent.mkdir(parents=True, exist_ok=True) + today = datetime.now(timezone.utc).date().isoformat() + with open(_JOURNAL_STATE, "w", encoding="utf-8") as fh: + json.dump({"last_synced": today}, fh, indent=2) + log.info(" [journals] watermark advanced → %s", today) + except Exception as exc: + log.warning("Could not write journal watermark: %s", exc) + + +def _load_complete_archive(source_type: str) -> list[Paper] | None: + """Load the complete, uncapped archive for a source_type from Railway. + + Returns a list of Paper (possibly empty) when Railway answered, or None when + it is unavailable — letting callers fall back to a full fetch rather than + skipping work against an incomplete (capped JSON) view. + """ + rows = railway_store.load(source_type=source_type) + if rows is None: + return None + papers: list[Paper] = [] + for d in rows: + try: + papers.append(Paper.from_dict(d)) + except Exception: + continue + log.info("Loaded %d %s papers from Railway archive", len(papers), source_type) + return papers + + +def _settled_conf_keys(papers: list[Paper]) -> set[str]: + """Build the set of "{venue}:{year}" conference blocks safe to skip re-fetch. + + A block is settled when it is already in the complete archive AND its year is + in the past — the current calendar year is never skipped, so proceedings + still being published keep getting refreshed each run. 
+ """ + cur_year = datetime.now(timezone.utc).year + keys: set[str] = set() + for p in papers: + if not p.venue or not p.year: + continue + try: + year = int(p.year) + except (TypeError, ValueError): + continue + if year < cur_year: + keys.add(f"{p.venue}:{year}") + return keys + + def _is_conference_paper(p: Paper) -> bool: return p.venue not in _ARXIV_VENUES @@ -211,20 +290,30 @@ def _load_journal_papers() -> list[Paper]: # ── Pipeline ────────────────────────────────────────────────────────────────── -def _fetch_journal_papers() -> list[Paper]: +def _fetch_journal_papers(since: str | None = None) -> list[Paper]: """Bulk-fetch top CS journals via OpenAlex (keyless, systematic by source id). + ``since`` (YYYY-MM-DD) restricts the fetch to works OpenAlex indexed on or + after that date — the incremental path. It is only set when the complete + journal archive is available to merge against, so the long tail is never + dropped; otherwise a full backfill runs. + OpenAlex is the primary source because the S2 journal search is unreliable here — it filters by venue *short name* and query text, which yields 0 results and chronic HTTP 429/400s. S2 is only a non-fatal supplement when a key is configured, for venues OpenAlex under-indexes (e.g. JMLR/TMLR). Dedup later in the pipeline removes any overlap. 
""" - log.info(" [openalex] bulk-fetching top CS journals by source id …") + if since: + log.info(" [openalex] incremental journal fetch (created since %s) …", since) + else: + log.info(" [openalex] full journal backfill …") journal_papers: list[Paper] = [] + fetch_ok = False try: - journal_papers = OpenAlexConnector().fetch_journals() + journal_papers = OpenAlexConnector().fetch_journals(from_created_date=since) log.info(" → %d journal papers (openalex)", len(journal_papers)) + fetch_ok = True except Exception as exc: log.warning(" [openalex] journal fetch failed: %s", exc) @@ -237,6 +326,11 @@ def _fetch_journal_papers() -> list[Paper]: except Exception as exc: log.warning(" [s2] journal supplement failed: %s", exc) + # Only advance the watermark when OpenAlex actually answered — a failed + # fetch must not skip those papers on the next run. + if fetch_ok: + _save_journal_watermark() + return journal_papers @@ -264,10 +358,31 @@ def run_pipeline( arxiv = ArxivConnector() all_papers: list[Paper] = [] + # ── Incremental sync setup (conference / journal sync runs only) ─────────── + # The complete, uncapped archive lives in Railway. When it is reachable we + # skip re-fetching settled work (immutable past proceedings / already-indexed + # journal papers) and merge the skipped rows back from the archive. If Railway + # is down we fall back to a full fetch so nothing below the JSON caps is lost. 
+ journal_archive: list[Paper] | None = None + conf_archive: list[Paper] | None = None + journal_since: str | None = None + conf_skip_keys: set[str] | None = None + + if journals_only or conferences_only: + journal_archive = _load_complete_archive("journal") + if journal_archive is not None: + journal_since = _load_journal_watermark() + if conferences_only: + conf_archive = _load_complete_archive("conference") + if conf_archive is not None: + conf_skip_keys = _settled_conf_keys(conf_archive) + log.info(" [conf-sync] %d settled venue/year blocks will be skipped", + len(conf_skip_keys)) + # ── Journals-only mode: fetch journal papers and skip every other source ── if journals_only: log.info(" journals-only mode: fetching journal papers only") - all_papers.extend(_fetch_journal_papers()) + all_papers.extend(_fetch_journal_papers(since=journal_since)) # ── arXiv + ACL (skipped in conferences-only mode) ──────────────────────── if conferences_only: @@ -336,7 +451,7 @@ def run_pipeline( # OpenReview — ICLR, NeurIPS, COLM (authenticates via env credentials) log.info(" [openreview] fetching ALL papers (ICLR 2022-26, NeurIPS 2022-25, ICML 2024-25, COLM 2024-25) …") try: - fetched = OpenReviewConnector().fetch_all() + fetched = OpenReviewConnector().fetch_all(skip_keys=conf_skip_keys) log.info(" → %d papers", len(fetched)) all_papers.extend(fetched) except Exception as exc: @@ -344,7 +459,7 @@ def run_pipeline( log.info(" [pmlr] fetching ALL papers (ICML 2020-25, AISTATS 2021-25, UAI 2021-24) …") try: - fetched = PMLRConnector().fetch_all() + fetched = PMLRConnector().fetch_all(skip_keys=conf_skip_keys) log.info(" → %d papers", len(fetched)) all_papers.extend(fetched) except Exception as exc: @@ -352,7 +467,7 @@ def run_pipeline( log.info(" [cvf] fetching ALL papers (CVPR 2021-25, ICCV 2021+23, ECCV 2020+22+24) …") try: - fetched = CVFConnector().fetch_all() + fetched = CVFConnector().fetch_all(skip_keys=conf_skip_keys) log.info(" → %d papers", len(fetched)) 
all_papers.extend(fetched) except Exception as exc: @@ -360,7 +475,7 @@ def run_pipeline( log.info(" [acl] fetching ALL papers from anthology export (2020+) …") try: - fetched = ACLAnthologyConnector().fetch_all(min_year=2020) + fetched = ACLAnthologyConnector().fetch_all(min_year=2020, skip_keys=conf_skip_keys) log.info(" → %d papers", len(fetched)) all_papers.extend(fetched) except Exception as exc: @@ -370,13 +485,13 @@ def run_pipeline( log.info(" [s2] bulk-fetching AAAI, IJCAI, KDD, WWW, SIGIR, WSDM, CHI, SIGMOD, ICSE …") s2 = SemanticScholarConnector() try: - fetched = s2.fetch_all() + fetched = s2.fetch_all(skip_keys=conf_skip_keys) log.info(" → %d papers", len(fetched)) all_papers.extend(fetched) except Exception as exc: log.warning(" [s2] bulk fetch_all failed: %s", exc) - all_papers.extend(_fetch_journal_papers()) + all_papers.extend(_fetch_journal_papers(since=journal_since)) else: # ── Keyword-query mode (used in daily pipeline if skip_conferences=False) ── @@ -398,9 +513,12 @@ def run_pipeline( # ── OpenAlex (always, unless skip_conferences) ──────────────────────────── if not skip_conferences and not journals_only: if conferences_only: - log.info(" [openalex] bulk-fetching ML/NLP/CV/IR papers …") + if journal_since: + log.info(" [openalex] incremental bulk-fetch ML/NLP/CV/IR (created since %s) …", journal_since) + else: + log.info(" [openalex] full bulk-fetch ML/NLP/CV/IR papers …") try: - fetched = OpenAlexConnector(from_year=2022).fetch_all() + fetched = OpenAlexConnector(from_year=2022).fetch_all(from_created_date=journal_since) log.info(" → %d papers", len(fetched)) all_papers.extend(fetched) except Exception as exc: @@ -432,8 +550,10 @@ def run_pipeline( # papers (no expiry) and also bring in arXiv papers so the site output # stays complete. In journals-only mode the freshly fetched journals # merge with these; conference/arXiv rows are preserved, not dropped. 
- existing_conf = _load_conference_papers() - existing_journals = _load_journal_papers() + # Prefer the complete Railway archive (so skipped venue/years and the + # journal long-tail are restored in full); fall back to capped JSON. + existing_conf = conf_archive if conf_archive is not None else _load_conference_papers() + existing_journals = journal_archive if journal_archive is not None else _load_journal_papers() existing_arxiv = _load_arxiv_papers(max_age_days=max_age_days) all_papers = all_papers + existing_conf + existing_journals + existing_arxiv else: diff --git a/src/storage/railway_store.py b/src/storage/railway_store.py index 86dfba7..3dbc625 100644 --- a/src/storage/railway_store.py +++ b/src/storage/railway_store.py @@ -102,6 +102,56 @@ def _upsert_papers(cur, rows: list[dict]) -> None: log.info("[railway] upserted %d/%d papers", min(i + _BATCH, len(rows)), len(rows)) +def load(source_type: str | None = None) -> list[dict] | None: + """Read papers back from Railway — the complete, uncapped archive. + + Returns the rows as dicts (list columns decoded back to lists), optionally + filtered to a single ``source_type`` ('journal' / 'conference'). Returns + ``None`` when the DB is unreachable, so callers can distinguish "archive is + empty" from "no complete source available" (and fall back to a full fetch + rather than silently dropping the long tail). 
+ """ + conn = _conn() + if conn is None: + return None + + cols = sorted(_PAPER_COLS) + sql = f"SELECT {', '.join(cols)} FROM papers" + params: list = [] + if source_type: + sql += " WHERE source_type = %s" + params.append(source_type) + + try: + with conn: + with conn.cursor() as cur: + cur.execute(sql, params) + fetched = cur.fetchall() + out: list[dict] = [] + for row in fetched: + d = dict(zip(cols, row)) + for c in _LIST_COLS: + v = d.get(c) + if isinstance(v, str): + try: + d[c] = json.loads(v) + except Exception: + d[c] = [] + elif v is None: + d[c] = [] + out.append(d) + log.info( + "[railway] loaded %d papers%s", len(out), + f" (source_type={source_type})" if source_type else "", + ) + return out + except Exception as exc: + log.warning("[railway] load failed: %s", exc) + return None + finally: + conn.close() + + def sync(papers: list[dict]) -> bool: conn = _conn() if conn is None: