-
Notifications
You must be signed in to change notification settings - Fork 33
/
Copy pathTestSleepMonitor.py
executable file
·143 lines (115 loc) · 4.25 KB
/
TestSleepMonitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#!/usr/bin/env python
from twisted.internet import reactor, stdio
import twisted.internet.error
from twisted.web import server, resource
from twisted.web.static import File
from twisted.protocols import basic
from datetime import datetime
import os
import logging
import json
from ZeroConfUtils import startZeroConfServer
from Constants import OximeterStatus, MotionReason
def log(msg):
    """Log ``msg`` at INFO level on the root logger with a timestamp prefix.

    The emitted message is ``'<timestamp>: <msg>'``, where the timestamp is
    ``datetime.now()`` rendered with ``isoformat()``.
    """
    timestamp = datetime.now().isoformat()
    logging.info('%s: %s', timestamp, msg)
def setupLogging():
    """Configure the root logger to emit bare messages at DEBUG level.

    Sets the root logger's level to DEBUG and attaches a new
    ``logging.StreamHandler`` whose formatter outputs only the message text.
    Every call attaches one more handler.
    """
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(message)s"))

    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(handler)
# Integer motion-reason codes (the values entered for the 'mr' console key)
# mapped to the MotionReason constants reported in the status payload.
MOTION_REASON_MAP = {
    0: MotionReason.NONE,
    1: MotionReason.CAMERA,
    2: MotionReason.BPM,
}
# Integer oximeter-status codes (the values entered for the 'st' console key)
# mapped to the OximeterStatus constants reported in the status payload.
OXIMETER_STATUS_MAP = {
    0: OximeterStatus.CONNECTED,
    1: OximeterStatus.PROBE_DISCONNECTED,
    2: OximeterStatus.CABLE_DISCONNECTED,
}
def mapToStr(m):
    """Render a mapping of integer codes to values as ``'code(value), ...'``.

    Args:
        m: Mapping whose keys are formatted with ``%d`` and whose values are
            formatted with ``%s``.

    Returns:
        One ``key(value)`` entry per item, joined by ``', '`` in the
        mapping's iteration order; the empty string for an empty mapping.
    """
    # items() is available on both Python 2 and Python 3 dicts, whereas
    # iteritems() exists only on Python 2.
    return ', '.join('%d(%s)' % (code, label) for (code, label) in m.items())
class ProcessInput(basic.LineReceiver):
    """Line-based console that edits the attributes of a status resource.

    Each received line is either the literal ``print`` (dump the current
    status to stdout) or ``<key>=<integer>``, where ``<key>`` is one of the
    short names in ``propMap``. Anything else yields ``*** Error ***``.
    """

    # Lines are split on the platform line separator.
    from os import linesep as delimiter

    def __init__(self, statusResource):
        """Remember the resource to edit and build the short-key table.

        Args:
            statusResource: Object whose attributes named in ``propMap`` are
                overwritten, and whose ``getStatus()`` is used by ``print``.
        """
        self.statusResource = statusResource
        self.propMap = {
            'a': 'alarm',
            'b': 'BPM',
            'o': 'SPO2',
            'm': 'motion',
            'mr': 'motionReason',
            'st': 'oximeterStatus',
        }

    def _allowedCodes(self, realPropName):
        """Return the code table that constrains ``realPropName``, or None."""
        if realPropName == 'motionReason':
            return MOTION_REASON_MAP
        if realPropName == 'oximeterStatus':
            return OXIMETER_STATUS_MAP
        return None

    def connectionMade(self):
        """Write the key list, both code tables, and the first prompt."""
        self.transport.write('Keys: \n')
        for (p, name) in self.propMap.items():
            self.transport.write('%s : %s\n' % (p, name))
        self.transport.write('motionReason: %s\n' % mapToStr(MOTION_REASON_MAP))
        self.transport.write('oximeterStatus: %s\n' % mapToStr(OXIMETER_STATUS_MAP))
        self.transport.write('>>> ')

    def lineReceived(self, line):
        """Handle one console line, then write a new prompt.

        Args:
            line: The received line without its delimiter.
        """
        if line == 'print':
            # Single-argument print(...) behaves the same as a statement on
            # Python 2 and as a function call on Python 3.
            for (p, name) in self.statusResource.getStatus().items():
                print('%s: %s, ' % (p, name))
            print('')
        else:
            try:
                (propName, propValStr) = line.split('=')
                propVal = int(propValStr)
                realPropName = self.propMap[propName]
                # getStatus() indexes the code tables with these values, so
                # an unmapped code would make every later status read raise
                # KeyError; reject it here instead of storing it.
                allowed = self._allowedCodes(realPropName)
                if allowed is not None and propVal not in allowed:
                    raise ValueError(
                        'unknown %s code: %d' % (realPropName, propVal))
                setattr(self.statusResource, realPropName, propVal)
            except:  # noqa: E722 (OK to use bare except)
                self.transport.write('*** Error ***\n')
        self.transport.write('>>> ')
class StatusResource(resource.Resource):
    """Web resource that serves the current fake-monitor readings as JSON.

    The readings are plain attributes (``SPO2``, ``BPM``, ``alarm``,
    ``motion``, ``motionReason``, ``oximeterStatus``) that other code may
    overwrite between requests.
    """

    def __init__(self):
        """Initialise the base resource and the default readings."""
        # The base initialiser creates the child registry that Twisted
        # consults during URL traversal; skipping it makes any request for a
        # path below this resource fail with AttributeError. It is called
        # explicitly rather than through super() so it also works where
        # Resource is an old-style class.
        resource.Resource.__init__(self)
        self.SPO2 = 100
        self.BPM = 100
        self.alarm = 0
        self.motion = 0
        self.motionReason = 0
        self.oximeterStatus = 0

    def getStatus(self):
        """Return the current readings as a dict.

        ``alarm`` is coerced to ``bool`` and ``motion`` to ``int``;
        ``readTime`` is ``datetime.now().isoformat()`` at call time;
        ``motionReason`` and ``oximeterStatus`` are translated through
        ``MOTION_REASON_MAP`` and ``OXIMETER_STATUS_MAP``.

        Raises:
            KeyError: If ``motionReason`` or ``oximeterStatus`` holds a code
                that is not a key of its map.
        """
        return {
            'SPO2': self.SPO2,
            'BPM': self.BPM,
            'alarm': bool(self.alarm),
            'motion': int(self.motion),
            'readTime': datetime.now().isoformat(),
            'motionReason': MOTION_REASON_MAP[self.motionReason],
            'oximeterStatus': OXIMETER_STATUS_MAP[self.oximeterStatus],
        }

    def render_GET(self, request):
        """Answer a GET with the JSON-encoded result of ``getStatus()``."""
        request.setHeader("content-type", 'application/json')
        return json.dumps(self.getStatus())
class PingResource(resource.Resource):
    """Web resource whose GET response is the JSON form of ``{'status': 'ready'}``."""

    def render_GET(self, request):
        """Set the JSON content type and a wildcard CORS header, then reply.

        Returns:
            The ``json.dumps`` encoding of ``{'status': 'ready'}``.
        """
        responseHeaders = (
            ("content-type", 'application/json'),
            ("Access-Control-Allow-Origin", '*'),
        )
        for (headerName, headerValue) in responseHeaders:
            request.setHeader(headerName, headerValue)
        return json.dumps({'status': 'ready'})
def main():
    """Build the resource tree, start listening, and run the reactor.

    Static files are served from ``web``; ``status``, ``ping`` and
    ``stream.mjpeg`` are registered as children of that root. Standard input
    is attached to a ``ProcessInput`` console that edits the status
    resource. The site listens on port 80, or on 8080 when binding port 80
    raises ``CannotListenError``; the port actually used is passed to
    ``startZeroConfServer`` before the reactor starts.
    """
    log('Current pwd = %s' % os.getcwd())

    statusResource = StatusResource()
    root = File('web')
    children = (
        ('status', statusResource),
        ('ping', PingResource()),
        ('stream.mjpeg', File('web/jabba_the_hutt.gif')),
    )
    for (path, child) in children:
        root.putChild(path, child)

    stdio.StandardIO(ProcessInput(statusResource))
    site = server.Site(root)

    PORT = 80
    BACKUP_PORT = 8080
    try:
        reactor.listenTCP(PORT, site)
    except twisted.internet.error.CannotListenError:
        # A failure on the backup port is not handled here and propagates.
        reactor.listenTCP(BACKUP_PORT, site)
        portUsed = BACKUP_PORT
    else:
        portUsed = PORT
    log('Started webserver at port %d' % portUsed)

    startZeroConfServer(portUsed)
    reactor.run()
# Script entry point: set up logging first so that both the start-up message
# and any failure in main() are emitted through the configured handler.
if __name__ == "__main__":
    setupLogging()
    log('Starting main method of sleep monitor')
    try:
        main()
    except:  # noqa: E722 (OK to use bare except)
        # Deliberate catch-all: whatever escapes main() is logged together
        # with its traceback instead of propagating out of the script.
        logging.exception("main() threw exception")