# built-in python
import asyncio
from io import BytesIO
from multiprocessing.pool import ThreadPool
import logging

# third-party
import sentry_sdk
import orjson
import pandas as pd
import aiokafka

# in-house
from consumers import BaseKafkaClient
from models.conf import KafkaSettings
from config import *
from address_resolver import AddressAPI

# set logger level
logger = logging.getLogger()
logger.setLevel(logging.INFO)

sentry_sdk.init(dsn=SENTRY_DSN)

address_api = AddressAPI(GOOGLE_API_KEY, OPENAI_API_KEY, NER_API_KEY)

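# AddressResolve consumes raw feed messages from the address-resolve topic,
# extracts addresses with a regex pass and an NER pass, geocodes the confident
# candidates through Google, and publishes the resolved locations to the
# processed topic.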
class AddressResolve(BaseKafkaClient):

    async def process_message(self, record: aiokafka.ConsumerRecord):
        message = record.value

        message_io = BytesIO(message)
        address_df = pd.read_json(message_io)
        address_df_replica = address_df.copy()

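        # Regex pass: request address extraction for every raw text and keep
        # the rows whose confidence score (ws) is at least 0.7 for geocoding.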
        regex_results = pd.DataFrame(
            [address_api.regex_api_request(raw_text, entry_id)
             for raw_text, entry_id in
             zip(address_df.raw_text.values, address_df.id.values)])
        regex_to_geocode = regex_results[regex_results.ws >= 0.7]
        del regex_results

        # NER pass: texts the regex pass could not resolve are sent to the
        # NER API; keep rows with a confidence score (ws) of at least 0.5.
        address_df = address_df[~address_df.id.isin(regex_to_geocode.id.values)]
        with ThreadPool(60) as executor:
            ner_results = executor.map(
                lambda p: address_api.ner_api_request(*p),
                zip(address_df.raw_text.values,
                    address_df.id.values))
        ner_results = pd.DataFrame(ner_results)
        ner_to_geocode = ner_results[ner_results.ws >= 0.5]
        del ner_results

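        # Geocoding: combine the regex and NER candidates and resolve each
        # address to coordinates through the Google Geocoding API.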
        geocode_data = pd.concat([regex_to_geocode[['address', 'id']],
                                  ner_to_geocode[['address', 'id']]], axis=0)
        del regex_to_geocode, ner_to_geocode

        with ThreadPool(60) as executor:
            geocode_data = executor.map(
                lambda p: address_api.google_geocode_api_request(*p),
                zip(geocode_data.address.values, geocode_data.id.values))

        geocode_data = pd.DataFrame(geocode_data)
        geocode_data = pd.merge(geocode_data[geocode_data.is_resolved == True],
                                address_df_replica, on='id', how='left')
        del address_df_replica

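        # Build the outgoing payload: one record per resolved address, pairing
        # the geocoded location with the original feed entry.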
        final_data = []
        for _, d in geocode_data.iterrows():
            final_data.append(
                {
                    'location': {
                        "formatted_address": d.get('formatted_address', ''),
                        "latitude": d.get('latitude', 0.0),
                        "longitude": d.get('longitude', 0.0),
                        "northeast_lat": d.get('northeast_lat', 0.0),
                        "northeast_lng": d.get('northeast_lng', 0.0),
                        "southwest_lat": d.get('southwest_lat', 0.0),
                        "southwest_lng": d.get('southwest_lng', 0.0),
                        "entry_id": d.get('id'),
                        "epoch": d.get('epoch'),
                        "channel": d.get('channel')},
                    'feed': {
                        "id": d.get('id'),
                        "raw_text": d.get('raw_text'),
                        "channel": d.get('channel'),
                        "extra_parameters": d.get('extra_parameters', {}),
                        "epoch": d.get('epoch')}
                }
            )

        await self.producer.send_and_wait(KAFKA_PROCESSED_TOPIC,
                                          orjson.dumps(final_data))
        logger.info("Message Processed.")


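# Entrypoint: create an event loop, build the Kafka settings from config, and
# run the consumer until it stops, closing the loop on shutdown.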
if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    kafka_settings = KafkaSettings(
        loop=loop,
        client_id=CLIENT_ID,
        bootstrap_servers=BOOTSTRAP_SERVERS,
        max_pool_records=MAX_POOL_RECORDS,
        message_timeout_ms=MESSAGE_TIMEOUT_MS
    )

    try:
        server = AddressResolve(topic=KAFKA_ADDRESS_RESOLVE_TOPIC,
                                server_settings=kafka_settings)
        loop.run_until_complete(server.run())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()