26
26
0xaf , 0xfe , 0xf7 , 0x00 , 0x00 , 0x1e ]
27
27
KEEP_ALIVE_COMMAND_PREAMBLE = [0xD0 , 0x00 , 0x00 , 0x00 , 0x02 ]
28
28
KEEP_ALIVE_RESPONSE_PREAMBLE = [0xd8 , 0x0 , 0x0 , 0x0 , 0x07 ]
29
- COMMAND_RESPONSE_PREAMBLE = [0x88 , 0x00 , 0x00 , 0x00 , 0x03 , 0x00 ]
30
29
KEEP_ALIVE_TIME = 5
31
30
RECONNECT_TIME = 5
32
31
SOCKET_TIMEOUT = 5
@@ -86,7 +85,6 @@ def __init__(self, ip, port=BRIDGE_PORT, version=BRIDGE_VERSION,
86
85
self ._socket .settimeout (SOCKET_TIMEOUT )
87
86
self ._socket .connect ((ip , port ))
88
87
self ._command_queue = queue .Queue ()
89
- self ._ack_queue = queue .Queue ()
90
88
self ._lock = threading .Lock ()
91
89
self .active = 0
92
90
self ._selected_number = None
@@ -169,11 +167,10 @@ def send(self, command, reps=REPS, wait=MIN_WAIT):
169
167
self ._command_queue .put ((command , reps , wait ))
170
168
# Wait before accepting another command.
171
169
# This keeps individual groups relatively synchronized.
172
- if self .version < 6 :
173
- sleep = reps * wait * self .active
174
- if command .select and self ._selected_number != command .group_number :
175
- sleep += SELECT_WAIT
176
- time .sleep (sleep )
170
+ sleep = reps * wait * self .active
171
+ if command .select and self ._selected_number != command .group_number :
172
+ sleep += SELECT_WAIT
173
+ time .sleep (sleep )
177
174
178
175
def _consume (self ):
179
176
""" Consume commands from the queue.
@@ -211,21 +208,13 @@ def _consume(self):
211
208
self .is_ready = False
212
209
213
210
# Repeat command as necessary.
214
- command_bytes = command .get_bytes (self )
215
- todo = reps
216
- while todo > 0 and self .is_ready :
217
- if self ._send_raw (command_bytes ):
218
- try :
219
- while self .sn != self ._ack_queue .get (timeout = wait ):
220
- pass
221
-
222
- # ACK received, stop repeating
223
- todo = 0
224
- except queue .Empty :
225
- todo = todo - 1
226
- else :
227
- # Stop sending on socket error
228
- self .is_ready = False
211
+ for _ in range (reps ):
212
+ if self .is_ready :
213
+ if self ._send_raw (command .get_bytes (self )):
214
+ time .sleep (wait )
215
+ else :
216
+ # Stop sending on socket error
217
+ self .is_ready = False
229
218
230
219
# Wait if bridge is not ready, we're only reading is_ready, no lock needed
231
220
if not self .is_ready and not self .is_closed :
@@ -242,19 +231,13 @@ def _send_raw(self, command):
242
231
"""
243
232
try :
244
233
self ._socket .send (bytearray (command ))
234
+ self ._sn = (self ._sn + 1 ) % 256
245
235
return True
246
236
except (socket .error , socket .timeout ):
247
237
# We can get a socket.error or timeout exception if the bridge is disconnected,
248
238
# but we are still sending data. In that case, return False to indicate that data is not sent.
249
239
return False
250
240
251
- def next_sn (self ):
252
- """
253
- Increases the sequential byte and returns it.
254
- """
255
- self ._sn = (self ._sn + 1 ) % 256
256
- return self ._sn
257
-
258
241
def _init_connection (self ):
259
242
"""
260
243
Requests the session ids of the bridge.
@@ -270,7 +253,7 @@ def _init_connection(self):
270
253
self ._wb1 = response [19 ]
271
254
self ._wb2 = response [20 ]
272
255
self .is_ready = True
273
- except socket .timeout :
256
+ except ( socket .error , socket . timeout ) :
274
257
# Connection timed out, bridge is not ready for us
275
258
self .is_ready = False
276
259
finally :
@@ -291,8 +274,7 @@ def _reconnect(self):
291
274
292
275
def _keep_alive (self ):
293
276
"""
294
- Send keep alive messages continuously to bridge and
295
- handle command responses.
277
+ Send keep alive messages continuously to bridge.
296
278
"""
297
279
send_next_keep_alive_at = 0
298
280
while not self .is_closed :
@@ -309,14 +291,15 @@ def _keep_alive(self):
309
291
timeout = max (0 , need_response_by - time .monotonic ())
310
292
ready = select .select ([self ._socket ], [], [], timeout )
311
293
if ready [0 ]:
312
- response = bytearray (12 )
313
- self ._socket .recv_into (response )
314
-
315
- if response .startswith (bytearray (KEEP_ALIVE_RESPONSE_PREAMBLE )):
316
- send_next_keep_alive_at = need_response_by
317
- elif response .startswith (bytearray (COMMAND_RESPONSE_PREAMBLE )):
318
- sn = response [len (COMMAND_RESPONSE_PREAMBLE )]
319
- self ._ack_queue .put (sn )
294
+ try :
295
+ response = bytearray (12 )
296
+ self ._socket .recv_into (response )
297
+
298
+ if response [:5 ] == bytearray (KEEP_ALIVE_RESPONSE_PREAMBLE ):
299
+ send_next_keep_alive_at = need_response_by
300
+ except (socket .error , socket .timeout ):
301
+ with self ._lock :
302
+ self .is_ready = False
320
303
elif send_next_keep_alive_at < need_response_by :
321
304
# Acquire the lock to make sure we don't change self.is_ready
322
305
# while _consume() is sending commands
0 commit comments