diff --git a/awx/main/dispatch/publish.py b/awx/main/dispatch/publish.py
index 8ff158eafae6..df39e06de3c1 100644
--- a/awx/main/dispatch/publish.py
+++ b/awx/main/dispatch/publish.py
@@ -5,9 +5,9 @@
 from uuid import uuid4
 
 from django_guid import get_guid
+from django.conf import settings
 
 from . import pg_bus_conn
-from awx.main.utils import is_testing
 
 logger = logging.getLogger('awx.main.dispatch')
 
@@ -101,7 +101,7 @@ def apply_async(cls, args=None, kwargs=None, queue=None, uuid=None, **kw):
                 obj = cls.get_async_body(args=args, kwargs=kwargs, uuid=uuid, **kw)
                 if callable(queue):
                     queue = queue()
-                if not is_testing():
+                if not settings.DISPATCHER_MOCK_PUBLISH:
                     with pg_bus_conn() as conn:
                         conn.notify(queue, json.dumps(obj))
                 return (obj, queue)
diff --git a/awx/main/tasks/host_indirect.py b/awx/main/tasks/host_indirect.py
index 63d6453650bd..d7e9edffaf32 100644
--- a/awx/main/tasks/host_indirect.py
+++ b/awx/main/tasks/host_indirect.py
@@ -1,6 +1,5 @@
 import logging
 from typing import Tuple, Union
-import time
 
 import yaml
 
@@ -148,16 +147,11 @@ def save_indirect_host_entries(job_id: int, wait_for_events: bool = True) -> Non
 
     if wait_for_events:
         # Gate running this task on the job having all events processed, not just EOF or playbook_on_stats
-        current_events = 0
-        for _ in range(10):
-            current_events = job.job_events.count()
-            if current_events >= job.emitted_events:
-                break
-            logger.debug(f'Waiting for job_id={job_id} to finish processing events, currently {current_events} < {job.emitted_events}')
-            time.sleep(0.2)
-        else:
-            logger.warning(f'Event count {current_events} < {job.emitted_events} for job_id={job_id}, delaying processing of indirect host tracking')
+        current_events = job.job_events.count()
+        if current_events < job.emitted_events:
+            logger.info(f'Event count {current_events} < {job.emitted_events} for job_id={job_id}, delaying processing of indirect host tracking')
             return
+        job.log_lifecycle(f'finished processing {current_events} events, running save_indirect_host_entries')
 
     with transaction.atomic():
         """
diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py
index 31d890a6bcc1..91e4d0dee5e3 100644
--- a/awx/main/tasks/system.py
+++ b/awx/main/tasks/system.py
@@ -374,7 +374,12 @@ def events_processed_hook(unified_job):
     Either one of these events could happen before the other, or there may be no events"""
     unified_job.send_notification_templates('succeeded' if unified_job.status == 'successful' else 'failed')
     if isinstance(unified_job, Job) and flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"):
-        save_indirect_host_entries.delay(unified_job.id)
+        if unified_job.event_queries_processed is True:
+            # If this is called from the callback receiver, it likely does not have updated model data
+            # so a refresh here keeps the check below robust
+            unified_job.refresh_from_db(fields=['event_queries_processed'])
+        if unified_job.event_queries_processed is False:
+            save_indirect_host_entries.delay(unified_job.id)
 
 
 @task(queue=get_task_queuename)
diff --git a/awx/main/tests/live/tests/test_indirect_host_counting.py b/awx/main/tests/live/tests/test_indirect_host_counting.py
index 00eda91db5c9..7c86eb8d313b 100644
--- a/awx/main/tests/live/tests/test_indirect_host_counting.py
+++ b/awx/main/tests/live/tests/test_indirect_host_counting.py
@@ -2,7 +2,7 @@ import time
 
 from awx.main.tests.live.tests.conftest import wait_for_events
 
-from awx.main.tasks.host_indirect import build_indirect_host_data
+from awx.main.tasks.host_indirect import build_indirect_host_data, save_indirect_host_entries
 from awx.main.models.indirect_managed_node_audit import IndirectManagedNodeAudit
 from awx.main.models import Job
 
@@ -38,13 +38,25 @@ def test_indirect_host_counting(live_tmp_folder, run_job_from_playbook):
 
     assert job.ansible_version
 
-    # This will poll, because it depends on the background task finishing
+    # Poll for event processing to finish, because the background task requires it
     for _ in range(10):
-        if IndirectManagedNodeAudit.objects.filter(job=job).exists():
+        if job.job_events.count() >= job.emitted_events:
             break
         time.sleep(0.2)
     else:
-        raise RuntimeError(f'No IndirectManagedNodeAudit records ever populated for job_id={job.id}')
+        raise RuntimeError(f'job id={job.id} never processed events')
+
+    # The task might not have run due to a race condition, so run it here
+    job.refresh_from_db()
+    if job.event_queries_processed is False:
+        save_indirect_host_entries.delay(job.id, wait_for_events=False)
+    # This will poll for the background task to finish
+    for _ in range(10):
+        if IndirectManagedNodeAudit.objects.filter(job=job).exists():
+            break
+        time.sleep(0.2)
+    else:
+        raise RuntimeError(f'No IndirectManagedNodeAudit records ever populated for job_id={job.id}')
 
     assert IndirectManagedNodeAudit.objects.filter(job=job).count() == 1
     host_audit = IndirectManagedNodeAudit.objects.filter(job=job).first()
diff --git a/awx/main/tests/settings_for_test.py b/awx/main/tests/settings_for_test.py
index 5634494c3373..b7d5cdf0235f 100644
--- a/awx/main/tests/settings_for_test.py
+++ b/awx/main/tests/settings_for_test.py
@@ -7,6 +7,9 @@
 # Some things make decisions based on settings.SETTINGS_MODULE, so this is done for that
 SETTINGS_MODULE = 'awx.settings.development'
 
+# Turn off task submission, because sqlite3 does not have pg_notify
+DISPATCHER_MOCK_PUBLISH = True
+
 # Use SQLite for unit tests instead of PostgreSQL. If the lines below are
 # commented out, Django will create the test_awx-dev database in PostgreSQL to
 # run unit tests.
diff --git a/awx/main/tests/unit/test_settings.py b/awx/main/tests/unit/test_settings.py
index dae59296551b..7ff2e3f4abf6 100644
--- a/awx/main/tests/unit/test_settings.py
+++ b/awx/main/tests/unit/test_settings.py
@@ -11,6 +11,7 @@
     'CACHES',
     'DEBUG',
     'NAMED_URL_GRAPH',
+    'DISPATCHER_MOCK_PUBLISH',
 )
 
 
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index 51c162cdaadc..1142109704f5 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -424,6 +424,11 @@
 # Amount of time dispatcher will try to reconnect to database for jobs and consuming new work
 DISPATCHER_DB_DOWNTIME_TOLERANCE = 40
 
+# If you set this, nothing will ever be sent to pg_notify
+# This is not practical to use, although periodic schedules may still run sluggish but functional tasks
+# sqlite3-based tests use this
+DISPATCHER_MOCK_PUBLISH = False
+
 BROKER_URL = 'unix:///var/run/redis/redis.sock'
 CELERYBEAT_SCHEDULE = {
     'tower_scheduler': {'task': 'awx.main.tasks.system.awx_periodic_scheduler', 'schedule': timedelta(seconds=30), 'options': {'expires': 20}},
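
Note (not part of the diff): a minimal sketch of how the new DISPATCHER_MOCK_PUBLISH setting could be exercised from a sqlite3-backed unit test. The task example_noop, the queue name 'example_queue', and the test function are hypothetical names for illustration; the sketch only assumes the @task() decorator and apply_async() behave as shown in the publish.py hunk above, returning (body, queue) without touching pg_bus_conn()/pg_notify when the setting is enabled.

# Hypothetical sketch; example_noop and 'example_queue' are illustrative names, not real AWX tasks.
from django.test import override_settings

from awx.main.dispatch.publish import task


@task(queue='example_queue')
def example_noop():
    pass


def test_mock_publish_skips_pg_notify():
    # With DISPATCHER_MOCK_PUBLISH enabled (as settings_for_test.py now sets it),
    # apply_async() builds the message body and returns it without opening a
    # pg_bus_conn() or sending anything to pg_notify.
    with override_settings(DISPATCHER_MOCK_PUBLISH=True):
        body, queue = example_noop.apply_async()
    assert queue == 'example_queue'
    assert isinstance(body, dict)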