Skip to content

Commit ce47131

Browse files
sudan45 authored and tnagorra committed
Extraction of ibtracs
- Update transformation of ibtracs
- Update env variable in helm chart
- Update source task and collection
- Update geocoder in basehandler
1 parent fbdd8f2 commit ce47131

File tree

10 files changed

+178
-2
lines changed

10 files changed

+178
-2
lines changed

apps/etl/etl_tasks/noaa_IBTrACS.py

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import logging
2+
3+
from celery import chain, shared_task
4+
from django.conf import settings
5+
6+
from apps.etl.extraction.sources.noaa_IBTrACS.extract import IBTrACSExtraction
7+
from apps.etl.transform.sources.noaa_ibtracs import IbtracsTransformHandler
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
@shared_task
def extract_and_transform_ibtracs_data(url):
    """Queue the IBTrACS pipeline for ``url`` as a Celery chain.

    The chain runs the extraction task first and then the transformation
    task; it is dispatched asynchronously and nothing is returned here.
    """
    extraction_step = IBTrACSExtraction.task.s(url)
    transformation_step = IbtracsTransformHandler.task.s()
    pipeline = chain(extraction_step, transformation_step)
    pipeline.apply_async()
18+
19+
20+
@shared_task
def ext_and_transform_ibtracs_historical_data():
    """Run the IBTrACS pipeline against the ``ALL`` CSV listing.

    The listing is resolved relative to ``settings.IBTRACS_DATA_URL``.
    """
    extract_and_transform_ibtracs_data(
        f"{settings.IBTRACS_DATA_URL}/ibtracs.ALL.list.v04r01.csv",
    )
24+
25+
26+
@shared_task
def ext_and_transform_ibtracs_latest_data():
    """Run the IBTrACS pipeline against the ``ACTIVE`` CSV listing.

    The listing is resolved relative to ``settings.IBTRACS_DATA_URL``.
    """
    extract_and_transform_ibtracs_data(
        f"{settings.IBTRACS_DATA_URL}/ibtracs.ACTIVE.list.v04r01.csv",
    )

apps/etl/extraction/sources/noaa_IBTrACS/__init__.py

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
import logging
2+
from typing import Any, Callable
3+
4+
import requests
5+
6+
from apps.etl.extraction.sources.base.handler import BaseExtraction
7+
from apps.etl.extraction.sources.base.utils import manage_duplicate_file_content
8+
from apps.etl.models import ExtractionData
9+
from main.celery import app
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
class IBTrACSExtraction(BaseExtraction):
    """
    Handles data extraction of IBTrACS.

    Downloads an IBTrACS CSV listing over HTTP, tracks the outcome on an
    ``ExtractionData`` row and hands the payload to
    ``manage_duplicate_file_content`` for storage.
    """

    # (connect, read) timeout in seconds for the download. ``requests`` has no
    # default timeout, so without this a stalled connection would block the
    # worker indefinitely. The read timeout applies between received chunks,
    # not to the whole download, so large listings are still fine.
    IBTRACS_REQUEST_TIMEOUT = (10, 300)

    @classmethod
    def store_extraction_data(  # type: ignore[reportIncompatibleMethodOverride]
        cls,
        validate_source_func: Callable[[Any], None] | None,
        source: int,
        response: requests.Response,
        instance_id: int | None = None,
    ):
        """
        Save extracted data into database.

        Duplicate handling is delegated to ``manage_duplicate_file_content``.

        Args:
            validate_source_func: Not used by this implementation; accepted so
                the call signature stays compatible with callers.
            source: Source identifier, used to build the stored file name.
            response: HTTP response whose body holds the CSV payload.
            instance_id: Primary key of the ``ExtractionData`` row to update.
                The row is looked up unconditionally, so a missing/``None``
                id raises ``ExtractionData.DoesNotExist``.

        Returns:
            The raw response body as bytes (empty when the server sent no
            content).
        """
        # The payload is a CSV listing, so name the stored file accordingly.
        file_name = f"{source}.csv"
        content = response.content

        # save the additional response data after the data is fetched from api.
        extraction_instance = ExtractionData.objects.get(id=instance_id)
        extraction_instance.resp_data_type = "application/csv"
        extraction_instance.save(update_fields=["resp_data_type"])

        # Validate the non empty response data. Test the body itself:
        # ``bool(response)`` only reflects the HTTP status (``response.ok``),
        # not whether any content was returned.
        if content:
            # manage duplicate file content.
            manage_duplicate_file_content(
                source=extraction_instance.source,
                hash_content=None,
                instance=extraction_instance,
                response_data=content,
                file_name=file_name,
            )
        return content

    @classmethod
    def handle_extraction(cls, url: str, params: dict | None, source: int):  # type: ignore[reportIncompatibleMethodOverride]
        """
        Process data extraction.

        Creates an ``ExtractionData`` instance, downloads ``url`` and stores
        the response, updating the instance status along the way.

        Args:
            url: Location of the IBTrACS CSV listing to download.
            params: Optional query parameters for the request.
            source: Source identifier recorded on the created instance.

        Returns:
            The id of the ``ExtractionData`` instance created for this run.

        Raises:
            requests.exceptions.RequestException: Re-raised after the instance
                has been marked as failed.
        """
        logger.info("Starting data extraction")
        instance = cls._create_extraction_instance(url=url, source=source)
        try:
            cls._update_instance_status(instance, ExtractionData.Status.IN_PROGRESS)
            response = requests.get(
                url=url,
                params=params,
                timeout=cls.IBTRACS_REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            instance.resp_code = response.status_code
            instance.save(update_fields=["resp_code"])

            if response.status_code == 200:
                response_data = cls.store_extraction_data(
                    instance_id=instance.id,
                    source=ExtractionData.Source.IBTRACS,
                    response=response,
                    validate_source_func=None,
                )
                if response_data:
                    cls._update_instance_status(instance, ExtractionData.Status.SUCCESS)
                    logger.info("Data extracted successfully")
                else:
                    cls._update_instance_status(
                        instance,
                        ExtractionData.Status.SUCCESS,
                        ExtractionData.ValidationStatus.NO_DATA,
                        update_validation=True,
                    )
                    logger.warning("NO hazard data found in response")
            # FIXME: Handle else case
            # NOTE(review): a non-200 status that passes raise_for_status()
            # currently leaves the instance IN_PROGRESS.
            return instance.id

        except requests.exceptions.RequestException:
            cls._update_instance_status(instance, ExtractionData.Status.FAILED)
            logger.error(
                "extraction failed",
                exc_info=True,
                extra={
                    "source": instance.source,
                },
            )
            raise

    @staticmethod
    @app.task
    def task(url: str):  # type: ignore[reportIncompatibleMethodOverride]
        """Celery task: extract the IBTrACS listing at ``url`` and return the extraction id."""
        return IBTrACSExtraction().handle_extraction(url=url, params=None, source=ExtractionData.Source.IBTRACS)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from django.core.management.base import BaseCommand
2+
3+
from apps.etl.etl_tasks.noaa_IBTrACS import ext_and_transform_ibtracs_historical_data
4+
5+
6+
class Command(BaseCommand):
    """Management command that starts the historical IBTrACS import."""

    help = "Import data from IBTrACS"

    def handle(self, *args, **options):
        """Call the historical extract-and-transform function in-process."""
        ext_and_transform_ibtracs_historical_data()

apps/etl/transform/sources/handler.py

+2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
"emdat-events": PyStacLoadData.ItemType.EVENT,
3434
"emdat-hazards": PyStacLoadData.ItemType.HAZARD,
3535
"emdat-impacts": PyStacLoadData.ItemType.IMPACT,
36+
"ibtracs-events": PyStacLoadData.ItemType.EVENT,
37+
"ibtracs-hazards": PyStacLoadData.ItemType.HAZARD,
3638
}
3739

3840

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import logging
2+
3+
from django.conf import settings
4+
from pystac_monty.geocoding import GAULGeocoder
5+
from pystac_monty.sources.ibtracs import IBTrACSDataSource, IBTrACSTransformer
6+
7+
from apps.etl.models import ExtractionData
8+
from apps.etl.transform.sources.handler import BaseTransformerHandler
9+
from main.celery import app
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
class IbtracsTransformHandler(BaseTransformerHandler):
    """Transformation handler wiring IBTrACS extractions to the pystac-monty transformer."""

    transformer = IBTrACSTransformer
    transformer_schema = IBTrACSDataSource

    @classmethod
    def get_schema_data(cls, extraction_obj: ExtractionData):
        """Build the transformer's data source from a stored extraction.

        Reads the whole stored response file, decodes it as UTF-8 and wraps
        it, together with the extraction URL, in ``transformer_schema``.
        """
        with extraction_obj.resp_data.open() as stored_file:
            raw_bytes = stored_file.read()

        csv_text = raw_bytes.decode("utf-8")
        return cls.transformer_schema(source_url=extraction_obj.url, data=csv_text)

    @staticmethod
    @app.task
    def task(extraction_id):
        """Celery task: transform the extraction identified by ``extraction_id``.

        Uses a ``GAULGeocoder`` pointed at ``settings.GEOCODER_URL``.
        """
        gaul_geocoder = GAULGeocoder(
            gpkg_path=None,
            service_base_url=settings.GEOCODER_URL,
        )
        handler = IbtracsTransformHandler()
        return handler.handle_transformation(extraction_id, gaul_geocoder)

docker-compose.yml

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ x-server: &base_server_setup
3636
ARC_DOMAIN: ${ARC_DOMAIN?error}
3737
ARC_USERNAME: ${ARC_USERNAME?error}
3838
ARC_PASSWORD: ${ARC_PASSWORD?error}
39+
IBTRACS_DATA_URL: ${IBTRACS_DATA_URL?error}
3940
# ETL Load
4041
EOAPI_DOMAIN: ${EOAPI_DOMAIN?error}
4142
DJANGO_APP_ENVIRONMENT: ${DJANGO_APP_ENVIRONMENT:-development}

helm/linter_values.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ env:
2626
ARC_DOMAIN: https://arc.dummy.com
2727
USGS_DATA_URL: https://usgs.dummy.com
2828
EMDAT_URL: https://emdat.dummy.com
29-
29+
IBTRACS_DATA_URL: https://ibtracs.dummy.com
3030
envAdditional:
3131
ENABLE_MAGIC: "true"
3232
MAGIC_TYPE: fun

helm/values.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ env:
176176
ARC_DOMAIN:
177177
IFRC_DATA_URL:
178178
EMDAT_URL:
179+
IBTRACS_DATA_URL:
179180
# NOTE: Used to pass additional configs to api/worker containers
180181
# NOTE: Not used by azure vault
181182
envAdditional:

main/settings.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
GDACS_START_DATE=(str, "2025-01-01"),
8181
EMDAT_START_YEAR=(str, "2024"),
8282
GFD_START_DATE=(str, "2025-01-01"),
83+
IBTRACS_DATA_URL=str,
8384
# ETL Load configs
8485
EOAPI_DOMAIN=str, # http://montandon-eoapi.ifrc.org
8586
GFD_CREDENTIAL=str,
@@ -141,6 +142,8 @@
141142
ARC_USERNAME = env("ARC_USERNAME")
142143
ARC_PASSWORD = env("ARC_PASSWORD")
143144

145+
IBTRACS_DATA_URL = env("IBTRACS_DATA_URL")
146+
144147
TIME_ZONE = env("DJANGO_TIME_ZONE")
145148

146149
SECRET_KEY = env("DJANGO_SECRET_KEY")
@@ -349,7 +352,7 @@
349352
SENTRY_DSN = env("SENTRY_DSN")
350353
SENTRY_ENABLED = False
351354
SENTRY_MONITOR_CELERY_BEAT_TASKS = env("SENTRY_MONITOR_CELERY_BEAT_TASKS")
352-
if SENTRY_DSN:
355+
if SENTRY_DSN is not None:
353356
SENTRY_ENABLED = True
354357
SENTRY_CONFIG = {
355358
"dsn": SENTRY_DSN,

0 commit comments

Comments
 (0)