diff --git a/broadcaster/_backends/redis.py b/broadcaster/_backends/redis.py index ca00bc2..3122d4c 100644 --- a/broadcaster/_backends/redis.py +++ b/broadcaster/_backends/redis.py @@ -43,14 +43,20 @@ async def next_published(self) -> Event: async def _pubsub_listener(self) -> None: # redis-py does not listen to the pubsub connection if there are no channels subscribed # so we need to wait until the first channel is subscribed to start listening - await self._ready.wait() - async for message in self._pubsub.listen(): - if message["type"] == "message": - event = Event( - channel=message["channel"].decode(), - message=message["data"].decode(), - ) - await self._queue.put(event) + while True: + await self._ready.wait() + async for message in self._pubsub.listen(): + if message["type"] == "message": + event = Event( + channel=message["channel"].decode(), + message=message["data"].decode(), + ) + await self._queue.put(event) + + # when no channel subscribed, clear the event. + # And then in next loop, event will blocked again until + # the new channel subscribed.Now asyncio.Task will not exit again. + self._ready.clear() StreamMessageType = typing.Tuple[bytes, typing.Tuple[typing.Tuple[bytes, typing.Dict[bytes, bytes]]]]