diff --git a/lib_client/src/d1_client/iter/base_multi.py b/lib_client/src/d1_client/iter/base_multi.py
new file mode 100644
index 000000000..9d602923c
--- /dev/null
+++ b/lib_client/src/d1_client/iter/base_multi.py
@@ -0,0 +1,211 @@
+# This work was created by participants in the DataONE project, and is
+# jointly copyrighted by participating institutions in DataONE. For
+# more information on DataONE, see our web site at http://dataone.org.
+#
+#   Copyright 2009-2019 DataONE
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Base for multiprocessed DataONE type iterators."""
+
+import logging
+import multiprocessing
+import time
+
+import d1_common.types.exceptions
+
+import d1_client.mnclient_1_2
+import d1_client.mnclient_2_0
+
+
+# Defaults
+PAGE_SIZE = 1000
+MAX_WORKERS = 16
+# See notes in module docstring for SysMeta iterator before changing
+MAX_RESULT_QUEUE_SIZE = 100
+MAX_TASK_QUEUE_SIZE = 16
+API_MAJOR = 2
+
+
+logger = logging.getLogger(__name__)
+
+
+# fmt: off
+class MultiprocessedIteratorBase(object):
+    def __init__(
+        self,
+        base_url, page_size, max_workers, max_result_queue_size,
+        max_task_queue_size, api_major, client_arg_dict, page_arg_dict,
+        item_proc_arg_dict, page_func, iter_func, item_proc_func,
+    ):
+        self._base_url = base_url
+        self._page_size = page_size
+        self._max_workers = max_workers
+        self._max_result_queue_size = max_result_queue_size
+        self._max_task_queue_size = max_task_queue_size
+        self._api_major = api_major
+        self._client_arg_dict = client_arg_dict or {}
+        self._page_arg_dict = page_arg_dict or {}
+        self._item_proc_arg_dict = item_proc_arg_dict or {}
+        self._page_func = page_func
+        self._iter_func = iter_func
+        self._item_proc_func = item_proc_func
+        self._total = None
+
+    @property
+    def total(self):
+        if self._total is None:
+            client = create_client(
+                self._base_url, self._api_major, self._client_arg_dict
+            )
+            page_pyxb = self._page_func(client)(
+                start=0, count=0, **self._page_arg_dict
+            )
+            self._total = page_pyxb.total
+        return self._total
+
+    def __iter__(self):
+        manager = multiprocessing.Manager()
+        queue = manager.Queue(maxsize=self._max_result_queue_size)
+        namespace = manager.Namespace()
+        namespace.stop = False
+
+        process = multiprocessing.Process(
+            target=_get_all_pages,
+            args=(
+                queue, namespace, self._base_url, self._page_size, self._max_workers,
+                self._max_task_queue_size, self._api_major, self._client_arg_dict,
+                self._page_arg_dict, self._item_proc_arg_dict, self._page_func,
+                self._iter_func, self._item_proc_func, self.total
+            ),
+        )
+
+        process.start()
+
+        try:
+            while True:
+                item_result = queue.get()
+                if item_result is None:
+                    logger.debug("Received None sentinel value. Stopping iteration")
+                    break
+                elif isinstance(item_result, dict):
+                    logger.debug('Yielding exception received as dict. dict="{}"'.format(item_result))
+                    yield d1_common.types.exceptions.create_exception_by_name(
+                        item_result["error"],
+                        identifier=item_result["pid"],
+                    )
+                else:
+                    yield item_result
+        except GeneratorExit:
+            logger.debug("GeneratorExit exception")
+            pass
+
+        # If generator is exited before exhausted, provide clean shutdown of the
+        # generator by signaling processes to stop, then waiting for them.
+        logger.debug("Setting stop signal")
+        namespace.stop = True
+        # Prevent parent from leaving zombie children behind.
+        while queue.qsize():
+            logger.debug("Dropping unwanted result")
+            queue.get()
+        logger.debug("Waiting for process to exit")
+        process.join()
+
+
+def _get_all_pages(
+    queue, namespace, base_url, page_size, max_workers, max_task_queue_size, api_major,
+    client_arg_dict, page_arg_dict, item_proc_arg_dict, page_func, iter_func,
+    item_proc_func, n_total
+):
+    logger.debug("Creating pool of {} workers".format(max_workers))
+    pool = multiprocessing.Pool(processes=max_workers)
+    n_pages = (n_total - 1) // page_size + 1
+
+    for page_idx in range(n_pages):
+        if namespace.stop:
+            logger.debug("Received stop signal")
+            break
+        try:
+            pool.apply_async(
+                _get_page,
+                args=(
+                    queue, namespace, base_url, page_idx, n_pages, page_size, api_major,
+                    client_arg_dict, page_arg_dict, item_proc_arg_dict, page_func,
+                    iter_func, item_proc_func
+                ),
+            )
+        except Exception as e:
+            logger.debug('Continuing after exception. error="{}"'.format(str(e)))
+        # The pool does not support a clean way to limit the number of queued tasks
+        # so we have to access the internals to check the queue size and wait if
+        # necessary.
+        while pool._taskqueue.qsize() > max_task_queue_size:
+            if namespace.stop:
+                logger.debug("Received stop signal")
+                break
+            # logger.debug('_get_all_pages(): Waiting to queue task')
+            time.sleep(1)
+
+    # Workaround for workers hanging at exit.
+    # pool.terminate()
+    logger.debug("Preventing more tasks from being added to the pool")
+    pool.close()
+    logger.debug("Waiting for the workers to exit")
+    pool.join()
+    logger.debug("Sending None sentinel value to stop the generator")
+    queue.put(None)
+
+
+def _get_page(
+    queue, namespace, base_url, page_idx, n_pages, page_size, api_major,
+    client_arg_dict, page_arg_dict, item_proc_arg_dict, page_func, iter_func,
+    item_proc_func
+):
+    logger.debug("Processing page. page_idx={} n_pages={}".format(page_idx, n_pages))
+
+    if namespace.stop:
+        logger.debug("Received stop signal")
+        return
+
+    client = create_client(base_url, api_major, client_arg_dict)
+
+    try:
+        page_pyxb = page_func(client)(
+            start=page_idx * page_size, count=page_size, **page_arg_dict
+        )
+    except Exception as e:
+        logger.error(
+            'Unable to get page. page_idx={} page_total={} error="{}"'.format(
+                page_idx, n_pages, str(e)
+            )
+        )
+        return
+
+    iterable_pyxb = iter_func(page_pyxb)
+
+    logger.debug(
+        "Starting page item iteration. page_idx={} n_items={}".format(
+            page_idx, len(iterable_pyxb)
+        )
+    )
+
+    for item_pyxb in iterable_pyxb:
+        if namespace.stop:
+            logger.debug("Received stop signal")
+            break
+        queue.put(item_proc_func(client, item_pyxb, item_proc_arg_dict))
+
+    logger.debug("Completed page")
+
+
+def create_client(base_url, api_major, client_arg_dict):
+    if api_major in (1, "1", "v1"):
+        return d1_client.mnclient_1_2.MemberNodeClient_1_2(base_url, **client_arg_dict)
+    else:
+        return d1_client.mnclient_2_0.MemberNodeClient_2_0(base_url, **client_arg_dict)
diff --git a/lib_client/src/d1_client/iter/logrecord_multi.py b/lib_client/src/d1_client/iter/logrecord_multi.py
index d5b40fe28..c308ff97e 100644
--- a/lib_client/src/d1_client/iter/logrecord_multi.py
+++ b/lib_client/src/d1_client/iter/logrecord_multi.py
@@ -24,235 +24,38 @@
 """
 
 import logging
-import multiprocessing
-import time
 
-import d1_common.types.exceptions
+import d1_client.iter.base_multi
 
-import d1_client.mnclient_1_2
-import d1_client.mnclient_2_0
+logger = logging.getLogger(__name__)
 
 
-# Defaults
-LOG_RECORD_PAGE_SIZE = 1000
-MAX_WORKERS = 16
-# See notes in module docstring for SysMeta iterator before changing
-MAX_RESULT_QUEUE_SIZE = 100
-MAX_TASK_QUEUE_SIZE = 16
-API_MAJOR = 2
-
-
-class LogRecordIteratorMulti(object):
+# fmt: off
+class LogRecordIteratorMulti(d1_client.iter.base_multi.MultiprocessedIteratorBase):
     def __init__(
         self,
         base_url,
-        page_size=LOG_RECORD_PAGE_SIZE,
-        max_workers=MAX_WORKERS,
-        max_result_queue_size=MAX_RESULT_QUEUE_SIZE,
-        max_task_queue_size=MAX_TASK_QUEUE_SIZE,
-        api_major=API_MAJOR,
-        client_dict=None,
-        get_log_records_dict=None,
-        debug=False,
-    ):
-        self._log = logging.getLogger(__name__)
-        self._base_url = base_url
-        self._page_size = page_size
-        self._max_workers = max_workers
-        self._max_result_queue_size = max_result_queue_size
-        self._max_task_queue_size = max_task_queue_size
-        self._api_major = api_major
-        self._client_dict = client_dict or {}
-        self._get_log_records_dict = get_log_records_dict or {}
-        self.total = self._get_total_object_count(
-            base_url, api_major, self._client_dict, self._get_log_records_dict
-        )
-        self._debug = debug
-        if debug:
-            logger = multiprocessing.log_to_stderr()
-            logger.setLevel(logging.DEBUG)
-
-    def __iter__(self):
-        manager = multiprocessing.Manager()
-        queue = manager.Queue(maxsize=self._max_result_queue_size)
-        namespace = manager.Namespace()
-        namespace.stop = False
-
-        process = multiprocessing.Process(
-            target=self._get_all_pages,
-            args=(
-                queue,
-                namespace,
-                self._base_url,
-                self._page_size,
-                self._max_workers,
-                self._max_task_queue_size,
-                self._api_major,
-                self._client_dict,
-                self._get_log_records_dict,
-                self.total,
-            ),
-        )
-
-        process.start()
-
-        try:
-            while True:
-                error_dict_or_log_pyxb = queue.get()
-                if error_dict_or_log_pyxb is None:
-                    self._log.debug(
-                        "__iter__(): Received None sentinel value. Stopping iteration"
-                    )
-                    break
-                elif isinstance(error_dict_or_log_pyxb, dict):
-                    yield d1_common.types.exceptions.create_exception_by_name(
-                        error_dict_or_log_pyxb["error"],
-                        identifier=error_dict_or_log_pyxb["pid"],
-                    )
-                else:
-                    yield error_dict_or_log_pyxb
-        except GeneratorExit:
-            self._log.debug("__iter__(): GeneratorExit exception")
-            pass
-
-        # If generator is exited before exhausted, provide clean shutdown of the
-        # generator by signaling processes to stop, then waiting for them.
-        self._log.debug("__iter__(): Setting stop signal")
-        namespace.stop = True
-        # Prevent parent from leaving zombie children behind.
-        while queue.qsize():
-            self._log.debug("__iter__(): queue.size(): Dropping unwanted result")
-            queue.get()
-        self._log.debug("__iter__(): process.join(): Waiting for process to exit")
-        process.join()
-
-    def _get_all_pages(
-        self,
-        queue,
-        namespace,
-        base_url,
-        page_size,
-        max_workers,
-        max_task_queue_size,
-        api_major,
-        client_dict,
-        get_log_records_dict,
-        n_total,
+        page_size=d1_client.iter.base_multi.PAGE_SIZE,
+        max_workers=d1_client.iter.base_multi.MAX_WORKERS,
+        max_result_queue_size=d1_client.iter.base_multi.MAX_RESULT_QUEUE_SIZE,
+        max_task_queue_size=d1_client.iter.base_multi.MAX_TASK_QUEUE_SIZE,
+        api_major=d1_client.iter.base_multi.API_MAJOR,
+        client_arg_dict=None,
+        get_log_records_arg_dict=None,
     ):
-        self._log.info("Creating pool of {} workers".format(max_workers))
-        pool = multiprocessing.Pool(processes=max_workers)
-        n_pages = (n_total - 1) // page_size + 1
-
-        for page_idx in range(n_pages):
-            if namespace.stop:
-                self._log.debug("_get_all_pages(): Page iter: Received stop signal")
-                break
-            try:
-                pool.apply_async(
-                    self._get_page,
-                    args=(
-                        queue,
-                        namespace,
-                        base_url,
-                        page_idx,
-                        n_pages,
-                        page_size,
-                        api_major,
-                        client_dict,
-                        get_log_records_dict,
-                    ),
-                )
-            except Exception as e:
-                self._log.debug(
-                    '_get_all_pages(): pool.apply_async() error="{}"'.format(str(e))
-                )
-            # The pool does not support a clean way to limit the number of queued tasks
-            # so we have to access the internals to check the queue size and wait if
-            # necessary.
-            # noinspection PyProtectedMember
-            while pool._taskqueue.qsize() > max_task_queue_size:
-                if namespace.stop:
-                    self._log.debug(
-                        "_get_all_pages(): Waiting to queue task: Received stop signal"
-                    )
-                    break
-                # self._log.debug('_get_all_pages(): Waiting to queue task')
-                time.sleep(1)
-
-        # Workaround for workers hanging at exit.
-        # pool.terminate()
-        self._log.debug(
-            "_get_all_pages(): pool.close(): Preventing more tasks for being added to the pool"
-        )
-        pool.close()
-        self._log.debug(
-            "_get_all_pages(): pool.join(): Waiting for the workers to exit"
-        )
-        pool.join()
-        self._log.debug(
-            "_get_all_pages(): queue.put(None): Sending None sentinel value to stop the generator"
+        super(LogRecordIteratorMulti, self).__init__(
+            base_url, page_size, max_workers, max_result_queue_size,
+            max_task_queue_size, api_major, client_arg_dict, get_log_records_arg_dict,
+            None, _page_func, _iter_func, _item_proc_func
         )
-        queue.put(None)
 
-    def _get_page(
-        self,
-        queue,
-        namespace,
-        base_url,
-        page_idx,
-        n_pages,
-        page_size,
-        api_major,
-        client_dict,
-        get_log_records_dict,
-    ):
-        self._log.debug("_get_page(): page_idx={} n_pages={}".format(page_idx, n_pages))
 
-        if namespace.stop:
-            self._log.debug("_get_page(): Received stop signal before listObjects()")
-            return
+def _page_func(client):
+    return client.getLogRecords
 
-        client = self._create_client(base_url, api_major, client_dict)
 
-        try:
-            log_records_pyxb = client.getLogRecords(
-                start=page_idx * page_size, count=page_size, **get_log_records_dict
-            )
-        except Exception as e:
-            self._log.error(
-                '_get_page(): getLogRecords() failed. page_idx={} page_total={} error="{}"'.format(
-                    page_idx, n_pages, str(e)
-                )
-            )
-            return
+def _iter_func(page_pyxb):
+    return page_pyxb.logEntry
 
-        self._log.debug(
-            "_get_page(): Retrieved page. page_idx={} n_items={}".format(
-                page_idx, len(log_records_pyxb.logEntry)
-            )
-        )
-
-        i = 0
-        for log_entry_pyxb in log_records_pyxb.logEntry:
-            self._log.debug("_get_page(): Iterating over logEntry. i={}".format(i))
-            i += 1
-            if namespace.stop:
-                self._log.debug("_get_page(): logEntry iter: Received stop signal")
-                break
-            queue.put(log_entry_pyxb)
-
-    def _create_client(self, base_url, api_major, client_dict):
-        self._log.debug(
-            '_create_client(): api="v{}"'.format(1 if api_major <= 1 else 2)
-        )
-        if api_major <= 1:
-            return d1_client.mnclient_1_2.MemberNodeClient_1_2(base_url, **client_dict)
-        else:
-            return d1_client.mnclient_2_0.MemberNodeClient_2_0(base_url, **client_dict)
-
-    def _get_total_object_count(
-        self, base_url, api_major, client_dict, get_log_records_dict
-    ):
-        client = self._create_client(base_url, api_major, client_dict)
-        args_dict = get_log_records_dict.copy()
-        args_dict["count"] = 0
-        return client.getLogRecords(**args_dict).total
+
+# noinspection PyUnusedLocal
+def _item_proc_func(client_, item_pyxb, item_proc_arg_dict_):
+    return item_pyxb
diff --git a/lib_client/src/d1_client/iter/node.py b/lib_client/src/d1_client/iter/node.py
index 8dd1dc6c8..8677f2889 100644
--- a/lib_client/src/d1_client/iter/node.py
+++ b/lib_client/src/d1_client/iter/node.py
@@ -34,17 +34,17 @@
 
 class NodeListIterator(object):
     def __init__(
-        self, base_url, api_major=API_MAJOR, client_dict=None, listNodes_dict=None
+        self, base_url, api_major=API_MAJOR, client_arg_dict=None, listNodes_dict=None
     ):
         self._log = logging.getLogger(__name__)
         self._base_url = base_url
         self._api_major = api_major
-        self._client_dict = client_dict or {}
+        self._client_arg_dict = client_arg_dict or {}
         self._listNodes_dict = listNodes_dict
 
     def __iter__(self):
         client = d1_client.cnclient_2_0.CoordinatingNodeClient_2_0(
-            self._base_url, **self._client_dict
+            self._base_url, **self._client_arg_dict
         )
         # The NodeList type does not support slicing.
         node_list_pyxb = client.listNodes()
diff --git a/lib_client/src/d1_client/iter/objectlist_multi.py b/lib_client/src/d1_client/iter/objectlist_multi.py
index 215fed549..e99d10c4f 100644
--- a/lib_client/src/d1_client/iter/objectlist_multi.py
+++ b/lib_client/src/d1_client/iter/objectlist_multi.py
@@ -17,150 +17,47 @@
 # limitations under the License.
 """Multiprocessed ObjectList Iterator.
 
-Fast retrieval of ObjectInfo from a DataONE Node.
+Fast retrieval of ObjectList from a DataONE Node.
+
+See additional notes in SysMeta iter docstring.
 """
 
 import logging
-import multiprocessing
 
-import d1_client.mnclient_1_2
-import d1_client.mnclient_2_0
+import d1_client.iter.base_multi
 
-# Defaults
-OBJECT_LIST_PAGE_SIZE = 100
-MAX_WORKERS = 10
-MAX_QUEUE_SIZE = 100
-API_MAJOR = 2
+logger = logging.getLogger(__name__)
 
-class ObjectListIteratorMulti(object):
+# fmt: off
+class ObjectListIteratorMulti(d1_client.iter.base_multi.MultiprocessedIteratorBase):
     def __init__(
         self,
         base_url,
-        page_size=OBJECT_LIST_PAGE_SIZE,
-        max_workers=MAX_WORKERS,
-        max_queue_size=MAX_QUEUE_SIZE,
-        api_major=API_MAJOR,
-        client_args_dict=None,
-        list_objects_args_dict=None,
+        page_size=d1_client.iter.base_multi.PAGE_SIZE,
+        max_workers=d1_client.iter.base_multi.MAX_WORKERS,
+        max_result_queue_size=d1_client.iter.base_multi.MAX_RESULT_QUEUE_SIZE,
+        max_task_queue_size=d1_client.iter.base_multi.MAX_TASK_QUEUE_SIZE,
+        api_major=d1_client.iter.base_multi.API_MAJOR,
+        client_arg_dict=None,
+        list_objects_arg_dict=None,
     ):
-        self._log = logging.getLogger(__name__)
-        self._base_url = base_url
-        self._page_size = page_size
-        self._max_workers = max_workers
-        self._max_queue_size = max_queue_size
-        self._api_major = api_major
-        self._client_args_dict = client_args_dict or {}
-        self._list_objects_args_dict = list_objects_args_dict or {}
-        # d1_common.type_conversions.set_default_pyxb_namespace(api_major)
-        self.total = self._get_total_object_count(
-            base_url, api_major, self._client_args_dict, self._list_objects_args_dict
-        )
-
-    def __iter__(self):
-        manager = multiprocessing.Manager()
-        queue = manager.Queue(maxsize=self._max_queue_size)
-
-        process = multiprocessing.Process(
-            target=self._get_all_pages,
-            args=(
-                queue,
-                self._base_url,
-                self._page_size,
-                self._max_workers,
-                self._client_args_dict,
-                self._list_objects_args_dict,
-                self.total,
-            ),
+        super(ObjectListIteratorMulti, self).__init__(
+            base_url, page_size, max_workers, max_result_queue_size,
+            max_task_queue_size, api_major, client_arg_dict, list_objects_arg_dict,
+            None, _page_func, _iter_func, _item_proc_func
         )
-        process.start()
-        while True:
-            object_info_pyxb = queue.get()
-            if object_info_pyxb is None:
-                self._log.debug("Received None sentinel value. Stopping iteration")
-                break
-            yield object_info_pyxb
-
-        process.join()
-
-    def _get_total_object_count(
-        self, base_url, api_major, client_args_dict, list_objects_args_dict
-    ):
-        client = self._create_client(base_url, api_major, client_args_dict)
-        args_dict = list_objects_args_dict.copy()
-        args_dict["count"] = 0
-        return client.listObjects(**args_dict).total
 
-    def _get_all_pages(
-        self,
-        queue,
-        base_url,
-        page_size,
-        max_workers,
-        client_args_dict,
-        list_objects_args_dict,
-        n_total,
-    ):
-        self._log.info("Creating pool of {} workers".format(max_workers))
-        pool = multiprocessing.Pool(processes=max_workers)
-        n_pages = (n_total - 1) // page_size + 1
-        for page_idx in range(n_pages):
-            self._log.debug(
-                "apply_async(): page_idx={} n_pages={}".format(page_idx, n_pages)
-            )
-            pool.apply_async(
-                self._get_page,
-                args=(
-                    queue,
-                    base_url,
-                    page_idx,
-                    n_pages,
-                    page_size,
-                    client_args_dict,
-                    list_objects_args_dict,
-                ),
-            )
-        # Prevent any more tasks from being submitted to the pool. Once all the
-        # tasks have been completed the worker processes will exit.
-        pool.close()
-        # Wait for the worker processes to exit
-        pool.join()
-        # Use None as sentinel value to stop the generator
-        queue.put(None)
+def _page_func(client):
+    return client.listObjects
 
-    def _get_page(
-        self,
-        queue,
-        base_url,
-        page_idx,
-        n_pages,
-        page_size,
-        client_args_dict,
-        list_objects_args_dict,
-    ):
-        client = d1_client.mnclient_2_0.MemberNodeClient_2_0(
-            base_url, **client_args_dict
-        )
-        try:
-            object_list_pyxb = client.listObjects(
-                start=page_idx * page_size, count=page_size, **list_objects_args_dict
-            )
-            self._log.debug("Retrieved page: {}/{}".format(page_idx + 1, n_pages))
-            for object_info_pyxb in object_list_pyxb.objectInfo:
-                queue.put(object_info_pyxb)
-        except Exception as e:
-            self._log.error(
-                "Failed to retrieve page: {}/{}. Error: {}".format(
-                    page_idx + 1, n_pages, str(e)
-                )
-            )
 
-    def _create_client(self, base_url, api_major, client_dict):
-        if api_major in (1, "1", "v1"):
-            return d1_client.mnclient_1_2.MemberNodeClient_1_2(base_url, **client_dict)
-        else:
-            return d1_client.mnclient_2_0.MemberNodeClient_2_0(base_url, **client_dict)
+def _iter_func(page_pyxb):
+    return page_pyxb.objectInfo
+
+
+# noinspection PyUnusedLocal
+def _item_proc_func(client_, item_pyxb, item_proc_arg_dict_):
+    return item_pyxb
diff --git a/lib_client/src/d1_client/iter/sysmeta_multi.py b/lib_client/src/d1_client/iter/sysmeta_multi.py
index efd101268..632c43ae3 100644
--- a/lib_client/src/d1_client/iter/sysmeta_multi.py
+++ b/lib_client/src/d1_client/iter/sysmeta_multi.py
@@ -48,263 +48,51 @@
 """
 
 import logging
-import multiprocessing
-import time
 
-import d1_common.types.exceptions
+import d1_client.iter.base_multi
+import d1_common.xml
 
-import d1_client.mnclient_1_2
-import d1_client.mnclient_2_0
+logger = logging.getLogger(__name__)
 
-# Defaults
-OBJECT_LIST_PAGE_SIZE = 1000
-MAX_WORKERS = 16
-# See notes in module docstring before changing
-MAX_RESULT_QUEUE_SIZE = 100
-MAX_TASK_QUEUE_SIZE = 16
-API_MAJOR = 2
-
-class SystemMetadataIteratorMulti(object):
+# fmt: off
+class SystemMetadataIteratorMulti(d1_client.iter.base_multi.MultiprocessedIteratorBase):
     def __init__(
         self,
        base_url,
-        page_size=OBJECT_LIST_PAGE_SIZE,
-        max_workers=MAX_WORKERS,
-        max_result_queue_size=MAX_RESULT_QUEUE_SIZE,
-        max_task_queue_size=MAX_TASK_QUEUE_SIZE,
-        api_major=API_MAJOR,
-        client_dict=None,
-        list_objects_dict=None,
-        get_sysmeta_dict=None,
-        debug=False,
+        page_size=d1_client.iter.base_multi.PAGE_SIZE,
+        max_workers=d1_client.iter.base_multi.MAX_WORKERS,
+        max_result_queue_size=d1_client.iter.base_multi.MAX_RESULT_QUEUE_SIZE,
+        max_task_queue_size=d1_client.iter.base_multi.MAX_TASK_QUEUE_SIZE,
+        api_major=d1_client.iter.base_multi.API_MAJOR,
+        client_arg_dict=None,
+        list_objects_arg_dict=None,
+        get_system_metadata_arg_dict=None,
     ):
-        self._log = logging.getLogger(__name__)
-        self._base_url = base_url
-        self._page_size = page_size
-        self._max_workers = max_workers
-        self._max_queue_size = max_result_queue_size
-        self._max_task_queue_size = max_task_queue_size
-        self._api_major = api_major
-        self._client_dict = client_dict or {}
-        self._list_objects_dict = list_objects_dict or {}
-        self._get_sysmeta_dict = get_sysmeta_dict or {}
-        self.total = self._get_total_object_count(
-            base_url, api_major, self._client_dict, self._list_objects_dict
-        )
-        self._debug = debug
-        if debug:
-            logger = multiprocessing.log_to_stderr()
-            logger.setLevel(logging.DEBUG)
-
-    def __iter__(self):
-        manager = multiprocessing.Manager()
-        queue = manager.Queue(maxsize=self._max_queue_size)
-        namespace = manager.Namespace()
-        namespace.stop = False
-
-        process = multiprocessing.Process(
-            target=self._get_all_pages,
-            args=(
-                queue,
-                namespace,
-                self._base_url,
-                self._page_size,
-                self._max_workers,
-                self._max_task_queue_size,
-                self._api_major,
-                self._client_dict,
-                self._list_objects_dict,
-                self._get_sysmeta_dict,
-                self.total,
-            ),
+        super(SystemMetadataIteratorMulti, self).__init__(
+            base_url, page_size, max_workers, max_result_queue_size,
+            max_task_queue_size, api_major, client_arg_dict, list_objects_arg_dict,
+            get_system_metadata_arg_dict, _page_func, _iter_func, _item_proc_func
         )
 
-        process.start()
-
-        try:
-            while True:
-                error_dict_or_sysmeta_pyxb = queue.get()
-                if error_dict_or_sysmeta_pyxb is None:
-                    self._log.debug(
-                        "__iter__(): Received None sentinel value. Stopping iteration"
-                    )
-                    break
-                elif isinstance(error_dict_or_sysmeta_pyxb, dict):
-                    yield d1_common.types.exceptions.create_exception_by_name(
-                        error_dict_or_sysmeta_pyxb["error"],
-                        identifier=error_dict_or_sysmeta_pyxb["pid"],
-                    )
-                else:
-                    yield error_dict_or_sysmeta_pyxb
-        except GeneratorExit:
-            self._log.debug("__iter__(): GeneratorExit exception")
-            pass
+def _page_func(client):
+    return client.listObjects
 
-        # If generator is exited before exhausted, provide clean shutdown of the
-        # generator by signaling processes to stop, then waiting for them.
-        self._log.debug("__iter__(): Setting stop signal")
-        namespace.stop = True
-        # Prevent parent from leaving zombie children behind.
-        while queue.qsize():
-            self._log.debug("__iter__(): queue.size(): Dropping unwanted result")
-            queue.get()
-        self._log.debug("__iter__(): process.join(): Waiting for process to exit")
-        process.join()
-
-    def _get_all_pages(
-        self,
-        queue,
-        namespace,
-        base_url,
-        page_size,
-        max_workers,
-        max_task_queue_size,
-        api_major,
-        client_dict,
-        list_objects_dict,
-        get_sysmeta_dict,
-        n_total,
-    ):
-        self._log.info("Creating pool of {} workers".format(max_workers))
-        pool = multiprocessing.Pool(processes=max_workers)
-        n_pages = (n_total - 1) // page_size + 1
-        for page_idx in range(n_pages):
-            if namespace.stop:
-                self._log.debug("_get_all_pages(): Page iter: Received stop signal")
-                break
-            try:
-                pool.apply_async(
-                    self._get_page,
-                    args=(
-                        queue,
-                        namespace,
-                        base_url,
-                        page_idx,
-                        n_pages,
-                        page_size,
-                        api_major,
-                        client_dict,
-                        list_objects_dict,
-                        get_sysmeta_dict,
-                    ),
-                )
-            except Exception as e:
-                self._log.debug(
-                    '_get_all_pages(): pool.apply_async() error="{}"'.format(str(e))
-                )
-            # The pool does not support a clean way to limit the number of queued tasks
-            # so we have to access the internals to check the queue size and wait if
-            # necessary.
-            # noinspection PyProtectedMember,PyUnresolvedReferences
-            while pool._taskqueue.qsize() > max_task_queue_size:
-                if namespace.stop:
-                    self._log.debug(
-                        "_get_all_pages(): Waiting to queue task: Received stop signal"
-                    )
-                    break
-                # self._log.debug('_get_all_pages(): Waiting to queue task')
-                time.sleep(1)
+def _iter_func(page_pyxb):
+    return page_pyxb.objectInfo
 
-        # Workaround for workers hanging at exit.
-        # pool.terminate()
-        self._log.debug(
-            "_get_all_pages(): pool.close(): Preventing more tasks for being added to the pool"
-        )
-        pool.close()
-        self._log.debug(
-            "_get_all_pages(): pool.join(): Waiting for the workers to exit"
-        )
-        pool.join()
-        self._log.debug(
-            "_get_all_pages(): queue.put(None): Sending None sentinel value to stop the generator"
-        )
-        queue.put(None)
-
-    def _get_page(
-        self,
-        queue,
-        namespace,
-        base_url,
-        page_idx,
-        n_pages,
-        page_size,
-        api_major,
-        client_dict,
-        list_objects_dict,
-        get_sysmeta_dict,
-    ):
-        self._log.debug("_get_page(): page_idx={} n_pages={}".format(page_idx, n_pages))
-        if namespace.stop:
-            self._log.debug("_get_page(): Received stop signal before listObjects()")
-            return
-
-        client = self._create_client(base_url, api_major, client_dict)
-
-        try:
-            object_list_pyxb = client.listObjects(
-                start=page_idx * page_size, count=page_size, **list_objects_dict
-            )
-        except Exception as e:
-            self._log.error(
-                '_get_page(): listObjects() failed. page_idx={} page_total={} error="{}"'.format(
-                    page_idx, n_pages, str(e)
-                )
-            )
-            return
-
-        self._log.debug(
-            "_get_page(): Retrieved page. page_idx={} n_items={}".format(
-                page_idx, len(object_list_pyxb.objectInfo)
-            )
-        )
-
-        i = 0
-        for object_info_pyxb in object_list_pyxb.objectInfo:
-            self._log.debug("_get_page(): Iterating over objectInfo. i={}".format(i))
-            i += 1
-            if namespace.stop:
-                self._log.debug("_get_page(): objectInfo iter: Received stop signal")
-                break
-            self._get_sysmeta(
-                client, queue, object_info_pyxb.identifier.value(), get_sysmeta_dict
-            )
-
-    def _get_sysmeta(self, client, queue, pid, get_sysmeta_dict):
-        self._log.debug('_get_sysmeta(): pid="{}"'.format(pid))
-        try:
-            sysmeta_pyxb = client.getSystemMetadata(pid, get_sysmeta_dict)
-        except d1_common.types.exceptions.DataONEException as e:
-            self._log.debug(
-                '_get_sysmeta(): getSystemMetadata() failed. pid="{}" error="{}"'.format(
-                    pid, str(e)
-                )
-            )
-            queue.put({"pid": pid, "error": e.name})
-        except Exception as e:
-            self._log.debug(
-                '_get_sysmeta(): getSystemMetadata() failed. pid="{}" error="{}"'.format(
-                    pid, str(e)
-                )
-            )
-        else:
-            queue.put(sysmeta_pyxb)
+
+def _item_proc_func(client, item_pyxb, get_system_metadata_arg_dict):
+    pid = d1_common.xml.get_req_val(item_pyxb.identifier)
+    logger.debug('Retrieving System Metadata. pid="{}"'.format(pid))
+    try:
+        return client.getSystemMetadata(pid, get_system_metadata_arg_dict)
+    except Exception as e:
+        logger.error(
+            'Unable to retrieve System Metadata. pid="{}" error="{}"'.format(
+                pid, str(e)
+            )
+        )
+        # Only DataONEException instances carry a .name attribute; fall back to the
+        # exception class name for other errors.
+        return {"pid": pid, "error": getattr(e, "name", type(e).__name__)}
 
-    def _create_client(self, base_url, api_major, client_dict):
-        self._log.debug(
-            '_create_client(): api="v{}"'.format(1 if api_major <= 1 else 2)
-        )
-        if api_major <= 1:
-            return d1_client.mnclient_1_2.MemberNodeClient_1_2(base_url, **client_dict)
-        else:
-            return d1_client.mnclient_2_0.MemberNodeClient_2_0(base_url, **client_dict)
-
-    def _get_total_object_count(
-        self, base_url, api_major, client_dict, list_objects_dict
-    ):
-        client = self._create_client(base_url, api_major, client_dict)
-        args_dict = list_objects_dict.copy()
-        args_dict["count"] = 0
-        return client.listObjects(**args_dict).total
diff --git a/lib_client/src/d1_client/tests/test_iter_log_record_multi.py b/lib_client/src/d1_client/tests/test_iter_log_record_multi.py
index df2cc0d57..d3d7aa1be 100644
--- a/lib_client/src/d1_client/tests/test_iter_log_record_multi.py
+++ b/lib_client/src/d1_client/tests/test_iter_log_record_multi.py
@@ -44,8 +44,8 @@ def _log_record_iterator_test(self, page_size, from_date=None, to_date=None):
             base_url=d1_test.d1_test_case.MOCK_MN_BASE_URL,
             page_size=page_size,
             api_major=2,
-            client_dict={"verify_tls": False, "timeout_sec": 0},
-            get_log_records_dict={"fromDate": from_date, "toDate": to_date},
+            client_arg_dict={"verify_tls": False, "timeout_sec": 0},
+            get_log_records_arg_dict={"fromDate": from_date, "toDate": to_date},
         )
 
         i = 0
diff --git a/lib_client/src/d1_client/tests/test_iter_object_list_multi.py b/lib_client/src/d1_client/tests/test_iter_object_list_multi.py
index 0a0a2524e..1098de686 100644
--- a/lib_client/src/d1_client/tests/test_iter_object_list_multi.py
+++ b/lib_client/src/d1_client/tests/test_iter_object_list_multi.py
@@ -17,11 +17,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import sys
 
 import d1_common
 import freezegun
-import pytest
 import responses
 
 import d1_test.d1_test_case
@@ -49,7 +47,7 @@ def test_1000(self, mn_client_v1_v2):
             d1_test.d1_test_case.MOCK_MN_BASE_URL,
             page_size=13,
             max_workers=2,
-            max_queue_size=10,
+            max_result_queue_size=10,
             api_major=api_major,
         )
diff --git a/lib_client/src/d1_client/tests/test_iter_sysmeta_multi.py b/lib_client/src/d1_client/tests/test_iter_sysmeta_multi.py
index bbb14b1f7..690558cae 100644
--- a/lib_client/src/d1_client/tests/test_iter_sysmeta_multi.py
+++ b/lib_client/src/d1_client/tests/test_iter_sysmeta_multi.py
@@ -89,11 +89,11 @@ def test_1000(self, page_size, n_workers, from_date, to_date):
             d1_test.d1_test_case.MOCK_MN_BASE_URL,
             page_size=page_size,
             max_workers=n_workers,
-            client_dict={
+            client_arg_dict={
                 # 'cert_pem_path': cert_pem_path,
                 # 'cert_key_path': cert_key_path,
             },
-            list_objects_dict={"fromDate": from_date, "toDate": to_date},
+            list_objects_arg_dict={"fromDate": from_date, "toDate": to_date},
         )
 
         for sysmeta_pyxb in sysmeta_iter:
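
Usage note: a minimal sketch of how the refactored iterators are called after this change, based only on the constructor signatures introduced above. The base URL, page size, worker count, and client settings below are illustrative placeholders, not part of the patch.

    import logging

    import d1_common.types.exceptions

    import d1_client.iter.sysmeta_multi


    def main():
        logging.basicConfig(level=logging.INFO)
        # Placeholder BaseURL; point this at the Member Node to harvest from.
        sysmeta_iter = d1_client.iter.sysmeta_multi.SystemMetadataIteratorMulti(
            "https://example-mn.org/mn",
            page_size=500,
            max_workers=4,
            client_arg_dict={"verify_tls": False, "timeout_sec": 60},
            list_objects_arg_dict={},
        )
        # The total property issues a count=0 listObjects() call to get the object count.
        print("Total objects reported by the node: {}".format(sysmeta_iter.total))
        for item in sysmeta_iter:
            # Objects whose System Metadata could not be retrieved are yielded as
            # DataONEException instances instead of SystemMetadata PyXB objects.
            if isinstance(item, d1_common.types.exceptions.DataONEException):
                logging.warning("Skipped object: {}".format(item))
                continue
            print(item.identifier.value(), item.formatId)


    if __name__ == "__main__":
        main()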