Skip to content

Commit ae8ed22

Browse files
tnagorraRup-Narayan-Rajbanshi
authored andcommitted
Fix state issue regarding Emdat extraaction
- Re-format graphql query - Define query variables for emdat and glide - Simplify logic for glide task
1 parent 5cb04a4 commit ae8ed22

File tree

11 files changed

+167
-174
lines changed

11 files changed

+167
-174
lines changed

apps/etl/etl_tasks/emdat.py

+79-69
Original file line numberDiff line numberDiff line change
@@ -3,92 +3,102 @@
33
from celery import chain, shared_task
44
from django.conf import settings
55

6-
from apps.etl.extraction.sources.emdat.extract import EMDATExtraction
6+
from apps.etl.extraction.sources.emdat.extract import EMDATExtraction, EMDATQueryVars
77
from apps.etl.transform.sources.emdat import EMDATTransformHandler
88

99
QUERY = """
10-
query monty ($limit: Int, $offset: Int, $include_hist: Boolean, $from: Int, $to: Int) {
11-
api_version
12-
public_emdat(
13-
cursor: {
14-
offset: $offset,
15-
limit: $limit
16-
}
17-
filters: {
18-
include_hist: $include_hist
19-
from: $from
20-
to: $to
21-
}
22-
) {
10+
query monty(
11+
$limit: Int
12+
$offset: Int
13+
$include_hist: Boolean
14+
$from: Int
15+
$to: Int
16+
) {
17+
api_version
18+
public_emdat(
19+
cursor: { offset: $offset, limit: $limit }
20+
filters: { include_hist: $include_hist, from: $from, to: $to }
21+
) {
2322
total_available
2423
info {
25-
timestamp
26-
filters
27-
cursor
28-
version
24+
timestamp
25+
filters
26+
cursor
27+
version
2928
}
3029
data {
31-
disno
32-
classif_key
33-
group
34-
subgroup
35-
type
36-
subtype
37-
external_ids
38-
name
39-
iso
40-
country
41-
subregion
42-
region
43-
location
44-
origin
45-
associated_types
46-
ofda_response
47-
appeal
48-
declaration
49-
aid_contribution
50-
magnitude
51-
magnitude_scale
52-
latitude
53-
longitude
54-
river_basin
55-
start_year
56-
start_month
57-
start_day
58-
end_year
59-
end_month
60-
end_day
61-
total_deaths
62-
no_injured
63-
no_affected
64-
no_homeless
65-
total_affected
66-
reconstr_dam
67-
reconstr_dam_adj
68-
insur_dam
69-
insur_dam_adj
70-
total_dam
71-
total_dam_adj
72-
cpi
73-
admin_units
74-
entry_date
75-
last_update
30+
disno
31+
classif_key
32+
group
33+
subgroup
34+
type
35+
subtype
36+
external_ids
37+
name
38+
iso
39+
country
40+
subregion
41+
region
42+
location
43+
origin
44+
associated_types
45+
ofda_response
46+
appeal
47+
declaration
48+
aid_contribution
49+
magnitude
50+
magnitude_scale
51+
latitude
52+
longitude
53+
river_basin
54+
start_year
55+
start_month
56+
start_day
57+
end_year
58+
end_month
59+
end_day
60+
total_deaths
61+
no_injured
62+
no_affected
63+
no_homeless
64+
total_affected
65+
reconstr_dam
66+
reconstr_dam_adj
67+
insur_dam
68+
insur_dam_adj
69+
total_dam
70+
total_dam_adj
71+
cpi
72+
admin_units
73+
entry_date
74+
last_update
7675
}
77-
}
7876
}
79-
"""
77+
}
78+
"""
8079

8180

8281
@shared_task
8382
def ext_and_transform_emdat_latest_data(**kwargs):
84-
to_year = datetime.now().year
85-
from_year = int(settings.EMDAT_START_YEAR)
86-
variables = {"limit": -1, "from": from_year, "to": to_year}
83+
# FIXME: Why are we getting data from settings.EMDAT_START_YEAR to get the latest data?
84+
# Also, the filtering only filters using year so we might have lot of duplicate data
85+
variables: EMDATQueryVars = {
86+
"limit": -1,
87+
"from": int(settings.EMDAT_START_YEAR),
88+
"to": datetime.now().year,
89+
"include_hist": None,
90+
}
8791

8892
chain(EMDATExtraction.task.s(QUERY, variables), EMDATTransformHandler.task.s()).apply_async()
8993

9094

9195
@shared_task
9296
def ext_and_transform_emdat_historical_data(**kwargs):
93-
variables = {"limit": -1, "include_hist": True}
97+
variables: EMDATQueryVars = {
98+
"limit": -1,
99+
"from": None,
100+
"to": None,
101+
"include_hist": True,
102+
}
103+
94104
chain(EMDATExtraction.task.s(QUERY, variables), EMDATTransformHandler.task.s()).apply_async()

apps/etl/etl_tasks/glide.py

+60-36
Original file line numberDiff line numberDiff line change
@@ -3,41 +3,41 @@
33
from celery import chain, shared_task
44
from django.conf import settings
55

6-
from apps.etl.extraction.sources.glide.extract import GlideExtraction
6+
from apps.etl.extraction.sources.glide.extract import GlideExtraction, GlideQueryVars
77
from apps.etl.models import ExtractionData, HazardType
88
from apps.etl.transform.sources.glide import GlideTransformHandler
99

1010
GLIDE_HAZARDS = [
11-
("EQ", HazardType.EARTHQUAKE),
12-
("TC", HazardType.CYCLONE),
13-
("FL", HazardType.FLOOD),
14-
("DR", HazardType.DROUGHT),
15-
("WF", HazardType.WILDFIRE),
16-
("VO", HazardType.VOLCANO),
17-
("TS", HazardType.TSUNAMI),
18-
("CW", HazardType.COLDWAVE),
19-
("EP", HazardType.EPIDEMIC),
20-
("EC", HazardType.EXTRATROPICAL_CYCLONE),
21-
("ET", HazardType.EXTREME_TEMPERATURE),
22-
("FR", HazardType.FIRE),
23-
("FF", HazardType.FLASH_FLOOD),
24-
("HT", HazardType.HEAT_WAVE),
25-
("IN", HazardType.INSECT_INFESTATION),
26-
("LS", HazardType.LANDSLIDE),
27-
("MS", HazardType.MUD_SLIDE),
28-
("ST", HazardType.SEVERE_LOCAL_STROM),
29-
("SL", HazardType.SLIDE),
30-
("AV", HazardType.SNOW_AVALANCHE),
31-
("SS", HazardType.STORM),
32-
("AC", HazardType.TECH_DISASTER),
33-
("TO", HazardType.TORNADO),
34-
("VW", HazardType.VIOLENT_WIND),
35-
("WV", HazardType.WAVE_SURGE),
11+
HazardType.EARTHQUAKE,
12+
HazardType.FLOOD,
13+
HazardType.CYCLONE,
14+
HazardType.EPIDEMIC,
15+
HazardType.STORM,
16+
HazardType.DROUGHT,
17+
HazardType.TSUNAMI,
18+
HazardType.WILDFIRE,
19+
HazardType.VOLCANO,
20+
HazardType.COLDWAVE,
21+
HazardType.EXTRATROPICAL_CYCLONE,
22+
HazardType.EXTREME_TEMPERATURE,
23+
HazardType.FIRE,
24+
HazardType.FLASH_FLOOD,
25+
HazardType.HEAT_WAVE,
26+
HazardType.INSECT_INFESTATION,
27+
HazardType.LANDSLIDE,
28+
HazardType.MUD_SLIDE,
29+
HazardType.SEVERE_LOCAL_STROM,
30+
HazardType.SLIDE,
31+
HazardType.SNOW_AVALANCHE,
32+
HazardType.TECH_DISASTER,
33+
HazardType.TORNADO,
34+
HazardType.VIOLENT_WIND,
35+
HazardType.WAVE_SURGE,
3636
]
3737

3838

3939
@shared_task
40-
def _ext_and_transform_glide_latest_data(hazard_type, hazard_type_str):
40+
def _ext_and_transform_glide_latest_data(hazard_type: HazardType):
4141
ext_object = (
4242
ExtractionData.objects.filter(
4343
source=ExtractionData.Source.GLIDE,
@@ -48,31 +48,55 @@ def _ext_and_transform_glide_latest_data(hazard_type, hazard_type_str):
4848
.order_by("-created_at")
4949
.first()
5050
)
51+
5152
if ext_object:
5253
from_date = ext_object.created_at.date()
5354
else:
5455
from_date = datetime.strptime(settings.GLIDE_START_DATE, "%Y-%m-%d").date()
5556

5657
to_date = datetime.today().date()
57-
url = f"{settings.GLIDE_URL}/glide/jsonglideset.jsp?fromyear={from_date.year}&frommonth={from_date.month}&fromday={from_date.day}&toyear={to_date.year}&tomonth={to_date.month}&today={to_date.day}&events={hazard_type}" # noqa: E501
5858

59-
chain(GlideExtraction.task.s(url), GlideTransformHandler.task.s()).apply_async()
59+
# FIXME: Check if the date filters are inclusive
60+
url = f"{settings.GLIDE_URL}/glide/jsonglideset.jsp"
61+
variables: GlideQueryVars = {
62+
"fromyear": from_date.year,
63+
"frommonth": from_date.month,
64+
"fromday": from_date.day,
65+
"toyear": to_date.year,
66+
"tomonth": to_date.month,
67+
"today": to_date.day,
68+
"events": hazard_type.value,
69+
}
70+
71+
chain(GlideExtraction.task.s(url, variables), GlideTransformHandler.task.s()).apply_async()
6072

6173

6274
@shared_task
63-
def _ext_and_transform_glide_historical_data(hazard_type, hazard_type_str):
75+
def _ext_and_transform_glide_historical_data(hazard_type: HazardType):
6476
to_date = datetime.today().date()
65-
url = f"{settings.GLIDE_URL}/glide/jsonglideset.jsp?toyear={to_date.year}&tomonth={to_date.month}&to_date={to_date.day}&events={hazard_type}" # noqa: E501
66-
chain(GlideExtraction.task.s(url), GlideTransformHandler.task.s()).apply_async()
77+
78+
# FIXME: Check if the date filters are inclusive
79+
url = f"{settings.GLIDE_URL}/glide/jsonglideset.jsp"
80+
variables: GlideQueryVars = {
81+
"fromyear": None,
82+
"frommonth": None,
83+
"fromday": None,
84+
"toyear": to_date.year,
85+
"tomonth": to_date.month,
86+
"today": to_date.day,
87+
"events": hazard_type.value,
88+
}
89+
90+
chain(GlideExtraction.task.s(url, variables), GlideTransformHandler.task.s()).apply_async()
6791

6892

6993
@shared_task
7094
def ext_and_transform_glide_latest_data():
71-
for hazard_type, hazard_type_str in GLIDE_HAZARDS:
72-
_ext_and_transform_glide_latest_data(hazard_type, hazard_type_str)
95+
for hazard_type in GLIDE_HAZARDS:
96+
_ext_and_transform_glide_latest_data(hazard_type)
7397

7498

7599
@shared_task
76100
def ext_and_transform_glide_historical_data():
77-
for hazard_type, hazard_type_str in GLIDE_HAZARDS:
78-
_ext_and_transform_glide_historical_data(hazard_type, hazard_type_str)
101+
for hazard_type in GLIDE_HAZARDS:
102+
_ext_and_transform_glide_historical_data(hazard_type)

apps/etl/extraction/sources/base/handler.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def store_extraction_data(
2727
validate_source_func: Callable[[Any], None],
2828
source: int,
2929
response: dict,
30-
instance_id: int = None,
30+
instance_id: int | None = None,
3131
):
3232
"""
3333
Save extracted data into data base. Checks for duplicate conent using hashing.
@@ -72,14 +72,15 @@ def _create_extraction_instance(cls, url: str, source: int) -> ExtractionData:
7272
status=ExtractionData.Status.PENDING,
7373
source_validation_status=ExtractionData.ValidationStatus.NO_VALIDATION,
7474
trace_id=str(uuid.uuid4()),
75+
# FIXME Pass hazard type
7576
hazard_type=None,
7677
attempt_no=0,
7778
resp_code=0,
7879
)
7980

8081
@classmethod
8182
def _update_instance_status(
82-
cls, instance: ExtractionData, status: int, validation_status: int = None, update_validation: bool = False
83+
cls, instance: ExtractionData, status: int, validation_status: int | None = None, update_validation: bool = False
8384
) -> None:
8485
"""
8586
Update the status of the extraction instance.
@@ -116,7 +117,7 @@ def _save_response_data(cls, instance: ExtractionData, response: requests.Respon
116117
return json.loads(response.content)
117118

118119
@classmethod
119-
def handle_extraction(cls, url: str, params: dict, headers: dict, source: int) -> dict:
120+
def handle_extraction(cls, url: str, params: dict | None, headers: dict, source: int) -> dict:
120121
"""
121122
Process data extraction.
122123
Returns:

apps/etl/extraction/sources/desinventar/extract.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ def store_extraction_data(
2626
instance_id: int = None,
2727
):
2828
"""
29-
Save extracted data into database. Checks for duplicate content using hashing.
29+
Save extracted data into database.
3030
"""
3131
file_name = f"{source}.zip"
3232
resp_data = response
@@ -41,6 +41,7 @@ def store_extraction_data(
4141
# manage duplicate file content.
4242
manage_duplicate_file_content(
4343
source=extraction_instance.source,
44+
# FIXME: We need to calculate has for zip file
4445
hash_content=None,
4546
instance=extraction_instance,
4647
response_data=resp_data.content,

apps/etl/extraction/sources/emdat/extract.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,4 @@ def handle_extraction(cls, query: str, variables: EMDATQueryVars, source: int) -
7171
@staticmethod
7272
@app.task
7373
def task(query: str, variables: EMDATQueryVars): # type: ignore[reportIncompatibleMethodOverride]
74-
return EMDATExtraction().handle_extraction(query, variables, ExtractionData.Source.EMDAT)
74+
return EMDATExtraction().handle_extraction(query, variables, ExtractionData.Source.EMDAT)

apps/etl/extraction/sources/glide/extract.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,4 @@ class GlideExtraction(BaseExtraction):
2828
@staticmethod
2929
@app.task
3030
def task(url: str, variables: GlideQueryVars): # type: ignore[reportIncompatibleMethodOverride]
31-
return GlideExtraction().handle_extraction(url, variables, HEADERS, ExtractionData.Source.GLIDE)
31+
return GlideExtraction().handle_extraction(url, variables, HEADERS, ExtractionData.Source.GLIDE)

apps/etl/tasks.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,4 @@ def extract_usgs_data():
1919

2020
@shared_task
2121
def load_data():
22-
call_command("load_data_to_stac")
22+
call_command("load_data_to_stac")

0 commit comments

Comments
 (0)