diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..19feb7a --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +dask==0.16.1 \ No newline at end of file diff --git a/starter.py b/starter.py new file mode 100644 index 0000000..6d14e64 --- /dev/null +++ b/starter.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +from dask.multiprocessing import get + +from tasks import load_data, get_param, task_a, grouper, task_group, task_b + + +ISIN_COUNT = 10 + + +def get_isins(): + u""" + Зачитать из базы список ISIN + """ + return ['RU{}'.format(str(i).rjust(10, '0')) for i in range(ISIN_COUNT)] + + +def start(): + + isins = get_isins() + + # граф задач (состоит из списка ISIN) + graph = {'isin_{}'.format(i): isin for i, isin in enumerate(isins)} + + # добавляем в граф задачи загрузки данных из БД + # разбиения загруженных данных на параметры + for i, isin in enumerate(isins): + graph['data_{}'.format(i)] = (load_data, 'isin_{}'.format(i)) + graph['param_a_{}'.format(i)] = ( + get_param, 'data_{}'.format(i), 'param_a') + graph['param_b_{}'.format(i)] = ( + get_param, 'data_{}'.format(i), 'param_b') + + # добавляем в граф задачу task_a + for i, isin in enumerate(isins): + graph['task_a_res_{}'.format(i)] = ( + task_a, + 'isin_{}'.format(i), + 'param_a_{}'.format(i), + 'param_b_{}'.format(i)) + + # Добавляем в граф параметр для групповой задачи: + param_list = [] + for i in range(len(isins)): + param_list.append('isin_{}'.format(i)) + param_list.append('param_a_{}'.format(i)) + param_list.append('param_b_{}'.format(i)) + graph['group_data'] = (grouper,) + tuple(param_list) + + # Добавляем в граф групповую задачу + graph['group_res'] = (task_group, 'group_data') + + # добавляем в граф задачу task_b + for i, isin in enumerate(isins): + graph['task_b_res_{}'.format(i)] = ( + task_b, + 'isin_{}'.format(i), + 'param_b_{}'.format(i), + 'group_res') + + # Формирование списка получаемых результатов + get_list = [] + for i, isin in enumerate(isins): + get_list.append('isin_{}'.format(i)) + get_list.append('task_a_res_{}'.format(i)) + get_list.append('task_b_res_{}'.format(i)) + + # Получение результатов + result = get(graph, get_list) + print(result) + + +if __name__ == '__main__': + start() diff --git a/tasks.py b/tasks.py new file mode 100644 index 0000000..1b265f9 --- /dev/null +++ b/tasks.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +import random + + +def load_data(isin): + u""" + Загрузка нвчальных данных из базы + Возвращает param_a, param_b + """ + return { + 'param_a': '{}_param_a'.format(isin), + 'param_b': '{}_param_b'.format(isin)} + + +def get_param(data, param): + return data[param] + + +def task_a(isin, param_a, param_b): + u""" + Одиночная задача. Рассчитывает независимый параметр для каждой облигации + Можно выполнять независимо от любых других задач + Зависит от загружаемых их базы param_a и param_b + """ + result = 'Task_A_result_{}_{}_{}'.format(isin, param_a, param_b) + return result + + +def grouper(*args): + u""" + Группирует входные параметры для групповой функции + Возвращает в виде одного параметра + """ + result = {} + iterator = iter(args) + try: + while True: + isin = next(iterator) + param_a = next(iterator) + param_b = next(iterator) + result[isin] = {'param_a': param_a, 'param_b': param_b} + except StopIteration: + pass + return result + + +def task_group(group_data): + u""" + Групповая задача. + На вход приходит словарь, {ISIN: данные} + Взвращает некий групповой результат. + Задача требует завершения task_a для всех облигаций + """ + result = {} + for isin, data in group_data.iteritems(): + result[isin] = data + result[isin]['group'] = random.randint(0, 100) + return result + + +def task_b(isin, param_b, group_data): + u""" + Одиночная задача. Зависит от групповой задачи и от param_b + """ + result = 'Task_B_result_{}_{}_{}'.format( + isin, param_b, group_data[isin]['group']) + return result diff --git a/test.py b/test.py new file mode 100644 index 0000000..c534cc6 --- /dev/null +++ b/test.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +from dask.multiprocessing import get + + +def test(par): + return 'a', 1, par + + +def test1(par1, par2): + return ['2', par1, par2] + + +def _start(): + graph = { + 'a': 'a1', + 'b': 'a2', + 'par': ['a', 'b'], + 'data': (test, 'par'), + 'aa': 'data'[0], + 'bb': 'data'[1], + 'cc': 'data'[2], + 'res': (test1, 'aa', 'bb'), + } + res = get(graph, ['data', 'aa', 'bb', 'cc', 'res']) + print(res) + + +if __name__ == '__main__': + _start()