AAP-39559 Wait for all event processing to finish, add fallback task (#15798)

* Wait for all event processing to finish, add fallback task

* Add flag check to periodic task
AlanCoding authored Jan 31, 2025
1 parent e2151ae commit bc08e03
Showing 5 changed files with 165 additions and 55 deletions.
3 changes: 2 additions & 1 deletion awx/main/tasks/callback.py
@@ -3,6 +3,7 @@
import time
import logging
from collections import deque
from typing import Tuple, Optional

from awx.main.models.event_query import EventQuery

@@ -47,7 +48,7 @@ def collect_queries(query_file_contents) -> dict:
COLLECTION_FILENAME = "ansible_data.json"


def try_load_query_file(artifact_dir) -> (bool, dict):
def try_load_query_file(artifact_dir) -> Tuple[bool, Optional[dict]]:
"""
try_load_query_file checks the artifact directory after job completion and
returns the contents of ansible_data.json if present
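For context on the annotation change above: a bare `(bool, dict)` in a return annotation is just a tuple literal, not a type, so type checkers reject it; `typing.Tuple[bool, Optional[dict]]` is the accepted spelling, with the dict slot optional because the file may be absent. A minimal sketch of the same pattern (hypothetical function and path, not the project's code):

```python
import json
from typing import Optional, Tuple


def load_optional_json(path: str) -> Tuple[bool, Optional[dict]]:
    """Return (found, data); data is None when the file is absent."""
    try:
        with open(path) as f:
            return True, json.load(f)
    except FileNotFoundError:
        return False, None
```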
63 changes: 57 additions & 6 deletions awx/main/tasks/host_indirect.py
@@ -1,10 +1,17 @@
import logging
from typing import Tuple, Union
import time

import yaml

import jq

from django.utils.timezone import now, timedelta
from django.conf import settings

# Django flags
from flags.state import flag_enabled

from awx.main.dispatch.publish import task
from awx.main.dispatch import get_task_queuename
from awx.main.models.indirect_managed_node_audit import IndirectManagedNodeAudit
@@ -18,7 +25,7 @@ class UnhashableFacts(RuntimeError):
pass


def get_hashable_form(input_data: Union[dict, list, int, float, str, bool]) -> Tuple[Union[Tuple, dict, int, float]]:
def get_hashable_form(input_data: Union[dict, list, Tuple, int, float, str, bool]) -> Tuple[Union[Tuple, int, float, str, bool]]:
"Given a dictionary of JSON types, return something that can be hashed and is the same data"
if isinstance(input_data, (int, float, str, bool)):
return input_data # return scalars as-is
Expand All @@ -32,7 +39,7 @@ def get_hashable_form(input_data: Union[dict, list, int, float, str, bool]) -> T
raise UnhashableFacts(f'Canonical facts contains a {type(input_data)} type which cannot be hashed.')


def build_indirect_host_data(job, job_event_queries: dict[str, str]) -> list[IndirectManagedNodeAudit]:
def build_indirect_host_data(job: Job, job_event_queries: dict[str, str]) -> list[IndirectManagedNodeAudit]:
results = {}
compiled_jq_expressions = {} # Cache for compiled jq expressions
facts_missing_logged = False
@@ -84,7 +91,7 @@ def build_indirect_host_data(job, job_event_queries: dict[str, str]) -> list[Ind
return list(results.values())


def fetch_job_event_query(job) -> dict[str, str]:
def fetch_job_event_query(job: Job) -> dict[str, str]:
"""Returns the following data structure
{
"demo.query.example": "{canonical_facts: {host_name: .direct_host_name}}"
@@ -102,11 +109,55 @@ def fetch_job_event_query(job) -> dict[str, str]:
return net_job_data


@task(queue=get_task_queuename)
def save_indirect_host_entries(job_id):
job = Job.objects.get(id=job_id)
def save_indirect_host_entries_of_job(job: Job) -> None:
"Once we have a job and we know that we want to do indirect host processing, this is called"
job_event_queries = fetch_job_event_query(job)
records = build_indirect_host_data(job, job_event_queries)
IndirectManagedNodeAudit.objects.bulk_create(records)
job.event_queries_processed = True


@task(queue=get_task_queuename)
def save_indirect_host_entries(job_id: int, wait_for_events: bool = True) -> None:
try:
job = Job.objects.get(id=job_id)
except Job.DoesNotExist:
logger.debug(f'Job {job_id} seems to be deleted, bailing from save_indirect_host_entries')
return

if wait_for_events:
# Gate running this task on the job having all events processed, not just EOF or playbook_on_stats
current_events = 0
for _ in range(10):
current_events = job.job_events.count()
if current_events >= job.emitted_events:
break
logger.debug(f'Waiting for job_id={job_id} to finish processing events, currently {current_events} < {job.emitted_events}')
time.sleep(0.2)
else:
logger.warning(f'Event count {current_events} < {job.emitted_events} for job_id={job_id}, delaying processing of indirect host tracking')
return

try:
save_indirect_host_entries_of_job(job)
except Exception:
logger.exception(f'Error processing indirect host data for job_id={job_id}')

# Mark job as processed, even if the processing failed
job.save(update_fields=['event_queries_processed'])


@task(queue=get_task_queuename)
def save_indirect_host_entries_fallback() -> None:
if not flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"):
return

job_ct = 0
right_now_time = now()
window_end = right_now_time - timedelta(minutes=settings.INDIRECT_HOST_QUERY_FALLBACK_MINUTES)
window_start = right_now_time - timedelta(days=settings.INDIRECT_HOST_QUERY_FALLBACK_GIVEUP_DAYS)
for job in Job.objects.filter(event_queries_processed=False, finished__lte=window_end, finished__gte=window_start).iterator():
save_indirect_host_entries.delay(job.id, wait_for_events=True)
job_ct += 1
if job_ct:
logger.info(f'Restarted event processing for {job_ct} jobs')
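A standalone sketch of the polling pattern `save_indirect_host_entries` uses above: Python's `for`/`else` runs the `else` branch only when the loop ends without a `break`, i.e. when every retry was exhausted. The names here (`get_count`, `expected`) are illustrative only, not from the project:

```python
import time


def wait_for_count(get_count, expected, attempts=10, delay=0.2):
    """Poll until get_count() reaches expected; return False if it never does."""
    for _ in range(attempts):
        if get_count() >= expected:
            break  # everything arrived; the caller can proceed
        time.sleep(delay)
    else:
        # No break happened: retries exhausted, defer to a later fallback run.
        return False
    return True
```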
89 changes: 41 additions & 48 deletions awx/main/tests/functional/tasks/test_host_indirect.py
@@ -1,8 +1,11 @@
import yaml
from unittest import mock

import pytest

from awx.main.tasks.host_indirect import build_indirect_host_data, fetch_job_event_query, save_indirect_host_entries, get_hashable_form
from django.utils.timezone import now, timedelta

from awx.main.tasks.host_indirect import build_indirect_host_data, fetch_job_event_query, save_indirect_host_entries, save_indirect_host_entries_fallback
from awx.main.models.event_query import EventQuery
from awx.main.models.indirect_managed_node_audit import IndirectManagedNodeAudit

@@ -148,52 +151,42 @@ def test_multiple_registered_modules_same_collection(bare_job):
assert set(host_audit.events) == {'demo.query.example', 'demo.query.example2'}


class TestHashableForm:
def test_same_dict(self):
assert get_hashable_form({'a': 'b'}) == get_hashable_form({'a': 'b'})

def test_same_list(self):
assert get_hashable_form(['a', 'b']) == get_hashable_form(['a', 'b'])
assert get_hashable_form(('a', 'b')) == get_hashable_form(('a', 'b'))

def test_different_list(self):
assert get_hashable_form(['a', 'b']) != get_hashable_form(['a', 'c'])
assert get_hashable_form(('a', 'b')) != get_hashable_form(('a', 'c'))

def test_values_different(self):
assert get_hashable_form({'a': 'b'}) != get_hashable_form({'a': 'c'})

def test_has_extra_key(self):
assert get_hashable_form({'a': 'b'}) != get_hashable_form({'a': 'b', 'c': 'd'})

def test_nested_dictionaries_different(self):
assert get_hashable_form({'a': {'b': 'c'}}) != get_hashable_form({'a': {'b': 'd'}})

def test_nested_dictionaries_same(self):
assert get_hashable_form({'a': {'b': 'c'}}) == get_hashable_form({'a': {'b': 'c'}})

def test_nested_lists_different(self):
assert get_hashable_form({'a': ['b', 'c']}) != get_hashable_form({'a': ['b', 'd']})
assert get_hashable_form({'a': ('b', 'c')}) != get_hashable_form({'a': ('b', 'd')})

def test_nested_lists_same(self):
assert get_hashable_form({'a': ['b', 'c']}) == get_hashable_form({'a': ['b', 'c']})
assert get_hashable_form({'a': ('b', 'c')}) == get_hashable_form({'a': ('b', 'c')})
assert hash(get_hashable_form({'a': ['b', 'c']})) == hash(get_hashable_form({'a': ['b', 'c']}))

def test_list_nested_lists_different(self):
assert get_hashable_form(['a', ['b', 'c']]) != get_hashable_form(['a', ['b', 'd']])
assert get_hashable_form(['a', ('b', 'c')]) != get_hashable_form(['a', ('b', 'd')])

def test_list_nested_lists_same(self):
assert get_hashable_form(['a', ['b', 'c']]) == get_hashable_form(['a', ['b', 'c']])
assert get_hashable_form(['a', ('b', 'c')]) == get_hashable_form(['a', ('b', 'c')])
assert hash(get_hashable_form(['a', ('b', 'c')])) == hash(get_hashable_form(['a', ('b', 'c')]))
@pytest.mark.django_db
def test_events_not_fully_processed_no_op(bare_job):
# I have a job that produced 12 events, but those are not saved
bare_job.emitted_events = 12
bare_job.finished = now()
bare_job.save(update_fields=['emitted_events', 'finished'])

# Running the normal post-run task will do nothing at this point
assert bare_job.event_queries_processed is False
with mock.patch('time.sleep'): # for test speedup
save_indirect_host_entries(bare_job.id)
bare_job.refresh_from_db()
assert bare_job.event_queries_processed is False

# Right away, the fallback processing will not run either
with mock.patch.object(save_indirect_host_entries, 'delay') as mock_delay:
save_indirect_host_entries_fallback()
mock_delay.assert_not_called()
bare_job.refresh_from_db()
assert bare_job.event_queries_processed is False

# After 3 hours have passed...
bare_job.finished = now() - timedelta(hours=3)
bare_job.save(update_fields=['finished'])

# The fallback task will now process indirect host query data for this job
with mock.patch.object(save_indirect_host_entries, 'delay') as mock_delay:
save_indirect_host_entries_fallback()
mock_delay.assert_called_once_with(bare_job.id, wait_for_events=True)

# Test code to process anyway, events collected or not
save_indirect_host_entries(bare_job.id, wait_for_events=False)
bare_job.refresh_from_db()
assert bare_job.event_queries_processed is True

def test_list_nested_dicts_different(self):
assert get_hashable_form(['a', {'b': 'c'}]) != get_hashable_form(['a', {'b': 'd'}])
assert hash(get_hashable_form(['a', {'b': 'c'}])) != hash(get_hashable_form(['a', {'b': 'd'}]))

def test_list_nested_dicts_same(self):
assert get_hashable_form(['a', {'b': 'c'}]) == get_hashable_form(['a', {'b': 'c'}])
assert hash(get_hashable_form(['a', {'b': 'c'}])) == hash(get_hashable_form(['a', {'b': 'c'}]))
@pytest.mark.django_db
def test_job_id_does_not_exist():
save_indirect_host_entries(10000001)
56 changes: 56 additions & 0 deletions awx/main/tests/unit/tasks/test_host_indirect_unit.py
@@ -0,0 +1,56 @@
import copy

import pytest

from awx.main.tasks.host_indirect import get_hashable_form


class TestHashableForm:
@pytest.mark.parametrize(
'data',
[
{'a': 'b'},
['a', 'b'],
('a', 'b'),
{'a': {'b': 'c'}},
{'a': ['b', 'c']},
{'a': ('b', 'c')},
['a', ['b', 'c']],
['a', ('b', 'c')],
['a', {'b': 'c'}],
],
)
def test_compare_equal_data(self, data):
other_data = copy.deepcopy(data)
# A tuple of scalars may be cached so ids could legitimately be the same
if data != ('a', 'b'):
assert id(data) != id(other_data) # sanity
assert id(get_hashable_form(data)) != id(get_hashable_form(data))

assert get_hashable_form(data) == get_hashable_form(data)
assert hash(get_hashable_form(data)) == hash(get_hashable_form(data))

assert get_hashable_form(data) in {get_hashable_form(data): 1} # test lookup hit

@pytest.mark.parametrize(
'data, other_data',
[
[{'a': 'b'}, {'a': 'c'}],
[{'a': 'b'}, {'a': 'b', 'c': 'd'}],
[['a', 'b'], ['a', 'c']],
[('a', 'b'), ('a', 'c')],
[{'a': {'b': 'c'}}, {'a': {'b': 'd'}}],
[{'a': ['b', 'c']}, {'a': ['b', 'd']}],
[{'a': ('b', 'c')}, {'a': ('b', 'd')}],
[['a', ['b', 'c']], ['a', ['b', 'd']]],
[['a', ('b', 'c')], ['a', ('b', 'd')]],
[['a', {'b': 'c'}], ['a', {'b': 'd'}]],
],
)
def test_compare_different_data(self, data, other_data):
assert data != other_data # sanity, otherwise why test this?
assert get_hashable_form(data) != get_hashable_form(other_data)
assert hash(get_hashable_form(data)) != hash(get_hashable_form(other_data))

assert get_hashable_form(other_data) not in {get_hashable_form(data): 1} # test lookup miss
assert get_hashable_form(data) not in {get_hashable_form(other_data): 1}
9 changes: 9 additions & 0 deletions awx/settings/defaults.py
@@ -441,6 +441,7 @@
'cleanup_host_metrics': {'task': 'awx.main.tasks.host_metrics.cleanup_host_metrics', 'schedule': timedelta(hours=3, minutes=30)},
'host_metric_summary_monthly': {'task': 'awx.main.tasks.host_metrics.host_metric_summary_monthly', 'schedule': timedelta(hours=4)},
'periodic_resource_sync': {'task': 'awx.main.tasks.system.periodic_resource_sync', 'schedule': timedelta(minutes=15)},
'save_indirect_host_entries_fallback': {'task': 'awx.main.tasks.host_indirect.save_indirect_host_entries_fallback', 'schedule': timedelta(minutes=60)},
}

# Django Caching Configuration
@@ -1060,6 +1061,14 @@
# system username for django-ansible-base
SYSTEM_USERNAME = None

# For indirect host query processing
# if a job is not immediately confirmed to have all events processed,
# it will be eligible for processing after this number of minutes
INDIRECT_HOST_QUERY_FALLBACK_MINUTES = 60

# If an error happens in event collection, give up after this time
INDIRECT_HOST_QUERY_FALLBACK_GIVEUP_DAYS = 3

# feature flags
FLAGS = {'FEATURE_INDIRECT_NODE_COUNTING_ENABLED': [{'condition': 'boolean', 'value': False}]}

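Assuming the default values added above, the window that `save_indirect_host_entries_fallback` scans can be pictured as follows: jobs qualify when they finished at least `INDIRECT_HOST_QUERY_FALLBACK_MINUTES` ago but no more than `INDIRECT_HOST_QUERY_FALLBACK_GIVEUP_DAYS` ago. A small sketch, not project code:

```python
from datetime import datetime, timedelta, timezone

FALLBACK_MINUTES = 60   # stand-in for INDIRECT_HOST_QUERY_FALLBACK_MINUTES
GIVEUP_DAYS = 3         # stand-in for INDIRECT_HOST_QUERY_FALLBACK_GIVEUP_DAYS

right_now = datetime.now(timezone.utc)
window_end = right_now - timedelta(minutes=FALLBACK_MINUTES)
window_start = right_now - timedelta(days=GIVEUP_DAYS)

# A job is re-dispatched by the fallback task when:
#   window_start <= job.finished <= window_end
# and its event_queries_processed flag is still False.
```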
