diff --git a/.github/workflows/scheduler-tests.yml b/.github/workflows/scheduler-tests.yml new file mode 100644 index 00000000..db1e1bc7 --- /dev/null +++ b/.github/workflows/scheduler-tests.yml @@ -0,0 +1,79 @@ +name: Scheduler Tests +on: + workflow_call: + +env: + DEFAULT_PYTHON: '3.12' + +jobs: + tests: + name: Run Scheduler tests + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Cache jars + uses: actions/cache@v4 + with: + path: ./cached_jars + key: ${{ runner.os }}-python-${{ env.DEFAULT_PYTHON }}-test-scheduler + restore-keys: | + ${{ runner.os }}-python-${{ env.DEFAULT_PYTHON }}-test-scheduler + ${{ runner.os }}-python- + + - name: Build Worker Image + uses: docker/build-push-action@v6 + with: + context: . + tags: mtsrus/syncmaster-worker:${{ github.sha }} + target: test + file: docker/Dockerfile.worker + load: true + cache-from: mtsrus/syncmaster-worker:develop + + - name: Docker compose up + run: | + docker compose -f docker-compose.test.yml --profile all down -v --remove-orphans + docker compose -f docker-compose.test.yml --profile worker up -d --wait --wait-timeout 200 + env: + WORKER_IMAGE_TAG: ${{ github.sha }} + + # This is important, as coverage is exported after receiving SIGTERM + - name: Run Scheduler Tests + run: | + docker compose -f ./docker-compose.test.yml --profile worker exec -T worker coverage run -m pytest -vvv -s -m "worker and scheduler_integration" + + - name: Dump worker logs on failure + if: failure() + uses: jwalton/gh-docker-logs@v2 + with: + images: mtsrus/syncmaster-worker + dest: ./logs + + - name: Shutdown + if: always() + run: | + docker compose -f docker-compose.test.yml --profile all down -v --remove-orphans + + - name: Upload worker logs + uses: actions/upload-artifact@v4 + if: failure() + with: + name: worker-logs-scheduler + path: logs/* + + - 
name: Upload coverage results + uses: actions/upload-artifact@v4 + with: + name: coverage-scheduler + path: reports/* + # https://github.com/actions/upload-artifact/issues/602 + include-hidden-files: true diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 4c66a009..a059363f 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -32,6 +32,10 @@ jobs: name: S3 tests uses: ./.github/workflows/s3-tests.yml + scheduler_tests: + name: Scheduler tests + uses: ./.github/workflows/scheduler-tests.yml + unit_tests: name: Unit tests uses: ./.github/workflows/unit-test.yml diff --git a/docker-compose.test.yml b/docker-compose.test.yml index 7a616995..1e304566 100644 --- a/docker-compose.test.yml +++ b/docker-compose.test.yml @@ -78,6 +78,7 @@ services: context: . target: test command: --loglevel=info -Q test_queue + entrypoint: [python, -m, celery, -A, tests.test_integration.celery_test, worker, --max-tasks-per-child=1] env_file: .env.docker volumes: - ./syncmaster:/app/syncmaster @@ -90,7 +91,7 @@ services: condition: service_healthy rabbitmq: condition: service_healthy - profiles: [worker, s3, oracle, hdfs, hive, all] + profiles: [worker, scheduler, s3, oracle, hdfs, hive, all] test-postgres: image: postgres diff --git a/syncmaster/scheduler/__init__.py b/syncmaster/scheduler/__init__.py index eb9bf462..aa0c378f 100644 --- a/syncmaster/scheduler/__init__.py +++ b/syncmaster/scheduler/__init__.py @@ -1,2 +1,4 @@ # SPDX-FileCopyrightText: 2023-2024 MTS PJSC # SPDX-License-Identifier: Apache-2.0 +from syncmaster.scheduler.transfer_fetcher import TransferFetcher +from syncmaster.scheduler.transfer_job_manager import TransferJobManager diff --git a/syncmaster/settings/__init__.py b/syncmaster/settings/__init__.py index 831fb0d0..19aa9995 100644 --- a/syncmaster/settings/__init__.py +++ b/syncmaster/settings/__init__.py @@ -53,8 +53,6 @@ class Settings(BaseSettings): SCHEDULER_TRANSFER_FETCHING_TIMEOUT: int = 180 # seconds 
SCHEDULER_MISFIRE_GRACE_TIME: int = 300 # seconds - CORRELATION_CELERY_HEADER_ID: str = "CORRELATION_CELERY_HEADER_ID" - TOKEN_EXPIRED_TIME: int = 60 * 60 * 10 # 10 hours CREATE_SPARK_SESSION_FUNCTION: ImportString = "syncmaster.worker.spark.get_worker_spark_session" diff --git a/tests/conftest.py b/tests/conftest.py index ab143513..8304f8d7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -32,6 +32,7 @@ "tests.test_unit.test_runs.run_fixtures", "tests.test_unit.test_connections.connection_fixtures", "tests.test_unit.test_scheduler.scheduler_fixtures", + "tests.test_integration.test_scheduler.scheduler_fixtures", ] diff --git a/tests/test_integration/celery_test.py b/tests/test_integration/celery_test.py new file mode 100644 index 00000000..2991ae27 --- /dev/null +++ b/tests/test_integration/celery_test.py @@ -0,0 +1,3 @@ +from syncmaster.worker.config import celery + +celery.conf.update(imports=list(celery.conf.imports) + ["tests.test_integration.test_scheduler.test_task"]) diff --git a/tests/test_integration/test_scheduler/__init__.py b/tests/test_integration/test_scheduler/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/test_integration/test_scheduler/scheduler_fixtures/__init__.py b/tests/test_integration/test_scheduler/scheduler_fixtures/__init__.py new file mode 100644 index 00000000..a0781350 --- /dev/null +++ b/tests/test_integration/test_scheduler/scheduler_fixtures/__init__.py @@ -0,0 +1,7 @@ +from tests.test_integration.test_scheduler.scheduler_fixtures.mocker_fixtures import ( + mock_add_job, + mock_send_task_to_tick, +) +from tests.test_integration.test_scheduler.scheduler_fixtures.transfer_fixture import ( + group_transfer_integration_mock, +) diff --git a/tests/test_integration/test_scheduler/scheduler_fixtures/mocker_fixtures.py b/tests/test_integration/test_scheduler/scheduler_fixtures/mocker_fixtures.py new file mode 100644 index 00000000..9380bff3 --- /dev/null +++ 
b/tests/test_integration/test_scheduler/scheduler_fixtures/mocker_fixtures.py @@ -0,0 +1,33 @@ +import asyncio + +import pytest +from apscheduler.triggers.cron import CronTrigger +from pytest_mock import MockerFixture, MockType + +from syncmaster.scheduler.transfer_job_manager import TransferJobManager +from syncmaster.worker.config import celery + + +@pytest.fixture +def mock_send_task_to_tick(mocker: MockerFixture) -> MockType: + original_to_thread = asyncio.to_thread + return mocker.patch( + "asyncio.to_thread", + new=lambda func, *args, **kwargs: original_to_thread(celery.send_task, "tick", *args[1:], **kwargs), + ) + + +@pytest.fixture +def mock_add_job(mocker: MockerFixture, transfer_job_manager: TransferJobManager) -> MockType: + original_add_job = transfer_job_manager.scheduler.add_job + return mocker.patch.object( + transfer_job_manager.scheduler, + "add_job", + side_effect=lambda func, id, trigger, misfire_grace_time, args: original_add_job( + func=func, + id=id, + trigger=CronTrigger(second="*"), + misfire_grace_time=misfire_grace_time, + args=args, + ), + ) diff --git a/tests/test_integration/test_scheduler/scheduler_fixtures/transfer_fixture.py b/tests/test_integration/test_scheduler/scheduler_fixtures/transfer_fixture.py new file mode 100644 index 00000000..d88f4fdc --- /dev/null +++ b/tests/test_integration/test_scheduler/scheduler_fixtures/transfer_fixture.py @@ -0,0 +1,140 @@ +from collections.abc import AsyncGenerator + +import pytest_asyncio +from sqlalchemy.ext.asyncio import AsyncSession + +from syncmaster.backend.api.v1.auth.utils import sign_jwt +from syncmaster.db.repositories.utils import decrypt_auth_data +from syncmaster.settings import Settings +from tests.mocks import ( + MockConnection, + MockCredentials, + MockGroup, + MockTransfer, + MockUser, + UserTestRoles, +) +from tests.test_unit.conftest import create_group_member +from tests.test_unit.utils import ( + create_connection, + create_credentials, + create_group, + create_queue, + 
create_transfer, + create_user, +) + + +@pytest_asyncio.fixture +async def group_transfer_integration_mock( + session: AsyncSession, + settings: Settings, + create_connection_data: dict | None, + create_transfer_data: dict | None, +) -> AsyncGenerator[MockTransfer, None]: + group_owner = await create_user( + session=session, + username="group_transfer_owner", + is_active=True, + ) + group = await create_group( + session=session, + name="group_for_group_transfer", + owner_id=group_owner.id, + ) + + queue = await create_queue( + session=session, + name="test_queue", + group_id=group.id, + ) + + members: list[MockUser] = [] + for username in ( + "transfer_group_member_maintainer", + "transfer_group_member_developer", + "transfer_group_member_guest", + ): + members.append( + await create_group_member( + username=username, + group_id=group.id, + session=session, + settings=settings, + ), + ) + + await session.commit() + mock_group = MockGroup( + group=group, + owner=MockUser( + user=group_owner, + auth_token=sign_jwt(group_owner.id, settings), + role=UserTestRoles.Owner, + ), + members=members, + ) + + source_connection = await create_connection( + session=session, + name="group_transfer_source_connection", + group_id=group.id, + data=create_connection_data, + ) + source_connection_creds = await create_credentials( + session=session, + settings=settings, + connection_id=source_connection.id, + ) + target_connection = await create_connection( + session=session, + name="group_transfer_target_connection", + group_id=group.id, + data=create_connection_data, + ) + target_connection_creds = await create_credentials( + session=session, + settings=settings, + connection_id=target_connection.id, + ) + + transfer = await create_transfer( + session=session, + name="group_transfer", + group_id=group.id, + source_connection_id=source_connection.id, + target_connection_id=target_connection.id, + queue_id=queue.id, + source_params=create_transfer_data, + 
target_params=create_transfer_data, + ) + + yield MockTransfer( + transfer=transfer, + source_connection=MockConnection( + connection=source_connection, + owner_group=mock_group, + credentials=MockCredentials( + value=decrypt_auth_data(source_connection_creds.value, settings=settings), + connection_id=source_connection.id, + ), + ), + target_connection=MockConnection( + connection=target_connection, + owner_group=mock_group, + credentials=MockCredentials( + value=decrypt_auth_data(target_connection_creds.value, settings=settings), + connection_id=target_connection.id, + ), + ), + owner_group=mock_group, + ) + await session.delete(transfer) + await session.delete(source_connection) + await session.delete(target_connection) + await session.delete(group) + await session.delete(group_owner) + await session.delete(queue) + for member in members: + await session.delete(member.user) + await session.commit() diff --git a/tests/test_integration/test_scheduler/test_scheduler.py b/tests/test_integration/test_scheduler/test_scheduler.py new file mode 100644 index 00000000..cbf6ad36 --- /dev/null +++ b/tests/test_integration/test_scheduler/test_scheduler.py @@ -0,0 +1,51 @@ +import asyncio + +import pytest +from pytest_mock import MockType +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from syncmaster.db.models import Run, Status +from syncmaster.scheduler import TransferFetcher, TransferJobManager +from syncmaster.settings import Settings +from tests.mocks import MockTransfer + +pytestmark = [pytest.mark.asyncio, pytest.mark.worker, pytest.mark.scheduler_integration] + + +async def test_scheduler( + session: AsyncSession, + settings: Settings, + group_transfer_integration_mock: MockTransfer, + transfer_job_manager: TransferJobManager, + mock_send_task_to_tick: MockType, + mock_add_job: MockType, +): + group_transfer = group_transfer_integration_mock + transfer_fetcher = TransferFetcher(settings) + transfers = await 
transfer_fetcher.fetch_updated_jobs() + assert transfers + assert group_transfer.transfer.id in {t.id for t in transfers} + + transfer_job_manager.update_jobs(transfers) + + job = transfer_job_manager.scheduler.get_job(str(group_transfer.id)) + assert job is not None + + await asyncio.sleep(1.5) # make sure that created job with every-second cron worked + + run = await session.scalar( + select(Run).filter_by(transfer_id=group_transfer.id).order_by(Run.created_at.desc()), + ) + assert run is not None + assert run.status in [Status.CREATED, Status.STARTED] + + for _ in range(3): + await asyncio.sleep(2) + await session.refresh(run) + run = await session.scalar(select(Run).where(Run.id == run.id)) + if run.status == Status.FINISHED: + break + + assert run.status == Status.FINISHED + assert run.ended_at is not None diff --git a/tests/test_integration/test_scheduler/test_task.py b/tests/test_integration/test_scheduler/test_task.py new file mode 100644 index 00000000..b4d5d2c7 --- /dev/null +++ b/tests/test_integration/test_scheduler/test_task.py @@ -0,0 +1,28 @@ +import time +from datetime import datetime, timezone + +from sqlalchemy.orm import Session + +from syncmaster.db.models.run import Run, Status +from syncmaster.exceptions.run import RunNotFoundError +from syncmaster.worker.base import WorkerTask +from syncmaster.worker.config import celery + + +@celery.task(name="tick", bind=True, track_started=True) +def tick(self: WorkerTask, run_id: int) -> None: + with Session(self.engine) as session: + run = session.get(Run, run_id) + if run is None: + raise RunNotFoundError + + run.started_at = datetime.now(tz=timezone.utc) + run.status = Status.STARTED + session.add(run) + session.commit() + + time.sleep(2) # to make sure that previous status is handled in test + run.status = Status.FINISHED + run.ended_at = datetime.now(tz=timezone.utc) + session.add(run) + session.commit() diff --git a/tests/test_unit/utils.py b/tests/test_unit/utils.py index e112a32a..2681eb5c 100644 ---
a/tests/test_unit/utils.py +++ b/tests/test_unit/utils.py @@ -154,7 +154,7 @@ async def create_transfer( source_params: dict | None = None, target_params: dict | None = None, is_scheduled: bool = True, - schedule: str = "0 0 * * *", + schedule: str = "* * * * *", strategy_params: dict | None = None, description: str = "", ) -> Transfer: