Skip to content

Commit 69cf29a

Browse files
#136 improve typing (#139)
1 parent 23c9b40 commit 69cf29a

File tree

4 files changed

+13
-7
lines changed

4 files changed

+13
-7
lines changed

broadcaster/_backends/base.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ async def connect(self) -> None:
1313
async def disconnect(self) -> None:
1414
raise NotImplementedError()
1515

16-
async def subscribe(self, group: str) -> None:
16+
async def subscribe(self, channel: str) -> None:
1717
raise NotImplementedError()
1818

19-
async def unsubscribe(self, group: str) -> None:
19+
async def unsubscribe(self, channel: str) -> None:
2020
raise NotImplementedError()
2121

2222
async def publish(self, channel: str, message: Any) -> None:

broadcaster/_backends/kafka.py

+9-3
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ def __init__(self, urls: str | list[str]) -> None:
1818
self._ready = asyncio.Event()
1919

2020
async def connect(self) -> None:
21-
self._producer = AIOKafkaProducer(bootstrap_servers=self._servers)
22-
self._consumer = AIOKafkaConsumer(bootstrap_servers=self._servers)
21+
self._producer = AIOKafkaProducer(bootstrap_servers=self._servers) # pyright: ignore
22+
self._consumer = AIOKafkaConsumer(bootstrap_servers=self._servers) # pyright: ignore
2323
await self._producer.start()
2424
await self._consumer.start()
2525

@@ -41,7 +41,13 @@ async def publish(self, channel: str, message: typing.Any) -> None:
4141
async def next_published(self) -> Event:
4242
await self._ready.wait()
4343
message = await self._consumer.getone()
44-
return Event(channel=message.topic, message=message.value.decode("utf8"))
44+
value = message.value
45+
46+
# for type compatibility:
47+
# we declare Event.message as str, so convert None to empty string
48+
if value is None:
49+
value = b""
50+
return Event(channel=message.topic, message=value.decode("utf8"))
4551

4652
async def _wait_for_assignment(self) -> None:
4753
"""Wait for the consumer to be assigned to the partition."""

broadcaster/_backends/redis.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ async def _pubsub_listener(self) -> None:
6565
class RedisStreamBackend(BroadcastBackend):
6666
def __init__(self, url: str):
6767
url = url.replace("redis-stream", "redis", 1)
68-
self.streams: dict[str, str] = {}
68+
self.streams: dict[bytes | str | memoryview, int | bytes | str | memoryview] = {}
6969
self._ready = asyncio.Event()
7070
self._producer = redis.Redis.from_url(url)
7171
self._consumer = redis.Redis.from_url(url)

broadcaster/_base.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ class Subscriber:
110110
def __init__(self, queue: asyncio.Queue[Event | None]) -> None:
111111
self._queue = queue
112112

113-
async def __aiter__(self) -> AsyncGenerator[Event | None, None] | None:
113+
async def __aiter__(self) -> AsyncGenerator[Event | None, None]:
114114
try:
115115
while True:
116116
yield await self.get()

0 commit comments

Comments
 (0)