[DOP-2330] - add KeyValueHwm class
maxim-lixakov committed Dec 26, 2023
1 parent 23bc567 commit 1474395
Showing 19 changed files with 333 additions and 87 deletions.
1 change: 1 addition & 0 deletions docs/changelog/next_release/71.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add ``KeyValueIntHWM`` class, designed to manage HWM for partitioned data sources like Kafka topics. It extends the functionality of the base HWM classes to handle key-value pairs.
2 changes: 1 addition & 1 deletion docs/hwm/column/date_hwm.rst
Expand Up @@ -4,5 +4,5 @@ Column Date HWM
.. currentmodule:: etl_entities.hwm.column.date_hwm

.. autoclass:: ColumnDateHWM
:members: name, set_value, dict, json, copy, deserialize, covers
:members: name, set_value, dict, json, copy, deserialize
:special-members: __bool__, __add__, __sub__, __eq__, __lt__
2 changes: 1 addition & 1 deletion docs/hwm/column/datetime_hwm.rst
Expand Up @@ -4,5 +4,5 @@ Column Datetime HWM
.. currentmodule:: etl_entities.hwm.column.datetime_hwm

.. autoclass:: ColumnDateTimeHWM
:members: name, set_value, dict, json, copy, deserialize, covers
:members: name, set_value, dict, json, copy, deserialize
:special-members: __bool__, __add__, __sub__, __eq__, __lt__
2 changes: 1 addition & 1 deletion docs/hwm/column/index.rst
Expand Up @@ -12,7 +12,7 @@ Column HWM
date_hwm
datetime_hwm

What is HWM?
What is Column HWM?
-------------------

Sometimes it's necessary to read only changed rows from a table.
Expand Down
2 changes: 1 addition & 1 deletion docs/hwm/column/int_hwm.rst
Expand Up @@ -4,5 +4,5 @@ Column Integer HWM
.. currentmodule:: etl_entities.hwm.column.int_hwm

.. autoclass:: ColumnIntHWM
:members: name, set_value, dict, json, copy, deserialize, covers
:members: name, set_value, dict, json, copy, deserialize
:special-members: __bool__, __add__, __sub__, __eq__, __lt__
2 changes: 1 addition & 1 deletion docs/hwm/file/index.rst
Expand Up @@ -10,7 +10,7 @@ File HWM

file_list_hwm

What is HWM?
What is File HWM?
-----------------

Sometimes it's necessary to read/download only new files from a source folder.
Expand Down
6 changes: 6 additions & 0 deletions docs/hwm/index.rst
Expand Up @@ -14,3 +14,9 @@ HWM
    :caption: File HWM

    file/index

.. toctree::
    :maxdepth: 2
    :caption: KeyValue HWM

    key_value/index
76 changes: 76 additions & 0 deletions docs/hwm/key_value/index.rst
@@ -0,0 +1,76 @@
.. _key_value_hwm_classes:

KeyValue HWM
============

.. toctree::
    :maxdepth: 2
    :caption: HWM classes
    :name: key_value_hwm_classes

    key_value_int_hwm

What is KeyValue HWM?
----------------------

The KeyValue High Water Mark (HWM) is a specialized class designed to manage and track incremental data changes in systems where data is stored or represented as key-value pairs, such as in message queues like Kafka.

Use Case
----------------------

The ``KeyValueHWM`` class is particularly beneficial in scenarios where there is a need to `incrementally <https://onetl.readthedocs.io/en/0.10.0/strategy/incremental_strategy.html>`_ upload data in an ETL process.

For instance, in typical ETL processes using `Spark with Kafka <https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html>`_, data is rewritten entirely from all partitions of a topic, starting from offset **zero**. This approach can be inefficient and time-consuming, and it can create duplicates in the target. By leveraging the ``KeyValueIntHWM`` class, it becomes possible to track the last processed offset of each partition. This enables the ETL process to write only new data increments, significantly reducing the amount of data transferred during each run.
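
As a rough illustration of the saving described above, the sketch below computes which offset ranges still need to be read, given the offsets stored in an HWM and the latest offsets reported by the source. It uses plain dictionaries rather than ``KeyValueIntHWM``. The function name ``pending_ranges`` and the half-open ``[start, end)`` convention are assumptions made for this example only; they are not part of ``etl_entities`` or onETL.

```python
from typing import Dict, Tuple


def pending_ranges(stored: Dict[int, int], latest: Dict[int, int]) -> Dict[int, Tuple[int, int]]:
    """Return {partition: (start, end)} for partitions that have unread messages.

    Hypothetical helper: ``stored`` holds the last recorded offset per partition,
    ``latest`` holds the current end offset per partition.
    """
    ranges = {}
    for partition, end in latest.items():
        # Partitions that were never recorded are read from offset 0
        start = stored.get(partition, 0)
        if end > start:
            ranges[partition] = (start, end)
    return ranges


stored_offsets = {0: 100, 1: 150, 2: 200}
latest_offsets = {0: 110, 1: 155, 2: 200}

# Partition 2 has no new messages, so it is skipped entirely
todo = pending_ranges(stored_offsets, latest_offsets)
# todo == {0: (100, 110), 1: (150, 155)}
```

Only 15 messages would be read in this run, instead of the 465 that a full re-read from offset zero would cover.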

Example Usage with Kafka Messages
---------------------------------

Consider a Kafka topic with several partitions, each having its own offset indicating the latest message.

Initial Kafka Topic State:

.. code:: bash

    Partition 0: Offset 100
    Partition 1: Offset 150
    Partition 2: Offset 200

When a new batch of messages arrives, the offsets in the Kafka partitions are updated:

.. code:: bash

    Partition 0: Offset 110 # 10 new messages
    Partition 1: Offset 155 # 5 new messages
    Partition 2: Offset 200 # No new messages

Using the ``KeyValueIntHWM`` class, we can track these offsets:

.. code:: python

    from etl_entities.hwm import KeyValueIntHWM

    initial_offsets = {
        0: 100,  # Partition 0 offset 100
        1: 150,  # Partition 1 offset 150
        2: 200,  # Partition 2 offset 200
    }

    # Creating an instance of KeyValueIntHWM with initial offsets
    hwm = KeyValueIntHWM(value=initial_offsets, ...)

    # Running some ETL process, which updates the HWM value when it finishes
    run_etl_process(hwm, new_batch_data)

    # HWM values after running the ETL process
    assert hwm.value == {0: 110, 1: 155, 2: 200}

This approach ensures that only new messages (i.e., those after the last recorded offset in each partition) are considered in the next ETL run. For Partition 0 and Partition 1, the new offsets (110 and 155, respectively) are stored in the HWM, while Partition 2 remains unchanged because it received no new messages.
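
The bookkeeping that ``run_etl_process`` is expected to trigger can be sketched with plain dictionaries. The helper ``merge_offsets`` below is a hypothetical stand-in that mirrors the update rule introduced in this commit (keep the larger offset per key, leave missing keys untouched). It is not the ``etl_entities`` API and needs no third-party packages.

```python
def merge_offsets(current: dict, new: dict) -> dict:
    """Return a copy of ``current`` where each key holds the larger of the two offsets."""
    merged = dict(current)
    for partition, new_offset in new.items():
        current_offset = merged.get(partition)
        # Accept unknown partitions and offsets that moved forward; ignore the rest
        if current_offset is None or new_offset > current_offset:
            merged[partition] = new_offset
    return merged


offsets = {0: 100, 1: 150, 2: 200}

# A new batch touched partitions 0 and 1 only
offsets = merge_offsets(offsets, {0: 110, 1: 155})
assert offsets == {0: 110, 1: 155, 2: 200}

# A stale offset for partition 1 does not move the mark backwards
assert merge_offsets(offsets, {1: 120}) == offsets
```

Returning a new dictionary instead of mutating the input is a choice made for this sketch; the method added in this commit updates the HWM instance in place.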


Restrictions
------------

- **Non-Decreasing Values**: The ``KeyValueHWM`` class is designed to handle only non-decreasing values. During the update process, if the new offset provided for a given partition is less than the current offset, the value will not be updated.

- **Incomplete Key Updates**: If a key is not included in the new HWM value, its value remains unchanged. This is essential because keys in systems like Kafka (partitions) cannot be deleted, and their last known
7 changes: 7 additions & 0 deletions docs/hwm/key_value/key_value_int_hwm.rst
@@ -0,0 +1,7 @@
KeyValue Int HWM
================

.. currentmodule:: etl_entities.hwm.key_value.key_value_int_hwm

.. autoclass:: KeyValueIntHWM
    :members: name, set_value, dict, json, copy, update
4 changes: 4 additions & 0 deletions etl_entities/hwm/__init__.py
Expand Up @@ -20,6 +20,8 @@
from etl_entities.hwm.file.file_list_hwm import FileListHWM
from etl_entities.hwm.hwm import HWM
from etl_entities.hwm.hwm_type_registry import HWMTypeRegistry, register_hwm_type
from etl_entities.hwm.key_value.key_value_hwm import KeyValueHWM
from etl_entities.hwm.key_value.key_value_int_hwm import KeyValueIntHWM

__all__ = [
"HWM",
Expand All @@ -29,6 +31,8 @@
"ColumnIntHWM",
"FileHWM",
"FileListHWM",
"KeyValueHWM",
"KeyValueIntHWM",
"HWMTypeRegistry",
"register_hwm_type",
]
29 changes: 0 additions & 29 deletions etl_entities/hwm/column/column_hwm.py
Expand Up @@ -59,35 +59,6 @@ class ColumnHWM(HWM[Optional[ColumnValueType]], Generic[ColumnValueType], Generi
entity: Optional[str] = Field(default=None, alias="source")
value: Optional[ColumnValueType] = None

def covers(self, value: Optional[ColumnValueType]) -> bool:
"""Return ``True`` if input value is already covered by HWM
Examples
----------
.. code:: python
hwm = ColumnIntHWM(name="somename", value=1)
assert hwm.covers(0) # 0 <= 1
assert hwm.covers(1) # 1 <= 1
assert hwm.covers(0.5) # 0.5 <= 1
assert not hwm.covers(2) # 2 > 1
empty_hwm = ColumnIntHWM(name="somename")
# None does not cover anything
assert not empty_hwm.covers(0)
assert not empty_hwm.covers(1)
assert not empty_hwm.covers(0.5)
assert not empty_hwm.covers(2)
"""

if self.value is None:
return False

return self._check_new_value(value) <= self.value

def __add__(self: ColumnHWMType, value: ColumnValueType) -> ColumnHWMType:
"""Increase HWM value and return copy of HWM
Expand Down
4 changes: 0 additions & 4 deletions etl_entities/hwm/hwm.py
Expand Up @@ -175,10 +175,6 @@ def deserialize(cls: type[HWMType], inp: dict) -> HWMType:
def update(self: HWMType, value: Any) -> HWMType:
"""Update current HWM value with some implementation-specific logic, and return HWM"""

@abstractmethod
def covers(self, value: ValueType) -> bool:
"""Return ``True`` if input value is already covered by HWM"""

def _check_new_value(self, value):
validated_dict, _, validation_error = validate_model(
self.__class__,
Expand Down
Empty file.
138 changes: 138 additions & 0 deletions etl_entities/hwm/key_value/key_value_hwm.py
@@ -0,0 +1,138 @@
# Copyright 2023 MTS (Mobile Telesystems)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Generic, TypeVar

from frozendict import frozendict
from pydantic import Field

from etl_entities.entity import GenericModel
from etl_entities.hwm.hwm import HWM

KeyValueHWMValueType = TypeVar("KeyValueHWMValueType")
KeyValueHWMType = TypeVar("KeyValueHWMType", bound="KeyValueHWM")


class KeyValueHWM(HWM[frozendict], Generic[KeyValueHWMValueType], GenericModel):
    """Base key value HWM type

    Parameters
    ----------
    name : ``str``
        HWM unique name

    value : ``frozendict[Any, KeyValueHWMValueType]``, default: ``frozendict``
        HWM value

    description : ``str``, default: ``""``
        Description of HWM

    source : Any, default: ``None``
        HWM source, e.g. ``topic`` name

    expression : Any, default: ``None``
        Expression used to generate HWM value, e.g. ``offset``

    modified_time : :obj:`datetime.datetime`, default: current datetime
        HWM value modification time
    """

    entity: str = Field(alias="topic")
    # value: frozendict with Any type for keys and KeyValueHWMValueType type for values.
    # Direct type specification for frozendict contents (e.g., frozendict[KeyType, ValueType])
    # is supported only from Python 3.9 onwards.
    value: frozendict = Field(default_factory=frozendict)

    def update(self, new_data: dict) -> "KeyValueHWM[KeyValueHWMValueType]":
        """
        Updates the HWM value based on provided new key-value data. This method only updates
        the value if the new value is greater than the current value for a given key
        or if the key does not exist in the current value.

        .. note::

            Changes the HWM value in place and returns the modified instance.

        Parameters
        ----------
        new_data : dict
            A dictionary representing new key-value data. For example: keys are partitions and values are offsets.

        Returns
        -------
        self : KeyValueHWM
            The instance with updated HWM value.

        Examples
        --------

        .. code:: python

            from frozendict import frozendict
            from etl_entities.hwm import KeyValueHWM

            hwm = KeyValueHWM(value={0: 100, 1: 120}, ...)

            hwm.update({1: 125, 2: 130})
            assert hwm.value == frozendict({0: 100, 1: 125, 2: 130})

            # The offset for partition 1 is not updated as 123 is less than 125
            hwm.update({1: 123})
            assert hwm.value == frozendict({0: 100, 1: 125, 2: 130})
        """

        modified = False
        temp_dict = dict(self.value)

        for partition, new_offset in new_data.items():
            current_offset = temp_dict.get(partition)

            if current_offset is None or new_offset > current_offset:
                temp_dict[partition] = new_offset
                modified = True

        # update the frozendict only if modifications were made.
        # this avoids unnecessary reassignment and creation of a new frozendict object.
        if modified:
            self.set_value(frozendict(temp_dict))

        return self

    def __eq__(self, other):
        """Checks equality of two HWM instances

        Parameters
        ----------
        other : :obj:`etl_entities.hwm.key_value.key_value_hwm.KeyValueHWM`
            Only another :obj:`etl_entities.hwm.key_value.key_value_hwm.KeyValueHWM` instance
            of the same class is compared, field by field, ignoring ``modified_time``.
            For any other :obj:`object`, ``NotImplemented`` is returned.

        Returns
        -------
        result : bool
            ``True`` if both inputs are the same, ``False`` otherwise.
        """

        if not isinstance(other, type(self)):
            return NotImplemented

        self_fields = self.dict(exclude={"modified_time"})
        other_fields = other.dict(exclude={"modified_time"})
        return self_fields == other_fields
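
The effect of excluding ``modified_time`` from the comparison can be illustrated with a small stand-in class. ``FakeHWM`` below is hypothetical and uses a standard-library dataclass instead of the pydantic model from this commit; it only mirrors the comparison logic shown above.

```python
from dataclasses import asdict, dataclass, field
from datetime import datetime, timedelta


@dataclass(eq=False)  # eq=False: the comparison is defined by hand below
class FakeHWM:
    name: str
    value: dict
    modified_time: datetime = field(default_factory=datetime.now)

    def _comparable(self) -> dict:
        # Every field except the modification timestamp takes part in equality
        fields = asdict(self)
        fields.pop("modified_time")
        return fields

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return NotImplemented
        return self._comparable() == other._comparable()


now = datetime.now()
first = FakeHWM(name="topic_hwm", value={0: 100}, modified_time=now)
second = FakeHWM(name="topic_hwm", value={0: 100}, modified_time=now + timedelta(hours=1))
third = FakeHWM(name="topic_hwm", value={0: 101}, modified_time=now)

assert first == second       # timestamps differ, everything else matches
assert first != third        # values differ
assert first != {0: 100}     # a plain dict is a different type
```

Returning ``NotImplemented`` for foreign types lets Python try the reflected comparison on the other operand before falling back to an identity check.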
