From 9bbeafed17b3e2f65dabba6782ea8e4098d9837b Mon Sep 17 00:00:00 2001 From: "Md. Kishor Morol" Date: Sat, 6 Jun 2026 21:52:38 -0400 Subject: [PATCH] perf(sync): incremental journal/conference fetch via Railway archive MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Journal- and conference-sync were re-downloading the entire corpus every run (the journal step alone took ~2.6 hrs, dominated by OpenAlex NatComms ~44k works), because the fetch never consulted what was already stored. Make all heavy sync fetches incremental, backed by the complete uncapped archive that already lives in Railway Postgres (the committed *_db.json files are capped top-N working sets and can't be complete archives — a full conferences_db.json would exceed GitHub's 100MB file limit): - Journals: OpenAlex `from_created_date` filter pulls only works indexed since the last run. Watermark in site/data/journal_sync_state.json (minus 7-day lookback). See _load_journal_watermark / _fetch_journal_papers. - Conferences: settled past-year (venue, year) blocks are skipped via skip_keys threaded into every conference connector's fetch_all (openreview/pmlr/cvf/s2/acl); current calendar year always refetched. See _settled_conf_keys. - OpenAlex concept bulk-fetch (ML/NLP/CV/IR) is incremental too — it was re-paginating entire concepts (100k+ works) uncapped each run. Safety: incremental/skip engages only when Railway loads successfully (railway_store.load / _load_complete_archive). If Railway is down → full re-fetch, never silent data loss. A venue-name mismatch only causes a missed skip (full fetch), never a wrong skip. After one seeding backfill, sync runs drop from hours to minutes. Co-Authored-By: Claude Opus 4.8 --- src/connectors/acl_connector.py | 14 +- src/connectors/cvf_connector.py | 12 +- src/connectors/openalex_connector.py | 36 ++++- src/connectors/openreview_connector.py | 14 +- src/connectors/pmlr_connector.py | 12 +- src/connectors/semantic_scholar_connector.py | 14 +- src/pipeline.py | 148 +++++++++++++++++-- src/storage/railway_store.py | 50 +++++++ 8 files changed, 271 insertions(+), 29 deletions(-) diff --git a/src/connectors/acl_connector.py b/src/connectors/acl_connector.py index 9786bbf..9653541 100644 --- a/src/connectors/acl_connector.py +++ b/src/connectors/acl_connector.py @@ -85,13 +85,21 @@ def source_name(self) -> str: # ── Conference-sync mode: fetch everything ──────────────────────────────── - def fetch_all(self, min_year: int = 2020) -> list[Paper]: + def fetch_all( + self, min_year: int = 2020, skip_keys: set[str] | None = None + ) -> list[Paper]: """Download anthology+abstracts.bib.gz and return ALL papers for target venues. The export is ~37 MB compressed and includes titles, abstracts, authors, and BibTeX metadata for all 90,000+ ACL Anthology papers. Only papers from self._sync_venues and year >= min_year are returned. + + ``skip_keys`` holds ":" blocks already archived (settled + past years); matching records are skipped so the merge takes them from + the complete archive instead. (The export is one download regardless, so + this only trims parsing, not the fetch.) """ + skip_keys = skip_keys or set() log.info("[acl] downloading full anthology export from %s …", _EXPORT_URL) req = urllib.request.Request( _EXPORT_URL, @@ -121,6 +129,10 @@ def fetch_all(self, min_year: int = 2020) -> list[Paper]: if venue_key not in self._sync_venues: continue + venue_name, _ = _VENUE_META.get(venue_key, (venue_key.upper(), "")) + if f"{venue_name}:{year}" in skip_keys: + continue + p = self._export_record_to_paper(acl_id, record, venue_key) if p and p.title and p.id not in seen: seen.add(p.id) diff --git a/src/connectors/cvf_connector.py b/src/connectors/cvf_connector.py index fc5c30c..a2a7636 100644 --- a/src/connectors/cvf_connector.py +++ b/src/connectors/cvf_connector.py @@ -146,8 +146,13 @@ def __init__(self, conferences: list[str] | None = None) -> None: def source_name(self) -> str: return "cvf" - def fetch_all(self) -> list[Paper]: - """Fetch ALL papers from configured CVF conferences.""" + def fetch_all(self, skip_keys: set[str] | None = None) -> list[Paper]: + """Fetch ALL papers from configured CVF conferences. + + ``skip_keys`` holds ":" blocks already archived (settled + past years); those conferences are skipped to avoid re-downloading them. + """ + skip_keys = skip_keys or set() all_papers: list[Paper] = [] seen: set[str] = set() for conf_key in self._conferences: @@ -155,6 +160,9 @@ def fetch_all(self) -> list[Paper]: log.warning("[cvf] unknown conference key: %s", conf_key) continue path, venue, rank, year = _CONFERENCES[conf_key] + if f"{venue}:{year}" in skip_keys: + log.info("[cvf] %s (%s %d) archived — skipping", conf_key, venue, year) + continue try: papers = self._fetch_conference(path, venue, rank, year) log.info("[cvf] %s → %d papers", conf_key, len(papers)) diff --git a/src/connectors/openalex_connector.py b/src/connectors/openalex_connector.py index cfbcef4..2503877 100644 --- a/src/connectors/openalex_connector.py +++ b/src/connectors/openalex_connector.py @@ -224,8 +224,14 @@ def source_name(self) -> str: # ── Full bulk fetch ─────────────────────────────────────────────────────── - def fetch_all(self) -> list[Paper]: - """Fetch ALL papers for configured concept groups.""" + def fetch_all(self, from_created_date: str | None = None) -> list[Paper]: + """Fetch ALL papers for configured concept groups. + + When ``from_created_date`` (YYYY-MM-DD) is set, only works OpenAlex + indexed on or after that date are returned — the incremental path that + avoids re-paginating entire concepts (hundreds of thousands of works) + every run. + """ all_papers: list[Paper] = [] seen: set[str] = set() for group in self._groups: @@ -234,7 +240,7 @@ def fetch_all(self) -> list[Paper]: continue log.info("[openalex] fetching %s (concepts: %s) …", group, concept_ids) try: - papers = self._fetch_concept_group(concept_ids) + papers = self._fetch_concept_group(concept_ids, from_created_date) log.info("[openalex] %s → %d papers", group, len(papers)) for p in papers: if p.id not in seen: @@ -252,6 +258,7 @@ def fetch_journals( from_year: int | None = None, to_year: int | None = None, max_per_source: int | None = None, + from_created_date: str | None = None, ) -> list[Paper]: """Bulk-fetch every paper from the top CS journals, by venue. @@ -259,6 +266,11 @@ def fetch_journals( coverage of a given journal is systematic — unlike fetch_all(), which ranks by citations across a whole concept. Sets source_type='journal' and stamps the canonical venue/rank. Requires no API key. + + When ``from_created_date`` (YYYY-MM-DD) is given, only works *indexed by + OpenAlex on or after that date* are returned — an incremental sync that + skips papers already pulled in a previous run instead of re-downloading + the whole publication-year window every time. """ target = sources or _JOURNAL_SOURCES from_y = from_year if from_year is not None else self._from_year @@ -270,7 +282,9 @@ def fetch_journals( for source_id, (short, rank) in target.items(): try: - papers = self._fetch_journal_source(source_id, year_flt, max_per_source) + papers = self._fetch_journal_source( + source_id, year_flt, max_per_source, from_created_date + ) log.info("[openalex] journal %s (%s) → %d papers", short, source_id, len(papers)) for p in papers: if p.id in seen: @@ -288,12 +302,18 @@ def fetch_journals( return all_papers def _fetch_journal_source( - self, source_id: str, year_flt: str, max_per_source: int | None + self, + source_id: str, + year_flt: str, + max_per_source: int | None, + from_created_date: str | None = None, ) -> list[Paper]: base_filter = ( f"primary_location.source.id:{source_id}," f"publication_year:{year_flt},type:article" ) + if from_created_date: + base_filter += f",from_created_date:{from_created_date}" papers: list[Paper] = [] cursor = "*" while cursor: @@ -341,9 +361,13 @@ def fetch(self, query: str, max_results: int = 50) -> list[Paper]: # ── internals ───────────────────────────────────────────────────────────── - def _fetch_concept_group(self, concept_ids: list[str]) -> list[Paper]: + def _fetch_concept_group( + self, concept_ids: list[str], from_created_date: str | None = None + ) -> list[Paper]: concept_filter = "|".join(concept_ids) base_filter = f"concepts.id:{concept_filter},type:article,publication_year:>={self._from_year}" + if from_created_date: + base_filter += f",from_created_date:{from_created_date}" papers: list[Paper] = [] cursor = "*" diff --git a/src/connectors/openreview_connector.py b/src/connectors/openreview_connector.py index 967a600..7941cf6 100644 --- a/src/connectors/openreview_connector.py +++ b/src/connectors/openreview_connector.py @@ -92,11 +92,21 @@ def source_name(self) -> str: # ── Called by conference-sync (fetch everything) ────────────────────────── - def fetch_all(self) -> list[Paper]: - """Fetch ALL accepted papers from every configured venue.""" + def fetch_all(self, skip_keys: set[str] | None = None) -> list[Paper]: + """Fetch ALL accepted papers from every configured venue. + + ``skip_keys`` holds ":" blocks already in the complete + archive (settled past years); those venue/years are skipped to avoid + re-downloading immutable proceedings every sync. + """ + skip_keys = skip_keys or set() all_papers: list[Paper] = [] seen: set[str] = set() for venue_id in self._venues: + name, _rank, year = _VENUES.get(venue_id, ("Unknown", "", 0)) + if f"{name}:{year}" in skip_keys: + log.info("[openreview] %s (%s %s) archived — skipping", venue_id, name, year) + continue try: papers = self._fetch_venue_all(venue_id) log.info("[openreview] %s → %d papers", venue_id, len(papers)) diff --git a/src/connectors/pmlr_connector.py b/src/connectors/pmlr_connector.py index fbbed1c..c80f866 100644 --- a/src/connectors/pmlr_connector.py +++ b/src/connectors/pmlr_connector.py @@ -118,11 +118,19 @@ def __init__(self, volumes: dict[str, tuple[str, str, int]] | None = None) -> No def source_name(self) -> str: return "pmlr" - def fetch_all(self) -> list[Paper]: - """Fetch ALL papers from every configured PMLR volume.""" + def fetch_all(self, skip_keys: set[str] | None = None) -> list[Paper]: + """Fetch ALL papers from every configured PMLR volume. + + ``skip_keys`` holds ":" blocks already archived (settled + past years); those volumes are skipped to avoid re-downloading them. + """ + skip_keys = skip_keys or set() all_papers: list[Paper] = [] seen: set[str] = set() for vol, (venue, rank, year) in self._volumes.items(): + if f"{venue}:{year}" in skip_keys: + log.info("[pmlr] v%s (%s %d) archived — skipping", vol, venue, year) + continue try: papers = self._fetch_volume(vol, venue, rank, year) log.info("[pmlr] v%s (%s %d) → %d papers", vol, venue, year, len(papers)) diff --git a/src/connectors/semantic_scholar_connector.py b/src/connectors/semantic_scholar_connector.py index 8b2e8b9..9c375c0 100644 --- a/src/connectors/semantic_scholar_connector.py +++ b/src/connectors/semantic_scholar_connector.py @@ -124,19 +124,29 @@ def __init__(self, venues: list[str] | None = None) -> None: def source_name(self) -> str: return "semantic_scholar" - def fetch_all(self, venues: dict[str, list[int]] | None = None) -> list[Paper]: + def fetch_all( + self, + venues: dict[str, list[int]] | None = None, + skip_keys: set[str] | None = None, + ) -> list[Paper]: """Bulk-fetch ALL papers for ICLR/NeurIPS/COLM using the S2 bulk endpoint. Uses cursor-based pagination — no result cap beyond API limits. - Falls back gracefully per venue/year on failure. + Falls back gracefully per venue/year on failure. ``skip_keys`` holds + ":" blocks already archived (settled past years), which are + skipped to avoid re-fetching immutable proceedings. """ target = venues or _BULK_VENUES + skip_keys = skip_keys or set() all_papers: list[Paper] = [] seen: set[str] = set() for venue_key, years in target.items(): venue_name, rank = _VENUES.get(venue_key, (venue_key, "")) for year in years: + if f"{venue_name}:{year}" in skip_keys: + log.info("[s2] bulk %s %d archived — skipping", venue_key, year) + continue try: papers = self._bulk_fetch_venue_year(venue_key, venue_name, rank, year) log.info("[s2] bulk %s %d → %d papers", venue_key, year, len(papers)) diff --git a/src/pipeline.py b/src/pipeline.py index 019e232..96342a2 100644 --- a/src/pipeline.py +++ b/src/pipeline.py @@ -47,6 +47,7 @@ from src.normalization.schema import Paper from src.scoring.scorer import PaperScorer from src.sitegen.generator import SiteGenerator +from src.storage import railway_store from src.tagging.tagger import PaperTagger logging.basicConfig( @@ -123,10 +124,88 @@ def _enrich_affiliations_from_s2(papers: list[Paper], batch_size: int = 500) -> _SITE_DATA = Path(__file__).parent.parent / "site" / "data" +# Incremental journal-sync watermark: the date of the last successful journal +# fetch. Stored next to the data so it is committed alongside journals_db.json. +_JOURNAL_STATE = _SITE_DATA / "journal_sync_state.json" +# Re-scan this many days before the watermark so works OpenAlex indexes late +# (created_date trails publication) are not missed. Dedup absorbs any overlap. +_JOURNAL_LOOKBACK_DAYS = 7 + # Venues treated as arXiv / unclassified (not conference proceedings) _ARXIV_VENUES = {None, "", "arXiv", "Unknown"} +def _load_journal_watermark() -> str | None: + """Return the YYYY-MM-DD created-date floor for an incremental fetch. + + The stored date is shifted back by ``_JOURNAL_LOOKBACK_DAYS``. Returns None + when no prior sync is recorded, signalling a full backfill. + """ + if not _JOURNAL_STATE.exists(): + return None + try: + with open(_JOURNAL_STATE, encoding="utf-8") as fh: + last = json.load(fh).get("last_synced") + floor = date.fromisoformat(last) - timedelta(days=_JOURNAL_LOOKBACK_DAYS) + return floor.isoformat() + except Exception as exc: + log.warning("Could not read journal watermark (%s) — full fetch", exc) + return None + + +def _save_journal_watermark() -> None: + """Stamp today as the last successful journal sync.""" + try: + _JOURNAL_STATE.parent.mkdir(parents=True, exist_ok=True) + today = datetime.now(timezone.utc).date().isoformat() + with open(_JOURNAL_STATE, "w", encoding="utf-8") as fh: + json.dump({"last_synced": today}, fh, indent=2) + log.info(" [journals] watermark advanced → %s", today) + except Exception as exc: + log.warning("Could not write journal watermark: %s", exc) + + +def _load_complete_archive(source_type: str) -> list[Paper] | None: + """Load the complete, uncapped archive for a source_type from Railway. + + Returns a list of Paper (possibly empty) when Railway answered, or None when + it is unavailable — letting callers fall back to a full fetch rather than + skipping work against an incomplete (capped JSON) view. + """ + rows = railway_store.load(source_type=source_type) + if rows is None: + return None + papers: list[Paper] = [] + for d in rows: + try: + papers.append(Paper.from_dict(d)) + except Exception: + continue + log.info("Loaded %d %s papers from Railway archive", len(papers), source_type) + return papers + + +def _settled_conf_keys(papers: list[Paper]) -> set[str]: + """Build the set of ":" conference blocks safe to skip re-fetch. + + A block is settled when it is already in the complete archive AND its year is + in the past — the current calendar year is never skipped, so proceedings + still being published keep getting refreshed each run. + """ + cur_year = datetime.now(timezone.utc).year + keys: set[str] = set() + for p in papers: + if not p.venue or not p.year: + continue + try: + year = int(p.year) + except (TypeError, ValueError): + continue + if year < cur_year: + keys.add(f"{p.venue}:{year}") + return keys + + def _is_conference_paper(p: Paper) -> bool: return p.venue not in _ARXIV_VENUES @@ -211,20 +290,30 @@ def _load_journal_papers() -> list[Paper]: # ── Pipeline ────────────────────────────────────────────────────────────────── -def _fetch_journal_papers() -> list[Paper]: +def _fetch_journal_papers(since: str | None = None) -> list[Paper]: """Bulk-fetch top CS journals via OpenAlex (keyless, systematic by source id). + ``since`` (YYYY-MM-DD) restricts the fetch to works OpenAlex indexed on or + after that date — the incremental path. It is only set when the complete + journal archive is available to merge against, so the long tail is never + dropped; otherwise a full backfill runs. + OpenAlex is the primary source because the S2 journal search is unreliable here — it filters by venue *short name* and query text, which yields 0 results and chronic HTTP 429/400s. S2 is only a non-fatal supplement when a key is configured, for venues OpenAlex under-indexes (e.g. JMLR/TMLR). Dedup later in the pipeline removes any overlap. """ - log.info(" [openalex] bulk-fetching top CS journals by source id …") + if since: + log.info(" [openalex] incremental journal fetch (created since %s) …", since) + else: + log.info(" [openalex] full journal backfill …") journal_papers: list[Paper] = [] + fetch_ok = False try: - journal_papers = OpenAlexConnector().fetch_journals() + journal_papers = OpenAlexConnector().fetch_journals(from_created_date=since) log.info(" → %d journal papers (openalex)", len(journal_papers)) + fetch_ok = True except Exception as exc: log.warning(" [openalex] journal fetch failed: %s", exc) @@ -237,6 +326,11 @@ def _fetch_journal_papers() -> list[Paper]: except Exception as exc: log.warning(" [s2] journal supplement failed: %s", exc) + # Only advance the watermark when OpenAlex actually answered — a failed + # fetch must not skip those papers on the next run. + if fetch_ok: + _save_journal_watermark() + return journal_papers @@ -264,10 +358,31 @@ def run_pipeline( arxiv = ArxivConnector() all_papers: list[Paper] = [] + # ── Incremental sync setup (conference / journal sync runs only) ─────────── + # The complete, uncapped archive lives in Railway. When it is reachable we + # skip re-fetching settled work (immutable past proceedings / already-indexed + # journal papers) and merge the skipped rows back from the archive. If Railway + # is down we fall back to a full fetch so nothing below the JSON caps is lost. + journal_archive: list[Paper] | None = None + conf_archive: list[Paper] | None = None + journal_since: str | None = None + conf_skip_keys: set[str] | None = None + + if journals_only or conferences_only: + journal_archive = _load_complete_archive("journal") + if journal_archive is not None: + journal_since = _load_journal_watermark() + if conferences_only: + conf_archive = _load_complete_archive("conference") + if conf_archive is not None: + conf_skip_keys = _settled_conf_keys(conf_archive) + log.info(" [conf-sync] %d settled venue/year blocks will be skipped", + len(conf_skip_keys)) + # ── Journals-only mode: fetch journal papers and skip every other source ── if journals_only: log.info(" journals-only mode: fetching journal papers only") - all_papers.extend(_fetch_journal_papers()) + all_papers.extend(_fetch_journal_papers(since=journal_since)) # ── arXiv + ACL (skipped in conferences-only mode) ──────────────────────── if conferences_only: @@ -336,7 +451,7 @@ def run_pipeline( # OpenReview — ICLR, NeurIPS, COLM (authenticates via env credentials) log.info(" [openreview] fetching ALL papers (ICLR 2022-26, NeurIPS 2022-25, ICML 2024-25, COLM 2024-25) …") try: - fetched = OpenReviewConnector().fetch_all() + fetched = OpenReviewConnector().fetch_all(skip_keys=conf_skip_keys) log.info(" → %d papers", len(fetched)) all_papers.extend(fetched) except Exception as exc: @@ -344,7 +459,7 @@ def run_pipeline( log.info(" [pmlr] fetching ALL papers (ICML 2020-25, AISTATS 2021-25, UAI 2021-24) …") try: - fetched = PMLRConnector().fetch_all() + fetched = PMLRConnector().fetch_all(skip_keys=conf_skip_keys) log.info(" → %d papers", len(fetched)) all_papers.extend(fetched) except Exception as exc: @@ -352,7 +467,7 @@ def run_pipeline( log.info(" [cvf] fetching ALL papers (CVPR 2021-25, ICCV 2021+23, ECCV 2020+22+24) …") try: - fetched = CVFConnector().fetch_all() + fetched = CVFConnector().fetch_all(skip_keys=conf_skip_keys) log.info(" → %d papers", len(fetched)) all_papers.extend(fetched) except Exception as exc: @@ -360,7 +475,7 @@ def run_pipeline( log.info(" [acl] fetching ALL papers from anthology export (2020+) …") try: - fetched = ACLAnthologyConnector().fetch_all(min_year=2020) + fetched = ACLAnthologyConnector().fetch_all(min_year=2020, skip_keys=conf_skip_keys) log.info(" → %d papers", len(fetched)) all_papers.extend(fetched) except Exception as exc: @@ -370,13 +485,13 @@ def run_pipeline( log.info(" [s2] bulk-fetching AAAI, IJCAI, KDD, WWW, SIGIR, WSDM, CHI, SIGMOD, ICSE …") s2 = SemanticScholarConnector() try: - fetched = s2.fetch_all() + fetched = s2.fetch_all(skip_keys=conf_skip_keys) log.info(" → %d papers", len(fetched)) all_papers.extend(fetched) except Exception as exc: log.warning(" [s2] bulk fetch_all failed: %s", exc) - all_papers.extend(_fetch_journal_papers()) + all_papers.extend(_fetch_journal_papers(since=journal_since)) else: # ── Keyword-query mode (used in daily pipeline if skip_conferences=False) ── @@ -398,9 +513,12 @@ def run_pipeline( # ── OpenAlex (always, unless skip_conferences) ──────────────────────────── if not skip_conferences and not journals_only: if conferences_only: - log.info(" [openalex] bulk-fetching ML/NLP/CV/IR papers …") + if journal_since: + log.info(" [openalex] incremental bulk-fetch ML/NLP/CV/IR (created since %s) …", journal_since) + else: + log.info(" [openalex] full bulk-fetch ML/NLP/CV/IR papers …") try: - fetched = OpenAlexConnector(from_year=2022).fetch_all() + fetched = OpenAlexConnector(from_year=2022).fetch_all(from_created_date=journal_since) log.info(" → %d papers", len(fetched)) all_papers.extend(fetched) except Exception as exc: @@ -432,8 +550,10 @@ def run_pipeline( # papers (no expiry) and also bring in arXiv papers so the site output # stays complete. In journals-only mode the freshly fetched journals # merge with these; conference/arXiv rows are preserved, not dropped. - existing_conf = _load_conference_papers() - existing_journals = _load_journal_papers() + # Prefer the complete Railway archive (so skipped venue/years and the + # journal long-tail are restored in full); fall back to capped JSON. + existing_conf = conf_archive if conf_archive is not None else _load_conference_papers() + existing_journals = journal_archive if journal_archive is not None else _load_journal_papers() existing_arxiv = _load_arxiv_papers(max_age_days=max_age_days) all_papers = all_papers + existing_conf + existing_journals + existing_arxiv else: diff --git a/src/storage/railway_store.py b/src/storage/railway_store.py index 86dfba7..3dbc625 100644 --- a/src/storage/railway_store.py +++ b/src/storage/railway_store.py @@ -102,6 +102,56 @@ def _upsert_papers(cur, rows: list[dict]) -> None: log.info("[railway] upserted %d/%d papers", min(i + _BATCH, len(rows)), len(rows)) +def load(source_type: str | None = None) -> list[dict] | None: + """Read papers back from Railway — the complete, uncapped archive. + + Returns the rows as dicts (list columns decoded back to lists), optionally + filtered to a single ``source_type`` ('journal' / 'conference'). Returns + ``None`` when the DB is unreachable, so callers can distinguish "archive is + empty" from "no complete source available" (and fall back to a full fetch + rather than silently dropping the long tail). + """ + conn = _conn() + if conn is None: + return None + + cols = sorted(_PAPER_COLS) + sql = f"SELECT {', '.join(cols)} FROM papers" + params: list = [] + if source_type: + sql += " WHERE source_type = %s" + params.append(source_type) + + try: + with conn: + with conn.cursor() as cur: + cur.execute(sql, params) + fetched = cur.fetchall() + out: list[dict] = [] + for row in fetched: + d = dict(zip(cols, row)) + for c in _LIST_COLS: + v = d.get(c) + if isinstance(v, str): + try: + d[c] = json.loads(v) + except Exception: + d[c] = [] + elif v is None: + d[c] = [] + out.append(d) + log.info( + "[railway] loaded %d papers%s", len(out), + f" (source_type={source_type})" if source_type else "", + ) + return out + except Exception as exc: + log.warning("[railway] load failed: %s", exc) + return None + finally: + conn.close() + + def sync(papers: list[dict]) -> bool: conn = _conn() if conn is None: