forked from SUSE/pcw
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcleanup.py
47 lines (38 loc) · 1.74 KB
/
cleanup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import logging
import traceback
from webui.PCWConfig import PCWConfig
from ocw.lib.azure import Azure
from ocw.lib.ec2 import EC2
from ocw.lib.gce import GCE
from ocw.lib.openstack import Openstack
from ocw.lib.eks import EKS
from ocw.lib.emailnotify import send_mail, send_cluster_notification
from ocw.enums import ProviderChoice
# Module-level logger, named after this module, used by the jobs below.
logger = logging.getLogger(__name__)
def cleanup_run():
    """Run the provider cleanup for every namespace configured for 'cleanup'.

    For each namespace the configured providers are looked up and
    ``cleanup_all()`` is invoked on the matching provider wrappers, always in
    the order Azure, EC2, GCE, Openstack.

    Any exception raised while handling a namespace is logged together with
    its traceback and reported through ``send_mail``; the remaining providers
    of that namespace are skipped and the loop continues with the next
    namespace.
    """
    for namespace in PCWConfig.get_namespaces_for('cleanup'):
        try:
            providers = PCWConfig.get_providers_for('cleanup', namespace)
            logger.info("[%s] Run cleanup for %s", namespace, ','.join(providers))
            # Ordered (provider choice, wrapper class) pairs; the order here
            # is the order in which the cleanups are executed.
            cleaners = (
                (ProviderChoice.AZURE, Azure),
                (ProviderChoice.EC2, EC2),
                (ProviderChoice.GCE, GCE),
                (ProviderChoice.OSTACK, Openstack),
            )
            for choice, provider_cls in cleaners:
                if choice in providers:
                    provider_cls(namespace).cleanup_all()
        except Exception as ex:
            logger.exception("[%s] Cleanup failed!", namespace)
            subject = f'{type(ex).__name__} on Cleanup in [{namespace}]'
            send_mail(subject, traceback.format_exc())
def list_clusters():
    """Report the clusters found in every namespace configured for 'clusters'.

    For each namespace the result of ``EKS(namespace).all_clusters()`` is
    fetched, the sizes of all its entries are added up and the total is
    logged. When at least one cluster exists,
    ``send_cluster_notification`` is called with the namespace and the full
    result.

    Any exception raised while handling a namespace is logged together with
    its traceback and reported through ``send_mail``; the loop then continues
    with the next namespace.
    """
    for namespace in PCWConfig.get_namespaces_for('clusters'):
        try:
            clusters = EKS(namespace).all_clusters()
            # Add up the number of entries stored under every key.
            quantity = 0
            for key in clusters:
                quantity += len(clusters[key])
            logger.info("%d cluster(s) found", quantity)
            if quantity <= 0:
                # Nothing to report for this namespace.
                continue
            send_cluster_notification(namespace, clusters)
        except Exception as ex:
            logger.exception("[%s] List clusters failed!", namespace)
            subject = f'{type(ex).__name__} on List clusters in [{namespace}]'
            send_mail(subject, traceback.format_exc())