Skip to content

Commit 50c36c4

Browse files
authored
Allow user to specify exposures or tiles to resubmit (PR #2450)
Allow user to specify exposures or tiles to resubmit
2 parents 0852688 + 97173ef commit 50c36c4

File tree

2 files changed

+45
-3
lines changed

2 files changed

+45
-3
lines changed

bin/desi_resubmit_queue_failures

+15-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,14 @@ def parse_args(): # options=None):
2525

2626
parser.add_argument("-n","--night", type=str, default=None,
2727
required=False, help="The night you want processed.")
28+
parser.add_argument("-e","--expids", type=str, default=None,
29+
required=False, help="The exposure ids to resubmit (along "
30+
+ "with the jobs they depend on)."
31+
+ " Should be a comma separated list.")
32+
parser.add_argument("-t","--tileids", type=str, default=None,
33+
required=False, help="The tile ids to resubmit (along "
34+
+ "with the jobs they depend on)."
35+
+ " Should be a comma separated list.")
2836
parser.add_argument("--proc-table-pathname", type=str, required=False, default=None,
2937
help="Directory name where the output processing table should be saved.")
3038
parser.add_argument("--tab-filetype", type=str, required=False, default='csv',
@@ -51,6 +59,11 @@ def parse_args(): # options=None):
5159

5260
args = parser.parse_args()
5361

62+
if args.expids is not None:
63+
args.expids = np.array([eid.strip() for eid in args.expids.split(',')]).astype(int)
64+
if args.tileids is not None:
65+
args.tileids = np.array([tid.strip() for tid in args.tileids.split(',')]).astype(int)
66+
5467
if args.resub_states is not None:
5568
## User should never provide custom list of states and request to remove FAILED
5669
if args.no_resub_failed:
@@ -91,7 +104,8 @@ if __name__ == '__main__':
91104
no_resub_failed=args.no_resub_failed,
92105
ptab_name=ptable_pathname,
93106
dry_run_level=args.dry_run_level,
94-
reservation=args.reservation)
107+
reservation=args.reservation,
108+
expids=args.expids, tileids=args.tileids)
95109

96110
if args.dry_run_level < 3:
97111
write_table(ptable, tablename=ptable_pathname)

py/desispec/workflow/processing.py

+30-2
Original file line numberDiff line numberDiff line change
@@ -1169,7 +1169,8 @@ def all_calibs_submitted(accounted_for, do_cte_flats):
11691169
def update_and_recursively_submit(proc_table, submits=0, max_resubs=100,
11701170
resubmission_states=None,
11711171
no_resub_failed=False, ptab_name=None,
1172-
dry_run_level=0, reservation=None):
1172+
dry_run_level=0, reservation=None,
1173+
expids=None, tileids=None):
11731174
"""
11741175
Given an processing table, this loops over job rows and resubmits failed jobs (as defined by resubmission_states).
11751176
Before submitting a job, it checks the dependencies for failures. If a dependency needs to be resubmitted, it recursively
@@ -1194,6 +1195,8 @@ def update_and_recursively_submit(proc_table, submits=0, max_resubs=100,
11941195
4 Doesn't write, submit jobs, or query Slurm.
11951196
5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
11961197
reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
1198+
expids: list of ints. The exposure ids to resubmit (along with the jobs they depend on).
1199+
tileids: list of ints. The tile ids to resubmit (along with the jobs they depend on).
11971200
11981201
Returns:
11991202
tuple: A tuple containing:
@@ -1207,6 +1210,17 @@ def update_and_recursively_submit(proc_table, submits=0, max_resubs=100,
12071210
This modifies the inputs of both proc_table and submits and returns them.
12081211
"""
12091212
log = get_logger()
1213+
if tileids is not None and expids is not None:
1214+
msg = f"Provided both expids and tilesids. Please only provide one."
1215+
log.critical(msg)
1216+
raise AssertionError(msg)
1217+
elif tileids is not None:
1218+
msg = f"Only resubmitting the following tileids and the jobs they depend on: {tileids=}"
1219+
log.info(msg)
1220+
elif expids is not None:
1221+
msg = f"Only resubmitting the following expids and the jobs they depend on: {expids=}"
1222+
log.info(msg)
1223+
12101224
if resubmission_states is None:
12111225
resubmission_states = get_resubmission_states(no_resub_failed=no_resub_failed)
12121226

@@ -1219,9 +1233,23 @@ def update_and_recursively_submit(proc_table, submits=0, max_resubs=100,
12191233
log.info(np.array(cols))
12201234
for row in proc_table:
12211235
log.info(np.array(row[cols]))
1236+
1237+
## If expids or tileids are given, subselect to the processing table rows
1238+
## that included those exposures or tiles otherwise just list all indices
1239+
## NOTE: Other rows can still be submitted if the selected rows depend on them
1240+
## we hand the entire table to recursive_submit_failed(), which will walk the
1241+
## entire dependency tree as necessary.
1242+
if expids is not None:
1243+
select_ptab_rows = np.where([np.any(np.isin(prow_eids, expids)) for prow_eids in proc_table['EXPID']])[0]
1244+
elif tileids is not None:
1245+
select_ptab_rows = np.where(np.isin(proc_table['TILEID'], tileids))[0]
1246+
else:
1247+
select_ptab_rows = np.arange(len(proc_table))
1248+
12221249
log.info("\n")
12231250
id_to_row_map = {row['INTID']: rown for rown, row in enumerate(proc_table)}
1224-
for rown in range(len(proc_table)):
1251+
## Loop over all requested rows and resubmit those that have failed
1252+
for rown in select_ptab_rows:
12251253
if proc_table['STATUS'][rown] in resubmission_states:
12261254
proc_table, submits = recursive_submit_failed(rown=rown, proc_table=proc_table,
12271255
submits=submits, max_resubs=max_resubs,

0 commit comments

Comments
 (0)