diff --git a/etl_entities/hwm/key_value/key_value_hwm.py b/etl_entities/hwm/key_value/key_value_hwm.py index 50204fd..cefda8b 100644 --- a/etl_entities/hwm/key_value/key_value_hwm.py +++ b/etl_entities/hwm/key_value/key_value_hwm.py @@ -11,10 +11,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import Generic, TypeVar +from typing import Generic, Optional, TypeVar from frozendict import frozendict -from pydantic import Field +from pydantic import Field, validator from etl_entities.entity import GenericModel from etl_entities.hwm.hwm import HWM @@ -53,7 +53,7 @@ class KeyValueHWM(HWM[frozendict], Generic[KeyValueHWMValueType], GenericModel): HWM value modification time """ - entity: str = Field(alias="topic") + entity: Optional[str] = Field(default=None, alias="topic") # value: frozendict with Any type for keys and KeyValueHWMValueType type for values. # Direct type specification for frozendict contents (e.g., frozendict[KeyType, ValueType]) # is supported only from Python 3.9 onwards. @@ -136,3 +136,9 @@ def __eq__(self, other): self_fields = self.dict(exclude={"modified_time"}) other_fields = other.dict(exclude={"modified_time"}) return self_fields == other_fields + + @validator("value", pre=True, always=True) + def _convert_dict_to_frozendict(cls, v): # noqa: N805 + if isinstance(v, dict): + return frozendict(v) + return v diff --git a/etl_entities/hwm/key_value/key_value_int_hwm.py b/etl_entities/hwm/key_value/key_value_int_hwm.py index 32f9b19..1c9ee6a 100644 --- a/etl_entities/hwm/key_value/key_value_int_hwm.py +++ b/etl_entities/hwm/key_value/key_value_int_hwm.py @@ -15,7 +15,7 @@ from __future__ import annotations from frozendict import frozendict -from pydantic import Field +from pydantic import Field, validator from etl_entities.hwm.hwm_type_registry import register_hwm_type from etl_entities.hwm.key_value.key_value_hwm import KeyValueHWM @@ -70,3 +70,16 @@ class KeyValueIntHWM(KeyValueHWM[int]): """ value: frozendict = Field(default_factory=frozendict) + + @validator("value", pre=True) + def _validate_int_values(cls, key_value): # noqa: N805 + if key_value is None: + return key_value + new_key_value = {} + for key, value in key_value.items(): + if not isinstance(value, (int, str)): + raise ValueError + else: + new_key_value[key] = int(value) + + return frozendict(new_key_value) diff --git a/tests/test_hwm/test_key_value_int_hwm.py b/tests/test_hwm/test_key_value_int_hwm.py index 42e4bc7..7d375ac 100644 --- a/tests/test_hwm/test_key_value_int_hwm.py +++ b/tests/test_hwm/test_key_value_int_hwm.py @@ -1,3 +1,4 @@ +import secrets from datetime import datetime, timedelta import pytest @@ -7,17 +8,226 @@ @pytest.mark.parametrize( - "name, entity, expression, value, expected_value", + "expression, value, expected_value", [ - ("hwm_name", "topic_name", "offset", frozendict({0: 100, 1: 200}), frozendict({0: 100, 1: 200})), - ("hwm_name", "topic_name", None, frozendict(), frozendict()), + ("offset", {0: 100, 1: 200}, frozendict({0: 100, 1: 200})), + ("offset", {0: "100", 1: "200"}, frozendict({0: 100, 1: 200})), + ("offset", {"key1": 100, "key2": 200}, frozendict({"key1": 100, "key2": 200})), + ("offset", {"key1": "100", "key2": "200"}, frozendict({"key1": 100, "key2": 200})), + (None, {}, frozendict()), ], ) -def test_key_value_int_hwm_initialization(name, entity, expression, value, expected_value): - modified_time = datetime.now() - timedelta(days=1) - hwm = KeyValueIntHWM(name=name, entity=entity, expression=expression, value=value, modified_time=modified_time) +def test_key_value_int_hwm_valid_input(expression, value, expected_value): + name = "key_value_int_hwm_name" + entity = "topic_name" + modified_time = datetime.now() - timedelta(days=5) + + empty_hwm = KeyValueIntHWM(name=name) + assert empty_hwm.name == name + assert empty_hwm.value == frozendict() + + minimal_hwm = KeyValueIntHWM(name=name, value=value) + assert minimal_hwm.name == name + assert minimal_hwm.value == expected_value + + hwm_with_duplicates = KeyValueIntHWM(name=name, value={**value, **value}) + assert hwm_with_duplicates.name == name + assert hwm_with_duplicates.value == expected_value + + hwm = KeyValueIntHWM( + name=name, + value=value, + description="my hwm", + entity=entity, + expression="something", + modified_time=modified_time, + ) assert hwm.name == name - assert hwm.entity == entity - assert hwm.expression == expression assert hwm.value == expected_value + assert hwm.description == "my hwm" + assert hwm.entity == entity + assert hwm.expression == "something" assert hwm.modified_time == modified_time + + +@pytest.mark.parametrize( + "invalid_value", + [ + {1: 1.5}, + {1: "offset_value"}, + {"partition": "offset_value"}, + {1: None}, + None, + ], +) +def test_key_value_int_hwm_wrong_input(invalid_value): + valid_value = {0: 100, 1: 123} + name = "key_value_int_hwm_name" + + with pytest.raises(ValueError): + # missing name + KeyValueIntHWM() + + with pytest.raises(ValueError): + # missing name + KeyValueIntHWM(value=valid_value) + + with pytest.raises(ValueError): + # invalid_dict + KeyValueIntHWM(name=name, value=invalid_value) + + with pytest.raises(ValueError): + # extra fields not allowed + KeyValueIntHWM(name=name, unknown="unknown") + + +def test_key_value_int_hwm_set_value(): + name = "key_value_int_hwm_name" + value1 = {0: 10} + value2 = {0: 10, 1: 20} + value3 = {0: 10, 1: 20, 2: 30} + value = {0: 100, 1: 123} + + hwm = KeyValueIntHWM(name=name) + + hwm1 = hwm.copy() + hwm1.set_value(value) + assert hwm1.value == frozendict(value) + assert hwm1.modified_time > hwm.modified_time + + hwm2 = hwm.copy() + hwm2.set_value(value1) + assert hwm2.value == frozendict(value1) + assert hwm2.modified_time > hwm.modified_time + + hwm3 = hwm.copy() + hwm3.set_value(value2) + assert hwm3.value == frozendict(value2) + assert hwm3.modified_time > hwm.modified_time + + hwm4 = hwm.copy() + hwm4.set_value(value3) + assert hwm4.value == frozendict(value3) + assert hwm4.modified_time > hwm.modified_time + + with pytest.raises(ValueError): + # invalid dict + hwm.set_value({1: None}) + + +def test_key_value_int_hwm_frozen(): + value1 = {0: 10} + value2 = {0: 10, 1: 20} + value3 = {0: 10, 1: 20, 2: 30} + value = {0: 100, 1: 123} + name = "key_value_int_hwm_name" + modified_time = datetime.now() - timedelta(days=5) + + hwm = KeyValueIntHWM(name=name) + + for attr in ("value", "entity", "expression", "description", "modified_time"): + for item in (1, "abc", None, value1, value2, value3, value, modified_time): + with pytest.raises(TypeError): + setattr(hwm, attr, item) + + +def test_key_value_int_hwm_compare(): + name1 = secrets.token_hex() + name2 = secrets.token_hex() + entity = "topic_name" + + value1 = {0: 10} + value2 = {0: 10, 1: 20} + + hwm1 = KeyValueIntHWM(name=name1, value=value1) + hwm2 = KeyValueIntHWM(name=name2, value=value1) + hwm3 = KeyValueIntHWM(name=name1, value=value2) + hwm4 = KeyValueIntHWM(name=name2, value=value2) + + hwm5 = KeyValueIntHWM(name=name1, entity=entity) + hwm6 = KeyValueIntHWM(name=name1, entity=entity + entity) + + hwm7 = KeyValueIntHWM(name=name1, description="abc") + hwm8 = KeyValueIntHWM(name=name1, description="bcd") + + hwm9 = KeyValueIntHWM(name=name1, expression="abc") + hwm10 = KeyValueIntHWM(name=name1, expression="bcd") + + modified_time = datetime.now() - timedelta(days=5) + hwm_with_different_mtime = KeyValueIntHWM(name=name1, value=value1, modified_time=modified_time) + + # modified time is ignored when comparing + assert hwm1 == hwm_with_different_mtime + + items = (hwm1, hwm2, hwm3, hwm4, hwm5, hwm6, hwm7, hwm8, hwm9, hwm10) + + # items with different attribute values (except modified_time) are not equal + for item1 in items: + for item2 in items: + if item1 is not item2: + assert item1 != item2 + + # this was true until 2.1.x, but not anymore + for item in items: + assert item != item.value + + +def test_key_value_int_hwm_update(): + hwm = KeyValueIntHWM(name="test_hwm", value=frozendict({1: 100, 2: 150})) + original_modified_time = hwm.modified_time + + # update with new key, higher value, and lower value + hwm.update({3: 200, 2: 180, 1: 50}) + assert hwm.value == frozendict({1: 100, 2: 180, 3: 200}) + assert hwm.modified_time > original_modified_time + + # update with equal value and multiple keys + hwm.update({1: 100, 4: 300, 5: 400}) + assert hwm.value == frozendict({1: 100, 2: 180, 3: 200, 4: 300, 5: 400}) + + hwm.update({6: 500, 2: 175}) + assert hwm.value == frozendict({1: 100, 2: 180, 3: 200, 4: 300, 5: 400, 6: 500}) + + +def test_key_value_int_hwm_serialization(): + name = "key_value_int_hwm_name" + modified_time = datetime.now() - timedelta(days=5) + value = {"0": 100, "1": 123} + + hwm1 = KeyValueIntHWM( + name=name, + value=value, + entity="topic_name", + expression="some", + description="some description", + modified_time=modified_time, + ) + + expected1 = { + "type": "key_value_int", + "name": name, + "value": value, + "entity": "topic_name", + "expression": "some", + "description": "some description", + "modified_time": modified_time.isoformat(), + } + + serialized1 = hwm1.serialize() + assert expected1 == serialized1 + assert KeyValueIntHWM.deserialize(serialized1) == hwm1 + + hwm2 = KeyValueIntHWM(name=name, modified_time=modified_time) + expected2 = { + "type": "key_value_int", + "name": name, + "value": {}, + "entity": None, + "expression": None, + "description": "", + "modified_time": modified_time.isoformat(), + } + + serialized2 = hwm2.serialize() + assert serialized2 == expected2 + assert KeyValueIntHWM.deserialize(serialized2) == hwm2