Skip to content

Commit

Permalink
[DOP-22267] - remove is_deleted column to deleted object for Group, T…
Browse files Browse the repository at this point in the history
…ransfer, Queue objects (#168)

* [DOP-22267] - remove the is_deleted column and delete Group, Queue and Transfer objects instead of marking them as deleted

* [DOP-22267] - remove orphan transfer jobs from the scheduler
  • Loading branch information
maxim-lixakov authored Dec 12, 2024
1 parent db2374f commit 5178354
Show file tree
Hide file tree
Showing 16 changed files with 73 additions and 45 deletions.
2 changes: 2 additions & 0 deletions docs/changelog/next_release/168.breaking.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Records of **transfers**, **queues** and **groups** are now deleted
from the database instead of being marked as deleted
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ def upgrade():
sa.Column("owner_id", sa.BigInteger(), nullable=False),
sa.Column("created_at", sa.DateTime(), server_default=sa.text("now()"), nullable=False),
sa.Column("updated_at", sa.DateTime(), server_default=sa.text("now()"), nullable=False),
sa.Column("is_deleted", sa.Boolean(), nullable=False),
sa.Column(
"search_vector",
postgresql.TSVECTOR(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ def upgrade():
sa.Column("description", sa.String(length=512), nullable=False),
sa.Column("created_at", sa.DateTime(), server_default=sa.text("now()"), nullable=False),
sa.Column("updated_at", sa.DateTime(), server_default=sa.text("now()"), nullable=False),
sa.Column("is_deleted", sa.Boolean(), nullable=False),
sa.Column(
"search_vector",
postgresql.TSVECTOR(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ def upgrade():
sa.Column("is_scheduled", sa.Boolean(), nullable=False),
sa.Column("schedule", sa.String(length=32), nullable=False),
sa.Column("queue_id", sa.BigInteger(), nullable=False),
sa.Column("is_deleted", sa.Boolean(), nullable=False),
sa.Column("created_at", sa.DateTime(), server_default=sa.text("now()"), nullable=False),
sa.Column("updated_at", sa.DateTime(), server_default=sa.text("now()"), nullable=False),
sa.ForeignKeyConstraint(
Expand Down
4 changes: 2 additions & 2 deletions syncmaster/db/models/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy_utils import ChoiceType

from syncmaster.db.mixins import DeletableMixin, TimestampMixin
from syncmaster.db.mixins import TimestampMixin
from syncmaster.db.models.base import Base
from syncmaster.db.models.user import User

Expand Down Expand Up @@ -69,7 +69,7 @@ def roles_at_least(cls, role: str | GroupMemberRole) -> list[str]:
return [r.value for r, level in cls.role_hierarchy().items() if level >= required_level]


class Group(Base, TimestampMixin, DeletableMixin):
class Group(Base, TimestampMixin):
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
name: Mapped[str] = mapped_column(String(256), nullable=False, unique=True)
description: Mapped[str] = mapped_column(String(512), nullable=False, default="")
Expand Down
4 changes: 2 additions & 2 deletions syncmaster/db/models/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
from sqlalchemy.dialects.postgresql import TSVECTOR
from sqlalchemy.orm import Mapped, mapped_column, relationship

from syncmaster.db.mixins import DeletableMixin, ResourceMixin, TimestampMixin
from syncmaster.db.mixins import ResourceMixin, TimestampMixin
from syncmaster.db.models.base import Base

if TYPE_CHECKING:
from syncmaster.db.models.group import Group
from syncmaster.db.models.transfer import Transfer


class Queue(Base, ResourceMixin, TimestampMixin, DeletableMixin):
class Queue(Base, ResourceMixin, TimestampMixin):
name: Mapped[str] = mapped_column(String(128), nullable=False)
slug: Mapped[str] = mapped_column(String(256), nullable=False, unique=True)

Expand Down
3 changes: 1 addition & 2 deletions syncmaster/db/models/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from sqlalchemy.dialects.postgresql import TSVECTOR
from sqlalchemy.orm import Mapped, declared_attr, mapped_column, relationship

from syncmaster.db.mixins import DeletableMixin, ResourceMixin, TimestampMixin
from syncmaster.db.mixins import ResourceMixin, TimestampMixin
from syncmaster.db.models.base import Base
from syncmaster.db.models.group import Group

Expand All @@ -29,7 +29,6 @@
class Transfer(
Base,
ResourceMixin,
DeletableMixin,
TimestampMixin,
):
source_connection_id: Mapped[int] = mapped_column(
Expand Down
10 changes: 4 additions & 6 deletions syncmaster/db/repositories/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async def paginate_all(
page_size: int,
search_query: str | None = None,
) -> Pagination:
stmt = select(Group).where(Group.is_deleted.is_(False))
stmt = select(Group)
if search_query:
stmt = self._construct_vector_search(stmt, search_query)

Expand Down Expand Up @@ -73,7 +73,6 @@ async def paginate_for_user(
select(Group)
.where(
Group.owner_id == current_user_id,
Group.is_deleted.is_(False),
)
.order_by(Group.name)
)
Expand Down Expand Up @@ -104,7 +103,6 @@ async def paginate_for_user(
.join(Group, UserGroup.group_id == Group.id)
.where(
UserGroup.user_id == current_user_id,
Group.is_deleted.is_(False),
)
.order_by(Group.name)
)
Expand Down Expand Up @@ -154,7 +152,7 @@ async def read_by_id(
self,
group_id: int,
) -> Group:
stmt = select(Group).where(Group.id == group_id, Group.is_deleted.is_(False))
stmt = select(Group).where(Group.id == group_id)
try:
result: ScalarResult[Group] = await self._session.scalars(stmt)
return result.one()
Expand Down Expand Up @@ -186,7 +184,7 @@ async def update(
description: str,
owner_id: int,
) -> Group:
args = [Group.id == group_id, Group.is_deleted.is_(False)]
args = [Group.id == group_id]
try:
return await self._update(
*args,
Expand Down Expand Up @@ -278,7 +276,7 @@ async def get_member_paginate(
async def delete(self, group_id: int) -> None:
try:
await self._delete(group_id)
except EntityNotFoundError as e:
except (EntityNotFoundError, NoResultFound) as e:
raise GroupNotFoundError from e

async def add_user(
Expand Down
4 changes: 1 addition & 3 deletions syncmaster/db/repositories/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async def read_by_id(
) -> Queue:
stmt = (
select(Queue)
.where(Queue.id == queue_id, Queue.is_deleted.is_(False))
.where(Queue.id == queue_id)
.options(selectinload(Queue.transfers))
.options(selectinload(Queue.group))
)
Expand All @@ -56,7 +56,6 @@ async def paginate(
search_query: str | None = None,
):
stmt = select(Queue).where(
Queue.is_deleted.is_(False),
Queue.group_id == group_id,
)
if search_query:
Expand All @@ -77,7 +76,6 @@ async def update(
queue = await self.read_by_id(queue_id=queue_id)
return await self._update(
Queue.id == queue_id,
Queue.is_deleted.is_(False),
description=queue_data.description or queue.description,
)
except IntegrityError as e:
Expand Down
3 changes: 0 additions & 3 deletions syncmaster/db/repositories/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ async def paginate(
is_scheduled: bool | None = None,
) -> Pagination:
stmt = select(Transfer).where(
Transfer.is_deleted.is_(False),
Transfer.group_id == group_id,
)

Expand Down Expand Up @@ -97,7 +96,6 @@ async def read_by_id(
select(Transfer)
.where(
Transfer.id == transfer_id,
Transfer.is_deleted.is_(False),
)
.options(selectinload(Transfer.queue))
)
Expand Down Expand Up @@ -172,7 +170,6 @@ async def update(
strategy_params[key] = transfer.strategy_params[key]
return await self._update(
Transfer.id == transfer.id,
Transfer.is_deleted.is_(False),
name=name or transfer.name,
description=description or transfer.description,
strategy_params=strategy_params,
Expand Down
3 changes: 3 additions & 0 deletions syncmaster/scheduler/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ async def main():

while True:
logger.info("Looking at the transfer table...")

await transfer_job_manager.remove_orphan_jobs()
transfers = await transfer_fetcher.fetch_updated_jobs()

if transfers:
Expand All @@ -29,6 +31,7 @@ async def main():
", ".join(str(t.id) for t in transfers),
)
transfer_job_manager.update_jobs(transfers)

transfer_fetcher.last_updated_at = max(t.updated_at for t in transfers)
logger.info("Scheduler state has been updated. Last updated at: %s", transfer_fetcher.last_updated_at)

Expand Down
26 changes: 24 additions & 2 deletions syncmaster/scheduler/transfer_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from kombu.exceptions import KombuError
from sqlalchemy import any_, select

from syncmaster.backend.services.unit_of_work import UnitOfWork
from syncmaster.db.models import RunType, Status, Transfer
from syncmaster.exceptions.run import CannotConnectToTaskQueueError
from syncmaster.exceptions.transfer import TransferNotFoundError
from syncmaster.scheduler.celery import app as celery
from syncmaster.scheduler.settings import SchedulerAppSettings as Settings
from syncmaster.scheduler.utils import get_async_session
Expand All @@ -26,7 +28,7 @@ def update_jobs(self, transfers: list[Transfer]) -> None:
job_id = str(transfer.id)
existing_job = self.scheduler.get_job(job_id)

if not transfer.is_scheduled or transfer.is_deleted:
if not transfer.is_scheduled:
if existing_job:
self.scheduler.remove_job(job_id)
continue
Expand All @@ -47,6 +49,22 @@ def update_jobs(self, transfers: list[Transfer]) -> None:
args=(transfer.id,),
)

async def remove_orphan_jobs(self) -> None:
    """Remove scheduler jobs whose transfers no longer exist in the database.

    Every job id is converted to an integer transfer id and looked up in the
    transfer table. Jobs whose transfer id has no matching row are removed
    from the scheduler.
    """
    job_transfer_ids = {int(job.id) for job in self.scheduler.get_jobs()}
    if not job_transfer_ids:
        # Nothing is scheduled, so there is nothing to reconcile and no
        # reason to open a database session.
        return

    async with get_async_session(self.settings) as session:
        # Only primary keys are needed to detect orphans, so avoid loading
        # full Transfer rows.
        result = await session.execute(
            select(Transfer.id).where(Transfer.id == any_(sorted(job_transfer_ids))),  # type: ignore[arg-type]
        )
        existing_transfer_ids = set(result.scalars().all())

    for job_id in job_transfer_ids - existing_transfer_ids:
        self.scheduler.remove_job(str(job_id))

@staticmethod
async def send_job_to_celery(transfer_id: int) -> None:
"""
Expand All @@ -61,7 +79,11 @@ async def send_job_to_celery(transfer_id: int) -> None:
async with get_async_session(settings) as session:
unit_of_work = UnitOfWork(session=session, settings=settings)

transfer = await unit_of_work.transfer.read_by_id(transfer_id)
try:
transfer = await unit_of_work.transfer.read_by_id(transfer_id)
except TransferNotFoundError:
return

credentials_source = await unit_of_work.credentials.read(transfer.source_connection_id)
credentials_target = await unit_of_work.credentials.read(transfer.target_connection_id)

Expand Down
9 changes: 5 additions & 4 deletions tests/test_unit/test_groups/test_delete_group_by_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ async def test_only_superuser_can_delete_group(
session: AsyncSession,
):
# Arrange
group = await session.get(Group, empty_group.group.id)
assert not group.is_deleted
g_id = empty_group.group.id

# Act
result = await client.delete(
Expand All @@ -33,8 +32,10 @@ async def test_only_superuser_can_delete_group(
"message": "Group was deleted",
}

await session.refresh(group)
assert group.is_deleted
# Assert group was deleted
session.expunge_all()
group_in_db = await session.get(Group, g_id)
assert group_in_db is None


async def test_not_superuser_cannot_delete_group(
Expand Down
19 changes: 11 additions & 8 deletions tests/test_unit/test_queue/test_delete_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ async def test_maintainer_plus_can_delete_queue(
):
# Arrange
user = mock_group.get_member_of_role(role_maintainer_plus)
q_id = group_queue.id

# Act
result = await client.delete(
Expand All @@ -31,10 +32,10 @@ async def test_maintainer_plus_can_delete_queue(
}
assert result.status_code == 200

queue = await session.get(Queue, group_queue.id)
await session.refresh(queue)

assert queue.is_deleted
# Assert queue was deleted
session.expunge_all()
queue_in_db = await session.get(Queue, q_id)
assert queue_in_db is None


async def test_superuser_can_delete_queue(
Expand All @@ -44,6 +45,8 @@ async def test_superuser_can_delete_queue(
mock_group: MockGroup,
superuser: MockUser,
):
q_id = group_queue.id

# Act
result = await client.delete(
f"v1/queues/{group_queue.id}",
Expand All @@ -57,10 +60,10 @@ async def test_superuser_can_delete_queue(
}
assert result.status_code == 200

queue = await session.get(Queue, group_queue.id)
await session.refresh(queue)

assert queue.is_deleted
# Assert queue was deleted
session.expunge_all()
queue_in_db = await session.get(Queue, q_id)
assert queue_in_db is None


async def test_groupless_user_cannot_delete_queue(
Expand Down
2 changes: 0 additions & 2 deletions tests/test_unit/test_scheduler/test_transfer_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ async def test_modifying_existing_jobs(transfer_job_manager: TransferJobManager,
@pytest.mark.parametrize(
"transfer_attr, expected_state, is_existing_job",
[
("is_deleted", True, True),
("is_deleted", True, False),
("is_scheduled", False, True),
("is_scheduled", False, False),
],
Expand Down
Loading

0 comments on commit 5178354

Please sign in to comment.