Commit e062278: multiprocessing

This commit removes the per-file locking previously used by Storer and instead threads an optional process_id through store_all, _dir_and_file_paths and find_paths, so that concurrent processes write to distinct, suffixed output files (e.g. 1000_7.json instead of 1000.json). It also fixes the counter handler's path substitution when the configured supplier prefix is empty, and bumps the package version to 8.0.0.
arcangelo7 committed Mar 19, 2024
1 parent 7aca364 commit e062278
Showing 6 changed files with 50 additions and 42 deletions.
4 changes: 2 additions & 2 deletions oc_ocdm/counter_handler/filesystem_counter_handler.py

@@ -132,12 +132,12 @@ def increment_counter(self, entity_short_name: str, prov_short_name: str = "", i

     def _get_info_path(self, short_name: str, supplier_prefix: str) -> str:
         supplier_prefix = "" if supplier_prefix is None else supplier_prefix
-        directory = self.info_dir if supplier_prefix == self.supplier_prefix else self.info_dir.replace(self.supplier_prefix, supplier_prefix, 1)
+        directory = self.info_dir if supplier_prefix == self.supplier_prefix or not self.supplier_prefix else self.info_dir.replace(self.supplier_prefix, supplier_prefix, 1)
         return directory + self.info_files[short_name]

     def _get_prov_path(self, short_name: str, supplier_prefix: str) -> str:
         supplier_prefix = "" if supplier_prefix is None else supplier_prefix
-        directory = self.info_dir if supplier_prefix == self.supplier_prefix else self.info_dir.replace(self.supplier_prefix, supplier_prefix, 1)
+        directory = self.info_dir if supplier_prefix == self.supplier_prefix or not self.supplier_prefix else self.info_dir.replace(self.supplier_prefix, supplier_prefix, 1)
         return directory + self.prov_files[short_name]

     def _get_metadata_path(self, short_name: str, dataset_name: str) -> str:
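The added "or not self.supplier_prefix" guard matters because str.replace with an empty search string inserts the replacement at the start of the string: a handler configured without a supplier prefix would otherwise have the requested prefix prepended to every path. A minimal sketch of the behaviour; the directory values are hypothetical, and get_directory stands in for the inline conditional above:

# Minimal sketch of the bug the new guard avoids; directory values are
# hypothetical and get_directory stands in for the inline conditional above.
def get_directory(info_dir: str, own_prefix: str, supplier_prefix: str) -> str:
    supplier_prefix = "" if supplier_prefix is None else supplier_prefix
    if supplier_prefix == own_prefix or not own_prefix:
        return info_dir  # new behaviour: leave the path untouched
    return info_dir.replace(own_prefix, supplier_prefix, 1)

# Normal case: one supplier prefix is swapped for another.
assert get_directory("info/060/", "060", "070") == "info/070/"
# Empty configured prefix: str.replace("", ...) prepends the replacement...
assert "info/".replace("", "070", 1) == "070info/"
# ...so the guard keeps the directory unchanged instead.
assert get_directory("info/", "", "070") == "info/"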
1 change: 0 additions & 1 deletion oc_ocdm/graph/graph_set.py

@@ -234,7 +234,6 @@ def _add(self, graph_url: str, short_name: str, res: URIRef = None) -> Tuple[Gra
         count: Optional[str] = None
         label: Optional[str] = None
         supplier_prefix = get_prefix(res) if res is not None else self.supplier_prefix
-
         if res is not None:
             try:
                 res_count: int = int(get_count(res))
25 changes: 11 additions & 14 deletions oc_ocdm/storer.py

@@ -21,7 +21,6 @@
 from typing import TYPE_CHECKING
 from zipfile import ZIP_DEFLATED, ZipFile

-from filelock import FileLock
 from rdflib import ConjunctiveGraph
 from SPARQLWrapper import SPARQLWrapper

@@ -148,7 +147,7 @@ def _write_graph(self, graph: ConjunctiveGraph, zip_file: ZipFile, cur_file_path
         else:
             graph.serialize(destination=cur_file_path, format=self.output_format, encoding="utf-8")

-    def store_all(self, base_dir: str, base_iri: str, context_path: str = None) -> List[str]:
+    def store_all(self, base_dir: str, base_iri: str, context_path: str = None, process_id: int|str = None) -> List[str]:
         self.repok.new_article()
         self.reperr.new_article()

@@ -160,7 +159,7 @@ def store_all(self, base_dir: str, base_iri: str, context_path: str = None) -> L
            if self.modified_entities is not None and entity.res not in self.modified_entities:
                is_relevant = False
            if is_relevant:
-               cur_dir_path, cur_file_path = self._dir_and_file_paths(entity.res, base_dir, base_iri)
+               cur_dir_path, cur_file_path = self._dir_and_file_paths(entity.res, base_dir, base_iri, process_id)
                if not os.path.exists(cur_dir_path):
                    os.makedirs(cur_dir_path)
                relevant_paths.setdefault(cur_file_path, list())
@@ -170,15 +169,13 @@ def store_all(self, base_dir: str, base_iri: str, context_path: str = None) -> L
            stored_g = None
            # Here we try to obtain a reference to the currently stored graph
            output_filepath = relevant_path.replace(os.path.splitext(relevant_path)[1], ".zip") if self.zip_output else relevant_path
-           lock = FileLock(f"{output_filepath}.lock")
-           with lock:
-               if os.path.exists(output_filepath):
-                   stored_g = Reader(context_map=self.context_map).load(output_filepath)
-               if stored_g is None:
-                   stored_g = ConjunctiveGraph()
-               for entity_in_path in entities_in_path:
-                   self.store(entity_in_path, stored_g, relevant_path, context_path, False)
-               self._store_in_file(stored_g, relevant_path, context_path)
+           if os.path.exists(output_filepath):
+               stored_g = Reader(context_map=self.context_map).load(output_filepath)
+           if stored_g is None:
+               stored_g = ConjunctiveGraph()
+           for entity_in_path in entities_in_path:
+               self.store(entity_in_path, stored_g, relevant_path, context_path, False)
+           self._store_in_file(stored_g, relevant_path, context_path)

        return list(relevant_paths.keys())

@@ -239,9 +236,9 @@ def upload_and_store(self, base_dir: str, triplestore_url: str, base_iri: str, c
        else: # All the files have been stored
            self.upload_all(triplestore_url, base_dir, batch_size)

-    def _dir_and_file_paths(self, res: URIRef, base_dir: str, base_iri: str) -> Tuple[str, str]:
+    def _dir_and_file_paths(self, res: URIRef, base_dir: str, base_iri: str, process_id: int|str = None) -> Tuple[str, str]:
        is_json: bool = (self.output_format == "json-ld")
-       return find_paths(res, base_dir, base_iri, self.default_dir, self.dir_split, self.n_file_item, is_json=is_json)
+       return find_paths(res, base_dir, base_iri, self.default_dir, self.dir_split, self.n_file_item, is_json=is_json, process_id=process_id)

    @staticmethod
    def _class_to_entity_type(entity: AbstractEntity) -> Optional[str]:
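With the FileLock dependency removed, store_all no longer serialises concurrent writers touching the same file; callers running in parallel are instead expected to pass a distinct process_id so that each process reads and writes its own suffixed file. A usage sketch for a single worker; the constructor arguments mirror the test suite, while the output directory and the id value are illustrative:

# Illustrative use of the new process_id parameter (single worker).
import os

from oc_ocdm.graph import GraphSet
from oc_ocdm.storer import Storer

base_iri = "http://test/"
base_dir = os.path.join("oc_ocdm_out", "rdf") + os.sep  # placeholder path

graph_set = GraphSet(base_iri, "", "060", False)
graph_set.add_br("http://resp_agent.test/")

storer = Storer(graph_set, context_map={}, dir_split=10000, n_file_item=1000,
                default_dir="_", output_format="json-ld", zip_output=False)

# Without process_id the entity lands in e.g. br/060/10000/1000.json;
# with process_id=7 it lands in br/060/10000/1000_7.json instead.
storer.store_all(base_dir, base_iri, process_id=7)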
15 changes: 8 additions & 7 deletions oc_ocdm/support/support.py

@@ -259,20 +259,21 @@ def find_local_line_id(res: URIRef, n_file_item: int = 1) -> int:


 def find_paths(res: URIRef, base_dir: str, base_iri: str, default_dir: str, dir_split: int,
-               n_file_item: int, is_json: bool = True) -> Tuple[str, str]:
+               n_file_item: int, is_json: bool = True, process_id: int|str = None) -> Tuple[str, str]:
     """
     This function is responsible for looking for the correct JSON file that contains the data related to the
     resource identified by the variable 'string_iri'. This search takes into account the organisation in
     directories and files, as well as the particular supplier prefix for bibliographic entities, if specified.
     In case no supplier prefix is specified, the 'default_dir' (usually set to "_") is used instead.
     """
     string_iri: str = str(res)
+    process_id_str: str = f"_{process_id}" if process_id else ""

     if is_dataset(res):
         cur_dir_path: str = (base_dir + re.sub(r"^%s(.*)$" % base_iri, r"\1", string_iri))[:-1]
         # In case of dataset, the file path is different from regular files, e.g.
         # /corpus/br/index.json
-        cur_file_path: str = cur_dir_path + os.sep + "index.json"
+        cur_file_path: str = cur_dir_path + os.sep + "index" + process_id_str + ".json"
     else:
         cur_number: int = get_resource_number(res)

@@ -307,7 +308,7 @@ def find_paths(res: URIRef, base_dir: str, base_iri: str, default_dir: str, dir_

            cur_dir_path: str = base_dir + subj_short_name + os.sep + sub_folder + \
                os.sep + str(cur_split) + os.sep + str(cur_file_split) + os.sep + "prov"
-           cur_file_path: str = cur_dir_path + os.sep + short_name + file_extension
+           cur_file_path: str = cur_dir_path + os.sep + short_name + process_id_str + file_extension
        else: # regular bibliographic entity
            short_name: str = get_short_name(res)
            sub_folder: str = get_prefix(res)
@@ -318,7 +319,7 @@ def find_paths(res: URIRef, base_dir: str, base_iri: str, default_dir: str, dir_
                sub_folder = "_" # enforce default value

            cur_dir_path: str = base_dir + short_name + os.sep + sub_folder + os.sep + str(cur_split)
-           cur_file_path: str = cur_dir_path + os.sep + str(cur_file_split) + file_extension
+           cur_file_path: str = cur_dir_path + os.sep + str(cur_file_split) + process_id_str + file_extension
        # Enter here if no split is needed
        elif dir_split == 0:
            if "/prov/" in string_iri:
@@ -333,7 +334,7 @@ def find_paths(res: URIRef, base_dir: str, base_iri: str, default_dir: str, dir_

            cur_dir_path: str = base_dir + subj_short_name + os.sep + sub_folder + \
                os.sep + str(cur_file_split) + os.sep + "prov"
-           cur_file_path: str = cur_dir_path + os.sep + short_name + file_extension
+           cur_file_path: str = cur_dir_path + os.sep + short_name + process_id_str + file_extension
        else:
            short_name: str = get_short_name(res)
            sub_folder: str = get_prefix(res)
@@ -344,7 +345,7 @@ def find_paths(res: URIRef, base_dir: str, base_iri: str, default_dir: str, dir_
                sub_folder = "_" # enforce default value

            cur_dir_path: str = base_dir + short_name + os.sep + sub_folder
-           cur_file_path: str = cur_dir_path + os.sep + str(cur_file_split) + file_extension
+           cur_file_path: str = cur_dir_path + os.sep + str(cur_file_split) + process_id_str + file_extension
        # Enter here if the data is about a provenance agent, e.g. /corpus/prov/
        else:
            short_name: str = get_short_name(res)
@@ -353,7 +354,7 @@ def find_paths(res: URIRef, base_dir: str, base_iri: str, default_dir: str, dir_
            file_extension: str = '.json' if is_json else '.nq'

            cur_dir_path: str = base_dir + short_name
-           cur_file_path: str = cur_dir_path + os.sep + prefix + count + file_extension
+           cur_file_path: str = cur_dir_path + os.sep + prefix + count + process_id_str + file_extension

     return cur_dir_path, cur_file_path
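The process_id_str suffix only changes file names; the directory layout computed by find_paths is untouched. Note also that the suffix is built with "if process_id", so a falsy id such as 0 or "" yields no suffix, which suggests numbering parallel workers from 1. A small self-contained sketch of the naming rule; the file stems follow the test fixtures, the id values are arbitrary examples:

# Sketch of the suffixing rule implemented in find_paths above.
def suffixed(stem: str, extension: str, process_id=None) -> str:
    process_id_str = f"_{process_id}" if process_id else ""
    return stem + process_id_str + extension

assert suffixed("1000", ".json", 7) == "1000_7.json"    # data file
assert suffixed("se", ".json", 7) == "se_7.json"        # provenance file
assert suffixed("index", ".json", 7) == "index_7.json"  # dataset index
assert suffixed("1000", ".json") == "1000.json"         # no id: shared name
assert suffixed("1000", ".json", 0) == "1000.json"      # caveat: 0 is falsy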
45 changes: 28 additions & 17 deletions oc_ocdm/test/storer/test_storer.py

@@ -28,6 +28,7 @@
 from oc_ocdm.storer import Storer
 from oc_ocdm.reader import Reader

+from shutil import rmtree

 class TestStorer(unittest.TestCase):
     def setUp(self):
@@ -37,14 +38,18 @@ def setUp(self):
         self.graph_set = GraphSet(self.base_iri, "", "060", False)
         self.prov_set = ProvSet(self.graph_set, self.base_iri, "", False)
         self.br = self.graph_set.add_br(self.resp_agent)
-        self.info_dir = os.path.join("oc_ocdm", "test", "storer", "test_provenance", "info_dir")
+        self.data_dir = os.path.join("oc_ocdm", "test", "storer", "data")
+        self.prov_dir = os.path.join("oc_ocdm", "test", "storer", "test_provenance")
+        self.info_dir = os.path.join(self.prov_dir, "info_dir")

-    # def tearDown(self):
-    #     rmtree(os.path.join("oc_ocdm", "test", "storer", "data"))
+    def tearDown(self):
+        if os.path.exists(self.data_dir):
+            rmtree(self.data_dir)
+        if os.path.exists(self.prov_dir):
+            rmtree(os.path.join(self.prov_dir))

     def test_store_graphs_in_file(self):
         base_dir = os.path.join("oc_ocdm", "test", "storer", "data", "rdf") + os.sep
-        is_unix = system() != "Windows"
         with self.subTest("output_format=json-ld, zip_output=True"):
             modified_entities = self.prov_set.generate_provenance()
             prov_storer = Storer(self.prov_set, context_map={}, dir_split=10000, n_file_item=1000, default_dir="_", output_format='json-ld', zip_output=True)
@@ -55,7 +60,6 @@ def test_store_graphs_in_file(self):
             with ZipFile(os.path.join(base_dir, "br", "060", "10000", "1000.zip"), mode="r") as archive:
                 with archive.open("1000.json") as f:
                     data = json.load(f)
-                    print(data)
                     self.assertEqual(data, [{'@graph': [{'@id': 'http://test/br/0601', '@type': ['http://purl.org/spar/fabio/Expression']}], '@id': 'http://test/br/'}])
             with ZipFile(os.path.join(base_dir, "br", "060", "10000", "1000", "prov", "se.zip"), mode="r") as archive:
                 with archive.open("se.json") as f:
@@ -66,9 +70,6 @@ def test_store_graphs_in_file(self):
                         'http://purl.org/dc/terms/description': [{'@value': "The entity 'http://test/br/0601' has been created."}],
                         'http://www.w3.org/ns/prov#specializationOf': [{'@id': 'http://test/br/0601'}],
                         'http://www.w3.org/ns/prov#wasAttributedTo': [{'@id': 'http://resp_agent.test/'}]}], '@id': 'http://test/br/0601/prov/'}])
-            if is_unix:
-                self.assertTrue(os.path.exists(os.path.join(base_dir, "br", "060", "10000", "1000.zip.lock")))
-                self.assertTrue(os.path.exists(os.path.join(base_dir, "br", "060", "10000", "1000", "prov", "se.zip.lock")))
         with self.subTest("output_format=json-ld, zip_output=False"):
             base_dir_1 = os.path.join("oc_ocdm", "test", "storer", "data", "rdf_1") + os.sep
             storer = Storer(self.graph_set, context_map={}, dir_split=10000, n_file_item=1000, default_dir="_", output_format='json-ld', zip_output=False)
@@ -88,9 +89,6 @@ def test_store_graphs_in_file(self):
                         'http://purl.org/dc/terms/description': [{'@value': "The entity 'http://test/br/0601' has been created."}],
                         'http://www.w3.org/ns/prov#specializationOf': [{'@id': 'http://test/br/0601'}],
                         'http://www.w3.org/ns/prov#wasAttributedTo': [{'@id': 'http://resp_agent.test/'}]}], '@id': 'http://test/br/0601/prov/'}])
-            if is_unix:
-                self.assertTrue(os.path.exists(os.path.join(base_dir_1, "br", "060", "10000", "1000.json.lock")))
-                self.assertTrue(os.path.exists(os.path.join(base_dir_1, "br", "060", "10000", "1000", "prov", "se.json.lock")))
         with self.subTest("output_format=nquads, zip_output=True"):
             base_dir_2 = os.path.join("oc_ocdm", "test", "storer", "data", "rdf_2") + os.sep
             storer = Storer(self.graph_set, context_map={}, dir_split=10000, n_file_item=1000, default_dir="_", output_format='nquads', zip_output=True)
@@ -119,9 +117,6 @@ def test_store_graphs_in_file(self):
                     if p == URIRef("http://www.w3.org/ns/prov#generatedAtTime"):
                         data_g.remove((s, p, o, c))
             self.assertTrue(compare.isomorphic(data_g, expected_data_g))
-            if is_unix:
-                self.assertTrue(os.path.exists(os.path.join(base_dir_2, "br", "060", "10000", "1000.zip.lock")))
-                self.assertTrue(os.path.exists(os.path.join(base_dir_2, "br", "060", "10000", "1000", "prov", "se.zip.lock")))
         with self.subTest("output_format=nquads, zip_output=False"):
             base_dir_3 = os.path.join("oc_ocdm", "test", "storer", "data", "rdf_3") + os.sep
             storer = Storer(self.graph_set, context_map={}, dir_split=10000, n_file_item=1000, default_dir="_", output_format='nquads', zip_output=False)
@@ -146,9 +141,25 @@ def test_store_graphs_in_file(self):
                         prov_unzipped.remove((s, p, o, c))
             self.assertEqual(data_unzipped, "<http://test/br/0601> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://purl.org/spar/fabio/Expression> <http://test/br/> .\n\n")
             self.assertTrue(compare.isomorphic(prov_unzipped, expected_prov_unzipped))
-            if is_unix:
-                self.assertTrue(os.path.exists(os.path.join(base_dir_3, "br", "060", "10000", "1000.nt.lock")))
-                self.assertTrue(os.path.exists(os.path.join(base_dir_3, "br", "060", "10000", "1000", "prov", "se.nq.lock")))

+    def test_store_graphs_in_file_multiprocessing(self):
+        base_dir = os.path.join("oc_ocdm", "test", "storer", "data", "multiprocessing") + os.sep
+        storer = Storer(self.graph_set, context_map={}, dir_split=10000, n_file_item=1000, default_dir="_", output_format='json-ld', zip_output=False)
+        self.prov_set.generate_provenance()
+        prov_storer = Storer(self.prov_set, context_map={}, dir_split=10000, n_file_item=1000, default_dir="_", output_format='json-ld', zip_output=False)
+        storer.store_all(base_dir, self.base_iri, process_id=7)
+        prov_storer.store_all(base_dir, self.base_iri, process_id=7)
+        with open(os.path.join(base_dir, "br", "060", "10000", "1000_7.json")) as f:
+            data = json.load(f)
+        self.assertEqual(data, [{'@graph': [{'@id': 'http://test/br/0601', '@type': ['http://purl.org/spar/fabio/Expression']}], '@id': 'http://test/br/'}])
+        with open(os.path.join(base_dir, "br", "060", "10000", "1000", "prov", "se_7.json")) as f:
+            data = [{g:[{k:v for k,v in datum.items() if k != "http://www.w3.org/ns/prov#generatedAtTime"} for datum in data] if g == "@graph" else data for g, data in graph.items()} for graph in json.load(f)]
+        self.assertEqual(data, [{'@graph': [{
+            '@id': 'http://test/br/0601/prov/se/1',
+            '@type': ['http://www.w3.org/ns/prov#Entity'],
+            'http://purl.org/dc/terms/description': [{'@value': "The entity 'http://test/br/0601' has been created."}],
+            'http://www.w3.org/ns/prov#specializationOf': [{'@id': 'http://test/br/0601'}],
+            'http://www.w3.org/ns/prov#wasAttributedTo': [{'@id': 'http://resp_agent.test/'}]}], '@id': 'http://test/br/0601/prov/'}])

     def test_provenance(self):
         ts = SPARQLWrapper(self.ts)
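The new test drives the suffix from a single process. Under real multiprocessing, each worker would build its own GraphSet and Storer and pass a distinct id, which is what makes the removed file locks unnecessary. A hypothetical driver along those lines; the pool size, the ids and the output directory are made up, and nothing is shared between processes:

# Hypothetical multi-process driver: one GraphSet/Storer per worker, each
# writing to disjoint files via its own process_id, so no locking is needed.
import os
from multiprocessing import Pool

from oc_ocdm.graph import GraphSet
from oc_ocdm.storer import Storer

BASE_IRI = "http://test/"
BASE_DIR = os.path.join("data", "multiprocessing") + os.sep  # placeholder

def worker(process_id: int) -> None:
    graph_set = GraphSet(BASE_IRI, "", "060", False)
    graph_set.add_br("http://resp_agent.test/")
    storer = Storer(graph_set, context_map={}, dir_split=10000, n_file_item=1000,
                    default_dir="_", output_format="json-ld", zip_output=False)
    # Distinct non-zero ids keep the workers' output files disjoint.
    storer.store_all(BASE_DIR, BASE_IRI, process_id=process_id)

if __name__ == "__main__":
    with Pool(4) as pool:
        pool.map(worker, [1, 2, 3, 4])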
2 changes: 1 addition & 1 deletion pyproject.toml

@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "oc_ocdm"
-version = "7.3.5"
+version = "8.0.0"
 description = "Object mapping library for manipulating RDF graphs that are compliant with the OpenCitations datamodel."
 authors = [
     "Silvio Peroni <essepuntato@gmail.com>",
