diff --git a/engine/src/main/scala/cromwell/webservice/routes/CromwellApiService.scala b/engine/src/main/scala/cromwell/webservice/routes/CromwellApiService.scala index 637509cc4fd..a1c4f023135 100644 --- a/engine/src/main/scala/cromwell/webservice/routes/CromwellApiService.scala +++ b/engine/src/main/scala/cromwell/webservice/routes/CromwellApiService.scala @@ -1,10 +1,13 @@ package cromwell.webservice.routes +import java.util.UUID + import akka.actor.{ActorRef, ActorRefFactory} +import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheDiffActorJsonFormatting.successfulResponseJsonFormatter import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import akka.http.scaladsl.marshalling.ToResponseMarshallable -import akka.http.scaladsl.model.ContentTypes._ import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.ContentTypes._ import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.{ExceptionHandler, Route} import akka.pattern.{AskTimeoutException, ask} @@ -12,6 +15,7 @@ import akka.stream.ActorMaterializer import akka.util.Timeout import cats.data.NonEmptyList import cats.data.Validated.{Invalid, Valid} +import com.typesafe.config.ConfigFactory import common.exception.AggregatedMessageException import common.util.VersionUtil import cromwell.core.abort._ @@ -20,24 +24,25 @@ import cromwell.engine.backend.BackendConfiguration import cromwell.engine.instrumentation.HttpInstrumentation import cromwell.engine.workflow.WorkflowManagerActor.WorkflowNotFoundException import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheDiffActor.{CachedCallNotFoundException, CallCacheDiffActorResponse, FailedCallCacheDiffResponse, SuccessfulCallCacheDiffResponse} -import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheDiffActorJsonFormatting.successfulResponseJsonFormatter import cromwell.engine.workflow.lifecycle.execution.callcaching.{CallCacheDiffActor, CallCacheDiffQueryParameter} import cromwell.engine.workflow.workflowstore.SqlWorkflowStore.NotInOnHoldStateException -import cromwell.engine.workflow.workflowstore.{WorkflowStoreActor, WorkflowStoreEngineActor} +import cromwell.engine.workflow.workflowstore.{WorkflowStoreActor, WorkflowStoreEngineActor, WorkflowStoreSubmitActor} import cromwell.server.CromwellShutdown -import cromwell.services._ import cromwell.services.healthmonitor.ProtoHealthMonitorServiceActor.{GetCurrentStatus, StatusCheckResponse} import cromwell.services.metadata.MetadataService._ -import cromwell.webservice.WebServiceUtils.EnhancedThrowable -import cromwell.webservice.WorkflowJsonSupport._ import cromwell.webservice._ +import cromwell.services._ +import cromwell.webservice.WorkflowJsonSupport._ +import cromwell.webservice.WebServiceUtils +import cromwell.webservice.WebServiceUtils.EnhancedThrowable +import net.ceedubs.ficus.Ficus._ -import java.util.UUID +import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future, TimeoutException} import scala.io.Source import scala.util.{Failure, Success, Try} -trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport with WomtoolRouteSupport with WebServiceUtils with WesCromwellRouteSupport { +trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport with WomtoolRouteSupport with WebServiceUtils { import CromwellApiService._ implicit def actorRefFactory: ActorRefFactory @@ -48,6 +53,10 @@ trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport w val workflowManagerActor: ActorRef val serviceRegistryActor: ActorRef + // Derive timeouts (implicit and not) from akka http's request timeout since there's no point in being higher than that + implicit val duration = ConfigFactory.load().as[FiniteDuration]("akka.http.server.request-timeout") + implicit val timeout: Timeout = duration + val engineRoutes = concat( path("engine" / Segment / "stats") { _ => get { @@ -173,6 +182,58 @@ trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport w } } + private def toResponse(workflowId: WorkflowId, workflowState: WorkflowState): WorkflowSubmitResponse = { + WorkflowSubmitResponse(workflowId.toString, workflowState.toString) + } + + private def submitRequest(formData: Multipart.FormData, isSingleSubmission: Boolean): Route = { + + def getWorkflowState(workflowOnHold: Boolean): WorkflowState = { + if (workflowOnHold) + WorkflowOnHold + else WorkflowSubmitted + } + + def askSubmit(command: WorkflowStoreActor.WorkflowStoreActorSubmitCommand, warnings: Seq[String], workflowState: WorkflowState): Route = { + // NOTE: Do not blindly copy the akka-http -to- ask-actor pattern below without knowing the pros and cons. + onComplete(workflowStoreActor.ask(command).mapTo[WorkflowStoreSubmitActor.WorkflowStoreSubmitActorResponse]) { + case Success(w) => + w match { + case WorkflowStoreSubmitActor.WorkflowSubmittedToStore(workflowId, _) => + completeResponse(StatusCodes.Created, toResponse(workflowId, workflowState), warnings) + case WorkflowStoreSubmitActor.WorkflowsBatchSubmittedToStore(workflowIds, _) => + completeResponse(StatusCodes.Created, workflowIds.toList.map(toResponse(_, workflowState)), warnings) + case WorkflowStoreSubmitActor.WorkflowSubmitFailed(throwable) => + throwable.failRequest(StatusCodes.BadRequest, warnings) + } + case Failure(_: AskTimeoutException) if CromwellShutdown.shutdownInProgress() => serviceShuttingDownResponse + case Failure(e: TimeoutException) => e.failRequest(StatusCodes.ServiceUnavailable) + case Failure(e) => e.failRequest(StatusCodes.InternalServerError, warnings) + } + } + + onComplete(materializeFormData(formData)) { + case Success(data) => + PartialWorkflowSources.fromSubmitRoute(data, allowNoInputs = isSingleSubmission) match { + case Success(workflowSourceFiles) if isSingleSubmission && workflowSourceFiles.size == 1 => + val warnings = workflowSourceFiles.flatMap(_.warnings) + askSubmit(WorkflowStoreActor.SubmitWorkflow(workflowSourceFiles.head), warnings, getWorkflowState(workflowSourceFiles.head.workflowOnHold)) + // Catches the case where someone has gone through the single submission endpoint w/ more than one workflow + case Success(workflowSourceFiles) if isSingleSubmission => + val warnings = workflowSourceFiles.flatMap(_.warnings) + val e = new IllegalArgumentException("To submit more than one workflow at a time, use the batch endpoint.") + e.failRequest(StatusCodes.BadRequest, warnings) + case Success(workflowSourceFiles) => + val warnings = workflowSourceFiles.flatMap(_.warnings) + askSubmit( + WorkflowStoreActor.BatchSubmitWorkflows(NonEmptyList.fromListUnsafe(workflowSourceFiles.toList)), + warnings, getWorkflowState(workflowSourceFiles.head.workflowOnHold)) + case Failure(t) => t.failRequest(StatusCodes.BadRequest) + } + case Failure(e: TimeoutException) => e.failRequest(StatusCodes.ServiceUnavailable) + case Failure(e) => e.failRequest(StatusCodes.InternalServerError) + } + } } object CromwellApiService { @@ -261,4 +322,4 @@ object CromwellApiService { val backendResponse = BackendResponse(BackendConfiguration.AllBackendEntries.map(_.name).sorted, BackendConfiguration.DefaultBackendEntry.name) val versionResponse = JsObject(Map("cromwell" -> cromwellVersion.toJson)) val serviceShuttingDownResponse = new Exception("Cromwell service is shutting down.").failRequest(StatusCodes.ServiceUnavailable) -} \ No newline at end of file +} diff --git a/engine/src/main/scala/cromwell/webservice/routes/WesCromwellRouteSupport.scala b/engine/src/main/scala/cromwell/webservice/routes/WesCromwellRouteSupport.scala deleted file mode 100644 index 349d36a9251..00000000000 --- a/engine/src/main/scala/cromwell/webservice/routes/WesCromwellRouteSupport.scala +++ /dev/null @@ -1,90 +0,0 @@ -package cromwell.webservice.routes - - -import akka.actor.{ActorRef, ActorRefFactory} -import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ -import akka.http.scaladsl.model.{Multipart, StatusCodes} -import akka.http.scaladsl.server.Directives.onComplete -import akka.http.scaladsl.server.Route -import akka.pattern.{AskTimeoutException, ask} -import akka.stream.ActorMaterializer -import akka.util.Timeout -import cats.data.NonEmptyList -import com.typesafe.config.ConfigFactory -import cromwell.core.{WorkflowId, WorkflowOnHold, WorkflowState, WorkflowSubmitted, path => _} -import cromwell.engine.workflow.workflowstore.{WorkflowStoreActor, WorkflowStoreSubmitActor} -import cromwell.server.CromwellShutdown -import cromwell.webservice.WebServiceUtils.EnhancedThrowable -import cromwell.webservice.WorkflowJsonSupport._ -import cromwell.webservice.{PartialWorkflowSources, WebServiceUtils, WorkflowSubmitResponse} -import net.ceedubs.ficus.Ficus._ - -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ExecutionContext, TimeoutException} -import scala.util.{Failure, Success} - -trait WesCromwellRouteSupport extends WebServiceUtils { - - val workflowStoreActor: ActorRef - - implicit val duration = ConfigFactory.load().as[FiniteDuration]("akka.http.server.request-timeout") - implicit val timeout: Timeout = duration - - implicit def actorRefFactory: ActorRefFactory - - implicit val materializer: ActorMaterializer - implicit val ec: ExecutionContext - - def toResponse(workflowId: WorkflowId, workflowState: WorkflowState): WorkflowSubmitResponse = { - WorkflowSubmitResponse(workflowId.toString, workflowState.toString) - } - - def submitRequest(formData: Multipart.FormData, isSingleSubmission: Boolean): Route = { - - def getWorkflowState(workflowOnHold: Boolean): WorkflowState = { - if (workflowOnHold) - WorkflowOnHold - else WorkflowSubmitted - } - - def sendToWorkflowStore(command: WorkflowStoreActor.WorkflowStoreActorSubmitCommand, warnings: Seq[String], workflowState: WorkflowState): Route = { - // NOTE: Do not blindly copy the akka-http -to- ask-actor pattern below without knowing the pros and cons. - onComplete(workflowStoreActor.ask(command).mapTo[WorkflowStoreSubmitActor.WorkflowStoreSubmitActorResponse]) { - case Success(w) => - w match { - case WorkflowStoreSubmitActor.WorkflowSubmittedToStore(workflowId, _) => - completeResponse(StatusCodes.Created, toResponse(workflowId, workflowState), warnings) - case WorkflowStoreSubmitActor.WorkflowsBatchSubmittedToStore(workflowIds, _) => - completeResponse(StatusCodes.Created, workflowIds.toList.map(toResponse(_, workflowState)), warnings) - case WorkflowStoreSubmitActor.WorkflowSubmitFailed(throwable) => - throwable.failRequest(StatusCodes.BadRequest, warnings) - } - case Failure(_: AskTimeoutException) if CromwellShutdown.shutdownInProgress() => CromwellApiService.serviceShuttingDownResponse - case Failure(e: TimeoutException) => e.failRequest(StatusCodes.ServiceUnavailable) - case Failure(e) => e.failRequest(StatusCodes.InternalServerError, warnings) - } - } - - onComplete(materializeFormData(formData)) { - case Success(data) => - PartialWorkflowSources.fromSubmitRoute(data, allowNoInputs = isSingleSubmission) match { - case Success(workflowSourceFiles) if isSingleSubmission && workflowSourceFiles.size == 1 => - val warnings = workflowSourceFiles.flatMap(_.warnings) - sendToWorkflowStore(WorkflowStoreActor.SubmitWorkflow(workflowSourceFiles.head), warnings, getWorkflowState(workflowSourceFiles.head.workflowOnHold)) - // Catches the case where someone has gone through the single submission endpoint w/ more than one workflow - case Success(workflowSourceFiles) if isSingleSubmission => - val warnings = workflowSourceFiles.flatMap(_.warnings) - val e = new IllegalArgumentException("To submit more than one workflow at a time, use the batch endpoint.") - e.failRequest(StatusCodes.BadRequest, warnings) - case Success(workflowSourceFiles) => - val warnings = workflowSourceFiles.flatMap(_.warnings) - sendToWorkflowStore( - WorkflowStoreActor.BatchSubmitWorkflows(NonEmptyList.fromListUnsafe(workflowSourceFiles.toList)), - warnings, getWorkflowState(workflowSourceFiles.head.workflowOnHold)) - case Failure(t) => t.failRequest(StatusCodes.BadRequest) - } - case Failure(e: TimeoutException) => e.failRequest(StatusCodes.ServiceUnavailable) - case Failure(e) => e.failRequest(StatusCodes.InternalServerError) - } - } -} diff --git a/engine/src/main/scala/cromwell/webservice/routes/wes/WesRouteSupport.scala b/engine/src/main/scala/cromwell/webservice/routes/wes/WesRouteSupport.scala index 7ec56e3317a..ba6546fb3e3 100644 --- a/engine/src/main/scala/cromwell/webservice/routes/wes/WesRouteSupport.scala +++ b/engine/src/main/scala/cromwell/webservice/routes/wes/WesRouteSupport.scala @@ -1,41 +1,47 @@ package cromwell.webservice.routes.wes import akka.actor.ActorRef -import akka.http.scaladsl.model.{StatusCode, StatusCodes} +import akka.http.scaladsl.model.{Multipart, StatusCode, StatusCodes} import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.directives.RouteDirectives.complete import akka.http.scaladsl.server.{Directive1, Route} import akka.pattern.{AskTimeoutException, ask} +import akka.stream.ActorMaterializer import akka.util.Timeout +import cats.data.NonEmptyList import com.typesafe.config.ConfigFactory -import cromwell.core.WorkflowId import cromwell.core.abort.SuccessfulAbortResponse +import cromwell.core.{WorkflowId, WorkflowOnHold, WorkflowState, WorkflowSubmitted} import cromwell.engine.instrumentation.HttpInstrumentation import cromwell.engine.workflow.WorkflowManagerActor.WorkflowNotFoundException +import cromwell.engine.workflow.workflowstore.{WorkflowStoreActor, WorkflowStoreSubmitActor} import cromwell.server.CromwellShutdown import cromwell.services.metadata.MetadataService.{BuildMetadataJsonAction, GetSingleWorkflowMetadataAction, GetStatus, MetadataServiceResponse, StatusLookupFailed} import cromwell.services.{FailedMetadataJsonResponse, SuccessfulMetadataJsonResponse} -import cromwell.webservice.WebServiceUtils.EnhancedThrowable +import cromwell.webservice.PartialWorkflowSources +import cromwell.webservice.WebServiceUtils.{EnhancedThrowable, completeResponse, materializeFormData} +import cromwell.webservice.routes.CromwellApiService import cromwell.webservice.routes.CromwellApiService.{UnrecognizedWorkflowException, validateWorkflowIdInMetadata} import cromwell.webservice.routes.MetadataRouteSupport.{metadataBuilderActorRequest, metadataQueryRequest} import cromwell.webservice.routes.wes.WesResponseJsonSupport._ -import cromwell.webservice.routes.wes.WesRouteSupport._ -import cromwell.webservice.routes.{CromwellApiService, WesCromwellRouteSupport} +import cromwell.webservice.routes.wes.WesRouteSupport.{respondWithWesError, _} import net.ceedubs.ficus.Ficus._ import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, Future, TimeoutException} import scala.util.{Failure, Success} -trait WesRouteSupport extends HttpInstrumentation with WesCromwellRouteSupport { +trait WesRouteSupport extends HttpInstrumentation { val serviceRegistryActor: ActorRef val workflowManagerActor: ActorRef + val workflowStoreActor: ActorRef implicit val ec: ExecutionContext implicit val timeout: Timeout + implicit val materializer: ActorMaterializer /* Defines routes intended to sit alongside the primary Cromwell REST endpoints. For instance, we'll now have: @@ -68,7 +74,7 @@ trait WesRouteSupport extends HttpInstrumentation with WesCromwellRouteSupport { } ~ post { extractSubmission() { submission => - submitRequest(submission.entity, + wesSubmitRequest(submission.entity, isSingleSubmission = true) } } @@ -108,6 +114,62 @@ trait WesRouteSupport extends HttpInstrumentation with WesCromwellRouteSupport { } ) } + + def toWesResponse(workflowId: WorkflowId, workflowState: WorkflowState): WesRunStatus = { + WesRunStatus(workflowId.toString, WesState.fromCromwellStatus(workflowState)) + } + + def toWesResponseId(workflowId: WorkflowId): WesRunId ={ + WesRunId(workflowId.toString) + } + + def wesSubmitRequest(formData: Multipart.FormData, isSingleSubmission: Boolean): Route = { + def getWorkflowState(workflowOnHold: Boolean): WorkflowState = { + if (workflowOnHold) + WorkflowOnHold + else WorkflowSubmitted + } + + def sendToWorkflowStore(command: WorkflowStoreActor.WorkflowStoreActorSubmitCommand, warnings: Seq[String], workflowState: WorkflowState): Route = { + // NOTE: Do not blindly copy the akka-http -to- ask-actor pattern below without knowing the pros and cons. + onComplete(workflowStoreActor.ask(command).mapTo[WorkflowStoreSubmitActor.WorkflowStoreSubmitActorResponse]) { + case Success(w) => + w match { + case WorkflowStoreSubmitActor.WorkflowSubmittedToStore(workflowId, _) => + completeResponse(StatusCodes.Created, toWesResponseId(workflowId), warnings) + case WorkflowStoreSubmitActor.WorkflowsBatchSubmittedToStore(workflowIds, _) => + completeResponse(StatusCodes.Created, workflowIds.toList.map(toWesResponse(_, workflowState)), warnings) + case WorkflowStoreSubmitActor.WorkflowSubmitFailed(throwable) => + respondWithWesError(throwable.getLocalizedMessage, StatusCodes.BadRequest) + } + case Failure(_: AskTimeoutException) if CromwellShutdown.shutdownInProgress() => respondWithWesError("Cromwell service is shutting down", StatusCodes.InternalServerError) + case Failure(e: TimeoutException) => e.failRequest(StatusCodes.ServiceUnavailable) + case Failure(e) => e.failRequest(StatusCodes.InternalServerError, warnings) + } + } + + onComplete(materializeFormData(formData)) { + case Success(data) => + PartialWorkflowSources.fromSubmitRoute(data, allowNoInputs = isSingleSubmission) match { + case Success(workflowSourceFiles) if isSingleSubmission && workflowSourceFiles.size == 1 => + val warnings = workflowSourceFiles.flatMap(_.warnings) + sendToWorkflowStore(WorkflowStoreActor.SubmitWorkflow(workflowSourceFiles.head), warnings, getWorkflowState(workflowSourceFiles.head.workflowOnHold)) + // Catches the case where someone has gone through the single submission endpoint w/ more than one workflow + case Success(workflowSourceFiles) if isSingleSubmission => + val warnings = workflowSourceFiles.flatMap(_.warnings) + val e = new IllegalArgumentException("To submit more than one workflow at a time, use the batch endpoint.") + e.failRequest(StatusCodes.BadRequest, warnings) + case Success(workflowSourceFiles) => + val warnings = workflowSourceFiles.flatMap(_.warnings) + sendToWorkflowStore( + WorkflowStoreActor.BatchSubmitWorkflows(NonEmptyList.fromListUnsafe(workflowSourceFiles.toList)), + warnings, getWorkflowState(workflowSourceFiles.head.workflowOnHold)) + case Failure(t) => t.failRequest(StatusCodes.BadRequest) + } + case Failure(e: TimeoutException) => e.failRequest(StatusCodes.ServiceUnavailable) + case Failure(e) => respondWithWesError(e.getLocalizedMessage, StatusCodes.InternalServerError) + } + } } diff --git a/engine/src/test/scala/cromwell/webservice/routes/wes/WesRouteSupportSpec.scala b/engine/src/test/scala/cromwell/webservice/routes/wes/WesRouteSupportSpec.scala index c8306088ded..6e9a390ad4a 100644 --- a/engine/src/test/scala/cromwell/webservice/routes/wes/WesRouteSupportSpec.scala +++ b/engine/src/test/scala/cromwell/webservice/routes/wes/WesRouteSupportSpec.scala @@ -165,8 +165,7 @@ class WesRouteSupportSpec extends AsyncFlatSpec with ScalatestRouteTest with Mat check { assertResult( s"""{ - | "id": "${CromwellApiServiceSpec.ExistingWorkflowId.toString}", - | "status": "Submitted" + | "run_id": "${CromwellApiServiceSpec.ExistingWorkflowId.toString}" |}""".stripMargin) { responseAs[String].parseJson.prettyPrint }