diff --git a/.env.local b/.env.local
index cc0fb999..729536c6 100644
--- a/.env.local
+++ b/.env.local
@@ -7,6 +7,11 @@
 export DATA_RENTGEN__KAFKA__SECURITY__TYPE=scram-sha256
 export DATA_RENTGEN__KAFKA__SECURITY__USER=data_rentgen
 export DATA_RENTGEN__KAFKA__SECURITY__PASSWORD=changeme
 export DATA_RENTGEN__KAFKA__COMPRESSION=zstd
+#export DATA_RENTGEN__CONSUMER__MAX_RECORDS=100
+# Handling events with a lot of column lineage takes so much time
+# that the Kafka coordinator considers the worker dead. Limit by total message size instead.
+# This value depends on the OL version, the complexity of Spark jobs, the number of Kafka partitions and the number of workers.
+export DATA_RENTGEN__CONSUMER__MAX_PARTITION_FETCH_BYTES=200Kb
 export DATA_RENTGEN__SERVER__DEBUG=true
diff --git a/data_rentgen/consumer/extractors/__init__.py b/data_rentgen/consumer/extractors/__init__.py
index a41c5000..e3703e35 100644
--- a/data_rentgen/consumer/extractors/__init__.py
+++ b/data_rentgen/consumer/extractors/__init__.py
@@ -2,6 +2,7 @@
 # SPDX-License-Identifier: Apache-2.0
 
 from data_rentgen.consumer.extractors.batch import BatchExtractionResult, extract_batch
+from data_rentgen.consumer.extractors.column_lineage import extract_column_lineage
 from data_rentgen.consumer.extractors.dataset import (
     connect_dataset_with_symlinks,
     extract_dataset,
@@ -15,6 +16,7 @@
 from data_rentgen.consumer.extractors.schema import extract_schema
 
 __all__ = [
+    "extract_column_lineage",
     "extract_dataset_and_symlinks",
     "extract_dataset",
     "connect_dataset_with_symlinks",
diff --git a/data_rentgen/consumer/extractors/batch.py b/data_rentgen/consumer/extractors/batch.py
index 038e476e..e0d49afb 100644
--- a/data_rentgen/consumer/extractors/batch.py
+++ b/data_rentgen/consumer/extractors/batch.py
@@ -4,6 +4,7 @@
 
 from typing import TypeVar
 
+from data_rentgen.consumer.extractors.column_lineage import extract_column_lineage
 from data_rentgen.consumer.extractors.input import extract_input
 from data_rentgen.consumer.extractors.operation import extract_operation
 from data_rentgen.consumer.extractors.output import extract_output
@@ -11,7 +12,9 @@
 from data_rentgen.consumer.openlineage.job_facets.job_type import OpenLineageJobType
 from data_rentgen.consumer.openlineage.run_event import OpenLineageRunEvent
 from data_rentgen.dto import (
+    ColumnLineageDTO,
     DatasetDTO,
+    DatasetSymlinkDTO,
     InputDTO,
     JobDTO,
     LocationDTO,
@@ -21,12 +24,12 @@
     SchemaDTO,
     UserDTO,
 )
-from data_rentgen.dto.dataset_symlink import DatasetSymlinkDTO
 
 T = TypeVar(
     "T",
     LocationDTO,
     DatasetDTO,
+    ColumnLineageDTO,
     DatasetSymlinkDTO,
     JobDTO,
     RunDTO,
@@ -65,6 +68,7 @@ def __init__(self):
         self._operations: dict[tuple, OperationDTO] = {}
         self._inputs: dict[tuple, InputDTO] = {}
         self._outputs: dict[tuple, OutputDTO] = {}
+        self._column_lineage: dict[tuple, ColumnLineageDTO] = {}
         self._schemas: dict[tuple, SchemaDTO] = {}
         self._users: dict[tuple, UserDTO] = {}
 
@@ -79,6 +83,7 @@ def __repr__(self):
             f"operations={len(self._operations)}, "
             f"inputs={len(self._inputs)}, "
             f"outputs={len(self._outputs)}, "
+            f"column_lineage={len(self._column_lineage)}, "
             f"schemas={len(self._schemas)}, "
             f"users={len(self._users)}"
             ")"
@@ -139,6 +144,12 @@ def add_output(self, output: OutputDTO):
         if output.schema:
             self.add_schema(output.schema)
 
+    def add_column_lineage(self, lineage: ColumnLineageDTO):
+        self._add(self._column_lineage, lineage)
+        self.add_dataset(lineage.source_dataset)
+        self.add_dataset(lineage.target_dataset)
+        self.add_operation(lineage.operation)
+
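Note for reviewers: `BatchExtractionResult` dedupes DTOs by `unique_key` and merges duplicates instead of appending, which is what makes `add_column_lineage` safe to call repeatedly with overlapping datasets and operations. A minimal sketch of that merge-by-key pattern, using toy names rather than the project's real DTOs:

from dataclasses import dataclass


@dataclass
class ToyDataset:  # illustrative stand-in for DatasetDTO
    name: str
    format: str | None = None

    @property
    def unique_key(self) -> tuple:
        return (self.name,)

    def merge(self, new: "ToyDataset") -> "ToyDataset":
        # keep known attributes, let newer events fill in the gaps
        return ToyDataset(name=self.name, format=new.format or self.format)


registry: dict[tuple, ToyDataset] = {}


def add(item: ToyDataset) -> None:
    # same idea as _add(): merge on key collision instead of overwriting
    existing = registry.get(item.unique_key)
    registry[item.unique_key] = existing.merge(item) if existing else item


add(ToyDataset("db.users"))
add(ToyDataset("db.users", format="parquet"))
assert registry[("db.users",)].format == "parquet"  # merged, not duplicated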
     def add_schema(self, schema: SchemaDTO):
         self._add(self._schemas, schema)
@@ -200,6 +211,13 @@ def _get_output(self, output_key: tuple) -> OutputDTO:
             output.schema = self._get_schema(output.schema.unique_key)
         return output
 
+    def _get_column_lineage(self, output_key: tuple) -> ColumnLineageDTO:
+        lineage = self._column_lineage[output_key]
+        lineage.operation = self._get_operation(lineage.operation.unique_key)
+        lineage.source_dataset = self._get_dataset(lineage.source_dataset.unique_key)
+        lineage.target_dataset = self._get_dataset(lineage.target_dataset.unique_key)
+        return lineage
+
     def locations(self) -> list[LocationDTO]:
         return list(map(self._get_location, self._locations))
 
@@ -224,6 +242,9 @@ def inputs(self) -> list[InputDTO]:
     def outputs(self) -> list[OutputDTO]:
         return list(map(self._get_output, self._outputs))
 
+    def column_lineage(self) -> list[ColumnLineageDTO]:
+        return list(map(self._get_column_lineage, self._column_lineage))
+
     def schemas(self) -> list[SchemaDTO]:
         return list(map(self._get_schema, self._schemas))
 
@@ -231,7 +252,7 @@ def users(self) -> list[UserDTO]:
         return list(map(self._get_user, self._users))
 
 
-def extract_batch(events: list[OpenLineageRunEvent]) -> BatchExtractionResult:
+def extract_batch(events: list[OpenLineageRunEvent]) -> BatchExtractionResult:  # noqa: WPS231
     result = BatchExtractionResult()
 
     for event in events:
@@ -250,6 +271,11 @@ def extract_batch(events: list[OpenLineageRunEvent]) -> BatchExtractionResult:
             for symlink in symlinks:  # noqa: WPS440
                 result.add_dataset_symlink(symlink)
 
+            for dataset in event.inputs + event.outputs:
+                column_lineage = extract_column_lineage(operation, dataset)
+                for item in column_lineage:
+                    result.add_column_lineage(item)
+
         else:
             run = extract_run(event)
             result.add_run(run)
diff --git a/data_rentgen/consumer/extractors/column_lineage.py b/data_rentgen/consumer/extractors/column_lineage.py
new file mode 100644
index 00000000..74862176
--- /dev/null
+++ b/data_rentgen/consumer/extractors/column_lineage.py
@@ -0,0 +1,108 @@
+# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+from collections import defaultdict
+
+from data_rentgen.consumer.extractors.dataset import extract_dataset
+from data_rentgen.consumer.openlineage.dataset import OpenLineageDataset
+from data_rentgen.consumer.openlineage.dataset_facets.column_lineage import (
+    OpenLineageColumnLineageDatasetFacetFieldTransformation,
+)
+from data_rentgen.dto import (
+    ColumnLineageDTO,
+    DatasetColumnRelationDTO,
+    DatasetColumnRelationTypeDTO,
+)
+from data_rentgen.dto.operation import OperationDTO
+
+logger = logging.getLogger(__name__)
+
+TRANSFORMATION_SUBTYPE_MAP_MASKING = {
+    "TRANSFORMATION": DatasetColumnRelationTypeDTO.TRANSFORMATION_MASKING,
+    "AGGREGATION": DatasetColumnRelationTypeDTO.AGGREGATION_MASKING,
+}
+
+TRANSFORMATION_SUBTYPE_MAP = {
+    "IDENTITY": DatasetColumnRelationTypeDTO.IDENTITY,
+    "TRANSFORMATION": DatasetColumnRelationTypeDTO.TRANSFORMATION,
+    "AGGREGATION": DatasetColumnRelationTypeDTO.AGGREGATION,
+    "FILTER": DatasetColumnRelationTypeDTO.FILTER,
+    "JOIN": DatasetColumnRelationTypeDTO.JOIN,
+    "GROUP_BY": DatasetColumnRelationTypeDTO.GROUP_BY,
+    "SORT": DatasetColumnRelationTypeDTO.SORT,
+    "WINDOW": DatasetColumnRelationTypeDTO.WINDOW,
+    "CONDITIONAL": DatasetColumnRelationTypeDTO.CONDITIONAL,
+}
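These subtype maps feed `extract_dataset_column_relation_type` (defined just below): masking transformations resolve through the masking map, everything else through the plain map, and unrecognized subtypes fall back to `UNKNOWN`. A self-contained sketch of the same lookup, with a trimmed stand-in enum rather than the project's DTO:

from enum import IntFlag


class RelType(IntFlag):  # trimmed stand-in for DatasetColumnRelationTypeDTO
    UNKNOWN = 1
    TRANSFORMATION = 4
    TRANSFORMATION_MASKING = 8


PLAIN = {"TRANSFORMATION": RelType.TRANSFORMATION}
MASKING = {"TRANSFORMATION": RelType.TRANSFORMATION_MASKING}


def relation_type(subtype: str | None, masking: bool) -> RelType:
    result = None
    if subtype:
        # masking variants get dedicated flags so they stay distinguishable after merging
        result = (MASKING if masking else PLAIN).get(subtype)
    return result or RelType.UNKNOWN


assert relation_type("TRANSFORMATION", masking=True) is RelType.TRANSFORMATION_MASKING
assert relation_type("NEW_SUBTYPE", masking=False) is RelType.UNKNOWN  # forward-compatible fallback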
+
+
+def extract_dataset_column_relation_type(
+    transformation: OpenLineageColumnLineageDatasetFacetFieldTransformation,
+) -> DatasetColumnRelationTypeDTO:
+    result: DatasetColumnRelationTypeDTO | None = None
+    if transformation.subtype:
+        if transformation.masking:
+            result = TRANSFORMATION_SUBTYPE_MAP_MASKING.get(transformation.subtype)
+        else:
+            result = TRANSFORMATION_SUBTYPE_MAP.get(transformation.subtype)
+
+    return result or DatasetColumnRelationTypeDTO.UNKNOWN
+
+
+def extract_column_lineage(operation: OperationDTO, target_dataset: OpenLineageDataset) -> list[ColumnLineageDTO]:
+    target_dataset_dto = extract_dataset(target_dataset)
+    if not target_dataset.facets.columnLineage:
+        return []
+
+    # Grouping column lineage by source+target dataset. This is a unique combination within an operation,
+    # so we can use it to generate the same fingerprint for all dataset column relations.
+    datasets = {target_dataset_dto.unique_key: target_dataset_dto}
+    dataset_column_relations = defaultdict(list)
+
+    # direct lineage (source_column -> target_column)
+    for field, raw_column_lineage in target_dataset.facets.columnLineage.fields.items():
+        for input_field in raw_column_lineage.inputFields:
+            source_dataset_dto = extract_dataset(input_field)
+            datasets[source_dataset_dto.unique_key] = source_dataset_dto
+
+            column_lineage_key = (source_dataset_dto.unique_key, target_dataset_dto.unique_key)
+            for transformation in input_field.transformations:
+                # OL integration for Spark before v1.23 (or with columnLineage.datasetLineageEnabled=false,
+                # which is still the default) produced INDIRECT lineage for each combination
+                # source_column x target_column, which is almost a cartesian join.
+                # It is VERY expensive to handle, so just ignore it.
+                # See https://github.com/OpenLineage/OpenLineage/pull/3097
+                if transformation.type == "INDIRECT":
+                    continue
+
+                column_relation = DatasetColumnRelationDTO(
+                    type=extract_dataset_column_relation_type(transformation),
+                    source_column=input_field.field,
+                    target_column=field,
+                )
+                dataset_column_relations[column_lineage_key].append(column_relation)
+
+    # indirect lineage (source_column -> target_dataset),
+    # added to OL in v1.23 and sent only when columnLineage.datasetLineageEnabled=true
+    for input_field in target_dataset.facets.columnLineage.dataset:
+        source_dataset_dto = extract_dataset(input_field)
+        datasets[source_dataset_dto.unique_key] = source_dataset_dto
+
+        column_lineage_key = (source_dataset_dto.unique_key, target_dataset_dto.unique_key)
+        for transformation in input_field.transformations:
+            column_relation = DatasetColumnRelationDTO(
+                type=extract_dataset_column_relation_type(transformation),
+                source_column=input_field.field,
+            )
+            dataset_column_relations[column_lineage_key].append(column_relation)
+
+    # merge results into DTO objects
+    return [
+        ColumnLineageDTO(
+            operation=operation,
+            source_dataset=datasets[source_dataset_dto_key],
+            target_dataset=datasets[target_dataset_dto_key],
+            dataset_column_relations=relations,
+        )
+        for (source_dataset_dto_key, target_dataset_dto_key), relations in dataset_column_relations.items()
+        if relations
+    ]
diff --git a/data_rentgen/consumer/extractors/dataset.py b/data_rentgen/consumer/extractors/dataset.py
index b289712c..166189be 100644
--- a/data_rentgen/consumer/extractors/dataset.py
+++ b/data_rentgen/consumer/extractors/dataset.py
@@ -10,6 +10,9 @@
     OpenLineageSymlinkIdentifier,
     OpenLineageSymlinkType,
 )
+from data_rentgen.consumer.openlineage.dataset_facets.column_lineage import (
+    OpenLineageColumnLineageDatasetFacetFieldRef,
+)
 from data_rentgen.dto import (
     DatasetDTO,
     DatasetSymlinkDTO,
@@ -19,6 +22,9 @@
 
 logger = logging.getLogger(__name__)
 
+OpenLineageDatasetLike = (
+    OpenLineageDataset | OpenLineageSymlinkIdentifier | OpenLineageColumnLineageDatasetFacetFieldRef
+)
 
 METASTORE = DatasetSymlinkTypeDTO.METASTORE
 WAREHOUSE = DatasetSymlinkTypeDTO.WAREHOUSE
@@ -49,7 +55,7 @@ def connect_dataset_with_symlinks(
     return sorted(result, key=lambda x: x.type)
 
 
-def extract_dataset(dataset: OpenLineageDataset | OpenLineageSymlinkIdentifier) -> DatasetDTO:
+def extract_dataset(dataset: OpenLineageDatasetLike) -> DatasetDTO:
     return DatasetDTO(
         name=dataset.name,
         location=extract_dataset_location(dataset),
@@ -103,7 +109,7 @@ def extract_dataset_and_symlinks(dataset: OpenLineageDataset) -> tuple[DatasetDT
     return dataset_dto, symlinks
 
 
-def extract_dataset_location(dataset: OpenLineageDataset | OpenLineageSymlinkIdentifier) -> LocationDTO:
+def extract_dataset_location(dataset: OpenLineageDatasetLike) -> LocationDTO:
     namespace = dataset.namespace
     if namespace == "file":
         # TODO: remove after https://github.com/OpenLineage/OpenLineage/issues/2709
@@ -122,8 +128,8 @@ def extract_dataset_location(dataset: OpenLineageDataset | OpenLineageSymlinkIde
     )
 
 
-def extract_dataset_format(dataset: OpenLineageDataset | OpenLineageSymlinkIdentifier) -> str | None:
-    if isinstance(dataset, OpenLineageSymlinkIdentifier):
+def extract_dataset_format(dataset: OpenLineageDatasetLike) -> str | None:
+    if isinstance(dataset, (OpenLineageSymlinkIdentifier, OpenLineageColumnLineageDatasetFacetFieldRef)):
         return None
 
     match dataset.facets.storage:
diff --git a/data_rentgen/consumer/openlineage/dataset_facets/__init__.py b/data_rentgen/consumer/openlineage/dataset_facets/__init__.py
index f443c671..95d71b4b 100644
--- a/data_rentgen/consumer/openlineage/dataset_facets/__init__.py
+++ b/data_rentgen/consumer/openlineage/dataset_facets/__init__.py
@@ -7,6 +7,12 @@
 from data_rentgen.consumer.openlineage.dataset_facets.base import (
     OpenLineageDatasetFacet,
 )
+from data_rentgen.consumer.openlineage.dataset_facets.column_lineage import (
+    OpenLineageColumnLineageDatasetFacet,
+    OpenLineageColumnLineageDatasetFacetField,
+    OpenLineageColumnLineageDatasetFacetFieldRef,
+    OpenLineageColumnLineageDatasetFacetFieldTransformation,
+)
 from data_rentgen.consumer.openlineage.dataset_facets.datasource import (
     OpenLineageDatasourceDatasetFacet,
 )
@@ -55,6 +61,10 @@
     "OpenLineageDatasetFacets",
     "OpenLineageInputDatasetFacets",
     "OpenLineageOutputDatasetFacets",
+    "OpenLineageColumnLineageDatasetFacet",
+    "OpenLineageColumnLineageDatasetFacetField",
+    "OpenLineageColumnLineageDatasetFacetFieldRef",
+    "OpenLineageColumnLineageDatasetFacetFieldTransformation",
 ]
 
 
@@ -69,6 +79,7 @@ class OpenLineageDatasetFacets(OpenLineageBase):
     datasetSchema: OpenLineageSchemaDatasetFacet | None = Field(default=None, alias="schema")
     storage: OpenLineageStorageDatasetFacet | None = None
     symlinks: OpenLineageSymlinksDatasetFacet | None = None
+    columnLineage: OpenLineageColumnLineageDatasetFacet | None = None
 
 
 class OpenLineageInputDatasetFacets(OpenLineageBase):
diff --git a/data_rentgen/consumer/openlineage/dataset_facets/column_lineage.py b/data_rentgen/consumer/openlineage/dataset_facets/column_lineage.py
new file mode 100644
index 00000000..0aebbcd4
--- /dev/null
+++ b/data_rentgen/consumer/openlineage/dataset_facets/column_lineage.py
@@ -0,0 +1,44 @@
+# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+from pydantic import Field
+
+from data_rentgen.consumer.openlineage.base import OpenLineageBase
+from data_rentgen.consumer.openlineage.dataset_facets.base import (
+    OpenLineageDatasetFacet,
+)
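For orientation before the class definitions that follow: a trimmed example of the columnLineage facet payload these pydantic models parse. Field names follow the OpenLineage spec; the namespaces, dataset names, and columns are illustrative:

# a trimmed columnLineage facet as emitted by the Spark integration (illustrative values)
column_lineage_facet = {
    "fields": {
        # output column "full_name" is built directly from an input column
        "full_name": {
            "inputFields": [
                {
                    "namespace": "hive://metastore:9083",
                    "name": "db.users",
                    "field": "first_name",
                    "transformations": [
                        {"type": "DIRECT", "subtype": "TRANSFORMATION", "masking": False},
                    ],
                },
            ],
        },
    },
    # dataset-level (indirect) lineage, sent since OL 1.23
    # when columnLineage.datasetLineageEnabled=true
    "dataset": [
        {
            "namespace": "hive://metastore:9083",
            "name": "db.users",
            "field": "registered_at",
            "transformations": [
                {"type": "INDIRECT", "subtype": "FILTER", "masking": False},
            ],
        },
    ],
}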
+
+
+class OpenLineageColumnLineageDatasetFacetFieldTransformation(OpenLineageBase):
+    """Dataset facet describing a field transformation."""
+
+    type: str
+    subtype: str | None = None
+    description: str | None = None
+    masking: bool = False
+
+
+class OpenLineageColumnLineageDatasetFacetFieldRef(OpenLineageBase):
+    """Dataset facet describing a field reference for the column lineage facet."""
+
+    namespace: str
+    name: str
+    field: str
+    transformations: list[OpenLineageColumnLineageDatasetFacetFieldTransformation]
+
+
+class OpenLineageColumnLineageDatasetFacetField(OpenLineageBase):
+    """Dataset facet describing column lineage for a specific field."""
+
+    inputFields: list[OpenLineageColumnLineageDatasetFacetFieldRef] = Field(default_factory=list)
+    transformationDescription: str | None = None
+    transformationType: str | None = None
+
+
+class OpenLineageColumnLineageDatasetFacet(OpenLineageDatasetFacet):
+    """Dataset facet describing column lineage.
+    See [ColumnLineageDatasetFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/ColumnLineageDatasetFacet.json).
+    """
+
+    fields: dict[str, OpenLineageColumnLineageDatasetFacetField] = Field(default_factory=dict)
+    dataset: list[OpenLineageColumnLineageDatasetFacetFieldRef] = Field(default_factory=list)
diff --git a/data_rentgen/consumer/subscribers.py b/data_rentgen/consumer/subscribers.py
index a953eb0a..71cffd37 100644
--- a/data_rentgen/consumer/subscribers.py
+++ b/data_rentgen/consumer/subscribers.py
@@ -34,7 +34,11 @@ async def runs_events_subscriber(
     logger.info("Saved successfully")
 
 
-async def save_to_db(data: BatchExtractionResult, unit_of_work: UnitOfWork, logger: Logger) -> None:  # noqa: WPS217
+async def save_to_db(  # noqa: WPS217, WPS213
+    data: BatchExtractionResult,
+    unit_of_work: UnitOfWork,
+    logger: Logger,
+) -> None:
     # To avoid deadlocks when parallel consumer instances insert/update the same row,
     # commit changes for each row instead of committing the whole batch. Yes, this could be slow.
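A rough sketch of the commit-per-group pattern this comment describes: each entity group gets its own short transaction, so row locks shared with a parallel consumer instance are held only briefly. The stub context manager below stands in for the real async UnitOfWork, which wraps a SQLAlchemy session:

import asyncio


class StubUnitOfWork:
    """Stand-in for the real UnitOfWork; illustrative only."""

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        print("COMMIT")  # the real implementation commits (or rolls back on error) here


async def save_to_db_sketch(groups: list[str]) -> None:
    uow = StubUnitOfWork()
    for group in groups:
        # one short transaction per group: locks on rows that another consumer
        # instance is also upserting are released quickly, preventing deadlocks
        async with uow:
            print(f"upsert {group}")


asyncio.run(save_to_db_sketch(["locations", "datasets", "inputs", "outputs", "column_lineage"]))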
@@ -95,3 +99,12 @@ async def save_to_db(data: BatchExtractionResult, unit_of_work: UnitOfWork, logg
 
     logger.debug("Creating outputs")
     await unit_of_work.output.create_or_update_bulk(data.outputs())
+
+    # If something goes wrong here, we will at least have the inputs/outputs saved
+    async with unit_of_work:
+        column_lineage = data.column_lineage()
+        logger.debug("Creating dataset column relations")
+        await unit_of_work.dataset_column_relation.create_bulk_for_column_lineage(column_lineage)
+
+        logger.debug("Creating column lineage")
+        await unit_of_work.column_lineage.create_bulk(column_lineage)
diff --git a/data_rentgen/db/migrations/env.py b/data_rentgen/db/migrations/env.py
index 05d340b5..cf6a25c4 100644
--- a/data_rentgen/db/migrations/env.py
+++ b/data_rentgen/db/migrations/env.py
@@ -23,7 +23,7 @@
 
 target_metadata = (Base.metadata,)
 
-PARTITION_PREFIXES = ["run_y", "operation_y", "input_y", "output_y"]
+PARTITION_PREFIXES = ["run_y", "operation_y", "input_y", "output_y", "column_lineage_y"]
 
 
 def include_all_except_partitions(object, name, type_, reflected, compare_to):
diff --git a/data_rentgen/db/migrations/versions/2025-02-04_15c0a22b8566_create_column_lineage.py b/data_rentgen/db/migrations/versions/2025-02-04_15c0a22b8566_create_column_lineage.py
new file mode 100644
index 00000000..0b66da7c
--- /dev/null
+++ b/data_rentgen/db/migrations/versions/2025-02-04_15c0a22b8566_create_column_lineage.py
@@ -0,0 +1,79 @@
+# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+"""Create column lineage
+
+Revision ID: 15c0a22b8566
+Revises: f017d4c58658
+Create Date: 2025-02-04 11:30:21.190161
+
+"""
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "15c0a22b8566"
+down_revision = "f017d4c58658"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    op.create_table(
+        "dataset_column_relation",
+        sa.Column("id", sa.BigInteger(), nullable=False),
+        sa.Column("fingerprint", sa.UUID(), nullable=False),
+        sa.Column("source_column", sa.String(length=255), nullable=False),
+        sa.Column("target_column", sa.String(length=255), nullable=True),
+        sa.Column("type", sa.SmallInteger(), nullable=False),
+        sa.PrimaryKeyConstraint("id", name=op.f("pk__dataset_column_relation")),
+    )
+    op.create_index(
+        op.f("ix__dataset_column_relation__fingerprint_source_column_target_column"),
+        "dataset_column_relation",
+        ["fingerprint", "source_column", sa.text("coalesce(target_column, '')")],
+        unique=True,
+    )
+
+    op.create_table(
+        "column_lineage",
+        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
+        sa.Column("id", sa.UUID(), nullable=False),
+        sa.Column("operation_id", sa.UUID(), nullable=False),
+        sa.Column("run_id", sa.UUID(), nullable=False),
+        sa.Column("job_id", sa.BigInteger(), nullable=False),
+        sa.Column("source_dataset_id", sa.BigInteger(), nullable=False),
+        sa.Column("target_dataset_id", sa.BigInteger(), nullable=False),
+        sa.Column("fingerprint", sa.UUID(), nullable=False),
+        sa.PrimaryKeyConstraint("created_at", "id", name=op.f("pk__column_lineage")),
+        postgresql_partition_by="RANGE (created_at)",
+    )
+    op.create_index(op.f("ix__column_lineage__job_id"), "column_lineage", ["job_id"], unique=False)
+    op.create_index(op.f("ix__column_lineage__operation_id"), "column_lineage", ["operation_id"], unique=False)
+    op.create_index(op.f("ix__column_lineage__run_id"), "column_lineage", ["run_id"], unique=False)
+    op.create_index(
+        op.f("ix__column_lineage__source_dataset_id"),
+        "column_lineage",
["source_dataset_id"], + unique=False, + ) + op.create_index( + op.f("ix__column_lineage__target_dataset_id"), + "column_lineage", + ["target_dataset_id"], + unique=False, + ) + + +def downgrade() -> None: + op.drop_index(op.f("ix__column_lineage__target_dataset_id"), table_name="column_lineage") + op.drop_index(op.f("ix__column_lineage__source_dataset_id"), table_name="column_lineage") + op.drop_index(op.f("ix__column_lineage__run_id"), table_name="column_lineage") + op.drop_index(op.f("ix__column_lineage__operation_id"), table_name="column_lineage") + op.drop_index(op.f("ix__column_lineage__job_id"), table_name="column_lineage") + op.drop_table("column_lineage") + + op.drop_index( + op.f("ix__dataset_column_relation__fingerprint_source_column_target_column"), + table_name="dataset_column_relation", + ) + op.drop_table("dataset_column_relation") diff --git a/data_rentgen/db/models/__init__.py b/data_rentgen/db/models/__init__.py index e587c5d7..c97b16ba 100644 --- a/data_rentgen/db/models/__init__.py +++ b/data_rentgen/db/models/__init__.py @@ -3,9 +3,14 @@ from data_rentgen.db.models.address import Address from data_rentgen.db.models.base import Base +from data_rentgen.db.models.column_lineage import ColumnLineage from data_rentgen.db.models.custom_properties import CustomProperties from data_rentgen.db.models.custom_user_properties import CustomUserProperties from data_rentgen.db.models.dataset import Dataset +from data_rentgen.db.models.dataset_column_relation import ( + DatasetColumnRelation, + DatasetColumnRelationType, +) from data_rentgen.db.models.dataset_symlink import DatasetSymlink, DatasetSymlinkType from data_rentgen.db.models.input import Input from data_rentgen.db.models.job import Job, JobType @@ -19,9 +24,12 @@ __all__ = [ "Address", "Base", + "ColumnLineage", "CustomProperties", "CustomUserProperties", "Dataset", + "DatasetColumnRelation", + "DatasetColumnRelationType", "DatasetSymlink", "DatasetSymlinkType", "Input", diff --git a/data_rentgen/db/models/column_lineage.py b/data_rentgen/db/models/column_lineage.py new file mode 100644 index 00000000..c6e408bb --- /dev/null +++ b/data_rentgen/db/models/column_lineage.py @@ -0,0 +1,117 @@ +# SPDX-FileCopyrightText: 2024-2025 MTS PJSC +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from datetime import datetime + +from sqlalchemy import UUID as SQL_UUID +from sqlalchemy import ( + BigInteger, + DateTime, + PrimaryKeyConstraint, +) +from sqlalchemy.orm import Mapped, mapped_column, relationship +from uuid6 import UUID + +from data_rentgen.db.models.base import Base +from data_rentgen.db.models.dataset import Dataset +from data_rentgen.db.models.dataset_column_relation import DatasetColumnRelation +from data_rentgen.db.models.job import Job +from data_rentgen.db.models.operation import Operation +from data_rentgen.db.models.run import Run + + +# no foreign keys to avoid scanning all the partitions +class ColumnLineage(Base): + __tablename__ = "column_lineage" + __table_args__ = ( + PrimaryKeyConstraint("created_at", "id"), + {"postgresql_partition_by": "RANGE (created_at)"}, + ) + + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + nullable=False, + doc="Timestamp component of UUID, used for table partitioning", + ) + id: Mapped[UUID] = mapped_column(SQL_UUID) + + operation_id: Mapped[UUID] = mapped_column( + SQL_UUID, + index=True, + nullable=False, + doc="Operation caused this column lineage", + ) + operation: Mapped[Operation] = relationship( + Operation, + 
primaryjoin="ColumnLineage.operation_id == Operation.id", + lazy="noload", + foreign_keys=[operation_id], + ) + + run_id: Mapped[UUID] = mapped_column( + SQL_UUID, + index=True, + nullable=False, + doc="Run the operation is bound to", + ) + run: Mapped[Run] = relationship( + Run, + primaryjoin="ColumnLineage.run_id == Run.id", + lazy="noload", + foreign_keys=[run_id], + ) + + job_id: Mapped[int] = mapped_column( + BigInteger, + index=True, + nullable=False, + doc="Parent job of run", + ) + job: Mapped[Job] = relationship( + Job, + primaryjoin="ColumnLineage.job_id == Job.id", + lazy="noload", + foreign_keys=[job_id], + ) + + source_dataset_id: Mapped[int] = mapped_column( + BigInteger, + index=True, + nullable=False, + doc="Dataset the data is originated from", + ) + source_dataset: Mapped[Dataset] = relationship( + Dataset, + primaryjoin="ColumnLineage.source_dataset_id == Dataset.id", + lazy="noload", + foreign_keys=[source_dataset_id], + ) + + target_dataset_id: Mapped[int] = mapped_column( + BigInteger, + index=True, + nullable=False, + doc="Dataset the data is saved to", + ) + target_dataset: Mapped[Dataset] = relationship( + Dataset, + primaryjoin="ColumnLineage.target_dataset_id == Dataset.id", + lazy="noload", + foreign_keys=[target_dataset_id], + ) + + fingerprint: Mapped[UUID] = mapped_column( + SQL_UUID, + index=False, + nullable=False, + doc="Datase column relation fingerprint", + ) + dataset_column_relations: Mapped[list[DatasetColumnRelation]] = relationship( + DatasetColumnRelation, + uselist=True, + primaryjoin="ColumnLineage.fingerprint == DatasetColumnRelation.fingerprint", + lazy="noload", + foreign_keys=[fingerprint], + ) diff --git a/data_rentgen/db/models/dataset_column_relation.py b/data_rentgen/db/models/dataset_column_relation.py new file mode 100644 index 00000000..3c61113e --- /dev/null +++ b/data_rentgen/db/models/dataset_column_relation.py @@ -0,0 +1,89 @@ +# SPDX-FileCopyrightText: 2024-2025 MTS PJSC +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from enum import Flag + +from sqlalchemy import UUID as SQL_UUID +from sqlalchemy import ( + BigInteger, + Column, + Index, + SmallInteger, + String, + func, +) +from sqlalchemy.orm import Mapped, mapped_column +from uuid6 import UUID + +from data_rentgen.db.models.base import Base + + +class DatasetColumnRelationType(Flag): + # See https://github.com/OpenLineage/OpenLineage/blob/1.27.0/integration/spark/shared/src/main/java/io/openlineage/spark/agent/lifecycle/plan/column/TransformationInfo.java#L30-L40 + # Using IntFlag to avoid messing up with ARRAY type, bitwise OR is enough + UNKNOWN = 1 + + # Direct + IDENTITY = 2 + TRANSFORMATION = 4 + TRANSFORMATION_MASKING = 8 + AGGREGATION = 16 + AGGREGATION_MASKING = 32 + + # Indirect + FILTER = 64 + JOIN = 128 + GROUP_BY = 256 + SORT = 512 + WINDOW = 1024 + CONDITIONAL = 2048 + + +# no foreign keys to avoid scanning all the partitions +class DatasetColumnRelation(Base): + __tablename__ = "dataset_column_relation" + __table_args__ = ( + Index( + None, + Column("fingerprint"), + Column("source_column"), + # NULLs are distinct by default, we have to convert them to something else. 
+            # This is mostly for compatibility with PG <15, which has no `NULLS NOT DISTINCT` option
+            func.coalesce(Column("target_column"), ""),
+            unique=True,
+        ),
+    )
+
+    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
+    fingerprint: Mapped[UUID] = mapped_column(
+        SQL_UUID,
+        index=False,
+        nullable=False,
+        doc="SHA-1 based digest used for grouping relations together. Currently this is in the form of a UUID",
+    )
+
+    source_column: Mapped[str] = mapped_column(
+        String(length=255),
+        index=False,
+        nullable=False,
+        doc="Source dataset column the data originates from",
+    )
+
+    target_column: Mapped[str | None] = mapped_column(
+        String(length=255),
+        index=False,
+        nullable=True,
+        doc=(
+            "Target dataset column the data is saved to. "
+            "NULL means the entire target dataset depends on the source column"
+        ),
+    )
+
+    type: Mapped[DatasetColumnRelationType] = mapped_column(
+        SmallInteger(),
+        index=False,
+        nullable=False,
+        doc="Column relation type",
+    )
diff --git a/data_rentgen/db/repositories/column_lineage.py b/data_rentgen/db/repositories/column_lineage.py
new file mode 100644
index 00000000..bd18ea27
--- /dev/null
+++ b/data_rentgen/db/repositories/column_lineage.py
@@ -0,0 +1,57 @@
+# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+from uuid import UUID
+
+from sqlalchemy.dialects.postgresql import insert
+
+from data_rentgen.db.models import ColumnLineage
+from data_rentgen.db.repositories.base import Repository
+from data_rentgen.db.utils.uuid import (
+    extract_timestamp_from_uuid,
+    generate_incremental_uuid,
+)
+from data_rentgen.dto import ColumnLineageDTO
+
+
+class ColumnLineageRepository(Repository[ColumnLineage]):
+    def get_id(self, item: ColumnLineageDTO) -> UUID:
+        # The `created_at` field of lineage should be the same as the operation's,
+        # to avoid scanning all partitions and to speed up queries
+        created_at = extract_timestamp_from_uuid(item.operation.id)
+
+        # instead of using a UniqueConstraint on multiple fields, which may produce a lot of table scans,
+        # use them to calculate a unique id
+        id_components = [
+            str(item.operation.id),
+            str(item.source_dataset.id),
+            str(item.target_dataset.id),
+            str(item.fingerprint),
+        ]
+        return generate_incremental_uuid(created_at, ".".join(id_components))
+
+    async def create_bulk(self, items: list[ColumnLineageDTO]):
+        if not items:
+            return
+
+        insert_statement = insert(ColumnLineage)
+        statement = insert_statement.on_conflict_do_nothing(
+            index_elements=[ColumnLineage.created_at, ColumnLineage.id],
+        )
+
+        await self._session.execute(
+            statement,
+            [
+                {
+                    "id": self.get_id(item),
+                    "created_at": extract_timestamp_from_uuid(item.operation.id),
+                    "operation_id": item.operation.id,
+                    "run_id": item.operation.run.id,
+                    "job_id": item.operation.run.job.id,  # type: ignore[arg-type]
+                    "source_dataset_id": item.source_dataset.id,
+                    "target_dataset_id": item.target_dataset.id,
+                    "fingerprint": item.fingerprint,
+                }
+                for item in items
+            ],
+        )
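Deriving the id from the natural key, as get_id does above, means a re-consumed Kafka message regenerates the exact same UUID and is swallowed by ON CONFLICT on the (created_at, id) primary key alone. The sketch below illustrates the idea only; it is not the project's generate_incremental_uuid, whose bit layout may differ. It packs a 48-bit millisecond timestamp into the high bits (sortable, UUIDv7-style) and a digest of the key into the rest (deterministic):

import hashlib
from datetime import datetime, timezone
from uuid import UUID


def incremental_uuid_sketch(created_at: datetime, key: str) -> UUID:
    # 48 bits of millisecond timestamp, so ids sort by time and land in the right partition
    millis = int(created_at.timestamp() * 1000) & ((1 << 48) - 1)
    # remaining 80 bits from a digest of the natural key, so the id is deterministic
    digest = hashlib.sha256(key.encode()).digest()
    tail = int.from_bytes(digest[:10], "big")
    return UUID(int=(millis << 80) | tail)


ts = datetime(2025, 2, 4, tzinfo=timezone.utc)
a = incremental_uuid_sketch(ts, "op-1.ds-1.ds-2.fp-1")
b = incremental_uuid_sketch(ts, "op-1.ds-1.ds-2.fp-1")
assert a == b  # same natural key -> same id, so duplicate events dedupe via ON CONFLICT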
diff --git a/data_rentgen/db/repositories/dataset_column_relation.py b/data_rentgen/db/repositories/dataset_column_relation.py
new file mode 100644
index 00000000..f3d28a25
--- /dev/null
+++ b/data_rentgen/db/repositories/dataset_column_relation.py
@@ -0,0 +1,73 @@
+# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+from collections import defaultdict
+from uuid import UUID
+
+from sqlalchemy import any_, select, text
+from sqlalchemy.dialects.postgresql import insert
+
+from data_rentgen.db.models.dataset_column_relation import (
+    DatasetColumnRelation,
+    DatasetColumnRelationType,
+)
+from data_rentgen.db.repositories.base import Repository
+from data_rentgen.dto import ColumnLineageDTO
+
+
+class DatasetColumnRelationRepository(Repository[DatasetColumnRelation]):
+    async def create_bulk_for_column_lineage(self, items: list[ColumnLineageDTO]):
+        if not items:
+            return
+
+        # small optimization to avoid creating the same relations over and over:
+        # although we're using ON CONFLICT DO NOTHING, PG still has to perform index scans
+        # and to get the next sequence value for each row
+        fingerprints_to_create = await self._get_missing_fingerprints([item.fingerprint for item in items])
+        relations_to_create = [item for item in items if item.fingerprint in fingerprints_to_create]
+
+        if relations_to_create:
+            await self._create_dataset_column_relations_bulk(relations_to_create)
+
+    async def _get_missing_fingerprints(self, fingerprints: list[UUID]) -> set[UUID]:
+        existing = await self._session.execute(
+            select(DatasetColumnRelation.fingerprint.distinct()).where(
+                DatasetColumnRelation.fingerprint == any_(fingerprints),  # type: ignore[arg-type]
+            ),
+        )
+
+        return set(fingerprints) - set(existing.scalars().all())
+
+    async def _create_dataset_column_relations_bulk(self, items: list[ColumnLineageDTO]):
+        # we don't have to return anything, so there is no need to use on_conflict_do_update.
+        # also rows are immutable, so there is no need to acquire any lock
+        insert_statement = insert(DatasetColumnRelation).on_conflict_do_nothing(
+            index_elements=[
+                DatasetColumnRelation.fingerprint,
+                DatasetColumnRelation.source_column,
+                text("coalesce(target_column, '')"),
+            ],
+        )
+
+        # several ColumnLineageDTO may have the same set of column relations, deduplicate them
+        to_insert = defaultdict(list)
+        for lineage in items:
+            for column_relation in lineage.column_relations:
+                key = (lineage.fingerprint, column_relation.source_column, column_relation.target_column)
+                to_insert[key].append(column_relation)
+
+        await self._session.execute(
+            insert_statement,
+            [
+                {
+                    # id is autoincremental
+                    "fingerprint": fingerprint,
+                    "source_column": relation.source_column,
+                    "target_column": relation.target_column,
+                    "type": DatasetColumnRelationType(relation.type).value,
+                }
+                for (fingerprint, *_), relations in to_insert.items()
+                for relation in relations
+            ],
+        )
diff --git a/data_rentgen/db/scripts/create_partitions.py b/data_rentgen/db/scripts/create_partitions.py
index 127d139d..5525dd5f 100755
--- a/data_rentgen/db/scripts/create_partitions.py
+++ b/data_rentgen/db/scripts/create_partitions.py
@@ -18,13 +18,20 @@
 
 from data_rentgen.db.factory import create_session_factory
 from data_rentgen.db.models import Input, Operation, Output, Run
+from data_rentgen.db.models.column_lineage import ColumnLineage
 from data_rentgen.db.settings import DatabaseSettings
 from data_rentgen.logging.settings import LoggingSettings
 from data_rentgen.logging.setup_logging import setup_logging
 
 logger = logging.getLogger(__name__)
 
-PARTITIONED_TABLES = [Run.__tablename__, Operation.__tablename__, Input.__tablename__, Output.__tablename__]
+PARTITIONED_TABLES = [
+    Run.__tablename__,
+    Operation.__tablename__,
+    Input.__tablename__,
+    Output.__tablename__,
+    ColumnLineage.__tablename__,
+]
 
 
 class Granularity(str, Enum):
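Once the SELECT in _get_missing_fingerprints returns, the dedup above is plain set arithmetic; a toy equivalent of that step (the real query filters dataset_column_relation with fingerprint = ANY(...)):

from uuid import NAMESPACE_URL, uuid5


def missing_fingerprints(incoming: list, existing_in_db: set) -> set:
    # mirrors: SELECT DISTINCT fingerprint WHERE fingerprint = ANY(:incoming),
    # then subtract what came back from what we wanted to insert
    return set(incoming) - existing_in_db


fp_a, fp_b = uuid5(NAMESPACE_URL, "fp-a"), uuid5(NAMESPACE_URL, "fp-b")
assert missing_fingerprints([fp_a, fp_b], {fp_a}) == {fp_b}  # only fp-b gets INSERTed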
diff --git a/data_rentgen/dto/__init__.py b/data_rentgen/dto/__init__.py
index 8b08361a..dd10de3d 100644
--- a/data_rentgen/dto/__init__.py
+++ b/data_rentgen/dto/__init__.py
@@ -1,7 +1,12 @@
 # SPDX-FileCopyrightText: 2024-2025 MTS PJSC
 # SPDX-License-Identifier: Apache-2.0
 
+from data_rentgen.dto.column_lineage import ColumnLineageDTO
 from data_rentgen.dto.dataset import DatasetDTO
+from data_rentgen.dto.dataset_column_relation import (
+    DatasetColumnRelationDTO,
+    DatasetColumnRelationTypeDTO,
+)
 from data_rentgen.dto.dataset_symlink import DatasetSymlinkDTO, DatasetSymlinkTypeDTO
 from data_rentgen.dto.input import InputDTO
 from data_rentgen.dto.job import JobDTO, JobTypeDTO
@@ -18,7 +23,10 @@
 from data_rentgen.dto.user import UserDTO
 
 __all__ = [
+    "ColumnLineageDTO",
     "DatasetDTO",
+    "DatasetColumnRelationDTO",
+    "DatasetColumnRelationTypeDTO",
     "DatasetSymlinkDTO",
     "DatasetSymlinkTypeDTO",
     "LocationDTO",
diff --git a/data_rentgen/dto/column_lineage.py b/data_rentgen/dto/column_lineage.py
new file mode 100644
index 00000000..bb6373b1
--- /dev/null
+++ b/data_rentgen/dto/column_lineage.py
@@ -0,0 +1,55 @@
+# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+from dataclasses import InitVar, dataclass, field
+from functools import cached_property
+from uuid import UUID
+
+from uuid6 import UUID as UUIDv6
+
+from data_rentgen.db.utils.uuid import generate_static_uuid
+from data_rentgen.dto.dataset import DatasetDTO
+from data_rentgen.dto.dataset_column_relation import (
+    DatasetColumnRelationDTO,
+    merge_dataset_column_relations,
+)
+from data_rentgen.dto.operation import OperationDTO
+
+
+@dataclass
+class ColumnLineageDTO:
+    operation: OperationDTO
+    source_dataset: DatasetDTO
+    target_dataset: DatasetDTO
+    dataset_column_relations: InitVar[list[DatasetColumnRelationDTO]]
+    _dataset_column_relations: list[DatasetColumnRelationDTO] = field(default_factory=list, init=False)
+    # id is generated from a combination of the other ids
+    id: UUIDv6 | None = None
+
+    def __post_init__(self, dataset_column_relations: list[DatasetColumnRelationDTO]):
+        self._dataset_column_relations = merge_dataset_column_relations(dataset_column_relations)
+
+    @cached_property
+    def unique_key(self) -> tuple:
+        return (self.operation.unique_key, self.source_dataset.unique_key, self.target_dataset.unique_key)
+
+    @property
+    def column_relations(self) -> list[DatasetColumnRelationDTO]:
+        return self._dataset_column_relations
+
+    @cached_property
+    def fingerprint(self) -> UUID:
+        id_components = [(*item.unique_key, item.type) for item in self.column_relations]
+        str_components = [".".join(map(str, item)) for item in id_components]
+        return generate_static_uuid(",".join(str_components))
+
+    def merge(self, new: ColumnLineageDTO) -> ColumnLineageDTO:
+        return ColumnLineageDTO(
+            operation=self.operation.merge(new.operation),
+            source_dataset=self.source_dataset.merge(new.source_dataset),
+            target_dataset=self.target_dataset.merge(new.target_dataset),
+            dataset_column_relations=self.column_relations + new.column_relations,
+            id=new.id or self.id,
+        )
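Two properties of this DTO are worth illustrating: the fingerprint is order-independent (relations are merged and sorted before digesting), and duplicate relations for the same column pair collapse into one entry whose type is the bitwise OR of everything observed (see merge in the next file). A uuid5-based stand-in for generate_static_uuid, whose actual digest function may differ, shows both:

from enum import IntFlag
from uuid import NAMESPACE_URL, uuid5


class RelType(IntFlag):  # trimmed stand-in for DatasetColumnRelationTypeDTO
    IDENTITY = 2
    AGGREGATION = 16


def fingerprint_sketch(relations: list[tuple[str, str, RelType]]):
    # sort first so the digest does not depend on extraction order
    ordered = sorted(relations, key=lambda r: (r[0], r[1]))
    return uuid5(NAMESPACE_URL, ",".join(f"{s}.{t}.{rel_type}" for s, t, rel_type in ordered))


a = fingerprint_sketch([("id", "user_id", RelType.IDENTITY), ("age", "avg_age", RelType.AGGREGATION)])
b = fingerprint_sketch([("age", "avg_age", RelType.AGGREGATION), ("id", "user_id", RelType.IDENTITY)])
assert a == b  # same relation set -> same fingerprint -> dataset_column_relation rows are reused

# duplicate relations for one column pair merge into a single row via bitwise OR
merged = RelType.IDENTITY | RelType.AGGREGATION
assert merged.value == 18 and RelType.IDENTITY in merged  # still fits the smallint column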
diff --git a/data_rentgen/dto/dataset_column_relation.py b/data_rentgen/dto/dataset_column_relation.py
new file mode 100644
index 00000000..4432d4a9
--- /dev/null
+++ b/data_rentgen/dto/dataset_column_relation.py
@@ -0,0 +1,68 @@
+# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from enum import IntFlag
+from functools import cached_property
+
+from uuid6 import UUID
+
+
+class DatasetColumnRelationTypeDTO(IntFlag):
+    # See https://github.com/OpenLineage/OpenLineage/blob/1.27.0/integration/spark/shared/src/main/java/io/openlineage/spark/agent/lifecycle/plan/column/TransformationInfo.java#L30-L40
+    # Using IntFlag to avoid messing with the ARRAY type, bitwise OR is enough
+    UNKNOWN = 1
+
+    # Direct
+    IDENTITY = 2
+    TRANSFORMATION = 4
+    TRANSFORMATION_MASKING = 8
+    AGGREGATION = 16
+    AGGREGATION_MASKING = 32
+
+    # Indirect
+    FILTER = 64
+    JOIN = 128
+    GROUP_BY = 256
+    SORT = 512
+    WINDOW = 1024
+    CONDITIONAL = 2048
+
+
+@dataclass
+class DatasetColumnRelationDTO:
+    type: DatasetColumnRelationTypeDTO
+    source_column: str
+    target_column: str | None = None
+    # fingerprint is generated from a combination of the other ids
+    fingerprint: UUID | None = None
+    # description is always "", see io.openlineage.spark.agent.lifecycle.plan.column.TransformationInfo
+
+    @cached_property
+    def unique_key(self) -> tuple:
+        return (  # noqa: WPS227
+            self.source_column,
+            self.target_column or "",
+        )
+
+    def merge(self, new: DatasetColumnRelationDTO) -> DatasetColumnRelationDTO:
+        return DatasetColumnRelationDTO(
+            source_column=self.source_column,
+            target_column=self.target_column,
+            type=self.type | new.type,
+            fingerprint=new.fingerprint or self.fingerprint,
+        )
+
+
+def merge_dataset_column_relations(relations: list[DatasetColumnRelationDTO]) -> list[DatasetColumnRelationDTO]:
+    result: dict[tuple, DatasetColumnRelationDTO] = {}
+    for relation in relations:
+        if relation.unique_key in result:
+            existing_relation = result[relation.unique_key]
+            result[relation.unique_key] = existing_relation.merge(relation)
+        else:
+            result[relation.unique_key] = relation
+
+    return sorted(result.values(), key=lambda item: item.unique_key)
diff --git a/data_rentgen/services/uow.py b/data_rentgen/services/uow.py
index 918cabd0..7e721522 100644
--- a/data_rentgen/services/uow.py
+++ b/data_rentgen/services/uow.py
@@ -5,7 +5,11 @@
 from sqlalchemy.ext.asyncio import AsyncSession
 from typing_extensions import Annotated
 
+from data_rentgen.db.repositories.column_lineage import ColumnLineageRepository
 from data_rentgen.db.repositories.dataset import DatasetRepository
+from data_rentgen.db.repositories.dataset_column_relation import (
+    DatasetColumnRelationRepository,
+)
 from data_rentgen.db.repositories.dataset_symlink import DatasetSymlinkRepository
 from data_rentgen.db.repositories.input import InputRepository
 from data_rentgen.db.repositories.job import JobRepository
@@ -33,6 +37,8 @@ def __init__(
         self.schema = SchemaRepository(session)
         self.input = InputRepository(session)
         self.output = OutputRepository(session)
+        self.dataset_column_relation = DatasetColumnRelationRepository(session)
+        self.column_lineage = ColumnLineageRepository(session)
         self.user = UserRepository(session)
 
     async def __aenter__(self):
diff --git a/docs/quickstart/setup_spark.rst b/docs/quickstart/setup_spark.rst
index c074f011..a528bb1b 100644
--- a/docs/quickstart/setup_spark.rst
+++ b/docs/quickstart/setup_spark.rst
@@ -7,7 +7,7 @@ Requirements
 ------------
 
 * `Apache Spark `_ 3.x or higher
-* `OpenLineage integration for Spark `_ 1.19.0 or higher
+* `OpenLineage integration for Spark `_ 1.23.0 or higher
 
 Setup
 -----
@@ -23,7 +23,7 @@ Setup
         # install OpenLineage integration and Kafka client
         .config(
             "spark.jars.packages",
-            "io.openlineage:openlineage-spark_2.12:1.24.2,org.apache.kafka:kafka-clients:3.9.0",
+            "io.openlineage:openlineage-spark_2.12:1.27.0,org.apache.kafka:kafka-clients:3.9.0",
         )
         .config(
             "spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener"
         )
@@ -65,6 +65,7 @@ Setup
             r"(.*?)(?<remove>(?:\/[^\/=]+=[^\/=]+)+)",
         )
         .config("spark.openlineage.jobName.appendDatasetName", "false")
+        .config("spark.openlineage.columnLineage.datasetLineageEnabled", "true")
         .getOrCreate()
     )
diff --git a/docs/reference/database/structure.rst b/docs/reference/database/structure.rst
index 850cca9d..a42b50aa 100644
--- a/docs/reference/database/structure.rst
+++ b/docs/reference/database/structure.rst
@@ -128,6 +128,27 @@ Database structure
         num_files: bigint
     }
 
+    entity DatasetColumnRelation {
+        * id: bigint
+        ----
+        * fingerprint: uuid(v5)
+        * source_column: varchar(255)
+        * target_column: varchar(255) null
+        * type: smallint
+    }
+
+    entity ColumnLineage {
+        * id: uuid(v7)
+        * created_at: timestamptz
+        ----
+        * operation_id: uuid(v7)
+        * run_id: uuid(v7)
+        * job_id: bigint
+        * source_dataset_id: bigint
+        * target_dataset_id: bigint
+        * fingerprint: uuid(v5)
+    }
+
     Address ||--o{ Location
     Dataset ||--o{ Location
@@ -153,4 +174,11 @@ Database structure
     Output ||--o{ Dataset
     Output |o--o{ Schema
 
+    ColumnLineage ||--o{ Operation
+    ColumnLineage ||--o{ Run
+    ColumnLineage ||--o{ Job
+    ColumnLineage "source_dataset_id" ||--o{ Dataset
+    ColumnLineage "target_dataset_id" ||--o{ Dataset
+    ColumnLineage ||--o{ DatasetColumnRelation
+
 @enduml
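Closing note: since neither new table declares foreign keys, reads join ColumnLineage to its per-column relations through the shared fingerprint, and the created_at predicate is what lets PostgreSQL prune partitions. A sketch of what such a query could look like, assuming the models introduced in this diff (the actual API layer may build it differently):

from datetime import datetime, timedelta, timezone

from sqlalchemy import select

from data_rentgen.db.models import ColumnLineage, DatasetColumnRelation

since = datetime.now(tz=timezone.utc) - timedelta(days=1)

# the created_at filter lets PG prune partitions; the fingerprint join fans out
# each lineage row into its per-column relations
statement = (
    select(ColumnLineage, DatasetColumnRelation)
    .join(
        DatasetColumnRelation,
        ColumnLineage.fingerprint == DatasetColumnRelation.fingerprint,
    )
    .where(
        ColumnLineage.created_at >= since,
        ColumnLineage.target_dataset_id == 123,  # hypothetical dataset id
    )
)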