Class-based ETL for Glide, Emdat. #208

Merged Mar 20, 2025 (4 commits)
105 changes: 96 additions & 9 deletions apps/etl/etl_tasks/emdat.py
@@ -1,17 +1,104 @@
+from datetime import datetime
+
 from celery import chain, shared_task
+from django.conf import settings
 
-from apps.etl.extraction.sources.emdat.extract import (
-    extract_emdat_historical_data,
-    extract_emdat_latest_data,
-)
-from apps.etl.transform.sources.emdat import transform_emdat_data
+from apps.etl.extraction.sources.emdat.extract import EMDATExtraction, EMDATQueryVars
+from apps.etl.transform.sources.emdat import EMDATTransformHandler
+
+QUERY = """
+query monty(
+  $limit: Int
+  $offset: Int
+  $include_hist: Boolean
+  $from: Int
+  $to: Int
+) {
+  api_version
+  public_emdat(
+    cursor: { offset: $offset, limit: $limit }
+    filters: { include_hist: $include_hist, from: $from, to: $to }
+  ) {
+    total_available
+    info {
+      timestamp
+      filters
+      cursor
+      version
+    }
+    data {
+      disno
+      classif_key
+      group
+      subgroup
+      type
+      subtype
+      external_ids
+      name
+      iso
+      country
+      subregion
+      region
+      location
+      origin
+      associated_types
+      ofda_response
+      appeal
+      declaration
+      aid_contribution
+      magnitude
+      magnitude_scale
+      latitude
+      longitude
+      river_basin
+      start_year
+      start_month
+      start_day
+      end_year
+      end_month
+      end_day
+      total_deaths
+      no_injured
+      no_affected
+      no_homeless
+      total_affected
+      reconstr_dam
+      reconstr_dam_adj
+      insur_dam
+      insur_dam_adj
+      total_dam
+      total_dam_adj
+      cpi
+      admin_units
+      entry_date
+      last_update
+    }
+  }
+}
+"""
 
 
 @shared_task
-def ext_and_transform_emdat_historical_data(**kwargs):
-    chain(extract_emdat_historical_data.s(), transform_emdat_data.s()).apply_async()
+def ext_and_transform_emdat_latest_data(**kwargs):
+    # FIXME: Why are we getting data from settings.EMDAT_START_YEAR to get the latest data?
+    # Also, the filtering only filters using year, so we might have a lot of duplicate data
+    variables: EMDATQueryVars = {
+        "limit": -1,
+        "from": int(settings.EMDAT_START_YEAR),
+        "to": datetime.now().year,
+        "include_hist": None,
+    }
+
+    chain(EMDATExtraction.task.s(QUERY, variables), EMDATTransformHandler.task.s()).apply_async()
 
 
 @shared_task
-def ext_and_transform_emdat_latest_data(**kwargs):
-    chain(extract_emdat_latest_data.s(), transform_emdat_data.s()).apply_async()
+def ext_and_transform_emdat_historical_data(**kwargs):
+    variables: EMDATQueryVars = {
+        "limit": -1,
+        "from": None,
+        "to": None,
+        "include_hist": True,
+    }
+
+    chain(EMDATExtraction.task.s(QUERY, variables), EMDATTransformHandler.task.s()).apply_async()
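
EMDATQueryVars itself is defined in apps/etl/extraction/sources/emdat/extract.py and does not appear in this diff. A minimal sketch consistent with the call sites above, with field types assumed from usage (note that "from" is a Python keyword, so the functional TypedDict form would be required):

    from typing import TypedDict

    # Hypothetical sketch only; the real definition lives in emdat/extract.py.
    EMDATQueryVars = TypedDict(
        "EMDATQueryVars",
        {
            "limit": int,  # -1 appears to mean "no limit"
            "from": int | None,  # start year
            "to": int | None,  # end year
            "include_hist": bool | None,
        },
    )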
135 changes: 88 additions & 47 deletions apps/etl/etl_tasks/glide.py
@@ -1,61 +1,102 @@
+from datetime import datetime
+
 from celery import chain, shared_task
+from django.conf import settings
 
-from apps.etl.extraction.sources.glide.extract import (
-    extract_glide_historical_data,
-    extract_glide_latest_data,
-)
-from apps.etl.models import HazardType
-from apps.etl.transform.sources.glide import transform_glide_event_data
+from apps.etl.extraction.sources.glide.extract import GlideExtraction, GlideQueryVars
+from apps.etl.models import ExtractionData, HazardType
+from apps.etl.transform.sources.glide import GlideTransformHandler
+
+GLIDE_HAZARDS = [
+    HazardType.EARTHQUAKE,
+    HazardType.FLOOD,
+    HazardType.CYCLONE,
+    HazardType.EPIDEMIC,
+    HazardType.STORM,
+    HazardType.DROUGHT,
+    HazardType.TSUNAMI,
+    HazardType.WILDFIRE,
+    HazardType.VOLCANO,
+    HazardType.COLDWAVE,
+    HazardType.EXTRATROPICAL_CYCLONE,
+    HazardType.EXTREME_TEMPERATURE,
+    HazardType.FIRE,
+    HazardType.FLASH_FLOOD,
+    HazardType.HEAT_WAVE,
+    HazardType.INSECT_INFESTATION,
+    HazardType.LANDSLIDE,
+    HazardType.MUD_SLIDE,
+    HazardType.SEVERE_LOCAL_STROM,
+    HazardType.SLIDE,
+    HazardType.SNOW_AVALANCHE,
+    HazardType.TECH_DISASTER,
+    HazardType.TORNADO,
+    HazardType.VIOLENT_WIND,
+    HazardType.WAVE_SURGE,
+]
 
 
 @shared_task
-def ext_and_transform_glide_historical_data(hazard_type: str, hazard_type_str: str, **kwargs):
-    event_workflow = chain(
-        extract_glide_historical_data.s(
-            hazard_type=hazard_type,
-            hazard_type_str=hazard_type_str,
-        ),
-        transform_glide_event_data.s(),
-    )
-    event_workflow.apply_async()
+def _ext_and_transform_glide_latest_data(hazard_type: HazardType):
+    ext_object = (
+        ExtractionData.objects.filter(
+            source=ExtractionData.Source.GLIDE,
+            hazard_type=hazard_type,
+            status=ExtractionData.Status.SUCCESS,
+            resp_data__isnull=False,
+        )
+        .order_by("-created_at")
+        .first()
+    )
+
+    if ext_object:
+        from_date = ext_object.created_at.date()
+    else:
+        from_date = datetime.strptime(settings.GLIDE_START_DATE, "%Y-%m-%d").date()
+
+    to_date = datetime.today().date()
+
+    # FIXME: Check if the date filters are inclusive
+    url = f"{settings.GLIDE_URL}/glide/jsonglideset.jsp"
+    variables: GlideQueryVars = {
+        "fromyear": from_date.year,
+        "frommonth": from_date.month,
+        "fromday": from_date.day,
+        "toyear": to_date.year,
+        "tomonth": to_date.month,
+        "today": to_date.day,
+        "events": hazard_type.value,
+    }
+
+    chain(GlideExtraction.task.s(url, variables), GlideTransformHandler.task.s()).apply_async()
 
 
 @shared_task
-def ext_and_transform_data(hazard_type, hazard_type_str):
-    event_workflow = chain(
-        extract_glide_latest_data.s(
-            hazard_type=hazard_type,
-            hazard_type_str=hazard_type_str,
-        ),
-        transform_glide_event_data.s(),
-    )
-    event_workflow.apply_async()
+def _ext_and_transform_glide_historical_data(hazard_type: HazardType):
+    to_date = datetime.today().date()
+
+    # FIXME: Check if the date filters are inclusive
+    url = f"{settings.GLIDE_URL}/glide/jsonglideset.jsp"
+    variables: GlideQueryVars = {
+        "fromyear": None,
+        "frommonth": None,
+        "fromday": None,
+        "toyear": to_date.year,
+        "tomonth": to_date.month,
+        "today": to_date.day,
+        "events": hazard_type.value,
+    }
+
+    chain(GlideExtraction.task.s(url, variables), GlideTransformHandler.task.s()).apply_async()
 
 
 @shared_task
 def ext_and_transform_glide_latest_data():
-    ext_and_transform_data.delay("EQ", HazardType.EARTHQUAKE)
-    ext_and_transform_data.delay("TC", HazardType.CYCLONE)
-    ext_and_transform_data.delay("FL", HazardType.FLOOD)
-    ext_and_transform_data.delay("DR", HazardType.DROUGHT)
-    ext_and_transform_data.delay("WF", HazardType.WILDFIRE)
-    ext_and_transform_data.delay("VO", HazardType.VOLCANO)
-    ext_and_transform_data.delay("TS", HazardType.TSUNAMI)
-    ext_and_transform_data.delay("CW", HazardType.COLDWAVE)
-    ext_and_transform_data.delay("EP", HazardType.EPIDEMIC)
-    ext_and_transform_data.delay("EC", HazardType.EXTRATROPICAL_CYCLONE)
-    ext_and_transform_data.delay("ET", HazardType.EXTREME_TEMPERATURE)
-    ext_and_transform_data.delay("FR", HazardType.FIRE)
-    ext_and_transform_data.delay("FF", HazardType.FLASH_FLOOD)
-    ext_and_transform_data.delay("HT", HazardType.HEAT_WAVE)
-    ext_and_transform_data.delay("IN", HazardType.INSECT_INFESTATION)
-    ext_and_transform_data.delay("LS", HazardType.LANDSLIDE)
-    ext_and_transform_data.delay("MS", HazardType.MUD_SLIDE)
-    ext_and_transform_data.delay("ST", HazardType.SEVERE_LOCAL_STROM)
-    ext_and_transform_data.delay("SL", HazardType.SLIDE)
-    ext_and_transform_data.delay("AV", HazardType.SNOW_AVALANCHE)
-    ext_and_transform_data.delay("SS", HazardType.STORM)
-    ext_and_transform_data.delay("AC", HazardType.TECH_DISASTER)
-    ext_and_transform_data.delay("TO", HazardType.TORNADO)
-    ext_and_transform_data.delay("VW", HazardType.VIOLENT_WIND)
-    ext_and_transform_data.delay("WV", HazardType.WAVE_SURGE)
+    for hazard_type in GLIDE_HAZARDS:
+        _ext_and_transform_glide_latest_data(hazard_type)
+
+
+@shared_task
+def ext_and_transform_glide_historical_data():
+    for hazard_type in GLIDE_HAZARDS:
+        _ext_and_transform_glide_historical_data(hazard_type)
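
GlideQueryVars is likewise imported from apps/etl/extraction/sources/glide/extract.py and not shown in this diff. A sketch matching the two call sites above (field types are assumptions):

    from typing import TypedDict

    # Hypothetical sketch; the real definition lives in glide/extract.py.
    class GlideQueryVars(TypedDict):
        fromyear: int | None
        frommonth: int | None
        fromday: int | None
        toyear: int
        tomonth: int
        today: int
        events: str  # presumably the two-letter GLIDE event code, e.g. "EQ"

Note that passing "events": hazard_type.value assumes the HazardType enum values are the two-letter GLIDE codes ("EQ", "TC", ...) that the old code spelled out explicitly; otherwise the query would silently filter on the wrong code.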
1 change: 0 additions & 1 deletion apps/etl/etl_tasks/usgs.py
@@ -6,7 +6,6 @@

 @shared_task
 def ext_and_transform_usgs_latest_data():
-    url = "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_day.geojson"
     url = f"{settings.USGS_DATA_URL}/all_day.geojson"
     ext_and_transform_data.delay(url)

7 changes: 4 additions & 3 deletions apps/etl/extraction/sources/base/handler.py
@@ -27,7 +27,7 @@ def store_extraction_data(
         validate_source_func: Callable[[Any], None],
         source: int,
         response: dict,
-        instance_id: int = None,
+        instance_id: int | None = None,
     ):
         """
         Save extracted data into database. Checks for duplicate content using hashing.
@@ -72,14 +72,15 @@ def _create_extraction_instance(cls, url: str, source: int) -> ExtractionData:
             status=ExtractionData.Status.PENDING,
             source_validation_status=ExtractionData.ValidationStatus.NO_VALIDATION,
             trace_id=str(uuid.uuid4()),
+            # FIXME Pass hazard type
             hazard_type=None,
             attempt_no=0,
             resp_code=0,
         )

     @classmethod
     def _update_instance_status(
-        cls, instance: ExtractionData, status: int, validation_status: int = None, update_validation: bool = False
+        cls, instance: ExtractionData, status: int, validation_status: int | None = None, update_validation: bool = False
     ) -> None:
         """
         Update the status of the extraction instance.
@@ -116,7 +117,7 @@ def _save_response_data(cls, instance: ExtractionData, response: requests.Response
         return json.loads(response.content)
 
     @classmethod
-    def handle_extraction(cls, url: str, params: dict, headers: dict, source: int) -> dict:
+    def handle_extraction(cls, url: str, params: dict | None, headers: dict, source: int) -> dict:
         """
         Process data extraction.
         Returns:
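
For context, the class-based pattern that GlideExtraction.task.s(...) and EMDATExtraction.task.s(...) rely on presumably exposes a Celery task as a class attribute. A minimal sketch under that assumption; only handle_extraction's signature is confirmed by this diff, and the names below are illustrative:

    from celery import shared_task

    class BaseExtraction:
        @classmethod
        def handle_extraction(cls, url: str, params: dict | None, headers: dict, source: int) -> dict:
            # create the ExtractionData instance, fetch the URL,
            # hash-deduplicate, and store the response (see the diff above)
            raise NotImplementedError

    class GlideExtraction(BaseExtraction):
        pass

    @shared_task
    def _glide_extraction_task(url: str, variables: dict) -> dict:
        # headers and source are placeholders; the real code presumably
        # passes ExtractionData.Source.GLIDE
        return GlideExtraction.handle_extraction(url, variables, headers={}, source=0)

    GlideExtraction.task = _glide_extraction_task  # enables GlideExtraction.task.s(url, vars)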
3 changes: 2 additions & 1 deletion apps/etl/extraction/sources/desinventar/extract.py
@@ -26,7 +26,7 @@ def store_extraction_data(
         instance_id: int = None,
     ):
         """
-        Save extracted data into database. Checks for duplicate content using hashing.
+        Save extracted data into database.
         """
         file_name = f"{source}.zip"
         resp_data = response
@@ -41,6 +41,7 @@
         # manage duplicate file content.
         manage_duplicate_file_content(
             source=extraction_instance.source,
+            # FIXME: We need to calculate hash for zip file
             hash_content=None,
             instance=extraction_instance,
             response_data=resp_data.content,
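
On the FIXME above ("calculate hash for zip file"): filling in hash_content for binary payloads only needs a stable digest of the raw bytes, e.g. this sketch (hashlib is standard library; the call-site wiring is an assumption):

    import hashlib

    def hash_file_content(content: bytes) -> str:
        # Stable fingerprint of raw file bytes, suitable for duplicate detection
        return hashlib.sha256(content).hexdigest()

    # e.g. hash_content=hash_file_content(resp_data.content) at the call site above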