Skip to content

Commit 15f9e69

Browse files
Add class based etl for emdat.
1 parent a556fdd commit 15f9e69

File tree

6 files changed

+162
-235
lines changed

6 files changed

+162
-235
lines changed

apps/etl/etl_tasks/emdat.py

+86-9
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,94 @@
1+
from datetime import datetime
2+
13
from celery import chain, shared_task
4+
from django.conf import settings
5+
6+
from apps.etl.extraction.sources.emdat.extract import EMDATExtraction
7+
from apps.etl.transform.sources.emdat import EMDATTransformHandler
28

3-
from apps.etl.extraction.sources.emdat.extract import (
4-
extract_emdat_historical_data,
5-
extract_emdat_latest_data,
6-
)
7-
from apps.etl.transform.sources.emdat import transform_emdat_data
9+
QUERY = """
10+
query monty ($limit: Int, $offset: Int, $include_hist: Boolean, $from: Int, $to: Int) {
11+
api_version
12+
public_emdat(
13+
cursor: {
14+
offset: $offset,
15+
limit: $limit
16+
}
17+
filters: {
18+
include_hist: $include_hist
19+
from: $from
20+
to: $to
21+
}
22+
) {
23+
total_available
24+
info {
25+
timestamp
26+
filters
27+
cursor
28+
version
29+
}
30+
data {
31+
disno
32+
classif_key
33+
group
34+
subgroup
35+
type
36+
subtype
37+
external_ids
38+
name
39+
iso
40+
country
41+
subregion
42+
region
43+
location
44+
origin
45+
associated_types
46+
ofda_response
47+
appeal
48+
declaration
49+
aid_contribution
50+
magnitude
51+
magnitude_scale
52+
latitude
53+
longitude
54+
river_basin
55+
start_year
56+
start_month
57+
start_day
58+
end_year
59+
end_month
60+
end_day
61+
total_deaths
62+
no_injured
63+
no_affected
64+
no_homeless
65+
total_affected
66+
reconstr_dam
67+
reconstr_dam_adj
68+
insur_dam
69+
insur_dam_adj
70+
total_dam
71+
total_dam_adj
72+
cpi
73+
admin_units
74+
entry_date
75+
last_update
76+
}
77+
}
78+
}
79+
"""
880

981

1082
@shared_task
11-
def ext_and_transform_emdat_historical_data(**kwargs):
12-
chain(extract_emdat_historical_data.s(), transform_emdat_data.s()).apply_async()
83+
def ext_and_transform_emdat_latest_data(**kwargs):
84+
to_year = datetime.now().year
85+
from_year = int(settings.EMDAT_START_YEAR)
86+
variables = {"limit": -1, "from": from_year, "to": to_year}
87+
88+
chain(EMDATExtraction.task.s(QUERY, variables), EMDATTransformHandler.task.s()).apply_async()
1389

1490

1591
@shared_task
16-
def ext_and_transform_emdat_latest_data(**kwargs):
17-
chain(extract_emdat_latest_data.s(), transform_emdat_data.s()).apply_async()
92+
def ext_and_transform_emdat_historical_data(**kwargs):
93+
variables = {"limit": -1, "include_hist": True}
94+
chain(EMDATExtraction.task.s(QUERY, variables), EMDATTransformHandler.task.s()).apply_async()
+54-142
Original file line numberDiff line numberDiff line change
@@ -1,154 +1,66 @@
1-
import json
21
import logging
3-
from datetime import datetime
42

53
import requests
6-
from celery import shared_task
74
from django.conf import settings
8-
from django.core.files.base import ContentFile
95

10-
from apps.etl.models import ExtractionData, HazardType
6+
from apps.etl.extraction.sources.base.handler import BaseExtraction
7+
from apps.etl.models import ExtractionData
8+
from main.celery import app
119

1210
logger = logging.getLogger(__name__)
1311

1412

15-
@shared_task
16-
def extract_emdat_latest_data():
17-
to_year = datetime.now().year
18-
from_year = int(settings.EMDAT_START_YEAR)
19-
# ref: https://files.emdat.be/docs/emdat_api_cookbook.pdfhttps://files.emdat.be/docs/emdat_api_cookbook.pdf
20-
variables = {"limit": -1, "from": from_year, "to": to_year}
21-
return import_hazard_data(variables)
22-
23-
24-
@shared_task
25-
def extract_emdat_historical_data():
26-
variables = {"limit": -1, "include_hist": True}
27-
return import_hazard_data(variables)
28-
29-
30-
@shared_task
31-
def import_hazard_data(variables, **kwargs):
13+
class EMDATExtraction(BaseExtraction):
3214
"""
33-
Import hazard data from glide api
15+
Handles data extraction from the EMDAT API.
3416
"""
35-
logger.info("Importing EMDAT data")
36-
query = """
37-
query monty ($limit: Int, $offset: Int, $include_hist: Boolean, $from: Int, $to: Int) {
38-
api_version
39-
public_emdat(
40-
cursor: {
41-
offset: $offset,
42-
limit: $limit
43-
}
44-
filters: {
45-
include_hist: $include_hist
46-
from: $from
47-
to: $to
48-
}
49-
) {
50-
total_available
51-
info {
52-
timestamp
53-
filters
54-
cursor
55-
version
56-
}
57-
data {
58-
disno
59-
classif_key
60-
group
61-
subgroup
62-
type
63-
subtype
64-
external_ids
65-
name
66-
iso
67-
country
68-
subregion
69-
region
70-
location
71-
origin
72-
associated_types
73-
ofda_response
74-
appeal
75-
declaration
76-
aid_contribution
77-
magnitude
78-
magnitude_scale
79-
latitude
80-
longitude
81-
river_basin
82-
start_year
83-
start_month
84-
start_day
85-
end_year
86-
end_month
87-
end_day
88-
total_deaths
89-
no_injured
90-
no_affected
91-
no_homeless
92-
total_affected
93-
reconstr_dam
94-
reconstr_dam_adj
95-
insur_dam
96-
insur_dam_adj
97-
total_dam
98-
total_dam_adj
99-
cpi
100-
admin_units
101-
entry_date
102-
last_update
103-
}
104-
}
105-
}
106-
"""
107-
108-
EMDAT_URL = f"{settings.EMDAT_URL}"
109-
HEADERS = {"Authorization": settings.EMDAT_AUTHORIZATION_KEY}
110-
111-
# Create new extraction object for each extraction
112-
emdat_instance = ExtractionData.objects.create(
113-
source=ExtractionData.Source.EMDAT,
114-
status=ExtractionData.Status.PENDING,
115-
source_validation_status=ExtractionData.ValidationStatus.NO_VALIDATION,
116-
hazard_type=HazardType.OTHER,
117-
attempt_no=0,
118-
resp_code=0,
119-
)
120-
121-
try:
122-
# Set extraction status to progress
123-
emdat_instance.status = ExtractionData.Status.IN_PROGRESS
124-
emdat_instance.save(update_fields=["status"])
12517

126-
paylod = {"query": query, "variables": variables}
127-
response = requests.post(EMDAT_URL, json=paylod, headers=HEADERS)
128-
response.raise_for_status()
129-
130-
# Save the extraction data
131-
if response and response.status_code == 200:
132-
file_name = "emdat_disaster_data.json"
133-
emdat_instance.resp_data.save(file_name, ContentFile(response.content))
134-
135-
# Set extraction status to success
136-
emdat_instance.status = ExtractionData.Status.SUCCESS
137-
response_content_json = json.loads(response.content)
138-
139-
# if data is empty set validation status to No Data
140-
if not response_content_json["data"]["public_emdat"]:
141-
emdat_instance.source_validation_status = ExtractionData.ValidationStatus.NO_DATA
142-
143-
emdat_instance.save(update_fields=["status", "source_validation_status"])
144-
145-
logger.info("EMDAT data imported sucessfully")
146-
return emdat_instance.id
147-
148-
except requests.exceptions.RequestException:
149-
# Set extraction status to Fail
150-
emdat_instance.status = ExtractionData.Status.FAILED
151-
emdat_instance.save(update_fields=["status"])
152-
logger.error("Extraction failed", exc_info=True, extra={"source": ExtractionData.Source.EMDAT})
153-
# FIXME: Check if this creates duplicate entry in Sentry. if yes, remove this.
154-
raise
18+
@classmethod
19+
def handle_extraction(cls, query: str, variables: dict, source: int) -> int:
20+
"""
21+
Process data extraction.
22+
Returns:
23+
int: ID of the extraction instance
24+
"""
25+
logger.info("Starting data extraction")
26+
27+
url = f"{settings.EMDAT_URL}"
28+
headers = {"Authorization": settings.EMDAT_AUTHORIZATION_KEY}
29+
30+
instance = cls._create_extraction_instance(url=url, source=source)
31+
32+
try:
33+
cls._update_instance_status(instance, ExtractionData.Status.IN_PROGRESS)
34+
paylod = {"query": query, "variables": variables}
35+
response = requests.post(url, json=paylod, headers=headers)
36+
response.raise_for_status()
37+
response_data = cls._save_response_data(instance, response)
38+
# Check if response contains data
39+
if response_data:
40+
cls._update_instance_status(instance, ExtractionData.Status.SUCCESS)
41+
if not response_data["data"]["public_emdat"]:
42+
cls._update_instance_status(
43+
instance,
44+
ExtractionData.Status.SUCCESS,
45+
ExtractionData.ValidationStatus.NO_DATA,
46+
update_validation=True,
47+
)
48+
logger.warning("No hazard data found in response")
49+
50+
return instance.id
51+
52+
except requests.exceptions.RequestException:
53+
cls._update_instance_status(instance, ExtractionData.Status.FAILED)
54+
logger.error(
55+
"extraction failed",
56+
exc_info=True,
57+
extra={
58+
"source": instance.source,
59+
},
60+
)
61+
raise
62+
63+
@staticmethod
64+
@app.task
65+
def task(query, variables):
66+
return EMDATExtraction().handle_extraction(query, variables, ExtractionData.Source.EMDAT)

apps/etl/extraction/sources/glide/extract.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,4 @@ class GlideExtraction(BaseExtraction):
1313
@staticmethod
1414
@app.task
1515
def task(DATA_URL):
16-
return GlideExtraction().handle_extraction(DATA_URL, None, HEADERS, ExtractionData.Source.GLIDE)
16+
return GlideExtraction().handle_extraction(DATA_URL, None, HEADERS, ExtractionData.Source.GLIDE)

0 commit comments

Comments
 (0)