-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathexample_multiprocessing_merge_root.py
47 lines (36 loc) · 1.52 KB
/
example_multiprocessing_merge_root.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from stream import Stream
from example_operators import join_synch
class Count(object):
def __init__(self, n):
self.n = n
def process_target_merge(dict_queues):
#-------------------------------------------
# 1. SPECIFY INPUT QUEUES FOR THE PROCESSES
#-------------------------------------------
# Specify that the input stream for THIS process
# is dict_queues['q_root']
Stream.scheduler.input_queue = dict_queues['q_root']
#-------------------------------------------
# 2. SPECIFY STREAMS IN THIS PROCESS
#-------------------------------------------
x, y = Stream('x'), Stream('y')
#-------------------------------------------
# 3. SPECIFY EXTERNAL STREAMS
#-------------------------------------------
# This process does not modify external streams.
#-------------------------------------------
# 4. SPECIFY CALLBACK FUNCTIONS IN THIS PROCESS
#-------------------------------------------
def callback(alist, count):
print ('merge ', sum(alist))
count.n -= 1
if count.n <= 0:
Stream.scheduler.halted = True
#-------------------------------------------
# 5. SPECIFY AGENTS IN THIS PROCESS
#-------------------------------------------
join_synch(in_streams=[x, y], func=callback, count=Count(4))
#-------------------------------------------
# 6. START SCHEDULER AND THUS START THIS PROCESS
#-------------------------------------------
Stream.scheduler.start()