Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions swvo/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@
solar_wind as solar_wind,
sme as sme,
)
from swvo.io.base import BaseIO as BaseIO
118 changes: 118 additions & 0 deletions swvo/io/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# SPDX-FileCopyrightText: 2025 GFZ Helmholtz Centre for Geosciences
#
# SPDX-License-Identifier: Apache-2.0

"""
Base class for all IO modules.
"""

import logging
import os
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Optional

import pandas as pd

logger = logging.getLogger(__name__)


class BaseIO(ABC):
"""Abstract base class for all IO classes.

This base class defines the common interface for external data I/O operations,
including initialization, reading, and downloading/processing data.

Subclasses can implement flexible signatures for `read()` and `download_and_process()`
methods to accommodate different data sources and requirements.

Parameters
----------
data_dir : Path | None
Data directory for storing downloaded/processed data.
If not provided, it will be read from the environment variable
defined by the subclass's `ENV_VAR_NAME`.

Raises
------
ValueError
Raises `ValueError` if necessary environment variable is not set
and `data_dir` is not provided.
"""

ENV_VAR_NAME: str = "" # Must be set by subclasses
LABEL: str = "" # Must be set by subclasses

def __init__(self, data_dir: Optional[Path] = None, prefer_env_var: bool = False) -> None:
"""Initialize the BaseIO class.

Parameters
----------
data_dir : Path | None
Data directory for storing data. If not provided, it will be read
from the environment variable defined by ENV_VAR_NAME.
prefer_env_var : bool, optional
If True, the environment variable takes precedence over the passed data_dir argument.
If False (default), the passed data_dir is used if provided, otherwise the environment variable is used.
Raises
------
ValueError
If data_dir is None and ENV_VAR_NAME is not set in environment,
or if prefer_env_var is True and ENV_VAR_NAME is not set.
Comment thread
sahiljhawar marked this conversation as resolved.
Outdated
"""
if prefer_env_var and self.ENV_VAR_NAME in os.environ:
data_dir = Path(os.environ[self.ENV_VAR_NAME])
elif data_dir is None:
if not self.ENV_VAR_NAME or self.ENV_VAR_NAME not in os.environ:
raise ValueError(f"Necessary environment variable {self.ENV_VAR_NAME} not set!")
data_dir = Path(os.environ[self.ENV_VAR_NAME])

self.data_dir: Path = Path(data_dir)
self.data_dir.mkdir(parents=True, exist_ok=True)

logger.info(f"{self.__class__.__name__} data directory: {self.data_dir}")

@abstractmethod
def read(self, *args, **kwargs) -> pd.DataFrame | list[pd.DataFrame]:
"""Read data.

Subclasses should implement this method with their specific signature.
Common parameters include:
- start_time: datetime
Start time of the data to read. Must be timezone-aware.
- end_time: datetime
End time of the data to read. Must be timezone-aware.
- download: bool, optional
Download data on the go if not available locally.
- Additional parameters specific to each data source.

Returns
-------
pd.DataFrame or list[pd.DataFrame]
Data for the specified parameters.
"""
pass

@abstractmethod
def download_and_process(self, *args, **kwargs) -> None:
"""Download and process data.

Subclasses should implement this method with their specific signature.
Common parameters include:
- start_time: datetime
Start time of the data to download. Must be timezone-aware.
- end_time: datetime
End time of the data to download. Must be timezone-aware.
- target_date: datetime
Target date for data (for single-day sources).
- request_time: datetime
Request time for data (for streaming sources).
- reprocess_files: bool, optional
If True, re-download and re-process existing files.
- Additional parameters specific to each data source.

Returns
-------
None
"""
pass
12 changes: 0 additions & 12 deletions swvo/io/dst/omni.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import warnings
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional

import numpy as np
import pandas as pd
Expand All @@ -31,17 +30,6 @@ class DSTOMNI(OMNILowRes):
Inherits the `download_and_process`, other private methods and attributes from OMNILowRes.
"""

def __init__(self, data_dir: Optional[Path] = None) -> None:
"""
Initialize a DSTOMNI object.

Parameters
----------
data_dir : Path | None
Data directory for the Dst OMNI data. If not provided, it will be read from the environment variable
"""
super().__init__(data_dir=data_dir)

# data is downloaded along with OMNI data, check file name in parent class
def read(self, start_time: datetime, end_time: datetime, download: bool = False) -> pd.DataFrame:
"""
Expand Down
18 changes: 3 additions & 15 deletions swvo/io/dst/wdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,26 @@
"""

import logging
import os
import re
import warnings
from datetime import datetime, timedelta, timezone
from pathlib import Path
from shutil import rmtree
from typing import List, Optional, Tuple
from typing import List, Tuple

import numpy as np
import pandas as pd
import requests

from swvo.io.base import BaseIO
from swvo.io.utils import enforce_utc_timezone

logger = logging.getLogger(__name__)

logging.captureWarnings(True)


class DSTWDC:
class DSTWDC(BaseIO):
"""This is a class for the WDC Dst data.

Parameters
Expand All @@ -50,18 +50,6 @@ class DSTWDC:
URL = "https://wdc.kugi.kyoto-u.ac.jp/dst_realtime/YYYYMM/"
LABEL = "wdc"

def __init__(self, data_dir: Optional[Path] = None) -> None:
if data_dir is None:
if self.ENV_VAR_NAME not in os.environ:
raise ValueError(f"Necessary environment variable {self.ENV_VAR_NAME} not set!")

data_dir = os.environ.get(self.ENV_VAR_NAME) # ty: ignore[invalid-assignment]

self.data_dir: Path = Path(data_dir) # ty:ignore[invalid-argument-type]
self.data_dir.mkdir(parents=True, exist_ok=True)

logger.info(f"WDC Dst data directory: {self.data_dir}")

def download_and_process(self, start_time: datetime, end_time: datetime, reprocess_files: bool = False) -> None:
"""Download and process WDC Dst data files.

Expand Down
13 changes: 0 additions & 13 deletions swvo/io/f10_7/omni.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@

import logging
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional

import numpy as np
import pandas as pd
Expand All @@ -31,17 +29,6 @@ class F107OMNI(OMNILowRes):
Inherits the :func:`download_and_process`, other private methods and attributes from :class:`OMNILowRes`.
"""

def __init__(self, data_dir: Optional[Path] = None) -> None:
"""
Initialize a F107OMNI object.

Parameters
----------
data_dir : Path | None
Data directory for the OMNI Kp data. If not provided, it will be read from the environment variable
"""
super().__init__(data_dir=data_dir)

# data is downloaded along with OMNI data, check file name in parent class
def read(self, start_time: datetime, end_time: datetime, download: bool = False) -> pd.DataFrame:
"""
Expand Down
22 changes: 2 additions & 20 deletions swvo/io/f10_7/swpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,26 @@
from __future__ import annotations

import logging
import os
import shutil
import warnings
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional

import numpy as np
import pandas as pd
import requests

from swvo.io.base import BaseIO
from swvo.io.utils import enforce_utc_timezone

logger = logging.getLogger(__name__)

logging.captureWarnings(True)


class F107SWPC:
class F107SWPC(BaseIO):
"""This is a class for the SWPC F107 data.

Parameters
----------
data_dir : Path | None
Data directory for the OMNI Low Resolution data. If not provided, it will be read from the environment variable

Methods
-------
download_and_process
Expand All @@ -52,18 +46,6 @@ class F107SWPC:

LABEL = "swpc"

def __init__(self, data_dir: Optional[Path] = None) -> None:
if data_dir is None:
if self.ENV_VAR_NAME not in os.environ:
msg = f"Necessary environment variable {self.ENV_VAR_NAME} not set!"
raise ValueError(msg)
data_dir = os.environ.get(self.ENV_VAR_NAME) # ty: ignore[invalid-assignment]

self.data_dir: Path = Path(data_dir) # ty:ignore[invalid-argument-type]
self.data_dir.mkdir(parents=True, exist_ok=True)

logger.info(f"SWPC F10.7 data directory: {self.data_dir}")

def _is_within_download_range(self, target_date: datetime) -> bool:
"""Check if a date is within the last 30 days.

Expand Down
45 changes: 29 additions & 16 deletions swvo/io/hp/ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ class HpEnsemble:
Hp index Possible options are: hp30, hp60.
data_dir : Path | None
Data directory for the Hp data. If not provided, it will be read from the environment variable
prefer_env_var : bool, optional
If True, the environment variable takes precedence over the passed data_dir argument.
If False (default), the passed data_dir is used if provided, otherwise the environment variable is used.

Methods
-------
Expand All @@ -43,28 +46,38 @@ class HpEnsemble:
Returns `FileNotFoundError` if the data directory does not exist.
"""

ENV_VAR_NAME = "PLACEHOLDER; SEE DERIVED CLASSES BELOW"
LABEL = "ensemble"
ENV_VAR_NAME: str = "" # Must be set by subclasses
LABEL: str = "ensemble"

def __init__(self, index: str, data_dir: Optional[Path] = None) -> None:
def __init__(self, index: str, data_dir: Optional[Path] = None, prefer_env_var: bool = False) -> None:
"""Initialize HpEnsemble.

Parameters
----------
index : str
Hp index. Possible options are: hp30, hp60.
data_dir : Path | None
Data directory for the Hp data. If not provided, it will be read from the environment variable
prefer_env_var : bool, optional
If True, the environment variable takes precedence over the passed data_dir argument, by default False
"""
self.index = index
if self.index not in ("hp30", "hp60"):
msg = "Encountered invalid index: {self.index}. Possible options are: hp30, hp60!"
Comment thread
sahiljhawar marked this conversation as resolved.
Outdated
raise ValueError(msg)
if prefer_env_var and self.ENV_VAR_NAME in os.environ:
data_dir = Path(os.environ[self.ENV_VAR_NAME])
elif data_dir is None:
if not self.ENV_VAR_NAME or self.ENV_VAR_NAME not in os.environ:
raise ValueError(f"Necessary environment variable {self.ENV_VAR_NAME} not set!")
data_dir = Path(os.environ[self.ENV_VAR_NAME])

if data_dir is None:
if self.ENV_VAR_NAME not in os.environ:
msg = f"Necessary environment variable {self.ENV_VAR_NAME} not set!"
raise ValueError(msg)

data_dir = os.environ.get(self.ENV_VAR_NAME) # ty: ignore[invalid-assignment]

self.data_dir: Path = Path(data_dir) # ty:ignore[invalid-argument-type]
self.data_dir = Path(data_dir)

logger.info(f"{self.index.upper()} Ensemble data directory: {self.data_dir}")

if not self.data_dir.exists():
msg = f"Data directory {self.data_dir} does not exist! Impossible to retrive data!"
Comment thread
sahiljhawar marked this conversation as resolved.
Outdated
logger.error(msg)
raise FileNotFoundError(msg)

self.index_number: int = int(index[2:])
Expand Down Expand Up @@ -315,8 +328,8 @@ class Hp30Ensemble(HpEnsemble):

ENV_VAR_NAME = "HP30_ENSEMBLE_FORECAST_DIR"

def __init__(self, data_dir: Optional[Path] = None) -> None:
super().__init__("hp30", data_dir)
def __init__(self, data_dir: Optional[Path] = None, prefer_env_var: bool = False) -> None:
super().__init__("hp30", data_dir, prefer_env_var)

def read_with_horizon(self, start_time: datetime, end_time: datetime, horizon: float) -> list[pd.DataFrame]:
"""Read Ensemble Hp30 forecast data for a given time range and forecast horizon.
Expand Down Expand Up @@ -356,8 +369,8 @@ class Hp60Ensemble(HpEnsemble):

ENV_VAR_NAME = "HP60_ENSEMBLE_FORECAST_DIR"

def __init__(self, data_dir: Optional[Path] = None) -> None:
super().__init__("hp60", data_dir)
def __init__(self, data_dir: Optional[Path] = None, prefer_env_var: bool = False) -> None:
super().__init__("hp60", data_dir, prefer_env_var)

def read_with_horizon(self, start_time: datetime, end_time: datetime, horizon: int) -> list[pd.DataFrame]:
"""Read Ensemble Hp60 forecast data for a given time range and forecast horizon.
Expand Down
Loading
Loading