diff --git a/sqs_listener/__init__.py b/sqs_listener/__init__.py index 3fce7d8..fedae7c 100644 --- a/sqs_listener/__init__.py +++ b/sqs_listener/__init__.py @@ -66,6 +66,7 @@ def __init__(self, queue, **kwargs): self._wait_time = kwargs.get('wait_time', 0) self._max_number_of_messages = kwargs.get('max_number_of_messages', 1) self._deserializer = kwargs.get("deserializer", json.loads) + self._should_listen = False # must come last if boto3_session: @@ -139,7 +140,7 @@ def _initialize_client(self): def _start_listening(self): # TODO consider incorporating output processing from here: https://github.com/debrouwere/sqs-antenna/blob/master/antenna/__init__.py - while True: + while self._should_listen: # calling with WaitTimeSecconds of zero show the same behavior as # not specifiying a wait time, ie: short polling messages = self._client.receive_message( @@ -154,6 +155,8 @@ def _start_listening(self): sqs_logger.debug(messages) sqs_logger.info("{} messages received".format(len(messages['Messages']))) for m in messages['Messages']: + if not self._should_listen: + break receipt_handle = m['ReceiptHandle'] m_body = m['Body'] message_attribs = None @@ -197,15 +200,22 @@ def _start_listening(self): ) else: + if not self._should_listen: + break time.sleep(self._poll_interval) + sqs_logger.info("client is not in listening state, stopping sqs listener") def listen(self): + self._should_listen = True sqs_logger.info("Listening to queue " + self._queue_name) if self._error_queue_name: sqs_logger.info("Using error queue " + self._error_queue_name) self._start_listening() + def stop(self): + self._should_listen = False + def _prepare_logger(self): logger = logging.getLogger('eg_daemon') logger.setLevel(logging.INFO)