AAP-39559 Wait for all event processing to finish, add fallback task #15798

Merged
3 changes: 2 additions & 1 deletion awx/main/tasks/callback.py
@@ -3,6 +3,7 @@
import time
import logging
from collections import deque
from typing import Tuple, Optional

from awx.main.models.event_query import EventQuery

@@ -47,7 +48,7 @@ def collect_queries(query_file_contents) -> dict:
COLLECTION_FILENAME = "ansible_data.json"


def try_load_query_file(artifact_dir) -> (bool, dict):
def try_load_query_file(artifact_dir) -> Tuple[bool, Optional[dict]]:
"""
try_load_query_file checks the artifact directory after job completion and
returns the contents of ansible_data.json if present
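For context, a minimal sketch of a loader matching the new signature; the `COLLECTION_FILENAME` constant comes from the diff context above, but the body here is inferred rather than the PR's exact code:

```python
import json
import os
from typing import Optional, Tuple

COLLECTION_FILENAME = "ansible_data.json"  # from the diff context above


def try_load_query_file(artifact_dir) -> Tuple[bool, Optional[dict]]:
    """Return (True, data) if ansible_data.json exists in the artifact dir, else (False, None)"""
    path = os.path.join(artifact_dir, COLLECTION_FILENAME)
    if not os.path.exists(path):
        return False, None
    with open(path) as f:
        return True, json.load(f)
```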
63 changes: 57 additions & 6 deletions awx/main/tasks/host_indirect.py
@@ -1,10 +1,17 @@
import logging
from typing import Tuple, Union
import time

import yaml

import jq

from django.utils.timezone import now, timedelta
from django.conf import settings

# Django flags
from flags.state import flag_enabled

from awx.main.dispatch.publish import task
from awx.main.dispatch import get_task_queuename
from awx.main.models.indirect_managed_node_audit import IndirectManagedNodeAudit
@@ -18,7 +25,7 @@
pass


def get_hashable_form(input_data: Union[dict, list, int, float, str, bool]) -> Tuple[Union[Tuple, dict, int, float]]:
def get_hashable_form(input_data: Union[dict, list, Tuple, int, float, str, bool]) -> Tuple[Union[Tuple, int, float, str, bool]]:
"Given a dictionary of JSON types, return something that can be hashed and is the same data"
if isinstance(input_data, (int, float, str, bool)):
return input_data # return scalars as-is
@@ -32,7 +39,7 @@
raise UnhashableFacts(f'Canonical facts contains a {type(input_data)} type which cannot be hashed.')

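The diff shows only the scalar base case and the error fallback; here is a sketch of the full recursion the docstring and signature imply (the dict and list branches are inferred, not copied from the PR):

```python
class UnhashableFacts(Exception):
    pass  # stand-in for the exception defined in this module


def get_hashable_form(input_data):
    "Given a dictionary of JSON types, return something that can be hashed and is the same data"
    if isinstance(input_data, (int, float, str, bool)):
        return input_data  # return scalars as-is
    if isinstance(input_data, dict):
        # sort by key so equal dicts produce equal, equally-hashed tuples
        return tuple(sorted((key, get_hashable_form(value)) for key, value in input_data.items()))
    if isinstance(input_data, (list, tuple)):
        return tuple(get_hashable_form(item) for item in input_data)
    raise UnhashableFacts(f'Canonical facts contains a {type(input_data)} type which cannot be hashed.')
```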

def build_indirect_host_data(job, job_event_queries: dict[str, str]) -> list[IndirectManagedNodeAudit]:
def build_indirect_host_data(job: Job, job_event_queries: dict[str, str]) -> list[IndirectManagedNodeAudit]:
results = {}
compiled_jq_expressions = {} # Cache for compiled jq expressions
facts_missing_logged = False
@@ -84,7 +91,7 @@
return list(results.values())


def fetch_job_event_query(job) -> dict[str, str]:
def fetch_job_event_query(job: Job) -> dict[str, str]:
"""Returns the following data structure
{
"demo.query.example": "{canonical_facts: {host_name: .direct_host_name}}"
@@ -102,11 +109,55 @@
return net_job_data

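To illustrate the structure above: each value is a jq program, compiled once and then applied to event data. A hedged sketch using the jq bindings imported at the top of this module (the event payload is hypothetical):

```python
import jq

query = "{canonical_facts: {host_name: .direct_host_name}}"
event_data = {"direct_host_name": "foo.example.org"}  # hypothetical event payload

program = jq.compile(query)  # build_indirect_host_data caches compiled programs
result = program.input(event_data).first()
# result == {'canonical_facts': {'host_name': 'foo.example.org'}}
```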

@task(queue=get_task_queuename)
def save_indirect_host_entries(job_id):
job = Job.objects.get(id=job_id)
def save_indirect_host_entries_of_job(job: Job) -> None:
"Once we have a job and we know that we want to do indirect host processing, this is called"
job_event_queries = fetch_job_event_query(job)
records = build_indirect_host_data(job, job_event_queries)
IndirectManagedNodeAudit.objects.bulk_create(records)
job.event_queries_processed = True


@task(queue=get_task_queuename)
def save_indirect_host_entries(job_id: int, wait_for_events: bool = True) -> None:
try:
job = Job.objects.get(id=job_id)
except Job.DoesNotExist:
logger.debug(f'Job {job_id} seems to be deleted, bailing from save_indirect_host_entries')
return

if wait_for_events:
# Gate running this task on the job having all events processed, not just EOF or playbook_on_stats
current_events = 0
for _ in range(10):
current_events = job.job_events.count()
if current_events >= job.emitted_events:
break
logger.debug(f'Waiting for job_id={job_id} to finish processing events, currently {current_events} < {job.emitted_events}')
time.sleep(0.2)
else:
logger.warning(f'Event count {current_events} < {job.emitted_events} for job_id={job_id}, delaying processing of indirect host tracking')
return

try:
save_indirect_host_entries_of_job(job)
except Exception:
logger.exception(f'Error processing indirect host data for job_id={job_id}')

Codecov / codecov/patch warning: added lines #L143 - L144 in awx/main/tasks/host_indirect.py were not covered by tests.

# Mark job as processed, even if the processing failed
job.save(update_fields=['event_queries_processed'])


@task(queue=get_task_queuename)
def save_indirect_host_entries_fallback() -> None:
if not flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"):
return

Codecov / codecov/patch warning: added line #L153 in awx/main/tasks/host_indirect.py was not covered by tests.

job_ct = 0
right_now_time = now()
window_end = right_now_time - timedelta(minutes=settings.INDIRECT_HOST_QUERY_FALLBACK_MINUTES)
window_start = right_now_time - timedelta(days=settings.INDIRECT_HOST_QUERY_FALLBACK_GIVEUP_DAYS)
for job in Job.objects.filter(event_queries_processed=False, finished__lte=window_end, finished__gte=window_start).iterator():
Contributor:

When this runs the first time, it could lead to a large number of processing jobs (all jobs completed in the last three days). Are we ok with that? Most of them will probably end quickly because there are no queries.

Member Author:

Well, that seems like a good argument for having a limit on the start of the window, which I did here. Otherwise it was more of a gut feeling.

But I still sense something to be worried about. This will get scheduled for a lot of jobs that don't need the processing done. The periodic task will run and schedule a task for each unprocessed job. Each should be a no-op with a single write, but that could still be a lot of tasks. Worse, it could temporarily overwhelm the dispatcher, because it can't delay tasks (ideally they would be scrambled with random delays).

I have a thought for this, and I'll work on putting something up. With a different angle, it'll get more intuitive.
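Not part of this PR, but a hypothetical sketch of the "scrambled with random delays" idea, assuming the only available lever is jitter inside the task body itself (the name and approach are illustrative):

```python
import random
import time

from awx.main.tasks.host_indirect import save_indirect_host_entries


def save_indirect_host_entries_jittered(job_id: int, max_jitter: float = 30.0) -> None:
    # Sleep a random amount so a burst of fallback tasks does not hit
    # the dispatcher all at once
    time.sleep(random.uniform(0.0, max_jitter))
    save_indirect_host_entries(job_id, wait_for_events=True)
```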

save_indirect_host_entries.delay(job.id, wait_for_events=True)
job_ct += 1
if job_ct:
logger.info(f'Restarted event processing for {job_ct} jobs')
89 changes: 41 additions & 48 deletions awx/main/tests/functional/tasks/test_host_indirect.py
@@ -1,8 +1,11 @@
import yaml
from unittest import mock

import pytest

from awx.main.tasks.host_indirect import build_indirect_host_data, fetch_job_event_query, save_indirect_host_entries, get_hashable_form
from django.utils.timezone import now, timedelta

from awx.main.tasks.host_indirect import build_indirect_host_data, fetch_job_event_query, save_indirect_host_entries, save_indirect_host_entries_fallback
from awx.main.models.event_query import EventQuery
from awx.main.models.indirect_managed_node_audit import IndirectManagedNodeAudit

@@ -148,52 +151,42 @@ def test_multiple_registered_modules_same_collection(bare_job):
assert set(host_audit.events) == {'demo.query.example', 'demo.query.example2'}


class TestHashableForm:
def test_same_dict(self):
assert get_hashable_form({'a': 'b'}) == get_hashable_form({'a': 'b'})

def test_same_list(self):
assert get_hashable_form(['a', 'b']) == get_hashable_form(['a', 'b'])
assert get_hashable_form(('a', 'b')) == get_hashable_form(('a', 'b'))

def test_different_list(self):
assert get_hashable_form(['a', 'b']) != get_hashable_form(['a', 'c'])
assert get_hashable_form(('a', 'b')) != get_hashable_form(('a', 'c'))

def test_values_different(self):
assert get_hashable_form({'a': 'b'}) != get_hashable_form({'a': 'c'})

def test_has_extra_key(self):
assert get_hashable_form({'a': 'b'}) != get_hashable_form({'a': 'b', 'c': 'd'})

def test_nested_dictionaries_different(self):
assert get_hashable_form({'a': {'b': 'c'}}) != get_hashable_form({'a': {'b': 'd'}})

def test_nested_dictionaries_same(self):
assert get_hashable_form({'a': {'b': 'c'}}) == get_hashable_form({'a': {'b': 'c'}})

def test_nested_lists_different(self):
assert get_hashable_form({'a': ['b', 'c']}) != get_hashable_form({'a': ['b', 'd']})
assert get_hashable_form({'a': ('b', 'c')}) != get_hashable_form({'a': ('b', 'd')})

def test_nested_lists_same(self):
assert get_hashable_form({'a': ['b', 'c']}) == get_hashable_form({'a': ['b', 'c']})
assert get_hashable_form({'a': ('b', 'c')}) == get_hashable_form({'a': ('b', 'c')})
assert hash(get_hashable_form({'a': ['b', 'c']})) == hash(get_hashable_form({'a': ['b', 'c']}))

def test_list_nested_lists_different(self):
assert get_hashable_form(['a', ['b', 'c']]) != get_hashable_form(['a', ['b', 'd']])
assert get_hashable_form(['a', ('b', 'c')]) != get_hashable_form(['a', ('b', 'd')])

def test_list_nested_lists_same(self):
assert get_hashable_form(['a', ['b', 'c']]) == get_hashable_form(['a', ['b', 'c']])
assert get_hashable_form(['a', ('b', 'c')]) == get_hashable_form(['a', ('b', 'c')])
assert hash(get_hashable_form(['a', ('b', 'c')])) == hash(get_hashable_form(['a', ('b', 'c')]))
@pytest.mark.django_db
def test_events_not_fully_processed_no_op(bare_job):
# I have a job that produced 12 events, but those are not saved
bare_job.emitted_events = 12
bare_job.finished = now()
bare_job.save(update_fields=['emitted_events', 'finished'])

# Running the normal post-run task will do nothing at this point
assert bare_job.event_queries_processed is False
with mock.patch('time.sleep'): # for test speedup
save_indirect_host_entries(bare_job.id)
bare_job.refresh_from_db()
assert bare_job.event_queries_processed is False

# Right away, the fallback processing will not run either
with mock.patch.object(save_indirect_host_entries, 'delay') as mock_delay:
save_indirect_host_entries_fallback()
mock_delay.assert_not_called()
bare_job.refresh_from_db()
assert bare_job.event_queries_processed is False

# After 3 hours have passed...
bare_job.finished = now() - timedelta(hours=3)
bare_job.save(update_fields=['finished'])

# The fallback task will now process indirect host query data for this job
with mock.patch.object(save_indirect_host_entries, 'delay') as mock_delay:
save_indirect_host_entries_fallback()
mock_delay.assert_called_once_with(bare_job.id, wait_for_events=True)

# Test code to process anyway, events collected or not
save_indirect_host_entries(bare_job.id, wait_for_events=False)
bare_job.refresh_from_db()
assert bare_job.event_queries_processed is True

def test_list_nested_dicts_different(self):
assert get_hashable_form(['a', {'b': 'c'}]) != get_hashable_form(['a', {'b': 'd'}])
assert hash(get_hashable_form(['a', {'b': 'c'}])) != hash(get_hashable_form(['a', {'b': 'd'}]))

def test_list_nested_dicts_same(self):
assert get_hashable_form(['a', {'b': 'c'}]) == get_hashable_form(['a', {'b': 'c'}])
assert hash(get_hashable_form(['a', {'b': 'c'}])) == hash(get_hashable_form(['a', {'b': 'c'}]))
@pytest.mark.django_db
def test_job_id_does_not_exist():
save_indirect_host_entries(10000001)
56 changes: 56 additions & 0 deletions awx/main/tests/unit/tasks/test_host_indirect_unit.py
@@ -0,0 +1,56 @@
import copy

import pytest

from awx.main.tasks.host_indirect import get_hashable_form


class TestHashableForm:
@pytest.mark.parametrize(
'data',
[
{'a': 'b'},
['a', 'b'],
('a', 'b'),
{'a': {'b': 'c'}},
{'a': ['b', 'c']},
{'a': ('b', 'c')},
['a', ['b', 'c']],
['a', ('b', 'c')],
['a', {'b': 'c'}],
],
)
def test_compare_equal_data(self, data):
other_data = copy.deepcopy(data)
# A tuple of scalars may be cached so ids could legitimately be the same
if data != ('a', 'b'):
assert id(data) != id(other_data) # sanity
assert id(get_hashable_form(data)) != id(get_hashable_form(other_data))

assert get_hashable_form(data) == get_hashable_form(other_data)
assert hash(get_hashable_form(data)) == hash(get_hashable_form(other_data))

assert get_hashable_form(other_data) in {get_hashable_form(data): 1} # test lookup hit

@pytest.mark.parametrize(
'data, other_data',
[
[{'a': 'b'}, {'a': 'c'}],
[{'a': 'b'}, {'a': 'b', 'c': 'd'}],
[['a', 'b'], ['a', 'c']],
[('a', 'b'), ('a', 'c')],
[{'a': {'b': 'c'}}, {'a': {'b': 'd'}}],
[{'a': ['b', 'c']}, {'a': ['b', 'd']}],
[{'a': ('b', 'c')}, {'a': ('b', 'd')}],
[['a', ['b', 'c']], ['a', ['b', 'd']]],
[['a', ('b', 'c')], ['a', ('b', 'd')]],
[['a', {'b': 'c'}], ['a', {'b': 'd'}]],
],
)
def test_compare_different_data(self, data, other_data):
assert data != other_data # sanity, otherwise why test this?
assert get_hashable_form(data) != get_hashable_form(other_data)
assert hash(get_hashable_form(data)) != hash(get_hashable_form(other_data))

assert get_hashable_form(other_data) not in {get_hashable_form(data): 1} # test lookup miss
assert get_hashable_form(data) not in {get_hashable_form(other_data): 1}
9 changes: 9 additions & 0 deletions awx/settings/defaults.py
@@ -441,6 +441,7 @@
'cleanup_host_metrics': {'task': 'awx.main.tasks.host_metrics.cleanup_host_metrics', 'schedule': timedelta(hours=3, minutes=30)},
'host_metric_summary_monthly': {'task': 'awx.main.tasks.host_metrics.host_metric_summary_monthly', 'schedule': timedelta(hours=4)},
'periodic_resource_sync': {'task': 'awx.main.tasks.system.periodic_resource_sync', 'schedule': timedelta(minutes=15)},
'save_indirect_host_entries_fallback': {'task': 'awx.main.tasks.host_indirect.save_indirect_host_entries_fallback', 'schedule': timedelta(minutes=60)},
}

# Django Caching Configuration
@@ -1060,6 +1061,14 @@
# system username for django-ansible-base
SYSTEM_USERNAME = None

# For indirect host query processing
# if a job is not immediately confirmed to have all events processed
# it will be eligible for processing after this number of minutes
INDIRECT_HOST_QUERY_FALLBACK_MINUTES = 60

# If an error happens in event collection, give up after this time
INDIRECT_HOST_QUERY_FALLBACK_GIVEUP_DAYS = 3
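
A worked example of the eligibility window these two settings define, using the defaults above:

```python
from datetime import datetime, timedelta, timezone

right_now = datetime.now(tz=timezone.utc)
window_end = right_now - timedelta(minutes=60)  # INDIRECT_HOST_QUERY_FALLBACK_MINUTES
window_start = right_now - timedelta(days=3)    # INDIRECT_HOST_QUERY_FALLBACK_GIVEUP_DAYS
# A job qualifies for the fallback when window_start <= job.finished <= window_end,
# i.e. it finished more than an hour ago but less than three days ago.
```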

# feature flags
FLAGS = {'FEATURE_INDIRECT_NODE_COUNTING_ENABLED': [{'condition': 'boolean', 'value': False}]}
