From ea8a18732c4c3475cafbd4866b7b060ce6d725c7 Mon Sep 17 00:00:00 2001 From: Janet Gainer-Dewar Date: Thu, 28 Jul 2022 15:08:17 +0000 Subject: [PATCH 01/13] Update cromwell version from 83 to 84 --- project/Version.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Version.scala b/project/Version.scala index 80f74d66d3f..1d21894dfb0 100644 --- a/project/Version.scala +++ b/project/Version.scala @@ -5,7 +5,7 @@ import sbt._ object Version { // Upcoming release, or current if we're on a master / hotfix branch - val cromwellVersion = "83" + val cromwellVersion = "84" /** * Returns true if this project should be considered a snapshot. From 12f5463da7899ae8eddf6f71e4600f42664b998c Mon Sep 17 00:00:00 2001 From: Katrina P <68349264+kpierre13@users.noreply.github.com> Date: Fri, 29 Jul 2022 00:59:41 -0400 Subject: [PATCH 02/13] BW-1255 Implement POST /runs endpoint (#6779) * Adding route * Fixing HTTP method error * All formFields made optional * A compliling state * Saving * Saving * All three endpoints functioning as expected; updated RESTAPI.md * Updated response for submission from 200 to 201 to pass tests * Test submission response * Moved updated submission response to askSubmit * test * updating RESTAPI.md * saving * Adding utility file for submitRequest * cleanup --- docs/api/RESTAPI.md | 59 ++- .../src/main/resources/swagger/cromwell.yaml | 104 +++++ .../routes/CromwellApiService.scala | 361 ++++++++---------- .../routes/WesCromwellRouteSupport.scala | 89 +++++ .../webservice/routes/wes/WesRunRoutes.scala | 44 ++- .../webservice/routes/wes/WesSubmission.scala | 44 +++ 6 files changed, 481 insertions(+), 220 deletions(-) create mode 100644 engine/src/main/scala/cromwell/webservice/routes/WesCromwellRouteSupport.scala create mode 100644 engine/src/main/scala/cromwell/webservice/routes/wes/WesSubmission.scala diff --git a/docs/api/RESTAPI.md b/docs/api/RESTAPI.md index b15072f6407..aca058df091 100644 --- a/docs/api/RESTAPI.md +++ b/docs/api/RESTAPI.md @@ -1,5 +1,5 @@ + + + @@ -18,7 +28,7 @@ - %date %X{sourceThread} %-5level - %msg%n + %ed{yyyy-MM-dd HH:mm:ss,SSS} %et %-5level - %msg%n @@ -68,7 +78,7 @@ - %d{yyyy-MM-dd HH:mm:ss,SSS} [%thread] %-5level %logger{35} - %msg%n + %ed{yyyy-MM-dd HH:mm:ss,SSS} [%et] %-5level %logger{35} - %msg%n diff --git a/core/src/main/scala/cromwell/core/logging/EnhancedDateConverter.scala b/core/src/main/scala/cromwell/core/logging/EnhancedDateConverter.scala new file mode 100644 index 00000000000..dcf62bc0b28 --- /dev/null +++ b/core/src/main/scala/cromwell/core/logging/EnhancedDateConverter.scala @@ -0,0 +1,70 @@ +package cromwell.core.logging + +import ch.qos.logback.classic.pattern.DateConverter +import ch.qos.logback.classic.spi.ILoggingEvent +import ch.qos.logback.core.CoreConstants +import ch.qos.logback.core.util.CachingDateFormatter + +import java.util.TimeZone +import scala.jdk.CollectionConverters._ + +/** + * Log the Akka akkaTimestamp if found in the MDC, otherwise log the original event timestamp. + * + * - https://doc.akka.io/docs/akka/current/logging.html#more-accurate-timestamps-for-log-output-in-mdc + * - https://logback.qos.ch/manual/layouts.html#customConversionSpecifier + * + * NOTE: For proper configuration both this EnhancedDateConverter should be configured into the logback.xml AND the + * configuration file should set akka.loggers = ["cromwell.core.logging.EnhancedSlf4jLogger"]. + */ +class EnhancedDateConverter extends DateConverter { + protected var cachingDateFormatterProtected: CachingDateFormatter = _ + + /* Duplicated from ch.qos.logback.classic.pattern.DateConverter as cachingDateFormatter is package private. */ + override def start(): Unit = { + cachingDateFormatterProtected = Option(getFirstOption) match { + case Some(CoreConstants.ISO8601_STR) | None => new CachingDateFormatter(CoreConstants.ISO8601_PATTERN) + case Some(datePattern) => + try { + new CachingDateFormatter(datePattern) + } catch { + case e: IllegalArgumentException => + addWarn("Could not instantiate SimpleDateFormat with pattern " + datePattern, e) + // default to the ISO8601 format + new CachingDateFormatter(CoreConstants.ISO8601_PATTERN) + } + } + // if the option list contains a TZ option, then set it. + Option(getOptionList) + .toList + .flatMap(_.asScala) + .drop(1) + .headOption + .map(TimeZone.getTimeZone) + .foreach(cachingDateFormatterProtected.setTimeZone) + + // Allow the parent class to start/initialize its private members. + super.start() + } + + /** + * Look for the Akka timestamp and use that to format the date. + * + * Until this (currently 6+ year) issue is resolved, formatting the date as a Long requires using the + * [[EnhancedSlf4jLogger]] versus Akka's basic Slf4jLogger. + * + * - https://github.com/akka/akka/issues/18079#issuecomment-125175884 + */ + override def convert(event: ILoggingEvent): String = { + val mdc = event.getMDCPropertyMap + if (mdc.containsKey("akkaTimestamp")) { + val timestamp = mdc.get("akkaTimestamp") + timestamp.toLongOption match { + case Some(value) => cachingDateFormatterProtected.format(value) + case None => timestamp // Return the original timestamp string. + } + } else { + super.convert(event) + } + } +} diff --git a/core/src/main/scala/cromwell/core/logging/EnhancedSlf4jLogger.scala b/core/src/main/scala/cromwell/core/logging/EnhancedSlf4jLogger.scala new file mode 100644 index 00000000000..0999ec18055 --- /dev/null +++ b/core/src/main/scala/cromwell/core/logging/EnhancedSlf4jLogger.scala @@ -0,0 +1,16 @@ +package cromwell.core.logging + +import akka.event.slf4j.Slf4jLogger + +class EnhancedSlf4jLogger extends Slf4jLogger { + /** + * Format the timestamp as a simple long. Allows the akkaTimestamp to be retrieved later from the MDC by custom + * converters. + * + * NOTE: Should not be necessary once this issue is resolved: + * - https://github.com/akka/akka/issues/18079#issuecomment-125175884 + * + * @see [[EnhancedDateConverter.convert()]] + */ + override protected def formatTimestamp(timestamp: Long): String = String.valueOf(timestamp) +} diff --git a/core/src/main/scala/cromwell/core/logging/EnhancedThreadConverter.scala b/core/src/main/scala/cromwell/core/logging/EnhancedThreadConverter.scala new file mode 100644 index 00000000000..73f5876597b --- /dev/null +++ b/core/src/main/scala/cromwell/core/logging/EnhancedThreadConverter.scala @@ -0,0 +1,21 @@ +package cromwell.core.logging + +import ch.qos.logback.classic.pattern.ThreadConverter +import ch.qos.logback.classic.spi.ILoggingEvent + +/** + * Log the Akka sourceThread if found, otherwise log the event thread. + * + * - https://doc.akka.io/docs/akka/current/logging.html#logging-thread-akka-source-and-actor-system-in-mdc + * - https://logback.qos.ch/manual/layouts.html#customConversionSpecifier + */ +class EnhancedThreadConverter extends ThreadConverter { + override def convert(event: ILoggingEvent): String = { + val mdc = event.getMDCPropertyMap + if (mdc.containsKey("sourceThread")) { + mdc.get("sourceThread") + } else { + super.convert(event) + } + } +} diff --git a/core/src/main/scala/cromwell/core/logging/JavaLoggingBridge.scala b/core/src/main/scala/cromwell/core/logging/JavaLoggingBridge.scala new file mode 100644 index 00000000000..1dc10fab46d --- /dev/null +++ b/core/src/main/scala/cromwell/core/logging/JavaLoggingBridge.scala @@ -0,0 +1,39 @@ +package cromwell.core.logging + +import ch.qos.logback.classic.LoggerContext +import ch.qos.logback.classic.jul.LevelChangePropagator +import org.slf4j.LoggerFactory +import org.slf4j.bridge.SLF4JBridgeHandler + +import scala.jdk.CollectionConverters._ + +object JavaLoggingBridge { + /** + * Replace java.util.logging with SLF4J while ensuring Logback is configured with a LevelChangePropogator. + * + * One likely won't need to do this but just in case: note that any libraries using JUL running BEFORE this + * initialization which require increasing or decreasing verbosity must be configured via JUL not Logback. + * + * See also: + * - https://www.slf4j.org/api/org/slf4j/bridge/SLF4JBridgeHandler.html + * - https://docs.oracle.com/en/java/javase/11/docs/api/java.logging/java/util/logging/LogManager.html + */ + def init(): Unit = { + // Retrieve the Logback context, and as a side effect initialize Logback. + val ctx = LoggerFactory.getILoggerFactory.asInstanceOf[LoggerContext] + + // Ensure that Logback has a LevelChangePropagator, either here or via a logback.xml. + val listeners = ctx.getCopyOfListenerList.asScala + if (!listeners.exists(_.isInstanceOf[LevelChangePropagator])) { + val propagator = new LevelChangePropagator() + propagator.setContext(ctx) + propagator.start() + } + + // Remove all the JUL logging handlers. + SLF4JBridgeHandler.removeHandlersForRootLogger() + + // Send all JUL logging to SLF4J. + SLF4JBridgeHandler.install() + } +} diff --git a/database/migration/src/main/scala/cromwell/database/migration/liquibase/LiquibaseUtils.scala b/database/migration/src/main/scala/cromwell/database/migration/liquibase/LiquibaseUtils.scala index 20a77f58c02..e4823b8b0ac 100644 --- a/database/migration/src/main/scala/cromwell/database/migration/liquibase/LiquibaseUtils.scala +++ b/database/migration/src/main/scala/cromwell/database/migration/liquibase/LiquibaseUtils.scala @@ -1,7 +1,5 @@ package cromwell.database.migration.liquibase -import java.sql.Connection - import liquibase.changelog.{ChangeLogParameters, ChangeSet, DatabaseChangeLog} import liquibase.database.jvm.{HsqlConnection, JdbcConnection} import liquibase.database.{Database, DatabaseConnection, DatabaseFactory, ObjectQuotingStrategy} @@ -10,12 +8,21 @@ import liquibase.diff.{DiffGeneratorFactory, DiffResult} import liquibase.parser.ChangeLogParserFactory import liquibase.resource.ClassLoaderResourceAccessor import liquibase.snapshot.{DatabaseSnapshot, SnapshotControl, SnapshotGeneratorFactory} -import liquibase.{Contexts, LabelExpression, Liquibase} +import liquibase.ui.LoggerUIService +import liquibase.{Contexts, LabelExpression, Liquibase, Scope} import org.hsqldb.persist.HsqlDatabaseProperties +import java.sql.Connection import scala.jdk.CollectionConverters._ object LiquibaseUtils { + + /* + Move liquibase calls to System.out.println to a logger. + Workaround for issue: https://github.com/liquibase/liquibase/issues/1741#issuecomment-853742652 + */ + Scope.enter(Map(Scope.Attr.ui.name -> new LoggerUIService().asInstanceOf[AnyRef]).asJava) + // Paranoia: Create our own mutex. https://stackoverflow.com/questions/442564/avoid-synchronizedthis-in-java private val mutex = new Object private val DefaultContexts = new Contexts() diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 4c9999f0840..2e15f88a5f9 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -236,7 +236,13 @@ object Dependencies { "org.codehaus.janino" % "janino" % janinoV, // Replace all log4j usage with slf4j // https://www.slf4j.org/legacy.html#log4j-over-slf4j - "org.slf4j" % "log4j-over-slf4j" % slf4jV + "org.slf4j" % "log4j-over-slf4j" % slf4jV, + // Replace all commons-logging usage with slf4j + // https://www.slf4j.org/legacy.html#jcl-over-slf4j + "org.slf4j" % "jcl-over-slf4j" % slf4jV, + // Enable runtime replacing of java.util.logging usage with slf4j + // https://www.slf4j.org/legacy.html#jul-to-slf4j + "org.slf4j" % "jul-to-slf4j" % slf4jV, ) ++ slf4jFacadeDependencies private val slickDependencies = List( @@ -716,4 +722,12 @@ object Dependencies { nimbusdsOverrides ++ bouncyCastleOverrides ++ protobufJavaOverrides + + /* + Libraries that should be globally excluded. + */ + val cromwellExcludeDependencies: List[ExclusionRule] = List( + // Replaced with jcl-over-slf4j + ExclusionRule("commons-logging", "commons-logging"), + ) } diff --git a/project/Settings.scala b/project/Settings.scala index 79242733583..951b1fc8fe6 100644 --- a/project/Settings.scala +++ b/project/Settings.scala @@ -86,6 +86,7 @@ object Settings { Tags.limit(Tags.Test, 1) ), dependencyOverrides ++= cromwellDependencyOverrides, + excludeDependencies ++= cromwellExcludeDependencies, scalacOptions ++= baseSettings ++ warningSettings ++ consoleHostileSettings, // http://stackoverflow.com/questions/31488335/scaladoc-2-11-6-fails-on-throws-tag-with-unable-to-find-any-member-to-link#31497874 Compile / doc / scalacOptions ++= baseSettings ++ List("-no-link-warnings"), diff --git a/server/src/main/resources/application.conf b/server/src/main/resources/application.conf index e0769277a6b..e80cd716b4e 100644 --- a/server/src/main/resources/application.conf +++ b/server/src/main/resources/application.conf @@ -1,6 +1,6 @@ akka { log-dead-letters = "off" - loggers = ["akka.event.slf4j.Slf4jLogger"] + loggers = ["cromwell.core.logging.EnhancedSlf4jLogger"] logging-filter = "cromwell.server.CromwellAkkaLogFilter" actor.guardian-supervisor-strategy = "cromwell.core.CromwellUserGuardianStrategy" diff --git a/server/src/main/resources/logback.xml b/server/src/main/resources/logback.xml index 4868962f994..b4d4ac4a335 100644 --- a/server/src/main/resources/logback.xml +++ b/server/src/main/resources/logback.xml @@ -1,5 +1,15 @@ + + + + @@ -18,7 +28,7 @@ - %date %X{sourceThread} %-5level - %msg%n + %ed{yyyy-MM-dd HH:mm:ss,SSS} %et %-5level - %msg%n @@ -68,7 +78,7 @@ - %d{yyyy-MM-dd HH:mm:ss,SSS} [%thread] %-5level %logger{35} - %msg%n + %ed{yyyy-MM-dd HH:mm:ss,SSS} [%et] %-5level %logger{35} - %msg%n diff --git a/server/src/main/scala/cromwell/CromwellEntryPoint.scala b/server/src/main/scala/cromwell/CromwellEntryPoint.scala index 5acddd75ce9..a1707db6488 100644 --- a/server/src/main/scala/cromwell/CromwellEntryPoint.scala +++ b/server/src/main/scala/cromwell/CromwellEntryPoint.scala @@ -16,6 +16,7 @@ import cromwell.CommandLineArguments.{ValidSubmission, WorkflowSourceOrUrl} import cromwell.CromwellApp._ import cromwell.api.CromwellClient import cromwell.api.model.{Label, LabelsJsonFormatter, WorkflowSingleSubmission} +import cromwell.core.logging.JavaLoggingBridge import cromwell.core.path.{DefaultPathBuilder, Path} import cromwell.core.{WorkflowSourceFilesCollection, WorkflowSourceFilesWithDependenciesZip, WorkflowSourceFilesWithoutImports} import cromwell.engine.workflow.SingleWorkflowRunnerActor @@ -166,6 +167,13 @@ object CromwellEntryPoint extends GracefulStopSupport { Make sure that the next time one uses the ConfigFactory that our updated system properties are loaded. */ ConfigFactory.invalidateCaches() + + /* + Replace java.util.logging with SLF4J. + https://www.slf4j.org/api/org/slf4j/bridge/SLF4JBridgeHandler.html + */ + JavaLoggingBridge.init() + () } From 19d4fdbf71ceeb5bd38c16a2f9edb524aea89ad3 Mon Sep 17 00:00:00 2001 From: mspector Date: Thu, 11 Aug 2022 14:05:02 -0400 Subject: [PATCH 09/13] [BT-698] first pass on BlobTokenGenerator with E2E test (#6824) * first pass on BlobTokenGenerator with E2E test * update BlobPathBuilder constructor args in test * account -> container level client --- .../filesystems/blob/BlobPathBuilder.scala | 6 +- .../blob/BlobPathBuilderFactory.scala | 96 ++++++++++++++++++- .../blob/BlobPathBuilderSpec.scala | 22 ++--- project/Dependencies.scala | 4 +- 4 files changed, 109 insertions(+), 19 deletions(-) diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala index 4d18d134ce5..69a21c90eda 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala @@ -55,10 +55,12 @@ object BlobPathBuilder { } } -class BlobPathBuilder(credential: AzureSasCredential, container: String, endpoint: String) extends PathBuilder { +class BlobPathBuilder(blobTokenGenerator: BlobTokenGenerator, container: String, endpoint: String) extends PathBuilder { + val credential: AzureSasCredential = new AzureSasCredential(blobTokenGenerator.getAccessToken) val fileSystemConfig: Map[String, Object] = Map((AzureFileSystem.AZURE_STORAGE_SAS_TOKEN_CREDENTIAL, credential), - (AzureFileSystem.AZURE_STORAGE_FILE_STORES, container)) + (AzureFileSystem.AZURE_STORAGE_FILE_STORES, container), + (AzureFileSystem.AZURE_STORAGE_SKIP_INITIAL_CONTAINER_CHECK, java.lang.Boolean.TRUE)) def retrieveFilesystem(uri: URI): Try[FileSystem] = { Try(FileSystems.getFileSystem(uri)) recover { diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala index 5e639084f24..cea7269522a 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala @@ -1,15 +1,22 @@ package cromwell.filesystems.blob import akka.actor.ActorSystem -import com.azure.core.credential.AzureSasCredential +import com.azure.core.management.AzureEnvironment +import com.azure.core.management.profile.AzureProfile +import com.azure.identity.DefaultAzureCredentialBuilder +import com.azure.resourcemanager.AzureResourceManager +import com.azure.storage.blob.BlobContainerClientBuilder +import com.azure.storage.blob.sas.{BlobContainerSasPermission, BlobServiceSasSignatureValues} +import com.azure.storage.common.StorageSharedKeyCredential import com.typesafe.config.Config import cromwell.core.WorkflowOptions import cromwell.core.path.PathBuilderFactory -import cromwell.filesystems.blob.BlobPathBuilder import net.ceedubs.ficus.Ficus._ +import java.time.OffsetDateTime import scala.concurrent.ExecutionContext import scala.concurrent.Future +import scala.jdk.CollectionConverters._ final case class BlobFileSystemConfig(config: Config) final case class BlobPathBuilderFactory(globalConfig: Config, instanceConfig: Config, singletonConfig: BlobFileSystemConfig) extends PathBuilderFactory { @@ -17,11 +24,92 @@ final case class BlobPathBuilderFactory(globalConfig: Config, instanceConfig: Co val container: String = instanceConfig.as[String]("store") val endpoint: String = instanceConfig.as[String]("endpoint") val workspaceId: String = instanceConfig.as[String]("workspace-id") - val workspaceManagerURL = singletonConfig.config.as[String]("workspace-manager-url") + val workspaceManagerURL: String = singletonConfig.config.as[String]("workspace-manager-url") + + val blobTokenGenerator: BlobTokenGenerator = BlobTokenGenerator.createBlobTokenGenerator( + container, endpoint, Option(workspaceId), Option(workspaceManagerURL)) override def withOptions(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[BlobPathBuilder] = { Future { - new BlobPathBuilder(new AzureSasCredential(sasToken), container, endpoint) + new BlobPathBuilder(blobTokenGenerator, container, endpoint) } } } + +sealed trait BlobTokenGenerator { + def getAccessToken: String +} + +object BlobTokenGenerator { + def createBlobTokenGenerator(container: String, endpoint: String): BlobTokenGenerator = { + createBlobTokenGenerator(container, endpoint, None, None) + } + def createBlobTokenGenerator(container: String, endpoint: String, workspaceId: Option[String], workspaceManagerURL: Option[String]): BlobTokenGenerator = { + (container: String, endpoint: String, workspaceId, workspaceManagerURL) match { + case (container, endpoint, None, None) => + NativeBlobTokenGenerator(container, endpoint) + case (container, endpoint, Some(workspaceId), Some(workspaceManagerURL)) => + WSMBlobTokenGenerator(container, endpoint, workspaceId, workspaceManagerURL) + case _ => + throw new Exception("Arguments provided do not match any available BlobTokenGenerator implementation.") + } + } +} + +case class WSMBlobTokenGenerator(container: String, endpoint: String, workspaceId: String, workspaceManagerURL: String) extends BlobTokenGenerator { + def getAccessToken: String = { + throw new NotImplementedError + } +} + +case class NativeBlobTokenGenerator(container: String, endpoint: String) extends BlobTokenGenerator { + def getAccessToken: String = { + val storageAccountName = BlobPathBuilder.parseStorageAccount(BlobPathBuilder.parseURI(endpoint)) match { + case Some(storageAccountName) => storageAccountName + case _ => throw new Exception("Storage account could not be parsed from endpoint") + } + + val profile = new AzureProfile(AzureEnvironment.AZURE) + val azureCredential = new DefaultAzureCredentialBuilder() + .authorityHost(profile.getEnvironment.getActiveDirectoryEndpoint) + .build + val azure = AzureResourceManager.authenticate(azureCredential, profile).withDefaultSubscription + + val storageAccounts = azure.storageAccounts() + val storageAccount = storageAccounts + .list() + .asScala + .find(_.name == storageAccountName) + + val storageAccountKeys = storageAccount match { + case Some(value) => value.getKeys.asScala.map(_.value()) + case _ => throw new Exception("Storage Account not found") + } + + val storageAccountKey = storageAccountKeys.headOption match { + case Some(value) => value + case _ => throw new Exception("Storage Account has no keys") + } + + val keyCredential = new StorageSharedKeyCredential( + storageAccountName, + storageAccountKey + ) + val blobContainerClient = new BlobContainerClientBuilder() + .credential(keyCredential) + .endpoint(endpoint) + .containerName(container) + .buildClient() + + val blobContainerSasPermission = new BlobContainerSasPermission() + .setReadPermission(true) + .setCreatePermission(true) + .setListPermission(true) + val blobServiceSasSignatureValues = new BlobServiceSasSignatureValues( + OffsetDateTime.now.plusDays(1), + blobContainerSasPermission + ) + + blobContainerClient.generateSas(blobServiceSasSignatureValues) + } +} diff --git a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala index fe553b86b69..69cec235aff 100644 --- a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala +++ b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala @@ -1,10 +1,6 @@ package cromwell.filesystems.blob - -import com.azure.core.credential.AzureSasCredential -import cromwell.filesystems.blob.BlobPathBuilder import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers - import java.nio.file.Files object BlobPathBuilderSpec { @@ -47,17 +43,19 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{ } ignore should "build a blob path from a test string and read a file" in { - val endpoint = BlobPathBuilderSpec.buildEndpoint("teststorageaccount") + val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") val endpointHost = BlobPathBuilder.parseURI(endpoint).getHost - val store = "testContainer" - val evalPath = "/test/file.txt" - val sas = "{SAS TOKEN HERE}" + val store = "inputs" + val evalPath = "/test/inputFile.txt" + val blobTokenGenerator: BlobTokenGenerator = BlobTokenGenerator.createBlobTokenGenerator(store, endpoint) val testString = endpoint + "/" + store + evalPath - val blobPath: BlobPath = new BlobPathBuilder(new AzureSasCredential(sas), store, endpoint) build testString getOrElse fail() + val blobPath: BlobPath = new BlobPathBuilder(blobTokenGenerator, store, endpoint) build testString getOrElse fail() + blobPath.container should equal(store) blobPath.endpoint should equal(endpoint) blobPath.pathAsString should equal(testString) blobPath.pathWithoutScheme should equal(endpointHost + "/" + store + evalPath) + val is = Files.newInputStream(blobPath.nioPath) val fileText = (is.readAllBytes.map(_.toChar)).mkString fileText should include ("This is my test file!!!! Did it work?") @@ -67,10 +65,10 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{ val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") val store = "inputs" val evalPath = "/test/inputFile.txt" - val sas = "{SAS TOKEN HERE}" + val blobTokenGenerator: BlobTokenGenerator = BlobTokenGenerator.createBlobTokenGenerator(store, endpoint) val testString = endpoint + "/" + store + evalPath - val blobPath1: BlobPath = new BlobPathBuilder(new AzureSasCredential(sas), store, endpoint) build testString getOrElse fail() - val blobPath2: BlobPath = new BlobPathBuilder(new AzureSasCredential(sas), store, endpoint) build testString getOrElse fail() + val blobPath1: BlobPath = new BlobPathBuilder(blobTokenGenerator, store, endpoint) build testString getOrElse fail() + val blobPath2: BlobPath = new BlobPathBuilder(blobTokenGenerator, store, endpoint) build testString getOrElse fail() blobPath1 should equal(blobPath2) } } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 2e15f88a5f9..a6cc50eb351 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -190,7 +190,9 @@ object Dependencies { exclude("jakarta.activation", "jakarta.activation-api"), "com.azure" % "azure-security-keyvault-secrets" % azureKeyVaultSdkV exclude("jakarta.xml.bind", "jakarta.xml.bind-api") - exclude("jakarta.activation", "jakarta.activation-api") + exclude("jakarta.activation", "jakarta.activation-api"), + "com.azure" % "azure-core-management" % "1.7.0", + "com.azure.resourcemanager" % "azure-resourcemanager" % "2.17.0" ) val implFtpDependencies = List( From 3a19be6af1292187714b6e0b3fedfaf6f98a4c6f Mon Sep 17 00:00:00 2001 From: mspector Date: Thu, 18 Aug 2022 15:23:10 -0400 Subject: [PATCH 10/13] [BT-687] specify correct types (#6829) * specify correct types * fix test with new type * remove type declarations in function call * remove unnecessary sas-token config --- .../cromwell/filesystems/blob/BlobPathBuilderFactory.scala | 7 +++---- .../filesystems/blob/BlobPathBuilderFactorySpec.scala | 7 ++----- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala index cea7269522a..b12abf5cc34 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala @@ -20,14 +20,13 @@ import scala.jdk.CollectionConverters._ final case class BlobFileSystemConfig(config: Config) final case class BlobPathBuilderFactory(globalConfig: Config, instanceConfig: Config, singletonConfig: BlobFileSystemConfig) extends PathBuilderFactory { - val sasToken: String = instanceConfig.as[String]("sas-token") val container: String = instanceConfig.as[String]("store") val endpoint: String = instanceConfig.as[String]("endpoint") - val workspaceId: String = instanceConfig.as[String]("workspace-id") - val workspaceManagerURL: String = singletonConfig.config.as[String]("workspace-manager-url") + val workspaceId: Option[String] = instanceConfig.as[Option[String]]("workspace-id") + val workspaceManagerURL: Option[String] = singletonConfig.config.as[Option[String]]("workspace-manager-url") val blobTokenGenerator: BlobTokenGenerator = BlobTokenGenerator.createBlobTokenGenerator( - container, endpoint, Option(workspaceId), Option(workspaceManagerURL)) + container, endpoint, workspaceId, workspaceManagerURL) override def withOptions(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[BlobPathBuilder] = { Future { diff --git a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderFactorySpec.scala b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderFactorySpec.scala index 8c9b2345c69..08efd534056 100644 --- a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderFactorySpec.scala +++ b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderFactorySpec.scala @@ -9,12 +9,10 @@ class BlobPathBuilderFactorySpec extends AnyFlatSpec with Matchers { it should "parse configs for a functioning factory" in { val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") val store = "inputs" - val sasToken = "{SAS TOKEN HERE}" val workspaceId = "mockWorkspaceId" val workspaceManagerURL = "https://test.ws.org" val instanceConfig = ConfigFactory.parseString( s""" - |sas-token = "$sasToken" |store = "$store" |endpoint = "$endpoint" |workspace-id = "$workspaceId" @@ -24,8 +22,7 @@ class BlobPathBuilderFactorySpec extends AnyFlatSpec with Matchers { val factory = BlobPathBuilderFactory(globalConfig, instanceConfig, new BlobFileSystemConfig(singletonConfig)) factory.container should equal(store) factory.endpoint should equal(endpoint) - factory.sasToken should equal(sasToken) - factory.workspaceId should equal(workspaceId) - factory.workspaceManagerURL should equal(workspaceManagerURL) + factory.workspaceId should equal(Some(workspaceId)) + factory.workspaceManagerURL should equal(Some(workspaceManagerURL)) } } From e802f29549d15d807bff65d661d86883c940336e Mon Sep 17 00:00:00 2001 From: Katrina P <68349264+kpierre13@users.noreply.github.com> Date: Wed, 24 Aug 2022 13:09:32 -0400 Subject: [PATCH 11/13] BW-1206 - Combine all Wes Endpoints & add Tests (#6833) * Add tests, getting frid of WesRunRoutes.scala * wesWorkflowId fix, ec implicits errors gone * Refactoring path for GET /runs * Indentation fix * Commit to rollback * Revert "Indentation fix" This reverts commit 63fc4842c9d4eff68ec9cb7c3ef19e110696598b. * PR trigger * Optimize imports * Missed import --- .../cromwell/server/CromwellServer.scala | 4 +- .../routes/CromwellApiService.scala | 164 +++++++++--------- .../routes/WesCromwellRouteSupport.scala | 1 + .../routes/wes/WesRouteSupport.scala | 152 +++++++++++----- .../webservice/routes/wes/WesRunRoutes.scala | 94 ---------- .../routes/CromwellApiServiceSpec.scala | 21 ++- .../routes/wes/WesRouteSupportSpec.scala | 70 +++++++- 7 files changed, 266 insertions(+), 240 deletions(-) delete mode 100644 engine/src/main/scala/cromwell/webservice/routes/wes/WesRunRoutes.scala diff --git a/engine/src/main/scala/cromwell/server/CromwellServer.scala b/engine/src/main/scala/cromwell/server/CromwellServer.scala index b5705b91dae..76f784875fc 100644 --- a/engine/src/main/scala/cromwell/server/CromwellServer.scala +++ b/engine/src/main/scala/cromwell/server/CromwellServer.scala @@ -11,7 +11,6 @@ import cromwell.services.instrumentation.CromwellInstrumentationActor import cromwell.webservice.SwaggerService import cromwell.webservice.routes.CromwellApiService import cromwell.webservice.routes.wes.WesRouteSupport -import cromwell.webservice.routes.wes.WesRunRoutes import scala.concurrent.Future import scala.util.{Failure, Success} @@ -37,7 +36,6 @@ class CromwellServerActor(cromwellSystem: CromwellSystem, gracefulShutdown: Bool with CromwellApiService with CromwellInstrumentationActor with WesRouteSupport - with WesRunRoutes with SwaggerService with ActorLogging { implicit val actorSystem = context.system @@ -53,7 +51,7 @@ class CromwellServerActor(cromwellSystem: CromwellSystem, gracefulShutdown: Bool * cromwell.yaml is broken unless the swagger index.html is patched. Copy/paste the code from rawls or cromiam if * actual cromwell+swagger+oauth+/api support is needed. */ - val apiRoutes: Route = pathPrefix("api")(concat(workflowRoutes, womtoolRoutes, wesRoutes, runRoutes)) + val apiRoutes: Route = pathPrefix("api")(concat(workflowRoutes, womtoolRoutes, wesRoutes)) val nonApiRoutes: Route = concat(engineRoutes, swaggerUiResourceRoute) val allRoutes: Route = concat(apiRoutes, nonApiRoutes) diff --git a/engine/src/main/scala/cromwell/webservice/routes/CromwellApiService.scala b/engine/src/main/scala/cromwell/webservice/routes/CromwellApiService.scala index 47aa8b57b5c..637509cc4fd 100644 --- a/engine/src/main/scala/cromwell/webservice/routes/CromwellApiService.scala +++ b/engine/src/main/scala/cromwell/webservice/routes/CromwellApiService.scala @@ -38,11 +38,9 @@ import scala.io.Source import scala.util.{Failure, Success, Try} trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport with WomtoolRouteSupport with WebServiceUtils with WesCromwellRouteSupport { - import CromwellApiService._ implicit def actorRefFactory: ActorRefFactory - implicit val materializer: ActorMaterializer implicit val ec: ExecutionContext @@ -57,9 +55,7 @@ trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport w } }, path("engine" / Segment / "version") { _ => - get { - complete(versionResponse) - } + get { complete(versionResponse) } }, path("engine" / Segment / "status") { _ => onComplete(serviceRegistryActor.ask(GetCurrentStatus).mapTo[StatusCheckResponse]) { @@ -74,11 +70,7 @@ trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport w val workflowRoutes = path("workflows" / Segment / "backends") { _ => - get { - instrumentRequest { - complete(ToResponseMarshallable(backendResponse)) - } - } + get { instrumentRequest { complete(ToResponseMarshallable(backendResponse)) } } } ~ path("workflows" / Segment / "callcaching" / "diff") { _ => parameterSeq { parameters => @@ -144,7 +136,7 @@ trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport w val response = validateWorkflowIdInMetadata(possibleWorkflowId, serviceRegistryActor) flatMap { workflowId => workflowStoreActor.ask(WorkflowStoreActor.WorkflowOnHoldToSubmittedCommand(workflowId)).mapTo[WorkflowStoreEngineActor.WorkflowOnHoldToSubmittedResponse] } - onComplete(response) { + onComplete(response){ case Success(WorkflowStoreEngineActor.WorkflowOnHoldToSubmittedFailure(_, e: NotInOnHoldStateException)) => e.errorRequest(StatusCodes.Forbidden) case Success(WorkflowStoreEngineActor.WorkflowOnHoldToSubmittedFailure(_, e)) => e.errorRequest(StatusCodes.InternalServerError) case Success(r: WorkflowStoreEngineActor.WorkflowOnHoldToSubmittedSuccess) => completeResponse(StatusCodes.OK, toResponse(r.workflowId, WorkflowSubmitted), Seq.empty) @@ -180,93 +172,93 @@ trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport w case Failure(e) => e.failRequest(StatusCodes.InternalServerError) } } -} - object CromwellApiService { +} - import spray.json._ +object CromwellApiService { + import spray.json._ - /** - * Sends a request to abort the workflow. Provides configurable success & error handlers to allow - * for different API endpoints to provide different effects in the appropriate situations, e.g. HTTP codes - * and error messages - */ - def abortWorkflow(possibleWorkflowId: String, - workflowStoreActor: ActorRef, - workflowManagerActor: ActorRef, - successHandler: PartialFunction[SuccessfulAbortResponse, Route] = standardAbortSuccessHandler, - errorHandler: PartialFunction[Throwable, Route] = standardAbortErrorHandler) - (implicit timeout: Timeout): Route = { - handleExceptions(ExceptionHandler(errorHandler)) { - Try(WorkflowId.fromString(possibleWorkflowId)) match { - case Success(workflowId) => - val response = workflowStoreActor.ask(WorkflowStoreActor.AbortWorkflowCommand(workflowId)).mapTo[AbortResponse] - onComplete(response) { - case Success(x: SuccessfulAbortResponse) => successHandler(x) - case Success(x: WorkflowAbortFailureResponse) => throw x.failure - case Failure(e) => throw e - } - case Failure(_) => throw InvalidWorkflowException(possibleWorkflowId) - } + /** + * Sends a request to abort the workflow. Provides configurable success & error handlers to allow + * for different API endpoints to provide different effects in the appropriate situations, e.g. HTTP codes + * and error messages + */ + def abortWorkflow(possibleWorkflowId: String, + workflowStoreActor: ActorRef, + workflowManagerActor: ActorRef, + successHandler: PartialFunction[SuccessfulAbortResponse, Route] = standardAbortSuccessHandler, + errorHandler: PartialFunction[Throwable, Route] = standardAbortErrorHandler) + (implicit timeout: Timeout): Route = { + handleExceptions(ExceptionHandler(errorHandler)) { + Try(WorkflowId.fromString(possibleWorkflowId)) match { + case Success(workflowId) => + val response = workflowStoreActor.ask(WorkflowStoreActor.AbortWorkflowCommand(workflowId)).mapTo[AbortResponse] + onComplete(response) { + case Success(x: SuccessfulAbortResponse) => successHandler(x) + case Success(x: WorkflowAbortFailureResponse) => throw x.failure + case Failure(e) => throw e + } + case Failure(_) => throw InvalidWorkflowException(possibleWorkflowId) } } + } - /** - * The abort success handler for typical cases, i.e. cromwell's API. - */ - private def standardAbortSuccessHandler: PartialFunction[SuccessfulAbortResponse, Route] = { - case WorkflowAbortedResponse(id) => complete(ToResponseMarshallable(WorkflowAbortResponse(id.toString, WorkflowAborted.toString))) - case WorkflowAbortRequestedResponse(id) => complete(ToResponseMarshallable(WorkflowAbortResponse(id.toString, WorkflowAborting.toString))) - } + /** + * The abort success handler for typical cases, i.e. cromwell's API. + */ + private def standardAbortSuccessHandler: PartialFunction[SuccessfulAbortResponse, Route] = { + case WorkflowAbortedResponse(id) => complete(ToResponseMarshallable(WorkflowAbortResponse(id.toString, WorkflowAborted.toString))) + case WorkflowAbortRequestedResponse(id) => complete(ToResponseMarshallable(WorkflowAbortResponse(id.toString, WorkflowAborting.toString))) + } - /** - * The abort error handler for typical cases, i.e. cromwell's API - */ - private def standardAbortErrorHandler: PartialFunction[Throwable, Route] = { - case e: InvalidWorkflowException => e.failRequest(StatusCodes.BadRequest) - case e: WorkflowNotFoundException => e.errorRequest(StatusCodes.NotFound) - case _: AskTimeoutException if CromwellShutdown.shutdownInProgress() => serviceShuttingDownResponse - case e: TimeoutException => e.failRequest(StatusCodes.ServiceUnavailable) - case e: Exception => e.errorRequest(StatusCodes.InternalServerError) - } + /** + * The abort error handler for typical cases, i.e. cromwell's API + */ + private def standardAbortErrorHandler: PartialFunction[Throwable, Route] = { + case e: InvalidWorkflowException => e.failRequest(StatusCodes.BadRequest) + case e: WorkflowNotFoundException => e.errorRequest(StatusCodes.NotFound) + case _: AskTimeoutException if CromwellShutdown.shutdownInProgress() => serviceShuttingDownResponse + case e: TimeoutException => e.failRequest(StatusCodes.ServiceUnavailable) + case e: Exception => e.errorRequest(StatusCodes.InternalServerError) + } - def validateWorkflowIdInMetadata(possibleWorkflowId: String, - serviceRegistryActor: ActorRef) - (implicit timeout: Timeout, executor: ExecutionContext): Future[WorkflowId] = { - Try(WorkflowId.fromString(possibleWorkflowId)) match { - case Success(w) => - serviceRegistryActor.ask(ValidateWorkflowIdInMetadata(w)).mapTo[WorkflowValidationResponse] flatMap { - case RecognizedWorkflowId => Future.successful(w) - case UnrecognizedWorkflowId => validateWorkflowIdInMetadataSummaries(possibleWorkflowId, serviceRegistryActor) - case FailedToCheckWorkflowId(t) => Future.failed(t) - } - case Failure(_) => Future.failed(InvalidWorkflowException(possibleWorkflowId)) - } + def validateWorkflowIdInMetadata(possibleWorkflowId: String, + serviceRegistryActor: ActorRef) + (implicit timeout: Timeout, executor: ExecutionContext): Future[WorkflowId] = { + Try(WorkflowId.fromString(possibleWorkflowId)) match { + case Success(w) => + serviceRegistryActor.ask(ValidateWorkflowIdInMetadata(w)).mapTo[WorkflowValidationResponse] flatMap { + case RecognizedWorkflowId => Future.successful(w) + case UnrecognizedWorkflowId => validateWorkflowIdInMetadataSummaries(possibleWorkflowId, serviceRegistryActor) + case FailedToCheckWorkflowId(t) => Future.failed(t) + } + case Failure(_) => Future.failed(InvalidWorkflowException(possibleWorkflowId)) } + } - def validateWorkflowIdInMetadataSummaries(possibleWorkflowId: String, - serviceRegistryActor: ActorRef) - (implicit timeout: Timeout, executor: ExecutionContext): Future[WorkflowId] = { - Try(WorkflowId.fromString(possibleWorkflowId)) match { - case Success(w) => - serviceRegistryActor.ask(ValidateWorkflowIdInMetadataSummaries(w)).mapTo[WorkflowValidationResponse] map { - case RecognizedWorkflowId => w - case UnrecognizedWorkflowId => throw UnrecognizedWorkflowException(w) - case FailedToCheckWorkflowId(t) => throw t - } - case Failure(_) => Future.failed(InvalidWorkflowException(possibleWorkflowId)) - } + def validateWorkflowIdInMetadataSummaries(possibleWorkflowId: String, + serviceRegistryActor: ActorRef) + (implicit timeout: Timeout, executor: ExecutionContext): Future[WorkflowId] = { + Try(WorkflowId.fromString(possibleWorkflowId)) match { + case Success(w) => + serviceRegistryActor.ask(ValidateWorkflowIdInMetadataSummaries(w)).mapTo[WorkflowValidationResponse] map { + case RecognizedWorkflowId => w + case UnrecognizedWorkflowId => throw UnrecognizedWorkflowException(w) + case FailedToCheckWorkflowId(t) => throw t + } + case Failure(_) => Future.failed(InvalidWorkflowException(possibleWorkflowId)) } + } - final case class BackendResponse(supportedBackends: List[String], defaultBackend: String) + final case class BackendResponse(supportedBackends: List[String], defaultBackend: String) - final case class UnrecognizedWorkflowException(id: WorkflowId) extends Exception(s"Unrecognized workflow ID: $id") + final case class UnrecognizedWorkflowException(id: WorkflowId) extends Exception(s"Unrecognized workflow ID: $id") - final case class InvalidWorkflowException(possibleWorkflowId: String) extends Exception(s"Invalid workflow ID: '$possibleWorkflowId'.") + final case class InvalidWorkflowException(possibleWorkflowId: String) extends Exception(s"Invalid workflow ID: '$possibleWorkflowId'.") - val cromwellVersion = VersionUtil.getVersion("cromwell-engine") - val swaggerUiVersion = VersionUtil.getVersion("swagger-ui", VersionUtil.sbtDependencyVersion("swaggerUi")) - val backendResponse = BackendResponse(BackendConfiguration.AllBackendEntries.map(_.name).sorted, BackendConfiguration.DefaultBackendEntry.name) - val versionResponse = JsObject(Map("cromwell" -> cromwellVersion.toJson)) - val serviceShuttingDownResponse = new Exception("Cromwell service is shutting down.").failRequest(StatusCodes.ServiceUnavailable) - } + val cromwellVersion = VersionUtil.getVersion("cromwell-engine") + val swaggerUiVersion = VersionUtil.getVersion("swagger-ui", VersionUtil.sbtDependencyVersion("swaggerUi")) + val backendResponse = BackendResponse(BackendConfiguration.AllBackendEntries.map(_.name).sorted, BackendConfiguration.DefaultBackendEntry.name) + val versionResponse = JsObject(Map("cromwell" -> cromwellVersion.toJson)) + val serviceShuttingDownResponse = new Exception("Cromwell service is shutting down.").failRequest(StatusCodes.ServiceUnavailable) +} \ No newline at end of file diff --git a/engine/src/main/scala/cromwell/webservice/routes/WesCromwellRouteSupport.scala b/engine/src/main/scala/cromwell/webservice/routes/WesCromwellRouteSupport.scala index 95cde16cd22..349d36a9251 100644 --- a/engine/src/main/scala/cromwell/webservice/routes/WesCromwellRouteSupport.scala +++ b/engine/src/main/scala/cromwell/webservice/routes/WesCromwellRouteSupport.scala @@ -31,6 +31,7 @@ trait WesCromwellRouteSupport extends WebServiceUtils { implicit val timeout: Timeout = duration implicit def actorRefFactory: ActorRefFactory + implicit val materializer: ActorMaterializer implicit val ec: ExecutionContext diff --git a/engine/src/main/scala/cromwell/webservice/routes/wes/WesRouteSupport.scala b/engine/src/main/scala/cromwell/webservice/routes/wes/WesRouteSupport.scala index e4969702460..7ec56e3317a 100644 --- a/engine/src/main/scala/cromwell/webservice/routes/wes/WesRouteSupport.scala +++ b/engine/src/main/scala/cromwell/webservice/routes/wes/WesRouteSupport.scala @@ -1,31 +1,38 @@ package cromwell.webservice.routes.wes import akka.actor.ActorRef +import akka.http.scaladsl.model.{StatusCode, StatusCodes} import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.server.Route +import akka.http.scaladsl.server.directives.RouteDirectives.complete +import akka.http.scaladsl.server.{Directive1, Route} import akka.pattern.{AskTimeoutException, ask} import akka.util.Timeout +import com.typesafe.config.ConfigFactory +import cromwell.core.WorkflowId +import cromwell.core.abort.SuccessfulAbortResponse import cromwell.engine.instrumentation.HttpInstrumentation -import cromwell.services.metadata.MetadataService.{GetStatus, MetadataServiceResponse, StatusLookupFailed} -import cromwell.webservice.routes.CromwellApiService.{UnrecognizedWorkflowException, validateWorkflowIdInMetadata} +import cromwell.engine.workflow.WorkflowManagerActor.WorkflowNotFoundException +import cromwell.server.CromwellShutdown +import cromwell.services.metadata.MetadataService.{BuildMetadataJsonAction, GetSingleWorkflowMetadataAction, GetStatus, MetadataServiceResponse, StatusLookupFailed} +import cromwell.services.{FailedMetadataJsonResponse, SuccessfulMetadataJsonResponse} import cromwell.webservice.WebServiceUtils.EnhancedThrowable +import cromwell.webservice.routes.CromwellApiService.{UnrecognizedWorkflowException, validateWorkflowIdInMetadata} +import cromwell.webservice.routes.MetadataRouteSupport.{metadataBuilderActorRequest, metadataQueryRequest} +import cromwell.webservice.routes.wes.WesResponseJsonSupport._ +import cromwell.webservice.routes.wes.WesRouteSupport._ +import cromwell.webservice.routes.{CromwellApiService, WesCromwellRouteSupport} +import net.ceedubs.ficus.Ficus._ -import scala.concurrent.ExecutionContext +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} -import WesResponseJsonSupport._ -import akka.http.scaladsl.model.{StatusCode, StatusCodes} -import akka.http.scaladsl.server.directives.RouteDirectives.complete -import WesRouteSupport._ -import cromwell.core.abort.SuccessfulAbortResponse -import cromwell.engine.workflow.WorkflowManagerActor.WorkflowNotFoundException -import cromwell.server.CromwellShutdown -import cromwell.services.SuccessfulMetadataJsonResponse -import cromwell.webservice.routes.CromwellApiService -trait WesRouteSupport extends HttpInstrumentation { + + +trait WesRouteSupport extends HttpInstrumentation with WesCromwellRouteSupport { + val serviceRegistryActor: ActorRef val workflowManagerActor: ActorRef - val workflowStoreActor: ActorRef implicit val ec: ExecutionContext implicit val timeout: Timeout @@ -49,36 +56,53 @@ trait WesRouteSupport extends HttpInstrumentation { pathPrefix("ga4gh" / "wes" / "v1") { concat( path("service-info") { - complete(ServiceInfo.toWesResponse(workflowStoreActor)) + get { + complete(ServiceInfo.toWesResponse(workflowStoreActor)) + } }, - pathPrefix("runs") { - concat( - path(Segment / "status") { possibleWorkflowId => - val response = validateWorkflowIdInMetadata(possibleWorkflowId, serviceRegistryActor).flatMap(w => serviceRegistryActor.ask(GetStatus(w)).mapTo[MetadataServiceResponse]) - // WES can also return a 401 or a 403 but that requires user auth knowledge which Cromwell doesn't currently have - onComplete(response) { - case Success(SuccessfulMetadataJsonResponse(_, jsObject)) => - val wesState = WesState.fromCromwellStatusJson(jsObject) - complete(WesRunStatus(possibleWorkflowId, wesState)) - case Success(r: StatusLookupFailed) => r.reason.errorRequest(StatusCodes.InternalServerError) - case Success(m: MetadataServiceResponse) => - // This should never happen, but .... - val error = new IllegalStateException("Unexpected response from Metadata service: " + m) - error.errorRequest(StatusCodes.InternalServerError) - case Failure(_: UnrecognizedWorkflowException) => complete(NotFoundError) - case Failure(e) => complete(WesErrorResponse(e.getMessage, StatusCodes.InternalServerError.intValue)) - } - }, - path(Segment / "cancel") { possibleWorkflowId => - post { - CromwellApiService.abortWorkflow(possibleWorkflowId, - workflowStoreActor, - workflowManagerActor, - successHandler = WesAbortSuccessHandler, - errorHandler = WesAbortErrorHandler) + path("runs") { + get { + parameters(("page_size".as[Int].?, "page_token".?)) { (pageSize, pageToken) => + completeCromwellResponse(listRuns(pageSize, pageToken, serviceRegistryActor)) + } + } ~ + post { + extractSubmission() { submission => + submitRequest(submission.entity, + isSingleSubmission = true) } } - ) + }, + path("runs" / Segment) { workflowId => + get { + // this is what it was like in code found in the project… it perhaps isn’t ideal but doesn’t seem to hurt, so leaving it like this for now. + completeCromwellResponse(runLog(workflowId, (w: WorkflowId) => GetSingleWorkflowMetadataAction(w, None, None, expandSubWorkflows = false), serviceRegistryActor)) + } + }, + path("runs" / Segment / "status") { possibleWorkflowId => + val response = validateWorkflowIdInMetadata(possibleWorkflowId, serviceRegistryActor).flatMap(w => serviceRegistryActor.ask(GetStatus(w)).mapTo[MetadataServiceResponse]) + // WES can also return a 401 or a 403 but that requires user auth knowledge which Cromwell doesn't currently have + onComplete(response) { + case Success(SuccessfulMetadataJsonResponse(_, jsObject)) => + val wesState = WesState.fromCromwellStatusJson(jsObject) + complete(WesRunStatus(possibleWorkflowId, wesState)) + case Success(r: StatusLookupFailed) => r.reason.errorRequest(StatusCodes.InternalServerError) + case Success(m: MetadataServiceResponse) => + // This should never happen, but .... + val error = new IllegalStateException("Unexpected response from Metadata service: " + m) + error.errorRequest(StatusCodes.InternalServerError) + case Failure(_: UnrecognizedWorkflowException) => complete(NotFoundError) + case Failure(e) => complete(WesErrorResponse(e.getMessage, StatusCodes.InternalServerError.intValue)) + } + }, + path("runs" / Segment / "cancel") { possibleWorkflowId => + post { + CromwellApiService.abortWorkflow(possibleWorkflowId, + workflowStoreActor, + workflowManagerActor, + successHandler = WesAbortSuccessHandler, + errorHandler = WesAbortErrorHandler) + } } ) } @@ -86,7 +110,15 @@ trait WesRouteSupport extends HttpInstrumentation { } } + + object WesRouteSupport { + import WesResponseJsonSupport._ + + implicit lazy val duration: FiniteDuration = ConfigFactory.load().as[FiniteDuration]("akka.http.server.request-timeout") + implicit lazy val timeout: Timeout = duration + import scala.concurrent.ExecutionContext.Implicits.global + val NotFoundError = WesErrorResponse("The requested workflow run wasn't found", StatusCodes.NotFound.intValue) def WesAbortSuccessHandler: PartialFunction[SuccessfulAbortResponse, Route] = { @@ -104,4 +136,38 @@ object WesRouteSupport { private def respondWithWesError(errorMsg: String, status: StatusCode): Route = { complete((status, WesErrorResponse(errorMsg, status.intValue))) } -} + + def extractSubmission(): Directive1[WesSubmission] = { + formFields(( + "workflow_params".?, + "workflow_type".?, + "workflow_type_version".?, + "tags".?, + "workflow_engine_parameters".?, + "workflow_url".?, + "workflow_attachment".as[String].* + )).as(WesSubmission) + } + + def completeCromwellResponse(future: => Future[WesResponse]): Route = { + onComplete(future) { + case Success(response: WesResponse) => complete(response) + case Failure(e) => complete(WesErrorResponse(e.getMessage, StatusCodes.InternalServerError.intValue)) + } + } + + def listRuns(pageSize: Option[Int], pageToken: Option[String], serviceRegistryActor: ActorRef): Future[WesResponse] = { + // FIXME: to handle - page_size, page_token + // FIXME: How to handle next_page_token in response? + metadataQueryRequest(Seq.empty[(String, String)], serviceRegistryActor).map(RunListResponse.fromMetadataQueryResponse) + } + + def runLog(workflowId: String, request: WorkflowId => BuildMetadataJsonAction, serviceRegistryActor: ActorRef): Future[WesResponse] = { + val metadataJsonResponse = metadataBuilderActorRequest(workflowId, request, serviceRegistryActor) + + metadataJsonResponse.map { + case SuccessfulMetadataJsonResponse(_, responseJson) => WesResponseWorkflowMetadata(WesRunLog.fromJson(responseJson.toString())) + case FailedMetadataJsonResponse(_, reason) => WesErrorResponse(reason.getMessage, StatusCodes.InternalServerError.intValue) + } + } +} \ No newline at end of file diff --git a/engine/src/main/scala/cromwell/webservice/routes/wes/WesRunRoutes.scala b/engine/src/main/scala/cromwell/webservice/routes/wes/WesRunRoutes.scala deleted file mode 100644 index 6ecbac4bdfa..00000000000 --- a/engine/src/main/scala/cromwell/webservice/routes/wes/WesRunRoutes.scala +++ /dev/null @@ -1,94 +0,0 @@ -package cromwell.webservice.routes.wes - -import akka.actor.ActorRef -import akka.http.scaladsl.model.StatusCodes -import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.server.directives.RouteDirectives.complete -import akka.http.scaladsl.server.{Directive1, Route} -import akka.util.Timeout -import com.typesafe.config.ConfigFactory -import cromwell.core.WorkflowId -import cromwell.services.metadata.MetadataService.{BuildMetadataJsonAction, GetSingleWorkflowMetadataAction} -import cromwell.services.{FailedMetadataJsonResponse, SuccessfulMetadataJsonResponse} -import cromwell.webservice.routes.MetadataRouteSupport.{metadataBuilderActorRequest, metadataQueryRequest} -import cromwell.webservice.routes.WesCromwellRouteSupport -import cromwell.webservice.routes.wes.WesRunRoutes.{completeCromwellResponse, extractSubmission, runLog} -import net.ceedubs.ficus.Ficus._ - -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.Future -import scala.concurrent.duration.FiniteDuration -import scala.util.{Failure, Success} - -trait WesRunRoutes extends WesCromwellRouteSupport { - - val serviceRegistryActor: ActorRef - - lazy val runRoutes: Route = - pathPrefix("ga4gh" / "wes" / "v1") { - concat( - path("runs") { - get { - parameters(("page_size".as[Int].?, "page_token".?)) { (pageSize, pageToken) => - WesRunRoutes.completeCromwellResponse(WesRunRoutes.listRuns(pageSize, pageToken, serviceRegistryActor)) - } - } ~ - post { - extractSubmission() { submission => - submitRequest(submission.entity, - isSingleSubmission = true, - ) - } - } - }, - path("runs" / Segment) { workflowId => - get { - // this is what it was like in code found in the project… it perhaps isn’t ideal but doesn’t seem to hurt, so leaving it like this for now. - completeCromwellResponse(runLog(workflowId, (w: WorkflowId) => GetSingleWorkflowMetadataAction(w, None, None, expandSubWorkflows = false), serviceRegistryActor)) - } - } - ) - } -} - -object WesRunRoutes { - - import WesResponseJsonSupport._ - - implicit lazy val duration: FiniteDuration = ConfigFactory.load().as[FiniteDuration]("akka.http.server.request-timeout") - implicit lazy val timeout: Timeout = duration - - def extractSubmission(): Directive1[WesSubmission] = { - formFields(( - "workflow_params".?, - "workflow_type".?, - "workflow_type_version".?, - "tags".?, - "workflow_engine_parameters".?, - "workflow_url".?, - "workflow_attachment".as[String].* - )).as(WesSubmission) - } - - def completeCromwellResponse(future: => Future[WesResponse]): Route = { - onComplete(future) { - case Success(response: WesResponse) => complete(response) - case Failure(e) => complete(WesErrorResponse(e.getMessage, StatusCodes.InternalServerError.intValue)) - } - } - - def listRuns(pageSize: Option[Int], pageToken: Option[String], serviceRegistryActor: ActorRef): Future[WesResponse] = { - // FIXME: to handle - page_size, page_token - // FIXME: How to handle next_page_token in response? - metadataQueryRequest(Seq.empty[(String, String)], serviceRegistryActor).map(RunListResponse.fromMetadataQueryResponse) - } - - def runLog(workflowId: String, request: WorkflowId => BuildMetadataJsonAction, serviceRegistryActor: ActorRef): Future[WesResponse] = { - val metadataJsonResponse = metadataBuilderActorRequest(workflowId, request, serviceRegistryActor) - - metadataJsonResponse.map { - case SuccessfulMetadataJsonResponse(_, responseJson) => WesResponseWorkflowMetadata(WesRunLog.fromJson(responseJson.toString())) - case FailedMetadataJsonResponse(_, reason) => WesErrorResponse(reason.getMessage, StatusCodes.InternalServerError.intValue) - } - } -} diff --git a/engine/src/test/scala/cromwell/webservice/routes/CromwellApiServiceSpec.scala b/engine/src/test/scala/cromwell/webservice/routes/CromwellApiServiceSpec.scala index 2a3b11a41c5..c40c84397f7 100644 --- a/engine/src/test/scala/cromwell/webservice/routes/CromwellApiServiceSpec.scala +++ b/engine/src/test/scala/cromwell/webservice/routes/CromwellApiServiceSpec.scala @@ -1,7 +1,5 @@ package cromwell.webservice.routes -import java.time.OffsetDateTime - import akka.actor.{Actor, ActorLogging, ActorSystem, Props} import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import akka.http.scaladsl.model.ContentTypes._ @@ -17,13 +15,13 @@ import cromwell.engine.workflow.WorkflowManagerActor.WorkflowNotFoundException import cromwell.engine.workflow.workflowstore.WorkflowStoreActor._ import cromwell.engine.workflow.workflowstore.WorkflowStoreEngineActor.{WorkflowOnHoldToSubmittedFailure, WorkflowOnHoldToSubmittedSuccess} import cromwell.engine.workflow.workflowstore.WorkflowStoreSubmitActor.{WorkflowSubmittedToStore, WorkflowsBatchSubmittedToStore} +import cromwell.services._ import cromwell.services.healthmonitor.ProtoHealthMonitorServiceActor.{GetCurrentStatus, StatusCheckResponse, SubsystemStatus} import cromwell.services.instrumentation.InstrumentationService.InstrumentationServiceMessage import cromwell.services.metadata.MetadataArchiveStatus._ import cromwell.services.metadata.MetadataService._ import cromwell.services.metadata._ import cromwell.services.metadata.impl.builder.MetadataBuilderActor -import cromwell.services._ import cromwell.services.womtool.WomtoolServiceMessages.{DescribeFailure, DescribeRequest, DescribeSuccess} import cromwell.services.womtool.models.WorkflowDescription import cromwell.util.SampleWdl.HelloWorld @@ -34,6 +32,7 @@ import org.scalatest.flatspec.AsyncFlatSpec import org.scalatest.matchers.should.Matchers import spray.json._ +import java.time.OffsetDateTime import scala.concurrent.duration._ class CromwellApiServiceSpec extends AsyncFlatSpec with ScalatestRouteTest with Matchers { @@ -529,6 +528,7 @@ object CromwellApiServiceSpec { val WorkflowIdExistingOnlyInSummaryTable = WorkflowId.fromString("f0000000-0000-0000-0000-000000000011") val ArchivedWorkflowId = WorkflowId.fromString("c4c6339c-2145-47fb-acc5-b5cb8d2809f5") val ArchivedAndDeletedWorkflowId = WorkflowId.fromString("abc1234d-2145-47fb-acc5-b5cb8d2809f5") + val wesWorkflowId = WorkflowId.randomId() val SummarizedWorkflowIds = Set( SummarizedWorkflowId, WorkflowIdExistingOnlyInSummaryTable, @@ -545,7 +545,8 @@ object CromwellApiServiceSpec { FailedWorkflowId, SummarizedWorkflowId, ArchivedWorkflowId, - ArchivedAndDeletedWorkflowId + ArchivedAndDeletedWorkflowId, + wesWorkflowId ) class MockApiService()(implicit val system: ActorSystem) extends CromwellApiService { @@ -564,13 +565,21 @@ object CromwellApiServiceSpec { List( MetadataEvent(MetadataKey(workflowId, None, "testKey1a"), MetadataValue("myValue1a", MetadataString)), MetadataEvent(MetadataKey(workflowId, None, "testKey1b"), MetadataValue("myValue1b", MetadataString)), - MetadataEvent(MetadataKey(workflowId, None, "testKey2a"), MetadataValue("myValue2a", MetadataString)) + MetadataEvent(MetadataKey(workflowId, None, "testKey2a"), MetadataValue("myValue2a", MetadataString)), + ) + } + private def wesFullMetadataResponse(workflowId: WorkflowId) = { + List( + MetadataEvent(MetadataKey(workflowId, None, "status"), MetadataValue("Running", MetadataString)), + MetadataEvent(MetadataKey(workflowId, None, "submittedFiles:workflow"), MetadataValue("myValue2a", MetadataString)), + ) } def responseMetadataValues(workflowId: WorkflowId, withKeys: List[String], withoutKeys: List[String]): JsObject = { def keyFilter(keys: List[String])(m: MetadataEvent) = keys.exists(k => m.key.key.startsWith(k)) - val events = fullMetadataResponse(workflowId) + val metadataEvents = if (workflowId == wesWorkflowId) wesFullMetadataResponse(workflowId) else fullMetadataResponse(workflowId) + val events = metadataEvents .filter(m => withKeys.isEmpty || keyFilter(withKeys)(m)) .filter(m => withoutKeys.isEmpty || !keyFilter(withoutKeys)(m)) diff --git a/engine/src/test/scala/cromwell/webservice/routes/wes/WesRouteSupportSpec.scala b/engine/src/test/scala/cromwell/webservice/routes/wes/WesRouteSupportSpec.scala index 0c0fe1c8054..c8306088ded 100644 --- a/engine/src/test/scala/cromwell/webservice/routes/wes/WesRouteSupportSpec.scala +++ b/engine/src/test/scala/cromwell/webservice/routes/wes/WesRouteSupportSpec.scala @@ -1,22 +1,27 @@ package cromwell.webservice.routes.wes import akka.actor.Props -import akka.http.scaladsl.model.StatusCodes import akka.http.scaladsl.model.HttpMethods.POST -import akka.http.scaladsl.testkit.ScalatestRouteTest +import akka.http.scaladsl.model._ +import akka.http.scaladsl.server.MethodRejection +import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} +import cromwell.util.SampleWdl.HelloWorld import cromwell.webservice.routes.CromwellApiServiceSpec - -import scala.concurrent.duration._ import cromwell.webservice.routes.CromwellApiServiceSpec.{MockServiceRegistryActor, MockWorkflowManagerActor, MockWorkflowStoreActor} +import cromwell.webservice.routes.wes.WesResponseJsonSupport._ import org.scalatest.flatspec.AsyncFlatSpec import org.scalatest.matchers.should.Matchers -import WesResponseJsonSupport._ -import akka.http.scaladsl.server.MethodRejection +import spray.json._ + +import scala.concurrent.duration._ class WesRouteSupportSpec extends AsyncFlatSpec with ScalatestRouteTest with Matchers with WesRouteSupport { + val actorRefFactory = system override implicit val ec = system.dispatcher - override implicit val timeout = 5.seconds + override val timeout = routeTestTimeout.duration + implicit def routeTestTimeout = RouteTestTimeout(5.seconds) + override val workflowStoreActor = actorRefFactory.actorOf(Props(new MockWorkflowStoreActor())) override val serviceRegistryActor = actorRefFactory.actorOf(Props(new MockServiceRegistryActor())) @@ -149,4 +154,53 @@ class WesRouteSupportSpec extends AsyncFlatSpec with ScalatestRouteTest with Mat rejection shouldEqual MethodRejection(POST) } } -} + + behavior of "WES API /runs POST endpoint" + it should "return 201 for a successful workflow submission" in { + val workflowSource = Multipart.FormData.BodyPart("workflow_url", HttpEntity(MediaTypes.`application/json`, "https://raw.githubusercontent.com/broadinstitute/cromwell/develop/womtool/src/test/resources/validate/wdl_draft3/valid/callable_imports/my_workflow.wdl")) + val workflowInputs = Multipart.FormData.BodyPart("workflow_params", HttpEntity(MediaTypes.`application/json`, HelloWorld.rawInputs.toJson.toString())) + val formData = Multipart.FormData(workflowSource, workflowInputs).toEntity() + Post(s"/ga4gh/wes/$version/runs", formData) ~> + wesRoutes ~> + check { + assertResult( + s"""{ + | "id": "${CromwellApiServiceSpec.ExistingWorkflowId.toString}", + | "status": "Submitted" + |}""".stripMargin) { + responseAs[String].parseJson.prettyPrint + } + assertResult(StatusCodes.Created) { + status + } + headers should be(Seq.empty) + } + } + + + behavior of "WES API /runs GET endpoint" + it should "return results for a good query" in { + Get(s"/ga4gh/wes/v1/runs") ~> + wesRoutes ~> + check { + status should be(StatusCodes.OK) + contentType should be(ContentTypes.`application/json`) + val results = responseAs[JsObject].fields("runs").convertTo[Seq[JsObject]] + results.head.fields("run_id") should be(JsString(CromwellApiServiceSpec.ExistingWorkflowId.toString)) + results.head.fields("state") should be(JsString("COMPLETE")) + } + } + + behavior of "WES API /runs/{run_id} endpoint" + it should "return valid metadata when supplied a run_id" in { + Get(s"/ga4gh/wes/v1/runs/${CromwellApiServiceSpec.wesWorkflowId}") ~> + wesRoutes ~> + check { + status should be(StatusCodes.OK) + val result = responseAs[JsObject].fields("workflowLog").asJsObject() + result.fields.keys should contain allOf("request", "run_id", "state") + result.fields("state") should be(JsString("RUNNING")) + result.fields("run_id") should be(JsString(CromwellApiServiceSpec.wesWorkflowId.toString)) + } + } +} \ No newline at end of file From 79afa2997f91718cbbacc3aa68ca4ee086b6d7f8 Mon Sep 17 00:00:00 2001 From: Katrina P <68349264+kpierre13@users.noreply.github.com> Date: Thu, 25 Aug 2022 15:15:55 -0400 Subject: [PATCH 12/13] BW-1354 - Porting CBAS preliminary step (#6837) * Getting rid of shared utility file; Adding/Updating WES version of submit. * Edit spec file * Adding Wes-like error --- .../routes/CromwellApiService.scala | 79 ++++++++++++++-- .../routes/WesCromwellRouteSupport.scala | 90 ------------------- .../routes/wes/WesRouteSupport.scala | 78 ++++++++++++++-- .../routes/wes/WesRouteSupportSpec.scala | 3 +- 4 files changed, 141 insertions(+), 109 deletions(-) delete mode 100644 engine/src/main/scala/cromwell/webservice/routes/WesCromwellRouteSupport.scala diff --git a/engine/src/main/scala/cromwell/webservice/routes/CromwellApiService.scala b/engine/src/main/scala/cromwell/webservice/routes/CromwellApiService.scala index 637509cc4fd..a1c4f023135 100644 --- a/engine/src/main/scala/cromwell/webservice/routes/CromwellApiService.scala +++ b/engine/src/main/scala/cromwell/webservice/routes/CromwellApiService.scala @@ -1,10 +1,13 @@ package cromwell.webservice.routes +import java.util.UUID + import akka.actor.{ActorRef, ActorRefFactory} +import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheDiffActorJsonFormatting.successfulResponseJsonFormatter import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import akka.http.scaladsl.marshalling.ToResponseMarshallable -import akka.http.scaladsl.model.ContentTypes._ import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.ContentTypes._ import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.{ExceptionHandler, Route} import akka.pattern.{AskTimeoutException, ask} @@ -12,6 +15,7 @@ import akka.stream.ActorMaterializer import akka.util.Timeout import cats.data.NonEmptyList import cats.data.Validated.{Invalid, Valid} +import com.typesafe.config.ConfigFactory import common.exception.AggregatedMessageException import common.util.VersionUtil import cromwell.core.abort._ @@ -20,24 +24,25 @@ import cromwell.engine.backend.BackendConfiguration import cromwell.engine.instrumentation.HttpInstrumentation import cromwell.engine.workflow.WorkflowManagerActor.WorkflowNotFoundException import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheDiffActor.{CachedCallNotFoundException, CallCacheDiffActorResponse, FailedCallCacheDiffResponse, SuccessfulCallCacheDiffResponse} -import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheDiffActorJsonFormatting.successfulResponseJsonFormatter import cromwell.engine.workflow.lifecycle.execution.callcaching.{CallCacheDiffActor, CallCacheDiffQueryParameter} import cromwell.engine.workflow.workflowstore.SqlWorkflowStore.NotInOnHoldStateException -import cromwell.engine.workflow.workflowstore.{WorkflowStoreActor, WorkflowStoreEngineActor} +import cromwell.engine.workflow.workflowstore.{WorkflowStoreActor, WorkflowStoreEngineActor, WorkflowStoreSubmitActor} import cromwell.server.CromwellShutdown -import cromwell.services._ import cromwell.services.healthmonitor.ProtoHealthMonitorServiceActor.{GetCurrentStatus, StatusCheckResponse} import cromwell.services.metadata.MetadataService._ -import cromwell.webservice.WebServiceUtils.EnhancedThrowable -import cromwell.webservice.WorkflowJsonSupport._ import cromwell.webservice._ +import cromwell.services._ +import cromwell.webservice.WorkflowJsonSupport._ +import cromwell.webservice.WebServiceUtils +import cromwell.webservice.WebServiceUtils.EnhancedThrowable +import net.ceedubs.ficus.Ficus._ -import java.util.UUID +import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future, TimeoutException} import scala.io.Source import scala.util.{Failure, Success, Try} -trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport with WomtoolRouteSupport with WebServiceUtils with WesCromwellRouteSupport { +trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport with WomtoolRouteSupport with WebServiceUtils { import CromwellApiService._ implicit def actorRefFactory: ActorRefFactory @@ -48,6 +53,10 @@ trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport w val workflowManagerActor: ActorRef val serviceRegistryActor: ActorRef + // Derive timeouts (implicit and not) from akka http's request timeout since there's no point in being higher than that + implicit val duration = ConfigFactory.load().as[FiniteDuration]("akka.http.server.request-timeout") + implicit val timeout: Timeout = duration + val engineRoutes = concat( path("engine" / Segment / "stats") { _ => get { @@ -173,6 +182,58 @@ trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport w } } + private def toResponse(workflowId: WorkflowId, workflowState: WorkflowState): WorkflowSubmitResponse = { + WorkflowSubmitResponse(workflowId.toString, workflowState.toString) + } + + private def submitRequest(formData: Multipart.FormData, isSingleSubmission: Boolean): Route = { + + def getWorkflowState(workflowOnHold: Boolean): WorkflowState = { + if (workflowOnHold) + WorkflowOnHold + else WorkflowSubmitted + } + + def askSubmit(command: WorkflowStoreActor.WorkflowStoreActorSubmitCommand, warnings: Seq[String], workflowState: WorkflowState): Route = { + // NOTE: Do not blindly copy the akka-http -to- ask-actor pattern below without knowing the pros and cons. + onComplete(workflowStoreActor.ask(command).mapTo[WorkflowStoreSubmitActor.WorkflowStoreSubmitActorResponse]) { + case Success(w) => + w match { + case WorkflowStoreSubmitActor.WorkflowSubmittedToStore(workflowId, _) => + completeResponse(StatusCodes.Created, toResponse(workflowId, workflowState), warnings) + case WorkflowStoreSubmitActor.WorkflowsBatchSubmittedToStore(workflowIds, _) => + completeResponse(StatusCodes.Created, workflowIds.toList.map(toResponse(_, workflowState)), warnings) + case WorkflowStoreSubmitActor.WorkflowSubmitFailed(throwable) => + throwable.failRequest(StatusCodes.BadRequest, warnings) + } + case Failure(_: AskTimeoutException) if CromwellShutdown.shutdownInProgress() => serviceShuttingDownResponse + case Failure(e: TimeoutException) => e.failRequest(StatusCodes.ServiceUnavailable) + case Failure(e) => e.failRequest(StatusCodes.InternalServerError, warnings) + } + } + + onComplete(materializeFormData(formData)) { + case Success(data) => + PartialWorkflowSources.fromSubmitRoute(data, allowNoInputs = isSingleSubmission) match { + case Success(workflowSourceFiles) if isSingleSubmission && workflowSourceFiles.size == 1 => + val warnings = workflowSourceFiles.flatMap(_.warnings) + askSubmit(WorkflowStoreActor.SubmitWorkflow(workflowSourceFiles.head), warnings, getWorkflowState(workflowSourceFiles.head.workflowOnHold)) + // Catches the case where someone has gone through the single submission endpoint w/ more than one workflow + case Success(workflowSourceFiles) if isSingleSubmission => + val warnings = workflowSourceFiles.flatMap(_.warnings) + val e = new IllegalArgumentException("To submit more than one workflow at a time, use the batch endpoint.") + e.failRequest(StatusCodes.BadRequest, warnings) + case Success(workflowSourceFiles) => + val warnings = workflowSourceFiles.flatMap(_.warnings) + askSubmit( + WorkflowStoreActor.BatchSubmitWorkflows(NonEmptyList.fromListUnsafe(workflowSourceFiles.toList)), + warnings, getWorkflowState(workflowSourceFiles.head.workflowOnHold)) + case Failure(t) => t.failRequest(StatusCodes.BadRequest) + } + case Failure(e: TimeoutException) => e.failRequest(StatusCodes.ServiceUnavailable) + case Failure(e) => e.failRequest(StatusCodes.InternalServerError) + } + } } object CromwellApiService { @@ -261,4 +322,4 @@ object CromwellApiService { val backendResponse = BackendResponse(BackendConfiguration.AllBackendEntries.map(_.name).sorted, BackendConfiguration.DefaultBackendEntry.name) val versionResponse = JsObject(Map("cromwell" -> cromwellVersion.toJson)) val serviceShuttingDownResponse = new Exception("Cromwell service is shutting down.").failRequest(StatusCodes.ServiceUnavailable) -} \ No newline at end of file +} diff --git a/engine/src/main/scala/cromwell/webservice/routes/WesCromwellRouteSupport.scala b/engine/src/main/scala/cromwell/webservice/routes/WesCromwellRouteSupport.scala deleted file mode 100644 index 349d36a9251..00000000000 --- a/engine/src/main/scala/cromwell/webservice/routes/WesCromwellRouteSupport.scala +++ /dev/null @@ -1,90 +0,0 @@ -package cromwell.webservice.routes - - -import akka.actor.{ActorRef, ActorRefFactory} -import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ -import akka.http.scaladsl.model.{Multipart, StatusCodes} -import akka.http.scaladsl.server.Directives.onComplete -import akka.http.scaladsl.server.Route -import akka.pattern.{AskTimeoutException, ask} -import akka.stream.ActorMaterializer -import akka.util.Timeout -import cats.data.NonEmptyList -import com.typesafe.config.ConfigFactory -import cromwell.core.{WorkflowId, WorkflowOnHold, WorkflowState, WorkflowSubmitted, path => _} -import cromwell.engine.workflow.workflowstore.{WorkflowStoreActor, WorkflowStoreSubmitActor} -import cromwell.server.CromwellShutdown -import cromwell.webservice.WebServiceUtils.EnhancedThrowable -import cromwell.webservice.WorkflowJsonSupport._ -import cromwell.webservice.{PartialWorkflowSources, WebServiceUtils, WorkflowSubmitResponse} -import net.ceedubs.ficus.Ficus._ - -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ExecutionContext, TimeoutException} -import scala.util.{Failure, Success} - -trait WesCromwellRouteSupport extends WebServiceUtils { - - val workflowStoreActor: ActorRef - - implicit val duration = ConfigFactory.load().as[FiniteDuration]("akka.http.server.request-timeout") - implicit val timeout: Timeout = duration - - implicit def actorRefFactory: ActorRefFactory - - implicit val materializer: ActorMaterializer - implicit val ec: ExecutionContext - - def toResponse(workflowId: WorkflowId, workflowState: WorkflowState): WorkflowSubmitResponse = { - WorkflowSubmitResponse(workflowId.toString, workflowState.toString) - } - - def submitRequest(formData: Multipart.FormData, isSingleSubmission: Boolean): Route = { - - def getWorkflowState(workflowOnHold: Boolean): WorkflowState = { - if (workflowOnHold) - WorkflowOnHold - else WorkflowSubmitted - } - - def sendToWorkflowStore(command: WorkflowStoreActor.WorkflowStoreActorSubmitCommand, warnings: Seq[String], workflowState: WorkflowState): Route = { - // NOTE: Do not blindly copy the akka-http -to- ask-actor pattern below without knowing the pros and cons. - onComplete(workflowStoreActor.ask(command).mapTo[WorkflowStoreSubmitActor.WorkflowStoreSubmitActorResponse]) { - case Success(w) => - w match { - case WorkflowStoreSubmitActor.WorkflowSubmittedToStore(workflowId, _) => - completeResponse(StatusCodes.Created, toResponse(workflowId, workflowState), warnings) - case WorkflowStoreSubmitActor.WorkflowsBatchSubmittedToStore(workflowIds, _) => - completeResponse(StatusCodes.Created, workflowIds.toList.map(toResponse(_, workflowState)), warnings) - case WorkflowStoreSubmitActor.WorkflowSubmitFailed(throwable) => - throwable.failRequest(StatusCodes.BadRequest, warnings) - } - case Failure(_: AskTimeoutException) if CromwellShutdown.shutdownInProgress() => CromwellApiService.serviceShuttingDownResponse - case Failure(e: TimeoutException) => e.failRequest(StatusCodes.ServiceUnavailable) - case Failure(e) => e.failRequest(StatusCodes.InternalServerError, warnings) - } - } - - onComplete(materializeFormData(formData)) { - case Success(data) => - PartialWorkflowSources.fromSubmitRoute(data, allowNoInputs = isSingleSubmission) match { - case Success(workflowSourceFiles) if isSingleSubmission && workflowSourceFiles.size == 1 => - val warnings = workflowSourceFiles.flatMap(_.warnings) - sendToWorkflowStore(WorkflowStoreActor.SubmitWorkflow(workflowSourceFiles.head), warnings, getWorkflowState(workflowSourceFiles.head.workflowOnHold)) - // Catches the case where someone has gone through the single submission endpoint w/ more than one workflow - case Success(workflowSourceFiles) if isSingleSubmission => - val warnings = workflowSourceFiles.flatMap(_.warnings) - val e = new IllegalArgumentException("To submit more than one workflow at a time, use the batch endpoint.") - e.failRequest(StatusCodes.BadRequest, warnings) - case Success(workflowSourceFiles) => - val warnings = workflowSourceFiles.flatMap(_.warnings) - sendToWorkflowStore( - WorkflowStoreActor.BatchSubmitWorkflows(NonEmptyList.fromListUnsafe(workflowSourceFiles.toList)), - warnings, getWorkflowState(workflowSourceFiles.head.workflowOnHold)) - case Failure(t) => t.failRequest(StatusCodes.BadRequest) - } - case Failure(e: TimeoutException) => e.failRequest(StatusCodes.ServiceUnavailable) - case Failure(e) => e.failRequest(StatusCodes.InternalServerError) - } - } -} diff --git a/engine/src/main/scala/cromwell/webservice/routes/wes/WesRouteSupport.scala b/engine/src/main/scala/cromwell/webservice/routes/wes/WesRouteSupport.scala index 7ec56e3317a..ba6546fb3e3 100644 --- a/engine/src/main/scala/cromwell/webservice/routes/wes/WesRouteSupport.scala +++ b/engine/src/main/scala/cromwell/webservice/routes/wes/WesRouteSupport.scala @@ -1,41 +1,47 @@ package cromwell.webservice.routes.wes import akka.actor.ActorRef -import akka.http.scaladsl.model.{StatusCode, StatusCodes} +import akka.http.scaladsl.model.{Multipart, StatusCode, StatusCodes} import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.directives.RouteDirectives.complete import akka.http.scaladsl.server.{Directive1, Route} import akka.pattern.{AskTimeoutException, ask} +import akka.stream.ActorMaterializer import akka.util.Timeout +import cats.data.NonEmptyList import com.typesafe.config.ConfigFactory -import cromwell.core.WorkflowId import cromwell.core.abort.SuccessfulAbortResponse +import cromwell.core.{WorkflowId, WorkflowOnHold, WorkflowState, WorkflowSubmitted} import cromwell.engine.instrumentation.HttpInstrumentation import cromwell.engine.workflow.WorkflowManagerActor.WorkflowNotFoundException +import cromwell.engine.workflow.workflowstore.{WorkflowStoreActor, WorkflowStoreSubmitActor} import cromwell.server.CromwellShutdown import cromwell.services.metadata.MetadataService.{BuildMetadataJsonAction, GetSingleWorkflowMetadataAction, GetStatus, MetadataServiceResponse, StatusLookupFailed} import cromwell.services.{FailedMetadataJsonResponse, SuccessfulMetadataJsonResponse} -import cromwell.webservice.WebServiceUtils.EnhancedThrowable +import cromwell.webservice.PartialWorkflowSources +import cromwell.webservice.WebServiceUtils.{EnhancedThrowable, completeResponse, materializeFormData} +import cromwell.webservice.routes.CromwellApiService import cromwell.webservice.routes.CromwellApiService.{UnrecognizedWorkflowException, validateWorkflowIdInMetadata} import cromwell.webservice.routes.MetadataRouteSupport.{metadataBuilderActorRequest, metadataQueryRequest} import cromwell.webservice.routes.wes.WesResponseJsonSupport._ -import cromwell.webservice.routes.wes.WesRouteSupport._ -import cromwell.webservice.routes.{CromwellApiService, WesCromwellRouteSupport} +import cromwell.webservice.routes.wes.WesRouteSupport.{respondWithWesError, _} import net.ceedubs.ficus.Ficus._ import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, Future, TimeoutException} import scala.util.{Failure, Success} -trait WesRouteSupport extends HttpInstrumentation with WesCromwellRouteSupport { +trait WesRouteSupport extends HttpInstrumentation { val serviceRegistryActor: ActorRef val workflowManagerActor: ActorRef + val workflowStoreActor: ActorRef implicit val ec: ExecutionContext implicit val timeout: Timeout + implicit val materializer: ActorMaterializer /* Defines routes intended to sit alongside the primary Cromwell REST endpoints. For instance, we'll now have: @@ -68,7 +74,7 @@ trait WesRouteSupport extends HttpInstrumentation with WesCromwellRouteSupport { } ~ post { extractSubmission() { submission => - submitRequest(submission.entity, + wesSubmitRequest(submission.entity, isSingleSubmission = true) } } @@ -108,6 +114,62 @@ trait WesRouteSupport extends HttpInstrumentation with WesCromwellRouteSupport { } ) } + + def toWesResponse(workflowId: WorkflowId, workflowState: WorkflowState): WesRunStatus = { + WesRunStatus(workflowId.toString, WesState.fromCromwellStatus(workflowState)) + } + + def toWesResponseId(workflowId: WorkflowId): WesRunId ={ + WesRunId(workflowId.toString) + } + + def wesSubmitRequest(formData: Multipart.FormData, isSingleSubmission: Boolean): Route = { + def getWorkflowState(workflowOnHold: Boolean): WorkflowState = { + if (workflowOnHold) + WorkflowOnHold + else WorkflowSubmitted + } + + def sendToWorkflowStore(command: WorkflowStoreActor.WorkflowStoreActorSubmitCommand, warnings: Seq[String], workflowState: WorkflowState): Route = { + // NOTE: Do not blindly copy the akka-http -to- ask-actor pattern below without knowing the pros and cons. + onComplete(workflowStoreActor.ask(command).mapTo[WorkflowStoreSubmitActor.WorkflowStoreSubmitActorResponse]) { + case Success(w) => + w match { + case WorkflowStoreSubmitActor.WorkflowSubmittedToStore(workflowId, _) => + completeResponse(StatusCodes.Created, toWesResponseId(workflowId), warnings) + case WorkflowStoreSubmitActor.WorkflowsBatchSubmittedToStore(workflowIds, _) => + completeResponse(StatusCodes.Created, workflowIds.toList.map(toWesResponse(_, workflowState)), warnings) + case WorkflowStoreSubmitActor.WorkflowSubmitFailed(throwable) => + respondWithWesError(throwable.getLocalizedMessage, StatusCodes.BadRequest) + } + case Failure(_: AskTimeoutException) if CromwellShutdown.shutdownInProgress() => respondWithWesError("Cromwell service is shutting down", StatusCodes.InternalServerError) + case Failure(e: TimeoutException) => e.failRequest(StatusCodes.ServiceUnavailable) + case Failure(e) => e.failRequest(StatusCodes.InternalServerError, warnings) + } + } + + onComplete(materializeFormData(formData)) { + case Success(data) => + PartialWorkflowSources.fromSubmitRoute(data, allowNoInputs = isSingleSubmission) match { + case Success(workflowSourceFiles) if isSingleSubmission && workflowSourceFiles.size == 1 => + val warnings = workflowSourceFiles.flatMap(_.warnings) + sendToWorkflowStore(WorkflowStoreActor.SubmitWorkflow(workflowSourceFiles.head), warnings, getWorkflowState(workflowSourceFiles.head.workflowOnHold)) + // Catches the case where someone has gone through the single submission endpoint w/ more than one workflow + case Success(workflowSourceFiles) if isSingleSubmission => + val warnings = workflowSourceFiles.flatMap(_.warnings) + val e = new IllegalArgumentException("To submit more than one workflow at a time, use the batch endpoint.") + e.failRequest(StatusCodes.BadRequest, warnings) + case Success(workflowSourceFiles) => + val warnings = workflowSourceFiles.flatMap(_.warnings) + sendToWorkflowStore( + WorkflowStoreActor.BatchSubmitWorkflows(NonEmptyList.fromListUnsafe(workflowSourceFiles.toList)), + warnings, getWorkflowState(workflowSourceFiles.head.workflowOnHold)) + case Failure(t) => t.failRequest(StatusCodes.BadRequest) + } + case Failure(e: TimeoutException) => e.failRequest(StatusCodes.ServiceUnavailable) + case Failure(e) => respondWithWesError(e.getLocalizedMessage, StatusCodes.InternalServerError) + } + } } diff --git a/engine/src/test/scala/cromwell/webservice/routes/wes/WesRouteSupportSpec.scala b/engine/src/test/scala/cromwell/webservice/routes/wes/WesRouteSupportSpec.scala index c8306088ded..6e9a390ad4a 100644 --- a/engine/src/test/scala/cromwell/webservice/routes/wes/WesRouteSupportSpec.scala +++ b/engine/src/test/scala/cromwell/webservice/routes/wes/WesRouteSupportSpec.scala @@ -165,8 +165,7 @@ class WesRouteSupportSpec extends AsyncFlatSpec with ScalatestRouteTest with Mat check { assertResult( s"""{ - | "id": "${CromwellApiServiceSpec.ExistingWorkflowId.toString}", - | "status": "Submitted" + | "run_id": "${CromwellApiServiceSpec.ExistingWorkflowId.toString}" |}""".stripMargin) { responseAs[String].parseJson.prettyPrint } From b5ef39aa652c607e5c370f1e8c0e7705a3fa8ff6 Mon Sep 17 00:00:00 2001 From: Adam Nichols Date: Mon, 29 Aug 2022 19:48:55 -0400 Subject: [PATCH 13/13] BW-1378 Addl CromIAM user enablement checks (#6826) --- CHANGELOG.md | 14 +++++ .../CromIamInstrumentation.scala | 1 + .../main/scala/cromiam/sam/SamClient.scala | 33 ++++++++++++ .../webservice/CromIamApiService.scala | 16 +++--- .../cromiam/webservice/QuerySupport.scala | 2 +- .../cromiam/webservice/RequestSupport.scala | 26 ++++++++- .../webservice/SubmissionSupport.scala | 2 +- .../webservice/WomtoolRouteSupport.scala | 10 ++-- .../webservice/CromIamApiServiceSpec.scala | 38 ++++++++++++- .../cromiam/webservice/MockClients.scala | 9 ++++ .../webservice/WomtoolRouteSupportSpec.scala | 54 ++++++++++++++++++- 11 files changed, 184 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c40f47b651..e1158be3ca4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,19 @@ # Cromwell Change Log +## 84 Release Notes + +### CromIAM enabled user checks + +For Cromwell instances utilizing the optional CromIAM identity and access management component, the following endpoints now verify that the calling user is enabled before forwarding the request. +* `/api/workflows/v1/backends` +* `/api/womtool/v1/describe` + +This change makes the above endpoints consistent with the existing behavior of all the other endpoints in the `/api/` path of CromIAM. + +## 83 Release Notes + +* Changes the type of several primary key columns in call caching tables from int to bigint. The database migration may be lengthy if your database contains a large amount of call caching data. + ## 82 Release Notes * Restored missing example configuration file diff --git a/CromIAM/src/main/scala/cromiam/instrumentation/CromIamInstrumentation.scala b/CromIAM/src/main/scala/cromiam/instrumentation/CromIamInstrumentation.scala index 63f48073146..65b164f00f6 100644 --- a/CromIAM/src/main/scala/cromiam/instrumentation/CromIamInstrumentation.scala +++ b/CromIAM/src/main/scala/cromiam/instrumentation/CromIamInstrumentation.scala @@ -20,6 +20,7 @@ trait CromIamInstrumentation extends CromwellInstrumentation { val samPrefix: NonEmptyList[String] = NonEmptyList.one("sam") val getWhitelistPrefix = NonEmptyList.one("get-whitelist") + val getUserEnabledPrefix = NonEmptyList.one("get-user-enabled") val userCollectionPrefix = NonEmptyList.one("user-collection") val authCollectionPrefix = NonEmptyList.one("auth-collection") val registerCollectionPrefix = NonEmptyList.one("register-collection") diff --git a/CromIAM/src/main/scala/cromiam/sam/SamClient.scala b/CromIAM/src/main/scala/cromiam/sam/SamClient.scala index f289251a2fb..d6a315f8241 100644 --- a/CromIAM/src/main/scala/cromiam/sam/SamClient.scala +++ b/CromIAM/src/main/scala/cromiam/sam/SamClient.scala @@ -18,6 +18,7 @@ import cromiam.sam.SamResourceJsonSupport._ import cromiam.server.status.StatusCheckedSubsystem import cromwell.api.model._ import mouse.boolean._ +import spray.json.RootJsonFormat import scala.concurrent.ExecutionContextExecutor @@ -73,6 +74,33 @@ class SamClient(scheme: String, } yield whitelisted } + def isUserEnabledSam(user: User, cromIamRequest: HttpRequest): FailureResponseOrT[Boolean] = { + val request = HttpRequest( + method = HttpMethods.GET, + uri = samUserStatusUri, + headers = List[HttpHeader](user.authorization) + ) + + for { + response <- instrumentRequest( + () => Http().singleRequest(request).asFailureResponseOrT, + cromIamRequest, + instrumentationPrefixForSam(getUserEnabledPrefix) + ) + userEnabled <- response.status match { + case StatusCodes.OK => + val unmarshal: IO[UserStatusInfo] = IO.fromFuture(IO(Unmarshal(response.entity).to[UserStatusInfo])) + FailureResponseOrT.right[HttpResponse](unmarshal).map { userInfo => + if (!userInfo.enabled) log.info("Access denied for user {}", user.userId) + userInfo.enabled + } + case _ => + log.error("Could not verify access with Sam for user {}, error was {} {}", user.userId, response.status, response.toString().take(100)) + FailureResponseOrT.pure[IO, HttpResponse](false) + } + } yield userEnabled + } + def collectionsForUser(user: User, cromIamRequest: HttpRequest): FailureResponseOrT[List[Collection]] = { val request = HttpRequest(method = HttpMethods.GET, uri = samBaseCollectionUri, headers = List[HttpHeader](user.authorization)) @@ -170,6 +198,7 @@ class SamClient(scheme: String, private lazy val samBaseResourceUri = s"$samBaseUri/api/resource" private lazy val samBaseCollectionUri = s"$samBaseResourceUri/workflow-collection" private lazy val samSubmitWhitelistUri = s"$samBaseResourceUri/caas/submit/action/get_whitelist" + private lazy val samUserStatusUri = s"$samBaseUri/register/user/v2/self/info" } @@ -188,4 +217,8 @@ object SamClient { def SamRegisterCollectionExceptionResp(statusCode: StatusCode) = HttpResponse(status = statusCode, entity = SamRegisterCollectionException(statusCode).getMessage) + case class UserStatusInfo(adminEnabled: Boolean, enabled: Boolean, userEmail: String, userSubjectId: String) + + implicit val UserStatusInfoFormat: RootJsonFormat[UserStatusInfo] = jsonFormat4(UserStatusInfo) + } diff --git a/CromIAM/src/main/scala/cromiam/webservice/CromIamApiService.scala b/CromIAM/src/main/scala/cromiam/webservice/CromIamApiService.scala index 63694599476..7a16e5ea797 100644 --- a/CromIAM/src/main/scala/cromiam/webservice/CromIamApiService.scala +++ b/CromIAM/src/main/scala/cromiam/webservice/CromIamApiService.scala @@ -81,7 +81,7 @@ trait CromIamApiService extends RequestSupport def abortRoute: Route = path("api" / "workflows" / Segment / Segment / Abort) { (_, workflowId) => post { - extractUserAndRequest { (user, req) => + extractUserAndStrictRequest { (user, req) => logUserWorkflowAction(user, workflowId, Abort) complete { authorizeAbortThenForwardToCromwell(user, workflowId, req).asHttpResponse @@ -93,7 +93,7 @@ trait CromIamApiService extends RequestSupport //noinspection MutatorLikeMethodIsParameterless def releaseHoldRoute: Route = path("api" / "workflows" / Segment / Segment / ReleaseHold) { (_, workflowId) => post { - extractUserAndRequest { (user, req) => + extractUserAndStrictRequest { (user, req) => logUserWorkflowAction(user, workflowId, ReleaseHold) complete { authorizeUpdateThenForwardToCromwell(user, workflowId, req).asHttpResponse @@ -112,7 +112,7 @@ trait CromIamApiService extends RequestSupport def labelPatchRoute: Route = { path("api" / "workflows" / Segment / Segment / Labels) { (_, workflowId) => patch { - extractUserAndRequest { (user, req) => + extractUserAndStrictRequest { (user, req) => entity(as[String]) { labels => logUserWorkflowAction(user, workflowId, Labels) validateLabels(Option(labels)) { _ => // Not using the labels, just using this to verify they didn't specify labels we don't want them to @@ -130,7 +130,7 @@ trait CromIamApiService extends RequestSupport def callCacheDiffRoute: Route = path("api" / "workflows" / Segment / "callcaching" / "diff") { _ => get { - extractUserAndRequest { (user, req) => + extractUserAndStrictRequest { (user, req) => logUserAction(user, "call caching diff") parameterSeq { parameters => val paramMap = parameters.toMap @@ -150,11 +150,9 @@ trait CromIamApiService extends RequestSupport */ private def workflowGetRoute(urlSuffix: String): Route = path("api" / "workflows" / Segment / urlSuffix) { _ => get { - extractUserAndRequest { (user, req) => + extractUserAndStrictRequest { (user, req) => logUserAction(user, urlSuffix) - complete { - cromwellClient.forwardToCromwell(req).asHttpResponse - } + forwardIfUserEnabled(user, req, cromwellClient, samClient) } } } @@ -166,7 +164,7 @@ trait CromIamApiService extends RequestSupport private def workflowRoute(urlSuffix: String, method: Directive0): Route = path("api" / "workflows" / Segment / Segment / urlSuffix) { (_, workflowId) => method { - extractUserAndRequest { (user, req) => + extractUserAndStrictRequest { (user, req) => logUserWorkflowAction(user, workflowId, urlSuffix) complete { authorizeReadThenForwardToCromwell(user, List(workflowId), req).asHttpResponse diff --git a/CromIAM/src/main/scala/cromiam/webservice/QuerySupport.scala b/CromIAM/src/main/scala/cromiam/webservice/QuerySupport.scala index cddabe74a57..e9397605c6a 100644 --- a/CromIAM/src/main/scala/cromiam/webservice/QuerySupport.scala +++ b/CromIAM/src/main/scala/cromiam/webservice/QuerySupport.scala @@ -55,7 +55,7 @@ trait QuerySupport extends RequestSupport { * directive */ private def preprocessQuery: Directive[(User, List[Collection], HttpRequest)] = { - extractUserAndRequest tflatMap { case (user, cromIamRequest) => + extractUserAndStrictRequest tflatMap { case (user, cromIamRequest) => log.info("Received query " + cromIamRequest.method.value + " request for user " + user.userId) onComplete(samClient.collectionsForUser(user, cromIamRequest).value.unsafeToFuture()) flatMap { diff --git a/CromIAM/src/main/scala/cromiam/webservice/RequestSupport.scala b/CromIAM/src/main/scala/cromiam/webservice/RequestSupport.scala index 12b231485f7..c9b6a196368 100644 --- a/CromIAM/src/main/scala/cromiam/webservice/RequestSupport.scala +++ b/CromIAM/src/main/scala/cromiam/webservice/RequestSupport.scala @@ -6,6 +6,13 @@ import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server._ import cromiam.auth.User import org.broadinstitute.dsde.workbench.model.WorkbenchUserId +import akka.http.scaladsl.model.HttpResponse +import akka.http.scaladsl.server.Directives.{authorize, complete, onComplete} +import akka.http.scaladsl.server.Route +import cromiam.cromwell.CromwellClient +import cromiam.sam.SamClient + +import scala.util.{Failure, Success} trait RequestSupport { def extractStrictRequest: Directive1[HttpRequest] = { @@ -26,10 +33,27 @@ trait RequestSupport { } } - def extractUserAndRequest: Directive[(User, HttpRequest)] = { + def extractUserAndStrictRequest: Directive[(User, HttpRequest)] = { for { user <- extractUser request <- extractStrictRequest } yield (user, request) } + + def forwardIfUserEnabled(user: User, req: HttpRequest, cromwellClient: CromwellClient, samClient: SamClient): Route = { + import cromwell.api.model.EnhancedFailureResponseOrHttpResponseT + + onComplete(samClient.isUserEnabledSam(user, req).value.unsafeToFuture()) { + case Success(Left(httpResponse: HttpResponse)) => complete(httpResponse) + case Success(Right(isEnabled: Boolean)) => + authorize(isEnabled) { + complete { + cromwellClient.forwardToCromwell(req).asHttpResponse + } + } + case Failure(e) => + val message = s"Unable to look up enablement status for user ${user.userId}: ${e.getMessage}. Please try again later." + throw new RuntimeException(message, e) + } + } } diff --git a/CromIAM/src/main/scala/cromiam/webservice/SubmissionSupport.scala b/CromIAM/src/main/scala/cromiam/webservice/SubmissionSupport.scala index c1f5477475b..52a05d1cdc7 100644 --- a/CromIAM/src/main/scala/cromiam/webservice/SubmissionSupport.scala +++ b/CromIAM/src/main/scala/cromiam/webservice/SubmissionSupport.scala @@ -31,7 +31,7 @@ trait SubmissionSupport extends RequestSupport { // FIXME - getting pathPrefix to shrink this keeps hosing up, there's gotta be some way to do this def submitRoute: Route = (path("api" / "workflows" / Segment) | path("api" / "workflows" / Segment / "batch")) { _ => post { - extractUserAndRequest { (user, request) => + extractUserAndStrictRequest { (user, request) => log.info("Received submission request from user " + user.userId) onComplete(samClient.isSubmitWhitelisted(user, request).value.unsafeToFuture()) { case Success(Left(httpResponse)) => complete(httpResponse) diff --git a/CromIAM/src/main/scala/cromiam/webservice/WomtoolRouteSupport.scala b/CromIAM/src/main/scala/cromiam/webservice/WomtoolRouteSupport.scala index 671d4a76543..a6098b1fae0 100644 --- a/CromIAM/src/main/scala/cromiam/webservice/WomtoolRouteSupport.scala +++ b/CromIAM/src/main/scala/cromiam/webservice/WomtoolRouteSupport.scala @@ -2,20 +2,18 @@ package cromiam.webservice import akka.http.scaladsl.server.Directives._ import cromiam.cromwell.CromwellClient -import cromwell.api.model._ +import cromiam.sam.SamClient trait WomtoolRouteSupport extends RequestSupport { // When this trait is mixed into `CromIamApiService` the value of `cromwellClient` is the reader (non-abort) address val cromwellClient: CromwellClient + val samClient: SamClient val womtoolRoutes = path("api" / "womtool" / Segment / "describe") { _ => post { - extractStrictRequest { req => - complete { - // This endpoint requires authn which it gets for free from the proxy, does not care about authz - cromwellClient.forwardToCromwell(req).asHttpResponse - } + extractUserAndStrictRequest { (user, req) => + forwardIfUserEnabled(user, req, cromwellClient, samClient) } } } diff --git a/CromIAM/src/test/scala/cromiam/webservice/CromIamApiServiceSpec.scala b/CromIAM/src/test/scala/cromiam/webservice/CromIamApiServiceSpec.scala index 5ad6a976204..89945c5bfcb 100644 --- a/CromIAM/src/test/scala/cromiam/webservice/CromIamApiServiceSpec.scala +++ b/CromIAM/src/test/scala/cromiam/webservice/CromIamApiServiceSpec.scala @@ -4,7 +4,8 @@ import akka.event.NoLogging import akka.http.scaladsl.model.StatusCodes._ import akka.http.scaladsl.model.headers.{Authorization, OAuth2BearerToken, RawHeader} import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpHeader} -import akka.http.scaladsl.server.MissingHeaderRejection +import akka.http.scaladsl.server.Route.seal +import akka.http.scaladsl.server.{AuthorizationFailedRejection, MissingHeaderRejection} import akka.http.scaladsl.testkit.ScalatestRouteTest import com.typesafe.config.Config import common.assertion.CromwellTimeoutSpec @@ -303,12 +304,45 @@ class CromIamApiServiceSpec extends AnyFlatSpec with CromwellTimeoutSpec with Ma } } - it should "reject request if it doesn't contain OIDC_CLAIM_user_id in header" in { + it should "reject request if it doesn't contain OIDC_CLAIM_user_id or token" in { Get(s"/api/workflows/$version/backends") ~> allRoutes ~> check { rejection shouldEqual MissingHeaderRejection("OIDC_CLAIM_user_id") } } + it should "return 403 when we request with a disabled user" in { + Get( + s"/api/workflows/$version/backends" + ).withHeaders( + List(Authorization(OAuth2BearerToken("my-token")), RawHeader("OIDC_CLAIM_user_id", "disabled@example.com")) + ) ~> allRoutes ~> check { + rejection shouldEqual AuthorizationFailedRejection + } + } + + it should "reject request if it contains a token and no OIDC_CLAIM_user_id in header" in { + Get( + s"/api/workflows/$version/backends" + ).withHeaders( + List(Authorization(OAuth2BearerToken("my-token"))) + ) ~> allRoutes ~> check { + rejection shouldEqual MissingHeaderRejection("OIDC_CLAIM_user_id") + } + } + + it should "return 404 when no auth token provided" in { + Get( + s"/api/workflows/$version/backends" + ).withHeaders( + List(RawHeader("OIDC_CLAIM_user_id", "enabled@example.com")) + // "[An] explicit call on the Route.seal method is needed in test code, but in your application code it is not necessary." + // https://doc.akka.io/docs/akka-http/current/routing-dsl/testkit.html#testing-sealed-routes + // https://doc.akka.io/docs/akka-http/current/routing-dsl/routes.html#sealing-a-route + ) ~> seal(allRoutes) ~> check { + responseAs[String] shouldEqual "The requested resource could not be found." + status shouldBe NotFound + } + } behavior of "ReleaseHold endpoint" it should "return 200 for authorized user who has collection associated with root workflow" in { diff --git a/CromIAM/src/test/scala/cromiam/webservice/MockClients.scala b/CromIAM/src/test/scala/cromiam/webservice/MockClients.scala index 3fb9689e5cc..59e75347159 100644 --- a/CromIAM/src/test/scala/cromiam/webservice/MockClients.scala +++ b/CromIAM/src/test/scala/cromiam/webservice/MockClients.scala @@ -141,6 +141,15 @@ class MockSamClient(checkSubmitWhitelist: Boolean = true) FailureResponseOrT.pure(!user.userId.value.equalsIgnoreCase(NotWhitelistedUser)) } + override def isUserEnabledSam(user: User, cromIamRequest: HttpRequest): FailureResponseOrT[Boolean] = { + if (user.userId.value == "enabled@example.com" || user.userId.value == MockSamClient.AuthorizedUserCollectionStr) + FailureResponseOrT.pure(true) + else if (user.userId.value == "disabled@example.com") + FailureResponseOrT.pure(false) + else + throw new Exception("Misconfigured test") + } + override def requestAuth(authorizationRequest: CollectionAuthorizationRequest, cromIamRequest: HttpRequest): FailureResponseOrT[Unit] = { authorizationRequest.user.userId.value match { diff --git a/CromIAM/src/test/scala/cromiam/webservice/WomtoolRouteSupportSpec.scala b/CromIAM/src/test/scala/cromiam/webservice/WomtoolRouteSupportSpec.scala index 7ba1c495f23..785c887c374 100644 --- a/CromIAM/src/test/scala/cromiam/webservice/WomtoolRouteSupportSpec.scala +++ b/CromIAM/src/test/scala/cromiam/webservice/WomtoolRouteSupportSpec.scala @@ -2,6 +2,9 @@ package cromiam.webservice import akka.http.scaladsl.model.ContentTypes import akka.http.scaladsl.model.StatusCodes._ +import akka.http.scaladsl.model.headers.{Authorization, OAuth2BearerToken, RawHeader} +import akka.http.scaladsl.server.Route.seal +import akka.http.scaladsl.server.{AuthorizationFailedRejection, MissingHeaderRejection} import akka.http.scaladsl.testkit.ScalatestRouteTest import common.assertion.CromwellTimeoutSpec import org.scalatest.flatspec.AnyFlatSpec @@ -11,15 +14,64 @@ import org.scalatest.matchers.should.Matchers class WomtoolRouteSupportSpec extends AnyFlatSpec with CromwellTimeoutSpec with Matchers with WomtoolRouteSupport with ScalatestRouteTest { override lazy val cromwellClient = new MockCromwellClient() + override lazy val samClient = new MockSamClient() behavior of "Womtool endpoint routes" it should "return 200 when we request to the right path" in { - Post(s"/api/womtool/v1/describe") ~> womtoolRoutes ~> check { + Post( + s"/api/womtool/v1/describe" + ).withHeaders( + List(Authorization(OAuth2BearerToken("my-token")), RawHeader("OIDC_CLAIM_user_id", "enabled@example.com")) + ) ~> womtoolRoutes ~> check { status shouldBe OK responseAs[String] shouldBe "Hey there, workflow describer" contentType should be(ContentTypes.`text/plain(UTF-8)`) } } + it should "return 403 when we request with a disabled user" in { + Post( + s"/api/womtool/v1/describe" + ).withHeaders( + List(Authorization(OAuth2BearerToken("my-token")), RawHeader("OIDC_CLAIM_user_id", "disabled@example.com")) + ) ~> womtoolRoutes ~> check { + rejection shouldEqual AuthorizationFailedRejection + } + } + + it should "bail out with no user ID" in { + Post( + s"/api/womtool/v1/describe" + ).withHeaders( + List(Authorization(OAuth2BearerToken("my-token"))) + ) ~> womtoolRoutes ~> check { + rejection shouldEqual MissingHeaderRejection("OIDC_CLAIM_user_id") + } + } + + it should "return 404 when no auth token provided" in { + Post( + s"/api/womtool/v1/describe" + ).withHeaders( + List(RawHeader("OIDC_CLAIM_user_id", "enabled@example.com")) + // "[An] explicit call on the Route.seal method is needed in test code, but in your application code it is not necessary." + // https://doc.akka.io/docs/akka-http/current/routing-dsl/testkit.html#testing-sealed-routes + // https://doc.akka.io/docs/akka-http/current/routing-dsl/routes.html#sealing-a-route + ) ~> seal(womtoolRoutes) ~> check { + responseAs[String] shouldEqual "The requested resource could not be found." + status shouldBe NotFound + } + } + + it should "bail out with no headers" in { + Post( + s"/api/womtool/v1/describe" + ).withHeaders( + List.empty + ) ~> womtoolRoutes ~> check { + rejection shouldEqual MissingHeaderRejection("OIDC_CLAIM_user_id") + } + } + }