Skip to content

Commit

Permalink
batch_update_counters
Browse files Browse the repository at this point in the history
  • Loading branch information
arcangelo7 committed Sep 28, 2024
1 parent 743d7f2 commit ef255db
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 4 deletions.
40 changes: 38 additions & 2 deletions oc_ocdm/counter_handler/redis_counter_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
# SOFTWARE.

from typing import Optional, Union
from typing import Dict, Optional, Tuple, Union

import redis
from tqdm import tqdm

from oc_ocdm.counter_handler.counter_handler import CounterHandler


Expand Down Expand Up @@ -185,4 +187,38 @@ def _get_key(self, entity_short_name: str, prov_short_name: str = "", identifier
if prov_short_name:
key_parts.append(str(identifier))
key_parts.append(prov_short_name)
return ':'.join(filter(None, key_parts))
return ':'.join(filter(None, key_parts))

def batch_update_counters(
    self,
    updates: Dict[str, Dict[Tuple[str, str], Dict[int, int]]],
    batch_size: int = 1_000_000,
) -> None:
    """
    Set many counters in Redis, sending them in pipelined batches and
    reporting progress through a ``tqdm`` progress bar.

    Entries are streamed directly from ``updates`` into the pipeline, so no
    flattened copy of the whole workload is held in memory. Each batch is
    sent with its own ``pipeline.execute()`` call; batches already executed
    are not rolled back if a later key computation or batch fails.

    :param updates: A dictionary structure containing the updates.
        The structure is as follows:
        {
            supplier_prefix: {
                (short_name, prov_short_name): {
                    identifier: counter_value
                }
            }
        }
    :type updates: Dict[str, Dict[Tuple[str, str], Dict[int, int]]]
    :param batch_size: Maximum number of ``SET`` commands queued in a
        pipeline before it is executed. Defaults to 1,000,000.
    :type batch_size: int
    :raises ValueError: if ``batch_size`` is smaller than 1.
    :return: None
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    # The total is only needed by the progress bar; len() of each innermost
    # dict gives it without materialising every (key, value) pair.
    total_updates = sum(
        len(counters)
        for per_supplier in updates.values()
        for counters in per_supplier.values()
    )

    with tqdm(total=total_updates, desc="Updating counters") as pbar:
        pipeline = self.redis.pipeline()
        pending = 0
        for supplier_prefix, per_supplier in updates.items():
            for (short_name, prov_short_name), counters in per_supplier.items():
                for identifier, counter_value in counters.items():
                    key = self._get_key(short_name, prov_short_name, identifier, supplier_prefix)
                    pipeline.set(key, counter_value)
                    pending += 1
                    if pending == batch_size:
                        pipeline.execute()
                        pbar.update(pending)
                        # Start a fresh pipeline for the next batch.
                        pipeline = self.redis.pipeline()
                        pending = 0

        # Flush the final, possibly partial, batch.
        if pending:
            pipeline.execute()
            pbar.update(pending)
22 changes: 21 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "oc_ocdm"
version = "9.0.1"
version = "9.1.0"
description = "Object mapping library for manipulating RDF graphs that are compliant with the OpenCitations datamodel."
authors = [
"Silvio Peroni <essepuntato@gmail.com>",
Expand Down Expand Up @@ -37,6 +37,7 @@ filelock = "^3.6.0"
pyshacl = "0.23.0"
setuptools = "^68.2.2"
redis = "^4.5.5"
tqdm = "^4.66.5"

[tool.poetry.dev-dependencies]
Sphinx = "^4.4.0"
Expand Down

0 comments on commit ef255db

Please sign in to comment.