Skip to content

Commit fbdd8f2

Browse files
committed
Merge branch 'fix/idu-extract-transform' into develop
2 parents b8fafe3 + 3b3f7ba commit fbdd8f2

File tree

3 files changed

+14
-162
lines changed

3 files changed

+14
-162
lines changed

apps/etl/etl_tasks/idu.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,20 @@
1414

1515
@shared_task
1616
def extract_and_transform_idu_data(url):
17+
"""Extract and Transform IDU data"""
1718
chain(
18-
IDUExtraction.handle_extraction.s(url=url),
19+
IDUExtraction.task.s(url=url),
1920
IDUTransformHandler.task.s(),
2021
).apply_async()
2122

2223

2324
@shared_task
2425
def ext_and_transform_idu_historical_data():
26+
"""Extract and Transform IDU historical data"""
2527
extract_and_transform_idu_data(HISTORICAL_DATA_URL)
2628

2729

2830
@shared_task
2931
def ext_and_transform_idu_latest_data():
32+
"""Extract and Transform IDU latest data"""
3033
extract_and_transform_idu_data(LATEST_DATA_URL)
+8-161
Original file line numberDiff line numberDiff line change
@@ -1,176 +1,23 @@
1-
import json
21
import logging
3-
import uuid
4-
from typing import Any, Callable
52

6-
import requests
73
from django.conf import settings
84

9-
from apps.etl.extraction.sources.base.extract import Extraction
10-
from apps.etl.extraction.sources.base.utils import (
11-
hash_file_content,
12-
manage_duplicate_file_content,
13-
)
5+
from apps.etl.extraction.sources.base.handler import BaseExtraction
146
from apps.etl.models import ExtractionData
157
from main.celery import app
16-
from main.logging import log_extra
178

189
logger = logging.getLogger(__name__)
1910

20-
HEADERS = {"accept": "application/json"}
21-
PARAMS = {"client_id": settings.IDMC_CLIENT_ID}
2211

23-
24-
class IDUExtraction(Extraction):
12+
class IDUExtraction(BaseExtraction):
2513
"""
26-
Handles data extraction from the IDU API for hazard data.
14+
Handles data extraction from the IDU API.
2715
"""
2816

29-
def __init__(self, url: str = None):
30-
"""
31-
Initialize the IDU extraction process.
32-
Args:
33-
url (str, optional): Override the default API URL. Defaults to BASE_URL.
34-
"""
35-
super().__init__()
36-
37-
@staticmethod
38-
def store_extraction_data(
39-
validate_source_func: Callable[[Any], None],
40-
response: dict,
41-
source: ExtractionData.Source = None,
42-
instance_id: int = None,
43-
):
44-
"""
45-
Save extracted data into data base. Checks for duplicate conent using hashing.
46-
"""
47-
file_extension = "json"
48-
file_name = f"{source}.{file_extension}"
49-
resp_data_content = response.content
50-
51-
# save the additional response data after the data is fetched from api.
52-
extraction_instance = ExtractionData.objects.get(id=instance_id)
53-
extraction_instance.resp_data_type = response.headers.get("Content-Type", "")
54-
extraction_instance.save()
55-
56-
# Validate the non empty response data.
57-
if resp_data_content and not response.status_code == 204:
58-
# Source validation
59-
if validate_source_func:
60-
extraction_instance.source_validation_status = validate_source_func(resp_data_content)["status"]
61-
extraction_instance.content_validation = validate_source_func(resp_data_content)["validation_error"]
62-
63-
# manage duplicate file content.
64-
hash_content = hash_file_content(resp_data_content)
65-
manage_duplicate_file_content(
66-
source=source,
67-
hash_content=hash_content,
68-
instance=extraction_instance,
69-
response_data=resp_data_content,
70-
file_name=file_name,
71-
)
72-
return extraction_instance
73-
74-
@staticmethod
75-
def _create_extraction_instance(url) -> ExtractionData:
76-
"""
77-
Create and return a new extraction instance with initial status.
78-
Returns:
79-
ExtractionData: The created extraction instance
80-
"""
81-
return ExtractionData.objects.create(
82-
source=ExtractionData.Source.IDU,
83-
url=url,
84-
status=ExtractionData.Status.PENDING,
85-
source_validation_status=ExtractionData.ValidationStatus.NO_VALIDATION,
86-
hazard_type=None,
87-
attempt_no=0,
88-
resp_code=0,
89-
trace_id=str(uuid.uuid4()),
90-
)
91-
92-
@staticmethod
93-
def _update_instance_status(
94-
instance: ExtractionData, status: int, validation_status: int = None, update_validation: bool = False
95-
) -> None:
96-
"""
97-
Update the status of the extraction instance.
98-
Args:
99-
instance: ExtractionData instance to update
100-
status: New status to set
101-
validation_status: Optional validation status to set
102-
update_validation: Whether to update validation status
103-
"""
104-
instance.status = status
105-
if update_validation and validation_status:
106-
instance.source_validation_status = validation_status
107-
instance.save(update_fields=["status", "source_validation_status"])
108-
else:
109-
instance.save(update_fields=["status"])
110-
111-
@staticmethod
112-
def _save_response_data(instance: ExtractionData, response: requests.Response) -> dict:
113-
"""
114-
Save the response data to the extraction instance.
115-
Args:
116-
instance: ExtractionData instance to save to
117-
response: Response object containing the data
118-
Returns:
119-
dict: Parsed JSON response content
120-
"""
121-
instance = IDUExtraction.store_extraction_data(
122-
response=response,
123-
source=ExtractionData.Source.IDU,
124-
validate_source_func=None,
125-
instance_id=instance.id,
126-
)
127-
128-
return json.loads(response.content)
129-
13017
@staticmethod
13118
@app.task
132-
def handle_extraction(url) -> dict:
133-
"""
134-
Process IDU data extraction.
135-
Returns:
136-
int: ID of the extraction instance
137-
"""
138-
logger.info("Starting IDU data extraction")
139-
instance = IDUExtraction._create_extraction_instance(url=url)
140-
141-
try:
142-
IDUExtraction._update_instance_status(instance, ExtractionData.Status.IN_PROGRESS)
143-
144-
response = requests.get(url, params=PARAMS, headers=HEADERS, timeout=30)
145-
response.raise_for_status()
146-
instance.resp_code = response.status_code
147-
148-
if response.status_code == 200:
149-
response_data = IDUExtraction._save_response_data(instance, response)
150-
# Check if response contains data
151-
if response_data:
152-
IDUExtraction._update_instance_status(instance, ExtractionData.Status.SUCCESS)
153-
logger.info("IDU data extracted successfully")
154-
else:
155-
IDUExtraction._update_instance_status(
156-
instance,
157-
ExtractionData.Status.SUCCESS,
158-
ExtractionData.ValidationStatus.NO_DATA,
159-
update_validation=True,
160-
)
161-
logger.warning("No hazard data found in IDU response")
162-
163-
return instance.id
164-
165-
except requests.exceptions.RequestException:
166-
IDUExtraction._update_instance_status(instance, ExtractionData.Status.FAILED)
167-
logger.error(
168-
"IDU extraction failed",
169-
exc_info=True,
170-
extra=log_extra(
171-
{
172-
"source": ExtractionData.Source.IDU,
173-
}
174-
),
175-
)
176-
raise
19+
def task(url: str): # type: ignore[reportIncompatibleMethodOverride]
20+
"""IDU Task"""
21+
headers = {"accept": "application/json"}
22+
params = {"client_id": settings.IDMC_CLIENT_ID}
23+
return IDUExtraction.handle_extraction(url=url, params=params, headers=headers, source=ExtractionData.Source.IDU)

apps/etl/transform/sources/idu.py

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77

88
class IDUTransformHandler(BaseTransformerHandler):
9+
"""IDU Transformer handler"""
10+
911
transformer = IDUTransformer
1012
transformer_schema = IDUDataSource
1113

0 commit comments

Comments
 (0)