[DOP-22366] Basic column lineage handling in consumer
dolfinus committed Feb 4, 2025
1 parent 581b22a commit 9d974ec
Showing 17 changed files with 501 additions and 11 deletions.
5 changes: 5 additions & 0 deletions .env.local
@@ -7,6 +7,11 @@ export DATA_RENTGEN__KAFKA__SECURITY__TYPE=scram-sha256
export DATA_RENTGEN__KAFKA__SECURITY__USER=data_rentgen
export DATA_RENTGEN__KAFKA__SECURITY__PASSWORD=changeme
export DATA_RENTGEN__KAFKA__COMPRESSION=zstd
#export DATA_RENTGEN__CONSUMER__MAX_RECORDS=100
# Handling events with a lot of column lineage takes so much time
# that the Kafka coordinator considers the worker dead. Limit by total message size instead.
# This value depends on the OL version, the complexity of Spark jobs, the number of Kafka partitions and the number of workers.
export DATA_RENTGEN__CONSUMER__MAX_PARTITION_FETCH_BYTES=200Kb

export DATA_RENTGEN__SERVER__DEBUG=true

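For context on what this limit controls: MAX_PARTITION_FETCH_BYTES maps to Kafka's max.partition.fetch.bytes consumer setting, which caps how many bytes a single fetch returns per partition. A minimal sketch of the same knob on a raw aiokafka consumer (the topic name and addresses are placeholders, and the actual wiring inside data_rentgen may differ):

from aiokafka import AIOKafkaConsumer

# Cap each partition fetch at ~200 KiB so a batch of lineage-heavy events
# can be processed before the group coordinator's session timeout expires
# and the worker is evicted from the consumer group.
consumer = AIOKafkaConsumer(
    "input.runs",  # hypothetical topic name
    bootstrap_servers="localhost:9093",
    group_id="data-rentgen",
    max_partition_fetch_bytes=200 * 1024,
)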
2 changes: 2 additions & 0 deletions data_rentgen/consumer/extractors/__init__.py
@@ -2,6 +2,7 @@
# SPDX-License-Identifier: Apache-2.0

from data_rentgen.consumer.extractors.batch import BatchExtractionResult, extract_batch
from data_rentgen.consumer.extractors.column_lineage import extract_column_lineage
from data_rentgen.consumer.extractors.dataset import (
connect_dataset_with_symlinks,
extract_dataset,
@@ -15,6 +16,7 @@
from data_rentgen.consumer.extractors.schema import extract_schema

__all__ = [
"extract_column_lineage",
"extract_dataset_and_symlinks",
"extract_dataset",
"connect_dataset_with_symlinks",
30 changes: 28 additions & 2 deletions data_rentgen/consumer/extractors/batch.py
@@ -4,14 +4,17 @@

from typing import TypeVar

from data_rentgen.consumer.extractors.column_lineage import extract_column_lineage
from data_rentgen.consumer.extractors.input import extract_input
from data_rentgen.consumer.extractors.operation import extract_operation
from data_rentgen.consumer.extractors.output import extract_output
from data_rentgen.consumer.extractors.run import extract_run
from data_rentgen.consumer.openlineage.job_facets.job_type import OpenLineageJobType
from data_rentgen.consumer.openlineage.run_event import OpenLineageRunEvent
from data_rentgen.dto import (
ColumnLineageDTO,
DatasetDTO,
DatasetSymlinkDTO,
InputDTO,
JobDTO,
LocationDTO,
@@ -21,12 +24,12 @@
SchemaDTO,
UserDTO,
)
from data_rentgen.dto.dataset_symlink import DatasetSymlinkDTO

T = TypeVar(
"T",
LocationDTO,
DatasetDTO,
ColumnLineageDTO,
DatasetSymlinkDTO,
JobDTO,
RunDTO,
@@ -65,6 +68,7 @@ def __init__(self):
self._operations: dict[tuple, OperationDTO] = {}
self._inputs: dict[tuple, InputDTO] = {}
self._outputs: dict[tuple, OutputDTO] = {}
self._column_lineage: dict[tuple, ColumnLineageDTO] = {}
self._schemas: dict[tuple, SchemaDTO] = {}
self._users: dict[tuple, UserDTO] = {}

@@ -79,6 +83,7 @@ def __repr__(self):
f"operations={len(self._operations)}, "
f"inputs={len(self._inputs)}, "
f"outputs={len(self._outputs)}, "
f"column_lineage={len(self._column_lineage)}, "
f"schemas={len(self._schemas)}, "
f"users={len(self._users)}"
")"
@@ -139,6 +144,12 @@ def add_output(self, output: OutputDTO):
if output.schema:
self.add_schema(output.schema)

def add_column_lineage(self, lineage: ColumnLineageDTO):
self._add(self._column_lineage, lineage)
self.add_dataset(lineage.source_dataset)
self.add_dataset(lineage.target_dataset)
self.add_operation(lineage.operation)

def add_schema(self, schema: SchemaDTO):
self._add(self._schemas, schema)

@@ -200,6 +211,13 @@ def _get_output(self, output_key: tuple) -> OutputDTO:
output.schema = self._get_schema(output.schema.unique_key)
return output

def _get_column_lineage(self, output_key: tuple) -> ColumnLineageDTO:
lineage = self._column_lineage[output_key]
lineage.operation = self._get_operation(lineage.operation.unique_key)
lineage.source_dataset = self._get_dataset(lineage.source_dataset.unique_key)
lineage.target_dataset = self._get_dataset(lineage.target_dataset.unique_key)
return lineage

def locations(self) -> list[LocationDTO]:
return list(map(self._get_location, self._locations))

@@ -224,14 +242,17 @@ def inputs(self) -> list[InputDTO]:
def outputs(self) -> list[OutputDTO]:
return list(map(self._get_output, self._outputs))

def column_lineage(self) -> list[ColumnLineageDTO]:
return list(map(self._get_column_lineage, self._column_lineage))

def schemas(self) -> list[SchemaDTO]:
return list(map(self._get_schema, self._schemas))

def users(self) -> list[UserDTO]:
return list(map(self._get_user, self._users))


def extract_batch(events: list[OpenLineageRunEvent]) -> BatchExtractionResult:
def extract_batch(events: list[OpenLineageRunEvent]) -> BatchExtractionResult: # noqa: WPS231
result = BatchExtractionResult()

for event in events:
@@ -250,6 +271,11 @@ def extract_batch(events: list[OpenLineageRunEvent]) -> BatchExtractionResult:
for symlink in symlinks: # noqa: WPS440
result.add_dataset_symlink(symlink)

for dataset in event.inputs + event.outputs:
column_lineage = extract_column_lineage(operation, dataset)
for item in column_lineage:
result.add_column_lineage(item)

else:
run = extract_run(event)
result.add_run(run)
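A rough usage sketch of the accumulator above, using only names from this file (event construction elided): extract_batch deduplicates DTOs across all events in the batch, and each getter resolves nested references, so every ColumnLineageDTO returned below points at the same shared DatasetDTO and OperationDTO instances returned by datasets() and operations():

result = extract_batch(events)  # events: list[OpenLineageRunEvent]
for lineage in result.column_lineage():
    # source_dataset/target_dataset are the deduplicated DatasetDTO instances
    print(lineage.source_dataset.name, "->", lineage.target_dataset.name)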
108 changes: 108 additions & 0 deletions data_rentgen/consumer/extractors/column_lineage.py
@@ -0,0 +1,108 @@
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

import logging
from collections import defaultdict

from data_rentgen.consumer.extractors.dataset import extract_dataset
from data_rentgen.consumer.openlineage.dataset import OpenLineageDataset
from data_rentgen.consumer.openlineage.dataset_facets.column_lineage import (
OpenLineageColumnLineageDatasetFacetFieldTransformation,
)
from data_rentgen.dto import (
ColumnLineageDTO,
DatasetColumnRelationDTO,
DatasetColumnRelationTypeDTO,
)
from data_rentgen.dto.operation import OperationDTO

logger = logging.getLogger(__name__)

TRANSFORMATION_SUBTYPE_MAP_MASKING = {
"TRANSFORMATION": DatasetColumnRelationTypeDTO.TRANSFORMATION_MASKING,
"AGGREGATION": DatasetColumnRelationTypeDTO.AGGREGATION_MASKING,
}

TRANSFORMATION_SUBTYPE_MAP = {
"IDENTITY": DatasetColumnRelationTypeDTO.IDENTITY,
"TRANSFORMATION": DatasetColumnRelationTypeDTO.TRANSFORMATION,
"AGGREGATION": DatasetColumnRelationTypeDTO.AGGREGATION,
"FILTER": DatasetColumnRelationTypeDTO.FILTER,
"JOIN": DatasetColumnRelationTypeDTO.JOIN,
"GROUP_BY": DatasetColumnRelationTypeDTO.GROUP_BY,
"SORT": DatasetColumnRelationTypeDTO.SORT,
"WINDOW": DatasetColumnRelationTypeDTO.WINDOW,
"CONDITIONAL": DatasetColumnRelationTypeDTO.CONDITIONAL,
}


def extract_dataset_column_relation_type(
transformation: OpenLineageColumnLineageDatasetFacetFieldTransformation,
) -> DatasetColumnRelationTypeDTO:
result: DatasetColumnRelationTypeDTO | None = None
if transformation.subtype:
if transformation.masking:
result = TRANSFORMATION_SUBTYPE_MAP_MASKING.get(transformation.subtype)
else:
result = TRANSFORMATION_SUBTYPE_MAP.get(transformation.subtype)

return result or DatasetColumnRelationTypeDTO.UNKNOWN


def extract_column_lineage(operation: OperationDTO, target_dataset: OpenLineageDataset) -> list[ColumnLineageDTO]:
target_dataset_dto = extract_dataset(target_dataset)
if not target_dataset.facets.columnLineage:
return []

# Grouping column lineage by source+target dataset. This is a unique combination within the operation,
# so we can use it to generate the same fingerprint for all dataset column relations.
datasets = {target_dataset_dto.unique_key: target_dataset_dto}
dataset_column_relations = defaultdict(list)

# direct lineage (source_column -> target_column)
for field, raw_column_lineage in target_dataset.facets.columnLineage.fields.items():
for input_field in raw_column_lineage.inputFields:
source_dataset_dto = extract_dataset(input_field)
datasets[source_dataset_dto.unique_key] = source_dataset_dto

column_lineage_key = (source_dataset_dto.unique_key, target_dataset_dto.unique_key)
for transformation in input_field.transformations:
# OL integration for Spark before v1.23 (or with columnLineage.datasetLineageEnabled=false, which is still the default)
# produced INDIRECT lineage for each combination of source_column x target_column,
# which is almost a cartesian join. It is VERY expensive to handle, so just ignore it.
# See https://github.com/OpenLineage/OpenLineage/pull/3097
if transformation.type == "INDIRECT":
continue

column_relation = DatasetColumnRelationDTO(
type=extract_dataset_column_relation_type(transformation),
source_column=input_field.field,
target_column=field,
)
dataset_column_relations[column_lineage_key].append(column_relation)

# indirect lineage (source_column -> target_dataset),
# added to OL in v1.23 and sent only when columnLineage.datasetLineageEnabled=true
for input_field in target_dataset.facets.columnLineage.dataset:
source_dataset_dto = extract_dataset(input_field)
datasets[source_dataset_dto.unique_key] = source_dataset_dto

column_lineage_key = (source_dataset_dto.unique_key, target_dataset_dto.unique_key)
for transformation in input_field.transformations:
column_relation = DatasetColumnRelationDTO(
type=extract_dataset_column_relation_type(transformation),
source_column=input_field.field,
)
dataset_column_relations[column_lineage_key].append(column_relation)

# merge results into DTO objects
return [
ColumnLineageDTO(
operation=operation,
source_dataset=datasets[source_dataset_dto_key],
target_dataset=datasets[target_dataset_dto_key],
dataset_column_relations=relations,
)
for (source_dataset_dto_key, target_dataset_dto_key), relations in dataset_column_relations.items()
if relations
]
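To make the mapping tables above concrete, a short sketch using only names defined in this module. Note that masking has dedicated relation types only for the TRANSFORMATION and AGGREGATION subtypes; any other masked subtype falls back to UNKNOWN:

t = OpenLineageColumnLineageDatasetFacetFieldTransformation(type="DIRECT", subtype="IDENTITY")
assert extract_dataset_column_relation_type(t) == DatasetColumnRelationTypeDTO.IDENTITY

t = OpenLineageColumnLineageDatasetFacetFieldTransformation(type="DIRECT", subtype="AGGREGATION", masking=True)
assert extract_dataset_column_relation_type(t) == DatasetColumnRelationTypeDTO.AGGREGATION_MASKING

# masking=True with a subtype missing from TRANSFORMATION_SUBTYPE_MAP_MASKING:
t = OpenLineageColumnLineageDatasetFacetFieldTransformation(type="DIRECT", subtype="IDENTITY", masking=True)
assert extract_dataset_column_relation_type(t) == DatasetColumnRelationTypeDTO.UNKNOWN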
14 changes: 10 additions & 4 deletions data_rentgen/consumer/extractors/dataset.py
@@ -10,6 +10,9 @@
OpenLineageSymlinkIdentifier,
OpenLineageSymlinkType,
)
from data_rentgen.consumer.openlineage.dataset_facets.column_lineage import (
OpenLineageColumnLineageDatasetFacetFieldRef,
)
from data_rentgen.dto import (
DatasetDTO,
DatasetSymlinkDTO,
@@ -19,6 +22,9 @@

logger = logging.getLogger(__name__)

OpenLineageDatasetLike = (
OpenLineageDataset | OpenLineageSymlinkIdentifier | OpenLineageColumnLineageDatasetFacetFieldRef
)
METASTORE = DatasetSymlinkTypeDTO.METASTORE
WAREHOUSE = DatasetSymlinkTypeDTO.WAREHOUSE

@@ -49,7 +55,7 @@ def connect_dataset_with_symlinks(
return sorted(result, key=lambda x: x.type)


def extract_dataset(dataset: OpenLineageDataset | OpenLineageSymlinkIdentifier) -> DatasetDTO:
def extract_dataset(dataset: OpenLineageDatasetLike) -> DatasetDTO:
return DatasetDTO(
name=dataset.name,
location=extract_dataset_location(dataset),
@@ -103,7 +109,7 @@ def extract_dataset_and_symlinks(dataset: OpenLineageDataset) -> tuple[DatasetDT
return dataset_dto, symlinks


def extract_dataset_location(dataset: OpenLineageDataset | OpenLineageSymlinkIdentifier) -> LocationDTO:
def extract_dataset_location(dataset: OpenLineageDatasetLike) -> LocationDTO:
namespace = dataset.namespace
if namespace == "file":
# TODO: remove after https://github.com/OpenLineage/OpenLineage/issues/2709
@@ -122,8 +128,8 @@ def extract_dataset_format(dataset: OpenLineageDataset | OpenLineageSymlinkIdentifier) -> str | None:
)


def extract_dataset_format(dataset: OpenLineageDataset | OpenLineageSymlinkIdentifier) -> str | None:
if isinstance(dataset, OpenLineageSymlinkIdentifier):
def extract_dataset_format(dataset: OpenLineageDatasetLike) -> str | None:
if isinstance(dataset, (OpenLineageSymlinkIdentifier, OpenLineageColumnLineageDatasetFacetFieldRef)):
return None

match dataset.facets.storage:
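A quick sketch of why the union was widened (names from this module): column-lineage field refs carry no facets attribute at all, so extract_dataset_format must short-circuit for them before touching dataset.facets.storage, while name and location extraction work the same for all three member types:

ref = OpenLineageColumnLineageDatasetFacetFieldRef(
    namespace="hdfs://namenode:8020",  # hypothetical values
    name="/warehouse/db/table",
    field="id",
    transformations=[],
)
assert extract_dataset_format(ref) is None
dataset_dto = extract_dataset(ref)  # DatasetDTO with format=None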
11 changes: 11 additions & 0 deletions data_rentgen/consumer/openlineage/dataset_facets/__init__.py
@@ -7,6 +7,12 @@
from data_rentgen.consumer.openlineage.dataset_facets.base import (
OpenLineageDatasetFacet,
)
from data_rentgen.consumer.openlineage.dataset_facets.column_lineage import (
OpenLineageColumnLineageDatasetFacet,
OpenLineageColumnLineageDatasetFacetField,
OpenLineageColumnLineageDatasetFacetFieldRef,
OpenLineageColumnLineageDatasetFacetFieldTransformation,
)
from data_rentgen.consumer.openlineage.dataset_facets.datasource import (
OpenLineageDatasourceDatasetFacet,
)
@@ -55,6 +61,10 @@
"OpenLineageDatasetFacets",
"OpenLineageInputDatasetFacets",
"OpenLineageOutputDatasetFacets",
"OpenLineageColumnLineageDatasetFacet",
"OpenLineageColumnLineageDatasetFacetField",
"OpenLineageColumnLineageDatasetFacetFieldRef",
"OpenLineageColumnLineageDatasetFacetFieldTransformation",
]


@@ -69,6 +79,7 @@ class OpenLineageDatasetFacets(OpenLineageBase):
datasetSchema: OpenLineageSchemaDatasetFacet | None = Field(default=None, alias="schema")
storage: OpenLineageStorageDatasetFacet | None = None
symlinks: OpenLineageSymlinksDatasetFacet | None = None
columnLineage: OpenLineageColumnLineageDatasetFacet | None = None


class OpenLineageInputDatasetFacets(OpenLineageBase):
44 changes: 44 additions & 0 deletions data_rentgen/consumer/openlineage/dataset_facets/column_lineage.py
@@ -0,0 +1,44 @@
# SPDX-FileCopyrightText: 2024-2025 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from pydantic import Field

from data_rentgen.consumer.openlineage.base import OpenLineageBase
from data_rentgen.consumer.openlineage.dataset_facets.base import (
OpenLineageDatasetFacet,
)


class OpenLineageColumnLineageDatasetFacetFieldTransformation(OpenLineageBase):
"""Dataset facet describing field transformation."""

type: str
subtype: str | None = None
description: str | None = None
masking: bool = False


class OpenLineageColumnLineageDatasetFacetFieldRef(OpenLineageBase):
"""Dataset facet describing field reference for column lineage facet."""

namespace: str
name: str
field: str
transformations: list[OpenLineageColumnLineageDatasetFacetFieldTransformation]


class OpenLineageColumnLineageDatasetFacetField(OpenLineageBase):
"""Dataset facet describing column lineage for specific field."""

inputFields: list[OpenLineageColumnLineageDatasetFacetFieldRef] = Field(default_factory=list)
transformationDescription: str | None = None
transformationType: str | None = None


class OpenLineageColumnLineageDatasetFacet(OpenLineageDatasetFacet):
"""Dataset facet describing column lineage.
See [ColumnLineageDatasetFacet](https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/ColumnLineageDatasetFacet.json).
"""

fields: dict[str, OpenLineageColumnLineageDatasetFacetField] = Field(default_factory=dict)
dataset: list[OpenLineageColumnLineageDatasetFacetFieldRef] = Field(default_factory=list)
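For reference, a hand-written example of a payload these models parse, written in the shape of the OpenLineage spec rather than taken from a real event, and assuming pydantic v2's model_validate:

facet = OpenLineageColumnLineageDatasetFacet.model_validate(
    {
        "fields": {
            "full_name": {  # target column
                "inputFields": [
                    {
                        "namespace": "hive://warehouse:9083",
                        "name": "db.users",
                        "field": "first_name",
                        "transformations": [
                            {"type": "DIRECT", "subtype": "TRANSFORMATION", "masking": False},
                        ],
                    },
                ],
            },
        },
        # dataset-level (indirect) lineage, sent by OL since v1.23 when
        # columnLineage.datasetLineageEnabled=true; empty here
        "dataset": [],
    },
)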
15 changes: 14 additions & 1 deletion data_rentgen/consumer/subscribers.py
@@ -34,7 +34,11 @@ async def runs_events_subscriber(
logger.info("Saved successfully")


async def save_to_db(data: BatchExtractionResult, unit_of_work: UnitOfWork, logger: Logger) -> None: # noqa: WPS217
async def save_to_db( # noqa: WPS217, WPS213
data: BatchExtractionResult,
unit_of_work: UnitOfWork,
logger: Logger,
) -> None:
# To avoid deadlocks when parallel consumer instances insert/update the same row,
# commit changes for each row instead of committing the whole batch. Yes, this could be slow.

@@ -95,3 +99,12 @@ async def save_to_db(data: BatchExtractionResult, unit_of_work: UnitOfWork, logger: Logger) -> None:

logger.debug("Creating outputs")
await unit_of_work.output.create_or_update_bulk(data.outputs())

# If something goes wrong here, at least we will still have inputs/outputs
async with unit_of_work:
column_lineage = data.column_lineage()
logger.debug("Creating dataset column relations")
await unit_of_work.dataset_column_relation.create_bulk_for_column_lineage(column_lineage)

logger.debug("Creating column lineage")
await unit_of_work.column_lineage.create_bulk(column_lineage)