diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 461e9740..777d131a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -34,6 +34,12 @@ repos: "black", ] + - repo: https://github.com/PyCQA/docformatter + rev: v1.7.5 + hooks: + - id: docformatter + args: [--in-place] + - repo: https://github.com/pre-commit/pre-commit-hooks rev: v4.5.0 hooks: diff --git a/examples/django/proj/settings.py b/examples/django/proj/settings.py index d6b2580d..e3f2bb2b 100644 --- a/examples/django/proj/settings.py +++ b/examples/django/proj/settings.py @@ -14,10 +14,7 @@ CELERY_ACCEPT_CONTENT = ["json"] CELERY_RESULT_BACKEND = "db+sqlite:///results.sqlite" CELERY_TASK_SERIALIZER = "json" - - -""" -Django settings for proj project. +"""Django settings for proj project. Generated by 'django-admin startproject' using Django 2.2.1. diff --git a/examples/django/proj/wsgi.py b/examples/django/proj/wsgi.py index d48b8110..3886f8f8 100644 --- a/examples/django/proj/wsgi.py +++ b/examples/django/proj/wsgi.py @@ -1,5 +1,4 @@ -""" -WSGI config for proj project. +"""WSGI config for proj project. This module contains the WSGI application used by Django's development server and any production WSGI deployments. It should expose a module-level variable @@ -11,7 +10,6 @@ that later delegates to the Django one. For example, you could introduce WSGI middleware here, or combine a Django application with an application of another framework. - """ import os diff --git a/src/pytest_celery/__init__.py b/src/pytest_celery/__init__.py index a134c125..6910c33e 100644 --- a/src/pytest_celery/__init__.py +++ b/src/pytest_celery/__init__.py @@ -1,6 +1,4 @@ -""" -pytest-celery a shim pytest plugin to enable celery.contrib.pytest -""" +"""Pytest-celery a shim pytest plugin to enable celery.contrib.pytest.""" # flake8: noqa diff --git a/src/pytest_celery/api/backend.py b/src/pytest_celery/api/backend.py index ae914bd5..9758410d 100644 --- a/src/pytest_celery/api/backend.py +++ b/src/pytest_celery/api/backend.py @@ -7,6 +7,13 @@ class CeleryTestBackend(CeleryTestNode): + """CeleryTestBackend is specialized node type for handling celery backends + nodes. It is used to encapsulate a backend instance. + + Responsibility Scope: + Handling backend specific requirements and configuration. + """ + @classmethod def default_config(cls) -> dict: return { @@ -23,6 +30,14 @@ def restart(self, reload_container: bool = True, force: bool = False) -> None: class CeleryBackendCluster(CeleryTestCluster): + """CeleryBackendCluster is a specialized cluster type for handling celery + backends. It is used to define which backend instances are available for + the test. + + Responsibility Scope: + Provude useful methods for managing a cluster of celery backends. + """ + def __init__(self, *backends: tuple[CeleryTestBackend | CeleryTestContainer]) -> None: super().__init__(*backends) diff --git a/src/pytest_celery/api/base.py b/src/pytest_celery/api/base.py index 9f47b194..9c248993 100644 --- a/src/pytest_celery/api/base.py +++ b/src/pytest_celery/api/base.py @@ -14,55 +14,92 @@ class CeleryTestNode: + """CeleryTestNode is the logical representation of a container instance. It + is used to provide a common interface for interacting with the container + regardless of the underlying implementation. + + Responsibility Scope: + The node's responsibility is to wrap the container and provide + useful methods for interacting with it. 
+ """ + def __init__(self, container: CeleryTestContainer, app: Celery = None) -> None: + """Setup the base components of a CeleryTestNode. + + Args: + container (CeleryTestContainer): Container to use for the node. + app (Celery, optional): Celery app. Defaults to None. + """ self._container = container self._app = app @property def container(self) -> CeleryTestContainer: + """Underlying container for the node.""" return self._container @property def app(self) -> Celery: + """Celery app for the node if available.""" return self._app - def __eq__(self, __value: object) -> bool: - if isinstance(__value, CeleryTestNode): + def __eq__(self, other: object) -> bool: + if isinstance(other, CeleryTestNode): return all( ( - self.container == __value.container, - self.app == __value.app, + self.container == other.container, + self.app == other.app, ) ) return False @classmethod def default_config(cls) -> dict: + """Default node configurations if not overridden by the user.""" return {} def ready(self) -> bool: + """Waits until the node is ready or raise an exception if it fails to + boot up.""" return self.container.ready() def config(self, *args: tuple, **kwargs: dict) -> dict: + """Compile the configurations required for Celery from this node.""" return self.container.celeryconfig def logs(self) -> str: + """Get the logs of the underlying container.""" return self.container.logs() def name(self) -> str: + """Get the name of this node.""" return self.container.name def hostname(self) -> str: + """Get the hostname of this node.""" return self.container.id[:12] def kill(self, signal: str | int = "SIGKILL", reload_container: bool = True) -> None: + """Kill the underlying container. + + Args: + signal (str | int, optional): Signal to send to the container. Defaults to "SIGKILL". + reload_container (bool, optional): Reload the container object after killing it. Defaults to True. + """ if self.container.status == "running": self.container.kill(signal=signal) if reload_container: self.container.reload() def restart(self, reload_container: bool = True, force: bool = False) -> None: + """Restart the underlying container. + + Args: + reload_container (bool, optional): Reload the container object after restarting it. Defaults to True. + force (bool, optional): Kill the container before restarting it. Defaults to False. + """ if force: + # Use SIGTERM to allow the container to gracefully shutdown self.kill(signal="SIGTERM", reload_container=reload_container) self.container.restart(timeout=CONTAINER_TIMEOUT) if reload_container: @@ -71,19 +108,41 @@ def restart(self, reload_container: bool = True, force: bool = False) -> None: self.app.set_current() def teardown(self) -> None: + """Teardown the node.""" self.container.teardown() def wait_for_log(self, log: str, message: str = "", timeout: int = RESULT_TIMEOUT) -> None: + """Wait for a log to appear in the container. + + Args: + log (str): Log to wait for. + message (str, optional): Message to display while waiting. Defaults to "". + timeout (int, optional): Timeout in seconds. Defaults to RESULT_TIMEOUT. + """ message = message or f"Waiting for worker container '{self.name()}' to log -> {log}" wait_for_callable(message=message, func=lambda: log in self.logs(), timeout=timeout) def assert_log_exists(self, log: str, message: str = "", timeout: int = RESULT_TIMEOUT) -> None: + """Assert that a log exists in the container. + + Args: + log (str): Log to assert. + message (str, optional): Message to display while waiting. Defaults to "". 
+ timeout (int, optional): Timeout in seconds. Defaults to RESULT_TIMEOUT. + """ try: self.wait_for_log(log, message, timeout) except pytest_docker_tools.exceptions.TimeoutError: assert False, f"Worker container '{self.name()}' did not log -> {log} within {timeout} seconds" def assert_log_does_not_exist(self, log: str, message: str = "", timeout: int = 1) -> None: + """Assert that a log does not exist in the container. + + Args: + log (str): Log to assert. + message (str, optional): Message to display while waiting. Defaults to "". + timeout (int, optional): Timeout in seconds. Defaults to 1. + """ message = message or f"Waiting for worker container '{self.name()}' to not log -> {log}" try: self.wait_for_log(log, message, timeout) @@ -93,7 +152,24 @@ def assert_log_does_not_exist(self, log: str, message: str = "", timeout: int = class CeleryTestCluster: + """CeleryTestCluster is a collection of CeleryTestNodes. It is used to + collect the test nodes into a single object for easier management. + + Responsibility Scope: + The cluster's responsibility is to define which nodes will be used for + the test. + """ + def __init__(self, *nodes: tuple[CeleryTestNode | CeleryTestContainer]) -> None: + """Setup the base components of a CeleryTestCluster. + + Args: + *nodes (tuple[CeleryTestNode | CeleryTestContainer]): Nodes to use for the cluster. + + Raises: + ValueError: At least one node is required. + TypeError: All nodes must be CeleryTestNode or CeleryTestContainer + """ if not nodes: raise ValueError("At least one node is required") if len(nodes) == 1 and isinstance(nodes[0], list): @@ -105,10 +181,16 @@ def __init__(self, *nodes: tuple[CeleryTestNode | CeleryTestContainer]) -> None: @property def nodes(self) -> tuple[CeleryTestNode]: + """Get the nodes of the cluster.""" return self._nodes @nodes.setter def nodes(self, nodes: tuple[CeleryTestNode | CeleryTestContainer]) -> None: + """Set the nodes of the cluster. + + Args: + nodes (tuple[CeleryTestNode | CeleryTestContainer]): Nodes to use for the cluster. + """ self._nodes = self._set_nodes(*nodes) # type: ignore def __iter__(self) -> Iterator[CeleryTestNode]: @@ -120,15 +202,16 @@ def __getitem__(self, index: Any) -> CeleryTestNode: def __len__(self) -> int: return len(self.nodes) - def __eq__(self, __value: object) -> bool: - if isinstance(__value, CeleryTestCluster): + def __eq__(self, other: object) -> bool: + if isinstance(other, CeleryTestCluster): for node in self: - if node not in __value: + if node not in other: return False return False @classmethod def default_config(cls) -> dict: + """Default cluster configurations if not overridden by the user.""" return {} @abstractmethod @@ -137,6 +220,15 @@ def _set_nodes( *nodes: tuple[CeleryTestNode | CeleryTestContainer], node_cls: type[CeleryTestNode] = CeleryTestNode, ) -> tuple[CeleryTestNode]: + """Set the nodes of the cluster. + + Args: + *nodes (tuple[CeleryTestNode | CeleryTestContainer]): Nodes to use for the cluster. + node_cls (type[CeleryTestNode], optional): Node class to use. Defaults to CeleryTestNode. + + Returns: + tuple[CeleryTestNode]: Nodes to use for the cluster. 
+ """ return tuple( node_cls(node) if isinstance( @@ -148,9 +240,12 @@ def _set_nodes( ) # type: ignore def ready(self) -> bool: + """Waits until the cluster is ready or raise an exception if any of the + nodes fail to boot up.""" return all(node.ready() for node in self) def config(self, *args: tuple, **kwargs: dict) -> dict: + """Compile the configurations required for Celery from this cluster.""" config = [node.container.celeryconfig for node in self] return { "urls": [c["url"] for c in config], @@ -158,6 +253,6 @@ def config(self, *args: tuple, **kwargs: dict) -> dict: } def teardown(self) -> None: - # Do not need to call teardown on the nodes - # but only tear down self - pass + """Teardown the cluster.""" + # Nodes teardown themselves, so we just need to clear the cluster + # if there is any cleanup to do diff --git a/src/pytest_celery/api/broker.py b/src/pytest_celery/api/broker.py index 539bd255..52054f5b 100644 --- a/src/pytest_celery/api/broker.py +++ b/src/pytest_celery/api/broker.py @@ -7,6 +7,13 @@ class CeleryTestBroker(CeleryTestNode): + """CeleryTestBroker is specialized node type for handling celery brokers + nodes. It is used to encapsulate a broker instance. + + Responsibility Scope: + Handling broker specific requirements and configuration. + """ + @classmethod def default_config(cls) -> dict: return { @@ -23,6 +30,14 @@ def restart(self, reload_container: bool = True, force: bool = False) -> None: class CeleryBrokerCluster(CeleryTestCluster): + """CeleryBrokerCluster is a specialized cluster type for handling celery + brokers. It is used to define which broker instances are available for the + test. + + Responsibility Scope: + Provude useful methods for managing a cluster of celery brokers. + """ + def __init__(self, *brokers: tuple[CeleryTestBroker | CeleryTestContainer]) -> None: super().__init__(*brokers) diff --git a/src/pytest_celery/api/container.py b/src/pytest_celery/api/container.py index 75456ac8..2c3abb18 100644 --- a/src/pytest_celery/api/container.py +++ b/src/pytest_celery/api/container.py @@ -9,29 +9,91 @@ class CeleryTestContainer(wrappers.Container): + """This is an extension of pytest_docker_tools.wrappers.Container, adding + improved control over the container lifecycle. + + Responsibility Scope: + Provide useful methods for managing a container instance. + """ + @property def client(self) -> Any: + """Provides an API client for interacting with the container, if + available. + + Subclasses should implement this to return an instance of the client + specific to the service running in the container. + + Raises: + NotImplementedError: There is not client available by default. + + Returns: + Any: Client instance. + """ raise NotImplementedError("CeleryTestContainer.client") @property def celeryconfig(self) -> dict: + """Each container is responsible for providing the configuration values + required for Celery. This property should be implemented to return the + configuration values for the specific container. + + Raises: + NotImplementedError: There is no config available by default. + + Returns: + dict: Configuration values required for Celery. + """ raise NotImplementedError("CeleryTestContainer.celeryconfig") @classmethod - def command(cls, *args: str) -> list: + def command(cls, *args: str) -> list[str]: + """Override the CMD instruction in the Dockerfile. + + This method should be overridden in derived classes to provide the + specific command and its arguments required to start the container. 
+ + Args: + *args (str): Additional command-line arguments. + + Raises: + NotImplementedError: Rely on the Dockerfile if not set otherwise by default. + + Returns: + list[str]: A list containing the command to run in the container as + the first element, followed by the command-line arguments. + """ + # To be used with pytest_docker_tools.container using the command # kwarg with the class method as value # e.g. command=MyContainer.command() raise NotImplementedError("CeleryTestContainer.command") def teardown(self) -> None: - pass + """Teardown the container instance.""" @property def ready_prompt(self) -> str | None: + """A log string that indicates the container has finished starting up + and is ready to use. + + Returns: + str | None: A string to wait for or None for no wait. Defaults to None. + """ return None def _wait_port(self, port: str) -> int: + """Blocks until the specified port is ready. + + Args: + port (str): Port to wait for. + + Raises: + ValueError: Port cannot be empty. + + Returns: + int: Port number. + """ if not port: raise ValueError("Port cannot be empty") @@ -42,6 +104,16 @@ def _wait_port(self, port: str) -> int: @retry(pytest_docker_tools.exceptions.TimeoutError, delay=10, tries=3) def _wait_ready(self, timeout: int = 30) -> bool: + """Wait for the container to be ready by polling the logs for the + readiness prompt. If no prompt is set, the container is considered + ready as soon as its live logs are accessible. + + Args: + timeout (int, optional): Timeout in seconds. Defaults to 30. + + Returns: + bool: True if ready, False otherwise. + """ if self.ready_prompt is not None: if self.ready_prompt not in self.logs(): wait_for_callable( @@ -57,6 +129,16 @@ def _wait_ready(self, timeout: int = 30) -> bool: return True def ready(self) -> bool: + """Override the default ready() method to wait for the container to be + using our waiting logic on top of the default implementation. + + When this method returns False, other attempts will be made until the container + is ready or stop if other conditions do not allow for it to be ready. + The underlying implementation of pytest_docker_tools is currently responsible for error raising. + + Returns: + bool: True if ready, False otherwise. + """ if super().ready(): return self._wait_ready() else: diff --git a/src/pytest_celery/api/setup.py b/src/pytest_celery/api/setup.py index 33197636..1807eef2 100644 --- a/src/pytest_celery/api/setup.py +++ b/src/pytest_celery/api/setup.py @@ -14,6 +14,17 @@ class CeleryTestSetup: + """The test setup is the main entrypoint for accessing the celery + architecture from the test. It is the glue that holds all of the relevant + entities of the specific test case environment. + + Each test case will have its own test setup instance, which is created for the + test case by the plugin and is configured for the specific run and its given configurations. + + Responsibility Scope: + Provide useful access to the celery architecture from the test. + """ + def __init__( self, worker_cluster: CeleryWorkerCluster, @@ -21,11 +32,20 @@ def __init__( backend_cluster: CeleryBackendCluster, app: Celery = None, ): + """Setup the base components of a setup. + + Args: + worker_cluster (CeleryWorkerCluster): Precorfigured worker cluster. + broker_cluster (CeleryBrokerCluster): Precorfigured broker cluster. + backend_cluster (CeleryBackendCluster): Precorfigured backend cluster. + app (Celery, optional): Celery app configured for all of the nodes. Defaults to None. 
+ """ self._worker_cluster = worker_cluster self._broker_cluster = broker_cluster self._backend_cluster = backend_cluster self._app = app + # Special internal ping task, does not conflict with user "ping" tasks self.ping = ping def __len__(self) -> int: @@ -33,38 +53,55 @@ def __len__(self) -> int: @property def app(self) -> Celery: + """The celery app configured for all of the nodes.""" return self._app @property def backend_cluster(self) -> CeleryBackendCluster: + """The backend cluster of this setup.""" return self._backend_cluster @property def backend(self) -> CeleryTestBackend: + """The first backend node of the backend cluster.""" return self._backend_cluster[0] # type: ignore @property def broker_cluster(self) -> CeleryBrokerCluster: + """The broker cluster of this setup.""" return self._broker_cluster @property def broker(self) -> CeleryTestBroker: + """The first broker node of the broker cluster.""" return self._broker_cluster[0] # type: ignore @property def worker_cluster(self) -> CeleryWorkerCluster: + """The worker cluster of this setup.""" return self._worker_cluster @property def worker(self) -> CeleryTestWorker: + """The first worker node of the worker cluster.""" return self._worker_cluster[0] # type: ignore @classmethod def name(cls) -> str: + # TODO: Possibly not needed/required refactoring return DEFAULT_WORKER_APP_NAME @classmethod def config(cls, celery_worker_cluster_config: dict) -> dict: + """Creates a configuration dict to be used by app.config_from_object(). + The configuration is compiled from all of the nodes in the setup. + + Args: + celery_worker_cluster_config (dict): The configuration of the worker cluster. + + Returns: + dict: Celery-aware configuration dict. + """ if not celery_worker_cluster_config: raise ValueError("celery_worker_cluster_config is empty") @@ -79,11 +116,23 @@ def config(cls, celery_worker_cluster_config: dict) -> dict: @classmethod def update_app_config(cls, app: Celery) -> None: - # Use app.conf.update() to update the app config - pass + """Hook for updating the app configuration in a subclass. + + Args: + app (Celery): App after initial configuration. + """ @classmethod def create_setup_app(cls, celery_setup_config: dict, celery_setup_app_name: str) -> Celery: + """Creates a celery app for the setup. + + Args: + celery_setup_config (dict): Celery configuration dict. + celery_setup_app_name (str): Celery app name. + + Returns: + Celery: Celery app configured for this setup. 
+ """ if celery_setup_config is None: raise ValueError("celery_setup_config is None") @@ -97,42 +146,105 @@ def create_setup_app(cls, celery_setup_config: dict, celery_setup_app_name: str) return app def chords_allowed(self) -> bool: + # TODO: Possibly a not relevant try: self.app.backend.ensure_chords_allowed() except NotImplementedError: return False + # TODO: Possibly a bug if any([v.startswith("4.") for v in self.worker_cluster.versions]): return False return True def teardown(self) -> None: - pass + """Teardown the setup.""" def ready(self, ping: bool = False, control: bool = False, docker: bool = True) -> bool: - ready = True - - if docker and ready: - if self.broker_cluster: - ready = ready and self.broker_cluster.ready() - if self.backend_cluster: - ready = ready and self.backend_cluster.ready() - if self.worker_cluster: - ready = ready and self.worker_cluster.ready() - - if control and ready: - r = self.app.control.ping() - ready = ready and all([all([res["ok"] == "pong" for _, res in response.items()]) for response in r]) - - if ping and ready: - # TODO: ignore mypy globally for type overriding - worker: CeleryTestWorker - for worker in self.worker_cluster: # type: ignore - res = self.ping.s().apply_async(queue=worker.worker_queue) - ready = ready and res.get(timeout=RESULT_TIMEOUT) == "pong" - - # Set app for all nodes + """Check if the setup is ready for testing. All of the confirmations + are optional. + + Warning: + Enabling additional confirmations may hurt performance. + Disabling all confirmations may result in false positive results. + Use with caution. + + Args: + ping (bool, optional): Confirm via ping task. Defaults to False. + control (bool, optional): Confirm via ping control. Defaults to False. + docker (bool, optional): Confirm via docker container status. Defaults to True. + + Returns: + bool: True if the setup is ready for testing (all required confirmations passed). + """ + ready = ( + self._is_task_ping_ready(ping) and self._is_control_ping_ready(control) and self._is_docker_ready(docker) + ) + + if ready: + self._set_app_for_all_nodes() + + return ready + + def _is_docker_ready(self, docker: bool) -> bool: + """Check if the node's containers are ready. + + Args: + docker (bool): Flag to enable docker readiness check. + + Returns: + bool: True if the node's containers are ready, False otherwise. + """ + if not docker: + return True + + return ( + (not self.broker_cluster or self.broker_cluster.ready()) + and (not self.backend_cluster or self.backend_cluster.ready()) + and (not self.worker_cluster or self.worker_cluster.ready()) + ) + + def _is_control_ping_ready(self, control: bool) -> bool: + """Check if worker nodes respond to control ping. + + Args: + control (bool): Flag to enable control ping check. + + Returns: + bool: True if control pings are successful, False otherwise. + """ + if not control: + return True + + responses = self.app.control.ping() + return all([all([res["ok"] == "pong" for _, res in response.items()]) for response in responses]) + + def _is_task_ping_ready(self, ping: bool) -> bool: + """Check if worker nodes respond to ping task. + + Args: + ping (bool): Flag to enable ping task check. + + Returns: + bool: True if ping tasks are successful, False otherwise. 
+ """ + if not ping: + return True + + worker: CeleryTestWorker + for worker in self.worker_cluster: # type: ignore + res = self.ping.s().apply_async(queue=worker.worker_queue) + if res.get(timeout=RESULT_TIMEOUT) != "pong": + return False + return True + + def _set_app_for_all_nodes(self) -> None: + """Set the app instance for all nodes in the setup. + + This ensures each node has a reference to the centralized Celery + app instance. + """ nodes: tuple = tuple() if self.broker_cluster: nodes += self.broker_cluster.nodes @@ -140,7 +252,6 @@ def ready(self, ping: bool = False, control: bool = False, docker: bool = True) nodes += self.backend_cluster.nodes if self.worker_cluster: nodes += self.worker_cluster.nodes + for node in nodes: node._app = self.app - - return ready diff --git a/src/pytest_celery/api/worker.py b/src/pytest_celery/api/worker.py index 2804a586..e8c75d0a 100644 --- a/src/pytest_celery/api/worker.py +++ b/src/pytest_celery/api/worker.py @@ -11,7 +11,20 @@ class CeleryTestWorker(CeleryTestNode): + """CeleryTestWorker is specialized node type for handling celery worker + nodes. It is used to encapsulate a worker instance. + + Responsibility Scope: + Managing a celery worker. + """ + def __init__(self, container: CeleryTestContainer, app: Celery): + """A celery worker node must be initialized with a celery app. + + Args: + container (CeleryTestContainer): Container to use for the node. + app (Celery, optional): Celery app to be accessed from the tests. + """ super().__init__(container, app) # Helps with autocomplete in the IDE @@ -19,21 +32,26 @@ def __init__(self, container: CeleryTestContainer, app: Celery): @property def version(self) -> str: + """Celery version of this worker node.""" return self.container.version() @property def log_level(self) -> str: + """Celery log level of this worker node.""" return self.container.log_level() @property def worker_name(self) -> str: + """Celery test worker node name.""" return self.container.worker_name() @property def worker_queue(self) -> str: + """Celery queue for this worker node.""" return self.container.worker_queue() def hostname(self) -> str: + """Hostname of the worker node.""" return f"{self.worker_name}@{super().hostname()}" def get_running_processes_info( @@ -41,6 +59,28 @@ def get_running_processes_info( columns: list[str] | None = None, filters: dict[str, str] | None = None, ) -> list[dict]: + """Get running processes info on the container of this node. + + Possible columns: + - pid + - name + - username + - cmdline + - cpu_percent + - memory_percent + - create_time + + Args: + columns (list[str] | None, optional): Columns to query. Defaults to None (all). + filters (dict[str, str] | None, optional): Filters to apply. Defaults to None. + + Raises: + RuntimeError: If the command fails. + + Returns: + list[dict]: List of processes info per requested columns. + """ + # Use special vendors/worker/content/utils.py module exit_code, output = self.container.exec_run( f'python -c "from utils import get_running_processes_info; print(get_running_processes_info({columns!r}))"' ) @@ -58,6 +98,14 @@ def get_running_processes_info( class CeleryWorkerCluster(CeleryTestCluster): + """CeleryWorkerCluster is a specialized cluster type for handling celery + workers. It is used to define which worker instances are available for the + test. + + Responsibility Scope: + Provude useful methods for managing a cluster of celery workers. 
+ """ + def __init__(self, *workers: tuple[CeleryTestWorker | CeleryTestContainer]) -> None: super().__init__(*workers) @@ -70,4 +118,5 @@ def _set_nodes( @property def versions(self) -> set[str]: + """Celery versions of all workers in this cluster.""" return {worker.version for worker in self} # type: ignore diff --git a/src/pytest_celery/defaults.py b/src/pytest_celery/defaults.py index 35e7429d..b67947a5 100644 --- a/src/pytest_celery/defaults.py +++ b/src/pytest_celery/defaults.py @@ -16,25 +16,17 @@ from pytest_celery.vendors.worker.defaults import CELERY_SETUP_WORKER from pytest_celery.vendors.worker.defaults import * -########## -# Fixtures -########## - -CELERY_SETUP = "celery_setup" -CELERY_WORKER = "celery_worker" -CELERY_WORKER_CLUSTER = "celery_worker_cluster" -CELERY_BACKEND = "celery_backend" -CELERY_BACKEND_CLUSTER = "celery_backend_cluster" -CELERY_BROKER = "celery_broker" -CELERY_BROKER_CLUSTER = "celery_broker_cluster" +#################################################################################### +# Automatic components +#################################################################################### -###################### -# Fixtures collections -###################### +# The following collections are used to parametrize the user's tests +# according to the components they need to run against. By default, +# all possible combinations are used. -# These collections define which components are used by -# pytest-celery by default. If not specified otherwise, the -# user's tests will have a matrix of all possible combinations automatically +# When a new component is added to the corresponding collection it +# will automatically add it to the parametrization of every (relevant) test IMPLICITLY! +# Tests that do not rely on default parametrization will not be affected. ALL_CELERY_WORKERS = (CELERY_SETUP_WORKER,) ALL_CELERY_BACKENDS = ( @@ -46,12 +38,34 @@ CELERY_RABBITMQ_BROKER, ) -########## +#################################################################################### +# Fixtures +#################################################################################### + +# These fixtures are used by pytest-celery plugin to setup the environment for the user's tests. +# They function as the main interface into the test environment's components. +# They are directly affected by the automatic collections: ALL_CELERY_WORKERS, ALL_CELERY_BACKENDS, ALL_CELERY_BROKERS. + +CELERY_SETUP = "celery_setup" +CELERY_WORKER = "celery_worker" +CELERY_WORKER_CLUSTER = "celery_worker_cluster" +CELERY_BACKEND = "celery_backend" +CELERY_BACKEND_CLUSTER = "celery_backend_cluster" +CELERY_BROKER = "celery_broker" +CELERY_BROKER_CLUSTER = "celery_broker_cluster" + +#################################################################################### # Docker -########## +#################################################################################### + +# Default timeouts for Docker-related operations in seconds. +# Defines how long to wait for a Docker container to get ready. CONTAINER_TIMEOUT = 60 -RESULT_TIMEOUT = 60 +# Specifies the maximum duration to wait for a task result. +# It's primarily used when testing Celery tasks to ensure they complete within a reasonable timeframe. +# and is set for a recommended value for most use cases. 
+RESULT_TIMEOUT = 60 default_pytest_celery_network = network() diff --git a/src/pytest_celery/fixtures/backend.py b/src/pytest_celery/fixtures/backend.py index 72ffc542..1fbe5963 100644 --- a/src/pytest_celery/fixtures/backend.py +++ b/src/pytest_celery/fixtures/backend.py @@ -12,6 +12,12 @@ @pytest.fixture(params=ALL_CELERY_BACKENDS) def celery_backend(request: pytest.FixtureRequest) -> CeleryTestBackend: # type: ignore + """Parameterized fixture for all supported celery backends. Responsible for + tearing down the node. + + This fixture will add parametrization to the test function, so that + the test will be executed for each supported celery backend. + """ backend: CeleryTestBackend = request.getfixturevalue(request.param) yield backend backend.teardown() @@ -19,6 +25,17 @@ def celery_backend(request: pytest.FixtureRequest) -> CeleryTestBackend: # type @pytest.fixture def celery_backend_cluster(celery_backend: CeleryTestBackend) -> CeleryBackendCluster: # type: ignore + """Defines the cluster of backend nodes for the test. Responsible for + tearing down the cluster. + + To disable the cluster, override this fixture and return None. + + Args: + celery_backend (CeleryTestBackend): Parameterized fixture for all supported celery backends. + + Returns: + CeleryBackendCluster: Single node cluster for all supported celery backends. + """ cluster = CeleryBackendCluster(celery_backend) # type: ignore yield cluster cluster.teardown() @@ -26,6 +43,7 @@ def celery_backend_cluster(celery_backend: CeleryTestBackend) -> CeleryBackendCl @pytest.fixture def celery_backend_cluster_config(request: pytest.FixtureRequest) -> dict | None: + """Attempts to compile the celery configuration from the cluster.""" try: use_default_config = pytest.fail.Exception cluster: CeleryBackendCluster = request.getfixturevalue(CELERY_BACKEND_CLUSTER) diff --git a/src/pytest_celery/fixtures/broker.py b/src/pytest_celery/fixtures/broker.py index 5ff036df..906866d4 100644 --- a/src/pytest_celery/fixtures/broker.py +++ b/src/pytest_celery/fixtures/broker.py @@ -12,6 +12,12 @@ @pytest.fixture(params=ALL_CELERY_BROKERS) def celery_broker(request: pytest.FixtureRequest) -> CeleryTestBroker: # type: ignore + """Parameterized fixture for all supported celery brokers. Responsible for + tearing down the node. + + This fixture will add parametrization to the test function, so that + the test will be executed for each supported celery broker. + """ broker: CeleryTestBroker = request.getfixturevalue(request.param) yield broker broker.teardown() @@ -19,6 +25,18 @@ def celery_broker(request: pytest.FixtureRequest) -> CeleryTestBroker: # type: @pytest.fixture def celery_broker_cluster(celery_broker: CeleryTestBroker) -> CeleryBrokerCluster: # type: ignore + """Defines the cluster of broker nodes for the test. Responsible for + tearing down the cluster. + + It is not recommended to disable the broker cluster, but it can be done by + overriding this fixture and returning None. + + Args: + celery_broker (CeleryTestBroker): Parameterized fixture for all supported celery brokers. + + Returns: + CeleryBrokerCluster: Single node cluster for all supported celery brokers. 
+ """ cluster = CeleryBrokerCluster(celery_broker) # type: ignore yield cluster cluster.teardown() @@ -26,6 +44,7 @@ def celery_broker_cluster(celery_broker: CeleryTestBroker) -> CeleryBrokerCluste @pytest.fixture def celery_broker_cluster_config(request: pytest.FixtureRequest) -> dict | None: + """Attempts to compile the celery configuration from the cluster.""" try: use_default_config = pytest.fail.Exception cluster: CeleryBrokerCluster = request.getfixturevalue(CELERY_BROKER_CLUSTER) diff --git a/src/pytest_celery/fixtures/setup.py b/src/pytest_celery/fixtures/setup.py index 261097c6..11019044 100644 --- a/src/pytest_celery/fixtures/setup.py +++ b/src/pytest_celery/fixtures/setup.py @@ -13,6 +13,7 @@ @pytest.fixture def celery_setup_cls() -> type[CeleryTestSetup]: # type: ignore + """The setup class to use for the test.""" return CeleryTestSetup @@ -24,12 +25,24 @@ def celery_setup( # type: ignore celery_backend_cluster: CeleryBackendCluster, celery_setup_app: Celery, ) -> CeleryTestSetup: + """Prepares a celery setup ready for testing. + + This fixture provides the entry point for a test. + This fixture loads all components and immediately prepares the environment for testing. + + Example: + >>> def test_my_celery_setup(celery_setup: CeleryTestSetup): + ... assert celery_setup.ready() + ... # do some testing + """ setup = celery_setup_cls( worker_cluster=celery_worker_cluster, broker_cluster=celery_broker_cluster, backend_cluster=celery_backend_cluster, app=celery_setup_app, ) + + # Shallow ready check for performance reasons assert setup.ready( ping=False, control=False, @@ -41,6 +54,7 @@ def celery_setup( # type: ignore @pytest.fixture def celery_setup_name(celery_setup_cls: type[CeleryTestSetup]) -> str: # type: ignore + """Fixture interface to the API.""" yield celery_setup_cls.name() @@ -49,6 +63,7 @@ def celery_setup_config( celery_setup_cls: type[CeleryTestSetup], celery_worker_cluster_config: dict, ) -> dict: + """Fixture interface to the API.""" yield celery_setup_cls.config( celery_worker_cluster_config=celery_worker_cluster_config, ) @@ -60,6 +75,7 @@ def celery_setup_app( celery_setup_config: dict, celery_setup_name: str, ) -> Celery: + """Fixture interface to the API.""" yield celery_setup_cls.create_setup_app( celery_setup_config=celery_setup_config, celery_setup_app_name=celery_setup_name, diff --git a/src/pytest_celery/fixtures/worker.py b/src/pytest_celery/fixtures/worker.py index 5e130e06..8aff7637 100644 --- a/src/pytest_celery/fixtures/worker.py +++ b/src/pytest_celery/fixtures/worker.py @@ -11,6 +11,13 @@ @pytest.fixture(params=ALL_CELERY_WORKERS) def celery_worker(request: pytest.FixtureRequest) -> CeleryTestWorker: # type: ignore + """Parameterized fixture for all supported celery workers. Responsible for + tearing down the node. + + This fixture will add parametrization to the test function, so that + the test will be executed for each supported celery worker. + """ + worker: CeleryTestWorker = request.getfixturevalue(request.param) yield worker worker.teardown() @@ -18,6 +25,17 @@ def celery_worker(request: pytest.FixtureRequest) -> CeleryTestWorker: # type: @pytest.fixture def celery_worker_cluster(celery_worker: CeleryTestWorker) -> CeleryWorkerCluster: # type: ignore + """Defines the cluster of worker nodes for the test. Responsible for + tearing down the cluster. + + To disable the cluster, override this fixture and return None. + + Args: + celery_worker (CeleryTestWorker): Parameterized fixture for all supported celery workers. 
+ + Returns: + CeleryWorkerCluster: Single node cluster for all supported celery workers. + """ cluster = CeleryWorkerCluster(celery_worker) # type: ignore yield cluster cluster.teardown() @@ -25,6 +43,10 @@ def celery_worker_cluster(celery_worker: CeleryTestWorker) -> CeleryWorkerCluste @pytest.fixture def celery_worker_cluster_config(celery_broker_cluster_config: dict, celery_backend_cluster_config: dict) -> dict: + """Combine the broker and backend cluster configurations. + + Additional configuration can be added. + """ return { "celery_broker_cluster_config": celery_broker_cluster_config, "celery_backend_cluster_config": celery_backend_cluster_config, diff --git a/src/pytest_celery/plugin.py b/src/pytest_celery/plugin.py index ffdc740d..f46b2362 100644 --- a/src/pytest_celery/plugin.py +++ b/src/pytest_celery/plugin.py @@ -1,5 +1,12 @@ -""" -pytest-celery entry point. +"""Pytest-celery entry point. + +The new pytest-celery plugin is a complete new infrastructure for +testing Celery applications. It enables the legacy testing +infrastructure for maintaining backwards compatibility with pytest- +celery < 1.0.0. + +The legacy testing infrastructure is maintained under the celery +project. """ # pytest-celery < 1.0.0 infrastructure diff --git a/src/pytest_celery/vendors/memcached/fixtures.py b/src/pytest_celery/vendors/memcached/fixtures.py index 4d76b0c8..526aa176 100644 --- a/src/pytest_celery/vendors/memcached/fixtures.py +++ b/src/pytest_celery/vendors/memcached/fixtures.py @@ -13,6 +13,12 @@ @pytest.fixture def celery_memcached_backend(default_memcached_backend: MemcachedContainer) -> MemcachedTestBackend: + """Creates a MemcachedTestBackend instance. Responsible for tearing down + the node. + + Args: + default_memcached_backend (MemcachedContainer): Instantiated MemcachedContainer. + """ backend = MemcachedTestBackend(default_memcached_backend) yield backend backend.teardown() diff --git a/src/pytest_celery/vendors/rabbitmq/fixtures.py b/src/pytest_celery/vendors/rabbitmq/fixtures.py index e39333e2..0dc9995a 100644 --- a/src/pytest_celery/vendors/rabbitmq/fixtures.py +++ b/src/pytest_celery/vendors/rabbitmq/fixtures.py @@ -13,6 +13,12 @@ @pytest.fixture def celery_rabbitmq_broker(default_rabbitmq_broker: RabbitMQContainer) -> RabbitMQTestBroker: + """Creates a RabbitMQTestBroker instance. Responsible for tearing down the + node. + + Args: + default_rabbitmq_broker (RabbitMQContainer): Instantiated RabbitMQContainer. + """ broker = RabbitMQTestBroker(default_rabbitmq_broker) yield broker broker.teardown() diff --git a/src/pytest_celery/vendors/redis/backend/fixtures.py b/src/pytest_celery/vendors/redis/backend/fixtures.py index ded5f3f4..32ddb0ff 100644 --- a/src/pytest_celery/vendors/redis/backend/fixtures.py +++ b/src/pytest_celery/vendors/redis/backend/fixtures.py @@ -13,6 +13,12 @@ @pytest.fixture def celery_redis_backend(default_redis_backend: RedisContainer) -> RedisTestBackend: + """Creates a RedisTestBackend instance. Responsible for tearing down the + node. + + Args: + default_redis_backend (RedisContainer): Instantiated RedisContainer. 
+ """ backend = RedisTestBackend(default_redis_backend) yield backend backend.teardown() diff --git a/src/pytest_celery/vendors/redis/broker/fixtures.py b/src/pytest_celery/vendors/redis/broker/fixtures.py index 41fcc042..f3fd29f7 100644 --- a/src/pytest_celery/vendors/redis/broker/fixtures.py +++ b/src/pytest_celery/vendors/redis/broker/fixtures.py @@ -13,6 +13,12 @@ @pytest.fixture def celery_redis_broker(default_redis_broker: RedisContainer) -> RedisTestBroker: + """Creates a RedisTestBroker instance. Responsible for tearing down the + node. + + Args: + default_redis_broker (RedisContainer): Instantiated RedisContainer. + """ broker = RedisTestBroker(default_redis_broker) yield broker broker.teardown() diff --git a/src/pytest_celery/vendors/redis/container.py b/src/pytest_celery/vendors/redis/container.py index 92518901..bdeab323 100644 --- a/src/pytest_celery/vendors/redis/container.py +++ b/src/pytest_celery/vendors/redis/container.py @@ -29,7 +29,7 @@ def celeryconfig(self) -> dict: } @classmethod - def command(cls, *args: str) -> list: + def command(cls, *args: str) -> list[str]: return ["redis-server", *args] @property diff --git a/src/pytest_celery/vendors/worker/container.py b/src/pytest_celery/vendors/worker/container.py index fc5f6a6e..4ad5ffe5 100644 --- a/src/pytest_celery/vendors/worker/container.py +++ b/src/pytest_celery/vendors/worker/container.py @@ -14,6 +14,18 @@ class CeleryWorkerContainer(CeleryTestContainer): + """This is the base class for all Celery worker containers. It is + preconfigured for a built-in Celery worker image and should be customized + for your own worker image. + + The purpose of this class is manupulating the container volume and + configurations to warm up the worker container according to the test case requirements. + + Responsibility Scope: + Prepare the worker container with the required filesystem, configurations and + dependencies of your project. + """ + def _wait_port(self, port: str) -> int: raise NotImplementedError @@ -39,28 +51,42 @@ def worker_queue(cls) -> str: @classmethod def app_module(cls) -> ModuleType: + """A preconfigured module that contains the Celery app instance. + + The module is manipulated at runtime to inject the required + configurations from the test case. + """ from pytest_celery.vendors.worker.content import app return app @classmethod def utils_module(cls) -> ModuleType: + """A utility helper module for running python code in the worker + container context.""" from pytest_celery.vendors.worker.content import utils return utils @classmethod def tasks_modules(cls) -> set: - from pytest_celery.vendors.worker import tasks + """Tasks modules.""" + from pytest_celery.vendors.worker import tasks as default_tasks - return {tasks} + return {default_tasks} @classmethod def signals_modules(cls) -> set: + """Signals handlers modules. + + This is an optional feature that can be used to inject signals + handlers that needs to in the context of the worker container. + """ return set() @classmethod def buildargs(cls) -> dict: + """Build arguments for the built-in worker image.""" return { "CELERY_VERSION": cls.version(), "CELERY_LOG_LEVEL": cls.log_level(), @@ -70,6 +96,17 @@ def buildargs(cls) -> dict: @classmethod def env(cls, celery_worker_cluster_config: dict, initial: dict | None = None) -> dict: + """Defines the environment variables for the worker container. + + See more: pytest_docker_tools.container() + + Args: + celery_worker_cluster_config (dict): Environment variables to set. 
+ initial (dict | None, optional): Additional variables. Defaults to None. + + Returns: + dict: Environment variables set for the worker container from the test case. + """ env = initial or {} env = {**DEFAULT_WORKER_ENV.copy(), **env} @@ -96,6 +133,20 @@ def initial_content( app_module: ModuleType | None = None, utils_module: ModuleType | None = None, ) -> dict: + """Defines the initial content of the worker container. + + See more: pytest_docker_tools.volume() + + Args: + worker_tasks (set | None, optional): Set of tasks modules. Defaults to None. + worker_signals (set | None, optional): Set of signals handlers modules. Defaults to None. + worker_app (Celery | None, optional): Celery app instance. Defaults to None. + app_module (ModuleType | None, optional): app module. Defaults to None. + utils_module (ModuleType | None, optional): utils module. Defaults to None. + + Returns: + dict: Custome volume content for the worker container. + """ if app_module is None: app_module = cls.app_module() diff --git a/src/pytest_celery/vendors/worker/content/app.py b/src/pytest_celery/vendors/worker/content/app.py index c13fe927..0cf8203d 100644 --- a/src/pytest_celery/vendors/worker/content/app.py +++ b/src/pytest_celery/vendors/worker/content/app.py @@ -1,4 +1,4 @@ -""" Template for Celery worker application. """ +"""Template for Celery worker application.""" from __future__ import annotations import json diff --git a/src/pytest_celery/vendors/worker/content/utils.py b/src/pytest_celery/vendors/worker/content/utils.py index 16cc8625..f784aa1e 100644 --- a/src/pytest_celery/vendors/worker/content/utils.py +++ b/src/pytest_celery/vendors/worker/content/utils.py @@ -6,6 +6,7 @@ def get_running_processes_info(columns: list[str] | None = None) -> str: + """Get information about running processes using psutil.""" if not columns: columns = [ "pid", diff --git a/src/pytest_celery/vendors/worker/fixtures.py b/src/pytest_celery/vendors/worker/fixtures.py index 33069d6c..8f008ac5 100644 --- a/src/pytest_celery/vendors/worker/fixtures.py +++ b/src/pytest_celery/vendors/worker/fixtures.py @@ -24,6 +24,14 @@ def celery_setup_worker( default_worker_container: CeleryWorkerContainer, default_worker_app: Celery, ) -> CeleryTestWorker: + """Creates a CeleryTestWorker instance. Responsible for tearing down the + node. + + Args: + default_worker_cls (type[CeleryTestWorker]): Interface class. + default_worker_container (CeleryWorkerContainer): Instantiated CeleryWorkerContainer. + default_worker_app (Celery): Celery app instance. + """ worker = default_worker_cls( container=default_worker_container, app=default_worker_app, diff --git a/src/pytest_celery/vendors/worker/tasks.py b/src/pytest_celery/vendors/worker/tasks.py index 4dfd3802..f23bdb0f 100644 --- a/src/pytest_celery/vendors/worker/tasks.py +++ b/src/pytest_celery/vendors/worker/tasks.py @@ -3,4 +3,8 @@ @shared_task def ping() -> str: + """Pytest-celery internal task. + + Used to check if the worker is up and running. + """ return "pong" diff --git a/src/pytest_celery/vendors/worker/volume.py b/src/pytest_celery/vendors/worker/volume.py index 1aa8f449..2a810861 100644 --- a/src/pytest_celery/vendors/worker/volume.py +++ b/src/pytest_celery/vendors/worker/volume.py @@ -11,21 +11,68 @@ class WorkerInitialContent: + """This class is responsible for generating the initial content for the + worker container volume. + + Responsibility Scope: + Prepare the worker container with the required filesystem, configurations and + dependencies for your project. 
+ """ + class Parser: + """Parser for the initial content of the worker container.""" + def imports_str(self, modules: set[ModuleType]) -> str: + """Parse the given modules and return a string with the import + statements. + + Args: + modules (set[ModuleType]): A set of modules to parse. + + Returns: + str: "from module import *" statements. + """ return "".join(f"from {module.__name__} import *\n" for module in modules) def imports_src(self, modules: set[ModuleType]) -> dict: + """Parse the given modules and return a dict with the source code + of the modules. + + Args: + modules (set[ModuleType]): A set of modules to parse. + + Returns: + dict: A dict with the source code of the modules. + """ src = dict() for module in modules: src[f"{module.__name__.replace('.', '/')}.py"] = inspect.getsource(module).encode() return src def app_name(self, name: str | None = None) -> str: + """Generates the Celery app initialization string. + + Args: + name (str | None, optional): The app name. Defaults to None. + + Returns: + str: app = Celery(name) + """ name = name or DEFAULT_WORKER_APP_NAME return f"app = Celery('{name}')" def config(self, app: Celery | None = None) -> str: + """Generates the Celery app configuration changes. + + Args: + app (Celery | None, optional): Celery app with possibly changed config. Defaults to None. + + Raises: + TypeError: If the app.conf.changes property is not a dict. + + Returns: + str: config = {key: value, ...} or config = None + """ app = app or Celery(DEFAULT_WORKER_APP_NAME) # Accessing the app.conf.changes.data property will trigger the PendingConfiguration to be resolved @@ -53,6 +100,14 @@ def __init__( app_module: ModuleType | None = None, utils_module: ModuleType | None = None, ) -> None: + """Creates an initial content for the worker container. Defaults to + built-in plugin-provided modules. + + Args: + app_module (ModuleType | None, optional): App module adjusted for parsing. Defaults to None. + utils_module (ModuleType | None, optional): Utils module with for running python code in the + worker container. Defaults to None. + """ self.parser = self.Parser() self._initial_content = { "__init__.py": b"", @@ -80,6 +135,7 @@ def __eq__(self, other: object) -> bool: ) def set_app_module(self, app_module: ModuleType | None = None) -> None: + """Injects an app module into the initial content.""" self._app_module_src: str | None if app_module: @@ -88,6 +144,7 @@ def set_app_module(self, app_module: ModuleType | None = None) -> None: self._app_module_src = None def set_utils_module(self, utils_module: ModuleType | None = None) -> None: + """Injects a utils module into the initial content.""" self._utils_module_src: str | None if utils_module: @@ -96,6 +153,12 @@ def set_utils_module(self, utils_module: ModuleType | None = None) -> None: self._utils_module_src = None def add_modules(self, name: str, modules: set[ModuleType]) -> None: + """Adds a set of modules to the initial content. + + Args: + name (str): Arbitrary unique name for the set of modules. + modules (set[ModuleType]): A set of modules to add. + """ if not name: raise ValueError("name cannot be empty") @@ -106,24 +169,49 @@ def add_modules(self, name: str, modules: set[ModuleType]) -> None: self._initial_content.update(self.parser.imports_src(modules)) def set_app_name(self, name: str | None = None) -> None: + """Sets the Celery app name. + + Args: + name (str | None, optional): The app name. Defaults to None. 
+ """ name = name or DEFAULT_WORKER_APP_NAME self._app = self.parser.app_name(name) def set_config_from_object(self, app: Celery | None = None) -> None: + """Sets the Celery app configuration from the given app. + + Args: + app (Celery | None, optional): Celery app with possibly changed config. Defaults to None. + """ self._config = self.parser.config(app) def generate(self) -> dict: + """Generates the initial content for the worker container. + + Returns: + dict: Initial content volume for the worker container. + """ initial_content = self._initial_content.copy() initial_content["app.py"] = self._generate_app_py(initial_content) initial_content["utils.py"] = self._generate_utils_py() return initial_content def _generate_app_py(self, initial_content: dict) -> bytes: + """Generates the app.py file for the worker container. + + Args: + initial_content (dict): The current initial content. + + Returns: + bytes: The generated app.py file. + """ if not self._app_module_src: - raise ValueError("Please set_app_module() before generating initial content") + raise ValueError("Please use set_app_module() to define the app module before generating initial content") if not initial_content["imports"]: - raise ValueError("Please add_modules() before generating initial content") + raise ValueError( + "Please use set_utils_module() to define the utils module before generating initial content" + ) _imports: dict | Any = initial_content.pop("imports") imports = "{%s}" % "}{".join(_imports.keys()) @@ -143,6 +231,11 @@ def _generate_app_py(self, initial_content: dict) -> bytes: return self._app_module_src.encode() def _generate_utils_py(self) -> bytes: + """Generates the utils.py file for the worker container. + + Returns: + bytes: The generated utils.py file. + """ if not self._utils_module_src: raise ValueError("Please set_utils_module() before generating initial content") diff --git a/tests/integration/api/test_setup.py b/tests/integration/api/test_setup.py index 3bb6d4f3..69e0c33c 100644 --- a/tests/integration/api/test_setup.py +++ b/tests/integration/api/test_setup.py @@ -1,5 +1,6 @@ from __future__ import annotations +import pytest from celery import Celery from pytest_celery import RESULT_TIMEOUT @@ -9,8 +10,21 @@ class test_celery_test_setup_integration: - def test_ready(self, celery_setup: CeleryTestSetup): - assert celery_setup.ready() + @pytest.mark.parametrize( + "confirmation", + [ + # Only ping + {"ping": True, "control": False, "docker": False}, + # Only control + {"ping": False, "control": True, "docker": False}, + # Only docker + {"ping": False, "control": False, "docker": True}, + # All + {"ping": True, "control": True, "docker": True}, + ], + ) + def test_ready(self, celery_setup: CeleryTestSetup, confirmation: dict): + assert celery_setup.ready(**confirmation) def test_worker_is_connected_to_backend(self, celery_setup: CeleryTestSetup): backend_urls = [ diff --git a/tests/unit/api/test_setup.py b/tests/unit/api/test_setup.py index f247b3ce..ebdb95fb 100644 --- a/tests/unit/api/test_setup.py +++ b/tests/unit/api/test_setup.py @@ -52,13 +52,18 @@ def test_create_setup_app_no_name(self, celery_setup: CeleryTestSetup, celery_se def test_teardown(self, celery_setup: CeleryTestSetup): celery_setup.teardown() - def test_default_ready_args(self, celery_setup: CeleryTestSetup): - assert celery_setup.ready() - - def test_ping_ready(self, celery_setup: CeleryTestSetup): - celery_setup.worker_cluster.nodes = tuple() - assert celery_setup.ready(ping=True, control=False, docker=False) - - def 
test_docker_ready(self, celery_setup: CeleryTestSetup): + @pytest.mark.parametrize( + "confirmation", + [ + {"ping": True, "docker": False}, + {"ping": False, "docker": True}, + {"ping": True, "docker": True}, + {"ping": False, "docker": False}, + ], + ) + def test_ready(self, celery_setup: CeleryTestSetup, confirmation: dict): celery_setup.worker_cluster.nodes = tuple() - assert celery_setup.ready(ping=False, control=False, docker=True) + assert celery_setup.ready( + **confirmation, + control=False, # Not supported in unit tests + )
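
A minimal usage sketch of the API documented in this patch, assuming the pytest-celery plugin is installed and provides the `celery_setup` fixture and `CeleryTestSetup.ready()` flags shown above; the test module and its assertions are illustrative only, not part of the patch:

    from __future__ import annotations

    from pytest_celery import CeleryTestSetup


    def test_my_celery_setup(celery_setup: CeleryTestSetup):
        # Docker status check only (the default, fastest confirmation);
        # enable ping/control for stronger but slower confirmation,
        # as described in CeleryTestSetup.ready().
        assert celery_setup.ready(ping=False, control=False, docker=True)

        # The first worker node's version is part of the cluster's version set.
        assert celery_setup.worker.version in celery_setup.worker_cluster.versions

The example relies only on names introduced in this diff (`celery_setup`, `ready()`, `worker`, `worker_cluster.versions`); any stricter readiness policy (e.g. `ping=True`) trades test speed for confidence, per the warning in `ready()`.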