[DOP-22366] Basic column lineage handling in consumer #155

Draft · wants to merge 2 commits into base: develop
5 changes: 5 additions & 0 deletions .env.local
@@ -7,6 +7,11 @@ export DATA_RENTGEN__KAFKA__SECURITY__TYPE=scram-sha256
export DATA_RENTGEN__KAFKA__SECURITY__USER=data_rentgen
export DATA_RENTGEN__KAFKA__SECURITY__PASSWORD=changeme
export DATA_RENTGEN__KAFKA__COMPRESSION=zstd
#export DATA_RENTGEN__CONSUMER__MAX_RECORDS=100
# Handling events with a lot of column lineage takes so much time
# that the Kafka coordinator considers the worker dead. Limit by total message size.
# This value depends on the OL version, complexity of Spark jobs, number of Kafka partitions and number of workers.
export DATA_RENTGEN__CONSUMER__MAX_PARTITION_FETCH_BYTES=200Kb

export DATA_RENTGEN__SERVER__DEBUG=true

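As a side note on the setting above: a minimal sketch of how a size-based limit like this typically reaches the underlying Kafka client, assuming the consumer is built on aiokafka (whose `AIOKafkaConsumer` accepts `max_partition_fetch_bytes` in bytes). The `parse_size` helper, topic name, and bootstrap server below are hypothetical, not this project's actual settings code:

```python
import os

from aiokafka import AIOKafkaConsumer


def parse_size(value: str) -> int:
    # Parse "200Kb"/"10Mb" style values into bytes (illustrative only).
    units = {"kb": 1024, "mb": 1024**2, "gb": 1024**3}
    lowered = value.strip().lower()
    for suffix, factor in units.items():
        if lowered.endswith(suffix):
            return int(float(lowered[: -len(suffix)]) * factor)
    return int(lowered)


async def build_consumer() -> AIOKafkaConsumer:
    return AIOKafkaConsumer(
        "input.runs",  # hypothetical topic name
        bootstrap_servers="localhost:9093",
        # Cap how many bytes one poll may pull from a single partition,
        # so a batch of heavyweight column-lineage events stays small
        # enough to be processed within the poll/session timeouts.
        max_partition_fetch_bytes=parse_size(
            os.environ.get("DATA_RENTGEN__CONSUMER__MAX_PARTITION_FETCH_BYTES", "1Mb"),
        ),
    )
```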
2 changes: 2 additions & 0 deletions data_rentgen/consumer/extractors/__init__.py
@@ -2,6 +2,7 @@
# SPDX-License-Identifier: Apache-2.0

from data_rentgen.consumer.extractors.batch import BatchExtractionResult, extract_batch
from data_rentgen.consumer.extractors.column_lineage import extract_column_lineage
from data_rentgen.consumer.extractors.dataset import (
connect_dataset_with_symlinks,
extract_dataset,
@@ -15,6 +16,7 @@
from data_rentgen.consumer.extractors.schema import extract_schema

__all__ = [
"extract_column_lineage",
"extract_dataset_and_symlinks",
"extract_dataset",
"connect_dataset_with_symlinks",
30 changes: 28 additions & 2 deletions data_rentgen/consumer/extractors/batch.py
@@ -4,14 +4,17 @@

from typing import TypeVar

from data_rentgen.consumer.extractors.column_lineage import extract_column_lineage
from data_rentgen.consumer.extractors.input import extract_input
from data_rentgen.consumer.extractors.operation import extract_operation
from data_rentgen.consumer.extractors.output import extract_output
from data_rentgen.consumer.extractors.run import extract_run
from data_rentgen.consumer.openlineage.job_facets.job_type import OpenLineageJobType
from data_rentgen.consumer.openlineage.run_event import OpenLineageRunEvent
from data_rentgen.dto import (
ColumnLineageDTO,
DatasetDTO,
DatasetSymlinkDTO,
InputDTO,
JobDTO,
LocationDTO,
@@ -21,12 +24,12 @@
SchemaDTO,
UserDTO,
)
from data_rentgen.dto.dataset_symlink import DatasetSymlinkDTO

T = TypeVar(
"T",
LocationDTO,
DatasetDTO,
ColumnLineageDTO,
DatasetSymlinkDTO,
JobDTO,
RunDTO,
@@ -65,6 +68,7 @@ def __init__(self):
self._operations: dict[tuple, OperationDTO] = {}
self._inputs: dict[tuple, InputDTO] = {}
self._outputs: dict[tuple, OutputDTO] = {}
self._column_lineage: dict[tuple, ColumnLineageDTO] = {}
self._schemas: dict[tuple, SchemaDTO] = {}
self._users: dict[tuple, UserDTO] = {}

@@ -79,6 +83,7 @@ def __repr__(self):
f"operations={len(self._operations)}, "
f"inputs={len(self._inputs)}, "
f"outputs={len(self._outputs)}, "
f"column_lineage={len(self._column_lineage)}, "
f"schemas={len(self._schemas)}, "
f"users={len(self._users)}"
")"
@@ -139,6 +144,12 @@ def add_output(self, output: OutputDTO):
if output.schema:
self.add_schema(output.schema)

def add_column_lineage(self, lineage: ColumnLineageDTO):
self._add(self._column_lineage, lineage)
self.add_dataset(lineage.source_dataset)
self.add_dataset(lineage.target_dataset)
self.add_operation(lineage.operation)

def add_schema(self, schema: SchemaDTO):
self._add(self._schemas, schema)

@@ -200,6 +211,13 @@ def _get_output(self, output_key: tuple) -> OutputDTO:
output.schema = self._get_schema(output.schema.unique_key)
return output

def _get_column_lineage(self, output_key: tuple) -> ColumnLineageDTO:
lineage = self._column_lineage[output_key]
lineage.operation = self._get_operation(lineage.operation.unique_key)
lineage.source_dataset = self._get_dataset(lineage.source_dataset.unique_key)
lineage.target_dataset = self._get_dataset(lineage.target_dataset.unique_key)
return lineage

def locations(self) -> list[LocationDTO]:
return list(map(self._get_location, self._locations))

@@ -224,14 +242,17 @@ def inputs(self) -> list[InputDTO]:
def outputs(self) -> list[OutputDTO]:
return list(map(self._get_output, self._outputs))

def column_lineage(self) -> list[ColumnLineageDTO]:
return list(map(self._get_column_lineage, self._column_lineage))

def schemas(self) -> list[SchemaDTO]:
return list(map(self._get_schema, self._schemas))

def users(self) -> list[UserDTO]:
return list(map(self._get_user, self._users))


def extract_batch(events: list[OpenLineageRunEvent]) -> BatchExtractionResult:
def extract_batch(events: list[OpenLineageRunEvent]) -> BatchExtractionResult: # noqa: WPS231
result = BatchExtractionResult()

for event in events:
@@ -250,6 +271,11 @@ def extract_batch(events: list[OpenLineageRunEvent]) -> BatchExtractionResult:
for symlink in symlinks: # noqa: WPS440
result.add_dataset_symlink(symlink)

for dataset in event.inputs + event.outputs:
column_lineage = extract_column_lineage(operation, dataset)
for item in column_lineage:
result.add_column_lineage(item)

else:
run = extract_run(event)
result.add_run(run)
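A note on the deduplication above: `BatchExtractionResult` keeps every DTO in a dict keyed by its `unique_key` tuple, so an entity repeated across events in one batch collapses into a single instance. A minimal standalone sketch of that pattern (`FakeDatasetDTO` and the keep-first merge policy are illustrative, not the project's real classes):

```python
from dataclasses import dataclass


@dataclass
class FakeDatasetDTO:
    name: str

    @property
    def unique_key(self) -> tuple:
        return (self.name,)


storage: dict[tuple, FakeDatasetDTO] = {}


def _add(item: FakeDatasetDTO) -> None:
    # Keep the first occurrence; later duplicates are dropped.
    storage.setdefault(item.unique_key, item)


_add(FakeDatasetDTO("hive://db.table"))
_add(FakeDatasetDTO("hive://db.table"))  # duplicate, collapsed
assert len(storage) == 1
```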
108 changes: 108 additions & 0 deletions data_rentgen/consumer/extractors/column_lineage.py
@@ -0,0 +1,108 @@
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

import logging
from collections import defaultdict

from data_rentgen.consumer.extractors.dataset import extract_dataset
from data_rentgen.consumer.openlineage.dataset import OpenLineageDataset
from data_rentgen.consumer.openlineage.dataset_facets.column_lineage import (
OpenLineageColumnLineageDatasetFacetFieldTransformation,
)
from data_rentgen.dto import (
ColumnLineageDTO,
DatasetColumnRelationDTO,
DatasetColumnRelationTypeDTO,
)
from data_rentgen.dto.operation import OperationDTO

logger = logging.getLogger(__name__)

TRANSFORMATION_SUBTYPE_MAP_MASKING = {
"TRANSFORMATION": DatasetColumnRelationTypeDTO.TRANSFORMATION_MASKING,
"AGGREGATION": DatasetColumnRelationTypeDTO.AGGREGATION_MASKING,
}

TRANSFORMATION_SUBTYPE_MAP = {
"IDENTITY": DatasetColumnRelationTypeDTO.IDENTITY,
"TRANSFORMATION": DatasetColumnRelationTypeDTO.TRANSFORMATION,
"AGGREGATION": DatasetColumnRelationTypeDTO.AGGREGATION,
"FILTER": DatasetColumnRelationTypeDTO.FILTER,
"JOIN": DatasetColumnRelationTypeDTO.JOIN,
"GROUP_BY": DatasetColumnRelationTypeDTO.GROUP_BY,
"SORT": DatasetColumnRelationTypeDTO.SORT,
"WINDOW": DatasetColumnRelationTypeDTO.WINDOW,
"CONDITIONAL": DatasetColumnRelationTypeDTO.CONDITIONAL,
}


def extract_dataset_column_relation_type(
transformation: OpenLineageColumnLineageDatasetFacetFieldTransformation,
) -> DatasetColumnRelationTypeDTO:
result: DatasetColumnRelationTypeDTO | None = None
if transformation.subtype:
if transformation.masking:
result = TRANSFORMATION_SUBTYPE_MAP_MASKING.get(transformation.subtype)
else:
result = TRANSFORMATION_SUBTYPE_MAP.get(transformation.subtype)

return result or DatasetColumnRelationTypeDTO.UNKNOWN


def extract_column_lineage(operation: OperationDTO, target_dataset: OpenLineageDataset) -> list[ColumnLineageDTO]:
target_dataset_dto = extract_dataset(target_dataset)
if not target_dataset.facets.columnLineage:
return []

# Grouping column lineage by source+target dataset. This is a unique combination within an operation,
# so we can use it to generate the same fingerprint for all dataset column relations
datasets = {target_dataset_dto.unique_key: target_dataset_dto}
dataset_column_relations = defaultdict(list)

# direct lineage (source_column -> target_column)
for field, raw_column_lineage in target_dataset.facets.columnLineage.fields.items():
for input_field in raw_column_lineage.inputFields:
source_dataset_dto = extract_dataset(input_field)
datasets[source_dataset_dto.unique_key] = source_dataset_dto

column_lineage_key = (source_dataset_dto.unique_key, target_dataset_dto.unique_key)
for transformation in input_field.transformations:
# OL integration for Spark before v1.23 (or with columnLineage.datasetLineageEnabled=false, which is still the default)
# produced INDIRECT lineage for each combination of source_column x target_column,
# which is almost a cartesian join. It is VERY expensive to handle, so just ignore it.
# See https://github.com/OpenLineage/OpenLineage/pull/3097
if transformation.type == "INDIRECT":
continue

column_relation = DatasetColumnRelationDTO(
type=extract_dataset_column_relation_type(transformation),
source_column=input_field.field,
target_column=field,
)
dataset_column_relations[column_lineage_key].append(column_relation)

# indirect lineage (source_column -> target_dataset),
# added in OL v1.23 and sent only when columnLineage.datasetLineageEnabled=true
for input_field in target_dataset.facets.columnLineage.dataset:
source_dataset_dto = extract_dataset(input_field)
datasets[source_dataset_dto.unique_key] = source_dataset_dto

column_lineage_key = (source_dataset_dto.unique_key, target_dataset_dto.unique_key)
for transformation in input_field.transformations:
column_relation = DatasetColumnRelationDTO(
type=extract_dataset_column_relation_type(transformation),
source_column=input_field.field,
)
dataset_column_relations[column_lineage_key].append(column_relation)

# merge results into DTO objects
return [
ColumnLineageDTO(
operation=operation,
source_dataset=datasets[source_dataset_dto_key],
target_dataset=datasets[target_dataset_dto_key],
dataset_column_relations=relations,
)
for (source_dataset_dto_key, target_dataset_dto_key), relations in dataset_column_relations.items()
if relations
]
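A usage sketch for `extract_dataset_column_relation_type` as defined above; the transformation values are illustrative:

```python
from data_rentgen.consumer.extractors.column_lineage import (
    extract_dataset_column_relation_type,
)
from data_rentgen.consumer.openlineage.dataset_facets import (
    OpenLineageColumnLineageDatasetFacetFieldTransformation,
)
from data_rentgen.dto import DatasetColumnRelationTypeDTO

transformation = OpenLineageColumnLineageDatasetFacetFieldTransformation(
    type="DIRECT",
    subtype="AGGREGATION",
    masking=True,
)
# Masked transformations resolve through the *_MASKING map:
assert (
    extract_dataset_column_relation_type(transformation)
    == DatasetColumnRelationTypeDTO.AGGREGATION_MASKING
)
```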
14 changes: 10 additions & 4 deletions data_rentgen/consumer/extractors/dataset.py
@@ -10,6 +10,9 @@
OpenLineageSymlinkIdentifier,
OpenLineageSymlinkType,
)
from data_rentgen.consumer.openlineage.dataset_facets.column_lineage import (
OpenLineageColumnLineageDatasetFacetFieldRef,
)
from data_rentgen.dto import (
DatasetDTO,
DatasetSymlinkDTO,
@@ -19,6 +22,9 @@

logger = logging.getLogger(__name__)

OpenLineageDatasetLike = (
OpenLineageDataset | OpenLineageSymlinkIdentifier | OpenLineageColumnLineageDatasetFacetFieldRef
)
METASTORE = DatasetSymlinkTypeDTO.METASTORE
WAREHOUSE = DatasetSymlinkTypeDTO.WAREHOUSE

@@ -49,7 +55,7 @@ def connect_dataset_with_symlinks(
return sorted(result, key=lambda x: x.type)


def extract_dataset(dataset: OpenLineageDataset | OpenLineageSymlinkIdentifier) -> DatasetDTO:
def extract_dataset(dataset: OpenLineageDatasetLike) -> DatasetDTO:
return DatasetDTO(
name=dataset.name,
location=extract_dataset_location(dataset),
@@ -103,7 +109,7 @@ def extract_dataset_and_symlinks(dataset: OpenLineageDataset) -> tuple[DatasetDT
return dataset_dto, symlinks


def extract_dataset_location(dataset: OpenLineageDataset | OpenLineageSymlinkIdentifier) -> LocationDTO:
def extract_dataset_location(dataset: OpenLineageDatasetLike) -> LocationDTO:
namespace = dataset.namespace
if namespace == "file":
# TODO: remove after https://github.com/OpenLineage/OpenLineage/issues/2709
@@ -122,8 +128,8 @@ def extract_dataset_location(dataset: OpenLineageDataset | OpenLineageSymlinkIde
)


def extract_dataset_format(dataset: OpenLineageDataset | OpenLineageSymlinkIdentifier) -> str | None:
if isinstance(dataset, OpenLineageSymlinkIdentifier):
def extract_dataset_format(dataset: OpenLineageDatasetLike) -> str | None:
if isinstance(dataset, (OpenLineageSymlinkIdentifier, OpenLineageColumnLineageDatasetFacetFieldRef)):
return None

match dataset.facets.storage:
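To illustrate the widened `OpenLineageDatasetLike` union: a small sketch showing that `extract_dataset_format` short-circuits for column-lineage field refs, as the `isinstance` check above implies (the ref values are illustrative):

```python
from data_rentgen.consumer.extractors.dataset import extract_dataset_format
from data_rentgen.consumer.openlineage.dataset_facets import (
    OpenLineageColumnLineageDatasetFacetFieldRef,
)

ref = OpenLineageColumnLineageDatasetFacetFieldRef(
    namespace="hive://metastore:9083",
    name="db.users",
    field="first_name",
    transformations=[],
)
# Field refs carry no storage facet, so format extraction returns None:
assert extract_dataset_format(ref) is None
```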
11 changes: 11 additions & 0 deletions data_rentgen/consumer/openlineage/dataset_facets/__init__.py
@@ -7,6 +7,12 @@
from data_rentgen.consumer.openlineage.dataset_facets.base import (
OpenLineageDatasetFacet,
)
from data_rentgen.consumer.openlineage.dataset_facets.column_lineage import (
OpenLineageColumnLineageDatasetFacet,
OpenLineageColumnLineageDatasetFacetField,
OpenLineageColumnLineageDatasetFacetFieldRef,
OpenLineageColumnLineageDatasetFacetFieldTransformation,
)
from data_rentgen.consumer.openlineage.dataset_facets.datasource import (
OpenLineageDatasourceDatasetFacet,
)
@@ -55,6 +61,10 @@
"OpenLineageDatasetFacets",
"OpenLineageInputDatasetFacets",
"OpenLineageOutputDatasetFacets",
"OpenLineageColumnLineageDatasetFacet",
"OpenLineageColumnLineageDatasetFacetField",
"OpenLineageColumnLineageDatasetFacetFieldRef",
"OpenLineageColumnLineageDatasetFacetFieldTransformation",
]


@@ -69,6 +79,7 @@ class OpenLineageDatasetFacets(OpenLineageBase):
datasetSchema: OpenLineageSchemaDatasetFacet | None = Field(default=None, alias="schema")
storage: OpenLineageStorageDatasetFacet | None = None
symlinks: OpenLineageSymlinksDatasetFacet | None = None
columnLineage: OpenLineageColumnLineageDatasetFacet | None = None


class OpenLineageInputDatasetFacets(OpenLineageBase):
44 changes: 44 additions & 0 deletions data_rentgen/consumer/openlineage/dataset_facets/column_lineage.py
@@ -0,0 +1,44 @@
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from pydantic import Field

from data_rentgen.consumer.openlineage.base import OpenLineageBase
from data_rentgen.consumer.openlineage.dataset_facets.base import (
OpenLineageDatasetFacet,
)


class OpenLineageColumnLineageDatasetFacetFieldTransformation(OpenLineageBase):
"""Dataset facet describing field transformation."""

type: str
subtype: str | None = None
description: str | None = None
masking: bool = False


class OpenLineageColumnLineageDatasetFacetFieldRef(OpenLineageBase):
"""Dataset facet describing field reference for column lineage facet."""

namespace: str
name: str
field: str
transformations: list[OpenLineageColumnLineageDatasetFacetFieldTransformation]


class OpenLineageColumnLineageDatasetFacetField(OpenLineageBase):
"""Dataset facet describing column lineage for specific field."""

inputFields: list[OpenLineageColumnLineageDatasetFacetFieldRef] = Field(default_factory=list)
transformationDescription: str | None = None
transformationType: str | None = None


class OpenLineageColumnLineageDatasetFacet(OpenLineageDatasetFacet):
"""Dataset facet describing column lineage.
See [ColumnLineageDatasetFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/ColumnLineageDatasetFacet.json).
"""

fields: dict[str, OpenLineageColumnLineageDatasetFacetField] = Field(default_factory=dict)
dataset: list[OpenLineageColumnLineageDatasetFacetFieldRef] = Field(default_factory=list)
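A sketch of how these models fit together for one target column, using only classes defined in this file; the namespace, dataset, and column names are illustrative:

```python
from data_rentgen.consumer.openlineage.dataset_facets import (
    OpenLineageColumnLineageDatasetFacetField,
    OpenLineageColumnLineageDatasetFacetFieldRef,
    OpenLineageColumnLineageDatasetFacetFieldTransformation,
)

# Direct lineage for one target column, fed by one source column:
field = OpenLineageColumnLineageDatasetFacetField(
    inputFields=[
        OpenLineageColumnLineageDatasetFacetFieldRef(
            namespace="hive://metastore:9083",
            name="db.users",
            field="first_name",
            transformations=[
                OpenLineageColumnLineageDatasetFacetFieldTransformation(
                    type="DIRECT",
                    subtype="IDENTITY",
                ),
            ],
        ),
    ],
)
assert field.inputFields[0].transformations[0].masking is False
```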
15 changes: 14 additions & 1 deletion data_rentgen/consumer/subscribers.py
@@ -34,7 +34,11 @@ async def runs_events_subscriber(
logger.info("Saved successfully")


async def save_to_db(data: BatchExtractionResult, unit_of_work: UnitOfWork, logger: Logger) -> None: # noqa: WPS217
async def save_to_db( # noqa: WPS217, WPS213
data: BatchExtractionResult,
unit_of_work: UnitOfWork,
logger: Logger,
) -> None:
# To avoid deadlocks when parallel consumer instances insert/update the same row,
# commit changes for each row instead of committing the whole batch. Yes, this could be slow.

@@ -95,3 +99,12 @@ async def save_to_db(data: BatchExtractionResult, unit_of_work: UnitOfWork, logg

logger.debug("Creating outputs")
await unit_of_work.output.create_or_update_bulk(data.outputs())

# If something goes wrong here, at least we will have inputs/outputs
async with unit_of_work:
column_lineage = data.column_lineage()
logger.debug("Creating dataset column relations")
await unit_of_work.dataset_column_relation.create_bulk_for_column_lineage(column_lineage)

logger.debug("Creating column lineage")
await unit_of_work.column_lineage.create_bulk(column_lineage)
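A schematic of the deadlock that the per-row commit strategy above avoids (not project code; table and ids are made up):

```python
# Why whole-batch commits can deadlock between parallel consumers:
#
#   consumer A: UPDATE datasets SET ... WHERE id = 1;  -- locks row 1
#               UPDATE datasets SET ... WHERE id = 2;  -- waits for B
#   consumer B: UPDATE datasets SET ... WHERE id = 2;  -- locks row 2
#               UPDATE datasets SET ... WHERE id = 1;  -- waits for A -> deadlock
#
# Committing after each row releases its lock immediately, so two
# consumers touching the same rows in different orders can no longer
# block each other in a cycle.
```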