@@ -12,18 +12,17 @@ from qiita_db.sql_connection import TRN
12
12
from qiita_db .processing_job import ProcessingJob
13
13
import pandas as pd
14
14
from time import sleep
15
- from datetime import timedelta
16
15
from math import ceil
16
+ from io import StringIO
17
+
17
18
18
- QIITA_QUEUE_LOG = '/home/qiita/qiita-queues-logs-DONT-DELETE.log'
19
19
SLEEP_TIME = 6
20
20
CHANCES = 3
21
21
SQL = """SELECT processing_job_id
22
22
FROM qiita.processing_job
23
23
JOIN qiita.processing_job_status
24
24
USING (processing_job_status_id)
25
25
WHERE processing_job_status = %s"""
26
- ARRAY_COMMANDS = set (['Woltka v0.1.1' ])
27
26
28
27
29
28
def _submit_jobs (jids_to_recover , recover_type ):
@@ -42,37 +41,17 @@ def _submit_jobs(jids_to_recover, recover_type):
42
41
43
42
44
43
def _retrieve_queue_jobs():
    """Return the set of qiita job ids currently known to the scheduler.

    Runs ``squeue -o %all`` and parses its pipe-delimited output with
    pandas, keeps only the rows owned by the ``qiita`` group, and collapses
    ``merge-`` companion jobs into their parent qiita job id.
    """
    # full, pipe-separated dump of every job in the queues
    raw = check_output(['squeue', '-o', '%all']).decode('ascii')
    queue = pd.read_csv(StringIO(raw), sep='|')

    # restrict to jobs submitted by the qiita group
    qiita_rows = queue[queue.GROUP == 'qiita']

    # a merge job reuses its parent's name with a 'merge-' prefix; strip
    # the prefix so both map to the same qiita job id, then deduplicate
    ids = qiita_rows.NAME.str.replace('merge-', '').unique()

    return set(ids)
76
55
77
56
78
57
def _get_jids_to_recover (recover_type ):
@@ -84,57 +63,35 @@ def _get_jids_to_recover(recover_type):
84
63
return jids_to_recover
85
64
86
65
87
- def _parse_queue_values (d ):
88
- max_mem = 0
89
- max_pmem = 0
90
- max_vmem = 0
91
- max_wt = timedelta (hours = 0 , minutes = 0 , seconds = 0 )
92
- d = d .split (',' )
93
- for dd in d :
94
- if dd .startswith ('mem' ):
95
- v = int (dd [4 :- 2 ])
96
- if v > max_mem :
97
- max_mem = v
98
- elif dd .startswith ('pmem' ):
99
- v = int (dd [5 :- 2 ])
100
- if v > max_pmem :
101
- max_pmem = v
102
- elif dd .startswith ('vmem' ):
103
- v = int (dd [5 :- 2 ])
104
- if v > max_mem :
105
- max_mem = v
106
- elif dd .startswith ('walltime' ):
107
- v = map (int , dd [9 :].split (':' ))
108
- v = timedelta (hours = v [0 ], minutes = v [1 ], seconds = v [2 ])
109
- if v > max_wt :
110
- max_wt = v
111
- return max_mem , max_pmem , max_vmem , max_wt
112
-
113
-
114
66
def _qiita_queue_log_parse(jids_to_recover):
    """Collect scheduler accounting information for the given qiita jobs.

    Parameters
    ----------
    jids_to_recover : iterable of str
        Qiita job ids whose scheduler resource usage should be looked up.

    Returns
    -------
    list of (ProcessingJob, dict)
        One entry per job id; the dict holds the exit code plus the
        requested/used memory and time, all ``None`` when the job was
        never handed to the scheduler (no external id).
    """
    results = []
    for jid in jids_to_recover:
        job = ProcessingJob(jid)
        if job.external_id:
            # the <jid>.batch accounting step carries the actual resource
            # usage (exit code, memory, CPU time) ...
            bvals = pd.read_csv(StringIO(check_output([
                'sacct', '-p',
                '--format=ExitCode,ReqMem,MaxRSS,CPUTimeRAW,TimelimitRaw',
                '-j', f'{job.external_id}.batch']).decode(
                'ascii')), sep='|').iloc[0].to_dict()
            # ... while the parent allocation carries the time limit,
            # which is why TimelimitRaw is taken from this second query
            vals = pd.read_csv(StringIO(check_output([
                'sacct', '-p',
                '--format=ExitCode,ReqMem,MaxRSS,CPUTimeRAW,TimelimitRaw',
                '-j', f'{job.external_id}']).decode(
                'ascii')), sep='|').iloc[0].to_dict()
            data = {
                'exit-code': bvals['ExitCode'],
                'mem-requested': bvals['ReqMem'],
                'time-requested': vals['TimelimitRaw'],
                'mem-used': bvals['MaxRSS'],
                'time-used': bvals['CPUTimeRAW']}
        else:
            # the job never reached the scheduler; nothing to report
            data = {
                'exit-code': None,
                'mem-requested': None,
                'time-requested': None,
                'mem-used': None,
                'time-used': None}
        # BUGFIX: list.append takes exactly one argument, so the previous
        # results.append(job, data) raised TypeError on the first job;
        # append a single (job, data) tuple instead, matching the
        # (ProcessingJob, dict) pairs this function's callers expect
        results.append((job, data))

    return results
140
97
0 commit comments