27
27
28
28
sqs_logger = logging .getLogger ('sqs_listener' )
29
29
30
+
30
31
class SqsListener (object ):
31
32
__metaclass__ = ABCMeta
32
33
@@ -38,16 +39,16 @@ def __init__(self, queue, **kwargs):
38
39
aws_access_key = kwargs .get ('aws_access_key' , '' )
39
40
aws_secret_key = kwargs .get ('aws_secret_key' , '' )
40
41
41
- if len (aws_access_key ) != 0 and len ( aws_secret_key ) != 0 :
42
+ if all (aws_access_key , aws_secret_key ):
42
43
boto3_session = boto3 .Session (
43
44
aws_access_key_id = aws_access_key ,
44
- aws_secret_access_key = aws_secret_key
45
+ aws_secret_access_key = aws_secret_key ,
45
46
)
46
47
else :
47
48
if (not os .environ .get ('AWS_ACCOUNT_ID' , None ) and
48
49
not ('iam-role' == boto3 .Session ().get_credentials ().method )):
49
50
raise EnvironmentError ('Environment variable `AWS_ACCOUNT_ID` not set and no role found.' )
50
-
51
+ self . _queue = None # The SQS Queue resource
51
52
self ._queue_name = queue
52
53
self ._poll_interval = kwargs .get ("interval" , 60 )
53
54
self ._queue_visibility_timeout = kwargs .get ('visibility_timeout' , '600' )
@@ -67,135 +68,109 @@ def __init__(self, queue, **kwargs):
67
68
else :
68
69
self ._session = boto3 .session .Session ()
69
70
self ._region_name = kwargs .get ('region_name' , self ._session .region_name )
70
- self ._client = self ._initialize_client ()
71
-
71
+ self ._resource = self ._initialize_resource ()
72
72
73
- def _initialize_client (self ):
73
+ def _initialize_resource (self ):
74
74
# new session for each instantiation
75
75
ssl = True
76
76
if self ._region_name == 'elasticmq' :
77
77
ssl = False
78
78
79
- sqs = self ._session .client ('sqs' , region_name = self ._region_name , endpoint_url = self ._endpoint_name , use_ssl = ssl )
80
- queues = sqs .list_queues (QueueNamePrefix = self ._queue_name )
81
- mainQueueExists = False
82
- errorQueueExists = False
83
- if 'QueueUrls' in queues :
84
- for q in queues ['QueueUrls' ]:
85
- qname = q .split ('/' )[- 1 ]
86
- if qname == self ._queue_name :
87
- mainQueueExists = True
88
- if self ._error_queue_name and qname == self ._error_queue_name :
89
- errorQueueExists = True
90
-
91
-
92
- # create queue if necessary.
79
+ sqs = self ._session .resource ('sqs' , region_name = self ._region_name , endpoint_url = self ._endpoint_name , use_ssl = ssl )
80
+ queues = sqs .queues .filter (QueueNamePrefix = self ._queue_name )
81
+ main_queue_exists = False
82
+ error_queue_exists = False
83
+ for q in queues :
84
+ qname = q .url .split ('/' )[- 1 ]
85
+ if qname == self ._queue_name :
86
+ main_queue_exists = True
87
+ if self ._error_queue_name and qname == self ._error_queue_name :
88
+ error_queue_exists = True
89
+
90
+ # create queue if necessary.
93
91
# creation is idempotent, no harm in calling on a queue if it already exists.
94
92
if self ._queue_url is None :
95
- if not mainQueueExists :
93
+ if not main_queue_exists :
96
94
sqs_logger .warning ("main queue not found, creating now" )
97
-
95
+ queue_attributes = {
96
+ 'VisibilityTimeout' : self ._queue_visibility_timeout , # 10 minutes
97
+ }
98
98
# is this a fifo queue?
99
99
if self ._queue_name .endswith (".fifo" ):
100
- fifoQueue = "true"
101
- q = sqs .create_queue (
102
- QueueName = self ._queue_name ,
103
- Attributes = {
104
- 'VisibilityTimeout' : self ._queue_visibility_timeout , # 10 minutes
105
- 'FifoQueue' :fifoQueue
106
- }
107
- )
108
- else :
109
- # need to avoid FifoQueue property for normal non-fifo queues
110
- q = sqs .create_queue (
111
- QueueName = self ._queue_name ,
112
- Attributes = {
113
- 'VisibilityTimeout' : self ._queue_visibility_timeout , # 10 minutes
114
- }
115
- )
116
- self ._queue_url = q ['QueueUrl' ]
117
-
118
- if self ._error_queue_name and not errorQueueExists :
100
+ queue_attributes ["FifoQueue" ] = "true"
101
+ q = sqs .create_queue (
102
+ QueueName = self ._queue_name ,
103
+ Attributes = queue_attributes ,
104
+ )
105
+ self ._queue_url = q .url
106
+
107
+ if self ._error_queue_name and not error_queue_exists :
119
108
sqs_logger .warning ("error queue not found, creating now" )
120
109
q = sqs .create_queue (
121
110
QueueName = self ._error_queue_name ,
122
111
Attributes = {
123
- 'VisibilityTimeout' : self ._queue_visibility_timeout # 10 minutes
124
- }
112
+ 'VisibilityTimeout' : self ._queue_visibility_timeout , # 10 minutes
113
+ },
125
114
)
126
115
127
116
if self ._queue_url is None :
128
- if os . environ . get ( 'AWS_ACCOUNT_ID' , None ):
129
- qs = sqs . get_queue_url ( QueueName = self ._queue_name ,
130
- QueueOwnerAWSAccountId = os .environ .get ('AWS_ACCOUNT_ID' , None ))
131
- else :
132
- qs = sqs . get_queue_url ( QueueName = self . _queue_name )
133
- self ._queue_url = qs [ 'QueueUrl' ]
117
+ qs = sqs . get_queue_by_name (
118
+ QueueName = self ._queue_name ,
119
+ QueueOwnerAWSAccountId = os .environ .get ('AWS_ACCOUNT_ID' , None ),
120
+ )
121
+ self . _queue_url = qs . url
122
+ self ._queue = sqs . Queue ( self . _queue_url )
134
123
return sqs
135
124
136
125
def _start_listening (self ):
137
- # TODO consider incorporating output processing from here: https://github.com/debrouwere/sqs-antenna/blob/master/antenna/__init__.py
126
+ # TODO consider incorporating output processing from here:
127
+ # https://github.com/debrouwere/sqs-antenna/blob/master/antenna/__init__.py
138
128
while True :
139
- # calling with WaitTimeSecconds of zero show the same behavior as
129
+ # calling with ` WaitTimeSecconds` of zero show the same behavior as
140
130
# not specifiying a wait time, ie: short polling
141
- messages = self ._client .receive_message (
142
- QueueUrl = self ._queue_url ,
143
- MessageAttributeNames = self ._message_attribute_names ,
131
+ messages = self ._queue .receive_message (
144
132
AttributeNames = self ._attribute_names ,
133
+ MessageAttributeNames = self ._message_attribute_names ,
145
134
WaitTimeSeconds = self ._wait_time ,
146
- MaxNumberOfMessages = self ._max_number_of_messages
135
+ MaxNumberOfMessages = self ._max_number_of_messages ,
147
136
)
148
- if 'Messages' in messages :
149
-
150
- sqs_logger .debug (messages )
151
- continue
152
- sqs_logger .info ("{} messages received" .format (len (messages ['Messages' ])))
153
- for m in messages ['Messages' ]:
154
- receipt_handle = m ['ReceiptHandle' ]
155
- m_body = m ['Body' ]
156
- message_attribs = None
157
- attribs = None
158
-
159
- # catch problems with malformed JSON, usually a result of someone writing poor JSON directly in the AWS console
160
- try :
161
- params_dict = json .loads (m_body )
162
- except :
163
- sqs_logger .warning ("Unable to parse message - JSON is not formatted properly" )
164
- continue
165
- if 'MessageAttributes' in m :
166
- message_attribs = m ['MessageAttributes' ]
167
- if 'Attributes' in m :
168
- attribs = m ['Attributes' ]
169
- try :
170
- if self ._force_delete :
171
- self ._client .delete_message (
172
- QueueUrl = self ._queue_url ,
173
- ReceiptHandle = receipt_handle
174
- )
175
- self .handle_message (params_dict , message_attribs , attribs )
176
- else :
177
- self .handle_message (params_dict , message_attribs , attribs )
178
- self ._client .delete_message (
179
- QueueUrl = self ._queue_url ,
180
- ReceiptHandle = receipt_handle
181
- )
182
- except Exception as ex :
183
- # need exception logtype to log stack trace
184
- sqs_logger .exception (ex )
185
- if self ._error_queue_name :
186
- exc_type , exc_obj , exc_tb = sys .exc_info ()
187
-
188
- sqs_logger .info ( "Pushing exception to error queue" )
189
- error_launcher = SqsLauncher (queue = self ._error_queue_name , create_queue = True )
190
- error_launcher .launch_message (
191
- {
192
- 'exception_type' : str (exc_type ),
193
- 'error_message' : str (ex .args )
194
- }
195
- )
196
-
197
- else :
137
+ if not messages :
198
138
time .sleep (self ._poll_interval )
139
+ continue
140
+ sqs_logger .debug (messages )
141
+ sqs_logger .info ("{} messages received" .format (len (messages ['Messages' ])))
142
+ for m in messages :
143
+ receipt_handle = m .receipt_handle
144
+ m_body = m .body
145
+ message_attribs = m .message_attributes
146
+ attribs = m .attributes
147
+ # catch problems with malformed JSON, usually a result of someone writing poor JSON directly in the AWS console
148
+ try :
149
+ params_dict = json .loads (m_body )
150
+ except :
151
+ sqs_logger .warning ("Unable to parse message - JSON is not formatted properly" )
152
+ continue
153
+ try :
154
+ if self ._force_delete :
155
+ m .delete ()
156
+ self .handle_message (params_dict , message_attribs , attribs )
157
+ else :
158
+ self .handle_message (params_dict , message_attribs , attribs )
159
+ m .delete ()
160
+ except Exception as ex :
161
+ # need exception logtype to log stack trace
162
+ sqs_logger .exception (ex )
163
+ if self ._error_queue_name :
164
+ exc_type , exc_obj , exc_tb = sys .exc_info ()
165
+
166
+ sqs_logger .info ( "Pushing exception to error queue" )
167
+ error_launcher = SqsLauncher (queue = self ._error_queue_name , create_queue = True )
168
+ error_launcher .launch_message (
169
+ {
170
+ 'exception_type' : str (exc_type ),
171
+ 'error_message' : str (ex .args )
172
+ }
173
+ )
199
174
200
175
def listen (self ):
201
176
sqs_logger .info ( "Listening to queue " + self ._queue_name )
@@ -204,19 +179,6 @@ def listen(self):
204
179
205
180
self ._start_listening ()
206
181
207
- def _prepare_logger (self ):
208
- logger = logging .getLogger ('eg_daemon' )
209
- logger .setLevel (logging .INFO )
210
-
211
- sh = logging .StreamHandler (sys .stdout )
212
- sh .setLevel (logging .INFO )
213
-
214
- formatstr = '[%(asctime)s - %(name)s - %(levelname)s] %(message)s'
215
- formatter = logging .Formatter (formatstr )
216
-
217
- sh .setFormatter (formatter )
218
- logger .addHandler (sh )
219
-
220
182
@abstractmethod
221
183
def handle_message (self , body , attributes , messages_attributes ):
222
184
"""
0 commit comments