Skip to content

Commit

Permalink
Optimizing processing of documents without xRequestId
Browse files Browse the repository at this point in the history
Using multiprocessing for processing of documents without xRequestId.

Adding documents without xRequestId to the total number of documents processed.
  • Loading branch information
VitaliStupin committed Nov 29, 2023
1 parent 69732d1 commit 55a6dbb
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 19 deletions.
30 changes: 12 additions & 18 deletions corrector_module/opmon_corrector/corrector_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,24 +135,18 @@ def _batch_run(self, process_dict):

# Process documents without xRequestId
cursor = db_m.get_faulty_raw_documents(limit)
self.logger_m.log_info('corrector_batch_raw', 'Processing {0} faulty raw documents'.format(len(cursor)))

for _doc in cursor:
sanitised_doc = doc_m.sanitise_document(_doc)
fixed_doc = doc_m.correct_structure(sanitised_doc)
producer = fixed_doc if fixed_doc['securityServerType'].lower() == SECURITY_SERVER_TYPE_PRODUCER else None
client = fixed_doc if fixed_doc['securityServerType'].lower() == SECURITY_SERVER_TYPE_CLIENT else None
cleaned_document = doc_m.create_json(
client, producer, ''
)
cleaned_document = doc_m.apply_calculations(cleaned_document)
cleaned_document['correctorTime'] = database_manager.get_timestamp()
cleaned_document['correctorStatus'] = 'done'
cleaned_document['xRequestId'] = ''
cleaned_document['matchingType'] = 'orphan'
cleaned_document['messageId'] = fixed_doc.get('message_id') or ''
db_m.add_to_clean_data(cleaned_document)
db_m.mark_as_corrected(fixed_doc)
self.logger_m.log_info(
'corrector_batch_raw', f'Processing {len(cursor)} faulty raw documents')
if len(cursor) > 0:
doc_len += len(cursor)
list_to_process = multiprocessing.Queue()
for _doc in cursor:
data = dict()
data['logger_manager'] = self.logger_m
data['document_manager'] = doc_m
data['document'] = _doc
list_to_process.put(data)
self._process_workers(list_to_process, None, 'faulty')

# Updating Status of older documents from processing to done
timeout = self.settings['corrector']['timeout-days']
Expand Down
32 changes: 31 additions & 1 deletion corrector_module/opmon_corrector/corrector_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ def run(self, to_process, duplicates, job_type='consume'):
duplicate_count = self.consume_data(data)
with duplicates.get_lock():
duplicates.value += duplicate_count
elif job_type == 'faulty':
self.consume_faulty_data(data)
elif job_type == 'timeout':
self.db_m.update_old_doc_to_done(data)
except queue.Empty:
Expand All @@ -59,7 +61,8 @@ def run(self, to_process, duplicates, job_type='consume'):
def consume_data(self, data):
"""
The Corrector worker. Processes a batch of documents with the same xRequestId
:param data: Contains LoggerManager, DocumentManager, x_request_id and documents to be processed.
:param data: Contains LoggerManager, DocumentManager, x_request_id and documents to be
processed.
:return: Returns number of duplicates found.
"""
# Get parameters
Expand Down Expand Up @@ -150,3 +153,30 @@ def consume_data(self, data):
self.db_m.mark_as_corrected(matched_pair[party])

return duplicates

def consume_faulty_data(self, data):
    """
    The Corrector worker for a single faulty document that has no xRequestId.

    The raw document is sanitised and structurally corrected, turned into a
    clean document with an empty xRequestId and matching type 'orphan',
    stored through ``self.db_m.add_to_clean_data`` and finally the raw
    document is marked as corrected.
    :param data: Dictionary with the keys 'document_manager' (DocumentManager)
        and 'document' (raw document to be processed). Any other entries,
        such as 'logger_manager', are not used by this method.
    :return: None.
    """
    document_manager = data['document_manager']

    # Bring the raw document into the expected shape before building the clean record.
    raw_sanitised = document_manager.sanitise_document(data['document'])
    repaired = document_manager.correct_structure(raw_sanitised)

    # The single document fills either the producer or the client slot,
    # depending on which kind of security server reported it.
    server_type = repaired['securityServerType'].lower()
    if server_type == SECURITY_SERVER_TYPE_PRODUCER:
        producer = repaired
    else:
        producer = None
    if server_type == SECURITY_SERVER_TYPE_CLIENT:
        client = repaired
    else:
        client = None

    clean = document_manager.apply_calculations(
        document_manager.create_json(client, producer, '')
    )

    # No counterpart can be matched without an xRequestId, so the record is
    # stored immediately as a finished orphan.
    clean['correctorTime'] = database_manager.get_timestamp()
    clean['correctorStatus'] = 'done'
    clean['xRequestId'] = ''
    clean['matchingType'] = 'orphan'
    clean['messageId'] = repaired.get('message_id') or ''

    self.db_m.add_to_clean_data(clean)
    self.db_m.mark_as_corrected(repaired)

0 comments on commit 55a6dbb

Please sign in to comment.