Skip to content

Commit 8170a32

Browse files
committed
Close the WebSocket connection immediately when the stream is stopped.
Currently, when the stream is stopped, we set the stream status accordingly and then wait for the `_consume` loop to check the stream status and close the WebSocket connection. The `_consume` loop calls `self._ws.recv()` with a timeout of 5 seconds, so it can take up to 5 seconds for the WebSocket connection to be closed after the stream is stopped. This is unnecessarily inefficient and complicated. Instead, we could close the WebSocket connection immediately when the stream is stopped. The `_consume` loop would still be broken out of properly because `self._ws.recv()` would raise a `ConnectionClosed` error.
1 parent bf02b2b commit 8170a32

File tree

1 file changed

+19
-52
lines changed

1 file changed

+19
-52
lines changed

alpaca_trade_api/stream.py

Lines changed: 19 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import msgpack
77
import re
88
import websockets
9-
import queue
109

1110
from .common import get_base_url, get_data_stream_url, get_credentials, URL
1211
from .entity import Entity
@@ -59,7 +58,6 @@ def __init__(self,
5958
self._running = False
6059
self._loop = None
6160
self._raw_data = raw_data
62-
self._stop_stream_queue = queue.Queue()
6361
self._handlers = {
6462
'trades': {},
6563
'quotes': {},
@@ -113,26 +111,14 @@ async def close(self):
113111

114112
async def stop_ws(self):
115113
self._should_run = False
116-
if self._stop_stream_queue.empty():
117-
self._stop_stream_queue.put_nowait({"should_stop": True})
114+
await self.close()
118115

119116
async def _consume(self):
120117
while True:
121-
if not self._stop_stream_queue.empty():
122-
self._stop_stream_queue.get(timeout=1)
123-
await self.close()
124-
break
125-
else:
126-
try:
127-
r = await asyncio.wait_for(self._ws.recv(), 5)
128-
msgs = msgpack.unpackb(r)
129-
for msg in msgs:
130-
await self._dispatch(msg)
131-
except asyncio.TimeoutError:
132-
# ws.recv is hanging when no data is received. by using
133-
# wait_for we break when no data is received, allowing us
134-
# to break the loop when needed
135-
pass
118+
r = await self._ws.recv()
119+
msgs = msgpack.unpackb(r)
120+
for msg in msgs:
121+
await self._dispatch(msg)
136122

137123
def _cast(self, msg_type, msg):
138124
result = msg
@@ -230,14 +216,10 @@ async def _run_forever(self):
230216
v for k, v in self._handlers.items()
231217
if k not in ("cancelErrors", "corrections")
232218
):
233-
if not self._stop_stream_queue.empty():
234-
# the ws was signaled to stop before starting the loop so
235-
# we break
236-
self._stop_stream_queue.get(timeout=1)
219+
if not self._should_run:
237220
return
238221
await asyncio.sleep(0.1)
239222
log.info(f'started {self._name} stream')
240-
self._should_run = True
241223
self._running = False
242224
while True:
243225
try:
@@ -253,10 +235,10 @@ async def _run_forever(self):
253235
self._running = True
254236
await self._consume()
255237
except websockets.WebSocketException as wse:
256-
await self.close()
257-
self._running = False
258-
log.warn('data websocket error, restarting connection: ' +
259-
str(wse))
238+
if self._should_run:
239+
await self.close()
240+
log.warn('data websocket error, restarting connection: ' +
241+
str(wse))
260242
except Exception as e:
261243
log.exception('error during websocket '
262244
'communication: {}'.format(str(e)))
@@ -610,7 +592,6 @@ def __init__(self,
610592
self._running = False
611593
self._loop = None
612594
self._raw_data = raw_data
613-
self._stop_stream_queue = queue.Queue()
614595
self._should_run = True
615596
self._websocket_params = websocket_params
616597

@@ -675,31 +656,18 @@ async def _start_ws(self):
675656

676657
async def _consume(self):
677658
while True:
678-
if not self._stop_stream_queue.empty():
679-
self._stop_stream_queue.get(timeout=1)
680-
await self.close()
681-
break
682-
else:
683-
try:
684-
r = await asyncio.wait_for(self._ws.recv(), 5)
685-
msg = json.loads(r)
686-
await self._dispatch(msg)
687-
except asyncio.TimeoutError:
688-
# ws.recv is hanging when no data is received. by using
689-
# wait_for we break when no data is received, allowing us
690-
# to break the loop when needed
691-
pass
659+
r = await self._ws.recv()
660+
msg = json.loads(r)
661+
await self._dispatch(msg)
692662

693663
async def _run_forever(self):
694664
self._loop = asyncio.get_running_loop()
695665
# do not start the websocket connection until we subscribe to something
696666
while not self._trade_updates_handler:
697-
if not self._stop_stream_queue.empty():
698-
self._stop_stream_queue.get(timeout=1)
667+
if not self._should_run:
699668
return
700669
await asyncio.sleep(0.1)
701670
log.info('started trading stream')
702-
self._should_run = True
703671
self._running = False
704672
while True:
705673
try:
@@ -712,10 +680,10 @@ async def _run_forever(self):
712680
self._running = True
713681
await self._consume()
714682
except websockets.WebSocketException as wse:
715-
await self.close()
716-
self._running = False
717-
log.warn('trading stream websocket error, restarting ' +
718-
' connection: ' + str(wse))
683+
if self._should_run:
684+
await self.close()
685+
log.warn('trading stream websocket error, restarting ' +
686+
' connection: ' + str(wse))
719687
except Exception as e:
720688
log.exception('error during websocket '
721689
'communication: {}'.format(str(e)))
@@ -730,8 +698,7 @@ async def close(self):
730698

731699
async def stop_ws(self):
732700
self._should_run = False
733-
if self._stop_stream_queue.empty():
734-
self._stop_stream_queue.put_nowait({"should_stop": True})
701+
await self.close()
735702

736703
def stop(self):
737704
if self._loop.is_running():

0 commit comments

Comments
 (0)