def batch_update_counters(
    self,
    updates: Dict[str, Dict[Tuple[str, str], Dict[int, int]]],
    batch_size: int = 1_000_000,
) -> None:
    """
    Set many counters at once, writing to Redis in pipelined batches with a progress bar.

    Every counter in ``updates`` is translated into a Redis key through
    ``self._get_key`` and queued as a ``SET`` on a pipeline. The pipeline is
    executed each time ``batch_size`` commands have been queued, and once more
    at the end for any remainder. Keys are generated lazily while iterating
    ``updates``, so no intermediate list of all key/value pairs is built.

    :param updates: A dictionary structure containing the updates.
        The structure is as follows:
        {
            supplier_prefix: {
                (short_name, prov_short_name): {
                    identifier: counter_value
                }
            }
        }
    :type updates: Dict[str, Dict[Tuple[str, str], Dict[int, int]]]
    :param batch_size: Maximum number of ``SET`` commands sent per pipeline
        execution. Defaults to 1,000,000.
    :type batch_size: int
    :raises ValueError: if ``batch_size`` is lower than 1.
    :return: None
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    # Counting entries only needs the sizes of the innermost dictionaries,
    # so the progress bar total is known without materialising the keys.
    total_updates = sum(
        len(counters)
        for per_supplier in updates.values()
        for counters in per_supplier.values()
    )

    with tqdm(total=total_updates, desc="Updating counters") as pbar:
        # NOTE(review): pipeline() is created with the client's defaults,
        # as before; confirm whether transactional batches are intended.
        pipeline = self.redis.pipeline()
        queued = 0
        for supplier_prefix, per_supplier in updates.items():
            for (short_name, prov_short_name), counters in per_supplier.items():
                for identifier, counter_value in counters.items():
                    key = self._get_key(short_name, prov_short_name, identifier, supplier_prefix)
                    pipeline.set(key, counter_value)
                    queued += 1
                    if queued == batch_size:
                        pipeline.execute()
                        pbar.update(queued)
                        # Start the next batch on a fresh pipeline, one per batch.
                        pipeline = self.redis.pipeline()
                        queued = 0
        if queued:
            # Flush the final, partially filled batch.
            pipeline.execute()
            pbar.update(queued)
"255e87d31acb8cc579498892851bdce33aa1bc6b66723afda52d24c0f3341ad0" +content-hash = "73e0208f8e69fe7af978f61118c0818112c173cdce37308f363a786b72de532f" diff --git a/pyproject.toml b/pyproject.toml index ca150bb..c055ac4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "oc_ocdm" -version = "9.0.1" +version = "9.1.0" description = "Object mapping library for manipulating RDF graphs that are compliant with the OpenCitations datamodel." authors = [ "Silvio Peroni ", @@ -37,6 +37,7 @@ filelock = "^3.6.0" pyshacl = "0.23.0" setuptools = "^68.2.2" redis = "^4.5.5" +tqdm = "^4.66.5" [tool.poetry.dev-dependencies] Sphinx = "^4.4.0"