Skip to content

Commit

Permalink
[DOP-20962] Add scheduler integration test (#118)
Browse files Browse the repository at this point in the history
  • Loading branch information
IlyasDevelopment authored and Ilyas Gasanov committed Nov 13, 2024
1 parent 7c05e9e commit 441d362
Show file tree
Hide file tree
Showing 14 changed files with 351 additions and 4 deletions.
79 changes: 79 additions & 0 deletions .github/workflows/scheduler-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Reusable workflow (invoked through `workflow_call`) that builds the worker
# test image, starts the docker-compose test stack and runs the scheduler
# integration tests inside the worker container.
name: Scheduler Tests
on:
workflow_call:

env:
DEFAULT_PYTHON: '3.12'

jobs:
tests:
name: Run Scheduler tests
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Set up QEMU
uses: docker/setup-qemu-action@v3

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3

# Cache ./cached_jars between runs; the restore keys fall back to any cache
# whose key starts with the same OS + "python-" prefix.
- name: Cache jars
uses: actions/cache@v4
with:
path: ./cached_jars
key: ${{ runner.os }}-python-${{ env.DEFAULT_PYTHON }}-test-scheduler
restore-keys: |
${{ runner.os }}-python-${{ env.DEFAULT_PYTHON }}-test-scheduler
${{ runner.os }}-python-
# Build the `test` target of docker/Dockerfile.worker and tag it with the
# commit SHA; `load: true` is presumably what makes the image available to
# the local Docker daemon for docker compose - confirm against the action docs.
- name: Build Worker Image
uses: docker/build-push-action@v6
with:
context: .
tags: mtsrus/syncmaster-worker:${{ github.sha }}
target: test
file: docker/Dockerfile.worker
load: true
cache-from: mtsrus/syncmaster-worker:develop

# Remove any leftover containers/volumes first, then start the `worker`
# profile and wait (up to 200 seconds) for the services to come up.
# WORKER_IMAGE_TAG is presumably consumed by docker-compose.test.yml to pick
# the image built above - verify there.
- name: Docker compose up
run: |
docker compose -f docker-compose.test.yml --profile all down -v --remove-orphans
docker compose -f docker-compose.test.yml --profile worker up -d --wait --wait-timeout 200
env:
WORKER_IMAGE_TAG: ${{ github.sha }}

# Run, under coverage and inside the worker container, only the tests marked
# with both `worker` and `scheduler_integration`.
- name: Run Scheduler Tests
run: |
docker compose -f ./docker-compose.test.yml --profile worker exec -T worker coverage run -m pytest -vvv -s -m "worker and scheduler_integration"
- name: Dump worker logs on failure
if: failure()
uses: jwalton/gh-docker-logs@v2
with:
images: mtsrus/syncmaster-worker
dest: ./logs

# This is important, as coverage is exported after receiving SIGTERM.
# NOTE(review): this comment originally sat above "Run Scheduler Tests"; it
# presumably describes this step, whose `down` stops the worker before the
# coverage upload further below - confirm.
- name: Shutdown
if: always()
run: |
docker compose -f docker-compose.test.yml --profile all down -v --remove-orphans
- name: Upload worker logs
uses: actions/upload-artifact@v4
if: failure()
with:
name: worker-logs-scheduler
path: logs/*

- name: Upload coverage results
uses: actions/upload-artifact@v4
with:
name: coverage-scheduler
path: reports/*
# https://github.com/actions/upload-artifact/issues/602
include-hidden-files: true
4 changes: 4 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ jobs:
name: S3 tests
uses: ./.github/workflows/s3-tests.yml

scheduler_tests:
name: Scheduler tests
uses: ./.github/workflows/scheduler-tests.yml

unit_tests:
name: Unit tests
uses: ./.github/workflows/unit-test.yml
Expand Down
3 changes: 2 additions & 1 deletion docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ services:
context: .
target: test
command: --loglevel=info -Q test_queue
entrypoint: [python, -m, celery, -A, tests.test_integration.celery_test, worker, --max-tasks-per-child=1]
env_file: .env.docker
volumes:
- ./syncmaster:/app/syncmaster
Expand All @@ -90,7 +91,7 @@ services:
condition: service_healthy
rabbitmq:
condition: service_healthy
profiles: [worker, s3, oracle, hdfs, hive, all]
profiles: [worker, scheduler, s3, oracle, hdfs, hive, all]

test-postgres:
image: postgres
Expand Down
2 changes: 2 additions & 0 deletions syncmaster/scheduler/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from syncmaster.scheduler.transfer_fetcher import TransferFetcher
from syncmaster.scheduler.transfer_job_manager import TransferJobManager
2 changes: 0 additions & 2 deletions syncmaster/settings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ class Settings(BaseSettings):
SCHEDULER_TRANSFER_FETCHING_TIMEOUT: int = 180 # seconds
SCHEDULER_MISFIRE_GRACE_TIME: int = 300 # seconds

CORRELATION_CELERY_HEADER_ID: str = "CORRELATION_CELERY_HEADER_ID"

TOKEN_EXPIRED_TIME: int = 60 * 60 * 10 # 10 hours
CREATE_SPARK_SESSION_FUNCTION: ImportString = "syncmaster.worker.spark.get_worker_spark_session"

Expand Down
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"tests.test_unit.test_runs.run_fixtures",
"tests.test_unit.test_connections.connection_fixtures",
"tests.test_unit.test_scheduler.scheduler_fixtures",
"tests.test_integration.test_scheduler.scheduler_fixtures",
]


Expand Down
3 changes: 3 additions & 0 deletions tests/test_integration/celery_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from syncmaster.worker.config import celery

# Extend the worker's Celery import list with the test-only task module so the
# task defined there is registered when the worker starts with this app.
celery.conf.update(
    imports=[*celery.conf.imports, "tests.test_integration.test_scheduler.test_task"],
)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from tests.test_integration.test_scheduler.scheduler_fixtures.mocker_fixtures import (
mock_add_job,
mock_send_task_to_tick,
)
from tests.test_integration.test_scheduler.scheduler_fixtures.transfer_fixture import (
group_transfer_integration_mock,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import asyncio

import pytest
from apscheduler.triggers.cron import CronTrigger
from pytest_mock import MockerFixture, MockType

from syncmaster.scheduler.transfer_job_manager import TransferJobManager
from syncmaster.worker.config import celery


@pytest.fixture
def mock_send_task_to_tick(mocker: MockerFixture) -> MockType:
    """Patch ``asyncio.to_thread`` so every call dispatches the ``tick`` task.

    The replacement ignores the callable it is given and instead runs
    ``celery.send_task("tick", ...)`` through the real ``asyncio.to_thread``.
    The first positional argument is dropped (presumably the production task
    name - confirm against the caller); the remaining positional and keyword
    arguments are forwarded unchanged.
    """
    real_to_thread = asyncio.to_thread

    def _dispatch_tick(func, *args, **kwargs):
        # ``func`` and ``args[0]`` are intentionally discarded.
        return real_to_thread(celery.send_task, "tick", *args[1:], **kwargs)

    return mocker.patch("asyncio.to_thread", new=_dispatch_tick)


@pytest.fixture
def mock_add_job(mocker: MockerFixture, transfer_job_manager: TransferJobManager) -> MockType:
    """Wrap the scheduler's ``add_job`` so every job fires each second.

    The patched method forwards ``func``, ``id``, ``misfire_grace_time`` and
    ``args`` to the real ``add_job`` but replaces the requested trigger with
    ``CronTrigger(second="*")``.
    """
    real_add_job = transfer_job_manager.scheduler.add_job

    def _add_job_every_second(func, id, trigger, misfire_grace_time, args):
        # The caller's ``trigger`` is accepted for signature compatibility only.
        return real_add_job(
            func=func,
            id=id,
            trigger=CronTrigger(second="*"),
            misfire_grace_time=misfire_grace_time,
            args=args,
        )

    return mocker.patch.object(
        transfer_job_manager.scheduler,
        "add_job",
        side_effect=_add_job_every_second,
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
from collections.abc import AsyncGenerator

import pytest_asyncio
from sqlalchemy.ext.asyncio import AsyncSession

from syncmaster.backend.api.v1.auth.utils import sign_jwt
from syncmaster.db.repositories.utils import decrypt_auth_data
from syncmaster.settings import Settings
from tests.mocks import (
MockConnection,
MockCredentials,
MockGroup,
MockTransfer,
MockUser,
UserTestRoles,
)
from tests.test_unit.conftest import create_group_member
from tests.test_unit.utils import (
create_connection,
create_credentials,
create_group,
create_queue,
create_transfer,
create_user,
)


@pytest_asyncio.fixture
async def group_transfer_integration_mock(
    session: AsyncSession,
    settings: Settings,
    create_connection_data: dict | None,
    create_transfer_data: dict | None,
) -> AsyncGenerator[MockTransfer, None]:
    """Create a group with a queue, members, two connections and a transfer.

    Yields a ``MockTransfer`` wrapping the created transfer, its source and
    target connections (with decrypted credentials) and the owning group.
    After the test, every created entity is deleted and the session committed.
    """
    owner = await create_user(
        session=session,
        username="group_transfer_owner",
        is_active=True,
    )
    group = await create_group(
        session=session,
        name="group_for_group_transfer",
        owner_id=owner.id,
    )
    queue = await create_queue(
        session=session,
        name="test_queue",
        group_id=group.id,
    )

    member_usernames = (
        "transfer_group_member_maintainer",
        "transfer_group_member_developer",
        "transfer_group_member_guest",
    )
    members: list[MockUser] = [
        await create_group_member(
            username=member_username,
            group_id=group.id,
            session=session,
            settings=settings,
        )
        for member_username in member_usernames
    ]
    await session.commit()

    owner_mock = MockUser(
        user=owner,
        auth_token=sign_jwt(owner.id, settings),
        role=UserTestRoles.Owner,
    )
    mock_group = MockGroup(group=group, owner=owner_mock, members=members)

    source_connection = await create_connection(
        session=session,
        name="group_transfer_source_connection",
        group_id=group.id,
        data=create_connection_data,
    )
    source_creds = await create_credentials(
        session=session,
        settings=settings,
        connection_id=source_connection.id,
    )
    target_connection = await create_connection(
        session=session,
        name="group_transfer_target_connection",
        group_id=group.id,
        data=create_connection_data,
    )
    target_creds = await create_credentials(
        session=session,
        settings=settings,
        connection_id=target_connection.id,
    )

    transfer = await create_transfer(
        session=session,
        name="group_transfer",
        group_id=group.id,
        source_connection_id=source_connection.id,
        target_connection_id=target_connection.id,
        queue_id=queue.id,
        source_params=create_transfer_data,
        target_params=create_transfer_data,
    )

    def _as_mock_connection(connection, creds) -> MockConnection:
        # Pair a connection with its decrypted credentials and the owning group.
        return MockConnection(
            connection=connection,
            owner_group=mock_group,
            credentials=MockCredentials(
                value=decrypt_auth_data(creds.value, settings=settings),
                connection_id=connection.id,
            ),
        )

    source_mock = _as_mock_connection(source_connection, source_creds)
    target_mock = _as_mock_connection(target_connection, target_creds)

    yield MockTransfer(
        transfer=transfer,
        source_connection=source_mock,
        target_connection=target_mock,
        owner_group=mock_group,
    )

    # Teardown: delete in the same order as the entities are listed here.
    for entity in (transfer, source_connection, target_connection, group, owner, queue):
        await session.delete(entity)
    for member in members:
        await session.delete(member.user)
    await session.commit()
51 changes: 51 additions & 0 deletions tests/test_integration/test_scheduler/test_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import asyncio

import pytest
from pytest_mock import MockType
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from syncmaster.db.models import Run, Status
from syncmaster.scheduler import TransferFetcher, TransferJobManager
from syncmaster.settings import Settings
from tests.mocks import MockTransfer

# Module-wide markers: every test here gets the ``asyncio``, ``worker`` and
# ``scheduler_integration`` marks.
pytestmark = [pytest.mark.asyncio, pytest.mark.worker, pytest.mark.scheduler_integration]


async def test_scheduler(
    session: AsyncSession,
    settings: Settings,
    group_transfer_integration_mock: MockTransfer,
    transfer_job_manager: TransferJobManager,
    mock_send_task_to_tick: MockType,
    mock_add_job: MockType,
):
    """Check that a scheduled transfer produces a run that reaches FINISHED.

    The transfer is fetched, registered with the job manager (whose
    ``add_job`` is patched to fire every second), and the resulting ``Run``
    row is polled until the ``tick`` test task marks it as finished.
    """
    group_transfer = group_transfer_integration_mock
    transfer_id = group_transfer.transfer.id

    transfer_fetcher = TransferFetcher(settings)
    transfers = await transfer_fetcher.fetch_updated_jobs()
    assert transfers
    assert transfer_id in {t.id for t in transfers}

    transfer_job_manager.update_jobs(transfers)

    job = transfer_job_manager.scheduler.get_job(str(transfer_id))
    assert job is not None

    await asyncio.sleep(1.5)  # make sure that created job with every-second cron worked

    run = await session.scalar(
        select(Run).filter_by(transfer_id=transfer_id).order_by(Run.created_at.desc()),
    )
    assert run is not None
    assert run.status in [Status.CREATED, Status.STARTED]

    # Poll the same run for up to ~6 seconds. ``refresh`` reloads this exact
    # row; the previous ``select(Run, run.id)`` had no WHERE clause, so it could
    # return a different run created by a later firing of the every-second job.
    for _ in range(3):
        await asyncio.sleep(2)
        await session.refresh(run)
        if run.status == Status.FINISHED:
            break

    assert run.status == Status.FINISHED
    assert run.ended_at is not None
28 changes: 28 additions & 0 deletions tests/test_integration/test_scheduler/test_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import time
from datetime import datetime, timezone

from sqlalchemy.orm import Session

from syncmaster.db.models.run import Run, Status
from syncmaster.exceptions.run import RunNotFoundError
from syncmaster.worker.base import WorkerTask
from syncmaster.worker.config import celery


@celery.task(name="tick", bind=True, track_started=True)
def tick(self: WorkerTask, run_id: int) -> None:
    """Test-only task that moves a run through STARTED to FINISHED.

    Loads the ``Run`` with the given id, marks it as started, waits two
    seconds, then marks it as finished with an end timestamp.

    Raises:
        RunNotFoundError: if no run with ``run_id`` exists.
    """
    with Session(self.engine) as db:
        run = db.get(Run, run_id)
        if run is None:
            raise RunNotFoundError

        def _persist() -> None:
            # Stage the run and flush the current state to the database.
            db.add(run)
            db.commit()

        run.started_at = datetime.now(tz=timezone.utc)
        run.status = Status.STARTED
        _persist()

        time.sleep(2)  # to make sure that previous status is handled in test

        run.status = Status.FINISHED
        run.ended_at = datetime.now(tz=timezone.utc)
        _persist()
Loading

0 comments on commit 441d362

Please sign in to comment.