diff --git a/starter.py b/starter.py index 698aef2..78307f9 100644 --- a/starter.py +++ b/starter.py @@ -1,7 +1,13 @@ # -*- coding: utf-8 -*- +from itertools import chain +import json +from time import time + from dask.distributed import Client -from tasks import load_data, get_param, task_a, grouper, task_group, task_b +from tasks import ( + load_data, get_param, task_a, grouper, task_group, task_group_alter, + task_b, task_c) ISIN_COUNT = 500 @@ -14,7 +20,7 @@ def get_isins(): def start(): - + t = time() isins = get_isins() # граф задач (состоит из списка ISIN) @@ -41,7 +47,7 @@ def start(): param_list = [] for i in range(len(isins)): param_list.append('isin_{}'.format(i)) - param_list.append('param_a_{}'.format(i)) + param_list.append('task_a_res_{}'.format(i)) param_list.append('param_b_{}'.format(i)) graph['group_data'] = (grouper,) + tuple(param_list) @@ -56,22 +62,63 @@ def start(): 'param_b_{}'.format(i), 'group_res') + # добавляем в граф задачу task_c + for i, isin in enumerate(isins): + graph['task_c_res_{}'.format(i)] = ( + task_c, + 'isin_{}'.format(i), + 'param_b_{}'.format(i)) + # Формирование списка получаемых результатов - get_list = [] + get_list = ['group_data'] for i, isin in enumerate(isins): - get_list.append('isin_{}'.format(i)) get_list.append('task_a_res_{}'.format(i)) get_list.append('task_b_res_{}'.format(i)) + get_list.append('task_c_res_{}'.format(i)) # Создаем client client = Client('127.0.0.1:8786') # Получение результатов result = client.get(graph, get_list) + + total = time() - t + print(total) + print(len(result)) + with open('/Users/vladimirmarunov/git/dask-test/res1.txt', 'w') as f: + f.write('{}\n'.format(total)) + json.dump(result, f, indent=4) + + +def start_futures(): + t = time() + isins = get_isins() + + client = Client('127.0.0.1:8786') + + data = client.map(load_data, isins) + params_a = client.map(get_param, data, ['param_a'] * len(isins)) + params_b = client.map(get_param, data, ['param_a'] * len(isins)) + + result_a = client.map(task_a, isins, params_a, params_b) + + group_args = list(chain(*zip(isins, result_a, params_b))) + result_group = client.submit(task_group_alter, *group_args) + + result_b = client.map(task_b, isins, params_b, [result_group] * len(isins)) + + result_c = client.map(task_c, isins, params_b) + + result = client.gather([result_group] + result_a + result_b + result_c) + + total = time() - t + print(total) print(len(result)) with open('/Users/vladimirmarunov/git/dask-test/res.txt', 'w') as f: - f.write(str(result)) + f.write('{}\n'.format(total)) + json.dump(result, f, indent=4) if __name__ == '__main__': start() + start_futures() diff --git a/tasks.py b/tasks.py index c769207..150e541 100644 --- a/tasks.py +++ b/tasks.py @@ -2,12 +2,36 @@ import random import time +from dask.distributed import Future + + +def futures_expand(func): + + def wrapper(*args, **kwargs): + wrapper_args = [] + wrapper_kwargs = {} + for arg in args: + if isinstance(arg, Future): + wrapper_args.append(arg.result()) + else: + wrapper_args.append(arg) + for key, arg in kwargs.items(): + if isinstance(arg, Future): + wrapper_kwargs[key] = arg.result() + else: + wrapper_kwargs[key] = arg + + return func(*wrapper_args, **wrapper_kwargs) + + return wrapper + def _work(weight=0.01): u"""Имитация работы""" time.sleep(random.randint(1, 20) * weight) +@futures_expand def load_data(isin): u""" Загрузка нвчальных данных из базы @@ -19,10 +43,12 @@ def load_data(isin): 'param_b': '{}_param_b'.format(isin)} +@futures_expand def get_param(data, param): return data[param] +@futures_expand def task_a(isin, param_a, param_b): u""" Одиночная задача. Рассчитывает независимый параметр для каждой облигации @@ -34,6 +60,7 @@ def task_a(isin, param_a, param_b): return result +@futures_expand def grouper(*args): u""" Группирует входные параметры для групповой функции @@ -44,14 +71,15 @@ def grouper(*args): try: while True: isin = next(iterator) - param_a = next(iterator) + task_a_res = next(iterator) param_b = next(iterator) - result[isin] = {'param_a': param_a, 'param_b': param_b} + result[isin] = {'task_a_res': task_a_res, 'param_b': param_b} except StopIteration: pass return result +@futures_expand def task_group(group_data): u""" Групповая задача. @@ -67,6 +95,19 @@ def task_group(group_data): return result +@futures_expand +def task_group_alter(*args): + u""" + Групповая задача. + На вход поступают данные в порядке: + isin1, param_a_isin1, param_b_isin_1, isin2, .... + Взвращает некий групповой результат. + Задача требует завершения task_a для всех облигаций + """ + return task_group(grouper(*args)) + + +@futures_expand def task_b(isin, param_b, group_data): u""" Одиночная задача. Зависит от групповой задачи и от param_b @@ -75,3 +116,13 @@ def task_b(isin, param_b, group_data): result = 'Task_B_result_{}_{}_{}'.format( isin, param_b, group_data[isin]['group']) return result + + +@futures_expand +def task_c(isin, param_b): + u""" + Одиночная задача. Зависит от param_b + """ + _work() + result = 'Task_C_result_{}_{}'.format(isin, param_b) + return result diff --git a/test.py b/test.py index 5094ec8..0455a4d 100644 --- a/test.py +++ b/test.py @@ -1,10 +1,31 @@ # -*- coding: utf-8 -*- +import random +import time -from dask.multiprocessing import get +from dask.distributed import Client from sizer import getsize +ISIN_COUNT = 50 + + +def _work(weight=0.01): + u"""Имитация работы""" + time.sleep(random.randint(1, 20) * weight) + + +def get_isins(): + u""" + Зачитать из базы список ISIN + """ + return ['RU{}'.format(str(i).rjust(10, '0')) for i in range(ISIN_COUNT)] + + +def test_0(*args): + return args[0] + + def test_1(*args): return args[0] @@ -13,20 +34,173 @@ def test_2(*args): return args[0] -def _start(): - graph = { - 'a': 'a1', - 'b': 'a2', - 'par': ['a', 'b'], - 'data': (test, 'par'), - 'aa': 'data'[0], - 'bb': 'data'[1], - 'cc': 'data'[2], - 'res': (test1, 'aa', 'bb'), +def test_3(*args): + return args[0] + + +def test_4(*args): + return args[0] + + +def test_5(*args): + return args[0] + + +def test_6(*args): + return args[0] + + +def test_7(*args): + return args[0] + + +def test_8(*args): + return args[0] + + +def test_9(*args): + return args[0] + + +def test_10(*args): + return args[0] + + +def test_11(*args): + return args[0] + + +def test_12(*args): + return args[0] + + +def test_13(*args): + return args[0] + + +def test_14(*args): + return args[0] + + +def test_15(*args): + return args[0] + + +def test_16(*args): + return args[0] + + +def test_17(*args): + return args[0] + + +def test_18(*args): + return args[0] + + +def test_19(*args): + return args[0] + + +def test_20(*args): + return args[0] + + +def test_21(*args): + return args[0] + + +def test_22(*args): + return args[0] + + +def test_23(*args): + return args[0] + + +def test_24(*args): + return args[0] + + +def test_25(*args): + return args[0] + + +def test_26(*args): + return args[0] + + +def test_27(*args): + return args[0] + + +def test_28(*args): + return args[0] + + +def test_29(*args): + return args[0] + + +def test_30(*args): + return args[0] + + +def test_31(*args): + return args[0] + + +def test_32(*args): + return args[0] + + +def test_33(*args): + return args[0] + + +def _start_get_size(): + graph = {} + isins = get_isins() + + for c, i in enumerate(isins): + params = tuple() + for j in range(10): + param = 'param_{}_{}'.format(c, j) + graph[param] = '{} == {} == {}'.format( + i, c, j) + params += (param,) + for j in range(34): + graph['res_{}_{}'.format(c, j)] = ( + (globals()['test_{}'.format(j)],) + params) + print(len(graph)) + print(getsize(graph)) + c = Client('127.0.0.1:8786') + result = zip(graph.keys(), c.get(graph, graph.keys())) + print('OK') + print(result) + + +##################################################### +# from dask import delayed + + +def load_data(isin): + _work(0.03) + return { + 'isin': isin, + 'data': 'data_for_{}'.format(isin) } - res = get(graph, ['data', 'aa', 'bb', 'cc', 'res']) - print(res) + + +def _start_combine(): + import dask.bag as db + c = Client('127.0.0.1:8786') + isins = get_isins() + data = c.map(load_data, isins) + + print(data[0].result()) if __name__ == '__main__': - _start() + # _start_get_size() + _start_combine() \ No newline at end of file