2
2
3
3
import queue
4
4
import socket
5
+ import select
5
6
import time
6
7
import threading
7
8
from datetime import datetime , timedelta
25
26
0xaf , 0xfe , 0xf7 , 0x00 , 0x00 , 0x1e ]
26
27
KEEP_ALIVE_COMMAND_PREAMBLE = [0xD0 , 0x00 , 0x00 , 0x00 , 0x02 ]
27
28
KEEP_ALIVE_RESPONSE_PREAMBLE = [0xd8 , 0x0 , 0x0 , 0x0 , 0x07 ]
29
+ COMMAND_RESPONSE_PREAMBLE = [0x88 , 0x00 , 0x00 , 0x00 , 0x03 , 0x00 ]
28
30
KEEP_ALIVE_TIME = 5
29
31
RECONNECT_TIME = 5
30
32
SOCKET_TIMEOUT = 5
@@ -84,6 +86,7 @@ def __init__(self, ip, port=BRIDGE_PORT, version=BRIDGE_VERSION,
84
86
self ._socket .settimeout (SOCKET_TIMEOUT )
85
87
self ._socket .connect ((ip , port ))
86
88
self ._command_queue = queue .Queue ()
89
+ self ._ack_queue = queue .Queue ()
87
90
self ._lock = threading .Lock ()
88
91
self .active = 0
89
92
self ._selected_number = None
@@ -166,10 +169,11 @@ def send(self, command, reps=REPS, wait=MIN_WAIT):
166
169
self ._command_queue .put ((command , reps , wait ))
167
170
# Wait before accepting another command.
168
171
# This keeps individual groups relatively synchronized.
169
- sleep = reps * wait * self .active
170
- if command .select and self ._selected_number != command .group_number :
171
- sleep += SELECT_WAIT
172
- time .sleep (sleep )
172
+ if self .version < 6 :
173
+ sleep = reps * wait * self .active
174
+ if command .select and self ._selected_number != command .group_number :
175
+ sleep += SELECT_WAIT
176
+ time .sleep (sleep )
173
177
174
178
def _consume (self ):
175
179
""" Consume commands from the queue.
@@ -181,49 +185,54 @@ def _consume(self):
181
185
be used by one thread at a time. Note that this can and
182
186
will delay commands if multiple groups are attempting
183
187
to communicate at the same time on the same bridge.
184
-
185
- TODO: Only wait when another command comes in.
186
188
"""
187
189
while not self .is_closed :
190
+ # Get command from queue.
191
+ msg = self ._command_queue .get ()
192
+
193
+ # Closed
194
+ if msg is None :
195
+ return
196
+
188
197
# Use the lock so we are sure is_ready is not changed during execution
189
198
# and the socket is not in use
190
199
with self ._lock :
191
- # Check if bridge is ready and there are
192
- if self .is_ready and not self . _command_queue . empty () :
193
- # Get command from queue.
194
- ( command , reps , wait ) = self . _command_queue . get ()
200
+ # Check if bridge is ready
201
+ if self .is_ready :
202
+ ( command , reps , wait ) = msg
203
+
195
204
# Select group if a different group is currently selected.
196
205
if command .select and self ._selected_number != command .group_number :
197
- if not self ._send_raw (command .select_command .get_bytes (self )):
206
+ if self ._send_raw (command .select_command .get_bytes (self )):
207
+ self ._selected_number = command .group_number
208
+ time .sleep (SELECT_WAIT )
209
+ else :
198
210
# Stop sending on socket error
199
211
self .is_ready = False
200
- continue
201
212
202
- time .sleep (SELECT_WAIT )
203
213
# Repeat command as necessary.
204
- for _ in range (reps ):
205
- if not self ._send_raw (command .get_bytes (self )):
214
+ command_bytes = command .get_bytes (self )
215
+ todo = reps
216
+ while todo > 0 and self .is_ready :
217
+ if self ._send_raw (command_bytes ):
218
+ try :
219
+ while self .sn != self ._ack_queue .get (timeout = wait ):
220
+ pass
221
+
222
+ # ACK received, stop repeating
223
+ todo = 0
224
+ except queue .Empty :
225
+ todo = todo - 1
226
+ else :
206
227
# Stop sending on socket error
207
228
self .is_ready = False
208
- continue
209
- time .sleep (wait )
210
-
211
- self ._selected_number = command .group_number
212
-
213
- # Wait a little time if queue is empty
214
- if self ._command_queue .empty ():
215
- time .sleep (MIN_WAIT )
216
229
217
230
# Wait if bridge is not ready, we're only reading is_ready, no lock needed
218
- if not self .is_ready :
219
- if self .is_closed :
220
- return
221
-
222
- # Give the reconnect some time
223
- time .sleep (RECONNECT_TIME )
224
-
231
+ if not self .is_ready and not self .is_closed :
225
232
# For older bridges, always try again, there's no keep-alive thread
226
233
if self .version < 6 :
234
+ # Give the reconnect some time
235
+ time .sleep (RECONNECT_TIME )
227
236
self .is_ready = True
228
237
229
238
def _send_raw (self , command ):
@@ -233,13 +242,19 @@ def _send_raw(self, command):
233
242
"""
234
243
try :
235
244
self ._socket .send (bytearray (command ))
236
- self ._sn = (self ._sn + 1 ) % 256
237
245
return True
238
246
except (socket .error , socket .timeout ):
239
247
# We can get a socket.error or timeout exception if the bridge is disconnected,
240
248
# but we are still sending data. In that case, return False to indicate that data is not sent.
241
249
return False
242
250
251
+ def next_sn (self ):
252
+ """
253
+ Increases the sequential byte and returns it.
254
+ """
255
+ self ._sn = (self ._sn + 1 ) % 256
256
+ return self ._sn
257
+
243
258
def _init_connection (self ):
244
259
"""
245
260
Requests the session ids of the bridge.
@@ -276,42 +291,42 @@ def _reconnect(self):
276
291
277
292
def _keep_alive (self ):
278
293
"""
279
- Send keep alive messages continuously to bridge.
294
+ Send keep alive messages continuously to bridge and
295
+ handle command responses.
280
296
"""
297
+ send_next_keep_alive_at = 0
281
298
while not self .is_closed :
282
299
if not self .is_ready :
283
300
self ._reconnect ()
284
301
continue
285
302
286
- # Acquire the lock to make sure we don't change self.is_ready
287
- # while _consume() is sending commands
288
- with self ._lock :
303
+ if time .monotonic () > send_next_keep_alive_at :
289
304
command = KEEP_ALIVE_COMMAND_PREAMBLE + [self .wb1 , self .wb2 ]
290
305
self ._send_raw (command )
291
-
292
- start = datetime .now ()
293
- connection_alive = False
294
- while datetime .now () - start < timedelta (seconds = SOCKET_TIMEOUT ):
295
- response = bytearray (12 )
296
- try :
297
- self ._socket .recv_into (response )
298
- except socket .timeout :
299
- break
300
-
301
- if response [:5 ] == bytearray (KEEP_ALIVE_RESPONSE_PREAMBLE ):
302
- connection_alive = True
303
- break
304
-
305
- if not connection_alive :
306
+ need_response_by = time .monotonic () + KEEP_ALIVE_TIME
307
+
308
+ # Wait for responses
309
+ timeout = max (0 , need_response_by - time .monotonic ())
310
+ ready = select .select ([self ._socket ], [], [], timeout )
311
+ if ready [0 ]:
312
+ response = bytearray (12 )
313
+ self ._socket .recv_into (response )
314
+
315
+ if response .startswith (bytearray (KEEP_ALIVE_RESPONSE_PREAMBLE )):
316
+ send_next_keep_alive_at = need_response_by
317
+ elif response .startswith (bytearray (COMMAND_RESPONSE_PREAMBLE )):
318
+ sn = response [len (COMMAND_RESPONSE_PREAMBLE )]
319
+ self ._ack_queue .put (sn )
320
+ elif send_next_keep_alive_at < need_response_by :
321
+ # Acquire the lock to make sure we don't change self.is_ready
322
+ # while _consume() is sending commands
323
+ with self ._lock :
306
324
self .is_ready = False
307
325
308
- # Wait for KEEP_ALIVE_TIME seconds before sending next keep-alive message
309
- if self .is_ready :
310
- time .sleep (KEEP_ALIVE_TIME )
311
-
312
326
def close (self ):
313
327
"""
314
328
Closes the connection to the bridge.
315
329
"""
316
330
self .is_closed = True
317
331
self .is_ready = False
332
+ self ._command_queue .put (None )
0 commit comments