Skip to content

Commit 573114e

Browse files
Add class based etl for emdat.
1 parent 39a37a5 commit 573114e

File tree

7 files changed

+176
-239
lines changed

7 files changed

+176
-239
lines changed

apps/etl/etl_tasks/emdat.py

+86-9
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,94 @@
1+
from datetime import datetime
2+
13
from celery import chain, shared_task
4+
from django.conf import settings
5+
6+
from apps.etl.extraction.sources.emdat.extract import EMDATExtraction
7+
from apps.etl.transform.sources.emdat import EMDATTransformHandler
28

3-
from apps.etl.extraction.sources.emdat.extract import (
4-
extract_emdat_historical_data,
5-
extract_emdat_latest_data,
6-
)
7-
from apps.etl.transform.sources.emdat import transform_emdat_data
9+
QUERY = """
10+
query monty ($limit: Int, $offset: Int, $include_hist: Boolean, $from: Int, $to: Int) {
11+
api_version
12+
public_emdat(
13+
cursor: {
14+
offset: $offset,
15+
limit: $limit
16+
}
17+
filters: {
18+
include_hist: $include_hist
19+
from: $from
20+
to: $to
21+
}
22+
) {
23+
total_available
24+
info {
25+
timestamp
26+
filters
27+
cursor
28+
version
29+
}
30+
data {
31+
disno
32+
classif_key
33+
group
34+
subgroup
35+
type
36+
subtype
37+
external_ids
38+
name
39+
iso
40+
country
41+
subregion
42+
region
43+
location
44+
origin
45+
associated_types
46+
ofda_response
47+
appeal
48+
declaration
49+
aid_contribution
50+
magnitude
51+
magnitude_scale
52+
latitude
53+
longitude
54+
river_basin
55+
start_year
56+
start_month
57+
start_day
58+
end_year
59+
end_month
60+
end_day
61+
total_deaths
62+
no_injured
63+
no_affected
64+
no_homeless
65+
total_affected
66+
reconstr_dam
67+
reconstr_dam_adj
68+
insur_dam
69+
insur_dam_adj
70+
total_dam
71+
total_dam_adj
72+
cpi
73+
admin_units
74+
entry_date
75+
last_update
76+
}
77+
}
78+
}
79+
"""
880

981

1082
@shared_task
11-
def ext_and_transform_emdat_historical_data(**kwargs):
12-
chain(extract_emdat_historical_data.s(), transform_emdat_data.s()).apply_async()
83+
def ext_and_transform_emdat_latest_data(**kwargs):
84+
to_year = datetime.now().year
85+
from_year = int(settings.EMDAT_START_YEAR)
86+
variables = {"limit": -1, "from": from_year, "to": to_year}
87+
88+
chain(EMDATExtraction.task.s(QUERY, variables), EMDATTransformHandler.task.s()).apply_async()
1389

1490

1591
@shared_task
16-
def ext_and_transform_emdat_latest_data(**kwargs):
17-
chain(extract_emdat_latest_data.s(), transform_emdat_data.s()).apply_async()
92+
def ext_and_transform_emdat_historical_data(**kwargs):
93+
variables = {"limit": -1, "include_hist": True}
94+
chain(EMDATExtraction.task.s(QUERY, variables), EMDATTransformHandler.task.s()).apply_async()

apps/etl/etl_tasks/glide.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -54,15 +54,15 @@ def _ext_and_transform_glide_latest_data(hazard_type, hazard_type_str):
5454
from_date = datetime.strptime(settings.GLIDE_START_DATE, "%Y-%m-%d").date()
5555

5656
to_date = datetime.today().date()
57-
url = f"https://www.glidenumber.net/glide/jsonglideset.jsp?fromyear={from_date.year}&frommonth={from_date.month}&fromday={from_date.day}&toyear={to_date.year}&tomonth={to_date.month}&today={to_date.day}&events={hazard_type}" # noqa: E501
57+
url = f"{settings.GLIDE_URL}/glide/jsonglideset.jsp?fromyear={from_date.year}&frommonth={from_date.month}&fromday={from_date.day}&toyear={to_date.year}&tomonth={to_date.month}&today={to_date.day}&events={hazard_type}" # noqa: E501
5858

5959
chain(GlideExtraction.task.s(url), GlideTransformHandler.task.s()).apply_async()
6060

6161

6262
@shared_task
6363
def _ext_and_transform_glide_historical_data(hazard_type, hazard_type_str):
6464
to_date = datetime.today().date()
65-
url = f"https://www.glidenumber.net/glide/jsonglideset.jsp?toyear={to_date.year}&tomonth={to_date.month}&to_date={to_date.day}&events={hazard_type}" # noqa: E501
65+
url = f"{settings.GLIDE_URL}/glide/jsonglideset.jsp?toyear={to_date.year}&tomonth={to_date.month}&to_date={to_date.day}&events={hazard_type}" # noqa: E501
6666
chain(GlideExtraction.task.s(url), GlideTransformHandler.task.s()).apply_async()
6767

6868

+66-143
Original file line numberDiff line numberDiff line change
@@ -1,156 +1,79 @@
1-
import json
21
import logging
3-
import uuid
4-
from datetime import datetime
2+
import typing
53

64
import requests
7-
from celery import shared_task
85
from django.conf import settings
9-
from django.core.files.base import ContentFile
106

11-
from apps.etl.models import ExtractionData, HazardType
7+
from apps.etl.extraction.sources.base.handler import BaseExtraction
8+
from apps.etl.models import ExtractionData
9+
from main.celery import app
1210

1311
logger = logging.getLogger(__name__)
1412

13+
EMDATQueryVars = typing.TypedDict(
14+
"EMDATQueryVars",
15+
{
16+
"limit": int | None,
17+
"from": int | None,
18+
"to": int | None,
19+
"include_hist": bool | None,
20+
},
21+
)
1522

16-
@shared_task
17-
def extract_emdat_latest_data():
18-
to_year = datetime.now().year
19-
from_year = int(settings.EMDAT_START_YEAR)
20-
# ref: https://files.emdat.be/docs/emdat_api_cookbook.pdfhttps://files.emdat.be/docs/emdat_api_cookbook.pdf
21-
variables = {"limit": -1, "from": from_year, "to": to_year}
22-
return import_hazard_data(variables)
2323

24-
25-
@shared_task
26-
def extract_emdat_historical_data():
27-
variables = {"limit": -1, "include_hist": True}
28-
return import_hazard_data(variables)
29-
30-
31-
@shared_task
32-
def import_hazard_data(variables, **kwargs):
24+
class EMDATExtraction(BaseExtraction):
3325
"""
34-
Import hazard data from glide api
26+
Handles data extraction from the EMDAT API.
3527
"""
36-
logger.info("Importing EMDAT data")
37-
query = """
38-
query monty ($limit: Int, $offset: Int, $include_hist: Boolean, $from: Int, $to: Int) {
39-
api_version
40-
public_emdat(
41-
cursor: {
42-
offset: $offset,
43-
limit: $limit
44-
}
45-
filters: {
46-
include_hist: $include_hist
47-
from: $from
48-
to: $to
49-
}
50-
) {
51-
total_available
52-
info {
53-
timestamp
54-
filters
55-
cursor
56-
version
57-
}
58-
data {
59-
disno
60-
classif_key
61-
group
62-
subgroup
63-
type
64-
subtype
65-
external_ids
66-
name
67-
iso
68-
country
69-
subregion
70-
region
71-
location
72-
origin
73-
associated_types
74-
ofda_response
75-
appeal
76-
declaration
77-
aid_contribution
78-
magnitude
79-
magnitude_scale
80-
latitude
81-
longitude
82-
river_basin
83-
start_year
84-
start_month
85-
start_day
86-
end_year
87-
end_month
88-
end_day
89-
total_deaths
90-
no_injured
91-
no_affected
92-
no_homeless
93-
total_affected
94-
reconstr_dam
95-
reconstr_dam_adj
96-
insur_dam
97-
insur_dam_adj
98-
total_dam
99-
total_dam_adj
100-
cpi
101-
admin_units
102-
entry_date
103-
last_update
104-
}
105-
}
106-
}
107-
"""
108-
109-
EMDAT_URL = f"{settings.EMDAT_URL}"
110-
HEADERS = {"Authorization": settings.EMDAT_AUTHORIZATION_KEY}
111-
112-
# Create new extraction object for each extraction
113-
emdat_instance = ExtractionData.objects.create(
114-
source=ExtractionData.Source.EMDAT,
115-
status=ExtractionData.Status.PENDING,
116-
source_validation_status=ExtractionData.ValidationStatus.NO_VALIDATION,
117-
hazard_type=HazardType.OTHER,
118-
attempt_no=0,
119-
trace_id=str(uuid.uuid4()),
120-
resp_code=0,
121-
)
122-
123-
try:
124-
# Set extraction status to progress
125-
emdat_instance.status = ExtractionData.Status.IN_PROGRESS
126-
emdat_instance.save(update_fields=["status"])
127-
128-
paylod = {"query": query, "variables": variables}
129-
response = requests.post(EMDAT_URL, json=paylod, headers=HEADERS)
130-
response.raise_for_status()
13128

132-
# Save the extraction data
133-
if response and response.status_code == 200:
134-
file_name = "emdat_disaster_data.json"
135-
emdat_instance.resp_data.save(file_name, ContentFile(response.content))
136-
137-
# Set extraction status to success
138-
emdat_instance.status = ExtractionData.Status.SUCCESS
139-
response_content_json = json.loads(response.content)
140-
141-
# if data is empty set validation status to No Data
142-
if not response_content_json["data"]["public_emdat"]:
143-
emdat_instance.source_validation_status = ExtractionData.ValidationStatus.NO_DATA
144-
145-
emdat_instance.save(update_fields=["status", "source_validation_status"])
146-
147-
logger.info("EMDAT data imported sucessfully")
148-
return emdat_instance.id
149-
150-
except requests.exceptions.RequestException:
151-
# Set extraction status to Fail
152-
emdat_instance.status = ExtractionData.Status.FAILED
153-
emdat_instance.save(update_fields=["status"])
154-
logger.error("Extraction failed", exc_info=True, extra={"source": ExtractionData.Source.EMDAT})
155-
# FIXME: Check if this creates duplicate entry in Sentry. if yes, remove this.
156-
raise
29+
# FIXME: We need to handle GraphQL request in BaseExtraction
30+
@classmethod
31+
def handle_extraction(cls, query: str, variables: EMDATQueryVars, source: int) -> int: # type: ignore[reportIncompatibleMethodOverride]
32+
"""
33+
Process data extraction.
34+
Returns:
35+
int: ID of the extraction instance
36+
"""
37+
logger.info("Starting data extraction")
38+
39+
url = f"{settings.EMDAT_URL}"
40+
headers = {"Authorization": settings.EMDAT_AUTHORIZATION_KEY}
41+
42+
instance = cls._create_extraction_instance(url=url, source=source)
43+
44+
try:
45+
cls._update_instance_status(instance, ExtractionData.Status.IN_PROGRESS)
46+
47+
paylod = {"query": query, "variables": variables}
48+
response = requests.post(url, json=paylod, headers=headers)
49+
response.raise_for_status()
50+
response_data = cls._save_response_data(instance, response)
51+
52+
if not response_data or not response_data["data"]["public_emdat"]:
53+
cls._update_instance_status(
54+
instance,
55+
ExtractionData.Status.SUCCESS,
56+
ExtractionData.ValidationStatus.NO_DATA,
57+
update_validation=True,
58+
)
59+
logger.warning("No hazard data found in response")
60+
else:
61+
cls._update_instance_status(instance, ExtractionData.Status.SUCCESS)
62+
63+
return instance.id
64+
65+
except requests.exceptions.RequestException:
66+
cls._update_instance_status(instance, ExtractionData.Status.FAILED)
67+
logger.error(
68+
"extraction failed",
69+
exc_info=True,
70+
extra={
71+
"source": instance.source,
72+
},
73+
)
74+
raise
75+
76+
@staticmethod
77+
@app.task
78+
def task(query: str, variables: EMDATQueryVars): # type: ignore[reportIncompatibleMethodOverride]
79+
return EMDATExtraction().handle_extraction(query, variables, ExtractionData.Source.EMDAT)

apps/etl/management/commands/extract_emdat_data.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@ class Command(BaseCommand):
1111
help = "Import data from EM-DAT"
1212

1313
def handle(self, *args, **options):
14-
ext_and_transform_emdat_historical_data.delay()
14+
ext_and_transform_emdat_historical_data()

0 commit comments

Comments
 (0)