Skip to content

Commit

Permalink
BW-1354 - Porting CBAS preliminary step (#6837)
Browse files Browse the repository at this point in the history
* Getting rid of shared utility file; Adding/Updating WES version of submit.

* Edit spec file

* Adding Wes-like error
  • Loading branch information
kpierre13 authored Aug 25, 2022
1 parent e802f29 commit 79afa29
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 109 deletions.
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
package cromwell.webservice.routes

import java.util.UUID

import akka.actor.{ActorRef, ActorRefFactory}
import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheDiffActorJsonFormatting.successfulResponseJsonFormatter
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.marshalling.ToResponseMarshallable
import akka.http.scaladsl.model.ContentTypes._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ContentTypes._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{ExceptionHandler, Route}
import akka.pattern.{AskTimeoutException, ask}
import akka.stream.ActorMaterializer
import akka.util.Timeout
import cats.data.NonEmptyList
import cats.data.Validated.{Invalid, Valid}
import com.typesafe.config.ConfigFactory
import common.exception.AggregatedMessageException
import common.util.VersionUtil
import cromwell.core.abort._
Expand All @@ -20,24 +24,25 @@ import cromwell.engine.backend.BackendConfiguration
import cromwell.engine.instrumentation.HttpInstrumentation
import cromwell.engine.workflow.WorkflowManagerActor.WorkflowNotFoundException
import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheDiffActor.{CachedCallNotFoundException, CallCacheDiffActorResponse, FailedCallCacheDiffResponse, SuccessfulCallCacheDiffResponse}
import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheDiffActorJsonFormatting.successfulResponseJsonFormatter
import cromwell.engine.workflow.lifecycle.execution.callcaching.{CallCacheDiffActor, CallCacheDiffQueryParameter}
import cromwell.engine.workflow.workflowstore.SqlWorkflowStore.NotInOnHoldStateException
import cromwell.engine.workflow.workflowstore.{WorkflowStoreActor, WorkflowStoreEngineActor}
import cromwell.engine.workflow.workflowstore.{WorkflowStoreActor, WorkflowStoreEngineActor, WorkflowStoreSubmitActor}
import cromwell.server.CromwellShutdown
import cromwell.services._
import cromwell.services.healthmonitor.ProtoHealthMonitorServiceActor.{GetCurrentStatus, StatusCheckResponse}
import cromwell.services.metadata.MetadataService._
import cromwell.webservice.WebServiceUtils.EnhancedThrowable
import cromwell.webservice.WorkflowJsonSupport._
import cromwell.webservice._
import cromwell.services._
import cromwell.webservice.WorkflowJsonSupport._
import cromwell.webservice.WebServiceUtils
import cromwell.webservice.WebServiceUtils.EnhancedThrowable
import net.ceedubs.ficus.Ficus._

import java.util.UUID
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, TimeoutException}
import scala.io.Source
import scala.util.{Failure, Success, Try}

trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport with WomtoolRouteSupport with WebServiceUtils with WesCromwellRouteSupport {
trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport with WomtoolRouteSupport with WebServiceUtils {
import CromwellApiService._

implicit def actorRefFactory: ActorRefFactory
Expand All @@ -48,6 +53,10 @@ trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport w
val workflowManagerActor: ActorRef
val serviceRegistryActor: ActorRef

// Derive timeouts (implicit and not) from akka http's request timeout since there's no point in being higher than that
implicit val duration = ConfigFactory.load().as[FiniteDuration]("akka.http.server.request-timeout")
implicit val timeout: Timeout = duration

val engineRoutes = concat(
path("engine" / Segment / "stats") { _ =>
get {
Expand Down Expand Up @@ -173,6 +182,58 @@ trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport w
}
}

private def toResponse(workflowId: WorkflowId, workflowState: WorkflowState): WorkflowSubmitResponse = {
WorkflowSubmitResponse(workflowId.toString, workflowState.toString)
}

private def submitRequest(formData: Multipart.FormData, isSingleSubmission: Boolean): Route = {

def getWorkflowState(workflowOnHold: Boolean): WorkflowState = {
if (workflowOnHold)
WorkflowOnHold
else WorkflowSubmitted
}

def askSubmit(command: WorkflowStoreActor.WorkflowStoreActorSubmitCommand, warnings: Seq[String], workflowState: WorkflowState): Route = {
// NOTE: Do not blindly copy the akka-http -to- ask-actor pattern below without knowing the pros and cons.
onComplete(workflowStoreActor.ask(command).mapTo[WorkflowStoreSubmitActor.WorkflowStoreSubmitActorResponse]) {
case Success(w) =>
w match {
case WorkflowStoreSubmitActor.WorkflowSubmittedToStore(workflowId, _) =>
completeResponse(StatusCodes.Created, toResponse(workflowId, workflowState), warnings)
case WorkflowStoreSubmitActor.WorkflowsBatchSubmittedToStore(workflowIds, _) =>
completeResponse(StatusCodes.Created, workflowIds.toList.map(toResponse(_, workflowState)), warnings)
case WorkflowStoreSubmitActor.WorkflowSubmitFailed(throwable) =>
throwable.failRequest(StatusCodes.BadRequest, warnings)
}
case Failure(_: AskTimeoutException) if CromwellShutdown.shutdownInProgress() => serviceShuttingDownResponse
case Failure(e: TimeoutException) => e.failRequest(StatusCodes.ServiceUnavailable)
case Failure(e) => e.failRequest(StatusCodes.InternalServerError, warnings)
}
}

onComplete(materializeFormData(formData)) {
case Success(data) =>
PartialWorkflowSources.fromSubmitRoute(data, allowNoInputs = isSingleSubmission) match {
case Success(workflowSourceFiles) if isSingleSubmission && workflowSourceFiles.size == 1 =>
val warnings = workflowSourceFiles.flatMap(_.warnings)
askSubmit(WorkflowStoreActor.SubmitWorkflow(workflowSourceFiles.head), warnings, getWorkflowState(workflowSourceFiles.head.workflowOnHold))
// Catches the case where someone has gone through the single submission endpoint w/ more than one workflow
case Success(workflowSourceFiles) if isSingleSubmission =>
val warnings = workflowSourceFiles.flatMap(_.warnings)
val e = new IllegalArgumentException("To submit more than one workflow at a time, use the batch endpoint.")
e.failRequest(StatusCodes.BadRequest, warnings)
case Success(workflowSourceFiles) =>
val warnings = workflowSourceFiles.flatMap(_.warnings)
askSubmit(
WorkflowStoreActor.BatchSubmitWorkflows(NonEmptyList.fromListUnsafe(workflowSourceFiles.toList)),
warnings, getWorkflowState(workflowSourceFiles.head.workflowOnHold))
case Failure(t) => t.failRequest(StatusCodes.BadRequest)
}
case Failure(e: TimeoutException) => e.failRequest(StatusCodes.ServiceUnavailable)
case Failure(e) => e.failRequest(StatusCodes.InternalServerError)
}
}
}

object CromwellApiService {
Expand Down Expand Up @@ -261,4 +322,4 @@ object CromwellApiService {
val backendResponse = BackendResponse(BackendConfiguration.AllBackendEntries.map(_.name).sorted, BackendConfiguration.DefaultBackendEntry.name)
val versionResponse = JsObject(Map("cromwell" -> cromwellVersion.toJson))
val serviceShuttingDownResponse = new Exception("Cromwell service is shutting down.").failRequest(StatusCodes.ServiceUnavailable)
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,41 +1,47 @@
package cromwell.webservice.routes.wes

import akka.actor.ActorRef
import akka.http.scaladsl.model.{StatusCode, StatusCodes}
import akka.http.scaladsl.model.{Multipart, StatusCode, StatusCodes}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.directives.RouteDirectives.complete
import akka.http.scaladsl.server.{Directive1, Route}
import akka.pattern.{AskTimeoutException, ask}
import akka.stream.ActorMaterializer
import akka.util.Timeout
import cats.data.NonEmptyList
import com.typesafe.config.ConfigFactory
import cromwell.core.WorkflowId
import cromwell.core.abort.SuccessfulAbortResponse
import cromwell.core.{WorkflowId, WorkflowOnHold, WorkflowState, WorkflowSubmitted}
import cromwell.engine.instrumentation.HttpInstrumentation
import cromwell.engine.workflow.WorkflowManagerActor.WorkflowNotFoundException
import cromwell.engine.workflow.workflowstore.{WorkflowStoreActor, WorkflowStoreSubmitActor}
import cromwell.server.CromwellShutdown
import cromwell.services.metadata.MetadataService.{BuildMetadataJsonAction, GetSingleWorkflowMetadataAction, GetStatus, MetadataServiceResponse, StatusLookupFailed}
import cromwell.services.{FailedMetadataJsonResponse, SuccessfulMetadataJsonResponse}
import cromwell.webservice.WebServiceUtils.EnhancedThrowable
import cromwell.webservice.PartialWorkflowSources
import cromwell.webservice.WebServiceUtils.{EnhancedThrowable, completeResponse, materializeFormData}
import cromwell.webservice.routes.CromwellApiService
import cromwell.webservice.routes.CromwellApiService.{UnrecognizedWorkflowException, validateWorkflowIdInMetadata}
import cromwell.webservice.routes.MetadataRouteSupport.{metadataBuilderActorRequest, metadataQueryRequest}
import cromwell.webservice.routes.wes.WesResponseJsonSupport._
import cromwell.webservice.routes.wes.WesRouteSupport._
import cromwell.webservice.routes.{CromwellApiService, WesCromwellRouteSupport}
import cromwell.webservice.routes.wes.WesRouteSupport.{respondWithWesError, _}
import net.ceedubs.ficus.Ficus._

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future, TimeoutException}
import scala.util.{Failure, Success}



trait WesRouteSupport extends HttpInstrumentation with WesCromwellRouteSupport {
trait WesRouteSupport extends HttpInstrumentation {

val serviceRegistryActor: ActorRef
val workflowManagerActor: ActorRef
val workflowStoreActor: ActorRef

implicit val ec: ExecutionContext
implicit val timeout: Timeout
implicit val materializer: ActorMaterializer

/*
Defines routes intended to sit alongside the primary Cromwell REST endpoints. For instance, we'll now have:
Expand Down Expand Up @@ -68,7 +74,7 @@ trait WesRouteSupport extends HttpInstrumentation with WesCromwellRouteSupport {
} ~
post {
extractSubmission() { submission =>
submitRequest(submission.entity,
wesSubmitRequest(submission.entity,
isSingleSubmission = true)
}
}
Expand Down Expand Up @@ -108,6 +114,62 @@ trait WesRouteSupport extends HttpInstrumentation with WesCromwellRouteSupport {
}
)
}

def toWesResponse(workflowId: WorkflowId, workflowState: WorkflowState): WesRunStatus = {
WesRunStatus(workflowId.toString, WesState.fromCromwellStatus(workflowState))
}

def toWesResponseId(workflowId: WorkflowId): WesRunId ={
WesRunId(workflowId.toString)
}

def wesSubmitRequest(formData: Multipart.FormData, isSingleSubmission: Boolean): Route = {
def getWorkflowState(workflowOnHold: Boolean): WorkflowState = {
if (workflowOnHold)
WorkflowOnHold
else WorkflowSubmitted
}

def sendToWorkflowStore(command: WorkflowStoreActor.WorkflowStoreActorSubmitCommand, warnings: Seq[String], workflowState: WorkflowState): Route = {
// NOTE: Do not blindly copy the akka-http -to- ask-actor pattern below without knowing the pros and cons.
onComplete(workflowStoreActor.ask(command).mapTo[WorkflowStoreSubmitActor.WorkflowStoreSubmitActorResponse]) {
case Success(w) =>
w match {
case WorkflowStoreSubmitActor.WorkflowSubmittedToStore(workflowId, _) =>
completeResponse(StatusCodes.Created, toWesResponseId(workflowId), warnings)
case WorkflowStoreSubmitActor.WorkflowsBatchSubmittedToStore(workflowIds, _) =>
completeResponse(StatusCodes.Created, workflowIds.toList.map(toWesResponse(_, workflowState)), warnings)
case WorkflowStoreSubmitActor.WorkflowSubmitFailed(throwable) =>
respondWithWesError(throwable.getLocalizedMessage, StatusCodes.BadRequest)
}
case Failure(_: AskTimeoutException) if CromwellShutdown.shutdownInProgress() => respondWithWesError("Cromwell service is shutting down", StatusCodes.InternalServerError)
case Failure(e: TimeoutException) => e.failRequest(StatusCodes.ServiceUnavailable)
case Failure(e) => e.failRequest(StatusCodes.InternalServerError, warnings)
}
}

onComplete(materializeFormData(formData)) {
case Success(data) =>
PartialWorkflowSources.fromSubmitRoute(data, allowNoInputs = isSingleSubmission) match {
case Success(workflowSourceFiles) if isSingleSubmission && workflowSourceFiles.size == 1 =>
val warnings = workflowSourceFiles.flatMap(_.warnings)
sendToWorkflowStore(WorkflowStoreActor.SubmitWorkflow(workflowSourceFiles.head), warnings, getWorkflowState(workflowSourceFiles.head.workflowOnHold))
// Catches the case where someone has gone through the single submission endpoint w/ more than one workflow
case Success(workflowSourceFiles) if isSingleSubmission =>
val warnings = workflowSourceFiles.flatMap(_.warnings)
val e = new IllegalArgumentException("To submit more than one workflow at a time, use the batch endpoint.")
e.failRequest(StatusCodes.BadRequest, warnings)
case Success(workflowSourceFiles) =>
val warnings = workflowSourceFiles.flatMap(_.warnings)
sendToWorkflowStore(
WorkflowStoreActor.BatchSubmitWorkflows(NonEmptyList.fromListUnsafe(workflowSourceFiles.toList)),
warnings, getWorkflowState(workflowSourceFiles.head.workflowOnHold))
case Failure(t) => t.failRequest(StatusCodes.BadRequest)
}
case Failure(e: TimeoutException) => e.failRequest(StatusCodes.ServiceUnavailable)
case Failure(e) => respondWithWesError(e.getLocalizedMessage, StatusCodes.InternalServerError)
}
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,7 @@ class WesRouteSupportSpec extends AsyncFlatSpec with ScalatestRouteTest with Mat
check {
assertResult(
s"""{
| "id": "${CromwellApiServiceSpec.ExistingWorkflowId.toString}",
| "status": "Submitted"
| "run_id": "${CromwellApiServiceSpec.ExistingWorkflowId.toString}"
|}""".stripMargin) {
responseAs[String].parseJson.prettyPrint
}
Expand Down

0 comments on commit 79afa29

Please sign in to comment.