-import json
 import logging
-from datetime import datetime
 
 import requests
-from celery import shared_task
 from django.conf import settings
-from django.core.files.base import ContentFile
 
-from apps.etl.models import ExtractionData, HazardType
+from apps.etl.extraction.sources.base.handler import BaseExtraction
+from apps.etl.models import ExtractionData
+from main.celery import app
 
 logger = logging.getLogger(__name__)
 
 
-@shared_task
-def extract_emdat_latest_data():
-    to_year = datetime.now().year
-    from_year = int(settings.EMDAT_START_YEAR)
-    # ref: https://files.emdat.be/docs/emdat_api_cookbook.pdf
-    variables = {"limit": -1, "from": from_year, "to": to_year}
-    return import_hazard_data(variables)
-
-
-@shared_task
-def extract_emdat_historical_data():
-    variables = {"limit": -1, "include_hist": True}
-    return import_hazard_data(variables)
-
-
-@shared_task
-def import_hazard_data(variables, **kwargs):
+class EMDATExtraction(BaseExtraction):
     """
-    Import hazard data from glide api
+    Handles data extraction from the EMDAT API.
     """
-    logger.info("Importing EMDAT data")
-    query = """
-        query monty ($limit: Int, $offset: Int, $include_hist: Boolean, $from: Int, $to: Int) {
-            api_version
-            public_emdat(
-                cursor: {
-                    offset: $offset,
-                    limit: $limit
-                }
-                filters: {
-                    include_hist: $include_hist
-                    from: $from
-                    to: $to
-                }
-            ) {
-                total_available
-                info {
-                    timestamp
-                    filters
-                    cursor
-                    version
-                }
-                data {
-                    disno
-                    classif_key
-                    group
-                    subgroup
-                    type
-                    subtype
-                    external_ids
-                    name
-                    iso
-                    country
-                    subregion
-                    region
-                    location
-                    origin
-                    associated_types
-                    ofda_response
-                    appeal
-                    declaration
-                    aid_contribution
-                    magnitude
-                    magnitude_scale
-                    latitude
-                    longitude
-                    river_basin
-                    start_year
-                    start_month
-                    start_day
-                    end_year
-                    end_month
-                    end_day
-                    total_deaths
-                    no_injured
-                    no_affected
-                    no_homeless
-                    total_affected
-                    reconstr_dam
-                    reconstr_dam_adj
-                    insur_dam
-                    insur_dam_adj
-                    total_dam
-                    total_dam_adj
-                    cpi
-                    admin_units
-                    entry_date
-                    last_update
-                }
-            }
-        }
-    """
-
-    EMDAT_URL = f"{settings.EMDAT_URL}"
-    HEADERS = {"Authorization": settings.EMDAT_AUTHORIZATION_KEY}
-
-    # Create new extraction object for each extraction
-    emdat_instance = ExtractionData.objects.create(
-        source=ExtractionData.Source.EMDAT,
-        status=ExtractionData.Status.PENDING,
-        source_validation_status=ExtractionData.ValidationStatus.NO_VALIDATION,
-        hazard_type=HazardType.OTHER,
-        attempt_no=0,
-        resp_code=0,
-    )
-
-    try:
-        # Set extraction status to progress
-        emdat_instance.status = ExtractionData.Status.IN_PROGRESS
-        emdat_instance.save(update_fields=["status"])
 
-        paylod = {"query": query, "variables": variables}
-        response = requests.post(EMDAT_URL, json=paylod, headers=HEADERS)
-        response.raise_for_status()
-
-        # Save the extraction data
-        if response and response.status_code == 200:
-            file_name = "emdat_disaster_data.json"
-            emdat_instance.resp_data.save(file_name, ContentFile(response.content))
-
-            # Set extraction status to success
-            emdat_instance.status = ExtractionData.Status.SUCCESS
-            response_content_json = json.loads(response.content)
-
-            # if data is empty set validation status to No Data
-            if not response_content_json["data"]["public_emdat"]:
-                emdat_instance.source_validation_status = ExtractionData.ValidationStatus.NO_DATA
-
-            emdat_instance.save(update_fields=["status", "source_validation_status"])
-
-        logger.info("EMDAT data imported sucessfully")
-        return emdat_instance.id
-
-    except requests.exceptions.RequestException:
-        # Set extraction status to Fail
-        emdat_instance.status = ExtractionData.Status.FAILED
-        emdat_instance.save(update_fields=["status"])
-        logger.error("Extraction failed", exc_info=True, extra={"source": ExtractionData.Source.EMDAT})
-        # FIXME: Check if this creates duplicate entry in Sentry. if yes, remove this.
-        raise
+    @classmethod
+    def handle_extraction(cls, query: str, variables: dict, source: int) -> int:
+        """
+        Process data extraction.
+        Returns:
+            int: ID of the extraction instance
+        """
+        logger.info("Starting data extraction")
+
+        url = f"{settings.EMDAT_URL}"
+        headers = {"Authorization": settings.EMDAT_AUTHORIZATION_KEY}
+
+        instance = cls._create_extraction_instance(url=url, source=source)
+
+        try:
+            cls._update_instance_status(instance, ExtractionData.Status.IN_PROGRESS)
+            payload = {"query": query, "variables": variables}
+            response = requests.post(url, json=payload, headers=headers)
+            response.raise_for_status()
+            response_data = cls._save_response_data(instance, response)
+            # Check if response contains data
+            if response_data:
+                cls._update_instance_status(instance, ExtractionData.Status.SUCCESS)
+                if not response_data["data"]["public_emdat"]:
+                    cls._update_instance_status(
+                        instance,
+                        ExtractionData.Status.SUCCESS,
+                        ExtractionData.ValidationStatus.NO_DATA,
+                        update_validation=True,
+                    )
+                    logger.warning("No hazard data found in response")
+
+            return instance.id
+
+        except requests.exceptions.RequestException:
+            cls._update_instance_status(instance, ExtractionData.Status.FAILED)
+            logger.error(
+                "Extraction failed",
+                exc_info=True,
+                extra={
+                    "source": instance.source,
+                },
+            )
+            raise
+
+    @staticmethod
+    @app.task
+    def task(query, variables):
+        return EMDATExtraction().handle_extraction(query, variables, ExtractionData.Source.EMDAT)
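
This refactor moves the GraphQL query and the filter variables out of the module and into the caller, so scheduling code now has to build them before dispatching the Celery task. Below is a minimal caller sketch, assuming the same year-range variables the removed `extract_emdat_latest_data` task built from `EMDAT_START_YEAR` and the current year. The module path in the import, the trimmed `QUERY` constant, and the `trigger_latest_emdat_extraction` helper are hypothetical names for illustration, not part of this change; the full field list is the one shown in the diff above.

```python
from datetime import datetime

from django.conf import settings

# Hypothetical module path for the class added in this diff.
from apps.etl.extraction.sources.emdat.extraction import EMDATExtraction

# Hypothetical, abbreviated version of the GraphQL document that
# import_hazard_data previously embedded; the real query requests
# the full public_emdat field list.
QUERY = """
query monty($limit: Int, $offset: Int, $include_hist: Boolean, $from: Int, $to: Int) {
    public_emdat(
        cursor: {offset: $offset, limit: $limit}
        filters: {include_hist: $include_hist, from: $from, to: $to}
    ) {
        total_available
        data { disno classif_key iso country start_year end_year total_affected }
    }
}
"""


def trigger_latest_emdat_extraction():
    """Queue the EMDAT extraction for EMDAT_START_YEAR through the current year."""
    variables = {
        "limit": -1,
        "from": int(settings.EMDAT_START_YEAR),
        "to": datetime.now().year,
    }
    # .delay() enqueues the Celery task registered by the @app.task decorator.
    return EMDATExtraction.task.delay(QUERY, variables)
```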