From bc08e035131974c38e063041415011a30aa726ec Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Fri, 31 Jan 2025 06:25:48 -0500 Subject: [PATCH] AAP-39559 Wait for all event processing to finish, add fallback task (#15798) * Wait for all event processing to finish, add fallback task * Add flag check to periodic task --- awx/main/tasks/callback.py | 3 +- awx/main/tasks/host_indirect.py | 63 +++++++++++-- .../functional/tasks/test_host_indirect.py | 89 +++++++++---------- .../unit/tasks/test_host_indirect_unit.py | 56 ++++++++++++ awx/settings/defaults.py | 9 ++ 5 files changed, 165 insertions(+), 55 deletions(-) create mode 100644 awx/main/tests/unit/tasks/test_host_indirect_unit.py diff --git a/awx/main/tasks/callback.py b/awx/main/tasks/callback.py index e17d0440c801..f07899dd6ca0 100644 --- a/awx/main/tasks/callback.py +++ b/awx/main/tasks/callback.py @@ -3,6 +3,7 @@ import time import logging from collections import deque +from typing import Tuple, Optional from awx.main.models.event_query import EventQuery @@ -47,7 +48,7 @@ def collect_queries(query_file_contents) -> dict: COLLECTION_FILENAME = "ansible_data.json" -def try_load_query_file(artifact_dir) -> (bool, dict): +def try_load_query_file(artifact_dir) -> Tuple[bool, Optional[dict]]: """ try_load_query_file checks the artifact directory after job completion and returns the contents of ansible_data.json if present diff --git a/awx/main/tasks/host_indirect.py b/awx/main/tasks/host_indirect.py index 337bc48fb07a..112ac16eed8c 100644 --- a/awx/main/tasks/host_indirect.py +++ b/awx/main/tasks/host_indirect.py @@ -1,10 +1,17 @@ import logging from typing import Tuple, Union +import time import yaml import jq +from django.utils.timezone import now, timedelta +from django.conf import settings + +# Django flags +from flags.state import flag_enabled + from awx.main.dispatch.publish import task from awx.main.dispatch import get_task_queuename from awx.main.models.indirect_managed_node_audit import 
IndirectManagedNodeAudit @@ -18,7 +25,7 @@ class UnhashableFacts(RuntimeError): pass -def get_hashable_form(input_data: Union[dict, list, int, float, str, bool]) -> Tuple[Union[Tuple, dict, int, float]]: +def get_hashable_form(input_data: Union[dict, list, Tuple, int, float, str, bool]) -> Tuple[Union[Tuple, int, float, str, bool]]: "Given a dictionary of JSON types, return something that can be hashed and is the same data" if isinstance(input_data, (int, float, str, bool)): return input_data # return scalars as-is @@ -32,7 +39,7 @@ def get_hashable_form(input_data: Union[dict, list, int, float, str, bool]) -> T raise UnhashableFacts(f'Cannonical facts contains a {type(input_data)} type which can not be hashed.') -def build_indirect_host_data(job, job_event_queries: dict[str, str]) -> list[IndirectManagedNodeAudit]: +def build_indirect_host_data(job: Job, job_event_queries: dict[str, str]) -> list[IndirectManagedNodeAudit]: results = {} compiled_jq_expressions = {} # Cache for compiled jq expressions facts_missing_logged = False @@ -84,7 +91,7 @@ def build_indirect_host_data(job, job_event_queries: dict[str, str]) -> list[Ind return list(results.values()) -def fetch_job_event_query(job) -> dict[str, str]: +def fetch_job_event_query(job: Job) -> dict[str, str]: """Returns the following data structure { "demo.query.example": "{canonical_facts: {host_name: .direct_host_name}}" @@ -102,11 +109,55 @@ def fetch_job_event_query(job) -> dict[str, str]: return net_job_data -@task(queue=get_task_queuename) -def save_indirect_host_entries(job_id): - job = Job.objects.get(id=job_id) +def save_indirect_host_entries_of_job(job: Job) -> None: + "Once we have a job and we know that we want to do indirect host processing, this is called" job_event_queries = fetch_job_event_query(job) records = build_indirect_host_data(job, job_event_queries) IndirectManagedNodeAudit.objects.bulk_create(records) job.event_queries_processed = True + + +@task(queue=get_task_queuename) +def 
save_indirect_host_entries(job_id: int, wait_for_events: bool = True) -> None: + try: + job = Job.objects.get(id=job_id) + except Job.DoesNotExist: + logger.debug(f'Job {job_id} seems to be deleted, bailing from save_indirect_host_entries') + return + + if wait_for_events: + # Gate running this task on the job having all events processed, not just EOF or playbook_on_stats + current_events = 0 + for _ in range(10): + current_events = job.job_events.count() + if current_events >= job.emitted_events: + break + logger.debug(f'Waiting for job_id={job_id} to finish processing events, currently {current_events} < {job.emitted_events}') + time.sleep(0.2) + else: + logger.warning(f'Event count {current_events} < {job.emitted_events} for job_id={job_id}, delaying processing of indirect host tracking') + return + + try: + save_indirect_host_entries_of_job(job) + except Exception: + logger.exception(f'Error processing indirect host data for job_id={job_id}') + + # Mark job as processed, even if the processing failed job.save(update_fields=['event_queries_processed']) + + +@task(queue=get_task_queuename) +def save_indirect_host_entries_fallback() -> None: + if not flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"): + return + + job_ct = 0 + right_now_time = now() + window_end = right_now_time - timedelta(minutes=settings.INDIRECT_HOST_QUERY_FALLBACK_MINUTES) + window_start = right_now_time - timedelta(days=settings.INDIRECT_HOST_QUERY_FALLBACK_GIVEUP_DAYS) + for job in Job.objects.filter(event_queries_processed=False, finished__lte=window_end, finished__gte=window_start).iterator(): + save_indirect_host_entries.delay(job.id, wait_for_events=True) + job_ct += 1 + if job_ct: + logger.info(f'Restarted event processing for {job_ct} jobs') diff --git a/awx/main/tests/functional/tasks/test_host_indirect.py b/awx/main/tests/functional/tasks/test_host_indirect.py index 8624ff1f2a54..6928a561db17 100644 --- a/awx/main/tests/functional/tasks/test_host_indirect.py +++ 
b/awx/main/tests/functional/tasks/test_host_indirect.py @@ -1,8 +1,11 @@ import yaml +from unittest import mock import pytest -from awx.main.tasks.host_indirect import build_indirect_host_data, fetch_job_event_query, save_indirect_host_entries, get_hashable_form +from django.utils.timezone import now, timedelta + +from awx.main.tasks.host_indirect import build_indirect_host_data, fetch_job_event_query, save_indirect_host_entries, save_indirect_host_entries_fallback from awx.main.models.event_query import EventQuery from awx.main.models.indirect_managed_node_audit import IndirectManagedNodeAudit @@ -148,52 +151,42 @@ def test_multiple_registered_modules_same_collection(bare_job): assert set(host_audit.events) == {'demo.query.example', 'demo.query.example2'} -class TestHashableForm: - def test_same_dict(self): - assert get_hashable_form({'a': 'b'}) == get_hashable_form({'a': 'b'}) - - def test_same_list(self): - assert get_hashable_form(['a', 'b']) == get_hashable_form(['a', 'b']) - assert get_hashable_form(('a', 'b')) == get_hashable_form(('a', 'b')) - - def test_different_list(self): - assert get_hashable_form(['a', 'b']) != get_hashable_form(['a', 'c']) - assert get_hashable_form(('a', 'b')) != get_hashable_form(('a', 'c')) - - def test_values_different(self): - assert get_hashable_form({'a': 'b'}) != get_hashable_form({'a': 'c'}) - - def test_has_extra_key(self): - assert get_hashable_form({'a': 'b'}) != get_hashable_form({'a': 'b', 'c': 'd'}) - - def test_nested_dictionaries_different(self): - assert get_hashable_form({'a': {'b': 'c'}}) != get_hashable_form({'a': {'b': 'd'}}) - - def test_nested_dictionaries_same(self): - assert get_hashable_form({'a': {'b': 'c'}}) == get_hashable_form({'a': {'b': 'c'}}) - - def test_nested_lists_different(self): - assert get_hashable_form({'a': ['b', 'c']}) != get_hashable_form({'a': ['b', 'd']}) - assert get_hashable_form({'a': ('b', 'c')}) != get_hashable_form({'a': ('b', 'd')}) - - def test_nested_lists_same(self): - assert 
get_hashable_form({'a': ['b', 'c']}) == get_hashable_form({'a': ['b', 'c']}) - assert get_hashable_form({'a': ('b', 'c')}) == get_hashable_form({'a': ('b', 'c')}) - assert hash(get_hashable_form({'a': ['b', 'c']})) == hash(get_hashable_form({'a': ['b', 'c']})) - - def test_list_nested_lists_different(self): - assert get_hashable_form(['a', ['b', 'c']]) != get_hashable_form(['a', ['b', 'd']]) - assert get_hashable_form(['a', ('b', 'c')]) != get_hashable_form(['a', ('b', 'd')]) - - def test_list_nested_lists_same(self): - assert get_hashable_form(['a', ['b', 'c']]) == get_hashable_form(['a', ['b', 'c']]) - assert get_hashable_form(['a', ('b', 'c')]) == get_hashable_form(['a', ('b', 'c')]) - assert hash(get_hashable_form(['a', ('b', 'c')])) == hash(get_hashable_form(['a', ('b', 'c')])) +@pytest.mark.django_db +def test_events_not_fully_processed_no_op(bare_job): + # I have a job that produced 12 events, but those are not saved + bare_job.emitted_events = 12 + bare_job.finished = now() + bare_job.save(update_fields=['emitted_events', 'finished']) + + # Running the normal post-run task will do nothing at this point + assert bare_job.event_queries_processed is False + with mock.patch('time.sleep'): # for test speedup + save_indirect_host_entries(bare_job.id) + bare_job.refresh_from_db() + assert bare_job.event_queries_processed is False + + # Right away, the fallback processing will not run either + with mock.patch.object(save_indirect_host_entries, 'delay') as mock_delay: + save_indirect_host_entries_fallback() + mock_delay.assert_not_called() + bare_job.refresh_from_db() + assert bare_job.event_queries_processed is False + + # After 3 hours have passed... 
+ bare_job.finished = now() - timedelta(hours=3) + bare_job.save(update_fields=['finished']) + + # The fallback task will now process indirect host query data for this job + with mock.patch.object(save_indirect_host_entries, 'delay') as mock_delay: + save_indirect_host_entries_fallback() + mock_delay.assert_called_once_with(bare_job.id, wait_for_events=True) + + # Test code to process anyway, events collected or not + save_indirect_host_entries(bare_job.id, wait_for_events=False) + bare_job.refresh_from_db() + assert bare_job.event_queries_processed is True - def test_list_nested_dicts_different(self): - assert get_hashable_form(['a', {'b': 'c'}]) != get_hashable_form(['a', {'b': 'd'}]) - assert hash(get_hashable_form(['a', {'b': 'c'}])) != hash(get_hashable_form(['a', {'b': 'd'}])) - def test_list_nested_dicts_same(self): - assert get_hashable_form(['a', {'b': 'c'}]) == get_hashable_form(['a', {'b': 'c'}]) - assert hash(get_hashable_form(['a', {'b': 'c'}])) == hash(get_hashable_form(['a', {'b': 'c'}])) +@pytest.mark.django_db +def test_job_id_does_not_exist(): + save_indirect_host_entries(10000001) diff --git a/awx/main/tests/unit/tasks/test_host_indirect_unit.py b/awx/main/tests/unit/tasks/test_host_indirect_unit.py new file mode 100644 index 000000000000..6a13d8b85a00 --- /dev/null +++ b/awx/main/tests/unit/tasks/test_host_indirect_unit.py @@ -0,0 +1,56 @@ +import copy + +import pytest + +from awx.main.tasks.host_indirect import get_hashable_form + + +class TestHashableForm: + @pytest.mark.parametrize( + 'data', + [ + {'a': 'b'}, + ['a', 'b'], + ('a', 'b'), + {'a': {'b': 'c'}}, + {'a': ['b', 'c']}, + {'a': ('b', 'c')}, + ['a', ['b', 'c']], + ['a', ('b', 'c')], + ['a', {'b': 'c'}], + ], + ) + def test_compare_equal_data(self, data): + other_data = copy.deepcopy(data) + # A tuple of scalars may be cached so ids could legitimately be the same + if data != ('a', 'b'): + assert id(data) != id(other_data) # sanity + assert id(get_hashable_form(data)) != 
id(get_hashable_form(data)) + + assert get_hashable_form(data) == get_hashable_form(data) + assert hash(get_hashable_form(data)) == hash(get_hashable_form(data)) + + assert get_hashable_form(data) in {get_hashable_form(data): 1} # test lookup hit + + @pytest.mark.parametrize( + 'data, other_data', + [ + [{'a': 'b'}, {'a': 'c'}], + [{'a': 'b'}, {'a': 'b', 'c': 'd'}], + [['a', 'b'], ['a', 'c']], + [('a', 'b'), ('a', 'c')], + [{'a': {'b': 'c'}}, {'a': {'b': 'd'}}], + [{'a': ['b', 'c']}, {'a': ['b', 'd']}], + [{'a': ('b', 'c')}, {'a': ('b', 'd')}], + [['a', ['b', 'c']], ['a', ['b', 'd']]], + [['a', ('b', 'c')], ['a', ('b', 'd')]], + [['a', {'b': 'c'}], ['a', {'b': 'd'}]], + ], + ) + def test_compar_different_data(self, data, other_data): + assert data != other_data # sanity, otherwise why test this? + assert get_hashable_form(data) != get_hashable_form(other_data) + assert hash(get_hashable_form(data)) != hash(get_hashable_form(other_data)) + + assert get_hashable_form(other_data) not in {get_hashable_form(data): 1} # test lookup miss + assert get_hashable_form(data) not in {get_hashable_form(other_data): 1} diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 9d5c8b82610d..616765a36dd5 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -441,6 +441,7 @@ 'cleanup_host_metrics': {'task': 'awx.main.tasks.host_metrics.cleanup_host_metrics', 'schedule': timedelta(hours=3, minutes=30)}, 'host_metric_summary_monthly': {'task': 'awx.main.tasks.host_metrics.host_metric_summary_monthly', 'schedule': timedelta(hours=4)}, 'periodic_resource_sync': {'task': 'awx.main.tasks.system.periodic_resource_sync', 'schedule': timedelta(minutes=15)}, + 'save_indirect_host_entries_fallback': {'task': 'awx.main.tasks.host_indirect.save_indirect_host_entries_fallback', 'schedule': timedelta(minutes=60)}, } # Django Caching Configuration @@ -1060,6 +1061,14 @@ # system username for django-ansible-base SYSTEM_USERNAME = None +# For indirect host query 
processing +# if a job is not immediately confirmed to have all events processed +# it will be eligible for processing after this number of minutes +INDIRECT_HOST_QUERY_FALLBACK_MINUTES = 60 + +# If an error happens in event collection, give up after this time +INDIRECT_HOST_QUERY_FALLBACK_GIVEUP_DAYS = 3 + +# feature flags FLAGS = {'FEATURE_INDIRECT_NODE_COUNTING_ENABLED': [{'condition': 'boolean', 'value': False}]}