Aborts jobs on shutdown.
Added a configuration option, abortJobsOnTerminate. Added support for server mode. SingleWorkflowRunnerActor now uses its internal state to determine whether the workflow is done (rather than a separately maintained flag).

dgtester authored and scottfrazer committed Jan 29, 2016
1 parent c4e4b45 commit 8a48eee
Showing 3 changed files with 50 additions and 2 deletions.
16 changes: 16 additions & 0 deletions src/main/scala/cromwell/engine/CromwellActor.scala
@@ -1,10 +1,26 @@
package cromwell.engine

import akka.util.Timeout
import com.typesafe.config.{ConfigException, ConfigFactory}

import scala.concurrent.duration._
import scala.language.postfixOps

trait CromwellActor {
  protected implicit val timeout = Timeout(5 seconds)

  /**
   * Retrieves the configuration option that determines whether this actor should abort all running jobs
   * when the JVM shuts down.
   * @return The value of the configuration option, or `false` if the option isn't specified.
   */
  def getAbortJobsOnTerminate: Boolean = {
    val config = ConfigFactory.load.getConfig("backend")
    try {
      config.getBoolean("abortJobsOnTerminate")
    } catch {
      case _: ConfigException => false
    }
  }
}
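
For context, getAbortJobsOnTerminate reads the new flag from the backend block of the Typesafe config, so it can be enabled in Cromwell's configuration file. A minimal sketch, assuming a standard application.conf layout (only the key path backend.abortJobsOnTerminate comes from the code above; the file name and any other settings are illustrative):

# application.conf (sketch)
backend {
  # When true, a JVM shutdown hook aborts running workflows before the process exits;
  # when the key is absent, getAbortJobsOnTerminate falls back to false.
  abortJobsOnTerminate = true
}
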
@@ -79,6 +79,18 @@ case class SingleWorkflowRunnerActor(source: WorkflowSourceFiles,
    case Event(id: WorkflowId, data) =>
      log.info(s"$tag: workflow ID UUID($id)")
      workflowManager ! SubscribeToWorkflow(id)

      if (getAbortJobsOnTerminate) {
        Runtime.getRuntime.addShutdownHook(new Thread() {
          override def run(): Unit = {
            workflowManager ! WorkflowAbort(id)
            log.info(s"$tag: Waiting for workflow $id to abort...")
            // Block shutdown on the actor's own FSM state (Done) instead of a separately maintained flag.
            while (stateName != Done)
              Thread.sleep(1000)
            log.info(s"$tag: Workflow $id aborted.")
          }
        })
      }
      stay using data.copy(id = Option(id))
    case Event(Transition(_, _, WorkflowSucceeded), data) =>
      workflowManager ! WorkflowOutputs(data.id.get)
24 changes: 22 additions & 2 deletions src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala
@@ -14,14 +14,15 @@ import cromwell.engine.db.DataAccess._
import cromwell.engine.db.ExecutionDatabaseKey
import cromwell.engine.db.slick._
import cromwell.engine.workflow.WorkflowActor.{Restart, Start}
import cromwell.server.CromwellServer
import cromwell.util.WriteOnceStore
import cromwell.webservice._
import org.joda.time.DateTime
import spray.json._
import wdl4s._

import wdl4s.values.WdlFile
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.io.Source
import scala.language.postfixOps
@@ -68,6 +69,25 @@ class WorkflowManagerActor(backend: Backend) extends Actor with CromwellActor {

  private val workflowStore = new WriteOnceStore[WorkflowId, WorkflowActorRef]

  if (getAbortJobsOnTerminate) {
    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run(): Unit = {
        log.info(s"$tag: Received shutdown signal. Aborting all running workflows...")
        workflowStore.toMap.foreach { case (id, actor) =>
          CromwellServer.workflowManagerActor ! WorkflowManagerActor.WorkflowAbort(id)
        }
        // Poll the database until no workflows remain in the Running or Aborting state.
        var numRemaining = -1
        while (numRemaining != 0) {
          Thread.sleep(1000)
          val result = globalDataAccess.getWorkflowsByState(Seq(WorkflowRunning, WorkflowAborting))
          numRemaining = Await.result(result, Duration.Inf).size
          log.info(s"$tag: Waiting for all workflows to abort ($numRemaining remaining).")
        }
        log.info(s"$tag: All workflows aborted.")
      }
    })
  }

  override def preStart() {
    restartIncompleteWorkflows()
  }
