
Commit b8fafe3

Merge pull request #208 from IFRCGo/feature/apply-class-based-etl
Class-based ETL for Glide, Emdat.
2 parents: ca070f1 + ae8ed22

File tree

12 files changed: +313 -468 lines changed


apps/etl/etl_tasks/emdat.py

+96 -9

@@ -1,17 +1,104 @@
+from datetime import datetime
+
 from celery import chain, shared_task
+from django.conf import settings
+
+from apps.etl.extraction.sources.emdat.extract import EMDATExtraction, EMDATQueryVars
+from apps.etl.transform.sources.emdat import EMDATTransformHandler
 
-from apps.etl.extraction.sources.emdat.extract import (
-    extract_emdat_historical_data,
-    extract_emdat_latest_data,
-)
-from apps.etl.transform.sources.emdat import transform_emdat_data
+QUERY = """
+    query monty(
+        $limit: Int
+        $offset: Int
+        $include_hist: Boolean
+        $from: Int
+        $to: Int
+    ) {
+        api_version
+        public_emdat(
+            cursor: { offset: $offset, limit: $limit }
+            filters: { include_hist: $include_hist, from: $from, to: $to }
+        ) {
+            total_available
+            info {
+                timestamp
+                filters
+                cursor
+                version
+            }
+            data {
+                disno
+                classif_key
+                group
+                subgroup
+                type
+                subtype
+                external_ids
+                name
+                iso
+                country
+                subregion
+                region
+                location
+                origin
+                associated_types
+                ofda_response
+                appeal
+                declaration
+                aid_contribution
+                magnitude
+                magnitude_scale
+                latitude
+                longitude
+                river_basin
+                start_year
+                start_month
+                start_day
+                end_year
+                end_month
+                end_day
+                total_deaths
+                no_injured
+                no_affected
+                no_homeless
+                total_affected
+                reconstr_dam
+                reconstr_dam_adj
+                insur_dam
+                insur_dam_adj
+                total_dam
+                total_dam_adj
+                cpi
+                admin_units
+                entry_date
+                last_update
+            }
+        }
+    }
+"""
 
 
 @shared_task
-def ext_and_transform_emdat_historical_data(**kwargs):
-    chain(extract_emdat_historical_data.s(), transform_emdat_data.s()).apply_async()
+def ext_and_transform_emdat_latest_data(**kwargs):
+    # FIXME: Why are we getting data from settings.EMDAT_START_YEAR to get the latest data?
+    # Also, the filtering only filters using year so we might have lot of duplicate data
+    variables: EMDATQueryVars = {
+        "limit": -1,
+        "from": int(settings.EMDAT_START_YEAR),
+        "to": datetime.now().year,
+        "include_hist": None,
+    }
+
+    chain(EMDATExtraction.task.s(QUERY, variables), EMDATTransformHandler.task.s()).apply_async()
 
 
 @shared_task
-def ext_and_transform_emdat_latest_data(**kwargs):
-    chain(extract_emdat_latest_data.s(), transform_emdat_data.s()).apply_async()
+def ext_and_transform_emdat_historical_data(**kwargs):
+    variables: EMDATQueryVars = {
+        "limit": -1,
+        "from": None,
+        "to": None,
+        "include_hist": True,
+    }
+
+    chain(EMDATExtraction.task.s(QUERY, variables), EMDATTransformHandler.task.s()).apply_async()
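
Note on the pattern: in a Celery chain, each task's return value is passed as the first positional argument to the next task, so EMDATTransformHandler.task receives whatever EMDATExtraction.task returns. A minimal, self-contained sketch of that chaining behavior (generic task names, not the project's actual handlers):

    from celery import Celery, chain, shared_task

    app = Celery("demo", broker="memory://")

    @shared_task
    def extract(query: str, variables: dict) -> dict:
        # Stand-in for the extraction step; returns the payload to transform.
        return {"rows": [], "query": query, "variables": variables}

    @shared_task
    def transform(extraction_result: dict) -> int:
        # Receives extract()'s return value as its first argument.
        return len(extraction_result["rows"])

    # Queue the two-step pipeline; transform runs only after extract succeeds.
    chain(extract.s("query monty { ... }", {"limit": -1}), transform.s()).apply_async()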

apps/etl/etl_tasks/glide.py

+88 -47

@@ -1,61 +1,102 @@
+from datetime import datetime
+
 from celery import chain, shared_task
+from django.conf import settings
+
+from apps.etl.extraction.sources.glide.extract import GlideExtraction, GlideQueryVars
+from apps.etl.models import ExtractionData, HazardType
+from apps.etl.transform.sources.glide import GlideTransformHandler
 
-from apps.etl.extraction.sources.glide.extract import (
-    extract_glide_historical_data,
-    extract_glide_latest_data,
-)
-from apps.etl.models import HazardType
-from apps.etl.transform.sources.glide import transform_glide_event_data
+GLIDE_HAZARDS = [
+    HazardType.EARTHQUAKE,
+    HazardType.FLOOD,
+    HazardType.CYCLONE,
+    HazardType.EPIDEMIC,
+    HazardType.STORM,
+    HazardType.DROUGHT,
+    HazardType.TSUNAMI,
+    HazardType.WILDFIRE,
+    HazardType.VOLCANO,
+    HazardType.COLDWAVE,
+    HazardType.EXTRATROPICAL_CYCLONE,
+    HazardType.EXTREME_TEMPERATURE,
+    HazardType.FIRE,
+    HazardType.FLASH_FLOOD,
+    HazardType.HEAT_WAVE,
+    HazardType.INSECT_INFESTATION,
+    HazardType.LANDSLIDE,
+    HazardType.MUD_SLIDE,
+    HazardType.SEVERE_LOCAL_STROM,
+    HazardType.SLIDE,
+    HazardType.SNOW_AVALANCHE,
+    HazardType.TECH_DISASTER,
+    HazardType.TORNADO,
+    HazardType.VIOLENT_WIND,
+    HazardType.WAVE_SURGE,
+]
 
 
 @shared_task
-def ext_and_transform_glide_historical_data(hazard_type: str, hazard_type_str: str, **kwargs):
-    event_workflow = chain(
-        extract_glide_historical_data.s(
+def _ext_and_transform_glide_latest_data(hazard_type: HazardType):
+    ext_object = (
+        ExtractionData.objects.filter(
+            source=ExtractionData.Source.GLIDE,
             hazard_type=hazard_type,
-            hazard_type_str=hazard_type_str,
-        ),
-        transform_glide_event_data.s(),
+            status=ExtractionData.Status.SUCCESS,
+            resp_data__isnull=False,
+        )
+        .order_by("-created_at")
+        .first()
     )
-    event_workflow.apply_async()
+
+    if ext_object:
+        from_date = ext_object.created_at.date()
+    else:
+        from_date = datetime.strptime(settings.GLIDE_START_DATE, "%Y-%m-%d").date()
+
+    to_date = datetime.today().date()
+
+    # FIXME: Check if the date filters are inclusive
+    url = f"{settings.GLIDE_URL}/glide/jsonglideset.jsp"
+    variables: GlideQueryVars = {
+        "fromyear": from_date.year,
+        "frommonth": from_date.month,
+        "fromday": from_date.day,
+        "toyear": to_date.year,
+        "tomonth": to_date.month,
+        "today": to_date.day,
+        "events": hazard_type.value,
+    }
+
+    chain(GlideExtraction.task.s(url, variables), GlideTransformHandler.task.s()).apply_async()
 
 
 @shared_task
-def ext_and_transform_data(hazard_type, hazard_type_str):
-    event_workflow = chain(
-        extract_glide_latest_data.s(
-            hazard_type=hazard_type,
-            hazard_type_str=hazard_type_str,
-        ),
-        transform_glide_event_data.s(),
-    )
-    event_workflow.apply_async()
+def _ext_and_transform_glide_historical_data(hazard_type: HazardType):
+    to_date = datetime.today().date()
+
+    # FIXME: Check if the date filters are inclusive
+    url = f"{settings.GLIDE_URL}/glide/jsonglideset.jsp"
+    variables: GlideQueryVars = {
+        "fromyear": None,
+        "frommonth": None,
+        "fromday": None,
+        "toyear": to_date.year,
+        "tomonth": to_date.month,
+        "today": to_date.day,
+        "events": hazard_type.value,
+    }
+
+    chain(GlideExtraction.task.s(url, variables), GlideTransformHandler.task.s()).apply_async()
 
 
 @shared_task
 def ext_and_transform_glide_latest_data():
-    ext_and_transform_data.delay("EQ", HazardType.EARTHQUAKE)
-    ext_and_transform_data.delay("TC", HazardType.CYCLONE)
-    ext_and_transform_data.delay("FL", HazardType.FLOOD)
-    ext_and_transform_data.delay("DR", HazardType.DROUGHT)
-    ext_and_transform_data.delay("WF", HazardType.WILDFIRE)
-    ext_and_transform_data.delay("VO", HazardType.VOLCANO)
-    ext_and_transform_data.delay("TS", HazardType.TSUNAMI)
-    ext_and_transform_data.delay("CW", HazardType.COLDWAVE)
-    ext_and_transform_data.delay("EP", HazardType.EPIDEMIC)
-    ext_and_transform_data.delay("EC", HazardType.EXTRATROPICAL_CYCLONE)
-    ext_and_transform_data.delay("ET", HazardType.EXTREME_TEMPERATURE)
-    ext_and_transform_data.delay("FR", HazardType.FIRE)
-    ext_and_transform_data.delay("FF", HazardType.FLASH_FLOOD)
-    ext_and_transform_data.delay("HT", HazardType.HEAT_WAVE)
-    ext_and_transform_data.delay("IN", HazardType.INSECT_INFESTATION)
-    ext_and_transform_data.delay("LS", HazardType.LANDSLIDE)
-    ext_and_transform_data.delay("MS", HazardType.MUD_SLIDE)
-    ext_and_transform_data.delay("ST", HazardType.SEVERE_LOCAL_STROM)
-    ext_and_transform_data.delay("SL", HazardType.SLIDE)
-    ext_and_transform_data.delay("AV", HazardType.SNOW_AVALANCHE)
-    ext_and_transform_data.delay("SS", HazardType.STORM)
-    ext_and_transform_data.delay("AC", HazardType.TECH_DISASTER)
-    ext_and_transform_data.delay("TO", HazardType.TORNADO)
-    ext_and_transform_data.delay("VW", HazardType.VIOLENT_WIND)
-    ext_and_transform_data.delay("WV", HazardType.WAVE_SURGE)
+    for hazard_type in GLIDE_HAZARDS:
+        _ext_and_transform_glide_latest_data(hazard_type)
+
+
+@shared_task
+def ext_and_transform_glide_historical_data():
+    for hazard_type in GLIDE_HAZARDS:
+        _ext_and_transform_glide_historical_data(hazard_type)
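
For reference, the GlideQueryVars entries map one-to-one onto query parameters of the jsonglideset.jsp endpoint. A rough sketch of the request GlideExtraction presumably issues (the domain standing in for settings.GLIDE_URL is an assumption; the path and parameter names come from the diff):

    import requests

    # Assumed value of settings.GLIDE_URL; the endpoint path is from the diff above.
    url = "https://www.glidenumber.net/glide/jsonglideset.jsp"
    params = {
        "fromyear": 2024, "frommonth": 1, "fromday": 1,
        "toyear": 2024, "tomonth": 12, "today": 31,
        "events": "EQ",  # HazardType.EARTHQUAKE.value, per the removed call sites
    }
    response = requests.get(url, params=params, timeout=30)
    events = response.json()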

apps/etl/etl_tasks/usgs.py

-1

@@ -6,7 +6,6 @@
 
 @shared_task
 def ext_and_transform_usgs_latest_data():
-    url = "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_day.geojson"
     url = f"{settings.USGS_DATA_URL}/all_day.geojson"
     ext_and_transform_data.delay(url)
 
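
The deleted line preserves the full feed URL, so the new setting presumably holds its base path; a hypothetical settings.py entry consistent with the diff:

    # Inferred from the removed hardcoded URL; the actual value is not shown in this commit.
    USGS_DATA_URL = "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary"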

apps/etl/extraction/sources/base/handler.py

+4 -3

@@ -27,7 +27,7 @@ def store_extraction_data(
         validate_source_func: Callable[[Any], None],
         source: int,
         response: dict,
-        instance_id: int = None,
+        instance_id: int | None = None,
     ):
         """
         Save extracted data into data base. Checks for duplicate conent using hashing.
@@ -72,14 +72,15 @@ def _create_extraction_instance(cls, url: str, source: int) -> ExtractionData:
             status=ExtractionData.Status.PENDING,
             source_validation_status=ExtractionData.ValidationStatus.NO_VALIDATION,
             trace_id=str(uuid.uuid4()),
+            # FIXME Pass hazard type
             hazard_type=None,
             attempt_no=0,
             resp_code=0,
         )
 
     @classmethod
     def _update_instance_status(
-        cls, instance: ExtractionData, status: int, validation_status: int = None, update_validation: bool = False
+        cls, instance: ExtractionData, status: int, validation_status: int | None = None, update_validation: bool = False
     ) -> None:
         """
         Update the status of the extraction instance.
@@ -116,7 +117,7 @@ def _save_response_data(cls, instance: ExtractionData, response: requests.Response):
         return json.loads(response.content)
 
     @classmethod
-    def handle_extraction(cls, url: str, params: dict, headers: dict, source: int) -> dict:
+    def handle_extraction(cls, url: str, params: dict | None, headers: dict, source: int) -> dict:
         """
         Process data extraction.
         Returns:
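
The annotation changes here replace implicit-optional defaults with explicit unions, which strict type checkers reject otherwise; for example:

    # Implicit optional: the default None contradicts the annotation, and mypy
    # rejects it (no_implicit_optional is on by default since mypy 0.990).
    def f(instance_id: int = None) -> None: ...

    # Explicit optional, as used in this commit (PEP 604 syntax, Python 3.10+):
    def g(instance_id: int | None = None) -> None: ...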

apps/etl/extraction/sources/desinventar/extract.py

+2 -1

@@ -26,7 +26,7 @@ def store_extraction_data(
         instance_id: int = None,
     ):
         """
-        Save extracted data into database. Checks for duplicate content using hashing.
+        Save extracted data into database.
         """
         file_name = f"{source}.zip"
         resp_data = response
@@ -41,6 +41,7 @@ def store_extraction_data(
         # manage duplicate file content.
         manage_duplicate_file_content(
             source=extraction_instance.source,
+            # FIXME: We need to calculate has for zip file
             hash_content=None,
             instance=extraction_instance,
             response_data=resp_data.content,
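
The new FIXME flags that hash_content stays None for the zip payload, so duplicate detection is effectively skipped for these archives. One way to close that gap would be a digest over the raw archive bytes; a sketch, not the project's implementation:

    import hashlib

    def hash_zip_bytes(content: bytes) -> str:
        # SHA-256 over the raw zip bytes: byte-identical archives produce
        # identical hashes, which is enough for duplicate detection.
        return hashlib.sha256(content).hexdigest()

    # Hypothetical call-site usage: hash_content=hash_zip_bytes(resp_data.content)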
