forked from EVNotify/EVNotiPi
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtelemetry_proxy.py
142 lines (116 loc) · 4.5 KB
/
telemetry_proxy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
""" msgpack telemetry """
from time import monotonic
from lzma import compress, decompress
from threading import Thread
import logging
from msgpack import packb, unpackb
from requests import Session
from requests.exceptions import RequestException, ConnectTimeout
log = logging.getLogger("EVNotiPi/TelemetryProxy")
def msg_encode(msg):
""" encode and compress message """
return compress(packb(msg))
def msg_decode(msg):
""" decompress and decode message """
return unpackb(decompress(msg), use_list=False)
class TelemetryProxy:
""" Submit all available data to anm influxdb """
def __init__(self, config, car, gps, evnotify):
log.info("Initializing MsgPack")
self._backends = config['backends']
self._evn_akey = evnotify._config['akey']
self._car = car
self._cartype = car.get_evn_model()
self._gps = gps
self._interval = config.get('interval', 5)
self._field_states = {}
self._fields = None
self._next_transmit = 0
self._points = []
self._base_url = config['url']
self._auth = config['authorization']
self._transmit_url = f'{self._base_url}/transmit/{self._car.id}'
self._session = Session()
self._websocket = None
self._running = False
self._settings_submitted = False
def start(self):
""" Start the submission thread """
log.debug('Starting thread')
assert not self._running
self._running = True
self._car.register_data(self.data_callback)
log.debug('Thread running')
def stop(self):
""" Stop the submission thread """
assert self._running
self._car.unregister_data(self.data_callback)
self._running = False
def _submit_settings(self):
log.info('Submitting service settings')
payload = msg_encode(self._backends)
response = self._session.post(f'{self._base_url}/setsvcsettings/{self._car.id}',
headers={'Authorization': self._auth},
data=payload)
assert response.status_code == 200
data = msg_decode(response.content)
self._fields = data['fields']
log.debug('got fields (%s)', self._fields)
def data_callback(self, data):
""" Callback to receive data from "car" """
now = monotonic()
states = self._field_states
points = self._points
log.debug("Enqeue...")
point = {
'carid': self._car.id,
'cartype': self._cartype,
'akey': self._evn_akey,
}
for key, value in data.items():
if key == 'timestamp':
point[key] = value
continue
if value is None or \
(self._fields is not None and key not in self._fields):
continue
if key not in states:
states[key] = {'next_interval': 0, 'last_value': None}
if value != states[key]['last_value'] or \
now >= states[key]['next_interval']:
states[key] = {
'next_interval': now + 60,
'last_value': value
}
point[key] = value
#log.debug(point)
points.append(point)
if now >= self._next_transmit:
self._next_transmit = now + self._interval
Thread(target=self._submit).start()
def _submit(self):
states = self._field_states
points = self._points
payload = msg_encode(points)
log.debug(points)
try:
if not self._settings_submitted:
self._submit_settings()
self._settings_submitted = True
ret = self._session.post(self._transmit_url,
headers={'Authorization': self._auth},
data=payload)
if ret.status_code == 402: # Server requests settings
states.clear() # also make sure we send all values on next try
points.clear()
self._submit_settings()
else:
points.clear()
except RequestException as exception:
log.warning(str(exception))
self._session.close()
except ConnectTimeout as exception:
log.warning(str(exception))
def check_thread(self):
""" Return the status of the thread """
return self._running