-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtasks.py
130 lines (106 loc) · 3.56 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# -*- coding: utf-8 -*-
from functools import wraps
import random
import time
from dask.distributed import Future
def futures_expand(func):
@wraps(func)
def wrapper(*args, **kwargs):
wrapper_args = []
wrapper_kwargs = {}
for arg in args:
if isinstance(arg, Future):
wrapper_args.append(arg.result())
else:
wrapper_args.append(arg)
for key, arg in kwargs.items():
if isinstance(arg, Future):
wrapper_kwargs[key] = arg.result()
else:
wrapper_kwargs[key] = arg
return func(*wrapper_args, **wrapper_kwargs)
return wrapper
def _work(weight=0.01):
u"""Имитация работы"""
time.sleep(random.randint(1, 20) * weight)
@futures_expand
def load_data(isin):
u"""
Загрузка нвчальных данных из базы
Возвращает param_a, param_b
"""
_work()
return {
'param_a': '{}_param_a'.format(isin),
'param_b': '{}_param_b'.format(isin)}
@futures_expand
def get_param(data, param):
return data[param]
@futures_expand
def task_a(isin, param_a, param_b):
u"""
Одиночная задача. Рассчитывает независимый параметр для каждой облигации
Можно выполнять независимо от любых других задач
Зависит от загружаемых их базы param_a и param_b
"""
_work()
result = 'Task_A_result_{}_{}_{}'.format(isin, param_a, param_b)
return result
@futures_expand
def grouper(*args):
u"""
Группирует входные параметры для групповой функции
Возвращает в виде одного параметра
"""
result = {}
iterator = iter(args)
try:
while True:
isin = next(iterator)
task_a_res = next(iterator)
param_b = next(iterator)
result[isin] = {'task_a_res': task_a_res, 'param_b': param_b}
except StopIteration:
pass
return result
@futures_expand
def task_group(group_data):
u"""
Групповая задача.
На вход приходит словарь, {ISIN: данные}
Взвращает некий групповой результат.
Задача требует завершения task_a для всех облигаций
"""
_work(0.1)
result = {}
for isin, data in group_data.iteritems():
result[isin] = data
result[isin]['group'] = random.randint(0, 100)
return result
@futures_expand
def task_group_alter(*args):
u"""
Групповая задача.
На вход поступают данные в порядке:
isin1, param_a_isin1, param_b_isin_1, isin2, ....
Взвращает некий групповой результат.
Задача требует завершения task_a для всех облигаций
"""
return task_group(grouper(*args))
@futures_expand
def task_b(isin, param_b, group_data):
u"""
Одиночная задача. Зависит от групповой задачи и от param_b
"""
_work()
result = 'Task_B_result_{}_{}_{}'.format(
isin, param_b, group_data[isin]['group'])
return result
@futures_expand
def task_c(isin, param_b):
u"""
Одиночная задача. Зависит от param_b
"""
_work()
result = 'Task_C_result_{}_{}'.format(isin, param_b)
return result