-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcore.py
executable file
·90 lines (63 loc) · 2.21 KB
/
core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
class Event:
    """A signal identified by a code, with an optional human-readable name.

    Attributes:
        code: Identifier of the event.
        name: Display name; falls back to ``code`` when no name (or a falsy
            name such as ``''``) is given.
    """

    def __init__(self, code, name=''):
        """Store the code and pick the display name.

        Args:
            code: Identifier of the event.
            name: Optional display name. Any falsy value means "use the code".
        """
        self.code = code
        self.name = name if name else code

    def __str__(self):
        """Return ``'<code> : <name>'``."""
        return '{code} : {name}'.format(code=self.code, name=self.name)
class State:
    """A node of a state machine.

    Attributes:
        name: Identifier of the state.
        description: Human-readable text; generated from ``name`` when no
            (or a falsy) description is supplied.
        actions: List of commands registered through ``add_action``.
        transitions: Mapping of event code -> target state, filled through
            ``add_transition``.
    """

    def __init__(self, name, description=''):
        """Create a state with no actions and no outgoing transitions.

        Args:
            name: Identifier of the state.
            description: Optional text; a falsy value triggers the
                auto-generated description.
        """
        self.name = name
        self.description = description if description else self._desc(name)
        self.transitions = {}
        self.actions = []

    def _desc(self, name):
        """Build a default description, e.g. ``'idle_mode'`` -> ``'Idle mode state'``."""
        readable = name.replace('_', ' ')
        return '{} state'.format(readable.capitalize())

    def __str__(self):
        """Return ``'<name> (<description>)'``, or just the name when the
        description is falsy."""
        if not self.description:
            return self.name
        return '{} ({})'.format(self.name, self.description)

    def add_transition(self, event, state):
        """Register ``state`` as the target reached when ``event.code`` occurs.

        A later registration for the same code replaces the earlier one.
        """
        code = event.code
        self.transitions[code] = state

    def add_action(self, cmd):
        """Append ``cmd`` to this state's list of actions."""
        self.actions.append(cmd)
class StateMachine:
    """Drives a graph of states in response to event codes.

    Attributes:
        start: The initial state; also the fallback target for event codes
            the current state has no transition for.
        current_state: The state the machine is in right now.
    """

    def __init__(self, initial_state):
        """Start the machine in ``initial_state``.

        Args:
            initial_state: Object exposing ``transitions`` (mapping of event
                code -> state) and ``actions``.
        """
        self.start = initial_state
        self.current_state = initial_state

    def all_states(self):
        """Return the set of every state reachable from ``start``.

        The start state itself is always included. The traversal uses an
        explicit worklist rather than recursion so that long transition
        chains do not exhaust the interpreter's recursion limit.
        """
        seen = {self.start}
        pending = [self.start]
        while pending:
            state = pending.pop()
            for target in state.transitions.values():
                if target not in seen:
                    # Mark on discovery so a state is queued at most once.
                    seen.add(target)
                    pending.append(target)
        return seen

    def all_event_codes(self):
        """Return the set of event codes used by any reachable state."""
        codes = set()
        for state in self.all_states():
            # Iterating a mapping yields its keys, i.e. the event codes.
            codes.update(state.transitions)
        return codes

    def handle(self, code):
        """Apply event ``code`` to the current state and move the machine.

        When the current state has no transition for ``code`` the machine
        falls back to the start state. The transition is printed to stdout.

        Args:
            code: Event code to process.

        Returns:
            The ``actions`` of the state the machine moved to.
        """
        state = self.current_state
        new_state = state.transitions.get(code, self.start)
        print('{} --[{}]--> {}'.format(state, code, new_state))
        self.current_state = new_state
        return new_state.actions
class NetworkBus:
    """Delivers event codes to every subscribed client.

    Attributes:
        clients: Set of subscribers; each must provide ``handle(code)``
            returning an iterable of objects with a ``code`` attribute.
    """

    def __init__(self):
        """Create a bus with no subscribers."""
        self.clients = set()

    def subscribe(self, client):
        """Register ``client``; subscribing the same client twice is a no-op."""
        self.clients.add(client)

    def send(self, code):
        """Deliver ``code`` to all clients and re-send any resulting actions.

        Each action returned by a client's ``handle`` is itself broadcast
        (depth-first, via recursion) before the next action or client is
        processed.

        NOTE(review): there is no guard against action cycles, so a cycle
        recurses until the interpreter's recursion limit is hit.
        """
        # NOTE(review): "bradcasts" is misspelled; kept verbatim so the
        # emitted output does not change.
        subscriber_count = len(self.clients)
        print('Network bradcasts {} ({})'.format(code, subscriber_count))
        for client in self.clients:
            for action in client.handle(code):
                self.send(action.code)