From a1fd63f8ded91f3db4c1eee255697f53968be4fa Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Thu, 21 Nov 2024 16:36:31 +0300 Subject: [PATCH] [DOP-19992] - move log_url_template to server_settings --- .env.docker | 6 ++--- .env.local | 6 ++--- Makefile | 2 +- docker/entrypoint_worker.sh | 2 +- syncmaster/backend/api/v1/runs.py | 9 +++++--- .../backend/settings/server/__init__.py | 4 ++++ syncmaster/scheduler/transfer_fetcher.py | 2 +- syncmaster/scheduler/transfer_job_manager.py | 2 +- syncmaster/worker/__init__.py | 22 +++++++++++++++++++ syncmaster/worker/base.py | 5 +++-- syncmaster/worker/config.py | 17 -------------- syncmaster/worker/controller.py | 2 +- syncmaster/worker/settings/__init__.py | 22 +------------------ syncmaster/worker/transfer.py | 13 ++++++----- tests/conftest.py | 14 ++++++++++++ tests/test_integration/celery_test.py | 2 +- .../scheduler_fixtures/mocker_fixtures.py | 4 ++-- .../test_scheduler/test_task.py | 2 +- tests/test_unit/test_runs/test_create_run.py | 14 ++++++------ .../test_transfer_job_manager.py | 4 ++-- 20 files changed, 79 insertions(+), 75 deletions(-) delete mode 100644 syncmaster/worker/config.py
diff --git a/.env.docker b/.env.docker index a8fd5d10..59ca01df 100644 --- a/.env.docker +++ b/.env.docker @@ -1,8 +1,9 @@ TZ=UTC ENV=LOCAL
-# Debug +# Server settings SYNCMASTER__SERVER__DEBUG=true +SYNCMASTER__SERVER__LOG_URL_TEMPLATE=https://grafana.example.com?correlation_id={{ correlation_id }}&run_id={{ run.id }}
# Logging SYNCMASTER__LOGGING__SETUP=True @@ -11,9 +12,6 @@ SYNCMASTER__LOGGING__PRESET=colored
# Encrypt / Decrypt credentials data SYNCMASTER__ENCRYPTION__CRYPTO_KEY=UBgPTioFrtH2unlC4XFDiGf5sYfzbdSf_VgiUSaQc94= -# Worker settings -SYNCMASTER__WORKER__LOG_URL_TEMPLATE=https://grafana.example.com?correlation_id={{ correlation_id }}&run_id={{ run.id }} -
# Scheduler settings SYNCMASTER__SCHEDULER__TRANSFER_FETCHING_TIMEOUT_SECONDS=200
diff --git a/.env.local b/.env.local index ec575e24..fc0e0be3 100644 --- a/.env.local +++ b/.env.local @@ -1,16 +1,14 @@ export TZ=UTC export ENV=LOCAL
-# Debug +# Server settings export SYNCMASTER__SERVER__DEBUG=true +export SYNCMASTER__SERVER__LOG_URL_TEMPLATE="https://grafana.example.com?correlation_id={{ correlation_id }}&run_id={{ run.id }}"
# Logging export SYNCMASTER__LOGGING__SETUP=True export SYNCMASTER__LOGGING__PRESET=colored
-# Worker settings -export SYNCMASTER__WORKER__LOG_URL_TEMPLATE="https://grafana.example.com?correlation_id={{ correlation_id }}&run_id={{ run.id }}" -
# Scheduler settings export SYNCMASTER__SCHEDULER__TRANSFER_FETCHING_TIMEOUT_SECONDS=200
diff --git a/Makefile b/Makefile index 8c4edafd..17a0d36d 100644 --- a/Makefile +++ b/Makefile @@ -113,7 +113,7 @@ dev-server: db-start ##@Application Run development server (without docker) ${POETRY} run python -m syncmaster.backend $(ARGS)
dev-worker: db-start broker-start ##@Application Run development broker (without docker) - ${POETRY} run python -m celery -A syncmaster.worker.config.celery worker --max-tasks-per-child=1 $(ARGS) + ${POETRY} run python -m celery -A syncmaster.worker.celery worker --max-tasks-per-child=1 $(ARGS)
diff --git a/docker/entrypoint_worker.sh b/docker/entrypoint_worker.sh index 864e8acf..189557cf 100755 --- a/docker/entrypoint_worker.sh +++ b/docker/entrypoint_worker.sh @@ -5,5 +5,5 @@ set -e
# Required to start each Celery task in separated process, avoiding issues with global Spark session object # exec is required to forward all signals to the main process -exec python -m celery -A 
syncmaster.worker.config.celery worker --max-tasks-per-child=1 "$@" +exec python -m celery -A syncmaster.worker.celery worker --max-tasks-per-child=1 "$@" diff --git a/syncmaster/backend/api/v1/runs.py b/syncmaster/backend/api/v1/runs.py index 10cd2495..c936c4d0 100644 --- a/syncmaster/backend/api/v1/runs.py +++ b/syncmaster/backend/api/v1/runs.py @@ -2,13 +2,16 @@ # SPDX-License-Identifier: Apache-2.0 import asyncio from datetime import datetime +from typing import Annotated from asgi_correlation_id import correlation_id from fastapi import APIRouter, Depends, Query from jinja2 import Template from kombu.exceptions import KombuError +from syncmaster.backend.dependencies import Stub from syncmaster.backend.services import UnitOfWork, get_user +from syncmaster.backend.settings import ServerAppSettings as Settings from syncmaster.db.models import RunType, Status, User from syncmaster.db.utils import Permission from syncmaster.errors.registration import get_error_responses @@ -21,8 +24,7 @@ ReadRunSchema, RunPageSchema, ) -from syncmaster.worker.config import celery -from syncmaster.worker.settings import get_worker_settings +from syncmaster.worker import celery router = APIRouter(tags=["Runs"], responses=get_error_responses()) @@ -81,6 +83,7 @@ async def read_run( @router.post("/runs") async def start_run( create_run_data: CreateRunSchema, + settings: Annotated[Settings, Depends(Stub(Settings))], unit_of_work: UnitOfWork = Depends(UnitOfWork), current_user: User = Depends(get_user(is_active=True)), ) -> ReadRunSchema: @@ -117,7 +120,7 @@ async def start_run( type=RunType.MANUAL, ) - log_url = Template(get_worker_settings().worker.LOG_URL_TEMPLATE).render( + log_url = Template(settings.server.log_url_template).render( run=run, correlation_id=correlation_id.get(), ) diff --git a/syncmaster/backend/settings/server/__init__.py b/syncmaster/backend/settings/server/__init__.py index c83fbfdb..cece8a5c 100644 --- a/syncmaster/backend/settings/server/__init__.py +++ b/syncmaster/backend/settings/server/__init__.py @@ -34,6 +34,10 @@ class ServerSettings(BaseModel): """, ), ) + log_url_template: str = Field( + "", + description="URL template for logging", + ) request_id: RequestIDSettings = Field( default_factory=RequestIDSettings, ) diff --git a/syncmaster/scheduler/transfer_fetcher.py b/syncmaster/scheduler/transfer_fetcher.py index 7b5c962e..a7435509 100644 --- a/syncmaster/scheduler/transfer_fetcher.py +++ b/syncmaster/scheduler/transfer_fetcher.py @@ -3,7 +3,7 @@ from sqlalchemy import select from syncmaster.db.models import Transfer -from syncmaster.scheduler.settings import SchedulerSettings as Settings +from syncmaster.scheduler.settings import SchedulerAppSettings as Settings from syncmaster.scheduler.utils import get_async_session diff --git a/syncmaster/scheduler/transfer_job_manager.py b/syncmaster/scheduler/transfer_job_manager.py index f0a1d061..ca0043af 100644 --- a/syncmaster/scheduler/transfer_job_manager.py +++ b/syncmaster/scheduler/transfer_job_manager.py @@ -12,7 +12,7 @@ from syncmaster.scheduler.settings import SchedulerAppSettings as Settings from syncmaster.scheduler.utils import get_async_session from syncmaster.schemas.v1.connections.connection import ReadAuthDataSchema -from syncmaster.worker.config import celery +from syncmaster.worker import celery class TransferJobManager: diff --git a/syncmaster/worker/__init__.py b/syncmaster/worker/__init__.py index eb9bf462..216b9087 100644 --- a/syncmaster/worker/__init__.py +++ b/syncmaster/worker/__init__.py @@ -1,2 +1,24 @@ # 
SPDX-FileCopyrightText: 2023-2024 MTS PJSC # SPDX-License-Identifier: Apache-2.0 +from celery import Celery + +from syncmaster.worker.base import WorkerTask +from syncmaster.worker.settings import WorkerAppSettings + + +def create_celery_app(settings: WorkerAppSettings) -> Celery: + app = Celery( + __name__, + broker=settings.broker.url, + backend="db+" + settings.database.sync_url, + task_cls=WorkerTask, + imports=[ + "syncmaster.worker.transfer", + ], + ) + return app + + +# TODO: create the Celery app only when this module is run as a worker (__name__ == "__main__"), +# and pass it to the backend and scheduler via dependency injection instead of this module-level instance +celery = create_celery_app(WorkerAppSettings())
diff --git a/syncmaster/worker/base.py b/syncmaster/worker/base.py index 06868986..25923923 100644 --- a/syncmaster/worker/base.py +++ b/syncmaster/worker/base.py @@ -3,12 +3,13 @@ from celery import Task from sqlalchemy import create_engine
-from syncmaster.worker.settings import get_worker_settings +from syncmaster.worker.settings import WorkerAppSettings as Settings
class WorkerTask(Task): + def __init__(self) -> None: - self.settings = get_worker_settings() + self.settings = Settings() self.engine = create_engine( url=self.settings.database.sync_url, )
diff --git a/syncmaster/worker/config.py b/syncmaster/worker/config.py deleted file mode 100644 index f1b33fee..00000000 --- a/syncmaster/worker/config.py +++ /dev/null @@ -1,17 +0,0 @@ -# SPDX-FileCopyrightText: 2023-2024 MTS PJSC -# SPDX-License-Identifier: Apache-2.0 -from celery import Celery - -from syncmaster.worker.base import WorkerTask -from syncmaster.worker.settings import get_worker_settings - -worker_settings = get_worker_settings() -celery = Celery( - __name__, - broker=worker_settings.broker.url, - backend="db+" + worker_settings.database.sync_url, - task_cls=WorkerTask, - imports=[ - "syncmaster.worker.transfer", - ], -)
diff --git a/syncmaster/worker/controller.py b/syncmaster/worker/controller.py index 1443f181..64f344cd 100644 --- a/syncmaster/worker/controller.py +++ b/syncmaster/worker/controller.py @@ -84,7 +84,7 @@ def __init__( )
def perform_transfer(self, settings: WorkerAppSettings) -> None: - spark = settings.CREATE_SPARK_SESSION_FUNCTION( + spark = settings.worker.CREATE_SPARK_SESSION_FUNCTION( run=self.run, source=self.source_handler.connection_dto, target=self.target_handler.connection_dto,
diff --git a/syncmaster/worker/settings/__init__.py b/syncmaster/worker/settings/__init__.py index adf5985f..1e0061c3 100644 --- a/syncmaster/worker/settings/__init__.py +++ b/syncmaster/worker/settings/__init__.py @@ -1,8 +1,5 @@ # SPDX-FileCopyrightText: 2023-2024 MTS PJSC # SPDX-License-Identifier: Apache-2.0 -import importlib -import os - from pydantic import Field from pydantic.types import ImportString from pydantic_settings import BaseSettings @@ -23,13 +20,9 @@ class WorkerSettings(BaseSettings): .. 
code-block:: bash - SYNCMASTER__WORKER__LOG_URL_TEMPLATE="https://grafana.example.com?correlation_id={{ correlation_id }}&run_id={{ run.id }}" + SYNCMASTER__WORKER__CORRELATION_CELERY_HEADER_ID=CORRELATION_ID_CELERY """ - LOG_URL_TEMPLATE: str = Field( - "", - description="URL template for logging", - ) CORRELATION_CELERY_HEADER_ID: str = Field( "CORRELATION_CELERY_HEADER_ID", description="Header ID for correlation in Celery", @@ -54,16 +47,3 @@ class WorkerAppSettings(BaseSettings): class Config: env_prefix = "SYNCMASTER__" env_nested_delimiter = "__" - - -def get_worker_settings() -> WorkerAppSettings: - # TODO: add to worker documentation - worker_settings_path = os.environ.get("WORKER_SETTINGS", None) - - if worker_settings_path: - module_name, class_name = worker_settings_path.rsplit(".", 1) - module = importlib.import_module(module_name) - settings_class = getattr(module, class_name) - else: - settings_class = WorkerAppSettings - return settings_class() diff --git a/syncmaster/worker/transfer.py b/syncmaster/worker/transfer.py index 3248c5da..ba09f738 100644 --- a/syncmaster/worker/transfer.py +++ b/syncmaster/worker/transfer.py @@ -5,6 +5,7 @@ import onetl from asgi_correlation_id import correlation_id +from celery import Celery from celery.signals import after_setup_logger, before_task_publish, task_prerun from celery.utils.log import get_task_logger from sqlalchemy import select @@ -15,18 +16,18 @@ from syncmaster.db.models import AuthData, Run, Status, Transfer from syncmaster.db.repositories.utils import decrypt_auth_data from syncmaster.exceptions.run import RunNotFoundError -from syncmaster.worker.base import WorkerTask -from syncmaster.worker.config import celery +from syncmaster.worker import celery from syncmaster.worker.controller import TransferController -from syncmaster.worker.settings import get_worker_settings +from syncmaster.worker.settings import WorkerAppSettings logger = get_task_logger(__name__) -CORRELATION_CELERY_HEADER_ID = get_worker_settings().worker.CORRELATION_CELERY_HEADER_ID +WORKER_SETTINGS = WorkerAppSettings() +CORRELATION_CELERY_HEADER_ID = WORKER_SETTINGS.worker.CORRELATION_CELERY_HEADER_ID @celery.task(name="run_transfer_task", bind=True, track_started=True) -def run_transfer_task(self: WorkerTask, run_id: int) -> None: +def run_transfer_task(self: Celery, run_id: int) -> None: onetl.log.setup_logging(level=logging.INFO) with Session(self.engine) as session: run_transfer( @@ -70,7 +71,7 @@ def run_transfer(session: Session, run_id: int, settings: Settings): source_auth_data=source_auth_data, target_auth_data=target_auth_data, ) - controller.perform_transfer() + controller.perform_transfer(WORKER_SETTINGS) except Exception: run.status = Status.FAILED logger.exception("Run %r was failed", run.id) diff --git a/tests/conftest.py b/tests/conftest.py index db430c47..57881086 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,6 +8,7 @@ import pytest import pytest_asyncio from alembic.config import Config as AlembicConfig +from celery import Celery from httpx import AsyncClient from sqlalchemy.ext.asyncio import ( AsyncEngine, @@ -22,6 +23,8 @@ from syncmaster.backend.utils.jwt import sign_jwt from syncmaster.db.models import Base from syncmaster.scheduler.settings import SchedulerAppSettings +from syncmaster.worker import create_celery_app +from syncmaster.worker.settings import WorkerAppSettings from tests.mocks import UserTestRoles from tests.settings import TestSettings from tests.utils import prepare_new_database, run_async_migrations @@ 
-77,6 +80,11 @@ def scheduler_settings(request: pytest.FixtureRequest) -> SchedulerAppSettings: return SchedulerAppSettings.parse_obj(request.param) +@pytest.fixture(scope="session", params=[{}]) +def worker_settings(request: pytest.FixtureRequest) -> WorkerAppSettings: + return WorkerAppSettings.parse_obj(request.param) + + @pytest.fixture(scope="session") def test_settings(): return TestSettings() @@ -130,6 +138,12 @@ async def client(settings: Settings) -> AsyncGenerator: logger.info("END CLIENT FIXTURE") +@pytest.fixture(scope="session", params=[{}]) +def celery(worker_settings: WorkerAppSettings) -> Celery: + celery_app = create_celery_app(worker_settings) + return celery_app + + @pytest_asyncio.fixture async def create_connection_data(request): if hasattr(request, "param"): diff --git a/tests/test_integration/celery_test.py b/tests/test_integration/celery_test.py index 2991ae27..956ec6a5 100644 --- a/tests/test_integration/celery_test.py +++ b/tests/test_integration/celery_test.py @@ -1,3 +1,3 @@ -from syncmaster.worker.config import celery +from syncmaster.worker import celery celery.conf.update(imports=list(celery.conf.imports) + ["tests.test_integration.test_scheduler.test_task"]) diff --git a/tests/test_integration/test_scheduler/scheduler_fixtures/mocker_fixtures.py b/tests/test_integration/test_scheduler/scheduler_fixtures/mocker_fixtures.py index 9380bff3..70351c23 100644 --- a/tests/test_integration/test_scheduler/scheduler_fixtures/mocker_fixtures.py +++ b/tests/test_integration/test_scheduler/scheduler_fixtures/mocker_fixtures.py @@ -2,14 +2,14 @@ import pytest from apscheduler.triggers.cron import CronTrigger +from celery import Celery from pytest_mock import MockerFixture, MockType from syncmaster.scheduler.transfer_job_manager import TransferJobManager -from syncmaster.worker.config import celery @pytest.fixture -def mock_send_task_to_tick(mocker: MockerFixture) -> MockType: +def mock_send_task_to_tick(mocker: MockerFixture, celery: Celery) -> MockType: original_to_thread = asyncio.to_thread return mocker.patch( "asyncio.to_thread", diff --git a/tests/test_integration/test_scheduler/test_task.py b/tests/test_integration/test_scheduler/test_task.py index b4d5d2c7..fe219ac6 100644 --- a/tests/test_integration/test_scheduler/test_task.py +++ b/tests/test_integration/test_scheduler/test_task.py @@ -5,8 +5,8 @@ from syncmaster.db.models.run import Run, Status from syncmaster.exceptions.run import RunNotFoundError +from syncmaster.worker import celery from syncmaster.worker.base import WorkerTask -from syncmaster.worker.config import celery @celery.task(name="tick", bind=True, track_started=True) diff --git a/tests/test_unit/test_runs/test_create_run.py b/tests/test_unit/test_runs/test_create_run.py index 70c48ab7..0340aebd 100644 --- a/tests/test_unit/test_runs/test_create_run.py +++ b/tests/test_unit/test_runs/test_create_run.py @@ -20,7 +20,7 @@ async def test_developer_plus_can_create_run_of_transfer_his_group( ) -> None: # Arrange user = group_transfer.owner_group.get_member_of_role(role_developer_plus) - mock_send_task = mocker.patch("syncmaster.worker.config.celery.send_task") + mock_send_task = mocker.patch("syncmaster.worker.celery.send_task") mock_to_thread = mocker.patch("asyncio.to_thread", new_callable=AsyncMock) run = ( @@ -73,7 +73,7 @@ async def test_groupless_user_cannot_create_run( mocker, ) -> None: # Arrange - mocker.patch("syncmaster.worker.config.celery.send_task") + mocker.patch("syncmaster.worker.celery.send_task") mocker.patch("asyncio.to_thread", 
new_callable=AsyncMock) # Act @@ -103,7 +103,7 @@ async def test_group_member_cannot_create_run_of_other_group_transfer( role_guest_plus: UserTestRoles, ): # Arrange - mocker.patch("syncmaster.worker.config.celery.send_task") + mocker.patch("syncmaster.worker.celery.send_task") mocker.patch("asyncio.to_thread", new_callable=AsyncMock) user = group.get_member_of_role(role_guest_plus) @@ -139,7 +139,7 @@ async def test_superuser_can_create_run( mocker, ) -> None: # Arrange - mock_send_task = mocker.patch("syncmaster.worker.config.celery.send_task") + mock_send_task = mocker.patch("syncmaster.worker.celery.send_task") mock_to_thread = mocker.patch("asyncio.to_thread", new_callable=AsyncMock) # Act @@ -183,7 +183,7 @@ async def test_unauthorized_user_cannot_create_run( mocker, ) -> None: # Arrange - mocker.patch("syncmaster.worker.config.celery.send_task") + mocker.patch("syncmaster.worker.celery.send_task") mocker.patch("asyncio.to_thread", new_callable=AsyncMock) # Act @@ -212,7 +212,7 @@ async def test_group_member_cannot_create_run_of_unknown_transfer_error( ) -> None: # Arrange user = group_transfer.owner_group.get_member_of_role(role_guest_plus) - mocker.patch("syncmaster.worker.config.celery.send_task") + mocker.patch("syncmaster.worker.celery.send_task") mocker.patch("asyncio.to_thread", new_callable=AsyncMock) # Act @@ -240,7 +240,7 @@ async def test_superuser_cannot_create_run_of_unknown_transfer_error( mocker, ) -> None: # Arrange - mocker.patch("syncmaster.worker.config.celery.send_task") + mocker.patch("syncmaster.worker.celery.send_task") mocker.patch("asyncio.to_thread", new_callable=AsyncMock) # Act diff --git a/tests/test_unit/test_scheduler/test_transfer_job_manager.py b/tests/test_unit/test_scheduler/test_transfer_job_manager.py index 486db41b..d8d91032 100644 --- a/tests/test_unit/test_scheduler/test_transfer_job_manager.py +++ b/tests/test_unit/test_scheduler/test_transfer_job_manager.py @@ -80,7 +80,7 @@ async def test_send_job_to_celery_with_success( group_transfer: MockTransfer, ): # Arrange - mock_send_task = mocker.patch("syncmaster.worker.config.celery.send_task") + mock_send_task = mocker.patch("syncmaster.worker.celery.send_task") mock_to_thread = mocker.patch("asyncio.to_thread", new_callable=AsyncMock) # Act @@ -107,7 +107,7 @@ async def test_send_job_to_celery_with_failure( group_transfer: MockTransfer, ): # Arrange - mocker.patch("syncmaster.worker.config.celery.send_task") + mocker.patch("syncmaster.worker.celery.send_task") mocker.patch("asyncio.to_thread", new_callable=AsyncMock, side_effect=KombuError) # Act & Assert