Skip to content

Commit

Permalink
[DOP-19992] - create celery app with provided settings
Browse files Browse the repository at this point in the history
  • Loading branch information
maxim-lixakov committed Nov 21, 2024
1 parent eb73767 commit e1ef97e
Show file tree
Hide file tree
Showing 16 changed files with 83 additions and 64 deletions.
6 changes: 3 additions & 3 deletions syncmaster/backend/api/v1/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
ReadRunSchema,
RunPageSchema,
)
from syncmaster.worker.config import celery
from syncmaster.worker.settings import get_worker_settings
from syncmaster.worker.settings import WorkerAppSettings
from syncmaster.worker.tasks import celery

router = APIRouter(tags=["Runs"], responses=get_error_responses())

Expand Down Expand Up @@ -117,7 +117,7 @@ async def start_run(
type=RunType.MANUAL,
)

log_url = Template(get_worker_settings().worker.LOG_URL_TEMPLATE).render(
log_url = Template(WorkerAppSettings().worker.LOG_URL_TEMPLATE).render(
run=run,
correlation_id=correlation_id.get(),
)
Expand Down
17 changes: 14 additions & 3 deletions syncmaster/backend/settings/auth/keycloak.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from pydantic import BaseModel, Field, SecretStr
from pydantic import Field, SecretStr
from pydantic_settings import BaseSettings


class KeycloakSettings(BaseModel):
class KeycloakSettings(BaseSettings):

server_url: str = Field(..., description="Keycloak server URL")
client_id: str = Field(..., description="Keycloak client ID")
Expand All @@ -13,10 +14,20 @@ class KeycloakSettings(BaseModel):
verify_ssl: bool = Field(True, description="Verify SSL certificates")
scope: str = Field("openid", description="Keycloak scope")

class Config:
env_prefix = "SYNCMASTER__"
env_nested_delimiter = "__"
extra = "allow"

class KeycloakAuthProviderSettings(BaseModel):

class KeycloakAuthProviderSettings(BaseSettings):
"""Settings related to Keycloak interaction."""

keycloak: KeycloakSettings = Field(
description="Keycloak settings",
)

class Config:
env_prefix = "SYNCMASTER__"
env_nested_delimiter = "__"
extra = "allow"
2 changes: 1 addition & 1 deletion syncmaster/scheduler/transfer_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from sqlalchemy import select

from syncmaster.db.models import Transfer
from syncmaster.scheduler.settings import SchedulerSettings as Settings
from syncmaster.scheduler.settings import SchedulerAppSettings as Settings
from syncmaster.scheduler.utils import get_async_session


Expand Down
2 changes: 1 addition & 1 deletion syncmaster/scheduler/transfer_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from syncmaster.scheduler.settings import SchedulerAppSettings as Settings
from syncmaster.scheduler.utils import get_async_session
from syncmaster.schemas.v1.connections.connection import ReadAuthDataSchema
from syncmaster.worker.config import celery
from syncmaster.worker.tasks import celery


class TransferJobManager:
Expand Down
16 changes: 16 additions & 0 deletions syncmaster/worker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,18 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from celery import Celery

from syncmaster.worker.base import WorkerTask


def create_celery_app(settings) -> Celery:
app = Celery(
__name__,
broker=settings.broker.url,
backend="db+" + settings.database.sync_url,
task_cls=WorkerTask(settings=settings),
imports=[
"syncmaster.worker.transfer",
],
)
return app
6 changes: 3 additions & 3 deletions syncmaster/worker/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
from celery import Task
from sqlalchemy import create_engine

from syncmaster.worker.settings import get_worker_settings
from syncmaster.worker.settings import WorkerAppSettings


class WorkerTask(Task):
def __init__(self) -> None:
self.settings = get_worker_settings()
def __init__(self, settings: WorkerAppSettings) -> None:
self.settings = settings
self.engine = create_engine(
url=self.settings.database.sync_url,
)
17 changes: 0 additions & 17 deletions syncmaster/worker/config.py

This file was deleted.

16 changes: 0 additions & 16 deletions syncmaster/worker/settings/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
import importlib
import os

from pydantic import Field
from pydantic.types import ImportString
from pydantic_settings import BaseSettings
Expand Down Expand Up @@ -54,16 +51,3 @@ class WorkerAppSettings(BaseSettings):
class Config:
env_prefix = "SYNCMASTER__"
env_nested_delimiter = "__"


def get_worker_settings() -> WorkerAppSettings:
# TODO: add to worker documentation
worker_settings_path = os.environ.get("WORKER_SETTINGS", None)

if worker_settings_path:
module_name, class_name = worker_settings_path.rsplit(".", 1)
module = importlib.import_module(module_name)
settings_class = getattr(module, class_name)
else:
settings_class = WorkerAppSettings
return settings_class()
13 changes: 13 additions & 0 deletions syncmaster/worker/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from syncmaster.worker import create_celery_app
from syncmaster.worker.settings import WorkerAppSettings
from syncmaster.worker.transfer import run_transfer_task


def register_tasks(celery_app):
celery_app.task(name="run_transfer_task", bind=True, track_started=True)(run_transfer_task)


celery = create_celery_app(WorkerAppSettings())
register_tasks(celery)
6 changes: 2 additions & 4 deletions syncmaster/worker/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,14 @@
from syncmaster.db.repositories.utils import decrypt_auth_data
from syncmaster.exceptions.run import RunNotFoundError
from syncmaster.worker.base import WorkerTask
from syncmaster.worker.config import celery
from syncmaster.worker.controller import TransferController
from syncmaster.worker.settings import get_worker_settings
from syncmaster.worker.settings import WorkerAppSettings

logger = get_task_logger(__name__)

CORRELATION_CELERY_HEADER_ID = get_worker_settings().worker.CORRELATION_CELERY_HEADER_ID
CORRELATION_CELERY_HEADER_ID = WorkerAppSettings().worker.CORRELATION_CELERY_HEADER_ID


@celery.task(name="run_transfer_task", bind=True, track_started=True)
def run_transfer_task(self: WorkerTask, run_id: int) -> None:
onetl.log.setup_logging(level=logging.INFO)
with Session(self.engine) as session:
Expand Down
14 changes: 14 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pytest
import pytest_asyncio
from alembic.config import Config as AlembicConfig
from celery import Celery
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import (
AsyncEngine,
Expand All @@ -22,6 +23,8 @@
from syncmaster.backend.utils.jwt import sign_jwt
from syncmaster.db.models import Base
from syncmaster.scheduler.settings import SchedulerAppSettings
from syncmaster.worker import create_celery_app
from syncmaster.worker.settings import WorkerAppSettings
from tests.mocks import UserTestRoles
from tests.settings import TestSettings
from tests.utils import prepare_new_database, run_async_migrations
Expand Down Expand Up @@ -77,6 +80,11 @@ def scheduler_settings(request: pytest.FixtureRequest) -> SchedulerAppSettings:
return SchedulerAppSettings.parse_obj(request.param)


@pytest.fixture(scope="session", params=[{}])
def worker_settings(request: pytest.FixtureRequest) -> WorkerAppSettings:
return WorkerAppSettings.parse_obj(request.param)


@pytest.fixture(scope="session")
def test_settings():
return TestSettings()
Expand Down Expand Up @@ -130,6 +138,12 @@ async def client(settings: Settings) -> AsyncGenerator:
logger.info("END CLIENT FIXTURE")


@pytest.fixture(scope="session", params=[{}])
def celery(worker_settings: WorkerAppSettings) -> Celery:
celery_app = create_celery_app(worker_settings)
return celery_app


@pytest_asyncio.fixture
async def create_connection_data(request):
if hasattr(request, "param"):
Expand Down
3 changes: 0 additions & 3 deletions tests/test_integration/celery_test.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

import pytest
from apscheduler.triggers.cron import CronTrigger
from celery import Celery
from pytest_mock import MockerFixture, MockType

from syncmaster.scheduler.transfer_job_manager import TransferJobManager
from syncmaster.worker.config import celery


@pytest.fixture
def mock_send_task_to_tick(mocker: MockerFixture) -> MockType:
def mock_send_task_to_tick(mocker: MockerFixture, celery: Celery) -> MockType:
original_to_thread = asyncio.to_thread
return mocker.patch(
"asyncio.to_thread",
Expand Down
7 changes: 5 additions & 2 deletions tests/test_integration/test_scheduler/test_task.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import time
from datetime import datetime, timezone

from celery import Celery
from sqlalchemy.orm import Session

from syncmaster.db.models.run import Run, Status
from syncmaster.exceptions.run import RunNotFoundError
from syncmaster.worker.base import WorkerTask
from syncmaster.worker.config import celery


@celery.task(name="tick", bind=True, track_started=True)
def tick(self: WorkerTask, run_id: int) -> None:
with Session(self.engine) as session:
run = session.get(Run, run_id)
Expand All @@ -26,3 +25,7 @@ def tick(self: WorkerTask, run_id: int) -> None:
run.ended_at = datetime.now(tz=timezone.utc)
session.add(run)
session.commit()


def test_tick_task(celery: Celery):
celery.task(name="tick", bind=True, track_started=True)(tick)
14 changes: 7 additions & 7 deletions tests/test_unit/test_runs/test_create_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async def test_developer_plus_can_create_run_of_transfer_his_group(
) -> None:
# Arrange
user = group_transfer.owner_group.get_member_of_role(role_developer_plus)
mock_send_task = mocker.patch("syncmaster.worker.config.celery.send_task")
mock_send_task = mocker.patch("syncmaster.worker.tasks.celery.send_task")
mock_to_thread = mocker.patch("asyncio.to_thread", new_callable=AsyncMock)

run = (
Expand Down Expand Up @@ -73,7 +73,7 @@ async def test_groupless_user_cannot_create_run(
mocker,
) -> None:
# Arrange
mocker.patch("syncmaster.worker.config.celery.send_task")
mocker.patch("syncmaster.worker.tasks.celery.send_task")
mocker.patch("asyncio.to_thread", new_callable=AsyncMock)

# Act
Expand Down Expand Up @@ -103,7 +103,7 @@ async def test_group_member_cannot_create_run_of_other_group_transfer(
role_guest_plus: UserTestRoles,
):
# Arrange
mocker.patch("syncmaster.worker.config.celery.send_task")
mocker.patch("syncmaster.worker.tasks.celery.send_task")
mocker.patch("asyncio.to_thread", new_callable=AsyncMock)
user = group.get_member_of_role(role_guest_plus)

Expand Down Expand Up @@ -139,7 +139,7 @@ async def test_superuser_can_create_run(
mocker,
) -> None:
# Arrange
mock_send_task = mocker.patch("syncmaster.worker.config.celery.send_task")
mock_send_task = mocker.patch("syncmaster.worker.tasks.celery.send_task")
mock_to_thread = mocker.patch("asyncio.to_thread", new_callable=AsyncMock)

# Act
Expand Down Expand Up @@ -183,7 +183,7 @@ async def test_unauthorized_user_cannot_create_run(
mocker,
) -> None:
# Arrange
mocker.patch("syncmaster.worker.config.celery.send_task")
mocker.patch("syncmaster.worker.tasks.celery.send_task")
mocker.patch("asyncio.to_thread", new_callable=AsyncMock)

# Act
Expand Down Expand Up @@ -212,7 +212,7 @@ async def test_group_member_cannot_create_run_of_unknown_transfer_error(
) -> None:
# Arrange
user = group_transfer.owner_group.get_member_of_role(role_guest_plus)
mocker.patch("syncmaster.worker.config.celery.send_task")
mocker.patch("syncmaster.worker.tasks.celery.send_task")
mocker.patch("asyncio.to_thread", new_callable=AsyncMock)

# Act
Expand Down Expand Up @@ -240,7 +240,7 @@ async def test_superuser_cannot_create_run_of_unknown_transfer_error(
mocker,
) -> None:
# Arrange
mocker.patch("syncmaster.worker.config.celery.send_task")
mocker.patch("syncmaster.worker.tasks.celery.send_task")
mocker.patch("asyncio.to_thread", new_callable=AsyncMock)

# Act
Expand Down
4 changes: 2 additions & 2 deletions tests/test_unit/test_scheduler/test_transfer_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ async def test_send_job_to_celery_with_success(
group_transfer: MockTransfer,
):
# Arrange
mock_send_task = mocker.patch("syncmaster.worker.config.celery.send_task")
mock_send_task = mocker.patch("syncmaster.worker.tasks.celery.send_task")
mock_to_thread = mocker.patch("asyncio.to_thread", new_callable=AsyncMock)

# Act
Expand All @@ -107,7 +107,7 @@ async def test_send_job_to_celery_with_failure(
group_transfer: MockTransfer,
):
# Arrange
mocker.patch("syncmaster.worker.config.celery.send_task")
mocker.patch("syncmaster.worker.tasks.celery.send_task")
mocker.patch("asyncio.to_thread", new_callable=AsyncMock, side_effect=KombuError)

# Act & Assert
Expand Down

0 comments on commit e1ef97e

Please sign in to comment.