diff --git a/awx/main/dispatch/__init__.py b/awx/main/dispatch/__init__.py index d5349e02f53c..c0261e07aff4 100644 --- a/awx/main/dispatch/__init__.py +++ b/awx/main/dispatch/__init__.py @@ -93,6 +93,22 @@ def close(self): self.conn.close() +def create_listener_connection(): + conf = settings.DATABASES['default'].copy() + conf['OPTIONS'] = conf.get('OPTIONS', {}).copy() + # Modify the application name to distinguish from other connections the process might use + conf['OPTIONS']['application_name'] = get_application_name(settings.CLUSTER_HOST_ID, function='listener') + + # Apply overrides specifically for the listener connection + for k, v in settings.LISTENER_DATABASES.get('default', {}).items(): + conf[k] = v + for k, v in settings.LISTENER_DATABASES.get('default', {}).get('OPTIONS', {}).items(): + conf['OPTIONS'][k] = v + + connection_data = f"dbname={conf['NAME']} host={conf['HOST']} user={conf['USER']} password={conf['PASSWORD']} port={conf['PORT']}" + return psycopg.connect(connection_data, autocommit=True, **conf['OPTIONS']) + + @contextmanager def pg_bus_conn(new_connection=False, select_timeout=None): ''' @@ -106,12 +122,7 @@ def pg_bus_conn(new_connection=False, select_timeout=None): ''' if new_connection: - conf = settings.DATABASES['default'].copy() - conf['OPTIONS'] = conf.get('OPTIONS', {}).copy() - # Modify the application name to distinguish from other connections the process might use - conf['OPTIONS']['application_name'] = get_application_name(settings.CLUSTER_HOST_ID, function='listener') - connection_data = f"dbname={conf['NAME']} host={conf['HOST']} user={conf['USER']} password={conf['PASSWORD']} port={conf['PORT']}" - conn = psycopg.connect(connection_data, autocommit=True, **conf['OPTIONS']) + conn = create_listener_connection() else: if pg_connection.connection is None: pg_connection.connect() diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py index 2ff8752f0661..05b51b2930bc 100644 --- a/awx/main/dispatch/worker/base.py +++ b/awx/main/dispatch/worker/base.py @@ -214,7 +214,10 @@ def run_periodic_tasks(self): # bypasses pg_notify for scheduled tasks self.dispatch_task(body) - self.pg_is_down = False + if self.pg_is_down: + logger.info('Dispatcher listener connection established') + self.pg_is_down = False + self.listen_start = time.time() return self.scheduler.time_until_next_run() diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 1cd89590bcaf..ae3610633a4d 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -37,6 +37,18 @@ } } +# Special database overrides for dispatcher connections listening to pg_notify +LISTENER_DATABASES = { + 'default': { + 'OPTIONS': { + 'keepalives': 1, + 'keepalives_idle': 5, + 'keepalives_interval': 5, + 'keepalives_count': 5, + }, + } +} + # Whether or not the deployment is a K8S-based deployment # In K8S-based deployments, instances have zero capacity - all playbook # automation is intended to flow through defined Container Groups that