Skip to content

Commit 51042db

Browse files
authored
Do not delete K8s jobs when ttlSecondsAfterFinished is set (#6597)
Signed-off-by: Ben Sherman <[email protected]>
1 parent 6f4cf1a commit 51042db

File tree

2 files changed

+62
-31
lines changed

2 files changed

+62
-31
lines changed

plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -416,8 +416,10 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
416416

417417
@Override
418418
boolean checkIfCompleted() {
419-
if( !podName ) throw new IllegalStateException("Missing K8s ${resourceType.lower()} name - cannot check if complete")
420-
def state = getState()
419+
if( !podName )
420+
throw new IllegalStateException("Missing K8s ${resourceType.lower()} name - cannot check if complete")
421+
422+
final state = getState()
421423
if( state && state.terminated ) {
422424
if( state.nodeTermination instanceof NodeTerminationException ||
423425
state.nodeTermination instanceof PodUnschedulableException ) {
@@ -441,8 +443,8 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
441443
task.stderr = errorFile
442444
}
443445
status = TaskStatus.COMPLETED
444-
savePodLogOnError(task)
445-
deletePodIfSuccessful(task)
446+
saveJobLogOnError(task)
447+
deleteJobIfSuccessful(task)
446448
updateTimestamps(state.terminated as Map)
447449
determineNode()
448450
return true
@@ -451,7 +453,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
451453
return false
452454
}
453455

454-
protected void savePodLogOnError(TaskRun task) {
456+
protected void saveJobLogOnError(TaskRun task) {
455457
if( task.isSuccess() )
456458
return
457459

@@ -491,53 +493,55 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
491493
*/
492494
@Override
493495
protected void killTask() {
496+
if( !podName )
497+
return
498+
494499
if( cleanupDisabled() )
495500
return
496501

497-
if( podName ) {
498-
log.trace "[K8s] deleting ${resourceType.lower()} name=$podName"
499-
if ( useJobResource() )
500-
client.jobDelete(podName)
501-
else
502-
client.podDelete(podName)
503-
}
504-
else {
505-
log.debug "[K8s] Invalid delete action"
506-
}
502+
log.trace "[K8s] deleting ${resourceType.lower()} name=$podName"
503+
delete0(podName)
507504
}
508505

509506
protected boolean cleanupDisabled() {
510507
!k8sConfig.getCleanup()
511508
}
512509

513-
protected void deletePodIfSuccessful(TaskRun task) {
510+
protected void deleteJobIfSuccessful(TaskRun task) {
514511
if( !podName )
515512
return
516513

517514
if( cleanupDisabled() )
518515
return
519516

520-
if( !task.isSuccess() ) {
521-
// do not delete successfully executed pods for debugging purpose
517+
// preserve failed pods for debugging purposes
518+
if( !task.isSuccess() )
522519
return
523-
}
524520

521+
// k8s cluster will cleanup job on its own if TTL is set
522+
if( useJobResource() && getPodOptions().getTtlSecondsAfterFinished() != null )
523+
return
524+
525+
delete0(podName)
526+
}
527+
528+
private void delete0(String podName) {
525529
try {
526530
if ( useJobResource() )
527531
client.jobDelete(podName)
528532
else
529533
client.podDelete(podName)
530534
}
531535
catch( Exception e ) {
532-
log.warn "Unable to cleanup ${resourceType.lower()}: $podName -- see the log file for details", e
536+
log.warn "Unable to delete ${resourceType.lower()}: $podName -- see the log file for details", e
533537
}
534538
}
535539

536-
private void determineNode(){
540+
private void determineNode() {
537541
try {
538542
if ( k8sConfig.fetchNodeName() && !runsOnNode )
539543
runsOnNode = client.getNodeOfPod( podName )
540-
} catch ( Exception e ){
544+
} catch ( Exception e ) {
541545
log.warn ("Unable to get the node name of pod $podName -- see the log file for details", e)
542546
}
543547
}

plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskHandlerTest.groovy

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -497,8 +497,8 @@ class K8sTaskHandlerTest extends Specification {
497497
1 * handler.getState() >> fullState
498498
1 * handler.updateTimestamps(termState)
499499
1 * handler.readExitFile() >> EXIT_STATUS
500-
1 * handler.deletePodIfSuccessful(task) >> null
501-
1 * handler.savePodLogOnError(task) >> null
500+
1 * handler.deleteJobIfSuccessful(task) >> null
501+
1 * handler.saveJobLogOnError(task) >> null
502502
handler.task.exitStatus == EXIT_STATUS
503503
handler.task.@stdout == OUT_FILE
504504
handler.task.@stderr == ERR_FILE
@@ -528,8 +528,8 @@ class K8sTaskHandlerTest extends Specification {
528528
1 * handler.getState() >> [terminated: termState]
529529
1 * handler.updateTimestamps(termState)
530530
0 * handler.readExitFile()
531-
1 * handler.deletePodIfSuccessful(task) >> null
532-
1 * handler.savePodLogOnError(task) >> null
531+
1 * handler.deleteJobIfSuccessful(task) >> null
532+
1 * handler.saveJobLogOnError(task) >> null
533533
handler.task.exitStatus == 137
534534
handler.status == TaskStatus.COMPLETED
535535
result == true
@@ -764,30 +764,57 @@ class K8sTaskHandlerTest extends Specification {
764764
def executor = Mock(K8sExecutor)
765765
def client = Mock(K8sClient)
766766
def handler = Spy(new K8sTaskHandler(podName: POD_NAME, executor:executor, client:client))
767+
handler.useJobResource() >> false
767768
and:
768769
def TASK_OK = Mock(TaskRun); TASK_OK.isSuccess() >> true
769770
def TASK_FAIL = Mock(TaskRun); TASK_FAIL.isSuccess() >> false
770771

771772
when:
772-
handler.deletePodIfSuccessful(TASK_OK)
773+
handler.deleteJobIfSuccessful(TASK_OK)
773774
then:
774775
1 * executor.getK8sConfig() >> new K8sConfig()
775776
1 * client.podDelete(POD_NAME) >> null
776777

777778
when:
778-
handler.deletePodIfSuccessful(TASK_OK)
779+
handler.deleteJobIfSuccessful(TASK_OK)
779780
then:
780781
1 * executor.getK8sConfig() >> new K8sConfig(cleanup: true)
781782
1 * client.podDelete(POD_NAME) >> null
782783

783784
when:
784-
handler.deletePodIfSuccessful(TASK_FAIL)
785+
handler.deleteJobIfSuccessful(TASK_FAIL)
785786
then:
786787
1 * executor.getK8sConfig() >> new K8sConfig(cleanup: false)
787788
0 * client.podDelete(POD_NAME) >> null
788789

789790
}
790791

792+
def 'should not delete job if ttlSecondsAfterFinished is set' () {
793+
794+
given:
795+
def POD_NAME = 'the-job-name'
796+
def executor = Mock(K8sExecutor)
797+
def client = Mock(K8sClient)
798+
def handler = Spy(new K8sTaskHandler(podName: POD_NAME, executor:executor, client:client))
799+
handler.useJobResource() >> true
800+
and:
801+
def TASK_OK = Mock(TaskRun); TASK_OK.isSuccess() >> true
802+
803+
when: 'job with ttlSecondsAfterFinished should not be deleted'
804+
handler.deleteJobIfSuccessful(TASK_OK)
805+
then:
806+
1 * executor.getK8sConfig() >> new K8sConfig()
807+
1 * handler.getPodOptions() >> new PodOptions([[ttlSecondsAfterFinished: 100]])
808+
0 * client.jobDelete(POD_NAME)
809+
810+
when: 'job without ttlSecondsAfterFinished should be deleted'
811+
handler.deleteJobIfSuccessful(TASK_OK)
812+
then:
813+
1 * executor.getK8sConfig() >> new K8sConfig()
814+
1 * handler.getPodOptions() >> new PodOptions()
815+
1 * client.jobDelete(POD_NAME) >> null
816+
}
817+
791818
def 'should save pod log' () {
792819

793820
given:
@@ -803,13 +830,13 @@ class K8sTaskHandlerTest extends Specification {
803830
def handler = Spy(new K8sTaskHandler(executor: executor, client: client, podName: POD_NAME))
804831

805832
when:
806-
handler.savePodLogOnError(task)
833+
handler.saveJobLogOnError(task)
807834
then:
808835
task.isSuccess() >> true
809836
0 * client.podLog(_)
810837

811838
when:
812-
handler.savePodLogOnError(task)
839+
handler.saveJobLogOnError(task)
813840
then:
814841
task.isSuccess() >> false
815842
task.getWorkDir() >> folder

0 commit comments

Comments
 (0)