Skip to content

Commit 628bb87

Browse files
committed
Updated README
1 parent 9370ea1 commit 628bb87

File tree

7 files changed

+73
-108
lines changed

7 files changed

+73
-108
lines changed

README.md

Lines changed: 54 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,11 @@ production environments.
2222

2323
## Quickstart
2424

25-
Starting a worker on a node, with debug flag set to true on configuration file
25+
Starting a worker on a node using Redis as backend
2626

2727
```
28-
$ tq redis
29-
Listening for jobs on 127.0.0.1:9000
30-
Response actor started
28+
$ tq redis-worker --log-level DEBUG
29+
2019-04-26 23:15:28 - tasq.remote.supervisor-17903: Worker type: Actor
3130
```
3231

3332
In a python shell
@@ -42,77 +41,59 @@ Warning: disable autoreload in ipython_config.py to improve performance.
4241
4342
In [1]: from tasq.queue import TasqQueue
4443
45-
In [2]: tq = TasqQueue(backend='redis://localhost:6379/0?name=test')
44+
In [2]: tq = TasqQueue(backend='redis://localhost:6379')
4645
47-
In [3]: def dup(n):
48-
...: return n * 2
46+
In [3]: def fib(n):
47+
...: if n == 0:
48+
...: return 0
49+
...: a, b = 0, 1
50+
...: for _ in range(n - 1):
51+
...: a, b = b, a + b
52+
...: return b
4953
...:
5054
51-
In [4]: fut = tq.put(dup, 5, name='task-01')
52-
53-
In [5]: fut
54-
Out[5]: <TasqFuture at 0x7f2851826518 state=finished returned JobResult>
55-
56-
In [6]: fut.unwrap()
57-
Out[6]: 10
58-
```
59-
60-
**Lower-level TasqClient**
61-
62-
```
63-
Python 3.6.5 (default, Apr 12 2018, 22:45:43)
64-
[GCC 7.3.1 20180312] on linux
65-
Type "help", "copyright", "credits" or "license" for more information.
66-
>>> from tasq import TasqClient
67-
>>> tc = TasqClient('127.0.0.1', 9000)
68-
>>> tc.connect()
69-
>>>
70-
>>> def foo(num):
71-
>>> import time
72-
>>> import random
73-
>>> r = random.randint(0, 2)
74-
>>> time.sleep(r)
75-
>>> return f'Foo - {random.randint(0, num)}'
76-
>>>
77-
>>> fut = tc.schedule(foo, 5, name='Task-1')
78-
>>> fut
79-
>>> <Future at 0x7f7d6e048160 state=pending>
80-
>>> fut.result
81-
>>>
82-
>>> # After some time, to let worker complete the job
83-
>>> fut.result
84-
>>> 'Foo - 2'
85-
>>> tc.results
86-
>>> {'Task-1': <Future at 0x7f7d6e048160 state=finished returned str>}
87-
>>>
88-
>>> tc.schedule_blocking(foo, 5, name='Task-2')
89-
>>> 'Foo - 4'
90-
>>>
91-
>>> tc.results
92-
>>> {'Task-1': <Future at 0x7f7d6e048160 state=finished returned str>,
93-
>>> 'Task-2': <Future at 0x7f7d6e047268 state=finished returned str>}
94-
```
95-
96-
Scheduling a job after a delay
97-
98-
```
99-
>>> fut = tc.schedule(foo, 5, name='Delayed-Task', delay=5)
100-
>>> tc.results
101-
>>> {'Delayed-Task': <Future at 0x7f7d6e044208 state=pending>}
102-
>>> # Wait 5 seconds
103-
>>> tc.results
104-
>>> {'Delayed-Task': <Future at 0x7f7d6e044208 state=finished returned str>}
105-
>>> fut.result()
106-
>>> 'Foo - 2'
55+
In [4]: # Asynchronous execution
56+
In [5]: fut = tq.put(fib, 50, name='fib-async')
57+
58+
In [6]: fut
59+
Out[6]: <TasqFuture at 0x7f2851826518 state=finished returned JobResult>
60+
61+
In [7]: fut.unwrap()
62+
Out[7]: 12586269025
63+
64+
In [8]: res = tq.put_blocking(fib, 50, name='fib-sync')
65+
66+
In [9]: res.unwrap()
67+
Out[9]: 12586269025
10768
```
10869

109-
Scheduling a task to be executed continuously in a defined interval
70+
Scheduling jobs after a delay
71+
```
72+
73+
In [10]: fut = tq.put(fib, 5, name='fib-delayed', delay=5)
11074
75+
In [11]: fut
76+
Out[11]: <TasqFuture at 0x7f2951856418 state=pending>
77+
78+
In [12]: # wait 5 seconds
79+
80+
In [13]: fut.unwrap()
81+
Out[13]: 5
82+
83+
In [14]: tq.results
84+
Out[14]: {'fib-async': <TasqFuture at 0x7f2851826518 state=finished returned JobResult>,
85+
Out[14]: 'fib-sync': <TasqFuture at 0x7f7d6e047268 state=finished returned JobResult>,
86+
Out[14]: 'fib-delayed': <TasqFuture at 0x7f2951856418 state=finished returned JobResult>}
11187
```
112-
>>> tc.schedule(foo, 5, name='8_seconds_interval_task', eta='8s')
113-
>>> tc.schedule(foo, 5, name='2_hours_interval_task', eta='2h')
88+
89+
Scheduling a task to be executed continuously in a defined interval
90+
11491
```
92+
In [15]: tq.put(fib, 5, name='8_seconds_interval_fib', eta='8s')
11593
94+
In [16]: tq.put(fib, 5, name='2_hours_interval_fib', eta='2h')
95+
96+
```
11697
Delayed and interval tasks are supported even in a blocking scheduling manner.
11798

11899
Tasq also supports an optional static configuration file, in the `tasq.settings.py` module is
@@ -124,27 +105,18 @@ By setting the `-f` flag it is possible to also set a location of a configuratio
124105
filesystem
125106

126107
```
127-
$ tq --worker -f path/to/conf/conf.json
108+
$ tq worker -c path/to/conf/conf.json
128109
```
129110

130111
A worker can be started by specifying the type of sub worker we want:
131112

132113
```
133-
$ tq --worker --worker-type process
114+
$ tq rabbitmq-worker --worker-type process
134115
```
135116
Using `process` type subworker it is possible to use a distributed queue for parallel execution,
136117
useful when the majority of the jobs are CPU bound instead of I/O bound (actors are preferable in
137118
that case).
138119

139-
Multiple workers can be started in the same node, this will start two worker process ready to
140-
receive jobs.
141-
142-
```
143-
$ tq --workers 127.0.0.1:9000:9001, 127.0.0.1:9090:9091
144-
Listening for jobs on 127.0.0.1:9000
145-
Listening for jobs on 127.0.0.1:9090
146-
```
147-
148120
If jobs are scheduled for execution on a disconnected client, or remote workers are not up at the
149121
time of the scheduling, all jobs will be enqueued for later execution. This means that there's no
150122
need to actually start workers before job scheduling, at the first worker up all jobs will be sent
@@ -201,15 +173,13 @@ See the [CHANGES](CHANGES.md) file.
201173

202174
## TODO:
203175

204-
- [ ] Possibility of a broker to persist jobs (classic task queue celery like)
176+
- [x] Possibility of a broker to persist jobs (classic task queue celery like)
177+
- [x] Delayed tasks and scheduled cron tasks
178+
- [x] Configuration handling throughout the code
179+
- [x] Better explanation of the implementation and actors defined
180+
- [x] Improve CLI options
205181
- [ ] Check for pynacl for security on pickled data
206182
- [ ] Tests
207-
- [ ] A meaningful client pool
208-
- [x] Debugging multiprocessing start for more workers on the same node
209183
- [ ] Refactor of existing code and corner case handling (Still very basic implementation of even
210184
simple heuristics)
211-
- [x] Delayed tasks and scheduled cron tasks
212-
- [x] Configuration handling throughout the code
213-
- [x] Better explanation of the implementation and actors defined
214-
- [ ] Improve CLI options
215185
- [ ] Dockerfile

tasq/actors/routers.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
remote calls.
66
"""
77

8-
from __future__ import absolute_import, division, print_function, unicode_literals
8+
from __future__ import (absolute_import, division,
9+
print_function, unicode_literals)
910

1011
from abc import ABCMeta, abstractmethod
1112

@@ -25,6 +26,7 @@ class Router(metaclass=ABCMeta):
2526
:type func_name: str or 'submit'
2627
:param func_name: The name of the method that must be called after the
2728
message has been routed.
29+
2830
"""
2931

3032
def __init__(self, workers, func_name=u'submit', *args, **kwargs):
@@ -51,7 +53,7 @@ def _call_func(self, idx, msg):
5153
@abstractmethod
5254
def _route_message(self, msg):
5355
"""To be defined on subclass"""
54-
pass
56+
raise NotImplementedError
5557

5658
def route(self, msg):
5759
"""Call `_route_message` private method, call function directly in case

tasq/cli/main.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,11 @@ def start_worker(supervisor_type, worker_type, host, **kwargs):
9090
s_type = supervisors[worker_type][supervisor_type]
9191
except KeyError:
9292
raise UnknownSupervisorException()
93-
else:
93+
try:
9494
supervisor = supervisor_factory.create(s_type, host=host, **kwargs)
9595
supervisor.serve_forever()
96+
except KeyboardInterrupt:
97+
supervisor.stop()
9698

9799

98100
def main():

tasq/queue.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ def __init__(self, backend=u'zmq://localhost:9000',
7777
# Connect with the backend
7878
self._backend.connect()
7979

80+
def __len__(self):
81+
return len(self.pending_jobs())
82+
8083
def _store_results(self):
8184
while True:
8285
tasqfuture = self._results.get()

tasq/remote/actors.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,8 @@ def __init__(self, name=u'', ctx=None, response_actor=None):
140140
self._response_actor = response_actor or self._ctx.actor_of(
141141
ResponseActor, 'TimedActor - ResponseActor'
142142
)
143-
self._response_actor.start()
143+
if isinstance(self._response_actor, Actor):
144+
self._response_actor.start()
144145
super().__init__(name, ctx)
145146

146147
def submit(self, job, eta):
@@ -205,7 +206,10 @@ def run(self):
205206
self._log.debug('%s - Timed job %s result = %s',
206207
self.name, job.job_id, jobres)
207208
result.set_result(response)
208-
self._response_actor.send(result)
209+
if isinstance(self._response_actor, Actor):
210+
self._response_actor.send(result)
211+
else:
212+
self._response_actor.route(result)
209213
self.submit(job, str(job.delay) + 's')
210214

211215

tasq/remote/sockets.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,3 +219,6 @@ def recv_result_data(self, timeout=None, unpickle=True, signkey=None):
219219
if zipped_result and unpickle:
220220
return decompress_and_unpickle(zipped_result)
221221
return zipped_result
222+
223+
def close(self):
224+
self._backend.close()

tasq/remote/supervisor.py

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
from __future__ import absolute_import, division, print_function, unicode_literals
99

1010
import os
11-
import signal
1211
import asyncio
1312
from threading import Thread, Event
1413
from abc import ABCMeta, abstractmethod
@@ -114,8 +113,6 @@ def __init__(self, host, pull_port, push_port, num_workers=max_workers(),
114113
super().__init__(host, num_workers=num_workers, signkey=signkey)
115114
# Event loop
116115
self._loop = asyncio.get_event_loop()
117-
# Handling loop exit
118-
self._loop.add_signal_handler(signal.SIGINT, self.stop)
119116

120117
@property
121118
def push_port(self):
@@ -298,7 +295,6 @@ def __init__(self, host, port, db, name, num_workers=max_workers(),
298295
self._response_thread = Thread(target=self._respond, daemon=True)
299296
self._response_thread.start()
300297
self._run = True
301-
self._done = Event()
302298
self._log.info("Worker type: %s", worker_class.__name__)
303299

304300
@property
@@ -314,7 +310,6 @@ def _init_server(self):
314310
def stop(self):
315311
self._log.info("\nStopping..")
316312
self._run = False
317-
self._done.wait()
318313
# Use a poison pill to stop the loop
319314
self._jobqueue.shutdown()
320315
self._response_thread.join()
@@ -327,7 +322,6 @@ def serve_forever(self):
327322
continue
328323
self._log.info("Received job")
329324
self._jobqueue.add_job(job)
330-
self._done.set()
331325

332326
def _respond(self):
333327
"""Spin a loop and respond to client with whatever results arrive in
@@ -374,9 +368,6 @@ def __init__(self, host, port, db, name, num_workers=max_workers(),
374368
response_actor=self._responses
375369
)
376370
self._log.info("Worker type: Actor")
377-
self._done = Event()
378-
signal.signal(signal.SIGINT, self.stop)
379-
signal.signal(signal.SIGTERM, self.stop)
380371

381372
@property
382373
def name(self):
@@ -391,7 +382,6 @@ def _init_server(self):
391382
def stop(self):
392383
self._log.info("\nStopping..")
393384
self._run = False
394-
self._done.wait()
395385

396386
def serve_forever(self):
397387
"""Receive jobs from clients with polling"""
@@ -403,7 +393,6 @@ def serve_forever(self):
403393
res = self._workers.route(job)
404394
self._responses.route(res)
405395
self._log.info("Routed")
406-
self._done.set()
407396

408397
@classmethod
409398
def create(cls, host, port, db, name, num_workers=max_workers(),
@@ -431,7 +420,6 @@ def __init__(self, host, port, name, num_workers=max_workers(),
431420
self._response_thread = Thread(target=self._respond, daemon=True)
432421
self._response_thread.start()
433422
self._run = True
434-
self._done = Event()
435423
self._log.info("Worker type: %s", worker_class.__name__)
436424

437425
@property
@@ -447,7 +435,6 @@ def _init_server(self):
447435
def stop(self):
448436
self._log.info("\nStopping..")
449437
self._run = False
450-
self._done.wait()
451438
self._jobqueue.shutdown()
452439
self._response_thread.join()
453440

@@ -459,7 +446,6 @@ def serve_forever(self):
459446
continue
460447
self._log.info("Received job")
461448
self._jobqueue.add_job(job)
462-
self._done.set()
463449

464450
def _respond(self):
465451
"""Spin a loop and respond to client with whatever results arrive in
@@ -505,9 +491,6 @@ def __init__(self, host, port, name, num_workers=max_workers(),
505491
response_actor=self._responses
506492
)
507493
self._log.info("Worker type: Actor")
508-
self._done = Event()
509-
signal.signal(signal.SIGINT, self.stop)
510-
signal.signal(signal.SIGTERM, self.stop)
511494

512495
@property
513496
def name(self):
@@ -523,7 +506,6 @@ def stop(self):
523506
self._log.info("\nStopping..")
524507
self._run = False
525508
self._server.close()
526-
self._done.wait()
527509

528510
def serve_forever(self):
529511
"""Receive jobs from clients with polling"""
@@ -535,7 +517,6 @@ def serve_forever(self):
535517
res = self._workers.route(job)
536518
self._responses.route(res)
537519
self._log.info("Routed")
538-
self._done.set()
539520

540521
@classmethod
541522
def create(cls, host, port, name, num_workers=max_workers(),

0 commit comments

Comments
 (0)