Skip to content

Commit

Permalink
Merge pull request #100 from DataONEorg/bug-97-tagobject-cidrefs
Browse files Browse the repository at this point in the history
BugFix-97: `tag_object` cid refs file missing pids
  • Loading branch information
doulikecookiedough authored May 30, 2024
2 parents 277370e + fccc792 commit ad55464
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 86 deletions.
220 changes: 140 additions & 80 deletions src/hashstore/filehashstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import atexit
import io
import multiprocessing
import shutil
import threading
import time
Expand Down Expand Up @@ -57,14 +58,22 @@ class FileHashStore(HashStore):
"blake2b",
"blake2s",
]
# Variables to orchestrate thread locking and object store synchronization
# Variables to orchestrate parallelization
time_out_sec = 1
# Thread Synchronization
object_lock = threading.Lock()
metadata_lock = threading.Lock()
reference_lock = threading.Lock()
thread_condition = threading.Condition(reference_lock)
reference_locked_cids = []
# Multiprocessing Synchronization
reference_lock_mp = multiprocessing.Lock()
multiprocessing_condition = multiprocessing.Condition(reference_lock_mp)
reference_locked_cids_mp = multiprocessing.Manager().list() # Create a shared list

# TODO: To organize
object_locked_pids = []
metadata_locked_pids = []
reference_locked_cids = []

def __init__(self, properties=None):
if properties:
Expand Down Expand Up @@ -447,6 +456,7 @@ def store_object(
additional_algorithm, checksum, checksum_algorithm
)

# TODO: Implement multiprocessing lock check like 'tag_object'
# Wait for the pid to release if it's in use
while pid in self.object_locked_pids:
logging.debug(
Expand Down Expand Up @@ -478,7 +488,8 @@ def store_object(
"FileHashStore - store_object: Attempting to tag object for pid: %s",
pid,
)
self.tag_object(pid, object_metadata.cid)
cid = object_metadata.cid
self.tag_object(pid, cid)
logging.info(
"FileHashStore - store_object: Successfully stored object for pid: %s",
pid,
Expand Down Expand Up @@ -564,53 +575,73 @@ def tag_object(self, pid, cid):
)
self._check_string(pid, "pid", "tag_object")
self._check_string(cid, "cid", "tag_object")
# Wait for the cid to release if it's being tagged
while cid in self.reference_locked_cids:
logging.debug(
"FileHashStore - tag_object: (cid) %s is currently locked. Waiting.",
cid,
)
time.sleep(self.time_out_sec)

# Modify reference_locked_cids consecutively
with self.reference_lock:
logging.debug(
"FileHashStore - tag_object: Adding cid: %s to reference_locked_cids.",
cid,
)
self.reference_locked_cids.append(cid)
# Wait for the cid to release if it's being tagged
use_multiprocessing = os.getenv("USE_MULTIPROCESSING", "False") == "True"
if use_multiprocessing:
with self.multiprocessing_condition:
while cid in self.reference_locked_cids_mp:
logging.debug(
"FileHashStore - tag_object: (cid) %s is currently locked. Waiting.",
cid,
)
self.multiprocessing_condition.wait()
# Add cid to tracking array
self.reference_locked_cids_mp.append(cid)
else:
with self.thread_condition:
while cid in self.reference_locked_cids:
logging.debug(
"FileHashStore - tag_object: (cid) %s is currently locked. Waiting.",
cid,
)
self.thread_condition.wait()
# Add cid to tracking array
self.reference_locked_cids.append(cid)
try:
# Prepare files and paths
tmp_root_path = self._get_store_path("refs") / "tmp"
pid_ref_abs_path = self._resolve_path("pid", pid)
cid_ref_abs_path = self._resolve_path("cid", cid)
pid_ref_abs_path_exists = os.path.exists(pid_ref_abs_path)
cid_ref_abs_path_exists = os.path.exists(cid_ref_abs_path)
pid_refs_path = self._resolve_path("pid", pid)
cid_refs_path = self._resolve_path("cid", cid)
# All ref files begin as tmp files and get moved sequentially at once
# Get tmp files with the expected cid and pid refs content
pid_tmp_file_path = self._write_refs_file(tmp_root_path, cid, "pid")
cid_tmp_file_path = self._write_refs_file(tmp_root_path, pid, "cid")
# Create paths for pid ref file in '.../refs/pid' and cid ref file in '.../refs/cid'
self._create_path(os.path.dirname(pid_refs_path))
self._create_path(os.path.dirname(cid_refs_path))

if pid_ref_abs_path_exists and cid_ref_abs_path_exists:
if os.path.exists(pid_refs_path) and os.path.exists(cid_refs_path):
self._verify_hashstore_references(
pid,
cid,
pid_refs_path,
cid_refs_path,
"Refs file already exists, verifying.",
)
return True
elif pid_ref_abs_path_exists and not cid_ref_abs_path_exists:
elif os.path.exists(pid_refs_path) and not os.path.exists(cid_refs_path):
debug_msg = (
f"FileHashStore - tag_object: pid refs file exists ({pid_ref_abs_path})"
+ f" for pid: {pid}, but cid refs file doesn't at: {cid_ref_abs_path}"
f"FileHashStore - tag_object: pid refs file exists ({pid_refs_path})"
+ f" for pid: {pid}, but cid refs file doesn't at: {cid_refs_path}"
+ f" for cid: {cid}"
)
logging.debug(debug_msg)
# A pid reference file can only contain and reference one cid
# First, confirm that the expected cid refs file exists by getting the cid
with open(pid_ref_abs_path, "r", encoding="utf8") as pid_ref_file:
with open(pid_refs_path, "r", encoding="utf8") as pid_ref_file:
pid_refs_cid = pid_ref_file.read()

if self._is_string_in_refs_file(cid, pid_ref_abs_path):
if self._is_string_in_refs_file(cid, pid_refs_path):
# The pid correctly references the given cid, but the cid refs file is missing
cid_tmp_file_path = self._write_refs_file(tmp_root_path, pid, "cid")
self._create_path(os.path.dirname(cid_ref_abs_path))
shutil.move(cid_tmp_file_path, cid_ref_abs_path)
shutil.move(cid_tmp_file_path, cid_refs_path)
self._verify_hashstore_references(
pid, cid, "Created missing cid refs file"
pid,
cid,
pid_refs_path,
cid_refs_path,
"Created missing cid refs file",
)
info_msg = (
f"FileHashStore - tag_object: pid refs file exists for pid: {pid}"
Expand All @@ -622,38 +653,38 @@ def tag_object(self, pid, cid):
else:
# Check if the retrieved cid refs file exists and pid is referenced
retrieved_cid_refs_path = self._resolve_path("cid", pid_refs_cid)
retrieved_cid_refs_path_exists = os.path.exists(
if os.path.exists(
retrieved_cid_refs_path
)
if retrieved_cid_refs_path_exists and self._is_string_in_refs_file(
pid, retrieved_cid_refs_path
):
) and self._is_string_in_refs_file(pid, retrieved_cid_refs_path):
# Throw exception, this pid is accounted for
exception_string = (
error_msg = (
"FileHashStore - tag_object: Pid refs file exists with valid pid"
+ f" and cid reference files for pid: {pid} with cid: {cid}."
)
logging.error(exception_string)
raise FileExistsError(exception_string)
# Orphaned pid refs file found, the retrieved cid refs file exists
# but doesn't contain the cid. Proceed to overwrite the pid refs file.
# There is no return statement, so we move out of this if block.
elif not pid_ref_abs_path_exists and cid_ref_abs_path_exists:
logging.error(error_msg)
raise PidAlreadyExistsError(error_msg)
else:
debug_msg = (
f"FileHashStore - tag_object: Orphan pid refs file found for {pid}."
+ f" Cid ({cid}) reference file does not contain the pid. Proceeding."
)
logging.debug(debug_msg)
elif not os.path.exists(pid_refs_path) and os.path.exists(cid_refs_path):
debug_msg = (
f"FileHashStore - tag_object: pid refs file does not exists for pid {pid}"
+ f" but cid refs file exists at: {cid_ref_abs_path} for cid: {cid}"
+ f" but cid refs file exists at: {cid_refs_path} for cid: {cid}"
)
logging.debug(debug_msg)
# Create the pid refs file
pid_tmp_file_path = self._write_refs_file(tmp_root_path, cid, "pid")
self._create_path(os.path.dirname(pid_ref_abs_path))
shutil.move(pid_tmp_file_path, pid_ref_abs_path)
# Move the pid refs file
shutil.move(pid_tmp_file_path, pid_refs_path)
# Update cid ref files as it already exists
if not self._is_string_in_refs_file(pid, cid_ref_abs_path):
self._update_refs_file(cid_ref_abs_path, pid, "add")
if not self._is_string_in_refs_file(pid, cid_refs_path):
self._update_refs_file(cid_refs_path, pid, "add")
self._verify_hashstore_references(
pid,
cid,
pid_refs_path,
cid_refs_path,
"Pid refs file doesn't exist, but cid refs exists.",
)
logging.info(
Expand All @@ -663,20 +694,13 @@ def tag_object(self, pid, cid):
)
return True

# All ref files begin as tmp files and get moved sequentially at once
# Get tmp files with the expected cid and pid refs content
pid_tmp_file_path = self._write_refs_file(tmp_root_path, cid, "pid")
cid_tmp_file_path = self._write_refs_file(tmp_root_path, pid, "cid")
# Create paths for pid ref file in '.../refs/pid' and cid ref file in '.../refs/cid'
self._create_path(os.path.dirname(pid_ref_abs_path))
self._create_path(os.path.dirname(cid_ref_abs_path))
# Move both files
shutil.move(pid_tmp_file_path, pid_ref_abs_path)
shutil.move(cid_tmp_file_path, cid_ref_abs_path)
# Ensure that the reference files have been written as expected
# If there is an issue, client or user will have to manually review
log_msg = "Reference files have been moved to their permanent location."
self._verify_hashstore_references(pid, cid, log_msg)
# Move both files after checking the existing status of refs files
shutil.move(pid_tmp_file_path, pid_refs_path)
shutil.move(cid_tmp_file_path, cid_refs_path)
log_msg = "Reference files have been moved to their permanent location. Verifying refs."
self._verify_hashstore_references(
pid, cid, pid_refs_path, cid_refs_path, log_msg
)
logging.info(
"FileHashStore - tag_object: Successfully tagged cid: %s with pid %s",
cid,
Expand All @@ -685,12 +709,23 @@ def tag_object(self, pid, cid):
return True
finally:
# Release cid
with self.reference_lock:
logging.debug(
"FileHashStore - tag_object: Removing cid: %s from reference_locked_cids.",
cid,
)
self.reference_locked_cids.remove(cid)
if use_multiprocessing:
with self.multiprocessing_condition:
logging.debug(
"FileHashStore - tag_object (mp): Removing cid: %s from"
+ " reference_locked_cids.",
cid,
)
self.reference_locked_cids_mp.remove(cid)
self.multiprocessing_condition.notify()
else:
with self.thread_condition:
logging.debug(
"FileHashStore - tag_object: Removing cid: %s from reference_locked_cids.",
cid,
)
self.reference_locked_cids.remove(cid)
self.thread_condition.notify()

def find_object(self, pid):
logging.debug(
Expand Down Expand Up @@ -755,6 +790,7 @@ def store_metadata(self, pid, metadata, format_id=None):
checked_format_id = self._check_arg_format_id(format_id, "store_metadata")
self._check_arg_data(metadata)

# TODO: Implement multiprocessing lock check like 'tag_object'
# Wait for the pid to release if it's in use
while pid in self.metadata_locked_pids:
logging.debug(
Expand Down Expand Up @@ -861,6 +897,7 @@ def delete_object(self, ab_id, id_type=None):
# If the refs file still exists, do not delete the object
if not os.path.exists(cid_refs_abs_path):
cid = ab_id
# TODO: Implement multiprocessing lock check like 'tag_object'
# Synchronize the cid
while cid in self.reference_locked_cids:
logging.debug(
Expand Down Expand Up @@ -1081,6 +1118,7 @@ def delete_metadata(self, pid, format_id=None):
"FileHashStore - delete_metadata: Request to delete metadata for pid: %s",
pid,
)
# TODO: Implement multiprocessing lock check like 'tag_object'
self._check_string(pid, "pid", "delete_metadata")
checked_format_id = self._check_arg_format_id(format_id, "delete_metadata")
# Get the metadata directory path for the given pid
Expand Down Expand Up @@ -1792,56 +1830,68 @@ def _verify_object_information(
logging.debug(exception_string)
raise ValueError(exception_string)

def _verify_hashstore_references(self, pid, cid, additional_log_string):
def _verify_hashstore_references(
self,
pid,
cid,
pid_refs_path=None,
cid_refs_path=None,
additional_log_string=None,
):
"""Verifies that the supplied pid and pid reference file and content have been
written successfully.
:param str pid: Authority-based or persistent identifier.
:param str cid: Content identifier.
:param str pid_refs_path: Path to pid refs file
:param str cid_refs_path: Path to cid refs file
:param str additional_log_string: String to append to exception statement
"""
debug_msg = (
f"FileHashStore - _verify_hashstore_references: verifying pid ({pid})"
+ f" and cid ({cid}) refs files. Additional Note: {additional_log_string}"
)
logging.debug(debug_msg)
if pid_refs_path is None:
pid_refs_path = self._resolve_path("pid", pid)
if cid_refs_path is None:
cid_refs_path = self._resolve_path("cid", cid)

# Check that reference files were created
pid_ref_abs_path = self._resolve_path("pid", pid)
cid_ref_abs_path = self._resolve_path("cid", cid)
if not os.path.exists(pid_ref_abs_path):
if not os.path.exists(pid_refs_path):
exception_string = (
"FileHashStore - _verify_hashstore_references: Pid refs file missing: "
+ pid_ref_abs_path
+ pid_refs_path
+ f" . Additional Context: {additional_log_string}"
)
logging.error(exception_string)
raise FileNotFoundError(exception_string)
if not os.path.exists(cid_ref_abs_path):
if not os.path.exists(cid_refs_path):
exception_string = (
"FileHashStore - _verify_hashstore_references: Cid refs file missing: "
+ cid_ref_abs_path
+ cid_refs_path
+ f" . Additional Context: {additional_log_string}"
)
logging.error(exception_string)
raise FileNotFoundError(exception_string)
# Check the content of the reference files
# Start with the cid
with open(pid_ref_abs_path, "r", encoding="utf8") as f:
with open(pid_refs_path, "r", encoding="utf8") as f:
retrieved_cid = f.read()
if retrieved_cid != cid:
exception_string = (
"FileHashStore - _verify_hashstore_references: Pid refs file exists"
+ f" ({pid_ref_abs_path}) but cid ({cid}) does not match."
+ f" ({pid_refs_path}) but cid ({cid}) does not match."
+ f" Additional Context: {additional_log_string}"
)
logging.error(exception_string)
raise ValueError(exception_string)
# Then the pid
pid_found = self._is_string_in_refs_file(pid, cid_ref_abs_path)
pid_found = self._is_string_in_refs_file(pid, cid_refs_path)
if not pid_found:
exception_string = (
"FileHashStore - _verify_hashstore_references: Cid refs file exists"
+ f" ({cid_ref_abs_path}) but pid ({pid}) not found."
+ f" ({cid_refs_path}) but pid ({pid}) not found."
+ f" Additional Context: {additional_log_string}"
)
logging.error(exception_string)
Expand Down Expand Up @@ -2377,6 +2427,16 @@ def close(self):
self._obj.seek(self._pos)


class PidAlreadyExistsError(Exception):
"""Custom exception thrown when a client calls 'tag_object' and the pid
that is being tagged is already accounted for (has a pid refs file and
is found in the cid refs file)."""

def __init__(self, message, errors=None):
super().__init__(message)
self.errors = errors


class PidObjectMetadataError(Exception):
"""Custom exception thrown when an object cannot be verified due
to an error with the metadata provided to validate against."""
Expand Down
Loading

0 comments on commit ad55464

Please sign in to comment.