-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathexample_multiprocessing_simple_process_x.py
More file actions
38 lines (31 loc) · 1.4 KB
/
example_multiprocessing_simple_process_x.py
File metadata and controls
38 lines (31 loc) · 1.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from stream import Stream, ExternalStream
from example_operators import single_item
def process_target_x(dict_queues):
    """Configure and run the stream computation of process x.

    The scheduler of this process is pointed at ``dict_queues['x']``, a
    local stream named 'x' and an external stream named 'y' (backed by
    ``dict_queues['y']``) are created, a callback is attached to stream
    'x', and finally the scheduler is started.

    Args:
        dict_queues: Mapping that must provide the keys 'x' and 'y'.
            Presumably the values are queues shared between processes --
            confirm against the code that spawns this target.
    """
    # Step 1 -- input queue: this process reads its input from
    # dict_queues['x'].
    Stream.scheduler.input_queue = dict_queues['x']

    # Step 2 -- stream that lives inside this process.
    local_stream = Stream(name='x')

    # Step 3 -- external stream; items appended to it go through
    # dict_queues['y'] (presumably to another process -- verify in
    # ExternalStream).
    outgoing_stream = ExternalStream(name='y', queue=dict_queues['y'])

    # Step 4 -- callback used by the agent below.
    def callback_x(stream_item):
        # Report the received item, then pass item + 1 on to stream 'y'.
        print('message received by process x: ', stream_item)
        incremented = stream_item + 1
        outgoing_stream.append(incremented)

    # Step 5 -- agent: attach the callback to stream 'x'.
    single_item(in_stream=local_stream, func=callback_x)

    # Step 6 -- starting the scheduler starts this process's computation.
    Stream.scheduler.start()