diff --git a/README.md b/README.md
index da7aa829..1aff3ad4 100644
--- a/README.md
+++ b/README.md
@@ -224,6 +224,52 @@ These reference files are implemented in HashStore underneath the hood with no e
 └── 8f0b04e812a3b4c8f686ce34e6fec558804bf61e54b176742a7f6368d6
 ```
+## Concurrency in HashStore
+
+HashStore is both thread and process safe. By default, it synchronizes calls that store and delete objects/metadata using Python's `threading` module. If you wish to parallelize your application with multiprocessing, set the global environment variable `USE_MULTIPROCESSING` to `"True"` before initializing HashStore. This directs the relevant Public API calls to synchronize using the Python `multiprocessing` module's locks and conditions instead. See the example below:
+
+```py
+import os
+
+# Set the global environment variable
+os.environ["USE_MULTIPROCESSING"] = "True"
+
+# Check that the global environment variable has been set
+use_multiprocessing = os.getenv("USE_MULTIPROCESSING", "False") == "True"
+```
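+
+Once `USE_MULTIPROCESSING` is set, Public API calls such as `store_object` can be parallelized with a process pool. Note that the environment variable only selects HashStore's internal locking primitives; spawning and managing worker processes is up to the calling application. The snippet below is a minimal sketch, and the `properties` values and file paths are illustrative placeholders to adjust for your own store:
+
+```py
+import os
+from multiprocessing import Pool
+
+# Must be set before HashStore is initialized
+os.environ["USE_MULTIPROCESSING"] = "True"
+
+from hashstore.filehashstore import FileHashStore
+
+# Illustrative properties - replace these with your store's configuration
+properties = {
+    "store_path": "/var/hashstore",
+    "store_depth": 3,
+    "store_width": 2,
+    "store_algorithm": "SHA-256",
+    "store_metadata_namespace": "https://ns.dataone.org/service/types/v2.0#SystemMetadata",
+}
+hashstore = FileHashStore(properties)
+
+def store_one(task):
+    pid, file_path = task
+    return hashstore.store_object(pid, file_path)
+
+if __name__ == "__main__":
+    tasks = [("example-pid-1", "/path/to/file1"), ("example-pid-2", "/path/to/file2")]
+    with Pool(processes=2) as pool:
+        pool.map(store_one, tasks)
+```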
Waiting.", - pid, - ) - time.sleep(self.time_out_sec) - # Modify object_locked_pids consecutively - with self.object_lock: - logging.debug( - "FileHashStore - store_object: Adding pid: %s to object_locked_pids.", - pid, - ) - self.object_locked_pids.append(pid) + sync_begin_debug_msg = ( + f"FileHashStore - store_object: Adding pid ({pid}) to locked list." + ) + sync_wait_msg = ( + f"FileHashStore - store_object: Pid ({pid}) is locked. Waiting." + ) + if self.use_multiprocessing: + with self.object_condition_mp: + # Wait for the pid to release if it's in use + while pid in self.object_locked_pids_mp: + logging.debug(sync_wait_msg) + self.object_condition_mp.wait() + # Modify object_locked_pids consecutively + logging.debug(sync_begin_debug_msg) + self.object_locked_pids_mp.append(pid) + else: + with self.object_condition: + while pid in self.object_locked_pids: + logging.debug(sync_wait_msg) + self.object_condition.wait() + logging.debug(sync_begin_debug_msg) + self.object_locked_pids.append(pid) try: logging.debug( "FileHashStore - store_object: Attempting to store object for pid: %s", @@ -512,12 +526,21 @@ def store_object( raise err finally: # Release pid - with self.object_lock: - logging.debug( - "FileHashStore - store_object: Removing pid: %s from object_locked_pids.", - pid, - ) - self.object_locked_pids.remove(pid) + end_sync_debug_msg = ( + f"FileHashStore - store_object: Releasing pid ({pid})" + + " from locked list" + ) + if self.use_multiprocessing: + with self.object_condition_mp: + logging.debug(end_sync_debug_msg) + self.object_locked_pids_mp.remove(pid) + self.object_condition_mp.notify() + else: + # Release pid + with self.object_condition: + logging.debug(end_sync_debug_msg) + self.object_locked_pids.remove(pid) + self.object_condition.notify() return object_metadata @@ -576,28 +599,25 @@ def tag_object(self, pid, cid): self._check_string(pid, "pid", "tag_object") self._check_string(cid, "cid", "tag_object") - # Modify reference_locked_cids consecutively - # Wait for the cid to release if it's being tagged - use_multiprocessing = os.getenv("USE_MULTIPROCESSING", "False") == "True" - if use_multiprocessing: - with self.multiprocessing_condition: + sync_begin_debug_msg = ( + f"FileHashStore - tag_object: Adding cid ({pid}) to locked list." + ) + sync_wait_msg = f"FileHashStore - tag_object: Cid ({cid}) is locked. Waiting." + if self.use_multiprocessing: + with self.reference_condition_mp: + # Wait for the cid to release if it's being tagged while cid in self.reference_locked_cids_mp: - logging.debug( - "FileHashStore - tag_object: (cid) %s is currently locked. Waiting.", - cid, - ) - self.multiprocessing_condition.wait() - # Add cid to tracking array + logging.debug(sync_wait_msg) + self.reference_condition_mp.wait() + # Modify reference_locked_cids consecutively + logging.debug(sync_begin_debug_msg) self.reference_locked_cids_mp.append(cid) else: - with self.thread_condition: + with self.reference_condition: while cid in self.reference_locked_cids: - logging.debug( - "FileHashStore - tag_object: (cid) %s is currently locked. 
Waiting.", - cid, - ) - self.thread_condition.wait() - # Add cid to tracking array + logging.debug(sync_wait_msg) + self.reference_condition.wait() + logging.debug(sync_begin_debug_msg) self.reference_locked_cids.append(cid) try: # Prepare files and paths @@ -668,8 +688,8 @@ def tag_object(self, pid, cid): logging.debug(debug_msg) elif not os.path.exists(pid_refs_path) and os.path.exists(cid_refs_path): debug_msg = ( - f"FileHashStore - tag_object: pid refs file does not exists for pid {pid}" - + f" but cid refs file exists at: {cid_refs_path} for cid: {cid}" + f"FileHashStore - tag_object: pid refs file does not exist for pid {pid}" + + f" but cid refs file found at: {cid_refs_path} for cid: {cid}" ) logging.debug(debug_msg) # Move the pid refs file @@ -683,7 +703,7 @@ def tag_object(self, pid, cid): cid, pid_refs_path, cid_refs_path, - "Pid refs file doesn't exist, but cid refs exists.", + f"Updated existing cid refs file: {cid_refs_path} with pid: {pid}", ) logging.info( "FileHashStore - tag_object: Successfully updated cid: %s with pid: %s", @@ -709,23 +729,20 @@ def tag_object(self, pid, cid): return True finally: # Release cid - if use_multiprocessing: - with self.multiprocessing_condition: - logging.debug( - "FileHashStore - tag_object (mp): Removing cid: %s from" - + " reference_locked_cids.", - cid, - ) + end_sync_debug_msg = ( + f"FileHashStore - tag_object: Releasing cid ({cid}) from" + + " reference_locked_cids." + ) + if self.use_multiprocessing: + with self.reference_condition_mp: + logging.debug(end_sync_debug_msg) self.reference_locked_cids_mp.remove(cid) - self.multiprocessing_condition.notify() + self.reference_condition_mp.notify() else: - with self.thread_condition: - logging.debug( - "FileHashStore - tag_object: Removing cid: %s from reference_locked_cids.", - cid, - ) + with self.reference_condition: + logging.debug(end_sync_debug_msg) self.reference_locked_cids.remove(cid) - self.thread_condition.notify() + self.reference_condition.notify() def find_object(self, pid): logging.debug( @@ -790,22 +807,28 @@ def store_metadata(self, pid, metadata, format_id=None): checked_format_id = self._check_arg_format_id(format_id, "store_metadata") self._check_arg_data(metadata) - # TODO: Implement multiprocessing lock check like 'tag_object' - # Wait for the pid to release if it's in use - while pid in self.metadata_locked_pids: - logging.debug( - "FileHashStore - store_metadata: %s is currently being stored. Waiting.", - pid, - ) - time.sleep(self.time_out_sec) - - with self.metadata_lock: - logging.debug( - "FileHashStore - store_metadata: Adding pid: %s to metadata_locked_pids.", - pid, - ) - # Modify metadata_locked_pids consecutively - self.metadata_locked_pids.append(pid) + sync_begin_debug_msg = ( + f"FileHashStore - store_metadata: Adding pid ({pid}) to locked list." + ) + sync_wait_msg = ( + f"FileHashStore - store_metadata: Pid ({pid}) is locked. Waiting." 
+        )
+        if self.use_multiprocessing:
+            with self.metadata_condition_mp:
+                # Wait for the pid to release if it's in use
+                while pid in self.metadata_locked_pids_mp:
+                    logging.debug(sync_wait_msg)
+                    self.metadata_condition_mp.wait()
+                # Modify metadata_locked_pids consecutively
+                logging.debug(sync_begin_debug_msg)
+                self.metadata_locked_pids_mp.append(pid)
+        else:
+            with self.metadata_condition:
+                while pid in self.metadata_locked_pids:
+                    logging.debug(sync_wait_msg)
+                    self.metadata_condition.wait()
+                logging.debug(sync_begin_debug_msg)
+                self.metadata_locked_pids.append(pid)
 
         try:
             logging.debug(
@@ -821,12 +844,20 @@ def store_metadata(self, pid, metadata, format_id=None):
             return metadata_cid
         finally:
             # Release pid
-            with self.metadata_lock:
-                logging.debug(
-                    "FileHashStore - store_metadata: Removing pid: %s from metadata_locked_pids.",
-                    pid,
-                )
-                self.metadata_locked_pids.remove(pid)
+            end_sync_debug_msg = (
+                f"FileHashStore - store_metadata: Releasing pid ({pid})"
+                + " from locked list."
+            )
+            if self.use_multiprocessing:
+                with self.metadata_condition_mp:
+                    logging.debug(end_sync_debug_msg)
+                    self.metadata_locked_pids_mp.remove(pid)
+                    self.metadata_condition_mp.notify()
+            else:
+                with self.metadata_condition:
+                    logging.debug(end_sync_debug_msg)
+                    self.metadata_locked_pids.remove(pid)
+                    self.metadata_condition.notify()
 
     def retrieve_object(self, pid):
         logging.debug(
@@ -892,38 +923,53 @@ def delete_object(self, ab_id, id_type=None):
             "FileHashStore - delete_object: Request to delete object for id: %s", ab_id
         )
         self._check_string(ab_id, "ab_id", "delete_object")
+
         if id_type == "cid":
             cid_refs_abs_path = self._resolve_path("cid", ab_id)
             # If the refs file still exists, do not delete the object
             if not os.path.exists(cid_refs_abs_path):
                 cid = ab_id
 
-                # TODO: Implement multiprocessing lock check like 'tag_object'
-                # Synchronize the cid
-                while cid in self.reference_locked_cids:
-                    logging.debug(
-                        "FileHashStore - delete_object: (cid) %s is currently locked. Waiting",
-                        cid,
-                    )
-                    time.sleep(self.time_out_sec)
-                # Modify reference_locked_cids consecutively
-                with self.reference_lock:
-                    logging.debug(
-                        "FileHashStore - delete_object: Add cid: %s to reference_locked_cids.",
-                        cid,
-                    )
-                    self.reference_locked_cids.append(ab_id)
+                sync_begin_debug_msg = (
+                    f"FileHashStore - delete_object: Adding cid ({cid}) to locked list."
+                )
+                sync_wait_msg = (
+                    f"FileHashStore - delete_object: Cid ({cid}) is locked. Waiting."
+                )
+                if self.use_multiprocessing:
+                    with self.reference_condition_mp:
+                        # Wait for the cid to release if it's in use
+                        while cid in self.reference_locked_cids_mp:
+                            logging.debug(sync_wait_msg)
+                            self.reference_condition_mp.wait()
+                        # Modify reference_locked_cids consecutively
+                        logging.debug(sync_begin_debug_msg)
+                        self.reference_locked_cids_mp.append(cid)
+                else:
+                    with self.reference_condition:
+                        while cid in self.reference_locked_cids:
+                            logging.debug(sync_wait_msg)
+                            self.reference_condition.wait()
+                        logging.debug(sync_begin_debug_msg)
+                        self.reference_locked_cids.append(cid)
                 try:
-                    self._delete("objects", ab_id)
+                    self._delete("objects", cid)
                 finally:
                     # Release cid
-                    with self.reference_lock:
-                        logging.debug(
-                            "FileHashStore - delete_object: Removing cid: %s from"
-                            + "reference_locked_cids.",
-                            cid,
-                        )
-                        self.reference_locked_cids.remove(cid)
+                    end_sync_debug_msg = (
+                        f"FileHashStore - delete_object: Releasing cid ({cid})"
+                        + " from locked list."
+                    )
+                    if self.use_multiprocessing:
+                        with self.reference_condition_mp:
+                            logging.debug(end_sync_debug_msg)
+                            self.reference_locked_cids_mp.remove(cid)
+                            self.reference_condition_mp.notify()
+                    else:
+                        with self.reference_condition:
+                            logging.debug(end_sync_debug_msg)
+                            self.reference_locked_cids.remove(cid)
+                            self.reference_condition.notify()
         else:
             # id_type is "pid"
             pid = ab_id
@@ -936,19 +982,28 @@
             # Storing and deleting objects are synchronized together
             # Duplicate store object requests for a pid are rejected, but deleting an object
             # will wait for a pid to be released if it's found to be in use before proceeding.
-            while pid in self.object_locked_pids:
-                logging.debug(
-                    "FileHashStore - delete_object: pid (%s) is currently locked. Waiting.",
-                    pid,
-                )
-                time.sleep(self.time_out_sec)
-            # Modify object_locked_pids consecutively
-            with self.object_lock:
-                logging.debug(
-                    "FileHashStore - delete_object: Adding pid: %s to object_locked_pids.",
-                    pid,
-                )
-                self.object_locked_pids.append(pid)
+            sync_begin_debug_msg = (
+                f"FileHashStore - delete_object: Adding pid ({pid}) to locked list."
+            )
+            sync_wait_msg = (
+                f"FileHashStore - delete_object: Pid ({pid}) is locked. Waiting."
+            )
+            if self.use_multiprocessing:
+                with self.object_condition_mp:
+                    # Wait for the pid to release if it's in use
+                    while pid in self.object_locked_pids_mp:
+                        logging.debug(sync_wait_msg)
+                        self.object_condition_mp.wait()
+                    # Modify object_locked_pids consecutively
+                    logging.debug(sync_begin_debug_msg)
+                    self.object_locked_pids_mp.append(pid)
+            else:
+                with self.object_condition:
+                    while pid in self.object_locked_pids:
+                        logging.debug(sync_wait_msg)
+                        self.object_condition.wait()
+                    logging.debug(sync_begin_debug_msg)
+                    self.object_locked_pids.append(pid)
 
             try:
                 # Before we begin deletion process, we look for the `cid` by calling
@@ -960,19 +1015,29 @@
                 # Proceed with next steps - cid has been retrieved without any issues
                 # We must synchronized here based on the `cid` because multiple threads may
                 # try to access the `cid_reference_file`
-                while cid in self.reference_locked_cids:
-                    logging.debug(
-                        "FileHashStore - delete_object: (cid) %s is currently locked. Waiting",
Waiting", - cid, - ) - time.sleep(self.time_out_sec) - # Modify reference_locked_cids consecutively - with self.reference_lock: - logging.debug( - "FileHashStore - delete_object: Add cid: %s to reference_locked_cids.", - cid, - ) - self.reference_locked_cids.append(cid) + sync_begin_debug_msg = ( + f"FileHashStore - delete_object: Cid ({cid}) to locked list." + ) + sync_wait_msg = ( + f"FileHashStore - delete_object: Cid ({cid}) is locked." + + " Waiting." + ) + if self.use_multiprocessing: + with self.reference_condition_mp: + # Wait for the cid to release if it's in use + while cid in self.reference_locked_cids_mp: + logging.debug(sync_wait_msg) + self.reference_condition_mp.wait() + # Modify reference_locked_cids consecutively + logging.debug(sync_begin_debug_msg) + self.reference_locked_cids_mp.append(cid) + else: + with self.reference_condition: + while cid in self.reference_locked_cids: + logging.debug(sync_wait_msg) + self.reference_condition.wait() + logging.debug(sync_begin_debug_msg) + self.reference_locked_cids.append(cid) try: cid_ref_abs_path = self._resolve_path("cid", cid) @@ -1017,13 +1082,20 @@ def delete_object(self, ab_id, id_type=None): finally: # Release cid - with self.reference_lock: - debug_msg = ( - "FileHashStore - delete_object:" - + f" Removing cid: {cid} from reference_locked_cids." - ) - logging.debug(debug_msg) - self.reference_locked_cids.remove(cid) + end_sync_debug_msg = ( + f"FileHashStore - delete_object: Releasing cid ({cid})" + + " from locked list" + ) + if self.use_multiprocessing: + with self.reference_condition_mp: + logging.debug(end_sync_debug_msg) + self.reference_locked_cids_mp.remove(cid) + self.reference_condition_mp.notify() + else: + with self.reference_condition: + logging.debug(end_sync_debug_msg) + self.reference_locked_cids.remove(cid) + self.reference_condition.notify() except PidRefsDoesNotExist: warn_msg = ( @@ -1044,7 +1116,6 @@ def delete_object(self, ab_id, id_type=None): for obj in objects_to_delete: os.remove(obj) return - except CidRefsDoesNotExist: # Delete pid refs file objects_to_delete.append( @@ -1061,7 +1132,6 @@ def delete_object(self, ab_id, id_type=None): for obj in objects_to_delete: os.remove(obj) return - except RefsFileExistsButCidObjMissing: # Add pid refs file to be permanently deleted pid_ref_abs_path = self._resolve_path("pid", pid) @@ -1087,7 +1157,6 @@ def delete_object(self, ab_id, id_type=None): for obj in objects_to_delete: os.remove(obj) return - except PidNotFoundInCidRefsFile: # Add pid refs file to be permanently deleted pid_ref_abs_path = self._resolve_path("pid", pid) @@ -1106,56 +1175,105 @@ def delete_object(self, ab_id, id_type=None): return finally: # Release pid - with self.object_lock: - logging.debug( - "FileHashStore - delete_object: Removing pid: %s from object_locked_pids.", - pid, - ) - self.object_locked_pids.remove(pid) + end_sync_debug_msg = ( + f"FileHashStore - delete_object: Releasing pid ({pid})" + + " from locked list" + ) + if self.use_multiprocessing: + with self.object_condition_mp: + logging.debug(end_sync_debug_msg) + self.object_locked_pids_mp.remove(pid) + self.object_condition_mp.notify() + else: + # Release pid + with self.object_condition: + logging.debug(end_sync_debug_msg) + self.object_locked_pids.remove(pid) + self.object_condition.notify() def delete_metadata(self, pid, format_id=None): logging.debug( "FileHashStore - delete_metadata: Request to delete metadata for pid: %s", pid, ) - # TODO: Implement multiprocessing lock check like 'tag_object' 
         self._check_string(pid, "pid", "delete_metadata")
         checked_format_id = self._check_arg_format_id(format_id, "delete_metadata")
-        # Get the metadata directory path for the given pid
-        entity = "metadata"
-        metadata_directory = self._computehash(pid)
-        rel_path = "/".join(self._shard(metadata_directory))
-        metadata_rel_path = self._get_store_path("metadata") / rel_path
-        if format_id is None:
-            # Delete all metadata files
-            objects_to_delete = []
-            metadata_file_paths = self._get_file_paths(metadata_rel_path)
-            if metadata_file_paths is not None:
-                for path in metadata_file_paths:
-                    objects_to_delete.append(self._rename_path_for_deletion(path))
-                for obj in objects_to_delete:
-                    os.remove(obj)
-
-            info_string = (
-                "FileHashStore - delete_metadata: Successfully deleted all metadata for pid: %s",
-                pid,
-            )
-            logging.info(info_string)
-            return
+
+        # Wait for the pid to release if it's in use
+        sync_begin_debug_msg = (
+            f"FileHashStore - delete_metadata: Adding pid ({pid}) to locked list."
+        )
+        sync_wait_msg = (
+            f"FileHashStore - delete_metadata: Pid ({pid}) is locked. Waiting."
+        )
+        if self.use_multiprocessing:
+            with self.metadata_condition_mp:
+                # Wait for the pid to release if it's in use
+                while pid in self.metadata_locked_pids_mp:
+                    logging.debug(sync_wait_msg)
+                    self.metadata_condition_mp.wait()
+                # Modify metadata_locked_pids consecutively
+                logging.debug(sync_begin_debug_msg)
+                self.metadata_locked_pids_mp.append(pid)
         else:
-            # Delete a specific metadata file
-            metadata_document_name = self._computehash(pid + checked_format_id)
-            full_path_without_directory = rel_path + "/" + metadata_document_name
-            metadata_exists = self._exists(entity, full_path_without_directory)
-            if metadata_exists:
-                self._delete(entity, full_path_without_directory)
-
-            info_string = (
-                "FileHashStore - delete_metadata: Successfully deleted metadata for pid:"
-                + f" {pid} for format_id: {format_id}"
+            with self.metadata_condition:
+                while pid in self.metadata_locked_pids:
+                    logging.debug(sync_wait_msg)
+                    self.metadata_condition.wait()
+                logging.debug(sync_begin_debug_msg)
+                self.metadata_locked_pids.append(pid)
+        try:
+            # Get the metadata directory path for the given pid
+            entity = "metadata"
+            metadata_directory = self._computehash(pid)
+            rel_path = "/".join(self._shard(metadata_directory))
+            metadata_rel_path = self._get_store_path("metadata") / rel_path
+            if format_id is None:
+                # Delete all metadata files
+                objects_to_delete = []
+                metadata_file_paths = self._get_file_paths(metadata_rel_path)
+                if metadata_file_paths is not None:
+                    for path in metadata_file_paths:
+                        objects_to_delete.append(self._rename_path_for_deletion(path))
+                    for obj in objects_to_delete:
+                        os.remove(obj)
+
+                info_string = (
+                    "FileHashStore - delete_metadata: Successfully deleted all metadata"
+                    + f" for pid: {pid}"
+                )
+                logging.info(info_string)
+                return
+            else:
+                # Delete a specific metadata file
+                metadata_document_name = self._computehash(pid + checked_format_id)
+                full_path_without_directory = rel_path + "/" + metadata_document_name
+                metadata_exists = self._exists(entity, full_path_without_directory)
+                if metadata_exists:
+                    self._delete(entity, full_path_without_directory)
+
+                info_string = (
+                    "FileHashStore - delete_metadata: Successfully deleted metadata for pid:"
+                    + f" {pid} for format_id: {format_id}"
+                )
+                logging.info(info_string)
+                return
+        finally:
+            # Release pid
+            end_sync_debug_msg = (
+                f"FileHashStore - delete_metadata: Releasing pid ({pid})"
+                + " from locked list."
             )
-            logging.info(info_string)
-            return
+            if self.use_multiprocessing:
+                with self.metadata_condition_mp:
+                    logging.debug(end_sync_debug_msg)
+                    self.metadata_locked_pids_mp.remove(pid)
+                    self.metadata_condition_mp.notify()
+            else:
+                with self.metadata_condition:
+                    logging.debug(end_sync_debug_msg)
+                    self.metadata_locked_pids.remove(pid)
+                    self.metadata_condition.notify()
 
     def get_hex_digest(self, pid, algorithm):
         logging.debug(
diff --git a/src/hashstore/hashstoreclient.py b/src/hashstore/hashstoreclient.py
index decb83fb..7022a521 100644
--- a/src/hashstore/hashstoreclient.py
+++ b/src/hashstore/hashstoreclient.py
@@ -258,6 +258,13 @@ def __init__(self, properties, testflag=None):
         module_name = "filehashstore"
         class_name = "FileHashStore"
 
+        # Enable multiprocessing synchronization before the HashStore is initialized
+        os.environ["USE_MULTIPROCESSING"] = "True"
+        use_multiprocessing = os.getenv("USE_MULTIPROCESSING", "False") == "True"
+        logging.info(
+            "HashStoreClient - use_multiprocessing (bool): %s", use_multiprocessing
+        )
+
         # Instance attributes
         self.hashstore = factory.get_hashstore(module_name, class_name, properties)
         logging.info("HashStoreClient - HashStore initialized.")
@@ -285,15 +292,6 @@ def store_to_hashstore_from_list(self, origin_dir, obj_type, num, skip_obj_size)
         )
         logging.info(info_msg)
 
-        # Test Begin
-        # Set multiprocessing to true
-        os.environ["USE_MULTIPROCESSING"] = "True"
-        use_multiprocessing = os.getenv("USE_MULTIPROCESSING", "False") == "True"
-        logging.info(
-            "HashStoreClient - use_multiprocessing (bool): %s", use_multiprocessing
-        )
-        # Test End
-
         # Get list of objects to store from metacat db
         if obj_type == self.OBJ_TYPE:
             checked_obj_list = self.metacatdb.refine_list_for_objects(