diff --git a/data_rentgen/db/migrations/env.py b/data_rentgen/db/migrations/env.py
index 05d340b..cf6a25c 100644
--- a/data_rentgen/db/migrations/env.py
+++ b/data_rentgen/db/migrations/env.py
@@ -23,7 +23,7 @@
 target_metadata = (Base.metadata,)
 
 
-PARTITION_PREFIXES = ["run_y", "operation_y", "input_y", "output_y"]
+PARTITION_PREFIXES = ["run_y", "operation_y", "input_y", "output_y", "column_lineage_y"]
 
 
 def include_all_except_partitions(object, name, type_, reflected, compare_to):
diff --git a/data_rentgen/db/migrations/versions/2025-02-04_15c0a22b8566_create_column_lineage.py b/data_rentgen/db/migrations/versions/2025-02-04_15c0a22b8566_create_column_lineage.py
new file mode 100644
index 0000000..0b66da7
--- /dev/null
+++ b/data_rentgen/db/migrations/versions/2025-02-04_15c0a22b8566_create_column_lineage.py
@@ -0,0 +1,79 @@
+# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+"""Create column lineage
+
+Revision ID: 15c0a22b8566
+Revises: f017d4c58658
+Create Date: 2025-02-04 11:30:21.190161
+
+"""
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "15c0a22b8566"
+down_revision = "f017d4c58658"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    op.create_table(
+        "dataset_column_relation",
+        sa.Column("id", sa.BigInteger(), nullable=False),
+        sa.Column("fingerprint", sa.UUID(), nullable=False),
+        sa.Column("source_column", sa.String(length=255), nullable=False),
+        sa.Column("target_column", sa.String(length=255), nullable=True),
+        sa.Column("type", sa.SmallInteger(), nullable=False),
+        sa.PrimaryKeyConstraint("id", name=op.f("pk__dataset_column_relation")),
+    )
+    op.create_index(
+        op.f("ix__dataset_column_relation__fingerprint_source_column_target_column"),
+        "dataset_column_relation",
+        ["fingerprint", "source_column", sa.text("coalesce(target_column, '')")],
+        unique=True,
+    )
+
+    op.create_table(
+        "column_lineage",
+        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
+        sa.Column("id", sa.UUID(), nullable=False),
+        sa.Column("operation_id", sa.UUID(), nullable=False),
+        sa.Column("run_id", sa.UUID(), nullable=False),
+        sa.Column("job_id", sa.BigInteger(), nullable=False),
+        sa.Column("source_dataset_id", sa.BigInteger(), nullable=False),
+        sa.Column("target_dataset_id", sa.BigInteger(), nullable=False),
+        sa.Column("fingerprint", sa.UUID(), nullable=False),
+        sa.PrimaryKeyConstraint("created_at", "id", name=op.f("pk__column_lineage")),
+        postgresql_partition_by="RANGE (created_at)",
+    )
+    op.create_index(op.f("ix__column_lineage__job_id"), "column_lineage", ["job_id"], unique=False)
+    op.create_index(op.f("ix__column_lineage__operation_id"), "column_lineage", ["operation_id"], unique=False)
+    op.create_index(op.f("ix__column_lineage__run_id"), "column_lineage", ["run_id"], unique=False)
+    op.create_index(
+        op.f("ix__column_lineage__source_dataset_id"),
+        "column_lineage",
+        ["source_dataset_id"],
+        unique=False,
+    )
+    op.create_index(
+        op.f("ix__column_lineage__target_dataset_id"),
+        "column_lineage",
+        ["target_dataset_id"],
+        unique=False,
+    )
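+
+    # NOTE: this migration creates only the partitioned parent table; child
+    # partitions are managed separately. Assuming the naming convention implied
+    # by PARTITION_PREFIXES in env.py, a partition would look roughly like:
+    #   CREATE TABLE column_lineage_y2025m02 PARTITION OF column_lineage
+    #       FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');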
+
+
+def downgrade() -> None:
+    op.drop_index(op.f("ix__column_lineage__target_dataset_id"), table_name="column_lineage")
+    op.drop_index(op.f("ix__column_lineage__source_dataset_id"), table_name="column_lineage")
+    op.drop_index(op.f("ix__column_lineage__run_id"), table_name="column_lineage")
+    op.drop_index(op.f("ix__column_lineage__operation_id"), table_name="column_lineage")
+    op.drop_index(op.f("ix__column_lineage__job_id"), table_name="column_lineage")
+    op.drop_table("column_lineage")
+
+    op.drop_index(
+        op.f("ix__dataset_column_relation__fingerprint_source_column_target_column"),
+        table_name="dataset_column_relation",
+    )
+    op.drop_table("dataset_column_relation")
diff --git a/data_rentgen/db/models/__init__.py b/data_rentgen/db/models/__init__.py
index e587c5d..c97b16b 100644
--- a/data_rentgen/db/models/__init__.py
+++ b/data_rentgen/db/models/__init__.py
@@ -3,9 +3,14 @@
 from data_rentgen.db.models.address import Address
 from data_rentgen.db.models.base import Base
+from data_rentgen.db.models.column_lineage import ColumnLineage
 from data_rentgen.db.models.custom_properties import CustomProperties
 from data_rentgen.db.models.custom_user_properties import CustomUserProperties
 from data_rentgen.db.models.dataset import Dataset
+from data_rentgen.db.models.dataset_column_relation import (
+    DatasetColumnRelation,
+    DatasetColumnRelationType,
+)
 from data_rentgen.db.models.dataset_symlink import DatasetSymlink, DatasetSymlinkType
 from data_rentgen.db.models.input import Input
 from data_rentgen.db.models.job import Job, JobType
@@ -19,9 +24,12 @@
 __all__ = [
     "Address",
     "Base",
+    "ColumnLineage",
     "CustomProperties",
     "CustomUserProperties",
     "Dataset",
+    "DatasetColumnRelation",
+    "DatasetColumnRelationType",
     "DatasetSymlink",
     "DatasetSymlinkType",
     "Input",
diff --git a/data_rentgen/db/models/column_lineage.py b/data_rentgen/db/models/column_lineage.py
new file mode 100644
index 0000000..c6e408b
--- /dev/null
+++ b/data_rentgen/db/models/column_lineage.py
@@ -0,0 +1,117 @@
+# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+from datetime import datetime
+
+from sqlalchemy import UUID as SQL_UUID
+from sqlalchemy import (
+    BigInteger,
+    DateTime,
+    PrimaryKeyConstraint,
+)
+from sqlalchemy.orm import Mapped, mapped_column, relationship
+from uuid6 import UUID
+
+from data_rentgen.db.models.base import Base
+from data_rentgen.db.models.dataset import Dataset
+from data_rentgen.db.models.dataset_column_relation import DatasetColumnRelation
+from data_rentgen.db.models.job import Job
+from data_rentgen.db.models.operation import Operation
+from data_rentgen.db.models.run import Run
+
+
+# No foreign keys, to avoid scanning all the partitions on constraint checks
+class ColumnLineage(Base):
+    __tablename__ = "column_lineage"
+    __table_args__ = (
+        PrimaryKeyConstraint("created_at", "id"),
+        {"postgresql_partition_by": "RANGE (created_at)"},
+    )
+
+    created_at: Mapped[datetime] = mapped_column(
+        DateTime(timezone=True),
+        nullable=False,
+        doc="Timestamp component of the UUID, used for table partitioning",
+    )
+    id: Mapped[UUID] = mapped_column(SQL_UUID)
+
+    operation_id: Mapped[UUID] = mapped_column(
+        SQL_UUID,
+        index=True,
+        nullable=False,
+        doc="Operation that caused this column lineage",
+    )
+    operation: Mapped[Operation] = relationship(
+        Operation,
+        primaryjoin="ColumnLineage.operation_id == Operation.id",
+        lazy="noload",
+        foreign_keys=[operation_id],
+    )
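+
+    # The same pattern repeats for every reference below: with no real FK,
+    # primaryjoin + foreign_keys spell the join out explicitly, and
+    # lazy="noload" ensures related rows are loaded only by explicit queries.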
Job.id", + lazy="noload", + foreign_keys=[job_id], + ) + + source_dataset_id: Mapped[int] = mapped_column( + BigInteger, + index=True, + nullable=False, + doc="Dataset the data is originated from", + ) + source_dataset: Mapped[Dataset] = relationship( + Dataset, + primaryjoin="ColumnLineage.source_dataset_id == Dataset.id", + lazy="noload", + foreign_keys=[source_dataset_id], + ) + + target_dataset_id: Mapped[int] = mapped_column( + BigInteger, + index=True, + nullable=False, + doc="Dataset the data is saved to", + ) + target_dataset: Mapped[Dataset] = relationship( + Dataset, + primaryjoin="ColumnLineage.target_dataset_id == Dataset.id", + lazy="noload", + foreign_keys=[target_dataset_id], + ) + + fingerprint: Mapped[UUID] = mapped_column( + SQL_UUID, + index=False, + nullable=False, + doc="Datase column relation fingerprint", + ) + dataset_column_relations: Mapped[list[DatasetColumnRelation]] = relationship( + DatasetColumnRelation, + uselist=True, + primaryjoin="ColumnLineage.fingerprint == DatasetColumnRelation.fingerprint", + lazy="noload", + foreign_keys=[fingerprint], + ) diff --git a/data_rentgen/db/models/dataset_column_relation.py b/data_rentgen/db/models/dataset_column_relation.py new file mode 100644 index 0000000..13f8835 --- /dev/null +++ b/data_rentgen/db/models/dataset_column_relation.py @@ -0,0 +1,89 @@ +# SPDX-FileCopyrightText: 2024-2025 MTS PJSC +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from enum import Flag + +from sqlalchemy import UUID as SQL_UUID +from sqlalchemy import ( + BigInteger, + Column, + Index, + SmallInteger, + String, + func, +) +from sqlalchemy.orm import Mapped, mapped_column +from uuid6 import UUID + +from data_rentgen.db.models.base import Base + + +class DatasetColumnRelationType(Flag): + # See https://github.com/OpenLineage/OpenLineage/blob/1.27.0/integration/spark/shared/src/main/java/io/openlineage/spark/agent/lifecycle/plan/column/TransformationInfo.java#L30-L40 + # Using IntFlag to avoid messing up with ARRAY type, bitwise OR is enough + UNKNOWN = 1 + + # Direct + IDENTITY = 2 + TRANSFORMATION = 4 + TRANSFORMATION_MASKING = 8 + AGGREGATION = 16 + AGGREGATION_MASKING = 32 + + # Indirect + FILTER = 64 + JOIN = 128 + GROUP_BY = 256 + SORT = 512 + WINDOW = 1024 + CONDITIONAL = 2048 + + +# no foreign keys to avoid scanning all the partitions +class DatasetColumnRelation(Base): + __tablename__ = "dataset_column_relation" + __table_args__ = ( + Index( + None, + Column("fingerprint"), + Column("source_column"), + # NULLs are distinct by default, we have to convert them to something else. + # This is mostly for compatibility with PG <15, there is no `NULLS NOT DISTINCT` option + func.coalesce(Column("target_column"), ""), + unique=True, + ), + ) + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True) + fingerprint: Mapped[UUID] = mapped_column( + SQL_UUID, + index=False, + nullable=False, + doc="Schema SHA-1 digest based used for grouping relations together. Currently this is in form of UUID", + ) + + source_column: Mapped[str] = mapped_column( + String(length=255), + index=False, + nullable=False, + doc="Source dataset column the data is originated from", + ) + + target_column: Mapped[str | None] = mapped_column( + String(length=255), + index=False, + nullable=True, + doc=( + "Target dataset column the data is saved to. 
" + "NULL means the entire target dataset depends on source column" + ), + ) + + type: Mapped[DatasetColumnRelationType] = mapped_column( + SmallInteger(), + index=False, + nullable=False, + doc="Column transformation type", + ) diff --git a/docs/reference/database/structure.rst b/docs/reference/database/structure.rst index 850cca9..a42b50a 100644 --- a/docs/reference/database/structure.rst +++ b/docs/reference/database/structure.rst @@ -128,6 +128,27 @@ Database structure num_files: bigint } + entity DatasetColumnRelation { + * id: bigint + ---- + * fingerprint: uuid(v5) + * source_column: varchar(255) + * target_column: varchar(255) null + type: smallint + } + + entity ColumnLineage { + * id: uuid(v7) + * created_at: timestamptz + ---- + * operation_id: uuid(v7) + * run_id: uuid(v7) + * job_id: bigint + * source_dataset_id: bigint + * target_dataset_id: bigint + * fingerprint: uuid(v5) + } + Address ||--o{ Location Dataset ||--o{ Location @@ -153,4 +174,11 @@ Database structure Output ||--o{ Dataset Output |o--o{ Schema + ColumnLineage ||--o{ Operation + ColumnLineage ||--o{ Run + ColumnLineage ||--o{ Job + ColumnLineage "source_dataset_id" ||--o{ Dataset + ColumnLineage "target_dataset_id" ||--o{ Dataset + ColumnLineage ||--o{ DatasetColumnRelation + @enduml