From 7fa646c6edf743a1d0bed284d4aeab58fcc41ff2 Mon Sep 17 00:00:00 2001
From: arcangelo7
Date: Thu, 14 Mar 2024 14:35:31 +0100
Subject: [PATCH] active lock between reader and storer

---
 oc_ocdm/reader.py                  | 71 +++++++++++++-------------
 oc_ocdm/storer.py                  | 81 ++++++++++++++++--------------
 oc_ocdm/test/reader/test_reader.py |  6 ++-
 oc_ocdm/test/storer/test_storer.py | 49 ++++++++++++++++--
 pyproject.toml                     |  2 +-
 5 files changed, 131 insertions(+), 78 deletions(-)

diff --git a/oc_ocdm/reader.py b/oc_ocdm/reader.py
index b88e206..d5001ff 100644
--- a/oc_ocdm/reader.py
+++ b/oc_ocdm/reader.py
@@ -21,7 +21,6 @@
 from zipfile import ZipFile
 
 import rdflib
-from filelock import FileLock
 from rdflib import RDF, ConjunctiveGraph, Graph, URIRef
 from SPARQLWrapper import XML, SPARQLWrapper
 
@@ -81,45 +80,47 @@ def load(self, rdf_file_path: str) -> Optional[ConjunctiveGraph]:
         return loaded_graph
 
     def _load_graph(self, file_path: str) -> ConjunctiveGraph:
-        formats: List[str] = ["json-ld", "rdfxml", "turtle", "trig", "nt11", "nquads"]
+        formats = ["json-ld", "rdfxml", "turtle", "trig", "nt11", "nquads"]
+        loaded_graph = ConjunctiveGraph()
 
-        loaded_graph: ConjunctiveGraph = ConjunctiveGraph()
+        if file_path.endswith('.zip'):
+            try:
+                with ZipFile(file=file_path, mode="r") as archive:
+                    for zf_name in archive.namelist():
+                        with archive.open(zf_name) as f:
+                            if self._try_parse(loaded_graph, f, formats):
+                                return loaded_graph
+            except Exception as e:
+                raise IOError(f"Error opening or reading zip file '{file_path}': {e}")
+        else:
+            try:
+                with open(file_path, 'rt', encoding='utf-8') as f:
+                    if self._try_parse(loaded_graph, f, formats):
+                        return loaded_graph
+            except Exception as e:
+                raise IOError(f"Error opening or reading file '{file_path}': {e}")
+
+        raise IOError(f"It was impossible to load the file '{file_path}' with supported formats.")
 
-        errors: str = ""
+    def _try_parse(self, graph: ConjunctiveGraph, file_obj, formats: List[str]) -> bool:
         for cur_format in formats:
+            file_obj.seek(0)  # Reset file pointer to the beginning for each new attempt
             try:
-                lock = FileLock(f"{file_path}.lock")
-                with lock:
-                    if file_path.endswith('.zip'):
-                        with ZipFile(file=file_path, mode="r") as archive:
-                            for zf_name in archive.namelist():
-                                f = archive.open(zf_name)
-                    else:
-                        f = open(file_path, 'rt', encoding='utf-8')
-                    if cur_format == "json-ld":
-                        json_ld_file: Any = json.load(f)
-                        if isinstance(json_ld_file, dict):
-                            json_ld_file: List[Any] = [json_ld_file]
-
-                        for json_ld_resource in json_ld_file:
-                            # Trick to force the use of a pre-loaded context if the format
-                            # specified is JSON-LD
-                            if "@context" in json_ld_resource:
-                                cur_context: str = json_ld_resource["@context"]
-                                if cur_context in self.context_map:
-                                    context_json: Any = self.context_map[cur_context]["@context"]
-                                    json_ld_resource["@context"] = context_json
-
-                            loaded_graph.parse(data=json.dumps(json_ld_resource, ensure_ascii=False),
-                                               format=cur_format)
-                    else:
-                        loaded_graph.parse(file=f, format=cur_format)
-                    f.close()
-                return loaded_graph
+                if cur_format == "json-ld":
+                    json_ld_file = json.load(file_obj)
+                    if isinstance(json_ld_file, dict):
+                        json_ld_file = [json_ld_file]
+                    for json_ld_resource in json_ld_file:
+                        if "@context" in json_ld_resource and json_ld_resource["@context"] in self.context_map:
+                            json_ld_resource["@context"] = self.context_map[json_ld_resource["@context"]]["@context"]
+                    data = json.dumps(json_ld_file, ensure_ascii=False)
+                    graph.parse(data=data, format=cur_format)
+                else:
+                    graph.parse(file=file_obj, format=cur_format)
+                return True # Success, no need to try other formats
             except Exception as e:
-                errors += f" | {e}" # Try another format
-
-        raise IOError("1", f"It was impossible to handle the format used for storing the file '{file_path}'{errors}")
+                continue # Try the next format
+        return False # None of the formats succeeded
 
     @staticmethod
     def get_graph_from_subject(graph: Graph, subject: URIRef) -> Graph:
diff --git a/oc_ocdm/storer.py b/oc_ocdm/storer.py
index 7a74540..b7a65ac 100644
--- a/oc_ocdm/storer.py
+++ b/oc_ocdm/storer.py
@@ -109,40 +109,45 @@ def _store_in_file(self, cur_g: ConjunctiveGraph, cur_file_path: str, context_pa
             for g_context in cur_g.contexts((s, p, o)):
                 g_iri = g_context.identifier
                 break
-            new_g.addN([(s, p, o, g_iri)])
+
         zip_file_path = cur_file_path.replace(os.path.splitext(cur_file_path)[1], ".zip")
-        lock = FileLock(f"{zip_file_path}.lock") if self.zip_output else FileLock(f"{cur_file_path}.lock")
-        with lock:
-            if self.zip_output:
-                zip_file = ZipFile(zip_file_path, mode="w", compression=ZIP_DEFLATED, allowZip64=True)
-            if self.output_format == "json-ld":
-                if context_path is not None and context_path in self.context_map:
-                    cur_json_ld: Any = json.loads(
-                        new_g.serialize(format="json-ld", context=self.context_map[context_path]))
-                    if isinstance(cur_json_ld, dict):
-                        cur_json_ld["@context"] = context_path
-                    else: # it is a list
-                        for item in cur_json_ld:
-                            item["@context"] = context_path
-                else:
-                    cur_json_ld: Any = json.loads(new_g.serialize(format="json-ld"))
-                if self.zip_output:
-                    dumped_json: bytes = json.dumps(cur_json_ld, ensure_ascii=False).encode('utf-8')
-                    zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=dumped_json)
-                else:
-                    with open(cur_file_path, 'wt', encoding='utf-8') as f:
-                        json.dump(cur_json_ld, f, ensure_ascii=False)
-            else:
-                if self.zip_output:
-                    rdf_serialization: bytes = new_g.serialize(destination=None, format=self.output_format, encoding="utf-8")
-                    zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=rdf_serialization)
-                else:
-                    new_g.serialize(destination=cur_file_path, format=self.output_format, encoding="utf-8")
-        if self.zip_output:
-            zip_file.close()
+
+        if self.zip_output:
+            with ZipFile(zip_file_path, mode="w", compression=ZIP_DEFLATED, allowZip64=True) as zip_file:
+                self._write_graph(new_g, zip_file, cur_file_path, context_path)
+        else:
+            # Handle non-zipped output directly to a file
+            self._write_graph(new_g, None, cur_file_path, context_path)
+
         self.repok.add_sentence(f"File '{cur_file_path}' added.")
 
+    def _write_graph(self, graph: ConjunctiveGraph, zip_file: ZipFile, cur_file_path, context_path):
+        if self.output_format == "json-ld":
+            # Serialize the graph in JSON-LD format
+            cur_json_ld = json.loads(graph.serialize(format="json-ld", context=self.context_map.get(context_path)))
+            if context_path is not None and context_path in self.context_map:
+                if isinstance(cur_json_ld, dict):
+                    cur_json_ld["@context"] = context_path
+                else: # When cur_json_ld is a list
+                    for item in cur_json_ld:
+                        item["@context"] = context_path
+
+            # Determine how to write based on zip file presence
+            if zip_file is not None:
+                dumped_json = json.dumps(cur_json_ld, ensure_ascii=False).encode('utf-8')
+                zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=dumped_json)
+            else:
+                with open(cur_file_path, 'wt', encoding='utf-8') as f:
+                    json.dump(cur_json_ld, f, ensure_ascii=False)
+        else:
+            # Handle other RDF formats
+            if zip_file is not None:
+                rdf_serialization = graph.serialize(destination=None, format=self.output_format, encoding="utf-8")
+                zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=rdf_serialization)
+            else:
+                graph.serialize(destination=cur_file_path, format=self.output_format, encoding="utf-8")
+
     def store_all(self, base_dir: str, base_iri: str, context_path: str = None) -> List[str]:
         self.repok.new_article()
         self.reperr.new_article()
@@ -165,13 +170,15 @@ def store_all(self, base_dir: str, base_iri: str, context_path: str = None) -> L
             stored_g = None
             # Here we try to obtain a reference to the currently stored graph
             output_filepath = relevant_path.replace(os.path.splitext(relevant_path)[1], ".zip") if self.zip_output else relevant_path
-            if os.path.exists(output_filepath):
-                stored_g = Reader(context_map=self.context_map).load(output_filepath)
-            if stored_g is None:
-                stored_g = ConjunctiveGraph()
-            for entity_in_path in entities_in_path:
-                self.store(entity_in_path, stored_g, relevant_path, context_path, False)
-            self._store_in_file(stored_g, relevant_path, context_path)
+            lock = FileLock(f"{output_filepath}.lock")
+            with lock:
+                if os.path.exists(output_filepath):
+                    stored_g = Reader(context_map=self.context_map).load(output_filepath)
+                if stored_g is None:
+                    stored_g = ConjunctiveGraph()
+                for entity_in_path in entities_in_path:
+                    self.store(entity_in_path, stored_g, relevant_path, context_path, False)
+                self._store_in_file(stored_g, relevant_path, context_path)
 
         return list(relevant_paths.keys())
 
diff --git a/oc_ocdm/test/reader/test_reader.py b/oc_ocdm/test/reader/test_reader.py
index faa1b0a..96a95b7 100644
--- a/oc_ocdm/test/reader/test_reader.py
+++ b/oc_ocdm/test/reader/test_reader.py
@@ -16,10 +16,12 @@
 
 import os
 import unittest
+
+from rdflib import URIRef
 from SPARQLWrapper import JSON, SPARQLWrapper
-from oc_ocdm.reader import Reader
+
 from oc_ocdm.graph import GraphSet
-from rdflib import URIRef
+from oc_ocdm.reader import Reader
 
 
 class TestReader(unittest.TestCase):
diff --git a/oc_ocdm/test/storer/test_storer.py b/oc_ocdm/test/storer/test_storer.py
index 5be49fe..f40f5c0 100644
--- a/oc_ocdm/test/storer/test_storer.py
+++ b/oc_ocdm/test/storer/test_storer.py
@@ -17,26 +17,30 @@
 import os
 import unittest
 from platform import system
-from shutil import rmtree
 from zipfile import ZipFile
+from multiprocessing import Pool
 
+from SPARQLWrapper import POST, SPARQLWrapper
 from rdflib import ConjunctiveGraph, URIRef, compare
 
 from oc_ocdm.graph.graph_set import GraphSet
 from oc_ocdm.prov.prov_set import ProvSet
 from oc_ocdm.storer import Storer
+from oc_ocdm.reader import Reader
 
 
 class TestStorer(unittest.TestCase):
     def setUp(self):
         self.resp_agent = "http://resp_agent.test/"
         self.base_iri = "http://test/"
+        self.ts = 'http://127.0.0.1:9999/blazegraph/sparql'
         self.graph_set = GraphSet(self.base_iri, "", "060", False)
         self.prov_set = ProvSet(self.graph_set, self.base_iri, "", False)
         self.br = self.graph_set.add_br(self.resp_agent)
+        self.info_dir = os.path.join("oc_ocdm", "test", "storer", "test_provenance", "info_dir")
 
-    def tearDown(self):
-        rmtree(os.path.join("oc_ocdm", "test", "storer", "data"))
+    # def tearDown(self):
+    #     rmtree(os.path.join("oc_ocdm", "test", "storer", "data"))
 
     def test_store_graphs_in_file(self):
         base_dir = os.path.join("oc_ocdm", "test", "storer", "data", "rdf") + os.sep
@@ -51,6 +55,7 @@ def test_store_graphs_in_file(self):
         with ZipFile(os.path.join(base_dir, "br", "060", "10000", "1000.zip"), mode="r") as archive:
             with archive.open("1000.json") as f:
                 data = json.load(f)
+                print(data)
                 self.assertEqual(data, [{'@graph': [{'@id': 'http://test/br/0601', '@type': ['http://purl.org/spar/fabio/Expression']}], '@id': 'http://test/br/'}])
         with ZipFile(os.path.join(base_dir, "br", "060", "10000", "1000", "prov", "se.zip"), mode="r") as archive:
             with archive.open("se.json") as f:
@@ -145,6 +150,44 @@ def test_store_graphs_in_file(self):
         self.assertTrue(os.path.exists(os.path.join(base_dir_3, "br", "060", "10000", "1000.nt.lock")))
         self.assertTrue(os.path.exists(os.path.join(base_dir_3, "br", "060", "10000", "1000", "prov", "se.nq.lock")))
 
+    def test_provenance(self):
+        ts = SPARQLWrapper(self.ts)
+        ts.setQuery('delete{?x ?y ?z} where{?x ?y ?z}')
+        ts.setMethod(POST)
+        ts.query()
+        graph_set = GraphSet(self.base_iri, "", "060", False)
+        prov_set = ProvSet(graph_set, self.base_iri, info_dir=self.info_dir)
+        base_dir = os.path.join("oc_ocdm", "test", "storer", "test_provenance") + os.sep
+        graph_set.add_br(self.resp_agent)
+        graph_set.add_br(self.resp_agent)
+        graph_set.add_br(self.resp_agent)
+        prov_set.generate_provenance()
+        storer = Storer(graph_set, context_map={}, dir_split=10000, n_file_item=1000, default_dir="_", output_format='json-ld', zip_output=False)
+        prov_storer = Storer(prov_set, context_map={}, dir_split=10000, n_file_item=1000, default_dir="_", output_format='json-ld', zip_output=False)
+        prov_storer.store_all(base_dir, self.base_iri)
+        storer.upload_all(self.ts, base_dir)
+        graph_set.commit_changes()
+        entities_to_process = [('http://test/br/0601',), ('http://test/br/0602',), ('http://test/br/0603',)]
+        with Pool(processes=3) as pool:
+            pool.starmap(process_entity, entities_to_process)
+
+def process_entity(entity):
+    base_iri = "http://test/"
+    ts = 'http://127.0.0.1:9999/blazegraph/sparql'
+    resp_agent = "http://resp_agent.test/"
+    base_dir = os.path.join("oc_ocdm", "test", "storer", "test_provenance") + os.sep
+    info_dir = os.path.join("oc_ocdm", "test", "storer", "test_provenance", "info_dir")
+    graph_set = GraphSet(base_iri, "", "060", False)
+    Reader.import_entity_from_triplestore(graph_set, ts, URIRef(entity), resp_agent)
+    br = graph_set.get_entity(URIRef(entity))
+    br.has_title("Hola")
+    prov_set = ProvSet(graph_set, base_iri, info_dir=info_dir)
+    prov_set.generate_provenance()
+    storer = Storer(graph_set, context_map={}, dir_split=10000, n_file_item=1000, default_dir="_", output_format='json-ld', zip_output=False)
+    prov_storer = Storer(prov_set, context_map={}, dir_split=10000, n_file_item=1000, default_dir="_", output_format='json-ld', zip_output=False)
+    prov_storer.store_all(base_dir, base_iri)
+    storer.upload_all(ts, base_dir)
+
 if __name__ == '__main__':
     unittest.main()
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index 0e5d848..7078d81 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "oc_ocdm"
-version = "7.3.1"
+version = "7.3.2"
 description = "Object mapping library for manipulating RDF graphs that are compliant with the OpenCitations datamodel."
 authors = [
     "Silvio Peroni ",
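
---

Note on the locking pattern (a sketch, not part of the patch): the change removes
the per-read FileLock from Reader._load_graph and instead has Storer.store_all hold
a single FileLock across the whole read-merge-write cycle, so another process can no
longer modify a file between the moment it is read back and the moment it is
rewritten. A minimal self-contained sketch of that pattern, assuming only rdflib and
filelock; the helper name locked_read_modify_write and the N-Quads format choice are
illustrative, not part of oc_ocdm:

    import os

    from filelock import FileLock
    from rdflib import ConjunctiveGraph

    def locked_read_modify_write(file_path: str, new_g: ConjunctiveGraph) -> None:
        # One lock guards read, merge and write together, as store_all now does.
        with FileLock(f"{file_path}.lock"):
            stored_g = ConjunctiveGraph()
            if os.path.exists(file_path):
                stored_g.parse(file_path, format="nquads")  # load what is already on disk
            stored_g.addN(new_g.quads())  # merge the new quads into the stored graph
            stored_g.serialize(destination=file_path, format="nquads", encoding="utf-8")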
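Under the same assumptions, such a helper can be called from several processes at
once, which is the scenario the new test_provenance exercises through
multiprocessing.Pool; the shared.nq path below is hypothetical and stands in for
the real output files:

    from multiprocessing import Pool

    from rdflib import ConjunctiveGraph, Literal, URIRef

    def worker(i: int) -> None:
        # Each process contributes its own triples to the same shared file.
        g = ConjunctiveGraph()
        g.add((URIRef(f"http://test/br/060{i}"),
               URIRef("http://purl.org/dc/terms/title"),
               Literal("Hola")))
        locked_read_modify_write("shared.nq", g)  # sketch helper defined above

    if __name__ == "__main__":
        with Pool(processes=3) as pool:
            pool.map(worker, [1, 2, 3])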