Skip to content

Commit

Permalink
Fixups
Browse files Browse the repository at this point in the history
* Fix error not being propagated properly
* Add test for singleton
* Add logging for worker
  • Loading branch information
imranariffin committed Dec 2, 2023
1 parent 53f2eed commit 116da4e
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 10 deletions.
3 changes: 3 additions & 0 deletions src/aiotaskq/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@

#!/usr/bin/env python

import logging
import typing as t

import typer

from . import __version__
from .constants import Config
from .interfaces import ConcurrencyType
from .worker import Defaults, run_worker_forever

cli = typer.Typer()
logging.basicConfig(level=Config.log_level())


def _version_callback(value: bool):
Expand Down
7 changes: 7 additions & 0 deletions src/aiotaskq/constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Module to define and store all constants used across the library."""

import logging
from os import environ

from .interfaces import SerializationType
Expand All @@ -17,3 +18,9 @@ def serialization_type() -> SerializationType:
"""Return the serialization type as provided via env var AIOTASKQ_SERIALIZATION."""
s: str | None = environ.get("AIOTASKQ_SERIALIZATION", SerializationType.DEFAULT.value)
return SerializationType[s.upper()]

@staticmethod
def log_level() -> int:
"""Return the LOG_LEVEL environment variable."""
level: int = int(environ.get("AIOTASKQ_LOG_LEVEL", logging.DEBUG))
return level
10 changes: 2 additions & 8 deletions src/aiotaskq/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# pylint: disable=cyclic-import

import copy
from importlib import import_module

import inspect
import logging
Expand Down Expand Up @@ -193,13 +192,8 @@ async def _get_result(self) -> RT:
logger.debug("Retrieving result for task [task_id=%s]", self.id)
async_result: AsyncResult[RT] = await AsyncResult.from_publisher(task_id=self.id)
result: RT | Exception = async_result.get()

if isinstance(result, dict) and "error" in result:
exception_module = import_module(result["error"]["module"])
exception_class: type[Exception] = getattr(exception_module, result["error"]["class"])
exception = exception_class(*result["error"]["args"])
raise exception

if isinstance(result, Exception):
raise result
return result

def _validate_arguments(self, task_args: tuple, task_kwargs: dict):
Expand Down
16 changes: 16 additions & 0 deletions src/tests/test_concurrency_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from aiotaskq.exceptions import ConcurrencyTypeNotSupported
from aiotaskq.concurrency_manager import ConcurrencyManagerSingleton
from aiotaskq.interfaces import ConcurrencyType


def test_unsupported_concurrency_type():
Expand All @@ -18,3 +19,18 @@ def test_unsupported_concurrency_type():
assert (
str(error) == 'Concurrency type "some-incorrect-concurrency-type" is not yet supported.'
)


def test_singleton():
# When getting the concurrency_manager instance more than once
instance_1 = ConcurrencyManagerSingleton.get(
concurrency_type=ConcurrencyType.MULTIPROCESSING,
concurrency=4,
)
instance_2 = ConcurrencyManagerSingleton.get(
concurrency_type=ConcurrencyType.MULTIPROCESSING,
concurrency=4,
)

# Then the both instances should be the identical instance
assert instance_1 is instance_2
9 changes: 7 additions & 2 deletions src/tests/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ async def test_retry_as_per_task_definition(worker: "WorkerFixture", some_file:
# Then the task should be retried as many times as configured
with open(some_file, encoding="utf-8") as fi:
assert len(fi.readlines()) == 1 + 2, f"file: {fi.readlines()}" # First call + 2 retries
assert isinstance(exception, simple_app.SomeException)


@pytest.mark.asyncio
Expand Down Expand Up @@ -122,7 +123,7 @@ async def test_retry_as_per_task_call(
except simple_app.SomeException2 as e:
exception = e
finally:
assert exception is not None
assert isinstance(exception, simple_app.SomeException2)
if os.path.exists(some_file):
os.remove(some_file)

Expand All @@ -141,6 +142,8 @@ async def test_retry_as_per_task_call(
assert (
len(fi.readlines()) == 1 + retries_expected
) # First call + `retries_expected` retries
# And the task should fail with the expected exception
assert isinstance(exception, simple_app.SomeException2)


@pytest.mark.asyncio
Expand All @@ -156,7 +159,7 @@ async def test_no_retry_as_per_task_call(worker: "WorkerFixture", some_file: str
except simple_app.SomeException as e:
exception = e
finally:
assert exception is not None
assert isinstance(exception, simple_app.SomeException)
if os.path.exists(some_file):
os.remove(some_file)

Expand All @@ -176,6 +179,8 @@ async def test_no_retry_as_per_task_call(worker: "WorkerFixture", some_file: str
# Then the task should NOT be retried
with open(some_file, encoding="utf-8") as fi:
assert len(fi.readlines()) == 1
# And the task should fail with the expected exception
assert isinstance(exception , simple_app.SomeException)


@pytest.mark.asyncio
Expand Down

0 comments on commit 116da4e

Please sign in to comment.