Skip to content

Commit

Permalink
calc
Browse files Browse the repository at this point in the history
  • Loading branch information
vmarunov committed Jan 18, 2018
1 parent 55d1ec1 commit 37fb33d
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 0 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
dask==0.16.1
73 changes: 73 additions & 0 deletions starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
from dask.multiprocessing import get

from tasks import load_data, get_param, task_a, grouper, task_group, task_b


ISIN_COUNT = 10


def get_isins():
u"""
Зачитать из базы список ISIN
"""
return ['RU{}'.format(str(i).rjust(10, '0')) for i in range(ISIN_COUNT)]


def start():

isins = get_isins()

# граф задач (состоит из списка ISIN)
graph = {'isin_{}'.format(i): isin for i, isin in enumerate(isins)}

# добавляем в граф задачи загрузки данных из БД
# разбиения загруженных данных на параметры
for i, isin in enumerate(isins):
graph['data_{}'.format(i)] = (load_data, 'isin_{}'.format(i))
graph['param_a_{}'.format(i)] = (
get_param, 'data_{}'.format(i), 'param_a')
graph['param_b_{}'.format(i)] = (
get_param, 'data_{}'.format(i), 'param_b')

# добавляем в граф задачу task_a
for i, isin in enumerate(isins):
graph['task_a_res_{}'.format(i)] = (
task_a,
'isin_{}'.format(i),
'param_a_{}'.format(i),
'param_b_{}'.format(i))

# Добавляем в граф параметр для групповой задачи:
param_list = []
for i in range(len(isins)):
param_list.append('isin_{}'.format(i))
param_list.append('param_a_{}'.format(i))
param_list.append('param_b_{}'.format(i))
graph['group_data'] = (grouper,) + tuple(param_list)

# Добавляем в граф групповую задачу
graph['group_res'] = (task_group, 'group_data')

# добавляем в граф задачу task_b
for i, isin in enumerate(isins):
graph['task_b_res_{}'.format(i)] = (
task_b,
'isin_{}'.format(i),
'param_b_{}'.format(i),
'group_res')

# Формирование списка получаемых результатов
get_list = []
for i, isin in enumerate(isins):
get_list.append('isin_{}'.format(i))
get_list.append('task_a_res_{}'.format(i))
get_list.append('task_b_res_{}'.format(i))

# Получение результатов
result = get(graph, get_list)
print(result)


if __name__ == '__main__':
start()
67 changes: 67 additions & 0 deletions tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
import random


def load_data(isin):
u"""
Загрузка нвчальных данных из базы
Возвращает param_a, param_b
"""
return {
'param_a': '{}_param_a'.format(isin),
'param_b': '{}_param_b'.format(isin)}


def get_param(data, param):
return data[param]


def task_a(isin, param_a, param_b):
u"""
Одиночная задача. Рассчитывает независимый параметр для каждой облигации
Можно выполнять независимо от любых других задач
Зависит от загружаемых их базы param_a и param_b
"""
result = 'Task_A_result_{}_{}_{}'.format(isin, param_a, param_b)
return result


def grouper(*args):
u"""
Группирует входные параметры для групповой функции
Возвращает в виде одного параметра
"""
result = {}
iterator = iter(args)
try:
while True:
isin = next(iterator)
param_a = next(iterator)
param_b = next(iterator)
result[isin] = {'param_a': param_a, 'param_b': param_b}
except StopIteration:
pass
return result


def task_group(group_data):
u"""
Групповая задача.
На вход приходит словарь, {ISIN: данные}
Взвращает некий групповой результат.
Задача требует завершения task_a для всех облигаций
"""
result = {}
for isin, data in group_data.iteritems():
result[isin] = data
result[isin]['group'] = random.randint(0, 100)
return result


def task_b(isin, param_b, group_data):
u"""
Одиночная задача. Зависит от групповой задачи и от param_b
"""
result = 'Task_B_result_{}_{}_{}'.format(
isin, param_b, group_data[isin]['group'])
return result
29 changes: 29 additions & 0 deletions test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-
from dask.multiprocessing import get


def test(par):
return 'a', 1, par


def test1(par1, par2):
return ['2', par1, par2]


def _start():
graph = {
'a': 'a1',
'b': 'a2',
'par': ['a', 'b'],
'data': (test, 'par'),
'aa': 'data'[0],
'bb': 'data'[1],
'cc': 'data'[2],
'res': (test1, 'aa', 'bb'),
}
res = get(graph, ['data', 'aa', 'bb', 'cc', 'res'])
print(res)


if __name__ == '__main__':
_start()

0 comments on commit 37fb33d

Please sign in to comment.