Skip to content

Commit dcb24ac

Browse files
committed
Refactored CLI commands, added factory methods to supervisors
1 parent 3e3d5b2 commit dcb24ac

File tree

7 files changed

+222
-173
lines changed

7 files changed

+222
-173
lines changed

CHANGES.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
### 1.1.8
2+
(Apr 26, 2019)
3+
4+
- Refactored CLI commands and logging format
5+
- Added factory methods to supervisors
6+
17
### 1.1.7
28
(Apr 26, 2019)
39

tasq/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,4 @@
99
RedisActorSupervisor,
1010
Supervisors)
1111

12-
__version__ = '1.1.7'
12+
__version__ = '1.1.8'

tasq/cli/main.py

Lines changed: 98 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -7,155 +7,117 @@
77
print_function, unicode_literals)
88

99
import argparse
10-
from enum import Enum
1110
from ..settings import get_config
11+
from tasq.remote.supervisor import supervisor_factory
12+
from tasq.logger import logger
1213

1314

14-
class WorkerType(Enum):
15-
ActorWorker = 'actor'
16-
ProcessWorker = 'process'
15+
class UnknownSupervisorException(Exception):
16+
pass
1717

1818

19-
def get_parser():
20-
parser = argparse.ArgumentParser(description='Tasq CLI commands')
21-
parser.add_argument('subcommand')
22-
parser.add_argument('-f', action='store')
23-
parser.add_argument('--secure', '-s', action='store_true')
24-
parser.add_argument('--unix', '-u', action='store_true')
25-
parser.add_argument('--workers', nargs='*')
26-
parser.add_argument('--worker-type', action='store')
27-
parser.add_argument('--num-workers', action='store')
28-
parser.add_argument('--random', action='store')
29-
parser.add_argument('--verbose', '-v', action='store_true')
30-
parser.add_argument('--addr', '-a', action='store')
31-
parser.add_argument('--port', '-p', action='store')
32-
parser.add_argument('--db', action='store')
33-
parser.add_argument('--name', action='store')
34-
return parser
35-
36-
37-
def start_worker(host, port, sign_data, unix_socket, worker_type):
38-
from tasq.remote.supervisor import ZMQActorSupervisor, ZMQQueueSupervisor
39-
if worker_type == WorkerType.ActorWorker:
40-
supervisor = ZMQActorSupervisor(host, port, port + 1,
41-
sign_data=sign_data, unix_socket=unix_socket)
42-
else:
43-
supervisor = ZMQQueueSupervisor(host, port, port + 1,
44-
sign_data=sign_data, unix_socket=unix_socket)
45-
supervisor.serve_forever()
19+
supervisors = {
20+
'actor': {
21+
'zmq': 'ZMQ_ACTOR_SUPERVISOR',
22+
'redis': 'REDIS_ACTOR_SUPERVISOR',
23+
'rabbitmq': 'AMQP_ACTOR_SUPERVISOR'
24+
},
25+
'process': {
26+
'zmq': 'ZMQ_QUEUE_SUPERVISOR',
27+
'redis': 'REDIS_QUEUE_SUPERVISOR',
28+
'rabbitmq': 'AMQP_QUEUE_SUPERVISOR'
29+
}
30+
}
4631

4732

48-
def start_redis_worker(host, port, db, name, sign_data, worker_type):
49-
from tasq.remote.supervisor import RedisActorSupervisor, RedisQueueSupervisor
50-
if worker_type == WorkerType.ActorWorker:
51-
supervisor = RedisActorSupervisor(host, port, db,
52-
name, sign_data=sign_data)
53-
else:
54-
supervisor = RedisQueueSupervisor(host, port, db, name,
55-
sign_data=sign_data)
56-
supervisor.serve_forever()
33+
def parse_arguments():
34+
parser = argparse.ArgumentParser(description='Tasq CLI')
35+
parser.add_argument('subcommand')
36+
parser.add_argument('--conf', '-c',
37+
help='The filepath to the configuration file, in json',
38+
nargs='?')
39+
parser.add_argument(
40+
'--address', '-a',
41+
help='The ZMQ host address to connect to, default to localhost',
42+
nargs='?'
43+
)
44+
parser.add_argument('--port', '-p',
45+
help='The ZMQ port to connect to, default to 9000 for '
46+
'ZMQ/TCP/UNIX connections, to 6379 while using a '
47+
'redis broker or 5672 in case of RabbitMQ as backend.',
48+
nargs='?',
49+
type=int)
50+
parser.add_argument('--plport',
51+
help='The ZMQ port to connect to, default to 9001 for '
52+
'ZMQ/TCP/UNIX connections',
53+
nargs='?',
54+
type=int)
55+
parser.add_argument('--worker-type',
56+
help='The type of worker to deploy for a supervisor',
57+
nargs='?')
58+
parser.add_argument('--db',
59+
help='The database to use with redis as backend',
60+
nargs='?',
61+
type=int)
62+
parser.add_argument(
63+
'--name',
64+
help='The name of the queue, only for redis or rabbitmq backends',
65+
nargs='?'
66+
)
67+
parser.add_argument(
68+
'--shared-key', '-sk',
69+
help='The shared key to use to sign byte streams between clients and '
70+
'supervisors',
71+
nargs='?'
72+
)
73+
parser.add_argument('--unix', '-u',
74+
help='Unix socket flag, in case supervisors and '
75+
'clients reside on the same node',
76+
action='store_true')
77+
parser.add_argument('--num-workers',
78+
help='Number of workers to instantiate on the node',
79+
nargs='?',
80+
type=int)
81+
parser.add_argument('--log-level',
82+
help='Set logging level',
83+
nargs='?')
84+
args = parser.parse_args()
85+
return args
5786

5887

59-
def start_rabbitmq_worker(host, port, name, sign_data, worker_type):
60-
from tasq.remote.supervisor import RabbitMQActorSupervisor, RabbitMQQueueSupervisor
61-
if worker_type == WorkerType.ActorWorker:
62-
supervisor = RabbitMQActorSupervisor(host, port,
63-
name, sign_data=sign_data)
88+
def start_worker(supervisor_type, worker_type, host, **kwargs):
89+
try:
90+
s_type = supervisors[worker_type][supervisor_type]
91+
except KeyError:
92+
raise UnknownSupervisorException()
6493
else:
65-
supervisor = RabbitMQQueueSupervisor(host, port, name,
66-
sign_data=sign_data)
67-
supervisor.serve_forever()
68-
69-
70-
def start_workers(workers, sign_data, unix_socket):
71-
from tasq.remote.supervisor import Supervisors
72-
supervisors = Supervisors(workers, sign_data=sign_data, unix_socket=unix_socket)
73-
supervisors.start_procs()
74-
75-
76-
def start_random_workers(host, num_workers, sign_data, unix_socket):
77-
import random
78-
from tasq.remote.supervisor import Supervisors
79-
workers_set = set()
80-
init_port = 9000
81-
while True:
82-
port = random.randint(init_port, 65000)
83-
if (host, port, port + 1) in workers_set:
84-
continue
85-
workers_set.add((host, port, port + 1))
86-
if len(workers_set) == num_workers:
87-
break
88-
init_port = port + 2
89-
supervisors = Supervisors(list(workers_set), sign_data=sign_data, unix_socket=unix_socket)
90-
supervisors.start_procs()
91-
92-
93-
def _translate_peers(workers):
94-
return [tuple(x.split(':')) for x in workers]
94+
supervisor = supervisor_factory.create(s_type, host=host, **kwargs)
95+
supervisor.serve_forever()
9596

9697

9798
def main():
98-
conf = get_config()
99-
parser = get_parser()
100-
args = parser.parse_args()
101-
if args.f:
102-
conf = get_config(path=args.f)
103-
host, port = conf['host'], conf['port']
104-
verbose = conf['verbose']
99+
args = parse_arguments()
100+
conf = get_config(args.conf)
101+
logger.loglevel = args.log_level or conf['log_level']
105102
sign_data = conf['sign_data']
106103
unix_socket = conf['unix_socket']
107-
num_workers = 4
108-
db = 0
109-
worker_type = WorkerType.ActorWorker
110-
if args.verbose:
111-
verbose = True
112-
if args.secure:
113-
sign_data = True
114-
if args.unix:
115-
unix_socket = True
116-
if args.num_workers:
117-
num_workers = args.num_workers
118-
if args.worker_type:
119-
try:
120-
worker_type = WorkerType(args.worker_type)
121-
except ValueError:
122-
print(f"{args.worker_type} is not a valid type: use either process "
123-
"or actor. Fallbacking to actor")
124-
if args.workers:
125-
try:
126-
pairs = conf['workers']
127-
except KeyError:
128-
pairs = args.workers
129-
if not pairs:
130-
print("No [host:port] list specified")
131-
else:
132-
workers = _translate_peers(pairs)
133-
else:
134-
workers = _translate_peers(args.workers or conf['workers'])
135-
start_workers(workers, sign_data, unix_socket)
104+
num_workers = args.num_workers or conf['num_workers']
105+
worker_type = args.worker_type or 'actor'
106+
addr = args.address or conf['addr']
136107
if args.subcommand == 'worker':
137-
if args.addr:
138-
host = args.addr
139-
if args.port:
140-
port = int(args.port)
141-
start_worker(host, port, sign_data, unix_socket, worker_type)
142-
elif args.subcommand == 'redis':
143-
if args.addr:
144-
host = args.addr
145-
port = 6379
146-
if args.port:
147-
port = int(args.port)
148-
if args.db:
149-
db = int(args.db)
150-
name = args.name or 'redis-queue'
151-
start_redis_worker(host, port, db, name, sign_data, worker_type)
152-
elif args.subcommand == 'rabbitmq':
153-
if args.addr:
154-
host = args.addr
155-
port = 5672
156-
if args.port:
157-
port = int(args.port)
158-
name = args.name or 'rabbitmq-queue'
159-
start_rabbitmq_worker(host, port, name, sign_data, worker_type)
160-
elif args.random:
161-
start_random_workers(host, int(args.random), sign_data, unix_socket)
108+
push_port = args.plport or conf['zmq']['push_port']
109+
pull_port = args.port or conf['zmq']['pull_port']
110+
start_worker('zmq', worker_type, addr, push_port=push_port,
111+
pull_port=pull_port, sign_data=sign_data,
112+
unix_socket=unix_socket)
113+
elif args.subcommand == 'redis-worker':
114+
port = args.port or conf['redis']['port']
115+
db = args.db or conf['redis']['db']
116+
name = args.name or conf['redis']['name']
117+
start_worker('redis', worker_type, addr, port,
118+
db=db, name=name, sign_data=sign_data)
119+
elif args.subcommand == 'rabbitmq-worker':
120+
port = args.port or conf['rabbitmq']['port']
121+
name = args.name or conf['rabbitmq']['name']
122+
start_worker('rabbitmq', worker_type, addr,
123+
port, name=name, sign_data=sign_data)

tasq/logger.py

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
Provides utility method to spawn a logger, file handler are capped by default.
55
"""
66

7-
from __future__ import absolute_import, division, print_function, unicode_literals
7+
from __future__ import (absolute_import, division,
8+
print_function, unicode_literals)
89

910
import os
1011
import errno
@@ -14,7 +15,7 @@
1415

1516
DEFAULT_LOGSIZE = int(os.getenv('LOGSIZE', '5242880'))
1617
DEFAULT_LOGPATH = os.getenv('LOGPATH', '/tmp/log/tasq')
17-
DEFAULT_FORMAT = os.getenv('LOGFMT', '%(name)s - %(message)s')
18+
DEFAULT_FORMAT = os.getenv('LOGFMT', '%(asctime)s - %(name)s: %(message)s')
1819
DEFAULT_LOGLEVEL = os.getenv('LOGLVL', 'INFO')
1920

2021

@@ -61,32 +62,39 @@ def __init__(self, filename, max_size=DEFAULT_LOGSIZE, backup_count=2,
6162
backup_count, encoding, delay)
6263

6364

64-
def get_logger(name, loglevel=DEFAULT_LOGLEVEL,
65-
fmt=DEFAULT_FORMAT, logpath=DEFAULT_LOGPATH):
65+
class GlobalLogger:
6666

67-
# create module logger
68-
logger = logging.getLogger(name)
69-
logger.setLevel(LOGLVLMAP[loglevel])
67+
def __init__(self, loglevel=DEFAULT_LOGLEVEL,
68+
fmt=DEFAULT_FORMAT, logpath=DEFAULT_LOGPATH):
69+
self.loglevel = loglevel
70+
self.fmt = fmt
71+
self.logpath = logpath
7072

71-
if not logger.handlers:
72-
# create file handler which logs even debug messages
73-
fh = MakeCappedFileHandler(os.path.join(logpath, f'{name}.log'))
74-
fh.setLevel(logging.DEBUG)
73+
def get_logger(self, name):
74+
# create module logger
75+
logger = logging.getLogger(name)
76+
logger.setLevel(LOGLVLMAP[self.loglevel])
7577

76-
# create console handler with a higher log level
77-
ch = logging.StreamHandler()
78-
ch.setLevel(LOGLVLMAP[loglevel])
78+
if not logger.handlers:
79+
# create file handler which logs even debug messages
80+
fh = MakeCappedFileHandler(os.path.join(self.logpath, f'{name}.log'))
81+
fh.setLevel(logging.DEBUG)
7982

80-
# create formatter and add it to the handlers
81-
formatter = logging.Formatter(fmt)
82-
fh.setFormatter(logging.Formatter('%(asctime)s - ' + fmt))
83-
ch.setFormatter(formatter)
83+
# create console handler with a higher log level
84+
ch = logging.StreamHandler()
85+
ch.setLevel(LOGLVLMAP[self.loglevel])
8486

85-
# add the handlers to the logger
86-
logger.addHandler(fh)
87-
logger.addHandler(ch)
87+
# create formatter and add it to the handlers
88+
formatter = logging.Formatter(self.fmt, '%Y-%m-%d %H:%M:%S')
89+
fh.setFormatter(formatter)
90+
ch.setFormatter(formatter)
8891

89-
return logger
92+
# add the handlers to the logger
93+
logger.addHandler(fh)
94+
logger.addHandler(ch)
9095

96+
return logger
9197

92-
log = get_logger(__name__)
98+
99+
logger = GlobalLogger()
100+
get_logger = logger.get_logger

tasq/queue.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,14 @@
44
The main client module, provides interfaces to instantiate queues
55
"""
66

7-
import os
87
from queue import Queue
98
from threading import Thread
109
from urllib.parse import urlparse
11-
from collections import namedtuple
1210
from tasq.remote.backends.redis import RedisStore
1311
from tasq.remote.client import (ZMQTasqClient, RedisTasqClient,
1412
RabbitMQTasqClient, TasqFuture)
1513

1614

17-
def init_client(client, host, port, *args, **kwargs):
18-
return client(host, port, *args, **kwargs)
19-
20-
2115
backends = {
2216
'redis': RedisTasqClient,
2317
'amqp': RabbitMQTasqClient,
@@ -65,8 +59,11 @@ def __init__(self, backend=u'zmq://localhost:9000',
6559
assert url.scheme in {'redis', 'zmq', 'amqp', 'unix', 'tcp'}, \
6660
f"Unsupported {url.scheme}"
6761
self._backend = backends[scheme].from_url(backend, sign_data)
68-
else:
62+
elif isinstance(backend,
63+
(ZMQTasqClient, RabbitMQTasqClient, RedisTasqClient)):
6964
self._backend = backend
65+
else:
66+
print("Unsupported backend", backend)
7067
# Handle only redis as a backend store for now
7168
if store:
7269
urlstore = urlparse(store)
@@ -95,7 +92,7 @@ def put(self, func, *args, **kwargs):
9592
self._results.put(tasq_result)
9693
return tasq_result
9794

98-
def pub_blocking(self, func, *args, **kwargs):
95+
def put_blocking(self, func, *args, **kwargs):
9996
tasq_result = self._backend.schedule_blocking(func, *args, **kwargs)
10097
if self._store:
10198
self._results.put(tasq_result)

0 commit comments

Comments
 (0)