diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..adafa4a --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,10 @@ +# Set update schedule for GitHub Actions + +version: 2 +updates: +- package-ecosystem: github-actions + directory: / + schedule: + interval: daily + labels: + - type:ci diff --git a/.github/workflows/changelog.yml b/.github/workflows/changelog.yml index f863a17..69c227c 100644 --- a/.github/workflows/changelog.yml +++ b/.github/workflows/changelog.yml @@ -7,7 +7,7 @@ on: - master env: - DEFAULT_PYTHON: '3.11' + DEFAULT_PYTHON: '3.12' permissions: contents: read @@ -20,13 +20,13 @@ jobs: if: "!contains(github.event.pull_request.labels.*.name, 'ci:skip-changelog') && github.event.pull_request.user.login != 'pre-commit-ci[bot]' && github.event.pull_request.user.login != 'dependabot[bot]'" steps: - name: Checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: fetch-depth: 0 - name: Set up Python ${{ env.DEFAULT_PYTHON }} id: python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ env.DEFAULT_PYTHON }} diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 24dc3a7..2de66f7 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -13,6 +13,10 @@ concurrency: group: ${{ github.workflow }}-${{ github.head_ref || github.ref }} cancel-in-progress: true +env: + # flake8-commas is failing on Python 3.12 + DEFAULT_PYTHON: '3.11' + jobs: linters: name: Linters @@ -24,21 +28,21 @@ jobs: steps: - name: Checkout repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 - - name: Set up Python 3.11 - uses: actions/setup-python@v4 + - name: Set up Python ${{ env.DEFAULT_PYTHON }} + uses: actions/setup-python@v5 with: - python-version: '3.11' + python-version: ${{ env.DEFAULT_PYTHON }} - name: Cache pip uses: actions/cache@v3 with: path: ~/.cache/pip - key: ${{ runner.os }}-python-3.11-codeql-${{ hashFiles('requirements*.txt') }} + key: ${{ runner.os }}-python-${{ env.DEFAULT_PYTHON }}-codeql-${{ hashFiles('requirements*.txt') }} restore-keys: | - ${{ runner.os }}-python-3.11-codeql-${{ hashFiles('requirements*.txt') }} - ${{ runner.os }}-python-3.11-codeql- + ${{ runner.os }}-python-${{ env.DEFAULT_PYTHON }}-codeql-${{ hashFiles('requirements*.txt') }} + ${{ runner.os }}-python-${{ env.DEFAULT_PYTHON }}-codeql- ${{ runner.os }}-python ${{ runner.os }}- @@ -69,20 +73,20 @@ jobs: steps: - name: Checkout repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 - - name: Set up Python 3.11 - uses: actions/setup-python@v4 + - name: Set up Python ${{ env.DEFAULT_PYTHON }} + uses: actions/setup-python@v5 with: - python-version: '3.11' + python-version: ${{ env.DEFAULT_PYTHON }} # Initializes the CodeQL tools for scanning. - name: Initialize CodeQL - uses: github/codeql-action/init@v2 + uses: github/codeql-action/init@v3 with: languages: python - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v2 + uses: github/codeql-action/analyze@v3 with: category: /language:python diff --git a/.github/workflows/dev-release.yml b/.github/workflows/dev-release.yml index bfa98d1..922a54b 100644 --- a/.github/workflows/dev-release.yml +++ b/.github/workflows/dev-release.yml @@ -9,7 +9,7 @@ on: workflow_dispatch: env: - DEFAULT_PYTHON: '3.11' + DEFAULT_PYTHON: '3.12' concurrency: group: ${{ github.workflow }}-${{ github.head_ref || github.ref }} @@ -29,13 +29,13 @@ jobs: steps: - name: Checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: fetch-depth: 0 - name: Set up Python ${{ env.DEFAULT_PYTHON }} id: python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ env.DEFAULT_PYTHON }} diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 126c5be..0d828cb 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -6,7 +6,7 @@ on: - '[0-9]+.[0-9]+.[0-9]+' env: - DEFAULT_PYTHON: '3.11' + DEFAULT_PYTHON: '3.12' jobs: release: @@ -23,13 +23,13 @@ jobs: steps: - name: Checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: fetch-depth: 0 - name: Set up Python ${{ env.DEFAULT_PYTHON }} id: python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ env.DEFAULT_PYTHON }} @@ -59,7 +59,7 @@ jobs: run: | cat docs/changelog/${{ github.ref_name }}.rst > changelog.rst - - name: Fix Github links + - name: Prepare rST syntax for conversion to Markdown run: | # Replace Github links from Sphinx syntax with Markdown sed -i -E 's/:github:issue:`(.*)`/#\1/g' changelog.rst @@ -77,7 +77,7 @@ jobs: --wrap=none changelog.rst - - name: Fix Github code blocks + - name: Fixing Markdown syntax after conversion run: | # Replace ``` {.python caption="abc"} with ```python caption="abc" sed -i -E 's/``` \{\.(.*)\}/```\1/g' changelog.md @@ -85,6 +85,9 @@ jobs: # Replace ``` python with ```python sed -i -E 's/``` (\w+)/```\1/g' changelog.md + # Replace \# with # + sed -i -E 's/\\#/#/g' changelog.md + - name: Get release name id: release-name run: | diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a4272f7..793c7c4 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -12,6 +12,9 @@ concurrency: group: ${{ github.workflow }}-${{ github.head_ref || github.ref }} cancel-in-progress: true +env: + DEFAULT_PYTHON: '3.12' + jobs: tests: name: Run tests (${{ matrix.python-version }} on ${{ matrix.os }}) @@ -24,12 +27,12 @@ jobs: steps: - name: Checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: fetch-depth: 0 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} @@ -62,7 +65,7 @@ jobs: ./run_tests.sh - name: Upload coverage results - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: coverage-${{ matrix.python-version }}-os-${{ matrix.os }} path: reports/* @@ -74,21 +77,21 @@ jobs: steps: - name: Checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 - - name: Set up Python 3.11 - uses: actions/setup-python@v4 + - name: Set up Python ${{ env.DEFAULT_PYTHON }} + uses: actions/setup-python@v5 with: - python-version: '3.11' + python-version: ${{ env.DEFAULT_PYTHON }} - name: Cache pip uses: actions/cache@v3 with: path: ~/.cache/pip - key: ${{ runner.os }}-python-3.11-coverage-${{ hashFiles('requirements*.txt') }} + key: ${{ runner.os }}-python-${{ env.DEFAULT_PYTHON }}-coverage-${{ hashFiles('requirements*.txt') }} restore-keys: | - ${{ runner.os }}-python-3.11-coverage-${{ hashFiles('requirements*.txt') }} - ${{ runner.os }}-python-3.11-coverage- + ${{ runner.os }}-python-${{ env.DEFAULT_PYTHON }}-coverage-${{ hashFiles('requirements*.txt') }} + ${{ runner.os }}-python-${{ env.DEFAULT_PYTHON }}-coverage- ${{ runner.os }}-python ${{ runner.os }}- @@ -99,7 +102,7 @@ jobs: run: pip install -I -r requirements-test.txt - name: Download all coverage reports - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 with: path: reports diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f71a3a2..ea816a4 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -33,12 +33,12 @@ repos: - id: codespell args: [-w] - repo: https://github.com/macisamuele/language-formatters-pre-commit-hooks - rev: v2.11.0 + rev: v2.12.0 hooks: - id: pretty-format-yaml args: [--autofix, --indent, '2'] - repo: https://github.com/pycqa/isort - rev: 5.12.0 + rev: 5.13.2 hooks: - id: isort files: etl_entities/.* @@ -59,7 +59,7 @@ repos: - id: pyupgrade args: [--py37-plus, --keep-runtime-typing] - repo: https://github.com/psf/black-pre-commit-mirror - rev: 23.11.0 + rev: 23.12.1 hooks: - id: black language_version: python3 diff --git a/.readthedocs.yml b/.readthedocs.yml index a61c65b..586d275 100644 --- a/.readthedocs.yml +++ b/.readthedocs.yml @@ -3,7 +3,7 @@ version: 2 build: os: ubuntu-22.04 tools: - python: '3.11' + python: '3.12' python: install: diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index 9c44726..1cd3744 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -204,3 +204,44 @@ How to skip change notes check? ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Just add ``ci:skip-changelog`` label to pull request. + +Release Process +^^^^^^^^^^^^^^^ + +Before making a release from the ``develop`` branch, follow these steps: + +1. Backup ``NEXT_RELEASE.rst`` + +.. code:: bash + + cp docs/changelog/NEXT_RELEASE.rst docs/changelog/temp_NEXT_RELEASE.rst + +2. Build the Release notes with Towncrier + +.. code:: bash + + export VERSION=$(cat etl_entities/VERSION) + towncrier build --version=${VERSION} + +3. Update Changelog + +.. code:: bash + + mv docs/changelog/NEXT_RELEASE.rst docs/changelog/${VERSION}.rst + +4. Edit the ``${VERSION}.rst`` file +Remove content above the version number heading in the ``${VERSION}.rst`` file. + +5. Update Changelog Index + +.. code:: bash + + awk -v version=${VERSION} '/NEXT_RELEASE/{print;print " " version;next}1' docs/changelog/index.rst > temp && mv temp docs/changelog/index.rst + +6. Reset ``NEXT_RELEASE.rst`` file + +.. code:: bash + + mv docs/changelog/temp_NEXT_RELEASE.rst docs/changelog/NEXT_RELEASE.rst + +7. Update the patch version in the ``VERSION`` file of ``develop`` branch **after release**. diff --git a/README.rst b/README.rst index 37aabcd..60e0428 100644 --- a/README.rst +++ b/README.rst @@ -1,7 +1,8 @@ .. title -ETL Entities lib -================ +ETL Entities +============ + |Repo Status| |PyPI| |PyPI License| |PyPI Python Version| |Documentation| |Build Status| |Coverage| @@ -23,13 +24,16 @@ ETL Entities lib What is ETL Entities? ----------------------- -Collection of classes used for handling High Water Mark (HWM) and gathering Lineage graph. +Collection of classes & decorators used for handling High Water Mark (HWM). Currently implemented: * ``ColumnIntHWM`` * ``ColumnDateHWM`` * ``ColumnDateTimeHWM`` * ``FileListHWM`` + * ``KeyValueIntHWM`` + * ``MemoryHWMStore`` + * ``BaseHWMStore`` (interface for third-party HWM store implementations) .. installation diff --git a/docs/changelog/2.2.0.rst b/docs/changelog/2.2.0.rst new file mode 100644 index 0000000..f0d7e2a --- /dev/null +++ b/docs/changelog/2.2.0.rst @@ -0,0 +1,16 @@ +2.2.0 (2024-01-10) +================== + +Breaking Changes +---------------- + +- Remove ``cover`` method from base ``HWM`` class, it only remains in ``FileHWM`` class (:github:pull:`71`) + + +Features +-------- + +- Improve typing: + * Fix Pylance (VS Code) complained ``"SomeClass" is not exported from module "etl_entities.module". Import from "etl_entities.module.submodule" instead``. + * Mark old HWM classes with ``typing_extensions.deprecated`` decorator (:github:pull:`69`) +- Add ``KeyValueIntHWM`` class, designed to manage HWM for partitioned data sources like Kafka topics. It extends the functionality of the base HWM classes to handle key-value pairs. (:github:pull:`71`) diff --git a/docs/changelog/index.rst b/docs/changelog/index.rst index fbd01e7..0b0a6db 100644 --- a/docs/changelog/index.rst +++ b/docs/changelog/index.rst @@ -4,6 +4,7 @@ DRAFT NEXT_RELEASE + 2.2.0 2.1.2 2.1.1 2.1.0 diff --git a/docs/hwm/column/date_hwm.rst b/docs/hwm/column/date_hwm.rst index 18aed30..5eb6e43 100644 --- a/docs/hwm/column/date_hwm.rst +++ b/docs/hwm/column/date_hwm.rst @@ -4,5 +4,5 @@ Column Date HWM .. currentmodule:: etl_entities.hwm.column.date_hwm .. autoclass:: ColumnDateHWM - :members: name, set_value, dict, json, copy, deserialize, covers + :members: name, set_value, dict, json, copy, deserialize :special-members: __bool__, __add__, __sub__, __eq__, __lt__ diff --git a/docs/hwm/column/datetime_hwm.rst b/docs/hwm/column/datetime_hwm.rst index 9f8e1a4..d136b32 100644 --- a/docs/hwm/column/datetime_hwm.rst +++ b/docs/hwm/column/datetime_hwm.rst @@ -4,5 +4,5 @@ Column Datetime HWM .. currentmodule:: etl_entities.hwm.column.datetime_hwm .. autoclass:: ColumnDateTimeHWM - :members: name, set_value, dict, json, copy, deserialize, covers + :members: name, set_value, dict, json, copy, deserialize :special-members: __bool__, __add__, __sub__, __eq__, __lt__ diff --git a/docs/hwm/column/index.rst b/docs/hwm/column/index.rst index 004e89c..160fe8e 100644 --- a/docs/hwm/column/index.rst +++ b/docs/hwm/column/index.rst @@ -12,8 +12,8 @@ Column HWM date_hwm datetime_hwm -What is HWM? -------------- +What is Column HWM? +------------------- Sometimes it's necessary to read only changed rows from a table. diff --git a/docs/hwm/column/int_hwm.rst b/docs/hwm/column/int_hwm.rst index 0a2defd..8f80a14 100644 --- a/docs/hwm/column/int_hwm.rst +++ b/docs/hwm/column/int_hwm.rst @@ -4,5 +4,5 @@ Column Integer HWM .. currentmodule:: etl_entities.hwm.column.int_hwm .. autoclass:: ColumnIntHWM - :members: name, set_value, dict, json, copy, deserialize, covers + :members: name, set_value, dict, json, copy, deserialize :special-members: __bool__, __add__, __sub__, __eq__, __lt__ diff --git a/docs/hwm/file/index.rst b/docs/hwm/file/index.rst index 8d6fce3..36e6c64 100644 --- a/docs/hwm/file/index.rst +++ b/docs/hwm/file/index.rst @@ -10,8 +10,8 @@ File HWM file_list_hwm -What is HWM? -------------- +What is File HWM? +----------------- Sometimes it's necessary to read/download only new files from a source folder. diff --git a/docs/hwm/index.rst b/docs/hwm/index.rst index 65f7d59..35d0e3e 100644 --- a/docs/hwm/index.rst +++ b/docs/hwm/index.rst @@ -14,3 +14,9 @@ HWM :caption: File HWM file/index + +.. toctree:: + :maxdepth: 2 + :caption: KeyValue HWM + + key_value/index diff --git a/docs/hwm/key_value/index.rst b/docs/hwm/key_value/index.rst new file mode 100644 index 0000000..8db2d78 --- /dev/null +++ b/docs/hwm/key_value/index.rst @@ -0,0 +1,76 @@ +.. _key_value_hwm_classes: + +KeyValue HWM +============ + +.. toctree:: + :maxdepth: 2 + :caption: HWM classes + :name: key_value_hwm_classes + + key_value_int_hwm + +What is KeyValue HWM? +--------------------- + +The KeyValue High Water Mark (HWM) is a specialized class designed to manage and track incremental data changes in systems where data is stored or represented as key-value pairs, such as in message queues like Kafka. + +Use Case +-------- + +The ``KeyValueHWM`` class is particularly beneficial in scenarios where there is a need to `incrementally `_ upload data in an ETL process. + +For instance, in typical ETL processes using `Spark with Kafka `_, data re-written entirely from all partitions in topics starting from **zero** offset. This approach can be inefficient, time-consuming and create duplicates in target. By leveraging the ``KeyValueIntHWM`` class, it becomes possible to track the last offset of data processed. This enables the ETL process to read data appended to topic since previous run instead of reading the entire topic content each time. + +Example Usage with Kafka Messages +--------------------------------- + +Consider a Kafka topic with several partitions, each having its own offset indicating the latest message. + +Initial Kafka Topic State: + +.. code:: bash + + Partition 0: Offset 100 + Partition 1: Offset 150 + Partition 2: Offset 200 + +When a new batch of messages arrives, the offsets in the Kafka partitions are updated: + +.. code:: bash + + Partition 0: Offset 110 # 10 new messages + Partition 1: Offset 155 # 5 new messages + Partition 2: Offset 200 # No new messages + +Using the ``KeyValueIntHWM`` class, we can track these offsets: + +.. code:: python + + from etl_entities.hwm import KeyValueIntHWM + + initial_offsets = { + 0: 100, # Partition 0 offset 100 + 1: 150, # Partition 1 offset 150 + 2: 200, # Partition 2 offset 200 + } + + # Creating an instance of KeyValueIntHWM with initial offsets + hwm = KeyValueIntHWM(value=initial_offsets, ...) + + # Running some ETL process, which updates the HWM value after finish + run_etl_process(hwm, new_batch_data) + + # HWM values after running the ETL process + assert hwm.value == {0: 110, 1: 155, 2: 200} + + +This approach ensures that only new messages (i.e., those after the last recorded offset in each partition) are considered in the next ETL process. For Partition 0 and Partition 1, the new offsets (110 and 155, respectively) are stored in the HWM, while Partition 2 remains unchanged as there were no new messages. + + +Restrictions +------------ + +- **Non-Decreasing Values**: The ``KeyValueHWM`` class is designed to handle only non-decreasing values. During the update process, if the new offset provided for a given partition is less than the current offset, the value will not be updated. + +- **Incomplete Key Updates**: If a key is not included in new hwm value, its value remains unchanged. This is essential because keys in systems like Kafka (partitions) cannot be deleted, and their last known position is left intact. diff --git a/docs/hwm/key_value/key_value_int_hwm.rst b/docs/hwm/key_value/key_value_int_hwm.rst new file mode 100644 index 0000000..89ada05 --- /dev/null +++ b/docs/hwm/key_value/key_value_int_hwm.rst @@ -0,0 +1,7 @@ +KeyValue Int HWM +================ + +.. currentmodule:: etl_entities.hwm.key_value.key_value_int_hwm + +.. autoclass:: KeyValueIntHWM + :members: name, set_value, dict, json, copy, update diff --git a/etl_entities/VERSION b/etl_entities/VERSION index eca07e4..ccbccc3 100644 --- a/etl_entities/VERSION +++ b/etl_entities/VERSION @@ -1 +1 @@ -2.1.2 +2.2.0 diff --git a/etl_entities/__init__.py b/etl_entities/__init__.py index e8e61a7..af74b16 100644 --- a/etl_entities/__init__.py +++ b/etl_entities/__init__.py @@ -16,6 +16,8 @@ from etl_entities.plugins import import_plugins from etl_entities.version import __version__ +__all__ = ["__version__"] + def plugins_auto_import(): """ diff --git a/etl_entities/hwm/__init__.py b/etl_entities/hwm/__init__.py index a3fcb2e..53c1289 100644 --- a/etl_entities/hwm/__init__.py +++ b/etl_entities/hwm/__init__.py @@ -20,3 +20,19 @@ from etl_entities.hwm.file.file_list_hwm import FileListHWM from etl_entities.hwm.hwm import HWM from etl_entities.hwm.hwm_type_registry import HWMTypeRegistry, register_hwm_type +from etl_entities.hwm.key_value.key_value_hwm import KeyValueHWM +from etl_entities.hwm.key_value.key_value_int_hwm import KeyValueIntHWM + +__all__ = [ + "HWM", + "ColumnHWM", + "ColumnDateHWM", + "ColumnDateTimeHWM", + "ColumnIntHWM", + "FileHWM", + "FileListHWM", + "KeyValueHWM", + "KeyValueIntHWM", + "HWMTypeRegistry", + "register_hwm_type", +] diff --git a/etl_entities/hwm/column/column_hwm.py b/etl_entities/hwm/column/column_hwm.py index 86e7e77..c9ddf21 100644 --- a/etl_entities/hwm/column/column_hwm.py +++ b/etl_entities/hwm/column/column_hwm.py @@ -59,35 +59,6 @@ class ColumnHWM(HWM[Optional[ColumnValueType]], Generic[ColumnValueType], Generi entity: Optional[str] = Field(default=None, alias="source") value: Optional[ColumnValueType] = None - def covers(self, value: Optional[ColumnValueType]) -> bool: - """Return ``True`` if input value is already covered by HWM - - Examples - ---------- - - .. code:: python - - hwm = ColumnIntHWM(name="somename", value=1) - - assert hwm.covers(0) # 0 <= 1 - assert hwm.covers(1) # 1 <= 1 - assert hwm.covers(0.5) # 0.5 <= 1 - assert not hwm.covers(2) # 2 > 1 - - empty_hwm = ColumnIntHWM(name="somename") - - # None does not cover anything - assert not empty_hwm.covers(0) - assert not empty_hwm.covers(1) - assert not empty_hwm.covers(0.5) - assert not empty_hwm.covers(2) - """ - - if self.value is None: - return False - - return self._check_new_value(value) <= self.value - def __add__(self: ColumnHWMType, value: ColumnValueType) -> ColumnHWMType: """Increase HWM value and return copy of HWM diff --git a/etl_entities/hwm/file/file_hwm.py b/etl_entities/hwm/file/file_hwm.py index 55e27ad..c840e41 100644 --- a/etl_entities/hwm/file/file_hwm.py +++ b/etl_entities/hwm/file/file_hwm.py @@ -15,6 +15,7 @@ from __future__ import annotations import os +from abc import abstractmethod from typing import Generic, Optional, TypeVar from pydantic import Field, validator @@ -67,6 +68,10 @@ class FileHWM( class Config: # noqa: WPS431 json_encoders = {AbsolutePath: os.fspath} + @abstractmethod + def covers(self, value: FileHWMValueType) -> bool: + """Return ``True`` if input value is already covered by HWM""" + def __eq__(self, other): """Checks equality of two FileHWM instances diff --git a/etl_entities/hwm/hwm.py b/etl_entities/hwm/hwm.py index 28fa983..a9affca 100644 --- a/etl_entities/hwm/hwm.py +++ b/etl_entities/hwm/hwm.py @@ -175,10 +175,6 @@ def deserialize(cls: type[HWMType], inp: dict) -> HWMType: def update(self: HWMType, value: Any) -> HWMType: """Update current HWM value with some implementation-specific logic, and return HWM""" - @abstractmethod - def covers(self, value: ValueType) -> bool: - """Return ``True`` if input value is already covered by HWM""" - def _check_new_value(self, value): validated_dict, _, validation_error = validate_model( self.__class__, diff --git a/etl_entities/hwm/hwm_type_registry.py b/etl_entities/hwm/hwm_type_registry.py index 1a8d5a4..590a6e1 100644 --- a/etl_entities/hwm/hwm_type_registry.py +++ b/etl_entities/hwm/hwm_type_registry.py @@ -176,7 +176,7 @@ class MyHWM(HWM): """ - def wrapper(klass: type[HWM]): + def wrapper(klass): HWMTypeRegistry.add(type_name, klass) return klass diff --git a/etl_entities/hwm/key_value/__init__.py b/etl_entities/hwm/key_value/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/etl_entities/hwm/key_value/key_value_hwm.py b/etl_entities/hwm/key_value/key_value_hwm.py new file mode 100644 index 0000000..b54e993 --- /dev/null +++ b/etl_entities/hwm/key_value/key_value_hwm.py @@ -0,0 +1,146 @@ +# Copyright 2023 MTS (Mobile Telesystems) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +from typing import Generic, Optional, TypeVar + +from frozendict import frozendict +from pydantic import Field, validator + +from etl_entities.entity import GenericModel +from etl_entities.hwm.hwm import HWM + +KeyValueHWMValueType = TypeVar("KeyValueHWMValueType") +KeyValueHWMType = TypeVar("KeyValueHWMType", bound="KeyValueHWM") + + +class KeyValueHWM(HWM[frozendict], Generic[KeyValueHWMValueType], GenericModel): + """Base key value HWM type + + Parameters + ---------- + name : ``str`` + + HWM unique name + + value : ``frozendict[Any, KeyValueHWMValueType]`` , default: ``frozendict`` + + HWM value + + description : ``str``, default: ``""`` + + Description of HWM + + source : Any, default: ``None`` + + HWM source, e.g. ``topic`` name + + expression : Any, default: ``None`` + + Expression used to generate HWM value, e.g. ``offset`` + + modified_time : :obj:`datetime.datetime`, default: current datetime + + HWM value modification time + """ + + entity: Optional[str] = Field(default=None, alias="topic") + # value: frozendict with Any type for keys and KeyValueHWMValueType type for values. + # Direct type specification for frozendict contents (e.g., frozendict[KeyType, ValueType]) + # is supported only from Python 3.9 onwards. + value: frozendict = Field(default_factory=frozendict) + + def update(self: KeyValueHWMType, new_data: dict) -> KeyValueHWMType: + """ + Updates the HWM value based on provided new key-value data. This method only updates + the value if the new value is greater than the current valur for a given key + or if the key does not exist in the current value. + + .. note:: + Changes the HWM value in place and returns the modified instance. + + Parameters + ---------- + new_data : dict + A dictionary representing new key-value data. For example: keys are partitions and values are offsets. + + Returns + ------- + self : KeyValueHWM + The instance with updated HWM value. + + Examples + -------- + + .. code:: python + + from frozendict import frozendict + from etl_entities.hwm import KeyValueHWM + + hwm = KeyValueHWM(value={0: 100, 1: 120}, ...) + + hwm.update({1: 125, 2: 130}) + assert hwm.value == frozendict({0: 100, 1: 125, 2: 130}) + + # The offset for partition 1 is not updated as 123 is less than 125 + hwm.update({1: 123}) + assert hwm.value == frozendict({0: 100, 1: 125, 2: 130}) + """ + + modified = False + temp_dict = dict(self.value) + + for partition, new_offset in new_data.items(): + current_offset = temp_dict.get(partition) + if current_offset is None or new_offset > current_offset: + temp_dict[partition] = new_offset + modified = True + + # update the frozendict only if modifications were made. + # this avoids unnecessary reassignment and creation of a new frozendict object, + if modified: + self.set_value(frozendict(temp_dict)) + + return self + + def __eq__(self, other): + """Checks equality of two HWM instances + + Params + ------- + other : :obj:`etl_entities.hwm.key_value.key_value_hwm.KeyValueHWM` + + You can compare two :obj:`etl_entities.hwm.key_value.key_value_hwm.KeyValueHWM` instances, + obj:`etl_entities.hwm.key_value.key_value_hwm.KeyValueHWM` with an :obj:`object`, + if its value is comparable with the ``value`` attribute of HWM + + Returns + -------- + result : bool + + ``True`` if both inputs are the same, ``False`` otherwise. + """ + + if not isinstance(other, type(self)): + return NotImplemented + + self_fields = self.dict(exclude={"modified_time"}) + other_fields = other.dict(exclude={"modified_time"}) + return self_fields == other_fields + + @validator("value", pre=True, always=True) + def _convert_dict_to_frozendict(cls, v): # noqa: N805 + if isinstance(v, dict): + return frozendict(v) + return v diff --git a/etl_entities/hwm/key_value/key_value_int_hwm.py b/etl_entities/hwm/key_value/key_value_int_hwm.py new file mode 100644 index 0000000..b93bf28 --- /dev/null +++ b/etl_entities/hwm/key_value/key_value_int_hwm.py @@ -0,0 +1,126 @@ +# Copyright 2023 MTS (Mobile Telesystems) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from frozendict import frozendict +from pydantic import Field, validator + +from etl_entities.hwm.hwm_type_registry import register_hwm_type +from etl_entities.hwm.key_value.key_value_hwm import KeyValueHWM + + +@register_hwm_type("key_value_int") +class KeyValueIntHWM(KeyValueHWM[int]): + """Integer KeyValue HWM type + + Parameters + ---------- + name : ``str`` + + HWM unique name + + value : ``frozendict[Any, KeyValueHWMValueType]``, default: ``frozendict`` + + HWM value + + description : ``str``, default: ``""`` + + Description of HWM + + source : Any, default: ``None`` + + HWM source, e.g. topic name + + expression : Any, default: ``None`` + + Expression used to generate HWM value, e.g. ``offset`` + + modified_time : :obj:`datetime.datetime`, default: current datetime + + HWM value modification time + + Examples + ---------- + + .. code:: python + + from etl_entities.hwm import KeyValueIntHWM + + hwm_kv_int = KeyValueIntHWM( + name="long_unique_name", + source="topic_name", + expression="offset", + value={ + 0: 100, # 0 and 1 - partition numbers + 1: 123, # 100 and 123 - offset values + }, + ) + """ + + # employing frozendict without specifying `frozendict[Any, int]` + # due to the lack of support for generic dict annotations in Python 3.7. + value: frozendict = Field(default_factory=frozendict) + + def serialize(self) -> dict: + """Return dict representation of HWM + + Returns + ------- + result : dict + + Serialized HWM + + Examples + ---------- + + .. code:: python + + from etl_entities.hwm import ColumnIntHWM + + hwm = ColumnIntHWM(value=1, ...) + assert hwm.serialize() == { + "value": "1", + "type": "int", + "column": "column_name", + "name": "table_name", + "description": ..., + } + """ + + # Convert self.value to a regular dictionary if it is a frozendict + # This is necessary because frozendict objects are not natively serializable to JSON. + serialized_data = { + "name": self.name, + "value": dict(self.value), + "description": self.description, + "entity": self.entity, + "expression": self.expression, + "modified_time": self.modified_time.isoformat() if self.modified_time else None, + "type": "key_value_int", + } + return serialized_data # noqa: WPS331 + + @validator("value", pre=True) + def _validate_int_values(cls, key_value): # noqa: N805 + if key_value is None: + return key_value + new_key_value = {} + for key, value in key_value.items(): + if not isinstance(value, (int, str)): + raise ValueError + else: + new_key_value[key] = int(value) + + return frozendict(new_key_value) diff --git a/etl_entities/hwm_store/__init__.py b/etl_entities/hwm_store/__init__.py index 94fe3b5..35d8d76 100644 --- a/etl_entities/hwm_store/__init__.py +++ b/etl_entities/hwm_store/__init__.py @@ -20,3 +20,12 @@ from etl_entities.hwm_store.hwm_store_detect import detect_hwm_store from etl_entities.hwm_store.hwm_store_stack_manager import HWMStoreStackManager from etl_entities.hwm_store.memory_hwm_store import MemoryHWMStore + +__all__ = [ + "BaseHWMStore", + "HWMStoreClassRegistry", + "register_hwm_store_class", + "detect_hwm_store", + "HWMStoreStackManager", + "MemoryHWMStore", +] diff --git a/etl_entities/instance/__init__.py b/etl_entities/instance/__init__.py index a40d85f..f382b7e 100644 --- a/etl_entities/instance/__init__.py +++ b/etl_entities/instance/__init__.py @@ -16,3 +16,12 @@ from etl_entities.instance.host import Host from etl_entities.instance.path import AbsolutePath, GenericPath, RelativePath from etl_entities.instance.url import GenericURL + +__all__ = [ + "Cluster", + "Host", + "AbsolutePath", + "GenericPath", + "RelativePath", + "GenericURL", +] diff --git a/etl_entities/old_hwm/__init__.py b/etl_entities/old_hwm/__init__.py index 3e1be68..631d32e 100644 --- a/etl_entities/old_hwm/__init__.py +++ b/etl_entities/old_hwm/__init__.py @@ -19,3 +19,10 @@ from etl_entities.old_hwm.file_list_hwm import FileListHWM from etl_entities.old_hwm.hwm import HWM from etl_entities.old_hwm.int_hwm import IntHWM + +__all__ = [ + "DateHWM", + "DateTimeHWM", + "FileListHWM", + "IntHWM", +] diff --git a/etl_entities/old_hwm/column_hwm.py b/etl_entities/old_hwm/column_hwm.py index fe3ca90..a351a28 100644 --- a/etl_entities/old_hwm/column_hwm.py +++ b/etl_entities/old_hwm/column_hwm.py @@ -30,6 +30,7 @@ class ColumnHWM(HWM[Optional[ColumnValueType], str], GenericModel, Generic[Colum """Base column HWM type .. deprecated:: 2.0.0 + Use :obj:`etl_entities.hwm.column.column_hwm.ColumnHWM>` instead Parameters ---------- diff --git a/etl_entities/old_hwm/date_hwm.py b/etl_entities/old_hwm/date_hwm.py index feec3a5..254350d 100644 --- a/etl_entities/old_hwm/date_hwm.py +++ b/etl_entities/old_hwm/date_hwm.py @@ -17,6 +17,7 @@ from datetime import date from typing import Optional +import typing_extensions from pydantic import validator from pydantic.validators import strict_str_validator @@ -24,11 +25,16 @@ from etl_entities.old_hwm.column_hwm import ColumnHWM +@typing_extensions.deprecated( + "Deprecated in v2.0, will be removed in v3.0", + category=UserWarning, +) @register_hwm_type("old_column_date") class DateHWM(ColumnHWM[date]): """Date HWM type .. deprecated:: 2.0.0 + Use :obj:`ColumnDateHWM ` instead Parameters ---------- diff --git a/etl_entities/old_hwm/datetime_hwm.py b/etl_entities/old_hwm/datetime_hwm.py index 9c0f9b5..bb5338f 100644 --- a/etl_entities/old_hwm/datetime_hwm.py +++ b/etl_entities/old_hwm/datetime_hwm.py @@ -17,6 +17,7 @@ from datetime import datetime from typing import Optional +import typing_extensions from pydantic import validator from pydantic.validators import strict_str_validator @@ -24,11 +25,16 @@ from etl_entities.old_hwm.column_hwm import ColumnHWM +@typing_extensions.deprecated( + "Deprecated in v2.0, will be removed in v3.0", + category=UserWarning, +) @register_hwm_type("old_column_datetime") class DateTimeHWM(ColumnHWM[datetime]): """DateTime HWM type .. deprecated:: 2.0.0 + Use :obj:`ColumnDateTimeHWM ` instead Parameters ---------- diff --git a/etl_entities/old_hwm/file_hwm.py b/etl_entities/old_hwm/file_hwm.py index 91f09c2..dd1c07e 100644 --- a/etl_entities/old_hwm/file_hwm.py +++ b/etl_entities/old_hwm/file_hwm.py @@ -35,6 +35,7 @@ class FileHWM( """Basic file HWM type .. deprecated:: 2.0.0 + Use :obj:`etl_entities.hwm.file.file_hwm.FileHWM` instead Parameters ---------- diff --git a/etl_entities/old_hwm/file_list_hwm.py b/etl_entities/old_hwm/file_list_hwm.py index 25a9ce0..e46d218 100644 --- a/etl_entities/old_hwm/file_list_hwm.py +++ b/etl_entities/old_hwm/file_list_hwm.py @@ -18,6 +18,7 @@ from pathlib import PurePosixPath from typing import FrozenSet, Iterable, List +import typing_extensions from pydantic import Field, validator from etl_entities.hwm import FileListHWM as NewFileListHWM @@ -28,11 +29,16 @@ FileListType = FrozenSet[RelativePath] +@typing_extensions.deprecated( + "Deprecated in v2.0, will be removed in v3.0", + category=UserWarning, +) @register_hwm_type("old_file_list") class FileListHWM(FileHWM[FileListType, List[str]]): """File List HWM type .. deprecated:: 2.0.0 + Use :obj:`etl_entities.hwm.file.file_list_hwm.FileListHWM` instead Parameters ---------- diff --git a/etl_entities/old_hwm/hwm.py b/etl_entities/old_hwm/hwm.py index 3132521..fe4cce5 100644 --- a/etl_entities/old_hwm/hwm.py +++ b/etl_entities/old_hwm/hwm.py @@ -34,6 +34,7 @@ class HWM(ABC, Entity, GenericModel, Generic[ValueType, SerializedType]): """Generic HWM type .. deprecated:: 2.0.0 + Use :obj:`etl_entities.hwm.HWM` instead Parameters ---------- diff --git a/etl_entities/old_hwm/int_hwm.py b/etl_entities/old_hwm/int_hwm.py index 9cf5911..20ba557 100644 --- a/etl_entities/old_hwm/int_hwm.py +++ b/etl_entities/old_hwm/int_hwm.py @@ -16,6 +16,7 @@ from typing import Optional +import typing_extensions from pydantic import validator from pydantic.types import StrictInt from pydantic.validators import int_validator @@ -24,11 +25,16 @@ from etl_entities.old_hwm.column_hwm import ColumnHWM +@typing_extensions.deprecated( + "Deprecated in v2.0, will be removed in v3.0", + category=UserWarning, +) @register_hwm_type("old_column_int") class IntHWM(ColumnHWM[StrictInt]): """Integer HWM type .. deprecated:: 2.0.0 + Use :obj:`ColumnIntHWM ` instead Parameters ---------- diff --git a/etl_entities/plugins/__init__.py b/etl_entities/plugins/__init__.py index d60c1a2..e328110 100644 --- a/etl_entities/plugins/__init__.py +++ b/etl_entities/plugins/__init__.py @@ -1 +1,17 @@ +# Copyright 2023 MTS (Mobile Telesystems) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from etl_entities.plugins.import_plugins import import_plugins + +__all__ = ["import_plugins"] diff --git a/etl_entities/process/__init__.py b/etl_entities/process/__init__.py index aff2c87..aef014f 100644 --- a/etl_entities/process/__init__.py +++ b/etl_entities/process/__init__.py @@ -14,3 +14,5 @@ from etl_entities.process.process import Process from etl_entities.process.process_stack_manager import ProcessStackManager + +__all__ = ["Process", "ProcessStackManager"] diff --git a/etl_entities/process/process.py b/etl_entities/process/process.py index e45d32b..c3ab129 100644 --- a/etl_entities/process/process.py +++ b/etl_entities/process/process.py @@ -20,6 +20,7 @@ from socket import getfqdn import psutil +import typing_extensions from pydantic import ConstrainedStr, Field, validator from etl_entities.entity import BaseModel, Entity @@ -33,9 +34,15 @@ class DagTaskName(ConstrainedStr): regex = re.compile("^[^.]*$") +@typing_extensions.deprecated( + "Deprecated in v2.0, will be removed in v3.0", + category=UserWarning, +) class Process(BaseModel, Entity): """Process representation + .. deprecated:: 2.0.0 + Parameters ---------- name : :obj:`str`, default: current process name diff --git a/etl_entities/process/process_stack_manager.py b/etl_entities/process/process_stack_manager.py index 454a7df..3cb4fd7 100644 --- a/etl_entities/process/process_stack_manager.py +++ b/etl_entities/process/process_stack_manager.py @@ -17,13 +17,21 @@ from dataclasses import dataclass from typing import ClassVar +import typing_extensions + from etl_entities.process.process import Process +@typing_extensions.deprecated( + "Deprecated in v2.0, will be removed in v3.0", + category=UserWarning, +) @dataclass class ProcessStackManager: """ Handles current stack of processes + + .. deprecated:: 2.0.0 """ default: ClassVar[Process] = Process() # noqa: WPS462 diff --git a/etl_entities/source/__init__.py b/etl_entities/source/__init__.py index 022b0a3..76a7cd2 100644 --- a/etl_entities/source/__init__.py +++ b/etl_entities/source/__init__.py @@ -14,3 +14,5 @@ from etl_entities.source.db import Column, Table from etl_entities.source.file import RemoteFolder + +__all__ = ["Column", "Table", "RemoteFolder"] diff --git a/etl_entities/source/db/column.py b/etl_entities/source/db/column.py index 3bc3681..bbe29e7 100644 --- a/etl_entities/source/db/column.py +++ b/etl_entities/source/db/column.py @@ -17,6 +17,7 @@ import re from typing import OrderedDict +import typing_extensions from pydantic import ConstrainedStr, Field, validator from etl_entities.entity import BaseModel, Entity @@ -27,9 +28,15 @@ class ColumnName(ConstrainedStr): regex = re.compile(r"^[^\|/=@#]+$") +@typing_extensions.deprecated( + "Deprecated in v2.0, will be removed in v3.0", + category=UserWarning, +) class Column(BaseModel, Entity): """DB column representation + .. deprecated:: 2.0.0 + Parameters ---------- name : str diff --git a/etl_entities/source/db/table.py b/etl_entities/source/db/table.py index 41743e4..5da8613 100644 --- a/etl_entities/source/db/table.py +++ b/etl_entities/source/db/table.py @@ -17,6 +17,7 @@ import re from typing import Union +import typing_extensions from pydantic import ConstrainedStr from etl_entities.entity import BaseModel, Entity @@ -28,9 +29,15 @@ class TableDBName(ConstrainedStr): regex = re.compile("^[^@#]+$") +@typing_extensions.deprecated( + "Deprecated in v2.0, will be removed in v3.0", + category=UserWarning, +) class Table(BaseModel, Entity): """DB table representation + .. deprecated:: 2.0.0 + Parameters ---------- name : str diff --git a/etl_entities/source/file/remote_folder.py b/etl_entities/source/file/remote_folder.py index 25a04ed..e906b9a 100644 --- a/etl_entities/source/file/remote_folder.py +++ b/etl_entities/source/file/remote_folder.py @@ -17,6 +17,7 @@ import os from typing import Union +import typing_extensions from pydantic import validator from etl_entities.entity import BaseModel, Entity @@ -26,9 +27,15 @@ PROHIBITED_PATH_SYMBOLS = "@#" +@typing_extensions.deprecated( + "Deprecated in v2.0, will be removed in v3.0", + category=UserWarning, +) class RemoteFolder(BaseModel, Entity): """Remote folder representation + .. deprecated:: 2.0.0 + Parameters ---------- name : :obj:`str` or :obj:`pathlib.PosixPath` diff --git a/requirements.txt b/requirements.txt index 4cff938..e6dadff 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,6 @@ bidict +frozendict importlib_metadata>=3.6.0 psutil pydantic<2 +typing-extensions>=4.5.0 diff --git a/setup.cfg b/setup.cfg index d26d45f..363097a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -192,7 +192,9 @@ ignore = # RST307: Error in "code" directive RST307 # Q000 Single quotes found but double quotes preferred (doesn't work in version < 3.12) - Q000 + Q000, +# WPS410 Found wrong metadata variable: __all__ + WPS410 # http://flake8.pycqa.org/en/latest/user/options.html?highlight=per-file-ignores#cmdoption-flake8-per-file-ignores per-file-ignores = diff --git a/setup.py b/setup.py index ee14bba..c9cd3a4 100644 --- a/setup.py +++ b/setup.py @@ -66,6 +66,7 @@ def parse_requirements(file: Path) -> list[str]: "CI/CD": "https://github.com/MobileTeleSystems/etl-entities/actions", "Tracker": "https://github.com/MobileTeleSystems/etl-entities/issues", }, + entry_points={"tricoder_package_spy.register": ["etl-entities=etl_entities"]}, python_requires=">=3.7", install_requires=requirements, include_package_data=True, diff --git a/tests/test_hwm/test_column_hwm.py b/tests/test_hwm/test_column_hwm.py index d0fa1e4..23ff159 100644 --- a/tests/test_hwm/test_column_hwm.py +++ b/tests/test_hwm/test_column_hwm.py @@ -216,30 +216,6 @@ def test_column_hwm_compare(hwm_class, value, delta): # noqa: WPS210 assert item2 < item1 -@pytest.mark.parametrize( # noqa: WPS210 - "hwm_class, value, delta", - [ - (ColumnDateHWM, date.today(), timedelta(days=2)), - (ColumnDateTimeHWM, datetime.now(), timedelta(seconds=2)), - (ColumnIntHWM, 1, 2), - ], -) -def test_column_hwm_covers(hwm_class, value, delta): # noqa: WPS210 - name = secrets.token_hex(8) - - empty_hwm = hwm_class(name=name) - - assert not empty_hwm.covers(value) - assert not empty_hwm.covers(value - delta) - assert not empty_hwm.covers(value + delta) - - hwm = hwm_class(name=name, value=value) - - assert hwm.covers(value) - assert hwm.covers(value - delta) - assert not hwm.covers(value + delta) - - @pytest.mark.parametrize( "hwm_class", [ diff --git a/tests/test_hwm/test_key_value_int_hwm.py b/tests/test_hwm/test_key_value_int_hwm.py new file mode 100644 index 0000000..7d375ac --- /dev/null +++ b/tests/test_hwm/test_key_value_int_hwm.py @@ -0,0 +1,233 @@ +import secrets +from datetime import datetime, timedelta + +import pytest +from frozendict import frozendict + +from etl_entities.hwm import KeyValueIntHWM + + +@pytest.mark.parametrize( + "expression, value, expected_value", + [ + ("offset", {0: 100, 1: 200}, frozendict({0: 100, 1: 200})), + ("offset", {0: "100", 1: "200"}, frozendict({0: 100, 1: 200})), + ("offset", {"key1": 100, "key2": 200}, frozendict({"key1": 100, "key2": 200})), + ("offset", {"key1": "100", "key2": "200"}, frozendict({"key1": 100, "key2": 200})), + (None, {}, frozendict()), + ], +) +def test_key_value_int_hwm_valid_input(expression, value, expected_value): + name = "key_value_int_hwm_name" + entity = "topic_name" + modified_time = datetime.now() - timedelta(days=5) + + empty_hwm = KeyValueIntHWM(name=name) + assert empty_hwm.name == name + assert empty_hwm.value == frozendict() + + minimal_hwm = KeyValueIntHWM(name=name, value=value) + assert minimal_hwm.name == name + assert minimal_hwm.value == expected_value + + hwm_with_duplicates = KeyValueIntHWM(name=name, value={**value, **value}) + assert hwm_with_duplicates.name == name + assert hwm_with_duplicates.value == expected_value + + hwm = KeyValueIntHWM( + name=name, + value=value, + description="my hwm", + entity=entity, + expression="something", + modified_time=modified_time, + ) + assert hwm.name == name + assert hwm.value == expected_value + assert hwm.description == "my hwm" + assert hwm.entity == entity + assert hwm.expression == "something" + assert hwm.modified_time == modified_time + + +@pytest.mark.parametrize( + "invalid_value", + [ + {1: 1.5}, + {1: "offset_value"}, + {"partition": "offset_value"}, + {1: None}, + None, + ], +) +def test_key_value_int_hwm_wrong_input(invalid_value): + valid_value = {0: 100, 1: 123} + name = "key_value_int_hwm_name" + + with pytest.raises(ValueError): + # missing name + KeyValueIntHWM() + + with pytest.raises(ValueError): + # missing name + KeyValueIntHWM(value=valid_value) + + with pytest.raises(ValueError): + # invalid_dict + KeyValueIntHWM(name=name, value=invalid_value) + + with pytest.raises(ValueError): + # extra fields not allowed + KeyValueIntHWM(name=name, unknown="unknown") + + +def test_key_value_int_hwm_set_value(): + name = "key_value_int_hwm_name" + value1 = {0: 10} + value2 = {0: 10, 1: 20} + value3 = {0: 10, 1: 20, 2: 30} + value = {0: 100, 1: 123} + + hwm = KeyValueIntHWM(name=name) + + hwm1 = hwm.copy() + hwm1.set_value(value) + assert hwm1.value == frozendict(value) + assert hwm1.modified_time > hwm.modified_time + + hwm2 = hwm.copy() + hwm2.set_value(value1) + assert hwm2.value == frozendict(value1) + assert hwm2.modified_time > hwm.modified_time + + hwm3 = hwm.copy() + hwm3.set_value(value2) + assert hwm3.value == frozendict(value2) + assert hwm3.modified_time > hwm.modified_time + + hwm4 = hwm.copy() + hwm4.set_value(value3) + assert hwm4.value == frozendict(value3) + assert hwm4.modified_time > hwm.modified_time + + with pytest.raises(ValueError): + # invalid dict + hwm.set_value({1: None}) + + +def test_key_value_int_hwm_frozen(): + value1 = {0: 10} + value2 = {0: 10, 1: 20} + value3 = {0: 10, 1: 20, 2: 30} + value = {0: 100, 1: 123} + name = "key_value_int_hwm_name" + modified_time = datetime.now() - timedelta(days=5) + + hwm = KeyValueIntHWM(name=name) + + for attr in ("value", "entity", "expression", "description", "modified_time"): + for item in (1, "abc", None, value1, value2, value3, value, modified_time): + with pytest.raises(TypeError): + setattr(hwm, attr, item) + + +def test_key_value_int_hwm_compare(): + name1 = secrets.token_hex() + name2 = secrets.token_hex() + entity = "topic_name" + + value1 = {0: 10} + value2 = {0: 10, 1: 20} + + hwm1 = KeyValueIntHWM(name=name1, value=value1) + hwm2 = KeyValueIntHWM(name=name2, value=value1) + hwm3 = KeyValueIntHWM(name=name1, value=value2) + hwm4 = KeyValueIntHWM(name=name2, value=value2) + + hwm5 = KeyValueIntHWM(name=name1, entity=entity) + hwm6 = KeyValueIntHWM(name=name1, entity=entity + entity) + + hwm7 = KeyValueIntHWM(name=name1, description="abc") + hwm8 = KeyValueIntHWM(name=name1, description="bcd") + + hwm9 = KeyValueIntHWM(name=name1, expression="abc") + hwm10 = KeyValueIntHWM(name=name1, expression="bcd") + + modified_time = datetime.now() - timedelta(days=5) + hwm_with_different_mtime = KeyValueIntHWM(name=name1, value=value1, modified_time=modified_time) + + # modified time is ignored when comparing + assert hwm1 == hwm_with_different_mtime + + items = (hwm1, hwm2, hwm3, hwm4, hwm5, hwm6, hwm7, hwm8, hwm9, hwm10) + + # items with different attribute values (except modified_time) are not equal + for item1 in items: + for item2 in items: + if item1 is not item2: + assert item1 != item2 + + # this was true until 2.1.x, but not anymore + for item in items: + assert item != item.value + + +def test_key_value_int_hwm_update(): + hwm = KeyValueIntHWM(name="test_hwm", value=frozendict({1: 100, 2: 150})) + original_modified_time = hwm.modified_time + + # update with new key, higher value, and lower value + hwm.update({3: 200, 2: 180, 1: 50}) + assert hwm.value == frozendict({1: 100, 2: 180, 3: 200}) + assert hwm.modified_time > original_modified_time + + # update with equal value and multiple keys + hwm.update({1: 100, 4: 300, 5: 400}) + assert hwm.value == frozendict({1: 100, 2: 180, 3: 200, 4: 300, 5: 400}) + + hwm.update({6: 500, 2: 175}) + assert hwm.value == frozendict({1: 100, 2: 180, 3: 200, 4: 300, 5: 400, 6: 500}) + + +def test_key_value_int_hwm_serialization(): + name = "key_value_int_hwm_name" + modified_time = datetime.now() - timedelta(days=5) + value = {"0": 100, "1": 123} + + hwm1 = KeyValueIntHWM( + name=name, + value=value, + entity="topic_name", + expression="some", + description="some description", + modified_time=modified_time, + ) + + expected1 = { + "type": "key_value_int", + "name": name, + "value": value, + "entity": "topic_name", + "expression": "some", + "description": "some description", + "modified_time": modified_time.isoformat(), + } + + serialized1 = hwm1.serialize() + assert expected1 == serialized1 + assert KeyValueIntHWM.deserialize(serialized1) == hwm1 + + hwm2 = KeyValueIntHWM(name=name, modified_time=modified_time) + expected2 = { + "type": "key_value_int", + "name": name, + "value": {}, + "entity": None, + "expression": None, + "description": "", + "modified_time": modified_time.isoformat(), + } + + serialized2 = hwm2.serialize() + assert serialized2 == expected2 + assert KeyValueIntHWM.deserialize(serialized2) == hwm2