Skip to content

Commit 9370ea1

Browse files
committed
Fixed bug on rabbitmq backend module, removed useless code
1 parent dcb24ac commit 9370ea1

File tree

10 files changed

+212
-284
lines changed

10 files changed

+212
-284
lines changed

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
- Refactored CLI commands and logging format
55
- Added factory methods to supervisors
6+
- Fixed bug in rabbitmq backend module
67

78
### 1.1.7
89
(Apr 26, 2019)

tasq/cli/main.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def parse_arguments():
6565
nargs='?'
6666
)
6767
parser.add_argument(
68-
'--shared-key', '-sk',
68+
'--signkey',
6969
help='The shared key to use to sign byte streams between clients and '
7070
'supervisors',
7171
nargs='?'
@@ -99,7 +99,7 @@ def main():
9999
args = parse_arguments()
100100
conf = get_config(args.conf)
101101
logger.loglevel = args.log_level or conf['log_level']
102-
sign_data = conf['sign_data']
102+
signkey = args.signkey or conf['signkey']
103103
unix_socket = conf['unix_socket']
104104
num_workers = args.num_workers or conf['num_workers']
105105
worker_type = args.worker_type or 'actor'
@@ -108,16 +108,16 @@ def main():
108108
push_port = args.plport or conf['zmq']['push_port']
109109
pull_port = args.port or conf['zmq']['pull_port']
110110
start_worker('zmq', worker_type, addr, push_port=push_port,
111-
pull_port=pull_port, sign_data=sign_data,
112-
unix_socket=unix_socket)
111+
pull_port=pull_port, signkey=signkey,
112+
num_workers=num_workers, unix_socket=unix_socket)
113113
elif args.subcommand == 'redis-worker':
114114
port = args.port or conf['redis']['port']
115115
db = args.db or conf['redis']['db']
116116
name = args.name or conf['redis']['name']
117-
start_worker('redis', worker_type, addr, port,
118-
db=db, name=name, sign_data=sign_data)
117+
start_worker('redis', worker_type, addr, port=port, db=db,
118+
name=name, num_workers=num_workers, signkey=signkey)
119119
elif args.subcommand == 'rabbitmq-worker':
120120
port = args.port or conf['rabbitmq']['port']
121121
name = args.name or conf['rabbitmq']['name']
122-
start_worker('rabbitmq', worker_type, addr,
123-
port, name=name, sign_data=sign_data)
122+
start_worker('rabbitmq', worker_type, addr, port=port,
123+
name=name, num_workers=num_workers, signkey=signkey)

tasq/jobqueue.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ def get_job(self, timeout=None):
8181
8282
:return: A `tasq.Job` object
8383
"""
84-
return self.get(timeout)
84+
return self.get(timeout=timeout)
8585

8686
def get_result(self, block=True, timeout=None):
8787
return self._completed_jobs.get(block, timeout)

tasq/queue.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,20 +45,20 @@ class TasqQueue:
4545
:type store: str or None
4646
:param store: An URL to connect to for the results store service
4747
48-
:type sign_data: bool or False
49-
:param sign_data: A boolean flag, sign data with a shared key
48+
:type signkey: bool or False
49+
:param signkey: A boolean flag, sign data with a shared key
5050
5151
"""
5252

5353
def __init__(self, backend=u'zmq://localhost:9000',
54-
store=None, sign_data=False):
54+
store=None, signkey=None):
5555

5656
if isinstance(backend, str):
5757
url = urlparse(backend)
5858
scheme = url.scheme or 'zmq'
5959
assert url.scheme in {'redis', 'zmq', 'amqp', 'unix', 'tcp'}, \
6060
f"Unsupported {url.scheme}"
61-
self._backend = backends[scheme].from_url(backend, sign_data)
61+
self._backend = backends[scheme].from_url(backend, signkey)
6262
elif isinstance(backend,
6363
(ZMQTasqClient, RabbitMQTasqClient, RedisTasqClient)):
6464
self._backend = backend

tasq/remote/backends/rabbitmq.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,11 @@ def _start(self):
5151
channel = self._get_channel()
5252
channel.basic_qos(prefetch_count=1)
5353
if self._role == 'receiver':
54+
channel.queue_declare(queue=self._queue_name, durable=True)
5455
channel.basic_consume(queue=self._queue_name,
5556
on_message_callback=self._get_job)
5657
else:
58+
channel.queue_declare(queue=self._result_name, durable=True)
5759
channel.basic_consume(queue=self._result_name,
5860
on_message_callback=self._get_res)
5961
channel.start_consuming()
@@ -68,13 +70,13 @@ def put_result(self, result):
6870

6971
def get_next_job(self, timeout=None):
7072
try:
71-
return self._jobs.get(timeout)
73+
return self._jobs.get(timeout=timeout)
7274
except queue.Empty:
7375
return None
7476

7577
def get_available_result(self, timeout=None):
7678
try:
77-
return self._results.get(timeout)
79+
return self._results.get(timeout=timeout)
7880
except queue.Empty:
7981
return None
8082

tasq/remote/client.py

Lines changed: 30 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -55,19 +55,19 @@ class BaseTasqClient(metaclass=ABCMeta):
5555
:type port: int
5656
:param port: The port associated with the host param
5757
58-
:type sign_data: bool or False
59-
:param sign_data: Boolean flag, sign bytes passing around through sockets
58+
:type signkey: bool or False
59+
:param signkey: Boolean flag, sign bytes passing around through sockets
6060
if True
6161
6262
"""
6363

64-
def __init__(self, host, port, sign_data=False):
64+
def __init__(self, host, port, signkey=None):
6565
# Host address of a remote supervisor to connect to
6666
self._host = host
6767
# Port for push side (outgoing) of the communication channel
6868
self._port = port
6969
# Send digital signed data
70-
self._sign_data = sign_data
70+
self._signkey = signkey
7171
# Client reference, set up the communication with a Supervisor
7272
self._client = self._make_client()
7373
# Connection flag
@@ -226,8 +226,8 @@ class ZMQTasqClient(BaseTasqClient):
226226
:type plport: int or None
227227
:param plport: The pull port to retrieve bytes from
228228
229-
:type sign_data: bool or False
230-
:param sign_data: Boolean flag, sign bytes passing around through sockets
229+
:type signkey: bool or False
230+
:param signkey: Boolean flag, sign bytes passing around through sockets
231231
if True
232232
233233
:type unix_socket: bool or False
@@ -238,13 +238,13 @@ class ZMQTasqClient(BaseTasqClient):
238238

239239
__extraparams__ = {'plport'}
240240

241-
def __init__(self, host, port, plport=None, sign_data=False, unix_socket=False):
241+
def __init__(self, host, port, plport=None, signkey=None, unix_socket=False):
242242
self._plport = plport or port + 1
243243
# Unix socket flag, if set to true, unix sockets for interprocess
244244
# communication will be used and ports will be used to differentiate
245245
# push and pull channel
246246
self._unix_socket = unix_socket
247-
super().__init__(host, port, sign_data)
247+
super().__init__(host, port, signkey)
248248

249249
@property
250250
def plport(self):
@@ -259,7 +259,7 @@ def __repr__(self):
259259
def _make_client(self):
260260
return ConnectionFactory \
261261
.make_client(self.host, self.port, self.plport,
262-
self._sign_data, self._unix_socket)
262+
self._signkey, self._unix_socket)
263263

264264
def _gather_results(self):
265265
"""Gathering subroutine, must be run in another thread to concurrently
@@ -279,17 +279,17 @@ def _gather_results(self):
279279
self._log.error("Can't update result: key not found")
280280

281281
@classmethod
282-
def from_url(cls, url, sign_data=False):
282+
def from_url(cls, url, signkey=None):
283283
u = urlparse(url)
284284
scheme = u.scheme or 'zmq'
285285
assert scheme in ('zmq', 'unix', 'tcp'), f"Unsupported {scheme}"
286-
extras = {t.split('=')[0]: t.split('=')[1] for t in u.query.split('?')}
286+
extras = {t.split('=')[0]: t.split('=')[1] for t in u.query.split('?') if t}
287287
extras = {k: v for k, v in extras.items() if k in cls.__extraparams__}
288288
conn_args = {
289289
'host': u.hostname or '127.0.0.1',
290290
'port': u.port or 9000,
291291
'plport': int(extras.get('plport', 0)),
292-
'sign_data': sign_data,
292+
'signkey': signkey,
293293
'unix_socket': scheme == 'unix'
294294
}
295295
return cls(**conn_args)
@@ -314,19 +314,19 @@ class RedisTasqClient(BaseTasqClient):
314314
:type name: str or redis-queue
315315
:param name: The name of the redis queue
316316
317-
:type sign_data: bool or False
318-
:param sign_data: Boolean flag, sign bytes passing around through sockets
317+
:type signkey: bool or False
318+
:param signkey: Boolean flag, sign bytes passing around through sockets
319319
if True
320320
321321
"""
322322

323323
__extraparams__ = {'db', 'name'}
324324

325325
def __init__(self, host='localhost', port=6379,
326-
db=0, name='redis-queue', sign_data=False):
326+
db=0, name='redis-queue', signkey=None):
327327
self._db = db
328328
self._name = name
329-
super().__init__(host, port, sign_data)
329+
super().__init__(host, port, signkey)
330330

331331
@property
332332
def name(self):
@@ -340,7 +340,7 @@ def __repr__(self):
340340
def _make_client(self):
341341
return ConnectionFactory \
342342
.make_redis_client(self.host, self.port, self._db,
343-
self._name, secure=self._sign_data)
343+
self._name, signkey=self._signkey)
344344

345345
def _gather_results(self):
346346
"""Gathering subroutine, must be run in another thread to concurrently
@@ -378,18 +378,18 @@ def disconnect(self):
378378
self._is_connected = False
379379

380380
@classmethod
381-
def from_url(cls, url, sign_data=False):
381+
def from_url(cls, url, signkey=None):
382382
u = urlparse(url)
383383
scheme = u.scheme or 'redis'
384384
assert scheme == 'redis', f"Unsupported {scheme}"
385-
extras = {t.split('=')[0]: t.split('=')[1] for t in u.query.split('?')}
385+
extras = {t.split('=')[0]: t.split('=')[1] for t in u.query.split('?') if t}
386386
extras = {k: v for k, v in extras.items() if k in cls.__extraparams__}
387387
conn_args = {
388388
'host': u.hostname or 'localhost',
389389
'port': u.port or 6379,
390390
'db': int(extras.get('db', 0)),
391391
'name': extras.get('name', 'redis-queue'),
392-
'sign_data': sign_data
392+
'signkey': signkey
393393
}
394394
return cls(**conn_args)
395395

@@ -410,18 +410,18 @@ class RabbitMQTasqClient(BaseTasqClient):
410410
:type name: str or amqp-queue
411411
:param name: The name of the RabbitMQ queue
412412
413-
:type sign_data: bool or False
414-
:param sign_data: Boolean flag, sign bytes passing around through sockets
413+
:type signkey: bool or False
414+
:param signkey: Boolean flag, sign bytes passing around through sockets
415415
if True
416416
417417
"""
418418

419419
__extraparams__ = {'name'}
420420

421421
def __init__(self, host='localhost', port=5672,
422-
name='amqp-queue', sign_data=False):
422+
name='amqp-queue', signkey=None):
423423
self._name = name
424-
super().__init__(host, port, sign_data)
424+
super().__init__(host, port, signkey)
425425

426426
@property
427427
def name(self):
@@ -435,14 +435,16 @@ def __repr__(self):
435435
def _make_client(self):
436436
return ConnectionFactory \
437437
.make_rabbitmq_client(self.host, self.port, 'sender',
438-
self._name, secure=self._sign_data)
438+
self._name, signkey=self._signkey)
439439

440440
def _gather_results(self):
441441
"""Gathering subroutine, must be run in another thread to concurrently
442442
listen for results and store them into a dedicated dictionary
443443
"""
444444
while True:
445445
job_result = self._client.recv_result()
446+
if not job_result:
447+
continue
446448
self._log.debug("Gathered result: %s", job_result)
447449
try:
448450
self._results[job_result.name].set_result(job_result)
@@ -468,17 +470,17 @@ def disconnect(self):
468470
self._is_connected = False
469471

470472
@classmethod
471-
def from_url(cls, url, sign_data=False):
473+
def from_url(cls, url, signkey=None):
472474
u = urlparse(url)
473475
scheme = u.scheme or 'amqp'
474476
assert scheme == 'amqp', f"Unsupported {scheme}"
475-
extras = {t.split('=')[0]: t.split('=')[1] for t in u.query.split('?')}
477+
extras = {t.split('=')[0]: t.split('=')[1] for t in u.query.split('?') if t}
476478
extras = {k: v for k, v in extras.items() if k in cls.__extraparams__}
477479
conn_args = {
478480
'host': u.hostname or 'localhost',
479481
'port': u.port or 5672,
480482
'name': extras.get('name', 'amqp-queue'),
481-
'sign_data': sign_data
483+
'signkey': signkey
482484
}
483485
return cls(**conn_args)
484486

0 commit comments

Comments
 (0)