Skip to content

Commit

Permalink
Allow workflow ID choice by submitters [BW-1056] (#6659)
Browse files Browse the repository at this point in the history
  • Loading branch information
cjllanwarne authored Feb 3, 2022
1 parent 8d5f786 commit 660229b
Show file tree
Hide file tree
Showing 27 changed files with 440 additions and 79 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ Previously:
| `executionStatus` |`Running`|`Running`| Job state Cromwell is requesting from the backend |
| `backendStatus` |`Running`|`Running`| Job state reported by backend |

### New 'requestedWorkflowId' API Option

Allows users to choose their own workflow IDs at workflow submission time.

If supplied for single workflows, this value must be a JSON string containing a valid, and not already used, UUID. For batch submissions, this value must be a JSON array of valid UUIDs.

If not supplied, the behavior is as today: Cromwell will generate a random workflow ID for every workflow submitted.

### Bug Fixes

Expand Down
10 changes: 10 additions & 0 deletions CromIAM/src/main/resources/swagger/cromiam.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ paths:
required: false
type: file
in: formData
- name: requestedWorkflowId
description: An ID to assign to this workflow. Must be a JSON string in UUID-format. If not supplied a random ID will be generated for the workflow.
required: false
type: string
in: formData
tags:
- Workflows
responses:
Expand Down Expand Up @@ -198,6 +203,11 @@ paths:
required: false
type: file
in: formData
- name: requestedWorkflowId
description: A set of IDs to assign to these workflows. Must be a JSON list of strings in UUID-format. Must have the same number of entries and be in the same order as the workflow inputs list. If not supplied, random ID will be generated for the workflows.
required: false
type: string
in: formData
tags:
- Workflows
responses:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ class CallCachingBlacklistManagerSpec extends AnyFlatSpec with CromwellTimeoutSp
workflowOptions = WorkflowOptions(JsObject.empty),
labelsJson = "",
workflowOnHold = false,
warnings = List.empty
warnings = List.empty,
requestedWorkflowId = None
)

val workflowSourcesYesGrouping = workflowSourcesNoGrouping.copy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ sealed trait WorkflowSourceFilesCollection {
def workflowType: Option[WorkflowType]
def workflowTypeVersion: Option[WorkflowTypeVersion]
def workflowOnHold: Boolean
def requestedWorkflowId: Option[WorkflowId]

def warnings: Seq[String]

Expand Down Expand Up @@ -49,7 +50,8 @@ object WorkflowSourceFilesCollection {
labelsJson: WorkflowJson,
importsFile: Option[Array[Byte]],
workflowOnHold: Boolean,
warnings: Seq[String]): WorkflowSourceFilesCollection = importsFile match {
warnings: Seq[String],
requestedWorkflowId: Option[WorkflowId]): WorkflowSourceFilesCollection = importsFile match {
case Some(imports) =>
WorkflowSourceFilesWithDependenciesZip(
workflowSource = workflowSource,
Expand All @@ -62,7 +64,8 @@ object WorkflowSourceFilesCollection {
labelsJson = labelsJson,
importsZip = imports,
workflowOnHold = workflowOnHold,
warnings = warnings)
warnings = warnings,
requestedWorkflowId = requestedWorkflowId)
case None =>
WorkflowSourceFilesWithoutImports(
workflowSource = workflowSource,
Expand All @@ -74,7 +77,8 @@ object WorkflowSourceFilesCollection {
workflowOptions = workflowOptions,
labelsJson = labelsJson,
workflowOnHold = workflowOnHold,
warnings = warnings)
warnings = warnings,
requestedWorkflowId = requestedWorkflowId)
}
}

Expand All @@ -87,7 +91,8 @@ final case class WorkflowSourceFilesWithoutImports(workflowSource: Option[Workfl
workflowOptions: WorkflowOptions,
labelsJson: WorkflowJson,
workflowOnHold: Boolean = false,
warnings: Seq[String]) extends WorkflowSourceFilesCollection
warnings: Seq[String],
requestedWorkflowId: Option[WorkflowId]) extends WorkflowSourceFilesCollection

final case class WorkflowSourceFilesWithDependenciesZip(workflowSource: Option[WorkflowSource],
workflowUrl: Option[WorkflowUrl],
Expand All @@ -99,7 +104,8 @@ final case class WorkflowSourceFilesWithDependenciesZip(workflowSource: Option[W
labelsJson: WorkflowJson,
importsZip: Array[Byte],
workflowOnHold: Boolean = false,
warnings: Seq[String]) extends WorkflowSourceFilesCollection {
warnings: Seq[String],
requestedWorkflowId: Option[WorkflowId]) extends WorkflowSourceFilesCollection {
override def toString = {
s"WorkflowSourceFilesWithDependenciesZip($workflowSource, $workflowUrl, $workflowType, $workflowTypeVersion," +
s""" $inputsJson, ${workflowOptions.asPrettyJson}, $labelsJson, <<ZIP BINARY CONTENT>>, $warnings)"""
Expand Down
6 changes: 4 additions & 2 deletions core/src/test/scala/cromwell/util/SampleWdl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ trait SampleWdl extends TestFileUtil {
workflowTypeVersion = workflowTypeVersion,
warnings = Vector.empty,
workflowOnHold = workflowOnHold,
importsZip = zip)
importsZip = zip,
requestedWorkflowId = None)
case None =>
WorkflowSourceFilesWithoutImports(
workflowSource = Option(workflowSource(runtime)),
Expand All @@ -56,7 +57,8 @@ trait SampleWdl extends TestFileUtil {
workflowType = workflowType,
workflowTypeVersion = workflowTypeVersion,
warnings = Vector.empty,
workflowOnHold = workflowOnHold)
workflowOnHold = workflowOnHold,
requestedWorkflowId = None)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,8 @@ trait WorkflowStoreSlickDatabase extends WorkflowStoreSqlDatabase {
override def findWorkflows(cromwellId: String)(implicit ec: ExecutionContext): Future[Iterable[String]] = {
runTransaction(dataAccess.findWorkflows(cromwellId).result)
}

override def checkWhetherWorkflowExists(workflowId: String)(implicit ec: ExecutionContext): Future[Boolean] = {
runTransaction(dataAccess.checkExists(workflowId).result.map(_.nonEmpty))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,17 +178,26 @@ trait WorkflowStoreEntryComponent {
}
)

// Find workflows running on a given Cromwell instance with abort requested:
val findWorkflowsWithAbortRequested = Compiled(
(cromwellId: Rep[String]) => for {
workflowStoreEntry <- workflowStoreEntries
if workflowStoreEntry.workflowState === "Aborting" && workflowStoreEntry.cromwellId === cromwellId
} yield workflowStoreEntry.workflowExecutionUuid
)

// Find workflows running on a given Cromwell instance:
val findWorkflows = Compiled(
(cromwellId: Rep[String]) => for {
workflowStoreEntry <- workflowStoreEntries
if workflowStoreEntry.cromwellId === cromwellId
} yield workflowStoreEntry.workflowExecutionUuid
)

val checkExists = Compiled(
(workflowId: Rep[String]) => (for {
workflowStoreEntry <- workflowStoreEntries
if workflowStoreEntry.workflowExecutionUuid === workflowId
} yield 1)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,6 @@ ____ __ ____ ______ .______ __ ___ _______ __ ______

def findWorkflows(cromwellId: String)(implicit ec: ExecutionContext): Future[Iterable[String]]

def checkWhetherWorkflowExists(cromwellId: String)(implicit ec: ExecutionContext): Future[Boolean]

}
4 changes: 3 additions & 1 deletion docs/api/RESTAPI.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<!--
This file was generated by `sbt generateRestApiDocs` on Thu, 25 Mar 2021 19:28:57 -0400
This file was generated by `sbt generateRestApiDocs` on Mon, 31 Jan 2022 15:23:32 -0500
!!! DO NOT CHANGE THIS FILE DIRECTLY !!!
Expand Down Expand Up @@ -85,6 +85,7 @@ Submits a workflow to Cromwell. Note that this endpoint can accept an unlimited
|---|---|---|---|---|
|**Path**|**version** <br>*required*|Cromwell API Version|string|`"v1"`|
|**FormData**|**labels** <br>*optional*|JSON object of labels to apply to this workflow.|file||
|**FormData**|**requestedWorkflowId** <br>*optional*|An ID to ascribe to this workflow. Must be a JSON string in UUID-format. If not supplied a random ID will be generated for the workflow.|string||
|**FormData**|**workflowDependencies** <br>*optional*|ZIP file containing workflow source files that are used to resolve local imports. This zip bundle will be unpacked in a sandbox accessible to this workflow.|file||
|**FormData**|**workflowInputs** <br>*optional*|JSON or YAML file containing the inputs as an object. For WDL workflows a skeleton file can be generated from WOMtool using the "inputs" subcommand. When multiple files are specified, in case of key conflicts between multiple input JSON files, higher values of x in workflowInputs_x override lower values. For example, an input specified in workflowInputs_3 will override an input with the same name in workflowInputs or workflowInputs_2. Similarly, an input key specified in workflowInputs_5 will override an identical input key in any other input file.|file||
|**FormData**|**workflowInputs_2** <br>*optional*|A second JSON or YAML file containing inputs.|file||
Expand Down Expand Up @@ -166,6 +167,7 @@ In instances where you want to run the same workflow multiple times with varying
|---|---|---|---|---|
|**Path**|**version** <br>*required*|Cromwell API Version|string|`"v1"`|
|**FormData**|**labels** <br>*optional*|JSON object of labels to apply to this workflow.|file||
|**FormData**|**requestedWorkflowId** <br>*optional*|A set of IDs to ascribe to these workflows. Must be a JSON list of strings in UUID-format. Must have the same number of entries and be in the same order as the workflow inputs list. If not supplied, random ID will be generated for the workflows.|string||
|**FormData**|**workflowDependencies** <br>*optional*|ZIP file containing workflow source files that are used to resolve local imports. This zip bundle will be unpacked in a sandbox accessible to these workflows.|file||
|**FormData**|**workflowInputs** <br>*required*|JSON file containing the inputs as an array of objects. Every element of the array will correspond to a single workflow. For WDL workflows a skeleton file can be generated from WOMtool using the "inputs" subcommand. When multiple files are specified, in case of key conflicts between multiple input JSON files, higher values of x in workflowInputs_x override lower values. For example, an input specified in workflowInputs_3 will override an input with the same name in workflowInputs or workflowInputs_2. Similarly, an input key specified in workflowInputs_5 will override an identical input key in any other input file.|file||
|**FormData**|**workflowOnHold** <br>*optional*|Put workflow on hold upon submission. By default, it is taken as false.|boolean||
Expand Down
10 changes: 10 additions & 0 deletions engine/src/main/resources/swagger/cromwell.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ paths:
required: false
type: file
in: formData
- name: requestedWorkflowId
description: An ID to ascribe to this workflow. Must be a JSON string in UUID-format. If not supplied a random ID will be generated for the workflow.
required: false
type: string
in: formData
tags:
- Workflows
responses:
Expand Down Expand Up @@ -144,6 +149,11 @@ paths:
required: false
type: file
in: formData
- name: requestedWorkflowId
description: A set of IDs to ascribe to these workflows. Must be a JSON list of strings in UUID-format. Must have the same number of entries and be in the same order as the workflow inputs list. If not supplied, random ID will be generated for the workflows.
required: false
type: string
in: formData
tags:
- Workflows
responses:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import common.validation.ErrorOr.ErrorOr
import common.validation.Validation._
import cromwell.core.{HogGroup, WorkflowId, WorkflowOptions, WorkflowSourceFilesCollection}
import cromwell.database.sql.SqlConverters._
import cromwell.database.sql.WorkflowStoreSqlDatabase
import cromwell.database.sql.{MetadataSqlDatabase, WorkflowStoreSqlDatabase}
import cromwell.database.sql.tables.WorkflowStoreEntry
import cromwell.engine.workflow.workflowstore.SqlWorkflowStore.WorkflowStoreAbortResponse.WorkflowStoreAbortResponse
import cromwell.engine.workflow.workflowstore.SqlWorkflowStore.WorkflowStoreState.WorkflowStoreState
import cromwell.engine.workflow.workflowstore.SqlWorkflowStore.{NotInOnHoldStateException, WorkflowStoreAbortResponse, WorkflowStoreState, WorkflowSubmissionResponse}
import cromwell.engine.workflow.workflowstore.SqlWorkflowStore.{DuplicateWorkflowIdsRequested, NotInOnHoldStateException, WorkflowIdsAlreadyInUseException, WorkflowStoreAbortResponse, WorkflowStoreState, WorkflowSubmissionResponse}
import eu.timepit.refined.api.Refined
import eu.timepit.refined.collection._

Expand All @@ -23,6 +23,12 @@ import scala.concurrent.{ExecutionContext, Future}
object SqlWorkflowStore {
case class WorkflowSubmissionResponse(state: WorkflowStoreState, id: WorkflowId)

case class DuplicateWorkflowIdsRequested(workflowIds: Seq[WorkflowId]) extends
Exception (s"Requested workflow IDs are duplicated: ${workflowIds.mkString(", ")}")

case class WorkflowIdsAlreadyInUseException(workflowIds: Seq[WorkflowId]) extends
Exception (s"Requested workflow IDs are already in use: ${workflowIds.mkString(", ")}")

case class NotInOnHoldStateException(workflowId: WorkflowId) extends
Exception(
s"Couldn't change status of workflow $workflowId to " +
Expand All @@ -45,7 +51,7 @@ object SqlWorkflowStore {
}
}

case class SqlWorkflowStore(sqlDatabase: WorkflowStoreSqlDatabase) extends WorkflowStore {
case class SqlWorkflowStore(sqlDatabase: WorkflowStoreSqlDatabase, metadataSqlDatabase: MetadataSqlDatabase) extends WorkflowStore {
/** This is currently hardcoded to success but used to do stuff, left in place for now as a useful
* startup initialization hook. */
override def initialize(implicit ec: ExecutionContext): Future[Unit] = Future.successful(())
Expand Down Expand Up @@ -114,22 +120,51 @@ case class SqlWorkflowStore(sqlDatabase: WorkflowStoreSqlDatabase) extends Workf
sqlDatabase.writeWorkflowHeartbeats(sortedWorkflowIds, heartbeatDateTime.toSystemTimestamp)
}

def workflowAlreadyExists(workflowId: WorkflowId)(implicit ec: ExecutionContext): Future[Boolean] = {
Future.sequence(Seq(
sqlDatabase.checkWhetherWorkflowExists(workflowId.id.toString),
metadataSqlDatabase.getWorkflowStatus(workflowId.id.toString).map(_.nonEmpty)
)).map(_.exists(_ == true))
}

def findPreexistingWorkflowIds(workflowIds: Seq[WorkflowId])(implicit ec: ExecutionContext): Future[Seq[WorkflowId]] = {
Future.sequence(workflowIds.map(wfid => {
workflowAlreadyExists(wfid).map {
case true => Option(wfid)
case false => None
}
})).map { _.collect { case Some(existingId) => existingId } }
}

/**
* Adds the requested WorkflowSourceFiles to the store and returns a WorkflowId for each one (in order)
* for tracking purposes.
*/
override def add(sources: NonEmptyList[WorkflowSourceFilesCollection])(implicit ec: ExecutionContext): Future[NonEmptyList[WorkflowSubmissionResponse]] = {

val asStoreEntries = sources map toWorkflowStoreEntry
val returnValue = asStoreEntries map { workflowStore =>
WorkflowSubmissionResponse(
WorkflowStoreState.withName(workflowStore.workflowState),
WorkflowId.fromString(workflowStore.workflowExecutionUuid)
)
val requestedWorkflowIds = sources.map(_.requestedWorkflowId).collect { case Some(id) => id }
val duplicatedIds = requestedWorkflowIds.diff(requestedWorkflowIds.toSet.toSeq)

if(duplicatedIds.nonEmpty) {
Future.failed(DuplicateWorkflowIdsRequested(duplicatedIds))
} else {
findPreexistingWorkflowIds(requestedWorkflowIds) flatMap { preexistingIds =>
if (preexistingIds.nonEmpty) {
Future.failed(WorkflowIdsAlreadyInUseException(preexistingIds))
} else {
val asStoreEntries = sources map toWorkflowStoreEntry
val returnValue = asStoreEntries map { workflowStore =>
WorkflowSubmissionResponse(
WorkflowStoreState.withName(workflowStore.workflowState),
WorkflowId.fromString(workflowStore.workflowExecutionUuid)
)
}

// The results from the Future aren't useful, so on completion map it into the precalculated return value instead. Magic!
sqlDatabase.addWorkflowStoreEntries(asStoreEntries.toList) map { _ => returnValue }
}
}
}

// The results from the Future aren't useful, so on completion map it into the precalculated return value instead. Magic!
sqlDatabase.addWorkflowStoreEntries(asStoreEntries.toList) map { _ => returnValue }
}

override def switchOnHoldToSubmitted(id: WorkflowId)(implicit ec: ExecutionContext): Future[Unit] = {
Expand All @@ -154,6 +189,8 @@ case class SqlWorkflowStore(sqlDatabase: WorkflowStoreSqlDatabase) extends Workf

(startableStateValidation, workflowOptionsValidation) mapN { (startableState, workflowOptions) =>

val id = WorkflowId.fromString(workflowStoreEntry.workflowExecutionUuid)

val sources = WorkflowSourceFilesCollection(
workflowSource = workflowStoreEntry.workflowDefinition.toRawStringOption,
workflowUrl = workflowStoreEntry.workflowUrl,
Expand All @@ -165,10 +202,10 @@ case class SqlWorkflowStore(sqlDatabase: WorkflowStoreSqlDatabase) extends Workf
labelsJson = workflowStoreEntry.customLabels.toRawString,
importsFile = workflowStoreEntry.importsZip.toBytesOption,
warnings = Vector.empty,
workflowOnHold = false
workflowOnHold = false,
requestedWorkflowId = Option(id)
)

val id = WorkflowId.fromString(workflowStoreEntry.workflowExecutionUuid)
val hogGroup: HogGroup = workflowStoreEntry.hogGroup.map(HogGroup(_)).getOrElse(HogGroup.decide(workflowOptions, id))

WorkflowToStart(
Expand All @@ -194,7 +231,7 @@ case class SqlWorkflowStore(sqlDatabase: WorkflowStoreSqlDatabase) extends Workf

val actualWorkflowState = workflowSubmissionState(workflowSourceFiles)

val workflowId = WorkflowId.randomId()
val workflowId = workflowSourceFiles.requestedWorkflowId.getOrElse(WorkflowId.randomId())
val hogGroup = HogGroup.decide(workflowSourceFiles.workflowOptions, workflowId)

WorkflowStoreEntry(
Expand Down
4 changes: 2 additions & 2 deletions engine/src/main/scala/cromwell/server/CromwellRootActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import cromwell.engine.workflow.workflowstore.AbortRequestScanningActor.AbortCon
import cromwell.engine.workflow.workflowstore._
import cromwell.jobstore.{JobStore, JobStoreActor, SqlJobStore}
import cromwell.services.ServiceRegistryActor.IoActorRef
import cromwell.services.{EngineServicesStore, ServiceRegistryActor}
import cromwell.services.{EngineServicesStore, MetadataServicesStore, ServiceRegistryActor}
import cromwell.subworkflowstore.{SqlSubWorkflowStore, SubWorkflowStore, SubWorkflowStoreActor}
import cromwell.util.GracefulShutdownHelper
import cromwell.util.GracefulShutdownHelper.ShutdownCommand
Expand Down Expand Up @@ -72,7 +72,7 @@ abstract class CromwellRootActor(terminator: CromwellTerminator,
lazy val serviceRegistryActor: ActorRef = context.actorOf(ServiceRegistryActor.props(config), "ServiceRegistryActor")
lazy val numberOfWorkflowLogCopyWorkers = systemConfig.as[Option[Int]]("number-of-workflow-log-copy-workers").getOrElse(DefaultNumberOfWorkflowLogCopyWorkers)

lazy val workflowStore: WorkflowStore = SqlWorkflowStore(EngineServicesStore.engineDatabaseInterface)
lazy val workflowStore: WorkflowStore = SqlWorkflowStore(EngineServicesStore.engineDatabaseInterface, MetadataServicesStore.metadataDatabaseInterface)

val workflowStoreAccess: WorkflowStoreAccess = {
val coordinatedWorkflowStoreAccess = config.as[Option[Boolean]]("system.coordinated-workflow-store-access")
Expand Down
Loading

0 comments on commit 660229b

Please sign in to comment.