-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcafe_server.py
83 lines (61 loc) · 2.1 KB
/
cafe_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#!/usr/bin/python
# coding=utf-8
import serial
import SocketServer
import threading
import json
from prometheus_client import Gauge, Counter, start_http_server
# Prometheus gauge holding the most recently sampled door state
# (1 = open, 0 = closed); it is updated by TuerSensor.ist_offen().
s = Gauge('ini_door_state', 'Ini door open-1, closed-0')
# Prometheus counter incremented once per TCP request handled by
# TCPRequestHandler.handle().
c = Counter('ini_door_requests', 'Ini door requests')
class TCPRequestHandler(SocketServer.BaseRequestHandler):
    """Answer every TCP connection with the string produced by a callback.

    The constructor takes an extra leading ``callback`` argument that the
    stock ``BaseRequestHandler`` signature does not have, so the server must
    be given a factory (e.g. a lambda) that supplies it and forwards the
    remaining arguments.
    """

    def __init__(self, callback, *args, **keys):
        """Store *callback* and delegate to the base-class constructor.

        Args:
            callback: Zero-argument callable returning the payload string
                that is sent to the client.
            *args, **keys: Passed through unchanged to
                ``SocketServer.BaseRequestHandler.__init__``.
        """
        # The callback has to be stored first: the base-class constructor
        # already invokes handle(), which reads self.callback.
        self.callback = callback
        SocketServer.BaseRequestHandler.__init__(self, *args, **keys)

    def handle(self):
        """Count the request, log the payload and send it to the client."""
        c.inc()
        data = self.callback()
        print("request: " + data)
        # sendall() keeps writing until the complete payload is transmitted;
        # a plain send() may write only part of it and report the count,
        # which was previously ignored.
        self.request.sendall(data)
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    """TCP server that dispatches each request to its own thread.

    All behaviour comes from the two base classes; nothing is overridden.
    """
class TuerSensor(object):
    """Door sensor that uses the CTS and RTS pins of a serial port."""

    def __init__(self, port):
        """Open the serial device *port* and set its RTS line via setRTS()."""
        self._s = serial.Serial(port=port)
        self._s.setRTS()

    def ist_offen(self):
        """Return True if the door is open and publish the state.

        The door is reported as open when the CTS line reads false. As a
        side effect the module-level gauge ``s`` is set to 1 (open) or
        0 (closed).
        """
        is_open = not self._s.getCTS()
        s.set(int(is_open))
        return is_open
class CafeServer(object):
    """Publish the door state as JSON over a threaded TCP server.

    Every connection to TCP port 0xCAFE (51966) is answered with the JSON
    document returned by :meth:`getJson`.
    """

    def __init__(self):
        """Open the door sensor and create (but do not start) the server."""
        port_tuer = '/dev/ttyS0'
        self._tuer = TuerSensor(port=port_tuer)

        # Create the TCP server on port 0xCAFE (51966), all interfaces.
        HOST, PORT = "0.0.0.0", 0xCAFE
        # The handler factory is bound to this instance. Referring to a
        # module-level ``cafeserver`` global instead would raise NameError
        # whenever the class is used outside this script's __main__ block.
        self.server = ThreadedTCPServer(
            (HOST, PORT),
            lambda *args, **keys: TCPRequestHandler(
                self.getJson, *args, **keys))
        self.server_thread = threading.Thread(
            target=self.server.serve_forever)
        self.server_thread.daemon = True

    def run(self):
        """Start the server thread and block until it has terminated."""
        self.server_thread.start()
        # Join in short slices: on Python 2 a join() without a timeout
        # cannot be interrupted by Ctrl-C, so KeyboardInterrupt would never
        # reach the caller.
        while self.server_thread.is_alive():
            self.server_thread.join(1.0)

    def getData(self):
        """Return the current state as a dict with the key 'tuer_offen'."""
        return {'tuer_offen': self._tuer.ist_offen()}

    def getJson(self):
        """Return the result of :meth:`getData` serialized as a JSON string."""
        return json.dumps(self.getData())

    def shutdown(self):
        """Stop the serve loop, wait for its thread and close the socket."""
        self.server.shutdown()
        self.server_thread.join()
        # Release the listening socket; shutdown() only stops the loop.
        self.server.server_close()
#######################################################################
## Start
if __name__ == "__main__":
    # Expose the Prometheus metrics over HTTP on port 0xCAFF (51967).
    start_http_server(0xCAFF)
    # Bind the name up front so the interrupt handler below can tell whether
    # construction finished; otherwise a Ctrl-C during CafeServer() would
    # end in a NameError instead of a clean exit.
    cafeserver = None
    try:
        cafeserver = CafeServer()
        cafeserver.run()
    except KeyboardInterrupt:
        print("beenden...")
        if cafeserver is not None:
            cafeserver.shutdown()