Skip to content

Commit 37d6cc7

Browse files
Fix pdc file open
- Add start date from env variable for gdacs, glide, emdat, ifrc. - Load pystac items with status equal to pending. - Set crontab for idu historical data. - Fix glide url. - Send data into the pdc transformer via a temp file.
1 parent 46f7a4d commit 37d6cc7

File tree

15 files changed

+104
-52
lines changed

15 files changed

+104
-52
lines changed

apps/etl/etl_tasks/emdat.py

+3-11
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from celery import shared_task
1+
from celery import chain, shared_task
22

33
from apps.etl.extraction.sources.emdat.extract import (
44
extract_emdat_historical_data,
@@ -9,17 +9,9 @@
99

1010
@shared_task
1111
def ext_and_transform_emdat_historical_data(**kwargs):
12-
# Extract the data from emdat
13-
extraction_id = extract_emdat_historical_data()
14-
15-
# Transform the data from emdat
16-
transform_emdat_data(extraction_id)
12+
chain(extract_emdat_historical_data.s(), transform_emdat_data.s()).apply_async()
1713

1814

1915
@shared_task
2016
def ext_and_transform_emdat_latest_data(**kwargs):
21-
# Extract the data from emdat
22-
extraction_id = extract_emdat_latest_data()
23-
24-
# Transform the data from emdat
25-
transform_emdat_data(extraction_id)
17+
chain(extract_emdat_latest_data.s(), transform_emdat_data.s()).apply_async()

apps/etl/etl_tasks/gdacs.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import requests
66
from celery import chain, shared_task
7+
from django.conf import settings
78

89
from apps.etl.extraction.sources.base.extract import Extraction
910
from apps.etl.extraction.sources.base.utils import store_extraction_data
@@ -40,7 +41,7 @@ def _ext_and_transform_data(hazard_type: str, hazard_type_str: str):
4041
from_date = ext_object.created_at.date()
4142
else:
4243
# Fetch data up to one week at the begining.
43-
from_date = datetime.today() - timedelta(days=7)
44+
from_date = datetime.strptime(settings.GDACS_START_DATE, "%Y-%m-%d").date()
4445

4546
to_date = datetime.now().date()
4647
ext_and_transform_gdacs_data.delay(hazard_type, hazard_type_str, from_date, to_date)

apps/etl/etl_tasks/idu.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
logger = logging.getLogger(__name__)
1010

11-
HISTORICAL_DATA_URL = f"{settings.IDMC_DATA_URL}/external-api/idus/idus_all_retrieve"
11+
HISTORICAL_DATA_URL = f"{settings.IDMC_DATA_URL}/external-api/idus/all/"
1212
LATEST_DATA_URL = f"{settings.IDMC_DATA_URL}/external-api/idus/last-180-days/"
1313

1414

apps/etl/etl_tasks/ifrc_event.py

+3-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from datetime import datetime, timedelta
1+
from datetime import datetime
22

33
from celery import chain, shared_task
44
from django.conf import settings
@@ -13,9 +13,6 @@
1313

1414
@shared_task
1515
def ext_and_transform_ifrcevent_latest_data():
16-
END_DATE = datetime.now().date()
17-
START_DATE = END_DATE - timedelta(days=7)
18-
1916
ext_object = (
2017
ExtractionData.objects.filter(
2118
source=ExtractionData.Source.DREF, status=ExtractionData.Status.SUCCESS, resp_data__isnull=False
@@ -26,6 +23,8 @@ def ext_and_transform_ifrcevent_latest_data():
2623

2724
if ext_object:
2825
START_DATE = ext_object.created_at.date()
26+
else:
27+
START_DATE = datetime.strptime(settings.GLIDE_START_DATE, "%Y-%m-%d").date()
2928

3029
LATEST_DATA_PARAMS = {
3130
"disaster_start_date__gte": START_DATE,

apps/etl/etl_tasks/usgs.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
@shared_task
77
def ext_and_transform_usgs_latest_data():
8-
url = "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_month.geojson"
8+
url = "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_day.geojson"
99
ext_and_transform_data.delay(url)
1010

1111

apps/etl/extraction/sources/base/utils.py

-14
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
import hashlib
22
import json
3-
import os
43

5-
from django.conf import settings
64
from django.core.files.base import ContentFile
75

86
from apps.etl.models import ExtractionData
@@ -107,15 +105,3 @@ def store_pdc_exposure_data(
107105
instance.resp_data.save(content_file.name, content_file)
108106

109107
return instance
110-
111-
112-
def store_geojson_file(response, source=None, validate_source_func=None, instance_id=None, hazard_type=None, metadata=None):
113-
file_extension = "geojson"
114-
file_name = f"{instance_id}pdc.{file_extension}"
115-
instance = ExtractionData.objects.get(id=instance_id.id)
116-
file_path = os.path.join(settings.MEDIA_ROOT, "source_raw_data", file_name)
117-
with open(file_path, "w") as f:
118-
json.dump(response, f)
119-
instance.metadata["geojson_file_path"] = file_path
120-
instance.save()
121-
return instance.id

apps/etl/extraction/sources/desinventar/extract.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def handle_extraction(cls, url: str, params: dict, headers: dict, source: int) -
6161
try:
6262
cls._update_instance_status(instance, ExtractionData.Status.IN_PROGRESS)
6363

64-
response = requests.get(url, params=params, headers=headers, timeout=30)
64+
response = requests.get(url, params=params, headers=headers, timeout=180)
6565
response.raise_for_status()
6666
instance.resp_code = response.status_code
6767

apps/etl/extraction/sources/emdat/extract.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
@shared_task
1616
def extract_emdat_latest_data():
1717
to_year = datetime.now().year
18-
from_year = int(to_year) - 1
18+
from_year = int(settings.EMDAT_START_YEAR)
1919
# ref: https://files.emdat.be/docs/emdat_api_cookbook.pdfhttps://files.emdat.be/docs/emdat_api_cookbook.pdf
2020
variables = {"limit": -1, "from": from_year, "to": to_year}
2121
return import_hazard_data(variables)

apps/etl/extraction/sources/glide/extract.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import logging
2-
from datetime import datetime, timedelta
2+
from datetime import datetime
33

44
import requests
55
from celery import shared_task
6+
from django.conf import settings
67

78
from apps.etl.extraction.sources.base.extract import Extraction
89
from apps.etl.extraction.sources.base.utils import store_extraction_data
@@ -26,11 +27,10 @@ def extract_glide_latest_data(hazard_type, hazard_type_str):
2627
if ext_object:
2728
from_date = ext_object.created_at.date()
2829
else:
29-
# Fetch data up to one week at the begining.
30-
from_date = datetime.today() - timedelta(days=7)
30+
from_date = datetime.strptime(settings.GLIDE_START_DATE, "%Y-%m-%d").date()
3131

3232
to_date = datetime.today().date()
33-
url = f"https://www.glidenumber.net/glide/jsonglideset.jsp?fromyear={from_date.year}&frommonth={from_date.month}&fromday={from_date.day}&toyear={to_date.year}&frommonth={to_date.month}&to_date={to_date.day}&events={hazard_type}" # noqa: E501
33+
url = f"https://www.glidenumber.net/glide/jsonglideset.jsp?fromyear={from_date.year}&frommonth={from_date.month}&fromday={from_date.day}&toyear={to_date.year}&tomonth={to_date.month}&today={to_date.day}&events={hazard_type}" # noqa: E501
3434
return import_glide_hazard_data(hazard_type, hazard_type_str, url)
3535

3636

apps/etl/extraction/sources/pdc/extract.py

+12-4
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
from apps.etl.extraction.sources.base.extract import Extraction
99
from apps.etl.extraction.sources.base.utils import (
1010
store_extraction_data,
11-
store_geojson_file,
1211
store_pdc_exposure_data,
1312
)
1413
from apps.etl.models import ExtractionData, HazardType
@@ -44,7 +43,16 @@ def get_hazard_details(self, extraction_id, **kwargs):
4443
for hazard in response_data:
4544
try:
4645
geo_json_file = geo.get_polygon(hazard["uuid"])
47-
store_geojson_file(geo_json_file, instance_id=instance_id)
46+
47+
geo_json_data = store_pdc_exposure_data(
48+
response=geo_json_file,
49+
source=ExtractionData.Source.PDC,
50+
validate_source_func=None,
51+
parent_id=instance_id.id,
52+
hazard_type=HAZARD_TYPE_MAP.get(hazard["type_ID"]),
53+
metadata={},
54+
)
55+
4856
if hazard["type_ID"] not in HAZARD_TYPE_MAP.keys():
4957
continue
5058
r = requests.get(
@@ -72,7 +80,7 @@ def get_hazard_details(self, extraction_id, **kwargs):
7280
hazard_type=HAZARD_TYPE_MAP.get(hazard["type_ID"]),
7381
metadata={"exposure_id": exposure_id, "uuid": hazard["uuid"]},
7482
)
75-
PDCTransformHandler.task(exposure_detail.id)
83+
PDCTransformHandler.task(exposure_detail.id, geo_json_data.id)
7684
except Exception as exc:
7785
self.retry(exc=exc, kwargs={"instance_id": instance_id.id, "retry_count": self.request.retries})
7886

@@ -105,7 +113,7 @@ def import_hazard_data(self, **kwargs):
105113
# Extract the data from api.
106114
pdc_extraction = Extraction(
107115
url=pdc_url,
108-
headers={"Authorization": "Bearer {}".format(settings.PDC_AUTHORIZATION_KEY)},
116+
headers={"Authorization": "Bearer {}".format(settings.PDC_AUTHORIZATION_KEY)}, # NOTE: Does this key expire??
109117
)
110118
response = None
111119
try:

apps/etl/load/sources/base.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def load_data(django_command: BaseCommand | None = None):
2929
"""Load data into STAC"""
3030
logger.info("Loading data into Stac")
3131

32-
transformed_items = PyStacLoadData.objects.exclude(load_status=PyStacLoadData.LoadStatus.SUCCESS)
32+
transformed_items = PyStacLoadData.objects.filter(load_status=PyStacLoadData.LoadStatus.PENDING)
3333

3434
bulk_mgr = BulkUpdateManager(["load_status"], chunk_size=1000)
3535
for item in transformed_items.iterator():

apps/etl/transform/sources/pdc.py

+61-7
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,84 @@
11
import json
2+
import logging
3+
import tempfile
24

35
from pystac_monty.sources.pdc import PDCDataSource, PDCTransformer
46

5-
from apps.etl.models import ExtractionData
7+
from apps.etl.models import ExtractionData, Transform
68
from main.celery import app
79

810
from .handler import BaseTransformerHandler
911

12+
logger = logging.getLogger(__name__)
13+
1014

1115
class PDCTransformHandler(BaseTransformerHandler):
1216
transformer = PDCTransformer
1317
transformer_schema = PDCDataSource
1418

1519
@classmethod
16-
def get_schema_data(cls, extraction_obj: ExtractionData):
20+
def get_schema_data(cls, extraction_obj: ExtractionData, geo_json_obj: ExtractionData):
1721
source_url = extraction_obj.url
22+
23+
with extraction_obj.parent.resp_data.open("rb") as f:
24+
file_content = f.read()
25+
tmp_hazard_file = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
26+
tmp_hazard_file.write(file_content)
27+
28+
with extraction_obj.resp_data.open("rb") as f:
29+
file_content = f.read()
30+
tmp_exposure_detail_file = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
31+
tmp_exposure_detail_file.write(file_content)
32+
33+
with geo_json_obj.resp_data.open("rb") as f:
34+
file_content = f.read()
35+
tmp_geojson_file = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
36+
tmp_geojson_file.write(file_content)
37+
1838
data = {
19-
"hazards_file_path": extraction_obj.parent.resp_data.path,
39+
"hazards_file_path": tmp_hazard_file.name,
2040
"exposure_timestamp": extraction_obj.metadata["exposure_id"],
2141
"uuid": extraction_obj.metadata["uuid"],
22-
"exposure_detail_file_path": extraction_obj.resp_data.path,
23-
"geojson_file_path": extraction_obj.parent.metadata["geojson_file_path"] or None,
42+
"exposure_detail_file_path": tmp_exposure_detail_file.name,
43+
"geojson_file_path": tmp_geojson_file.name,
2444
}
45+
2546
return cls.transformer_schema(source_url=source_url, data=json.dumps(data))
2647

48+
@classmethod
49+
def handle_transformation(cls, extraction_id, geo_json_id):
50+
logger.info("Transformation started")
51+
extraction_obj = ExtractionData.objects.filter(id=extraction_id).first()
52+
geo_json_obj = ExtractionData.objects.filter(id=geo_json_id).first()
53+
if not extraction_obj.resp_data:
54+
logger.info("Transformation ended due to no data")
55+
return
56+
57+
transform_obj = Transform.objects.create(
58+
extraction=extraction_obj,
59+
status=Transform.Status.PENDING,
60+
)
61+
62+
try:
63+
schema = cls.get_schema_data(extraction_obj, geo_json_obj)
64+
transformer = cls.transformer(schema)
65+
transformed_items = transformer.make_items()
66+
67+
transform_obj.status = Transform.Status.SUCCESS
68+
transform_obj.save(update_fields=["status"])
69+
70+
cls.load_stac_item_to_queue(transformed_items, transform_obj.id)
71+
72+
logger.info("Transformation ended")
73+
74+
except Exception as e:
75+
logger.error("Transformation failed", exc_info=True, extra={"extraction_id": extraction_obj.id})
76+
transform_obj.status = Transform.Status.FAILED
77+
transform_obj.save(update_fields=["status"])
78+
# FIXME: Check if this creates duplicate entry in Sentry. if yes, remove this.
79+
raise e
80+
2781
@staticmethod
2882
@app.task
29-
def task(extraction_id):
30-
return PDCTransformHandler().handle_transformation(extraction_id)
83+
def task(extraction_id, geo_json_id):
84+
return PDCTransformHandler().handle_transformation(extraction_id, geo_json_id)

helm/values.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ worker:
118118
# NOTE: Make sure keys are lowercase
119119
default:
120120
enabled: true
121-
replicaCount: 2
121+
replicaCount: 10
122122
celeryArgs:
123123
- "-Q"
124124
- "celery"

main/settings.py

+12
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@
7171
EMDAT_AUTHORIZATION_KEY=str,
7272
IDMC_CLIENT_ID=str,
7373
IDMC_DATA_URL=(str, "https://helix-tools-api.idmcdb.org"),
74+
# Default start date for latest data extraction
75+
GLIDE_START_DATE=(str, "2025-01-01"),
76+
IFRCEVENT_START_DATE=(str, "2025-01-01"),
77+
GDACS_START_DATE=(str, "2025-01-01"),
78+
EMDAT_START_YEAR=(str, "2024"),
7479
# ETL Load configs
7580
EOAPI_DOMAIN=str, # http://montandon-eoapi.ifrc.org
7681
GFD_CREDENTIAL=str,
@@ -85,6 +90,13 @@
8590
ARC_USERNAME=str,
8691
ARC_PASSWORD=str,
8792
)
93+
GLIDE_START_DATE = env("GLIDE_START_DATE")
94+
95+
GDACS_START_DATE = env("GDACS_START_DATE")
96+
97+
IFRCEVENT_START_DATE = env("IFRCEVENT_START_DATE")
98+
99+
EMDAT_START_YEAR = env("EMDAT_START_YEAR")
88100

89101
DESINVENTAR_DATA_URL = env("DESINVENTAR_DATA_URL")
90102

0 commit comments

Comments
 (0)