diff --git a/apps/etl/etl_tasks/emdat.py b/apps/etl/etl_tasks/emdat.py
index 2dd1eeb0..32ee5da2 100644
--- a/apps/etl/etl_tasks/emdat.py
+++ b/apps/etl/etl_tasks/emdat.py
@@ -1,17 +1,104 @@
+from datetime import datetime
+
 from celery import chain, shared_task
+from django.conf import settings
+
+from apps.etl.extraction.sources.emdat.extract import EMDATExtraction, EMDATQueryVars
+from apps.etl.transform.sources.emdat import EMDATTransformHandler
 
-from apps.etl.extraction.sources.emdat.extract import (
-    extract_emdat_historical_data,
-    extract_emdat_latest_data,
-)
-from apps.etl.transform.sources.emdat import transform_emdat_data
+QUERY = """
+query monty(
+    $limit: Int
+    $offset: Int
+    $include_hist: Boolean
+    $from: Int
+    $to: Int
+) {
+    api_version
+    public_emdat(
+        cursor: { offset: $offset, limit: $limit }
+        filters: { include_hist: $include_hist, from: $from, to: $to }
+    ) {
+        total_available
+        info {
+            timestamp
+            filters
+            cursor
+            version
+        }
+        data {
+            disno
+            classif_key
+            group
+            subgroup
+            type
+            subtype
+            external_ids
+            name
+            iso
+            country
+            subregion
+            region
+            location
+            origin
+            associated_types
+            ofda_response
+            appeal
+            declaration
+            aid_contribution
+            magnitude
+            magnitude_scale
+            latitude
+            longitude
+            river_basin
+            start_year
+            start_month
+            start_day
+            end_year
+            end_month
+            end_day
+            total_deaths
+            no_injured
+            no_affected
+            no_homeless
+            total_affected
+            reconstr_dam
+            reconstr_dam_adj
+            insur_dam
+            insur_dam_adj
+            total_dam
+            total_dam_adj
+            cpi
+            admin_units
+            entry_date
+            last_update
+        }
+    }
+}
+"""
 
 
 @shared_task
-def ext_and_transform_emdat_historical_data(**kwargs):
-    chain(extract_emdat_historical_data.s(), transform_emdat_data.s()).apply_async()
+def ext_and_transform_emdat_latest_data(**kwargs):
+    # FIXME: Why do we fetch from settings.EMDAT_START_YEAR when getting the latest data?
+    # Also, the filter only uses the year, so we might pull a lot of duplicate data
+    variables: EMDATQueryVars = {
+        "limit": -1,
+        "from": int(settings.EMDAT_START_YEAR),
+        "to": datetime.now().year,
+        "include_hist": None,
+    }
+
+    chain(EMDATExtraction.task.s(QUERY, variables), EMDATTransformHandler.task.s()).apply_async()
 
 
 @shared_task
-def ext_and_transform_emdat_latest_data(**kwargs):
-    chain(extract_emdat_latest_data.s(), transform_emdat_data.s()).apply_async()
+def ext_and_transform_emdat_historical_data(**kwargs):
+    variables: EMDATQueryVars = {
+        "limit": -1,
+        "from": None,
+        "to": None,
+        "include_hist": True,
+    }
+
+    chain(EMDATExtraction.task.s(QUERY, variables), EMDATTransformHandler.task.s()).apply_async()
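Reviewer note: the `chain(...)` wiring above relies on Celery passing each task's return value as the first positional argument of the next signature, so `EMDATExtraction.task` must return the `ExtractionData` id that `EMDATTransformHandler.task` consumes. A minimal sketch of that contract, using the names from this diff:

```python
from celery import chain

# EMDATExtraction.task(QUERY, variables) returns extraction_id (int);
# chain prepends that return value to the next signature's args, so the
# transform step effectively runs EMDATTransformHandler.task(extraction_id).
workflow = chain(
    EMDATExtraction.task.s(QUERY, variables),
    EMDATTransformHandler.task.s(),
)
workflow.apply_async()  # enqueue on the broker; workflow() would run it inline
```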
diff --git a/apps/etl/etl_tasks/glide.py b/apps/etl/etl_tasks/glide.py
index b1560c1c..222aec7a 100644
--- a/apps/etl/etl_tasks/glide.py
+++ b/apps/etl/etl_tasks/glide.py
@@ -1,61 +1,102 @@
+from datetime import datetime
+
 from celery import chain, shared_task
+from django.conf import settings
+
+from apps.etl.extraction.sources.glide.extract import GlideExtraction, GlideQueryVars
+from apps.etl.models import ExtractionData, HazardType
+from apps.etl.transform.sources.glide import GlideTransformHandler
 
-from apps.etl.extraction.sources.glide.extract import (
-    extract_glide_historical_data,
-    extract_glide_latest_data,
-)
-from apps.etl.models import HazardType
-from apps.etl.transform.sources.glide import transform_glide_event_data
+GLIDE_HAZARDS = [
+    HazardType.EARTHQUAKE,
+    HazardType.FLOOD,
+    HazardType.CYCLONE,
+    HazardType.EPIDEMIC,
+    HazardType.STORM,
+    HazardType.DROUGHT,
+    HazardType.TSUNAMI,
+    HazardType.WILDFIRE,
+    HazardType.VOLCANO,
+    HazardType.COLDWAVE,
+    HazardType.EXTRATROPICAL_CYCLONE,
+    HazardType.EXTREME_TEMPERATURE,
+    HazardType.FIRE,
+    HazardType.FLASH_FLOOD,
+    HazardType.HEAT_WAVE,
+    HazardType.INSECT_INFESTATION,
+    HazardType.LANDSLIDE,
+    HazardType.MUD_SLIDE,
+    HazardType.SEVERE_LOCAL_STROM,
+    HazardType.SLIDE,
+    HazardType.SNOW_AVALANCHE,
+    HazardType.TECH_DISASTER,
+    HazardType.TORNADO,
+    HazardType.VIOLENT_WIND,
+    HazardType.WAVE_SURGE,
+]
 
 
 @shared_task
-def ext_and_transform_glide_historical_data(hazard_type: str, hazard_type_str: str, **kwargs):
-    event_workflow = chain(
-        extract_glide_historical_data.s(
+def _ext_and_transform_glide_latest_data(hazard_type: HazardType):
+    ext_object = (
+        ExtractionData.objects.filter(
+            source=ExtractionData.Source.GLIDE,
             hazard_type=hazard_type,
-            hazard_type_str=hazard_type_str,
-        ),
-        transform_glide_event_data.s(),
+            status=ExtractionData.Status.SUCCESS,
+            resp_data__isnull=False,
+        )
+        .order_by("-created_at")
+        .first()
     )
-    event_workflow.apply_async()
+
+    if ext_object:
+        from_date = ext_object.created_at.date()
+    else:
+        from_date = datetime.strptime(settings.GLIDE_START_DATE, "%Y-%m-%d").date()
+
+    to_date = datetime.today().date()
+
+    # FIXME: Check if the date filters are inclusive
+    url = f"{settings.GLIDE_URL}/glide/jsonglideset.jsp"
+    variables: GlideQueryVars = {
+        "fromyear": from_date.year,
+        "frommonth": from_date.month,
+        "fromday": from_date.day,
+        "toyear": to_date.year,
+        "tomonth": to_date.month,
+        "today": to_date.day,
+        "events": hazard_type.value,
+    }
+
+    chain(GlideExtraction.task.s(url, variables), GlideTransformHandler.task.s()).apply_async()
 
 
 @shared_task
-def ext_and_transform_data(hazard_type, hazard_type_str):
-    event_workflow = chain(
-        extract_glide_latest_data.s(
-            hazard_type=hazard_type,
-            hazard_type_str=hazard_type_str,
-        ),
-        transform_glide_event_data.s(),
-    )
-    event_workflow.apply_async()
+def _ext_and_transform_glide_historical_data(hazard_type: HazardType):
+    to_date = datetime.today().date()
+
+    # FIXME: Check if the date filters are inclusive
+    url = f"{settings.GLIDE_URL}/glide/jsonglideset.jsp"
+    variables: GlideQueryVars = {
+        "fromyear": None,
+        "frommonth": None,
+        "fromday": None,
+        "toyear": to_date.year,
+        "tomonth": to_date.month,
+        "today": to_date.day,
+        "events": hazard_type.value,
+    }
+
+    chain(GlideExtraction.task.s(url, variables), GlideTransformHandler.task.s()).apply_async()
 
 
 @shared_task
 def ext_and_transform_glide_latest_data():
-    ext_and_transform_data.delay("EQ", HazardType.EARTHQUAKE)
-    ext_and_transform_data.delay("TC", HazardType.CYCLONE)
-    ext_and_transform_data.delay("FL", HazardType.FLOOD)
-    ext_and_transform_data.delay("DR", HazardType.DROUGHT)
-    ext_and_transform_data.delay("WF", HazardType.WILDFIRE)
-    ext_and_transform_data.delay("VO", HazardType.VOLCANO)
-    ext_and_transform_data.delay("TS", HazardType.TSUNAMI)
-    ext_and_transform_data.delay("CW", HazardType.COLDWAVE)
-    ext_and_transform_data.delay("EP", HazardType.EPIDEMIC)
-    ext_and_transform_data.delay("EC", HazardType.EXTRATROPICAL_CYCLONE)
-    ext_and_transform_data.delay("ET", HazardType.EXTREME_TEMPERATURE)
-    ext_and_transform_data.delay("FR", HazardType.FIRE)
-    ext_and_transform_data.delay("FF", HazardType.FLASH_FLOOD)
-    ext_and_transform_data.delay("HT", HazardType.HEAT_WAVE)
-    ext_and_transform_data.delay("IN", HazardType.INSECT_INFESTATION)
-    ext_and_transform_data.delay("LS", HazardType.LANDSLIDE)
-    ext_and_transform_data.delay("MS", HazardType.MUD_SLIDE)
-    ext_and_transform_data.delay("ST", HazardType.SEVERE_LOCAL_STROM)
-    ext_and_transform_data.delay("SL", HazardType.SLIDE)
-    ext_and_transform_data.delay("AV", HazardType.SNOW_AVALANCHE)
-    ext_and_transform_data.delay("SS", HazardType.STORM)
-    ext_and_transform_data.delay("AC", HazardType.TECH_DISASTER)
-    ext_and_transform_data.delay("TO", HazardType.TORNADO)
-    ext_and_transform_data.delay("VW", HazardType.VIOLENT_WIND)
-    ext_and_transform_data.delay("WV", HazardType.WAVE_SURGE)
+    for hazard_type in GLIDE_HAZARDS:
+        _ext_and_transform_glide_latest_data(hazard_type)
+
+
+@shared_task
+def ext_and_transform_glide_historical_data():
+    for hazard_type in GLIDE_HAZARDS:
+        _ext_and_transform_glide_historical_data(hazard_type)
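Reviewer note: `_ext_and_transform_glide_latest_data` is decorated with `@shared_task`, but the dispatcher loops above call it directly, which runs it inline in the dispatching worker rather than enqueueing one task per hazard. If sequential processing is intentional, this is fine; if per-hazard fan-out is wanted, the loop body needs `.delay(...)`:

```python
for hazard_type in GLIDE_HAZARDS:
    # current behaviour: runs inline, hazards processed one after another
    _ext_and_transform_glide_latest_data(hazard_type)

for hazard_type in GLIDE_HAZARDS:
    # alternative: enqueues one Celery task per hazard, processed in parallel
    _ext_and_transform_glide_latest_data.delay(hazard_type)
```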
diff --git a/apps/etl/etl_tasks/usgs.py b/apps/etl/etl_tasks/usgs.py
index 7b277287..2f04a163 100644
--- a/apps/etl/etl_tasks/usgs.py
+++ b/apps/etl/etl_tasks/usgs.py
@@ -6,7 +6,6 @@
 
 @shared_task
 def ext_and_transform_usgs_latest_data():
-    url = "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_day.geojson"
     url = f"{settings.USGS_DATA_URL}/all_day.geojson"
     ext_and_transform_data.delay(url)
diff --git a/apps/etl/extraction/sources/base/handler.py b/apps/etl/extraction/sources/base/handler.py
index e3f87063..d5b31eaa 100644
--- a/apps/etl/extraction/sources/base/handler.py
+++ b/apps/etl/extraction/sources/base/handler.py
@@ -27,7 +27,7 @@ def store_extraction_data(
     validate_source_func: Callable[[Any], None],
     source: int,
     response: dict,
-    instance_id: int = None,
+    instance_id: int | None = None,
 ):
     """
     Save extracted data into data base. Checks for duplicate conent using hashing.
@@ -72,6 +72,7 @@ def _create_extraction_instance(cls, url: str, source: int) -> ExtractionData:
             status=ExtractionData.Status.PENDING,
             source_validation_status=ExtractionData.ValidationStatus.NO_VALIDATION,
             trace_id=str(uuid.uuid4()),
+            # FIXME: Pass hazard type
             hazard_type=None,
             attempt_no=0,
             resp_code=0,
@@ -79,7 +80,7 @@
 
     @classmethod
     def _update_instance_status(
-        cls, instance: ExtractionData, status: int, validation_status: int = None, update_validation: bool = False
+        cls, instance: ExtractionData, status: int, validation_status: int | None = None, update_validation: bool = False
     ) -> None:
         """
         Update the status of the extraction instance.
@@ -116,7 +117,7 @@ def _save_response_data(cls, instance: ExtractionData, response: requests.Respon
         return json.loads(response.content)
 
     @classmethod
-    def handle_extraction(cls, url: str, params: dict, headers: dict, source: int) -> dict:
+    def handle_extraction(cls, url: str, params: dict | None, headers: dict, source: int) -> dict:
         """
         Process data extraction.
 
         Returns:
diff --git a/apps/etl/extraction/sources/desinventar/extract.py b/apps/etl/extraction/sources/desinventar/extract.py
index bfce3a1f..9b3cf0ad 100644
--- a/apps/etl/extraction/sources/desinventar/extract.py
+++ b/apps/etl/extraction/sources/desinventar/extract.py
@@ -26,7 +26,7 @@ def store_extraction_data(
     instance_id: int = None,
 ):
     """
-    Save extracted data into database. Checks for duplicate content using hashing.
+    Save extracted data into database.
     """
     file_name = f"{source}.zip"
     resp_data = response
@@ -41,6 +41,7 @@
         # manage duplicate file content.
         manage_duplicate_file_content(
             source=extraction_instance.source,
+            # FIXME: We need to calculate the hash for the zip file
            hash_content=None,
            instance=extraction_instance,
            response_data=resp_data.content,
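On the `FIXME` above: a minimal sketch of filling in `hash_content` for the zip payload, assuming the duplicate check just needs a stable digest of the raw bytes (this should match whatever convention `manage_duplicate_file_content` uses for the other sources):

```python
import hashlib


def compute_content_hash(content: bytes) -> str:
    # A stable digest of the raw zip bytes lets duplicate downloads
    # be detected before another copy is stored.
    return hashlib.sha256(content).hexdigest()


# e.g. hash_content=compute_content_hash(resp_data.content)
```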
""" - logger.info("Importing EMDAT data") - query = """ - query monty ($limit: Int, $offset: Int, $include_hist: Boolean, $from: Int, $to: Int) { - api_version - public_emdat( - cursor: { - offset: $offset, - limit: $limit - } - filters: { - include_hist: $include_hist - from: $from - to: $to - } - ) { - total_available - info { - timestamp - filters - cursor - version - } - data { - disno - classif_key - group - subgroup - type - subtype - external_ids - name - iso - country - subregion - region - location - origin - associated_types - ofda_response - appeal - declaration - aid_contribution - magnitude - magnitude_scale - latitude - longitude - river_basin - start_year - start_month - start_day - end_year - end_month - end_day - total_deaths - no_injured - no_affected - no_homeless - total_affected - reconstr_dam - reconstr_dam_adj - insur_dam - insur_dam_adj - total_dam - total_dam_adj - cpi - admin_units - entry_date - last_update - } - } - } - """ - - EMDAT_URL = f"{settings.EMDAT_URL}" - HEADERS = {"Authorization": settings.EMDAT_AUTHORIZATION_KEY} - - # Create new extraction object for each extraction - emdat_instance = ExtractionData.objects.create( - source=ExtractionData.Source.EMDAT, - status=ExtractionData.Status.PENDING, - source_validation_status=ExtractionData.ValidationStatus.NO_VALIDATION, - hazard_type=HazardType.OTHER, - attempt_no=0, - trace_id=str(uuid.uuid4()), - resp_code=0, - ) - - try: - # Set extraction status to progress - emdat_instance.status = ExtractionData.Status.IN_PROGRESS - emdat_instance.save(update_fields=["status"]) - - paylod = {"query": query, "variables": variables} - response = requests.post(EMDAT_URL, json=paylod, headers=HEADERS) - response.raise_for_status() - # Save the extraction data - if response and response.status_code == 200: - file_name = "emdat_disaster_data.json" - emdat_instance.resp_data.save(file_name, ContentFile(response.content)) - - # Set extraction status to success - emdat_instance.status = ExtractionData.Status.SUCCESS - response_content_json = json.loads(response.content) - - # if data is empty set validation status to No Data - if not response_content_json["data"]["public_emdat"]: - emdat_instance.source_validation_status = ExtractionData.ValidationStatus.NO_DATA - - emdat_instance.save(update_fields=["status", "source_validation_status"]) - - logger.info("EMDAT data imported sucessfully") - return emdat_instance.id - - except requests.exceptions.RequestException: - # Set extraction status to Fail - emdat_instance.status = ExtractionData.Status.FAILED - emdat_instance.save(update_fields=["status"]) - logger.error("Extraction failed", exc_info=True, extra=log_extra({"source": ExtractionData.Source.EMDAT})) - # FIXME: Check if this creates duplicate entry in Sentry. if yes, remove this. - raise + # FIXME: We need to handle GraphQL request in BaseExtraction + @classmethod + def handle_extraction(cls, query: str, variables: EMDATQueryVars, source: int) -> int: # type: ignore[reportIncompatibleMethodOverride] + """ + Process data extraction. 
+ Returns: + int: ID of the extraction instance + """ + logger.info("Starting data extraction") + + url = f"{settings.EMDAT_URL}" + headers = {"Authorization": settings.EMDAT_AUTHORIZATION_KEY} + + instance = cls._create_extraction_instance(url=url, source=source) + + try: + cls._update_instance_status(instance, ExtractionData.Status.IN_PROGRESS) + + paylod = {"query": query, "variables": variables} + response = requests.post(url, json=paylod, headers=headers) + response.raise_for_status() + response_data = cls._save_response_data(instance, response) + + if not response_data or not response_data["data"]["public_emdat"]: + cls._update_instance_status( + instance, + ExtractionData.Status.SUCCESS, + ExtractionData.ValidationStatus.NO_DATA, + update_validation=True, + ) + logger.warning("No hazard data found in response") + else: + cls._update_instance_status(instance, ExtractionData.Status.SUCCESS) + + return instance.id + + except requests.exceptions.RequestException: + cls._update_instance_status(instance, ExtractionData.Status.FAILED) + logger.error("Extraction failed", exc_info=True, extra=log_extra({"source": ExtractionData.Source.EMDAT})) + raise + + @staticmethod + @app.task + def task(query: str, variables: EMDATQueryVars): # type: ignore[reportIncompatibleMethodOverride] + return EMDATExtraction().handle_extraction(query, variables, ExtractionData.Source.EMDAT) diff --git a/apps/etl/extraction/sources/glide/extract.py b/apps/etl/extraction/sources/glide/extract.py index 966a3177..711deb85 100644 --- a/apps/etl/extraction/sources/glide/extract.py +++ b/apps/etl/extraction/sources/glide/extract.py @@ -1,92 +1,31 @@ -import logging -import uuid -from datetime import datetime +import typing -import requests -from celery import shared_task -from django.conf import settings - -from apps.etl.extraction.sources.base.extract import Extraction -from apps.etl.extraction.sources.base.utils import store_extraction_data +from apps.etl.extraction.sources.base.handler import BaseExtraction from apps.etl.models import ExtractionData +from main.celery import app -logger = logging.getLogger(__name__) - - -@shared_task -def extract_glide_latest_data(hazard_type, hazard_type_str): - ext_object = ( - ExtractionData.objects.filter( - source=ExtractionData.Source.GLIDE, - hazard_type=hazard_type, - status=ExtractionData.Status.SUCCESS, - resp_data__isnull=False, - ) - .order_by("-created_at") - .first() - ) - if ext_object: - from_date = ext_object.created_at.date() - else: - from_date = datetime.strptime(settings.GLIDE_START_DATE, "%Y-%m-%d").date() - - to_date = datetime.today().date() - url = f"{settings.GLIDE_URL}/glide/jsonglideset.jsp?fromyear={from_date.year}&frommonth={from_date.month}&fromday={from_date.day}&toyear={to_date.year}&tomonth={to_date.month}&today={to_date.day}&events={hazard_type}" # noqa: E501 - return import_glide_hazard_data(hazard_type, hazard_type_str, url) - +HEADERS = {"accept": "application/json"} -@shared_task -def extract_glide_historical_data(hazard_type, hazard_type_str): - to_date = datetime.today().date() - url = f"{settings.GLIDE_URL}/glide/jsonglideset.jsp?toyear={to_date.year}&frommonth={to_date.month}&to_date={to_date.day}&events={hazard_type}" # noqa: E501 - return import_glide_hazard_data(hazard_type, hazard_type_str, url) +GlideQueryVars = typing.TypedDict( + "GlideQueryVars", + { + "fromyear": int | None, + "frommonth": int | None, + "fromday": int | None, + "toyear": int | None, + "tomonth": int | None, + "today": int | None, + "events": str | None, + }, +) 
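Reviewer note: the functional `TypedDict(...)` form is required for `EMDATQueryVars`, since `from` is a Python keyword and the class-based syntax cannot declare it. For illustration:

```python
import typing

# class EMDATQueryVars(typing.TypedDict):
#     from: int | None   # SyntaxError: "from" is a reserved keyword

EMDATQueryVars = typing.TypedDict(
    "EMDATQueryVars",
    {"limit": int | None, "from": int | None, "to": int | None, "include_hist": bool | None},
)

variables: EMDATQueryVars = {"limit": -1, "from": 2000, "to": 2025, "include_hist": None}
```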
diff --git a/apps/etl/extraction/sources/glide/extract.py b/apps/etl/extraction/sources/glide/extract.py
index 966a3177..711deb85 100644
--- a/apps/etl/extraction/sources/glide/extract.py
+++ b/apps/etl/extraction/sources/glide/extract.py
@@ -1,92 +1,31 @@
-import logging
-import uuid
-from datetime import datetime
+import typing
 
-import requests
-from celery import shared_task
-from django.conf import settings
-
-from apps.etl.extraction.sources.base.extract import Extraction
-from apps.etl.extraction.sources.base.utils import store_extraction_data
+from apps.etl.extraction.sources.base.handler import BaseExtraction
 from apps.etl.models import ExtractionData
+from main.celery import app
 
-logger = logging.getLogger(__name__)
-
-
-@shared_task
-def extract_glide_latest_data(hazard_type, hazard_type_str):
-    ext_object = (
-        ExtractionData.objects.filter(
-            source=ExtractionData.Source.GLIDE,
-            hazard_type=hazard_type,
-            status=ExtractionData.Status.SUCCESS,
-            resp_data__isnull=False,
-        )
-        .order_by("-created_at")
-        .first()
-    )
-    if ext_object:
-        from_date = ext_object.created_at.date()
-    else:
-        from_date = datetime.strptime(settings.GLIDE_START_DATE, "%Y-%m-%d").date()
-
-    to_date = datetime.today().date()
-    url = f"{settings.GLIDE_URL}/glide/jsonglideset.jsp?fromyear={from_date.year}&frommonth={from_date.month}&fromday={from_date.day}&toyear={to_date.year}&tomonth={to_date.month}&today={to_date.day}&events={hazard_type}"  # noqa: E501
-    return import_glide_hazard_data(hazard_type, hazard_type_str, url)
-
+HEADERS = {"accept": "application/json"}
 
-@shared_task
-def extract_glide_historical_data(hazard_type, hazard_type_str):
-    to_date = datetime.today().date()
-    url = f"{settings.GLIDE_URL}/glide/jsonglideset.jsp?toyear={to_date.year}&frommonth={to_date.month}&to_date={to_date.day}&events={hazard_type}"  # noqa: E501
-    return import_glide_hazard_data(hazard_type, hazard_type_str, url)
+GlideQueryVars = typing.TypedDict(
+    "GlideQueryVars",
+    {
+        "fromyear": int | None,
+        "frommonth": int | None,
+        "fromday": int | None,
+        "toyear": int | None,
+        "tomonth": int | None,
+        "today": int | None,
+        "events": str | None,
+    },
+)
 
 
-@shared_task(bind=True, max_retries=3, default_retry_delay=60)
-def import_glide_hazard_data(self, hazard_type: str, hazard_type_str: str, url: str, **kwargs):
+class GlideExtraction(BaseExtraction):
     """
-    Import hazard data from glide api
+    Handles data extraction from the GLIDE API.
     """
-    logger.info(f"Importing GDACS - {hazard_type} data")
-
-    # Create a Extraction object in the begining
-    instance_id = kwargs.get("instance_id", None)
-    retry_count = kwargs.get("retry_count", None)
-
-    glide_instance = (
-        ExtractionData.objects.get(id=instance_id)
-        if instance_id
-        else ExtractionData.objects.create(
-            source=ExtractionData.Source.GLIDE,
-            status=ExtractionData.Status.PENDING,
-            source_validation_status=ExtractionData.ValidationStatus.NO_VALIDATION,
-            hazard_type=hazard_type_str,
-            attempt_no=0,
-            resp_code=0,
-            trace_id=str(uuid.uuid4()),
-        )
-    )
-
-    # Extract the data from api.
-    glide_extraction = Extraction(url=url)
-    response = None
-    try:
-        response = glide_extraction.pull_data(
-            source=ExtractionData.Source.GLIDE,
-            ext_object_id=glide_instance.id,
-            retry_count=retry_count if retry_count else 1,
-        )
-    except requests.exceptions.RequestException as exc:
-        self.retry(exc=exc, kwargs={"instance_id": glide_instance.id, "retry_count": self.request.retries})
-
-    if response:
-        # Save the extracted data into the existing glide object
-        glide_instance = store_extraction_data(
-            response=response,
-            source=ExtractionData.Source.GLIDE,
-            validate_source_func=None,
-            instance_id=glide_instance.id,
-        )
-        logger.info(f"{hazard_type} data imported sucessfully")
-    return glide_instance.id
+
+    @staticmethod
+    @app.task
+    def task(url: str, variables: GlideQueryVars):  # type: ignore[reportIncompatibleMethodOverride]
+        return GlideExtraction().handle_extraction(url, variables, HEADERS, ExtractionData.Source.GLIDE)
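Reviewer note: the date filters now travel as query params through `BaseExtraction.handle_extraction(url, params, headers, source)` instead of a hand-built URL. Assuming the base handler forwards them to `requests` as `params=`, entries whose value is `None` are dropped from the query string, which is what lets the historical task leave the `from*` fields unset. A quick illustration (the URL is a placeholder):

```python
import requests

variables = {"fromyear": None, "frommonth": None, "toyear": 2025, "events": "EQ"}
# requests omits params whose value is None, so the resulting
# URL is ...?toyear=2025&events=EQ
response = requests.get("https://example.org/glide/jsonglideset.jsp", params=variables)
print(response.url)
```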
ext_and_transform_glide_historical_data.delay("EP", HazardType.EPIDEMIC) - ext_and_transform_glide_historical_data.delay("EC", HazardType.EXTRATROPICAL_CYCLONE) - ext_and_transform_glide_historical_data.delay("ET", HazardType.EXTREME_TEMPERATURE) - ext_and_transform_glide_historical_data.delay("FR", HazardType.FIRE) - ext_and_transform_glide_historical_data.delay("FF", HazardType.FLASH_FLOOD) - ext_and_transform_glide_historical_data.delay("HT", HazardType.HEAT_WAVE) - ext_and_transform_glide_historical_data.delay("IN", HazardType.INSECT_INFESTATION) - ext_and_transform_glide_historical_data.delay("LS", HazardType.LANDSLIDE) - ext_and_transform_glide_historical_data.delay("MS", HazardType.MUD_SLIDE) - ext_and_transform_glide_historical_data.delay("ST", HazardType.SEVERE_LOCAL_STROM) - ext_and_transform_glide_historical_data.delay("SL", HazardType.SLIDE) - ext_and_transform_glide_historical_data.delay("AV", HazardType.SNOW_AVALANCHE) - ext_and_transform_glide_historical_data.delay("SS", HazardType.STORM) - ext_and_transform_glide_historical_data.delay("AC", HazardType.TECH_DISASTER) - ext_and_transform_glide_historical_data.delay("TO", HazardType.TORNADO) - ext_and_transform_glide_historical_data.delay("VW", HazardType.VIOLENT_WIND) - ext_and_transform_glide_historical_data.delay("WV", HazardType.WAVE_SURGE) + ext_and_transform_glide_historical_data() diff --git a/apps/etl/transform/sources/emdat.py b/apps/etl/transform/sources/emdat.py index 9f228405..e29d92e7 100644 --- a/apps/etl/transform/sources/emdat.py +++ b/apps/etl/transform/sources/emdat.py @@ -1,101 +1,33 @@ import json import logging -import uuid -from celery import shared_task from django.conf import settings from pystac_monty.geocoding import GAULGeocoder from pystac_monty.sources.emdat import EMDATDataSource, EMDATTransformer -from apps.etl.models import ExtractionData, PyStacLoadData, Transform -from apps.etl.utils import read_file_data -from main.logging import log_extra -from main.managers import BulkCreateManager +from apps.etl.models import ExtractionData +from apps.etl.transform.sources.handler import BaseTransformerHandler +from main.celery import app logger = logging.getLogger(__name__) -collection_and_item_type_map = { - "emdat-events": PyStacLoadData.ItemType.EVENT, - "emdat-hazards": PyStacLoadData.ItemType.HAZARD, - "emdat-impacts": PyStacLoadData.ItemType.IMPACT, -} +class EMDATTransformHandler(BaseTransformerHandler): + transformer = EMDATTransformer + transformer_schema = EMDATDataSource -@shared_task -def transform_emdat_data(extraction_id, **kwargs): - """ - Transform extracted data from emdat graphql api to STAC item . 
- """ - ext_instance = ExtractionData.objects.filter(id=extraction_id).first() - if not ext_instance.resp_data: - logger.info("Transformation ended due to no data") - return + @classmethod + def get_schema_data(cls, extraction_obj: ExtractionData): + with extraction_obj.resp_data.open() as file_data: + data = json.loads(file_data.read()) - data = read_file_data(ext_instance.resp_data) - json_data = json.loads(data) - - transform_data( - ExtractionData.Source.EMDAT, - EMDATTransformer, - EMDATDataSource, - extraction_id, - json_data, - ) - - -@shared_task -def transform_data(source, transformer, data_source, extraction_id, data): - logger.info(f"Transformation started for {source} data") - ext_instance = ExtractionData.objects.get(id=extraction_id) - - # create transform object - transform_obj = Transform.objects.create( - extraction=ext_instance, - status=Transform.Status.PENDING, - trace_id=ext_instance.trace_id, - ) - - # initialize bulk manager to create the PyStacLoadData in bulk. - bulk_mgr = BulkCreateManager(chunk_size=1000) - - geocoder = GAULGeocoder(gpkg_path=None, service_base_url=settings.GEOCODER_URL) - - try: - # Get transformer for each source and transform it to stac item.. - transformer = transformer(data=data_source(source_url=ext_instance.url, data=data), geocoder=geocoder) - transformed_items = transformer.make_items() - - # update transformation status to success - transform_obj.status = Transform.Status.SUCCESS - transform_obj.save(update_fields=["status"]) - except Exception as e: - logger.error( - "Transformation failed", exc_info=True, extra=log_extra({"extraction_id": ext_instance.id, "source": source}) - ) - # update transformation status to success - transform_obj.status = Transform.Status.FAILED - transform_obj.save(update_fields=["status"]) - # FIXME: Check if this creates duplicate entry in Sentry. if yes, remove this. 
- raise e - - # create objects into PyStacLoadData table - for item in transformed_items: - item_type = collection_and_item_type_map[item.collection_id] - transformed_item_dict = item.to_dict() - transformed_item_dict["properties"]["monty:etl_id"] = str(uuid.uuid4()) - bulk_mgr.add( - PyStacLoadData( - transform_id=transform_obj, - item=transformed_item_dict, - collection_id=item.collection_id, - item_type=item_type, - load_status=PyStacLoadData.LoadStatus.PENDING, - ) + return cls.transformer_schema( + source_url=extraction_obj.url, + data=data, ) - bulk_mgr.done() - - transform_obj.is_loaded = True - transform_obj.save(update_fields=["is_loaded"]) - - logger.info("Transformation ended for emdat data") + @staticmethod + @app.task + def task(extraction_id): + geocoder = GAULGeocoder(gpkg_path=None, service_base_url=settings.GEOCODER_URL) + return EMDATTransformHandler().handle_transformation(extraction_id, geocoder) diff --git a/apps/etl/transform/sources/glide.py b/apps/etl/transform/sources/glide.py index cda13e60..9e981473 100644 --- a/apps/etl/transform/sources/glide.py +++ b/apps/etl/transform/sources/glide.py @@ -1,70 +1,22 @@ -import logging -import uuid - -from celery import shared_task from pystac_monty.sources.glide import GlideDataSource, GlideTransformer -from apps.etl.models import ExtractionData, PyStacLoadData, Transform -from apps.etl.utils import read_file_data -from main.logging import log_extra -from main.managers import BulkCreateManager - -logger = logging.getLogger(__name__) - -glide_item_type_map = { - "glide-events": PyStacLoadData.ItemType.EVENT, - "glide-hazards": PyStacLoadData.ItemType.HAZARD, -} - - -@shared_task -def transform_glide_event_data(extraction_id): - logger.info("Transformation started for glide data") - glide_instance = ExtractionData.objects.get(id=extraction_id) - - if not glide_instance.resp_data: - logger.info("Transformation ended due to no data") - return - - data = read_file_data(glide_instance.resp_data) - - transform_obj = Transform.objects.create( - extraction=glide_instance, - status=Transform.Status.PENDING, - trace_id=glide_instance.trace_id, - ) - - bulk_mgr = BulkCreateManager(chunk_size=1000) - try: - transformer = GlideTransformer(GlideDataSource(source_url=glide_instance.url, data=data)) - transformed_event_items = transformer.make_items() +from apps.etl.models import ExtractionData +from apps.etl.transform.sources.handler import BaseTransformerHandler +from main.celery import app - transform_obj.status = Transform.Status.SUCCESS - transform_obj.save(update_fields=["status"]) - except Exception as e: - logger.error("Glide transformation failed", exc_info=True, extra=log_extra({"extraction_id": glide_instance.id})) - transform_obj.status = Transform.Status.FAILED - transform_obj.save(update_fields=["status"]) - # FIXME: Check if this creates duplicate entry in Sentry. if yes, remove this. 
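Reviewer note: constructing `GAULGeocoder(gpkg_path=None, service_base_url=settings.GEOCODER_URL)` in the task body appears to select remote-service geocoding over a local GAUL GeoPackage. If that reading of the constructor is right, an offline run could swap in a local file instead (a sketch; the path is hypothetical):

```python
from pystac_monty.geocoding import GAULGeocoder

# as in the diff: geocode via the remote service
geocoder = GAULGeocoder(gpkg_path=None, service_base_url=settings.GEOCODER_URL)

# hypothetical local alternative, assuming the constructor accepts a file path
offline_geocoder = GAULGeocoder(gpkg_path="/data/gaul/gaul.gpkg", service_base_url=None)
```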
diff --git a/apps/etl/transform/sources/glide.py b/apps/etl/transform/sources/glide.py
index cda13e60..9e981473 100644
--- a/apps/etl/transform/sources/glide.py
+++ b/apps/etl/transform/sources/glide.py
@@ -1,70 +1,22 @@
-import logging
-import uuid
-
-from celery import shared_task
 from pystac_monty.sources.glide import GlideDataSource, GlideTransformer
 
-from apps.etl.models import ExtractionData, PyStacLoadData, Transform
-from apps.etl.utils import read_file_data
-from main.logging import log_extra
-from main.managers import BulkCreateManager
-
-logger = logging.getLogger(__name__)
-
-glide_item_type_map = {
-    "glide-events": PyStacLoadData.ItemType.EVENT,
-    "glide-hazards": PyStacLoadData.ItemType.HAZARD,
-}
-
-
-@shared_task
-def transform_glide_event_data(extraction_id):
-    logger.info("Transformation started for glide data")
-    glide_instance = ExtractionData.objects.get(id=extraction_id)
-
-    if not glide_instance.resp_data:
-        logger.info("Transformation ended due to no data")
-        return
-
-    data = read_file_data(glide_instance.resp_data)
-
-    transform_obj = Transform.objects.create(
-        extraction=glide_instance,
-        status=Transform.Status.PENDING,
-        trace_id=glide_instance.trace_id,
-    )
-
-    bulk_mgr = BulkCreateManager(chunk_size=1000)
-    try:
-        transformer = GlideTransformer(GlideDataSource(source_url=glide_instance.url, data=data))
-        transformed_event_items = transformer.make_items()
+from apps.etl.models import ExtractionData
+from apps.etl.transform.sources.handler import BaseTransformerHandler
+from main.celery import app
 
-        transform_obj.status = Transform.Status.SUCCESS
-        transform_obj.save(update_fields=["status"])
-    except Exception as e:
-        logger.error("Glide transformation failed", exc_info=True, extra=log_extra({"extraction_id": glide_instance.id}))
-        transform_obj.status = Transform.Status.FAILED
-        transform_obj.save(update_fields=["status"])
-        # FIXME: Check if this creates duplicate entry in Sentry. if yes, remove this.
-        raise e
 
-    for item in transformed_event_items:
-        item_type = glide_item_type_map[item.collection_id]
-        transformed_item_dict = item.to_dict()
-        transformed_item_dict["properties"]["monty:etl_id"] = str(uuid.uuid4())
-        bulk_mgr.add(
-            PyStacLoadData(
-                transform_id=transform_obj,
-                item=transformed_item_dict,
-                collection_id=item.collection_id,
-                item_type=item_type,
-                load_status=PyStacLoadData.LoadStatus.PENDING,
-            )
-        )
+class GlideTransformHandler(BaseTransformerHandler):
+    transformer = GlideTransformer
+    transformer_schema = GlideDataSource
 
-    bulk_mgr.done()
+    @classmethod
+    def get_schema_data(cls, extraction_obj: ExtractionData):
+        with extraction_obj.resp_data.open() as file_data:
+            data = file_data.read()
 
-    transform_obj.is_loaded = True
-    transform_obj.save(update_fields=["is_loaded"])
+        return cls.transformer_schema(source_url=extraction_obj.url, data=data)
 
-    logger.info("Transformation ended for glide data")
+    @staticmethod
+    @app.task
+    def task(extraction_id):
+        return GlideTransformHandler().handle_transformation(extraction_id)
diff --git a/apps/etl/transform/sources/handler.py b/apps/etl/transform/sources/handler.py
index 5b042d21..1e387891 100644
--- a/apps/etl/transform/sources/handler.py
+++ b/apps/etl/transform/sources/handler.py
@@ -28,6 +28,11 @@
     "pdc-events": PyStacLoadData.ItemType.EVENT,
     "pdc-hazards": PyStacLoadData.ItemType.HAZARD,
     "pdc-impacts": PyStacLoadData.ItemType.IMPACT,
+    "glide-events": PyStacLoadData.ItemType.EVENT,
+    "glide-hazards": PyStacLoadData.ItemType.HAZARD,
+    "emdat-events": PyStacLoadData.ItemType.EVENT,
+    "emdat-hazards": PyStacLoadData.ItemType.HAZARD,
+    "emdat-impacts": PyStacLoadData.ItemType.IMPACT,
 }
 
 
@@ -101,7 +106,7 @@ def load_stac_item_to_queue(cls, transform_items, transform_obj_id):
 
     @staticmethod
    @app.task
-    def task(extraction_id):
+    def task(extraction_id: int):
        """
         Not NotImplemented due to celery limitation with classmethod
         Eg: return XYZTransformHandler.handle_transformation(extraction_id)
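Reviewer note: after this refactor, onboarding a new source is mostly boilerplate: one `BaseExtraction` subclass, one `BaseTransformerHandler` subclass, and entries in the collection/item-type map above. A sketch of the transform side, following the `XYZTransformHandler` example in the base class docstring (all `XYZ*` names hypothetical):

```python
from apps.etl.models import ExtractionData
from apps.etl.transform.sources.handler import BaseTransformerHandler
from main.celery import app


class XYZTransformHandler(BaseTransformerHandler):
    transformer = XYZTransformer          # hypothetical pystac_monty transformer
    transformer_schema = XYZDataSource    # hypothetical pystac_monty data source

    @classmethod
    def get_schema_data(cls, extraction_obj: ExtractionData):
        # per-source hook: read the stored response and wrap it in the schema
        with extraction_obj.resp_data.open() as file_data:
            data = file_data.read()
        return cls.transformer_schema(source_url=extraction_obj.url, data=data)

    @staticmethod
    @app.task
    def task(extraction_id: int):
        # kept as a staticmethod because Celery cannot register a classmethod
        return XYZTransformHandler().handle_transformation(extraction_id)
```

The matching `xyz-events`/`xyz-hazards` keys would also need to be added to the item-type map above, since the base handler presumably resolves `PyStacLoadData.ItemType` through it.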