Optimizing updating status for old documents
Using multiprocessing for updating the status of orphans that have reached the timeout.

Fetching only document ids instead of full documents.
VitaliStupin committed Nov 29, 2023
1 parent f10a640 commit 69732d1
Showing 4 changed files with 81 additions and 73 deletions.
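
The optimization described in the commit message boils down to two changes: fetch only the _id values of timed-out documents instead of full documents, and update them in parallel worker processes fed from a shared queue. Below is a minimal, self-contained sketch of that pattern using only the Python standard library; the names dispatch_ids and handle_id are illustrative and are not part of the project's API, and the real worker updates MongoDB rather than printing.

import multiprocessing
import queue


def handle_id(id_queue):
    # Hypothetical worker: drain ids until the queue has been empty for ~1 second.
    try:
        while True:
            doc_id = id_queue.get(True, 1)
            print(f'marking {doc_id} as done')  # the real worker updates MongoDB here
    except queue.Empty:
        pass


def dispatch_ids(doc_ids, pool_size=4):
    # Fill a shared queue with ids only (no full documents) and fan out worker processes.
    id_queue = multiprocessing.Queue()
    for doc_id in doc_ids:
        id_queue.put(doc_id)
    pool = [multiprocessing.Process(target=handle_id, args=(id_queue,))
            for _ in range(pool_size)]
    for p in pool:
        p.start()
    for p in pool:
        p.join()


if __name__ == '__main__':
    dispatch_ids(['doc-1', 'doc-2', 'doc-3'])
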
87 changes: 45 additions & 42 deletions corrector_module/opmon_corrector/corrector_batch.py
@@ -21,7 +21,6 @@
# THE SOFTWARE.

import multiprocessing
import queue
import time
from collections import defaultdict

@@ -53,11 +52,12 @@ def run(self, process_dict):
# Raise exception again
raise e

def _process_workers(self, list_to_process, duplicates):
def _process_workers(self, list_to_process, duplicates, job_type='consume'):
"""
Processes the workers in a thread pool.
:param list_to_process: queue of items to be processed by the worker processes
:param duplicates: a shared Value object to store the number of duplicates encountered during processing
:param duplicates: a shared Value object to store the number of duplicates encountered
during processing
: return: None
"""
# Configure worker
@@ -66,7 +66,7 @@ def _process_workers(self, list_to_process, duplicates):
# Configure worker
worker = CorrectorWorker(self.settings, f'worker_{i}')
p = multiprocessing.Process(
target=worker.run, args=(list_to_process, duplicates)
target=worker.run, args=(list_to_process, duplicates, job_type)
)
pool.append(p)

@@ -79,9 +79,10 @@

def _batch_run(self, process_dict):
"""
Gets unique xRequestId's, gets documents by xRequestId, corrects documents, initializes workers,
gets raw documents, groups by "messageId", corrects documents' structure, initializes workers,
updates timeout documents to "done", removes duplicates from raw_messages.
Gets unique xRequestId's, gets documents by xRequestId, corrects documents, initializes
workers, gets raw documents, groups by "messageId", corrects documents' structure,
initializes workers, updates timeout documents to "done", removes duplicates from
raw_messages.
:param process_dict:
:return: Returns the amount of documents still to process.
"""
@@ -97,7 +98,7 @@

limit = self.settings['corrector']['documents-max']
cursor = db_m.get_raw_documents(limit)
self.logger_m.log_info('corrector_batch_raw', 'Processing {0} raw documents'.format(len(cursor)))
self.logger_m.log_info('corrector_batch_raw', f'Processing {len(cursor)} raw documents.')

# Process documents with xRequestId
doc_map = defaultdict(list)
@@ -124,12 +125,13 @@
self._process_workers(list_to_process, duplicates)

if duplicates.value > 0:
self.logger_m.log_info('corrector_batch_remove_duplicates_from_raw',
'Total of {0} duplicate documents removed from raw messages.'.format(
duplicates.value))
self.logger_m.log_info(
'corrector_batch_remove_duplicates_from_raw',
f'Total of {duplicates.value} duplicate documents removed from raw messages.')
else:
self.logger_m.log_info('corrector_batch_remove_duplicates_from_raw',
'No raw documents marked to removal.')
self.logger_m.log_info(
'corrector_batch_remove_duplicates_from_raw',
'No raw documents marked to removal.')

# Process documents without xRequestId
cursor = db_m.get_faulty_raw_documents(limit)
@@ -152,38 +154,39 @@
db_m.add_to_clean_data(cleaned_document)
db_m.mark_as_corrected(fixed_doc)

# Updating Status of older documents from processing to done
timeout = self.settings['corrector']['timeout-days']
self.logger_m.log_info('corrector_batch_update_timeout',
f'Updating timed out [{timeout} days] orphans to done.')

# Update Status of older documents according to client.requestInTs
cursor = db_m.get_timeout_documents_client(timeout, limit=limit)
list_of_docs = list(cursor)
number_of_updated_docs = db_m.update_old_to_done(list_of_docs)

if number_of_updated_docs > 0:
self.logger_m.log_info('corrector_batch_update_client_old_to_done',
"Total of {0} orphans from Client updated to status 'done'.".format(
number_of_updated_docs))
self.logger_m.log_info(
'corrector_batch_update_timeout',
f'Updating timed out [{timeout} days] orphans to done.')

list_of_doc_ids_client = db_m.get_timeout_document_ids_client(timeout, limit=limit)
list_of_doc_ids_producer = db_m.get_timeout_document_ids_producer(timeout, limit=limit)
list_of_doc_ids = list_of_doc_ids_client + list_of_doc_ids_producer
if len(list_of_doc_ids) > 0:
doc_len += len(list_of_doc_ids)
list_to_process = multiprocessing.Queue()
for _doc in list_of_doc_ids:
list_to_process.put(_doc['_id'])
self._process_workers(list_to_process, None, 'timeout')

if len(list_of_doc_ids_client) > 0:
self.logger_m.log_info(
'corrector_batch_update_client_old_to_done',
f'Total of {len(list_of_doc_ids_client)} orphans from Client '
"updated to status 'done'.")
else:
self.logger_m.log_info('corrector_batch_update_client_old_to_done',
'No orphans updated to done.')
doc_len += number_of_updated_docs

# Update Status of older documents according to producer.requestInTs
cursor = db_m.get_timeout_documents_producer(timeout, limit=limit)
list_of_docs = list(cursor)
number_of_updated_docs = db_m.update_old_to_done(list_of_docs)

if number_of_updated_docs > 0:
self.logger_m.log_info('corrector_batch_update_producer_old_to_done',
"Total of {0} orphans from Producer updated to status 'done'.".format(
number_of_updated_docs))
self.logger_m.log_info(
'corrector_batch_update_client_old_to_done', 'No orphans updated to done.')

if len(list_of_doc_ids_producer) > 0:
self.logger_m.log_info(
'corrector_batch_update_producer_old_to_done',
f'Total of {len(list_of_doc_ids_producer)} orphans from Producer '
"updated to status 'done'.")
else:
self.logger_m.log_info('corrector_batch_update_producer_old_to_done',
'No orphans updated to done.')

doc_len += number_of_updated_docs
self.logger_m.log_info(
'corrector_batch_update_producer_old_to_done', 'No orphans updated to done.')

end_processing_time = time.time()
total_time = time.strftime(
12 changes: 8 additions & 4 deletions corrector_module/opmon_corrector/corrector_worker.py
@@ -35,20 +35,24 @@ def __init__(self, settings, name):
self.db_m = None
self.worker_name = name

def run(self, to_process, duplicates):
def run(self, to_process, duplicates, job_type='consume'):
""" Process run entry point
:param to_process: Queue of documents to be processed
:param duplicates: Variable to hold the number of duplicates
:param job_type: Job type (consume / timeout)
:return: None
"""
self.db_m = database_manager.DatabaseManager(self.settings)
try:
# Process queue while is not empty
while True:
data = to_process.get(True, 1)
duplicate_count = self.consume_data(data)
with duplicates.get_lock():
duplicates.value += duplicate_count
if job_type == 'consume':
duplicate_count = self.consume_data(data)
with duplicates.get_lock():
duplicates.value += duplicate_count
elif job_type == 'timeout':
self.db_m.update_old_doc_to_done(data)
except queue.Empty:
pass

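The duplicates argument passed between CorrectorBatch and CorrectorWorker above is a shared multiprocessing.Value, incremented under its lock by each worker process. The sketch below illustrates that locking pattern in isolation; the name count_up is made up and has no connection to the project's classes.

import multiprocessing


def count_up(counter, times):
    for _ in range(times):
        # get_lock() serialises increments coming from concurrent processes
        with counter.get_lock():
            counter.value += 1


if __name__ == '__main__':
    duplicates = multiprocessing.Value('i', 0)  # shared integer, initially 0
    workers = [multiprocessing.Process(target=count_up, args=(duplicates, 100))
               for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(duplicates.value)  # 400
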
51 changes: 26 additions & 25 deletions corrector_module/opmon_corrector/database_manager.py
@@ -157,12 +157,13 @@ def get_processing_document(self, current_doc: dict) -> Optional[dict]:
self.logger_m.log_exception('DatabaseManager.get_processing_documents', repr(e))
raise e

def get_timeout_documents_client(self, timeout_days: int, limit: int = 1000) -> List[dict]:
def get_timeout_document_ids_client(self, timeout_days: int, limit: int = 1000) -> List[dict]:
"""
Gets the documents from Client that have been processing more than timeout_days.
Gets the document ids (without other fields) from Client that have been processing more
than timeout_days.
:param timeout_days: The timeout days.
:param limit: Number of documents to return.
:return: Returns the documents that have been processing more than timeout_days.
:param limit: Number of document ids to return.
:return: Returns the document ids that have been processing more than timeout_days.
"""
try:
db = self.get_query_db()
@@ -173,33 +174,38 @@ def get_timeout_documents_client(self, timeout_days: int, limit: int = 1000) ->
'client.requestInTs': {'$lt': ref_time},
'client.xRequestId': {'$ne': None}
}
cursor = clean_data.find(q).limit(limit)
cursor = clean_data.find(q, {'_id': True}).limit(limit)
return list(cursor)
except Exception as e:
self.logger_m.log_exception('DatabaseManager.get_timeout_documents_client', repr(e))
self.logger_m.log_exception(
'DatabaseManager.get_timeout_document_ids_client', repr(e))
raise e

def get_timeout_documents_producer(self, timeout_days: int, limit: int = 1000) -> List[dict]:
def get_timeout_document_ids_producer(self, timeout_days: int, limit: int = 1000) -> List[dict]:
"""
Gets the documents from Producer that have been processing more than timeout_days.
Gets the document ids (without other fields) from Producer that have been processing more
than timeout_days.
:param timeout_days: The timeout days.
:param limit: Number of documents to return.
:return: Returns the documents that have been processing more than timeout_days.
:param limit: Number of document ids to return.
:return: Returns the document ids that have been processing more than timeout_days.
"""
try:
db = self.get_query_db()
clean_data = db[CLEAN_DATA_COLLECTION]
ref_time = 1000 * (get_timestamp() - (timeout_days * 24 * 60 * 60))
q = {
'correctorStatus': 'processing',
'client.requestInTs': {'$exists': False},
# This check does not seem to be necessary.
# Documents with both client and producer should never be in "processing" state.
# 'client.requestInTs': {'$exists': False},
'producer.requestInTs': {'$lt': ref_time},
'producer.xRequestId': {'$ne': None}
}
cursor = clean_data.find(q).limit(limit)
cursor = clean_data.find(q, {'_id': True}).limit(limit)
return list(cursor)
except Exception as e:
self.logger_m.log_exception('DatabaseManager.get_timeout_documents_producer', repr(e))
self.logger_m.log_exception(
'DatabaseManager.get_timeout_document_ids_producer', repr(e))
raise e

def add_to_clean_data(self, document: dict) -> None:
@@ -231,27 +237,22 @@ def update_document_clean_data(self, document: dict) -> None:
self.logger_m.log_exception('DatabaseManager.update_form_clean_data', repr(e))
raise e

def update_old_to_done(self, list_of_docs: List[dict]) -> int:
def update_old_doc_to_done(self, doc_id: str) -> None:
"""
Updates then correctorStatus to "done" for the given list of documents. Also updates the correctorTime.
:param list_of_docs: The input list of documents to be updated.
:return: Number of documents updated.
Updates correctorStatus to "done" for the given document. Also updates the correctorTime.
:param doc_id: Document "_id" to be updated.
:return: None.
"""
number_of_updated_docs = 0
try:
db = self.get_query_db()
clean_data = db[CLEAN_DATA_COLLECTION]
for doc in list_of_docs:
clean_data.update({'_id': doc['_id']},
{'$set': {'correctorStatus': 'done', 'correctorTime': get_timestamp()}})
number_of_updated_docs += 1

clean_data.update_one(
{'_id': doc_id},
{'$set': {'correctorStatus': 'done', 'correctorTime': get_timestamp()}})
except Exception as e:
self.logger_m.log_exception('DatabaseManager.update_old_to_done', repr(e))
raise e

return number_of_updated_docs

def check_clean_document_exists(self, x_request_id: str, document: dict) -> bool:
"""
Checks if given document exists in clean_data or not
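For reference, a hedged pymongo sketch of the two database operations this file converges on: a find() with an _id-only projection and an update_one() per id. The connection string, database and collection names below are placeholders rather than values from the project's settings, and the project's get_timestamp() helper is approximated with time.time().

import time

from pymongo import MongoClient

# Placeholder connection; the real DatabaseManager builds this from settings.
clean_data = MongoClient('mongodb://localhost:27017')['query_db']['clean_data']


def get_timeout_document_ids(timeout_days, limit=1000):
    ref_time = 1000 * (int(time.time()) - timeout_days * 24 * 60 * 60)
    q = {
        'correctorStatus': 'processing',
        'client.requestInTs': {'$lt': ref_time},
        'client.xRequestId': {'$ne': None},
    }
    # The projection keeps the result small: only _id is returned for each match.
    return list(clean_data.find(q, {'_id': True}).limit(limit))


def update_old_doc_to_done(doc_id):
    clean_data.update_one(
        {'_id': doc_id},
        {'$set': {'correctorStatus': 'done', 'correctorTime': int(time.time())}})
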
@@ -19,9 +19,9 @@


class SingleProcessedCorrectorBatch(CorrectorBatch):
def _process_workers(self, list_to_process, duplicates):
def _process_workers(self, list_to_process, duplicates, job_type='consume'):
worker = CorrectorWorker(self.settings, 'worker 1')
worker.run(list_to_process, duplicates)
worker.run(list_to_process, duplicates, job_type)


def read_data_from_json(path):