Skip to content

Commit

Permalink
Dynamic supplier prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
arcangelo7 committed Mar 17, 2024
1 parent 91c35ad commit 1bdfa3e
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 57 deletions.
6 changes: 3 additions & 3 deletions oc_ocdm/counter_handler/counter_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class CounterHandler(ABC):

@abstractmethod
def set_counter(self, new_value: int, entity_short_name: str, prov_short_name: str = "",
identifier: int = 1) -> None:
identifier: int = 1, supplier_prefix: str = "") -> None:
"""
Method signature for concrete implementations that allow setting the counter value
of graph and provenance entities.
Expand All @@ -43,7 +43,7 @@ def set_counter(self, new_value: int, entity_short_name: str, prov_short_name: s
raise NotImplementedError

@abstractmethod
def read_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1) -> int:
def read_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = "") -> int:
"""
Method signature for concrete implementations that allow reading the counter value
of graph and provenance entities.
Expand All @@ -63,7 +63,7 @@ def read_counter(self, entity_short_name: str, prov_short_name: str = "", identi
raise NotImplementedError

@abstractmethod
def increment_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1) -> int:
def increment_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = "") -> int:
"""
Method signature for concrete implementations that allow incrementing by one unit
the counter value of graph and provenance entities.
Expand Down
31 changes: 17 additions & 14 deletions oc_ocdm/counter_handler/filesystem_counter_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class FilesystemCounterHandler(CounterHandler):
_initial_line_len: int = 3
_trailing_char: str = " "

def __init__(self, info_dir: str) -> None:
def __init__(self, info_dir: str, supplier_prefix: str = "") -> None:
"""
Constructor of the ``FilesystemCounterHandler`` class.
Expand All @@ -49,6 +49,7 @@ def __init__(self, info_dir: str) -> None:
info_dir += os.sep

self.info_dir: str = info_dir
self.supplier_prefix: str = supplier_prefix
self.datasets_dir: str = info_dir + 'datasets' + os.sep
self.short_names: List[str] = ["an", "ar", "be", "br", "ci", "de", "id", "pl", "ra", "re", "rp"]
self.metadata_short_names: List[str] = ["di"]
Expand All @@ -58,7 +59,7 @@ def __init__(self, info_dir: str) -> None:
for key in self.short_names}

def set_counter(self, new_value: int, entity_short_name: str, prov_short_name: str = "",
identifier: int = 1) -> None:
identifier: int = 1, supplier_prefix: str = "") -> None:
"""
It allows to set the counter value of graph and provenance entities.
Expand All @@ -80,12 +81,12 @@ def set_counter(self, new_value: int, entity_short_name: str, prov_short_name: s
raise ValueError("new_value must be a non negative integer!")

if prov_short_name == "se":
file_path: str = self._get_prov_path(entity_short_name)
file_path: str = self._get_prov_path(entity_short_name, supplier_prefix)
else:
file_path: str = self._get_info_path(entity_short_name)
file_path: str = self._get_info_path(entity_short_name, supplier_prefix)
self._set_number(new_value, file_path, identifier)

def read_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1) -> int:
def read_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = "") -> int:
"""
It allows to read the counter value of graph and provenance entities.
Expand All @@ -102,12 +103,12 @@ def read_counter(self, entity_short_name: str, prov_short_name: str = "", identi
:return: The requested counter value.
"""
if prov_short_name == "se":
file_path: str = self._get_prov_path(entity_short_name)
file_path: str = self._get_prov_path(entity_short_name, supplier_prefix)
else:
file_path: str = self._get_info_path(entity_short_name)
file_path: str = self._get_info_path(entity_short_name, supplier_prefix)
return self._read_number(file_path, identifier)[0]

def increment_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1) -> int:
def increment_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = "") -> int:
"""
It allows to increment the counter value of graph and provenance entities by one unit.
Expand All @@ -124,16 +125,18 @@ def increment_counter(self, entity_short_name: str, prov_short_name: str = "", i
:return: The newly-updated (already incremented) counter value.
"""
if prov_short_name == "se":
file_path: str = self._get_prov_path(entity_short_name)
file_path: str = self._get_prov_path(entity_short_name, supplier_prefix)
else:
file_path: str = self._get_info_path(entity_short_name)
file_path: str = self._get_info_path(entity_short_name, supplier_prefix)
return self._add_number(file_path, identifier)

def _get_info_path(self, short_name: str) -> str:
return self.info_dir + self.info_files[short_name]
def _get_info_path(self, short_name: str, supplier_prefix: str) -> str:
directory = self.info_dir if supplier_prefix == self.supplier_prefix else self.info_dir.replace(self.supplier_prefix, supplier_prefix, 1)
return directory + self.info_files[short_name]

def _get_prov_path(self, short_name: str) -> str:
return self.info_dir + self.prov_files[short_name]
def _get_prov_path(self, short_name: str, supplier_prefix: str) -> str:
directory = self.info_dir if supplier_prefix == self.supplier_prefix else self.info_dir.replace(self.supplier_prefix, supplier_prefix, 1)
return directory + self.prov_files[short_name]

def _get_metadata_path(self, short_name: str, dataset_name: str) -> str:
return self.datasets_dir + dataset_name + os.sep + 'metadata_' + short_name + '.txt'
Expand Down
6 changes: 3 additions & 3 deletions oc_ocdm/counter_handler/in_memory_counter_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __init__(self) -> None:
self.metadata_counters: Dict[str, Dict[str, int]] = {}

def set_counter(self, new_value: int, entity_short_name: str, prov_short_name: str = "",
identifier: int = 1) -> None:
identifier: int = 1, supplier_prefix: str = "") -> None:
"""
It allows to set the counter value of graph and provenance entities.
Expand Down Expand Up @@ -82,7 +82,7 @@ def set_counter(self, new_value: int, entity_short_name: str, prov_short_name: s
# It's an entity!
self.entity_counters[entity_short_name] = new_value

def read_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1) -> int:
def read_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = "") -> int:
"""
It allows to read the counter value of graph and provenance entities.
Expand Down Expand Up @@ -119,7 +119,7 @@ def read_counter(self, entity_short_name: str, prov_short_name: str = "", identi
# It's an entity!
return self.entity_counters[entity_short_name]

def increment_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1) -> int:
def increment_counter(self, entity_short_name: str, prov_short_name: str = "", identifier: int = 1, supplier_prefix: str = "") -> int:
"""
It allows to increment the counter value of graph and provenance entities by one unit.
Expand Down
12 changes: 7 additions & 5 deletions oc_ocdm/graph/graph_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from oc_ocdm.abstract_set import AbstractSet
from oc_ocdm.reader import Reader
from oc_ocdm.support.support import get_count, get_short_name
from oc_ocdm.support.support import get_count, get_short_name, get_prefix

if TYPE_CHECKING:
from typing import Dict, ClassVar, Tuple, Optional, List, Set
Expand Down Expand Up @@ -77,6 +77,7 @@ def __init__(self, base_iri: str, info_dir: str = "", supplier_prefix: str = "",
# The following variable maps a URIRef with the related graph entity
self.res_to_entity: Dict[URIRef, GraphEntity] = {}
self.base_iri: str = base_iri
self.info_dir: str = info_dir
self.supplier_prefix: str = supplier_prefix
self.wanted_label: bool = wanted_label
# Graphs
Expand All @@ -96,7 +97,7 @@ def __init__(self, base_iri: str, info_dir: str = "", supplier_prefix: str = "",
self.g_rp: str = base_iri + "rp/"

if info_dir is not None and info_dir != "":
self.counter_handler: CounterHandler = FilesystemCounterHandler(info_dir)
self.counter_handler: CounterHandler = FilesystemCounterHandler(info_dir, supplier_prefix)
else:
self.counter_handler: CounterHandler = InMemoryCounterHandler()

Expand Down Expand Up @@ -232,17 +233,18 @@ def _add(self, graph_url: str, short_name: str, res: URIRef = None) -> Tuple[Gra

count: Optional[str] = None
label: Optional[str] = None
supplier_prefix = get_prefix(res) if res is not None else self.supplier_prefix

if res is not None:
try:
res_count: int = int(get_count(res))
except ValueError:
res_count: int = -1
if res_count > self.counter_handler.read_counter(short_name):
self.counter_handler.set_counter(res_count, short_name)
if res_count > self.counter_handler.read_counter(short_name, supplier_prefix=supplier_prefix):
self.counter_handler.set_counter(res_count, short_name, supplier_prefix=supplier_prefix)
return cur_g, count, label

count = self.supplier_prefix + str(self.counter_handler.increment_counter(short_name))
count = supplier_prefix + str(self.counter_handler.increment_counter(short_name, supplier_prefix=supplier_prefix))

if self.wanted_label:
label = "%s %s [%s/%s]" % (self.labels[short_name], count, short_name, count)
Expand Down
62 changes: 31 additions & 31 deletions oc_ocdm/prov/prov_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class ProvSet(AbstractSet):
}

def __init__(self, prov_subj_graph_set: GraphSet, base_iri: str, info_dir: str = "",
wanted_label: bool = True, custom_counters : dict = dict()) -> None:
wanted_label: bool = True, custom_counters : dict = dict(), supplier_prefix: str = "") -> None:
super(ProvSet, self).__init__()
self.prov_g: GraphSet = prov_subj_graph_set
# The following variable maps a URIRef with the related provenance entity
Expand All @@ -59,10 +59,11 @@ def __init__(self, prov_subj_graph_set: GraphSet, base_iri: str, info_dir: str =
short_names = ["an", "ar", "be", "br", "ci", "de", "id", "pl", "ra", "re", "rp"]
self.counter_handlers : Dict[str, CounterHandler] = dict()
self.custom_counters = custom_counters
self.supplier_prefix = supplier_prefix
if info_dir is not None and info_dir != "":
for short_name in short_names:
if short_name not in custom_counters:
self.counter_handlers[short_name] = FilesystemCounterHandler(info_dir)
self.counter_handlers[short_name] = FilesystemCounterHandler(info_dir, supplier_prefix=supplier_prefix)
else:
self.counter_handlers[short_name] = custom_counters[short_name]
else:
Expand All @@ -82,7 +83,8 @@ def add_se(self, prov_subject: GraphEntity, res: URIRef = None) -> SnapshotEntit
if res is not None and res in self.res_to_entity:
return self.res_to_entity[res]
g_prov: str = str(prov_subject) + "/prov/"
cur_g, count, label = self._add_prov(g_prov, "se", prov_subject, res)
supplier_prefix = get_prefix(res) if res is not None else self.supplier_prefix
cur_g, count, label = self._add_prov(g_prov, "se", prov_subject, res, supplier_prefix)
return SnapshotEntity(prov_subject, cur_g, self, res, prov_subject.resp_agent,
prov_subject.source, ProvEntity.iri_entity, count, label, "se")

Expand Down Expand Up @@ -210,28 +212,27 @@ def generate_provenance(self, c_time: float = None) -> set:
modified_entities.add(cur_subj.res)
return modified_entities

def _fix_info_dir(self, prov_subject: URIRef) -> None:
short_name = get_short_name(prov_subject)
if not short_name or self.info_dir is None or self.info_dir == "":
return
if not isinstance(self.counter_handlers[short_name], FilesystemCounterHandler):
return
if has_supplier_prefix(prov_subject, self.base_iri):
supplier_prefix = get_prefix(prov_subject)
info_dir_folders = os.path.normpath(self.info_dir).split(os.sep)
info_dir_prefix = [
folder for folder in info_dir_folders
if folder.startswith('0') and folder.endswith('0') and folder.isdigit() and len(folder) > 2]
if info_dir_prefix:
info_dir_prefix = info_dir_prefix[-1]
if supplier_prefix != info_dir_prefix:
new_info_dir = os.sep.join([folder if folder != info_dir_prefix else supplier_prefix for folder in info_dir_folders])
self.info_dir = new_info_dir
self.counter_handlers[short_name]: CounterHandler = FilesystemCounterHandler(new_info_dir)
# def _fix_info_dir(self, prov_subject: URIRef) -> None:
# short_name = get_short_name(prov_subject)
# if not short_name or self.info_dir is None or self.info_dir == "":
# return
# if not isinstance(self.counter_handlers[short_name], FilesystemCounterHandler):
# return
# if has_supplier_prefix(prov_subject, self.base_iri):
# supplier_prefix = get_prefix(prov_subject)
# info_dir_folders = os.path.normpath(self.info_dir).split(os.sep)
# info_dir_prefix = [
# folder for folder in info_dir_folders
# if folder.startswith('0') and folder.endswith('0') and folder.isdigit() and len(folder) > 2]
# if info_dir_prefix:
# info_dir_prefix = info_dir_prefix[-1]
# if supplier_prefix != info_dir_prefix:
# new_info_dir = os.sep.join([folder if folder != info_dir_prefix else supplier_prefix for folder in info_dir_folders])
# self.info_dir = new_info_dir
# self.counter_handlers[short_name]: CounterHandler = FilesystemCounterHandler(new_info_dir)

def _add_prov(self, graph_url: str, short_name: str, prov_subject: GraphEntity,
res: URIRef = None) -> Tuple[Graph, Optional[str], Optional[str]]:
self._fix_info_dir(prov_subject.res)
res: URIRef = None, supplier_prefix: str = "") -> Tuple[Graph, Optional[str], Optional[str]]:
cur_g: Graph = Graph(identifier=graph_url)
self._set_ns(cur_g)

Expand All @@ -244,19 +245,19 @@ def _add_prov(self, graph_url: str, short_name: str, prov_subject: GraphEntity,
except ValueError:
res_count: int = -1
if isinstance(self.counter_handlers[prov_subject.short_name], SqliteCounterHandler):
cur_count: str = self.counter_handlers[prov_subject.short_name].read_counter(prov_subject)
cur_count: str = self.counter_handlers[prov_subject.short_name].read_counter(prov_subject, supplier_prefix=supplier_prefix)
else:
cur_count: str = self.counter_handlers[prov_subject.short_name].read_counter(prov_subject.short_name, "se", int(get_count(prov_subject.res)))
cur_count: str = self.counter_handlers[prov_subject.short_name].read_counter(prov_subject.short_name, "se", int(get_count(prov_subject.res)), supplier_prefix=supplier_prefix)
if res_count > cur_count:
if isinstance(self.counter_handlers[prov_subject.short_name], SqliteCounterHandler):
self.counter_handlers[prov_subject.short_name].set_counter(int(get_count(prov_subject.res)), prov_subject)
self.counter_handlers[prov_subject.short_name].set_counter(int(get_count(prov_subject.res)), prov_subject, supplier_prefix=supplier_prefix)
else:
self.counter_handlers[prov_subject.short_name].set_counter(res_count, prov_subject.short_name, "se", int(get_count(prov_subject.res)))
self.counter_handlers[prov_subject.short_name].set_counter(res_count, prov_subject.short_name, "se", int(get_count(prov_subject.res)), supplier_prefix=supplier_prefix)
return cur_g, count, label
if isinstance(self.counter_handlers[prov_subject.short_name], SqliteCounterHandler):
count = str(self.counter_handlers[prov_subject.short_name].increment_counter(prov_subject))
count = str(self.counter_handlers[prov_subject.short_name].increment_counter(prov_subject, supplier_prefix=supplier_prefix))
else:
count = str(self.counter_handlers[prov_subject.short_name].increment_counter(prov_subject.short_name, "se", int(get_count(prov_subject.res))))
count = str(self.counter_handlers[prov_subject.short_name].increment_counter(prov_subject.short_name, "se", int(get_count(prov_subject.res)), supplier_prefix=supplier_prefix))
if self.wanted_label:
cur_short_name = prov_subject.short_name
cur_entity_count = get_count(prov_subject.res)
Expand All @@ -276,11 +277,10 @@ def _set_ns(g: Graph) -> None:
g.namespace_manager.bind("prov", ProvEntity.PROV)

def _retrieve_last_snapshot(self, prov_subject: URIRef) -> Optional[URIRef]:
self._fix_info_dir(prov_subject)
subj_short_name: str = get_short_name(prov_subject)
subj_count: str = get_count(prov_subject)
if subj_short_name not in self.custom_counters:
try:
subj_count: str = get_count(prov_subject)
if int(subj_count) <= 0:
raise ValueError('prov_subject is not a valid URIRef. Extracted count value should be a positive '
'non-zero integer number!')
Expand Down
2 changes: 1 addition & 1 deletion oc_ocdm/test/prov/test_prov_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class TestProvSet(unittest.TestCase):

def setUp(self):
self.graph_set = GraphSet("http://test/", "./info_dir/", "", False)
self.prov_set = ProvSet(self.graph_set, "http://test/", "./info_dir/", False, custom_counters={'ci': SqliteCounterHandler('oc_ocdm/test/prov/prov_counter.db')})
self.prov_set = ProvSet(self.graph_set, "http://test/", "./info_dir/", False, custom_counters={'ci': SqliteCounterHandler('oc_ocdm/test/prov/prov_counter.db')}, supplier_prefix="")

def test_add_se(self):
prov_subj = self.graph_set.add_br(self.resp_agent)
Expand Down

0 comments on commit 1bdfa3e

Please sign in to comment.