Skip to content

Commit

Permalink
[DOP-16664] Create location for job
Browse files Browse the repository at this point in the history
  • Loading branch information
dolfinus committed Jul 8, 2024
1 parent aff1550 commit cd2034e
Show file tree
Hide file tree
Showing 24 changed files with 268 additions and 95 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ jobs:
- name: Run Tests
run: |
source .env.local
make db
mkdir -p reports/
poetry run coverage run -m pytest
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ repos:
files: ^(.*__main__\.py|data_rentgen/server/scripts/export_openapi_schema\.py|data_rentgen/db/scripts/create_partitions\.py)$
- id: insert-license
files: .*\.py$
exclude: ^(data_rentgen/server/dependencies/stub.py|docs/.*\.py|tests/.*\.py)$
exclude: ^(data_rentgen/dependencies/stub.py|docs/.*\.py|tests/.*\.py)$
args:
- --license-filepath
- .spdx-license-header.txt
Expand Down
4 changes: 4 additions & 0 deletions data_rentgen/consumer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@

import logging

from fast_depends import dependency_provider
from faststream import FastStream
from faststream.kafka import KafkaBroker
from sqlalchemy.ext.asyncio import AsyncSession

import data_rentgen
from data_rentgen.consumer.handlers import router
from data_rentgen.consumer.settings import ConsumerApplicationSettings
from data_rentgen.consumer.settings.security import get_broker_security
from data_rentgen.db.factory import create_session_factory
from data_rentgen.logging.setup_logging import setup_logging

logger = logging.getLogger(__name__)
Expand All @@ -22,6 +25,7 @@ def broker_factory(settings: ConsumerApplicationSettings) -> KafkaBroker:
logger=logger,
)
broker.include_router(router)
dependency_provider.override(AsyncSession, create_session_factory(settings.database))
return broker


Expand Down
8 changes: 8 additions & 0 deletions data_rentgen/consumer/extractors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from data_rentgen.consumer.extractors.location import location_for_job

__all__ = [
"location_for_job",
]
19 changes: 19 additions & 0 deletions data_rentgen/consumer/extractors/location.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from urllib.parse import urlparse

from data_rentgen.consumer.openlineage.job import OpenLineageJob
from data_rentgen.consumer.openlineage.run_facets import OpenLineageParentJob
from data_rentgen.dto import LocationDTO


def location_for_job(job: OpenLineageJob | OpenLineageParentJob) -> LocationDTO:
url = urlparse(job.namespace)
scheme = url.scheme or "unknown"
netloc = url.netloc or url.path
return LocationDTO(
type=scheme,
name=netloc,
urls=[f"{scheme}://{netloc}"],
)
25 changes: 22 additions & 3 deletions data_rentgen/consumer/handlers.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,33 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from faststream import Logger
from faststream import Depends
from faststream.kafka import KafkaRouter
from sqlalchemy.ext.asyncio import AsyncSession

from data_rentgen.consumer.extractors import location_for_job
from data_rentgen.consumer.openlineage.run_event import OpenLineageRunEvent
from data_rentgen.dependencies import Stub
from data_rentgen.dto.location import LocationDTO
from data_rentgen.services.uow import UnitOfWork

router = KafkaRouter()


def get_unit_of_work(session: AsyncSession = Depends(Stub(AsyncSession))) -> UnitOfWork:
return UnitOfWork(session)


@router.subscriber("input.runs")
async def runs_handler(msg: OpenLineageRunEvent, logger: Logger):
logger.info("Successfully handled, %s", msg)
async def runs_handler(event: OpenLineageRunEvent, unit_of_work: UnitOfWork = Depends(get_unit_of_work)):
job_location = location_for_job(event.job)

parent_location: LocationDTO | None = None
parent_run_facet = event.run.facets.get("parent", None)
if parent_run_facet:
parent_location = location_for_job(parent_run_facet.job)

async with unit_of_work:
await unit_of_work.location.get_or_create(job_location)
if parent_location:
await unit_of_work.location.get_or_create(parent_location)
27 changes: 4 additions & 23 deletions data_rentgen/db/repositories/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
from abc import ABC
from typing import Generic, TypeVar

from sqlalchemy import ScalarResult, Select, func, insert, select, update
from sqlalchemy import ScalarResult, Select, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.sql import ColumnElement, SQLColumnExpression
from sqlalchemy.sql.dml import ReturningInsert, ReturningUpdate

from data_rentgen.db.models import Base
from data_rentgen.server.dto.pagination import Pagination
from data_rentgen.dto import PaginationDTO

Model = TypeVar("Model", bound=Base)

Expand Down Expand Up @@ -40,31 +39,13 @@ async def _get(
query: Select = select(model_type).where(*where)
return await self._session.scalar(query)

async def _create(
self,
data: dict,
) -> Model:
model_type = self.model_type()
query: ReturningInsert[tuple[Model]] = insert(model_type).values(**data).returning(model_type)
result = await self._session.scalars(query)
return result.one()

async def _update(
self,
where: list[ColumnElement],
changes: dict,
) -> Model | None:
model_type = self.model_type()
query: ReturningUpdate[tuple[Model]] = update(model_type).where(*where).values(**changes).returning(model_type)
return await self._session.scalar(query)

async def _paginate(
self,
order_by: list[SQLColumnExpression],
page: int,
page_size: int,
where: list[SQLColumnExpression] | None = None,
) -> Pagination[Model]:
) -> PaginationDTO[Model]:
model_type = self.model_type()
query: Select = select(model_type)
if where:
Expand All @@ -76,7 +57,7 @@ async def _paginate(
total_count: int = await self._session.scalar( # type: ignore[assignment]
select(func.count()).select_from(query.subquery()),
)
return Pagination[model_type]( # type: ignore[valid-type]
return PaginationDTO[model_type]( # type: ignore[valid-type]
items=list(items_result.all()),
total_count=total_count,
page=page,
Expand Down
40 changes: 40 additions & 0 deletions data_rentgen/db/repositories/location.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from sqlalchemy import select
from sqlalchemy.orm import selectinload

from data_rentgen.db.models import Address, Location
from data_rentgen.db.repositories.base import Repository
from data_rentgen.dto import LocationDTO


class LocationRepository(Repository[Location]):
async def get_or_create(self, location: LocationDTO) -> Location:
by_name = select(Location).where(Location.type == location.type, Location.name == location.name)
by_addresses = (
select(Location)
.join(Location.addresses)
.where(Location.type == location.type, Address.url.in_(location.urls))
)
statement = (
select(Location).from_statement(by_name.union(by_addresses)).options(selectinload(Location.addresses))
)

result = await self._session.scalar(statement)
changed: bool = False
if not result:
result = Location(type=location.type, name=location.name)
self._session.add(result)
changed = True

existing_urls = {address.url for address in result.addresses}
new_urls = set(location.urls) - existing_urls
for url in new_urls:
# currently we automatically add new addresses, but not delete old ones
result.addresses.append(Address(url=url))
changed = True

if changed:
await self._session.flush()
return result
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from data_rentgen.server.dependencies.stub import Stub
from data_rentgen.dependencies.stub import Stub

__all__ = ["Stub"]
File renamed without changes.
10 changes: 10 additions & 0 deletions data_rentgen/dto/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from data_rentgen.dto.location import LocationDTO
from data_rentgen.dto.pagination import PaginationDTO

__all__ = [
"LocationDTO",
"PaginationDTO",
]
11 changes: 11 additions & 0 deletions data_rentgen/dto/location.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from dataclasses import dataclass


@dataclass
class LocationDTO:
type: str
name: str
urls: list[str]
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@


@dataclass
class Pagination(Generic[T]):
class PaginationDTO(Generic[T]):
items: list[T]
page: int
page_size: int
Expand Down
8 changes: 0 additions & 8 deletions data_rentgen/server/dto/__init__.py

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SPDX-FileCopyrightText: 2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from data_rentgen.server.services.uow import UnitOfWork
from data_rentgen.services.uow import UnitOfWork

__all__ = ["UnitOfWork"]
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from sqlalchemy.ext.asyncio import AsyncSession
from typing_extensions import Annotated

from data_rentgen.server.dependencies import Stub
from data_rentgen.db.repositories.location import LocationRepository
from data_rentgen.dependencies import Stub


class UnitOfWork:
Expand All @@ -14,6 +15,7 @@ def __init__(
session: Annotated[AsyncSession, Depends(Stub(AsyncSession))],
):
self._session = session
self.location = LocationRepository(session)

async def __aenter__(self):
return self
Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,10 @@ ignore = [
"WPS600",
# Found underscored number: 5_000_000
"WPS303",
# Found complex default value: Depends(some)
"WPS404",
# Do not perform function calls in argument defaults: Depends(some)
"B008",
]

per-file-ignores = [
Expand Down
9 changes: 9 additions & 0 deletions tests/fixtures/async_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
from typing import TYPE_CHECKING

import pytest_asyncio
from sqlalchemy import delete
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from data_rentgen.db.models import Base

if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncEngine

Expand All @@ -23,6 +26,12 @@ async def async_session_maker(async_engine: AsyncEngine):
@pytest_asyncio.fixture
async def async_session(async_session_maker: async_sessionmaker[AsyncSession]):
session: AsyncSession = async_session_maker()

# start each test on fresh database
for table in reversed(Base.metadata.sorted_tables):
await session.execute(delete(table))
await session.commit()

try:
yield session
await session.commit()
Expand Down
Loading

0 comments on commit cd2034e

Please sign in to comment.