Implement automated FRED ingestion pipeline #10
Conversation
Reviewer's Guide

This PR delivers a full end-to-end FRED data ingestion stack by introducing a rate-limited HTTP client, tag-based catalog generation, a daily refresh orchestrator exposed via a CLI, a pluggable filesystem storage backend, a structured configuration layer with catalog definitions and YAML templates, and comprehensive documentation updates.

Sequence diagram for running the FRED ingestion pipeline via CLI

```mermaid
sequenceDiagram
actor User
participant CLI
participant DailyRefreshPipeline
participant FREDClient
participant FilesystemStorageBackend
User->>CLI: Run CLI with config and options
CLI->>DailyRefreshPipeline: Instantiate pipeline with config, client, catalog
DailyRefreshPipeline->>FREDClient: Fetch series metadata
DailyRefreshPipeline->>FREDClient: Fetch observations
DailyRefreshPipeline->>FilesystemStorageBackend: Save observations and metadata
FilesystemStorageBackend-->>DailyRefreshPipeline: Return storage path
DailyRefreshPipeline-->>CLI: Return ingestion summary
CLI-->>User: Output summary
```

Class diagram for FREDClient and related types

```mermaid
classDiagram
class FREDClient {
- api_key: str
- base_url: str
- _rate_limiter: _RateLimiter
- _user_agent: str
- _timeout: float
+ list_series(series_ids: Iterable[str]) list[FREDSeries]
+ search_series_by_tags(tags: Sequence[str], limit: int) list[FREDSeries]
+ fetch_observations(series_id: str, start_date: date, end_date: date, limit: int) list[Observation]
+ iter_series(series_ids: Iterable[str]) Iterator[FREDSeries]
}
class _RateLimiter {
- _interval: float
- _last_invocation: float
+ wait()
}
class FREDAPIError {
}
class FREDSeries {
+ series_id: str
+ title: str
+ frequency: str
+ observation_start: date
+ observation_end: date
+ units: str
+ seasonal_adjustment: str
}
class Observation {
+ realtime_start: date
+ realtime_end: date
+ observation_date: date
+ value: float
+ raw_value: str
+ from_api(payload: Mapping[str, str]) Observation
}
FREDClient --> _RateLimiter
FREDClient --> FREDSeries
FREDClient --> Observation
Observation ..> FREDAPIError
FREDClient ..> FREDAPIError
```

Class diagram for DailyRefreshPipeline and SeriesIngestionResult

```mermaid
classDiagram
class DailyRefreshPipeline {
- config: PipelineConfig
- client: FREDClient
- catalog: SeriesCatalog
- _storage_registry: StorageRegistry
- _storage: StorageBackend
+ resolve_refresh_window() tuple[datetime, datetime]
+ iter_catalog() Iterable[tuple[Category, SeriesDefinition]]
+ run(full_refresh: bool, explicit_start: date, explicit_end: date) list[SeriesIngestionResult]
+ load_default_catalog(path: str|Path) SeriesCatalog
}
class SeriesIngestionResult {
+ category: Category
+ series: SeriesDefinition
+ observations: int
+ storage_path: Path
+ refresh_window: tuple[datetime, datetime]
}
DailyRefreshPipeline --> SeriesIngestionResult
DailyRefreshPipeline --> FREDClient
DailyRefreshPipeline --> SeriesCatalog
DailyRefreshPipeline --> StorageRegistry
DailyRefreshPipeline --> PipelineConfig
```

Class diagram for FilesystemStorageBackend and StorageRegistry

```mermaid
classDiagram
class FilesystemStorageBackend {
- _config: StorageConfig
- _root: Path
- _format: str
+ save(category: Category, series: SeriesDefinition, metadata: FREDSeries, observations: Sequence[Observation], refresh_window: tuple[datetime, datetime]) Path
}
class StorageRegistry {
+ config: StorageConfig
+ resolve(backend_name: str) StorageBackend
}
FilesystemStorageBackend --> StorageConfig
StorageRegistry --> FilesystemStorageBackend
StorageRegistry --> StorageConfig
```

Class diagram for configuration and catalog types

```mermaid
classDiagram
class FREDConfig {
+ api_key: str
+ base_url: str
+ rate_limit_per_minute: int
}
class StorageConfig {
+ root_path: Path
+ format: str
}
class PipelineConfig {
+ fred: FREDConfig
+ storage: StorageConfig
+ catalog_path: Path
+ refresh_window_days: int
+ storage_backend: str
}
class Category {
}
class SeriesDefinition {
+ series_id: str
+ title: str
+ frequency: str
+ units: str
+ seasonal_adjustment: str
+ notes: str
}
class SeriesCatalog {
+ categories: Mapping[Category, List[SeriesDefinition]]
+ from_yaml(handle: Iterable[str]) SeriesCatalog
+ load(path: str|Path) SeriesCatalog
+ to_yaml_dict() dict
+ dump(path: str|Path)
+ iter_series() Iterable[tuple[Category, SeriesDefinition]]
}
PipelineConfig --> FREDConfig
PipelineConfig --> StorageConfig
SeriesCatalog --> SeriesDefinition
SeriesCatalog --> Category
```
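Taken together, the pieces above wire up roughly as in the sketch below. This is an illustration assembled from the class diagrams, not the PR's actual code: constructor signatures, module import paths, and the literal argument values are all assumptions.

```python
from datetime import date, timedelta
from pathlib import Path

# Imports of the lhm types are elided; exact module paths are not shown
# in the diagrams (FREDClient lives in src/lhm/clients/fred_client.py).

config = PipelineConfig(  # field names follow the configuration class diagram
    fred=FREDConfig(api_key="YOUR_KEY", base_url="https://api.stlouisfed.org", rate_limit_per_minute=120),
    storage=StorageConfig(root_path=Path("data"), format="csv"),
    catalog_path=Path("config/catalog.yaml"),
    refresh_window_days=7,
    storage_backend="filesystem",
)

client = FREDClient(api_key=config.fred.api_key, base_url=config.fred.base_url)
catalog = SeriesCatalog.load(config.catalog_path)
pipeline = DailyRefreshPipeline(config=config, client=client, catalog=catalog)

# Incremental refresh over an explicit window; run() returns one
# SeriesIngestionResult per catalog entry.
results = pipeline.run(
    full_refresh=False,
    explicit_start=date.today() - timedelta(days=config.refresh_window_days),
    explicit_end=date.today(),
)
for result in results:
    print(result.series.series_id, result.observations, result.storage_path)
```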
File-Level Changes
good 2 go
Hey there - I've reviewed your changes - here's some feedback:
- Consider leveraging argparse's 'type' parameter to parse --start-date and --end-date into date objects directly, reducing the need for manual date validation in _parse_date.
- The filesystem storage backend has repeated observation serialization logic across CSV, JSON, and Parquet; consider extracting a common serializer to reduce duplication and centralize format-specific concerns.
- Enhance FREDClient._request to explicitly handle HTTP error responses (e.g., catching HTTPError for status codes >=400) to distinguish API errors from network failures and support more granular retry logic.
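On the first point above: argparse accepts any callable as `type`, so `date.fromisoformat` can replace a manual `_parse_date` helper — a minimal sketch:

```python
import argparse
from datetime import date

parser = argparse.ArgumentParser()
# argparse calls date.fromisoformat on the raw string and reports a
# clean usage error if the value is not a valid ISO date.
parser.add_argument("--start-date", type=date.fromisoformat)
parser.add_argument("--end-date", type=date.fromisoformat)

args = parser.parse_args(["--start-date", "2024-01-01", "--end-date", "2024-01-31"])
print(args.start_date, args.end_date)  # both are datetime.date objects
```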
## Individual Comments
### Comment 1
<location> `src/lhm/clients/fred_client.py:153` </location>
<code_context>
+ "limit": str(limit),
+ },
+ )
+ series_payload = payload.get("seriess") or []
+ return [self._coerce_series(entry) for entry in series_payload]
+
</code_context>
<issue_to_address>
**issue (typo):** Possible typo: 'seriess' key may be incorrect.
If 'seriess' is intentional, please clarify or document the reason; otherwise, this may cause missing data.
```suggestion
# Changed 'seriess' to 'series' to fix possible typo and ensure correct data extraction
series_payload = payload.get("series") or []
```
</issue_to_address>
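(Worth verifying before applying this: the FRED series search endpoint's documented JSON response does nest results under a `seriess` key, so the original lookup may be intentional rather than a typo.)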
### Comment 2
<location> `src/lhm/clients/fred_client.py:200` </location>
<code_context>
+ query = parse.urlencode(all_params)
+ url = f"{self.base_url}/fred/{endpoint.lstrip('/')}?{query}"
+
+ http_request = request.Request(url, method="GET", headers={"User-Agent": self._user_agent})
+ try:
+ with request.urlopen(http_request, timeout=self._timeout) as response: # noqa: S310 - trusted domain
</code_context>
<issue_to_address>
**suggestion:** No retry logic for transient network failures.
Adding retries with exponential backoff would help prevent ingestion failures caused by temporary network issues.
Suggested implementation:
```python
import time
from urllib.error import URLError
def _request(self, endpoint: str, params: Mapping[str, str]) -> Mapping[str, object]:
self._rate_limiter.wait()
all_params = {
"file_type": "json",
"api_key": self.api_key,
}
all_params.update({key: value for key, value in params.items() if value is not None})
query = parse.urlencode(all_params)
url = f"{self.base_url}/fred/{endpoint.lstrip('/')}?{query}"
http_request = request.Request(url, method="GET", headers={"User-Agent": self._user_agent})
raw_payload = self._request_with_retries(http_request, endpoint)
try:
payload = json.loads(raw_payload.decode("utf-8"))
except json.JSONDecodeError as exc: # pragma: no cover - defensive coding
raise FREDAPIError("FRED returned malformed JSON") from exc
if isinstance(payload, Mapping) and payload.get("error_code"):
```
```python
http_request = request.Request(url, method="GET", headers={"User-Agent": self._user_agent})
# replaced with retry logic in _request_with_retries
```
```python
try:
payload = json.loads(raw_payload.decode("utf-8"))
except json.JSONDecodeError as exc: # pragma: no cover - defensive coding
raise FREDAPIError("FRED returned malformed JSON") from exc
if isinstance(payload, Mapping) and payload.get("error_code"):
def _request_with_retries(self, http_request, endpoint, max_retries=3, base_delay=0.5):
last_exc = None
for attempt in range(max_retries):
try:
with request.urlopen(http_request, timeout=self._timeout) as response: # noqa: S310 - trusted domain
return response.read()
except (OSError, URLError) as exc: # pragma: no cover - network failure is environment specific
last_exc = exc
if attempt < max_retries - 1:
time.sleep(base_delay * (2 ** attempt))
else:
raise FREDAPIError(f"Failed to call FRED endpoint '{endpoint}' after {max_retries} attempts: {exc}") from exc
```
</issue_to_address>
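The three fragments above are hard to read in isolation; a consolidated, self-contained version of the same backoff idea might look like this (`FREDAPIError` is stubbed in, since only its name appears in the diff):

```python
import time
from urllib import request


class FREDAPIError(RuntimeError):
    """Stand-in for the client's error type (name taken from the diff)."""


def fetch_with_retries(http_request, endpoint, timeout=30.0, max_retries=3, base_delay=0.5):
    """Retry transient network failures with exponential backoff."""
    for attempt in range(max_retries):
        try:
            with request.urlopen(http_request, timeout=timeout) as response:
                return response.read()
        except OSError as exc:  # urllib.error.URLError subclasses OSError
            if attempt == max_retries - 1:
                raise FREDAPIError(
                    f"Failed to call FRED endpoint '{endpoint}' "
                    f"after {max_retries} attempts: {exc}"
                ) from exc
            time.sleep(base_delay * (2 ** attempt))  # 0.5s, 1s, 2s, ...
```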
### Comment 3
<location> `src/lhm/cli.py:104-106` </location>
<code_context>
+ config.fred.api_key = args.api_key or os.environ.get("FRED_API_KEY") or config.fred.api_key
+
+ catalog_path = config.catalog_path
+ if not catalog_path.exists():
+ raise SystemExit(f"Catalog file not found: {catalog_path}")
+
</code_context>
<issue_to_address>
**suggestion:** No fallback or guidance if catalog file is missing.
Instead of exiting, suggest informing the user how to create the catalog file or providing a default template if possible.
```suggestion
catalog_path = config.catalog_path
if not catalog_path.exists():
print(f"Catalog file not found: {catalog_path}")
print("You can create a catalog file by running:")
print(f" touch {catalog_path}")
print("Or, a default template will be created for you now.")
default_catalog = {
"datasets": [],
"description": "Default catalog template. Add your datasets here."
}
try:
with open(catalog_path, "w") as f:
json.dump(default_catalog, f, indent=2)
print(f"Default catalog file created at: {catalog_path}")
except Exception as e:
print(f"Failed to create default catalog file: {e}")
raise SystemExit(1)
```
</issue_to_address>
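(One caveat if this fallback is adopted: the class diagrams above show `SeriesCatalog.from_yaml`, so a YAML template would likely fit the loader better than the JSON sketch here.)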
### Comment 4
<location> `src/lhm/clients/fred_client.py:195-196` </location>
<code_context>
def _request(self, endpoint: str, params: Mapping[str, str]) -> Mapping[str, object]:
self._rate_limiter.wait()
all_params = {
"file_type": "json",
"api_key": self.api_key,
}
all_params.update({key: value for key, value in params.items() if value is not None})
query = parse.urlencode(all_params)
url = f"{self.base_url}/fred/{endpoint.lstrip('/')}?{query}"
http_request = request.Request(url, method="GET", headers={"User-Agent": self._user_agent})
try:
with request.urlopen(http_request, timeout=self._timeout) as response: # noqa: S310 - trusted domain
raw_payload = response.read()
except OSError as exc: # pragma: no cover - network failure is environment specific
raise FREDAPIError(f"Failed to call FRED endpoint '{endpoint}': {exc}") from exc
try:
payload = json.loads(raw_payload.decode("utf-8"))
except json.JSONDecodeError as exc: # pragma: no cover - defensive coding
raise FREDAPIError("FRED returned malformed JSON") from exc
if isinstance(payload, Mapping) and payload.get("error_code"):
message = payload.get("error_message", "Unknown error")
raise FREDAPIError(f"FRED API error {payload.get('error_code')}: {message}")
return payload
</code_context>
<issue_to_address>
**suggestion (code-quality):** Merge dictionary updates via the union operator [×2] ([`dict-assign-update-to-union`](https://docs.sourcery.ai/Reference/Default-Rules/suggestions/dict-assign-update-to-union/))
```suggestion
} | {key: value for key, value in params.items() if value is not None}
```
</issue_to_address>
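For context, the dict union operator (Python 3.9+) builds a new mapping in one expression, with right-hand keys winning on conflict — equivalent to the copy-then-update pattern being replaced:

```python
base = {"file_type": "json", "api_key": "demo"}
params = {"limit": "10", "offset": None}

# None-valued params are dropped by the comprehension before the merge;
# the right-hand operand overrides duplicate keys.
all_params = base | {key: value for key, value in params.items() if value is not None}
print(all_params)  # {'file_type': 'json', 'api_key': 'demo', 'limit': '10'}
```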
### Comment 5
<location> `src/lhm/clients/fred_client.py:225-227` </location>
<code_context>
def _coerce_series(self, payload: Mapping[str, object]) -> FREDSeries:
series_id = str(payload.get("id") or payload.get("series_id") or "").strip()
if not series_id:
raise FREDAPIError("FRED response missing series identifier")
return FREDSeries(
series_id=series_id,
title=str(payload.get("title", "")),
frequency=str(payload.get("frequency")) if payload.get("frequency") else None,
observation_start=_parse_optional_date(payload.get("observation_start")),
observation_end=_parse_optional_date(payload.get("observation_end")),
units=str(payload.get("units")) if payload.get("units") else None,
seasonal_adjustment=str(payload.get("seasonal_adjustment"))
if payload.get("seasonal_adjustment")
else None,
)
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
- Lift code into else after jump in control flow ([`reintroduce-else`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/reintroduce-else/))
- Swap if/else branches ([`swap-if-else-branches`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/swap-if-else-branches/))
</issue_to_address>
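For reference, the named-expression rewrite the first bullet suggests would collapse the assignment and the guard into one line (walrus operator, Python 3.8+):

```python
# Assumes payload and FREDAPIError from the surrounding method.
if not (series_id := str(payload.get("id") or payload.get("series_id") or "").strip()):
    raise FREDAPIError("FRED response missing series identifier")
```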
### Comment 6
<location> `src/lhm/storage/filesystem.py:60` </location>
<code_context>
def _write_csv(self, destination: Path, observations: Sequence[Observation]) -> Path:
import csv
path = destination / f"observations.csv"
with path.open("w", encoding="utf-8", newline="") as handle:
writer = csv.writer(handle)
writer.writerow(["realtime_start", "realtime_end", "date", "value", "raw_value"])
for entry in observations:
writer.writerow(
[
entry.realtime_start.isoformat(),
entry.realtime_end.isoformat(),
entry.observation_date.isoformat(),
"" if entry.value is None else f"{entry.value:.16g}",
entry.raw_value,
]
)
return path
</code_context>
<issue_to_address>
**suggestion (code-quality):** Replace f-string with no interpolated values with string ([`remove-redundant-fstring`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-redundant-fstring/))
```suggestion
path = destination / "observations.csv"
```
</issue_to_address>
💡 Codex Review
Here are some automated review suggestions for this pull request.
```python
payload = {
    "series": asdict(series),
    "fred_metadata": asdict(metadata),
    "ingested_at_utc": datetime.utcnow().isoformat(timespec="seconds"),
    "refresh_window": {
        "start": refresh_window[0].isoformat() if refresh_window[0] else None,
        "end": refresh_window[1].isoformat() if refresh_window[1] else None,
    },
    "storage_format": self._format,
}

with (destination / "metadata.json").open("w", encoding="utf-8") as handle:
    json.dump(payload, handle, indent=2)
```
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Serialising FRED metadata raises TypeError
When saving series metadata, asdict(metadata) includes datetime.date objects for observation_start/observation_end, but the payload is passed directly to json.dump. Dates are not JSON serialisable, so any series with those fields populated will cause json.dump to raise TypeError, aborting the pipeline after writing observations but before writing metadata.json. Consider converting the date values to strings (e.g. .isoformat()) before dumping.
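One possible fix (a sketch; `_json_default` is a hypothetical helper, not code from the PR) is to pass a `default` hook to `json.dump` that stringifies dates:

```python
import json
from datetime import date, datetime


def _json_default(value):
    """Hypothetical helper: render date/datetime values as ISO strings."""
    if isinstance(value, (date, datetime)):
        return value.isoformat()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


# json.dump(payload, handle, indent=2, default=_json_default)
```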
👍
typo fix

Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
What is needed from me?
Summary
Testing
Codex Task
Summary by Sourcery
Implement a production-ready, end-to-end FRED ingestion pipeline by adding a robust HTTP client, tag-driven catalog generation, configurable orchestration, pluggable storage backend, CLI tooling, and comprehensive documentation.
New Features:
Enhancements:
- CLI commands for catalog generation (lhm.catalog.generate) and pipeline execution (lhm.cli)
Documentation: