4
4
from pystac_monty .geocoding import GAULGeocoder
5
5
from pystac_monty .sources .ibtracs import IBTrACSDataSource , IBTrACSTransformer
6
6
7
- from apps .etl .models import ExtractionData , Transform
7
+ from apps .etl .models import ExtractionData
8
8
from apps .etl .transform .sources .handler import BaseTransformerHandler
9
9
from main .celery import app
10
10
14
14
class IbtracsTransformHandler (BaseTransformerHandler ):
15
15
transformer = IBTrACSTransformer
16
16
transformer_schema = IBTrACSDataSource
17
- geocoder = GAULGeocoder (gpkg_path = None , service_base_url = settings .GEOCODER_URL )
18
17
19
18
@classmethod
20
19
def get_schema_data (cls , extraction_obj : ExtractionData ):
@@ -23,37 +22,8 @@ def get_schema_data(cls, extraction_obj: ExtractionData):
23
22
24
23
return cls .transformer_schema (source_url = extraction_obj .url , data = data .decode ("utf-8" ))
25
24
26
- @classmethod
27
- def handle_transformation (cls , extraction_id ):
28
- logger .info ("Transformation started" )
29
- extraction_obj = ExtractionData .objects .filter (id = extraction_id ).first ()
30
-
31
- transform_obj = Transform .objects .create (
32
- extraction = extraction_obj ,
33
- status = Transform .Status .PENDING ,
34
- )
35
-
36
- try :
37
- schema = cls .get_schema_data (extraction_obj )
38
- transformer = cls .transformer (schema , geocoder = cls .geocoder )
39
- transformed_items = transformer .make_items ()
40
- print ("..........." , transformed_items [0 ].__dict__ )
41
-
42
- transform_obj .status = Transform .Status .SUCCESS
43
- transform_obj .save (update_fields = ["status" ])
44
-
45
- cls .load_stac_item_to_queue (transformed_items , transform_obj .id )
46
-
47
- logger .info ("Transformation ended" )
48
-
49
- except Exception as e :
50
- logger .error ("Transformation failed" , exc_info = True , extra = {"extraction_id" : extraction_obj .id })
51
- transform_obj .status = Transform .Status .FAILED
52
- transform_obj .save (update_fields = ["status" ])
53
- # FIXME: Check if this creates duplicate entry in Sentry. if yes, remove this.
54
- raise e
55
-
56
25
@staticmethod
57
26
@app .task
58
27
def task (extraction_id ):
59
- return IbtracsTransformHandler ().handle_transformation (extraction_id )
28
+ geocoder = GAULGeocoder (gpkg_path = None , service_base_url = settings .GEOCODER_URL )
29
+ return IbtracsTransformHandler ().handle_transformation (extraction_id , geocoder )
0 commit comments