diff --git a/corrector_module/opmon_corrector/corrector_batch.py b/corrector_module/opmon_corrector/corrector_batch.py
index c40e9f79..b0cf2394 100644
--- a/corrector_module/opmon_corrector/corrector_batch.py
+++ b/corrector_module/opmon_corrector/corrector_batch.py
@@ -21,7 +21,6 @@
 # THE SOFTWARE.
 
 import multiprocessing
-import queue
 import time
 from collections import defaultdict
 
@@ -53,11 +52,13 @@ def run(self, process_dict):
             # Raise exception again
             raise e
 
-    def _process_workers(self, list_to_process, duplicates):
+    def _process_workers(self, list_to_process, duplicates, job_type='consume'):
         """
-        Processes the workers in a thread pool.
+        Processes the queued items in a pool of worker processes.
         :param list_to_process: queue of items to be processed by the worker processes
-        :param duplicates: a shared Value object to store the number of duplicates encountered during processing
-        : return: None
+        :param duplicates: a shared Value object to store the number of duplicates encountered
+            during processing
+        :param job_type: job type for the workers to run (consume / timeout)
+        :return: None
         """
         # Configure worker
@@ -66,7 +66,7 @@ def _process_workers(self, list_to_process, duplicates):
             # Configure worker
             worker = CorrectorWorker(self.settings, f'worker_{i}')
             p = multiprocessing.Process(
-                target=worker.run, args=(list_to_process, duplicates)
+                target=worker.run, args=(list_to_process, duplicates, job_type)
             )
             pool.append(p)
 
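For context on what job_type threads through: _process_workers fills a multiprocessing.Queue, starts a fixed pool of CorrectorWorker processes, and each worker drains the queue until a one-second get() timeout raises queue.Empty. Below is a minimal, self-contained sketch of that fan-out pattern; worker_run and the integer payloads are illustrative stand-ins, not code from this patch.

import multiprocessing
import queue

def worker_run(to_process, job_type='consume'):
    try:
        while True:
            # Block up to 1 second per item; an empty queue ends the loop.
            item = to_process.get(True, 1)
            print(f'{job_type}: processing {item}')
    except queue.Empty:
        pass  # queue drained; let the process exit

if __name__ == '__main__':
    to_process = multiprocessing.Queue()
    for i in range(10):
        to_process.put(i)
    pool = [multiprocessing.Process(target=worker_run, args=(to_process, 'timeout'))
            for _ in range(4)]
    for p in pool:
        p.start()
    for p in pool:
        p.join()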
""" @@ -97,7 +98,7 @@ def _batch_run(self, process_dict): limit = self.settings['corrector']['documents-max'] cursor = db_m.get_raw_documents(limit) - self.logger_m.log_info('corrector_batch_raw', 'Processing {0} raw documents'.format(len(cursor))) + self.logger_m.log_info('corrector_batch_raw', f'Processing {len(cursor)} raw documents.') # Process documents with xRequestId doc_map = defaultdict(list) @@ -124,12 +125,13 @@ def _batch_run(self, process_dict): self._process_workers(list_to_process, duplicates) if duplicates.value > 0: - self.logger_m.log_info('corrector_batch_remove_duplicates_from_raw', - 'Total of {0} duplicate documents removed from raw messages.'.format( - duplicates.value)) + self.logger_m.log_info( + 'corrector_batch_remove_duplicates_from_raw', + f'Total of {duplicates.value} duplicate documents removed from raw messages.') else: - self.logger_m.log_info('corrector_batch_remove_duplicates_from_raw', - 'No raw documents marked to removal.') + self.logger_m.log_info( + 'corrector_batch_remove_duplicates_from_raw', + 'No raw documents marked to removal.') # Process documents without xRequestId cursor = db_m.get_faulty_raw_documents(limit) @@ -152,38 +154,39 @@ def _batch_run(self, process_dict): db_m.add_to_clean_data(cleaned_document) db_m.mark_as_corrected(fixed_doc) + # Updating Status of older documents from processing to done timeout = self.settings['corrector']['timeout-days'] - self.logger_m.log_info('corrector_batch_update_timeout', - f'Updating timed out [{timeout} days] orphans to done.') - - # Update Status of older documents according to client.requestInTs - cursor = db_m.get_timeout_documents_client(timeout, limit=limit) - list_of_docs = list(cursor) - number_of_updated_docs = db_m.update_old_to_done(list_of_docs) - - if number_of_updated_docs > 0: - self.logger_m.log_info('corrector_batch_update_client_old_to_done', - "Total of {0} orphans from Client updated to status 'done'.".format( - number_of_updated_docs)) + self.logger_m.log_info( + 'corrector_batch_update_timeout', + f'Updating timed out [{timeout} days] orphans to done.') + + list_of_doc_ids_client = db_m.get_timeout_document_ids_client(timeout, limit=limit) + list_of_doc_ids_producer = db_m.get_timeout_document_ids_producer(timeout, limit=limit) + list_of_doc_ids = list_of_doc_ids_client + list_of_doc_ids_producer + if len(list_of_doc_ids) > 0: + doc_len += len(list_of_doc_ids) + list_to_process = multiprocessing.Queue() + for _doc in list_of_doc_ids: + list_to_process.put(_doc['_id']) + self._process_workers(list_to_process, None, 'timeout') + + if len(list_of_doc_ids_client) > 0: + self.logger_m.log_info( + 'corrector_batch_update_client_old_to_done', + f'Total of {len(list_of_doc_ids_client)} orphans from Client ' + "updated to status 'done'.") else: - self.logger_m.log_info('corrector_batch_update_client_old_to_done', - 'No orphans updated to done.') - doc_len += number_of_updated_docs - - # Update Status of older documents according to producer.requestInTs - cursor = db_m.get_timeout_documents_producer(timeout, limit=limit) - list_of_docs = list(cursor) - number_of_updated_docs = db_m.update_old_to_done(list_of_docs) - - if number_of_updated_docs > 0: - self.logger_m.log_info('corrector_batch_update_producer_old_to_done', - "Total of {0} orphans from Producer updated to status 'done'.".format( - number_of_updated_docs)) + self.logger_m.log_info( + 'corrector_batch_update_client_old_to_done', 'No orphans updated to done.') + + if len(list_of_doc_ids_producer) > 0: + self.logger_m.log_info( + 
'corrector_batch_update_producer_old_to_done', + f'Total of {len(list_of_doc_ids_producer)} orphans from Producer ' + "updated to status 'done'.") else: - self.logger_m.log_info('corrector_batch_update_producer_old_to_done', - 'No orphans updated to done.') - - doc_len += number_of_updated_docs + self.logger_m.log_info( + 'corrector_batch_update_producer_old_to_done', 'No orphans updated to done.') end_processing_time = time.time() total_time = time.strftime( diff --git a/corrector_module/opmon_corrector/corrector_worker.py b/corrector_module/opmon_corrector/corrector_worker.py index a4c16f7c..2f197b11 100644 --- a/corrector_module/opmon_corrector/corrector_worker.py +++ b/corrector_module/opmon_corrector/corrector_worker.py @@ -35,10 +35,11 @@ def __init__(self, settings, name): self.db_m = None self.worker_name = name - def run(self, to_process, duplicates): + def run(self, to_process, duplicates, job_type='consume'): """ Process run entry point :param to_process: Queue of documents to be processed :param duplicates: Variable to hold the number of duplicates + :param job_type: Job type (consume / timeout) :return: None """ self.db_m = database_manager.DatabaseManager(self.settings) @@ -46,9 +47,12 @@ def run(self, to_process, duplicates): # Process queue while is not empty while True: data = to_process.get(True, 1) - duplicate_count = self.consume_data(data) - with duplicates.get_lock(): - duplicates.value += duplicate_count + if job_type == 'consume': + duplicate_count = self.consume_data(data) + with duplicates.get_lock(): + duplicates.value += duplicate_count + elif job_type == 'timeout': + self.db_m.update_old_doc_to_done(data) except queue.Empty: pass diff --git a/corrector_module/opmon_corrector/database_manager.py b/corrector_module/opmon_corrector/database_manager.py index 5d7da25a..299e7999 100644 --- a/corrector_module/opmon_corrector/database_manager.py +++ b/corrector_module/opmon_corrector/database_manager.py @@ -157,12 +157,13 @@ def get_processing_document(self, current_doc: dict) -> Optional[dict]: self.logger_m.log_exception('DatabaseManager.get_processing_documents', repr(e)) raise e - def get_timeout_documents_client(self, timeout_days: int, limit: int = 1000) -> List[dict]: + def get_timeout_document_ids_client(self, timeout_days: int, limit: int = 1000) -> List[dict]: """ - Gets the documents from Client that have been processing more than timeout_days. + Gets the document ids (without other fields) from Client that have been processing more + than timeout_days. :param timeout_days: The timeout days. - :param limit: Number of documents to return. - :return: Returns the documents that have been processing more than timeout_days. + :param limit: Number of document ids to return. + :return: Returns the document ids that have been processing more than timeout_days. 
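A note on the worker-side dispatch: in 'consume' mode duplicate counts are still aggregated through the shared multiprocessing.Value behind duplicates, while in 'timeout' mode the batch passes duplicates=None, which is safe only because that branch never touches it. A minimal sketch of how such a shared counter behaves across processes (names are illustrative, not from this patch):

import multiprocessing

def count(duplicates):
    for _ in range(1000):
        with duplicates.get_lock():  # serialize increments across processes
            duplicates.value += 1

if __name__ == '__main__':
    duplicates = multiprocessing.Value('i', 0)
    procs = [multiprocessing.Process(target=count, args=(duplicates,))
             for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(duplicates.value)  # 4000 -- no lost updates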
""" try: db = self.get_query_db() @@ -173,18 +174,20 @@ def get_timeout_documents_client(self, timeout_days: int, limit: int = 1000) -> 'client.requestInTs': {'$lt': ref_time}, 'client.xRequestId': {'$ne': None} } - cursor = clean_data.find(q).limit(limit) + cursor = clean_data.find(q, {'_id': True}).limit(limit) return list(cursor) except Exception as e: - self.logger_m.log_exception('DatabaseManager.get_timeout_documents_client', repr(e)) + self.logger_m.log_exception( + 'DatabaseManager.get_timeout_document_ids_client', repr(e)) raise e - def get_timeout_documents_producer(self, timeout_days: int, limit: int = 1000) -> List[dict]: + def get_timeout_document_ids_producer(self, timeout_days: int, limit: int = 1000) -> List[dict]: """ - Gets the documents from Producer that have been processing more than timeout_days. + Gets the document ids (without other fields) from Producer that have been processing more + than timeout_days. :param timeout_days: The timeout days. - :param limit: Number of documents to return. - :return: Returns the documents that have been processing more than timeout_days. + :param limit: Number of document ids to return. + :return: Returns the document ids that have been processing more than timeout_days. """ try: db = self.get_query_db() @@ -192,14 +195,17 @@ def get_timeout_documents_producer(self, timeout_days: int, limit: int = 1000) - ref_time = 1000 * (get_timestamp() - (timeout_days * 24 * 60 * 60)) q = { 'correctorStatus': 'processing', - 'client.requestInTs': {'$exists': False}, + # This check does not seem to be necessary. + # Documents with both client and producer should never be in "processing" state. + # 'client.requestInTs': {'$exists': False}, 'producer.requestInTs': {'$lt': ref_time}, 'producer.xRequestId': {'$ne': None} } - cursor = clean_data.find(q).limit(limit) + cursor = clean_data.find(q, {'_id': True}).limit(limit) return list(cursor) except Exception as e: - self.logger_m.log_exception('DatabaseManager.get_timeout_documents_producer', repr(e)) + self.logger_m.log_exception( + 'DatabaseManager.get_timeout_document_ids_producer', repr(e)) raise e def add_to_clean_data(self, document: dict) -> None: @@ -231,27 +237,22 @@ def update_document_clean_data(self, document: dict) -> None: self.logger_m.log_exception('DatabaseManager.update_form_clean_data', repr(e)) raise e - def update_old_to_done(self, list_of_docs: List[dict]) -> int: + def update_old_doc_to_done(self, doc_id: str) -> None: """ - Updates then correctorStatus to "done" for the given list of documents. Also updates the correctorTime. - :param list_of_docs: The input list of documents to be updated. - :return: Number of documents updated. + Updates correctorStatus to "done" for the given document. Also updates the correctorTime. + :param doc_id: Document "_id" to be updated. + :return: None. 
""" - number_of_updated_docs = 0 try: db = self.get_query_db() clean_data = db[CLEAN_DATA_COLLECTION] - for doc in list_of_docs: - clean_data.update({'_id': doc['_id']}, - {'$set': {'correctorStatus': 'done', 'correctorTime': get_timestamp()}}) - number_of_updated_docs += 1 - + clean_data.update_one( + {'_id': doc_id}, + {'$set': {'correctorStatus': 'done', 'correctorTime': get_timestamp()}}) except Exception as e: self.logger_m.log_exception('DatabaseManager.update_old_to_done', repr(e)) raise e - return number_of_updated_docs - def check_clean_document_exists(self, x_request_id: str, document: dict) -> bool: """ Checks if given document exists in clean_data or not diff --git a/corrector_module/opmon_corrector/tests/test_corrector_batch.py b/corrector_module/opmon_corrector/tests/test_corrector_batch.py index 1366ed5e..4a4e036b 100644 --- a/corrector_module/opmon_corrector/tests/test_corrector_batch.py +++ b/corrector_module/opmon_corrector/tests/test_corrector_batch.py @@ -19,9 +19,9 @@ class SingleProcessedCorrectorBatch(CorrectorBatch): - def _process_workers(self, list_to_process, duplicates): + def _process_workers(self, list_to_process, duplicates, job_type='consume'): worker = CorrectorWorker(self.settings, 'worker 1') - worker.run(list_to_process, duplicates) + worker.run(list_to_process, duplicates, job_type) def read_data_from_json(path):