Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
kpierre13 committed Nov 4, 2021
2 parents 9e4e9f5 + c93519c commit c778c33
Show file tree
Hide file tree
Showing 48 changed files with 765 additions and 266 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Cromwell Change Log

## 71 Release Notes

### Bug Fixes

* Fixed an issue handling data in Google Cloud Storage buckets with requester pays enabled that could sometimes cause I/O to fail.

## 70 Release Notes

### CWL security fix [#6510](https://github.com/broadinstitute/cromwell/pull/6510)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import wom.graph.CommandCallNode
import wom.values.WomValue

import scala.concurrent.Future
import scala.util.Try
import scala.util.{Success, Try}

trait StandardInitializationActorParams {
def workflowDescriptor: BackendWorkflowDescriptor
Expand Down Expand Up @@ -82,20 +82,29 @@ class StandardInitializationActor(val standardParams: StandardInitializationActo
RuntimeAttributesDefault.workflowOptionsDefault(options, runtimeAttributesBuilder.coercionMap)
}

override def validate(): Future[Unit] = {
Future.fromTry(Try {
calls foreach { call =>
val runtimeAttributeKeys = call.callable.runtimeAttributes.attributes.keys.toList
val notSupportedAttributes = runtimeAttributesBuilder.unsupportedKeys(runtimeAttributeKeys).toList

if (notSupportedAttributes.nonEmpty) {
val notSupportedAttrString = notSupportedAttributes mkString ", "
workflowLogger.warn(
s"Key/s [$notSupportedAttrString] is/are not supported by backend. " +
s"Unsupported attributes will not be part of job executions.")
}
/**
 * Validation step for workflow options.
 *
 * This implementation performs no checks and always reports success.
 *
 * @return a successful `Try` carrying `Unit`
 */
def validateWorkflowOptions(): Try[Unit] = {
  Success(())
}

def checkForUnsupportedRuntimeAttributes(): Try[Unit] = Try {
calls foreach { call =>
val runtimeAttributeKeys = call.callable.runtimeAttributes.attributes.keys.toList
val notSupportedAttributes = runtimeAttributesBuilder.unsupportedKeys(runtimeAttributeKeys).toList

if (notSupportedAttributes.nonEmpty) {
val notSupportedAttrString = notSupportedAttributes mkString ", "
workflowLogger.warn(
s"Key/s [$notSupportedAttrString] is/are not supported by backend. " +
s"Unsupported attributes will not be part of job executions.")
}
})
}
}

/**
 * Runs the initialization validations in order and lifts the combined result into a `Future`.
 *
 * Workflow options are validated first; the unsupported runtime attribute check only runs if
 * that step succeeds. The first failure becomes the failure of the returned `Future`.
 */
override def validate(): Future[Unit] = {
  val validationResult: Try[Unit] =
    validateWorkflowOptions() flatMap { _ => checkForUnsupportedRuntimeAttributes() }

  Future.fromTry(validationResult)
}

override protected lazy val workflowDescriptor: BackendWorkflowDescriptor = standardParams.workflowDescriptor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ metadata {
status: Succeeded

"outputs.drs_usa_hca.path" =
"/cromwell_root/jade.datarepo-dev.broadinstitute.org/v1_4641bafb-5190-425b-aea9-9c7b125515c8_e37266ba-790d-4641-aa76-854d94be2fbe/E18_20161004_Neurons_Sample_49_S048_L004_R2_005.fastq.gz"
"/cromwell_root/drs_localization_paths/hca_dev_20201217_test4/5acd55ef/E18_20161004_Neurons_Sample_49_S048_L004_R2_005.fastq.gz"
"outputs.drs_usa_hca.hash" = "badf266412ff0e307232421e56d647ed"
"outputs.drs_usa_hca.size" = 438932948
"outputs.drs_usa_hca.cloud" =
Expand Down
6 changes: 3 additions & 3 deletions centaur/src/main/resources/standardTestCases/drs_usa_jdr.test
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ metadata {
status: Succeeded

"outputs.drs_usa_jdr.path1" =
"/cromwell_root/jade.datarepo-dev.broadinstitute.org/v1_f90f5d7f-c507-4e56-abfc-b965a66023fb_585f3f19-985f-43b0-ab6a-79fa4c8310fc/hello_jade.json"
"/cromwell_root/drs_localization_paths/CromwellSimpleWithFilerefs/hello_jade.json"
"outputs.drs_usa_jdr.path2" =
"/cromwell_root/jade.datarepo-dev.broadinstitute.org/v1_001afc86-da4c-4739-85be-26ca98d2693f_ad783b60-aeba-4055-8f7b-194880f37259/hello_jade_2.json"
"/cromwell_root/drs_localization_paths/CromwellSimpleWithFilerefs2/hello_jade_2.json"
"outputs.drs_usa_jdr.hash1" = "faf12e94c25bef7df62e4a5eb62573f5"
"outputs.drs_usa_jdr.hash2" = "19e1b021628130fda04c79ee9a056b67"
"outputs.drs_usa_jdr.size1" = 18.0
"outputs.drs_usa_jdr.size2" = 38.0
# This JDR file has a gsUri that doesn't end in /fileName so it must be downloaded with the DRS localizer
"outputs.drs_usa_jdr.cloud1" =
"/cromwell_root/jade.datarepo-dev.broadinstitute.org/v1_f90f5d7f-c507-4e56-abfc-b965a66023fb_585f3f19-985f-43b0-ab6a-79fa4c8310fc/hello_jade.json"
"/cromwell_root/drs_localization_paths/CromwellSimpleWithFilerefs/hello_jade.json"
# This JDR file has a gsUri that can skip localization
"outputs.drs_usa_jdr.cloud2" =
"gs://broad-jade-dev-data-bucket/e1941fb9-6537-4e1a-b70d-34352a3a7817/ad783b60-aeba-4055-8f7b-194880f37259/hello_jade_2.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ metadata {
status: Succeeded

"outputs.drs_usa_jdr.path1" =
"/cromwell_root/jade.datarepo-dev.broadinstitute.org/v1_f90f5d7f-c507-4e56-abfc-b965a66023fb_585f3f19-985f-43b0-ab6a-79fa4c8310fc/hello_jade.json"
"/cromwell_root/drs_localization_paths/CromwellSimpleWithFilerefs/hello_jade.json"
# This JDR file has a gsUri that can be preresolved to a regular GCS file for improved localization performance.
# However this means that the file's container path is determined by the GCS localization logic and not the
# `localizationPath`-aware DRS localization logic. The GCS localization logic always uses a containerized version
# of the GCS path, which is what this expectation represents.
"outputs.drs_usa_jdr.path2" =
"/cromwell_root/broad-jade-dev-data-bucket/e1941fb9-6537-4e1a-b70d-34352a3a7817/ad783b60-aeba-4055-8f7b-194880f37259/hello_jade_2.json"
"outputs.drs_usa_jdr.hash1" = "faf12e94c25bef7df62e4a5eb62573f5"
Expand All @@ -25,7 +29,7 @@ metadata {
"outputs.drs_usa_jdr.size2" = 38.0
# This JDR file has a gsUri that doesn't end in /fileName so it must be downloaded with the DRS localizer
"outputs.drs_usa_jdr.cloud1" =
"/cromwell_root/jade.datarepo-dev.broadinstitute.org/v1_f90f5d7f-c507-4e56-abfc-b965a66023fb_585f3f19-985f-43b0-ab6a-79fa4c8310fc/hello_jade.json"
"/cromwell_root/drs_localization_paths/CromwellSimpleWithFilerefs/hello_jade.json"
# This JDR file has a gsUri that can skip localization
"outputs.drs_usa_jdr.cloud2" =
"gs://broad-jade-dev-data-bucket/e1941fb9-6537-4e1a-b70d-34352a3a7817/ad783b60-aeba-4055-8f7b-194880f37259/hello_jade_2.json"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"wf_reference_disk_test.check_if_localized_as_symlink.reference_file_input": "gs://gcp-public-data--broad-references/hg19/v0/README"
"wf_reference_disk_test.check_if_localized_as_symlink.reference_file_input": "gs://gcp-public-data--broad-references/hg19/v0/Homo_sapiens_assembly19.tile_db_header.vcf"
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package cloud.nio.impl.drs

import cats.data.NonEmptyList
import cats.data.Validated.{Invalid, Valid}
import cats.effect.{IO, Resource}
import cats.implicits._
import cloud.nio.impl.drs.DrsPathResolver.{FatalRetryDisposition, RegularRetryDisposition}
import cloud.nio.impl.drs.MarthaResponseSupport._
import common.exception.toIO
import common.exception.{AggregatedMessageException, toIO}
import common.validation.ErrorOr.ErrorOr
import io.circe._
import io.circe.generic.semiauto._
import io.circe.parser.decode
Expand Down Expand Up @@ -35,14 +37,21 @@ abstract class DrsPathResolver(drsConfig: DrsConfig, retryInternally: Boolean =
clientBuilder
}

def getAccessToken: String
def getAccessToken: ErrorOr[String]

private def makeHttpRequestToMartha(drsPath: String, fields: NonEmptyList[MarthaField.Value]): HttpPost = {
val postRequest = new HttpPost(drsConfig.marthaUrl)
val requestJson = MarthaRequest(drsPath, fields).asJson.noSpaces
postRequest.setEntity(new StringEntity(requestJson, ContentType.APPLICATION_JSON))
postRequest.setHeader("Authorization", s"Bearer $getAccessToken")
postRequest
/**
 * Builds the POST request sent to Martha to resolve `drsPath`, requesting only `fields`.
 *
 * The access token is obtained inside the returned effect rather than while the `Resource` is
 * being constructed. An exception thrown by `getAccessToken` therefore surfaces as a failed
 * `IO` instead of escaping to the caller, and the token is requested each time the effect runs.
 *
 * @param drsPath the DRS path to resolve
 * @param fields  the Martha response fields to request
 * @return a `Resource` yielding the authorized request, or failing with an
 *         `AggregatedMessageException` when `getAccessToken` reports errors
 */
private def makeHttpRequestToMartha(drsPath: String, fields: NonEmptyList[MarthaField.Value]): Resource[IO, HttpPost] = {
  // Assembles the JSON POST carrying the bearer token.
  def authorizedPost(token: String): HttpPost = {
    val postRequest = new HttpPost(drsConfig.marthaUrl)
    val requestJson = MarthaRequest(drsPath, fields).asJson.noSpaces
    postRequest.setEntity(new StringEntity(requestJson, ContentType.APPLICATION_JSON))
    postRequest.setHeader("Authorization", s"Bearer $token")
    postRequest
  }

  // `IO(...)` delays the token lookup so that it runs (and can fail) as part of the effect.
  val io: IO[HttpPost] = IO(getAccessToken) flatMap {
    case Valid(token) => IO(authorizedPost(token))
    case Invalid(errors) =>
      IO.raiseError[HttpPost](AggregatedMessageException("Error getting access token", errors.toList))
  }
  Resource.eval(io)
}

private def httpResponseToMarthaResponse(drsPathForDebugging: String)(httpResponse: HttpResponse): IO[MarthaResponse] = {
Expand Down Expand Up @@ -83,8 +92,10 @@ abstract class DrsPathResolver(drsConfig: DrsConfig, retryInternally: Boolean =
}

def rawMarthaResponse(drsPath: String, fields: NonEmptyList[MarthaField.Value]): Resource[IO, HttpResponse] = {
val httpPost = makeHttpRequestToMartha(drsPath, fields)
executeMarthaRequest(httpPost)
for {
httpPost <- makeHttpRequestToMartha(drsPath, fields)
response <- executeMarthaRequest(httpPost)
} yield response
}

/** *
Expand Down Expand Up @@ -151,6 +162,7 @@ object MarthaField extends Enumeration {
val Hashes: MarthaField.Value = Value("hashes")
val FileName: MarthaField.Value = Value("fileName")
val AccessUrl: MarthaField.Value = Value("accessUrl")
val LocalizationPath: MarthaField.Value = Value("localizationPath")
}

/**
 * Request body posted to Martha.
 *
 * @param url    the DRS path to resolve
 * @param fields the response fields being requested
 */
final case class MarthaRequest(
  url: String,
  fields: NonEmptyList[MarthaField.Value]
)
Expand All @@ -171,6 +183,9 @@ final case class AccessUrl(url: String, headers: Option[Map[String, String]])
* @param fileName A possible different file name for the object at gsUri, ex: "gsutil cp gs://bucket/12/345 my.vcf"
* @param hashes Hashes for the contents stored at gsUri
* @param accessUrl URL to query for signed URL
* @param localizationPath Optional localization path. TDR is currently the sole DRS provider specifying this value in
* DRS metadata, via the `aliases` field. As this is a distinct field from `fileName` in DRS
* metadata it is also made a distinct field in this response object.
*/
final case class MarthaResponse(size: Option[Long] = None,
timeCreated: Option[String] = None,
Expand All @@ -180,7 +195,8 @@ final case class MarthaResponse(size: Option[Long] = None,
googleServiceAccount: Option[SADataObject] = None,
fileName: Option[String] = None,
hashes: Option[Map[String, String]] = None,
accessUrl: Option[AccessUrl] = None
accessUrl: Option[AccessUrl] = None,
localizationPath: Option[String] = None
)

// Adapted from https://github.com/broadinstitute/martha/blob/f31933a3a11e20d30698ec4b4dc1e0abbb31a8bc/common/helpers.js#L210-L218
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package cloud.nio.impl.drs

import com.google.auth.oauth2.{AccessToken, OAuth2Credentials}
import common.validation.ErrorOr.ErrorOr

import scala.concurrent.duration._
import cats.syntax.validated._


case class EngineDrsPathResolver(drsConfig: DrsConfig,
Expand All @@ -12,16 +14,20 @@ case class EngineDrsPathResolver(drsConfig: DrsConfig,
extends DrsPathResolver(drsConfig, retryInternally = false) {

//Based on method from GoogleRegistry
override def getAccessToken: String = {
override def getAccessToken: ErrorOr[String] = {
def accessTokenTTLIsAcceptable(accessToken: AccessToken): Boolean = {
(accessToken.getExpirationTime.getTime - System.currentTimeMillis()).millis.gteq(accessTokenAcceptableTTL)
}

Option(authCredentials.getAccessToken) match {
case Some(accessToken) if accessTokenTTLIsAcceptable(accessToken) => accessToken.getTokenValue
case Some(accessToken) if accessTokenTTLIsAcceptable(accessToken) =>
accessToken.getTokenValue.validNel
case _ =>
authCredentials.refresh()
authCredentials.getAccessToken.getTokenValue
Option(authCredentials.getAccessToken.getTokenValue) match {
case Some(accessToken) => accessToken.validNel
case None => "Could not refresh access token".invalidNel
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ object MockDrsPaths {

val mockToken = "mock.token"

val DrsLocalizationPathsContainer = "drs_localization_paths"

private val drsPathPrefix = "drs://drs-host"

val drsRelativePath = "drs-host/4d427aa3-5640-4f00-81ae-c33443f84acf/f3b148ac-1802-4acc-a0b9-610ea266fb61"
Expand All @@ -17,12 +19,20 @@ object MockDrsPaths {

val gcsRelativePathWithFileName = "drs-host/d7c75399-bcd3-4762-90e9-434de005679b/file.txt"

val gcsRelativePathWithFileNameFromLocalizationPath = s"$DrsLocalizationPathsContainer/dir/subdir/file.txt"

val gcsRelativePathWithFileNameFromAllThePaths = s"$DrsLocalizationPathsContainer/dir/subdir/file.txt"

val drsPathResolvingGcsPath = s"$drsPathPrefix/4d427aa3-5640-4f00-81ae-c33443f84acf"

val drsPathWithNonPathChars = s"$drsPathPrefix/4d427aa3_5640_4f00_81ae_c33443f84acf"

val drsPathResolvingWithFileName = s"$drsPathPrefix/d7c75399-bcd3-4762-90e9-434de005679b"

val drsPathResolvingWithLocalizationPath = s"$drsPathPrefix/1e7ecfa6-2a77-41d7-a251-38a2f4919842"

val drsPathResolvingWithAllThePaths = s"$drsPathPrefix/0524678a-365e-42f3-a1e7-e4c6ac499b35"

val drsPathResolvingToNoGcsPath = s"$drsPathPrefix/226686cf-22c9-4472-9f79-7a0b0044f253"

val drsPathNotExistingInMartha = s"$drsPathPrefix/5e21b8c3-8eda-48d5-9a04-2b32e1571765"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package cloud.nio.impl.drs

import cats.data.NonEmptyList
import cats.effect.IO
import cats.syntax.validated._
import com.google.cloud.NoCredentials
import common.validation.ErrorOr.ErrorOr
import org.apache.http.impl.client.HttpClientBuilder
import org.specs2.mock.Mockito
import org.specs2.mock.Mockito._
Expand Down Expand Up @@ -37,13 +39,19 @@ class MockEngineDrsPathResolver(drsConfig: DrsConfig = MockDrsPaths.mockDrsConfi

private val marthaObjWithFileName = marthaObjWithGcsPath.copy(fileName = Option("file.txt"))

private val marthaObjWithLocalizationPath = marthaObjWithGcsPath.copy(localizationPath = Option("/dir/subdir/file.txt"))

private val marthaObjWithAllThePaths = marthaObjWithLocalizationPath.copy(fileName = marthaObjWithFileName.fileName)

private val marthaObjWithNoGcsPath = marthaObjWithGcsPath.copy(gsUri = None)

override def resolveDrsThroughMartha(drsPath: String, fields: NonEmptyList[MarthaField.Value]): IO[MarthaResponse] = {
drsPath match {
case MockDrsPaths.drsPathResolvingGcsPath => IO(marthaObjWithGcsPath)
case MockDrsPaths.drsPathWithNonPathChars => IO(marthaObjWithGcsPath)
case MockDrsPaths.drsPathResolvingWithFileName => IO(marthaObjWithFileName)
case MockDrsPaths.drsPathResolvingWithLocalizationPath => IO.pure(marthaObjWithLocalizationPath)
case MockDrsPaths.drsPathResolvingWithAllThePaths => IO.pure(marthaObjWithAllThePaths)
case MockDrsPaths.drsPathResolvingToNoGcsPath => IO(marthaObjWithNoGcsPath)
case MockDrsPaths.drsPathNotExistingInMartha =>
IO.raiseError(
Expand All @@ -55,5 +63,5 @@ class MockEngineDrsPathResolver(drsConfig: DrsConfig = MockDrsPaths.mockDrsConfi
}
}

override lazy val getAccessToken: String = MockDrsPaths.mockToken
override lazy val getAccessToken: ErrorOr[String] = MockDrsPaths.mockToken.validNel
}
2 changes: 1 addition & 1 deletion core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ google {
}
# Filesystems available in this Cromwell instance
# They can be enabled individually in the engine.filesystems stanza and in the config.filesystems stanza of backends
# There is a default built-in local filesytem that can also be referenced as "local" as well.
# There is a default built-in local filesystem that can also be referenced as "local".
filesystems {
drs {
class = "cromwell.filesystems.drs.DrsPathBuilderFactory"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package drs.localizer

import common.util.VersionUtil
import drs.localizer.CommandLineParser.AccessTokenStrategy._
import drs.localizer.CommandLineParser.Usage


/**
 * Command line parser for the Cromwell DRS localizer, built on scopt.
 *
 * Positional arguments, in declaration order: the DRS object ID, the container path and an
 * optional requester pays project. Options select the access token strategy and supply the
 * Azure vault name, secret name and identity client id.
 *
 * After parsing, the configuration is rejected unless it is consistent with the chosen
 * strategy: a requester pays project is not accepted with the Azure strategy, and the Azure
 * options are not accepted with the Google strategy.
 */
class CommandLineParser extends scopt.OptionParser[CommandLineArguments](Usage) {
  lazy val localizerVersion: String = VersionUtil.getVersion("cromwell-drs-localizer")

  version("version")

  help("help").text("Cromwell DRS Localizer")

  head("cromwell-drs-localizer", localizerVersion)

  // Positional arguments: the declaration order below is the order expected on the command line.
  arg[String]("drs-object-id")
    .text("DRS object ID")
    .required()
    .action { (value, args) => args.copy(drsObject = Option(value)) }

  arg[String]("container-path")
    .text("Container path")
    .required()
    .action { (value, args) => args.copy(containerPath = Option(value)) }

  arg[String]("requester-pays-project")
    .text("Requester pays project")
    .optional()
    .action { (value, args) => args.copy(googleRequesterPaysProject = Option(value)) }

  // Named options. The strategy is lower-cased so it can be compared against the strategy constants.
  opt[String]('t', "access-token-strategy")
    .text(s"Access token strategy, must be one of '$Azure' or '$Google' (default '$Google')")
    .action { (value, args) => args.copy(accessTokenStrategy = Option(value.toLowerCase())) }

  opt[String]('v', "vault-name")
    .text("Azure vault name")
    .action { (value, args) => args.copy(azureVaultName = Option(value)) }

  opt[String]('s', "secret-name")
    .text("Azure secret name")
    .action { (value, args) => args.copy(azureSecretName = Option(value)) }

  opt[String]('i', "identity-client-id")
    .text("Azure identity client id")
    .action { (value, args) => args.copy(azureIdentityClientId = Option(value)) }

  // Cross-field validation: each strategy rejects the inputs that belong to the other one.
  checkConfig { args =>
    val azureOnlyOptions = List(args.azureSecretName, args.azureVaultName, args.azureIdentityClientId)

    args.accessTokenStrategy match {
      case None =>
        Left("Unspecified access token strategy")
      case Some(Azure) =>
        if (args.googleRequesterPaysProject.isEmpty) Right(())
        else Left(s"Requester pays project is only valid with access token strategy '$Google'")
      case Some(Google) =>
        if (azureOnlyOptions.forall(_.isEmpty)) Right(())
        else Left(s"One or more specified options are only valid with access token strategy '$Azure'")
      case Some(unrecognized) =>
        Left(s"Unrecognized access token strategy '$unrecognized'")
    }
  }
}

/** Holds the access token strategy names and the usage text used by the `CommandLineParser` class. */
object CommandLineParser {
/**
* These access token strategies are named simplistically as there is currently only one access token strategy being
* used for each of these cloud vendors. But it is certainly possible that multiple strategies could come into use
* for a particular vendor, in which case the names may need to become more specific for disambiguation.
*/
object AccessTokenStrategy {
val Azure = "azure"
val Google = "google"
}

// Usage text handed to the scopt.OptionParser constructor by the CommandLineParser class.
// The literal is whitespace-sensitive: its line breaks and any leading spaces are part of the value.
// NOTE(review): the text spells the strategy 'Google' while the constant value is "google"; the parser
// lower-cases the supplied strategy before matching, so either spelling is accepted on the command line.
val Usage =
s"""
Usage:
java -jar /path/to/localizer.jar [options] drs://provider/object /local/path/to/file.txt [requester pays project]

Note that the <requester pays project> optional argument is only valid with access token strategy 'Google'.
"""

}

/**
 * Parsed command line arguments for the DRS localizer.
 *
 * @param accessTokenStrategy        name of the access token strategy; defaults to the Google strategy
 * @param drsObject                  DRS object ID
 * @param containerPath              container path
 * @param googleRequesterPaysProject requester pays project
 * @param azureVaultName             Azure vault name
 * @param azureSecretName            Azure secret name
 * @param azureIdentityClientId      Azure identity client id
 */
case class CommandLineArguments(
  accessTokenStrategy: Option[String] = Some(Google),
  drsObject: Option[String] = None,
  containerPath: Option[String] = None,
  googleRequesterPaysProject: Option[String] = None,
  azureVaultName: Option[String] = None,
  azureSecretName: Option[String] = None,
  azureIdentityClientId: Option[String] = None
)
Loading

0 comments on commit c778c33

Please sign in to comment.