Corrector optimization #106

Merged
145 changes: 61 additions & 84 deletions corrector_module/opmon_corrector/corrector_batch.py
@@ -21,7 +21,6 @@
# THE SOFTWARE.

import multiprocessing
import queue
import time
from collections import defaultdict

@@ -53,11 +52,12 @@ def run(self, process_dict):
# Raise exception again
raise e

def _process_workers(self, list_to_process, duplicates):
def _process_workers(self, list_to_process, duplicates, job_type='consume'):
"""
Processes the workers in a thread pool.
:param list_to_process: queue of items to be processed by the worker processes
:param duplicates: a shared Value object to store the number of duplicates encountered during processing
:param duplicates: a shared Value object to store the number of duplicates encountered
during processing
: return: None
"""
# Configure worker
@@ -66,7 +66,7 @@ def _process_workers(self, list_to_process, duplicates):
# Configure worker
worker = CorrectorWorker(self.settings, f'worker_{i}')
p = multiprocessing.Process(
target=worker.run, args=(list_to_process, duplicates)
target=worker.run, args=(list_to_process, duplicates, job_type)
)
pool.append(p)

@@ -79,9 +79,10 @@ def _process_workers(self, list_to_process, duplicates):

def _batch_run(self, process_dict):
"""
Gets unique xRequestId's, gets documents by xRequestId, corrects documents, unitializes workers,
gets raw documents, groups by "messageId", corrects documents' structure, initializes workers,
updates timeout documents to "done", removes duplicates from raw_messages.
Gets unique xRequestId's, gets documents by xRequestId, corrects documents, initializes
workers, gets raw documents, groups by "messageId", corrects documents' structure,
initializes workers, updates timeout documents to "done", removes duplicates from
raw_messages.
:param process_dict:
:return: Returns the amount of documents still to process.
"""
@@ -97,113 +98,89 @@ def _batch_run(self, process_dict):

limit = self.settings['corrector']['documents-max']
cursor = db_m.get_raw_documents(limit)
self.logger_m.log_info('corrector_batch_raw', 'Processing {0} raw documents'.format(len(cursor)))
self.logger_m.log_info('corrector_batch_raw', f'Processing {len(cursor)} raw documents.')

# Process documents with xRequestId
doc_map = defaultdict(list)
for _doc in cursor:
sanitised_doc = document_manager.DocumentManager.sanitise_document(_doc)
x_request_id = sanitised_doc.get('xRequestId')
x_request_id = _doc.get('xRequestId')
if not x_request_id:
continue
fix_doc = doc_m.correct_structure(sanitised_doc)
doc_map[x_request_id].append(fix_doc)
doc_map[x_request_id].append(_doc)

# Build queue to be processed
list_to_process = multiprocessing.Queue()
duplicates = multiprocessing.Value('i', 0, lock=True)

m = multiprocessing.Manager()
to_remove_queue = m.Queue()

for x_request_id in doc_map:
documents = doc_map[x_request_id]
data = dict()
data['logger_manager'] = self.logger_m
data['document_manager'] = doc_m
data['x_request_id'] = x_request_id
data['documents'] = documents
data['to_remove_queue'] = to_remove_queue
list_to_process.put(data)
doc_len += len(documents)

self._process_workers(list_to_process, duplicates)

# Go through the to_remove list and remove the duplicates
element_in_queue = True
total_raw_removed = 0
while element_in_queue:
try:
# Do not block queue and return element immediately else raise the queue.Empty
element = to_remove_queue.get(block=False)
db_m.remove_duplicate_from_raw(element)
total_raw_removed += 1
except queue.Empty:
element_in_queue = False
if total_raw_removed > 0:
self.logger_m.log_info('corrector_batch_remove_duplicates_from_raw',
'Total of {0} duplicate documents removed from raw messages.'.format(
total_raw_removed))
if duplicates.value > 0:
self.logger_m.log_info(
'corrector_batch_remove_duplicates_from_raw',
f'Total of {duplicates.value} duplicate documents removed from raw messages.')
else:
self.logger_m.log_info('corrector_batch_remove_duplicates_from_raw',
'No raw documents marked to removal.')

doc_len += total_raw_removed
self.logger_m.log_info(
'corrector_batch_remove_duplicates_from_raw',
'No raw documents marked to removal.')

# Process documents without xRequestId
cursor = db_m.get_faulty_raw_documents(limit)
self.logger_m.log_info('corrector_batch_raw', 'Processing {0} faulty raw documents'.format(len(cursor)))

for _doc in cursor:
sanitised_doc = document_manager.DocumentManager.sanitise_document(_doc)
fixed_doc = doc_m.correct_structure(sanitised_doc)
producer = fixed_doc if fixed_doc['securityServerType'].lower() == SECURITY_SERVER_TYPE_PRODUCER else None
client = fixed_doc if fixed_doc['securityServerType'].lower() == SECURITY_SERVER_TYPE_CLIENT else None
cleaned_document = doc_m.create_json(
client, producer, ''
)
cleaned_document = doc_m.apply_calculations(cleaned_document)
cleaned_document['correctorTime'] = database_manager.get_timestamp()
cleaned_document['correctorStatus'] = 'done'
cleaned_document['xRequestId'] = ''
cleaned_document['matchingType'] = 'orphan'
cleaned_document['messageId'] = fixed_doc.get('message_id') or ''
db_m.add_to_clean_data(cleaned_document)
db_m.mark_as_corrected(fixed_doc)

self.logger_m.log_info(
'corrector_batch_raw', f'Processing {len(cursor)} faulty raw documents')
if len(cursor) > 0:
doc_len += len(cursor)
list_to_process = multiprocessing.Queue()
for _doc in cursor:
data = dict()
data['logger_manager'] = self.logger_m
data['document_manager'] = doc_m
data['document'] = _doc
list_to_process.put(data)
self._process_workers(list_to_process, None, 'faulty')

# Updating Status of older documents from processing to done
timeout = self.settings['corrector']['timeout-days']
self.logger_m.log_info('corrector_batch_update_timeout',
f'Updating timed out [{timeout} days] orphans to done.')

# Update Status of older documents according to client.requestInTs

cursor = db_m.get_timeout_documents_client(timeout, limit=limit)
list_of_docs = list(cursor)
number_of_updated_docs = db_m.update_old_to_done(list_of_docs)

if number_of_updated_docs > 0:
self.logger_m.log_info('corrector_batch_update_client_old_to_done',
"Total of {0} orphans from Client updated to status 'done'.".format(
number_of_updated_docs))
self.logger_m.log_info(
'corrector_batch_update_timeout',
f'Updating timed out [{timeout} days] orphans to done.')

list_of_doc_ids_client = db_m.get_timeout_document_ids_client(timeout, limit=limit)
list_of_doc_ids_producer = db_m.get_timeout_document_ids_producer(timeout, limit=limit)
list_of_doc_ids = list_of_doc_ids_client + list_of_doc_ids_producer
if len(list_of_doc_ids) > 0:
doc_len += len(list_of_doc_ids)
list_to_process = multiprocessing.Queue()
for _doc in list_of_doc_ids:
list_to_process.put(_doc['_id'])
self._process_workers(list_to_process, None, 'timeout')

if len(list_of_doc_ids_client) > 0:
self.logger_m.log_info(
'corrector_batch_update_client_old_to_done',
f'Total of {len(list_of_doc_ids_client)} orphans from Client '
"updated to status 'done'.")
else:
self.logger_m.log_info('corrector_batch_update_client_old_to_done',
'No orphans updated to done.')
doc_len += number_of_updated_docs

# Update Status of older documents according to producer.requestInTs
cursor = db_m.get_timeout_documents_producer(timeout, limit=limit)
list_of_docs = list(cursor)
number_of_updated_docs = db_m.update_old_to_done(list_of_docs)

if number_of_updated_docs > 0:
self.logger_m.log_info('corrector_batch_update_producer_old_to_done',
"Total of {0} orphans from Producer updated to status 'done'.".format(
number_of_updated_docs))
self.logger_m.log_info(
'corrector_batch_update_client_old_to_done', 'No orphans updated to done.')

if len(list_of_doc_ids_producer) > 0:
self.logger_m.log_info(
'corrector_batch_update_producer_old_to_done',
f'Total of {len(list_of_doc_ids_producer)} orphans from Producer '
"updated to status 'done'.")
else:
self.logger_m.log_info('corrector_batch_update_producer_old_to_done',
'No orphans updated to done.')

doc_len += number_of_updated_docs
self.logger_m.log_info(
'corrector_batch_update_producer_old_to_done', 'No orphans updated to done.')

end_processing_time = time.time()
total_time = time.strftime(
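
The corrector_batch.py changes above drop the batch-side duplicate bookkeeping (the removed import queue, to_remove_queue and queue.Empty draining loop) and hand everything to the worker pool instead: raw documents are grouped by xRequestId and queued, and faulty and timed-out documents now go through the same _process_workers() helper via a new job_type argument ('consume', 'faulty' or 'timeout'). Below is a minimal, self-contained sketch of that dispatch pattern; _process_workers, multiprocessing.Queue, Value and job_type come from the diff, while the stand-in worker, the pool_size default and the sample documents are illustrative only.

import multiprocessing
import queue
from collections import defaultdict


def worker_run(list_to_process, duplicates, job_type='consume'):
    # Stand-in for CorrectorWorker.run: drain the shared queue until it is empty.
    try:
        while True:
            data = list_to_process.get(True, 1)
            if job_type == 'consume':
                # The real worker adds the number of duplicates it removed;
                # here we simply count the documents handed to us.
                with duplicates.get_lock():
                    duplicates.value += len(data['documents'])
            # job_type 'faulty' and 'timeout' are dispatched the same way,
            # with per-document payloads instead of xRequestId groups.
    except queue.Empty:
        pass


def process_workers(list_to_process, duplicates, job_type='consume', pool_size=4):
    # Mirrors _process_workers: one process per worker, all draining one queue.
    pool = [
        multiprocessing.Process(target=worker_run,
                                args=(list_to_process, duplicates, job_type))
        for _ in range(pool_size)
    ]
    for p in pool:
        p.start()
    for p in pool:
        p.join()


if __name__ == '__main__':
    raw_documents = [
        {'xRequestId': 'a', 'messageId': '1'},
        {'xRequestId': 'a', 'messageId': '1'},
        {'xRequestId': 'b', 'messageId': '2'},
        {'messageId': '3'},                     # no xRequestId -> handled as 'faulty'
    ]

    # Group raw documents by xRequestId, as the new _batch_run does before queueing.
    doc_map = defaultdict(list)
    for _doc in raw_documents:
        x_request_id = _doc.get('xRequestId')
        if x_request_id:
            doc_map[x_request_id].append(_doc)

    list_to_process = multiprocessing.Queue()
    duplicates = multiprocessing.Value('i', 0, lock=True)
    for x_request_id, documents in doc_map.items():
        list_to_process.put({'x_request_id': x_request_id, 'documents': documents})

    process_workers(list_to_process, duplicates, job_type='consume')
    print('documents handled by workers:', duplicates.value)   # -> 3
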
96 changes: 52 additions & 44 deletions corrector_module/opmon_corrector/corrector_worker.py
@@ -1,4 +1,3 @@

# The MIT License
# Copyright (c) 2021- Nordic Institute for Interoperability Solutions (NIIS)
# Copyright (c) 2017-2020 Estonian Information System Authority (RIA)
@@ -36,36 +35,46 @@ def __init__(self, settings, name):
self.db_m = None
self.worker_name = name

def run(self, to_process, duplicates):
def run(self, to_process, duplicates, job_type='consume'):
""" Process run entry point
:param to_process: Queue of documents to be processed
:param duplicates: Variable to hold the number of duplicates
:param job_type: Job type (consume / timeout)
:return: None
"""
self.db_m = database_manager.DatabaseManager(self.settings)
try:
# Process queue while is not empty
while True:
data = to_process.get(True, 1)
duplicate_count = self.consume_data(data)
with duplicates.get_lock():
duplicates.value += duplicate_count
if job_type == 'consume':
duplicate_count = self.consume_data(data)
with duplicates.get_lock():
duplicates.value += duplicate_count
elif job_type == 'faulty':
self.consume_faulty_data(data)
elif job_type == 'timeout':
self.db_m.update_old_doc_to_done(data)
except queue.Empty:
pass

def consume_data(self, data):
"""
The Corrector worker. Processes a batch of documents with the same xRequestId
:param data: Contains LoggerManager, DatabaseManager, DocumentManager, x_request_id and documents to be processed.
:param data: Contains LoggerManager, DocumentManager, x_request_id and documents to be
processed.
:return: Returns number of duplicates found.
"""
# Get parameters
logger_manager = data['logger_manager']
# logger_manager = data['logger_manager']
doc_m = data['document_manager']
x_request_id = data['x_request_id']
documents = data['documents']
to_remove_queue = data['to_remove_queue']
duplicates = no_requestInTs = 0
documents = []
for _doc in data['documents']:
sanitized_doc = doc_m.sanitize_document(_doc)
fix_doc = doc_m.correct_structure(sanitized_doc)
documents.append(fix_doc)
duplicates = 0

matched_pair = {}
clients = [
@@ -78,10 +87,12 @@ def consume_data(self, data):
]

if clients:
matched_pair['client'] = clients[0]
if not self.db_m.check_clean_document_exists(x_request_id, clients[0]):
matched_pair['client'] = clients[0]

if producers:
matched_pair['producer'] = producers[0]
if not self.db_m.check_clean_document_exists(x_request_id, producers[0]):
matched_pair['producer'] = producers[0]

docs_to_remove = [
doc for doc in documents
@@ -91,39 +102,13 @@
)
]
for current_document in docs_to_remove:

# Mark to removal documents without requestInTs immediately (as of bug in xRoad software ver 6.22.0) # noqa
if (
current_document['requestInTs'] is None
and current_document['securityServerType'] is None
):
to_remove_queue.put(current_document['_id'])
no_requestInTs += 1
self.db_m.mark_as_corrected(current_document)
"""
:logger_manager.log_warning('no_requestInTs',
:'_id : ObjectId(\'' + str(current_document['_id']) + '\'),
:messageId : ' + str(current_document['messageId']))
"""
continue

if self.db_m.check_clean_document_exists(
x_request_id, current_document
):
to_remove_queue.put(current_document['_id'])
duplicates += 1
self.db_m.mark_as_corrected(current_document)
continue

# duplicates
to_remove_queue.put(current_document['_id'])
self.db_m.remove_duplicate_from_raw(current_document['_id'])
duplicates += 1
self.db_m.mark_as_corrected(current_document)
"""
:logger_manager.log_warning('batch_duplicated',
:'_id : ObjectId(\'' + str(current_document['_id']) + '\'),
:messageId : ' + str(current_document['messageId']))
"""
"""

if not matched_pair:
return duplicates
@@ -167,8 +152,31 @@
for party in matched_pair:
self.db_m.mark_as_corrected(matched_pair[party])

if no_requestInTs:
msg = '[{0}] {1} document(s) without requestInTs present'.format(self.worker_name, no_requestInTs)
logger_manager.log_warning('corrector_no_requestInTs', msg)

return duplicates

def consume_faulty_data(self, data):
"""
The Corrector worker for faulty documents without xRequestId.
:param data: Contains LoggerManager, DocumentManager and document to be processed.
:return: None.
"""
# Get parameters
# logger_manager = data['logger_manager']
doc_m = data['document_manager']
sanitized_doc = doc_m.sanitize_document(data['document'])
fixed_doc = doc_m.correct_structure(sanitized_doc)
producer = fixed_doc if (
fixed_doc['securityServerType'].lower() == SECURITY_SERVER_TYPE_PRODUCER) else None
client = fixed_doc if (
fixed_doc['securityServerType'].lower() == SECURITY_SERVER_TYPE_CLIENT) else None
cleaned_document = doc_m.create_json(
client, producer, ''
)
cleaned_document = doc_m.apply_calculations(cleaned_document)
cleaned_document['correctorTime'] = database_manager.get_timestamp()
cleaned_document['correctorStatus'] = 'done'
cleaned_document['xRequestId'] = ''
cleaned_document['matchingType'] = 'orphan'
cleaned_document['messageId'] = fixed_doc.get('message_id') or ''
self.db_m.add_to_clean_data(cleaned_document)
self.db_m.mark_as_corrected(fixed_doc)
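
On the worker side, run() now dispatches on job_type, and consume_data() only admits a client or producer document into the matched pair when check_clean_document_exists() reports that no clean document exists yet; every remaining document for that xRequestId is removed from raw messages directly (remove_duplicate_from_raw) and marked as corrected. A rough single-process sketch of that pairing logic follows, with an in-memory stand-in for DatabaseManager; the real worker also sanitises documents, corrects their structure and writes the clean JSON via add_to_clean_data(), which is elided here.

class FakeDatabaseManager:
    """In-memory stand-in for database_manager.DatabaseManager."""

    def __init__(self, existing_clean=None):
        self.existing_clean = existing_clean or set()  # (x_request_id, type) already cleaned
        self.removed_from_raw = []
        self.corrected = []

    def check_clean_document_exists(self, x_request_id, doc):
        return (x_request_id, doc['securityServerType']) in self.existing_clean

    def remove_duplicate_from_raw(self, doc_id):
        self.removed_from_raw.append(doc_id)

    def mark_as_corrected(self, doc):
        self.corrected.append(doc['_id'])


def consume_data(db_m, x_request_id, documents):
    """Pair one client and one producer document; drop the rest as duplicates."""
    matched_pair = {}
    clients = [d for d in documents if d['securityServerType'] == 'client']
    producers = [d for d in documents if d['securityServerType'] == 'producer']

    # Only pair a document if no clean document exists for it yet.
    if clients and not db_m.check_clean_document_exists(x_request_id, clients[0]):
        matched_pair['client'] = clients[0]
    if producers and not db_m.check_clean_document_exists(x_request_id, producers[0]):
        matched_pair['producer'] = producers[0]

    # Everything that did not make it into the matched pair is treated as a
    # duplicate: remove it from raw messages directly and mark it as corrected.
    duplicates = 0
    for doc in documents:
        if doc not in matched_pair.values():
            db_m.remove_duplicate_from_raw(doc['_id'])
            db_m.mark_as_corrected(doc)
            duplicates += 1

    # The real worker then builds the clean JSON from matched_pair, applies
    # calculations and stores it with add_to_clean_data().
    return duplicates


if __name__ == '__main__':
    docs = [
        {'_id': 1, 'securityServerType': 'client'},
        {'_id': 2, 'securityServerType': 'producer'},
        {'_id': 3, 'securityServerType': 'client'},   # duplicate client
    ]
    db_m = FakeDatabaseManager()
    print(consume_data(db_m, 'req-1', docs))          # -> 1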