Skip to content

Commit

Permalink
New setting for pg_notify listener DB settings, add keepalive
Browse files Browse the repository at this point in the history
  • Loading branch information
AlanCoding committed Jan 10, 2024
1 parent bb1922c commit e9bb53b
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 7 deletions.
23 changes: 17 additions & 6 deletions awx/main/dispatch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,22 @@ def close(self):
self.conn.close()


def create_listener_connection():
conf = settings.DATABASES['default'].copy()
conf['OPTIONS'] = conf.get('OPTIONS', {}).copy()
# Modify the application name to distinguish from other connections the process might use
conf['OPTIONS']['application_name'] = get_application_name(settings.CLUSTER_HOST_ID, function='listener')

# Apply overrides specifically for the listener connection
for k, v in settings.LISTENER_DATABASES.get('default', {}).items():
conf[k] = v
for k, v in settings.LISTENER_DATABASES.get('default', {}).get('OPTIONS', {}).items():
conf['OPTIONS'][k] = v

connection_data = f"dbname={conf['NAME']} host={conf['HOST']} user={conf['USER']} password={conf['PASSWORD']} port={conf['PORT']}"
return psycopg.connect(connection_data, autocommit=True, **conf['OPTIONS'])


@contextmanager
def pg_bus_conn(new_connection=False, select_timeout=None):
'''
Expand All @@ -106,12 +122,7 @@ def pg_bus_conn(new_connection=False, select_timeout=None):
'''

if new_connection:
conf = settings.DATABASES['default'].copy()
conf['OPTIONS'] = conf.get('OPTIONS', {}).copy()
# Modify the application name to distinguish from other connections the process might use
conf['OPTIONS']['application_name'] = get_application_name(settings.CLUSTER_HOST_ID, function='listener')
connection_data = f"dbname={conf['NAME']} host={conf['HOST']} user={conf['USER']} password={conf['PASSWORD']} port={conf['PORT']}"
conn = psycopg.connect(connection_data, autocommit=True, **conf['OPTIONS'])
conn = create_listener_connection()
else:
if pg_connection.connection is None:
pg_connection.connect()
Expand Down
5 changes: 4 additions & 1 deletion awx/main/dispatch/worker/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,10 @@ def run_periodic_tasks(self):
# bypasses pg_notify for scheduled tasks
self.dispatch_task(body)

self.pg_is_down = False
if self.pg_is_down:
logger.info('Dispatcher listener connection established')
self.pg_is_down = False

self.listen_start = time.time()

return self.scheduler.time_until_next_run()
Expand Down
12 changes: 12 additions & 0 deletions awx/settings/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@
}
}

# Special database overrides for dispatcher connections listening to pg_notify
LISTENER_DATABASES = {
'default': {
'OPTIONS': {
'keepalives': 1,
'keepalives_idle': 5,
'keepalives_interval': 5,
'keepalives_count': 5,
},
}
}

# Whether or not the deployment is a K8S-based deployment
# In K8S-based deployments, instances have zero capacity - all playbook
# automation is intended to flow through defined Container Groups that
Expand Down

0 comments on commit e9bb53b

Please sign in to comment.