diff --git a/corrector_module/opmon_corrector/corrector_batch.py b/corrector_module/opmon_corrector/corrector_batch.py index b0cf239..ad46d23 100644 --- a/corrector_module/opmon_corrector/corrector_batch.py +++ b/corrector_module/opmon_corrector/corrector_batch.py @@ -135,24 +135,18 @@ def _batch_run(self, process_dict): # Process documents without xRequestId cursor = db_m.get_faulty_raw_documents(limit) - self.logger_m.log_info('corrector_batch_raw', 'Processing {0} faulty raw documents'.format(len(cursor))) - - for _doc in cursor: - sanitised_doc = doc_m.sanitise_document(_doc) - fixed_doc = doc_m.correct_structure(sanitised_doc) - producer = fixed_doc if fixed_doc['securityServerType'].lower() == SECURITY_SERVER_TYPE_PRODUCER else None - client = fixed_doc if fixed_doc['securityServerType'].lower() == SECURITY_SERVER_TYPE_CLIENT else None - cleaned_document = doc_m.create_json( - client, producer, '' - ) - cleaned_document = doc_m.apply_calculations(cleaned_document) - cleaned_document['correctorTime'] = database_manager.get_timestamp() - cleaned_document['correctorStatus'] = 'done' - cleaned_document['xRequestId'] = '' - cleaned_document['matchingType'] = 'orphan' - cleaned_document['messageId'] = fixed_doc.get('message_id') or '' - db_m.add_to_clean_data(cleaned_document) - db_m.mark_as_corrected(fixed_doc) + self.logger_m.log_info( + 'corrector_batch_raw', f'Processing {len(cursor)} faulty raw documents') + if len(cursor) > 0: + doc_len += len(cursor) + list_to_process = multiprocessing.Queue() + for _doc in cursor: + data = dict() + data['logger_manager'] = self.logger_m + data['document_manager'] = doc_m + data['document'] = _doc + list_to_process.put(data) + self._process_workers(list_to_process, None, 'faulty') # Updating Status of older documents from processing to done timeout = self.settings['corrector']['timeout-days'] diff --git a/corrector_module/opmon_corrector/corrector_worker.py b/corrector_module/opmon_corrector/corrector_worker.py index 2f197b1..a392f79 100644 --- a/corrector_module/opmon_corrector/corrector_worker.py +++ b/corrector_module/opmon_corrector/corrector_worker.py @@ -51,6 +51,8 @@ def run(self, to_process, duplicates, job_type='consume'): duplicate_count = self.consume_data(data) with duplicates.get_lock(): duplicates.value += duplicate_count + elif job_type == 'faulty': + self.consume_faulty_data(data) elif job_type == 'timeout': self.db_m.update_old_doc_to_done(data) except queue.Empty: @@ -59,7 +61,8 @@ def run(self, to_process, duplicates, job_type='consume'): def consume_data(self, data): """ The Corrector worker. Processes a batch of documents with the same xRequestId - :param data: Contains LoggerManager, DocumentManager, x_request_id and documents to be processed. + :param data: Contains LoggerManager, DocumentManager, x_request_id and documents to be + processed. :return: Returns number of duplicates found. """ # Get parameters @@ -150,3 +153,30 @@ def consume_data(self, data): self.db_m.mark_as_corrected(matched_pair[party]) return duplicates + + def consume_faulty_data(self, data): + """ + The Corrector worker for faulty documents without xRequestId. + :param data: Contains LoggerManager, DocumentManager and document to be processed. + :return: None. + """ + # Get parameters + # logger_manager = data['logger_manager'] + doc_m = data['document_manager'] + sanitised_doc = doc_m.sanitise_document(data['document']) + fixed_doc = doc_m.correct_structure(sanitised_doc) + producer = fixed_doc if ( + fixed_doc['securityServerType'].lower() == SECURITY_SERVER_TYPE_PRODUCER) else None + client = fixed_doc if ( + fixed_doc['securityServerType'].lower() == SECURITY_SERVER_TYPE_CLIENT) else None + cleaned_document = doc_m.create_json( + client, producer, '' + ) + cleaned_document = doc_m.apply_calculations(cleaned_document) + cleaned_document['correctorTime'] = database_manager.get_timestamp() + cleaned_document['correctorStatus'] = 'done' + cleaned_document['xRequestId'] = '' + cleaned_document['matchingType'] = 'orphan' + cleaned_document['messageId'] = fixed_doc.get('message_id') or '' + self.db_m.add_to_clean_data(cleaned_document) + self.db_m.mark_as_corrected(fixed_doc)