Skip to content
This repository was archived by the owner on Sep 12, 2024. It is now read-only.

Commit 9c2a6ce

Browse files
authored
Merge pull request #1204 from yanfr0818/reviveFailed
Add reviveFailed.py script to resubmit/reject failed WF
2 parents 3c45748 + 940c11c commit 9c2a6ce

File tree

1 file changed

+125
-0
lines changed

1 file changed

+125
-0
lines changed

reviveFailed.py

+125
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
#!/usr/bin/env python
2+
import sys
3+
import os
4+
import pwd
5+
import json
6+
import httplib
7+
import optparse
8+
import resubmit, reqMgrClient
9+
import dbs3Client as dbs3
10+
from optparse import OptionParser
11+
from assignSession import *
12+
from utils import *
13+
14+
15+
"""
16+
Find the list of WFs that needs revive
17+
Clone and kill the original WFs
18+
"""
19+
20+
def getRequestsFailed(url):
21+
"""
22+
Retrieves workflows overview from WMStats
23+
by querying couch db JSON direcly
24+
"""
25+
#TODO use the couch API from WMStatsClient instead of wmstats URL
26+
conn = httplib.HTTPSConnection(url, cert_file = os.getenv('X509_USER_PROXY'),
27+
key_file = os.getenv('X509_USER_PROXY'))
28+
conn.request("GET", '/couchdb/reqmgr_workload_cache/_design/ReqMgr/_view/bystatus?key="failed"')
29+
response = conn.getresponse()
30+
data = response.read()
31+
conn.close()
32+
myString=data.decode('utf-8')
33+
workflows=json.loads(myString)['rows']
34+
35+
print("{} failed WFs in ReqMgr.".format(len(workflows)))
36+
return workflows
37+
38+
39+
def findReviveList(list_request_dicts):
40+
"""
41+
Find the list of WFs that needs revive using the list of the request dictionaries
42+
"""
43+
revived_names = []
44+
list_request_dicts = [d for d in list_request_dicts if d['key'] == "failed"]
45+
46+
# Define the abnormal RegMgr and Unified status. If at least one child WF is NOT in abnormal status, do NOT revive the original WF.
47+
# abnormalRstatus = ["failed", "trouble"]
48+
# abnormalUstatus = []
49+
50+
# Remove the failed WFs that have already been revived
51+
for request_dict in list_request_dicts:
52+
request_name = request_dict['id']
53+
wf = workflowInfo(url, request_name)
54+
wf_family = wf.getFamilly()
55+
if (len(wf_family)):
56+
for wf_scion in wf_family:
57+
scion_name = wf_scion['RequestName']
58+
scion_Rstatus = wf_scion['RequestStatus'] # Request status
59+
# if scion_Rstatus not in abnormalRstatus:
60+
# revived_names.append(request_name)
61+
# continue
62+
63+
all_info = session.query(Workflow).filter(Workflow.name == scion_name).all()
64+
if len(all_info):
65+
revived_names.append(request_name)
66+
print("{} has already been revived.".format(request_name))
67+
continue
68+
# scion_info = all_info[0]
69+
# if scion_info.status not in abnormalUStatus:
70+
# revived_names.append(request_name)
71+
# continue
72+
73+
list_request_dicts = [d for d in list_request_dicts if d['id'] not in revived_names]
74+
print("{} WFs need to be revived".format(len(list_request_dicts)))
75+
print("{} WFs has been revived or is being revived".format(len(revived_names)))
76+
return list_request_dicts
77+
78+
79+
def resubmitFailed(list_revive_dicts):
80+
"""
81+
Resubmit the list of WFs
82+
"""
83+
uinfo = pwd.getpwuid(os.getuid())
84+
user = uinfo.pw_name
85+
group = 'DATAOPS'
86+
87+
list_revive_dicts = [d for d in list_revive_dicts if d['key'] == "failed"]
88+
file_name = "failedWFs.txt" # Write to external txt file
89+
flist = open(file_name,"w+")
90+
91+
for revive_dict in list_revive_dicts:
92+
revive_name = revive_dict['id']
93+
clone = resubmit.cloneWorkflow(revive_name, user, group, verbose=False)
94+
flist.write(clone+"\n")
95+
96+
print("List of the resubmitted WFs has been writen to {}. Run \"python setReqMgrStatus.py -s staged -f {}\" later to stage those WFs in Unified.".format(file_name, file_name))
97+
98+
99+
def rejectFailed(url, list_request_dicts):
100+
"""
101+
Reject the list of WFs and invalidate the datasets
102+
"""
103+
list_request_dicts = [d for d in list_request_dicts if d['key'] == "failed"]
104+
for request_dict in list_request_dicts:
105+
failed_name = request_dict['id']
106+
print("Rejcting {}...".format(failed_name))
107+
reqMgrClient.rejectWorkflow(url, failed_name)
108+
datasets = reqMgrClient.outputdatasetsWorkflow(url, failed_name)
109+
for ds in datasets:
110+
dbs3.setDatasetStatus(ds, 'INVALID', files=True)
111+
112+
113+
if __name__ == "__main__":
114+
parser = optparse.OptionParser()
115+
parser.add_option('-r', '--reject', default=False, action = 'store_true', help='Reject all the failed WFs in RegMgr')
116+
(options, args) = parser.parse_args()
117+
118+
url='cmsweb.cern.ch'
119+
failedWFs = getRequestsFailed(url)
120+
reviveWFs = findReviveList(failedWFs)
121+
122+
resubmitFailed(reviveWFs)
123+
124+
if options.reject:
125+
rejectFailed(failedWFs)

0 commit comments

Comments
 (0)