Skip to content

Commit 280a069

Browse files
Add class based etl for Glide.
1 parent 37d6cc7 commit 280a069

File tree

6 files changed

+95
-226
lines changed

apps/etl/etl_tasks/glide.py

+64-47
Original file line numberDiff line numberDiff line change
@@ -1,61 +1,78 @@
1+
from datetime import datetime
2+
13
from celery import chain, shared_task
4+
from django.conf import settings
5+
6+
from apps.etl.extraction.sources.glide.extract import GlideExtraction
7+
from apps.etl.models import ExtractionData, HazardType
8+
from apps.etl.transform.sources.glide import GlideTransformHandler
29

3-
from apps.etl.extraction.sources.glide.extract import (
4-
extract_glide_historical_data,
5-
extract_glide_latest_data,
6-
)
7-
from apps.etl.models import HazardType
8-
from apps.etl.transform.sources.glide import transform_glide_event_data
10+
GLIDE_HAZARDS = [
11+
("EQ", HazardType.EARTHQUAKE),
12+
("TC", HazardType.CYCLONE),
13+
("FL", HazardType.FLOOD),
14+
("DR", HazardType.DROUGHT),
15+
("WF", HazardType.WILDFIRE),
16+
("VO", HazardType.VOLCANO),
17+
("TS", HazardType.TSUNAMI),
18+
("CW", HazardType.COLDWAVE),
19+
("EP", HazardType.EPIDEMIC),
20+
("EC", HazardType.EXTRATROPICAL_CYCLONE),
21+
("ET", HazardType.EXTREME_TEMPERATURE),
22+
("FR", HazardType.FIRE),
23+
("FF", HazardType.FLASH_FLOOD),
24+
("HT", HazardType.HEAT_WAVE),
25+
("IN", HazardType.INSECT_INFESTATION),
26+
("LS", HazardType.LANDSLIDE),
27+
("MS", HazardType.MUD_SLIDE),
28+
("ST", HazardType.SEVERE_LOCAL_STROM),
29+
("SL", HazardType.SLIDE),
30+
("AV", HazardType.SNOW_AVALANCHE),
31+
("SS", HazardType.STORM),
32+
("AC", HazardType.TECH_DISASTER),
33+
("TO", HazardType.TORNADO),
34+
("VW", HazardType.VIOLENT_WIND),
35+
("WV", HazardType.WAVE_SURGE),
36+
]
937

1038

1139
@shared_task
12-
def ext_and_transform_glide_historical_data(hazard_type: str, hazard_type_str: str, **kwargs):
13-
event_workflow = chain(
14-
extract_glide_historical_data.s(
40+
def _ext_and_transform_glide_latest_data(hazard_type, hazard_type_str):
41+
ext_object = (
42+
ExtractionData.objects.filter(
43+
source=ExtractionData.Source.GLIDE,
1544
hazard_type=hazard_type,
16-
hazard_type_str=hazard_type_str,
17-
),
18-
transform_glide_event_data.s(),
45+
status=ExtractionData.Status.SUCCESS,
46+
resp_data__isnull=False,
47+
)
48+
.order_by("-created_at")
49+
.first()
1950
)
20-
event_workflow.apply_async()
51+
if ext_object:
52+
from_date = ext_object.created_at.date()
53+
else:
54+
from_date = datetime.strptime(settings.GLIDE_START_DATE, "%Y-%m-%d").date()
55+
56+
to_date = datetime.today().date()
57+
url = f"https://www.glidenumber.net/glide/jsonglideset.jsp?fromyear={from_date.year}&frommonth={from_date.month}&fromday={from_date.day}&toyear={to_date.year}&tomonth={to_date.month}&today={to_date.day}&events={hazard_type}" # noqa: E501
58+
59+
chain(GlideExtraction.task.s(url), GlideTransformHandler.task.s()).apply_async()
2160

2261

2362
@shared_task
24-
def ext_and_transform_data(hazard_type, hazard_type_str):
25-
event_workflow = chain(
26-
extract_glide_latest_data.s(
27-
hazard_type=hazard_type,
28-
hazard_type_str=hazard_type_str,
29-
),
30-
transform_glide_event_data.s(),
31-
)
32-
event_workflow.apply_async()
63+
def _ext_and_transform_glide_historical_data(hazard_type, hazard_type_str):
64+
to_date = datetime.today().date()
65+
url = f"https://www.glidenumber.net/glide/jsonglideset.jsp?toyear={to_date.year}&tomonth={to_date.month}&to_date={to_date.day}&events={hazard_type}" # noqa: E501
66+
chain(GlideExtraction.task.s(url), GlideTransformHandler.task.s()).apply_async()
3367

3468

3569
@shared_task
3670
def ext_and_transform_glide_latest_data():
37-
ext_and_transform_data.delay("EQ", HazardType.EARTHQUAKE)
38-
ext_and_transform_data.delay("TC", HazardType.CYCLONE)
39-
ext_and_transform_data.delay("FL", HazardType.FLOOD)
40-
ext_and_transform_data.delay("DR", HazardType.DROUGHT)
41-
ext_and_transform_data.delay("WF", HazardType.WILDFIRE)
42-
ext_and_transform_data.delay("VO", HazardType.VOLCANO)
43-
ext_and_transform_data.delay("TS", HazardType.TSUNAMI)
44-
ext_and_transform_data.delay("CW", HazardType.COLDWAVE)
45-
ext_and_transform_data.delay("EP", HazardType.EPIDEMIC)
46-
ext_and_transform_data.delay("EC", HazardType.EXTRATROPICAL_CYCLONE)
47-
ext_and_transform_data.delay("ET", HazardType.EXTREME_TEMPERATURE)
48-
ext_and_transform_data.delay("FR", HazardType.FIRE)
49-
ext_and_transform_data.delay("FF", HazardType.FLASH_FLOOD)
50-
ext_and_transform_data.delay("HT", HazardType.HEAT_WAVE)
51-
ext_and_transform_data.delay("IN", HazardType.INSECT_INFESTATION)
52-
ext_and_transform_data.delay("LS", HazardType.LANDSLIDE)
53-
ext_and_transform_data.delay("MS", HazardType.MUD_SLIDE)
54-
ext_and_transform_data.delay("ST", HazardType.SEVERE_LOCAL_STROM)
55-
ext_and_transform_data.delay("SL", HazardType.SLIDE)
56-
ext_and_transform_data.delay("AV", HazardType.SNOW_AVALANCHE)
57-
ext_and_transform_data.delay("SS", HazardType.STORM)
58-
ext_and_transform_data.delay("AC", HazardType.TECH_DISASTER)
59-
ext_and_transform_data.delay("TO", HazardType.TORNADO)
60-
ext_and_transform_data.delay("VW", HazardType.VIOLENT_WIND)
61-
ext_and_transform_data.delay("WV", HazardType.WAVE_SURGE)
71+
for hazard_type, hazard_type_str in GLIDE_HAZARDS:
72+
_ext_and_transform_glide_latest_data(hazard_type, hazard_type_str)
73+
74+
75+
@shared_task
76+
def ext_and_transform_glide_historical_data():
77+
for hazard_type, hazard_type_str in GLIDE_HAZARDS:
78+
_ext_and_transform_glide_historical_data(hazard_type, hazard_type_str)
apps/etl/extraction/sources/glide/extract.py

+9-83
Original file line numberDiff line numberDiff line change
@@ -1,90 +1,16 @@
1-
import logging
2-
from datetime import datetime
3-
4-
import requests
5-
from celery import shared_task
6-
from django.conf import settings
7-
8-
from apps.etl.extraction.sources.base.extract import Extraction
9-
from apps.etl.extraction.sources.base.utils import store_extraction_data
1+
from apps.etl.extraction.sources.base.handler import BaseExtraction
102
from apps.etl.models import ExtractionData
3+
from main.celery import app
114

12-
logger = logging.getLogger(__name__)
13-
14-
15-
@shared_task
16-
def extract_glide_latest_data(hazard_type, hazard_type_str):
17-
ext_object = (
18-
ExtractionData.objects.filter(
19-
source=ExtractionData.Source.GLIDE,
20-
hazard_type=hazard_type,
21-
status=ExtractionData.Status.SUCCESS,
22-
resp_data__isnull=False,
23-
)
24-
.order_by("-created_at")
25-
.first()
26-
)
27-
if ext_object:
28-
from_date = ext_object.created_at.date()
29-
else:
30-
from_date = datetime.strptime(settings.GLIDE_START_DATE, "%Y-%m-%d").date()
31-
32-
to_date = datetime.today().date()
33-
url = f"https://www.glidenumber.net/glide/jsonglideset.jsp?fromyear={from_date.year}&frommonth={from_date.month}&fromday={from_date.day}&toyear={to_date.year}&tomonth={to_date.month}&today={to_date.day}&events={hazard_type}" # noqa: E501
34-
return import_glide_hazard_data(hazard_type, hazard_type_str, url)
35-
5+
HEADERS = {"accept": "application/json"}
366

37-
@shared_task
38-
def extract_glide_historical_data(hazard_type, hazard_type_str):
39-
to_date = datetime.today().date()
40-
url = f"https://www.glidenumber.net/glide/jsonglideset.jsp?toyear={to_date.year}&frommonth={to_date.month}&to_date={to_date.day}&events={hazard_type}" # noqa: E501
41-
return import_glide_hazard_data(hazard_type, hazard_type_str, url)
427

43-
44-
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
45-
def import_glide_hazard_data(self, hazard_type: str, hazard_type_str: str, url: str, **kwargs):
8+
class GlideExtraction(BaseExtraction):
469
"""
47-
Import hazard data from glide api
10+
Handles data extraction from the GLIDE API.
4811
"""
49-
logger.info(f"Importing GDACS - {hazard_type} data")
50-
51-
# Create a Extraction object in the begining
52-
instance_id = kwargs.get("instance_id", None)
53-
retry_count = kwargs.get("retry_count", None)
54-
55-
glide_instance = (
56-
ExtractionData.objects.get(id=instance_id)
57-
if instance_id
58-
else ExtractionData.objects.create(
59-
source=ExtractionData.Source.GLIDE,
60-
status=ExtractionData.Status.PENDING,
61-
source_validation_status=ExtractionData.ValidationStatus.NO_VALIDATION,
62-
hazard_type=hazard_type_str,
63-
attempt_no=0,
64-
resp_code=0,
65-
)
66-
)
67-
68-
# Extract the data from api.
69-
glide_extraction = Extraction(url=url)
70-
response = None
71-
try:
72-
response = glide_extraction.pull_data(
73-
source=ExtractionData.Source.GLIDE,
74-
ext_object_id=glide_instance.id,
75-
retry_count=retry_count if retry_count else 1,
76-
)
77-
except requests.exceptions.RequestException as exc:
78-
self.retry(exc=exc, kwargs={"instance_id": glide_instance.id, "retry_count": self.request.retries})
79-
80-
if response:
81-
# Save the extracted data into the existing glide object
82-
glide_instance = store_extraction_data(
83-
response=response,
84-
source=ExtractionData.Source.GLIDE,
85-
validate_source_func=None,
86-
instance_id=glide_instance.id,
87-
)
8812

89-
logger.info(f"{hazard_type} data imported sucessfully")
90-
return glide_instance.id
13+
@staticmethod
14+
@app.task
15+
def task(DATA_URL):
16+
return GlideExtraction().handle_extraction(DATA_URL, None, HEADERS, ExtractionData.Source.GLIDE)
[File path missing from this capture: a Django management command that imports GLIDE data]

+1-30
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,10 @@
1-
import logging
2-
31
from django.core.management.base import BaseCommand
42

53
from apps.etl.etl_tasks.glide import ext_and_transform_glide_historical_data
6-
from apps.etl.models import HazardType
7-
8-
logger = logging.getLogger(__name__)
94

105

116
class Command(BaseCommand):
127
help = "Import data from glide api"
138

149
def handle(self, *args, **options):
15-
ext_and_transform_glide_historical_data.delay("EQ", HazardType.EARTHQUAKE)
16-
ext_and_transform_glide_historical_data.delay("TC", HazardType.CYCLONE)
17-
ext_and_transform_glide_historical_data.delay("FL", HazardType.FLOOD)
18-
ext_and_transform_glide_historical_data.delay("DR", HazardType.DROUGHT)
19-
ext_and_transform_glide_historical_data.delay("WF", HazardType.WILDFIRE)
20-
ext_and_transform_glide_historical_data.delay("VO", HazardType.VOLCANO)
21-
ext_and_transform_glide_historical_data.delay("TS", HazardType.TSUNAMI)
22-
ext_and_transform_glide_historical_data.delay("CW", HazardType.COLDWAVE)
23-
ext_and_transform_glide_historical_data.delay("EP", HazardType.EPIDEMIC)
24-
ext_and_transform_glide_historical_data.delay("EC", HazardType.EXTRATROPICAL_CYCLONE)
25-
ext_and_transform_glide_historical_data.delay("ET", HazardType.EXTREME_TEMPERATURE)
26-
ext_and_transform_glide_historical_data.delay("FR", HazardType.FIRE)
27-
ext_and_transform_glide_historical_data.delay("FF", HazardType.FLASH_FLOOD)
28-
ext_and_transform_glide_historical_data.delay("HT", HazardType.HEAT_WAVE)
29-
ext_and_transform_glide_historical_data.delay("IN", HazardType.INSECT_INFESTATION)
30-
ext_and_transform_glide_historical_data.delay("LS", HazardType.LANDSLIDE)
31-
ext_and_transform_glide_historical_data.delay("MS", HazardType.MUD_SLIDE)
32-
ext_and_transform_glide_historical_data.delay("ST", HazardType.SEVERE_LOCAL_STROM)
33-
ext_and_transform_glide_historical_data.delay("SL", HazardType.SLIDE)
34-
ext_and_transform_glide_historical_data.delay("AV", HazardType.SNOW_AVALANCHE)
35-
ext_and_transform_glide_historical_data.delay("SS", HazardType.STORM)
36-
ext_and_transform_glide_historical_data.delay("AC", HazardType.TECH_DISASTER)
37-
ext_and_transform_glide_historical_data.delay("TO", HazardType.TORNADO)
38-
ext_and_transform_glide_historical_data.delay("VW", HazardType.VIOLENT_WIND)
39-
ext_and_transform_glide_historical_data.delay("WV", HazardType.WAVE_SURGE)
10+
ext_and_transform_glide_historical_data()

apps/etl/tasks.py

+4-5
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
)
1212
from apps.etl.etl_tasks.gfd import ext_and_transform_gfd_latest_data # noqa: F401
1313
from apps.etl.etl_tasks.glide import ( # noqa: F401
14-
ext_and_transform_data,
14+
ext_and_transform_glide_historical_data,
1515
ext_and_transform_glide_latest_data,
1616
)
1717
from apps.etl.etl_tasks.idu import ( # noqa: F401
@@ -34,9 +34,7 @@
3434
)
3535
from apps.etl.extraction.sources.gfd.extract import GFDExtraction
3636
from apps.etl.extraction.sources.gidd.extract import GIDDExtraction
37-
from apps.etl.extraction.sources.glide.extract import ( # noqa: F401
38-
import_glide_hazard_data,
39-
)
37+
from apps.etl.extraction.sources.glide.extract import GlideExtraction
4038
from apps.etl.extraction.sources.idu.extract import IDUExtraction
4139
from apps.etl.extraction.sources.ifrc_event.extract import IFRCEventExtraction
4240
from apps.etl.models import ExtractionData, HazardType # noqa: F401
@@ -50,7 +48,7 @@
5048
)
5149
from apps.etl.transform.sources.gfd import GFDTransformHandler # noqa: F401
5250
from apps.etl.transform.sources.gidd import GIDDTransformHandler # noqa: F401
53-
from apps.etl.transform.sources.glide import transform_glide_event_data # noqa: F401
51+
from apps.etl.transform.sources.glide import GlideTransformHandler # noqa: F401
5452
from apps.etl.transform.sources.handler import BaseTransformerHandler
5553
from apps.etl.transform.sources.idu import IDUTransformHandler # noqa: F401
5654
from apps.etl.transform.sources.ifrc_event import ( # noqa: F401
@@ -62,6 +60,7 @@
6260
GIDDExtraction.handle_extraction
6361
GFDExtraction.handle_extraction
6462
IFRCEventExtraction.handle_extraction
63+
GlideExtraction.handle_extraction
6564

6665
BaseTransformerHandler.handle_transformation
6766

apps/etl/transform/sources/glide.py

+15-61
Original file line numberDiff line numberDiff line change
@@ -1,68 +1,22 @@
1-
import logging
2-
import uuid
3-
4-
from celery import shared_task
51
from pystac_monty.sources.glide import GlideDataSource, GlideTransformer
62

7-
from apps.etl.models import ExtractionData, PyStacLoadData, Transform
8-
from apps.etl.utils import read_file_data
9-
from main.managers import BulkCreateManager
10-
11-
logger = logging.getLogger(__name__)
12-
13-
glide_item_type_map = {
14-
"glide-events": PyStacLoadData.ItemType.EVENT,
15-
"glide-hazards": PyStacLoadData.ItemType.HAZARD,
16-
}
17-
18-
19-
@shared_task
20-
def transform_glide_event_data(extraction_id):
21-
logger.info("Transformation started for glide data")
22-
glide_instance = ExtractionData.objects.get(id=extraction_id)
23-
24-
if not glide_instance.resp_data:
25-
logger.info("Transformation ended due to no data")
26-
return
27-
28-
data = read_file_data(glide_instance.resp_data)
29-
30-
transform_obj = Transform.objects.create(
31-
extraction=glide_instance,
32-
status=Transform.Status.PENDING,
33-
)
34-
35-
bulk_mgr = BulkCreateManager(chunk_size=1000)
36-
try:
37-
transformer = GlideTransformer(GlideDataSource(source_url=glide_instance.url, data=data))
38-
transformed_event_items = transformer.make_items()
3+
from apps.etl.models import ExtractionData
4+
from apps.etl.transform.sources.handler import BaseTransformerHandler
5+
from main.celery import app
396

40-
transform_obj.status = Transform.Status.SUCCESS
41-
transform_obj.save(update_fields=["status"])
42-
except Exception as e:
43-
logger.error("Glide transformation failed", exc_info=True, extra={"extraction_id": glide_instance.id})
44-
transform_obj.status = Transform.Status.FAILED
45-
transform_obj.save(update_fields=["status"])
46-
# FIXME: Check if this creates duplicate entry in Sentry. if yes, remove this.
47-
raise e
487

49-
for item in transformed_event_items:
50-
item_type = glide_item_type_map[item.collection_id]
51-
transformed_item_dict = item.to_dict()
52-
transformed_item_dict["properties"]["monty:etl_id"] = str(uuid.uuid4())
53-
bulk_mgr.add(
54-
PyStacLoadData(
55-
transform_id=transform_obj,
56-
item=transformed_item_dict,
57-
collection_id=item.collection_id,
58-
item_type=item_type,
59-
load_status=PyStacLoadData.LoadStatus.PENDING,
60-
)
61-
)
8+
class GlideTransformHandler(BaseTransformerHandler):
9+
transformer = GlideTransformer
10+
transformer_schema = GlideDataSource
6211

63-
bulk_mgr.done()
12+
@classmethod
13+
def get_schema_data(cls, extraction_obj: ExtractionData):
14+
with extraction_obj.resp_data.open() as file_data:
15+
data = file_data.read()
6416

65-
transform_obj.is_loaded = True
66-
transform_obj.save(update_fields=["is_loaded"])
17+
return cls.transformer_schema(source_url=extraction_obj.url, data=data)
6718

68-
logger.info("Transformation ended for glide data")
19+
@staticmethod
20+
@app.task
21+
def task(extraction_id):
22+
return GlideTransformHandler().handle_transformation(extraction_id)

apps/etl/transform/sources/handler.py

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
"pdc-events": PyStacLoadData.ItemType.EVENT,
2525
"pdc-hazards": PyStacLoadData.ItemType.HAZARD,
2626
"pdc-impacts": PyStacLoadData.ItemType.IMPACT,
27+
"glide-events": PyStacLoadData.ItemType.EVENT,
28+
"glide-hazards": PyStacLoadData.ItemType.HAZARD,
2729
}
2830

2931

0 commit comments

Comments
 (0)