[DOP-22364] Add column lineage tables #153

Merged (2 commits, Feb 5, 2025)
2 changes: 1 addition & 1 deletion data_rentgen/db/migrations/env.py
@@ -23,7 +23,7 @@

target_metadata = (Base.metadata,)

-PARTITION_PREFIXES = ["run_y", "operation_y", "input_y", "output_y"]
+PARTITION_PREFIXES = ["run_y", "operation_y", "input_y", "output_y", "column_lineage_y"]


def include_all_except_partitions(object, name, type_, reflected, compare_to):
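
The body of this hook is collapsed in the diff view. Its job is to keep Alembic autogenerate away from the per-range partition tables; a minimal sketch of the likely body (an assumption, not shown in this PR):

    # Inside include_all_except_partitions: skip tables whose names match a
    # partition prefix, e.g. "column_lineage_y2025m02", so autogenerate does
    # not propose dropping partitions created outside of migrations.
    if type_ == "table" and any(name.startswith(prefix) for prefix in PARTITION_PREFIXES):
        return False
    return True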
@@ -0,0 +1,79 @@
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
"""Create column lineage

Revision ID: 15c0a22b8566
Revises: f017d4c58658
Create Date: 2025-02-04 11:30:21.190161

"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "15c0a22b8566"
down_revision = "f017d4c58658"
branch_labels = None
depends_on = None


def upgrade() -> None:
    op.create_table(
        "dataset_column_relation",
        sa.Column("id", sa.BigInteger(), nullable=False),
        sa.Column("fingerprint", sa.UUID(), nullable=False),
        sa.Column("source_column", sa.String(length=255), nullable=False),
        sa.Column("target_column", sa.String(length=255), nullable=True),
        sa.Column("type", sa.SmallInteger(), nullable=False),
        sa.PrimaryKeyConstraint("id", name=op.f("pk__dataset_column_relation")),
    )
    op.create_index(
        op.f("ix__dataset_column_relation__fingerprint_source_column_target_column"),
        "dataset_column_relation",
        ["fingerprint", "source_column", sa.text("coalesce(target_column, '')")],
        unique=True,
    )

    op.create_table(
        "column_lineage",
        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
        sa.Column("id", sa.UUID(), nullable=False),
        sa.Column("operation_id", sa.UUID(), nullable=False),
        sa.Column("run_id", sa.UUID(), nullable=False),
        sa.Column("job_id", sa.BigInteger(), nullable=False),
        sa.Column("source_dataset_id", sa.BigInteger(), nullable=False),
        sa.Column("target_dataset_id", sa.BigInteger(), nullable=False),
        sa.Column("fingerprint", sa.UUID(), nullable=False),
        sa.PrimaryKeyConstraint("created_at", "id", name=op.f("pk__column_lineage")),
        postgresql_partition_by="RANGE (created_at)",
    )
    op.create_index(op.f("ix__column_lineage__job_id"), "column_lineage", ["job_id"], unique=False)
    op.create_index(op.f("ix__column_lineage__operation_id"), "column_lineage", ["operation_id"], unique=False)
    op.create_index(op.f("ix__column_lineage__run_id"), "column_lineage", ["run_id"], unique=False)
    op.create_index(
        op.f("ix__column_lineage__source_dataset_id"),
        "column_lineage",
        ["source_dataset_id"],
        unique=False,
    )
    op.create_index(
        op.f("ix__column_lineage__target_dataset_id"),
        "column_lineage",
        ["target_dataset_id"],
        unique=False,
    )


def downgrade() -> None:
    op.drop_index(op.f("ix__column_lineage__target_dataset_id"), table_name="column_lineage")
    op.drop_index(op.f("ix__column_lineage__source_dataset_id"), table_name="column_lineage")
    op.drop_index(op.f("ix__column_lineage__run_id"), table_name="column_lineage")
    op.drop_index(op.f("ix__column_lineage__operation_id"), table_name="column_lineage")
    op.drop_index(op.f("ix__column_lineage__job_id"), table_name="column_lineage")
    op.drop_table("column_lineage")

    op.drop_index(
        op.f("ix__dataset_column_relation__fingerprint_source_column_target_column"),
        table_name="dataset_column_relation",
    )
    op.drop_table("dataset_column_relation")
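
Note that the migration only creates the partitioned parent table; with RANGE (created_at) partitioning, per-period child tables must be created separately (hence the column_lineage_y prefix registered in env.py). A hypothetical monthly partition, for illustration only (the actual partition naming and creation mechanism are not part of this diff):

op.execute(
    """
    CREATE TABLE IF NOT EXISTS column_lineage_y2025m02
    PARTITION OF column_lineage
    FOR VALUES FROM ('2025-02-01 00:00:00+00') TO ('2025-03-01 00:00:00+00')
    """
)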
8 changes: 8 additions & 0 deletions data_rentgen/db/models/__init__.py
@@ -3,9 +3,14 @@

from data_rentgen.db.models.address import Address
from data_rentgen.db.models.base import Base
from data_rentgen.db.models.column_lineage import ColumnLineage
from data_rentgen.db.models.custom_properties import CustomProperties
from data_rentgen.db.models.custom_user_properties import CustomUserProperties
from data_rentgen.db.models.dataset import Dataset
from data_rentgen.db.models.dataset_column_relation import (
    DatasetColumnRelation,
    DatasetColumnRelationType,
)
from data_rentgen.db.models.dataset_symlink import DatasetSymlink, DatasetSymlinkType
from data_rentgen.db.models.input import Input
from data_rentgen.db.models.job import Job, JobType
@@ -19,9 +24,12 @@
__all__ = [
    "Address",
    "Base",
    "ColumnLineage",
    "CustomProperties",
    "CustomUserProperties",
    "Dataset",
    "DatasetColumnRelation",
    "DatasetColumnRelationType",
    "DatasetSymlink",
    "DatasetSymlinkType",
    "Input",
117 changes: 117 additions & 0 deletions data_rentgen/db/models/column_lineage.py
@@ -0,0 +1,117 @@
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

from datetime import datetime

from sqlalchemy import UUID as SQL_UUID
from sqlalchemy import (
    BigInteger,
    DateTime,
    PrimaryKeyConstraint,
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from uuid6 import UUID

from data_rentgen.db.models.base import Base
from data_rentgen.db.models.dataset import Dataset
from data_rentgen.db.models.dataset_column_relation import DatasetColumnRelation
from data_rentgen.db.models.job import Job
from data_rentgen.db.models.operation import Operation
from data_rentgen.db.models.run import Run


# no foreign keys to avoid scanning all the partitions
class ColumnLineage(Base):
    __tablename__ = "column_lineage"
    __table_args__ = (
        PrimaryKeyConstraint("created_at", "id"),
        {"postgresql_partition_by": "RANGE (created_at)"},
    )

    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        doc="Timestamp component of UUID, used for table partitioning",
    )
    id: Mapped[UUID] = mapped_column(SQL_UUID)

    operation_id: Mapped[UUID] = mapped_column(
        SQL_UUID,
        index=True,
        nullable=False,
        doc="Operation that caused this column lineage",
    )
    operation: Mapped[Operation] = relationship(
        Operation,
        primaryjoin="ColumnLineage.operation_id == Operation.id",
        lazy="noload",
        foreign_keys=[operation_id],
    )

    run_id: Mapped[UUID] = mapped_column(
        SQL_UUID,
        index=True,
        nullable=False,
        doc="Run the operation is bound to",
    )
    run: Mapped[Run] = relationship(
        Run,
        primaryjoin="ColumnLineage.run_id == Run.id",
        lazy="noload",
        foreign_keys=[run_id],
    )

    job_id: Mapped[int] = mapped_column(
        BigInteger,
        index=True,
        nullable=False,
        doc="Parent job of the run",
    )
    job: Mapped[Job] = relationship(
        Job,
        primaryjoin="ColumnLineage.job_id == Job.id",
        lazy="noload",
        foreign_keys=[job_id],
    )

    source_dataset_id: Mapped[int] = mapped_column(
        BigInteger,
        index=True,
        nullable=False,
        doc="Dataset the data originated from",
    )
    source_dataset: Mapped[Dataset] = relationship(
        Dataset,
        primaryjoin="ColumnLineage.source_dataset_id == Dataset.id",
        lazy="noload",
        foreign_keys=[source_dataset_id],
    )

    target_dataset_id: Mapped[int] = mapped_column(
        BigInteger,
        index=True,
        nullable=False,
        doc="Dataset the data is saved to",
    )
    target_dataset: Mapped[Dataset] = relationship(
        Dataset,
        primaryjoin="ColumnLineage.target_dataset_id == Dataset.id",
        lazy="noload",
        foreign_keys=[target_dataset_id],
    )

    fingerprint: Mapped[UUID] = mapped_column(
        SQL_UUID,
        index=False,
        nullable=False,
        doc="Dataset column relation fingerprint",
    )
    dataset_column_relations: Mapped[list[DatasetColumnRelation]] = relationship(
        DatasetColumnRelation,
        uselist=True,
        primaryjoin="ColumnLineage.fingerprint == DatasetColumnRelation.fingerprint",
        lazy="noload",
        foreign_keys=[fingerprint],
    )
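
All relationships above are declared with lazy="noload", so related rows are only fetched when a loader option is given explicitly. A sketch of a typical read path (illustrative only; some_dataset_id is a hypothetical variable, and the actual query code is not part of this PR):

from sqlalchemy import select
from sqlalchemy.orm import selectinload

# Fetch lineage rows for one target dataset, together with their column relations.
query = (
    select(ColumnLineage)
    .where(ColumnLineage.target_dataset_id == some_dataset_id)
    .options(selectinload(ColumnLineage.dataset_column_relations))
)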
89 changes: 89 additions & 0 deletions data_rentgen/db/models/dataset_column_relation.py
@@ -0,0 +1,89 @@
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

from enum import Flag

from sqlalchemy import UUID as SQL_UUID
from sqlalchemy import (
    BigInteger,
    Column,
    Index,
    SmallInteger,
    String,
    func,
)
from sqlalchemy.orm import Mapped, mapped_column
from uuid6 import UUID

from data_rentgen.db.models.base import Base


class DatasetColumnRelationType(Flag):
    # See https://github.com/OpenLineage/OpenLineage/blob/1.27.0/integration/spark/shared/src/main/java/io/openlineage/spark/agent/lifecycle/plan/column/TransformationInfo.java#L30-L40
    # Using a bitmask Flag instead of an ARRAY column type; bitwise OR is enough to combine values
    UNKNOWN = 1

    # Direct
    IDENTITY = 2
    TRANSFORMATION = 4
    TRANSFORMATION_MASKING = 8
    AGGREGATION = 16
    AGGREGATION_MASKING = 32

    # Indirect
    FILTER = 64
    JOIN = 128
    GROUP_BY = 256
    SORT = 512
    WINDOW = 1024
    CONDITIONAL = 2048


# no foreign keys to avoid scanning all the partitions
class DatasetColumnRelation(Base):
    __tablename__ = "dataset_column_relation"
    __table_args__ = (
        Index(
            None,
            Column("fingerprint"),
            Column("source_column"),
            # NULLs are distinct by default, so we have to convert them to something else.
            # This is mostly for compatibility with PG <15, which has no `NULLS NOT DISTINCT` option
            func.coalesce(Column("target_column"), ""),
            unique=True,
        ),
    )

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    fingerprint: Mapped[UUID] = mapped_column(
        SQL_UUID,
        index=False,
        nullable=False,
        doc="SHA-1 digest of the dataset schema, used for grouping relations together. Currently this is in the form of a UUID",
    )

    source_column: Mapped[str] = mapped_column(
        String(length=255),
        index=False,
        nullable=False,
        doc="Source dataset column the data originated from",
    )

    target_column: Mapped[str | None] = mapped_column(
        String(length=255),
        index=False,
        nullable=True,
        doc=(
            "Target dataset column the data is saved to. "
            "NULL means the entire target dataset depends on the source column"
        ),
    )

    type: Mapped[DatasetColumnRelationType] = mapped_column(
        SmallInteger(),
        index=False,
        nullable=False,
        doc="Column transformation type",
    )
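
Because the type column stores the flag's integer value in a smallint, several transformation kinds can be packed into one row and decoded later. A quick illustration (not part of this PR):

# Combine several relation types with bitwise OR.
relation_type = DatasetColumnRelationType.TRANSFORMATION | DatasetColumnRelationType.JOIN
assert relation_type.value == 4 + 128  # 132 fits easily into a PostgreSQL smallint

# Decoding: membership checks work on the combined flag.
assert DatasetColumnRelationType.JOIN in relation_type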
28 changes: 28 additions & 0 deletions docs/reference/database/structure.rst
@@ -128,6 +128,27 @@ Database structure
    num_files: bigint
}

entity DatasetColumnRelation {
    * id: bigint
    ----
    * fingerprint: uuid(v5)
    * source_column: varchar(255)
    * target_column: varchar(255) null
    * type: smallint
}

entity ColumnLineage {
    * id: uuid(v7)
    * created_at: timestamptz
    ----
    * operation_id: uuid(v7)
    * run_id: uuid(v7)
    * job_id: bigint
    * source_dataset_id: bigint
    * target_dataset_id: bigint
    * fingerprint: uuid(v5)
}

Address ||--o{ Location

Dataset ||--o{ Location
@@ -153,4 +174,11 @@
Output ||--o{ Dataset
Output |o--o{ Schema
ColumnLineage ||--o{ Operation
ColumnLineage ||--o{ Run
ColumnLineage ||--o{ Job
ColumnLineage "source_dataset_id" ||--o{ Dataset
ColumnLineage "target_dataset_id" ||--o{ Dataset
ColumnLineage ||--o{ DatasetColumnRelation

@enduml
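
The diagram marks fingerprint as uuid(v5), which is consistent with the "SHA-1 digest" wording in the model docstring: UUIDv5 is SHA-1 based, so a deterministic fingerprint can be derived from a canonical form of the column relations. A purely illustrative sketch (the namespace, canonical encoding, and producing code are not shown in this PR):

import uuid

# Hypothetical namespace; the real value used by data-rentgen is not in this diff.
NAMESPACE = uuid.UUID("00000000-0000-0000-0000-000000000000")

# (source_column, target_column, type) triples, sorted into a canonical string.
relations = [("user_id", "customer_id", 2), ("amount", "total", 4)]
canonical = "|".join(f"{src}->{dst}:{typ}" for src, dst, typ in sorted(relations))

fingerprint = uuid.uuid5(NAMESPACE, canonical)  # deterministic, SHA-1 based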