diff --git a/execution_engine2.spec b/execution_engine2.spec
index eb042674e..6b886b9ff 100644
--- a/execution_engine2.spec
+++ b/execution_engine2.spec
@@ -16,6 +16,9 @@
     /* A job id. */
     typedef string job_id;

+    /* A job's status, as listed in the Status enum of the ee2 models file. */
+    typedef string job_status;
+
     /*
        A structure representing the Execution Engine status
        git_commit - the Git hash of the version of the module.
@@ -229,6 +232,20 @@
         boolean as_admin;
     } BulkRetryParams;

+    /*
+        batch_job_id: BATCH_ID to retry
+        status_list: job statuses in ['terminated', 'error'] (the valid retry states)
+        as_admin: retry someone else's job in your namespace
+        #TODO: Possibly Add list job_requirements;
+    */
+    typedef structure {
+        job_id batch_job_id;
+        list<job_status> status_list;
+        boolean as_admin;
+    } BatchRetryParams;
+
     /*
        #TODO write retry parent tests to ensure BOTH the parent_job_id is present, and retry_job_id is present
        #TODO Add retry child that checks the status of the child? to prevent multiple retries
@@ -246,14 +263,17 @@
     */
     funcdef retry_jobs(BulkRetryParams params) returns (list<RetryResult> retry_result) authentication required;

-
+    /*
+        Retry jobs in a batch, selected by batch id and a list of job statuses from ['error', 'terminated'].
+        Requires the user to keep track of the job states of the Status enum in the ee2 models file.
+        If no status_list is provided, an exception is thrown.
+    */
+    funcdef retry_batch_jobs_by_status(BatchRetryParams params) returns (list<RetryResult> retry_result) authentication required;

     funcdef abandon_children(AbandonChildren params) returns (BatchSubmission parent_and_child_ids) authentication required;

-
-
     /*
         EE2Constants Concierge Params are
         request_cpus: int
         request_memory: int in MB
@@ -585,6 +605,29 @@
     */
     funcdef cancel_job(CancelJobParams params) returns () authentication required;

+    /*
+        batch_job_id: BATCH_ID to cancel
+        status_list: required filter of one or more of [created, estimating, queued, or running]
+        terminated_code: optional terminated code, defaults to terminated by user
+        as_admin: cancel someone else's job in your namespace
+        @optional terminated_code
+    */
+    typedef structure {
+        job_id batch_job_id;
+        list<job_status> status_list;
+        int terminated_code;
+        boolean as_admin;
+    } BatchCancelParams;
+
+    /*
+        Cancels children of a batch job. This results in the child jobs' status becoming "terminated"
+        with the supplied terminated_code (default: terminated by user, code 0).
+        Valid statuses are ['created', 'estimating', 'queued', 'running']
+        (Requires the user to keep track of the job states of the Status enum in the ee2 models file).
+        If no status_list is provided, an exception is thrown.
+    */
+    funcdef cancel_batch_jobs_by_status(BatchCancelParams params) returns (list<job_id> job_ids) authentication required;
+
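For orientation, here is how the two new endpoints would be invoked from a KBase SDK app. This is a minimal sketch: the client module path, service URL, and ids are assumptions (standard KBase codegen produces an `execution_engine2` client class), not part of this diff.

```python
# Hypothetical caller-side usage of the two new batch endpoints.
from installed_clients.execution_engine2Client import execution_engine2

ee2 = execution_engine2(url="https://<ee2-host>/services/ee2", token="<token>")

# Retry every terminated or errored child of a batch.
retry_results = ee2.retry_batch_jobs_by_status({
    "batch_job_id": "60a7f2...",   # hypothetical batch id
    "status_list": ["terminated", "error"],
})

# Cancel every queued or running child of the same batch.
cancelled_ids = ee2.cancel_batch_jobs_by_status({
    "batch_job_id": "60a7f2...",
    "status_list": ["queued", "running"],
    "terminated_code": 0,          # optional; defaults to terminated-by-user
})
```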
     /*
        job_id - id of job running method
        finished - indicates whether job is done (including error/cancel cases) or not
diff --git a/lib/execution_engine2/db/MongoUtil.py b/lib/execution_engine2/db/MongoUtil.py
index 349b066bc..e7f431c42 100644
--- a/lib/execution_engine2/db/MongoUtil.py
+++ b/lib/execution_engine2/db/MongoUtil.py
@@ -4,7 +4,8 @@
 import traceback
 from contextlib import contextmanager
 from datetime import datetime
-from typing import Dict, List
+from typing import Dict, List, Union
+
 from bson.objectid import ObjectId
 from mongoengine import connect, connection
 from pymongo import MongoClient, UpdateOne
@@ -15,9 +16,8 @@
     RecordNotFoundException,
     InvalidStatusTransitionException,
 )
-
-from lib.execution_engine2.utils.arg_processing import parse_bool
 from execution_engine2.sdk.EE2Runjob import JobIdPair
+from lib.execution_engine2.utils.arg_processing import parse_bool


 class MongoUtil:
@@ -223,6 +223,48 @@ def get_job(self, job_id=None, exclude_fields=None) -> Job:

         return job

+    def get_jobs_with_status(
+        self,
+        job_ids: List[str],
+        status_list: List[str],
+        only_job_ids: bool = False,
+        retried_jobs_allowed=True,
+    ) -> Union[List[Job], List[str]]:
+        if not (job_ids and isinstance(job_ids, list)):
+            raise ValueError("Please provide a non-empty list of job ids")
+
+        if not (status_list and isinstance(status_list, list)):
+            raise ValueError("Please provide a non-empty list of job statuses")
+
+        with self.mongo_engine_connection():
+            # TODO: .only() seems to return other fields as well. Is .only() broken?
+            if retried_jobs_allowed:
+                jobs = Job.objects(id__in=job_ids, status__in=status_list)
+            else:
+                jobs = Job.objects(
+                    id__in=job_ids, status__in=status_list, retry_parent__exists=False
+                )
+
+            if only_job_ids:
+                return [str(job.id) for job in jobs]
+            return jobs
+
+    def eligible_for_retry(self, job: Job):
+        """
+        Checks the job record to see if it has any retry_ids,
+        and whether any of those retries is in an ineligible job state
+        :param job: Should be a child job of a BATCH job
+        """
+        if not job.retry_ids:
+            return True
+
+        valid_statuses = [Status.terminated.value, Status.error.value]
+        retried_jobs = self.get_jobs(job_ids=job.retry_ids)
+        for retried_job in retried_jobs:
+            if retried_job.status not in valid_statuses:
+                return False
+        return True
+
     def get_jobs(
         self, job_ids=None, exclude_fields=None, sort_id_ascending=None
     ) -> List[Job]:
diff --git a/lib/execution_engine2/db/models/models.py b/lib/execution_engine2/db/models/models.py
index 99e115412..a14d25460 100644
--- a/lib/execution_engine2/db/models/models.py
+++ b/lib/execution_engine2/db/models/models.py
@@ -227,6 +227,10 @@ class Status(Enum):
     A job begins at created, then can either be estimating
     """

+    @classmethod
+    def get_status_names(cls):
+        return [status.value for status in cls]
+
     created = "created"
     estimating = "estimating"
     queued = "queued"
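As a reference for reviewers, here is a self-contained sketch of what `get_status_names()` returns. The members mirror the Status values used elsewhere in this PR; this is an abbreviated stand-in, not the authoritative model.

```python
from enum import Enum

class Status(Enum):
    """Abbreviated stand-in for the ee2 Status enum in models.py."""

    created = "created"
    estimating = "estimating"
    queued = "queued"
    running = "running"
    completed = "completed"
    error = "error"
    terminated = "terminated"

    @classmethod
    def get_status_names(cls):
        # Iterating the class itself yields each member once, skipping aliases.
        return [status.value for status in cls]

print(Status.get_status_names())
# ['created', 'estimating', 'queued', 'running', 'completed', 'error', 'terminated']
```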
diff --git a/lib/execution_engine2/exceptions.py b/lib/execution_engine2/exceptions.py
index 13961697e..7034455ec 100644
--- a/lib/execution_engine2/exceptions.py
+++ b/lib/execution_engine2/exceptions.py
@@ -18,10 +18,22 @@ def __init__(self, msg=None, *args, **kwargs):
         super().__init__(msg or self.__doc__, *args, **kwargs)


+class InvalidStatusListException(ExecutionEngineValueError):
+    """Invalid job status provided"""
+
+
+class BatchTerminationException(ExecutionEngineException):
+    """No jobs to terminate"""
+
+
 class IncorrectParamsException(ExecutionEngineValueError):
     """Wrong parameters were provided"""


+class NotBatchJobException(ExecutionEngineValueError):
+    """Requested job is not a batch job"""
+
+
 class InvalidParameterForBatch(ExecutionEngineValueError):
     """Workspace ids are not allowed in RunJobParams in Batch Mode"""
diff --git a/lib/execution_engine2/execution_engine2Impl.py b/lib/execution_engine2/execution_engine2Impl.py
index 5b6366de5..905dde121 100644
--- a/lib/execution_engine2/execution_engine2Impl.py
+++ b/lib/execution_engine2/execution_engine2Impl.py
@@ -29,8 +29,8 @@ class execution_engine2:
     # the latter method is running.
     ######################################### noqa
     VERSION = "0.0.5"
-    GIT_URL = "https://github.com/mrcreosote/execution_engine2.git"
-    GIT_COMMIT_HASH = "2ad95ce47caa4f1e7b939651f2b1773840e67a8a"
+    GIT_URL = "git@github.com:kbase/execution_engine2.git"
+    GIT_COMMIT_HASH = "6f9e6f9593f74a64cd8453c342546e97b269ba14"

     #BEGIN_CLASS_HEADER
     MONGO_COLLECTION = "jobs"
@@ -360,7 +360,7 @@ def run_job_batch(self, ctx, params, batch_params):
         #BEGIN run_job_batch
         mr = SDKMethodRunner(
             user_clients=self.gen_cfg.get_user_clients(ctx),
-            clients = self.clients,
+            clients=self.clients,
             job_permission_cache=self.job_permission_cache,
             admin_permissions_cache=self.admin_permissions_cache
         )
@@ -402,7 +402,7 @@ def retry_job(self, ctx, params):
         #BEGIN retry_job
         mr = SDKMethodRunner(
             user_clients=self.gen_cfg.get_user_clients(ctx),
-            clients = self.clients,
+            clients=self.clients,
             job_permission_cache=self.job_permission_cache,
             admin_permissions_cache=self.admin_permissions_cache
         )
@@ -450,6 +450,44 @@ def retry_jobs(self, ctx, params):
         # return the results
         return [retry_result]

+    def retry_batch_jobs_by_status(self, ctx, params):
+        """
+        Retry jobs in a batch, selected by batch id and a list of job statuses from ['error', 'terminated'].
+        Requires the user to keep track of the job states of the Status enum in the ee2 models file.
+        If no status_list is provided, an exception is thrown.
+        :param params: instance of type "BatchRetryParams" (batch_job_id:
+           BATCH_ID to retry status_list: job statuses in ['terminated',
+           'error'] (the valid retry states) as_admin: retry someone else's
+           job in your namespace #TODO: Possibly Add list
+           job_requirements;) -> structure: parameter "batch_job_id" of type
+           "job_id" (A job id.), parameter "status_list" of list of type
+           "job_status" (A job's status, as listed in the Status enum of the
+           ee2 models file.), parameter "as_admin" of type "boolean" (@range
+           [0,1])
+        :returns: instance of list of type "RetryResult" (job_id of retried
+           job retry_id: job_id of the job that was launched str error:
+           reason as to why that particular retry failed (available for bulk
+           retry only)) -> structure: parameter "job_id" of type "job_id" (A
+           job id.), parameter "retry_id" of type "job_id" (A job id.),
+           parameter "error" of String
+        """
+        # ctx is the context object
+        # return variables are: retry_result
+        #BEGIN retry_batch_jobs_by_status
+        mr = SDKMethodRunner(user_clients=self.gen_cfg.get_user_clients(ctx), clients=self.clients,
+                             job_permission_cache=self.job_permission_cache,
+                             admin_permissions_cache=self.admin_permissions_cache)
+        retry_result = mr.retry_batch(job_id=params.get('batch_job_id'),
+                                      status_list=params.get('status_list'),
+                                      as_admin=params.get('as_admin'))
+        #END retry_batch_jobs_by_status
+
+        # At some point might do deeper type checking...
+        if not isinstance(retry_result, list):
+            raise ValueError('Method retry_batch_jobs_by_status return value ' +
+                             'retry_result is not type list as required.')
+        # return the results
+        return [retry_result]
+
     def abandon_children(self, ctx, params):
         """
         :param params: instance of type "AbandonChildren" -> structure:
@@ -911,7 +949,7 @@ def check_job(self, ctx, params):
            retry_ids - list - list of jobs that are retried based off of this
            job retry_parent - str - job_id of the parent this retry is based
            off of. Not available on a retry_parent itself batch_id - str -
-           the parent of the job, if the job is a child job created via
+           the coordinating job, if the job is a child job created via
            run_job_batch batch_job - bool - whether or not this is a batch
            parent container child_jobs - array - Only parent container should
            have child job ids scheduler_type - str - scheduler, such as awe
@@ -1033,19 +1071,19 @@ def check_job_batch(self, ctx, params):
            of list of String, parameter "as_admin" of type "boolean" (@range
           [0,1])
        :returns: instance of type "CheckJobBatchResults" (batch_jobstate -
-           state of parent job of the batch child_jobstates - states of child
-           jobs IDEA: ADD aggregate_states - count of all available child job
-           states, even if they are zero) -> structure: parameter
-           "batch_jobstate" of type "JobState" (job_id - string - id of the
-           job user - string - user who started the job wsid - int - optional
-           id of the workspace where the job is bound authstrat - string -
-           what strategy used to authenticate the job job_input - object -
-           inputs to the job (from the run_job call) ## TODO - verify
-           job_output - object - outputs from the job (from the run_job call)
-           ## TODO - verify updated - int - timestamp since epoch in
-           milliseconds of the last time the status was updated running - int
-           - timestamp since epoch in milliseconds of when it entered the
-           running state created - int - timestamp since epoch in
+           state of the coordinating job for the batch child_jobstates -
+           states of child jobs IDEA: ADD aggregate_states - count of all
+           available child job states, even if they are zero) -> structure:
+           parameter "batch_jobstate" of type "JobState" (job_id - string -
+           id of the job user - string - user who started the job wsid - int
+           - optional id of the workspace where the job is bound authstrat -
+           string - what strategy used to authenticate the job job_input -
+           object - inputs to the job (from the run_job call) ## TODO -
+           verify job_output - object - outputs from the job (from the
+           run_job call) ## TODO - verify updated - int - timestamp since
+           epoch in milliseconds of the last time the status was updated
+           running - int - timestamp since epoch in milliseconds of when it
+           entered the running state created - int - timestamp since epoch in
            milliseconds when the job was created finished - int - timestamp
            since epoch in milliseconds when the job was finished status -
            string - status of the job. one of the following: created - job
@@ -1068,7 +1106,7 @@ def check_job_batch(self, ctx, params):
            retry_ids - list - list of jobs that are retried based off of this
            job retry_parent - str - job_id of the parent this retry is based
            off of.
           Not available on a retry_parent itself batch_id - str -
-           the parent of the job, if the job is a child job created via
+           the coordinating job, if the job is a child job created via
            run_job_batch batch_job - bool - whether or not this is a batch
            parent container child_jobs - array - Only parent container should
            have child job ids scheduler_type - str - scheduler, such as awe
@@ -1189,7 +1227,7 @@ def check_job_batch(self, ctx, params):
            retry_ids - list - list of jobs that are retried based off of this
            job retry_parent - str - job_id of the parent this retry is based
            off of. Not available on a retry_parent itself batch_id - str -
-           the parent of the job, if the job is a child job created via
+           the coordinating job, if the job is a child job created via
            run_job_batch batch_job - bool - whether or not this is a batch
            parent container child_jobs - array - Only parent container should
            have child job ids scheduler_type - str - scheduler, such as awe
@@ -1343,7 +1381,7 @@ def check_jobs(self, ctx, params):
            retry_ids - list - list of jobs that are retried based off of this
            job retry_parent - str - job_id of the parent this retry is based
            off of. Not available on a retry_parent itself batch_id - str -
-           the parent of the job, if the job is a child job created via
+           the coordinating job, if the job is a child job created via
            run_job_batch batch_job - bool - whether or not this is a batch
            parent container child_jobs - array - Only parent container should
            have child job ids scheduler_type - str - scheduler, such as awe
@@ -1500,7 +1538,7 @@ def check_workspace_jobs(self, ctx, params):
            retry_ids - list - list of jobs that are retried based off of this
            job retry_parent - str - job_id of the parent this retry is based
            off of. Not available on a retry_parent itself batch_id - str -
-           the parent of the job, if the job is a child job created via
+           the coordinating job, if the job is a child job created via
            run_job_batch batch_job - bool - whether or not this is a batch
            parent container child_jobs - array - Only parent container should
            have child job ids scheduler_type - str - scheduler, such as awe
@@ -1641,6 +1679,44 @@ def cancel_job(self, ctx, params):
         #END cancel_job
         pass

+    def cancel_batch_jobs_by_status(self, ctx, params):
+        """
+        Cancels children of a batch job. This results in the child jobs' status becoming "terminated"
+        with the supplied terminated_code (default: terminated by user, code 0).
+        Valid statuses are ['created', 'estimating', 'queued', 'running']
+        (Requires the user to keep track of the job states of the Status enum in the ee2 models file).
+        If no status_list is provided, an exception is thrown.
+        :param params: instance of type "BatchCancelParams" (batch_job_id:
+           BATCH_ID to cancel status_list: required filter of one or more of
+           [created, estimating, queued, or running] terminated_code:
+           optional terminated code, defaults to terminated by user as_admin:
+           cancel someone else's job in your namespace @optional
+           terminated_code) -> structure: parameter "batch_job_id" of type
+           "job_id" (A job id.), parameter "status_list" of list of type
+           "job_status" (A job's status, as listed in the Status enum of the
+           ee2 models file.), parameter "terminated_code" of Long, parameter
+           "as_admin" of type "boolean" (@range [0,1])
+        :returns: instance of list of type "job_id" (A job id.)
+        """
+        # ctx is the context object
+        # return variables are: job_ids
+        #BEGIN cancel_batch_jobs_by_status
+        mr = SDKMethodRunner(user_clients=self.gen_cfg.get_user_clients(ctx), clients=self.clients,
+                             job_permission_cache=self.job_permission_cache,
+                             admin_permissions_cache=self.admin_permissions_cache)
+
+        job_ids = mr.cancel_batch_job(job_id=params.get('batch_job_id'),
+                                      status_list=params.get('status_list'),
+                                      as_admin=params.get('as_admin'),
+                                      terminated_code=params.get('terminated_code'))
+        #END cancel_batch_jobs_by_status
+
+        # At some point might do deeper type checking...
+        if not isinstance(job_ids, list):
+            raise ValueError('Method cancel_batch_jobs_by_status return value ' +
+                             'job_ids is not type list as required.')
+        # return the results
+        return [job_ids]
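Note the parameter contract above: the wire-level dict uses the spec's `batch_job_id` key, while the internal SDKMethodRunner API takes the batch id as `job_id`. A sketch of that mapping, with ids hypothetical:

```python
# The wire-level params dict (keys defined by BatchCancelParams in the spec)...
params = {
    "batch_job_id": "60a7f2...",     # hypothetical batch id
    "status_list": ["created", "estimating", "queued", "running"],
    "terminated_code": 0,            # optional
    "as_admin": 0,                   # optional
}

# ...is re-keyed for the internal call, where the batch id travels as job_id
# (mr is the SDKMethodRunner constructed in the handler above):
job_ids = mr.cancel_batch_job(
    job_id=params.get("batch_job_id"),
    status_list=params.get("status_list"),
    terminated_code=params.get("terminated_code"),
    as_admin=params.get("as_admin"),
)
```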
+ """ + # ctx is the context object + # return variables are: job_ids + #BEGIN cancel_batch_jobs_by_status + mr = SDKMethodRunner(user_clients=self.gen_cfg.get_user_clients(ctx), clients=self.clients, + job_permission_cache=self.job_permission_cache, + admin_permissions_cache=self.admin_permissions_cache) + + job_ids = mr.cancel_batch_job(job_id=params.get('job_id'), + status_list=params.get('status_list'), + as_admin=params.get('as_admin'), + terminated_code=params.get('terminated_code')) + #END cancel_batch_jobs_by_status + + # At some point might do deeper type checking... + if not isinstance(job_ids, list): + raise ValueError('Method cancel_batch_jobs_by_status return value ' + + 'job_ids is not type list as required.') + # return the results + return [job_ids] + def check_job_canceled(self, ctx, params): """ Check whether a job has been canceled. This method is lightweight compared to check_job. @@ -1801,7 +1877,7 @@ def check_jobs_date_range_for_user(self, ctx, params): retry_ids - list - list of jobs that are retried based off of this job retry_parent - str - job_id of the parent this retry is based off of. Not available on a retry_parent itself batch_id - str - - the parent of the job, if the job is a child job created via + the coordinating job, if the job is a child job created via run_job_batch batch_job - bool - whether or not this is a batch parent container child_jobs - array - Only parent container should have child job ids scheduler_type - str - scheduler, such as awe @@ -2016,7 +2092,7 @@ def check_jobs_date_range_for_all(self, ctx, params): retry_ids - list - list of jobs that are retried based off of this job retry_parent - str - job_id of the parent this retry is based off of. Not available on a retry_parent itself batch_id - str - - the parent of the job, if the job is a child job created via + the coordinating job, if the job is a child job created via run_job_batch batch_job - bool - whether or not this is a batch parent container child_jobs - array - Only parent container should have child job ids scheduler_type - str - scheduler, such as awe diff --git a/lib/execution_engine2/execution_engine2Server.py b/lib/execution_engine2/execution_engine2Server.py index b63fe2210..3ca72d3cc 100644 --- a/lib/execution_engine2/execution_engine2Server.py +++ b/lib/execution_engine2/execution_engine2Server.py @@ -412,6 +412,14 @@ def __init__(self): types=[dict], ) self.method_authentication["execution_engine2.retry_jobs"] = "required" # noqa + self.rpc_service.add( + impl_execution_engine2.retry_batch_jobs_by_status, + name="execution_engine2.retry_batch_jobs_by_status", + types=[dict], + ) + self.method_authentication[ + "execution_engine2.retry_batch_jobs_by_status" + ] = "required" # noqa self.rpc_service.add( impl_execution_engine2.abandon_children, name="execution_engine2.abandon_children", @@ -506,6 +514,14 @@ def __init__(self): types=[dict], ) self.method_authentication["execution_engine2.cancel_job"] = "required" # noqa + self.rpc_service.add( + impl_execution_engine2.cancel_batch_jobs_by_status, + name="execution_engine2.cancel_batch_jobs_by_status", + types=[dict], + ) + self.method_authentication[ + "execution_engine2.cancel_batch_jobs_by_status" + ] = "required" # noqa self.rpc_service.add( impl_execution_engine2.check_job_canceled, name="execution_engine2.check_job_canceled", diff --git a/lib/execution_engine2/sdk/EE2Runjob.py b/lib/execution_engine2/sdk/EE2Runjob.py index ec6d3952c..e4ab0eee7 100644 --- a/lib/execution_engine2/sdk/EE2Runjob.py +++ 
diff --git a/lib/execution_engine2/sdk/EE2Runjob.py b/lib/execution_engine2/sdk/EE2Runjob.py
index ec6d3952c..e4ab0eee7 100644
--- a/lib/execution_engine2/sdk/EE2Runjob.py
+++ b/lib/execution_engine2/sdk/EE2Runjob.py
@@ -26,6 +26,8 @@
     CannotRetryJob,
     RetryFailureException,
     InvalidParameterForBatch,
+    InvalidStatusListException,
+    NotBatchJobException,
 )
 from execution_engine2.sdk.EE2Constants import CONCIERGE_CLIENTGROUP
 from execution_engine2.sdk.job_submission_parameters import (
@@ -744,7 +746,7 @@ def _retry(self, job_id: str, job: Job, batch_job: Job, as_admin: bool = False):
         # to make sure the retried job is correctly submitted? Or save that for a unit test?
         return {"job_id": job_id, "retry_id": retry_job_id}

-    def retry(self, job_id: str, as_admin=False) -> Dict[str, Optional[str]]:
+    def retry(self, job_id: str, as_admin: bool = False) -> Dict[str, Optional[str]]:
         """
         #TODO Add new job requirements/cgroups as an optional param
         :param job_id: The main job to retry
@@ -758,6 +760,62 @@ def retry(self, job_id: str, as_admin=False) -> Dict[str, Optional[str]]:
             job_id=job_id, job=job, batch_job=batch_job, as_admin=as_admin
         )

+    def retry_jobs_in_batch_by_status(
+        self, job_id: str, status_list: list, as_admin: bool = False
+    ):
+        """
+        Retry jobs by status, given a BATCH_ID
+        :param job_id: The batch job id to retry jobs for
+        :param status_list: The statuses of jobs in that batch to retry
+        :param as_admin: Retry jobs for others
+        :return: A list of job ids that are retried from the batch
+        """
+        valid_statuses = [Status.terminated.value, Status.error.value]
+        # Validation
+        if not status_list:
+            raise InvalidStatusListException(
+                f"Provide a list of status codes from {valid_statuses}."
+            )
+
+        invalid_statuses = [
+            status for status in status_list if not self._retryable(status)
+        ]
+        if invalid_statuses:
+            raise InvalidStatusListException(
+                f"Provided status list contains {invalid_statuses}, which are not retryable. "
+                + f"Status not in {valid_statuses}"
+            )
+
+        batch_job = self.sdkmr.get_job_with_permission(
+            job_id, JobPermissions.WRITE, as_admin=as_admin
+        )
+        if not batch_job.batch_job:
+            raise NotBatchJobException(f"{job_id} is not a batch job")
+
+        # Retry and Report
+        # Get child jobs in the requested statuses that do NOT have a
+        # retry_parent, i.e. only original jobs, never the retries themselves
+        potentially_retryable_child_jobs = (
+            self.sdkmr.get_mongo_util().get_jobs_with_status(
+                job_ids=batch_job.child_jobs,
+                status_list=status_list,
+                retried_jobs_allowed=False,
+            )
+        )
+        # Skip jobs that have a retry in progress, or a retry that has
+        # already been successful
+        retryable_child_job_ids = []
+        for child_job in potentially_retryable_child_jobs:
+            if self.sdkmr.get_mongo_util().eligible_for_retry(job=child_job):
+                retryable_child_job_ids.append(str(child_job.id))
+
+        if not retryable_child_job_ids:
+            raise RetryFailureException(
+                f"No retryable jobs found with a status in {status_list}"
+            )
+
+        return self.retry_multiple(job_ids=retryable_child_job_ids, as_admin=as_admin)
+
     def retry_multiple(
         self, job_ids, as_admin=False
     ) -> List[Dict[str, Union[str, Any]]]:
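The two-stage filter in `retry_jobs_in_batch_by_status` (a status match that excludes retries themselves, then a per-job `eligible_for_retry` check) can be read as a standalone rule. A simplified sketch with plain dicts standing in for the mongoengine Job documents; field names mirror the Job model, everything else is illustrative:

```python
RETRYABLE = {"terminated", "error"}

def eligible_for_retry(job: dict, jobs_by_id: dict) -> bool:
    # A job with no retries yet is always eligible.
    if not job.get("retry_ids"):
        return True
    # Ineligible if any previous retry is still in flight or already completed.
    return all(jobs_by_id[rid]["status"] in RETRYABLE for rid in job["retry_ids"])

def retryable_children(batch: dict, jobs_by_id: dict, status_list: list) -> list:
    return [
        cid
        for cid in batch["child_jobs"]
        if jobs_by_id[cid]["status"] in status_list
        and not jobs_by_id[cid].get("retry_parent")   # never retry a retry
        and eligible_for_retry(jobs_by_id[cid], jobs_by_id)
    ]
```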
diff --git a/lib/execution_engine2/sdk/EE2Status.py b/lib/execution_engine2/sdk/EE2Status.py
index 053cfb77d..9dcdf61a8 100644
--- a/lib/execution_engine2/sdk/EE2Status.py
+++ b/lib/execution_engine2/sdk/EE2Status.py
@@ -8,6 +8,8 @@
 from execution_engine2.exceptions import (
     InvalidStatusTransitionException,
     ChildrenNotFoundError,
+    InvalidStatusListException,
+    NotBatchJobException,
 )
 from execution_engine2.sdk.EE2Constants import JobError
 from lib.execution_engine2.authorization.authstrategy import can_read_jobs
@@ -85,25 +87,80 @@ def handle_held_job(self, cluster_id, as_admin):
         # There's probably a better way and a return type, but not really sure what I need yet
         return json.loads(json.dumps(j.to_mongo().to_dict(), default=str))

-    def cancel_job(self, job_id, terminated_code=None, as_admin=False):
+    def cancel_jobs_in_batch_by_status(
+        self, job_id, status_list, terminated_code=None, as_admin=False
+    ):
+        """
+        Terminate jobs by status, given a BATCH_ID
+        :param job_id: A batch job id to terminate child jobs of
+        :param status_list: A list of statuses to terminate the child jobs in
+        :param terminated_code: The terminated code, defaults to TerminatedCode.terminated_by_user.value
+        :param as_admin: Terminate jobs for others
+        :return: A list of job ids that were successfully terminated
+        """
+        # Validation
+        valid_statuses = [
+            Status.created.value,
+            Status.queued.value,
+            Status.estimating.value,
+            Status.running.value,
+        ]
+        if not status_list:
+            raise InvalidStatusListException(
+                f"Provide a list of valid job statuses from {valid_statuses}."
+            )
+
+        found_invalid_statuses = [
+            status for status in status_list if status not in valid_statuses
+        ]
+        if found_invalid_statuses:
+            raise InvalidStatusListException(
+                f"Provided status list contains {found_invalid_statuses}, which are not cancelable. Status not in {valid_statuses}"
+            )
+
+        batch_job = self.sdkmr.get_job_with_permission(
+            job_id, JobPermissions.WRITE, as_admin=as_admin
+        )
+        if not batch_job.batch_job:
+            raise NotBatchJobException(f"{job_id} is not a batch job")
+
+        # Termination
+        terminated_ids = []
+        child_jobs_to_terminate = self.sdkmr.get_mongo_util().get_jobs_with_status(
+            job_ids=batch_job.child_jobs, status_list=status_list, only_job_ids=False
+        )
+        for job in child_jobs_to_terminate:
+            self.cancel_job(job=job, terminated_code=terminated_code, as_admin=as_admin)
+            terminated_ids.append(str(job.id))
+
+        return terminated_ids
+
+    def cancel_job(self, job_id=None, job=None, terminated_code=None, as_admin=False):
         """
         Authorization Required: Ability to Read and Write to the Workspace
-        Default for terminated code is Terminated By User
+        Terminates child jobs as well
+        Exactly one of job_id or job must be provided
+        :param job: Job object to cancel
         :param job_id: Job ID To cancel
-        :param terminated_code:
+        :param terminated_code: Defaults to Terminated By User
         :param as_admin: Cancel the job for a different user
         """
         # Is it inefficient to get the job twice? Is it cached?
+        if (not job_id and not job) or (job_id and job):
+            raise Exception(
+                "Programming Error: Need to provide exactly one job id or a job object"
+            )

-        job = self.sdkmr.get_job_with_permission(
-            job_id, JobPermissions.WRITE, as_admin=as_admin
-        )
+        if job_id:
+            job = self.sdkmr.get_job_with_permission(
+                job_id, JobPermissions.WRITE, as_admin=as_admin
+            )

         if terminated_code is None:
             terminated_code = TerminatedCode.terminated_by_user.value
         self.sdkmr.get_mongo_util().cancel_job(
-            job_id=job_id, terminated_code=terminated_code
+            job_id=job.id, terminated_code=terminated_code
         )
         for child_job_id in job.child_jobs:
             self.cancel_job(
@@ -111,12 +168,6 @@ def cancel_job(self, job_id, terminated_code=None, as_admin=False):
                 job_id=child_job_id,
                 terminated_code=TerminatedCode.terminated_by_batch_abort.value,
             )

-        for child_job_id in job.child_jobs:
-            self.cancel_job(
-                job_id=child_job_id,
-                terminated_code=TerminatedCode.terminated_by_batch_abort.value,
-            )
-
         self.sdkmr.logger.debug(
-            f"About to cancel job in CONDOR using jobid {job_id} {job.scheduler_id}"
+            f"About to cancel job in CONDOR using jobid {job.id} {job.scheduler_id}"
         )
diff --git a/lib/execution_engine2/sdk/SDKMethodRunner.py b/lib/execution_engine2/sdk/SDKMethodRunner.py
index 350599960..5fbfc7e2e 100644
--- a/lib/execution_engine2/sdk/SDKMethodRunner.py
+++ b/lib/execution_engine2/sdk/SDKMethodRunner.py
@@ -335,6 +335,12 @@ def retry(self, job_id, as_admin=False):
         """Authorization Required Read/Write"""
         return self.get_runjob().retry(job_id=job_id, as_admin=as_admin)

+    def retry_batch(self, job_id, status_list, as_admin=False):
+        """Authorization Required Read/Write"""
+        return self.get_runjob().retry_jobs_in_batch_by_status(
+            job_id=job_id, as_admin=as_admin, status_list=status_list
+        )
+
     def run_job(self, params, as_admin=False):
         """Authorization Required Read/Write"""
         return self.get_runjob().run(params=params, as_admin=as_admin)
@@ -394,6 +400,15 @@ def cancel_job(self, job_id, terminated_code=None, as_admin=False):
             job_id=job_id, terminated_code=terminated_code, as_admin=as_admin
         )

+    def cancel_batch_job(self, job_id, status_list, terminated_code=None, as_admin=False):
+        """Authorization Required Read/Write"""
+        return self.get_jobs_status().cancel_jobs_in_batch_by_status(
+            job_id=job_id,
+            terminated_code=terminated_code,
+            status_list=status_list,
+            as_admin=as_admin,
+        )
+
     def handle_held_job(self, cluster_id):
         """Authorization Required Read/Write"""
         if self.check_as_admin(requested_perm=JobPermissions.WRITE):
diff --git a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py
index af441a81d..fdfb65f61 100644
--- a/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py
+++ b/test/tests_for_sdkmr/ee2_SDKMethodRunner_test_EE2Runjob_test.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 import copy
+import itertools
 import logging
 import os
 import unittest
@@ -11,8 +12,10 @@
 from execution_engine2.exceptions import (
     CannotRetryJob,
-    RetryFailureException,
     InvalidParameterForBatch,
+    InvalidStatusListException,
+    NotBatchJobException,
+    RetryFailureException,
 )
 from execution_engine2.sdk.job_submission_parameters import JobRequirements
 from execution_engine2.utils.clients import (
@@ -541,6 +544,159 @@ def test_retry_job_with_params_and_nci_and_src_ws_objs(self, rq_mock, condor_mock):
     # TODO Retry a job without an app_id
     # TODO Check narrative_cell_info

+    def get_batch_jobs(self):
+        job = get_example_job_as_dict(
+            user=self.user_id, wsid=None, source_ws_objects=[]
+        )
+        job2 = get_example_job_as_dict(
+            user=self.user_id, wsid=None, source_ws_objects=[]
+        )
+        job3 = get_example_job_as_dict(
+            user=self.user_id, wsid=None, source_ws_objects=[]
+        )
+        jobs = [job, job2, job3]
+        return jobs
+
+    @requests_mock.Mocker()
+    @patch("lib.execution_engine2.utils.Condor.Condor", autospec=True)
+    def test_retry_batch_and_cancel_batch(self, rq_mock, condor_mock):
+        rq_mock.add_matcher(
+            run_job_adapter(
+                ws_perms_info={"user_id": self.user_id, "ws_perms": {self.ws_id: "a"}}
+            )
+        )
+        runner = self.getRunner()
+        runner.get_condor = MagicMock(return_value=condor_mock)
+        jobs = self.get_batch_jobs()
+        condor_mock.run_job = MagicMock(
+            return_value=SubmissionInfo(clusterid="test", submit=jobs[0], error=None)
+        )
+        job_ids = runner.run_job_batch(
+            params=copy.deepcopy(jobs), batch_params={"wsid": self.ws_id}
+        )
+        batch_job_id = job_ids["batch_id"]
+        child_jobs = job_ids["child_job_ids"]
+
+        # TEST OUT ERROR CONDITIONS
+        for status_list in [
+            None,
+            [],
+        ]:
+            expected_exception = InvalidStatusListException
+            with self.assertRaises(expected_exception) as e:
+                runner.retry_batch(job_id=batch_job_id, status_list=status_list)
+            assert (
+                e.exception.args[0]
+                == "Provide a list of status codes from ['terminated', 'error']."
+            )
+
+        for status_list in [["running"], ["apple", "kertuffl"], [None]]:
+            expected_exception = InvalidStatusListException
+            with self.assertRaises(expected_exception) as e:
+                runner.retry_batch(job_id=batch_job_id, status_list=status_list)
+            assert (
+                e.exception.args[0]
+                == f"Provided status list contains {status_list}, which are not retryable. Status not in ['terminated', 'error']"
+            )
+
+        with self.assertRaises(NotBatchJobException) as e:
+            runner.retry_batch(job_id=child_jobs[0], status_list=["error"])
+        assert e.exception.args[0] == f"{child_jobs[0]} is not a batch job"
+
+        for status_list in [["terminated"], ["terminated", "error"], ["error"]]:
+            with self.assertRaises(RetryFailureException) as e:
+                runner.retry_batch(job_id=batch_job_id, status_list=status_list)
+            assert (
+                e.exception.args[0]
+                == f"No retryable jobs found with a status in {status_list}"
+            )
+
+        for job_id in child_jobs:
+            runner.update_job_status(job_id=job_id, status="error")
+
+        batch_retry_results = runner.retry_batch(
+            job_id=batch_job_id, status_list=["error"]
+        )
+        assert len(job_ids["child_job_ids"]) == len(batch_retry_results)
+
+        for brr in batch_retry_results:
+            job_id = brr["job_id"]
+            retry_id = brr["retry_id"]
+            batch_check = runner.check_job(job_id)
+            cj_check = runner.check_job(retry_id)
+            assert batch_check["retry_ids"] == [retry_id]
+            assert cj_check["retry_parent"] == job_id
+            # For the next test
+            runner.update_job_status(retry_id, "error")
+            runner.update_job_status(job_id, "error")
+
+        # Now let's generate more errors
+        batch_retry_results = runner.retry_batch(
+            job_id=batch_job_id, status_list=["error"]
+        )
+        assert len(job_ids["child_job_ids"]) == len(batch_retry_results)
+
+        # Now set up tests based on a failed retry and successful retry combo
+        runner.update_job_status(batch_retry_results[0]["job_id"], "error")
+        runner.update_job_status(batch_retry_results[0]["retry_id"], "error")
+
+        runner.update_job_status(batch_retry_results[1]["job_id"], "error")
+        runner.update_job_status(batch_retry_results[1]["retry_id"], "created")
+
+        runner.update_job_status(batch_retry_results[2]["job_id"], "error")
+        runner.update_job_status(batch_retry_results[2]["retry_id"], "running")
+
+        batch_retry_results = runner.retry_batch(
+            job_id=batch_job_id, status_list=["error"]
+        )
+        assert 1 == len(batch_retry_results)
+
+        # Cancel Batch job tests
+        statuses = Status.get_status_names()
+        combos_list = []
+        for i in range(1, len(statuses)):
+            for combo in itertools.combinations(statuses, i + 1):
+                combos_list.append(list(combo))
+
+        valid_statuses = [
+            Status.created.value,
+            Status.queued.value,
+            Status.estimating.value,
+            Status.running.value,
+        ]
+
+        for status_list in combos_list:
+            failable = False
+            for item in status_list:
+                if item in [
+                    Status.error.value,
+                    Status.terminated.value,
+                    Status.completed.value,
+                ]:
+                    failable = True
+
+            if status_list and failable:
+                with self.assertRaises(InvalidStatusListException) as e:
+                    runner.cancel_batch_job(
+                        job_id=batch_job_id, status_list=status_list, terminated_code=0
+                    )
+
+                for item in valid_statuses:
+                    try:
+                        del status_list[status_list.index(item)]
+                    except Exception:
+                        pass
+
+                expected = f"Provided status list contains {status_list}, which are not cancelable. Status not in {valid_statuses}"
+                assert e.exception.args[0] == expected
+
+        # Assert cancelled jobs?
+        job_status = runner.check_job_batch(batch_id=batch_job_id)
+        print(job_status)
+
     @requests_mock.Mocker()
     @patch("lib.execution_engine2.utils.Condor.Condor", autospec=True)
     def test_run_job_batch(self, rq_mock, condor_mock):
@@ -554,19 +710,11 @@ def test_run_job_batch(self, rq_mock, condor_mock):
         )
         runner = self.getRunner()
         runner.get_condor = MagicMock(return_value=condor_mock)
-
-        job = get_example_job_as_dict(
-            user=self.user_id, wsid=None, source_ws_objects=[]
-        )
-        job2 = get_example_job_as_dict(
-            user=self.user_id, wsid=None, source_ws_objects=[]
-        )
-        job3 = get_example_job_as_dict(
-            user=self.user_id, wsid=None, source_ws_objects=[]
-        )
-        si = SubmissionInfo(clusterid="test", submit=job, error=None)
-        condor_mock.run_job = MagicMock(return_value=si)
-        jobs = [job, job2, job3]
+        jobs = self.get_batch_jobs()
+        condor_mock.run_job = MagicMock(
+            return_value=SubmissionInfo(clusterid="test", submit=jobs[0], error=None)
+        )

         job_ids = runner.run_job_batch(
             params=copy.deepcopy(jobs), batch_params={"wsid": self.ws_id}
         )
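Taken together, the tests above exercise the full batch lifecycle. A condensed sketch using the same SDKMethodRunner entry points, with the setup and mocking elided:

```python
# runner, jobs, and ws_id are assumed to be set up as in the tests above.
job_ids = runner.run_job_batch(params=jobs, batch_params={"wsid": ws_id})
batch_id = job_ids["batch_id"]

# Children still queued or running can be cancelled in bulk...
cancelled = runner.cancel_batch_job(
    job_id=batch_id, status_list=["queued", "running"], terminated_code=0
)

# ...and once children sit in a terminal state, they can be retried in bulk.
retried = runner.retry_batch(job_id=batch_id, status_list=["terminated", "error"])
for result in retried:
    print(result["job_id"], "->", result["retry_id"])
```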