Skip to content

Commit

Permalink
Merge pull request onna#14 from onna/logging_queue
Browse files Browse the repository at this point in the history
Update kafkaesk handler queue to create queue as late as possible
  • Loading branch information
vangheem authored Aug 24, 2020
2 parents 8f04d56 + 455a3df commit 373996f
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
21 changes: 15 additions & 6 deletions kafkaesk/ext/logging/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from pydantic import BaseModel
from typing import IO
from typing import Optional
from typing import Tuple

import asyncio
import kafkaesk
Expand Down Expand Up @@ -32,14 +31,19 @@ class KafkaeskQueue:
def __init__(
    self, app: kafkaesk.app.Application, max_queue: int = 10000,
):
    """Bind the queue wrapper to *app* without allocating the asyncio queue yet.

    Args:
        app: kafkaesk application whose "finalize" event triggers a flush.
        max_queue: maximum number of pending log records before put_nowait
            raises QueueFull (enforced once the queue is created).
    """
    # The asyncio.Queue is created lazily in start() so it is built as late
    # as possible (i.e. once an event loop is actually available) rather
    # than at construction time.
    self._queue: Optional[asyncio.Queue] = None
    self._queue_size = max_queue

    self._app = app

    # Flush any buffered records when the application finalizes.
    self._app.on("finalize", self.flush)

    # Worker task consuming the queue; started by start().
    self._task: Optional[asyncio.Task] = None

def start(self) -> None:
    """Create the backing queue on first use and ensure the worker task runs."""
    # Lazy construction: the queue only comes into existence once start()
    # is called, at which point an event loop should exist.
    if self._queue is None:
        self._queue = asyncio.Queue(maxsize=self._queue_size)

    task = self._task
    needs_worker = task is None or task.done()
    if needs_worker:
        self._task = asyncio.get_event_loop().create_task(self._run())

Expand All @@ -59,6 +63,9 @@ def running(self) -> bool:
return True

async def _run(self) -> None:
if self._queue is None:
raise RuntimeError("Queue must be started before workers")

while True:
try:
stream, message = await asyncio.wait_for(self._queue.get(), 1)
Expand All @@ -72,9 +79,10 @@ async def _run(self) -> None:
return

async def flush(self) -> None:
    """Drain every pending (stream, message) pair and publish each one.

    A no-op when start() has not yet created the queue, so it is always
    safe to call (e.g. from the app's "finalize" hook).
    """
    if self._queue is not None:
        while not self._queue.empty():
            stream, message = await self._queue.get()
            await self._publish(stream, message)

async def _publish(self, stream: str, message: BaseModel) -> None:
if self._app._intialized:
Expand All @@ -90,7 +98,8 @@ def _print_to_stderr(self, message: BaseModel, error: str) -> None:
sys.stderr.write(f"Error sending log to Kafak: \n{error}\nMessage: {message.json()}")

def put_nowait(self, stream: str, message: PydanticLogModel) -> None:
    """Enqueue *message* for *stream* without blocking.

    Silently drops the record when the queue has not been created yet
    (start() not called); raises asyncio.QueueFull when the queue is at
    its max_queue capacity.
    """
    if self._queue is not None:
        self._queue.put_nowait((stream, message))


class PydanticKafkaeskHandler(logging.Handler):
Expand Down
2 changes: 2 additions & 0 deletions tests/ext/logging/test_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ async def consume(data: PydanticLogModel):
async def test_queue_flush(self, app, queue, log_consumer):

async with app:
queue.start()
for i in range(10):
queue.put_nowait("log.test", PydanticLogModel(count=i))

Expand Down Expand Up @@ -198,6 +199,7 @@ async def test_queue_publish(self, app, queue, log_consumer, capsys):
@pytest.mark.with_max_queue(1)
async def test_queue_max_size(self, app, queue):

queue.start()
queue.put_nowait("log.test", PydanticLogModel())

with pytest.raises(asyncio.QueueFull):
Expand Down

0 comments on commit 373996f

Please sign in to comment.