Skip to content

Commit 392d7ba

Browse files
authored
Merge pull request #150 from kbase/dev-service
Save JAWS run ID in mongo after submit
2 parents d68763a + 4fea817 commit 392d7ba

File tree

4 files changed

+52
-9
lines changed

4 files changed

+52
-9
lines changed

cdmtaskservice/jobflows/nersc_jaws.py

+11-3
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,11 @@ async def start_job(self, job: models.Job, objmeta: list[S3ObjectMeta]):
9898
# remote cluster and have job_state handle choosing the correct mongo method & params
9999
# to run
100100
await self._mongo.add_NERSC_download_task_id(
101-
# TODO TEST will need a way to mock out timestamps
102101
job.id,
103102
task_id,
104103
models.JobState.CREATED,
105104
models.JobState.DOWNLOAD_SUBMITTED,
105+
# TODO TEST will need a way to mock out timestamps
106106
timestamp.utcdatetime(),
107107
)
108108
except Exception as e:
@@ -137,8 +137,16 @@ async def _submit_jaws_job(self, job: models.AdminJobDetails):
137137
try:
138138
# TODO PERF configure file download concurrency
139139
jaws_job_id = await self._nman.run_JAWS(job)
140-
# TODO JAWS record job ID in DB
141-
logr.info(f"JAWS job id: {jaws_job_id}")
140+
# See notes above about adding the NERSC task id to the job
141+
await self._mongo.add_JAWS_run_id(
142+
job.id,
143+
jaws_job_id,
144+
models.JobState.JOB_SUBMITTING,
145+
models.JobState.JOB_SUBMITTED,
146+
# TODO TEST will need a way to mock out timestamps
147+
timestamp.utcdatetime(),
148+
)
149+
# TODO JOBRUN start polling JAWS to wait for job completion
142150
except Exception as e:
143151
# TODO LOGGING figure out how logging is going to work etc.
144152
logr.exception(f"Error starting JAWS job for job {job.id}")

cdmtaskservice/models.py

+11
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
FLD_JOB_STATE = "state"
4343
FLD_JOB_NERSC_DETAILS = "nersc_details"
4444
FLD_NERSC_DETAILS_DL_TASK_ID = "download_task_id"
45+
FLD_JOB_JAWS_DETAILS = "jaws_details"
46+
FLD_JAWS_DETAILS_RUN_ID = "run_id"
4547

4648

4749
# https://en.wikipedia.org/wiki/Filename#Comparison_of_filename_limitations
@@ -699,9 +701,18 @@ class NERSCDetails(BaseModel):
699701
+ "completion in the SFAPI. Multiple tasks indicate job retries after failures.")]
700702

701703

704+
class JAWSDetails(BaseModel):
705+
"""
706+
Details about a JAWS job run.
707+
"""
708+
run_id: Annotated[list[str], Field(
709+
description="Run IDs for a JGI JAWS job. Multiple run IDs indicate job retries after "
710+
+ "failures.")]
711+
702712
class AdminJobDetails(Job):
703713
"""
704714
Information about a job with added details of interest to service administrators.
705715
"""
706716
nersc_details: NERSCDetails | None = None
707717
# TODO NERSC more details, logs, etc.
718+
jaws_details: JAWSDetails | None = None

cdmtaskservice/mongo.py

+25-5
Original file line numberDiff line numberDiff line change
@@ -195,19 +195,39 @@ async def add_NERSC_download_task_id(
195195
"""
196196
Add a download task_id to the NERSC section of a job and update the state.
197197
198-
job_id - the job ID.
198+
Arguments are as update_job_state except for the addition of:
199+
199200
task_id - the NERSC task ID.
200-
current_state - the expected current state of the job. If the job is not in this state
201-
an error is thrown.
202-
state - the new state for the job.
203-
time - the time at which the job transitioned to the new state.
204201
"""
205202
# may need to make this more generic where the cluster is passed in and mapped to
206203
# a job structure location or something if we support more than NERSC
207204
await self._update_job_state(job_id, current_state, state, time, push={
208205
self._FLD_NERSC_DL_TASK: _require_string(task_id, "task_id")
209206
})
210207

208+
_FLD_JAWS_RUN_ID = f"{models.FLD_JOB_JAWS_DETAILS}.{models.FLD_JAWS_DETAILS_RUN_ID}"
209+
210+
async def add_JAWS_run_id(
211+
self,
212+
job_id: str,
213+
run_id: str,
214+
current_state: models.JobState,
215+
state: models.JobState,
216+
time: datetime.datetime
217+
):
218+
"""
219+
Add a run ID to the JAWS section of a job and update the state.
220+
221+
Arguments are as update_job_state except for the addition of:
222+
223+
run_id - the JAWS run ID.
224+
"""
225+
# may need to make this more generic where the cluster is passed in and mapped to
226+
# a job structure location or something if we support more than NERSC
227+
await self._update_job_state(job_id, current_state, state, time, push={
228+
self._FLD_JAWS_RUN_ID: _require_string(run_id, "run_id")
229+
})
230+
211231

212232
class NoSuchImageError(Exception):
213233
""" The image does not exist in the system. """

cdmtaskservice/nersc/manager.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,10 @@ async def run_JAWS(self, job: models.Job, file_download_concurrency: int = 10) -
408408
perl = await cli.compute(Machine.perlmutter)
409409
pre = self._perlmutter_scratch / _CTS_SCRATCH_ROOT_DIR / job.id
410410
try:
411+
# TODO PERF this copies all the files to the jaws staging area, and so could take
412+
# a long time. Hardlink them in first to avoid the copy. Also look into
413+
# caching between jobs, so multiple jobs on the same file don't DL it over
414+
# and over
411415
res = await perl.run(_JAWS_COMMAND_TEMPLATE.format(
412416
job_id=job.id,
413417
wdlpath=pre / _JAWS_INPUT_WDL,
@@ -431,7 +435,7 @@ async def run_JAWS(self, job: models.Job, file_download_concurrency: int = 10) -
431435
logging.getLogger(__name__).info(
432436
f"Submitted JAWS job with run id {run_id} for job {job.id}"
433437
)
434-
return run_id
438+
return str(run_id)
435439
except json.JSONDecodeError as e:
436440
raise ValueError(f"JAWS returned invalid JSON: {e}\n{res}") from e
437441

0 commit comments

Comments
 (0)