active lock between reader and storer
arcangelo7 committed Mar 14, 2024
1 parent 7cd5381 commit 7fa646c
Showing 5 changed files with 131 additions and 78 deletions.
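
Note on the change: the per-parse lock that the Reader previously took is removed, and the Storer now holds a single filelock across its whole read-merge-write cycle, so reader and storer coordinate on the same "<path>.lock" sidecar file. A minimal sketch of that coordination pattern (illustrative only, not code from this commit; the file name and helper function are made up):

    from filelock import FileLock

    def append_line(path: str, line: str) -> None:
        """Append to a shared file under an exclusive cross-process lock."""
        # filelock serializes access across processes via the "<path>.lock"
        # sidecar file, the same convention oc_ocdm uses for its RDF files.
        with FileLock(f"{path}.lock"):
            with open(path, "a", encoding="utf-8") as f:
                f.write(line + "\n")

    if __name__ == "__main__":
        append_line("shared.txt", "written without interleaving")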
71 changes: 36 additions & 35 deletions oc_ocdm/reader.py
@@ -21,7 +21,6 @@
 from zipfile import ZipFile
 
 import rdflib
-from filelock import FileLock
 from rdflib import RDF, ConjunctiveGraph, Graph, URIRef
 from SPARQLWrapper import XML, SPARQLWrapper
@@ -81,45 +80,47 @@ def load(self, rdf_file_path: str) -> Optional[ConjunctiveGraph]:
         return loaded_graph
 
     def _load_graph(self, file_path: str) -> ConjunctiveGraph:
-        formats: List[str] = ["json-ld", "rdfxml", "turtle", "trig", "nt11", "nquads"]
+        formats = ["json-ld", "rdfxml", "turtle", "trig", "nt11", "nquads"]
+        loaded_graph = ConjunctiveGraph()
 
-        loaded_graph: ConjunctiveGraph = ConjunctiveGraph()
+        if file_path.endswith('.zip'):
+            try:
+                with ZipFile(file=file_path, mode="r") as archive:
+                    for zf_name in archive.namelist():
+                        with archive.open(zf_name) as f:
+                            if self._try_parse(loaded_graph, f, formats):
+                                return loaded_graph
+            except Exception as e:
+                raise IOError(f"Error opening or reading zip file '{file_path}': {e}")
+        else:
+            try:
+                with open(file_path, 'rt', encoding='utf-8') as f:
+                    if self._try_parse(loaded_graph, f, formats):
+                        return loaded_graph
+            except Exception as e:
+                raise IOError(f"Error opening or reading file '{file_path}': {e}")
+
+        raise IOError(f"It was impossible to load the file '{file_path}' with supported formats.")
 
-        errors: str = ""
+    def _try_parse(self, graph: ConjunctiveGraph, file_obj, formats: List[str]) -> bool:
         for cur_format in formats:
+            file_obj.seek(0)  # Reset file pointer to the beginning for each new attempt
             try:
-                lock = FileLock(f"{file_path}.lock")
-                with lock:
-                    if file_path.endswith('.zip'):
-                        with ZipFile(file=file_path, mode="r") as archive:
-                            for zf_name in archive.namelist():
-                                f = archive.open(zf_name)
-                    else:
-                        f = open(file_path, 'rt', encoding='utf-8')
-                    if cur_format == "json-ld":
-                        json_ld_file: Any = json.load(f)
-                        if isinstance(json_ld_file, dict):
-                            json_ld_file: List[Any] = [json_ld_file]
-
-                        for json_ld_resource in json_ld_file:
-                            # Trick to force the use of a pre-loaded context if the format
-                            # specified is JSON-LD
-                            if "@context" in json_ld_resource:
-                                cur_context: str = json_ld_resource["@context"]
-                                if cur_context in self.context_map:
-                                    context_json: Any = self.context_map[cur_context]["@context"]
-                                    json_ld_resource["@context"] = context_json
-
-                            loaded_graph.parse(data=json.dumps(json_ld_resource, ensure_ascii=False),
-                                               format=cur_format)
-                    else:
-                        loaded_graph.parse(file=f, format=cur_format)
-                    f.close()
-                    return loaded_graph
+                if cur_format == "json-ld":
+                    json_ld_file = json.load(file_obj)
+                    if isinstance(json_ld_file, dict):
+                        json_ld_file = [json_ld_file]
+                    for json_ld_resource in json_ld_file:
+                        if "@context" in json_ld_resource and json_ld_resource["@context"] in self.context_map:
+                            json_ld_resource["@context"] = self.context_map[json_ld_resource["@context"]]["@context"]
+                    data = json.dumps(json_ld_file, ensure_ascii=False)
+                    graph.parse(data=data, format=cur_format)
+                else:
+                    graph.parse(file=file_obj, format=cur_format)
+                return True  # Success, no need to try other formats
             except Exception as e:
-                errors += f" | {e}"  # Try another format
-
-        raise IOError("1", f"It was impossible to handle the format used for storing the file '{file_path}'{errors}")
+                continue  # Try the next format
+        return False  # None of the formats succeeded
 
     @staticmethod
     def get_graph_from_subject(graph: Graph, subject: URIRef) -> Graph:
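
Note: the refactored reader splits responsibilities — _load_graph handles opening plain or zipped files, while the new _try_parse rewinds the stream and attempts each serialization in turn until one parses. A standalone sketch of that fallback pattern with plain rdflib (the function name and format list here are illustrative, not oc_ocdm API):

    from rdflib import ConjunctiveGraph

    def parse_with_fallback(file_obj, formats=("json-ld", "turtle", "nquads")):
        g = ConjunctiveGraph()
        for fmt in formats:
            file_obj.seek(0)  # rewind before every attempt
            try:
                g.parse(file=file_obj, format=fmt)
                return g  # the first format that parses wins
            except Exception:
                continue  # not this serialization: try the next one
        raise IOError("none of the candidate formats matched")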
81 changes: 44 additions & 37 deletions oc_ocdm/storer.py
@@ -109,40 +109,45 @@ def _store_in_file(self, cur_g: ConjunctiveGraph, cur_file_path: str, context_pa
                 for g_context in cur_g.contexts((s, p, o)):
                     g_iri = g_context.identifier
                     break
 
             new_g.addN([(s, p, o, g_iri)])
 
         zip_file_path = cur_file_path.replace(os.path.splitext(cur_file_path)[1], ".zip")
-        lock = FileLock(f"{zip_file_path}.lock") if self.zip_output else FileLock(f"{cur_file_path}.lock")
-        with lock:
-            if self.zip_output:
-                zip_file = ZipFile(zip_file_path, mode="w", compression=ZIP_DEFLATED, allowZip64=True)
-            if self.output_format == "json-ld":
-                if context_path is not None and context_path in self.context_map:
-                    cur_json_ld: Any = json.loads(
-                        new_g.serialize(format="json-ld", context=self.context_map[context_path]))
-                    if isinstance(cur_json_ld, dict):
-                        cur_json_ld["@context"] = context_path
-                    else:  # it is a list
-                        for item in cur_json_ld:
-                            item["@context"] = context_path
-                else:
-                    cur_json_ld: Any = json.loads(new_g.serialize(format="json-ld"))
-                if self.zip_output:
-                    dumped_json: bytes = json.dumps(cur_json_ld, ensure_ascii=False).encode('utf-8')
-                    zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=dumped_json)
-                else:
-                    with open(cur_file_path, 'wt', encoding='utf-8') as f:
-                        json.dump(cur_json_ld, f, ensure_ascii=False)
-            else:
-                if self.zip_output:
-                    rdf_serialization: bytes = new_g.serialize(destination=None, format=self.output_format, encoding="utf-8")
-                    zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=rdf_serialization)
-                else:
-                    new_g.serialize(destination=cur_file_path, format=self.output_format, encoding="utf-8")
-            if self.zip_output:
-                zip_file.close()
+        if self.zip_output:
+            with ZipFile(zip_file_path, mode="w", compression=ZIP_DEFLATED, allowZip64=True) as zip_file:
+                self._write_graph(new_g, zip_file, cur_file_path, context_path)
+        else:
+            # Handle non-zipped output directly to a file
+            self._write_graph(new_g, None, cur_file_path, context_path)
 
         self.repok.add_sentence(f"File '{cur_file_path}' added.")
 
+    def _write_graph(self, graph: ConjunctiveGraph, zip_file: ZipFile, cur_file_path, context_path):
+        if self.output_format == "json-ld":
+            # Serialize the graph in JSON-LD format
+            cur_json_ld = json.loads(graph.serialize(format="json-ld", context=self.context_map.get(context_path)))
+            if context_path is not None and context_path in self.context_map:
+                if isinstance(cur_json_ld, dict):
+                    cur_json_ld["@context"] = context_path
+                else:  # When cur_json_ld is a list
+                    for item in cur_json_ld:
+                        item["@context"] = context_path
+
+            # Determine how to write based on zip file presence
+            if zip_file is not None:
+                dumped_json = json.dumps(cur_json_ld, ensure_ascii=False).encode('utf-8')
+                zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=dumped_json)
+            else:
+                with open(cur_file_path, 'wt', encoding='utf-8') as f:
+                    json.dump(cur_json_ld, f, ensure_ascii=False)
+        else:
+            # Handle other RDF formats
+            if zip_file is not None:
+                rdf_serialization = graph.serialize(destination=None, format=self.output_format, encoding="utf-8")
+                zip_file.writestr(zinfo_or_arcname=os.path.basename(cur_file_path), data=rdf_serialization)
+            else:
+                graph.serialize(destination=cur_file_path, format=self.output_format, encoding="utf-8")
 
     def store_all(self, base_dir: str, base_iri: str, context_path: str = None) -> List[str]:
         self.repok.new_article()
         self.reperr.new_article()
@@ -165,13 +170,15 @@ def store_all(self, base_dir: str, base_iri: str, context_path: str = None) -> List[str]:
             stored_g = None
             # Here we try to obtain a reference to the currently stored graph
             output_filepath = relevant_path.replace(os.path.splitext(relevant_path)[1], ".zip") if self.zip_output else relevant_path
-            if os.path.exists(output_filepath):
-                stored_g = Reader(context_map=self.context_map).load(output_filepath)
-            if stored_g is None:
-                stored_g = ConjunctiveGraph()
-            for entity_in_path in entities_in_path:
-                self.store(entity_in_path, stored_g, relevant_path, context_path, False)
-            self._store_in_file(stored_g, relevant_path, context_path)
+            lock = FileLock(f"{output_filepath}.lock")
+            with lock:
+                if os.path.exists(output_filepath):
+                    stored_g = Reader(context_map=self.context_map).load(output_filepath)
+                if stored_g is None:
+                    stored_g = ConjunctiveGraph()
+                for entity_in_path in entities_in_path:
+                    self.store(entity_in_path, stored_g, relevant_path, context_path, False)
+                self._store_in_file(stored_g, relevant_path, context_path)
 
         return list(relevant_paths.keys())

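
Note: the key behavioral change in store_all is that the existence check, the Reader.load, the in-memory merge, and the final _store_in_file now all happen inside one FileLock, so a concurrent process can no longer read or rewrite the file between those steps. A reduced sketch of that lock-guarded read-modify-write shape (merge_into_file and the nquads format choice are assumptions for illustration):

    import os

    from filelock import FileLock
    from rdflib import ConjunctiveGraph

    def merge_into_file(path: str, new_quads: ConjunctiveGraph) -> None:
        # one critical section covers the whole read-modify-write cycle
        with FileLock(f"{path}.lock"):
            stored = ConjunctiveGraph()
            if os.path.exists(path):
                stored.parse(path, format="nquads")  # load what is on disk
            stored += new_quads  # merge the new triples into the stored graph
            stored.serialize(destination=path, format="nquads")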
6 changes: 4 additions & 2 deletions oc_ocdm/test/reader/test_reader.py
@@ -16,10 +16,12 @@

 import os
 import unittest
 
+from rdflib import URIRef
+from SPARQLWrapper import JSON, SPARQLWrapper
+from oc_ocdm.reader import Reader
+
 from oc_ocdm.graph import GraphSet
-from rdflib import URIRef
-from oc_ocdm.reader import Reader
 
class TestReader(unittest.TestCase):
49 changes: 46 additions & 3 deletions oc_ocdm/test/storer/test_storer.py
@@ -17,26 +17,30 @@
 import os
 import unittest
 from platform import system
 from shutil import rmtree
 from zipfile import ZipFile
+from multiprocessing import Pool
+from SPARQLWrapper import POST, SPARQLWrapper
 
 from rdflib import ConjunctiveGraph, URIRef, compare
 
 from oc_ocdm.graph.graph_set import GraphSet
 from oc_ocdm.prov.prov_set import ProvSet
 from oc_ocdm.storer import Storer
+from oc_ocdm.reader import Reader
 
 class TestStorer(unittest.TestCase):
     def setUp(self):
         self.resp_agent = "http://resp_agent.test/"
         self.base_iri = "http://test/"
+        self.ts = 'http://127.0.0.1:9999/blazegraph/sparql'
         self.graph_set = GraphSet(self.base_iri, "", "060", False)
         self.prov_set = ProvSet(self.graph_set, self.base_iri, "", False)
         self.br = self.graph_set.add_br(self.resp_agent)
+        self.info_dir = os.path.join("oc_ocdm", "test", "storer", "test_provenance", "info_dir")
 
-    def tearDown(self):
-        rmtree(os.path.join("oc_ocdm", "test", "storer", "data"))
+    # def tearDown(self):
+    #     rmtree(os.path.join("oc_ocdm", "test", "storer", "data"))

     def test_store_graphs_in_file(self):
         base_dir = os.path.join("oc_ocdm", "test", "storer", "data", "rdf") + os.sep
@@ -51,6 +55,7 @@ def test_store_graphs_in_file(self):
         with ZipFile(os.path.join(base_dir, "br", "060", "10000", "1000.zip"), mode="r") as archive:
             with archive.open("1000.json") as f:
                 data = json.load(f)
+                print(data)
                 self.assertEqual(data, [{'@graph': [{'@id': 'http://test/br/0601', '@type': ['http://purl.org/spar/fabio/Expression']}], '@id': 'http://test/br/'}])
         with ZipFile(os.path.join(base_dir, "br", "060", "10000", "1000", "prov", "se.zip"), mode="r") as archive:
             with archive.open("se.json") as f:
@@ -145,6 +150,44 @@ def test_store_graphs_in_file(self):
         self.assertTrue(os.path.exists(os.path.join(base_dir_3, "br", "060", "10000", "1000.nt.lock")))
         self.assertTrue(os.path.exists(os.path.join(base_dir_3, "br", "060", "10000", "1000", "prov", "se.nq.lock")))

+    def test_provenance(self):
+        ts = SPARQLWrapper(self.ts)
+        ts.setQuery('delete{?x ?y ?z} where{?x ?y ?z}')
+        ts.setMethod(POST)
+        ts.query()
+        graph_set = GraphSet(self.base_iri, "", "060", False)
+        prov_set = ProvSet(graph_set, self.base_iri, info_dir=self.info_dir)
+        base_dir = os.path.join("oc_ocdm", "test", "storer", "test_provenance") + os.sep
+        graph_set.add_br(self.resp_agent)
+        graph_set.add_br(self.resp_agent)
+        graph_set.add_br(self.resp_agent)
+        prov_set.generate_provenance()
+        storer = Storer(graph_set, context_map={}, dir_split=10000, n_file_item=1000, default_dir="_", output_format='json-ld', zip_output=False)
+        prov_storer = Storer(prov_set, context_map={}, dir_split=10000, n_file_item=1000, default_dir="_", output_format='json-ld', zip_output=False)
+        prov_storer.store_all(base_dir, self.base_iri)
+        storer.upload_all(self.ts, base_dir)
+        graph_set.commit_changes()
+        entities_to_process = [('http://test/br/0601',), ('http://test/br/0602',), ('http://test/br/0603',)]
+        with Pool(processes=3) as pool:
+            pool.starmap(process_entity, entities_to_process)
+
+def process_entity(entity):
+    base_iri = "http://test/"
+    ts = 'http://127.0.0.1:9999/blazegraph/sparql'
+    resp_agent = "http://resp_agent.test/"
+    base_dir = os.path.join("oc_ocdm", "test", "storer", "test_provenance") + os.sep
+    info_dir = os.path.join("oc_ocdm", "test", "storer", "test_provenance", "info_dir")
+    graph_set = GraphSet(base_iri, "", "060", False)
+    Reader.import_entity_from_triplestore(graph_set, ts, URIRef(entity), resp_agent)
+    br = graph_set.get_entity(URIRef(entity))
+    br.has_title("Hola")
+    prov_set = ProvSet(graph_set, base_iri, info_dir=info_dir)
+    prov_set.generate_provenance()
+    storer = Storer(graph_set, context_map={}, dir_split=10000, n_file_item=1000, default_dir="_", output_format='json-ld', zip_output=False)
+    prov_storer = Storer(prov_set, context_map={}, dir_split=10000, n_file_item=1000, default_dir="_", output_format='json-ld', zip_output=False)
+    prov_storer.store_all(base_dir, base_iri)
+    storer.upload_all(ts, base_dir)


 if __name__ == '__main__':
     unittest.main()
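
Note: the new test_provenance drives three worker processes against the same output directory and info_dir, which is exactly the contention the new locks must survive; starmap unpacks each one-element tuple into the worker's single argument. A minimal sketch of that driver pattern (the worker body here is a stub, not the real test code):

    from multiprocessing import Pool

    def edit_entity(entity: str) -> str:
        # the real worker builds its own GraphSet, ProvSet and Storer,
        # then calls store_all/upload_all on the shared directories
        return f"processed {entity}"

    if __name__ == "__main__":
        entities = [("http://test/br/0601",), ("http://test/br/0602",),
                    ("http://test/br/0603",)]
        with Pool(processes=3) as pool:
            print(pool.starmap(edit_entity, entities))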
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "oc_ocdm"
-version = "7.3.1"
+version = "7.3.2"
 description = "Object mapping library for manipulating RDF graphs that are compliant with the OpenCitations datamodel."
 authors = [
     "Silvio Peroni <essepuntato@gmail.com>",
