1
1
from __future__ import annotations
2
2
3
+ import asyncio
3
4
import typing
4
5
from urllib .parse import urlparse
5
6
10
11
11
12
12
13
class KafkaBackend (BroadcastBackend ):
13
- def __init__ (self , url : str ):
14
- self ._servers = [urlparse (url ).netloc ]
14
+ def __init__ (self , urls : str | list [str ]) -> None :
15
+ urls = [urls ] if isinstance (urls , str ) else urls
16
+ self ._servers = [urlparse (url ).netloc for url in urls ]
15
17
self ._consumer_channels : set [str ] = set ()
18
+ self ._ready = asyncio .Event ()
16
19
17
20
async def connect (self ) -> None :
18
21
self ._producer = AIOKafkaProducer (bootstrap_servers = self ._servers )
@@ -27,6 +30,7 @@ async def disconnect(self) -> None:
27
30
async def subscribe (self , channel : str ) -> None :
28
31
self ._consumer_channels .add (channel )
29
32
self ._consumer .subscribe (topics = self ._consumer_channels )
33
+ await self ._wait_for_assignment ()
30
34
31
35
async def unsubscribe (self , channel : str ) -> None :
32
36
self ._consumer .unsubscribe ()
@@ -35,5 +39,13 @@ async def publish(self, channel: str, message: typing.Any) -> None:
35
39
await self ._producer .send_and_wait (channel , message .encode ("utf8" ))
36
40
37
41
async def next_published (self ) -> Event :
42
+ await self ._ready .wait ()
38
43
message = await self ._consumer .getone ()
39
44
return Event (channel = message .topic , message = message .value .decode ("utf8" ))
45
+
46
+ async def _wait_for_assignment (self ) -> None :
47
+ """Wait for the consumer to be assigned to the partition."""
48
+ while not self ._consumer .assignment ():
49
+ await asyncio .sleep (0.001 )
50
+
51
+ self ._ready .set ()
0 commit comments