diff --git a/syncmaster/backend/api/v1/runs.py b/syncmaster/backend/api/v1/runs.py index 10cd2495..ecb7fb93 100644 --- a/syncmaster/backend/api/v1/runs.py +++ b/syncmaster/backend/api/v1/runs.py @@ -21,8 +21,8 @@ ReadRunSchema, RunPageSchema, ) -from syncmaster.worker.config import celery -from syncmaster.worker.settings import get_worker_settings +from syncmaster.worker.settings import WorkerAppSettings +from syncmaster.worker.tasks import celery router = APIRouter(tags=["Runs"], responses=get_error_responses()) @@ -117,7 +117,7 @@ async def start_run( type=RunType.MANUAL, ) - log_url = Template(get_worker_settings().worker.LOG_URL_TEMPLATE).render( + log_url = Template(WorkerAppSettings().worker.LOG_URL_TEMPLATE).render( run=run, correlation_id=correlation_id.get(), ) diff --git a/syncmaster/backend/settings/auth/keycloak.py b/syncmaster/backend/settings/auth/keycloak.py index 46ac523a..f8717c9e 100644 --- a/syncmaster/backend/settings/auth/keycloak.py +++ b/syncmaster/backend/settings/auth/keycloak.py @@ -1,9 +1,10 @@ # SPDX-FileCopyrightText: 2023-2024 MTS PJSC # SPDX-License-Identifier: Apache-2.0 -from pydantic import BaseModel, Field, SecretStr +from pydantic import Field, SecretStr +from pydantic_settings import BaseSettings -class KeycloakSettings(BaseModel): +class KeycloakSettings(BaseSettings): server_url: str = Field(..., description="Keycloak server URL") client_id: str = Field(..., description="Keycloak client ID") @@ -13,10 +14,20 @@ class KeycloakSettings(BaseModel): verify_ssl: bool = Field(True, description="Verify SSL certificates") scope: str = Field("openid", description="Keycloak scope") + class Config: + env_prefix = "SYNCMASTER__" + env_nested_delimiter = "__" + extra = "allow" -class KeycloakAuthProviderSettings(BaseModel): + +class KeycloakAuthProviderSettings(BaseSettings): """Settings related to Keycloak interaction.""" keycloak: KeycloakSettings = Field( description="Keycloak settings", ) + + class Config: + env_prefix = "SYNCMASTER__" + env_nested_delimiter = "__" + extra = "allow" diff --git a/syncmaster/scheduler/transfer_fetcher.py b/syncmaster/scheduler/transfer_fetcher.py index 7b5c962e..a7435509 100644 --- a/syncmaster/scheduler/transfer_fetcher.py +++ b/syncmaster/scheduler/transfer_fetcher.py @@ -3,7 +3,7 @@ from sqlalchemy import select from syncmaster.db.models import Transfer -from syncmaster.scheduler.settings import SchedulerSettings as Settings +from syncmaster.scheduler.settings import SchedulerAppSettings as Settings from syncmaster.scheduler.utils import get_async_session diff --git a/syncmaster/scheduler/transfer_job_manager.py b/syncmaster/scheduler/transfer_job_manager.py index f0a1d061..8b8e040e 100644 --- a/syncmaster/scheduler/transfer_job_manager.py +++ b/syncmaster/scheduler/transfer_job_manager.py @@ -12,7 +12,7 @@ from syncmaster.scheduler.settings import SchedulerAppSettings as Settings from syncmaster.scheduler.utils import get_async_session from syncmaster.schemas.v1.connections.connection import ReadAuthDataSchema -from syncmaster.worker.config import celery +from syncmaster.worker.tasks import celery class TransferJobManager: diff --git a/syncmaster/worker/__init__.py b/syncmaster/worker/__init__.py index eb9bf462..c7cfa052 100644 --- a/syncmaster/worker/__init__.py +++ b/syncmaster/worker/__init__.py @@ -1,2 +1,18 @@ # SPDX-FileCopyrightText: 2023-2024 MTS PJSC # SPDX-License-Identifier: Apache-2.0 +from celery import Celery + +from syncmaster.worker.base import WorkerTask + + +def create_celery_app(settings) -> Celery: + app = Celery( + __name__, + broker=settings.broker.url, + backend="db+" + settings.database.sync_url, + task_cls=WorkerTask(settings=settings), + imports=[ + "syncmaster.worker.transfer", + ], + ) + return app diff --git a/syncmaster/worker/base.py b/syncmaster/worker/base.py index 06868986..e3248316 100644 --- a/syncmaster/worker/base.py +++ b/syncmaster/worker/base.py @@ -3,12 +3,12 @@ from celery import Task from sqlalchemy import create_engine -from syncmaster.worker.settings import get_worker_settings +from syncmaster.worker.settings import WorkerAppSettings class WorkerTask(Task): - def __init__(self) -> None: - self.settings = get_worker_settings() + def __init__(self, settings: WorkerAppSettings) -> None: + self.settings = settings self.engine = create_engine( url=self.settings.database.sync_url, ) diff --git a/syncmaster/worker/config.py b/syncmaster/worker/config.py deleted file mode 100644 index f1b33fee..00000000 --- a/syncmaster/worker/config.py +++ /dev/null @@ -1,17 +0,0 @@ -# SPDX-FileCopyrightText: 2023-2024 MTS PJSC -# SPDX-License-Identifier: Apache-2.0 -from celery import Celery - -from syncmaster.worker.base import WorkerTask -from syncmaster.worker.settings import get_worker_settings - -worker_settings = get_worker_settings() -celery = Celery( - __name__, - broker=worker_settings.broker.url, - backend="db+" + worker_settings.database.sync_url, - task_cls=WorkerTask, - imports=[ - "syncmaster.worker.transfer", - ], -) diff --git a/syncmaster/worker/settings/__init__.py b/syncmaster/worker/settings/__init__.py index adf5985f..f9b2ea8b 100644 --- a/syncmaster/worker/settings/__init__.py +++ b/syncmaster/worker/settings/__init__.py @@ -1,8 +1,5 @@ # SPDX-FileCopyrightText: 2023-2024 MTS PJSC # SPDX-License-Identifier: Apache-2.0 -import importlib -import os - from pydantic import Field from pydantic.types import ImportString from pydantic_settings import BaseSettings @@ -54,16 +51,3 @@ class WorkerAppSettings(BaseSettings): class Config: env_prefix = "SYNCMASTER__" env_nested_delimiter = "__" - - -def get_worker_settings() -> WorkerAppSettings: - # TODO: add to worker documentation - worker_settings_path = os.environ.get("WORKER_SETTINGS", None) - - if worker_settings_path: - module_name, class_name = worker_settings_path.rsplit(".", 1) - module = importlib.import_module(module_name) - settings_class = getattr(module, class_name) - else: - settings_class = WorkerAppSettings - return settings_class() diff --git a/syncmaster/worker/tasks.py b/syncmaster/worker/tasks.py new file mode 100644 index 00000000..2af38f02 --- /dev/null +++ b/syncmaster/worker/tasks.py @@ -0,0 +1,13 @@ +# SPDX-FileCopyrightText: 2023-2024 MTS PJSC +# SPDX-License-Identifier: Apache-2.0 +from syncmaster.worker import create_celery_app +from syncmaster.worker.settings import WorkerAppSettings +from syncmaster.worker.transfer import run_transfer_task + + +def register_tasks(celery_app): + celery_app.task(name="run_transfer_task", bind=True, track_started=True)(run_transfer_task) + + +celery = create_celery_app(WorkerAppSettings()) +register_tasks(celery) diff --git a/syncmaster/worker/transfer.py b/syncmaster/worker/transfer.py index 3248c5da..e544daad 100644 --- a/syncmaster/worker/transfer.py +++ b/syncmaster/worker/transfer.py @@ -16,16 +16,14 @@ from syncmaster.db.repositories.utils import decrypt_auth_data from syncmaster.exceptions.run import RunNotFoundError from syncmaster.worker.base import WorkerTask -from syncmaster.worker.config import celery from syncmaster.worker.controller import TransferController -from syncmaster.worker.settings import get_worker_settings +from syncmaster.worker.settings import WorkerAppSettings logger = get_task_logger(__name__) -CORRELATION_CELERY_HEADER_ID = get_worker_settings().worker.CORRELATION_CELERY_HEADER_ID +CORRELATION_CELERY_HEADER_ID = WorkerAppSettings().worker.CORRELATION_CELERY_HEADER_ID -@celery.task(name="run_transfer_task", bind=True, track_started=True) def run_transfer_task(self: WorkerTask, run_id: int) -> None: onetl.log.setup_logging(level=logging.INFO) with Session(self.engine) as session: diff --git a/tests/conftest.py b/tests/conftest.py index db430c47..57881086 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,6 +8,7 @@ import pytest import pytest_asyncio from alembic.config import Config as AlembicConfig +from celery import Celery from httpx import AsyncClient from sqlalchemy.ext.asyncio import ( AsyncEngine, @@ -22,6 +23,8 @@ from syncmaster.backend.utils.jwt import sign_jwt from syncmaster.db.models import Base from syncmaster.scheduler.settings import SchedulerAppSettings +from syncmaster.worker import create_celery_app +from syncmaster.worker.settings import WorkerAppSettings from tests.mocks import UserTestRoles from tests.settings import TestSettings from tests.utils import prepare_new_database, run_async_migrations @@ -77,6 +80,11 @@ def scheduler_settings(request: pytest.FixtureRequest) -> SchedulerAppSettings: return SchedulerAppSettings.parse_obj(request.param) +@pytest.fixture(scope="session", params=[{}]) +def worker_settings(request: pytest.FixtureRequest) -> WorkerAppSettings: + return WorkerAppSettings.parse_obj(request.param) + + @pytest.fixture(scope="session") def test_settings(): return TestSettings() @@ -130,6 +138,12 @@ async def client(settings: Settings) -> AsyncGenerator: logger.info("END CLIENT FIXTURE") +@pytest.fixture(scope="session", params=[{}]) +def celery(worker_settings: WorkerAppSettings) -> Celery: + celery_app = create_celery_app(worker_settings) + return celery_app + + @pytest_asyncio.fixture async def create_connection_data(request): if hasattr(request, "param"): diff --git a/tests/test_integration/celery_test.py b/tests/test_integration/celery_test.py deleted file mode 100644 index 2991ae27..00000000 --- a/tests/test_integration/celery_test.py +++ /dev/null @@ -1,3 +0,0 @@ -from syncmaster.worker.config import celery - -celery.conf.update(imports=list(celery.conf.imports) + ["tests.test_integration.test_scheduler.test_task"]) diff --git a/tests/test_integration/test_scheduler/scheduler_fixtures/mocker_fixtures.py b/tests/test_integration/test_scheduler/scheduler_fixtures/mocker_fixtures.py index 9380bff3..70351c23 100644 --- a/tests/test_integration/test_scheduler/scheduler_fixtures/mocker_fixtures.py +++ b/tests/test_integration/test_scheduler/scheduler_fixtures/mocker_fixtures.py @@ -2,14 +2,14 @@ import pytest from apscheduler.triggers.cron import CronTrigger +from celery import Celery from pytest_mock import MockerFixture, MockType from syncmaster.scheduler.transfer_job_manager import TransferJobManager -from syncmaster.worker.config import celery @pytest.fixture -def mock_send_task_to_tick(mocker: MockerFixture) -> MockType: +def mock_send_task_to_tick(mocker: MockerFixture, celery: Celery) -> MockType: original_to_thread = asyncio.to_thread return mocker.patch( "asyncio.to_thread", diff --git a/tests/test_integration/test_scheduler/test_task.py b/tests/test_integration/test_scheduler/test_task.py index b4d5d2c7..b1f3869f 100644 --- a/tests/test_integration/test_scheduler/test_task.py +++ b/tests/test_integration/test_scheduler/test_task.py @@ -1,15 +1,14 @@ import time from datetime import datetime, timezone +from celery import Celery from sqlalchemy.orm import Session from syncmaster.db.models.run import Run, Status from syncmaster.exceptions.run import RunNotFoundError from syncmaster.worker.base import WorkerTask -from syncmaster.worker.config import celery -@celery.task(name="tick", bind=True, track_started=True) def tick(self: WorkerTask, run_id: int) -> None: with Session(self.engine) as session: run = session.get(Run, run_id) @@ -26,3 +25,7 @@ def tick(self: WorkerTask, run_id: int) -> None: run.ended_at = datetime.now(tz=timezone.utc) session.add(run) session.commit() + + +def test_tick_task(celery: Celery): + celery.task(name="tick", bind=True, track_started=True)(tick) diff --git a/tests/test_unit/test_runs/test_create_run.py b/tests/test_unit/test_runs/test_create_run.py index 70c48ab7..6817c896 100644 --- a/tests/test_unit/test_runs/test_create_run.py +++ b/tests/test_unit/test_runs/test_create_run.py @@ -20,7 +20,7 @@ async def test_developer_plus_can_create_run_of_transfer_his_group( ) -> None: # Arrange user = group_transfer.owner_group.get_member_of_role(role_developer_plus) - mock_send_task = mocker.patch("syncmaster.worker.config.celery.send_task") + mock_send_task = mocker.patch("syncmaster.worker.tasks.celery.send_task") mock_to_thread = mocker.patch("asyncio.to_thread", new_callable=AsyncMock) run = ( @@ -73,7 +73,7 @@ async def test_groupless_user_cannot_create_run( mocker, ) -> None: # Arrange - mocker.patch("syncmaster.worker.config.celery.send_task") + mocker.patch("syncmaster.worker.tasks.celery.send_task") mocker.patch("asyncio.to_thread", new_callable=AsyncMock) # Act @@ -103,7 +103,7 @@ async def test_group_member_cannot_create_run_of_other_group_transfer( role_guest_plus: UserTestRoles, ): # Arrange - mocker.patch("syncmaster.worker.config.celery.send_task") + mocker.patch("syncmaster.worker.tasks.celery.send_task") mocker.patch("asyncio.to_thread", new_callable=AsyncMock) user = group.get_member_of_role(role_guest_plus) @@ -139,7 +139,7 @@ async def test_superuser_can_create_run( mocker, ) -> None: # Arrange - mock_send_task = mocker.patch("syncmaster.worker.config.celery.send_task") + mock_send_task = mocker.patch("syncmaster.worker.tasks.celery.send_task") mock_to_thread = mocker.patch("asyncio.to_thread", new_callable=AsyncMock) # Act @@ -183,7 +183,7 @@ async def test_unauthorized_user_cannot_create_run( mocker, ) -> None: # Arrange - mocker.patch("syncmaster.worker.config.celery.send_task") + mocker.patch("syncmaster.worker.tasks.celery.send_task") mocker.patch("asyncio.to_thread", new_callable=AsyncMock) # Act @@ -212,7 +212,7 @@ async def test_group_member_cannot_create_run_of_unknown_transfer_error( ) -> None: # Arrange user = group_transfer.owner_group.get_member_of_role(role_guest_plus) - mocker.patch("syncmaster.worker.config.celery.send_task") + mocker.patch("syncmaster.worker.tasks.celery.send_task") mocker.patch("asyncio.to_thread", new_callable=AsyncMock) # Act @@ -240,7 +240,7 @@ async def test_superuser_cannot_create_run_of_unknown_transfer_error( mocker, ) -> None: # Arrange - mocker.patch("syncmaster.worker.config.celery.send_task") + mocker.patch("syncmaster.worker.tasks.celery.send_task") mocker.patch("asyncio.to_thread", new_callable=AsyncMock) # Act diff --git a/tests/test_unit/test_scheduler/test_transfer_job_manager.py b/tests/test_unit/test_scheduler/test_transfer_job_manager.py index 486db41b..34236b43 100644 --- a/tests/test_unit/test_scheduler/test_transfer_job_manager.py +++ b/tests/test_unit/test_scheduler/test_transfer_job_manager.py @@ -80,7 +80,7 @@ async def test_send_job_to_celery_with_success( group_transfer: MockTransfer, ): # Arrange - mock_send_task = mocker.patch("syncmaster.worker.config.celery.send_task") + mock_send_task = mocker.patch("syncmaster.worker.tasks.celery.send_task") mock_to_thread = mocker.patch("asyncio.to_thread", new_callable=AsyncMock) # Act @@ -107,7 +107,7 @@ async def test_send_job_to_celery_with_failure( group_transfer: MockTransfer, ): # Arrange - mocker.patch("syncmaster.worker.config.celery.send_task") + mocker.patch("syncmaster.worker.tasks.celery.send_task") mocker.patch("asyncio.to_thread", new_callable=AsyncMock, side_effect=KombuError) # Act & Assert