Skip to content

Commit f6b8170

Browse files
tnagorra and Rup-Narayan-Rajbanshi
authored and committed
Fix state issue regarding Emdat extraction
- Re-format graphql query - Define query variables for emdat and glide - Simplify logic for glide task
1 parent 573114e commit f6b8170

File tree

5 files changed

+145
-110
lines changed

5 files changed

+145
-110
lines changed

apps/etl/etl_tasks/emdat.py

+79-69
Original file line numberDiff line numberDiff line change
@@ -3,92 +3,102 @@
33
from celery import chain, shared_task
44
from django.conf import settings
55

6-
from apps.etl.extraction.sources.emdat.extract import EMDATExtraction
6+
from apps.etl.extraction.sources.emdat.extract import EMDATExtraction, EMDATQueryVars
77
from apps.etl.transform.sources.emdat import EMDATTransformHandler
88

99
QUERY = """
10-
query monty ($limit: Int, $offset: Int, $include_hist: Boolean, $from: Int, $to: Int) {
11-
api_version
12-
public_emdat(
13-
cursor: {
14-
offset: $offset,
15-
limit: $limit
16-
}
17-
filters: {
18-
include_hist: $include_hist
19-
from: $from
20-
to: $to
21-
}
22-
) {
10+
query monty(
11+
$limit: Int
12+
$offset: Int
13+
$include_hist: Boolean
14+
$from: Int
15+
$to: Int
16+
) {
17+
api_version
18+
public_emdat(
19+
cursor: { offset: $offset, limit: $limit }
20+
filters: { include_hist: $include_hist, from: $from, to: $to }
21+
) {
2322
total_available
2423
info {
25-
timestamp
26-
filters
27-
cursor
28-
version
24+
timestamp
25+
filters
26+
cursor
27+
version
2928
}
3029
data {
31-
disno
32-
classif_key
33-
group
34-
subgroup
35-
type
36-
subtype
37-
external_ids
38-
name
39-
iso
40-
country
41-
subregion
42-
region
43-
location
44-
origin
45-
associated_types
46-
ofda_response
47-
appeal
48-
declaration
49-
aid_contribution
50-
magnitude
51-
magnitude_scale
52-
latitude
53-
longitude
54-
river_basin
55-
start_year
56-
start_month
57-
start_day
58-
end_year
59-
end_month
60-
end_day
61-
total_deaths
62-
no_injured
63-
no_affected
64-
no_homeless
65-
total_affected
66-
reconstr_dam
67-
reconstr_dam_adj
68-
insur_dam
69-
insur_dam_adj
70-
total_dam
71-
total_dam_adj
72-
cpi
73-
admin_units
74-
entry_date
75-
last_update
30+
disno
31+
classif_key
32+
group
33+
subgroup
34+
type
35+
subtype
36+
external_ids
37+
name
38+
iso
39+
country
40+
subregion
41+
region
42+
location
43+
origin
44+
associated_types
45+
ofda_response
46+
appeal
47+
declaration
48+
aid_contribution
49+
magnitude
50+
magnitude_scale
51+
latitude
52+
longitude
53+
river_basin
54+
start_year
55+
start_month
56+
start_day
57+
end_year
58+
end_month
59+
end_day
60+
total_deaths
61+
no_injured
62+
no_affected
63+
no_homeless
64+
total_affected
65+
reconstr_dam
66+
reconstr_dam_adj
67+
insur_dam
68+
insur_dam_adj
69+
total_dam
70+
total_dam_adj
71+
cpi
72+
admin_units
73+
entry_date
74+
last_update
7675
}
77-
}
7876
}
79-
"""
77+
}
78+
"""
8079

8180

8281
@shared_task
8382
def ext_and_transform_emdat_latest_data(**kwargs):
84-
to_year = datetime.now().year
85-
from_year = int(settings.EMDAT_START_YEAR)
86-
variables = {"limit": -1, "from": from_year, "to": to_year}
83+
# FIXME: Why are we getting data from settings.EMDAT_START_YEAR to get the latest data?
84+
# Also, the filtering only filters using year so we might have lot of duplicate data
85+
variables: EMDATQueryVars = {
86+
"limit": -1,
87+
"from": int(settings.EMDAT_START_YEAR),
88+
"to": datetime.now().year,
89+
"include_hist": None,
90+
}
8791

8892
chain(EMDATExtraction.task.s(QUERY, variables), EMDATTransformHandler.task.s()).apply_async()
8993

9094

9195
@shared_task
9296
def ext_and_transform_emdat_historical_data(**kwargs):
93-
variables = {"limit": -1, "include_hist": True}
97+
variables: EMDATQueryVars = {
98+
"limit": -1,
99+
"from": None,
100+
"to": None,
101+
"include_hist": True,
102+
}
103+
94104
chain(EMDATExtraction.task.s(QUERY, variables), EMDATTransformHandler.task.s()).apply_async()

apps/etl/etl_tasks/glide.py

+60-36
Original file line numberDiff line numberDiff line change
@@ -3,41 +3,41 @@
33
from celery import chain, shared_task
44
from django.conf import settings
55

6-
from apps.etl.extraction.sources.glide.extract import GlideExtraction
6+
from apps.etl.extraction.sources.glide.extract import GlideExtraction, GlideQueryVars
77
from apps.etl.models import ExtractionData, HazardType
88
from apps.etl.transform.sources.glide import GlideTransformHandler
99

1010
GLIDE_HAZARDS = [
11-
("EQ", HazardType.EARTHQUAKE),
12-
("TC", HazardType.CYCLONE),
13-
("FL", HazardType.FLOOD),
14-
("DR", HazardType.DROUGHT),
15-
("WF", HazardType.WILDFIRE),
16-
("VO", HazardType.VOLCANO),
17-
("TS", HazardType.TSUNAMI),
18-
("CW", HazardType.COLDWAVE),
19-
("EP", HazardType.EPIDEMIC),
20-
("EC", HazardType.EXTRATROPICAL_CYCLONE),
21-
("ET", HazardType.EXTREME_TEMPERATURE),
22-
("FR", HazardType.FIRE),
23-
("FF", HazardType.FLASH_FLOOD),
24-
("HT", HazardType.HEAT_WAVE),
25-
("IN", HazardType.INSECT_INFESTATION),
26-
("LS", HazardType.LANDSLIDE),
27-
("MS", HazardType.MUD_SLIDE),
28-
("ST", HazardType.SEVERE_LOCAL_STROM),
29-
("SL", HazardType.SLIDE),
30-
("AV", HazardType.SNOW_AVALANCHE),
31-
("SS", HazardType.STORM),
32-
("AC", HazardType.TECH_DISASTER),
33-
("TO", HazardType.TORNADO),
34-
("VW", HazardType.VIOLENT_WIND),
35-
("WV", HazardType.WAVE_SURGE),
11+
HazardType.EARTHQUAKE,
12+
HazardType.FLOOD,
13+
HazardType.CYCLONE,
14+
HazardType.EPIDEMIC,
15+
HazardType.STORM,
16+
HazardType.DROUGHT,
17+
HazardType.TSUNAMI,
18+
HazardType.WILDFIRE,
19+
HazardType.VOLCANO,
20+
HazardType.COLDWAVE,
21+
HazardType.EXTRATROPICAL_CYCLONE,
22+
HazardType.EXTREME_TEMPERATURE,
23+
HazardType.FIRE,
24+
HazardType.FLASH_FLOOD,
25+
HazardType.HEAT_WAVE,
26+
HazardType.INSECT_INFESTATION,
27+
HazardType.LANDSLIDE,
28+
HazardType.MUD_SLIDE,
29+
HazardType.SEVERE_LOCAL_STROM,
30+
HazardType.SLIDE,
31+
HazardType.SNOW_AVALANCHE,
32+
HazardType.TECH_DISASTER,
33+
HazardType.TORNADO,
34+
HazardType.VIOLENT_WIND,
35+
HazardType.WAVE_SURGE,
3636
]
3737

3838

3939
@shared_task
40-
def _ext_and_transform_glide_latest_data(hazard_type, hazard_type_str):
40+
def _ext_and_transform_glide_latest_data(hazard_type: HazardType):
4141
ext_object = (
4242
ExtractionData.objects.filter(
4343
source=ExtractionData.Source.GLIDE,
@@ -48,31 +48,55 @@ def _ext_and_transform_glide_latest_data(hazard_type, hazard_type_str):
4848
.order_by("-created_at")
4949
.first()
5050
)
51+
5152
if ext_object:
5253
from_date = ext_object.created_at.date()
5354
else:
5455
from_date = datetime.strptime(settings.GLIDE_START_DATE, "%Y-%m-%d").date()
5556

5657
to_date = datetime.today().date()
57-
url = f"{settings.GLIDE_URL}/glide/jsonglideset.jsp?fromyear={from_date.year}&frommonth={from_date.month}&fromday={from_date.day}&toyear={to_date.year}&tomonth={to_date.month}&today={to_date.day}&events={hazard_type}" # noqa: E501
5858

59-
chain(GlideExtraction.task.s(url), GlideTransformHandler.task.s()).apply_async()
59+
# FIXME: Check if the date filters are inclusive
60+
url = f"{settings.GLIDE_URL}/glide/jsonglideset.jsp"
61+
variables: GlideQueryVars = {
62+
"fromyear": from_date.year,
63+
"frommonth": from_date.month,
64+
"fromday": from_date.day,
65+
"toyear": to_date.year,
66+
"tomonth": to_date.month,
67+
"today": to_date.day,
68+
"events": hazard_type.value,
69+
}
70+
71+
chain(GlideExtraction.task.s(url, variables), GlideTransformHandler.task.s()).apply_async()
6072

6173

6274
@shared_task
63-
def _ext_and_transform_glide_historical_data(hazard_type, hazard_type_str):
75+
def _ext_and_transform_glide_historical_data(hazard_type: HazardType):
6476
to_date = datetime.today().date()
65-
url = f"{settings.GLIDE_URL}/glide/jsonglideset.jsp?toyear={to_date.year}&tomonth={to_date.month}&to_date={to_date.day}&events={hazard_type}" # noqa: E501
66-
chain(GlideExtraction.task.s(url), GlideTransformHandler.task.s()).apply_async()
77+
78+
# FIXME: Check if the date filters are inclusive
79+
url = f"{settings.GLIDE_URL}/glide/jsonglideset.jsp"
80+
variables: GlideQueryVars = {
81+
"fromyear": None,
82+
"frommonth": None,
83+
"fromday": None,
84+
"toyear": to_date.year,
85+
"tomonth": to_date.month,
86+
"today": to_date.day,
87+
"events": hazard_type.value,
88+
}
89+
90+
chain(GlideExtraction.task.s(url, variables), GlideTransformHandler.task.s()).apply_async()
6791

6892

6993
@shared_task
7094
def ext_and_transform_glide_latest_data():
71-
for hazard_type, hazard_type_str in GLIDE_HAZARDS:
72-
_ext_and_transform_glide_latest_data(hazard_type, hazard_type_str)
95+
for hazard_type in GLIDE_HAZARDS:
96+
_ext_and_transform_glide_latest_data(hazard_type)
7397

7498

7599
@shared_task
76100
def ext_and_transform_glide_historical_data():
77-
for hazard_type, hazard_type_str in GLIDE_HAZARDS:
78-
_ext_and_transform_glide_historical_data(hazard_type, hazard_type_str)
101+
for hazard_type in GLIDE_HAZARDS:
102+
_ext_and_transform_glide_historical_data(hazard_type)

apps/etl/extraction/sources/base/handler.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ def store_extraction_data(
2626
validate_source_func: Callable[[Any], None],
2727
source: int,
2828
response: dict,
29-
instance_id: int = None,
29+
instance_id: int | None = None,
3030
):
3131
"""
3232
Save extracted data into database. Checks for duplicate content using hashing.
@@ -78,7 +78,7 @@ def _create_extraction_instance(cls, url: str, source: int) -> ExtractionData:
7878

7979
@classmethod
8080
def _update_instance_status(
81-
cls, instance: ExtractionData, status: int, validation_status: int = None, update_validation: bool = False
81+
cls, instance: ExtractionData, status: int, validation_status: int | None = None, update_validation: bool = False
8282
) -> None:
8383
"""
8484
Update the status of the extraction instance.
@@ -115,7 +115,7 @@ def _save_response_data(cls, instance: ExtractionData, response: requests.Respon
115115
return json.loads(response.content)
116116

117117
@classmethod
118-
def handle_extraction(cls, url: str, params: dict, headers: dict, source: int) -> dict:
118+
def handle_extraction(cls, url: str, params: dict | None, headers: dict, source: int) -> dict:
119119
"""
120120
Process data extraction.
121121
Returns:

apps/etl/extraction/sources/desinventar/extract.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ def store_extraction_data(
2525
instance_id: int = None,
2626
):
2727
"""
28-
Save extracted data into database. Checks for duplicate content using hashing.
28+
Save extracted data into database.
2929
"""
3030
file_name = f"{source}.zip"
3131
resp_data = response
@@ -40,6 +40,7 @@ def store_extraction_data(
4040
# manage duplicate file content.
4141
manage_duplicate_file_content(
4242
source=extraction_instance.source,
43+
# FIXME: We need to calculate the hash for the zip file
4344
hash_content=None,
4445
instance=extraction_instance,
4546
response_data=resp_data.content,

apps/etl/transform/sources/handler.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ def load_stac_item_to_queue(cls, transform_items, transform_obj_id):
105105

106106
@staticmethod
107107
@app.task
108-
def task(extraction_id):
108+
def task(extraction_id: int):
109109
"""
110110
Not NotImplemented due to celery limitation with classmethod
111111
Eg: return XYZTransformHandler.handle_transformation(extraction_id)

0 commit comments

Comments
 (0)