"""metacat MetaFixer
evaluate and fix metacat metadata for files
"""
##
# @mainpage MetaFixer
#
# @section description_main
# Check metacat metadata for duplicate outputs, missing parentage, and
# field-type problems, and optionally fix what can be repaired.
#
# @file MetaFixer.py
# pylint: disable=C0303
# pylint: disable=C0321
# pylint: disable=C0301
# pylint: disable=C0209
# pylint: disable=C0103
# pylint: disable=C0325
# pylint: disable=C0123
# pylint: disable=W1514
import sys
import os
import json
import datetime
import argparse
#import samweb_client
from metacat.webapi import MetaCatClient
from TypeChecker import TypeChecker

DEBUG = False
mc_client = MetaCatClient(os.getenv("METACAT_SERVER_URL"))
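
# The client above expects METACAT_SERVER_URL in the environment (and a valid
# metacat authentication token for the fix/update path).  A hypothetical
# setup, with an illustrative URL rather than a real endpoint:
#   export METACAT_SERVER_URL="https://metacat.example.org:9443/dune_meta_prod/app"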
class MetaFixer:
    ''' Class to check and fix metadata '''
    def __init__(self, verbose=False, errname="error.txt", tests=None, fix=None):
        '''
        __init__ initialization, does very little
        :param verbose: print out a lot of things
        :param errname: name of the error report file to write
        :param tests: list of test names to run ("duplicates", "parentage", "types")
        :param fix: if set, attempt to repair the problems found
        '''
        self.query_files = None
        self.verbose = verbose
        self.fix = fix
        self.limit = 1000000
        self.skip = 0
        self.errfile = open(errname, 'w')
        self.errfile.write(errname + "\n")
        self.tests = tests
    def getInput(self, query=None, limit=10000000, skip=0):
        '''
        run a metacat query and return a list of file records (namespace/name dicts)
        '''
        self.skip = skip
        self.limit = limit
        thequery = query + " skip %d limit %d " % (self.skip, self.limit)
        print("thequery:", thequery)
        try:
            self.query_files = list(mc_client.query(thequery))
            print("getInput returned:", len(self.query_files), "files")
        except Exception as e:
            print("metacat query ", thequery, " failed:", e)
            sys.exit(1)
        return self.query_files
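
    # Illustrative use of the pagination above, mirroring the loop in __main__
    # (the query string is hypothetical):
    #   fixer = MetaFixer(errname="check.txt", tests=["duplicates"])
    #   skip = 0
    #   while True:
    #       files = fixer.getInput(query="files from dune:all where core.data_tier='raw'",
    #                              limit=1000, skip=skip)
    #       if not files:
    #           break
    #       fixer.explore()
    #       skip += 1000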
    def explore(self):
        " see what there is in a general query "
        # this explores and counts things
        if not os.path.exists("metadata"):
            os.mkdir("metadata")
        datatypes = ["core.data_tier", "core.run_type", "dune.campaign", "dune_mc.gen_fcl_filename", "core.application", "dune.requestid"]
        typecount = {}
        for datatype in datatypes:
            typecount[datatype] = {}
        typecount["namespace"] = {}
        count = self.skip
        for file in self.query_files:
            count += 1
            #if self.verbose: print(file)
            thedid = "%s:%s" % (file["namespace"], file["name"])
            if count % 10 == 0 and self.verbose:
                print(count, thedid)
            try:
                md = mc_client.get_file(did=thedid, with_metadata=True, with_provenance=True)
            except Exception as e:
                print("failed at file", count, thedid, e)
                break
            self.checker(md)
    def checker(self, filemd=None):
        ' run the configured checks on one file '
        for check in self.tests:
            if check == "duplicates":
                self.dupfinder(filemd=filemd)
            if check == "parentage":
                self.parentfixer(filemd)
            if check == "types":
                status, fixer = TypeChecker(filemd, self.errfile)
                print("result of type check", filemd["name"], status)
    def parentfixer(self, filemd=None):
        ' check the parentage of the file and optionally repair it '
        did = "%s:%s" % (filemd["namespace"], filemd["name"])
        status = "good"
        if "parents" in filemd and len(filemd["parents"]) > 0:
            # has some parentage, look at it.
            parents = filemd["parents"]
            if self.verbose:
                print("parents", parents)
            for p in parents:
                parentmd = mc_client.get_file(fid=p["fid"])
                if self.verbose:
                    print(parentmd["namespace"], parentmd["name"], jsondump(p))
            self.errfile.write("%s, parentage ok\n" % did)
            return status
        # no provenance parents found - look for core.parents in the metadata
        metadata = filemd["metadata"]
        if "core.parents" in metadata:
            parentlist = []
            if self.verbose:
                print("core.parents", metadata["core.parents"])
            for p in metadata["core.parents"]:
                if ":" in p["file_name"]:
                    self.errfile.write("%s, missing parents\n" % did)
                    thisparent = {"did": p["file_name"]}
                else:
                    if self.verbose:
                        print("ERROR missing namespace for parent in this file", did)
                    self.errfile.write("%s, missing namespace in parents\n" % did)
                    thisparent = {"did": "%s:%s" % (filemd["namespace"], p["file_name"])}  # hack in the namespace of the child file
                parentlist.append(thisparent)
            print("parents to add", parentlist)
            if self.fix:
                print("trying to fix this file", did)
                try:
                    mc_client.update_file(did, parents=parentlist)
                    print("fix succeeded")
                    self.errfile.write("%s, fixed it\n" % did)
                except Exception as e:
                    print("fix failed", e)
                    self.errfile.write("%s, failed to fix \n" % did)
                    status = "fail"
            return status
        self.errfile.write("%s, no parents or core.parents\n" % did)
        return status
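
    # Record shapes assumed by parentfixer, inferred from the code above rather
    # than from the metacat schema documentation:
    #   filemd["parents"]                  -> [{"fid": "<file id>"}, ...]
    #   filemd["metadata"]["core.parents"] -> [{"file_name": "namespace:name"}, ...]
    #   parentlist passed to update_file   -> [{"did": "namespace:name"}, ...]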
    def dupfinder(self, filemd=None):
        ' loop over parents, look for children and look for duplicates '
        thedid = "%s:%s" % (filemd["namespace"], filemd["name"])
        md = filemd["metadata"]
        tag = "%s_%s_%s_%s_%s_%s_%s" % (filemd["namespace"], md["core.application.version"], md["core.application.name"], md["core.data_tier"], md["core.data_stream"], md["dune.campaign"], md["core.file_format"])
        if self.verbose:
            print("---------------------------\n")
            print("thefile", thedid)
        if "parents" in filemd and len(filemd["parents"]) > 0:
            # has some parentage, look at it.
            parents = filemd["parents"]
            for p in parents:
                try:
                    parentmd = mc_client.get_file(fid=p["fid"], with_metadata=True, with_provenance=True)
                except Exception as e:
                    print("ERROR could not get metadata for", p["fid"], e)
                    continue
                if self.verbose:
                    print("the parent, %s:%s" % (parentmd["namespace"], parentmd["name"]))
                children = parentmd["children"]
                if len(children) == 0:
                    message = "%s, ERROR no children\n" % (thedid)
                    self.errfile.write(message)
                    print(message)
                if self.verbose:
                    print("had %d children" % len(children))
                if len(children) == 1:
                    continue
                count = 0
                dupdid = None
                dupworkid = -1
                for child in children:
                    try:
                        childmd = mc_client.get_file(fid=child["fid"], with_metadata=True, with_provenance=True)
                    except Exception as e:
                        print("ERROR could not get metadata for", child["fid"], e)
                        continue
                    cm = childmd["metadata"]
                    childdid = "%s:%s" % (childmd["namespace"], childmd["name"])
                    if childdid == thedid:
                        continue
                    #print("child", jsondump(childmd))
                    if cm["dune.output_status"] != "confirmed":
                        continue
                    if "dune.workflow" in cm:
                        workid = cm["dune.workflow"]["workflow_id"]
                    else:
                        workid = -1
                    try:
                        ctag = "%s_%s_%s_%s_%s_%s_%s" % (childmd["namespace"], cm["core.application.version"], cm["core.application.name"], cm["core.data_tier"], cm["core.data_stream"], cm["dune.campaign"], cm["core.file_format"])
                    except Exception:
                        error = "%s, ERROR ctag couldn't be made for %s\n" % (thedid, childdid)
                        self.errfile.write(error)
                        continue
                    if ctag == tag:
                        count += 1
                        dupdid = childdid
                        dupworkid = workid
                if count > 0:
                    message = "%s, ERROR duplicate file %d, %s workflow=%d\n" % (thedid, count, dupdid, dupworkid)
                    print(message)
                    self.errfile.write(message)
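
    # Two files count as duplicates when their tags match.  A hypothetical tag
    # (all field values here are illustrative):
    #   hd-protodune_v09_91_04_reco1_full-reconstructed_out1_PDSPProd4_root
    # i.e. namespace_appversion_appname_datatier_datastream_campaign_fileformat.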
    def cleanup(self):
        ' make certain the output errorfile is closed '
        self.errfile.close()

def jsondump(adict):
    ' dump a dictionary to a nicely formatted string '
    return json.dumps(adict, indent=4)
def parentchecker(query):
    ' build the complicated parent query -- it is very slow '
    newquery = " %s - children(parents(%s))" % (query, query)
    return newquery
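
# For a query Q, parentchecker returns " Q - children(parents(Q))": the files
# in Q that cannot be reached as children of their own parents.  Hypothetical:
#   parentchecker("files from dune:all where core.data_tier='raw'")
# yields
#   " files from dune:all where core.data_tier='raw' - children(parents(files from dune:all where core.data_tier='raw'))"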
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='check and fix metadata')
    parser.add_argument("--workflows", default=False, action='store_true', help="use workflow id for min/max")
    parser.add_argument("--dataset", default=None, type=str, help="name of dataset to check")
    parser.add_argument("--runs", default=False, action='store_true', help="use run id for min/max")
    parser.add_argument("--min", type=int, help="minimum id to check", default=None)
    parser.add_argument("--max", type=int, help="maximum id to check", default=None)
    parser.add_argument("--tests", type=str, help="list of tests to run, comma delimited string", default="duplicates")
    parser.add_argument("--data_tiers", type=str, help="list of data_tiers to test", default="full-reconstructed")
    parser.add_argument("--experiment", type=str, help="experiment", default="hd-protodune")
    parser.add_argument("--mc", help="set if mc", default=False, action='store_true')
    parser.add_argument("--fix", help="do or suggest a fix", default=False, action='store_true')
    parser.add_argument("--debug", help="do a short run with printout", default=False, action='store_true')
    args = parser.parse_args()
    FIX = args.fix
    TESTME = args.debug
    tests = args.tests.split(",")
    data_tiers = args.data_tiers.split(",")
    print(tests)
    #hd = [1630,1631,1632,1650,1638,1633,1596,1597,1598,1599,1600,1601,1602,1604,1606,1608,1609,1581,1582,1584,1594,1586,1587,1588,1595]
    #vd = [1583,1590,1591,1593] + list(range(1610,1630))
    if args.dataset:
        # a dataset query does not iterate over ids
        args.min = 0
        args.max = 0
    for id in range(args.min, args.max + 1):
        for data_tier in data_tiers:
            testquery = ""
            if args.runs:
                if "parentage" in tests:
                    testquery = "files from dune:all where core.data_tier='%s' and core.runs[any] in (%d) " % (data_tier, id)
                    print("top level metacat query", testquery)
                if "duplicates" in tests:
                    testquery = "files from dune:all where core.data_tier='%s' and core.runs[any] in (%d) and dune.output_status=confirmed " % (data_tier, id)
            elif args.workflows:
                if "parentage" in tests:
                    testquery = "files from dune:all where core.data_tier='%s' and dune.workflow['workflow_id'] in (%d)" % (data_tier, id)
                    print("top level metacat query", testquery)
                if "duplicates" in tests:
                    testquery = "files from dune:all where core.data_tier='%s' and dune.workflow['workflow_id'] in (%d) and dune.output_status=confirmed " % (data_tier, id)
            elif args.dataset:
                if "duplicates" in tests:
                    testquery = "files from %s where core.data_tier='%s' and dune.output_status=confirmed " % (args.dataset, data_tier)
            else:
                print("need to specify --workflows or --runs or --dataset")
                sys.exit(1)
            if TESTME:
                testquery += " limit 100"
            print(testquery)
            #parentquery = parentchecker(testquery)
            #print("parent checker", parentquery)
            summary = mc_client.query(query=testquery, summary="count")
            #checksummary = mc_client.query(query=parentquery, summary="count")
            print("summary of testquery", summary)  #, checksummary)
            # if 0 == checksummary["count"]:
            #     print("you seem to have parents for all files - quitting")
            #     sys.exit(0)
            now = "%10.0f" % datetime.datetime.now().timestamp()
            errname = "%s_%d_%s.txt" % (data_tier, id, now)
            if args.dataset:
                errname = "%s_%s_%s.txt" % (data_tier, args.dataset, now)
            fixer = MetaFixer(verbose=False, errname=errname, tests=tests, fix=FIX)
            thelimit = 100
            theskip = 0
            for i in range(0, thelimit):
                thelist = fixer.getInput(query=testquery, limit=thelimit, skip=theskip)
                if len(thelist) == 0:
                    print("reached the end of the list at", theskip)
                    break
                fixer.explore()
                theskip += thelimit
                if TESTME and theskip > 100:
                    break
            fixer.cleanup()
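
# Example invocations (argument values are illustrative, not real workflow or
# run ids):
#   python MetaFixer.py --workflows --min 1630 --max 1632 --tests duplicates
#   python MetaFixer.py --runs --min 27000 --max 27010 --tests parentage --fix
#   python MetaFixer.py --dataset mynamespace:mydataset --tests duplicates --debug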