Skip to content

Commit 3b3e03e

Browse files
committed
fix(transport/websocket): ensure state changes are syncronised
previously a race condition could allow redis connected state mismatches
1 parent e77834d commit 3b3e03e

2 files changed

Lines changed: 40 additions & 18 deletions

File tree

shard.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
name: placeos-driver
2-
version: 7.11.6
2+
version: 7.11.7
33

44
dependencies:
55
action-controller:

src/placeos-driver/transport/websocket.cr

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ class PlaceOS::Driver::TransportWebsocket < PlaceOS::Driver::Transport
2121
@tls = @use_tls ? new_tls_context : nil
2222
end
2323

24+
@connect_lock : Mutex = Mutex.new
25+
@connection_state_changing : Bool = false
2426
@headers_callback : -> HTTP::Headers
2527
@ip : String
2628
@path : String
@@ -31,7 +33,11 @@ class PlaceOS::Driver::TransportWebsocket < PlaceOS::Driver::Transport
3133
property :received
3234

3335
def connect(connect_timeout : Int32 = 10) : Nil
34-
return if @terminated
36+
@connect_lock.synchronize do
37+
return if @terminated || @connection_state_changing
38+
@connection_state_changing = true
39+
end
40+
3541
if websocket = @websocket
3642
return unless websocket.closed?
3743
end
@@ -47,6 +53,9 @@ class PlaceOS::Driver::TransportWebsocket < PlaceOS::Driver::Transport
4753
) do
4854
start_socket(connect_timeout)
4955
end
56+
ensure
57+
@connect_lock.synchronize { @connection_state_changing = false }
58+
disconnect if @terminated || @websocket.nil? || @websocket.try(&.closed?)
5059
end
5160

5261
private def start_socket(connect_timeout)
@@ -78,32 +87,45 @@ class PlaceOS::Driver::TransportWebsocket < PlaceOS::Driver::Transport
7887
end
7988
@proxy_in_use = proxy.try &.proxy_host
8089

81-
# Configure websocket to auto pong
90+
# configure websocket
8291
websocket = @websocket = ConnectProxy::WebSocket.new(@ip, @path, @port, @tls, headers, proxy, ignore_env: true)
83-
websocket.on_ping { |message| websocket.pong(message) }
8492

8593
# Enable queuing
86-
@queue.online = true
94+
set_connected_state(true)
8795

8896
# Start consuming data from the socket
89-
spawn(same_thread: true) { consume_io }
97+
spawn(same_thread: true) { consume_io(websocket) }
9098
rescue error
9199
logger.info(exception: error) { "error connecting to device on #{@ip}:#{@port}#{@path}" }
92-
@queue.online = false
100+
set_connected_state(false)
93101
raise error
94102
end
95103

96104
def terminate : Nil
97105
@terminated = true
98-
@websocket.try &.close
106+
@websocket.try(&.close) rescue nil
107+
@websocket = nil
99108
end
100109

101110
def disconnect : Nil
102-
@websocket.try &.close
111+
websocket = @websocket
112+
@connect_lock.synchronize do
113+
return if @connection_state_changing
114+
@websocket = nil
115+
end
116+
websocket.try(&.close) rescue nil
117+
set_connected_state(false)
118+
connect
103119
rescue error
104120
logger.info(exception: error) { "calling disconnect" }
105121
end
106122

123+
protected def set_connected_state(state : Bool)
124+
@queue.online = state
125+
rescue error
126+
logger.info(exception: error) { "setting connected state" }
127+
end
128+
107129
def send(message) : PlaceOS::Driver::TransportWebsocket
108130
websocket = @websocket
109131
return self if websocket.nil? || websocket.closed?
@@ -139,18 +161,18 @@ class PlaceOS::Driver::TransportWebsocket < PlaceOS::Driver::Transport
139161
@websocket.try &.pong(message)
140162
end
141163

142-
private def consume_io
143-
if websocket = @websocket
144-
websocket.on_binary { |bytes| process bytes }
145-
websocket.on_message { |string| process string.to_slice }
146-
websocket.run
147-
end
164+
private def consume_io(websocket)
165+
websocket.on_ping { |message| websocket.pong(message) }
166+
websocket.on_binary { |bytes| process bytes }
167+
websocket.on_message { |string| process string.to_slice }
168+
websocket.run
148169
rescue IO::Error
149170
rescue error
150171
logger.error(exception: error) { "error consuming IO" }
151172
ensure
152-
disconnect
153-
@queue.online = false
154-
connect
173+
# only call disconnect if we're still processing the same socket
174+
# if not then connect has already been called or disconnect was
175+
# called explicitly
176+
disconnect if websocket == @websocket
155177
end
156178
end

0 commit comments

Comments
 (0)