diff --git a/.env.local b/.env.local
index cc0fb999..729536c6 100644
--- a/.env.local
+++ b/.env.local
@@ -7,6 +7,11 @@ export DATA_RENTGEN__KAFKA__SECURITY__TYPE=scram-sha256
export DATA_RENTGEN__KAFKA__SECURITY__USER=data_rentgen
export DATA_RENTGEN__KAFKA__SECURITY__PASSWORD=changeme
export DATA_RENTGEN__KAFKA__COMPRESSION=zstd
+#export DATA_RENTGEN__CONSUMER__MAX_RECORDS=100
+# Handling events with a lot of column lineage takes so much time
+# that the Kafka coordinator considers the worker dead. Limit the batch by total message size instead.
+# The right value depends on the OL version, complexity of Spark jobs, number of Kafka partitions and number of workers.
+export DATA_RENTGEN__CONSUMER__MAX_PARTITION_FETCH_BYTES=200Kb
export DATA_RENTGEN__SERVER__DEBUG=true
diff --git a/data_rentgen/consumer/extractors/__init__.py b/data_rentgen/consumer/extractors/__init__.py
index a41c5000..e3703e35 100644
--- a/data_rentgen/consumer/extractors/__init__.py
+++ b/data_rentgen/consumer/extractors/__init__.py
@@ -2,6 +2,7 @@
# SPDX-License-Identifier: Apache-2.0
from data_rentgen.consumer.extractors.batch import BatchExtractionResult, extract_batch
+from data_rentgen.consumer.extractors.column_lineage import extract_column_lineage
from data_rentgen.consumer.extractors.dataset import (
connect_dataset_with_symlinks,
extract_dataset,
@@ -15,6 +16,7 @@
from data_rentgen.consumer.extractors.schema import extract_schema
__all__ = [
+ "extract_column_lineage",
"extract_dataset_and_symlinks",
"extract_dataset",
"connect_dataset_with_symlinks",
diff --git a/data_rentgen/consumer/extractors/batch.py b/data_rentgen/consumer/extractors/batch.py
index 038e476e..e0d49afb 100644
--- a/data_rentgen/consumer/extractors/batch.py
+++ b/data_rentgen/consumer/extractors/batch.py
@@ -4,6 +4,7 @@
from typing import TypeVar
+from data_rentgen.consumer.extractors.column_lineage import extract_column_lineage
from data_rentgen.consumer.extractors.input import extract_input
from data_rentgen.consumer.extractors.operation import extract_operation
from data_rentgen.consumer.extractors.output import extract_output
@@ -11,7 +12,9 @@
from data_rentgen.consumer.openlineage.job_facets.job_type import OpenLineageJobType
from data_rentgen.consumer.openlineage.run_event import OpenLineageRunEvent
from data_rentgen.dto import (
+ ColumnLineageDTO,
DatasetDTO,
+ DatasetSymlinkDTO,
InputDTO,
JobDTO,
LocationDTO,
@@ -21,12 +24,12 @@
SchemaDTO,
UserDTO,
)
-from data_rentgen.dto.dataset_symlink import DatasetSymlinkDTO
T = TypeVar(
"T",
LocationDTO,
DatasetDTO,
+ ColumnLineageDTO,
DatasetSymlinkDTO,
JobDTO,
RunDTO,
@@ -65,6 +68,7 @@ def __init__(self):
self._operations: dict[tuple, OperationDTO] = {}
self._inputs: dict[tuple, InputDTO] = {}
self._outputs: dict[tuple, OutputDTO] = {}
+ self._column_lineage: dict[tuple, ColumnLineageDTO] = {}
self._schemas: dict[tuple, SchemaDTO] = {}
self._users: dict[tuple, UserDTO] = {}
@@ -79,6 +83,7 @@ def __repr__(self):
f"operations={len(self._operations)}, "
f"inputs={len(self._inputs)}, "
f"outputs={len(self._outputs)}, "
+ f"column_lineage={len(self._column_lineage)}, "
f"schemas={len(self._schemas)}, "
f"users={len(self._users)}"
")"
@@ -139,6 +144,12 @@ def add_output(self, output: OutputDTO):
if output.schema:
self.add_schema(output.schema)
+ def add_column_lineage(self, lineage: ColumnLineageDTO):
+ self._add(self._column_lineage, lineage)
+ self.add_dataset(lineage.source_dataset)
+ self.add_dataset(lineage.target_dataset)
+ self.add_operation(lineage.operation)
+
def add_schema(self, schema: SchemaDTO):
self._add(self._schemas, schema)
@@ -200,6 +211,13 @@ def _get_output(self, output_key: tuple) -> OutputDTO:
output.schema = self._get_schema(output.schema.unique_key)
return output
+    def _get_column_lineage(self, lineage_key: tuple) -> ColumnLineageDTO:
+        lineage = self._column_lineage[lineage_key]
+ lineage.operation = self._get_operation(lineage.operation.unique_key)
+ lineage.source_dataset = self._get_dataset(lineage.source_dataset.unique_key)
+ lineage.target_dataset = self._get_dataset(lineage.target_dataset.unique_key)
+ return lineage
+
def locations(self) -> list[LocationDTO]:
return list(map(self._get_location, self._locations))
@@ -224,6 +242,9 @@ def inputs(self) -> list[InputDTO]:
def outputs(self) -> list[OutputDTO]:
return list(map(self._get_output, self._outputs))
+ def column_lineage(self) -> list[ColumnLineageDTO]:
+ return list(map(self._get_column_lineage, self._column_lineage))
+
def schemas(self) -> list[SchemaDTO]:
return list(map(self._get_schema, self._schemas))
@@ -231,7 +252,7 @@ def users(self) -> list[UserDTO]:
return list(map(self._get_user, self._users))
-def extract_batch(events: list[OpenLineageRunEvent]) -> BatchExtractionResult:
+def extract_batch(events: list[OpenLineageRunEvent]) -> BatchExtractionResult: # noqa: WPS231
result = BatchExtractionResult()
for event in events:
@@ -250,6 +271,11 @@ def extract_batch(events: list[OpenLineageRunEvent]) -> BatchExtractionResult:
for symlink in symlinks: # noqa: WPS440
result.add_dataset_symlink(symlink)
+ for dataset in event.inputs + event.outputs:
+ column_lineage = extract_column_lineage(operation, dataset)
+ for item in column_lineage:
+ result.add_column_lineage(item)
+
else:
run = extract_run(event)
result.add_run(run)
diff --git a/data_rentgen/consumer/extractors/column_lineage.py b/data_rentgen/consumer/extractors/column_lineage.py
new file mode 100644
index 00000000..74862176
--- /dev/null
+++ b/data_rentgen/consumer/extractors/column_lineage.py
@@ -0,0 +1,108 @@
+# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+from collections import defaultdict
+
+from data_rentgen.consumer.extractors.dataset import extract_dataset
+from data_rentgen.consumer.openlineage.dataset import OpenLineageDataset
+from data_rentgen.consumer.openlineage.dataset_facets.column_lineage import (
+ OpenLineageColumnLineageDatasetFacetFieldTransformation,
+)
+from data_rentgen.dto import (
+ ColumnLineageDTO,
+ DatasetColumnRelationDTO,
+ DatasetColumnRelationTypeDTO,
+)
+from data_rentgen.dto.operation import OperationDTO
+
+logger = logging.getLogger(__name__)
+
+TRANSFORMATION_SUBTYPE_MAP_MASKING = {
+ "TRANSFORMATION": DatasetColumnRelationTypeDTO.TRANSFORMATION_MASKING,
+ "AGGREGATION": DatasetColumnRelationTypeDTO.AGGREGATION_MASKING,
+}
+
+TRANSFORMATION_SUBTYPE_MAP = {
+ "IDENTITY": DatasetColumnRelationTypeDTO.IDENTITY,
+ "TRANSFORMATION": DatasetColumnRelationTypeDTO.TRANSFORMATION,
+ "AGGREGATION": DatasetColumnRelationTypeDTO.AGGREGATION,
+ "FILTER": DatasetColumnRelationTypeDTO.FILTER,
+ "JOIN": DatasetColumnRelationTypeDTO.JOIN,
+ "GROUP_BY": DatasetColumnRelationTypeDTO.GROUP_BY,
+ "SORT": DatasetColumnRelationTypeDTO.SORT,
+ "WINDOW": DatasetColumnRelationTypeDTO.WINDOW,
+ "CONDITIONAL": DatasetColumnRelationTypeDTO.CONDITIONAL,
+}
+
+
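+# For example, a transformation like {"type": "DIRECT", "subtype": "AGGREGATION", "masking": true}
+# maps to AGGREGATION_MASKING below, while unrecognized or missing subtypes fall back to UNKNOWN.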
+def extract_dataset_column_relation_type(
+ transformation: OpenLineageColumnLineageDatasetFacetFieldTransformation,
+) -> DatasetColumnRelationTypeDTO:
+ result: DatasetColumnRelationTypeDTO | None = None
+ if transformation.subtype:
+ if transformation.masking:
+ result = TRANSFORMATION_SUBTYPE_MAP_MASKING.get(transformation.subtype)
+ else:
+ result = TRANSFORMATION_SUBTYPE_MAP.get(transformation.subtype)
+
+ return result or DatasetColumnRelationTypeDTO.UNKNOWN
+
+
+def extract_column_lineage(operation: OperationDTO, target_dataset: OpenLineageDataset) -> list[ColumnLineageDTO]:
+ target_dataset_dto = extract_dataset(target_dataset)
+ if not target_dataset.facets.columnLineage:
+ return []
+
+    # Group column lineage by source+target dataset. This is a unique combination within an operation,
+    # so we can use it to generate the same fingerprint for all dataset column relations.
+ datasets = {target_dataset_dto.unique_key: target_dataset_dto}
+ dataset_column_relations = defaultdict(list)
+
+ # direct lineage (source_column -> target_column)
+ for field, raw_column_lineage in target_dataset.facets.columnLineage.fields.items():
+ for input_field in raw_column_lineage.inputFields:
+ source_dataset_dto = extract_dataset(input_field)
+ datasets[source_dataset_dto.unique_key] = source_dataset_dto
+
+ column_lineage_key = (source_dataset_dto.unique_key, target_dataset_dto.unique_key)
+ for transformation in input_field.transformations:
+            # OL integration for Spark before v1.23 (or with columnLineage.datasetLineageEnabled=false, which is still the default)
+            # produces INDIRECT lineage for each source_column x target_column combination,
+            # which is almost a cartesian product. It is VERY expensive to handle, so just ignore it.
+ # See https://github.com/OpenLineage/OpenLineage/pull/3097
+ if transformation.type == "INDIRECT":
+ continue
+
+ column_relation = DatasetColumnRelationDTO(
+ type=extract_dataset_column_relation_type(transformation),
+ source_column=input_field.field,
+ target_column=field,
+ )
+ dataset_column_relations[column_lineage_key].append(column_relation)
+
+ # indirect lineage (source_column -> target_dataset),
+    # added to OL in v1.23 and sent only when columnLineage.datasetLineageEnabled=true
+ for input_field in target_dataset.facets.columnLineage.dataset:
+ source_dataset_dto = extract_dataset(input_field)
+ datasets[source_dataset_dto.unique_key] = source_dataset_dto
+
+ column_lineage_key = (source_dataset_dto.unique_key, target_dataset_dto.unique_key)
+ for transformation in input_field.transformations:
+ column_relation = DatasetColumnRelationDTO(
+ type=extract_dataset_column_relation_type(transformation),
+ source_column=input_field.field,
+ )
+ dataset_column_relations[column_lineage_key].append(column_relation)
+
+ # merge results into DTO objects
+ return [
+ ColumnLineageDTO(
+ operation=operation,
+ source_dataset=datasets[source_dataset_dto_key],
+ target_dataset=datasets[target_dataset_dto_key],
+ dataset_column_relations=relations,
+ )
+ for (source_dataset_dto_key, target_dataset_dto_key), relations in dataset_column_relations.items()
+        if relations
+ ]
diff --git a/data_rentgen/consumer/extractors/dataset.py b/data_rentgen/consumer/extractors/dataset.py
index b289712c..166189be 100644
--- a/data_rentgen/consumer/extractors/dataset.py
+++ b/data_rentgen/consumer/extractors/dataset.py
@@ -10,6 +10,9 @@
OpenLineageSymlinkIdentifier,
OpenLineageSymlinkType,
)
+from data_rentgen.consumer.openlineage.dataset_facets.column_lineage import (
+ OpenLineageColumnLineageDatasetFacetFieldRef,
+)
from data_rentgen.dto import (
DatasetDTO,
DatasetSymlinkDTO,
@@ -19,6 +22,9 @@
logger = logging.getLogger(__name__)
+OpenLineageDatasetLike = (
+ OpenLineageDataset | OpenLineageSymlinkIdentifier | OpenLineageColumnLineageDatasetFacetFieldRef
+)
METASTORE = DatasetSymlinkTypeDTO.METASTORE
WAREHOUSE = DatasetSymlinkTypeDTO.WAREHOUSE
@@ -49,7 +55,7 @@ def connect_dataset_with_symlinks(
return sorted(result, key=lambda x: x.type)
-def extract_dataset(dataset: OpenLineageDataset | OpenLineageSymlinkIdentifier) -> DatasetDTO:
+def extract_dataset(dataset: OpenLineageDatasetLike) -> DatasetDTO:
return DatasetDTO(
name=dataset.name,
location=extract_dataset_location(dataset),
@@ -103,7 +109,7 @@ def extract_dataset_and_symlinks(dataset: OpenLineageDataset) -> tuple[DatasetDT
return dataset_dto, symlinks
-def extract_dataset_location(dataset: OpenLineageDataset | OpenLineageSymlinkIdentifier) -> LocationDTO:
+def extract_dataset_location(dataset: OpenLineageDatasetLike) -> LocationDTO:
namespace = dataset.namespace
if namespace == "file":
# TODO: remove after https://github.com/OpenLineage/OpenLineage/issues/2709
@@ -122,8 +128,8 @@ def extract_dataset_location(dataset: OpenLineageDataset | OpenLineageSymlinkIde
)
-def extract_dataset_format(dataset: OpenLineageDataset | OpenLineageSymlinkIdentifier) -> str | None:
- if isinstance(dataset, OpenLineageSymlinkIdentifier):
+def extract_dataset_format(dataset: OpenLineageDatasetLike) -> str | None:
+ if isinstance(dataset, (OpenLineageSymlinkIdentifier, OpenLineageColumnLineageDatasetFacetFieldRef)):
return None
match dataset.facets.storage:
diff --git a/data_rentgen/consumer/openlineage/dataset_facets/__init__.py b/data_rentgen/consumer/openlineage/dataset_facets/__init__.py
index f443c671..95d71b4b 100644
--- a/data_rentgen/consumer/openlineage/dataset_facets/__init__.py
+++ b/data_rentgen/consumer/openlineage/dataset_facets/__init__.py
@@ -7,6 +7,12 @@
from data_rentgen.consumer.openlineage.dataset_facets.base import (
OpenLineageDatasetFacet,
)
+from data_rentgen.consumer.openlineage.dataset_facets.column_lineage import (
+ OpenLineageColumnLineageDatasetFacet,
+ OpenLineageColumnLineageDatasetFacetField,
+ OpenLineageColumnLineageDatasetFacetFieldRef,
+ OpenLineageColumnLineageDatasetFacetFieldTransformation,
+)
from data_rentgen.consumer.openlineage.dataset_facets.datasource import (
OpenLineageDatasourceDatasetFacet,
)
@@ -55,6 +61,10 @@
"OpenLineageDatasetFacets",
"OpenLineageInputDatasetFacets",
"OpenLineageOutputDatasetFacets",
+ "OpenLineageColumnLineageDatasetFacet",
+ "OpenLineageColumnLineageDatasetFacetField",
+ "OpenLineageColumnLineageDatasetFacetFieldRef",
+ "OpenLineageColumnLineageDatasetFacetFieldTransformation",
]
@@ -69,6 +79,7 @@ class OpenLineageDatasetFacets(OpenLineageBase):
datasetSchema: OpenLineageSchemaDatasetFacet | None = Field(default=None, alias="schema")
storage: OpenLineageStorageDatasetFacet | None = None
symlinks: OpenLineageSymlinksDatasetFacet | None = None
+ columnLineage: OpenLineageColumnLineageDatasetFacet | None = None
class OpenLineageInputDatasetFacets(OpenLineageBase):
diff --git a/data_rentgen/consumer/openlineage/dataset_facets/column_lineage.py b/data_rentgen/consumer/openlineage/dataset_facets/column_lineage.py
new file mode 100644
index 00000000..0aebbcd4
--- /dev/null
+++ b/data_rentgen/consumer/openlineage/dataset_facets/column_lineage.py
@@ -0,0 +1,44 @@
+# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+from pydantic import Field
+
+from data_rentgen.consumer.openlineage.base import OpenLineageBase
+from data_rentgen.consumer.openlineage.dataset_facets.base import (
+ OpenLineageDatasetFacet,
+)
+
+
+class OpenLineageColumnLineageDatasetFacetFieldTransformation(OpenLineageBase):
+ """Dataset facet describing field transformation."""
+
+ type: str
+ subtype: str | None = None
+ description: str | None = None
+ masking: bool = False
+
+
+class OpenLineageColumnLineageDatasetFacetFieldRef(OpenLineageBase):
+ """Dataset facet describing field reference for column lineage facet."""
+
+ namespace: str
+ name: str
+ field: str
+ transformations: list[OpenLineageColumnLineageDatasetFacetFieldTransformation]
+
+
+class OpenLineageColumnLineageDatasetFacetField(OpenLineageBase):
+ """Dataset facet describing column lineage for specific field."""
+
+ inputFields: list[OpenLineageColumnLineageDatasetFacetFieldRef] = Field(default_factory=list)
+ transformationDescription: str | None = None
+ transformationType: str | None = None
+
+
+class OpenLineageColumnLineageDatasetFacet(OpenLineageDatasetFacet):
+ """Dataset facet describing column lineage.
+    See [ColumnLineageDatasetFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/ColumnLineageDatasetFacet.json).
+ """
+
+ fields: dict[str, OpenLineageColumnLineageDatasetFacetField] = Field(default_factory=dict)
+ dataset: list[OpenLineageColumnLineageDatasetFacetFieldRef] = Field(default_factory=list)
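+
+
+# Abridged example of the facet payload these models parse (field values are hypothetical):
+# {"fields": {"target_col": {"inputFields": [
+#     {"namespace": "postgres://host:5432", "name": "db.table", "field": "source_col",
+#      "transformations": [{"type": "DIRECT", "subtype": "IDENTITY", "masking": false}]}]}},
+#  "dataset": []}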
diff --git a/data_rentgen/consumer/subscribers.py b/data_rentgen/consumer/subscribers.py
index a953eb0a..71cffd37 100644
--- a/data_rentgen/consumer/subscribers.py
+++ b/data_rentgen/consumer/subscribers.py
@@ -34,7 +34,11 @@ async def runs_events_subscriber(
logger.info("Saved successfully")
-async def save_to_db(data: BatchExtractionResult, unit_of_work: UnitOfWork, logger: Logger) -> None: # noqa: WPS217
+async def save_to_db( # noqa: WPS217, WPS213
+ data: BatchExtractionResult,
+ unit_of_work: UnitOfWork,
+ logger: Logger,
+) -> None:
# To avoid deadlocks when parallel consumer instances insert/update the same row,
    # commit changes for each row instead of committing the whole batch. Yes, this could be slow.
@@ -95,3 +99,12 @@ async def save_to_db(data: BatchExtractionResult, unit_of_work: UnitOfWork, logg
logger.debug("Creating outputs")
await unit_of_work.output.create_or_update_bulk(data.outputs())
+
+    # If something goes wrong here, at least we will already have inputs/outputs saved
+ async with unit_of_work:
+ column_lineage = data.column_lineage()
+ logger.debug("Creating dataset column relations")
+ await unit_of_work.dataset_column_relation.create_bulk_for_column_lineage(column_lineage)
+
+ logger.debug("Creating column lineage")
+ await unit_of_work.column_lineage.create_bulk(column_lineage)
diff --git a/data_rentgen/db/models/dataset_column_relation.py b/data_rentgen/db/models/dataset_column_relation.py
index 13f8835f..3c61113e 100644
--- a/data_rentgen/db/models/dataset_column_relation.py
+++ b/data_rentgen/db/models/dataset_column_relation.py
@@ -85,5 +85,5 @@ class DatasetColumnRelation(Base):
SmallInteger(),
index=False,
nullable=False,
- doc="Column transformation type",
+ doc="Column relation type",
)
diff --git a/data_rentgen/db/repositories/column_lineage.py b/data_rentgen/db/repositories/column_lineage.py
new file mode 100644
index 00000000..bd18ea27
--- /dev/null
+++ b/data_rentgen/db/repositories/column_lineage.py
@@ -0,0 +1,57 @@
+# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+from uuid import UUID
+
+from sqlalchemy.dialects.postgresql import insert
+
+from data_rentgen.db.models import ColumnLineage
+from data_rentgen.db.repositories.base import Repository
+from data_rentgen.db.utils.uuid import (
+ extract_timestamp_from_uuid,
+ generate_incremental_uuid,
+)
+from data_rentgen.dto import ColumnLineageDTO
+
+
+class ColumnLineageRepository(Repository[ColumnLineage]):
+ def get_id(self, item: ColumnLineageDTO) -> UUID:
+        # the `created_at` field of lineage should be the same as the operation's,
+        # to avoid scanning all partitions and to speed up queries
+ created_at = extract_timestamp_from_uuid(item.operation.id)
+
+        # instead of using a UniqueConstraint on multiple fields, which may produce a lot of table scans,
+        # use them to calculate a unique id
+ id_components = [
+ str(item.operation.id),
+ str(item.source_dataset.id),
+ str(item.target_dataset.id),
+ str(item.fingerprint),
+ ]
+ return generate_incremental_uuid(created_at, ".".join(id_components))
+
+ async def create_bulk(self, items: list[ColumnLineageDTO]):
+ if not items:
+ return
+
+ insert_statement = insert(ColumnLineage)
+ statement = insert_statement.on_conflict_do_nothing(
+ index_elements=[ColumnLineage.created_at, ColumnLineage.id],
+ )
+
+ await self._session.execute(
+ statement,
+ [
+ {
+ "id": self.get_id(item),
+ "created_at": extract_timestamp_from_uuid(item.operation.id),
+ "operation_id": item.operation.id,
+ "run_id": item.operation.run.id,
+ "job_id": item.operation.run.job.id, # type: ignore[arg-type]
+ "source_dataset_id": item.source_dataset.id,
+ "target_dataset_id": item.target_dataset.id,
+ "fingerprint": item.fingerprint,
+ }
+ for item in items
+ ],
+ )
diff --git a/data_rentgen/db/repositories/dataset_column_relation.py b/data_rentgen/db/repositories/dataset_column_relation.py
new file mode 100644
index 00000000..f3d28a25
--- /dev/null
+++ b/data_rentgen/db/repositories/dataset_column_relation.py
@@ -0,0 +1,73 @@
+# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+from collections import defaultdict
+from uuid import UUID
+
+from sqlalchemy import any_, select, text
+from sqlalchemy.dialects.postgresql import insert
+
+from data_rentgen.db.models import ColumnLineage
+from data_rentgen.db.models.dataset_column_relation import (
+ DatasetColumnRelation,
+ DatasetColumnRelationType,
+)
+from data_rentgen.db.repositories.base import Repository
+from data_rentgen.dto import ColumnLineageDTO
+
+
+class DatasetColumnRelationRepository(Repository[ColumnLineage]):
+ async def create_bulk_for_column_lineage(self, items: list[ColumnLineageDTO]):
+ if not items:
+ return
+
+        # Small optimization to avoid creating the same relations over and over:
+        # although we're using ON CONFLICT DO NOTHING, PG still has to perform index scans
+        # and fetch the next sequence value for each row.
+ fingerprints_to_create = await self._get_missing_fingerprints([item.fingerprint for item in items])
+ relations_to_create = [item for item in items if item.fingerprint in fingerprints_to_create]
+
+ if relations_to_create:
+ await self._create_dataset_column_relations_bulk(relations_to_create)
+
+ async def _get_missing_fingerprints(self, fingerprints: list[UUID]) -> set[UUID]:
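+        # roughly: SELECT DISTINCT fingerprint FROM dataset_column_relation
+        # WHERE fingerprint = ANY(:fingerprints)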
+ existing = await self._session.execute(
+ select(DatasetColumnRelation.fingerprint.distinct()).where(
+ DatasetColumnRelation.fingerprint == any_(fingerprints), # type: ignore[arg-type]
+ ),
+ )
+
+ return set(fingerprints) - set(existing.scalars().all())
+
+ async def _create_dataset_column_relations_bulk(self, items: list[ColumnLineageDTO]):
+        # we don't have to return anything, so there is no need to use on_conflict_do_update.
+        # also, rows are immutable, so there is no need to acquire any lock
+ insert_statement = insert(DatasetColumnRelation).on_conflict_do_nothing(
+ index_elements=[
+ DatasetColumnRelation.fingerprint,
+ DatasetColumnRelation.source_column,
+ text("coalesce(target_column, '')"),
+ ],
+ )
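+        # the coalesce() matches the functional unique index: indirect relations
+        # (source column -> whole dataset) are stored with target_column = NULL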
+
+        # several ColumnLineageDTO objects may have the same set of column relations, so deduplicate them
+ to_insert = defaultdict(list)
+ for lineage in items:
+ for column_relation in lineage.column_relations:
+ key = (lineage.fingerprint, column_relation.source_column, column_relation.target_column)
+ to_insert[key].append(column_relation)
+
+ await self._session.execute(
+ insert_statement,
+ [
+ {
+                    # id is auto-incremented
+ "fingerprint": fingerprint,
+ "source_column": relation.source_column,
+ "target_column": relation.target_column,
+ "type": DatasetColumnRelationType(relation.type).value,
+ }
+ for (fingerprint, *_), relations in to_insert.items()
+ for relation in relations
+ ],
+ )
diff --git a/data_rentgen/db/scripts/create_partitions.py b/data_rentgen/db/scripts/create_partitions.py
index 127d139d..5525dd5f 100755
--- a/data_rentgen/db/scripts/create_partitions.py
+++ b/data_rentgen/db/scripts/create_partitions.py
@@ -18,13 +18,20 @@
from data_rentgen.db.factory import create_session_factory
from data_rentgen.db.models import Input, Operation, Output, Run
+from data_rentgen.db.models.column_lineage import ColumnLineage
from data_rentgen.db.settings import DatabaseSettings
from data_rentgen.logging.settings import LoggingSettings
from data_rentgen.logging.setup_logging import setup_logging
logger = logging.getLogger(__name__)
-PARTITIONED_TABLES = [Run.__tablename__, Operation.__tablename__, Input.__tablename__, Output.__tablename__]
+PARTITIONED_TABLES = [
+ Run.__tablename__,
+ Operation.__tablename__,
+ Input.__tablename__,
+ Output.__tablename__,
+ ColumnLineage.__tablename__,
+]
class Granularity(str, Enum):
diff --git a/data_rentgen/dto/__init__.py b/data_rentgen/dto/__init__.py
index 8b08361a..dd10de3d 100644
--- a/data_rentgen/dto/__init__.py
+++ b/data_rentgen/dto/__init__.py
@@ -1,7 +1,12 @@
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
+from data_rentgen.dto.column_lineage import ColumnLineageDTO
from data_rentgen.dto.dataset import DatasetDTO
+from data_rentgen.dto.dataset_column_relation import (
+ DatasetColumnRelationDTO,
+ DatasetColumnRelationTypeDTO,
+)
from data_rentgen.dto.dataset_symlink import DatasetSymlinkDTO, DatasetSymlinkTypeDTO
from data_rentgen.dto.input import InputDTO
from data_rentgen.dto.job import JobDTO, JobTypeDTO
@@ -18,7 +23,10 @@
from data_rentgen.dto.user import UserDTO
__all__ = [
+ "ColumnLineageDTO",
"DatasetDTO",
+ "DatasetColumnRelationDTO",
+ "DatasetColumnRelationTypeDTO",
"DatasetSymlinkDTO",
"DatasetSymlinkTypeDTO",
"LocationDTO",
diff --git a/data_rentgen/dto/column_lineage.py b/data_rentgen/dto/column_lineage.py
new file mode 100644
index 00000000..bb6373b1
--- /dev/null
+++ b/data_rentgen/dto/column_lineage.py
@@ -0,0 +1,55 @@
+# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+from dataclasses import InitVar, dataclass, field
+from functools import cached_property
+from uuid import UUID
+
+from uuid6 import UUID as UUIDv6
+
+from data_rentgen.db.utils.uuid import generate_static_uuid
+from data_rentgen.dto.dataset import DatasetDTO
+from data_rentgen.dto.dataset_column_relation import (
+ DatasetColumnRelationDTO,
+ merge_dataset_column_relations,
+)
+from data_rentgen.dto.operation import OperationDTO
+
+
+@dataclass
+class ColumnLineageDTO:
+ operation: OperationDTO
+ source_dataset: DatasetDTO
+ target_dataset: DatasetDTO
+ dataset_column_relations: InitVar[list[DatasetColumnRelationDTO]]
+ _dataset_column_relations: list[DatasetColumnRelationDTO] = field(default_factory=list, init=False)
+    # id is generated from a combination of the other ids
+ id: UUIDv6 | None = None
+
+ def __post_init__(self, dataset_column_relations: list[DatasetColumnRelationDTO]):
+ self._dataset_column_relations = merge_dataset_column_relations(dataset_column_relations)
+
+ @cached_property
+ def unique_key(self) -> tuple:
+ return (self.operation.unique_key, self.source_dataset.unique_key, self.target_dataset.unique_key)
+
+ @property
+ def column_relations(self) -> list[DatasetColumnRelationDTO]:
+ return self._dataset_column_relations
+
+ @cached_property
+ def fingerprint(self) -> UUID:
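+        # column_relations are deduplicated and sorted (see merge_dataset_column_relations),
+        # so identical relation sets always produce the same fingerprint regardless of event order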
+ id_components = [(*item.unique_key, item.type) for item in self.column_relations]
+ str_components = [".".join(map(str, item)) for item in id_components]
+ return generate_static_uuid(",".join(str_components))
+
+ def merge(self, new: ColumnLineageDTO) -> ColumnLineageDTO:
+ return ColumnLineageDTO(
+ operation=self.operation.merge(new.operation),
+ source_dataset=self.source_dataset.merge(new.source_dataset),
+ target_dataset=self.target_dataset.merge(new.target_dataset),
+ dataset_column_relations=self.column_relations + new.column_relations,
+ id=new.id or self.id,
+ )
diff --git a/data_rentgen/dto/dataset_column_relation.py b/data_rentgen/dto/dataset_column_relation.py
new file mode 100644
index 00000000..4432d4a9
--- /dev/null
+++ b/data_rentgen/dto/dataset_column_relation.py
@@ -0,0 +1,68 @@
+# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from enum import IntFlag
+from functools import cached_property
+
+from uuid6 import UUID
+
+
+class DatasetColumnRelationTypeDTO(IntFlag):
+ # See https://github.com/OpenLineage/OpenLineage/blob/1.27.0/integration/spark/shared/src/main/java/io/openlineage/spark/agent/lifecycle/plan/column/TransformationInfo.java#L30-L40
+    # Using IntFlag to avoid dealing with the ARRAY column type; a bitwise OR is enough
+ UNKNOWN = 1
+
+ # Direct
+ IDENTITY = 2
+ TRANSFORMATION = 4
+ TRANSFORMATION_MASKING = 8
+ AGGREGATION = 16
+ AGGREGATION_MASKING = 32
+
+ # Indirect
+ FILTER = 64
+ JOIN = 128
+ GROUP_BY = 256
+ SORT = 512
+ WINDOW = 1024
+ CONDITIONAL = 2048
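+
+    # Relation types for the same column pair are merged with bitwise OR,
+    # e.g. IDENTITY | FILTER == 2 | 64 == 66, so one integer can hold all observed relation types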
+
+
+@dataclass
+class DatasetColumnRelationDTO:
+ type: DatasetColumnRelationTypeDTO
+ source_column: str
+ target_column: str | None = None
+    # fingerprint is generated from the whole set of relations, see ColumnLineageDTO.fingerprint
+ fingerprint: UUID | None = None
+ # description is always "", see io.openlineage.spark.agent.lifecycle.plan.column.TransformationInfo
+
+ @cached_property
+ def unique_key(self) -> tuple:
+ return ( # noqa: WPS227
+ self.source_column,
+ self.target_column or "",
+ )
+
+ def merge(self, new: DatasetColumnRelationDTO) -> DatasetColumnRelationDTO:
+ return DatasetColumnRelationDTO(
+ source_column=self.source_column,
+ target_column=self.target_column,
+ type=self.type | new.type,
+ fingerprint=new.fingerprint or self.fingerprint,
+ )
+
+
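+# A minimal sketch of the merge behavior: two relations ("a" -> "x", IDENTITY) and
+# ("a" -> "x", FILTER) collapse into one ("a" -> "x", IDENTITY|FILTER) entry,
+# and the sorted output keeps ColumnLineageDTO.fingerprint deterministic.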
+def merge_dataset_column_relations(relations: list[DatasetColumnRelationDTO]) -> list[DatasetColumnRelationDTO]:
+ result: dict[tuple, DatasetColumnRelationDTO] = {}
+ for relation in relations:
+ if relation.unique_key in result:
+ existing_relation = result[relation.unique_key]
+ result[relation.unique_key] = existing_relation.merge(relation)
+ else:
+ result[relation.unique_key] = relation
+
+ return sorted(result.values(), key=lambda item: item.unique_key)
diff --git a/data_rentgen/services/uow.py b/data_rentgen/services/uow.py
index 918cabd0..7e721522 100644
--- a/data_rentgen/services/uow.py
+++ b/data_rentgen/services/uow.py
@@ -5,7 +5,11 @@
from sqlalchemy.ext.asyncio import AsyncSession
from typing_extensions import Annotated
+from data_rentgen.db.repositories.column_lineage import ColumnLineageRepository
from data_rentgen.db.repositories.dataset import DatasetRepository
+from data_rentgen.db.repositories.dataset_column_relation import (
+ DatasetColumnRelationRepository,
+)
from data_rentgen.db.repositories.dataset_symlink import DatasetSymlinkRepository
from data_rentgen.db.repositories.input import InputRepository
from data_rentgen.db.repositories.job import JobRepository
@@ -33,6 +37,8 @@ def __init__(
self.schema = SchemaRepository(session)
self.input = InputRepository(session)
self.output = OutputRepository(session)
+ self.dataset_column_relation = DatasetColumnRelationRepository(session)
+ self.column_lineage = ColumnLineageRepository(session)
self.user = UserRepository(session)
async def __aenter__(self):
diff --git a/docs/quickstart/setup_spark.rst b/docs/quickstart/setup_spark.rst
index c074f011..a528bb1b 100644
--- a/docs/quickstart/setup_spark.rst
+++ b/docs/quickstart/setup_spark.rst
@@ -7,7 +7,7 @@ Requirements
------------
* `Apache Spark <https://spark.apache.org>`_ 3.x or higher
-* `OpenLineage integration for Spark <https://openlineage.io/docs/integrations/spark/>`_ 1.19.0 or higher
+* `OpenLineage integration for Spark <https://openlineage.io/docs/integrations/spark/>`_ 1.23.0 or higher
Setup
-----
@@ -23,7 +23,7 @@ Setup
# install OpenLineage integration and Kafka client
.config(
"spark.jars.packages",
- "io.openlineage:openlineage-spark_2.12:1.24.2,org.apache.kafka:kafka-clients:3.9.0",
+ "io.openlineage:openlineage-spark_2.12:1.27.0,org.apache.kafka:kafka-clients:3.9.0",
)
.config(
"spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener"
@@ -65,6 +65,7 @@ Setup
r"(.*?)(?<remove>(?:\/[^\/=]+=[^\/=]+)+)",
)
.config("spark.openlineage.jobName.appendDatasetName", "false")
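+    # send dataset-level column lineage instead of a per-column cartesian product (requires OL 1.23+)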
+ .config("spark.openlineage.columnLineage.datasetLineageEnabled", "true")
.getOrCreate()
)