1
1
"""
2
- script for running sqs listener
3
-
4
2
Created December 21st, 2016
5
3
@author: Yaakov Gesher
6
4
@version: 0.9.0
11
9
# start imports
12
10
# ================
13
11
14
- import boto3
15
- import boto3 .session
16
12
import json
17
- import time
18
13
import logging
19
14
import os
20
15
import sys
21
- from sqs_launcher import SqsLauncher
16
+ import time
22
17
from abc import ABCMeta , abstractmethod
23
18
19
+ import boto3
20
+ import boto3 .session
21
+
22
+ from sqs_launcher import SqsLauncher
23
+
24
24
# ================
25
25
# start class
26
26
# ================
27
27
28
28
sqs_logger = logging .getLogger ('sqs_listener' )
29
29
30
+
30
31
class SqsListener (object ):
31
32
__metaclass__ = ABCMeta
32
33
@@ -47,8 +48,8 @@ def __init__(self, queue, **kwargs):
47
48
else :
48
49
boto3_session = None
49
50
if (
50
- not os .environ .get ('AWS_ACCOUNT_ID' , None ) and
51
- not (boto3 .Session ().get_credentials ().method in ['iam-role' , 'assume-role' , 'assume-role-with-web-identity' ])
51
+ not os .environ .get ('AWS_ACCOUNT_ID' , None ) and
52
+ not (boto3 .Session ().get_credentials ().method in ['iam-role' , 'assume-role' , 'assume-role-with-web-identity' ])
52
53
):
53
54
raise EnvironmentError ('Environment variable `AWS_ACCOUNT_ID` not set and no role found.' )
54
55
@@ -74,7 +75,6 @@ def __init__(self, queue, **kwargs):
74
75
self ._region_name = kwargs .get ('region_name' , self ._session .region_name )
75
76
self ._client = self ._initialize_client ()
76
77
77
-
78
78
def _initialize_client (self ):
79
79
# new session for each instantiation
80
80
ssl = True
@@ -83,31 +83,30 @@ def _initialize_client(self):
83
83
84
84
sqs = self ._session .client ('sqs' , region_name = self ._region_name , endpoint_url = self ._endpoint_name , use_ssl = ssl )
85
85
queues = sqs .list_queues (QueueNamePrefix = self ._queue_name )
86
- mainQueueExists = False
87
- errorQueueExists = False
86
+ main_queue_exists = False
87
+ error_queue_exists = False
88
88
if 'QueueUrls' in queues :
89
89
for q in queues ['QueueUrls' ]:
90
90
qname = q .split ('/' )[- 1 ]
91
91
if qname == self ._queue_name :
92
- mainQueueExists = True
92
+ main_queue_exists = True
93
93
if self ._error_queue_name and qname == self ._error_queue_name :
94
- errorQueueExists = True
95
-
94
+ error_queue_exists = True
96
95
97
96
# create queue if necessary.
98
97
# creation is idempotent, no harm in calling on a queue if it already exists.
99
98
if self ._queue_url is None :
100
- if not mainQueueExists :
99
+ if not main_queue_exists :
101
100
sqs_logger .warning ("main queue not found, creating now" )
102
101
103
102
# is this a fifo queue?
104
103
if self ._queue_name .endswith (".fifo" ):
105
- fifoQueue = "true"
104
+ fifo_queue = "true"
106
105
q = sqs .create_queue (
107
106
QueueName = self ._queue_name ,
108
107
Attributes = {
109
108
'VisibilityTimeout' : self ._queue_visibility_timeout , # 10 minutes
110
- 'FifoQueue' :fifoQueue
109
+ 'FifoQueue' : fifo_queue
111
110
}
112
111
)
113
112
else :
@@ -120,7 +119,7 @@ def _initialize_client(self):
120
119
)
121
120
self ._queue_url = q ['QueueUrl' ]
122
121
123
- if self ._error_queue_name and not errorQueueExists :
122
+ if self ._error_queue_name and not error_queue_exists :
124
123
sqs_logger .warning ("error queue not found, creating now" )
125
124
q = sqs .create_queue (
126
125
QueueName = self ._error_queue_name ,
@@ -162,9 +161,8 @@ def _start_listening(self):
162
161
163
162
try :
164
163
deserialized = self ._deserializer (m_body )
165
- except Exception as e :
166
- sqs_logger .error ("Unable to parse message" )
167
- sqs_logger .exception (e )
164
+ except :
165
+ sqs_logger .exception ("Unable to parse message" )
168
166
continue
169
167
170
168
if 'MessageAttributes' in m :
@@ -185,12 +183,11 @@ def _start_listening(self):
185
183
ReceiptHandle = receipt_handle
186
184
)
187
185
except Exception as ex :
188
- # need exception logtype to log stack trace
189
186
sqs_logger .exception (ex )
190
187
if self ._error_queue_name :
191
188
exc_type , exc_obj , exc_tb = sys .exc_info ()
192
189
193
- sqs_logger .info ( "Pushing exception to error queue" )
190
+ sqs_logger .info ("Pushing exception to error queue" )
194
191
error_launcher = SqsLauncher (queue = self ._error_queue_name , create_queue = True )
195
192
error_launcher .launch_message (
196
193
{
@@ -203,9 +200,9 @@ def _start_listening(self):
203
200
time .sleep (self ._poll_interval )
204
201
205
202
def listen (self ):
206
- sqs_logger .info ( "Listening to queue " + self ._queue_name )
203
+ sqs_logger .info ("Listening to queue " + self ._queue_name )
207
204
if self ._error_queue_name :
208
- sqs_logger .info ( "Using error queue " + self ._error_queue_name )
205
+ sqs_logger .info ("Using error queue " + self ._error_queue_name )
209
206
210
207
self ._start_listening ()
211
208
0 commit comments