Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#136 improve typing #139

Merged
merged 1 commit into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions broadcaster/_backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ async def connect(self) -> None:
async def disconnect(self) -> None:
raise NotImplementedError()

async def subscribe(self, group: str) -> None:
async def subscribe(self, channel: str) -> None:
raise NotImplementedError()

async def unsubscribe(self, group: str) -> None:
async def unsubscribe(self, channel: str) -> None:
raise NotImplementedError()

async def publish(self, channel: str, message: Any) -> None:
Expand Down
12 changes: 9 additions & 3 deletions broadcaster/_backends/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ def __init__(self, urls: str | list[str]) -> None:
self._ready = asyncio.Event()

async def connect(self) -> None:
self._producer = AIOKafkaProducer(bootstrap_servers=self._servers)
self._consumer = AIOKafkaConsumer(bootstrap_servers=self._servers)
self._producer = AIOKafkaProducer(bootstrap_servers=self._servers) # pyright: ignore
self._consumer = AIOKafkaConsumer(bootstrap_servers=self._servers) # pyright: ignore
await self._producer.start()
await self._consumer.start()

Expand All @@ -41,7 +41,13 @@ async def publish(self, channel: str, message: typing.Any) -> None:
async def next_published(self) -> Event:
await self._ready.wait()
message = await self._consumer.getone()
return Event(channel=message.topic, message=message.value.decode("utf8"))
value = message.value

# for type compatibility:
# we declare Event.message as str, so convert None to empty string
if value is None:
value = b""
return Event(channel=message.topic, message=value.decode("utf8"))

async def _wait_for_assignment(self) -> None:
"""Wait for the consumer to be assigned to the partition."""
Expand Down
2 changes: 1 addition & 1 deletion broadcaster/_backends/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async def _pubsub_listener(self) -> None:
class RedisStreamBackend(BroadcastBackend):
def __init__(self, url: str):
url = url.replace("redis-stream", "redis", 1)
self.streams: dict[str, str] = {}
self.streams: dict[bytes | str | memoryview, int | bytes | str | memoryview] = {}
self._ready = asyncio.Event()
self._producer = redis.Redis.from_url(url)
self._consumer = redis.Redis.from_url(url)
Expand Down
2 changes: 1 addition & 1 deletion broadcaster/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class Subscriber:
def __init__(self, queue: asyncio.Queue[Event | None]) -> None:
self._queue = queue

async def __aiter__(self) -> AsyncGenerator[Event | None, None] | None:
async def __aiter__(self) -> AsyncGenerator[Event | None, None]:
try:
while True:
yield await self.get()
Expand Down
Loading