forked from CMSCompOps/OSDroid
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
130 lines (100 loc) · 4.71 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#!/usr/bin/env python
import json
import logging
import logging.config
import os
import time
import traceback
from os.path import abspath, dirname, join
import yaml
from monitutils import (get_workflow_from_db, get_yamlconfig, save_json,
update_doc_archive_db)
from workflowalerts import alertWithEmail, errorEmailShooter
from workflowlabelmaker import updateLabelArchives
from workflowmonitexporter import (buildDoc, prepareWorkflows, sendDoc,
updateWorkflowStatusToDb)
from workflowprediction import makingPredictionsWithML
# Directory that holds this script; every path below is resolved against it
# so the program behaves the same regardless of the current working directory.
_BASE_DIR = dirname(abspath(__file__))

# Where failure dumps produced by a run are written.
LOGDIR = join(_BASE_DIR, 'Logs')

# YAML files read at start-up: credentials, run configuration, logging setup.
CRED_FILE_PATH = join(_BASE_DIR, 'config/credential.yml')
CONFIG_FILE_PATH = join(_BASE_DIR, 'config/config.yml')
LOGGING_CONFIG = join(_BASE_DIR, 'config/configLogging.yml')

# Root logger (used to install a message filter) and the named logger
# used for messages emitted by the production run.
rootlogger = logging.getLogger()
logger = logging.getLogger("workflowmonitLogger")
class No502WarningFilter(logging.Filter):
    """Logging filter that rejects records mentioning ``STATUS: 502``."""

    def filter(self, record):
        """Decide whether *record* may be emitted.

        Returns False when the fully formatted message contains the
        substring 'STATUS: 502', and True for every other record.
        """
        message = record.getMessage()
        if 'STATUS: 502' in message:
            return False
        return True
# Install the filter on the root logger at import time so that records whose
# message contains 'STATUS: 502' are dropped there.
# NOTE(review): a filter attached to a logger is only consulted for records
# logged directly on that logger, not for records propagated up from
# descendant loggers. Confirm the 502 messages are emitted through the root
# logger; otherwise the filter would need to be attached to the handlers.
rootlogger.addFilter(No502WarningFilter())
def main():
    """Run one full production monitoring cycle.

    The cycle consists of:

    1. Making sure ``LOGDIR`` exists, configuring logging from
       ``LOGGING_CONFIG`` and loading the credential and run configuration.
    2. Fetching workflow packs via ``prepareWorkflows(..., test=False)`` and,
       for each pack: building its docs, updating the workflow status in the
       db, sending the docs, emailing alerts, and dumping any send failures
       to a gzipped JSON file under ``LOGDIR``.
    3. Running the ML predictions over all docs collected in step 2.
    4. Querying the names of archived workflows and updating label archives.
    5. Inserting ``(name, json)`` pairs for all collected docs into the doc
       archive db.

    Error handling: an exception raised while processing one pack is logged,
    mailed to the configured ``alert_recipients`` and does not stop the
    remaining packs. An exception anywhere else inside the cycle is logged
    and mailed the same way and ends the run. Errors raised while loading
    the configuration files (before the cycle starts) propagate to the caller.
    """
    # exist_ok avoids the check-then-create race of "isdir() then makedirs()".
    # NOTE(review): created before logging is configured in case the logging
    # config declares file handlers under LOGDIR -- confirm against
    # config/configLogging.yml.
    os.makedirs(LOGDIR, exist_ok=True)

    logging.config.dictConfig(get_yamlconfig(LOGGING_CONFIG))
    cred = get_yamlconfig(CRED_FILE_PATH)
    localconfig = get_yamlconfig(CONFIG_FILE_PATH)
    recipients = localconfig.get('alert_recipients', [])

    try:
        wfpacks = prepareWorkflows(CONFIG_FILE_PATH, test=False)
        totaldocs = []
        for pack in wfpacks:
            # Each pack gets its own try block so that one failing pack is
            # reported without aborting the packs that follow it.
            try:
                docs = buildDoc(pack, doconcurrent=True)
                # Collected immediately: these docs still reach the
                # prediction/archive steps even if a later step below fails.
                totaldocs.extend(docs)

                # update status in local db
                updateWorkflowStatusToDb(CONFIG_FILE_PATH, docs)
                # send to CERN MONIT
                failures = sendDoc(cred, docs)
                # alerts
                alertWithEmail(docs, recipients)

                # backup doc (currently disabled)
                # bkpfn = join(LOGDIR, 'toSendDoc_{}'.format(time.strftime('%y%m%d-%H%M%S')))
                # bkpdoc = save_json(docs, filename=bkpfn, gzipped=True)
                # logger.info('Document backuped at: {}'.format(bkpdoc))

                # backup failure msg; nothing is written when there were none
                if failures:
                    stamp = time.strftime('%y%m%d-%H%M%S')
                    faildocfn = join(LOGDIR, f'amqFailMsg_{stamp}')
                    faildoc = save_json(failures, filename=faildocfn, gzipped=True)
                    logger.info(f'Failed message saved at: {faildoc}')

                logger.info(f'Number of updated workflows: {len(docs)}')
            except Exception:
                logger.exception(f"Exception encountered, sending emails to {recipients}")
                errorEmailShooter(traceback.format_exc(), recipients)

        # predictions
        logger.info(f"Making predicions for {len(totaldocs)} workflows..")
        makingPredictionsWithML(totaldocs)

        # labeling
        qcmd = "SELECT NAME FROM CMS_UNIFIED_ADMIN.WORKFLOW WHERE WM_STATUS LIKE '%archived'"
        archivedwfs = get_workflow_from_db(CONFIG_FILE_PATH, qcmd)
        _wfnames = [w.name for w in archivedwfs]
        logger.info(f"Passing {len(_wfnames)} workflows for label making..")
        updateLabelArchives(_wfnames)

        # archive docs: one (workflow name, serialized doc) pair per doc
        docs_to_insert = [(doc['name'], json.dumps(doc)) for doc in totaldocs]
        update_doc_archive_db(localconfig, docs_to_insert)

    except Exception:
        # Top-level boundary: report the failure instead of crashing the run.
        logger.exception(f"Exception encountered, sending emails to {recipients}")
        errorEmailShooter(traceback.format_exc(), recipients)
def test():
    """Dry-run variant of the monitoring cycle.

    Configures logging, fetches workflow packs via
    ``prepareWorkflows(..., test=True)``, builds the docs for every pack,
    runs the ML predictions over them and updates the label archives for
    archived workflows.

    Unlike ``main()``, this function does not update the workflow status
    db, send docs, send alert or error emails, or write to the doc archive.
    Any exception raised inside the cycle is logged through the
    ``testworkflowmonitLogger`` logger and swallowed.
    """
    # exist_ok avoids the check-then-create race of "isdir() then makedirs()".
    # NOTE(review): created before logging is configured in case the logging
    # config declares file handlers under LOGDIR -- confirm against
    # config/configLogging.yml.
    os.makedirs(LOGDIR, exist_ok=True)

    logging.config.dictConfig(get_yamlconfig(LOGGING_CONFIG))
    # Local logger deliberately shadows the module-level one for this run.
    logger = logging.getLogger("testworkflowmonitLogger")

    # The credentials are not used in this dry run; the load is kept so that
    # any error it raises still surfaces here as it did before.
    get_yamlconfig(CRED_FILE_PATH)
    recipients = get_yamlconfig(CONFIG_FILE_PATH).get('alert_recipients', [])

    try:
        wfpacks = prepareWorkflows(CONFIG_FILE_PATH, test=True)
        totaldocs = []
        for pack in wfpacks:
            totaldocs.extend(buildDoc(pack, doconcurrent=True))

        # predictions
        logger.info(f"Making predicions for {len(totaldocs)} workflows..")
        makingPredictionsWithML(totaldocs)

        # labeling
        qcmd = "SELECT NAME FROM CMS_UNIFIED_ADMIN.WORKFLOW WHERE WM_STATUS LIKE '%archived'"
        archivedwfs = get_workflow_from_db(CONFIG_FILE_PATH, qcmd)
        _wfnames = [w.name for w in archivedwfs]
        logger.info(f"Passing {len(_wfnames)} workflows for label making..")
        updateLabelArchives(_wfnames)

    except Exception:
        # NOTE(review): the message says emails are being sent, but no email
        # is sent from this function -- confirm whether that is intended.
        logger.exception(f"Exception encountered, sending emails to {recipients}")
# Script entry point: run the production cycle only when this file is
# executed directly, not when it is imported as a module.
if __name__ == "__main__":
    main()