forked from vtrainor/ola-ui
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcontrolflow.py
148 lines (117 loc) · 3.36 KB
/
controlflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
from collections import deque
class RDMAction(object):
    """The base class all actions inherit from.

    The default action does no work; it only reports completion.
    """

    def Execute(self, *args, **kwargs):
        """Signal completion immediately.

        Two call shapes are accepted so that a plain RDMAction can be used
        both directly and from a control flow:

          Execute(on_complete)
          Execute(universe, uid, on_complete)

        In both shapes on_complete may also be passed by keyword. Any
        universe / uid arguments are ignored.

        Args:
          on_complete: a callable taking no arguments, invoked once.

        Raises:
          TypeError: if no on_complete callback was supplied.
        """
        if 'on_complete' in kwargs:
            on_complete = kwargs['on_complete']
        elif args:
            # The callback is the last positional argument in both shapes.
            on_complete = args[-1]
        else:
            raise TypeError('Execute() requires an on_complete callback')
        on_complete()
class GetRDMAction(RDMAction):
    """An action which performs an RDM GET.

    Subclasses must define a PID class attribute; Execute() passes it to
    get_fn to select the parameter to fetch. Subclasses normally override
    UpdateDict() to store the response, and may override ShouldExecute() to
    skip the GET.
    """

    def __init__(self, data_dict, get_fn):
        """Create a new GET action.

        Args:
          data_dict: the dict to update
          get_fn: the function to use for RDM GETs. It is called as
            get_fn(universe, uid, sub_device, pid, callback) and is expected
            to invoke callback(succeeded, params).
        """
        self._data = data_dict
        self._get_fn = get_fn

    def Params(self):
        """This method provides the parameters for the GET.

        Execute() in this class does not consult it; the default is an empty
        list.
        """
        return []

    def UpdateDict(self, succeeded, params):
        """This method is called when the GET completes.

        The default implementation ignores the response.

        Args:
          succeeded: the success flag reported by get_fn
          params: the response data reported by get_fn
        """
        pass

    def ShouldExecute(self):
        """This method controls if the action should be skipped.

        Returns:
          True to perform the GET, False to skip straight to completion.
        """
        return True

    def Execute(self, universe, uid, on_complete):
        """Perform the RDM GET.

        Args:
          universe: passed through to get_fn
          uid: passed through to get_fn
          on_complete: a callable taking no arguments; called once the action
            has either been skipped or the GET response has been handled.
        """
        if not self.ShouldExecute():
            on_complete()
            return

        def _on_response(succeeded, params):
            self._Complete(succeeded, params, on_complete)

        # The sub-device argument is fixed at 0.
        self._get_fn(universe, uid, 0, self.PID, _on_response)

    def _Complete(self, succeeded, params, on_complete):
        """Called when the GET completes.

        Hands the response to UpdateDict() and then reports completion.
        """
        self.UpdateDict(succeeded, params)
        on_complete()
class GetIdentify(GetRDMAction):
    """An example action that GETs IDENTIFY_DEVICE.

    ShouldExecute() is not overridden, so this action runs on every pass and
    the stored value is refreshed each time.
    """

    PID = "IDENTIFY_DEVICE"

    def UpdateDict(self, succeeded, params):
        """Store the reported identify state under the PID key.

        A failed GET leaves the dict untouched.
        """
        if not succeeded:
            return
        self._data[self.PID] = params['identify_state']
class GetDeviceInfo(GetRDMAction):
    """An action that GETs DEVICE_INFO."""

    PID = "DEVICE_INFO"

    def ShouldExecute(self):
        """Skip this action if we already have DEVICE_INFO.

        Returns:
          True when the data dict has no DEVICE_INFO entry yet.
        """
        # Debug output of the current dict. The single-argument parenthesised
        # form prints the same on Python 2 and is valid on Python 3.
        print(self._data)
        return self.PID not in self._data

    def UpdateDict(self, succeeded, params):
        """Store the whole response under the PID key when the GET succeeded."""
        if succeeded:
            self._data[self.PID] = params
class RDMControlFlow(object):
    """Create a new Control Flow.

    Actions are executed one at a time, in order. Each action is started with
    Execute(universe, uid, callback) and the next one starts once it invokes
    the callback.

    Args:
      universe: the universe to use
      uid: the uid to use
      actions: the list of actions to perform
      on_complete: the method to call when the control flow completes.
    """

    def __init__(self, universe, uid, actions, on_complete):
        self._universe = universe
        self._uid = uid
        self._actions = deque(actions)
        self._on_complete = on_complete
        # True while the loop in _PerformNextAction is on the stack.
        self._dispatching = False
        # Set when an action reports completion from inside its Execute().
        self._advance_pending = False

    def Run(self):
        """Run the control flow."""
        self._PerformNextAction()

    def _PerformNextAction(self):
        """Start the next queued action, or finish if none are left.

        This is also the completion callback handed to every action. When an
        action completes synchronously (calls back before its Execute()
        returns) the call is only recorded, and the loop below moves on to the
        next action. That keeps the stack depth constant instead of growing
        with every action, which would otherwise hit the recursion limit on a
        long list of synchronous actions.
        """
        if self._dispatching:
            self._advance_pending = True
            return

        self._dispatching = True
        try:
            while self._actions:
                # run next action
                action = self._actions.popleft()
                self._advance_pending = False
                action.Execute(
                    self._universe, self._uid, self._PerformNextAction)
                if not self._advance_pending:
                    # The action has not completed yet; it will call back
                    # later and re-enter this method.
                    return
        finally:
            self._dispatching = False
        self._on_complete()
# --------------------
# example code
def on_complete():
    """Report that the example control flow has finished."""
    # The single-argument parenthesised form prints the same on Python 2 and
    # is valid on Python 3.
    print('control flow completed')
def get_fn(universe, uid, sub_device, pid, callback):
    """Simulate an RDM GET by replying synchronously with canned data.

    Args:
      universe: only used in the printed trace
      uid: only used in the printed trace
      sub_device: an integer, only used in the printed trace
      pid: the name of the parameter being requested
      callback: called as callback(succeeded, params) before this returns.
        IDENTIFY_DEVICE and DEVICE_INFO succeed; any other pid fails with an
        empty dict.
    """
    # This just simulates a RDM GET for now
    print('GET %s %s %d %s' % (universe, uid, sub_device, pid))

    # Built on every call so each callback receives its own dict.
    canned_replies = {
        'IDENTIFY_DEVICE': (True, {'identify_state': True}),
        'DEVICE_INFO': (True, {}),
    }
    succeeded, params = canned_replies.get(pid, (False, {}))
    callback(succeeded, params)
def test():
    """Run the example control flow twice against one shared data dict.

    The first pass populates the dict. On the second pass GetDeviceInfo finds
    DEVICE_INFO already stored and skips its GET, while GetIdentify runs
    again.
    """
    uid = None
    data = {}

    def build_flow():
        # Fresh action objects for every pass; only `data` is shared.
        return RDMControlFlow(
            1,
            uid,
            [
                GetIdentify(data, get_fn),
                GetDeviceInfo(data, get_fn),
            ],
            on_complete)

    build_flow().Run()

    # The single-argument parenthesised form prints the same on Python 2 and
    # is valid on Python 3.
    print('')
    print('Running again...')
    build_flow().Run()
def main():
    """Script entry point: run the example control flow."""
    test()


# Only run the example when executed as a script, not when imported.
if __name__ == "__main__":
    main()