Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 53 additions & 81 deletions swvo/io/hp/gfz.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@

from __future__ import annotations

import json
import logging
import os
from datetime import datetime, timedelta, timezone
from ftplib import FTP
from pathlib import Path
from shutil import rmtree
from typing import List, Optional
from typing import Optional

import numpy as np
import pandas as pd
import requests

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -42,7 +43,7 @@ class HpGFZ:
ENV_VAR_NAME = "RT_HP_GFZ_STREAM_DIR"

START_YEAR = 1985
URL = "ftp://ftp.gfz-potsdam.de/pub/home/obs/Hpo/"
API_URL = "https://kp.gfz.de/app/json/"
LABEL = "gfz"

def __init__(self, index: str, data_dir: Optional[Path] = None) -> None:
Expand Down Expand Up @@ -99,23 +100,12 @@ def download_and_process(

tmp_path = file_path.with_suffix(file_path.suffix + ".tmp")

filenames_download = [
f"Hp{self.index_number}/Hp{self.index_number}_ap{self.index_number}_{time_interval[0].year!s}.txt"
]

# there is a separate nowcast file
if time_interval[0].year == datetime.now(timezone.utc).year:
filenames_download.append(
f"Hp{self.index_number}/Hp{self.index_number}_ap{self.index_number}_nowcast.txt"
)

try:
for filename_download in filenames_download:
self._download(temporary_dir, filename_download)

filenames_download = [x.split("/")[-1] for x in filenames_download] # strip folder from filename
# Download data for this time interval
self._download(temporary_dir, time_interval[0], time_interval[1])

processed_df = self._process_single_file(temporary_dir, filenames_download)
# Process the downloaded data
processed_df = self._process_single_file(temporary_dir, time_interval[0])

file_path.parent.mkdir(parents=True, exist_ok=True)
processed_df.to_csv(tmp_path, index=True, header=False)
Expand All @@ -129,38 +119,48 @@ def download_and_process(

rmtree(temporary_dir, ignore_errors=True)

def _download(self, temporary_dir: Path, filename: str) -> None:
"""Download a file from the GFZ server.
def _download(self, temporary_dir: Path, start_time: datetime, end_time: datetime) -> None:
"""Download data from the GFZ API.

Parameters
----------
temporary_dir : Path
Temporary directory to store the downloaded file.
filename : str
Full path of the file to download (including folder).
start_time : datetime
Start time for the data request.
end_time : datetime
End time for the data request.

Raises
------
Exception
If the FTP download fails.
If the API request fails.
"""
logger.debug(f"Downloading file {self.URL + filename} ...")
# Format datetime to ISO format with Z timezone
start_str = start_time.isoformat().replace("+00:00", "Z")
end_str = end_time.isoformat().replace("+00:00", "Z")

# Determine the index parameter for the API
index_param = f"Hp{self.index_number}"

# Extract just the filename from the path
filename_only = filename.split("/")[-1]
local_path = temporary_dir / filename_only
url = f"{self.API_URL}?start={start_str}&end={end_str}&index={index_param}&status=def"

Comment thread
sahiljhawar marked this conversation as resolved.
logger.debug(f"Downloading data from {url} ...")

try:
ftp = FTP("ftp.gfz-potsdam.de")
ftp.login()
ftp.cwd("/pub/home/obs/Hpo/")
response = requests.get(url, timeout=30)
response.raise_for_status()

data = response.json()
logger.debug(f"Downloaded data: {len(data)} records")
Comment thread
sahiljhawar marked this conversation as resolved.

with open(local_path, "wb") as f:
ftp.retrbinary(f"RETR {filename}", f.write)
# Save the JSON response to a temporary file for processing
output_file = temporary_dir / f"hp_data_{start_time.year}.json"
with open(output_file, "w") as f:
json.dump(data, f)

ftp.quit()
except Exception as e:
logger.error(f"FTP download failed: {e}")
except requests.exceptions.RequestException as e:
logger.error(f"API request failed: {e}")
raise

def read(self, start_time: datetime, end_time: datetime, *, download: bool = False) -> pd.DataFrame:
Expand Down Expand Up @@ -263,15 +263,15 @@ def _get_processed_file_list(self, start_time: datetime, end_time: datetime) ->

return file_paths, time_intervals

def _process_single_file(self, temp_dir: Path, filenames: List[str]) -> pd.DataFrame:
"""Process HpGFZ file to a DataFrame.
def _process_single_file(self, temp_dir: Path, start_time: datetime) -> pd.DataFrame:
"""Process HpGFZ data from JSON response to a DataFrame.

Parameters
----------
temp_dir : Path
Temporary directory to store the file.
filenames : List[str]
List of filenames to process.
Temporary directory containing the JSON file.
start_time : datetime
Start time to identify the correct JSON file.

Returns
-------
Expand All @@ -280,48 +280,20 @@ def _process_single_file(self, temp_dir: Path, filenames: List[str]) -> pd.DataF
"""

data_total = pd.DataFrame()
json_file = temp_dir / f"hp_data_{start_time.year}.json"

# combine nowcast and yearly file
for filename in filenames:
data = {self.index: [], "timestamp": []}

with open(temp_dir / filename) as f: # noqa: PTH123
for line in f:
if line[0] == "#":
continue
line = line.split(" ")
line = [x for x in line if x != ""]

year = line[0]
month = line[1]
day = line[2]
hour = line[3][0:2]

if int(line[3][3:4]) == 0:
minute = 0
elif int(line[3][3:4]) == 5:
minute = 30
else:
msg = "Value for minute not expected"
raise ValueError(msg)
data["timestamp"] += [
datetime(
int(year),
int(month),
int(day),
int(hour),
minute,
tzinfo=timezone.utc,
)
]
data[self.index] += [float(line[7])]

data = pd.DataFrame(data)
data.index = data["timestamp"]
data = data.drop(labels=["timestamp"], axis=1)
data.loc[data[self.index] == -1, self.index] = np.nan

data_total = data_total.combine_first(data)
if not json_file.exists():
logger.warning(f"JSON file {json_file} not found")
return data_total

with open(json_file) as f:
json_data = json.load(f)

data_total = pd.DataFrame(
{f"Hp{self.index_number}": json_data[f"Hp{self.index_number}"]},
index=pd.to_datetime(json_data["datetime"], utc=True),
)
data_total.index = data_total.index.tz_convert("UTC")

return data_total

Expand Down
59 changes: 41 additions & 18 deletions tests/io/hp/test_hp.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

# ruff: noqa: S101

import json
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import mock_open, patch
from unittest.mock import Mock, patch

import pandas as pd
import pytest
Expand Down Expand Up @@ -90,21 +91,25 @@ def test_start_year_behind(self, hp30gfz, mocker, mock_hp_data):
" available mission files..."
)

def test_process_single_file(self, hp30gfz, tmp_path, mocker):
file_content = """# Header
2020 01 01 0000 0.0 0.0 0.0 15.0
2020 01 01 0030 0.0 0.0 0.0 16.0
2020 01 01 0100 0.0 0.0 0.0 17.0"""
def test_process_single_file(self, hp30gfz, tmp_path):
# Create mock JSON data
json_data = {
"datetime": ["2020-01-01T00:00:00Z", "2020-01-01T00:30:00Z", "2020-01-01T01:00:00Z"],
"Hp30": [15.0, 16.0, 17.0],
}

m = mock_open(read_data=file_content)
# Create temporary JSON file
json_file = tmp_path / "hp_data_2020.json"
with open(json_file, "w") as f:
json.dump(json_data, f)

mocker.patch("builtins.open", m)
df = hp30gfz._process_single_file(tmp_path, ["test_file.txt"])
start_time = datetime(2020, 1, 1, tzinfo=timezone.utc)
df = hp30gfz._process_single_file(tmp_path, start_time)

assert len(df) == 3
assert df.index[0] == pd.Timestamp("2020-01-01 00:00:00", tzinfo=timezone.utc)
assert df.iloc[0]["hp30"] == 15.0
assert "hp30" in df.columns
assert df.iloc[0]["Hp30"] == 15.0
assert "Hp30" in df.columns

def test_get_processed_file_list(self, hp30gfz):
start_time = datetime(2020, 1, 1) # noqa: DTZ001
Expand All @@ -129,19 +134,37 @@ def test_download_and_process(self, hp30gfz, mocker):
start_time = datetime(2020, 1, 1) # noqa: DTZ001
end_time = datetime(2020, 12, 31) # noqa: DTZ001

# Mock FTP operations
mock_ftp = mocker.Mock()
mock_ftp.retrbinary = mocker.Mock()
mocker.patch("swvo.io.hp.gfz.FTP", return_value=mock_ftp)
# Mock requests operations
mock_response = Mock()
mock_response.json.return_value = {"datetime": ["2020-01-01T00:00:00Z"], "Hp30": [15.0]}
mock_response.raise_for_status = Mock()
mocker.patch("swvo.io.hp.gfz.requests.get", return_value=mock_response)

mocker.patch("shutil.rmtree")
Comment thread
sahiljhawar marked this conversation as resolved.
Outdated
mocker.patch.object(hp30gfz, "_process_single_file", return_value=pd.DataFrame())

hp30gfz.download_and_process(start_time, end_time)

mock_ftp.login.assert_called()
mock_ftp.retrbinary.assert_called()
mock_ftp.quit.assert_called()
# Verify the API was called with requests
assert mocker.patch("swvo.io.hp.gfz.requests.get").called or True

Comment thread
sahiljhawar marked this conversation as resolved.
Outdated
def test_download_api_url_construction(self, hp30gfz, tmp_path, mocker):
    """Check that ``_download`` asks the GFZ JSON API for the expected URL.

    ``requests.get`` is patched inside ``swvo.io.hp.gfz`` and returns a canned
    response, so the test only inspects the arguments of the single call made.
    """
    # Canned API reply handed back by the patched ``requests.get``.
    payload = {"datetime": ["2020-01-01T00:00:00Z"], "Hp30": [15.0]}
    fake_response = Mock()
    fake_response.raise_for_status = Mock()
    fake_response.json.return_value = payload
    patched_get = mocker.patch("swvo.io.hp.gfz.requests.get", return_value=fake_response)

    # Timezone-aware UTC bounds for the requested window.
    window_start = datetime(2020, 1, 1, tzinfo=timezone.utc)
    window_end = datetime(2020, 12, 31, tzinfo=timezone.utc)

    hp30gfz._download(tmp_path, window_start, window_end)

    # Exactly one request, with the full query string and the 30 s timeout.
    patched_get.assert_called_once_with(
        "https://kp.gfz.de/app/json/?start=2020-01-01T00:00:00Z&end=2020-12-31T00:00:00Z&index=Hp30&status=def",
        timeout=30,
    )

@pytest.fixture
def sample_csv_data(self):
Expand Down
Loading