-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathpushLQE2GQE.py
84 lines (71 loc) · 3.08 KB
/
pushLQE2GQE.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
NOTE: Use this script with care; otherwise it can mess up the given workflow.

This script can be used when a workqueue element has been acquired by an
agent but is stuck in Available status in the local workqueue database.
Expected actions are:
 1. set the local workqueue_inbox element back to Available (and reset the
    other parameters that need resetting, e.g. ChildQueueUrl);
 2. delete the corresponding local workqueue element.
"""
from __future__ import print_function
import os
import sys
import logging
from pprint import pprint
from WMCore.Configuration import loadConfigurationFile
from WMCore.WorkQueue.WorkQueueBackend import WorkQueueBackend
def setupLogger():
    """
    Configure the root logger to write DEBUG-and-above records to stdout.

    The call is idempotent: if the root logger already owns a stream handler
    bound to the current ``sys.stdout``, no second handler is attached, so
    calling this function repeatedly does not duplicate every log line.

    :return: the root logger, with its level set to DEBUG
    """
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)

    # Reuse a stdout handler that is already attached instead of stacking a new one.
    for existing in root.handlers:
        if isinstance(existing, logging.StreamHandler) and getattr(existing, "stream", None) is sys.stdout:
            return root

    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(logging.DEBUG)
    formatter = logging.Formatter("%(asctime)s:%(levelname)s:%(module)s:%(message)s")
    handler.setFormatter(formatter)
    root.addHandler(handler)
    return root
def _askToProceed(question):
    """
    Ask the operator a Y/N question on the terminal.

    :param question: prompt text shown to the operator
    :return: True only when the answer is exactly "Y", False for anything else
    """
    try:
        # Python 2: plain input() would evaluate the answer, so raw_input is needed
        readLine = raw_input
    except NameError:
        # Python 3: raw_input no longer exists; input() returns the raw string
        readLine = input
    return readLine(question) == "Y"


def main():
    """
    Reset the local workqueue state of one workflow, given on the command line.

    Steps, each one confirmed interactively by the operator:
     1. every local workqueue_inbox element of the workflow in status
        "Acquired" is set back to "Available", its ChildQueueUrl is cleared,
        and the elements are saved;
     2. every local workqueue element of the workflow in status "Available"
        is deleted.

    The agent configuration is read from the file pointed to by the
    WMAGENT_CONFIG environment variable; a default path is used if unset.

    This function never returns normally; it ends the process with:
      * 10 - the "work in progress" safety stop (currently always taken);
      * 1  - wrong number of command line arguments;
      * 0  - completed.
    """
    os.environ.setdefault('WMAGENT_CONFIG', '/data/srv/wmagent/current/config/wmagent/config.py')
    config = loadConfigurationFile(os.environ["WMAGENT_CONFIG"])

    # Deliberate safety stop left in by the author: the procedure below may
    # create document conflicts, so the script refuses to run. Everything
    # after this point is unreachable until these two lines are removed.
    print("Work in progress! It might create document conflicts as it is!")
    sys.exit(10)

    if len(sys.argv) != 2:
        print("You must provide a request name")
        sys.exit(1)

    reqName = sys.argv[1]
    childQueue = config.WorkQueueManager.queueParams['QueueURL']
    logger = setupLogger()

    localWQBackend = WorkQueueBackend(config.WorkQueueManager.couchurl, db_name="workqueue", logger=logger)
    localElems = localWQBackend.getElements(WorkflowName=reqName)
    localInboxElems = localWQBackend.getInboxElements(WorkflowName=reqName)

    # Step 1: push Acquired inbox elements back to Available
    docsToUpdate = []
    logger.info("** Local workqueue_inbox elements for workflow %s and agent %s", reqName, childQueue)
    for elem in localInboxElems:
        if elem['Status'] == "Acquired":
            logger.info("Element id: %s has status: %s", elem.id, elem['Status'])
            elem['Status'] = 'Available'
            elem['ChildQueueUrl'] = None
            docsToUpdate.append(elem)
    if docsToUpdate:
        if _askToProceed("Found %d inbox elements to update, shall we proceed (Y/N): " % len(docsToUpdate)):
            resp = localWQBackend.saveElements(*docsToUpdate)
            logger.info(" update response: %s", resp)

    # Step 2: delete the local elements that are still Available
    docsToDelete = []
    logger.info("** Local workqueue elements for workflow %s and agent %s", reqName, childQueue)
    for elem in localElems:
        if elem['Status'] == "Available":
            logger.info("Element id: %s has status: %s", elem.id, elem['Status'])
            # keep the element object itself (not its id): delete() and
            # _couch are used on it below
            docsToDelete.append(elem)
    if docsToDelete:
        if _askToProceed("Found %d elements to delete, shall we proceed (Y/N): " % len(docsToDelete)):
            for elem in docsToDelete:
                elem.delete()
            # NOTE(review): presumably delete() only queues the change and this
            # single commit on the shared couch handle flushes all of them -
            # confirm against the workqueue element implementation
            resp = docsToDelete[0]._couch.commit()
            logger.info(" deletion response: %s", resp)

    print("Done!")
    sys.exit(0)
# Run the tool only when this file is executed as a script (not on import),
# and hand whatever main() returns to the interpreter as the exit status.
if __name__ == "__main__":
    exitCode = main()
    sys.exit(exitCode)