[DOP-22364] Add column lineage tables
dolfinus committed Feb 4, 2025
1 parent c5dfe66 commit 581b22a
Showing 6 changed files with 322 additions and 1 deletion.
2 changes: 1 addition & 1 deletion data_rentgen/db/migrations/env.py
@@ -23,7 +23,7 @@

target_metadata = (Base.metadata,)

PARTITION_PREFIXES = ["run_y", "operation_y", "input_y", "output_y"]
PARTITION_PREFIXES = ["run_y", "operation_y", "input_y", "output_y", "column_lineage_y"]


def include_all_except_partitions(object, name, type_, reflected, compare_to):
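The body of include_all_except_partitions is truncated in this diff. As a rough sketch (an assumption, not part of this commit), such an Alembic include_object hook typically returns False for reflected partition child tables whose names start with one of the PARTITION_PREFIXES, so that autogenerate does not try to drop them:

# Sketch only -- the real implementation is truncated above.
def include_all_except_partitions(object, name, type_, reflected, compare_to) -> bool:
    # Ignore partition child tables such as "column_lineage_y2025m02"
    # when comparing metadata against the live database.
    if type_ == "table" and any(name.startswith(prefix) for prefix in PARTITION_PREFIXES):
        return False
    return True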
@@ -0,0 +1,79 @@
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
"""Create column lineage
Revision ID: 15c0a22b8566
Revises: f017d4c58658
Create Date: 2025-02-04 11:30:21.190161
"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "15c0a22b8566"
down_revision = "f017d4c58658"
branch_labels = None
depends_on = None


def upgrade() -> None:
    op.create_table(
        "dataset_column_relation",
        sa.Column("id", sa.BigInteger(), nullable=False),
        sa.Column("fingerprint", sa.UUID(), nullable=False),
        sa.Column("source_column", sa.String(length=255), nullable=False),
        sa.Column("target_column", sa.String(length=255), nullable=True),
        sa.Column("type", sa.SmallInteger(), nullable=False),
        sa.PrimaryKeyConstraint("id", name=op.f("pk__dataset_column_relation")),
    )
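    # Unique per (fingerprint, source_column, target_column); COALESCE maps a NULL
    # target_column to '' so duplicates are rejected even on PostgreSQL <15,
    # which has no NULLS NOT DISTINCT option.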
    op.create_index(
        op.f("ix__dataset_column_relation__fingerprint_source_column_target_column"),
        "dataset_column_relation",
        ["fingerprint", "source_column", sa.text("coalesce(target_column, '')")],
        unique=True,
    )

    op.create_table(
        "column_lineage",
        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
        sa.Column("id", sa.UUID(), nullable=False),
        sa.Column("operation_id", sa.UUID(), nullable=False),
        sa.Column("run_id", sa.UUID(), nullable=False),
        sa.Column("job_id", sa.BigInteger(), nullable=False),
        sa.Column("source_dataset_id", sa.BigInteger(), nullable=False),
        sa.Column("target_dataset_id", sa.BigInteger(), nullable=False),
        sa.Column("fingerprint", sa.UUID(), nullable=False),
        sa.PrimaryKeyConstraint("created_at", "id", name=op.f("pk__column_lineage")),
        postgresql_partition_by="RANGE (created_at)",
    )
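    # The parent table is partitioned by created_at; child partitions
    # (column_lineage_y*) are created outside of Alembic autogenerate,
    # see PARTITION_PREFIXES in data_rentgen/db/migrations/env.py.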
op.create_index(op.f("ix__column_lineage__job_id"), "column_lineage", ["job_id"], unique=False)
op.create_index(op.f("ix__column_lineage__operation_id"), "column_lineage", ["operation_id"], unique=False)
op.create_index(op.f("ix__column_lineage__run_id"), "column_lineage", ["run_id"], unique=False)
op.create_index(
op.f("ix__column_lineage__source_dataset_id"),
"column_lineage",
["source_dataset_id"],
unique=False,
)
op.create_index(
op.f("ix__column_lineage__target_dataset_id"),
"column_lineage",
["target_dataset_id"],
unique=False,
)


def downgrade() -> None:
op.drop_index(op.f("ix__column_lineage__target_dataset_id"), table_name="column_lineage")
op.drop_index(op.f("ix__column_lineage__source_dataset_id"), table_name="column_lineage")
op.drop_index(op.f("ix__column_lineage__run_id"), table_name="column_lineage")
op.drop_index(op.f("ix__column_lineage__operation_id"), table_name="column_lineage")
op.drop_index(op.f("ix__column_lineage__job_id"), table_name="column_lineage")
op.drop_table("column_lineage")

op.drop_index(
op.f("ix__dataset_column_relation__fingerprint_source_column_target_column"),
table_name="dataset_column_relation",
)
op.drop_table("dataset_column_relation")
8 changes: 8 additions & 0 deletions data_rentgen/db/models/__init__.py
@@ -3,9 +3,14 @@

from data_rentgen.db.models.address import Address
from data_rentgen.db.models.base import Base
from data_rentgen.db.models.column_lineage import ColumnLineage
from data_rentgen.db.models.custom_properties import CustomProperties
from data_rentgen.db.models.custom_user_properties import CustomUserProperties
from data_rentgen.db.models.dataset import Dataset
from data_rentgen.db.models.dataset_column_relation import (
    DatasetColumnRelation,
    DatasetColumnRelationType,
)
from data_rentgen.db.models.dataset_symlink import DatasetSymlink, DatasetSymlinkType
from data_rentgen.db.models.input import Input
from data_rentgen.db.models.job import Job, JobType
@@ -19,9 +24,12 @@
__all__ = [
"Address",
"Base",
"ColumnLineage",
"CustomProperties",
"CustomUserProperties",
"Dataset",
"DatasetColumnRelation",
"DatasetColumnRelationType",
"DatasetSymlink",
"DatasetSymlinkType",
"Input",
117 changes: 117 additions & 0 deletions data_rentgen/db/models/column_lineage.py
@@ -0,0 +1,117 @@
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

from datetime import datetime

from sqlalchemy import UUID as SQL_UUID
from sqlalchemy import (
BigInteger,
DateTime,
PrimaryKeyConstraint,
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from uuid6 import UUID

from data_rentgen.db.models.base import Base
from data_rentgen.db.models.dataset import Dataset
from data_rentgen.db.models.dataset_column_relation import DatasetColumnRelation
from data_rentgen.db.models.job import Job
from data_rentgen.db.models.operation import Operation
from data_rentgen.db.models.run import Run


# no foreign keys to avoid scanning all the partitions
class ColumnLineage(Base):
    __tablename__ = "column_lineage"
    __table_args__ = (
        PrimaryKeyConstraint("created_at", "id"),
        {"postgresql_partition_by": "RANGE (created_at)"},
    )

    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        doc="Timestamp component of UUID, used for table partitioning",
    )
    id: Mapped[UUID] = mapped_column(SQL_UUID)

    operation_id: Mapped[UUID] = mapped_column(
        SQL_UUID,
        index=True,
        nullable=False,
        doc="Operation that caused this column lineage",
    )
    operation: Mapped[Operation] = relationship(
        Operation,
        primaryjoin="ColumnLineage.operation_id == Operation.id",
        lazy="noload",
        foreign_keys=[operation_id],
    )

    run_id: Mapped[UUID] = mapped_column(
        SQL_UUID,
        index=True,
        nullable=False,
        doc="Run the operation is bound to",
    )
    run: Mapped[Run] = relationship(
        Run,
        primaryjoin="ColumnLineage.run_id == Run.id",
        lazy="noload",
        foreign_keys=[run_id],
    )

    job_id: Mapped[int] = mapped_column(
        BigInteger,
        index=True,
        nullable=False,
        doc="Parent job of the run",
    )
    job: Mapped[Job] = relationship(
        Job,
        primaryjoin="ColumnLineage.job_id == Job.id",
        lazy="noload",
        foreign_keys=[job_id],
    )

    source_dataset_id: Mapped[int] = mapped_column(
        BigInteger,
        index=True,
        nullable=False,
        doc="Dataset the data originated from",
    )
    source_dataset: Mapped[Dataset] = relationship(
        Dataset,
        primaryjoin="ColumnLineage.source_dataset_id == Dataset.id",
        lazy="noload",
        foreign_keys=[source_dataset_id],
    )

    target_dataset_id: Mapped[int] = mapped_column(
        BigInteger,
        index=True,
        nullable=False,
        doc="Dataset the data is saved to",
    )
    target_dataset: Mapped[Dataset] = relationship(
        Dataset,
        primaryjoin="ColumnLineage.target_dataset_id == Dataset.id",
        lazy="noload",
        foreign_keys=[target_dataset_id],
    )

    fingerprint: Mapped[UUID] = mapped_column(
        SQL_UUID,
        index=False,
        nullable=False,
        doc="Dataset column relation fingerprint",
    )
    dataset_column_relations: Mapped[list[DatasetColumnRelation]] = relationship(
        DatasetColumnRelation,
        uselist=True,
        primaryjoin="ColumnLineage.fingerprint == DatasetColumnRelation.fingerprint",
        lazy="noload",
        foreign_keys=[fingerprint],
    )
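A minimal usage sketch of the model above -- the AsyncSession named session, the surrounding async function, and the pre-existing operation/run/job/dataset rows are assumptions for illustration, not part of this commit:

# Sketch only: build one ColumnLineage row and fetch its column-level relations
# back via the shared fingerprint (relationships use lazy="noload", so related
# rows are loaded explicitly).
from datetime import datetime, timezone

from sqlalchemy import select
from uuid6 import uuid7

from data_rentgen.db.models import ColumnLineage, DatasetColumnRelation

now = datetime.now(tz=timezone.utc)
lineage = ColumnLineage(
    created_at=now,  # must match the timestamp component of the UUIDv7 id
    id=uuid7(),
    operation_id=operation.id,  # assumed pre-existing rows
    run_id=run.id,
    job_id=job.id,
    source_dataset_id=source_dataset.id,
    target_dataset_id=target_dataset.id,
    fingerprint=fingerprint,  # uuid(v5) digest shared with DatasetColumnRelation rows
)
session.add(lineage)

relations = (
    await session.scalars(
        select(DatasetColumnRelation).where(
            DatasetColumnRelation.fingerprint == lineage.fingerprint,
        ),
    )
).all()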
89 changes: 89 additions & 0 deletions data_rentgen/db/models/dataset_column_relation.py
@@ -0,0 +1,89 @@
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

from enum import Flag

from sqlalchemy import UUID as SQL_UUID
from sqlalchemy import (
BigInteger,
Column,
Index,
SmallInteger,
String,
func,
)
from sqlalchemy.orm import Mapped, mapped_column
from uuid6 import UUID

from data_rentgen.db.models.base import Base


class DatasetColumnRelationType(Flag):
    # See https://github.com/OpenLineage/OpenLineage/blob/1.27.0/integration/spark/shared/src/main/java/io/openlineage/spark/agent/lifecycle/plan/column/TransformationInfo.java#L30-L40
    # Using a Flag bitmask instead of an ARRAY column type: bitwise OR is enough to combine values
    UNKNOWN = 1

    # Direct
    IDENTITY = 2
    TRANSFORMATION = 4
    TRANSFORMATION_MASKING = 8
    AGGREGATION = 16
    AGGREGATION_MASKING = 32

    # Indirect
    FILTER = 64
    JOIN = 128
    GROUP_BY = 256
    SORT = 512
    WINDOW = 1024
    CONDITIONAL = 2048


# no foreign keys to avoid scanning all the partitions
class DatasetColumnRelation(Base):
__tablename__ = "dataset_column_relation"
__table_args__ = (
Index(
None,
Column("fingerprint"),
Column("source_column"),
# NULLs are distinct by default, we have to convert them to something else.
# This is mostly for compatibility with PG <15, there is no `NULLS NOT DISTINCT` option
func.coalesce(Column("target_column"), ""),
unique=True,
),
)

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    fingerprint: Mapped[UUID] = mapped_column(
        SQL_UUID,
        index=False,
        nullable=False,
        doc="Schema-based SHA-1 digest used for grouping relations together. Currently stored in the form of a UUID",
    )

    source_column: Mapped[str] = mapped_column(
        String(length=255),
        index=False,
        nullable=False,
        doc="Source dataset column the data originated from",
    )

    target_column: Mapped[str | None] = mapped_column(
        String(length=255),
        index=False,
        nullable=True,
        doc=(
            "Target dataset column the data is saved to. "
            "NULL means the entire target dataset depends on the source column"
        ),
    )

    type: Mapped[DatasetColumnRelationType] = mapped_column(
        SmallInteger(),
        index=False,
        nullable=False,
        doc="Column transformation type",
    )
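A short sketch of how the bitmask is meant to be used -- combining several relation types with bitwise OR into a single smallint value and decoding it back (the variable names are illustrative, not part of this commit):

# Sketch only: round-tripping DatasetColumnRelationType through the smallint "type" column.
from data_rentgen.db.models import DatasetColumnRelationType

combined = DatasetColumnRelationType.TRANSFORMATION | DatasetColumnRelationType.JOIN
stored = combined.value  # 4 | 128 == 132, fits into a smallint

decoded = DatasetColumnRelationType(stored)
assert DatasetColumnRelationType.JOIN in decoded
assert DatasetColumnRelationType.AGGREGATION not in decoded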
28 changes: 28 additions & 0 deletions docs/reference/database/structure.rst
@@ -128,6 +128,27 @@ Database structure
num_files: bigint
}

entity DatasetColumnRelation {
* id: bigint
----
* fingerprint: uuid(v5)
* source_column: varchar(255)
* target_column: varchar(255) null
* type: smallint
}

entity ColumnLineage {
* id: uuid(v7)
* created_at: timestamptz
----
* operation_id: uuid(v7)
* run_id: uuid(v7)
* job_id: bigint
* source_dataset_id: bigint
* target_dataset_id: bigint
* fingerprint: uuid(v5)
}

Address ||--o{ Location

Dataset ||--o{ Location
@@ -153,4 +174,11 @@
Output ||--o{ Dataset
Output |o--o{ Schema
ColumnLineage ||--o{ Operation
ColumnLineage ||--o{ Run
ColumnLineage ||--o{ Job
ColumnLineage "source_dataset_id" ||--o{ Dataset
ColumnLineage "target_dataset_id" ||--o{ Dataset
ColumnLineage ||--o{ DatasetColumnRelation

@enduml
