Skip to content

Commit 528cf42

Browse files
Fix redisBackend _pubsub_listener just listen once (#134)
* fix: solve the redisBackend `_pubsub_listener` just listen once. * chore: change the comment
1 parent 22e8b2a commit 528cf42

File tree

1 file changed

+14
-8
lines changed

1 file changed

+14
-8
lines changed

broadcaster/_backends/redis.py

+14-8
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,20 @@ async def next_published(self) -> Event:
4343
async def _pubsub_listener(self) -> None:
4444
# redis-py does not listen to the pubsub connection if there are no channels subscribed
4545
# so we need to wait until the first channel is subscribed to start listening
46-
await self._ready.wait()
47-
async for message in self._pubsub.listen():
48-
if message["type"] == "message":
49-
event = Event(
50-
channel=message["channel"].decode(),
51-
message=message["data"].decode(),
52-
)
53-
await self._queue.put(event)
46+
while True:
47+
await self._ready.wait()
48+
async for message in self._pubsub.listen():
49+
if message["type"] == "message":
50+
event = Event(
51+
channel=message["channel"].decode(),
52+
message=message["data"].decode(),
53+
)
54+
await self._queue.put(event)
55+
56+
# when no channel subscribed, clear the event.
57+
# And then in next loop, event will blocked again until
58+
# the new channel subscribed.Now asyncio.Task will not exit again.
59+
self._ready.clear()
5460

5561

5662
StreamMessageType = typing.Tuple[bytes, typing.Tuple[typing.Tuple[bytes, typing.Dict[bytes, bytes]]]]

0 commit comments

Comments
 (0)