Skip to content

Commit 3e3d5b2

Browse files
committed
Moved factory logic to from_url method on each client
1 parent 2529743 commit 3e3d5b2

File tree

5 files changed

+168
-64
lines changed

5 files changed

+168
-64
lines changed

CHANGES.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
### 1.1.7
2+
(Apr 26, 2019)
3+
4+
- Moved factory logic for client creation to from_url method on client module
5+
- Added TasqFuture result from clients result, to return more structured results
6+
with additional informations about execution.
7+
18
### 1.1.6
29
(Apr 22, 2019)
310

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ production enviroments.
2525
Starting a worker on a node, with debug flag set to true on configuration file
2626

2727
```
28-
$ tq worker
28+
$ tq redis
2929
Listening for jobs on 127.0.0.1:9000
3030
Response actor started
3131
```

tasq/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
from __future__ import absolute_import, division, print_function, unicode_literals
22

3+
from tasq.queue import TasqQueue
34
from tasq.jobqueue import JobQueue
4-
from tasq.worker import ProcessQueueWorker#, ThreadQueueWorker
5+
from tasq.worker import ProcessQueueWorker
56
from tasq.remote.client import ZMQTasqClient, RedisTasqClient
67
from tasq.remote.supervisor import (ZMQActorSupervisor,
78
ZMQQueueSupervisor,
89
RedisActorSupervisor,
910
Supervisors)
1011

11-
__version__ = '1.0.5'
12+
__version__ = '1.1.7'

tasq/queue.py

Lines changed: 16 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,12 @@ def init_client(client, host, port, *args, **kwargs):
1818
return client(host, port, *args, **kwargs)
1919

2020

21-
Client = namedtuple('Client', ('handler', 'arguments'))
22-
23-
24-
defaults = {
25-
'redis': Client(RedisTasqClient, {'host': '127.0.0.1',
26-
'port': 6379,
27-
'db': 0,
28-
'name': os.getpid()}),
29-
'amqp': Client(RabbitMQTasqClient, {'host': '127.0.0.1',
30-
'port': 5672,
31-
'name': os.getpid()}),
32-
'unix': Client(ZMQTasqClient, {'host': '127.0.0.1',
33-
'port': 9000,
34-
'plport': 9001,
35-
'unix_socket': True}),
36-
'zmq': Client(ZMQTasqClient, {'host': '127.0.0.1',
37-
'port': 9000,
38-
'plport': 9001}),
39-
'tcp': Client(ZMQTasqClient, {'host': '127.0.0.1',
40-
'port': 9000,
41-
'plport': 9001})
21+
backends = {
22+
'redis': RedisTasqClient,
23+
'amqp': RabbitMQTasqClient,
24+
'unix': ZMQTasqClient,
25+
'zmq': ZMQTasqClient,
26+
'tcp': ZMQTasqClient
4227
}
4328

4429

@@ -49,8 +34,8 @@ class TasqQueue:
4934
5035
The formats accepted for the backends are:
5136
52-
- redis://localhost:6379/0?name=test-queue-redis
53-
- amqp://localhost:5672?name=test-queue-rabbitmq
37+
- redis://localhost:6379/0?name=redis-queue
38+
- amqp://localhost:5672?name=amqp-queue
5439
- zmq://localhost:9000?plport=9010
5540
- tcp://localhost:5555
5641
@@ -74,28 +59,14 @@ class TasqQueue:
7459
def __init__(self, backend=u'zmq://localhost:9000',
7560
store=None, sign_data=False):
7661

77-
url = urlparse(backend)
78-
scheme = url.scheme or 'zmq'
79-
assert url.scheme in {'redis', 'zmq', 'amqp', 'unix', 'tcp'}, \
80-
f"Unsupported {url.scheme}"
81-
args = {
82-
'host': url.hostname,
83-
'port': url.port,
84-
'db': int(url.path.split('/')[-1]) if url.path else None,
85-
'name': url.query.split('=')[-1] if url.query and scheme not in {'tcp', 'zmq'} else None,
86-
'plport': int(url.query.split('=')[-1]) if url.query and scheme == 'zmq' else None,
87-
'sign_data': sign_data
88-
}
89-
90-
# Update defaults arguments
91-
for k in defaults[scheme].arguments:
92-
if k not in args or not args[k]:
93-
args[k] = defaults[scheme].arguments[k]
94-
95-
# Remove useless args
96-
args = {k: v for k, v in args.items() if v is not None}
97-
98-
self._backend = defaults[scheme].handler(**args)
62+
if isinstance(backend, str):
63+
url = urlparse(backend)
64+
scheme = url.scheme or 'zmq'
65+
assert url.scheme in {'redis', 'zmq', 'amqp', 'unix', 'tcp'}, \
66+
f"Unsupported {url.scheme}"
67+
self._backend = backends[scheme].from_url(backend, sign_data)
68+
else:
69+
self._backend = backend
9970
# Handle only redis as a backend store for now
10071
if store:
10172
urlstore = urlparse(store)

tasq/remote/client.py

Lines changed: 141 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
remote workers.
66
"""
77

8-
from __future__ import absolute_import, division, print_function, unicode_literals
8+
from __future__ import (absolute_import, division,
9+
print_function, unicode_literals)
910

11+
from urllib.parse import urlparse
1012
from abc import ABCMeta, abstractmethod
1113
from concurrent.futures import Future
1214
from threading import Thread
@@ -23,10 +25,7 @@
2325

2426

2527
class TasqClientNotConnected(Exception):
26-
27-
def __init__(self, msg=u''):
28-
self.message = msg
29-
super().__init__(self.message)
28+
pass
3029

3130

3231
class TasqFuture(Future):
@@ -44,10 +43,22 @@ def exec_time(self):
4443

4544
class BaseTasqClient(metaclass=ABCMeta):
4645

47-
"""
48-
Simple client class to schedule jobs to remote workers, currently
46+
"""Simple client class to schedule jobs to remote workers, currently
4947
supports a synchronous way of calling tasks awaiting for results and an
5048
asynchronous one which collect results in a dedicated dictionary
49+
50+
Attributes
51+
----------
52+
:type host: str
53+
:param host: The IP address to connect with
54+
55+
:type port: int
56+
:param port: The port associated with the host param
57+
58+
:type sign_data: bool or False
59+
:param sign_data: Boolean flag, sign bytes passing around through sockets
60+
if True
61+
5162
"""
5263

5364
def __init__(self, host, port, sign_data=False):
@@ -100,11 +111,6 @@ def __exit__(self, exc_type, exc_value, exc_traceback):
100111
pass
101112
self.close()
102113

103-
def __repr__(self):
104-
status = 'connected' if self.is_connected else 'disconnected'
105-
return f"<BaseTasqClient worker=(tcp://{self.host}:{self.port}, " \
106-
f"tcp://{self.host}:{self.port}) status={status}>"
107-
108114
@abstractmethod
109115
def _make_client(self):
110116
pass
@@ -188,7 +194,11 @@ def schedule_blocking(self, func, *args, **kwargs):
188194
:type func: func
189195
:param func: A function to be executed on a worker by enqueing it
190196
197+
:rtype: tasq.remote.client.TasqFuture
191198
:return: The result of the func execution
199+
200+
:raise: tasq.remote.client.TasqClientNotConnected, in case of not
201+
connected client
192202
"""
193203
if not self.is_connected:
194204
raise TasqClientNotConnected('Client not connected to no worker')
@@ -203,8 +213,31 @@ class ZMQTasqClient(BaseTasqClient):
203213
"""Simple client class to schedule jobs to remote workers, currently
204214
supports a synchronous way of calling tasks awaiting for results and an
205215
asynchronous one which collect results in a dedicated dictionary
216+
217+
Attributes
218+
----------
219+
:type host: str
220+
:param host: The IP address to connect with
221+
222+
:type port: int
223+
:param port: The port associated with the host param for PUSH channel
224+
communication
225+
226+
:type plport: int or None
227+
:param plport: The pull port to retrieve bytes from
228+
229+
:type sign_data: bool or False
230+
:param sign_data: Boolean flag, sign bytes passing around through sockets
231+
if True
232+
233+
:type unix_socket: bool or False
234+
:param unix_socket: Boolean flag to decide wether to use a UNIX socket or a
235+
TCP one
236+
206237
"""
207238

239+
__extraparams__ = {'plport'}
240+
208241
def __init__(self, host, port, plport=None, sign_data=False, unix_socket=False):
209242
self._plport = plport or port + 1
210243
# Unix socket flag, if set to true, unix sockets for interprocess
@@ -245,12 +278,52 @@ def _gather_results(self):
245278
except KeyError:
246279
self._log.error("Can't update result: key not found")
247280

281+
@classmethod
282+
def from_url(cls, url, sign_data=False):
283+
u = urlparse(url)
284+
scheme = u.scheme or 'zmq'
285+
assert scheme in ('zmq', 'unix', 'tcp'), f"Unsupported {scheme}"
286+
extras = {t.split('=')[0]: t.split('=')[1] for t in u.query.split('?')}
287+
extras = {k: v for k, v in extras.items() if k in cls.__extraparams__}
288+
conn_args = {
289+
'host': u.hostname or '127.0.0.1',
290+
'port': u.port or 9000,
291+
'plport': int(extras.get('plport', 0)),
292+
'sign_data': sign_data,
293+
'unix_socket': scheme == 'unix'
294+
}
295+
return cls(**conn_args)
296+
248297

249298
class RedisTasqClient(BaseTasqClient):
250299

251-
""""""
300+
"""Simple Redis client class to schedule jobs to remote workers using
301+
redis as the backend broker.
302+
303+
Attributes
304+
----------
305+
:type host: str or 'localhost'
306+
:param host: The IP address of the Redis instance to connect with
307+
308+
:type port: int or 6379
309+
:param port: The port associated with the host param
310+
311+
:type db: int or 0
312+
:param db: The database to use on redis for the queues
313+
314+
:type name: str or redis-queue
315+
:param name: The name of the redis queue
316+
317+
:type sign_data: bool or False
318+
:param sign_data: Boolean flag, sign bytes passing around through sockets
319+
if True
320+
321+
"""
322+
323+
__extraparams__ = {'db', 'name'}
252324

253-
def __init__(self, host, port, db, name, sign_data=False):
325+
def __init__(self, host='localhost', port=6379,
326+
db=0, name='redis-queue', sign_data=False):
254327
self._db = db
255328
self._name = name
256329
super().__init__(host, port, sign_data)
@@ -304,12 +377,49 @@ def disconnect(self):
304377
if self.is_connected:
305378
self._is_connected = False
306379

380+
@classmethod
381+
def from_url(cls, url, sign_data=False):
382+
u = urlparse(url)
383+
scheme = u.scheme or 'redis'
384+
assert scheme == 'redis', f"Unsupported {scheme}"
385+
extras = {t.split('=')[0]: t.split('=')[1] for t in u.query.split('?')}
386+
extras = {k: v for k, v in extras.items() if k in cls.__extraparams__}
387+
conn_args = {
388+
'host': u.hostname or 'localhost',
389+
'port': u.port or 6379,
390+
'db': int(extras.get('db', 0)),
391+
'name': extras.get('name', 'redis-queue'),
392+
'sign_data': sign_data
393+
}
394+
return cls(**conn_args)
395+
307396

308397
class RabbitMQTasqClient(BaseTasqClient):
309398

310-
""""""
399+
"""Simple RabbitMQ client class to schedule jobs to remote workers using
400+
RabbitMQ as the backend broker.
401+
402+
Attributes
403+
----------
404+
:type host: str or 'localhost'
405+
:param host: The IP address of the RabbitMQ instance to connect with
406+
407+
:type port: int or 5672
408+
:param port: The port associated with the host param
409+
410+
:type name: str or amqp-queue
411+
:param name: The name of the RabbitMQ queue
412+
413+
:type sign_data: bool or False
414+
:param sign_data: Boolean flag, sign bytes passing around through sockets
415+
if True
311416
312-
def __init__(self, host, port, name, sign_data=False):
417+
"""
418+
419+
__extraparams__ = {'name'}
420+
421+
def __init__(self, host='localhost', port=5672,
422+
name='amqp-queue', sign_data=False):
313423
self._name = name
314424
super().__init__(host, port, sign_data)
315425

@@ -357,6 +467,21 @@ def disconnect(self):
357467
if self.is_connected:
358468
self._is_connected = False
359469

470+
@classmethod
471+
def from_url(cls, url, sign_data=False):
472+
u = urlparse(url)
473+
scheme = u.scheme or 'amqp'
474+
assert scheme == 'amqp', f"Unsupported {scheme}"
475+
extras = {t.split('=')[0]: t.split('=')[1] for t in u.query.split('?')}
476+
extras = {k: v for k, v in extras.items() if k in cls.__extraparams__}
477+
conn_args = {
478+
'host': u.hostname or 'localhost',
479+
'port': u.port or 5672,
480+
'name': extras.get('name', 'amqp-queue'),
481+
'sign_data': sign_data
482+
}
483+
return cls(**conn_args)
484+
360485

361486
class TasqClientPool:
362487

0 commit comments

Comments
 (0)