Skip to content

Commit

Permalink
Remove polling loop for job finishing event processing (#15811)
Browse files Browse the repository at this point in the history
* Remove polling loop for job finishing event processing

* Make awx/main/tests/live dramatically faster (#15780)
  • Loading branch information
AlanCoding authored Feb 10, 2025
1 parent b2a36c9 commit 26f0f99
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 17 deletions.
4 changes: 2 additions & 2 deletions awx/main/dispatch/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
from uuid import uuid4

from django_guid import get_guid
from django.conf import settings

from . import pg_bus_conn
from awx.main.utils import is_testing

logger = logging.getLogger('awx.main.dispatch')

Expand Down Expand Up @@ -101,7 +101,7 @@ def apply_async(cls, args=None, kwargs=None, queue=None, uuid=None, **kw):
obj = cls.get_async_body(args=args, kwargs=kwargs, uuid=uuid, **kw)
if callable(queue):
queue = queue()
if not is_testing():
if not settings.DISPATCHER_MOCK_PUBLISH:
with pg_bus_conn() as conn:
conn.notify(queue, json.dumps(obj))
return (obj, queue)
Expand Down
14 changes: 4 additions & 10 deletions awx/main/tasks/host_indirect.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import logging
from typing import Tuple, Union
import time

import yaml

Expand Down Expand Up @@ -148,16 +147,11 @@ def save_indirect_host_entries(job_id: int, wait_for_events: bool = True) -> Non

if wait_for_events:
# Gate running this task on the job having all events processed, not just EOF or playbook_on_stats
current_events = 0
for _ in range(10):
current_events = job.job_events.count()
if current_events >= job.emitted_events:
break
logger.debug(f'Waiting for job_id={job_id} to finish processing events, currently {current_events} < {job.emitted_events}')
time.sleep(0.2)
else:
logger.warning(f'Event count {current_events} < {job.emitted_events} for job_id={job_id}, delaying processing of indirect host tracking')
current_events = job.job_events.count()
if current_events < job.emitted_events:
logger.info(f'Event count {current_events} < {job.emitted_events} for job_id={job_id}, delaying processing of indirect host tracking')
return
job.log_lifecycle(f'finished processing {current_events} events, running save_indirect_host_entries')

with transaction.atomic():
"""
Expand Down
7 changes: 6 additions & 1 deletion awx/main/tasks/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,12 @@ def events_processed_hook(unified_job):
Either one of these events could happen before the other, or there may be no events"""
unified_job.send_notification_templates('succeeded' if unified_job.status == 'successful' else 'failed')
if isinstance(unified_job, Job) and flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"):
save_indirect_host_entries.delay(unified_job.id)
if unified_job.event_queries_processed is True:
# If this is called from callback receiver, it likely does not have updated model data
# a refresh now is formally robust
unified_job.refresh_from_db(fields=['event_queries_processed'])
if unified_job.event_queries_processed is False:
save_indirect_host_entries.delay(unified_job.id)

Check warning on line 382 in awx/main/tasks/system.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/system.py#L382

Added line #L382 was not covered by tests


@task(queue=get_task_queuename)
Expand Down
20 changes: 16 additions & 4 deletions awx/main/tests/live/tests/test_indirect_host_counting.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import time

from awx.main.tests.live.tests.conftest import wait_for_events
from awx.main.tasks.host_indirect import build_indirect_host_data
from awx.main.tasks.host_indirect import build_indirect_host_data, save_indirect_host_entries
from awx.main.models.indirect_managed_node_audit import IndirectManagedNodeAudit
from awx.main.models import Job

Expand Down Expand Up @@ -38,13 +38,25 @@ def test_indirect_host_counting(live_tmp_folder, run_job_from_playbook):

assert job.ansible_version

# This will poll, because it depends on the background task finishing
# Poll for events finishing processing, because background task requires this
for _ in range(10):
if IndirectManagedNodeAudit.objects.filter(job=job).exists():
if job.job_events.count() >= job.emitted_events:
break
time.sleep(0.2)
else:
raise RuntimeError(f'No IndirectManagedNodeAudit records ever populated for job_id={job.id}')
raise RuntimeError(f'job id={job.id} never processed events')

# Task might not run due to race condition, so make it run here
job.refresh_from_db()
if job.event_queries_processed is False:
save_indirect_host_entries.delay(job.id, wait_for_events=False)
# This will poll for the background task to finish
for _ in range(10):
if IndirectManagedNodeAudit.objects.filter(job=job).exists():
break
time.sleep(0.2)
else:
raise RuntimeError(f'No IndirectManagedNodeAudit records ever populated for job_id={job.id}')

assert IndirectManagedNodeAudit.objects.filter(job=job).count() == 1
host_audit = IndirectManagedNodeAudit.objects.filter(job=job).first()
Expand Down
3 changes: 3 additions & 0 deletions awx/main/tests/settings_for_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
# Some things make decisions based on settings.SETTINGS_MODULE, so this is done for that
SETTINGS_MODULE = 'awx.settings.development'

# Turn off task submission, because sqlite3 does not have pg_notify
DISPATCHER_MOCK_PUBLISH = True

# Use SQLite for unit tests instead of PostgreSQL. If the lines below are
# commented out, Django will create the test_awx-dev database in PostgreSQL to
# run unit tests.
Expand Down
1 change: 1 addition & 0 deletions awx/main/tests/unit/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
'CACHES',
'DEBUG',
'NAMED_URL_GRAPH',
'DISPATCHER_MOCK_PUBLISH',
)


Expand Down
5 changes: 5 additions & 0 deletions awx/settings/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,11 @@
# Amount of time dispatcher will try to reconnect to database for jobs and consuming new work
DISPATCHER_DB_DOWNTIME_TOLERANCE = 40

# If you set this, nothing will ever be sent to pg_notify
# this is not practical to use, although periodic schedules may still run slugish but functional tasks
# sqlite3 based tests will use this
DISPATCHER_MOCK_PUBLISH = False

BROKER_URL = 'unix:///var/run/redis/redis.sock'
CELERYBEAT_SCHEDULE = {
'tower_scheduler': {'task': 'awx.main.tasks.system.awx_periodic_scheduler', 'schedule': timedelta(seconds=30), 'options': {'expires': 20}},
Expand Down

0 comments on commit 26f0f99

Please sign in to comment.