Skip to content

Commit

Permalink
[DOP-19992] - move log_url_template to server_settings
Browse files Browse the repository at this point in the history
  • Loading branch information
maxim-lixakov committed Nov 21, 2024
1 parent eb73767 commit 2858a96
Show file tree
Hide file tree
Showing 21 changed files with 84 additions and 81 deletions.
6 changes: 2 additions & 4 deletions .env.docker
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
TZ=UTC
ENV=LOCAL

# Debug
# Server Settngs
SYNCMASTER__SERVER__DEBUG=true
SYNCMASTER__SERVER__LOG_URL_TEMPLATE=https://grafana.example.com?correlation_id={{ correlation_id }}&run_id={{ run.id }}

# Logging
SYNCMASTER__LOGGING__SETUP=True
Expand All @@ -11,9 +12,6 @@ SYNCMASTER__LOGGING__PRESET=colored
# Encrypt / Decrypt credentials data
SYNCMASTER__ENCRYPTION__CRYPTO_KEY=UBgPTioFrtH2unlC4XFDiGf5sYfzbdSf_VgiUSaQc94=

# Worker settings
SYNCMASTER__WORKER__LOG_URL_TEMPLATE=https://grafana.example.com?correlation_id={{ correlation_id }}&run_id={{ run.id }}

# Scheduler settings
SYNCMASTER__SCHEDULER__TRANSFER_FETCHING_TIMEOUT_SECONDS=200

Expand Down
6 changes: 2 additions & 4 deletions .env.local
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
export TZ=UTC
export ENV=LOCAL

# Debug
# Server Settngs
export SYNCMASTER__SERVER__DEBUG=true
export SYNCMASTER__SERVER__LOG_URL_TEMPLATE="https://grafana.example.com?correlation_id={{ correlation_id }}&run_id={{ run.id }}"

# Logging
export SYNCMASTER__LOGGING__SETUP=True
export SYNCMASTER__LOGGING__PRESET=colored

# Worker settings
export SYNCMASTER__WORKER__LOG_URL_TEMPLATE="https://grafana.example.com?correlation_id={{ correlation_id }}&run_id={{ run.id }}"

# Scheduler settings
export SYNCMASTER__SCHEDULER__TRANSFER_FETCHING_TIMEOUT_SECONDS=200

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ dev-server: db-start ##@Application Run development server (without docker)
${POETRY} run python -m syncmaster.backend $(ARGS)

dev-worker: db-start broker-start ##@Application Run development broker (without docker)
${POETRY} run python -m celery -A syncmaster.worker.config.celery worker --max-tasks-per-child=1 $(ARGS)
${POETRY} run python -m celery -A syncmaster.worker.celery worker --max-tasks-per-child=1 $(ARGS)



Expand Down
2 changes: 1 addition & 1 deletion docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ services:
context: .
target: test
command: --loglevel=info -Q test_queue
entrypoint: [coverage, run, -m, celery, -A, tests.test_integration.celery_test, worker, --max-tasks-per-child=1]
entrypoint: [coverage, run, -m, celery, -A, syncmaster.worker.celery, worker, --max-tasks-per-child=1]
env_file: .env.docker
environment:
# CI runs tests in the worker container, so we need to turn off interaction with static files for it
Expand Down
2 changes: 1 addition & 1 deletion docker/entrypoint_worker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ set -e
# Required to start each Celery task in separated process, avoiding issues with global Spark session object

# exec is required to forward all signals to the main process
exec python -m celery -A syncmaster.worker.config.celery worker --max-tasks-per-child=1 "$@"
exec python -m celery -A syncmaster.worker.celery worker --max-tasks-per-child=1 "$@"

9 changes: 6 additions & 3 deletions syncmaster/backend/api/v1/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
# SPDX-License-Identifier: Apache-2.0
import asyncio
from datetime import datetime
from typing import Annotated

from asgi_correlation_id import correlation_id
from fastapi import APIRouter, Depends, Query
from jinja2 import Template
from kombu.exceptions import KombuError

from syncmaster.backend.dependencies import Stub
from syncmaster.backend.services import UnitOfWork, get_user
from syncmaster.backend.settings import ServerAppSettings as Settings
from syncmaster.db.models import RunType, Status, User
from syncmaster.db.utils import Permission
from syncmaster.errors.registration import get_error_responses
Expand All @@ -21,8 +24,7 @@
ReadRunSchema,
RunPageSchema,
)
from syncmaster.worker.config import celery
from syncmaster.worker.settings import get_worker_settings
from syncmaster.worker import celery

router = APIRouter(tags=["Runs"], responses=get_error_responses())

Expand Down Expand Up @@ -81,6 +83,7 @@ async def read_run(
@router.post("/runs")
async def start_run(
create_run_data: CreateRunSchema,
settings: Annotated[Settings, Depends(Stub(Settings))],
unit_of_work: UnitOfWork = Depends(UnitOfWork),
current_user: User = Depends(get_user(is_active=True)),
) -> ReadRunSchema:
Expand Down Expand Up @@ -117,7 +120,7 @@ async def start_run(
type=RunType.MANUAL,
)

log_url = Template(get_worker_settings().worker.LOG_URL_TEMPLATE).render(
log_url = Template(settings.server.log_url_template).render(
run=run,
correlation_id=correlation_id.get(),
)
Expand Down
4 changes: 4 additions & 0 deletions syncmaster/backend/settings/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ class ServerSettings(BaseModel):
""",
),
)
log_url_template: str = Field(
"",
description="URL template for logging",
)
request_id: RequestIDSettings = Field(
default_factory=RequestIDSettings,
)
Expand Down
2 changes: 1 addition & 1 deletion syncmaster/scheduler/transfer_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from sqlalchemy import select

from syncmaster.db.models import Transfer
from syncmaster.scheduler.settings import SchedulerSettings as Settings
from syncmaster.scheduler.settings import SchedulerAppSettings as Settings
from syncmaster.scheduler.utils import get_async_session


Expand Down
2 changes: 1 addition & 1 deletion syncmaster/scheduler/transfer_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from syncmaster.scheduler.settings import SchedulerAppSettings as Settings
from syncmaster.scheduler.utils import get_async_session
from syncmaster.schemas.v1.connections.connection import ReadAuthDataSchema
from syncmaster.worker.config import celery
from syncmaster.worker import celery


class TransferJobManager:
Expand Down
22 changes: 22 additions & 0 deletions syncmaster/worker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,24 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from celery import Celery

from syncmaster.worker.base import WorkerTask
from syncmaster.worker.settings import WorkerAppSettings


def create_celery_app(settings) -> Celery:
app = Celery(
__name__,
broker=settings.broker.url,
backend="db+" + settings.database.sync_url,
task_cls=WorkerTask,
imports=[
"syncmaster.worker.transfer",
],
)
return app


# TODO: initialize celery app in __name__ == "__main__"
# then initialize celery app in backend via dependency injection and initialize in scheduler
celery = create_celery_app(WorkerAppSettings())
5 changes: 3 additions & 2 deletions syncmaster/worker/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
from celery import Task
from sqlalchemy import create_engine

from syncmaster.worker.settings import get_worker_settings
from syncmaster.worker.settings import WorkerAppSettings as Settings


class WorkerTask(Task):

def __init__(self) -> None:
self.settings = get_worker_settings()
self.settings = Settings()
self.engine = create_engine(
url=self.settings.database.sync_url,
)
17 changes: 0 additions & 17 deletions syncmaster/worker/config.py

This file was deleted.

2 changes: 1 addition & 1 deletion syncmaster/worker/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def __init__(
)

def perform_transfer(self, settings: WorkerAppSettings) -> None:
spark = settings.CREATE_SPARK_SESSION_FUNCTION(
spark = settings.worker.CREATE_SPARK_SESSION_FUNCTION(
run=self.run,
source=self.source_handler.connection_dto,
target=self.target_handler.connection_dto,
Expand Down
22 changes: 1 addition & 21 deletions syncmaster/worker/settings/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
import importlib
import os

from pydantic import Field
from pydantic.types import ImportString
from pydantic_settings import BaseSettings
Expand All @@ -23,13 +20,9 @@ class WorkerSettings(BaseSettings):
.. code-block:: bash
SYNCMASTER__WORKER__LOG_URL_TEMPLATE="https://grafana.example.com?correlation_id={{ correlation_id }}&run_id={{ run.id }}"
SYNCMASTER__WORKER__CORRELATION_CELERY_HEADER_ID=CORRELATION_ID_CELERY
"""

LOG_URL_TEMPLATE: str = Field(
"",
description="URL template for logging",
)
CORRELATION_CELERY_HEADER_ID: str = Field(
"CORRELATION_CELERY_HEADER_ID",
description="Header ID for correlation in Celery",
Expand All @@ -54,16 +47,3 @@ class WorkerAppSettings(BaseSettings):
class Config:
env_prefix = "SYNCMASTER__"
env_nested_delimiter = "__"


def get_worker_settings() -> WorkerAppSettings:
# TODO: add to worker documentation
worker_settings_path = os.environ.get("WORKER_SETTINGS", None)

if worker_settings_path:
module_name, class_name = worker_settings_path.rsplit(".", 1)
module = importlib.import_module(module_name)
settings_class = getattr(module, class_name)
else:
settings_class = WorkerAppSettings
return settings_class()
13 changes: 7 additions & 6 deletions syncmaster/worker/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import onetl
from asgi_correlation_id import correlation_id
from celery import Celery
from celery.signals import after_setup_logger, before_task_publish, task_prerun
from celery.utils.log import get_task_logger
from sqlalchemy import select
Expand All @@ -15,18 +16,18 @@
from syncmaster.db.models import AuthData, Run, Status, Transfer
from syncmaster.db.repositories.utils import decrypt_auth_data
from syncmaster.exceptions.run import RunNotFoundError
from syncmaster.worker.base import WorkerTask
from syncmaster.worker.config import celery
from syncmaster.worker import celery
from syncmaster.worker.controller import TransferController
from syncmaster.worker.settings import get_worker_settings
from syncmaster.worker.settings import WorkerAppSettings

logger = get_task_logger(__name__)

CORRELATION_CELERY_HEADER_ID = get_worker_settings().worker.CORRELATION_CELERY_HEADER_ID
WORKER_SETTINGS = WorkerAppSettings()
CORRELATION_CELERY_HEADER_ID = WORKER_SETTINGS.worker.CORRELATION_CELERY_HEADER_ID


@celery.task(name="run_transfer_task", bind=True, track_started=True)
def run_transfer_task(self: WorkerTask, run_id: int) -> None:
def run_transfer_task(self: Celery, run_id: int) -> None:
onetl.log.setup_logging(level=logging.INFO)
with Session(self.engine) as session:
run_transfer(
Expand Down Expand Up @@ -70,7 +71,7 @@ def run_transfer(session: Session, run_id: int, settings: Settings):
source_auth_data=source_auth_data,
target_auth_data=target_auth_data,
)
controller.perform_transfer()
controller.perform_transfer(WORKER_SETTINGS)
except Exception:
run.status = Status.FAILED
logger.exception("Run %r was failed", run.id)
Expand Down
14 changes: 14 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pytest
import pytest_asyncio
from alembic.config import Config as AlembicConfig
from celery import Celery
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import (
AsyncEngine,
Expand All @@ -22,6 +23,8 @@
from syncmaster.backend.utils.jwt import sign_jwt
from syncmaster.db.models import Base
from syncmaster.scheduler.settings import SchedulerAppSettings
from syncmaster.worker import create_celery_app
from syncmaster.worker.settings import WorkerAppSettings
from tests.mocks import UserTestRoles
from tests.settings import TestSettings
from tests.utils import prepare_new_database, run_async_migrations
Expand Down Expand Up @@ -77,6 +80,11 @@ def scheduler_settings(request: pytest.FixtureRequest) -> SchedulerAppSettings:
return SchedulerAppSettings.parse_obj(request.param)


@pytest.fixture(scope="session", params=[{}])
def worker_settings(request: pytest.FixtureRequest) -> WorkerAppSettings:
return WorkerAppSettings.parse_obj(request.param)


@pytest.fixture(scope="session")
def test_settings():
return TestSettings()
Expand Down Expand Up @@ -130,6 +138,12 @@ async def client(settings: Settings) -> AsyncGenerator:
logger.info("END CLIENT FIXTURE")


@pytest.fixture(scope="session", params=[{}])
def celery(worker_settings: WorkerAppSettings) -> Celery:
celery_app = create_celery_app(worker_settings)
return celery_app


@pytest_asyncio.fixture
async def create_connection_data(request):
if hasattr(request, "param"):
Expand Down
3 changes: 0 additions & 3 deletions tests/test_integration/celery_test.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

import pytest
from apscheduler.triggers.cron import CronTrigger
from celery import Celery
from pytest_mock import MockerFixture, MockType

from syncmaster.scheduler.transfer_job_manager import TransferJobManager
from syncmaster.worker.config import celery


@pytest.fixture
def mock_send_task_to_tick(mocker: MockerFixture) -> MockType:
def mock_send_task_to_tick(mocker: MockerFixture, celery: Celery) -> MockType:
original_to_thread = asyncio.to_thread
return mocker.patch(
"asyncio.to_thread",
Expand Down
10 changes: 6 additions & 4 deletions tests/test_integration/test_scheduler/test_task.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import time
from datetime import datetime, timezone

from celery import Celery
from sqlalchemy.orm import Session

from syncmaster.db.models.run import Run, Status
from syncmaster.exceptions.run import RunNotFoundError
from syncmaster.worker.base import WorkerTask
from syncmaster.worker.config import celery
from syncmaster.worker import celery

celery.conf.update(imports=list(celery.conf.imports) + ["tests.test_integration.test_scheduler.test_task"])


@celery.task(name="tick", bind=True, track_started=True)
def tick(self: WorkerTask, run_id: int) -> None:
with Session(self.engine) as session:
def tick(celery: Celery, run_id: int) -> None:
with Session(celery.engine) as session:
run = session.get(Run, run_id)
if run is None:
raise RunNotFoundError
Expand Down
Loading

0 comments on commit 2858a96

Please sign in to comment.