Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions flaskapi/flask_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,9 +480,17 @@ def _timeit(fun: Callable, *args, **kwargs):

# test_job_retrieval_paginated(function_uid="eea21c0d-6c2b-4cf4-91d1-116e6550cb22")

def _get_job_status(job: Dict[str, Any]) -> str:
status = job["status"]
if isinstance(status, dict) and "status" in status:
return status["status"]
elif isinstance(status, str):
return status
else:
raise ValueError(f"Unknown status format: {status}")

def _check_jobs(jobs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
completed_jobs = [job for job in jobs if job["status"].lower() == "completed" or job["status"].lower() == "success"] # type: ignore
completed_jobs = [job for job in jobs if _get_job_status(job).lower() == "completed" or _get_job_status(job).lower() == "success"] # type: ignore

for job in completed_jobs:
assert "outputs" in job, f"No outputs key found for completed job: {job} with status: {job['status']}" # type: ignore
Expand Down Expand Up @@ -965,11 +973,16 @@ def flask_test_job():
), f"Job is None for function {function_uid} with sample {sample}. Response: {response}"
uid = response.actual_instance.uid
_logger.debug(f"Job UID: {uid}")
while (
"JOB_TASK_" in (job := _get_function_job_from_uid(uid))["status"]
and not "FAILURE" in job
):
time.sleep(1)
while True:
job = _get_function_job_from_uid(uid)
job_status = _get_job_status(job)
_logger.debug(f"Job status: {job_status}")
if "FAILURE" in job_status:
raise RuntimeError(f"Job {uid} failed with status: {job_status}")
elif not "JOB_TASK_" in job_status:
break ## exit the loop if the job has been initialized
else:
time.sleep(1)
_logger.debug(f"Created job: {job}")
return jsonify(job) # return the job details as a dictionary
except Exception as e:
Expand Down
13 changes: 5 additions & 8 deletions node/src/components/data/JobRow.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import Typography from "@mui/material/Typography";
import { useState } from "react";
import { toast } from "react-toastify";
import { Function as OsparcFunction } from "../../osparc-api-ts-client";
import { createJobStudyCopy, openStudyUid } from "../../utils/function_utils";
import { createJobStudyCopy, extractJobStatus, isFinalStatus, openStudyUid } from "../../utils/function_utils";
import CustomTooltip from "../utils/CustomTooltip";
import { useJobContext } from "../../context/JobContext";

Expand All @@ -19,7 +19,7 @@ interface JobRowProps {
function JobRow(props: JobRowProps) {
const { jobUid, jobList, setSelected, selectedFunction } = props;
const [creatingJobCopy, setCreatingJobCopy] = useState(false);
const { parseStatus } = useJobContext();
const { getOutputsForTable } = useJobContext();
const job = jobList.find(j => j.job.uid === jobUid);

const handleSetJob = (selected: boolean) => {
Expand Down Expand Up @@ -93,10 +93,8 @@ function JobRow(props: JobRowProps) {
</TableRow>
);
}

const jobStatus = job.job.status;
const outputs = parseStatus(jobStatus, job.job.outputs);

const jobStatus = extractJobStatus(job);
const outputs = getOutputsForTable(job);
const inputs = Object.entries(job.job.inputs).map(([key, value]) => (
<Box key={`job-row-input-${key}`} display="inline">
{key} : {(value as number).toPrecision(3)}
Expand Down Expand Up @@ -161,8 +159,7 @@ function JobRow(props: JobRowProps) {
variant="outlined"
size="small"
disabled={
creatingJobCopy ||
(!jobStatus.includes("SUCCESS") && !(jobStatus.includes("FAILED") || jobStatus.includes("FAILURE")))
creatingJobCopy || ! isFinalStatus(jobStatus)
}
onClick={async () => {
setCreatingJobCopy(true);
Expand Down
33 changes: 12 additions & 21 deletions node/src/components/data/JobSelector.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,14 @@ import { useFunctionContext } from "../../context/FunctionContext";
import { useJobContext } from "../../context/JobContext";
import { useMMUXContext } from "../../context/MMUXContext";
import { useSamplingContext } from "../../context/SamplingContext";
import { getJobCollectionStatus } from "../../utils/function_utils";
import { FunctionJob } from "../../osparc-api-ts-client";
import {
getFunctionJobCollections,
getFunctionJobsFromFunctionJobCollection,
getJobCollectionStatus,
isFinalStatus,
extractJobStatus,
} from "../../utils/function_utils";
import getMinMax from "../minmax";
import CustomTooltip from "../utils/CustomTooltip";
import JobRow from "./JobRow";
Expand Down Expand Up @@ -63,7 +70,7 @@ export default function JobsSelector(props: JobSelectorPropsType) {
const auxJob = jc;
if (jc.jobCollection.uid === uid) {
auxJob.subJobs = auxJob.subJobs.map(j => ({
selected: selected === true ? j.job.status === "SUCCESS" : false,
selected: selected === true ? extractJobStatus(j) === "SUCCESS" : false,
job: j.job,
}));
auxJob.selected = selected === true ? auxJob.subJobs.some(j => j.selected === true) : false;
Expand Down Expand Up @@ -156,7 +163,7 @@ export default function JobsSelector(props: JobSelectorPropsType) {
const newJobCollections: SelectedJobCollection[] = jobCollections.map(jc => {
const auxJob = jc;
auxJob.subJobs = jc.subJobs.map(subJob => ({
selected: checked === true ? subJob.job.status === "SUCCESS" : false,
selected: checked === true ? extractJobStatus(subJob) === "SUCCESS" : false,
job: subJob.job,
}));
const auxJobState = auxJob.subJobs.map(j => j.selected);
Expand All @@ -170,22 +177,6 @@ export default function JobsSelector(props: JobSelectorPropsType) {
[jobCollections, updateJobContext],
);

// const autoSelectJobs = useCallback(() => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The autoSelectJobs functions should not be removed but implemented, it was disabled as the feature which should automatically select the in-range jobs is not present, but due the fact that after every selection / deselection of a job, we refresh the moga, we should re-activate this and do it properly to save time for the user

// const newJobCollections: SelectedJobCollection[] = jobCollections.map(jc => {
// const auxJob = jc;
// auxJob.subJobs = jc.subJobs.map(subJob => ({
// selected: subJob.job.status === "SUCCESS",
// job: subJob.job,
// }));
// const auxJobState = auxJob.subJobs.map(j => j.selected);
// auxJob.selected = !auxJobState.every(j => j === false);
// return auxJob;
// });

// setJobCollections(newJobCollections);
// updateJobContext(newJobCollections);
// }, [jobCollections, updateJobContext]);

const handleJobsUpdate = useCallback(async () => {
await requestForceFetch(selectedFunction?.uid as string, setJobProgress);
console.info("Updated JobCollections");
Expand Down Expand Up @@ -282,7 +273,7 @@ export default function JobsSelector(props: JobSelectorPropsType) {
indeterminate={
jobCollections.some(jc => jc.selected === true) &&
!jobCollections.every(
jc => jc.subJobs.map(j => j.job).filter(j => j.status === "SUCCESS" && j.selected === true).length > 0,
jc => jc.subJobs.map(j => j.job).filter(j => extractJobStatus(j) === "SUCCESS" && j.selected === true).length > 0,
)
}
onChange={event => onToggleAll(event.target.checked)}
Expand All @@ -295,7 +286,7 @@ export default function JobsSelector(props: JobSelectorPropsType) {
checked={params.row.selected}
indeterminate={params.row.subJobs.some(j => j.selected) && !params.row.subJobs.every(j => j.selected)}
onChange={event => selectMainJob(params.row.jobCollection.uid, event.target.checked)}
disabled={params.row.subJobs.every((j: SubJob) => j.job.status !== "SUCCESS")}
disabled={params.row.subJobs.every((j: SubJob) => extractJobStatus(j) !== "SUCCESS")}
inputProps={{ "aria-label": "Select job collection" }}
sx={theme => ({ "& .MuiSvgIcon-root": { color: `${theme.palette.primary.main} !important` } })}
/>
Expand Down
53 changes: 21 additions & 32 deletions node/src/context/JobContext.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import { PersistenceType } from "./types";
import {
getFunctionJobCollections,
getFunctionJobsFromFunctionJobCollection,
filterForFinalStatus,
isFinalStatus,
extractJobStatus,
extractJobOutputs,
AllowedJobStatus,
} from "../utils/function_utils";

export interface JobContextType {
Expand All @@ -20,7 +23,7 @@ export interface JobContextType {
allJobsList: () => FunctionJob[];
filteredJobList: FunctionJob[];
requestForceFetch: (functionUID: string, progress: (progress: number) => void) => void;
parseStatus: (jobStatus: string, outputArray: Record<string, unknown>) => string | JSX.Element[];
getOutputsForTable: (job: FunctionJob | SubJob) => string | JSX.Element[];
}

export const JobContext = createContext<JobContextType | undefined>(undefined);
Expand All @@ -37,45 +40,31 @@ export function JobContextProvider({ children }: Props) {
const [fetchedJobCollections, setFetchedJobCollections] = useState<SelectedJobCollection[] | undefined>(undefined);
const [runningJobCollection, setRunningJobCollection] = useState<RegisteredFunctionJobCollection | undefined>(undefined);

// Filter out job status that are not strings
const jobStatusFilter = (status: unknown) => {
if (typeof status === "string") {
return status;
}
if (typeof status === "object" && status !== null) {
if ("status" in status && typeof status.status === "string") {
return status.status;
}
}
console.log("job status is UNKNOWN", status);
return "UNKNOWN";
};

const parseStatus = (jobStatusUnk: unknown, outputArray: Record<string, unknown>): string | JSX.Element[] => {
const jobStatus = jobStatusFilter(jobStatusUnk);
let outputs;
// TODO change all calls to this function!! bfr Alex was passing status + outputs -- here, just pass job
const getOutputsForTable = (job: FunctionJob | SubJob): string | JSX.Element[] => {
const jobStatus: AllowedJobStatus = extractJobStatus(job);
const outputArray: Record<string, unknown> = extractJobOutputs(job);

let outputs: string | JSX.Element[];
if (jobStatus === "SUCCESS") {
outputs = Object.entries(outputArray).map(([key, value]) => (
<Box key={`job-row-output-${key}`} display="inline">
{key} : {(value as number).toPrecision(3)}
{", "}
</Box>
));
} else if (jobStatus === "STARTED") {
} else if (jobStatus === "RUNNING") {
outputs = [
<Box key={0} display="inline">
Running...
</Box>,
];
} else if (["FAILED", "ABORTED"].includes(jobStatus) || (jobStatus.startsWith("JOB_") && jobStatus.endsWith("_FAILURE"))) {
} else if (jobStatus === "FAILED") {
outputs = "Failed - no outputs";
} else if (
["PENDING", "WAITING_FOR_CLUSTER", "PUBLISHED", "NOT_STARTED", "WAITING_FOR_RESOURCES"].includes(jobStatus) ||
(jobStatus.startsWith("JOB_") && !jobStatus.endsWith("_FAILURE"))
) {
} else if (jobStatus === "PENDING") {
outputs = "Pending to run";
} else if (jobStatus === "UNKNOWN") {
outputs = "Please try again later";
outputs = "Unknown status, please try again later";
} else {
outputs = "Unknown status, please contact support";
}
Expand All @@ -98,12 +87,12 @@ export function JobContextProvider({ children }: Props) {
const fetchedJCMap = new Map(fetchedJobCollections && fetchedJobCollections.map(fjc => [fjc.jobCollection.uid, fjc]));
const equalJC: boolean[] = jobsC.map(jc => {
const fetchedJC = fetchedJCMap.get(jc.uid);
const statusList = fetchedJC ? fetchedJC.subJobs.map(j => jobStatusFilter(j.job.status)) : [];
const statusList = fetchedJC ? fetchedJC.subJobs.map(j => extractJobStatus(j)) : [];
return (
fetchedJC !== undefined &&
fetchedJC.subJobs.map(j => j.job.uid).every(jcUID => jc.jobIds.includes(jcUID)) &&
fetchedJC.subJobs.length === jc.jobIds.length &&
statusList.every(s => filterForFinalStatus(s))
statusList.every(s => isFinalStatus(s))
);
});

Expand All @@ -127,13 +116,13 @@ export function JobContextProvider({ children }: Props) {
const subJobs = [];
for (let subJobIdx = 0; subJobIdx < functionJobs.length; subJobIdx += 1) {
const job: FunctionJob = functionJobs[subJobIdx];
job.status = jobStatusFilter(job.status);
const jobStatus = extractJobStatus(job);
jobsFetched += 1;
const jobsProg = (jobsFetched / totalSubs) * 100;
progress(jobsProg);
const existingSelected = oldSubJobs.find(sj => sj.job.uid === job.uid)?.selected;
subJobs.push({
selected: existingSelected !== undefined ? existingSelected : job.status === "SUCCESS",
selected: existingSelected !== undefined ? existingSelected : jobStatus === "SUCCESS",
job,
});
}
Expand Down Expand Up @@ -227,7 +216,7 @@ export function JobContextProvider({ children }: Props) {
allJobsList,
filteredJobList,
requestForceFetch,
parseStatus,
getOutputsForTable,
}),
[
runningJobCollection,
Expand All @@ -239,7 +228,7 @@ export function JobContextProvider({ children }: Props) {
allJobsList,
filteredJobList,
requestForceFetch,
parseStatus,
getOutputsForTable,
],
);
return <JobContext.Provider value={memoState}>{children}</JobContext.Provider>;
Expand Down
Loading