Skip to content

Commit

Permalink
distributed
Browse files Browse the repository at this point in the history
  • Loading branch information
vmarunov committed Jan 19, 2018
1 parent 37fb33d commit ba95944
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 4 deletions.
6 changes: 6 additions & 0 deletions start.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/bin/sh

# Запуск шедулера


# Запуск воркеров
13 changes: 9 additions & 4 deletions starter.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# -*- coding: utf-8 -*-
from dask.multiprocessing import get
from dask.distributed import Client

from tasks import load_data, get_param, task_a, grouper, task_group, task_b


ISIN_COUNT = 10
ISIN_COUNT = 1000


def get_isins():
Expand Down Expand Up @@ -64,9 +64,14 @@ def start():
get_list.append('task_a_res_{}'.format(i))
get_list.append('task_b_res_{}'.format(i))

# Создаем client
client = Client('127.0.0.1:8786')

# Получение результатов
result = get(graph, get_list)
print(result)
result = client.get(graph, get_list)
print(len(result))
with open('/Users/vladimirmarunov/git/dask-test/res.txt', 'w') as f:
f.write(str(result))


if __name__ == '__main__':
Expand Down
10 changes: 10 additions & 0 deletions tasks.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
# -*- coding: utf-8 -*-
import random
import time


def _work(weight=0.01):
u"""Имитация работы"""
time.sleep(random.randint(1, 20) * weight)


def load_data(isin):
u"""
Загрузка нвчальных данных из базы
Возвращает param_a, param_b
"""
_work()
return {
'param_a': '{}_param_a'.format(isin),
'param_b': '{}_param_b'.format(isin)}
Expand All @@ -22,6 +29,7 @@ def task_a(isin, param_a, param_b):
Можно выполнять независимо от любых других задач
Зависит от загружаемых их базы param_a и param_b
"""
_work()
result = 'Task_A_result_{}_{}_{}'.format(isin, param_a, param_b)
return result

Expand Down Expand Up @@ -51,6 +59,7 @@ def task_group(group_data):
Взвращает некий групповой результат.
Задача требует завершения task_a для всех облигаций
"""
_work(0.1)
result = {}
for isin, data in group_data.iteritems():
result[isin] = data
Expand All @@ -62,6 +71,7 @@ def task_b(isin, param_b, group_data):
u"""
Одиночная задача. Зависит от групповой задачи и от param_b
"""
_work()
result = 'Task_B_result_{}_{}_{}'.format(
isin, param_b, group_data[isin]['group'])
return result

0 comments on commit ba95944

Please sign in to comment.