Skip to content

Commit

Permalink
with futures
Browse files Browse the repository at this point in the history
  • Loading branch information
vmarunov committed Jan 23, 2018
1 parent db1c021 commit 8427bf0
Show file tree
Hide file tree
Showing 3 changed files with 294 additions and 22 deletions.
59 changes: 53 additions & 6 deletions starter.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
# -*- coding: utf-8 -*-
from itertools import chain
import json
from time import time

from dask.distributed import Client

from tasks import load_data, get_param, task_a, grouper, task_group, task_b
from tasks import (
load_data, get_param, task_a, grouper, task_group, task_group_alter,
task_b, task_c)

ISIN_COUNT = 500

Expand All @@ -14,7 +20,7 @@ def get_isins():


def start():

t = time()
isins = get_isins()

# граф задач (состоит из списка ISIN)
Expand All @@ -41,7 +47,7 @@ def start():
param_list = []
for i in range(len(isins)):
param_list.append('isin_{}'.format(i))
param_list.append('param_a_{}'.format(i))
param_list.append('task_a_res_{}'.format(i))
param_list.append('param_b_{}'.format(i))
graph['group_data'] = (grouper,) + tuple(param_list)

Expand All @@ -56,22 +62,63 @@ def start():
'param_b_{}'.format(i),
'group_res')

# добавляем в граф задачу task_c
for i, isin in enumerate(isins):
graph['task_c_res_{}'.format(i)] = (
task_c,
'isin_{}'.format(i),
'param_b_{}'.format(i))

# Формирование списка получаемых результатов
get_list = []
get_list = ['group_data']
for i, isin in enumerate(isins):
get_list.append('isin_{}'.format(i))
get_list.append('task_a_res_{}'.format(i))
get_list.append('task_b_res_{}'.format(i))
get_list.append('task_c_res_{}'.format(i))

# Создаем client
client = Client('127.0.0.1:8786')

# Получение результатов
result = client.get(graph, get_list)

total = time() - t
print(total)
print(len(result))
with open('/Users/vladimirmarunov/git/dask-test/res1.txt', 'w') as f:
f.write('{}\n'.format(total))
json.dump(result, f, indent=4)


def start_futures():
    """Run the whole pipeline through the distributed futures API.

    Submits the load/param/task_a/group/task_b/task_c steps to the
    scheduler at 127.0.0.1:8786, gathers the group result followed by all
    task_a, task_b and task_c results, prints the elapsed time and the
    number of gathered results, and writes both the elapsed time and the
    results (as JSON) to the result file.
    """
    t = time()
    isins = get_isins()

    client = Client('127.0.0.1:8786')

    data = client.map(load_data, isins)
    params_a = client.map(get_param, data, ['param_a'] * len(isins))
    # Fixed: this previously requested 'param_a' a second time, so every
    # downstream consumer of params_b silently received param_a instead.
    params_b = client.map(get_param, data, ['param_b'] * len(isins))

    result_a = client.map(task_a, isins, params_a, params_b)

    # Flatten to isin_0, result_a_0, param_b_0, isin_1, ... for the
    # group task, which takes its input as positional arguments.
    group_args = list(chain(*zip(isins, result_a, params_b)))
    result_group = client.submit(task_group_alter, *group_args)

    # Every task_b call receives the same group result future.
    result_b = client.map(task_b, isins, params_b, [result_group] * len(isins))

    result_c = client.map(task_c, isins, params_b)

    result = client.gather([result_group] + result_a + result_b + result_c)

    total = time() - t
    print(total)
    print(len(result))
    with open('/Users/vladimirmarunov/git/dask-test/res.txt', 'w') as f:
        # First line is the elapsed time; the JSON dump follows it.
        f.write('{}\n'.format(total))
        json.dump(result, f, indent=4)


if __name__ == '__main__':
start()
start_futures()
55 changes: 53 additions & 2 deletions tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,36 @@
import random
import time

from dask.distributed import Future


def futures_expand(func):
    u"""Decorate ``func`` so that ``Future`` arguments are resolved first.

    Every positional or keyword argument that is a ``Future`` instance is
    replaced by the value returned from its ``result()`` method; all other
    arguments are passed through unchanged.

    :param func: callable to wrap.
    :return: wrapper that calls ``func`` with the resolved arguments and
        returns whatever ``func`` returns.
    """
    # Imported here so the decorator carries its own dependency.
    import functools

    def _resolve(value):
        # Only Future instances are unwrapped; plain values pass through.
        return value.result() if isinstance(value, Future) else value

    # Without ``wraps`` every decorated task would be called "wrapper" and
    # lose its docstring, which makes the tasks indistinguishable by name.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        resolved_args = [_resolve(arg) for arg in args]
        resolved_kwargs = {key: _resolve(arg) for key, arg in kwargs.items()}
        return func(*resolved_args, **resolved_kwargs)

    return wrapper


def _work(weight=0.01):
u"""Имитация работы"""
time.sleep(random.randint(1, 20) * weight)


@futures_expand
def load_data(isin):
u"""
Загрузка нвчальных данных из базы
Expand All @@ -19,10 +43,12 @@ def load_data(isin):
'param_b': '{}_param_b'.format(isin)}


@futures_expand
def get_param(data, param):
    u"""Return the item of ``data`` stored under the key ``param``."""
    value = data[param]
    return value


@futures_expand
def task_a(isin, param_a, param_b):
u"""
Одиночная задача. Рассчитывает независимый параметр для каждой облигации
Expand All @@ -34,6 +60,7 @@ def task_a(isin, param_a, param_b):
return result


@futures_expand
def grouper(*args):
u"""
Группирует входные параметры для групповой функции
Expand All @@ -44,14 +71,15 @@ def grouper(*args):
try:
while True:
isin = next(iterator)
param_a = next(iterator)
task_a_res = next(iterator)
param_b = next(iterator)
result[isin] = {'param_a': param_a, 'param_b': param_b}
result[isin] = {'task_a_res': task_a_res, 'param_b': param_b}
except StopIteration:
pass
return result


@futures_expand
def task_group(group_data):
u"""
Групповая задача.
Expand All @@ -67,6 +95,19 @@ def task_group(group_data):
return result


@futures_expand
def task_group_alter(*args):
    u"""
    Group task that accepts its input as flat positional arguments.

    The arguments are expected in the order:
    isin1, param_a_isin1, param_b_isin1, isin2, ....
    They are forwarded unchanged to ``grouper`` and the grouped value is
    handed to ``task_group``, whose result is returned.
    The task requires task_a to be finished for all bonds.
    """
    grouped = grouper(*args)
    return task_group(grouped)


@futures_expand
def task_b(isin, param_b, group_data):
u"""
Одиночная задача. Зависит от групповой задачи и от param_b
Expand All @@ -75,3 +116,13 @@ def task_b(isin, param_b, group_data):
result = 'Task_B_result_{}_{}_{}'.format(
isin, param_b, group_data[isin]['group'])
return result


@futures_expand
def task_c(isin, param_b):
    u"""
    Single task. Depends on param_b.

    Calls ``_work()`` and then returns a string built from ``isin`` and
    ``param_b``.
    """
    _work()
    return 'Task_C_result_{}_{}'.format(isin, param_b)
Loading

0 comments on commit 8427bf0

Please sign in to comment.