diff --git a/README.md b/README.md index 0a9e7e9..3d54c14 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,8 @@ LHM/ ### Prerequisites - Python 3.8+ - Jupyter Lab/Notebook -- Required packages: pandas, numpy, matplotlib +- Required packages: pandas, numpy, matplotlib, PyYAML +- Optional for Parquet export: `pyarrow` or `fastparquet` ### Installation ```bash @@ -73,3 +74,30 @@ This is an internal repository. Follow the established coding standards and ensu ## License Internal use only. All rights reserved. + +## FRED Data Platform + +The monorepo now ships with an end-to-end ingestion stack capable of +maintaining hundreds of FRED indicators on a rolling basis. + +Key entry points: + +- `configs/fred_catalog_sources.yaml` – tag-based recipe for generating + category-specific indicator catalogs directly from FRED. +- `configs/fred_series_catalog.yaml` – materialised catalog consumed by + the ingestion pipeline. Regenerate with `python -m lhm.catalog.generate` + after supplying a FRED API key. +- `src/lhm/config/series_catalog.py` – loader utilities for the catalog + format. +- `src/lhm/clients/fred_client.py` – fully functional HTTP client with + rate limiting, metadata retrieval, tag search, and observation pulls. +- `src/lhm/pipelines/daily_refresh.py` – orchestrates refresh cycles, + persists results, and records metadata for each series. +- `src/lhm/storage/filesystem.py` – pluggable storage backend capable of + writing Parquet, CSV, or JSON datasets to disk. +- `src/lhm/cli.py` – command line interface for triggering the pipeline. +- `src/lhm/catalog/generate.py` – helper CLI for expanding the catalog + using the tag configuration. + +Refer to `docs/fred_data_platform.md` for the detailed architecture and +operational guidance. diff --git a/configs/fred_catalog_sources.yaml b/configs/fred_catalog_sources.yaml new file mode 100644 index 0000000..a326561 --- /dev/null +++ b/configs/fred_catalog_sources.yaml @@ -0,0 +1,39 @@ +# Mapping from stakeholder categories to FRED tags used for catalog generation. +# +# Each category lists one or more FRED tags. The catalog generator will pull the +# most popular series associated with the intersection of those tags to build +# the curated indicator list. +categories: + gdp: + tags: [gdp] + limit: 80 + labor: + tags: [employment, unemployment] + limit: 80 + prices: + tags: [inflation] + limit: 80 + health: + tags: [health] + limit: 80 + money: + tags: [money, monetary] + limit: 80 + trade: + tags: [trade] + limit: 80 + government: + tags: [government] + limit: 80 + business: + tags: [business] + limit: 80 + consumer: + tags: [consumer] + limit: 80 + housing: + tags: [housing] + limit: 80 + taxes: + tags: [taxes] + limit: 80 diff --git a/configs/fred_series_catalog.template.yaml b/configs/fred_series_catalog.template.yaml new file mode 100644 index 0000000..e931d6e --- /dev/null +++ b/configs/fred_series_catalog.template.yaml @@ -0,0 +1,24 @@ +# Catalog of FRED series grouped by stakeholder-defined categories. +# +# This template illustrates the schema produced by the automated catalog +# generator (`python -m lhm.catalog.generate`). Regenerate +# `configs/fred_series_catalog.yaml` from live FRED data once the tag +# recipes in `configs/fred_catalog_sources.yaml` are finalised. 
+categories:
+  gdp:
+    - series_id: GDP
+      title: Gross Domestic Product, Billions of Dollars, Quarterly, SAAR
+      frequency: Quarterly
+      units: Billions of Dollars
+      seasonal_adjustment: Seasonally Adjusted Annual Rate
+      notes: Placeholder entry demonstrating the schema; the full catalog will be curated next.
+  labor: []
+  prices: []
+  health: []
+  money: []
+  trade: []
+  government: []
+  business: []
+  consumer: []
+  housing: []
+  taxes: []
diff --git a/configs/fred_series_catalog.yaml b/configs/fred_series_catalog.yaml
new file mode 100644
index 0000000..fd69acc
--- /dev/null
+++ b/configs/fred_series_catalog.yaml
@@ -0,0 +1,21 @@
+# Generated catalog placeholder. Run `python -m lhm.catalog.generate` to
+# populate this file using the tag configuration in
+# `configs/fred_catalog_sources.yaml` once a FRED API key is available.
+categories:
+  gdp:
+    - series_id: GDP
+      title: Gross Domestic Product, Billions of Dollars, Quarterly, SAAR
+      frequency: Quarterly
+      units: Billions of Dollars
+      seasonal_adjustment: Seasonally Adjusted Annual Rate
+      notes: Placeholder entry demonstrating the schema; regenerate via the catalog generator for the full list.
+  labor: []
+  prices: []
+  health: []
+  money: []
+  trade: []
+  government: []
+  business: []
+  consumer: []
+  housing: []
+  taxes: []
diff --git a/docs/fred_data_platform.md b/docs/fred_data_platform.md
new file mode 100644
index 0000000..4b9341c
--- /dev/null
+++ b/docs/fred_data_platform.md
@@ -0,0 +1,60 @@
+# FRED Data Platform
+
+The Lighthouse Macro ingestion stack now provides a complete workflow for
+curating, downloading, and persisting large collections of FRED series on
+an automated cadence.
+
+## Objectives
+
+1. Provide a clearly structured configuration layer where each category
+   maps to a curated list of FRED series (50-100 per category).
+2. Separate the concerns of configuration, data acquisition, storage,
+   and orchestration so that each layer can evolve independently.
+3. Maintain an automated daily refresh cadence that rehydrates recent
+   observations while respecting FRED's rate limits and terms of use.
+
+## High-Level Architecture
+
+| Layer     | Responsibility                                                      |
+| --------- | ------------------------------------------------------------------- |
+| Config    | Defines series metadata, API credentials, and storage preferences.  |
+| Client    | Handles authenticated calls to FRED and response validation.        |
+| Pipelines | Orchestrates recurring refresh jobs and error handling.             |
+| Storage   | Persists data in the agreed-upon analytics format.                  |
+
+## Key Modules
+
+- `lhm.catalog.generate`: Generates the 50-100 indicators per category by
+  querying FRED tag endpoints based on the recipes stored in
+  `configs/fred_catalog_sources.yaml`.
+- `lhm.config.series_catalog.SeriesCatalog`: Loads and validates the
+  materialised catalog consumed by downstream components.
+- `lhm.clients.fred_client.FREDClient`: Provides metadata lookups, tag
+  searches, and observation downloads with built-in rate limiting.
+- `lhm.pipelines.daily_refresh.DailyRefreshPipeline`: Coordinates the
+  refresh cycle, including window resolution, data collection, and
+  persistence.
+- `lhm.storage.filesystem.FilesystemStorageBackend`: Writes Parquet, CSV,
+  or JSON datasets alongside metadata manifests for each series.
+- `lhm.cli`: Convenience CLI for executing the pipeline from the command
+  line or a scheduler.
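+
+For ad-hoc inspection the same components can be driven from Python. The
+snippet below is a minimal sketch, assuming the package is importable
+(installed or with `src` on `PYTHONPATH`), the default catalog file
+exists, and the placeholder API key is replaced with a real one:
+
+```python
+from lhm.clients import FREDClient
+from lhm.config import SeriesCatalog
+
+catalog = SeriesCatalog.load("configs/fred_series_catalog.yaml")
+client = FREDClient(api_key="your-fred-api-key", base_url="https://api.stlouisfed.org")
+
+# Pull up to five observations per catalogued series as a smoke test.
+for category, definition in catalog.iter_series():
+    observations = client.fetch_observations(definition.series_id, limit=5)
+    print(category.value, definition.series_id, len(observations))
+```
+
+## Operational Workflow
+
+The commands below assume a FRED API key exported in the environment
+(e.g. `export FRED_API_KEY=...` in your shell).
+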
+1. **Generate the catalog**: `python -m lhm.catalog.generate --sources configs/fred_catalog_sources.yaml --output configs/fred_series_catalog.yaml --api-key $FRED_API_KEY`
+   produces the per-category inventory. Adjust the tag recipes or limits
+   as desired.
+2. **Run the ingestion pipeline**: `python -m lhm.cli --catalog configs/fred_series_catalog.yaml --storage-root data/raw/fred --storage-format parquet --full-refresh --api-key $FRED_API_KEY`
+   performs either a full backfill or a rolling refresh, depending on the
+   flags supplied.
+3. **Schedule recurring updates**: integrate the CLI command into your
+   preferred scheduler (cron, Airflow, Dagster, etc.) to rehydrate the
+   desired window daily.
+
+## Next Steps
+
+- Expand automated testing (unit and integration) once API credentials
+  are available in CI.
+- Add additional storage backends (e.g., DuckDB, cloud object stores) as
+  production requirements evolve.
+- Layer in monitoring/alerting once deployment targets are defined.
diff --git a/docs/fred_series_catalog_outline.md b/docs/fred_series_catalog_outline.md
new file mode 100644
index 0000000..06bfa5b
--- /dev/null
+++ b/docs/fred_series_catalog_outline.md
@@ -0,0 +1,23 @@
+# FRED Series Catalog Outline
+
+The following table captures the eleven macroeconomic categories requested
+by the stakeholder. Each category will contain 50-100 vetted FRED series
+selected via the automated tag-based catalog generator.
+
+| Category   | Description | Status | Next Actions |
+| ---------- | ----------- | ------ | ------------ |
+| GDP        | Aggregate output measures such as GDP, GDI, and potential GDP. | Tag recipe defined; generator fetches top series under the `gdp` tag. | Review generated catalog and pin any mandatory inclusions/exclusions. |
+| Labor      | Employment, unemployment, hours worked, participation rates. | Tag recipe defined (`employment` + `unemployment`). | Validate coverage of BLS headline indicators; refine tags if needed. |
+| Prices     | Inflation, price indices, producer prices, deflators. | Tag recipe defined (`inflation`). | Evaluate inclusion of survey expectations; adjust generator filters. |
+| Health     | Healthcare expenditure, insurance coverage, health outcomes. | Tag recipe defined (`health`). | Augment with priority health expenditure series if missing. |
+| Money      | Monetary aggregates, interest rates, credit measures. | Tag recipe defined (`money`, `monetary`). | Ensure inclusion of Fed balance sheet aggregates; tweak tags as required. |
+| Trade      | Exports, imports, balance of trade, exchange rates. | Tag recipe defined (`trade`). | Review for key bilateral balances and trade-weighted indexes. |
+| Government | Fiscal revenue/expenditure, debt, budget balances. | Tag recipe defined (`government`). | Confirm Treasury receipts/outlays presence; expand tags if gaps remain. |
+| Business   | Business sentiment, production, investment, inventories. | Tag recipe defined (`business`). | Incorporate targeted survey series (e.g., ISM) via supplemental tags. |
+| Consumer   | Spending, confidence, credit, income distribution metrics. | Tag recipe defined (`consumer`). | Verify consumption, income, credit coverage; refine as necessary. |
+| Housing    | Construction, sales, prices, mortgage data. | Tag recipe defined (`housing`). | Add explicit mortgage rate series if excluded. |
+| Taxes      | Federal, state, and local tax receipts and rates. | Tag recipe defined (`taxes`). | Review generated list for revenue vs. rate balance; adjust accordingly. 
| + +After reviewing the automatically generated catalog, update +`configs/fred_catalog_sources.yaml` with additional tags or overrides to +lock in the final 50-100 indicators per category. diff --git a/src/lhm/__init__.py b/src/lhm/__init__.py new file mode 100644 index 0000000..8b0d1e9 --- /dev/null +++ b/src/lhm/__init__.py @@ -0,0 +1,20 @@ +"""Core package for the LHM data platform.""" + +from importlib import resources + + +def get_version() -> str: + """Return the package version if metadata is available. + + The skeleton package does not yet ship with a build system that + automatically injects version metadata. The helper gracefully + falls back to ``"0.0.0"`` so that downstream modules have a + reliable semantic version string to reference during early + development. + """ + + try: + with resources.files(__package__).joinpath("VERSION").open("r", encoding="utf-8") as handle: + return handle.read().strip() + except FileNotFoundError: + return "0.0.0" diff --git a/src/lhm/catalog/__init__.py b/src/lhm/catalog/__init__.py new file mode 100644 index 0000000..af9e762 --- /dev/null +++ b/src/lhm/catalog/__init__.py @@ -0,0 +1,5 @@ +"""Utilities for building and maintaining FRED series catalogs.""" + +from .generate import CatalogSourceConfig, CatalogSources, build_catalog_from_sources + +__all__ = ["CatalogSourceConfig", "CatalogSources", "build_catalog_from_sources"] diff --git a/src/lhm/catalog/generate.py b/src/lhm/catalog/generate.py new file mode 100644 index 0000000..cb2fbe0 --- /dev/null +++ b/src/lhm/catalog/generate.py @@ -0,0 +1,102 @@ +"""Build series catalogs automatically from FRED tag sources.""" + +from __future__ import annotations + +import argparse +from dataclasses import dataclass +import json +from pathlib import Path + +import yaml + +from ..clients import FREDClient +from ..config import Category, SeriesCatalog, SeriesDefinition + + +@dataclass(slots=True) +class CatalogSourceConfig: + """Describe how to fetch series for a specific category.""" + + tags: tuple[str, ...] 
+ limit: int = 75 + + +class CatalogSources(dict[Category, CatalogSourceConfig]): + """Container that parses the YAML configuration into dataclasses.""" + + @classmethod + def load(cls, path: Path) -> "CatalogSources": + with path.open("r", encoding="utf-8") as handle: + payload = yaml.safe_load(handle) or {} + + categories: dict[Category, CatalogSourceConfig] = {} + for raw_category, spec in (payload.get("categories") or {}).items(): + category = Category(raw_category) + tags = tuple(spec.get("tags") or []) + limit = int(spec.get("limit", 75)) + categories[category] = CatalogSourceConfig(tags=tags, limit=limit) + return cls(categories) + + +def build_catalog_from_sources(client: FREDClient, sources: CatalogSources) -> SeriesCatalog: + """Construct a :class:`SeriesCatalog` based on tag-driven sources.""" + + categories: dict[Category, list[SeriesDefinition]] = {} + for category, source in sources.items(): + seen: set[str] = set() + definitions: list[SeriesDefinition] = [] + for series in client.search_series_by_tags(source.tags, limit=source.limit): + if series.series_id in seen: + continue + seen.add(series.series_id) + definitions.append( + SeriesDefinition( + series_id=series.series_id, + title=series.title, + frequency=series.frequency, + units=series.units, + seasonal_adjustment=series.seasonal_adjustment, + ) + ) + categories[category] = definitions + return SeriesCatalog(categories=categories) + + +def build_argument_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Generate a FRED series catalog from tag sources") + parser.add_argument("--sources", type=Path, required=True, help="YAML file describing tag sources per category") + parser.add_argument("--output", type=Path, required=True, help="Destination path for the generated catalog") + parser.add_argument("--api-key", type=str, default=None, help="FRED API key") + parser.add_argument( + "--base-url", + type=str, + default="https://api.stlouisfed.org", + help="Base URL for the FRED API", + ) + parser.add_argument("--limit", type=int, default=None, help="Override the per-category series limit") + parser.add_argument("--dry-run", action="store_true", help="Print the catalog instead of writing to disk") + return parser + + +def main(argv: list[str] | None = None) -> None: + parser = build_argument_parser() + args = parser.parse_args(argv) + + sources = CatalogSources.load(args.sources) + if args.limit is not None: + for config in sources.values(): + config.limit = args.limit + + client = FREDClient(args.api_key, args.base_url) + catalog = build_catalog_from_sources(client, sources) + + if args.dry_run: + print(json.dumps(catalog.to_yaml_dict(), indent=2)) + return + + args.output.parent.mkdir(parents=True, exist_ok=True) + catalog.dump(args.output) + + +if __name__ == "__main__": # pragma: no cover + main() diff --git a/src/lhm/cli.py b/src/lhm/cli.py new file mode 100644 index 0000000..5eb1be1 --- /dev/null +++ b/src/lhm/cli.py @@ -0,0 +1,135 @@ +"""Command line helpers to run the ingestion pipelines.""" + +from __future__ import annotations + +import argparse +import json +import logging +import os +from pathlib import Path +from typing import Any + +from .clients import FREDClient +from .config import PipelineConfig, SeriesCatalog +from .pipelines import DailyRefreshPipeline + + +def build_argument_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Run the FRED ingestion pipeline") + parser.add_argument( + "--catalog", + type=Path, + default=None, + help="Path to 
the curated series catalog (defaults to configs/fred_series_catalog.yaml)", + ) + parser.add_argument( + "--storage-root", + type=Path, + default=None, + help="Destination root directory for downloaded series", + ) + parser.add_argument( + "--storage-format", + choices=["parquet", "csv", "json"], + default=None, + help="File format used to persist observations", + ) + parser.add_argument( + "--refresh-window-days", + type=int, + default=None, + help="Number of trailing days to refresh when not running a full backfill", + ) + parser.add_argument( + "--full-refresh", + action="store_true", + help="Download the full history for each series instead of a rolling window", + ) + parser.add_argument( + "--start-date", + type=str, + default=None, + help="Explicit start date (YYYY-MM-DD) for the observation window", + ) + parser.add_argument( + "--end-date", + type=str, + default=None, + help="Explicit end date (YYYY-MM-DD) for the observation window", + ) + parser.add_argument( + "--api-key", + type=str, + default=None, + help="FRED API key. Defaults to the FRED_API_KEY environment variable.", + ) + parser.add_argument( + "--log-level", + type=str, + default="INFO", + help="Logging level (e.g. INFO, DEBUG)", + ) + return parser + + +def _parse_date(raw: str | None) -> Any: + if not raw: + return None + from datetime import date + + try: + return date.fromisoformat(raw) + except ValueError as exc: # pragma: no cover - user input validation + raise SystemExit(f"Invalid date: {raw}") from exc + + +def main(argv: list[str] | None = None) -> None: + parser = build_argument_parser() + args = parser.parse_args(argv) + + logging.basicConfig(level=getattr(logging, args.log_level.upper(), logging.INFO)) + + config = PipelineConfig() + if args.storage_root is not None: + config.storage.root_path = Path(args.storage_root) + if args.storage_format is not None: + config.storage.format = args.storage_format + if args.refresh_window_days is not None: + config.refresh_window_days = args.refresh_window_days + if args.catalog is not None: + config.catalog_path = Path(args.catalog) + + config.fred.api_key = args.api_key or os.environ.get("FRED_API_KEY") or config.fred.api_key + + catalog_path = config.catalog_path + if not catalog_path.exists(): + raise SystemExit(f"Catalog file not found: {catalog_path}") + + catalog = SeriesCatalog.load(catalog_path) + client = FREDClient( + config.fred.api_key, + config.fred.base_url, + rate_limit_per_minute=config.fred.rate_limit_per_minute, + ) + pipeline = DailyRefreshPipeline(config, client, catalog) + + results = pipeline.run( + full_refresh=args.full_refresh, + explicit_start=_parse_date(args.start_date), + explicit_end=_parse_date(args.end_date), + ) + + summary = [ + { + "category": result.category.value, + "series_id": result.series.series_id, + "observations": result.observations, + "storage_path": str(result.storage_path), + } + for result in results + ] + print(json.dumps(summary, indent=2)) + + +if __name__ == "__main__": # pragma: no cover + main() diff --git a/src/lhm/clients/__init__.py b/src/lhm/clients/__init__.py new file mode 100644 index 0000000..31c344b --- /dev/null +++ b/src/lhm/clients/__init__.py @@ -0,0 +1,5 @@ +"""External service clients used by the ingestion pipelines.""" + +from .fred_client import FREDAPIError, FREDClient, FREDSeries, Observation + +__all__ = ["FREDAPIError", "FREDClient", "FREDSeries", "Observation"] diff --git a/src/lhm/clients/fred_client.py b/src/lhm/clients/fred_client.py new file mode 100644 index 0000000..46d87b3 --- /dev/null 
+++ b/src/lhm/clients/fred_client.py @@ -0,0 +1,240 @@ +"""HTTP client responsible for interacting with the public FRED API.""" + +from __future__ import annotations + +from dataclasses import dataclass +from datetime import date, datetime +import json +import time +from typing import Iterable, Iterator, Mapping, Sequence +from urllib import parse, request + + +class FREDAPIError(RuntimeError): + """Raised when the FRED API returns an error payload.""" + + +@dataclass(frozen=True) +class FREDSeries: + """Series metadata returned by FRED.""" + + series_id: str + title: str + frequency: str | None = None + observation_start: date | None = None + observation_end: date | None = None + units: str | None = None + seasonal_adjustment: str | None = None + + +@dataclass(frozen=True) +class Observation: + """Single FRED observation entry.""" + + realtime_start: date + realtime_end: date + observation_date: date + value: float | None + raw_value: str + + @classmethod + def from_api(cls, payload: Mapping[str, str]) -> "Observation": + """Parse a raw JSON observation payload into a dataclass instance.""" + + realtime_start = _parse_date(payload.get("realtime_start")) + realtime_end = _parse_date(payload.get("realtime_end")) + observation_date = _parse_date(payload.get("date")) + raw_value = payload.get("value", "") + value = _normalise_value(raw_value) + return cls( + realtime_start=realtime_start, + realtime_end=realtime_end, + observation_date=observation_date, + value=value, + raw_value=raw_value, + ) + + +def _parse_date(raw: str | None) -> date: + """Convert the FRED date string to :class:`datetime.date`.""" + + if not raw: + raise FREDAPIError("FRED returned a payload missing date information") + try: + return datetime.fromisoformat(raw).date() + except ValueError as exc: # pragma: no cover - defensive coding + raise FREDAPIError(f"Could not parse FRED date: {raw}") from exc + + +def _parse_optional_date(raw: object | None) -> date | None: + if raw in (None, ""): + return None + try: + return datetime.fromisoformat(str(raw)).date() + except ValueError: # pragma: no cover - defensive coding + return None + + +def _normalise_value(raw_value: str) -> float | None: + """Convert the observation value string into a float when possible.""" + + if raw_value in {"", "."}: + return None + try: + return float(raw_value) + except ValueError: + return None + + +class _RateLimiter: + """Simple leaky bucket rate limiter based on wall-clock time.""" + + def __init__(self, rate_limit_per_minute: int | None) -> None: + self._interval = 60.0 / rate_limit_per_minute if rate_limit_per_minute else 0.0 + self._last_invocation: float | None = None + + def wait(self) -> None: + """Sleep until we are allowed to perform the next request.""" + + if not self._interval: + return + now = time.monotonic() + if self._last_invocation is None: + self._last_invocation = now + return + elapsed = now - self._last_invocation + if elapsed < self._interval: + time.sleep(self._interval - elapsed) + self._last_invocation = time.monotonic() + + +class FREDClient: + """Lightweight wrapper around the FRED API suitable for batch ingestion.""" + + def __init__( + self, + api_key: str | None, + base_url: str, + *, + rate_limit_per_minute: int | None = 60, + user_agent: str = "lhm-fred-ingestor/1.0", + timeout: float = 30.0, + ) -> None: + self.api_key = api_key or "" + self.base_url = base_url.rstrip("/") + self._rate_limiter = _RateLimiter(rate_limit_per_minute) + self._user_agent = user_agent + self._timeout = timeout + + # 
------------------------------------------------------------------
+    # Public API
+    # ------------------------------------------------------------------
+    def list_series(self, series_ids: Iterable[str]) -> list[FREDSeries]:
+        """Return metadata for the requested series."""
+
+        return [self._fetch_single_series(series_id) for series_id in series_ids]
+
+    def search_series_by_tags(self, tags: Sequence[str], *, limit: int = 100) -> list[FREDSeries]:
+        """Return popular series that match the provided FRED tags."""
+
+        tag_names = ";".join(tag.strip() for tag in tags if tag.strip())
+        if not tag_names:
+            raise ValueError("At least one FRED tag must be supplied")
+
+        payload = self._request(
+            "tags/series",
+            {
+                "tag_names": tag_names,
+                "sort_order": "desc",
+                "sort_by": "popularity",
+                "limit": str(limit),
+            },
+        )
+        # FRED's JSON responses really do use the doubled key "seriess" for
+        # series lists, here and in the `series` endpoint handled below.
+        series_payload = payload.get("seriess") or []
+        return [self._coerce_series(entry) for entry in series_payload]
+
+    def fetch_observations(
+        self,
+        series_id: str,
+        *,
+        start_date: date | None = None,
+        end_date: date | None = None,
+        limit: int | None = None,
+    ) -> list[Observation]:
+        """Download observations for a given series within an optional window."""
+
+        params: dict[str, str] = {
+            "series_id": series_id,
+            "sort_order": "asc",
+        }
+        # Only include window parameters when they are set, rather than
+        # sending blank date strings to the API.
+        if start_date:
+            params["observation_start"] = start_date.isoformat()
+        if end_date:
+            params["observation_end"] = end_date.isoformat()
+        if limit:
+            params["limit"] = str(limit)
+
+        payload = self._request("series/observations", params)
+        observations_payload = payload.get("observations", [])
+        return [Observation.from_api(entry) for entry in observations_payload]
+
+    # ------------------------------------------------------------------
+    # Internal helpers
+    # ------------------------------------------------------------------
+    def _fetch_single_series(self, series_id: str) -> FREDSeries:
+        payload = self._request("series", {"series_id": series_id})
+        series_payload = payload.get("seriess") or []
+        if not series_payload:
+            raise FREDAPIError(f"Series '{series_id}' not found")
+        entry = series_payload[0]
+        return self._coerce_series(entry)
+
+    def _request(self, endpoint: str, params: Mapping[str, str]) -> Mapping[str, object]:
+        self._rate_limiter.wait()
+        all_params = {
+            "file_type": "json",
+            "api_key": self.api_key,
+        }
+        all_params.update({key: value for key, value in params.items() if value is not None})
+        query = parse.urlencode(all_params)
+        url = f"{self.base_url}/fred/{endpoint.lstrip('/')}?{query}"
+
+        http_request = request.Request(url, method="GET", headers={"User-Agent": self._user_agent})
+        try:
+            with request.urlopen(http_request, timeout=self._timeout) as response:  # noqa: S310 - trusted domain
+                raw_payload = response.read()
+        except OSError as exc:  # pragma: no cover - network failure is environment specific
+            raise FREDAPIError(f"Failed to call FRED endpoint '{endpoint}': {exc}") from exc
+
+        try:
+            payload = json.loads(raw_payload.decode("utf-8"))
+        except json.JSONDecodeError as exc:  # pragma: no cover - defensive coding
+            raise FREDAPIError("FRED returned malformed JSON") from exc
+
+        if isinstance(payload, Mapping) and payload.get("error_code"):
+            message = payload.get("error_message", "Unknown error")
+            raise FREDAPIError(f"FRED API error {payload.get('error_code')}: {message}")
+        return payload
+
+    # Utility to ease unit testing by exposing lazy iterables -----------------
+    def iter_series(self, series_ids: Iterable[str]) -> 
Iterator[FREDSeries]: + """Yield series metadata lazily.""" + + for series_id in series_ids: + yield self._fetch_single_series(series_id) + + def _coerce_series(self, payload: Mapping[str, object]) -> FREDSeries: + series_id = str(payload.get("id") or payload.get("series_id") or "").strip() + if not series_id: + raise FREDAPIError("FRED response missing series identifier") + return FREDSeries( + series_id=series_id, + title=str(payload.get("title", "")), + frequency=str(payload.get("frequency")) if payload.get("frequency") else None, + observation_start=_parse_optional_date(payload.get("observation_start")), + observation_end=_parse_optional_date(payload.get("observation_end")), + units=str(payload.get("units")) if payload.get("units") else None, + seasonal_adjustment=str(payload.get("seasonal_adjustment")) + if payload.get("seasonal_adjustment") + else None, + ) + diff --git a/src/lhm/config/__init__.py b/src/lhm/config/__init__.py new file mode 100644 index 0000000..6c5796c --- /dev/null +++ b/src/lhm/config/__init__.py @@ -0,0 +1,13 @@ +"""Configuration helpers for the LHM data acquisition system.""" + +from .series_catalog import Category, SeriesDefinition, SeriesCatalog +from .settings import FREDConfig, StorageConfig, PipelineConfig + +__all__ = [ + "Category", + "SeriesCatalog", + "SeriesDefinition", + "FREDConfig", + "StorageConfig", + "PipelineConfig", +] diff --git a/src/lhm/config/series_catalog.py b/src/lhm/config/series_catalog.py new file mode 100644 index 0000000..16626ea --- /dev/null +++ b/src/lhm/config/series_catalog.py @@ -0,0 +1,97 @@ +"""Series catalog definitions for structured FRED ingestion.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from enum import Enum +from pathlib import Path +from typing import Iterable, List, Mapping + +import yaml + + +class Category(str, Enum): + """High-level macroeconomic groupings requested by the stakeholder.""" + + GDP = "gdp" + LABOR = "labor" + PRICES = "prices" + HEALTH = "health" + MONEY = "money" + TRADE = "trade" + GOVERNMENT = "government" + BUSINESS = "business" + CONSUMER = "consumer" + HOUSING = "housing" + TAXES = "taxes" + + +@dataclass(frozen=True) +class SeriesDefinition: + """Describe a single FRED series and its metadata.""" + + series_id: str + title: str + frequency: str | None = None + units: str | None = None + seasonal_adjustment: str | None = None + notes: str | None = None + + +@dataclass +class SeriesCatalog: + """Container grouping the series definitions by category.""" + + categories: Mapping[Category, List[SeriesDefinition]] = field(default_factory=dict) + + @classmethod + def from_yaml(cls, handle: Iterable[str]) -> "SeriesCatalog": + """Create a catalog from a YAML document.""" + + payload = yaml.safe_load(handle) + categories: dict[Category, List[SeriesDefinition]] = {} + for raw_category, raw_series in payload.get("categories", {}).items(): + category = Category(raw_category) + definitions = [SeriesDefinition(**entry) for entry in raw_series or []] + categories[category] = definitions + return cls(categories=categories) + + @classmethod + def load(cls, path: str | Path) -> "SeriesCatalog": + """Load a YAML catalog from disk.""" + + with Path(path).expanduser().open("r", encoding="utf-8") as handle: + return cls.from_yaml(handle) + + def to_yaml_dict(self) -> dict: + """Represent the catalog as a serialisable dictionary.""" + + return { + "categories": { + category.value: [ + { + "series_id": definition.series_id, + "title": definition.title, + "frequency": 
definition.frequency,
+                        "units": definition.units,
+                        "seasonal_adjustment": definition.seasonal_adjustment,
+                        "notes": definition.notes,
+                    }
+                    for definition in definitions
+                ]
+                for category, definitions in sorted(self.categories.items(), key=lambda item: item[0].value)
+            }
+        }
+
+    def dump(self, path: str | Path) -> None:
+        """Persist the catalog to disk."""
+
+        with Path(path).expanduser().open("w", encoding="utf-8") as handle:
+            yaml.safe_dump(self.to_yaml_dict(), handle, sort_keys=False, allow_unicode=True)
+
+    def iter_series(self) -> Iterable[tuple[Category, SeriesDefinition]]:
+        """Iterate over ``(category, series_definition)`` tuples."""
+
+        for category, definitions in self.categories.items():
+            for definition in definitions:
+                yield category, definition
diff --git a/src/lhm/config/settings.py b/src/lhm/config/settings.py
new file mode 100644
index 0000000..3fe9ab8
--- /dev/null
+++ b/src/lhm/config/settings.py
@@ -0,0 +1,34 @@
+"""Settings dataclasses used to configure the data platform."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from pathlib import Path
+
+
+@dataclass(slots=True)
+class FREDConfig:
+    """Connection parameters for the FRED API."""
+
+    api_key: str | None = None
+    base_url: str = "https://api.stlouisfed.org"
+    rate_limit_per_minute: int = 60
+
+
+@dataclass(slots=True)
+class StorageConfig:
+    """Describe how fetched series should be stored locally."""
+
+    root_path: Path = Path("data/raw/fred")
+    format: str = "parquet"
+
+
+@dataclass(slots=True)
+class PipelineConfig:
+    """Top-level configuration for the recurring ingestion pipeline."""
+
+    # The nested configs are unhashable dataclass instances, so they must
+    # be supplied via default_factory rather than shared class defaults.
+    fred: FREDConfig = field(default_factory=FREDConfig)
+    storage: StorageConfig = field(default_factory=StorageConfig)
+    catalog_path: Path = Path("configs/fred_series_catalog.yaml")
+    refresh_window_days: int = 7
+    storage_backend: str = "filesystem"
diff --git a/src/lhm/pipelines/__init__.py b/src/lhm/pipelines/__init__.py
new file mode 100644
index 0000000..60b731f
--- /dev/null
+++ b/src/lhm/pipelines/__init__.py
@@ -0,0 +1,5 @@
+"""Pipeline orchestrators for recurring data pulls."""
+
+from .daily_refresh import DailyRefreshPipeline
+
+__all__ = ["DailyRefreshPipeline"]
diff --git a/src/lhm/pipelines/daily_refresh.py b/src/lhm/pipelines/daily_refresh.py
new file mode 100644
index 0000000..75f0457
--- /dev/null
+++ b/src/lhm/pipelines/daily_refresh.py
@@ -0,0 +1,145 @@
+"""Implementation of the daily refresh pipeline."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from datetime import date, datetime, timedelta
+import logging
+from pathlib import Path
+from typing import Iterable, Sequence
+
+from ..clients import FREDAPIError, FREDClient
+from ..clients.fred_client import Observation
+from ..config import Category, PipelineConfig, SeriesDefinition, SeriesCatalog
+from ..storage import StorageRegistry
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass(slots=True)
+class SeriesIngestionResult:
+    """Capture high-level information about a completed series refresh."""
+
+    category: Category
+    series: SeriesDefinition
+    observations: int
+    storage_path: Path
+    refresh_window: tuple[datetime | None, datetime | None]
+
+
+class DailyRefreshPipeline:
+    """Coordinate the ingestion of FRED series on a rolling basis."""
+
+    def __init__(
+        self,
+        config: PipelineConfig,
+        client: FREDClient,
+        catalog: SeriesCatalog,
+        *,
+        storage_registry: StorageRegistry | None = None,
+    ) -> None:
+        self.config = config
+        self.client = client
+        self.catalog = catalog
+        self._storage_registry = storage_registry or 
StorageRegistry(config.storage) + self._storage = self._storage_registry.resolve(config.storage_backend) + + def resolve_refresh_window(self) -> tuple[datetime | None, datetime | None]: + """Return the date window that should be refreshed. + + The skeleton implementation assumes that the caller will + re-download the last ``refresh_window_days`` worth of data to + capture any historical revisions. The concrete implementation will + be expanded once the stakeholder signs off on the series catalog + and storage approach. + """ + + end = datetime.utcnow() + start = end - timedelta(days=self.config.refresh_window_days) + return start, end + + def iter_catalog(self) -> Iterable[tuple[Category, SeriesDefinition]]: + """Helper proxy exposing the catalog entries.""" + + return self.catalog.iter_series() + + def run( + self, + *, + full_refresh: bool = False, + explicit_start: date | None = None, + explicit_end: date | None = None, + ) -> list[SeriesIngestionResult]: + """Execute a refresh cycle and return a summary of completed ingestions.""" + + refresh_window = self._resolve_window(full_refresh, explicit_start, explicit_end) + results: list[SeriesIngestionResult] = [] + + for category, definition in self.iter_catalog(): + logger.info("Refreshing series %s (%s)", definition.series_id, category.value) + try: + series_metadata = self.client.list_series([definition.series_id])[0] + except FREDAPIError as exc: # pragma: no cover - depends on network interactions + logger.exception("Failed to load metadata for %s: %s", definition.series_id, exc) + continue + + try: + observations = self._fetch_series_observations(definition.series_id, refresh_window) + except FREDAPIError as exc: # pragma: no cover - depends on network interactions + logger.exception("Failed to download observations for %s: %s", definition.series_id, exc) + continue + storage_path = self._storage.save( + category=category, + series=definition, + metadata=series_metadata, + observations=observations, + refresh_window=refresh_window, + ) + + result = SeriesIngestionResult( + category=category, + series=definition, + observations=len(observations), + storage_path=storage_path, + refresh_window=refresh_window, + ) + logger.debug( + "Stored %s observations for %s at %s", len(observations), definition.series_id, storage_path + ) + results.append(result) + + return results + + @classmethod + def load_default_catalog(cls, path: str | Path | None = None) -> SeriesCatalog: + """Convenience loader for the default catalog path.""" + + catalog_path = path or PipelineConfig().catalog_path + return SeriesCatalog.load(catalog_path) + + # ------------------------------------------------------------------ + def _fetch_series_observations( + self, series_id: str, refresh_window: tuple[datetime | None, datetime | None] + ) -> Sequence[Observation]: + start, end = refresh_window + return self.client.fetch_observations( + series_id, + start_date=start.date() if start else None, + end_date=end.date() if end else None, + ) + + def _resolve_window( + self, + full_refresh: bool, + explicit_start: date | None, + explicit_end: date | None, + ) -> tuple[datetime | None, datetime | None]: + if full_refresh: + return None, None + + if explicit_start or explicit_end: + start = datetime.combine(explicit_start, datetime.min.time()) if explicit_start else None + end = datetime.combine(explicit_end, datetime.min.time()) if explicit_end else None + return start, end + + return self.resolve_refresh_window() diff --git a/src/lhm/storage/__init__.py 
b/src/lhm/storage/__init__.py
new file mode 100644
index 0000000..425d6de
--- /dev/null
+++ b/src/lhm/storage/__init__.py
@@ -0,0 +1,6 @@
+"""Storage utilities for persisting FRED datasets."""
+
+from .filesystem import FilesystemStorageBackend
+from .registry import StorageRegistry
+
+__all__ = ["FilesystemStorageBackend", "StorageRegistry"]
diff --git a/src/lhm/storage/filesystem.py b/src/lhm/storage/filesystem.py
new file mode 100644
index 0000000..b2c92fd
--- /dev/null
+++ b/src/lhm/storage/filesystem.py
@@ -0,0 +1,134 @@
+"""Local filesystem storage backend for FRED series observations."""
+
+from __future__ import annotations
+
+from dataclasses import asdict
+from datetime import datetime
+import json
+from pathlib import Path
+from typing import Sequence
+
+from ..clients.fred_client import FREDSeries, Observation
+from ..config import Category, SeriesDefinition, StorageConfig
+
+
+class FilesystemStorageBackend:
+    """Persist series observations and metadata on the local filesystem.
+
+    The class deliberately has no explicit base class: it satisfies the
+    ``StorageBackend`` protocol in ``lhm.storage.registry`` structurally,
+    which avoids a circular import between the two modules.
+    """
+
+    def __init__(self, config: StorageConfig) -> None:
+        self._config = config
+        self._root = Path(config.root_path)
+        self._format = (config.format or "parquet").lower()
+
+    # ------------------------------------------------------------------
+    def save(
+        self,
+        *,
+        category: Category,
+        series: SeriesDefinition,
+        metadata: FREDSeries,
+        observations: Sequence[Observation],
+        refresh_window: tuple[datetime | None, datetime | None],
+    ) -> Path:
+        """Persist observations to disk and return the materialised path."""
+
+        destination = self._normalise_destination(category, series)
+        destination.mkdir(parents=True, exist_ok=True)
+
+        observations_path = self._write_observations(destination, observations)
+        self._write_metadata(destination, series, metadata, refresh_window)
+
+        return observations_path
+
+    # ------------------------------------------------------------------
+    def _normalise_destination(self, category: Category, series: SeriesDefinition) -> Path:
+        return self._root / category.value / series.series_id.lower()
+
+    def _write_observations(self, destination: Path, observations: Sequence[Observation]) -> Path:
+        if self._format == "csv":
+            return self._write_csv(destination, observations)
+        if self._format == "json":
+            return self._write_json(destination, observations)
+        if self._format == "parquet":
+            return self._write_parquet(destination, observations)
+        raise ValueError(f"Unsupported storage format '{self._format}'")
+
+    def _write_csv(self, destination: Path, observations: Sequence[Observation]) -> Path:
+        import csv
+
+        path = destination / "observations.csv"
+        with path.open("w", encoding="utf-8", newline="") as handle:
+            writer = csv.writer(handle)
+            writer.writerow(["realtime_start", "realtime_end", "date", "value", "raw_value"])
+            for entry in observations:
+                writer.writerow(
+                    [
+                        entry.realtime_start.isoformat(),
+                        entry.realtime_end.isoformat(),
+                        entry.observation_date.isoformat(),
+                        "" if entry.value is None else f"{entry.value:.16g}",
+                        entry.raw_value,
+                    ]
+                )
+        return path
+
+    def _write_json(self, destination: Path, observations: Sequence[Observation]) -> Path:
+        path = destination / "observations.json"
+        serialised = [
+            {
+                "realtime_start": entry.realtime_start.isoformat(),
+                "realtime_end": entry.realtime_end.isoformat(),
+                "date": entry.observation_date.isoformat(),
+                "value": entry.value,
+                "raw_value": entry.raw_value,
+            }
+            for entry in observations
+        ]
+        with path.open("w", encoding="utf-8") as handle:
+            json.dump(serialised, handle, indent=2)
+        return path
+
+    def _write_parquet(self, destination: Path, observations: Sequence[Observation]) -> Path:
+        try:
+            import pandas as pd
+        except ImportError as exc:  # pragma: no cover - optional dependency
+            raise RuntimeError(
+                "Parquet storage requires the optional 'pandas' dependency. "
+                "Install pandas (and pyarrow/fastparquet) or switch to CSV/JSON."
+            ) from exc
+
+        frame = pd.DataFrame(
+            {
+                "realtime_start": [entry.realtime_start for entry in observations],
+                "realtime_end": [entry.realtime_end for entry in observations],
+                "date": [entry.observation_date for entry in observations],
+                "value": [entry.value for entry in observations],
+                "raw_value": [entry.raw_value for entry in observations],
+            }
+        )
+        path = destination / "observations.parquet"
+        frame.to_parquet(path, index=False)
+        return path
+
+    def _write_metadata(
+        self,
+        destination: Path,
+        series: SeriesDefinition,
+        metadata: FREDSeries,
+        refresh_window: tuple[datetime | None, datetime | None],
+    ) -> None:
+        payload = {
+            "series": asdict(series),
+            "fred_metadata": asdict(metadata),
+            "ingested_at_utc": datetime.utcnow().isoformat(timespec="seconds"),
+            "refresh_window": {
+                "start": refresh_window[0].isoformat() if refresh_window[0] else None,
+                "end": refresh_window[1].isoformat() if refresh_window[1] else None,
+            },
+            "storage_format": self._format,
+        }
+
+        with (destination / "metadata.json").open("w", encoding="utf-8") as handle:
+            # ``default=str`` serialises the ``date`` objects inside the
+            # FRED metadata, which json cannot encode natively.
+            json.dump(payload, handle, indent=2, default=str)
diff --git a/src/lhm/storage/registry.py b/src/lhm/storage/registry.py
new file mode 100644
index 0000000..61a677c
--- /dev/null
+++ b/src/lhm/storage/registry.py
@@ -0,0 +1,42 @@
+"""Registry for storage backends used by the ingestion pipelines."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from datetime import datetime
+from pathlib import Path
+from typing import Protocol, Sequence
+
+from ..clients.fred_client import FREDSeries, Observation
+from ..config import Category, SeriesDefinition, StorageConfig
+from .filesystem import FilesystemStorageBackend
+
+
+class StorageBackend(Protocol):
+    """Interface for storage backends used by the pipelines."""
+
+    def save(
+        self,
+        *,
+        category: Category,
+        series: SeriesDefinition,
+        metadata: FREDSeries,
+        observations: Sequence[Observation],
+        refresh_window: tuple[datetime | None, datetime | None],
+    ) -> Path:
+        """Persist the provided observations and return the storage path."""
+
+
+@dataclass
+class StorageRegistry:
+    """Registry mapping storage identifiers to backend implementations."""
+
+    config: StorageConfig
+    _DEFAULT_BACKEND = "filesystem"
+
+    def resolve(self, backend_name: str | None = None) -> StorageBackend:
+        """Return the backend implementation for ``backend_name``."""
+
+        backend = (backend_name or self._DEFAULT_BACKEND).lower()
+        if backend in {"filesystem", "local", "parquet", "csv", "json"}:
+            return FilesystemStorageBackend(self.config)
+        raise ValueError(f"Unsupported storage backend '{backend_name}'")
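+
+# Illustrative usage, assuming default settings; ``resolve`` returns an
+# object that structurally satisfies ``StorageBackend``:
+#
+#     from lhm.config import StorageConfig
+#     from lhm.storage import StorageRegistry
+#
+#     registry = StorageRegistry(StorageConfig(format="csv"))
+#     backend = registry.resolve("filesystem")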