Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extraction and Transformation of Ibtracs #210

Merged
merged 1 commit into from
Mar 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions apps/etl/etl_tasks/noaa_IBTrACS.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import logging

from celery import chain, shared_task
from django.conf import settings

from apps.etl.extraction.sources.noaa_IBTrACS.extract import IBTrACSExtraction
from apps.etl.transform.sources.noaa_ibtracs import IbtracsTransformHandler

logger = logging.getLogger(__name__)


@shared_task
def extract_and_transform_ibtracs_data(url):
chain(
IBTrACSExtraction.task.s(url),
IbtracsTransformHandler.task.s(),
).apply_async()


@shared_task
def ext_and_transform_ibtracs_historical_data():
url = f"{settings.IBTRACS_DATA_URL}/ibtracs.ALL.list.v04r01.csv"
extract_and_transform_ibtracs_data(url)


@shared_task
def ext_and_transform_ibtracs_latest_data():
url = f"{settings.IBTRACS_DATA_URL}/ibtracs.ACTIVE.list.v04r01.csv"
extract_and_transform_ibtracs_data(url)
Empty file.
101 changes: 101 additions & 0 deletions apps/etl/extraction/sources/noaa_IBTrACS/extract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import logging
from typing import Any, Callable

import requests

from apps.etl.extraction.sources.base.handler import BaseExtraction
from apps.etl.extraction.sources.base.utils import manage_duplicate_file_content
from apps.etl.models import ExtractionData
from main.celery import app

logger = logging.getLogger(__name__)


class IBTrACSExtraction(BaseExtraction):
"""
Handles data extraction of IBTrACS
"""

@classmethod
def store_extraction_data( # type: ignore[reportIncompatibleMethodOverride]
cls,
validate_source_func: Callable[[Any], None] | None,
source: int,
response: requests.Response,
instance_id: int | None = None,
):
"""
Save extracted data into database. Checks for duplicate content using hashing.
"""
file_name = f"{source}.zip"
resp_data = response

# save the additional response data after the data is fetched from api.
extraction_instance = ExtractionData.objects.get(id=instance_id)
extraction_instance.resp_data_type = "application/csv"
extraction_instance.save(update_fields=["resp_data_type"])

# Validate the non empty response data.
if resp_data:
# manage duplicate file content.
manage_duplicate_file_content(
source=extraction_instance.source,
hash_content=None,
instance=extraction_instance,
response_data=resp_data.content,
file_name=file_name,
)
return resp_data.content

@classmethod
def handle_extraction(cls, url: str, params: dict | None, source: int): # type: ignore[reportIncompatibleMethodOverride]
"""
Process data extraction
Returns:
csv file
"""
logger.info("Starting data extraction")
instance = cls._create_extraction_instance(url=url, source=source)
try:
cls._update_instance_status(instance, ExtractionData.Status.IN_PROGRESS)
response = requests.get(url=url, params=params)
response.raise_for_status()
instance.resp_code = response.status_code
instance.save(update_fields=["resp_code"])

if response.status_code == 200:
response_data = cls.store_extraction_data(
instance_id=instance.id,
source=ExtractionData.Source.IBTRACS,
response=response,
validate_source_func=None,
)
if response_data:
cls._update_instance_status(instance, ExtractionData.Status.SUCCESS)
logger.info("Data extracted successfully")
else:
cls._update_instance_status(
instance,
ExtractionData.Status.SUCCESS,
ExtractionData.ValidationStatus.NO_DATA,
update_validation=True,
)
logger.warning("NO hazard data found in response")
# FIXME: Handle else case
return instance.id

except requests.exceptions.RequestException:
cls._update_instance_status(instance, ExtractionData.Status.FAILED)
logger.error(
"extraction failed",
exc_info=True,
extra={
"source": instance.source,
},
)
raise

@staticmethod
@app.task
def task(url: str): # type: ignore[reportIncompatibleMethodOverride]
return IBTrACSExtraction().handle_extraction(url=url, params=None, source=ExtractionData.Source.IBTRACS)
10 changes: 10 additions & 0 deletions apps/etl/management/commands/extract_ibtracs_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from django.core.management.base import BaseCommand

from apps.etl.etl_tasks.noaa_IBTrACS import ext_and_transform_ibtracs_historical_data


class Command(BaseCommand):
help = "Import data from IBTrACS"

def handle(self, *args, **options):
ext_and_transform_ibtracs_historical_data()
2 changes: 2 additions & 0 deletions apps/etl/transform/sources/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
"emdat-events": PyStacLoadData.ItemType.EVENT,
"emdat-hazards": PyStacLoadData.ItemType.HAZARD,
"emdat-impacts": PyStacLoadData.ItemType.IMPACT,
"ibtracs-events": PyStacLoadData.ItemType.EVENT,
"ibtracs-hazards": PyStacLoadData.ItemType.HAZARD,
}


Expand Down
29 changes: 29 additions & 0 deletions apps/etl/transform/sources/noaa_ibtracs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import logging

from django.conf import settings
from pystac_monty.geocoding import GAULGeocoder
from pystac_monty.sources.ibtracs import IBTrACSDataSource, IBTrACSTransformer

from apps.etl.models import ExtractionData
from apps.etl.transform.sources.handler import BaseTransformerHandler
from main.celery import app

logger = logging.getLogger(__name__)


class IbtracsTransformHandler(BaseTransformerHandler):
transformer = IBTrACSTransformer
transformer_schema = IBTrACSDataSource

@classmethod
def get_schema_data(cls, extraction_obj: ExtractionData):
with extraction_obj.resp_data.open() as file_data:
data = file_data.read()

return cls.transformer_schema(source_url=extraction_obj.url, data=data.decode("utf-8"))

@staticmethod
@app.task
def task(extraction_id):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def task(extraction_id):
def task(extraction_id: int):

geocoder = GAULGeocoder(gpkg_path=None, service_base_url=settings.GEOCODER_URL)
return IbtracsTransformHandler().handle_transformation(extraction_id, geocoder)
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ x-server: &base_server_setup
ARC_DOMAIN: ${ARC_DOMAIN?error}
ARC_USERNAME: ${ARC_USERNAME?error}
ARC_PASSWORD: ${ARC_PASSWORD?error}
IBTRACS_DATA_URL: ${IBTRACS_DATA_URL?error}
# ETL Load
EOAPI_DOMAIN: ${EOAPI_DOMAIN?error}
DJANGO_APP_ENVIRONMENT: ${DJANGO_APP_ENVIRONMENT:-development}
Expand Down
2 changes: 1 addition & 1 deletion helm/linter_values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ env:
ARC_DOMAIN: https://arc.dummy.com
USGS_DATA_URL: https://usgs.dummy.com
EMDAT_URL: https://emdat.dummy.com

IBTRACS_DATA_URL: https://ibtracs.dummy.com
envAdditional:
ENABLE_MAGIC: "true"
MAGIC_TYPE: fun
Expand Down
1 change: 1 addition & 0 deletions helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ env:
ARC_DOMAIN:
IFRC_DATA_URL:
EMDAT_URL:
IBTRACS_DATA_URL:
# NOTE: Used to pass additional configs to api/worker containers
# NOTE: Not used by azure vault
envAdditional:
Expand Down
5 changes: 4 additions & 1 deletion main/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
GDACS_START_DATE=(str, "2025-01-01"),
EMDAT_START_YEAR=(str, "2024"),
GFD_START_DATE=(str, "2025-01-01"),
IBTRACS_DATA_URL=str,
# ETL Load configs
EOAPI_DOMAIN=str, # http://montandon-eoapi.ifrc.org
GFD_CREDENTIAL=str,
Expand Down Expand Up @@ -141,6 +142,8 @@
ARC_USERNAME = env("ARC_USERNAME")
ARC_PASSWORD = env("ARC_PASSWORD")

IBTRACS_DATA_URL = env("IBTRACS_DATA_URL")

TIME_ZONE = env("DJANGO_TIME_ZONE")

SECRET_KEY = env("DJANGO_SECRET_KEY")
Expand Down Expand Up @@ -349,7 +352,7 @@
SENTRY_DSN = env("SENTRY_DSN")
SENTRY_ENABLED = False
SENTRY_MONITOR_CELERY_BEAT_TASKS = env("SENTRY_MONITOR_CELERY_BEAT_TASKS")
if SENTRY_DSN:
if SENTRY_DSN is not None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the different here?

SENTRY_ENABLED = True
SENTRY_CONFIG = {
"dsn": SENTRY_DSN,
Expand Down
Loading