diff --git a/flaskapi/flask_workflows.py b/flaskapi/flask_workflows.py index c00b9af9..5fc164ea 100644 --- a/flaskapi/flask_workflows.py +++ b/flaskapi/flask_workflows.py @@ -480,9 +480,17 @@ def _timeit(fun: Callable, *args, **kwargs): # test_job_retrieval_paginated(function_uid="eea21c0d-6c2b-4cf4-91d1-116e6550cb22") +def _get_job_status(job: Dict[str, Any]) -> str: + status = job["status"] + if isinstance(status, dict) and "status" in status: + return status["status"] + elif isinstance(status, str): + return status + else: + raise ValueError(f"Unknown status format: {status}") def _check_jobs(jobs: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - completed_jobs = [job for job in jobs if job["status"].lower() == "completed" or job["status"].lower() == "success"] # type: ignore + completed_jobs = [job for job in jobs if _get_job_status(job).lower() == "completed" or _get_job_status(job).lower() == "success"] # type: ignore for job in completed_jobs: assert "outputs" in job, f"No outputs key found for completed job: {job} with status: {job['status']}" # type: ignore @@ -965,11 +973,16 @@ def flask_test_job(): ), f"Job is None for function {function_uid} with sample {sample}. Response: {response}" uid = response.actual_instance.uid _logger.debug(f"Job UID: {uid}") - while ( - "JOB_TASK_" in (job := _get_function_job_from_uid(uid))["status"] - and not "FAILURE" in job - ): - time.sleep(1) + while True: + job = _get_function_job_from_uid(uid) + job_status = _get_job_status(job) + _logger.debug(f"Job status: {job_status}") + if "FAILURE" in job_status: + raise RuntimeError(f"Job {uid} failed with status: {job_status}") + elif not "JOB_TASK_" in job_status: + break ## exit the loop if the job has been initialized + else: + time.sleep(1) _logger.debug(f"Created job: {job}") return jsonify(job) # return the job details as a dictionary except Exception as e: diff --git a/node/src/components/data/JobRow.tsx b/node/src/components/data/JobRow.tsx index 681ad4a2..a9346597 100644 --- a/node/src/components/data/JobRow.tsx +++ b/node/src/components/data/JobRow.tsx @@ -5,7 +5,7 @@ import Typography from "@mui/material/Typography"; import { useState } from "react"; import { toast } from "react-toastify"; import { Function as OsparcFunction } from "../../osparc-api-ts-client"; -import { createJobStudyCopy, openStudyUid } from "../../utils/function_utils"; +import { createJobStudyCopy, extractJobStatus, isFinalStatus, openStudyUid } from "../../utils/function_utils"; import CustomTooltip from "../utils/CustomTooltip"; import { useJobContext } from "../../context/JobContext"; @@ -19,7 +19,7 @@ interface JobRowProps { function JobRow(props: JobRowProps) { const { jobUid, jobList, setSelected, selectedFunction } = props; const [creatingJobCopy, setCreatingJobCopy] = useState(false); - const { parseStatus } = useJobContext(); + const { getOutputsForTable } = useJobContext(); const job = jobList.find(j => j.job.uid === jobUid); const handleSetJob = (selected: boolean) => { @@ -93,10 +93,8 @@ function JobRow(props: JobRowProps) { ); } - - const jobStatus = job.job.status; - const outputs = parseStatus(jobStatus, job.job.outputs); - + const jobStatus = extractJobStatus(job); + const outputs = getOutputsForTable(job); const inputs = Object.entries(job.job.inputs).map(([key, value]) => ( {key} : {(value as number).toPrecision(3)} @@ -161,8 +159,7 @@ function JobRow(props: JobRowProps) { variant="outlined" size="small" disabled={ - creatingJobCopy || - (!jobStatus.includes("SUCCESS") && !(jobStatus.includes("FAILED") || jobStatus.includes("FAILURE"))) + creatingJobCopy || ! isFinalStatus(jobStatus) } onClick={async () => { setCreatingJobCopy(true); diff --git a/node/src/components/data/JobSelector.tsx b/node/src/components/data/JobSelector.tsx index d780b94a..a9b8c29b 100644 --- a/node/src/components/data/JobSelector.tsx +++ b/node/src/components/data/JobSelector.tsx @@ -22,7 +22,14 @@ import { useFunctionContext } from "../../context/FunctionContext"; import { useJobContext } from "../../context/JobContext"; import { useMMUXContext } from "../../context/MMUXContext"; import { useSamplingContext } from "../../context/SamplingContext"; -import { getJobCollectionStatus } from "../../utils/function_utils"; +import { FunctionJob } from "../../osparc-api-ts-client"; +import { + getFunctionJobCollections, + getFunctionJobsFromFunctionJobCollection, + getJobCollectionStatus, + isFinalStatus, + extractJobStatus, +} from "../../utils/function_utils"; import getMinMax from "../minmax"; import CustomTooltip from "../utils/CustomTooltip"; import JobRow from "./JobRow"; @@ -63,7 +70,7 @@ export default function JobsSelector(props: JobSelectorPropsType) { const auxJob = jc; if (jc.jobCollection.uid === uid) { auxJob.subJobs = auxJob.subJobs.map(j => ({ - selected: selected === true ? j.job.status === "SUCCESS" : false, + selected: selected === true ? extractJobStatus(j) === "SUCCESS" : false, job: j.job, })); auxJob.selected = selected === true ? auxJob.subJobs.some(j => j.selected === true) : false; @@ -156,7 +163,7 @@ export default function JobsSelector(props: JobSelectorPropsType) { const newJobCollections: SelectedJobCollection[] = jobCollections.map(jc => { const auxJob = jc; auxJob.subJobs = jc.subJobs.map(subJob => ({ - selected: checked === true ? subJob.job.status === "SUCCESS" : false, + selected: checked === true ? extractJobStatus(subJob) === "SUCCESS" : false, job: subJob.job, })); const auxJobState = auxJob.subJobs.map(j => j.selected); @@ -170,22 +177,6 @@ export default function JobsSelector(props: JobSelectorPropsType) { [jobCollections, updateJobContext], ); - // const autoSelectJobs = useCallback(() => { - // const newJobCollections: SelectedJobCollection[] = jobCollections.map(jc => { - // const auxJob = jc; - // auxJob.subJobs = jc.subJobs.map(subJob => ({ - // selected: subJob.job.status === "SUCCESS", - // job: subJob.job, - // })); - // const auxJobState = auxJob.subJobs.map(j => j.selected); - // auxJob.selected = !auxJobState.every(j => j === false); - // return auxJob; - // }); - - // setJobCollections(newJobCollections); - // updateJobContext(newJobCollections); - // }, [jobCollections, updateJobContext]); - const handleJobsUpdate = useCallback(async () => { await requestForceFetch(selectedFunction?.uid as string, setJobProgress); console.info("Updated JobCollections"); @@ -282,7 +273,7 @@ export default function JobsSelector(props: JobSelectorPropsType) { indeterminate={ jobCollections.some(jc => jc.selected === true) && !jobCollections.every( - jc => jc.subJobs.map(j => j.job).filter(j => j.status === "SUCCESS" && j.selected === true).length > 0, + jc => jc.subJobs.map(j => j.job).filter(j => extractJobStatus(j) === "SUCCESS" && j.selected === true).length > 0, ) } onChange={event => onToggleAll(event.target.checked)} @@ -295,7 +286,7 @@ export default function JobsSelector(props: JobSelectorPropsType) { checked={params.row.selected} indeterminate={params.row.subJobs.some(j => j.selected) && !params.row.subJobs.every(j => j.selected)} onChange={event => selectMainJob(params.row.jobCollection.uid, event.target.checked)} - disabled={params.row.subJobs.every((j: SubJob) => j.job.status !== "SUCCESS")} + disabled={params.row.subJobs.every((j: SubJob) => extractJobStatus(j) !== "SUCCESS")} inputProps={{ "aria-label": "Select job collection" }} sx={theme => ({ "& .MuiSvgIcon-root": { color: `${theme.palette.primary.main} !important` } })} /> diff --git a/node/src/context/JobContext.tsx b/node/src/context/JobContext.tsx index cd4bd5d0..a68db15c 100644 --- a/node/src/context/JobContext.tsx +++ b/node/src/context/JobContext.tsx @@ -7,7 +7,10 @@ import { PersistenceType } from "./types"; import { getFunctionJobCollections, getFunctionJobsFromFunctionJobCollection, - filterForFinalStatus, + isFinalStatus, + extractJobStatus, + extractJobOutputs, + AllowedJobStatus, } from "../utils/function_utils"; export interface JobContextType { @@ -20,7 +23,7 @@ export interface JobContextType { allJobsList: () => FunctionJob[]; filteredJobList: FunctionJob[]; requestForceFetch: (functionUID: string, progress: (progress: number) => void) => void; - parseStatus: (jobStatus: string, outputArray: Record) => string | JSX.Element[]; + getOutputsForTable: (job: FunctionJob | SubJob) => string | JSX.Element[]; } export const JobContext = createContext(undefined); @@ -37,23 +40,11 @@ export function JobContextProvider({ children }: Props) { const [fetchedJobCollections, setFetchedJobCollections] = useState(undefined); const [runningJobCollection, setRunningJobCollection] = useState(undefined); - // Filter out job status that are not strings - const jobStatusFilter = (status: unknown) => { - if (typeof status === "string") { - return status; - } - if (typeof status === "object" && status !== null) { - if ("status" in status && typeof status.status === "string") { - return status.status; - } - } - console.log("job status is UNKNOWN", status); - return "UNKNOWN"; - }; - - const parseStatus = (jobStatusUnk: unknown, outputArray: Record): string | JSX.Element[] => { - const jobStatus = jobStatusFilter(jobStatusUnk); - let outputs; + const getOutputsForTable = (job: FunctionJob | SubJob): string | JSX.Element[] => { + const jobStatus: AllowedJobStatus = extractJobStatus(job); + const outputArray: Record = extractJobOutputs(job); + + let outputs: string | JSX.Element[]; if (jobStatus === "SUCCESS") { outputs = Object.entries(outputArray).map(([key, value]) => ( @@ -61,21 +52,18 @@ export function JobContextProvider({ children }: Props) { {", "} )); - } else if (jobStatus === "STARTED") { + } else if (jobStatus === "RUNNING") { outputs = [ Running... , ]; - } else if (["FAILED", "ABORTED"].includes(jobStatus) || (jobStatus.startsWith("JOB_") && jobStatus.endsWith("_FAILURE"))) { + } else if (jobStatus === "FAILED") { outputs = "Failed - no outputs"; - } else if ( - ["PENDING", "WAITING_FOR_CLUSTER", "PUBLISHED", "NOT_STARTED", "WAITING_FOR_RESOURCES"].includes(jobStatus) || - (jobStatus.startsWith("JOB_") && !jobStatus.endsWith("_FAILURE")) - ) { + } else if (jobStatus === "PENDING") { outputs = "Pending to run"; } else if (jobStatus === "UNKNOWN") { - outputs = "Please try again later"; + outputs = "Unknown status, please try again later"; } else { outputs = "Unknown status, please contact support"; } @@ -98,12 +86,12 @@ export function JobContextProvider({ children }: Props) { const fetchedJCMap = new Map(fetchedJobCollections && fetchedJobCollections.map(fjc => [fjc.jobCollection.uid, fjc])); const equalJC: boolean[] = jobsC.map(jc => { const fetchedJC = fetchedJCMap.get(jc.uid); - const statusList = fetchedJC ? fetchedJC.subJobs.map(j => jobStatusFilter(j.job.status)) : []; + const statusList = fetchedJC ? fetchedJC.subJobs.map(j => extractJobStatus(j)) : []; return ( fetchedJC !== undefined && fetchedJC.subJobs.map(j => j.job.uid).every(jcUID => jc.jobIds.includes(jcUID)) && fetchedJC.subJobs.length === jc.jobIds.length && - statusList.every(s => filterForFinalStatus(s)) + statusList.every(s => isFinalStatus(s)) ); }); @@ -127,13 +115,13 @@ export function JobContextProvider({ children }: Props) { const subJobs = []; for (let subJobIdx = 0; subJobIdx < functionJobs.length; subJobIdx += 1) { const job: FunctionJob = functionJobs[subJobIdx]; - job.status = jobStatusFilter(job.status); + const jobStatus = extractJobStatus(job); jobsFetched += 1; const jobsProg = (jobsFetched / totalSubs) * 100; progress(jobsProg); const existingSelected = oldSubJobs.find(sj => sj.job.uid === job.uid)?.selected; subJobs.push({ - selected: existingSelected !== undefined ? existingSelected : job.status === "SUCCESS", + selected: existingSelected !== undefined ? existingSelected : jobStatus === "SUCCESS", job, }); } @@ -227,7 +215,7 @@ export function JobContextProvider({ children }: Props) { allJobsList, filteredJobList, requestForceFetch, - parseStatus, + getOutputsForTable, }), [ runningJobCollection, @@ -239,7 +227,7 @@ export function JobContextProvider({ children }: Props) { allJobsList, filteredJobList, requestForceFetch, - parseStatus, + getOutputsForTable, ], ); return {children}; diff --git a/node/src/utils/function_utils.ts b/node/src/utils/function_utils.ts index 7153ee00..e2bdd0c2 100644 --- a/node/src/utils/function_utils.ts +++ b/node/src/utils/function_utils.ts @@ -167,18 +167,123 @@ export type JobStatusCounts = { unknown: number; }; +export type AllowedJobStatus = "SUCCESS" | "FAILED" | "RUNNING" | "PENDING" | "UNKNOWN"; + +export function isSubJob(job: FunctionJob | SubJob): job is SubJob { + if (!job) { + throw new Error("Job is undefined"); + } + + if (typeof job === "object") { + return (job as SubJob).selected !== undefined && (job as SubJob).job !== undefined; + } + else { + return false; + } +} + +export function isFunctionJob(job: FunctionJob | SubJob): job is FunctionJob { + if (!job) { + throw new Error("Job is undefined"); + } + + if (typeof job === "object") { + return (job as FunctionJob).inputs !== undefined && (job as FunctionJob).functionUid !== undefined && (job as FunctionJob).status !== undefined; + } + else { + return false; + } +} + +function classifyJobStatus(jobStatus: string): AllowedJobStatus { + // This function helps homogenize job status into four categories + unknown, + // centralizing all corresponding logic + if (!jobStatus) { + throw new Error("JobStatus is undefined!") + } + + if (jobStatus === "SUCCESS") { + return "SUCCESS"; + } + else if (jobStatus.endsWith("FAILED") || jobStatus.endsWith("FAILURE")) { + return "FAILED"; + } + else if (jobStatus === "STARTED" || jobStatus === "RUNNING") { + return "RUNNING"; + } + else if (jobStatus === "PENDING" || jobStatus.startsWith("JOB_") || jobStatus.startsWith("WAITING_") || jobStatus === "PUBLISHED") { + return "PENDING"; + } + else { + console.warn("Could not classify JobStatus", jobStatus) + return "UNKNOWN"; + } +} + +export function extractJobStatus(job: FunctionJob | SubJob): AllowedJobStatus { + // This function extracts the job status from either a FunctionJob or a SubJob + // allowing for status to be either a string or an object with a status field + // and classifies it into one of the AllowedJobStatus categories + if (isFunctionJob(job)) { + if (typeof job.status === "string") { + return classifyJobStatus(job.status); + } + else if (job.status && typeof job.status === "object" && "status" in job.status && typeof job.status.status === "string") { + return classifyJobStatus((job.status as { status: string }).status); + } + else { + console.log(`job status ${job.status} could not be extracted, classifying as UNKNOWN.`); + return "UNKNOWN"; + } + } + // If it's a SubJob, recurse to extract from the inner job + else if (isSubJob(job)) { + return extractJobStatus(job.job); + } + else { + throw new Error("Job passed to extractJobStatus is neither FunctionJob nor SubJob!"); + } + } + +export function extractJobOutputs(job: FunctionJob | SubJob): Record { + // This function extracts the job outputs from either a FunctionJob or a SubJob + // allowing for outputs to be either a Record or an object with an outputs field + if (isFunctionJob(job)) { + if (job.outputs && typeof job.outputs === "object") { + return job.outputs as Record; + } + else if (job.outputs && typeof job.outputs === "object" && "outputs" in job.outputs && typeof job.outputs.outputs === "object") { + return job.outputs.outputs as Record; + } + else { + console.log(`job outputs ${job.outputs} could not be extracted, returning empty object.`); + return {}; + } + } + // If it's a SubJob, recurse to extract from the inner job + else if (isSubJob(job)) { + return extractJobOutputs(job.job); + } + else { + throw new Error("Job passed to extractJobOutputs is neither FunctionJob nor SubJob!"); + } +} + export function getJobStatusCounts(subJobs: SubJob[]): JobStatusCounts { return subJobs .filter(j => j.job) - .map(j => (typeof j.job.status === "string" ? j.job.status : (j.job.status as unknown as { status: string }).status)) + .map(j => extractJobStatus(j.job)) .reduce( - (acc, status: string) => { + (acc, status: AllowedJobStatus) => { if (status === "SUCCESS") acc.success += 1; - else if (status.endsWith("FAILED") || status.endsWith("FAILURE")) acc.failed += 1; - else if (status === "STARTED" || status === "RUNNING") acc.running += 1; - else if (status === "PENDING" || status.startsWith("JOB_") || status.startsWith("WAITING_") || status === "PUBLISHED") - acc.pending += 1; - else acc.unknown += 1; + else if (status === "FAILED") acc.failed += 1; + else if (status === "RUNNING") acc.running += 1; + else if (status === "PENDING") acc.pending += 1; + else if (status === "UNKNOWN") acc.unknown += 1; + else { + console.warn("status should have been classified into one of the AllowedJobStatus!"); + console.warn("status: ", status); + }; return acc; }, { success: 0, failed: 0, running: 0, pending: 0, unknown: 0 }, @@ -188,10 +293,6 @@ export function getJobStatusCounts(subJobs: SubJob[]): JobStatusCounts { export function getJobCollectionStatus(subJobs: SubJob[]) { if (!subJobs || subJobs.length === 0) return "NO JOBS"; const jobStatusCounts = getJobStatusCounts(subJobs); - if (jobStatusCounts.unknown > 0) { - // toast.warn("Could not classify some job statuses - please revise console logs") - console.warn("SubJobs that gave UNKNOWN status: ", subJobs); - } const allSuccess = jobStatusCounts.success === subJobs.length; const anySuccess = jobStatusCounts.success > 0; const anyRunning = jobStatusCounts.running > 0; @@ -203,9 +304,9 @@ export function getJobCollectionStatus(subJobs: SubJob[]) { if (anyRunning) return "RUNNING"; if (anyPending) return "PENDING"; if (anyFailed && anySuccess) return "FAILED PARTIALLY"; - return "UNKNOWN"; + else return "UNKNOWN"; } -export function filterForFinalStatus(status: string) { - return status === "FAILED" || status === "SUCCESS" || status.includes("FAILURE"); +export function isFinalStatus(status: string) { + return status === "FAILED" || status === "SUCCESS"; } diff --git a/node/src/utils/utils.test.ts b/node/src/utils/utils.test.ts index b5669787..3008a68e 100644 --- a/node/src/utils/utils.test.ts +++ b/node/src/utils/utils.test.ts @@ -195,7 +195,7 @@ describe("stepValidator", () => { requestForceFetch: (): void => { throw new Error("Function not implemented."); }, - parseStatus: (_jobStatus: string, _outputArray: Record): string | JSX.Element[] => { + getOutputsForTable: (_job: FunctionJob | SubJob): string | JSX.Element[] => { throw new Error("Function not implemented."); }, };