diff --git a/.env.local b/.env.local
index cc0fb999..729536c6 100644
--- a/.env.local
+++ b/.env.local
@@ -7,6 +7,11 @@ export DATA_RENTGEN__KAFKA__SECURITY__TYPE=scram-sha256
export DATA_RENTGEN__KAFKA__SECURITY__USER=data_rentgen
export DATA_RENTGEN__KAFKA__SECURITY__PASSWORD=changeme
export DATA_RENTGEN__KAFKA__COMPRESSION=zstd
+#export DATA_RENTGEN__CONSUMER__MAX_RECORDS=100
+# Handling events with a lot of column lineage takes so much time
+# that the Kafka coordinator considers the worker dead. Limit the batch by total message size instead.
+# The optimal value depends on the OL version, complexity of Spark jobs, number of Kafka partitions and number of workers.
+export DATA_RENTGEN__CONSUMER__MAX_PARTITION_FETCH_BYTES=200Kb
export DATA_RENTGEN__SERVER__DEBUG=true
diff --git a/data_rentgen/consumer/extractors/__init__.py b/data_rentgen/consumer/extractors/__init__.py
index a41c5000..e3703e35 100644
--- a/data_rentgen/consumer/extractors/__init__.py
+++ b/data_rentgen/consumer/extractors/__init__.py
@@ -2,6 +2,7 @@
# SPDX-License-Identifier: Apache-2.0
from data_rentgen.consumer.extractors.batch import BatchExtractionResult, extract_batch
+from data_rentgen.consumer.extractors.column_lineage import extract_column_lineage
from data_rentgen.consumer.extractors.dataset import (
connect_dataset_with_symlinks,
extract_dataset,
@@ -15,6 +16,7 @@
from data_rentgen.consumer.extractors.schema import extract_schema
__all__ = [
+ "extract_column_lineage",
"extract_dataset_and_symlinks",
"extract_dataset",
"connect_dataset_with_symlinks",
diff --git a/data_rentgen/consumer/extractors/batch.py b/data_rentgen/consumer/extractors/batch.py
index 038e476e..e0d49afb 100644
--- a/data_rentgen/consumer/extractors/batch.py
+++ b/data_rentgen/consumer/extractors/batch.py
@@ -4,6 +4,7 @@
from typing import TypeVar
+from data_rentgen.consumer.extractors.column_lineage import extract_column_lineage
from data_rentgen.consumer.extractors.input import extract_input
from data_rentgen.consumer.extractors.operation import extract_operation
from data_rentgen.consumer.extractors.output import extract_output
@@ -11,7 +12,9 @@
from data_rentgen.consumer.openlineage.job_facets.job_type import OpenLineageJobType
from data_rentgen.consumer.openlineage.run_event import OpenLineageRunEvent
from data_rentgen.dto import (
+ ColumnLineageDTO,
DatasetDTO,
+ DatasetSymlinkDTO,
InputDTO,
JobDTO,
LocationDTO,
@@ -21,12 +24,12 @@
SchemaDTO,
UserDTO,
)
-from data_rentgen.dto.dataset_symlink import DatasetSymlinkDTO
T = TypeVar(
"T",
LocationDTO,
DatasetDTO,
+ ColumnLineageDTO,
DatasetSymlinkDTO,
JobDTO,
RunDTO,
@@ -65,6 +68,7 @@ def __init__(self):
self._operations: dict[tuple, OperationDTO] = {}
self._inputs: dict[tuple, InputDTO] = {}
self._outputs: dict[tuple, OutputDTO] = {}
+ self._column_lineage: dict[tuple, ColumnLineageDTO] = {}
self._schemas: dict[tuple, SchemaDTO] = {}
self._users: dict[tuple, UserDTO] = {}
@@ -79,6 +83,7 @@ def __repr__(self):
f"operations={len(self._operations)}, "
f"inputs={len(self._inputs)}, "
f"outputs={len(self._outputs)}, "
+ f"column_lineage={len(self._column_lineage)}, "
f"schemas={len(self._schemas)}, "
f"users={len(self._users)}"
")"
@@ -139,6 +144,12 @@ def add_output(self, output: OutputDTO):
if output.schema:
self.add_schema(output.schema)
+ def add_column_lineage(self, lineage: ColumnLineageDTO):
+ self._add(self._column_lineage, lineage)
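+        # also register nested DTOs, so they are deduplicated across the whole batch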
+ self.add_dataset(lineage.source_dataset)
+ self.add_dataset(lineage.target_dataset)
+ self.add_operation(lineage.operation)
+
def add_schema(self, schema: SchemaDTO):
self._add(self._schemas, schema)
@@ -200,6 +211,13 @@ def _get_output(self, output_key: tuple) -> OutputDTO:
output.schema = self._get_schema(output.schema.unique_key)
return output
+    def _get_column_lineage(self, lineage_key: tuple) -> ColumnLineageDTO:
+        lineage = self._column_lineage[lineage_key]
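+        # replace nested DTOs with their deduplicated counterparts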
+ lineage.operation = self._get_operation(lineage.operation.unique_key)
+ lineage.source_dataset = self._get_dataset(lineage.source_dataset.unique_key)
+ lineage.target_dataset = self._get_dataset(lineage.target_dataset.unique_key)
+ return lineage
+
def locations(self) -> list[LocationDTO]:
return list(map(self._get_location, self._locations))
@@ -224,6 +242,9 @@ def inputs(self) -> list[InputDTO]:
def outputs(self) -> list[OutputDTO]:
return list(map(self._get_output, self._outputs))
+ def column_lineage(self) -> list[ColumnLineageDTO]:
+ return list(map(self._get_column_lineage, self._column_lineage))
+
def schemas(self) -> list[SchemaDTO]:
return list(map(self._get_schema, self._schemas))
@@ -231,7 +252,7 @@ def users(self) -> list[UserDTO]:
return list(map(self._get_user, self._users))
-def extract_batch(events: list[OpenLineageRunEvent]) -> BatchExtractionResult:
+def extract_batch(events: list[OpenLineageRunEvent]) -> BatchExtractionResult: # noqa: WPS231
result = BatchExtractionResult()
for event in events:
@@ -250,6 +271,11 @@ def extract_batch(events: list[OpenLineageRunEvent]) -> BatchExtractionResult:
for symlink in symlinks: # noqa: WPS440
result.add_dataset_symlink(symlink)
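+            # column lineage is reported as a dataset facet on both inputs and outputs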
+ for dataset in event.inputs + event.outputs:
+ column_lineage = extract_column_lineage(operation, dataset)
+ for item in column_lineage:
+ result.add_column_lineage(item)
+
else:
run = extract_run(event)
result.add_run(run)
diff --git a/data_rentgen/consumer/extractors/column_lineage.py b/data_rentgen/consumer/extractors/column_lineage.py
new file mode 100644
index 00000000..74862176
--- /dev/null
+++ b/data_rentgen/consumer/extractors/column_lineage.py
@@ -0,0 +1,108 @@
+# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+from collections import defaultdict
+
+from data_rentgen.consumer.extractors.dataset import extract_dataset
+from data_rentgen.consumer.openlineage.dataset import OpenLineageDataset
+from data_rentgen.consumer.openlineage.dataset_facets.column_lineage import (
+ OpenLineageColumnLineageDatasetFacetFieldTransformation,
+)
+from data_rentgen.dto import (
+ ColumnLineageDTO,
+ DatasetColumnRelationDTO,
+ DatasetColumnRelationTypeDTO,
+)
+from data_rentgen.dto.operation import OperationDTO
+
+logger = logging.getLogger(__name__)
+
+TRANSFORMATION_SUBTYPE_MAP_MASKING = {
+ "TRANSFORMATION": DatasetColumnRelationTypeDTO.TRANSFORMATION_MASKING,
+ "AGGREGATION": DatasetColumnRelationTypeDTO.AGGREGATION_MASKING,
+}
+
+TRANSFORMATION_SUBTYPE_MAP = {
+ "IDENTITY": DatasetColumnRelationTypeDTO.IDENTITY,
+ "TRANSFORMATION": DatasetColumnRelationTypeDTO.TRANSFORMATION,
+ "AGGREGATION": DatasetColumnRelationTypeDTO.AGGREGATION,
+ "FILTER": DatasetColumnRelationTypeDTO.FILTER,
+ "JOIN": DatasetColumnRelationTypeDTO.JOIN,
+ "GROUP_BY": DatasetColumnRelationTypeDTO.GROUP_BY,
+ "SORT": DatasetColumnRelationTypeDTO.SORT,
+ "WINDOW": DatasetColumnRelationTypeDTO.WINDOW,
+ "CONDITIONAL": DatasetColumnRelationTypeDTO.CONDITIONAL,
+}
+
+
+def extract_dataset_column_relation_type(
+ transformation: OpenLineageColumnLineageDatasetFacetFieldTransformation,
+) -> DatasetColumnRelationTypeDTO:
+ result: DatasetColumnRelationTypeDTO | None = None
+ if transformation.subtype:
+ if transformation.masking:
+ result = TRANSFORMATION_SUBTYPE_MAP_MASKING.get(transformation.subtype)
+ else:
+ result = TRANSFORMATION_SUBTYPE_MAP.get(transformation.subtype)
+
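+    # fall back to UNKNOWN for unrecognized type/subtype combinations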
+ return result or DatasetColumnRelationTypeDTO.UNKNOWN
+
+
+def extract_column_lineage(operation: OperationDTO, target_dataset: OpenLineageDataset) -> list[ColumnLineageDTO]:
+    if not target_dataset.facets.columnLineage:
+        return []
+
+    target_dataset_dto = extract_dataset(target_dataset)
+
+    # Group column lineage by source+target dataset pair. This combination is unique within an operation,
+    # so we can use it to generate the same fingerprint for all dataset column relations
+ datasets = {target_dataset_dto.unique_key: target_dataset_dto}
+ dataset_column_relations = defaultdict(list)
+
+ # direct lineage (source_column -> target_column)
+ for field, raw_column_lineage in target_dataset.facets.columnLineage.fields.items():
+ for input_field in raw_column_lineage.inputFields:
+ source_dataset_dto = extract_dataset(input_field)
+ datasets[source_dataset_dto.unique_key] = source_dataset_dto
+
+ column_lineage_key = (source_dataset_dto.unique_key, target_dataset_dto.unique_key)
+ for transformation in input_field.transformations:
+                # OL integration for Spark before v1.23 (or with columnLineage.datasetLineageEnabled=false, which is still the default)
+                # produced INDIRECT lineage for each source_column x target_column combination,
+                # which is almost a Cartesian product. It is VERY expensive to handle, so just ignore it.
+                # See https://github.com/OpenLineage/OpenLineage/pull/3097
+ if transformation.type == "INDIRECT":
+ continue
+
+ column_relation = DatasetColumnRelationDTO(
+ type=extract_dataset_column_relation_type(transformation),
+ source_column=input_field.field,
+ target_column=field,
+ )
+ dataset_column_relations[column_lineage_key].append(column_relation)
+
+    # indirect lineage (source_column -> target_dataset),
+    # added in OL v1.23 and sent only when columnLineage.datasetLineageEnabled=true
+ for input_field in target_dataset.facets.columnLineage.dataset:
+ source_dataset_dto = extract_dataset(input_field)
+ datasets[source_dataset_dto.unique_key] = source_dataset_dto
+
+ column_lineage_key = (source_dataset_dto.unique_key, target_dataset_dto.unique_key)
+ for transformation in input_field.transformations:
+ column_relation = DatasetColumnRelationDTO(
+ type=extract_dataset_column_relation_type(transformation),
+ source_column=input_field.field,
+ )
+ dataset_column_relations[column_lineage_key].append(column_relation)
+
+ # merge results into DTO objects
+ return [
+ ColumnLineageDTO(
+ operation=operation,
+ source_dataset=datasets[source_dataset_dto_key],
+ target_dataset=datasets[target_dataset_dto_key],
+ dataset_column_relations=relations,
+ )
+ for (source_dataset_dto_key, target_dataset_dto_key), relations in dataset_column_relations.items()
+        if relations
+ ]
diff --git a/data_rentgen/consumer/extractors/dataset.py b/data_rentgen/consumer/extractors/dataset.py
index b289712c..166189be 100644
--- a/data_rentgen/consumer/extractors/dataset.py
+++ b/data_rentgen/consumer/extractors/dataset.py
@@ -10,6 +10,9 @@
OpenLineageSymlinkIdentifier,
OpenLineageSymlinkType,
)
+from data_rentgen.consumer.openlineage.dataset_facets.column_lineage import (
+ OpenLineageColumnLineageDatasetFacetFieldRef,
+)
from data_rentgen.dto import (
DatasetDTO,
DatasetSymlinkDTO,
@@ -19,6 +22,9 @@
logger = logging.getLogger(__name__)
+OpenLineageDatasetLike = (
+ OpenLineageDataset | OpenLineageSymlinkIdentifier | OpenLineageColumnLineageDatasetFacetFieldRef
+)
METASTORE = DatasetSymlinkTypeDTO.METASTORE
WAREHOUSE = DatasetSymlinkTypeDTO.WAREHOUSE
@@ -49,7 +55,7 @@ def connect_dataset_with_symlinks(
return sorted(result, key=lambda x: x.type)
-def extract_dataset(dataset: OpenLineageDataset | OpenLineageSymlinkIdentifier) -> DatasetDTO:
+def extract_dataset(dataset: OpenLineageDatasetLike) -> DatasetDTO:
return DatasetDTO(
name=dataset.name,
location=extract_dataset_location(dataset),
@@ -103,7 +109,7 @@ def extract_dataset_and_symlinks(dataset: OpenLineageDataset) -> tuple[DatasetDT
return dataset_dto, symlinks
-def extract_dataset_location(dataset: OpenLineageDataset | OpenLineageSymlinkIdentifier) -> LocationDTO:
+def extract_dataset_location(dataset: OpenLineageDatasetLike) -> LocationDTO:
namespace = dataset.namespace
if namespace == "file":
# TODO: remove after https://github.com/OpenLineage/OpenLineage/issues/2709
@@ -122,8 +128,8 @@ def extract_dataset_location(dataset: OpenLineageDataset | OpenLineageSymlinkIde
)
-def extract_dataset_format(dataset: OpenLineageDataset | OpenLineageSymlinkIdentifier) -> str | None:
- if isinstance(dataset, OpenLineageSymlinkIdentifier):
+def extract_dataset_format(dataset: OpenLineageDatasetLike) -> str | None:
+ if isinstance(dataset, (OpenLineageSymlinkIdentifier, OpenLineageColumnLineageDatasetFacetFieldRef)):
return None
match dataset.facets.storage:
diff --git a/data_rentgen/consumer/openlineage/dataset_facets/__init__.py b/data_rentgen/consumer/openlineage/dataset_facets/__init__.py
index f443c671..95d71b4b 100644
--- a/data_rentgen/consumer/openlineage/dataset_facets/__init__.py
+++ b/data_rentgen/consumer/openlineage/dataset_facets/__init__.py
@@ -7,6 +7,12 @@
from data_rentgen.consumer.openlineage.dataset_facets.base import (
OpenLineageDatasetFacet,
)
+from data_rentgen.consumer.openlineage.dataset_facets.column_lineage import (
+ OpenLineageColumnLineageDatasetFacet,
+ OpenLineageColumnLineageDatasetFacetField,
+ OpenLineageColumnLineageDatasetFacetFieldRef,
+ OpenLineageColumnLineageDatasetFacetFieldTransformation,
+)
from data_rentgen.consumer.openlineage.dataset_facets.datasource import (
OpenLineageDatasourceDatasetFacet,
)
@@ -55,6 +61,10 @@
"OpenLineageDatasetFacets",
"OpenLineageInputDatasetFacets",
"OpenLineageOutputDatasetFacets",
+ "OpenLineageColumnLineageDatasetFacet",
+ "OpenLineageColumnLineageDatasetFacetField",
+ "OpenLineageColumnLineageDatasetFacetFieldRef",
+ "OpenLineageColumnLineageDatasetFacetFieldTransformation",
]
@@ -69,6 +79,7 @@ class OpenLineageDatasetFacets(OpenLineageBase):
datasetSchema: OpenLineageSchemaDatasetFacet | None = Field(default=None, alias="schema")
storage: OpenLineageStorageDatasetFacet | None = None
symlinks: OpenLineageSymlinksDatasetFacet | None = None
+ columnLineage: OpenLineageColumnLineageDatasetFacet | None = None
class OpenLineageInputDatasetFacets(OpenLineageBase):
diff --git a/data_rentgen/consumer/openlineage/dataset_facets/column_lineage.py b/data_rentgen/consumer/openlineage/dataset_facets/column_lineage.py
new file mode 100644
index 00000000..0aebbcd4
--- /dev/null
+++ b/data_rentgen/consumer/openlineage/dataset_facets/column_lineage.py
@@ -0,0 +1,44 @@
+# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+from pydantic import Field
+
+from data_rentgen.consumer.openlineage.base import OpenLineageBase
+from data_rentgen.consumer.openlineage.dataset_facets.base import (
+ OpenLineageDatasetFacet,
+)
+
+
+class OpenLineageColumnLineageDatasetFacetFieldTransformation(OpenLineageBase):
+ """Dataset facet describing field transformation."""
+
+ type: str
+ subtype: str | None = None
+ description: str | None = None
+ masking: bool = False
+
+
+class OpenLineageColumnLineageDatasetFacetFieldRef(OpenLineageBase):
+ """Dataset facet describing field reference for column lineage facet."""
+
+ namespace: str
+ name: str
+ field: str
+ transformations: list[OpenLineageColumnLineageDatasetFacetFieldTransformation]
+
+
+class OpenLineageColumnLineageDatasetFacetField(OpenLineageBase):
+ """Dataset facet describing column lineage for specific field."""
+
+ inputFields: list[OpenLineageColumnLineageDatasetFacetFieldRef] = Field(default_factory=list)
+ transformationDescription: str | None = None
+ transformationType: str | None = None
+
+
+class OpenLineageColumnLineageDatasetFacet(OpenLineageDatasetFacet):
+ """Dataset facet describing column lineage.
+    See [ColumnLineageDatasetFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/ColumnLineageDatasetFacet.json).
+ """
+
+ fields: dict[str, OpenLineageColumnLineageDatasetFacetField] = Field(default_factory=dict)
+ dataset: list[OpenLineageColumnLineageDatasetFacetFieldRef] = Field(default_factory=list)
diff --git a/data_rentgen/consumer/subscribers.py b/data_rentgen/consumer/subscribers.py
index a953eb0a..71cffd37 100644
--- a/data_rentgen/consumer/subscribers.py
+++ b/data_rentgen/consumer/subscribers.py
@@ -34,7 +34,11 @@ async def runs_events_subscriber(
logger.info("Saved successfully")
-async def save_to_db(data: BatchExtractionResult, unit_of_work: UnitOfWork, logger: Logger) -> None: # noqa: WPS217
+async def save_to_db( # noqa: WPS217, WPS213
+ data: BatchExtractionResult,
+ unit_of_work: UnitOfWork,
+ logger: Logger,
+) -> None:
# To avoid deadlocks when parallel consumer instances insert/update the same row,
# commit changes for each row instead of committing the whole batch. Yes, this could be slow.
@@ -95,3 +99,12 @@ async def save_to_db(data: BatchExtractionResult, unit_of_work: UnitOfWork, logg
logger.debug("Creating outputs")
await unit_of_work.output.create_or_update_bulk(data.outputs())
+
+    # If something goes wrong here, we will at least have inputs/outputs saved
+ async with unit_of_work:
+ column_lineage = data.column_lineage()
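+        # create relations first, since column_lineage rows reference them by fingerprint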
+ logger.debug("Creating dataset column relations")
+ await unit_of_work.dataset_column_relation.create_bulk_for_column_lineage(column_lineage)
+
+ logger.debug("Creating column lineage")
+ await unit_of_work.column_lineage.create_bulk(column_lineage)
diff --git a/data_rentgen/db/migrations/env.py b/data_rentgen/db/migrations/env.py
index 05d340b5..cf6a25c4 100644
--- a/data_rentgen/db/migrations/env.py
+++ b/data_rentgen/db/migrations/env.py
@@ -23,7 +23,7 @@
target_metadata = (Base.metadata,)
-PARTITION_PREFIXES = ["run_y", "operation_y", "input_y", "output_y"]
+PARTITION_PREFIXES = ["run_y", "operation_y", "input_y", "output_y", "column_lineage_y"]
def include_all_except_partitions(object, name, type_, reflected, compare_to):
diff --git a/data_rentgen/db/migrations/versions/2025-02-04_15c0a22b8566_create_column_lineage.py b/data_rentgen/db/migrations/versions/2025-02-04_15c0a22b8566_create_column_lineage.py
new file mode 100644
index 00000000..0b66da7c
--- /dev/null
+++ b/data_rentgen/db/migrations/versions/2025-02-04_15c0a22b8566_create_column_lineage.py
@@ -0,0 +1,79 @@
+# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+"""Create column lineage
+
+Revision ID: 15c0a22b8566
+Revises: f017d4c58658
+Create Date: 2025-02-04 11:30:21.190161
+
+"""
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "15c0a22b8566"
+down_revision = "f017d4c58658"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+ op.create_table(
+ "dataset_column_relation",
+ sa.Column("id", sa.BigInteger(), nullable=False),
+ sa.Column("fingerprint", sa.UUID(), nullable=False),
+ sa.Column("source_column", sa.String(length=255), nullable=False),
+ sa.Column("target_column", sa.String(length=255), nullable=True),
+ sa.Column("type", sa.SmallInteger(), nullable=False),
+ sa.PrimaryKeyConstraint("id", name=op.f("pk__dataset_column_relation")),
+ )
+ op.create_index(
+ op.f("ix__dataset_column_relation__fingerprint_source_column_target_column"),
+ "dataset_column_relation",
+ ["fingerprint", "source_column", sa.text("coalesce(target_column, '')")],
+ unique=True,
+ )
+
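+    # no foreign keys on the partitioned table, to avoid scanning all partitions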
+ op.create_table(
+ "column_lineage",
+ sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
+ sa.Column("id", sa.UUID(), nullable=False),
+ sa.Column("operation_id", sa.UUID(), nullable=False),
+ sa.Column("run_id", sa.UUID(), nullable=False),
+ sa.Column("job_id", sa.BigInteger(), nullable=False),
+ sa.Column("source_dataset_id", sa.BigInteger(), nullable=False),
+ sa.Column("target_dataset_id", sa.BigInteger(), nullable=False),
+ sa.Column("fingerprint", sa.UUID(), nullable=False),
+ sa.PrimaryKeyConstraint("created_at", "id", name=op.f("pk__column_lineage")),
+ postgresql_partition_by="RANGE (created_at)",
+ )
+ op.create_index(op.f("ix__column_lineage__job_id"), "column_lineage", ["job_id"], unique=False)
+ op.create_index(op.f("ix__column_lineage__operation_id"), "column_lineage", ["operation_id"], unique=False)
+ op.create_index(op.f("ix__column_lineage__run_id"), "column_lineage", ["run_id"], unique=False)
+ op.create_index(
+ op.f("ix__column_lineage__source_dataset_id"),
+ "column_lineage",
+ ["source_dataset_id"],
+ unique=False,
+ )
+ op.create_index(
+ op.f("ix__column_lineage__target_dataset_id"),
+ "column_lineage",
+ ["target_dataset_id"],
+ unique=False,
+ )
+
+
+def downgrade() -> None:
+ op.drop_index(op.f("ix__column_lineage__target_dataset_id"), table_name="column_lineage")
+ op.drop_index(op.f("ix__column_lineage__source_dataset_id"), table_name="column_lineage")
+ op.drop_index(op.f("ix__column_lineage__run_id"), table_name="column_lineage")
+ op.drop_index(op.f("ix__column_lineage__operation_id"), table_name="column_lineage")
+ op.drop_index(op.f("ix__column_lineage__job_id"), table_name="column_lineage")
+ op.drop_table("column_lineage")
+
+ op.drop_index(
+ op.f("ix__dataset_column_relation__fingerprint_source_column_target_column"),
+ table_name="dataset_column_relation",
+ )
+ op.drop_table("dataset_column_relation")
diff --git a/data_rentgen/db/models/__init__.py b/data_rentgen/db/models/__init__.py
index e587c5d7..c97b16ba 100644
--- a/data_rentgen/db/models/__init__.py
+++ b/data_rentgen/db/models/__init__.py
@@ -3,9 +3,14 @@
from data_rentgen.db.models.address import Address
from data_rentgen.db.models.base import Base
+from data_rentgen.db.models.column_lineage import ColumnLineage
from data_rentgen.db.models.custom_properties import CustomProperties
from data_rentgen.db.models.custom_user_properties import CustomUserProperties
from data_rentgen.db.models.dataset import Dataset
+from data_rentgen.db.models.dataset_column_relation import (
+ DatasetColumnRelation,
+ DatasetColumnRelationType,
+)
from data_rentgen.db.models.dataset_symlink import DatasetSymlink, DatasetSymlinkType
from data_rentgen.db.models.input import Input
from data_rentgen.db.models.job import Job, JobType
@@ -19,9 +24,12 @@
__all__ = [
"Address",
"Base",
+ "ColumnLineage",
"CustomProperties",
"CustomUserProperties",
"Dataset",
+ "DatasetColumnRelation",
+ "DatasetColumnRelationType",
"DatasetSymlink",
"DatasetSymlinkType",
"Input",
diff --git a/data_rentgen/db/models/column_lineage.py b/data_rentgen/db/models/column_lineage.py
new file mode 100644
index 00000000..c6e408bb
--- /dev/null
+++ b/data_rentgen/db/models/column_lineage.py
@@ -0,0 +1,117 @@
+# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+from datetime import datetime
+
+from sqlalchemy import UUID as SQL_UUID
+from sqlalchemy import (
+ BigInteger,
+ DateTime,
+ PrimaryKeyConstraint,
+)
+from sqlalchemy.orm import Mapped, mapped_column, relationship
+from uuid6 import UUID
+
+from data_rentgen.db.models.base import Base
+from data_rentgen.db.models.dataset import Dataset
+from data_rentgen.db.models.dataset_column_relation import DatasetColumnRelation
+from data_rentgen.db.models.job import Job
+from data_rentgen.db.models.operation import Operation
+from data_rentgen.db.models.run import Run
+
+
+# no foreign keys to avoid scanning all the partitions
+class ColumnLineage(Base):
+ __tablename__ = "column_lineage"
+ __table_args__ = (
+ PrimaryKeyConstraint("created_at", "id"),
+ {"postgresql_partition_by": "RANGE (created_at)"},
+ )
+
+ created_at: Mapped[datetime] = mapped_column(
+ DateTime(timezone=True),
+ nullable=False,
+ doc="Timestamp component of UUID, used for table partitioning",
+ )
+ id: Mapped[UUID] = mapped_column(SQL_UUID)
+
+ operation_id: Mapped[UUID] = mapped_column(
+ SQL_UUID,
+ index=True,
+ nullable=False,
+ doc="Operation caused this column lineage",
+ )
+ operation: Mapped[Operation] = relationship(
+ Operation,
+ primaryjoin="ColumnLineage.operation_id == Operation.id",
+ lazy="noload",
+ foreign_keys=[operation_id],
+ )
+
+ run_id: Mapped[UUID] = mapped_column(
+ SQL_UUID,
+ index=True,
+ nullable=False,
+ doc="Run the operation is bound to",
+ )
+ run: Mapped[Run] = relationship(
+ Run,
+ primaryjoin="ColumnLineage.run_id == Run.id",
+ lazy="noload",
+ foreign_keys=[run_id],
+ )
+
+ job_id: Mapped[int] = mapped_column(
+ BigInteger,
+ index=True,
+ nullable=False,
+ doc="Parent job of run",
+ )
+ job: Mapped[Job] = relationship(
+ Job,
+ primaryjoin="ColumnLineage.job_id == Job.id",
+ lazy="noload",
+ foreign_keys=[job_id],
+ )
+
+ source_dataset_id: Mapped[int] = mapped_column(
+ BigInteger,
+ index=True,
+ nullable=False,
+ doc="Dataset the data is originated from",
+ )
+ source_dataset: Mapped[Dataset] = relationship(
+ Dataset,
+ primaryjoin="ColumnLineage.source_dataset_id == Dataset.id",
+ lazy="noload",
+ foreign_keys=[source_dataset_id],
+ )
+
+ target_dataset_id: Mapped[int] = mapped_column(
+ BigInteger,
+ index=True,
+ nullable=False,
+ doc="Dataset the data is saved to",
+ )
+ target_dataset: Mapped[Dataset] = relationship(
+ Dataset,
+ primaryjoin="ColumnLineage.target_dataset_id == Dataset.id",
+ lazy="noload",
+ foreign_keys=[target_dataset_id],
+ )
+
+ fingerprint: Mapped[UUID] = mapped_column(
+ SQL_UUID,
+ index=False,
+ nullable=False,
+ doc="Datase column relation fingerprint",
+ )
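+    # not a real foreign key: relations are looked up by matching fingerprint values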
+ dataset_column_relations: Mapped[list[DatasetColumnRelation]] = relationship(
+ DatasetColumnRelation,
+ uselist=True,
+ primaryjoin="ColumnLineage.fingerprint == DatasetColumnRelation.fingerprint",
+ lazy="noload",
+ foreign_keys=[fingerprint],
+ )
diff --git a/data_rentgen/db/models/dataset_column_relation.py b/data_rentgen/db/models/dataset_column_relation.py
new file mode 100644
index 00000000..3c61113e
--- /dev/null
+++ b/data_rentgen/db/models/dataset_column_relation.py
@@ -0,0 +1,89 @@
+# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+from enum import IntFlag
+
+from sqlalchemy import UUID as SQL_UUID
+from sqlalchemy import (
+ BigInteger,
+ Column,
+ Index,
+ SmallInteger,
+ String,
+ func,
+)
+from sqlalchemy.orm import Mapped, mapped_column
+from uuid6 import UUID
+
+from data_rentgen.db.models.base import Base
+
+
+class DatasetColumnRelationType(IntFlag):
+    # See https://github.com/OpenLineage/OpenLineage/blob/1.27.0/integration/spark/shared/src/main/java/io/openlineage/spark/agent/lifecycle/plan/column/TransformationInfo.java#L30-L40
+    # Using IntFlag to avoid messing with the ARRAY type; bitwise OR is enough
+ UNKNOWN = 1
+
+ # Direct
+ IDENTITY = 2
+ TRANSFORMATION = 4
+ TRANSFORMATION_MASKING = 8
+ AGGREGATION = 16
+ AGGREGATION_MASKING = 32
+
+ # Indirect
+ FILTER = 64
+ JOIN = 128
+ GROUP_BY = 256
+ SORT = 512
+ WINDOW = 1024
+ CONDITIONAL = 2048
+
+
+# no foreign keys to avoid scanning all the partitions
+class DatasetColumnRelation(Base):
+ __tablename__ = "dataset_column_relation"
+ __table_args__ = (
+ Index(
+ None,
+ Column("fingerprint"),
+ Column("source_column"),
+            # NULLs are distinct by default, so we have to convert them to something else.
+            # This is mostly for compatibility with PG <15, which has no `NULLS NOT DISTINCT` option
+ func.coalesce(Column("target_column"), ""),
+ unique=True,
+ ),
+ )
+
+ id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
+ fingerprint: Mapped[UUID] = mapped_column(
+ SQL_UUID,
+ index=False,
+ nullable=False,
+ doc="Schema SHA-1 digest based used for grouping relations together. Currently this is in form of UUID",
+ )
+
+ source_column: Mapped[str] = mapped_column(
+ String(length=255),
+ index=False,
+ nullable=False,
+ doc="Source dataset column the data is originated from",
+ )
+
+ target_column: Mapped[str | None] = mapped_column(
+ String(length=255),
+ index=False,
+ nullable=True,
+ doc=(
+ "Target dataset column the data is saved to. "
+ "NULL means the entire target dataset depends on source column"
+ ),
+ )
+
+ type: Mapped[DatasetColumnRelationType] = mapped_column(
+ SmallInteger(),
+ index=False,
+ nullable=False,
+ doc="Column relation type",
+ )
diff --git a/data_rentgen/db/repositories/column_lineage.py b/data_rentgen/db/repositories/column_lineage.py
new file mode 100644
index 00000000..bd18ea27
--- /dev/null
+++ b/data_rentgen/db/repositories/column_lineage.py
@@ -0,0 +1,57 @@
+# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+from uuid import UUID
+
+from sqlalchemy.dialects.postgresql import insert
+
+from data_rentgen.db.models import ColumnLineage
+from data_rentgen.db.repositories.base import Repository
+from data_rentgen.db.utils.uuid import (
+ extract_timestamp_from_uuid,
+ generate_incremental_uuid,
+)
+from data_rentgen.dto import ColumnLineageDTO
+
+
+class ColumnLineageRepository(Repository[ColumnLineage]):
+ def get_id(self, item: ColumnLineageDTO) -> UUID:
+        # `created_at` field of lineage should be the same as the operation's,
+ # to avoid scanning all partitions and speed up queries
+ created_at = extract_timestamp_from_uuid(item.operation.id)
+
+ # instead of using UniqueConstraint on multiple fields, which may produce a lot of table scans,
+        # use them to calculate a unique id
+ id_components = [
+ str(item.operation.id),
+ str(item.source_dataset.id),
+ str(item.target_dataset.id),
+ str(item.fingerprint),
+ ]
+ return generate_incremental_uuid(created_at, ".".join(id_components))
+
+ async def create_bulk(self, items: list[ColumnLineageDTO]):
+ if not items:
+ return
+
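+        # deterministic ids + ON CONFLICT DO NOTHING make retries idempotent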
+ insert_statement = insert(ColumnLineage)
+ statement = insert_statement.on_conflict_do_nothing(
+ index_elements=[ColumnLineage.created_at, ColumnLineage.id],
+ )
+
+ await self._session.execute(
+ statement,
+ [
+ {
+ "id": self.get_id(item),
+ "created_at": extract_timestamp_from_uuid(item.operation.id),
+ "operation_id": item.operation.id,
+ "run_id": item.operation.run.id,
+ "job_id": item.operation.run.job.id, # type: ignore[arg-type]
+ "source_dataset_id": item.source_dataset.id,
+ "target_dataset_id": item.target_dataset.id,
+ "fingerprint": item.fingerprint,
+ }
+ for item in items
+ ],
+ )
diff --git a/data_rentgen/db/repositories/dataset_column_relation.py b/data_rentgen/db/repositories/dataset_column_relation.py
new file mode 100644
index 00000000..f3d28a25
--- /dev/null
+++ b/data_rentgen/db/repositories/dataset_column_relation.py
@@ -0,0 +1,73 @@
+# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+from uuid import UUID
+
+from sqlalchemy import any_, select, text
+from sqlalchemy.dialects.postgresql import insert
+
+from data_rentgen.db.models import ColumnLineage
+from data_rentgen.db.models.dataset_column_relation import (
+ DatasetColumnRelation,
+ DatasetColumnRelationType,
+)
+from data_rentgen.db.repositories.base import Repository
+from data_rentgen.dto import ColumnLineageDTO
+
+
+class DatasetColumnRelationRepository(Repository[ColumnLineage]):
+ async def create_bulk_for_column_lineage(self, items: list[ColumnLineageDTO]):
+ if not items:
+ return
+
+ # small optimization to avoid creating the same relations over and over.
+ # although we're using ON CONFLICT DO NOTHING, PG still has to perform index scans
+        # and get the next sequence value for each row
+ fingerprints_to_create = await self._get_missing_fingerprints([item.fingerprint for item in items])
+ relations_to_create = [item for item in items if item.fingerprint in fingerprints_to_create]
+
+ if relations_to_create:
+ await self._create_dataset_column_relations_bulk(relations_to_create)
+
+ async def _get_missing_fingerprints(self, fingerprints: list[UUID]) -> set[UUID]:
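+        # `= ANY(array)` passes all fingerprints as a single bound parameter instead of a large IN list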
+ existing = await self._session.execute(
+ select(DatasetColumnRelation.fingerprint.distinct()).where(
+ DatasetColumnRelation.fingerprint == any_(fingerprints), # type: ignore[arg-type]
+ ),
+ )
+
+ return set(fingerprints) - set(existing.scalars().all())
+
+ async def _create_dataset_column_relations_bulk(self, items: list[ColumnLineageDTO]):
+        # we don't have to return anything, so there is no need to use on_conflict_do_update.
+ # also rows are immutable, so there is no need to acquire any lock
+ insert_statement = insert(DatasetColumnRelation).on_conflict_do_nothing(
+ index_elements=[
+ DatasetColumnRelation.fingerprint,
+ DatasetColumnRelation.source_column,
+ text("coalesce(target_column, '')"),
+ ],
+ )
+
+        # several ColumnLineageDTOs may share the same set of column relations, so deduplicate them before insert
+        to_insert = {}
+        for lineage in items:
+            for column_relation in lineage.column_relations:
+                key = (lineage.fingerprint, column_relation.source_column, column_relation.target_column)
+                to_insert[key] = column_relation
+
+        await self._session.execute(
+            insert_statement,
+            [
+                {
+                    # id is auto-incremented
+                    "fingerprint": fingerprint,
+                    "source_column": relation.source_column,
+                    "target_column": relation.target_column,
+                    "type": DatasetColumnRelationType(relation.type).value,
+                }
+                for (fingerprint, *_), relation in to_insert.items()
+            ],
+ )
diff --git a/data_rentgen/db/scripts/create_partitions.py b/data_rentgen/db/scripts/create_partitions.py
index 127d139d..5525dd5f 100755
--- a/data_rentgen/db/scripts/create_partitions.py
+++ b/data_rentgen/db/scripts/create_partitions.py
@@ -18,13 +18,20 @@
from data_rentgen.db.factory import create_session_factory
from data_rentgen.db.models import Input, Operation, Output, Run
+from data_rentgen.db.models.column_lineage import ColumnLineage
from data_rentgen.db.settings import DatabaseSettings
from data_rentgen.logging.settings import LoggingSettings
from data_rentgen.logging.setup_logging import setup_logging
logger = logging.getLogger(__name__)
-PARTITIONED_TABLES = [Run.__tablename__, Operation.__tablename__, Input.__tablename__, Output.__tablename__]
+PARTITIONED_TABLES = [
+ Run.__tablename__,
+ Operation.__tablename__,
+ Input.__tablename__,
+ Output.__tablename__,
+ ColumnLineage.__tablename__,
+]
class Granularity(str, Enum):
diff --git a/data_rentgen/dto/__init__.py b/data_rentgen/dto/__init__.py
index 8b08361a..dd10de3d 100644
--- a/data_rentgen/dto/__init__.py
+++ b/data_rentgen/dto/__init__.py
@@ -1,7 +1,12 @@
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
+from data_rentgen.dto.column_lineage import ColumnLineageDTO
from data_rentgen.dto.dataset import DatasetDTO
+from data_rentgen.dto.dataset_column_relation import (
+ DatasetColumnRelationDTO,
+ DatasetColumnRelationTypeDTO,
+)
from data_rentgen.dto.dataset_symlink import DatasetSymlinkDTO, DatasetSymlinkTypeDTO
from data_rentgen.dto.input import InputDTO
from data_rentgen.dto.job import JobDTO, JobTypeDTO
@@ -18,7 +23,10 @@
from data_rentgen.dto.user import UserDTO
__all__ = [
+ "ColumnLineageDTO",
"DatasetDTO",
+ "DatasetColumnRelationDTO",
+ "DatasetColumnRelationTypeDTO",
"DatasetSymlinkDTO",
"DatasetSymlinkTypeDTO",
"LocationDTO",
diff --git a/data_rentgen/dto/column_lineage.py b/data_rentgen/dto/column_lineage.py
new file mode 100644
index 00000000..bb6373b1
--- /dev/null
+++ b/data_rentgen/dto/column_lineage.py
@@ -0,0 +1,55 @@
+# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+from dataclasses import InitVar, dataclass, field
+from functools import cached_property
+from uuid import UUID
+
+from uuid6 import UUID as UUIDv6
+
+from data_rentgen.db.utils.uuid import generate_static_uuid
+from data_rentgen.dto.dataset import DatasetDTO
+from data_rentgen.dto.dataset_column_relation import (
+ DatasetColumnRelationDTO,
+ merge_dataset_column_relations,
+)
+from data_rentgen.dto.operation import OperationDTO
+
+
+@dataclass
+class ColumnLineageDTO:
+ operation: OperationDTO
+ source_dataset: DatasetDTO
+ target_dataset: DatasetDTO
+ dataset_column_relations: InitVar[list[DatasetColumnRelationDTO]]
+ _dataset_column_relations: list[DatasetColumnRelationDTO] = field(default_factory=list, init=False)
+    # id is generated from a combination of other ids
+ id: UUIDv6 | None = None
+
+ def __post_init__(self, dataset_column_relations: list[DatasetColumnRelationDTO]):
+ self._dataset_column_relations = merge_dataset_column_relations(dataset_column_relations)
+
+ @cached_property
+ def unique_key(self) -> tuple:
+ return (self.operation.unique_key, self.source_dataset.unique_key, self.target_dataset.unique_key)
+
+ @property
+ def column_relations(self) -> list[DatasetColumnRelationDTO]:
+ return self._dataset_column_relations
+
+ @cached_property
+ def fingerprint(self) -> UUID:
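+        # relations are merged and sorted in __post_init__, so equal sets always produce the same fingerprint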
+ id_components = [(*item.unique_key, item.type) for item in self.column_relations]
+ str_components = [".".join(map(str, item)) for item in id_components]
+ return generate_static_uuid(",".join(str_components))
+
+ def merge(self, new: ColumnLineageDTO) -> ColumnLineageDTO:
+ return ColumnLineageDTO(
+ operation=self.operation.merge(new.operation),
+ source_dataset=self.source_dataset.merge(new.source_dataset),
+ target_dataset=self.target_dataset.merge(new.target_dataset),
+ dataset_column_relations=self.column_relations + new.column_relations,
+ id=new.id or self.id,
+ )
diff --git a/data_rentgen/dto/dataset_column_relation.py b/data_rentgen/dto/dataset_column_relation.py
new file mode 100644
index 00000000..4432d4a9
--- /dev/null
+++ b/data_rentgen/dto/dataset_column_relation.py
@@ -0,0 +1,68 @@
+# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from enum import IntFlag
+from functools import cached_property
+
+from uuid6 import UUID
+
+
+class DatasetColumnRelationTypeDTO(IntFlag):
+ # See https://github.com/OpenLineage/OpenLineage/blob/1.27.0/integration/spark/shared/src/main/java/io/openlineage/spark/agent/lifecycle/plan/column/TransformationInfo.java#L30-L40
+    # Using IntFlag to avoid messing with the ARRAY type; bitwise OR is enough
+ UNKNOWN = 1
+
+ # Direct
+ IDENTITY = 2
+ TRANSFORMATION = 4
+ TRANSFORMATION_MASKING = 8
+ AGGREGATION = 16
+ AGGREGATION_MASKING = 32
+
+ # Indirect
+ FILTER = 64
+ JOIN = 128
+ GROUP_BY = 256
+ SORT = 512
+ WINDOW = 1024
+ CONDITIONAL = 2048
+
+
+@dataclass
+class DatasetColumnRelationDTO:
+ type: DatasetColumnRelationTypeDTO
+ source_column: str
+ target_column: str | None = None
+    # fingerprint is generated from a combination of other fields, see ColumnLineageDTO.fingerprint
+ fingerprint: UUID | None = None
+ # description is always "", see io.openlineage.spark.agent.lifecycle.plan.column.TransformationInfo
+
+ @cached_property
+ def unique_key(self) -> tuple:
+ return ( # noqa: WPS227
+ self.source_column,
+ self.target_column or "",
+ )
+
+ def merge(self, new: DatasetColumnRelationDTO) -> DatasetColumnRelationDTO:
+ return DatasetColumnRelationDTO(
+ source_column=self.source_column,
+ target_column=self.target_column,
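+            # IntFlag allows accumulating every observed relation type in a single value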
+ type=self.type | new.type,
+ fingerprint=new.fingerprint or self.fingerprint,
+ )
+
+
+def merge_dataset_column_relations(relations: list[DatasetColumnRelationDTO]) -> list[DatasetColumnRelationDTO]:
+ result: dict[tuple, DatasetColumnRelationDTO] = {}
+ for relation in relations:
+ if relation.unique_key in result:
+ existing_relation = result[relation.unique_key]
+ result[relation.unique_key] = existing_relation.merge(relation)
+ else:
+ result[relation.unique_key] = relation
+
+ return sorted(result.values(), key=lambda item: item.unique_key)
diff --git a/data_rentgen/services/uow.py b/data_rentgen/services/uow.py
index 918cabd0..7e721522 100644
--- a/data_rentgen/services/uow.py
+++ b/data_rentgen/services/uow.py
@@ -5,7 +5,11 @@
from sqlalchemy.ext.asyncio import AsyncSession
from typing_extensions import Annotated
+from data_rentgen.db.repositories.column_lineage import ColumnLineageRepository
from data_rentgen.db.repositories.dataset import DatasetRepository
+from data_rentgen.db.repositories.dataset_column_relation import (
+ DatasetColumnRelationRepository,
+)
from data_rentgen.db.repositories.dataset_symlink import DatasetSymlinkRepository
from data_rentgen.db.repositories.input import InputRepository
from data_rentgen.db.repositories.job import JobRepository
@@ -33,6 +37,8 @@ def __init__(
self.schema = SchemaRepository(session)
self.input = InputRepository(session)
self.output = OutputRepository(session)
+ self.dataset_column_relation = DatasetColumnRelationRepository(session)
+ self.column_lineage = ColumnLineageRepository(session)
self.user = UserRepository(session)
async def __aenter__(self):
diff --git a/docs/quickstart/setup_spark.rst b/docs/quickstart/setup_spark.rst
index c074f011..a528bb1b 100644
--- a/docs/quickstart/setup_spark.rst
+++ b/docs/quickstart/setup_spark.rst
@@ -7,7 +7,7 @@ Requirements
------------
* `Apache Spark `_ 3.x or higher
-* `OpenLineage integration for Spark `_ 1.19.0 or higher
+* `OpenLineage integration for Spark `_ 1.23.0 or higher
Setup
-----
@@ -23,7 +23,7 @@ Setup
# install OpenLineage integration and Kafka client
.config(
"spark.jars.packages",
- "io.openlineage:openlineage-spark_2.12:1.24.2,org.apache.kafka:kafka-clients:3.9.0",
+ "io.openlineage:openlineage-spark_2.12:1.27.0,org.apache.kafka:kafka-clients:3.9.0",
)
.config(
"spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener"
@@ -65,6 +65,7 @@ Setup
r"(.*?)(?(?:\/[^\/=]+=[^\/=]+)+)",
)
.config("spark.openlineage.jobName.appendDatasetName", "false")
+ .config("spark.openlineage.columnLineage.datasetLineageEnabled", "true")
.getOrCreate()
)
diff --git a/docs/reference/database/structure.rst b/docs/reference/database/structure.rst
index 850cca9d..a42b50aa 100644
--- a/docs/reference/database/structure.rst
+++ b/docs/reference/database/structure.rst
@@ -128,6 +128,27 @@ Database structure
num_files: bigint
}
+ entity DatasetColumnRelation {
+ * id: bigint
+ ----
+ * fingerprint: uuid(v5)
+ * source_column: varchar(255)
+ * target_column: varchar(255) null
+     * type: smallint
+ }
+
+ entity ColumnLineage {
+ * id: uuid(v7)
+ * created_at: timestamptz
+ ----
+ * operation_id: uuid(v7)
+ * run_id: uuid(v7)
+ * job_id: bigint
+ * source_dataset_id: bigint
+ * target_dataset_id: bigint
+ * fingerprint: uuid(v5)
+ }
+
Address ||--o{ Location
Dataset ||--o{ Location
@@ -153,4 +174,11 @@ Database structure
Output ||--o{ Dataset
Output |o--o{ Schema
+ ColumnLineage ||--o{ Operation
+ ColumnLineage ||--o{ Run
+ ColumnLineage ||--o{ Job
+ ColumnLineage "source_dataset_id" ||--o{ Dataset
+ ColumnLineage "target_dataset_id" ||--o{ Dataset
+ ColumnLineage ||--o{ DatasetColumnRelation
+
@enduml