Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion src/connectors/acl_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,21 @@ def source_name(self) -> str:

# ── Conference-sync mode: fetch everything ────────────────────────────────

def fetch_all(self, min_year: int = 2020) -> list[Paper]:
def fetch_all(
self, min_year: int = 2020, skip_keys: set[str] | None = None
) -> list[Paper]:
"""Download anthology+abstracts.bib.gz and return ALL papers for target venues.

The export is ~37 MB compressed and includes titles, abstracts, authors,
and BibTeX metadata for all 90,000+ ACL Anthology papers.
Only papers from self._sync_venues and year >= min_year are returned.

``skip_keys`` holds "<venue>:<year>" blocks already archived (settled
past years); matching records are skipped so the merge takes them from
the complete archive instead. (The export is one download regardless, so
this only trims parsing, not the fetch.)
"""
skip_keys = skip_keys or set()
log.info("[acl] downloading full anthology export from %s …", _EXPORT_URL)
req = urllib.request.Request(
_EXPORT_URL,
Expand Down Expand Up @@ -121,6 +129,10 @@ def fetch_all(self, min_year: int = 2020) -> list[Paper]:
if venue_key not in self._sync_venues:
continue

venue_name, _ = _VENUE_META.get(venue_key, (venue_key.upper(), ""))
if f"{venue_name}:{year}" in skip_keys:
continue

p = self._export_record_to_paper(acl_id, record, venue_key)
if p and p.title and p.id not in seen:
seen.add(p.id)
Expand Down
12 changes: 10 additions & 2 deletions src/connectors/cvf_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,23 @@ def __init__(self, conferences: list[str] | None = None) -> None:
def source_name(self) -> str:
return "cvf"

def fetch_all(self) -> list[Paper]:
"""Fetch ALL papers from configured CVF conferences."""
def fetch_all(self, skip_keys: set[str] | None = None) -> list[Paper]:
"""Fetch ALL papers from configured CVF conferences.

``skip_keys`` holds "<venue>:<year>" blocks already archived (settled
past years); those conferences are skipped to avoid re-downloading them.
"""
skip_keys = skip_keys or set()
all_papers: list[Paper] = []
seen: set[str] = set()
for conf_key in self._conferences:
if conf_key not in _CONFERENCES:
log.warning("[cvf] unknown conference key: %s", conf_key)
continue
path, venue, rank, year = _CONFERENCES[conf_key]
if f"{venue}:{year}" in skip_keys:
log.info("[cvf] %s (%s %d) archived — skipping", conf_key, venue, year)
continue
try:
papers = self._fetch_conference(path, venue, rank, year)
log.info("[cvf] %s → %d papers", conf_key, len(papers))
Expand Down
36 changes: 30 additions & 6 deletions src/connectors/openalex_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,14 @@ def source_name(self) -> str:

# ── Full bulk fetch ───────────────────────────────────────────────────────

def fetch_all(self) -> list[Paper]:
"""Fetch ALL papers for configured concept groups."""
def fetch_all(self, from_created_date: str | None = None) -> list[Paper]:
"""Fetch ALL papers for configured concept groups.

When ``from_created_date`` (YYYY-MM-DD) is set, only works that OpenAlex
indexed on or after that date are returned — the incremental path that
avoids re-paginating entire concepts (hundreds of thousands of works)
every run.
"""
all_papers: list[Paper] = []
seen: set[str] = set()
for group in self._groups:
Expand All @@ -234,7 +240,7 @@ def fetch_all(self) -> list[Paper]:
continue
log.info("[openalex] fetching %s (concepts: %s) …", group, concept_ids)
try:
papers = self._fetch_concept_group(concept_ids)
papers = self._fetch_concept_group(concept_ids, from_created_date)
log.info("[openalex] %s → %d papers", group, len(papers))
for p in papers:
if p.id not in seen:
Expand All @@ -252,13 +258,19 @@ def fetch_journals(
from_year: int | None = None,
to_year: int | None = None,
max_per_source: int | None = None,
from_created_date: str | None = None,
) -> list[Paper]:
"""Bulk-fetch every paper from the top CS journals, by venue.

Paginates OpenAlex works filtered to each journal's source id, so
coverage of a given journal is systematic — unlike fetch_all(), which
ranks by citations across a whole concept. Sets source_type='journal'
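and stamps the canonical venue/rank. The default (full) fetch requires no
API key. NOTE(review): OpenAlex documents the ``from_created_date`` filter
as restricted to authenticated/premium access — confirm the incremental
path works without a key.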

When ``from_created_date`` (YYYY-MM-DD) is given, only works *indexed by
OpenAlex on or after that date* are returned — an incremental sync that
skips papers already pulled in a previous run instead of re-downloading
the whole publication-year window every time.
"""
target = sources or _JOURNAL_SOURCES
from_y = from_year if from_year is not None else self._from_year
Expand All @@ -270,7 +282,9 @@ def fetch_journals(

for source_id, (short, rank) in target.items():
try:
papers = self._fetch_journal_source(source_id, year_flt, max_per_source)
papers = self._fetch_journal_source(
source_id, year_flt, max_per_source, from_created_date
)
log.info("[openalex] journal %s (%s) → %d papers", short, source_id, len(papers))
for p in papers:
if p.id in seen:
Expand All @@ -288,12 +302,18 @@ def fetch_journals(
return all_papers

def _fetch_journal_source(
self, source_id: str, year_flt: str, max_per_source: int | None
self,
source_id: str,
year_flt: str,
max_per_source: int | None,
from_created_date: str | None = None,
) -> list[Paper]:
base_filter = (
f"primary_location.source.id:{source_id},"
f"publication_year:{year_flt},type:article"
)
if from_created_date:
base_filter += f",from_created_date:{from_created_date}"
papers: list[Paper] = []
cursor = "*"
while cursor:
Expand Down Expand Up @@ -341,9 +361,13 @@ def fetch(self, query: str, max_results: int = 50) -> list[Paper]:

# ── internals ─────────────────────────────────────────────────────────────

def _fetch_concept_group(self, concept_ids: list[str]) -> list[Paper]:
def _fetch_concept_group(
self, concept_ids: list[str], from_created_date: str | None = None
) -> list[Paper]:
concept_filter = "|".join(concept_ids)
base_filter = f"concepts.id:{concept_filter},type:article,publication_year:>={self._from_year}"
if from_created_date:
base_filter += f",from_created_date:{from_created_date}"

papers: list[Paper] = []
cursor = "*"
Expand Down
14 changes: 12 additions & 2 deletions src/connectors/openreview_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,21 @@ def source_name(self) -> str:

# ── Called by conference-sync (fetch everything) ──────────────────────────

def fetch_all(self) -> list[Paper]:
"""Fetch ALL accepted papers from every configured venue."""
def fetch_all(self, skip_keys: set[str] | None = None) -> list[Paper]:
"""Fetch ALL accepted papers from every configured venue.

``skip_keys`` holds "<venue>:<year>" blocks already in the complete
archive (settled past years); those venue/years are skipped to avoid
re-downloading immutable proceedings every sync.
"""
skip_keys = skip_keys or set()
all_papers: list[Paper] = []
seen: set[str] = set()
for venue_id in self._venues:
name, _rank, year = _VENUES.get(venue_id, ("Unknown", "", 0))
if f"{name}:{year}" in skip_keys:
log.info("[openreview] %s (%s %s) archived — skipping", venue_id, name, year)
continue
try:
papers = self._fetch_venue_all(venue_id)
log.info("[openreview] %s → %d papers", venue_id, len(papers))
Expand Down
12 changes: 10 additions & 2 deletions src/connectors/pmlr_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,19 @@ def __init__(self, volumes: dict[str, tuple[str, str, int]] | None = None) -> No
def source_name(self) -> str:
return "pmlr"

def fetch_all(self) -> list[Paper]:
"""Fetch ALL papers from every configured PMLR volume."""
def fetch_all(self, skip_keys: set[str] | None = None) -> list[Paper]:
"""Fetch ALL papers from every configured PMLR volume.

``skip_keys`` holds "<venue>:<year>" blocks already archived (settled
past years); those volumes are skipped to avoid re-downloading them.
"""
skip_keys = skip_keys or set()
all_papers: list[Paper] = []
seen: set[str] = set()
for vol, (venue, rank, year) in self._volumes.items():
if f"{venue}:{year}" in skip_keys:
log.info("[pmlr] v%s (%s %d) archived — skipping", vol, venue, year)
continue
try:
papers = self._fetch_volume(vol, venue, rank, year)
log.info("[pmlr] v%s (%s %d) → %d papers", vol, venue, year, len(papers))
Expand Down
14 changes: 12 additions & 2 deletions src/connectors/semantic_scholar_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,19 +124,29 @@ def __init__(self, venues: list[str] | None = None) -> None:
def source_name(self) -> str:
return "semantic_scholar"

def fetch_all(self, venues: dict[str, list[int]] | None = None) -> list[Paper]:
def fetch_all(
self,
venues: dict[str, list[int]] | None = None,
skip_keys: set[str] | None = None,
) -> list[Paper]:
"""Bulk-fetch ALL papers for ICLR/NeurIPS/COLM using the S2 bulk endpoint.

Uses cursor-based pagination — no result cap beyond API limits.
Falls back gracefully per venue/year on failure.
Falls back gracefully per venue/year on failure. ``skip_keys`` holds
"<venue>:<year>" blocks already archived (settled past years), which are
skipped to avoid re-fetching immutable proceedings.
"""
target = venues or _BULK_VENUES
skip_keys = skip_keys or set()
all_papers: list[Paper] = []
seen: set[str] = set()

for venue_key, years in target.items():
venue_name, rank = _VENUES.get(venue_key, (venue_key, ""))
for year in years:
if f"{venue_name}:{year}" in skip_keys:
log.info("[s2] bulk %s %d archived — skipping", venue_key, year)
continue
try:
papers = self._bulk_fetch_venue_year(venue_key, venue_name, rank, year)
log.info("[s2] bulk %s %d → %d papers", venue_key, year, len(papers))
Expand Down
Loading
Loading