diff --git a/.gitignore b/.gitignore
index 38c671ead86..85cc2761334 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,7 @@
.DS_Store
.artifactory
.idea/*
+**/.idea/*
.ensime_cache/*
.config/*
.local/*
@@ -47,3 +48,6 @@ private_docker_papi_v2_usa.options
tesk_application_ftp.conf
ftp_centaur_cwl_runner.conf
tesk_application.conf
+**/__pycache__/
+**/venv/
+exome_germline_single_sample_v1.3/
diff --git a/.travis.yml b/.travis.yml
index c59e645e2b2..301a2894a22 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -108,6 +108,8 @@ env:
BUILD_TYPE=dbms
- >-
BUILD_TYPE=singleWorkflowRunner
+ - >-
+ BUILD_TYPE=metadataComparisonPython
script:
- src/ci/bin/test.sh
notifications:
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2799e39ec59..cfbcde6e932 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,37 @@
# Cromwell Change Log
+## 51 Release Notes
+
+### Changes and Warnings
+
+The configuration format for call cache blacklisting has been updated; please see the
+[call caching documentation](https://cromwell.readthedocs.io/en/stable/Configuring/#call-caching) for details.
+
+### Bug fixes
+
+* Fixed a bug where the `size(...)` function did not work correctly on files
+ from a shared filesystem if `size(...)` was called in the input section on a
+ relative path.
+* Fixed a bug where the `use_relative_output_paths` option would not preserve intermediate folders.
+
+### New functionality
+
+#### Call caching blacklisting improvements
+
+Cromwell previously supported blacklisting GCS buckets containing cache hits which could not be copied for permissions
+reasons. Cromwell now also supports blacklisting individual cache hits which could not be copied for any reason,
+as well as grouping blacklist caches according to a workflow option key. More information is available in the
+[call caching documentation](https://cromwell.readthedocs.io/en/stable/Configuring/#call-caching).
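+
+For example, blacklisting with groupings can be configured as follows. This sketch is adapted from the commented
+example in `cromwell.examples.conf`; the `call-cache-blacklist-group` workflow option name and the sizes/TTLs are
+illustrative defaults, not requirements:
+
+```hocon
+call-caching {
+  blacklist-cache {
+    enabled: true
+    # Workflows whose options share the same value for this key share one blacklist cache.
+    groupings {
+      workflow-option: call-cache-blacklist-group
+      concurrency: 10000
+      ttl: 2 hours
+      size: 1000
+    }
+    buckets {
+      concurrency: 10000
+      ttl: 20 minutes
+      size: 1000
+    }
+    hits {
+      concurrency: 10000
+      ttl: 20 minutes
+      size: 100000
+    }
+  }
+}
+```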
+
+#### New xxh64 and fingerprint strategies for call caching
+
+The existing call cache strategies `path` and `path+modtime` don't work when using Docker on shared filesystems
+(SFS backend, i.e. not in cloud storage). The `file` (md5sum) strategy works, but uses a lot of resources.
+Two faster strategies have been added for this use case: `xxh64` and `fingerprint`. `xxh64` is a lightweight
+hashing algorithm, while `fingerprint` is a strategy designed to be very lightweight. Read more about them in the
+[call caching documentation](https://cromwell.readthedocs.io/en/stable/Configuring/#call-caching).
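+
+For example, on a shared filesystem backend the strategy is selected under the filesystem's `caching` stanza. The
+sketch below follows the shape of `cromwell.example.backends/LocalExample.conf` (the enclosing backend provider
+configuration is omitted):
+
+```hocon
+filesystems {
+  local {
+    caching {
+      duplication-strategy: ["hard-link", "soft-link", "copy"]
+      # e.g. "md5", "xxh64", "fingerprint", "path" or "path+modtime"
+      hashing-strategy: "fingerprint"
+    }
+  }
+}
+```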
+
## 50 Release Notes
### Changes and Warnings
@@ -11,7 +43,6 @@ Cromwell's metadata archival configuration has changed in a backwards incompatib
please see
[the updated documentation](https://cromwell.readthedocs.io/en/stable/Configuring#hybrid-metadata-storage-classic-carbonite) for details.
-
## 49 Release Notes
### Changes and Warnings
diff --git a/backend/src/main/scala/cromwell/backend/BackendCacheHitCopyingActor.scala b/backend/src/main/scala/cromwell/backend/BackendCacheHitCopyingActor.scala
index e6bae1bebf7..54849df4250 100644
--- a/backend/src/main/scala/cromwell/backend/BackendCacheHitCopyingActor.scala
+++ b/backend/src/main/scala/cromwell/backend/BackendCacheHitCopyingActor.scala
@@ -3,15 +3,19 @@ package cromwell.backend
import cromwell.backend.MetricableCacheCopyErrorCategory.MetricableCacheCopyErrorCategory
import cromwell.core.JobKey
import cromwell.core.simpleton.WomValueSimpleton
+import cromwell.services.CallCaching.CallCachingEntryId
object BackendCacheHitCopyingActor {
- final case class CopyOutputsCommand(womValueSimpletons: Seq[WomValueSimpleton], jobDetritusFiles: Map[String, String], returnCode: Option[Int])
+ final case class CopyOutputsCommand(womValueSimpletons: Seq[WomValueSimpleton], jobDetritusFiles: Map[String, String], cacheHit: CallCachingEntryId, returnCode: Option[Int])
- final case class CopyingOutputsFailedResponse(jobKey: JobKey, cacheCopyAttempt: Int, failure: CacheCopyError)
+ final case class CopyingOutputsFailedResponse(jobKey: JobKey, cacheCopyAttempt: Int, failure: CacheCopyFailure)
- sealed trait CacheCopyError
- final case class LoggableCacheCopyError(failure: Throwable) extends CacheCopyError
- final case class MetricableCacheCopyError(failureCategory: MetricableCacheCopyErrorCategory) extends CacheCopyError
+ sealed trait CacheCopyFailure
+ /** A cache hit copy was attempted but failed. */
+ final case class CopyAttemptError(failure: Throwable) extends CacheCopyFailure
+  /** Copying was requested, but the cache hit copying actor found the hit had already been blacklisted, so no
+    * new copy attempt was made. */
+ final case class BlacklistSkip(failureCategory: MetricableCacheCopyErrorCategory) extends CacheCopyFailure
}
object MetricableCacheCopyErrorCategory {
@@ -20,4 +24,5 @@ object MetricableCacheCopyErrorCategory {
override def toString: String = getClass.getSimpleName.stripSuffix("$").toLowerCase
}
final case object BucketBlacklisted extends MetricableCacheCopyErrorCategory
+ final case object HitBlacklisted extends MetricableCacheCopyErrorCategory
}
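
A minimal sketch (not part of this patch) of how a consumer of `CopyingOutputsFailedResponse` might treat the two
`CacheCopyFailure` cases; `shouldLogFailure` is a hypothetical helper name:

```scala
import cromwell.backend.BackendCacheHitCopyingActor.{BlacklistSkip, CacheCopyFailure, CopyAttemptError}

// CopyAttemptError means a real copy attempt was made and failed, which is worth logging.
// BlacklistSkip means no copy was attempted because the hit (or its bucket) was already
// blacklisted, which is common and expected, so it is only counted in metrics.
def shouldLogFailure(failure: CacheCopyFailure): Boolean = failure match {
  case CopyAttemptError(_) => true
  case BlacklistSkip(_)    => false
}
```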
diff --git a/backend/src/main/scala/cromwell/backend/standard/callcaching/BlacklistCache.scala b/backend/src/main/scala/cromwell/backend/standard/callcaching/BlacklistCache.scala
index b1878731fda..ff3248123ff 100644
--- a/backend/src/main/scala/cromwell/backend/standard/callcaching/BlacklistCache.scala
+++ b/backend/src/main/scala/cromwell/backend/standard/callcaching/BlacklistCache.scala
@@ -2,23 +2,59 @@ package cromwell.backend.standard.callcaching
import com.google.common.cache.{CacheBuilder, CacheLoader}
import cromwell.core.CacheConfig
+import cromwell.services.CallCaching.CallCachingEntryId
-case class BlacklistCache(config: CacheConfig) {
- val cache = {
- // Queries to the blacklist cache return false by default (i.e. not blacklisted).
- val falseLoader = new CacheLoader[String, java.lang.Boolean]() {
- override def load(key: String): java.lang.Boolean = false
+sealed trait BlacklistStatus
+case object BadCacheResult extends BlacklistStatus
+case object GoodCacheResult extends BlacklistStatus
+case object UntestedCacheResult extends BlacklistStatus
+
+sealed abstract class BlacklistCache(bucketCacheConfig: CacheConfig,
+ hitCacheConfig: CacheConfig,
+ val name: Option[String]) {
+ val bucketCache = {
+ // Queries to the bucket blacklist cache return UntestedCacheResult by default.
+ val unknownLoader = new CacheLoader[String, BlacklistStatus]() {
+ override def load(key: String): BlacklistStatus = UntestedCacheResult
+ }
+
+ CacheBuilder.
+ newBuilder().
+ concurrencyLevel(bucketCacheConfig.concurrency).
+ maximumSize(bucketCacheConfig.size).
+ expireAfterWrite(bucketCacheConfig.ttl.length, bucketCacheConfig.ttl.unit).
+ build[String, BlacklistStatus](unknownLoader)
+ }
+
+ val hitCache = {
+    // Queries to the hit blacklist cache return UntestedCacheResult by default.
+ val unknownLoader = new CacheLoader[CallCachingEntryId, BlacklistStatus]() {
+ override def load(key: CallCachingEntryId): BlacklistStatus = UntestedCacheResult
}
CacheBuilder.
newBuilder().
- concurrencyLevel(config.concurrency).
- maximumSize(config.size).
- expireAfterWrite(config.ttl.length, config.ttl.unit).
- build[String, java.lang.Boolean](falseLoader)
+ concurrencyLevel(hitCacheConfig.concurrency).
+ maximumSize(hitCacheConfig.size).
+ expireAfterWrite(hitCacheConfig.ttl.length, hitCacheConfig.ttl.unit).
+ build[CallCachingEntryId, BlacklistStatus](unknownLoader)
}
- def isBlacklisted(bucket: String): Boolean = cache.get(bucket)
+ def getBlacklistStatus(hit: CallCachingEntryId): BlacklistStatus = hitCache.get(hit)
- def blacklist(bucket: String): Unit = cache.put(bucket, true)
+ def getBlacklistStatus(bucket: String): BlacklistStatus = bucketCache.get(bucket)
+
+ def blacklist(hit: CallCachingEntryId): Unit = hitCache.put(hit, BadCacheResult)
+
+ def blacklist(bucket: String): Unit = bucketCache.put(bucket, BadCacheResult)
+
+ def whitelist(hit: CallCachingEntryId): Unit = hitCache.put(hit, GoodCacheResult)
+
+ def whitelist(bucket: String): Unit = bucketCache.put(bucket, GoodCacheResult)
}
+
+class RootWorkflowBlacklistCache(bucketCacheConfig: CacheConfig, hitCacheConfig: CacheConfig) extends
+ BlacklistCache(bucketCacheConfig = bucketCacheConfig, hitCacheConfig = hitCacheConfig, name = None)
+
+class GroupingBlacklistCache(bucketCacheConfig: CacheConfig, hitCacheConfig: CacheConfig, val group: String) extends
+ BlacklistCache(bucketCacheConfig = bucketCacheConfig, hitCacheConfig = hitCacheConfig, name = Option(group))
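
A minimal usage sketch of the reworked cache (mirroring the updated `BlacklistCacheSpec` later in this diff); the
bucket name, entry id and TTLs are illustrative:

```scala
import cromwell.backend.standard.callcaching._
import cromwell.core.CacheConfig
import cromwell.services.CallCaching.CallCachingEntryId

import scala.concurrent.duration._

// Separate configs for the bucket cache and the hit cache.
val bucketConfig = CacheConfig(concurrency = 1, size = Integer.MAX_VALUE, ttl = 1.hour)
val hitConfig = CacheConfig(concurrency = 1, size = Integer.MAX_VALUE, ttl = 1.hour)
val cache = new RootWorkflowBlacklistCache(bucketCacheConfig = bucketConfig, hitCacheConfig = hitConfig)

cache.getBlacklistStatus("my-bucket")     // UntestedCacheResult until a copy outcome is recorded
cache.blacklist(CallCachingEntryId(42))   // a failed copy marks the hit BadCacheResult
cache.whitelist("my-bucket")              // a successful copy marks the bucket GoodCacheResult
```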
diff --git a/backend/src/main/scala/cromwell/backend/standard/callcaching/CallCachingBlacklistManager.scala b/backend/src/main/scala/cromwell/backend/standard/callcaching/CallCachingBlacklistManager.scala
new file mode 100644
index 00000000000..a22631aeeb4
--- /dev/null
+++ b/backend/src/main/scala/cromwell/backend/standard/callcaching/CallCachingBlacklistManager.scala
@@ -0,0 +1,131 @@
+package cromwell.backend.standard.callcaching
+
+import akka.event.LoggingAdapter
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+import com.typesafe.config.Config
+import cromwell.core.{CacheConfig, HasWorkflowIdAndSources}
+import mouse.boolean._
+import net.ceedubs.ficus.Ficus._
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+object CallCachingBlacklistManager {
+ object Defaults {
+ object Groupings {
+ val Concurrency = 10000
+ val Size = 1000L
+ val Ttl = 2 hours
+ }
+ object Hits {
+ val Concurrency = 10000
+ val Size = 20000L
+ val Ttl = 1 hour
+ }
+ object Buckets {
+ val Concurrency = 10000
+ val Size = 1000L
+ val Ttl = 1 hour
+ }
+ }
+}
+
+class CallCachingBlacklistManager(rootConfig: Config, logger: LoggingAdapter) {
+
+ // Defined if "call-caching.blacklist-cache.enabled = true".
+ private val blacklistCacheConfig: Option[Unit] =
+ rootConfig.getOrElse("call-caching.blacklist-cache.enabled", false).option(())
+
+ // Defined if `blacklistCacheConfig` is defined and "call-caching.blacklist-cache.groupings.workflow-option" is defined.
+ private val blacklistGroupingWorkflowOptionKey: Option[String] = for {
+ _ <- blacklistCacheConfig // Only return a groupings cache if blacklisting is enabled.
+ workflowOption <- rootConfig.as[Option[String]]("call-caching.blacklist-cache.groupings.workflow-option")
+ } yield workflowOption
+
+ // Defined if `blacklistGroupingWorkflowOptionKey` is defined.
+ private val blacklistGroupingCacheConfig: Option[CacheConfig] = {
+ import CallCachingBlacklistManager.Defaults.Groupings._
+ for {
+ _ <- blacklistGroupingWorkflowOptionKey
+      groupingsOption = rootConfig.as[Option[Config]]("call-caching.blacklist-cache.groupings")
+ conf = CacheConfig.config(groupingsOption, defaultConcurrency = Concurrency, defaultSize = Size, defaultTtl = Ttl)
+ } yield conf
+ }
+
+ // Defined if `blacklistCacheConfig` is defined.
+ private val blacklistBucketCacheConfig: Option[CacheConfig] = {
+ import CallCachingBlacklistManager.Defaults.Buckets._
+ for {
+ _ <- blacklistCacheConfig
+ bucketsOption = rootConfig.as[Option[Config]]("call-caching.blacklist-cache.buckets")
+ conf = CacheConfig.config(bucketsOption, defaultConcurrency = Concurrency, defaultSize = Size, defaultTtl = Ttl)
+ } yield conf
+ }
+
+ // Defined if `blacklistCacheConfig` is defined.
+ private val blacklistHitCacheConfig: Option[CacheConfig] = {
+ import CallCachingBlacklistManager.Defaults.Hits._
+ for {
+ _ <- blacklistCacheConfig
+ hitsOption = rootConfig.as[Option[Config]]("call-caching.blacklist-cache.hits")
+ conf = CacheConfig.config(hitsOption, defaultConcurrency = Concurrency, defaultSize = Size, defaultTtl = Ttl)
+ } yield conf
+ }
+
+ // If configuration allows, build a cache of blacklist groupings to BlacklistCaches.
+ private val blacklistGroupingsCache: Option[LoadingCache[String, BlacklistCache]] = {
+ def buildBlacklistGroupingsCache(groupingConfig: CacheConfig, bucketConfig: CacheConfig, hitConfig: CacheConfig): LoadingCache[String, BlacklistCache] = {
+ val emptyBlacklistCacheLoader = new CacheLoader[String, BlacklistCache]() {
+ override def load(key: String): BlacklistCache = new GroupingBlacklistCache(
+ bucketCacheConfig = bucketConfig,
+ hitCacheConfig = hitConfig,
+ group = key
+ )
+ }
+
+ CacheBuilder.
+ newBuilder().
+ concurrencyLevel(groupingConfig.concurrency).
+ maximumSize(groupingConfig.size).
+ expireAfterWrite(groupingConfig.ttl.length, groupingConfig.ttl.unit).
+ build[String, BlacklistCache](emptyBlacklistCacheLoader)
+ }
+
+ for {
+ groupingsConfig <- blacklistGroupingCacheConfig
+ bucketsConfig <- blacklistBucketCacheConfig
+ hitsConfig <- blacklistHitCacheConfig
+ } yield buildBlacklistGroupingsCache(groupingsConfig, bucketsConfig, hitsConfig)
+ }
+
+ /**
+   * If blacklisting groupings are configured and the workflow options specify a group, return that group's
+   * blacklist cache. Otherwise, if blacklisting is enabled, return a cache scoped to the root workflow.
+   * Otherwise return `None`.
+ */
+ def blacklistCacheFor(workflow: HasWorkflowIdAndSources): Option[BlacklistCache] = {
+ // If configuration is set up for blacklist groups and a blacklist group is specified in workflow options,
+ // get the BlacklistCache for the group.
+ val groupBlacklistCache: Option[BlacklistCache] = for {
+ groupings <- blacklistGroupingsCache
+ groupKey <- blacklistGroupingWorkflowOptionKey
+ groupFromWorkflowOptions <- workflow.sources.workflowOptions.get(groupKey).toOption
+ } yield groupings.get(groupFromWorkflowOptions)
+
+ // Build a blacklist cache for a single, ungrouped root workflow.
+ def rootWorkflowBlacklistCache: Option[BlacklistCache] = for {
+ bucketConfig <- blacklistBucketCacheConfig
+ hitConfig <- blacklistHitCacheConfig
+ } yield new RootWorkflowBlacklistCache(bucketCacheConfig = bucketConfig, hitCacheConfig = hitConfig)
+
+ // Return the group blacklist cache if available, otherwise a blacklist cache for the root workflow.
+ val maybeCache = groupBlacklistCache orElse rootWorkflowBlacklistCache
+ maybeCache collect {
+ case group: GroupingBlacklistCache =>
+ logger.info("Workflow {} using group blacklist cache '{}' containing blacklist status for {} hits and {} buckets.",
+ workflow.id, group.group, group.hitCache.size(), group.bucketCache.size())
+ case _: RootWorkflowBlacklistCache =>
+ logger.info("Workflow {} using root workflow blacklist cache.", workflow.id)
+ }
+ maybeCache
+ }
+}
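
A minimal sketch of how the manager is driven by configuration (mirroring `CallCachingBlacklistManagerSpec` later in
this diff); the `google_project` option name and the `workflow` value are illustrative:

```scala
import akka.event.NoLogging
import com.typesafe.config.ConfigFactory
import cromwell.backend.standard.callcaching.CallCachingBlacklistManager

// Enable blacklisting with per-group caches keyed on the "google_project" workflow option.
val config = ConfigFactory.parseString(
  """call-caching.blacklist-cache {
    |  enabled: true
    |  groupings.workflow-option: "google_project"
    |}""".stripMargin)

val manager = new CallCachingBlacklistManager(config, logger = NoLogging)

// `workflow` is any HasWorkflowIdAndSources (hypothetical value here). Workflows whose options carry
// the same "google_project" value are handed the same GroupingBlacklistCache instance:
// val maybeCache: Option[BlacklistCache] = manager.blacklistCacheFor(workflow)
```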
diff --git a/backend/src/main/scala/cromwell/backend/standard/callcaching/CopyingActorBlacklistCacheSupport.scala b/backend/src/main/scala/cromwell/backend/standard/callcaching/CopyingActorBlacklistCacheSupport.scala
new file mode 100644
index 00000000000..5257c636027
--- /dev/null
+++ b/backend/src/main/scala/cromwell/backend/standard/callcaching/CopyingActorBlacklistCacheSupport.scala
@@ -0,0 +1,160 @@
+package cromwell.backend.standard.callcaching
+import cats.data.NonEmptyList
+import cromwell.backend.BackendCacheHitCopyingActor.CopyOutputsCommand
+import cromwell.core.io.{IoCommand, IoCopyCommand}
+import cromwell.services.CallCaching.CallCachingEntryId
+
+
+object CopyingActorBlacklistCacheSupport {
+ trait HasFormatting {
+ def metricFormat: String = getClass.getName.toLowerCase.split('$').last
+ }
+
+ sealed trait Verb extends HasFormatting
+ case object Read extends Verb
+ case object Write extends Verb
+
+ sealed trait EntityType extends HasFormatting
+ case object Hit extends EntityType
+ case object Bucket extends EntityType
+
+  sealed trait CacheReadType
+  case object ReadHitOnly extends CacheReadType
+  case object ReadHitAndBucket extends CacheReadType
+}
+
+trait CopyingActorBlacklistCacheSupport {
+ this: StandardCacheHitCopyingActor =>
+
+ import CopyingActorBlacklistCacheSupport._
+
+ def handleBlacklistingForGenericFailure(): Unit = {
+ // Not a forbidden failure so do not blacklist the bucket but do blacklist the hit.
+ for {
+ data <- stateData
+ cache <- standardParams.blacklistCache
+ _ = blacklistAndMetricHit(cache, data.cacheHit)
+ } yield ()
+ ()
+ }
+
+ /* Whitelist by bucket and hit if appropriate. */
+ def handleWhitelistingForSuccess(command: IoCommand[_]): Unit = {
+ for {
+ cache <- standardParams.blacklistCache
+ data <- stateData
+ _ = whitelistAndMetricHit(cache, data.cacheHit)
+ copy <- Option(command) collect { case c: IoCopyCommand => c }
+ prefix <- extractBlacklistPrefix(copy.source.toString)
+ _ = whitelistAndMetricBucket(cache, prefix)
+ } yield ()
+ ()
+ }
+
+ def publishBlacklistMetric(blacklistCache: BlacklistCache, verb: Verb, entityType: EntityType, key: String, value: BlacklistStatus): Unit = {
+ val group = blacklistCache.name.getOrElse("none")
+ val metricPath = NonEmptyList.of(
+ "job",
+ "callcaching", "blacklist", verb.metricFormat, entityType.metricFormat, jobDescriptor.taskCall.localName, group, key, value.toString)
+ increment(metricPath)
+ }
+
+ def blacklistAndMetricHit(blacklistCache: BlacklistCache, hit: CallCachingEntryId): Unit = {
+ blacklistCache.getBlacklistStatus(hit) match {
+ case UntestedCacheResult =>
+ blacklistCache.blacklist(hit)
+ publishBlacklistMetric(blacklistCache, Write, Hit, hit.id.toString, value = BadCacheResult)
+ case BadCacheResult =>
+ // Not a surprise, race conditions abound in cache hit copying. Do not overwrite with the same value or
+ // multiply publish metrics for this hit.
+ case GoodCacheResult =>
+ // This hit was thought to be good but now a copy has failed for permissions reasons. Be conservative and
+ // mark the hit as BadCacheResult and log this strangeness.
+ log.warning(
+ "Cache hit {} found in GoodCacheResult blacklist state, but cache hit copying has failed for permissions reasons. Overwriting status to BadCacheResult state.",
+ hit.id)
+ blacklistCache.blacklist(hit)
+ publishBlacklistMetric(blacklistCache, Write, Hit, hit.id.toString, value = BadCacheResult)
+ }
+ }
+
+ def blacklistAndMetricBucket(blacklistCache: BlacklistCache, bucket: String): Unit = {
+ blacklistCache.getBlacklistStatus(bucket) match {
+ case UntestedCacheResult =>
+ blacklistCache.blacklist(bucket)
+ publishBlacklistMetric(blacklistCache, Write, Bucket, bucket, value = BadCacheResult)
+ case BadCacheResult =>
+ // Not a surprise, race conditions abound in cache hit copying. Do not overwrite with the same value or
+ // multiply publish metrics for this bucket.
+ case GoodCacheResult =>
+ // This bucket was thought to be good but now a copy has failed for permissions reasons. Be conservative and
+ // mark the bucket as BadCacheResult and log this strangeness.
+ log.warning(
+ "Bucket {} found in GoodCacheResult blacklist state, but cache hit copying has failed for permissions reasons. Overwriting status to BadCacheResult state.",
+ bucket)
+ blacklistCache.blacklist(bucket)
+ publishBlacklistMetric(blacklistCache, Write, Bucket, bucket, value = BadCacheResult)
+ }
+ }
+
+ def whitelistAndMetricHit(blacklistCache: BlacklistCache, hit: CallCachingEntryId): Unit = {
+ blacklistCache.getBlacklistStatus(hit) match {
+ case UntestedCacheResult =>
+ blacklistCache.whitelist(hit)
+ publishBlacklistMetric(blacklistCache, Write, Hit, hit.id.toString, value = GoodCacheResult)
+ case GoodCacheResult => // This hit is already known to be good, no need to rewrite or spam metrics.
+ case BadCacheResult =>
+        // This is surprising: a hit that we previously failed to copy has now been the source of a successful copy.
+        // Don't overwrite this to GoodCacheResult; hopefully such weird cache hits are rare.
+ log.warning(
+ "Cache hit {} found in BadCacheResult blacklist state, not overwriting to GoodCacheResult despite successful copy.",
+ hit.id)
+ }
+ }
+
+ def whitelistAndMetricBucket(blacklistCache: BlacklistCache, bucket: String): Unit = {
+ blacklistCache.getBlacklistStatus(bucket) match {
+ case UntestedCacheResult =>
+ blacklistCache.whitelist(bucket)
+ publishBlacklistMetric(blacklistCache, Write, Bucket, bucket, value = GoodCacheResult)
+ case GoodCacheResult => // This bucket is already known to be good, no need to rewrite or spam metrics.
+ case BadCacheResult =>
+        // This is surprising: a bucket we previously failed to copy from for auth reasons has now been the source
+        // of a successful copy. Don't overwrite this to GoodCacheResult; hopefully such weird cache hits are rare.
+ log.warning(
+ "Bucket {} found in BadCacheResult blacklist state, not overwriting to GoodCacheResult despite successful copy.",
+ bucket)
+ }
+ }
+
+  def publishBlacklistReadMetrics(command: CopyOutputsCommand, cacheHit: CallCachingEntryId, cacheReadType: CacheReadType) = {
+ for {
+ c <- standardParams.blacklistCache
+ hitBlacklistStatus = c.getBlacklistStatus(cacheHit)
+      // If blacklisting is on, the hit cache is always checked, so publish a hit read metric.
+ _ = publishBlacklistMetric(c, Read, Hit, cacheHit.id.toString, hitBlacklistStatus)
+      // Only publish the bucket read metric if the backend supports bucket / prefix blacklisting and the bucket was read.
+ _ <- Option(cacheReadType).collect { case ReadHitAndBucket => () }
+ path = sourcePathFromCopyOutputsCommand(command)
+ prefix <- extractBlacklistPrefix(path)
+ bucketBlacklistStatus = c.getBlacklistStatus(prefix)
+ _ = publishBlacklistMetric(c, Read, Bucket, prefix, bucketBlacklistStatus)
+ } yield ()
+ }
+
+ def isSourceBlacklisted(command: CopyOutputsCommand): Boolean = {
+ val path = sourcePathFromCopyOutputsCommand(command)
+ (for {
+ cache <- standardParams.blacklistCache
+ prefix <- extractBlacklistPrefix(path)
+ value = cache.getBlacklistStatus(prefix)
+ } yield value == BadCacheResult).getOrElse(false)
+ }
+
+ def isSourceBlacklisted(hit: CallCachingEntryId): Boolean = {
+ (for {
+ cache <- standardParams.blacklistCache
+ value = cache.getBlacklistStatus(hit)
+ } yield value == BadCacheResult).getOrElse(false)
+ }
+}
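
A condensed sketch (not part of this patch) of the status-transition policy these helpers implement: entries only
ever move toward `BadCacheResult` on failure, and a surprising success against an already-bad entry is logged but
never upgrades it:

```scala
import cromwell.backend.standard.callcaching.{BadCacheResult, BlacklistStatus, GoodCacheResult, UntestedCacheResult}

// On a copy failure: untested or previously-good entries are downgraded to bad.
def statusAfterFailure(current: BlacklistStatus): BlacklistStatus = current match {
  case UntestedCacheResult | GoodCacheResult => BadCacheResult
  case BadCacheResult                        => BadCacheResult // unchanged; no duplicate metric is published
}

// On a copy success: only untested entries are promoted; bad entries stay bad.
def statusAfterSuccess(current: BlacklistStatus): BlacklistStatus = current match {
  case UntestedCacheResult => GoodCacheResult
  case GoodCacheResult     => GoodCacheResult
  case BadCacheResult      => BadCacheResult
}
```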
diff --git a/backend/src/main/scala/cromwell/backend/standard/callcaching/StandardCacheHitCopyingActor.scala b/backend/src/main/scala/cromwell/backend/standard/callcaching/StandardCacheHitCopyingActor.scala
index df5d6a846c3..13bfd8abbce 100644
--- a/backend/src/main/scala/cromwell/backend/standard/callcaching/StandardCacheHitCopyingActor.scala
+++ b/backend/src/main/scala/cromwell/backend/standard/callcaching/StandardCacheHitCopyingActor.scala
@@ -7,11 +7,12 @@ import cats.instances.list._
import cats.instances.set._
import cats.instances.tuple._
import cats.syntax.foldable._
-import cromwell.backend.BackendCacheHitCopyingActor.{CacheCopyError, CopyOutputsCommand, CopyingOutputsFailedResponse, LoggableCacheCopyError, MetricableCacheCopyError}
+import cromwell.backend.BackendCacheHitCopyingActor._
import cromwell.backend.BackendJobExecutionActor._
import cromwell.backend.BackendLifecycleActor.AbortJobCommand
import cromwell.backend.io.JobPaths
import cromwell.backend.standard.StandardCachingActorHelper
+import cromwell.backend.standard.callcaching.CopyingActorBlacklistCacheSupport._
import cromwell.backend.standard.callcaching.StandardCacheHitCopyingActor._
import cromwell.backend.{BackendConfigurationDescriptor, BackendInitializationData, BackendJobDescriptor, MetricableCacheCopyErrorCategory}
import cromwell.core.CallOutputs
@@ -19,10 +20,13 @@ import cromwell.core.io._
import cromwell.core.logging.JobLogging
import cromwell.core.path.{Path, PathCopier}
import cromwell.core.simpleton.{WomValueBuilder, WomValueSimpleton}
+import cromwell.services.CallCaching.CallCachingEntryId
+import cromwell.services.instrumentation.CromwellInstrumentationActor
import wom.values.WomSingleFile
import scala.util.{Failure, Success, Try}
+
/**
* Trait of parameters passed to a StandardCacheHitCopyingActor.
*/
@@ -79,6 +83,7 @@ object StandardCacheHitCopyingActor {
case class StandardCacheHitCopyingActorData(commandsToWaitFor: List[Set[IoCommand[_]]],
newJobOutputs: CallOutputs,
newDetritus: DetritusMap,
+ cacheHit: CallCachingEntryId,
returnCode: Option[Int]
) {
@@ -109,6 +114,7 @@ object StandardCacheHitCopyingActor {
private[callcaching] case object StillWaiting extends CommandSetState
private[callcaching] case object AllCommandsDone extends CommandSetState
private[callcaching] case class NextSubSet(commands: Set[IoCommand[_]]) extends CommandSetState
+
}
class DefaultStandardCacheHitCopyingActor(standardParams: StandardCacheHitCopyingActorParams) extends StandardCacheHitCopyingActor(standardParams)
@@ -117,7 +123,8 @@ class DefaultStandardCacheHitCopyingActor(standardParams: StandardCacheHitCopyin
* Standard implementation of a BackendCacheHitCopyingActor.
*/
abstract class StandardCacheHitCopyingActor(val standardParams: StandardCacheHitCopyingActorParams)
- extends FSM[StandardCacheHitCopyingActorState, Option[StandardCacheHitCopyingActorData]] with JobLogging with StandardCachingActorHelper with IoClientHelper {
+ extends FSM[StandardCacheHitCopyingActorState, Option[StandardCacheHitCopyingActorData]]
+ with JobLogging with StandardCachingActorHelper with IoClientHelper with CromwellInstrumentationActor with CopyingActorBlacklistCacheSupport {
override lazy val jobDescriptor: BackendJobDescriptor = standardParams.jobDescriptor
override lazy val backendInitializationDataOption: Option[BackendInitializationData] = standardParams.backendInitializationDataOption
@@ -139,48 +146,58 @@ abstract class StandardCacheHitCopyingActor(val standardParams: StandardCacheHit
protected def duplicate(copyPairs: Set[PathPair]): Option[Try[Unit]] = None
when(Idle) {
- case Event(command: CopyOutputsCommand, None) if isSourceBlacklisted(command) =>
- // We don't want to log this because bucket blacklisting is a common and expected occurrence.
- failAndStop(MetricableCacheCopyError(MetricableCacheCopyErrorCategory.BucketBlacklisted))
-
- case Event(CopyOutputsCommand(simpletons, jobDetritus, returnCode), None) =>
- // Try to make a Path of the callRootPath from the detritus
- lookupSourceCallRootPath(jobDetritus) match {
- case Success(sourceCallRootPath) =>
-
- // process simpletons and detritus to get updated paths and corresponding IoCommands
- val processed = for {
- (destinationCallOutputs, simpletonIoCommands) <- processSimpletons(simpletons, sourceCallRootPath)
- (destinationDetritus, detritusIoCommands) <- processDetritus(jobDetritus)
- } yield (destinationCallOutputs, destinationDetritus, simpletonIoCommands ++ detritusIoCommands)
-
- processed match {
- case Success((destinationCallOutputs, destinationDetritus, detritusAndOutputsIoCommands)) =>
- duplicate(ioCommandsToCopyPairs(detritusAndOutputsIoCommands)) match {
- // Use the duplicate override if exists
- case Some(Success(_)) => succeedAndStop(returnCode, destinationCallOutputs, destinationDetritus)
- case Some(Failure(failure)) =>
- // Something went wrong in the custom duplication code. We consider this loggable because it's most likely a user-permission error:
- failAndStop(LoggableCacheCopyError(failure))
- // Otherwise send the first round of IoCommands (file outputs and detritus) if any
- case None if detritusAndOutputsIoCommands.nonEmpty =>
- detritusAndOutputsIoCommands foreach sendIoCommand
-
- // Add potential additional commands to the list
- val additionalCommands = additionalIoCommands(sourceCallRootPath, simpletons, destinationCallOutputs, jobDetritus, destinationDetritus)
- val allCommands = List(detritusAndOutputsIoCommands) ++ additionalCommands
-
- goto(WaitingForIoResponses) using Option(StandardCacheHitCopyingActorData(allCommands, destinationCallOutputs, destinationDetritus, returnCode))
- case _ => succeedAndStop(returnCode, destinationCallOutputs, destinationDetritus)
+ case Event(command @ CopyOutputsCommand(simpletons, jobDetritus, cacheHit, returnCode), None) =>
+ val (nextState, cacheReadType) =
+ if (isSourceBlacklisted(cacheHit)) {
+ // We don't want to log this because blacklisting is a common and expected occurrence.
+ (failAndStop(BlacklistSkip(MetricableCacheCopyErrorCategory.HitBlacklisted)), ReadHitOnly)
+ } else if (isSourceBlacklisted(command)) {
+ // We don't want to log this because blacklisting is a common and expected occurrence.
+ (failAndStop(BlacklistSkip(MetricableCacheCopyErrorCategory.BucketBlacklisted)), ReadHitAndBucket)
+ } else {
+ // Try to make a Path of the callRootPath from the detritus
+ val next = lookupSourceCallRootPath(jobDetritus) match {
+ case Success(sourceCallRootPath) =>
+
+ // process simpletons and detritus to get updated paths and corresponding IoCommands
+ val processed = for {
+ (destinationCallOutputs, simpletonIoCommands) <- processSimpletons(simpletons, sourceCallRootPath)
+ (destinationDetritus, detritusIoCommands) <- processDetritus(jobDetritus)
+ } yield (destinationCallOutputs, destinationDetritus, simpletonIoCommands ++ detritusIoCommands)
+
+ processed match {
+ case Success((destinationCallOutputs, destinationDetritus, detritusAndOutputsIoCommands)) =>
+ duplicate(ioCommandsToCopyPairs(detritusAndOutputsIoCommands)) match {
+ // Use the duplicate override if exists
+ case Some(Success(_)) => succeedAndStop(returnCode, destinationCallOutputs, destinationDetritus)
+ case Some(Failure(failure)) =>
+ // Something went wrong in the custom duplication code. We consider this loggable because it's most likely a user-permission error:
+ failAndStop(CopyAttemptError(failure))
+ // Otherwise send the first round of IoCommands (file outputs and detritus) if any
+ case None if detritusAndOutputsIoCommands.nonEmpty =>
+ detritusAndOutputsIoCommands foreach sendIoCommand
+
+ // Add potential additional commands to the list
+ val additionalCommands = additionalIoCommands(sourceCallRootPath, simpletons, destinationCallOutputs, jobDetritus, destinationDetritus)
+ val allCommands = List(detritusAndOutputsIoCommands) ++ additionalCommands
+
+ goto(WaitingForIoResponses) using Option(StandardCacheHitCopyingActorData(allCommands, destinationCallOutputs, destinationDetritus, cacheHit, returnCode))
+ case _ => succeedAndStop(returnCode, destinationCallOutputs, destinationDetritus)
+ }
+
+ // Something went wrong in generating duplication commands. We consider this loggable error because we don't expect this to happen:
+ case Failure(failure) => failAndStop(CopyAttemptError(failure))
}
- // Something went wrong in generating duplication commands. We consider this loggable error because we don't expect this to happen:
- case Failure(failure) => failAndStop(LoggableCacheCopyError(failure))
+ // Something went wrong in looking up the call root... loggable because we don't expect this to happen:
+ case Failure(failure) => failAndStop(CopyAttemptError(failure))
}
+ (next, ReadHitAndBucket)
+ }
- // Something went wrong in looking up the call root... loggable because we don't expect this to happen:
- case Failure(failure) => failAndStop(LoggableCacheCopyError(failure))
- }
+ publishBlacklistReadMetrics(command, cacheHit, cacheReadType)
+
+ nextState
}
when(WaitingForIoResponses) {
@@ -189,30 +206,36 @@ abstract class StandardCacheHitCopyingActor(val standardParams: StandardCacheHit
commandState match {
case StillWaiting => stay() using Option(newData)
- case AllCommandsDone => succeedAndStop(newData.returnCode, newData.newJobOutputs, newData.newDetritus)
+ case AllCommandsDone =>
+ handleWhitelistingForSuccess(command)
+ succeedAndStop(newData.returnCode, newData.newJobOutputs, newData.newDetritus)
case NextSubSet(commands) =>
commands foreach sendIoCommand
stay() using Option(newData)
}
case Event(f: IoReadForbiddenFailure[_], Some(data)) =>
- handleForbidden(
+ handleBlacklistingForForbidden(
path = f.forbiddenPath,
// Loggable because this is an attempt-specific problem:
- andThen = failAndAwaitPendingResponses(LoggableCacheCopyError(f.failure), f.command, data)
+ andThen = failAndAwaitPendingResponses(CopyAttemptError(f.failure), f.command, data)
)
case Event(IoFailAck(command: IoCommand[_], failure), Some(data)) =>
+ handleBlacklistingForGenericFailure()
// Loggable because this is an attempt-specific problem:
- failAndAwaitPendingResponses(LoggableCacheCopyError(failure), command, data)
+ failAndAwaitPendingResponses(CopyAttemptError(failure), command, data)
// Should not be possible
- case Event(IoFailAck(_: IoCommand[_], failure), None) => failAndStop(LoggableCacheCopyError(failure))
+ case Event(IoFailAck(_: IoCommand[_], failure), None) => failAndStop(CopyAttemptError(failure))
}
when(FailedState) {
case Event(f: IoReadForbiddenFailure[_], Some(data)) =>
- handleForbidden(
+ handleBlacklistingForForbidden(
path = f.forbiddenPath,
andThen = stayOrStopInFailedState(f, data)
)
+ case Event(fail: IoFailAck[_], Some(data)) =>
+ handleBlacklistingForGenericFailure()
+ stayOrStopInFailedState(fail, data)
// At this point success or failure doesn't matter, we've already failed this hit
case Event(response: IoAck[_], Some(data)) =>
stayOrStopInFailedState(response, data)
@@ -238,12 +261,15 @@ abstract class StandardCacheHitCopyingActor(val standardParams: StandardCacheHit
}
}
- /* Blacklist by prefix if appropriate. */
- private def handleForbidden[T](path: String, andThen: => State): State = {
+ /* Blacklist by bucket and hit if appropriate. */
+ private def handleBlacklistingForForbidden[T](path: String, andThen: => State): State = {
for {
+      // Blacklist the hit first in the for-comprehension since not all configurations support bucket blacklisting.
cache <- standardParams.blacklistCache
+ data <- stateData
+ _ = blacklistAndMetricHit(cache, data.cacheHit)
prefix <- extractBlacklistPrefix(path)
- _ = cache.blacklist(prefix)
+ _ = blacklistAndMetricBucket(cache, prefix)
} yield()
andThen
}
@@ -256,7 +282,7 @@ abstract class StandardCacheHitCopyingActor(val standardParams: StandardCacheHit
stay()
}
- def failAndStop(failure: CacheCopyError): State = {
+ def failAndStop(failure: CacheCopyFailure): State = {
context.parent ! CopyingOutputsFailedResponse(jobDescriptor.key, standardParams.cacheCopyAttempt, failure)
context stop self
stay()
@@ -264,7 +290,7 @@ abstract class StandardCacheHitCopyingActor(val standardParams: StandardCacheHit
/** If there are no responses pending this behaves like `failAndStop`, otherwise this goes to `FailedState` and waits
* for all the pending responses to come back before stopping. */
- def failAndAwaitPendingResponses(failure: CacheCopyError, command: IoCommand[_], data: StandardCacheHitCopyingActorData): State = {
+ def failAndAwaitPendingResponses(failure: CacheCopyFailure, command: IoCommand[_], data: StandardCacheHitCopyingActorData): State = {
context.parent ! CopyingOutputsFailedResponse(jobDescriptor.key, standardParams.cacheCopyAttempt, failure)
val (newData, commandState) = data.commandComplete(command)
@@ -297,7 +323,7 @@ abstract class StandardCacheHitCopyingActor(val standardParams: StandardCacheHit
}
/**
- * Returns a pair of the list of simpletons with copied paths, and copy commands necessary to perform those copies.
+ * Returns a pair of the list of simpletons with copied paths, and copy commands necessary to perform those copies.
*/
protected def processSimpletons(womValueSimpletons: Seq[WomValueSimpleton], sourceCallRootPath: Path): Try[(CallOutputs, Set[IoCommand[_]])] = Try {
val (destinationSimpletons, ioCommands): (List[WomValueSimpleton], Set[IoCommand[_]]) = womValueSimpletons.toList.foldMap({
@@ -324,7 +350,7 @@ abstract class StandardCacheHitCopyingActor(val standardParams: StandardCacheHit
}
/**
- * Returns a pair of the detritus with copied paths, and copy commands necessary to perform those copies.
+ * Returns a pair of the detritus with copied paths, and copy commands necessary to perform those copies.
*/
protected def processDetritus(sourceJobDetritusFiles: Map[String, String]): Try[(Map[String, Path], Set[IoCommand[_]])] = Try {
val fileKeys = detritusFileKeys(sourceJobDetritusFiles)
@@ -361,24 +387,17 @@ abstract class StandardCacheHitCopyingActor(val standardParams: StandardCacheHit
case other => s"The Cache hit copying actor timed out waiting for an unknown I/O operation: $other"
}
- // Loggable because this is an attempt-specific:
- failAndStop(LoggableCacheCopyError(new TimeoutException(exceptionMessage)))
+ // Loggable because this is attempt-specific:
+ failAndStop(CopyAttemptError(new TimeoutException(exceptionMessage)))
()
}
/**
- * If a subclass of this `StandardCacheHitCopyingActor` supports blacklisting then it should implement this
+ * If a subclass of this `StandardCacheHitCopyingActor` supports blacklisting by path then it should implement this
* to return the prefix of the path from the failed copy command to use for blacklisting.
*/
protected def extractBlacklistPrefix(path: String): Option[String] = None
- private def sourcePathFromCopyOutputsCommand(command: CopyOutputsCommand): String = command.jobDetritusFiles.values.head
+ def sourcePathFromCopyOutputsCommand(command: CopyOutputsCommand): String = command.jobDetritusFiles.values.head
- private def isSourceBlacklisted(command: CopyOutputsCommand): Boolean = {
- val path = sourcePathFromCopyOutputsCommand(command)
- (for {
- prefix <- extractBlacklistPrefix(path)
- cache <- standardParams.blacklistCache
- } yield cache.isBlacklisted(prefix)).getOrElse(false)
- }
}
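
For reference, a backend that supports bucket blacklisting would override `extractBlacklistPrefix`. The GCS-style
parsing below is an assumption sketched for illustration only, not code from this patch:

```scala
// Hypothetical helper such an override might use: pull the bucket name out of a "gs://" path.
def bucketOf(path: String): Option[String] =
  if (path.startsWith("gs://")) path.stripPrefix("gs://").split('/').headOption else None

// e.g. bucketOf("gs://my-bucket/workflow/call/detritus") == Some("my-bucket")
```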
diff --git a/backend/src/test/scala/cromwell/backend/standard/callcaching/BlacklistCacheSpec.scala b/backend/src/test/scala/cromwell/backend/standard/callcaching/BlacklistCacheSpec.scala
index 40af0c690f0..72025af499a 100644
--- a/backend/src/test/scala/cromwell/backend/standard/callcaching/BlacklistCacheSpec.scala
+++ b/backend/src/test/scala/cromwell/backend/standard/callcaching/BlacklistCacheSpec.scala
@@ -1,22 +1,44 @@
package cromwell.backend.standard.callcaching
import cromwell.core.CacheConfig
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import cromwell.services.CallCaching.CallCachingEntryId
+import org.scalatest.concurrent.Eventually
+import org.scalatest.{FlatSpec, Matchers}
import scala.concurrent.duration._
import scala.language.postfixOps
-class BlacklistCacheSpec extends FlatSpec with BeforeAndAfterAll with Matchers {
- "The blacklist cache" should "default, blacklist and expire" in {
+class BlacklistCacheSpec extends FlatSpec with Matchers with Eventually {
+ "The blacklist cache" should "default, blacklist, whitelist and expire" in {
+ val hit = CallCachingEntryId(3)
val bucket = "foo"
- val cacheConfig = CacheConfig(concurrency = 1, size = Integer.MAX_VALUE, ttl = 1 second)
- val cache = BlacklistCache(cacheConfig)
- cache.isBlacklisted(bucket) shouldBe false
+
+ val bucketCacheConfig = CacheConfig(concurrency = 1, size = Integer.MAX_VALUE, ttl = 1 second)
+ val hitCacheConfig = CacheConfig(concurrency = 1, size = Integer.MAX_VALUE, ttl = 1 second)
+ val cache = new RootWorkflowBlacklistCache(bucketCacheConfig = bucketCacheConfig, hitCacheConfig = hitCacheConfig)
+ cache.getBlacklistStatus(bucket) shouldBe UntestedCacheResult
+ cache.getBlacklistStatus(hit) shouldBe UntestedCacheResult
cache.blacklist(bucket)
- cache.isBlacklisted(bucket) shouldBe true
+ cache.blacklist(hit)
+ cache.getBlacklistStatus(bucket) shouldBe BadCacheResult
+ cache.getBlacklistStatus(hit) shouldBe BadCacheResult
+
+ implicit val patienceConfig = PatienceConfig(timeout = scaled(5.seconds), interval = scaled(1.second))
// Test ttl
- Thread.sleep(5000L)
- cache.isBlacklisted(bucket) shouldBe false
+ eventually {
+ cache.getBlacklistStatus(bucket) shouldBe UntestedCacheResult
+ cache.getBlacklistStatus(hit) shouldBe UntestedCacheResult
+ }
+
+ cache.whitelist(bucket)
+ cache.whitelist(hit)
+ cache.getBlacklistStatus(bucket) shouldBe GoodCacheResult
+ cache.getBlacklistStatus(hit) shouldBe GoodCacheResult
+
+ eventually {
+ cache.getBlacklistStatus(bucket) shouldBe UntestedCacheResult
+ cache.getBlacklistStatus(hit) shouldBe UntestedCacheResult
+ }
}
}
diff --git a/backend/src/test/scala/cromwell/backend/standard/callcaching/CallCachingBlacklistManagerSpec.scala b/backend/src/test/scala/cromwell/backend/standard/callcaching/CallCachingBlacklistManagerSpec.scala
new file mode 100644
index 00000000000..d53f2c14c93
--- /dev/null
+++ b/backend/src/test/scala/cromwell/backend/standard/callcaching/CallCachingBlacklistManagerSpec.scala
@@ -0,0 +1,121 @@
+package cromwell.backend.standard.callcaching
+
+import akka.event.NoLogging
+import com.typesafe.config.ConfigFactory
+import cromwell.core._
+import org.scalatest.{FlatSpec, Matchers}
+import spray.json._
+
+
+class CallCachingBlacklistManagerSpec extends FlatSpec with Matchers {
+ behavior of "CallCachingBlacklistManager"
+
+ //noinspection RedundantDefaultArgument
+ val workflowSourcesNoGrouping = WorkflowSourceFilesWithoutImports(
+ workflowSource = None,
+ workflowUrl = None,
+ workflowRoot = None,
+ workflowType = None,
+ workflowTypeVersion = None,
+ inputsJson = "",
+ workflowOptions = WorkflowOptions(JsObject.empty),
+ labelsJson = "",
+ workflowOnHold = false,
+ warnings = List.empty
+ )
+
+ val workflowSourcesYesGrouping = workflowSourcesNoGrouping.copy(
+ workflowOptions = WorkflowOptions(""" { "google_project": "blacklist_group_testing" } """.parseJson.asJsObject)
+ )
+
+ val workflowNoGrouping = new HasWorkflowIdAndSources {
+ override def sources: WorkflowSourceFilesCollection = workflowSourcesNoGrouping
+ override def id: WorkflowId = WorkflowId.randomId()
+ }
+
+ val workflowYesGrouping1 = new HasWorkflowIdAndSources {
+ override def sources: WorkflowSourceFilesCollection = workflowSourcesYesGrouping
+ override def id: WorkflowId = WorkflowId.randomId()
+ }
+
+ val workflowYesGrouping2 = new HasWorkflowIdAndSources {
+ override def sources: WorkflowSourceFilesCollection = workflowSourcesYesGrouping
+ override def id: WorkflowId = WorkflowId.randomId()
+ }
+
+ it should "be off by default" in {
+ val configString = ""
+ val manager = new CallCachingBlacklistManager(ConfigFactory.parseString(configString), logger = NoLogging)
+
+ manager.blacklistCacheFor(workflowNoGrouping) shouldBe None
+ }
+
+ it should "be on with default values if blacklisting is enabled" in {
+ val configString =
+ """
+ |call-caching {
+ | blacklist-cache {
+ | enabled: true
+ | }
+ |}
+ |""".stripMargin
+ val manager = new CallCachingBlacklistManager(ConfigFactory.parseString(configString), logger = NoLogging)
+
+ val cache = manager.blacklistCacheFor(workflowNoGrouping)
+ val rootWorkflowCache = cache.get.asInstanceOf[RootWorkflowBlacklistCache]
+
+ rootWorkflowCache.hitCache.size() shouldBe 0
+ rootWorkflowCache.bucketCache.size() shouldBe 0
+ }
+
+ it should "use root workflow level caches if no workflow-option is specified for groupings in config" in {
+ val configString =
+ """
+ |call-caching {
+ | blacklist-cache {
+ | enabled: true
+ | }
+ |}
+ |""".stripMargin
+ val manager = new CallCachingBlacklistManager(ConfigFactory.parseString(configString), logger = NoLogging)
+ val cache = manager.blacklistCacheFor(workflowYesGrouping1)
+ val _ = cache.get.asInstanceOf[RootWorkflowBlacklistCache]
+ }
+
+ it should "use root workflow level caches if no workflow-option is provided in workflow options" in {
+ val configString =
+ """
+ |call-caching {
+ | blacklist-cache {
+ | enabled: true
+ | groupings: {
+ | workflow-option: "google_project"
+ | }
+ | }
+ |}
+ |""".stripMargin
+ val manager = new CallCachingBlacklistManager(ConfigFactory.parseString(configString), logger = NoLogging)
+ val cache = manager.blacklistCacheFor(workflowNoGrouping)
+ val _ = cache.get.asInstanceOf[RootWorkflowBlacklistCache]
+ }
+
+ it should "use a grouping cache if there is a workflow-option in config and its value exists as a key in workflow options" in {
+ val configString =
+ """
+ |call-caching {
+ | blacklist-cache {
+ | enabled: true
+ | groupings: {
+ | workflow-option: "google_project"
+ | }
+ | }
+ |}
+ |""".stripMargin
+ val manager = new CallCachingBlacklistManager(ConfigFactory.parseString(configString), logger = NoLogging)
+ val cache1 = manager.blacklistCacheFor(workflowYesGrouping1).get
+ val _ = cache1.asInstanceOf[GroupingBlacklistCache]
+
+ val cache2 = manager.blacklistCacheFor(workflowYesGrouping2).get
+ System.identityHashCode(cache1) shouldEqual System.identityHashCode(cache2)
+ }
+}
diff --git a/build.sbt b/build.sbt
index 8fdd780aa90..f276912f2ec 100644
--- a/build.sbt
+++ b/build.sbt
@@ -223,7 +223,7 @@ lazy val awsBackend = (project in backendRoot / "aws")
.dependsOn(services % "test->test")
lazy val sfsBackend = (project in backendRoot / "sfs")
- .withLibrarySettings("cromwell-sfs-backend")
+ .withLibrarySettings("cromwell-sfs-backend", sfsBackendDependencies)
.dependsOn(backend)
.dependsOn(gcsFileSystem)
.dependsOn(httpFileSystem)
diff --git a/centaur/src/it/scala/centaur/AbstractCromwellEngineOrBackendUpgradeTestCaseSpec.scala b/centaur/src/it/scala/centaur/AbstractCromwellEngineOrBackendUpgradeTestCaseSpec.scala
index 1cc20910886..fa8db361352 100644
--- a/centaur/src/it/scala/centaur/AbstractCromwellEngineOrBackendUpgradeTestCaseSpec.scala
+++ b/centaur/src/it/scala/centaur/AbstractCromwellEngineOrBackendUpgradeTestCaseSpec.scala
@@ -7,6 +7,7 @@ import cromwell.database.slick.{EngineSlickDatabase, MetadataSlickDatabase, Slic
import cromwell.database.sql.SqlDatabase
import org.scalatest.{Assertions, BeforeAndAfter, DoNotDiscover}
import shapeless.syntax.typeable._
+import net.ceedubs.ficus.Ficus._
import scala.concurrent.Future
@@ -71,7 +72,7 @@ object AbstractCromwellEngineOrBackendUpgradeTestCaseSpec {
private def recreateDatabase(slickDatabase: SlickDatabase)(implicit cs: ContextShift[IO]): IO[Unit] = {
import slickDatabase.dataAccess.driver.api._
- val schemaName = slickDatabase.databaseConfig.getString("db.schema")
+ val schemaName = slickDatabase.databaseConfig.getOrElse("db.cromwell-database-name", "cromwell_test")
//noinspection SqlDialectInspection
for {
_ <- IO.fromFuture(IO(slickDatabase.database.run(sqlu"""DROP SCHEMA IF EXISTS #$schemaName""")))
diff --git a/centaur/src/main/resources/reference.conf b/centaur/src/main/resources/reference.conf
index 4f7e96e52ce..48a4a404576 100644
--- a/centaur/src/main/resources/reference.conf
+++ b/centaur/src/main/resources/reference.conf
@@ -34,9 +34,6 @@ centaur {
# }
include required(classpath("reference_database.inc.conf"))
- # Override the database url to use a stable in memory database.
- database.db.schema = "cromwell_test"
- database.db.url = "jdbc:hsqldb:file:"${centaur.cromwell.database.db.schema}";shutdown=false;hsqldb.tx=mvcc"
}
# The timeout of the Centaur send/receive + unmarshal pipeline
diff --git a/centaur/src/main/resources/standardTestCases/sizerelativepath.test b/centaur/src/main/resources/standardTestCases/sizerelativepath.test
new file mode 100644
index 00000000000..c8c266ddb1b
--- /dev/null
+++ b/centaur/src/main/resources/standardTestCases/sizerelativepath.test
@@ -0,0 +1,13 @@
+name: sizerelativepath
+testFormat: workflowsuccess
+
+backends: [Local, LocalNoDocker, Slurm, SlurmNoDocker]
+backendsMode: "only"
+
+files {
+ workflow: sizerelativepath/sizerelativepath.wdl
+}
+
+metadata {
+ "outputs.sizerelativepath.size_string": "1495"
+}
diff --git a/centaur/src/main/resources/standardTestCases/sizerelativepath/1495bytes.txt b/centaur/src/main/resources/standardTestCases/sizerelativepath/1495bytes.txt
new file mode 100644
index 00000000000..9d9d23e4f4f
--- /dev/null
+++ b/centaur/src/main/resources/standardTestCases/sizerelativepath/1495bytes.txt
@@ -0,0 +1,20 @@
+KNAZHPO7MYANAWUG54SHDEXMQWBFJ4QJPTXGF4HBPB6WLJ4U7ZHXGZ53ZMLC5ZFXW6AFNPX33OZT
+SJA5XBO4GKPGJFLBGQPE2FYLRUSYNYNS7VS3TNOE7MYFEISAUX34KETVE6N2CHYJQTT23JDYJTBE
+5USVDQX5SK4WOU55ABQNALNIEV2LGGPMDCZ77EJXF25RLS3IGOER5RNQJHVIUG5RWDR5SU65JPGC
+BSG5RRD2L4H5FWVBXFKILP7SMWK7RENOVCINHWJNES2GKYVAKMJDRIYWBEJHXYOVN527HDKMYUJ2
+WH6D4VNC6E4LWFTA4ACWB2MN6WXRRRIL55SP5JYQPNUQBTEXXPKB4BRHROZZ5NBTRP6ERRP77X66
+J7TTEIBFZE3SJAOR7PLFYLDVHURUXKJMF3PMKSVBL57T2ZXYR34WEQHIGCZGVNKR7FCSPVJ2Y2IW
+WNBW3SBCIHKJCPWQ6CSZ4HNI2REHEI3O234ECVGFS6KRAGGHGSSAXLFITWBE2KTNSJP23PTCU3MV
+EYPDUX2AQVN7TXCIGWMOO2XRZJMU5UQXNKB4AQUGIJOMNCDR73QYLWKRTXLZSNHPMRLVAVS3CZ4M
+XJYNSBOB5OMJEQXSINZRSNQOUPPUVSYMMUALKW7NII33FQWF6EIOHDAE2WSOAE6CJ4SKLPWGRVFX
+TQHILVF4GWVAOJZJTMPL3JXZER44ZGWFCK7ZY7NCK3AMWWV4S2F5NBUUIOEHKBNT4SSWG2LJ2GSC
+FWXKHWSWCM7RVTMRX6E2P6FTR6O2S7MOJ2RBHAEONZEDV5OWO2IMYXJR4RYCPE5EN5XLPWY5SLCG
+A66UTVOVGPMBJ63TY5ZND4PKB4H3JD4KKJLFYYT7C34OBUEE4C6G7I54KTKK7N6JMCCZUZG3WUJN
+PEJEANB4CG76ZO4CTU2IHLE6ILJXGFNXHMN7J6QXYRTWK3WLGIDQ7GFVLHDOB4IMJ365HQOIZOYT
+QPZ65NDNMJH7PX44UJMKAZARA7PNY4AS7OQRXKGJOEJPXS5NXTY3CPA5FQ7C5N757GJPWA27UKFK
+PMCIJVZCQPYUARZUV4HVGUCXGQPIQ4AIHO4HME45EQ5HDVMDY66WTE43QOBEIURA23MVQEVNL7ZW
+EJFQQKE2WAQ6P5QNSZOQMKPSTA7GBNLK53P2MVVRLKLKPAHFDIMVYQG6LWWGHWPQQSX5ET4XBEAT
+KKNIAGIFGCJTUXAGBGYVU322HVMRJDG3WZMSC5F4OPNMUJCMEEV3JAABTGTGGNC3IUZCU47RSBTJ
+T4M3FXRWBPUA2VIT6LST5QCYK4A67OJIR7TUZGIYSCFS5SSZBNENIAXE6UYWMK7Q7E5YZVHQEYV7
+RQ4LCN2YMT63227SEDXMBKWWJSF24ARCXTBZATYNTXGRJP4TPQDIPHILNG6LD2RJDMNB3XYF3HL7
+7BVXUY4SRQQFTQQPR7B4GQSNDC7X6ZHO
\ No newline at end of file
diff --git a/centaur/src/main/resources/standardTestCases/sizerelativepath/sizerelativepath.wdl b/centaur/src/main/resources/standardTestCases/sizerelativepath/sizerelativepath.wdl
new file mode 100644
index 00000000000..5e874835f0d
--- /dev/null
+++ b/centaur/src/main/resources/standardTestCases/sizerelativepath/sizerelativepath.wdl
@@ -0,0 +1,32 @@
+version 1.0
+
+task print_size {
+ input {
+ File file
+ }
+ Int bytes = ceil(size(file))
+
+ command {
+ echo ~{bytes}
+ }
+
+ output {
+ String out = read_string(stdout())
+ }
+
+ runtime {docker: "ubuntu:latest"}
+}
+
+workflow sizerelativepath {
+ input {
+ File file = "centaur/src/main/resources/standardTestCases/sizerelativepath/1495bytes.txt"
+ }
+
+ call print_size {
+ input:
+ file = file
+ }
+ output {
+ String size_string = print_size.out
+ }
+}
diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf
index 1f83a963f8d..3eee76d200d 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -265,6 +265,9 @@ call-caching {
# to fail for external reasons which should not invalidate the cache (e.g. auth differences between users):
# (default: true)
invalidate-bad-cache-results = true
+
+ # The maximum number of times Cromwell will attempt to copy cache hits before giving up and running the job.
+ max-failed-copy-attempts = 1000000
}
google {
diff --git a/core/src/main/resources/reference_local_provider_config.inc.conf b/core/src/main/resources/reference_local_provider_config.inc.conf
index bd5a041a1ec..2d87c62f56b 100644
--- a/core/src/main/resources/reference_local_provider_config.inc.conf
+++ b/core/src/main/resources/reference_local_provider_config.inc.conf
@@ -47,16 +47,15 @@ filesystems {
]
caching {
- # When copying a cached result, what type of file duplication should occur. Attempted in the order listed below:
+ # When copying a cached result, what type of file duplication should occur.
+      # For more information, see: https://cromwell.readthedocs.io/en/stable/backends/HPC/#shared-filesystem
duplication-strategy: [
"hard-link", "soft-link", "copy"
]
- # Possible values: file, path
- # "file" will compute an md5 hash of the file content.
- # "path" will compute an md5 hash of the file path. This strategy will only be effective if the duplication-strategy (above) is set to "soft-link",
- # in order to allow for the original file path to be hashed.
- hashing-strategy: "file"
+      # Strategy used to determine whether a file has been used before.
+      # For an extended explanation and alternative strategies, see: https://cromwell.readthedocs.io/en/stable/Configuring/#call-caching
+ hashing-strategy: "md5"
# When true, will check if a sibling file with the same name and the .md5 extension exists, and if it does, use the content of this file as a hash.
# If false or the md5 does not exist, will proceed with the above-defined hashing strategy.
diff --git a/core/src/main/scala/cromwell/core/WorkflowSourceFilesCollection.scala b/core/src/main/scala/cromwell/core/WorkflowSourceFilesCollection.scala
index cf5d2baa113..9359eab5430 100644
--- a/core/src/main/scala/cromwell/core/WorkflowSourceFilesCollection.scala
+++ b/core/src/main/scala/cromwell/core/WorkflowSourceFilesCollection.scala
@@ -33,6 +33,11 @@ sealed trait WorkflowSourceFilesCollection {
}
}
+trait HasWorkflowIdAndSources {
+ def sources: WorkflowSourceFilesCollection
+ def id: WorkflowId
+}
+
object WorkflowSourceFilesCollection {
def apply(workflowSource: Option[WorkflowSource],
workflowUrl: Option[WorkflowUrl],
diff --git a/cromwell.example.backends/LocalExample.conf b/cromwell.example.backends/LocalExample.conf
index cbc79ea689f..91be9586fa7 100644
--- a/cromwell.example.backends/LocalExample.conf
+++ b/cromwell.example.backends/LocalExample.conf
@@ -92,18 +92,15 @@
# Call caching strategies
caching {
- # When copying a cached result, what type of file duplication should occur. Attempted in the order listed below:
+ # When copying a cached result, what type of file duplication should occur.
+        # For more information, see: https://cromwell.readthedocs.io/en/stable/backends/HPC/#shared-filesystem
duplication-strategy: [
"hard-link", "soft-link", "copy"
]
- # Possible values: file, path, path+modtime
- # "file" will compute an md5 hash of the file content.
- # "path" will compute an md5 hash of the file path. This strategy will only be effective if the duplication-strategy (above) is set to "soft-link",
- # in order to allow for the original file path to be hashed.
- # "path+modtime" will compute an md5 hash of the file path and the last modified time. The same conditions as for "path" apply here.
- # Default: file
- hashing-strategy: "file"
+        # Strategy used to determine whether a file has been used before.
+        # For an extended explanation and alternative strategies, see: https://cromwell.readthedocs.io/en/stable/Configuring/#call-caching
+ hashing-strategy: "md5"
# When true, will check if a sibling file with the same name and the .md5 extension exists, and if it does, use the content of this file as a hash.
# If false or the md5 does not exist, will proceed with the above-defined hashing strategy.
diff --git a/cromwell.example.backends/cromwell.examples.conf b/cromwell.example.backends/cromwell.examples.conf
index de67732930f..e6898473c95 100644
--- a/cromwell.example.backends/cromwell.examples.conf
+++ b/cromwell.example.backends/cromwell.examples.conf
@@ -153,16 +153,41 @@ call-caching {
# (default: true)
#invalidate-bad-cache-results = true
+ # The maximum number of times Cromwell will attempt to copy cache hits before giving up and running the job.
+ #max-failed-copy-attempts = 1000000
+
# blacklist-cache {
- # # The call caching blacklist cache is off by default. This cache is used to blacklist cache hit paths based on the
- # # prefixes of cache hit paths that Cromwell has previously failed to copy for permissions reasons.
+  # # The call caching blacklist cache is off by default. This cache is used to blacklist individual cache hits that
+  # # Cromwell has previously failed to copy (for any reason), and buckets of cache hit paths that it has previously
+  # # failed to copy for permissions reasons.
# enabled: true
- # # Guava cache concurrency.
- # concurrency: 10000
- # # How long entries in the cache should live from the time of their last access.
- # ttl: 20 minutes
- # # Maximum number of entries in the cache.
- # size: 1000
+ #
+  # # A blacklist grouping can be specified in workflow options to tell Cromwell which workflows should share a
+  # # blacklist cache.
+ # groupings {
+ # workflow-option: call-cache-blacklist-group
+ # concurrency: 10000
+ # ttl: 2 hours
+ # size: 1000
+ # }
+ #
+ # buckets {
+ # # Guava cache concurrency.
+ # concurrency: 10000
+ # # How long entries in the cache should live from the time of their last access.
+ # ttl: 20 minutes
+ # # Maximum number of entries in the cache.
+ # size: 1000
+ # }
+ #
+ # hits {
+ # # Guava cache concurrency.
+ # concurrency: 10000
+ # # How long entries in the cache should live from the time of their last access.
+ # ttl: 20 minutes
+ # # Maximum number of entries in the cache.
+ # size: 100000
+ # }
+ #
# }
}
diff --git a/database/migration/src/main/resources/metadata_changesets/remove_non_summarizable_metadata_from_queue.xml b/database/migration/src/main/resources/metadata_changesets/remove_non_summarizable_metadata_from_queue.xml
new file mode 100644
index 00000000000..b8c4ce8e17d
--- /dev/null
+++ b/database/migration/src/main/resources/metadata_changesets/remove_non_summarizable_metadata_from_queue.xml
@@ -0,0 +1,67 @@
+
+
+
+
+
+ Delete rows from the summary queue corresponding to metadata that will not be summarized.
+
+
+ DELETE FROM SUMMARY_QUEUE_ENTRY queue WHERE queue.METADATA_JOURNAL_ID NOT IN (
+ SELECT metadata.METADATA_JOURNAL_ID FROM METADATA_ENTRY metadata WHERE
+ metadata.METADATA_JOURNAL_ID = queue.METADATA_JOURNAL_ID AND
+ metadata.CALL_FQN IS NULL AND
+ metadata.JOB_SCATTER_INDEX IS NULL AND
+ metadata.JOB_RETRY_ATTEMPT IS NULL AND (
+ metadata.METADATA_KEY in
+ ('start', 'end', 'workflowName', 'status', 'submission', 'parentWorkflowId', 'rootWorkflowId')
+ OR
+ metadata.METADATA_KEY LIKE 'labels%'
+ )
+ )
+
+
+
+
+
+ Delete rows from the summary queue corresponding to metadata that will not be summarized.
+
+
+ DELETE FROM "SUMMARY_QUEUE_ENTRY" queue WHERE queue."METADATA_JOURNAL_ID" NOT IN (
+ SELECT metadata."METADATA_JOURNAL_ID" FROM "METADATA_ENTRY" metadata WHERE
+ metadata."METADATA_JOURNAL_ID" = queue."METADATA_JOURNAL_ID" AND
+ metadata."CALL_FQN" IS NULL AND
+ metadata."JOB_SCATTER_INDEX" IS NULL AND
+ metadata."JOB_RETRY_ATTEMPT" IS NULL AND (
+ metadata."METADATA_KEY" in
+ ('start', 'end', 'workflowName', 'status', 'submission', 'parentWorkflowId', 'rootWorkflowId')
+ OR
+ metadata."METADATA_KEY" LIKE 'labels%'
+ )
+ )
+
+
+
+
+
+ Delete rows from the summary queue corresponding to metadata that will not be summarized.
+
+
+ DELETE SUMMARY_QUEUE_ENTRY FROM SUMMARY_QUEUE_ENTRY
+ INNER JOIN METADATA_ENTRY ON
+ SUMMARY_QUEUE_ENTRY.METADATA_JOURNAL_ID = METADATA_ENTRY.METADATA_JOURNAL_ID WHERE NOT (
+ METADATA_ENTRY.CALL_FQN IS NULL AND
+ METADATA_ENTRY.JOB_SCATTER_INDEX IS NULL AND
+ METADATA_ENTRY.JOB_RETRY_ATTEMPT IS NULL AND (
+ METADATA_ENTRY.METADATA_KEY in
+ ('start', 'end', 'workflowName', 'status', 'submission', 'parentWorkflowId', 'rootWorkflowId')
+ OR
+ METADATA_ENTRY.METADATA_KEY LIKE 'labels%'
+ )
+ )
+
+
+
+
diff --git a/database/migration/src/main/resources/sql_metadata_changelog.xml b/database/migration/src/main/resources/sql_metadata_changelog.xml
index 0f74d8f6679..8ce89e5050a 100644
--- a/database/migration/src/main/resources/sql_metadata_changelog.xml
+++ b/database/migration/src/main/resources/sql_metadata_changelog.xml
@@ -16,5 +16,6 @@
+
diff --git a/database/sql/src/main/scala/cromwell/database/slick/MetadataSlickDatabase.scala b/database/sql/src/main/scala/cromwell/database/slick/MetadataSlickDatabase.scala
index 506ebee2a33..c558c6e432b 100644
--- a/database/sql/src/main/scala/cromwell/database/slick/MetadataSlickDatabase.scala
+++ b/database/sql/src/main/scala/cromwell/database/slick/MetadataSlickDatabase.scala
@@ -18,6 +18,33 @@ object MetadataSlickDatabase {
val databaseConfig = SlickDatabase.getDatabaseConfig("metadata", parentConfig)
new MetadataSlickDatabase(databaseConfig)
}
+
+ case class SummarizationPartitionedMetadata(nonSummarizableMetadata: Seq[MetadataEntry],
+ summarizableMetadata: Seq[MetadataEntry])
+
+ def partitionSummarizationMetadata(rawMetadataEntries: Seq[MetadataEntry],
+ startMetadataKey: String,
+ endMetadataKey: String,
+ nameMetadataKey: String,
+ statusMetadataKey: String,
+ submissionMetadataKey: String,
+ parentWorkflowIdKey: String,
+ rootWorkflowIdKey: String,
+ labelMetadataKey: String): SummarizationPartitionedMetadata = {
+
+ val exactMatchMetadataKeys = Set(startMetadataKey, endMetadataKey, nameMetadataKey, statusMetadataKey, submissionMetadataKey, parentWorkflowIdKey, rootWorkflowIdKey)
+ val startsWithMetadataKeys = Set(labelMetadataKey)
+
+ val (summarizable, nonSummarizable) = rawMetadataEntries partition { entry =>
+ entry.callFullyQualifiedName.isEmpty && entry.jobIndex.isEmpty && entry.jobAttempt.isEmpty &&
+ (exactMatchMetadataKeys.contains(entry.metadataKey) || startsWithMetadataKeys.exists(entry.metadataKey.startsWith))
+ }
+
+ SummarizationPartitionedMetadata(
+ summarizableMetadata = summarizable,
+ nonSummarizableMetadata = nonSummarizable
+ )
+ }
}
class MetadataSlickDatabase(originalDatabaseConfig: Config)
@@ -28,24 +55,56 @@ class MetadataSlickDatabase(originalDatabaseConfig: Config)
override lazy val dataAccess = new MetadataDataAccessComponent(slickConfig.profile)
import dataAccess.driver.api._
+ import MetadataSlickDatabase._
override def existsMetadataEntries()(implicit ec: ExecutionContext): Future[Boolean] = {
val action = dataAccess.metadataEntriesExists.result
runTransaction(action)
}
- override def addMetadataEntries(metadataEntries: Iterable[MetadataEntry])
+ override def addMetadataEntries(metadataEntries: Iterable[MetadataEntry],
+ startMetadataKey: String,
+ endMetadataKey: String,
+ nameMetadataKey: String,
+ statusMetadataKey: String,
+ submissionMetadataKey: String,
+ parentWorkflowIdKey: String,
+ rootWorkflowIdKey: String,
+ labelMetadataKey: String)
(implicit ec: ExecutionContext): Future[Unit] = {
- if (metadataEntries.isEmpty) Future.successful(()) else {
-
- val batchesToWrite = metadataEntries.grouped(insertBatchSize).toList
+ val partitioned = partitionSummarizationMetadata(
+ rawMetadataEntries = metadataEntries.toSeq,
+ startMetadataKey,
+ endMetadataKey,
+ nameMetadataKey,
+ statusMetadataKey,
+ submissionMetadataKey,
+ parentWorkflowIdKey,
+ rootWorkflowIdKey,
+ labelMetadataKey)
+
+ // These entries also require a write to the summary queue.
+ def writeSummarizable(): Future[Unit] = if (partitioned.summarizableMetadata.isEmpty) Future.successful(()) else {
+ val batchesToWrite = partitioned.summarizableMetadata.grouped(insertBatchSize).toList
val insertActions = batchesToWrite.map { batch =>
val insertMetadata = dataAccess.metadataEntryIdsAutoInc ++= batch
insertMetadata.flatMap(ids => writeSummaryQueueEntries(ids))
}
runTransaction(DBIO.sequence(insertActions)).void
}
+
+ // Non-summarizable metadata that only needs to go to the metadata table can be written much more efficiently
+ // than summarizable metadata.
+ def writeNonSummarizable(): Future[Unit] = if (partitioned.nonSummarizableMetadata.isEmpty) Future.successful(()) else {
+ val action = DBIO.sequence(partitioned.nonSummarizableMetadata.grouped(insertBatchSize).map(dataAccess.metadataEntries ++= _))
+ runLobAction(action).void
+ }
+
+ for {
+ _ <- writeSummarizable()
+ _ <- writeNonSummarizable()
+ } yield ()
}
override def metadataEntryExists(workflowExecutionUuid: String)(implicit ec: ExecutionContext): Future[Boolean] = {
@@ -177,14 +236,7 @@ class MetadataSlickDatabase(originalDatabaseConfig: Config)
}
}
- override def summarizeIncreasing(startMetadataKey: String,
- endMetadataKey: String,
- nameMetadataKey: String,
- statusMetadataKey: String,
- submissionMetadataKey: String,
- parentWorkflowIdKey: String,
- rootWorkflowIdKey: String,
- labelMetadataKey: String,
+ override def summarizeIncreasing(labelMetadataKey: String,
limit: Int,
buildUpdatedSummary:
(Option[WorkflowMetadataSummaryEntry], Seq[MetadataEntry])
@@ -195,13 +247,6 @@ class MetadataSlickDatabase(originalDatabaseConfig: Config)
_ <-
buildMetadataSummaryFromRawMetadataAndWriteToDb(
rawMetadataEntries = rawMetadataEntries,
- startMetadataKey = startMetadataKey,
- endMetadataKey = endMetadataKey,
- nameMetadataKey = nameMetadataKey,
- statusMetadataKey = statusMetadataKey,
- submissionMetadataKey = submissionMetadataKey,
- parentWorkflowIdKey = parentWorkflowIdKey,
- rootWorkflowIdKey = rootWorkflowIdKey,
labelMetadataKey = labelMetadataKey,
buildUpdatedSummary = buildUpdatedSummary
)
@@ -214,13 +259,6 @@ class MetadataSlickDatabase(originalDatabaseConfig: Config)
override def summarizeDecreasing(summaryNameDecreasing: String,
summaryNameIncreasing: String,
- startMetadataKey: String,
- endMetadataKey: String,
- nameMetadataKey: String,
- statusMetadataKey: String,
- submissionMetadataKey: String,
- parentWorkflowIdKey: String,
- rootWorkflowIdKey: String,
labelMetadataKey: String,
limit: Int,
buildUpdatedSummary:
@@ -242,13 +280,6 @@ class MetadataSlickDatabase(originalDatabaseConfig: Config)
_ <-
buildMetadataSummaryFromRawMetadataAndWriteToDb(
rawMetadataEntries = rawMetadataEntries,
- startMetadataKey = startMetadataKey,
- endMetadataKey = endMetadataKey,
- nameMetadataKey = nameMetadataKey,
- statusMetadataKey = statusMetadataKey,
- submissionMetadataKey = submissionMetadataKey,
- parentWorkflowIdKey = parentWorkflowIdKey,
- rootWorkflowIdKey = rootWorkflowIdKey,
labelMetadataKey = labelMetadataKey,
buildUpdatedSummary = buildUpdatedSummary
)
@@ -262,33 +293,17 @@ class MetadataSlickDatabase(originalDatabaseConfig: Config)
}
private def buildMetadataSummaryFromRawMetadataAndWriteToDb(rawMetadataEntries: Seq[MetadataEntry],
- startMetadataKey: String,
- endMetadataKey: String,
- nameMetadataKey: String,
- statusMetadataKey: String,
- submissionMetadataKey: String,
- parentWorkflowIdKey: String,
- rootWorkflowIdKey: String,
labelMetadataKey: String,
buildUpdatedSummary:
(Option[WorkflowMetadataSummaryEntry], Seq[MetadataEntry]) => WorkflowMetadataSummaryEntry
)(implicit ec: ExecutionContext): DBIO[Unit] = {
- val exactMatchMetadataKeys = Set(startMetadataKey, endMetadataKey, nameMetadataKey, statusMetadataKey, submissionMetadataKey, parentWorkflowIdKey, rootWorkflowIdKey)
- val startsWithMetadataKeys = Set(labelMetadataKey)
-
- val metadataEntries = rawMetadataEntries filter { entry =>
- entry.callFullyQualifiedName.isEmpty && entry.jobIndex.isEmpty && entry.jobAttempt.isEmpty &&
- (exactMatchMetadataKeys.contains(entry.metadataKey) || startsWithMetadataKeys.exists(entry.metadataKey.startsWith))
- }
- val metadataWithoutLabels = metadataEntries
- .filterNot(_.metadataKey.contains(labelMetadataKey)) // Why are these "contains" while the filtering is "starts with"?
- .groupBy(_.workflowExecutionUuid)
- val customLabelEntries = metadataEntries.filter(_.metadataKey.contains(labelMetadataKey))
+ val (summarizableLabelsMetadata, summarizableRegularMetadata) = rawMetadataEntries.partition(_.metadataKey.contains(labelMetadataKey))
+ val groupedSummarizableRegularMetadata = summarizableRegularMetadata.groupBy(_.workflowExecutionUuid)
for {
- _ <- DBIO.sequence(metadataWithoutLabels map updateWorkflowMetadataSummaryEntry(buildUpdatedSummary))
- _ <- DBIO.sequence(customLabelEntries map toCustomLabelEntry map upsertCustomLabelEntry)
+ _ <- DBIO.sequence(groupedSummarizableRegularMetadata map updateWorkflowMetadataSummaryEntry(buildUpdatedSummary))
+ _ <- DBIO.sequence(summarizableLabelsMetadata map toCustomLabelEntry map upsertCustomLabelEntry)
} yield ()
}
@@ -407,4 +422,8 @@ class MetadataSlickDatabase(originalDatabaseConfig: Config)
runAction(
countSummaryQueueEntries()
)
+
+ override def getMetadataTotalRowNumberByRootWorkflowId(rootWorkflowId: String, timeout: Duration)(implicit ec: ExecutionContext): Future[Int] = {
+ runTransaction(dataAccess.metadataTotalSizeRowsForRootWorkflowId(rootWorkflowId).result, timeout = timeout)
+ }
}
diff --git a/database/sql/src/main/scala/cromwell/database/slick/tables/MetadataEntryComponent.scala b/database/sql/src/main/scala/cromwell/database/slick/tables/MetadataEntryComponent.scala
index bdcc7ac9c0c..04addbbc9b9 100644
--- a/database/sql/src/main/scala/cromwell/database/slick/tables/MetadataEntryComponent.scala
+++ b/database/sql/src/main/scala/cromwell/database/slick/tables/MetadataEntryComponent.scala
@@ -78,6 +78,21 @@ trait MetadataEntryComponent {
}
)
+ val metadataTotalSizeRowsForRootWorkflowId = Compiled(
+ (rootWorkflowId: Rep[String]) => {
+ val targetWorkflowIds = for {
+ summary <- workflowMetadataSummaryEntries
+ // Uses `IX_WORKFLOW_METADATA_SUMMARY_ENTRY_RWEU`, `UC_WORKFLOW_METADATA_SUMMARY_ENTRY_WEU`
+ if summary.rootWorkflowExecutionUuid === rootWorkflowId || summary.workflowExecutionUuid === rootWorkflowId
+ } yield summary.workflowExecutionUuid
+
+ for {
+ metadata <- metadataEntries
+ if metadata.workflowExecutionUuid in targetWorkflowIds // Uses `METADATA_WORKFLOW_IDX`
+ } yield metadata
+ }.size
+ )
+
val metadataEntryExistsForWorkflowExecutionUuid = Compiled(
(workflowExecutionUuid: Rep[String]) => (for {
metadataEntry <- metadataEntries
diff --git a/database/sql/src/main/scala/cromwell/database/sql/MetadataSqlDatabase.scala b/database/sql/src/main/scala/cromwell/database/sql/MetadataSqlDatabase.scala
index 23caa602e11..631de1383b0 100644
--- a/database/sql/src/main/scala/cromwell/database/sql/MetadataSqlDatabase.scala
+++ b/database/sql/src/main/scala/cromwell/database/sql/MetadataSqlDatabase.scala
@@ -25,7 +25,15 @@ trait MetadataSqlDatabase extends SqlDatabase {
/**
* Add metadata events to the database transactionally.
*/
- def addMetadataEntries(metadataEntries: Iterable[MetadataEntry])(implicit ec: ExecutionContext): Future[Unit]
+ def addMetadataEntries(metadataEntries: Iterable[MetadataEntry],
+ startMetadataKey: String,
+ endMetadataKey: String,
+ nameMetadataKey: String,
+ statusMetadataKey: String,
+ submissionMetadataKey: String,
+ parentWorkflowIdKey: String,
+ rootWorkflowIdKey: String,
+ labelMetadataKey: String)(implicit ec: ExecutionContext): Future[Unit]
def metadataEntryExists(workflowExecutionUuid: String)(implicit ec: ExecutionContext): Future[Boolean]
@@ -68,14 +76,7 @@ trait MetadataSqlDatabase extends SqlDatabase {
* @param buildUpdatedSummary Takes in the optional existing summary and the metadata, returns the new summary.
* @return A `Future` with the number of rows summarized by the invocation, and the number of rows still to summarize.
*/
- def summarizeIncreasing(startMetadataKey: String,
- endMetadataKey: String,
- nameMetadataKey: String,
- statusMetadataKey: String,
- submissionMetadataKey: String,
- parentWorkflowIdKey: String,
- rootWorkflowIdKey: String,
- labelMetadataKey: String,
+ def summarizeIncreasing(labelMetadataKey: String,
limit: Int,
buildUpdatedSummary:
(Option[WorkflowMetadataSummaryEntry], Seq[MetadataEntry])
@@ -90,13 +91,6 @@ trait MetadataSqlDatabase extends SqlDatabase {
*/
def summarizeDecreasing(summaryNameDecreasing: String,
summaryNameIncreasing: String,
- startMetadataKey: String,
- endMetadataKey: String,
- nameMetadataKey: String,
- statusMetadataKey: String,
- submissionMetadataKey: String,
- parentWorkflowIdKey: String,
- rootWorkflowIdKey: String,
labelMetadataKey: String,
limit: Int,
buildUpdatedSummary:
@@ -156,4 +150,6 @@ trait MetadataSqlDatabase extends SqlDatabase {
def countRootWorkflowIdsByArchiveStatusAndEndedOnOrBeforeThresholdTimestamp(archiveStatus: Option[String], thresholdTimestamp: Timestamp)(implicit ec: ExecutionContext): Future[Int]
def getSummaryQueueSize()(implicit ec: ExecutionContext): Future[Int]
+
+ def getMetadataTotalRowNumberByRootWorkflowId(rootWorkflowId: String, timeout: Duration)(implicit ec: ExecutionContext): Future[Int]
}
diff --git a/docs/Configuring.md b/docs/Configuring.md
index 9b31641c0a1..4aa76e3fbca 100644
--- a/docs/Configuring.md
+++ b/docs/Configuring.md
@@ -497,7 +497,8 @@ Read the [Abort](execution/ExecutionTwists/#abort) section to learn more about h
### Call caching
-Call Caching allows Cromwell to detect when a job has been run in the past so it doesn't have to re-compute results. To learn more see [Call Caching](cromwell_features/CallCaching).
+Call Caching allows Cromwell to detect when a job has been run in the past so it doesn't have to re-compute results.
+To learn more see [Call Caching](cromwell_features/CallCaching).
To enable Call Caching, add the following to your Cromwell configuration:
@@ -515,25 +516,38 @@ Cromwell also accepts [Workflow Options](wf_options/Overview#call-caching-option
### Local filesystem options
-When running a job on the Config (Shared Filesystem) backend, Cromwell provides some additional options in the backend's config section:
+When running a job on the Config (Shared Filesystem) backend, Cromwell provides some additional options in the backend's
+config section:
```HOCON
config {
filesystems {
local {
caching {
- # When copying a cached result, what type of file duplication should occur. Attempted in the order listed below:
+ # When copying a cached result, what type of file duplication should occur.
+ # possible values: "hard-link", "soft-link", "copy", "cached-copy".
+ # For more information check: https://cromwell.readthedocs.io/en/stable/backends/HPC/#shared-filesystem
+ # Attempted in the order listed below:
duplication-strategy: [
"hard-link", "soft-link", "copy"
]
- # Possible values: file, path, path+modtime
- # "file" will compute an md5 hash of the file content.
+ # Possible values: md5, xxh64, fingerprint, path, path+modtime
+ # For extended explanation check: https://cromwell.readthedocs.io/en/stable/Configuring/#call-caching
+ # "md5" will compute an md5 hash of the file content.
+            # "xxh64" will compute an xxh64 hash of the file content. Much faster than md5.
+            # "fingerprint" will take the last modified time and size, and hash the first 10 MB with xxh64 to create a file fingerprint.
+ # This strategy will only be effective if the duplication-strategy (above) is set to "hard-link", as copying changes the last modified time.
# "path" will compute an md5 hash of the file path. This strategy will only be effective if the duplication-strategy (above) is set to "soft-link",
# in order to allow for the original file path to be hashed.
# "path+modtime" will compute an md5 hash of the file path and the last modified time. The same conditions as for "path" apply here.
- # Default: file
- hashing-strategy: "file"
+ # Default: "md5"
+ hashing-strategy: "md5"
+
+            # When the 'fingerprint' strategy is used, this sets how much of the beginning of the file is read as the fingerprint.
+            # If the file is smaller than this size, the entire file will be read.
+ # Default: 10485760 (10MB).
+ fingerprint-size: 10485760
# When true, will check if a sibling file with the same name and the .md5 extension exists, and if it does, use the content of this file as a hash.
# If false or the md5 does not exist, will proceed with the above-defined hashing strategy.
@@ -545,6 +559,30 @@ When running a job on the Config (Shared Filesystem) backend, Cromwell provides
}
```
+#### Call cache strategy options for local filesystem
+
+* Hash-based options. These read the entire file. These strategies work with containers.
+  * `xxh64` (community-supported*). This uses the 64-bit implementation of the [xxHash](https://www.xxhash.com)
+  algorithm. This algorithm is optimized for file integrity hashing and provides a more than 10x speed improvement over
+  md5.
+  * `md5`. The well-known md5sum algorithm.
+* Path-based options. These are based on the file path. Extremely lightweight, but they only work with the `soft-link` file
+caching strategy and can therefore never work with containers.
+  * `path` creates an md5 hash of the path.
+  * `path+modtime` creates an md5 hash of the path and its modification time.
+* Fingerprinting. This strategy works with containers.
+  * `fingerprint` (community-supported*) creates a fingerprint for each file from its last modified time (milliseconds since
+  epoch in hexadecimal) + size (bytes in hexadecimal) + the xxh64 sum of the first 10 MB** of the file.
+  It is much more lightweight than the hash-based options while still unique enough that collisions are unlikely. This
+  strategy works well for workflows that generate multi-gigabyte files where hashing those files on the
+  Cromwell instance would cause CPU or I/O problems. A sketch of this scheme is shown below.
+  NOTE: This strategy requires hard-linking as the duplication strategy, as copying changes the last modified time.
+
+(*) The `fingerprint` and `xxh64` strategies are community-supported features maintained by Cromwell's HPC community. There
+is no official support from the core Cromwell team.
+
+(**) This value is configurable.
+
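For illustration only, here is a minimal sketch of the fingerprint scheme described above. It assumes the lz4-java library (`net.jpountz.xxhash`) for the xxh64 sum; the object and method names are hypothetical and this is not Cromwell's actual implementation.

```scala
import java.nio.file.{Files, Path, Paths}

import net.jpountz.xxhash.XXHashFactory

object FingerprintSketch {
  // Mirrors the documented default of the `fingerprint-size` option (10 MB).
  private val DefaultFingerprintSize = 10 * 1024 * 1024

  def fingerprint(path: Path, fingerprintSize: Int = DefaultFingerprintSize): String = {
    // Last modified time and size, both rendered in hexadecimal.
    val lastModifiedHex = java.lang.Long.toHexString(Files.getLastModifiedTime(path).toMillis)
    val sizeHex = java.lang.Long.toHexString(Files.size(path))

    // xxh64 of (at most) the first `fingerprintSize` bytes of the file.
    val hasher = XXHashFactory.fastestInstance().newStreamingHash64(0L)
    val in = Files.newInputStream(path)
    try {
      val buffer = new Array[Byte](64 * 1024)
      var remaining = fingerprintSize
      var read = in.read(buffer, 0, math.min(buffer.length, remaining))
      while (read > 0) {
        hasher.update(buffer, 0, read)
        remaining -= read
        read = if (remaining > 0) in.read(buffer, 0, math.min(buffer.length, remaining)) else -1
      }
    } finally in.close()

    lastModifiedHex + sizeHex + java.lang.Long.toHexString(hasher.getValue)
  }

  def main(args: Array[String]): Unit = println(fingerprint(Paths.get(args(0))))
}
```

Because the fingerprint depends on the last modified time, any duplication strategy that rewrites the file (such as `copy`) would change the result, which is why `hard-link` is required.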
### Workflow log directory
To change the directory where Cromwell writes workflow logs, change the directory location via the setting:
diff --git a/docs/RuntimeAttributes.md b/docs/RuntimeAttributes.md
index 16c0a0732b4..c7c0e8295c5 100644
--- a/docs/RuntimeAttributes.md
+++ b/docs/RuntimeAttributes.md
@@ -51,8 +51,8 @@ There are a number of additional runtime attributes that apply to the Google Clo
- [preemptible](#preemptible)
- [bootDiskSizeGb](#bootdisksizegb)
- [noAddress](#noaddress)
-- [gpuCount and gpuType](#gpucount-and-gputype)
-- [cpuPlatform](#cpuPlatform)
+- [gpuCount, gpuType, and nvidiaDriverVersion](#gpucount-gputype-and-nvidiadriverversion)
+- [cpuPlatform](#cpuplatform)
diff --git a/docs/cromwell_features/CallCaching.md b/docs/cromwell_features/CallCaching.md
index 5a9669717bd..323bddec04d 100644
--- a/docs/cromwell_features/CallCaching.md
+++ b/docs/cromwell_features/CallCaching.md
@@ -27,14 +27,8 @@ or **referenced from the original cached job** depending on the Cromwell
Cromwell offers the option to cache file hashes within the scope of a root workflow to prevent repeatedly requesting the hashes of the
same files multiple times. File hash caching is off by default and can be turned on with the configuration option `system.file-hash-cache=true`.
-***Call cache copy authorization failure prefix blacklisting***
-
-Cromwell has the option to filter call cache hits based on authorization failures copying previous
-call cache hits. In a multi-user environment user A might cache hit to one of user B's results
-but that doesn't necessarily mean user A is authorized to read user B's outputs from the filesystem. Call cache blacklisting
-allows Cromwell to record on a per-root-workflow level which file path prefixes were involved in cache result copy authorization failures.
-If Cromwell sees that the file paths for a candidate cache hit have a blacklisted prefix, Cromwell will quickly
-fail the copy attempt without doing any potentially expensive I/O.
+***Call cache blacklisting***
+
+Cromwell offers the ability to filter cache hits based on copying failures.
Call cache blacklisting configuration looks like:
@@ -47,23 +41,82 @@ call-caching {
invalidate-bad-cache-results = false
blacklist-cache {
- # The call caching blacklist cache is off by default. This is used to blacklist cache hit paths based on the
- # prefixes of cache hit paths that Cromwell previously failed to copy for authorization reasons.
- enabled: true
- # Guava cache concurrency.
- concurrency: 10000
- # How long entries in the cache should live from the time of their last access.
- ttl: 20 minutes
- # Maximum number of entries in the cache.
- size: 1000
+ # The call caching blacklist cache is off by default. This cache is used to blacklist cache hits based on cache
+ # hit ids or buckets of cache hit paths that Cromwell has previously failed to copy for permissions reasons.
+ enabled: true
+
+ # All blacklisting values below are optional. In order to use groupings (blacklist caches shared among root
+ # workflows) a value must be specified for `groupings.workflow-option` in configuration and the workflows to
+ # be grouped must be submitted with workflow options specifying the same group.
+ groupings {
+ workflow-option: call-cache-blacklist-group
+ concurrency: 10000
+ ttl: 2 hours
+ size: 1000
+ }
+
+ buckets {
+ # Guava cache concurrency.
+ concurrency: 10000
+ # How long entries in the cache should live from the time of their last access.
+ ttl: 1 hour
+ # Maximum number of entries in the cache.
+ size: 1000
+ }
+
+ hits {
+ # Guava cache concurrency.
+ concurrency: 10000
+ # How long entries in the cache should live from the time of their last access.
+ ttl: 1 hour
+ # Maximum number of entries in the cache.
+ size: 20000
+ }
}
}
```
-Call cache blacklisting could be supported by any backend type though is currently implemented only for the Google Pipelines API (PAPI) backends.
-For PAPI backends the bucket is considered the prefix for blacklisting purposes.
+**** Blacklist cache grouping ****
+
+By default Cromwell's blacklist caches work at the granularity of root workflows, but Cromwell can also be configured to
+share a blacklist cache among a group of workflows.
+If a value is specified for `call-caching.blacklisting.groupings.workflow-option` and a workflow option is specified
+having a matching key, all workflows specifying the same value will share a blacklist cache.
+
+For example, if Cromwell configuration contains `call-caching.blacklisting.groupings.workflow-option = "project"` and
+a workflow is submitted with the options
+
+```json
+{
+ "project": "Mary"
+}
+```
+
+then this workflow will share a blacklist cache with any other workflows whose workflow options contain `"project": "Mary"`.
+
+Grouping of blacklist caches can significantly improve blacklisting effectiveness and overall call caching performance.
+Workflows should be grouped by their effective authorization to ensure the same filesystem/object store permissions
+exist for every workflow in the group.
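As a rough sketch of the grouping mechanism (assuming Guava on the classpath; `GroupBlacklist` and the lookup logic are illustrative names, not Cromwell's actual classes), every workflow that supplies the same grouping value resolves to the same shared cache:

```scala
import java.util.concurrent.TimeUnit

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

// Stand-in for a per-group blacklist cache (buckets + hits).
final class GroupBlacklist

object BlacklistGroupingSketch {
  // One blacklist per grouping value; settings mirror the `groupings` stanza above.
  private val groupings: LoadingCache[String, GroupBlacklist] =
    CacheBuilder.newBuilder()
      .concurrencyLevel(10000)
      .expireAfterAccess(2, TimeUnit.HOURS)
      .maximumSize(1000)
      .build(new CacheLoader[String, GroupBlacklist] {
        override def load(group: String): GroupBlacklist = new GroupBlacklist
      })

  /** Shared blacklist for the workflow's group, or a private one if no group was supplied. */
  def blacklistFor(workflowOptions: Map[String, String]): GroupBlacklist =
    workflowOptions.get("call-cache-blacklist-group") match {
      case Some(group) => groupings.get(group) // shared among all workflows in the group
      case None        => new GroupBlacklist   // scoped to this root workflow only
    }
}
```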
+
+**** Hit blacklisting ****
+
+If a cache hit fails copying for any reason, Cromwell will record that failure in the blacklist cache and will not use
+the hit again. Hit blacklisting is particularly effective at improving call caching performance in conjunction with the
+grouping feature described above.
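A minimal sketch of the hit blacklist itself, again assuming Guava and using illustrative names rather than Cromwell's actual API:

```scala
import java.util.concurrent.TimeUnit

import com.google.common.cache.CacheBuilder

object HitBlacklistingSketch {
  sealed trait BlacklistStatus
  case object BadCacheResult extends BlacklistStatus

  // Settings mirror the `hits` stanza in the configuration above.
  private val hits = CacheBuilder.newBuilder()
    .concurrencyLevel(10000)
    .expireAfterAccess(1, TimeUnit.HOURS)
    .maximumSize(20000)
    .build[java.lang.Integer, BlacklistStatus]()

  /** A previously failed hit is skipped without attempting any further I/O. */
  def shouldSkip(cacheHitId: Int): Boolean =
    Option(hits.getIfPresent(Int.box(cacheHitId))).contains(BadCacheResult)

  /** Record a failed copy so this hit is not retried within the TTL. */
  def recordCopyFailure(cacheHitId: Int): Unit =
    hits.put(Int.box(cacheHitId), BadCacheResult)
}
```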
+
+**** Path prefix (GCS bucket) blacklisting on 403 Forbidden errors ****
+
+In a multi-user environment, user A might get a cache hit on one of user B's results,
+but that doesn't necessarily mean user A is authorized to read user B's outputs from the filesystem. Call cache blacklisting
+allows Cromwell to record which file path prefixes were involved in cache result copy authorization failures.
+If Cromwell sees that the file paths for a candidate cache hit have a blacklisted prefix, Cromwell will quickly
+fail the copy attempt without doing any potentially expensive I/O.
+
+Path prefix blacklisting could be supported by any backend type, though it is currently implemented only for Google
+(PAPI) backends. For Google backends the GCS bucket is considered the prefix for blacklisting purposes.
+
-***Call cache hit path prefixes***
+***Call cache whitelisting***
In a multi-user environment where access to job outputs may be restricted among different users, it can be useful to limit
cache hits to those that are more likely to actually be readable for cache hit copies.
diff --git a/engine/src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala b/engine/src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala
index 197b5e5845f..34efa3809e5 100644
--- a/engine/src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala
+++ b/engine/src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala
@@ -9,9 +9,9 @@ import cats.data.NonEmptyList
import com.typesafe.config.Config
import common.exception.ThrowableAggregation
import cromwell.backend.async.KnownJobFailureException
-import cromwell.backend.standard.callcaching.{BlacklistCache, RootWorkflowFileHashCacheActor}
+import cromwell.backend.standard.callcaching.{CallCachingBlacklistManager, RootWorkflowFileHashCacheActor}
import cromwell.core.Dispatcher.EngineDispatcher
-import cromwell.core.{CacheConfig, WorkflowId}
+import cromwell.core.WorkflowId
import cromwell.engine.SubWorkflowStart
import cromwell.engine.backend.BackendSingletonCollection
import cromwell.engine.workflow.WorkflowActor._
@@ -25,7 +25,6 @@ import org.apache.commons.lang3.exception.ExceptionUtils
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
-import scala.language.postfixOps
import scala.util.Try
object WorkflowManagerActor {
@@ -278,6 +277,8 @@ class WorkflowManagerActor(params: WorkflowManagerActorParams)
logger.debug(s"$tag transitioning from $fromState to $toState")
}
+ private val callCachingBlacklistManager = new CallCachingBlacklistManager(config, logger)
+
/**
* Submit the workflow and return an updated copy of the state data reflecting the addition of a
* Workflow ID -> WorkflowActorRef entry.
@@ -293,11 +294,6 @@ class WorkflowManagerActor(params: WorkflowManagerActorParams)
val fileHashCacheActorProps: Option[Props] = fileHashCacheEnabled.option(RootWorkflowFileHashCacheActor.props(params.ioActor, workflowId))
- val callCachingBlacklistCache: Option[BlacklistCache] = for {
- config <- config.as[Option[Config]]("call-caching.blacklist-cache")
- cacheConfig <- CacheConfig.optionalConfig(config, defaultConcurrency = 1000, defaultSize = 1000, defaultTtl = 1 hour)
- } yield BlacklistCache(cacheConfig)
-
val wfProps = WorkflowActor.props(
workflowToStart = workflow,
conf = config,
@@ -318,7 +314,7 @@ class WorkflowManagerActor(params: WorkflowManagerActorParams)
workflowHeartbeatConfig = params.workflowHeartbeatConfig,
totalJobsByRootWf = new AtomicInteger(),
fileHashCacheActorProps = fileHashCacheActorProps,
- blacklistCache = callCachingBlacklistCache)
+ blacklistCache = callCachingBlacklistManager.blacklistCacheFor(workflow))
val wfActor = context.actorOf(wfProps, name = s"WorkflowActor-$workflowId")
wfActor ! SubscribeTransitionCallBack(self)
diff --git a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/deletion/DeleteWorkflowFilesActor.scala b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/deletion/DeleteWorkflowFilesActor.scala
index 7bf5017841a..9959de379e0 100644
--- a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/deletion/DeleteWorkflowFilesActor.scala
+++ b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/deletion/DeleteWorkflowFilesActor.scala
@@ -10,6 +10,7 @@ import cromwell.engine.io.IoAttempts.EnhancedCromwellIoException
import cromwell.engine.workflow.lifecycle.deletion.DeleteWorkflowFilesActor._
import cromwell.engine.workflow.lifecycle.execution.callcaching._
import cromwell.filesystems.gcs.batch.GcsBatchCommandBuilder
+import cromwell.services.CallCaching.CallCachingEntryId
import cromwell.services.EngineServicesStore
import cromwell.services.metadata.MetadataService.PutMetadataAction
import cromwell.services.metadata.impl.FileDeletionStatus
diff --git a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/WorkflowExecutionActor.scala b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/WorkflowExecutionActor.scala
index 2b23d647b5a..23efde68e4b 100644
--- a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/WorkflowExecutionActor.scala
+++ b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/WorkflowExecutionActor.scala
@@ -621,6 +621,16 @@ case class WorkflowExecutionActor(params: WorkflowExecutionActorParams)
val ejeaName = s"${workflowDescriptor.id}-EngineJobExecutionActor-${jobKey.tag}"
val backendName = backendLifecycleActorFactory.name
val backendSingleton = params.backendSingletonCollection.backendSingletonActors(backendName)
+
+ val callCachingParameters = EngineJobExecutionActor.CallCachingParameters(
+ mode = workflowDescriptor.callCachingMode,
+ readActor = params.callCacheReadActor,
+ writeActor = params.callCacheWriteActor,
+ fileHashCacheActor = params.fileHashCacheActor,
+ maxFailedCopyAttempts = params.rootConfig.getInt("call-caching.max-failed-copy-attempts"),
+ blacklistCache = params.blacklistCache
+ )
+
val ejeaProps = EngineJobExecutionActor.props(
self,
jobKey,
@@ -631,15 +641,11 @@ case class WorkflowExecutionActor(params: WorkflowExecutionActorParams)
serviceRegistryActor = serviceRegistryActor,
ioActor = params.ioActor,
jobStoreActor = params.jobStoreActor,
- callCacheReadActor = params.callCacheReadActor,
- callCacheWriteActor = params.callCacheWriteActor,
workflowDockerLookupActor = params.workflowDockerLookupActor,
jobTokenDispenserActor = params.jobTokenDispenserActor,
backendSingleton,
- workflowDescriptor.callCachingMode,
command,
- fileHashCacheActor = params.fileHashCacheActor,
- blacklistCache = params.blacklistCache
+ callCachingParameters
)
val ejeaRef = context.actorOf(ejeaProps, ejeaName)
diff --git a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCache.scala b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCache.scala
index 188f4d2924d..78ba0561cea 100644
--- a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCache.scala
+++ b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCache.scala
@@ -16,11 +16,11 @@ import cromwell.database.sql.tables._
import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCache._
import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheReadActor.AggregatedCallHashes
import cromwell.engine.workflow.lifecycle.execution.callcaching.EngineJobHashingActor.CallCacheHashes
+import cromwell.services.CallCaching.CallCachingEntryId
import wom.core._
import scala.concurrent.{ExecutionContext, Future}
-final case class CallCachingEntryId(id: Int)
/**
* Given a database-layer CallCacheStore, this accessor can access the database with engine-friendly data types.
*/
diff --git a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheDiffActor.scala b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheDiffActor.scala
index ad2a41a24b9..585f78f63a0 100644
--- a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheDiffActor.scala
+++ b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheDiffActor.scala
@@ -32,17 +32,17 @@ class CallCacheDiffActor(serviceRegistryActor: ActorRef) extends LoggingFSM[Call
when(WaitingForMetadata) {
// First Response
// Response A
- case Event(SuccessfulMetadataJsonResponse(GetMetadataAction(originalQuery, _), responseJson), data@CallCacheDiffWithRequest(queryA, _, None, None, _)) if queryA == originalQuery =>
+ case Event(SuccessfulMetadataJsonResponse(GetMetadataAction(originalQuery, _, _), responseJson), data@CallCacheDiffWithRequest(queryA, _, None, None, _)) if queryA == originalQuery =>
stay() using data.copy(responseA = Option(WorkflowMetadataJson(responseJson)))
// Response B
- case Event(SuccessfulMetadataJsonResponse(GetMetadataAction(originalQuery, _), responseJson), data@CallCacheDiffWithRequest(_, queryB, None, None, _)) if queryB == originalQuery =>
+ case Event(SuccessfulMetadataJsonResponse(GetMetadataAction(originalQuery, _, _), responseJson), data@CallCacheDiffWithRequest(_, queryB, None, None, _)) if queryB == originalQuery =>
stay() using data.copy(responseB = Option(WorkflowMetadataJson(responseJson)))
// Second Response
// Response A
- case Event(SuccessfulMetadataJsonResponse(GetMetadataAction(originalQuery, _), responseJson), CallCacheDiffWithRequest(queryA, queryB, None, Some(responseB), replyTo)) if queryA == originalQuery =>
+ case Event(SuccessfulMetadataJsonResponse(GetMetadataAction(originalQuery, _, _), responseJson), CallCacheDiffWithRequest(queryA, queryB, None, Some(responseB), replyTo)) if queryA == originalQuery =>
buildDiffAndRespond(queryA, queryB, WorkflowMetadataJson(responseJson), responseB, replyTo)
// Response B
- case Event(SuccessfulMetadataJsonResponse(GetMetadataAction(originalQuery, _), responseJson), CallCacheDiffWithRequest(queryA, queryB, Some(responseA), None, replyTo)) if queryB == originalQuery =>
+ case Event(SuccessfulMetadataJsonResponse(GetMetadataAction(originalQuery, _, _), responseJson), CallCacheDiffWithRequest(queryA, queryB, Some(responseA), None, replyTo)) if queryB == originalQuery =>
buildDiffAndRespond(queryA, queryB, responseA, WorkflowMetadataJson(responseJson), replyTo)
case Event(FailedMetadataJsonResponse(_, failure), data: CallCacheDiffWithRequest) =>
data.replyTo ! FailedCallCacheDiffResponse(failure)
diff --git a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheInvalidateActor.scala b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheInvalidateActor.scala
index 2a7fda049fd..16bb0fb9caa 100644
--- a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheInvalidateActor.scala
+++ b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheInvalidateActor.scala
@@ -3,6 +3,7 @@ package cromwell.engine.workflow.lifecycle.execution.callcaching
import akka.actor.{Actor, ActorLogging, Props}
import cromwell.core.Dispatcher.EngineDispatcher
import cromwell.database.sql.tables.CallCachingEntry
+import cromwell.services.CallCaching.CallCachingEntryId
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}
diff --git a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheReadActor.scala b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheReadActor.scala
index ba3c213ed45..696c90099e7 100644
--- a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheReadActor.scala
+++ b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheReadActor.scala
@@ -13,6 +13,7 @@ import cromwell.services.EnhancedThrottlerActor
import scala.concurrent.Future
import scala.util.{Failure, Success}
import CallCache._
+import cromwell.services.CallCaching.CallCachingEntryId
/**
* Queues up work sent to it because its receive is non-blocking.
diff --git a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/EngineJobHashingActor.scala b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/EngineJobHashingActor.scala
index f553859711c..5a3494795d5 100644
--- a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/EngineJobHashingActor.scala
+++ b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/EngineJobHashingActor.scala
@@ -11,6 +11,7 @@ import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCache.CallCa
import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheHashingJobActor.{CompleteFileHashingResult, FinalFileHashingResult, InitialHashingResult, NoFileHashesResult}
import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheReadingJobActor.NextHit
import cromwell.engine.workflow.lifecycle.execution.callcaching.EngineJobHashingActor._
+import cromwell.services.CallCaching.CallCachingEntryId
import cromwell.services.metadata.CallMetadataKeys
/**
diff --git a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/FetchCachedResultsActor.scala b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/FetchCachedResultsActor.scala
index daa13b70e2f..0eef565caf2 100644
--- a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/FetchCachedResultsActor.scala
+++ b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/FetchCachedResultsActor.scala
@@ -6,6 +6,7 @@ import cromwell.core.Dispatcher.EngineDispatcher
import cromwell.core.simpleton.WomValueSimpleton
import cromwell.database.sql.SqlConverters._
import cromwell.engine.workflow.lifecycle.execution.callcaching.FetchCachedResultsActor.{CachedOutputLookupFailed, CachedOutputLookupSucceeded}
+import cromwell.services.CallCaching.CallCachingEntryId
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}
diff --git a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/job/EngineJobExecutionActor.scala b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/job/EngineJobExecutionActor.scala
index 43c962db4d8..e0ad5290ad4 100644
--- a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/job/EngineJobExecutionActor.scala
+++ b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/job/EngineJobExecutionActor.scala
@@ -3,7 +3,7 @@ package cromwell.engine.workflow.lifecycle.execution.job
import akka.actor.SupervisorStrategy.{Escalate, Stop}
import akka.actor.{ActorInitializationException, ActorRef, LoggingFSM, OneForOneStrategy, Props}
import cats.data.NonEmptyList
-import cromwell.backend.BackendCacheHitCopyingActor.{CacheCopyError, CopyOutputsCommand, CopyingOutputsFailedResponse, LoggableCacheCopyError, MetricableCacheCopyError}
+import cromwell.backend.BackendCacheHitCopyingActor.{CacheCopyFailure, CopyOutputsCommand, CopyingOutputsFailedResponse, CopyAttemptError, BlacklistSkip}
import cromwell.backend.BackendJobExecutionActor._
import cromwell.backend.BackendLifecycleActor.AbortJobCommand
import cromwell.backend.MetricableCacheCopyErrorCategory.MetricableCacheCopyErrorCategory
@@ -37,6 +37,7 @@ import cromwell.engine.workflow.lifecycle.{EngineLifecycleActorAbortCommand, Tim
import cromwell.engine.workflow.tokens.JobExecutionTokenDispenserActor.{JobExecutionTokenDispensed, JobExecutionTokenRequest, JobExecutionTokenReturn}
import cromwell.jobstore.JobStoreActor._
import cromwell.jobstore._
+import cromwell.services.CallCaching.CallCachingEntryId
import cromwell.services.EngineServicesStore
import cromwell.services.instrumentation.CromwellInstrumentation
import cromwell.services.metadata.CallMetadataKeys.CallCachingKeys
@@ -57,15 +58,11 @@ class EngineJobExecutionActor(replyTo: ActorRef,
val serviceRegistryActor: ActorRef,
ioActor: ActorRef,
jobStoreActor: ActorRef,
- callCacheReadActor: ActorRef,
- callCacheWriteActor: ActorRef,
workflowDockerLookupActor: ActorRef,
jobTokenDispenserActor: ActorRef,
backendSingletonActor: Option[ActorRef],
- callCachingMode: CallCachingMode,
command: BackendJobExecutionActorCommand,
- fileHashCachingActor: Option[ActorRef],
- blacklistCache: Option[BlacklistCache]) extends LoggingFSM[EngineJobExecutionActorState, EJEAData]
+ callCachingParameters: CallCachingParameters) extends LoggingFSM[EngineJobExecutionActorState, EJEAData]
with WorkflowLogging
with CallMetadataHelper
with JobInstrumentation
@@ -97,8 +94,8 @@ class EngineJobExecutionActor(replyTo: ActorRef,
if (backendLifecycleActorFactory.fileHashingActorProps.isEmpty) CallCachingOff
else if (jobDescriptorKey.node.callable.meta.get("volatile").contains(MetaValueElementBoolean(true))) CallCachingOff
else if (backendLifecycleActorFactory.cacheHitCopyingActorProps.isEmpty || jobDescriptorKey.attempt > 1) {
- callCachingMode.withoutRead
- } else callCachingMode
+ callCachingParameters.mode.withoutRead
+ } else callCachingParameters.mode
}
// For tests:
@@ -226,7 +223,21 @@ class EngineJobExecutionActor(replyTo: ActorRef,
writeToMetadata(Map(
callCachingHitResultMetadataKey -> false,
callCachingReadResultMetadataKey -> "Cache Miss"))
- log.debug("Cache miss for job {}", jobTag)
+
+ if (data.cacheHitFailureCount > 0) {
+ val totalHits = data.cacheHitFailureCount
+ val copyFails = data.failedCopyAttempts
+ val blacklisted = totalHits - copyFails
+ workflowLogger.info(
+ s"Could not copy a suitable cache hit for $jobTag. " +
+ s"EJEA attempted to copy $totalHits cache hits before failing. " +
+            s"Of these, $copyFails failed to copy and $blacklisted were already blacklisted from previous attempts. " +
+ s"Falling back to running job."
+ )
+ } else {
+ workflowLogger.info("Could not copy a suitable cache hit for {}. No copy attempts were made.", jobTag)
+ }
+
runJob(data)
case Event(hashes: CallCacheHashes, data: ResponsePendingData) =>
addHashesAndStay(data, hashes)
@@ -241,7 +252,7 @@ class EngineJobExecutionActor(replyTo: ActorRef,
when(FetchingCachedOutputsFromDatabase) {
case Event(
CachedOutputLookupSucceeded(womValueSimpletons, jobDetritus, returnCode, cacheResultId, cacheHitDetails),
- data@ResponsePendingData(_, _, _, _, Some(ejeaCacheHit), _, _),
+ data@ResponsePendingData(_, _, _, _, Some(ejeaCacheHit), _, _, _),
) =>
if (cacheResultId != ejeaCacheHit.hit.cacheResultId) {
// Sanity check: was this the right set of results (a false here is a BAD thing!):
@@ -267,7 +278,7 @@ class EngineJobExecutionActor(replyTo: ActorRef,
// Backend copying response:
case Event(
response: JobSucceededResponse,
- data@ResponsePendingData(_, _, Some(Success(hashes)), _, _, _, _),
+ data@ResponsePendingData(_, _, Some(Success(hashes)), _, _, _, _, _),
) =>
logCacheHitSuccessAndNotifyMetadata(data)
saveCacheResults(hashes, data.withSuccessResponse(response))
@@ -280,7 +291,7 @@ class EngineJobExecutionActor(replyTo: ActorRef,
saveJobCompletionToJobStore(data.withSuccessResponse(response))
case Event(
CopyingOutputsFailedResponse(_, cacheCopyAttempt, reason),
- data@ResponsePendingData(_, _, _, _, Some(cacheHit), _, _)
+ data@ResponsePendingData(_, _, _, _, Some(cacheHit), _, _, _)
) if cacheCopyAttempt == cacheHit.hitNumber =>
invalidateCacheHitAndTransition(cacheHit, data, reason)
@@ -320,7 +331,7 @@ class EngineJobExecutionActor(replyTo: ActorRef,
// writeToCache is true and all hashes have already been retrieved - save to the cache
case Event(
response: JobSucceededResponse,
- data@ResponsePendingData(_, _, Some(Success(hashes)), _, _, _, _)
+ data@ResponsePendingData(_, _, Some(Success(hashes)), _, _, _, _, _)
) if effectiveCallCachingMode.writeToCache =>
eventList ++= response.executionEvents
// Publish the image used now that we have it as we might lose the information if Cromwell is restarted
@@ -342,7 +353,7 @@ class EngineJobExecutionActor(replyTo: ActorRef,
// writeToCache is true and all hashes already retrieved - save to job store
case Event(
response: BackendJobFailedResponse,
- data@ResponsePendingData(_, _, Some(Success(_)), _, _, _, _)
+ data@ResponsePendingData(_, _, Some(Success(_)), _, _, _, _, _)
) if effectiveCallCachingMode.writeToCache =>
saveJobCompletionToJobStore(data.withFailedResponse(response))
// Hashes are still missing and we want them (writeToCache is true) - wait for them
@@ -579,7 +590,7 @@ class EngineJobExecutionActor(replyTo: ActorRef,
def initializeJobHashing(jobDescriptor: BackendJobDescriptor, activity: CallCachingActivity, callCachingEligible: CallCachingEligible): Try[ActorRef] = {
val maybeFileHashingActorProps = backendLifecycleActorFactory.fileHashingActorProps map {
- _.apply(jobDescriptor, initializationData, serviceRegistryActor, ioActor, fileHashCachingActor)
+ _.apply(jobDescriptor, initializationData, serviceRegistryActor, ioActor, callCachingParameters.fileHashCacheActor)
}
maybeFileHashingActorProps match {
@@ -590,7 +601,7 @@ class EngineJobExecutionActor(replyTo: ActorRef,
jobDescriptor,
initializationData,
fileHashingActorProps,
- CallCacheReadingJobActor.props(callCacheReadActor, callCachePathPrefixes),
+ CallCacheReadingJobActor.props(callCachingParameters.readActor, callCachePathPrefixes),
backendLifecycleActorFactory.runtimeAttributeDefinitions(initializationData),
backendLifecycleActorFactory.nameForCallCachingPurposes,
activity,
@@ -623,9 +634,9 @@ class EngineJobExecutionActor(replyTo: ActorRef,
cacheCopyAttempt: Int) = {
backendLifecycleActorFactory.cacheHitCopyingActorProps match {
case Some(propsMaker) =>
- val backendCacheHitCopyingActorProps = propsMaker(data.jobDescriptor, initializationData, serviceRegistryActor, ioActor, cacheCopyAttempt, blacklistCache)
+ val backendCacheHitCopyingActorProps = propsMaker(data.jobDescriptor, initializationData, serviceRegistryActor, ioActor, cacheCopyAttempt, callCachingParameters.blacklistCache)
val cacheHitCopyActor = context.actorOf(backendCacheHitCopyingActorProps, buildCacheHitCopyingActorName(data.jobDescriptor, cacheResultId))
- cacheHitCopyActor ! CopyOutputsCommand(womValueSimpletons, jobDetritusFiles, returnCode)
+ cacheHitCopyActor ! CopyOutputsCommand(womValueSimpletons, jobDetritusFiles, cacheResultId, returnCode)
replyTo ! JobRunning(data.jobDescriptor.key, data.jobDescriptor.evaluatedTaskInputs)
goto(BackendIsCopyingCachedOutputs)
case None =>
@@ -668,15 +679,18 @@ class EngineJobExecutionActor(replyTo: ActorRef,
}
data.ejha match {
- case Some(ejha) =>
+ case Some(ejha) if data.failedCopyAttempts < callCachingParameters.maxFailedCopyAttempts =>
workflowLogger.debug("Trying to use another cache hit for job: {}", jobDescriptorKey)
ejha ! NextHit
- goto(CheckingCallCache)
+ goto(CheckingCallCache) using data
+ case Some(_) =>
+ writeToMetadata(Map(
+ callCachingHitResultMetadataKey -> false,
+ callCachingReadResultMetadataKey -> s"Cache Miss (${callCachingParameters.maxFailedCopyAttempts} failed copy attempts)"))
+ log.warning("Cache miss for job {} due to exceeding the maximum of {} failed copy attempts.", jobTag, callCachingParameters.maxFailedCopyAttempts)
+ runJob(data)
case _ =>
- workflowLogger.info(
- "Could not find a suitable cache hit. " +
- "Call cache hit process had {} total hit failures before completing unsuccessfully. " +
- "Falling back to running job: {}", data.cacheHitFailureCount, jobDescriptorKey)
+ workflowLogger.error("Programmer error: We got a cache failure but there was no hashing actor scanning for hits. Falling back to running job")
runJob(data)
}
}
@@ -691,17 +705,36 @@ class EngineJobExecutionActor(replyTo: ActorRef,
writeToMetadata(metadataMap)
- workflowLogger.info(
- "Call cache hit process had {} total hit failures before completing successfully",
- data.cacheHitFailureCount,
- )
+ val totalFailures = data.cacheHitFailureCount
+ if (totalFailures > 0) {
+ val copyFailures = data.failedCopyAttempts
+ val blacklisted = totalFailures - copyFailures
+
+ workflowLogger.info(
+        s"Call cache hit process had $totalFailures total hit failures before completing successfully" +
+        s" (of which $copyFailures were copy failures and $blacklisted were already blacklisted)"
+ )
+ } else {
+ workflowLogger.info("Call cache hit process had 0 total hit failures before completing successfully")
+ }
}
private def logCacheHitFailure(data: ResponsePendingData, reason: Throwable): Unit = {
- workflowLogger.info(s"Failed copying cache results for job $jobDescriptorKey (${reason.getClass.getSimpleName}: ${reason.getMessage})")
+ val totalFailures = data.cacheHitFailureCount
+
+ val multipleFailuresContext = if (totalFailures > 0) {
+ val copyFailures = data.failedCopyAttempts
+ val blacklisted = totalFailures - copyFailures
+      s" (this job has already failed to copy from $totalFailures other hits, of which $copyFailures were copy failures and $blacklisted were already blacklisted)"
+ } else ""
+
+ workflowLogger.info(
+ s"Failure copying cache results for job $jobDescriptorKey (${reason.getClass.getSimpleName}: ${reason.getMessage})"
+ + multipleFailuresContext
+ )
}
- private def metricizeCacheHitFailure(data: ResponsePendingData, failureCategory: MetricableCacheCopyErrorCategory): Unit = {
+ private def publishBlacklistReadMetrics(data: ResponsePendingData, failureCategory: MetricableCacheCopyErrorCategory): Unit = {
val callCachingErrorsMetricPath: NonEmptyList[String] =
NonEmptyList.of(
"job",
@@ -709,13 +742,21 @@ class EngineJobExecutionActor(replyTo: ActorRef,
increment(callCachingErrorsMetricPath)
}
- private def invalidateCacheHitAndTransition(ejeaCacheHit: EJEACacheHit, data: ResponsePendingData, reason: CacheCopyError) = {
- reason match {
- case LoggableCacheCopyError(failure) => logCacheHitFailure(data, failure)
- case MetricableCacheCopyError(failureCategory) => metricizeCacheHitFailure(data, failureCategory)
+ private def invalidateCacheHitAndTransition(ejeaCacheHit: EJEACacheHit, data: ResponsePendingData, reason: CacheCopyFailure) = {
+ val copyAttemptIncrement = reason match {
+ case CopyAttemptError(failure) =>
+ logCacheHitFailure(data, failure)
+ // An actual attempt to copy was made and failed so increment the attempt counter by 1.
+ 1
+ case BlacklistSkip(failureCategory) =>
+ publishBlacklistReadMetrics(data, failureCategory)
+ // Blacklisted hits are simply skipped and do not result in incrementing the attempt counter.
+ 0
}
- val updatedData = data.copy(cacheHitFailureCount = data.cacheHitFailureCount + 1)
+ // Increment the total failure count and actual copy failure count as appropriate.
+ val updatedData = data.copy(cacheHitFailureCount = data.cacheHitFailureCount + 1,
+ failedCopyAttempts = data.failedCopyAttempts + copyAttemptIncrement)
if (invalidationRequired) {
workflowLogger.warn(s"Invalidating cache entry ${ejeaCacheHit.hit.cacheResultId} (Cache entry details: ${ejeaCacheHit.details})")
@@ -733,12 +774,12 @@ class EngineJobExecutionActor(replyTo: ActorRef,
}
private def checkCacheEntryExistence() = {
- callCacheReadActor ! CallCacheEntryForCall(workflowIdForLogging, jobDescriptorKey)
+ callCachingParameters.readActor ! CallCacheEntryForCall(workflowIdForLogging, jobDescriptorKey)
goto(CheckingCacheEntryExistence)
}
private def saveCacheResults(hashes: CallCacheHashes, data: SucceededResponseData) = {
- callCacheWriteActor ! SaveCallCacheHashes(CallCacheHashBundle(workflowIdForLogging, hashes, data.response))
+ callCachingParameters.writeActor ! SaveCallCacheHashes(CallCacheHashBundle(workflowIdForLogging, hashes, data.response))
val updatedData = data.copy(hashes = Option(Success(hashes)))
goto(UpdatingCallCache) using updatedData
}
@@ -824,7 +865,15 @@ object EngineJobExecutionActor {
}
}
+ case class CallCachingParameters(
+ mode: CallCachingMode,
+ readActor: ActorRef,
+ writeActor: ActorRef,
+ fileHashCacheActor: Option[ActorRef],
+ maxFailedCopyAttempts: Int,
+ blacklistCache: Option[BlacklistCache]
+ )
/** Commands */
sealed trait EngineJobExecutionActorCommand
@@ -841,15 +890,12 @@ object EngineJobExecutionActor {
serviceRegistryActor: ActorRef,
ioActor: ActorRef,
jobStoreActor: ActorRef,
- callCacheReadActor: ActorRef,
- callCacheWriteActor: ActorRef,
workflowDockerLookupActor: ActorRef,
jobTokenDispenserActor: ActorRef,
backendSingletonActor: Option[ActorRef],
- callCachingMode: CallCachingMode,
command: BackendJobExecutionActorCommand,
- fileHashCacheActor: Option[ActorRef],
- blacklistCache: Option[BlacklistCache]) = {
+ callCachingParameters: EngineJobExecutionActor.CallCachingParameters) = {
+
Props(new EngineJobExecutionActor(
replyTo = replyTo,
jobDescriptorKey = jobDescriptorKey,
@@ -860,15 +906,11 @@ object EngineJobExecutionActor {
serviceRegistryActor = serviceRegistryActor,
ioActor = ioActor,
jobStoreActor = jobStoreActor,
- callCacheReadActor = callCacheReadActor,
- callCacheWriteActor = callCacheWriteActor,
workflowDockerLookupActor = workflowDockerLookupActor,
jobTokenDispenserActor = jobTokenDispenserActor,
backendSingletonActor = backendSingletonActor,
- callCachingMode = callCachingMode,
command = command,
- fileHashCachingActor = fileHashCacheActor,
- blacklistCache = blacklistCache)).withDispatcher(EngineDispatcher)
+ callCachingParameters = callCachingParameters)).withDispatcher(EngineDispatcher)
}
case class EJEACacheHit(hit: CacheHit, hitNumber: Int, details: Option[String])
@@ -885,7 +927,8 @@ object EngineJobExecutionActor {
ejha: Option[ActorRef] = None,
ejeaCacheHit: Option[EJEACacheHit] = None,
backendJobActor: Option[ActorRef] = None,
- cacheHitFailureCount: Int = 0
+ cacheHitFailureCount: Int = 0,
+ failedCopyAttempts: Int = 0
) extends EJEAData {
def withEJHA(ejha: ActorRef): EJEAData = this.copy(ejha = Option(ejha))
diff --git a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/job/preparation/JobPreparationActor.scala b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/job/preparation/JobPreparationActor.scala
index 6625d2af953..2fbaf9d5d59 100644
--- a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/job/preparation/JobPreparationActor.scala
+++ b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/job/preparation/JobPreparationActor.scala
@@ -26,6 +26,7 @@ import cromwell.services.metadata.{CallMetadataKeys, MetadataEvent, MetadataValu
import eu.timepit.refined.api.Refined
import wom.RuntimeAttributesKeys
import wom.callable.Callable.InputDefinition
+import wom.expression.IoFunctionSet
import wom.format.MemorySize
import wom.values._
@@ -59,7 +60,11 @@ class JobPreparationActor(workflowDescriptor: EngineWorkflowDescriptor,
private[preparation] lazy val noResponseTimeout: FiniteDuration = 3 minutes
private[preparation] val ioEc = context.system.dispatchers.lookup(Dispatcher.IoDispatcher)
- private[preparation] lazy val expressionLanguageFunctions = factory.expressionLanguageFunctions(workflowDescriptor.backendDescriptor, jobKey, initializationData, ioActor, ioEc)
+ private[preparation] lazy val expressionLanguageFunctions = {
+ val ioFunctionSet: IoFunctionSet = factory.expressionLanguageFunctions(workflowDescriptor.backendDescriptor, jobKey, initializationData, ioActor, ioEc)
+ ioFunctionSet.makeInputSpecificFunctions
+ }
+
private[preparation] lazy val dockerHashCredentials = factory.dockerHashCredentials(workflowDescriptor.backendDescriptor, initializationData)
private[preparation] lazy val runtimeAttributeDefinitions = factory.runtimeAttributeDefinitions(initializationData)
private[preparation] lazy val hasDockerDefinition = runtimeAttributeDefinitions.exists(_.name == DockerValidation.instance.key)
diff --git a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/finalization/CopyWorkflowOutputsActor.scala b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/finalization/CopyWorkflowOutputsActor.scala
index 6da958c2fe6..0fdeafc06a1 100644
--- a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/finalization/CopyWorkflowOutputsActor.scala
+++ b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/finalization/CopyWorkflowOutputsActor.scala
@@ -99,7 +99,7 @@ class CopyWorkflowOutputsActor(workflowId: WorkflowId, override val ioActor: Act
// compiled for every single file.
// "execution" should be optional, because its not created on AWS.
// Also cacheCopy or attempt- folders are optional.
- lazy val truncateRegex = ".*/call-[^/]*/(cacheCopy/)?(attempt-[1-9]+/)?(execution/)?".r
+ lazy val truncateRegex = ".*/call-[^/]*/(shard-[0-9]+/)?(cacheCopy/)?(attempt-[0-9]+/)?(execution/)?".r
val outputFileDestinations = rootAndFiles flatMap {
case (workflowRoot, outputs) =>
outputs map { output =>
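The updated truncation regex can be sanity-checked with a short, self-contained sketch (the path below is hypothetical): everything up to and including the `execution` directory is stripped, while intermediate output folders are preserved.

```scala
object TruncateRegexExample extends App {
  // Same pattern as the `truncateRegex` above, including the new shard- and attempt- groups.
  val truncateRegex = ".*/call-[^/]*/(shard-[0-9]+/)?(cacheCopy/)?(attempt-[0-9]+/)?(execution/)?".r
  val output = "gs://bucket/workflow/1234/call-myTask/shard-3/attempt-2/execution/results/output.txt"
  // Prints "results/output.txt": the intermediate "results/" folder survives truncation.
  println(truncateRegex.replaceFirstIn(output, ""))
}
```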
diff --git a/engine/src/main/scala/cromwell/engine/workflow/workflowstore/workflowstore_.scala b/engine/src/main/scala/cromwell/engine/workflow/workflowstore/workflowstore_.scala
index 685fa5eb45d..382b963670d 100644
--- a/engine/src/main/scala/cromwell/engine/workflow/workflowstore/workflowstore_.scala
+++ b/engine/src/main/scala/cromwell/engine/workflow/workflowstore/workflowstore_.scala
@@ -2,7 +2,7 @@ package cromwell.engine.workflow.workflowstore
import java.time.OffsetDateTime
-import cromwell.core.{HogGroup, WorkflowId, WorkflowSourceFilesCollection}
+import cromwell.core.{HasWorkflowIdAndSources, HogGroup, WorkflowId, WorkflowSourceFilesCollection}
/**
* States of a workflow for which it can be fetched from the workflow store and started.
@@ -27,4 +27,4 @@ final case class WorkflowToStart(id: WorkflowId,
submissionTime: OffsetDateTime,
sources: WorkflowSourceFilesCollection,
state: StartableState,
- hogGroup: HogGroup)
+ hogGroup: HogGroup) extends HasWorkflowIdAndSources
diff --git a/engine/src/test/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheReadingJobActorSpec.scala b/engine/src/test/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheReadingJobActorSpec.scala
index 9a4da415240..f61806c2583 100644
--- a/engine/src/test/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheReadingJobActorSpec.scala
+++ b/engine/src/test/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheReadingJobActorSpec.scala
@@ -7,6 +7,7 @@ import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheHashing
import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheReadActor._
import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheReadingJobActor.{CCRJAWithData, WaitingForCacheHitOrMiss, _}
import cromwell.engine.workflow.lifecycle.execution.callcaching.EngineJobHashingActor.{CacheHit, CacheMiss, HashError}
+import cromwell.services.CallCaching.CallCachingEntryId
import org.scalatest.concurrent.Eventually
import org.scalatest.{FlatSpecLike, Matchers}
diff --git a/engine/src/test/scala/cromwell/webservice/MetadataBuilderActorSpec.scala b/engine/src/test/scala/cromwell/webservice/MetadataBuilderActorSpec.scala
index 70bb9cf2d92..d46bee3ac3f 100644
--- a/engine/src/test/scala/cromwell/webservice/MetadataBuilderActorSpec.scala
+++ b/engine/src/test/scala/cromwell/webservice/MetadataBuilderActorSpec.scala
@@ -28,6 +28,7 @@ class MetadataBuilderActorSpec extends TestKitSuite("Metadata") with AsyncFlatSp
behavior of "MetadataBuilderActor"
+ val defaultSafetyRowNumberThreshold = 1000000
val defaultTimeout: FiniteDuration = 1.second.dilated
implicit val timeout: Timeout = defaultTimeout
@@ -39,7 +40,7 @@ class MetadataBuilderActorSpec extends TestKitSuite("Metadata") with AsyncFlatSp
def readMetadataWorkerMaker = () => mockReadMetadataWorkerActor.props
- val mba = system.actorOf(MetadataBuilderActor.props(readMetadataWorkerMaker))
+ val mba = system.actorOf(MetadataBuilderActor.props(readMetadataWorkerMaker, 1000000))
val response = mba.ask(action).mapTo[MetadataJsonResponse]
mockReadMetadataWorkerActor.expectMsg(defaultTimeout, action)
mockReadMetadataWorkerActor.reply(MetadataLookupResponse(queryReply, events))
@@ -47,6 +48,24 @@ class MetadataBuilderActorSpec extends TestKitSuite("Metadata") with AsyncFlatSp
response.mapTo[SuccessfulMetadataJsonResponse] map { b => b.responseJson shouldBe expectedRes.parseJson}
}
+ def assertMetadataFailureResponse(action: MetadataServiceAction,
+ mdQuery: MetadataQuery,
+ metadataServiceResponse: MetadataServiceResponse,
+ expectedException: Exception): Future[Assertion] = {
+ val mockReadMetadataWorkerActor = TestProbe()
+ val mba = system.actorOf(MetadataBuilderActor.props(() => mockReadMetadataWorkerActor.props, defaultSafetyRowNumberThreshold))
+ val response = mba.ask(action).mapTo[MetadataServiceResponse]
+
+ mockReadMetadataWorkerActor.expectMsg(defaultTimeout, action)
+ mockReadMetadataWorkerActor.reply(metadataServiceResponse)
+
+ response map { r => r shouldBe a [FailedMetadataJsonResponse] }
+ response.mapTo[FailedMetadataJsonResponse] map { b =>
+ b.reason.getClass shouldBe expectedException.getClass
+ b.reason.getMessage shouldBe expectedException.getMessage
+ }
+ }
+
it should "build workflow scope tree from metadata events" in {
def makeEvent(workflow: WorkflowId, key: Option[MetadataJobKey]) = {
MetadataEvent(MetadataKey(workflow, key, "NOT_CHECKED"), MetadataValue("NOT_CHECKED"))
@@ -494,14 +513,14 @@ class MetadataBuilderActorSpec extends TestKitSuite("Metadata") with AsyncFlatSp
val mainQueryAction = GetMetadataAction(mainQuery)
val subQuery = MetadataQuery(subWorkflowId, None, None, None, None, expandSubWorkflows = true)
- val subQueryAction = GetMetadataAction(subQuery)
+ val subQueryAction = GetMetadataAction(subQuery, checkTotalMetadataRowNumberBeforeQuerying = false)
val parentProbe = TestProbe()
val mockReadMetadataWorkerActor = TestProbe()
def readMetadataWorkerMaker = () => mockReadMetadataWorkerActor.props
- val metadataBuilder = TestActorRef(MetadataBuilderActor.props(readMetadataWorkerMaker), parentProbe.ref, s"MetadataActor-${UUID.randomUUID()}")
+ val metadataBuilder = TestActorRef(MetadataBuilderActor.props(readMetadataWorkerMaker, 1000000), parentProbe.ref, s"MetadataActor-${UUID.randomUUID()}")
val response = metadataBuilder.ask(mainQueryAction).mapTo[MetadataJsonResponse]
mockReadMetadataWorkerActor.expectMsg(defaultTimeout, mainQueryAction)
mockReadMetadataWorkerActor.reply(MetadataLookupResponse(mainQuery, mainEvents))
@@ -550,7 +569,7 @@ class MetadataBuilderActorSpec extends TestKitSuite("Metadata") with AsyncFlatSp
val mockReadMetadataWorkerActor = TestProbe()
def readMetadataWorkerMaker= () => mockReadMetadataWorkerActor.props
- val metadataBuilder = TestActorRef(MetadataBuilderActor.props(readMetadataWorkerMaker), parentProbe.ref, s"MetadataActor-${UUID.randomUUID()}")
+ val metadataBuilder = TestActorRef(MetadataBuilderActor.props(readMetadataWorkerMaker, 1000000), parentProbe.ref, s"MetadataActor-${UUID.randomUUID()}")
val response = metadataBuilder.ask(queryNoExpandAction).mapTo[MetadataJsonResponse]
mockReadMetadataWorkerActor.expectMsg(defaultTimeout, queryNoExpandAction)
mockReadMetadataWorkerActor.reply(MetadataLookupResponse(queryNoExpand, mainEvents))
@@ -662,6 +681,37 @@ class MetadataBuilderActorSpec extends TestKitSuite("Metadata") with AsyncFlatSp
}
matchesExpectations.reduceLeft(_ && _) shouldBe true
}
+
+  it should "politely refuse building metadata JSON if the number of metadata rows is too large" in {
+ val workflowId = WorkflowId.randomId()
+
+ val mdQuery = MetadataQuery(workflowId, None, None, None, None, expandSubWorkflows = false)
+ val action = GetMetadataAction(mdQuery)
+
+ val metadataRowNumber = 100500
+ val expectedException = new MetadataTooLargeNumberOfRowsException(workflowId, metadataRowNumber, defaultSafetyRowNumberThreshold)
+ assertMetadataFailureResponse(
+ action,
+ mdQuery,
+ MetadataLookupFailedTooLargeResponse(mdQuery, metadataRowNumber),
+ expectedException
+ )
+ }
+
+  it should "politely refuse building metadata JSON if a timeout occurs while reading metadata from the database" in {
+ val workflowId = WorkflowId.randomId()
+
+ val mdQuery = MetadataQuery(workflowId, None, None, None, None, expandSubWorkflows = false)
+ val action = GetMetadataAction(mdQuery)
+
+ val expectedException = new MetadataTooLargeTimeoutException(workflowId)
+ assertMetadataFailureResponse(
+ action,
+ mdQuery,
+ MetadataLookupFailedTimeoutResponse(mdQuery),
+ expectedException
+ )
+ }
}
object MetadataBuilderActorSpec {
diff --git a/engine/src/test/scala/cromwell/webservice/routes/CromwellApiServiceSpec.scala b/engine/src/test/scala/cromwell/webservice/routes/CromwellApiServiceSpec.scala
index 77ac86346e7..dcbba0b3fe1 100644
--- a/engine/src/test/scala/cromwell/webservice/routes/CromwellApiServiceSpec.scala
+++ b/engine/src/test/scala/cromwell/webservice/routes/CromwellApiServiceSpec.scala
@@ -609,7 +609,7 @@ object CromwellApiServiceSpec {
sender ! SuccessfulMetadataJsonResponse(request, MetadataBuilderActor.processOutputsResponse(id, event))
case request @ GetLogs(id, _) =>
sender ! SuccessfulMetadataJsonResponse(request, MetadataBuilderActor.workflowMetadataResponse(id, logsEvents(id), includeCallsIfEmpty = false, Map.empty))
- case request @ GetMetadataAction(MetadataQuery(id, _, _, withKeys, withoutKeys, _), _) =>
+ case request @ GetMetadataAction(MetadataQuery(id, _, _, withKeys, withoutKeys, _), _, _) =>
val withKeysList = withKeys.map(_.toList).getOrElse(List.empty)
val withoutKeysList = withoutKeys.map(_.toList).getOrElse(List.empty)
sender ! SuccessfulMetadataJsonResponse(request, responseMetadataValues(id, withKeysList, withoutKeysList))
diff --git a/hybridCarboniteMetadataService/src/main/scala/cromwell/services/metadata/hybridcarbonite/CarbonitingMetadataFreezerActor.scala b/hybridCarboniteMetadataService/src/main/scala/cromwell/services/metadata/hybridcarbonite/CarbonitingMetadataFreezerActor.scala
index 476538c00a9..9c2582656b5 100644
--- a/hybridCarboniteMetadataService/src/main/scala/cromwell/services/metadata/hybridcarbonite/CarbonitingMetadataFreezerActor.scala
+++ b/hybridCarboniteMetadataService/src/main/scala/cromwell/services/metadata/hybridcarbonite/CarbonitingMetadataFreezerActor.scala
@@ -5,13 +5,13 @@ import java.nio.file.StandardOpenOption
import akka.actor.{ActorRef, LoggingFSM, Props}
import cromwell.core.WorkflowId
import cromwell.core.io.{AsyncIo, DefaultIoCommandBuilder}
-import cromwell.services.metadata.MetadataArchiveStatus.{ArchiveFailed, Archived}
+import cromwell.services.metadata.MetadataArchiveStatus.{ArchiveFailed, Archived, TooLargeToArchive}
import cromwell.services.metadata.MetadataService.GetMetadataAction
import cromwell.services.metadata.hybridcarbonite.CarboniteWorkerActor.CarboniteWorkflowComplete
import cromwell.services.metadata.hybridcarbonite.CarbonitingMetadataFreezerActor._
import cromwell.services.metadata.impl.MetadataDatabaseAccess
import cromwell.services.metadata.{MetadataArchiveStatus, MetadataQuery}
-import cromwell.services.{FailedMetadataJsonResponse, MetadataServicesStore, SuccessfulMetadataJsonResponse}
+import cromwell.services.{FailedMetadataJsonResponse, MetadataServicesStore, MetadataTooLargeException, SuccessfulMetadataJsonResponse}
import cromwell.util.GracefulShutdownHelper.ShutdownCommand
import scala.concurrent.ExecutionContext
@@ -55,6 +55,10 @@ class CarbonitingMetadataFreezerActor(freezingConfig: ActiveMetadataFreezingConf
}
goto(Freezing) using FreezingData(workflowId)
+ case Event(FailedMetadataJsonResponse(_, reason: MetadataTooLargeException), FetchingData(workflowId)) =>
+ log.error(reason, s"Carboniting failure: $reason. Marking as $TooLargeToArchive")
+ scheduleDatabaseUpdateAndAwaitResult(workflowId, TooLargeToArchive)
+
case Event(FailedMetadataJsonResponse(_, reason), FetchingData(workflowId)) =>
log.error(reason, s"Failed to fetch workflow $workflowId's metadata to archive. Marking as $ArchiveFailed")
scheduleDatabaseUpdateAndAwaitResult(workflowId, ArchiveFailed)
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 04158459e79..6d7d0acfa84 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -56,6 +56,7 @@ object Dependencies {
private val liquibaseSlf4jV = "2.0.0"
private val liquibaseV = "3.6.3"
private val logbackV = "1.2.3"
+ private val lz4JavaV = "1.7.1"
private val mariadbV = "2.4.2"
private val metrics3ScalaV = "3.5.10" // https://github.com/erikvanoosten/metrics-scala/tree/f733e26#download-4x
private val metrics3StatsdV = "4.2.0"
@@ -526,6 +527,9 @@ object Dependencies {
val bcsBackendDependencies = commonDependencies ++ refinedTypeDependenciesList ++ aliyunBatchComputeDependencies
val tesBackendDependencies = akkaHttpDependencies
val sparkBackendDependencies = akkaHttpDependencies
+ val sfsBackendDependencies = List (
+ "org.lz4" % "lz4-java" % lz4JavaV
+ )
val testDependencies = List(
"org.scalatest" %% "scalatest" % scalatestV,
@@ -579,6 +583,7 @@ object Dependencies {
ossFileSystemDependencies ++
perfDependencies ++
serverDependencies ++
+ sfsBackendDependencies ++
sparkBackendDependencies ++
spiDependencies ++
spiUtilDependencies ++
diff --git a/project/Version.scala b/project/Version.scala
index d143cefa8ac..5328133a361 100644
--- a/project/Version.scala
+++ b/project/Version.scala
@@ -5,7 +5,7 @@ import sbt._
object Version {
// Upcoming release, or current if we're on a master / hotfix branch
- val cromwellVersion = "50"
+ val cromwellVersion = "51"
/**
* Returns true if this project should be considered a snapshot.
diff --git a/scripts/metadata_comparison/README.MD b/scripts/metadata_comparison/README.MD
new file mode 100644
index 00000000000..42a85cc035f
--- /dev/null
+++ b/scripts/metadata_comparison/README.MD
@@ -0,0 +1,36 @@
+# Metadata Comparison Scripts
+
+This `metadata_comparison` Python project provides tools to compare workflows run
+in different Cromwell environments, in terms of overall cost and performance.
+
+## Running a script
+
+Choose a script to run. For this example we will use the `extractor`.
+
+From this top-level `metadata_comparison` directory (i.e. the one containing
+this README.MD file), run:
+
+```sh
+# python3 -m metadata_comparison.extractor
+```
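+
+The extractor also takes positional arguments: a Cromwell URL, a GCS path to upload to, one or more workflow ids,
+and the paths to the local Cromwell checkout and configuration file used to run the workflows (see its argparse
+definition). A hypothetical invocation, with placeholder bucket, workflow id and paths, might look like:
+
+```sh
+# python3 -m metadata_comparison.extractor http://localhost:8000 gs://my-bucket/comparisons \
+#     12345678-1234-1234-1234-123456789012 /path/to/cromwell /path/to/cromwell.conf
+```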
+
+### Questions
+
+- Q: Why not run the scripts directly, e.g. `python3 extractor.py`?
+  - A: Running Python from this outer directory allows it to discover the `metadata_comparison`
+    project, and thus allows imports between scripts.
+
+## Unit tests
+
+To run the Python unit tests from the top-level `metadata_comparison` directory
+(i.e. the one containing this README.MD file), run:
+```sh
+# python3 -m unittest discover -v
+```
+
+This will:
+ - Find the `metadata_comparison` project in that subdirectory, making it importable
+   to other scripts.
+ - Run Python's built-in `unittest` module, which will:
+   - Discover the tests in the `test` directory.
+   - Run them, verbosely.
\ No newline at end of file
diff --git a/scripts/metadata_comparison/metadata_comparison/__init__.py b/scripts/metadata_comparison/metadata_comparison/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/scripts/metadata_comparison/metadata_comparison/comparer.py b/scripts/metadata_comparison/metadata_comparison/comparer.py
new file mode 100644
index 00000000000..aed83d73128
--- /dev/null
+++ b/scripts/metadata_comparison/metadata_comparison/comparer.py
@@ -0,0 +1,104 @@
+#!/usr/bin/env python3
+#
+# comparer.py
+#
+# Purpose: Compare performance metadata JSON files produced by Digester and produce result in CSV format
+#
+# Usage: python3 comparer.py [-h] [-v] --digester-version DIGESTERVERSION --digest-gcs-base-path DIGESTGCSBASEPATH
+#        --output-gcs-file-path OUTPUTGCSFILE --workflow-ids WORKFLOWIDS [WORKFLOWIDS ...]
+#
+# Python Prereqs (at least, the ones which I needed to manually install... YMMV):
+#
+# * pip3 install --upgrade pandas
+# * pip3 install --upgrade google-api-python-client
+# * pip3 install --upgrade google-cloud-storage
+#
+# Remember to login to create application default credentials before use:
+# % gcloud auth application-default login
+
+from typing import List, Tuple
+import argparse
+import json
+import pandas
+import google.auth
+from google.cloud import storage
+import logging
+from metadata_comparison.lib.logging import set_log_verbosity, quieten_chatty_imports
+from metadata_comparison.lib.storage import upload_blob
+from metadata_comparison.lib.argument_regex import gcs_path_regex_validator, digester_version_regex_validator, \
+ workflow_regex_validator
+
+logger = logging.getLogger('metadata_comparison.comparer')
+
+def read_digester_jsons_from_gcs(bucket_name: str,
+ base_path: str,
+ digester_version: str,
+ workflow_ids: List[str],
+ storage_client: storage.Client) -> List[Tuple[str, dict]]:
+ bucket = storage_client.get_bucket(bucket_name)
+ result = []
+ for workflow_id in workflow_ids:
+ blob = bucket.blob(f"{base_path}/{workflow_id}/digests/{digester_version}/digest.json")
+ json_string_bytes = blob.download_as_string()
+ result.append((workflow_id, json.loads(json_string_bytes)))
+
+ return result
+
+
+def compare_jsons(workflow_ids_and_jsons: List[Tuple[str, dict]]) -> pandas.DataFrame:
+ """
+    Uses the pandas library to convert JSONs into dataframes and concatenates those dataframes into a single one.
+    Performs a sanity check, raising an exception if any of the JSONs does not have a matching subset of keys.
+ """
+ columnToCompareNameEnding = ".cromwellTotalTimeSeconds"
+ versionColumnName = "version"
+ result = pandas.DataFrame()
+ last_cols = []
+ for workflow_id_and_json in workflow_ids_and_jsons:
+ df = pandas.json_normalize(workflow_id_and_json[1])
+ cols = [c for c in df.columns if c.endswith(columnToCompareNameEnding)]
+ cols.sort()
+ cols.insert(0, versionColumnName)
+
+ if last_cols and last_cols != cols:
+ raise Exception(f"JSON data at {workflow_ids_and_jsons[0]} doesn't have matching subset of columns. Expected: {last_cols} but got {cols}")
+
+ last_cols = cols
+ df.index = [workflow_id_and_json[0]]
+ result = pandas.concat([result, df[cols]])
+
+ renameVersionColumnTo = "digester format version"
+ result.rename(columns={versionColumnName: renameVersionColumnTo}, inplace=True)
+ result.index.name = "workflow id"
+
+ return result
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser(description='Compare performance metadata JSONs and produce CSV result')
+ parser.add_argument('-v', '--verbose', action='store_true')
+ parser.add_argument('--digester-version', metavar='DIGESTERVERSION', type=digester_version_regex_validator, nargs=1,
+ help='Compare digests produced by this version of the digester')
+ parser.add_argument('--digest-gcs-base-path', metavar='DIGESTGCSBASEPATH', type=gcs_path_regex_validator, nargs=1,
+ help='GCS base path to the directory containing JSONs produced by digester')
+ parser.add_argument('--output-gcs-file-path', metavar='OUTPUTGCSFILE', type=gcs_path_regex_validator, nargs=1,
+ help='GCS path to output CSV file')
+ parser.add_argument('--workflow-ids', metavar='WORKFLOWIDS', type=workflow_regex_validator, nargs='+',
+ help='Workflow ids for performance comparison')
+
+ args = parser.parse_args()
+ set_log_verbosity(args.verbose)
+ quieten_chatty_imports()
+ logger.info("Starting Comparer operation.")
+
+ credentials, project_id = google.auth.default()
+ storage_client = storage.Client(credentials = credentials)
+ input_gcs_bucket, input_gcs_path = args.digest_gcs_base_path[0]
+
+ workflow_ids_and_jsons = read_digester_jsons_from_gcs(input_gcs_bucket, input_gcs_path, args.digester_version[0], args.workflow_ids, storage_client)
+ comparison_result_df = compare_jsons(workflow_ids_and_jsons)
+ result_csv_string = comparison_result_df.to_csv()
+
+ output_gcs_bucket, output_gcs_path = args.output_gcs_file_path[0]
+ upload_blob(output_gcs_bucket, result_csv_string, output_gcs_path, storage_client, logger)
+
+ logger.info('Comparer operation completed successfully.')
diff --git a/scripts/metadata_comparison/metadata_comparison/digester.py b/scripts/metadata_comparison/metadata_comparison/digester.py
new file mode 100644
index 00000000000..8e0cf9f4cea
--- /dev/null
+++ b/scripts/metadata_comparison/metadata_comparison/digester.py
@@ -0,0 +1,109 @@
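+#
+# digester.py
+#
+# Purpose: Digest workflow metadata and job operation details into a summary digest.json,
+#          reading from and writing to either local or GCS paths.
+#
+# Usage: python3 -m metadata_comparison.digester [-v] [-f] PATH [PATH ...]
+#
+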
+import argparse
+import json
+from metadata_comparison.lib import logging, operation_ids
+from metadata_comparison.lib.operation_ids import CallNameSequence, JsonObject, OperationId
+from metadata_comparison.lib.comparison_paths import ComparisonPath
+from metadata_comparison.lib.operations_digesters import OperationDigester
+
+import dateutil.parser
+from typing import AnyStr, Dict
+
+Version = "0.0.1"
+
+
+def main(args: argparse.Namespace) -> None:
+ for path in args.paths:
+ parent_path = ComparisonPath.create(path)
+
+ workflow_path = parent_path / 'workflow.json'
+ operations_dir_path = parent_path / 'operations'
+
+ digest_parent = parent_path / 'digests' / Version
+ digest_path = digest_parent / 'digest.json'
+
+ if not digest_path.exists() or args.force:
+ digest_parent.mkdir_p()
+ digest_json = digest(workflow_path, operations_dir_path)
+ digest_string = json.dumps(digest_json, sort_keys=True, indent=4)
+ digest_path.write_text(digest_string)
+ else:
+ raise ValueError(f'digest file already exists at {digest_path} and --force not specified')
+
+
+def parse_args() -> argparse.Namespace:
+ def validate_path(p: AnyStr) -> AnyStr:
+ if ComparisonPath.is_valid_path_string(p):
+ return p
+ raise ValueError(f'{p} is not a valid path whatsoever')
+
+ parser = argparse.ArgumentParser(
+ description='Digest workflow metadata and job operation details, reading from and reuploading to GCS.')
+ parser.add_argument('-v', '--verbose', action='store_true',
+ help='whether to log verbosely (default False)')
+ parser.add_argument('-f', '--force', action='store_true',
+ help='whether to overwrite existing digests (default False)')
+ parser.add_argument('paths', metavar="PATH", nargs='+', type=validate_path,
+ help="Location at which to find metadata (local or GCS)")
+
+ return parser.parse_args()
+
+
+CallName = AnyStr
+
+
+def digest(workflow_path: ComparisonPath, operations_path: ComparisonPath) -> JsonObject:
+ def call_fn(succeeded_operations: Dict[CallName, JsonObject],
+ operation_id: OperationId,
+ path: CallNameSequence,
+ attempt: JsonObject) -> None:
+ backend_status = attempt.get('backendStatus', 'Unknown')
+ # This script should only ever be pointed at successful workflow metadata. All jobs that have a backend status
+ # other than `Success` must have later been re-run successfully, so any un`Success`ful attempts are ignored.
+ # It's possible that a future version of the digester might actually want to look at these jobs since they
+ # may have completed some lifecycle events which could be useful in accumulating more performance data.
+ if backend_status == 'Success':
+ string_path = '.'.join(path)
+ cromwell_start = attempt.get('start')
+ cromwell_end = attempt.get('end')
+
+ cromwell_total_time_seconds = (dateutil.parser.parse(cromwell_end) -
+ dateutil.parser.parse(cromwell_start)).total_seconds()
+
+ bare_operation_id = operation_id.split('/')[-1]
+ operations_file_path = operations_path / f'{bare_operation_id}.json'
+ operations_data = operations_file_path.read_text()
+ operations_metadata = json.loads(operations_data)
+ operation = OperationDigester.create(operations_metadata)
+
+ papi_total_time_seconds = operation.total_time_seconds()
+
+ cromwell_additional_total_time_seconds = \
+ float("%.3f" % (cromwell_total_time_seconds - papi_total_time_seconds))
+
+ succeeded_operations[string_path] = {
+ "attempt": attempt.get('attempt'),
+ "shardIndex": attempt.get('shardIndex'),
+ "operationId": operation_id,
+ "cromwellStart": cromwell_start,
+ "cromwellEnd": cromwell_end,
+ "cromwellTotalTimeSeconds": cromwell_total_time_seconds,
+ "papiStart": operation.start_time(),
+ "papiEnd": operation.end_time(),
+ "papiTotalTimeSeconds": operation.total_time_seconds(),
+ "cromwellAdditionalTotalTimeSeconds": cromwell_additional_total_time_seconds,
+ "dockerImagePullSeconds": operation.docker_image_pull_seconds()
+ }
+
+ data = workflow_path.read_text()
+ metadata = json.loads(data)
+
+ shards = operation_ids.visit_papi_operations(metadata, call_fn, initial_accumulator={})
+ return {'version': Version, 'calls': shards, 'workflowId': metadata['id']}
+
+
+if __name__ == "__main__":
+ logging.quieten_chatty_imports()
+ _args = parse_args()
+ logging.set_log_verbosity(_args.verbose)
+
+ main(_args)
diff --git a/scripts/metadata_comparison/metadata_comparison/extractor.py b/scripts/metadata_comparison/metadata_comparison/extractor.py
new file mode 100755
index 00000000000..f41a645c059
--- /dev/null
+++ b/scripts/metadata_comparison/metadata_comparison/extractor.py
@@ -0,0 +1,186 @@
+#!/usr/bin/env python3
+#
+# extractor.py
+#
+# Purpose: Read workflow metadata from Cromwell, and all metadata for its jobs,
+# and upload it to a GCS bucket
+#
+# Usage: python3 extractor.py [-v] CROMWELL GCSPATH WORKFLOW [WORKFLOW ...] CROMWELLCHECKOUTPATH CROMWELLCONFIGPATH
+#
+# Python Prereqs (at least, the ones which I needed to manually install... YMMV):
+#
+# * pip3 install --upgrade requests
+# * pip3 install --upgrade google-api-python-client
+# * pip3 install --upgrade google-cloud
+# * pip3 install --upgrade google-cloud-storage
+# * pip3 install --upgrade gitpython
+#
+# Remember to login to create application default credentials before use:
+# % gcloud auth application-default login
+
+import argparse
+import json
+import requests
+from google.cloud import storage
+import google.auth
+from pathlib import Path
+import git
+import os
+import zipfile
+import logging
+from metadata_comparison.lib.argument_regex import gcs_path_regex_validator, workflow_regex_validator
+from metadata_comparison.lib.operation_ids import get_operation_id_number, visit_papi_operations, CallNameSequence, \
+ JsonObject, OperationId
+from metadata_comparison.lib.papi.papi_clients import PapiClients
+from metadata_comparison.lib.storage import upload_blob
+from typing import Any, AnyStr, List, Mapping, Sequence, Union
+from metadata_comparison.lib.logging import quieten_chatty_imports, set_log_verbosity
+
+logger = logging.getLogger('metadata_comparison.extractor')
+
+
+def __create_snapshot_of_local_repo(repo: git.Repo, cromwell_snapshots_path: Union[Path, str]) -> Union[Path, str]:
+ last_commit_hash = repo.head.commit.hexsha
+ if not os.path.exists(cromwell_snapshots_path):
+ os.makedirs(cromwell_snapshots_path)
+ current_snapshot_path = cromwell_snapshots_path / last_commit_hash
+ if not os.path.exists(current_snapshot_path):
+ os.makedirs(current_snapshot_path)
+ repo.clone(current_snapshot_path)
+ return current_snapshot_path
+
+
+def __create_zip_file(zip_file_path: Union[Path, str], current_snapshot_path: Union[Path, str]):
+ with zipfile.ZipFile(zip_file_path, "a", allowZip64=False) as zip_file:
+ for root, dirs, files in os.walk(current_snapshot_path):
+ for file in files:
+ zip_file.write(os.path.join(root, file))
+
+
+def upload_local_checkout(cromwell_path: Path,
+ gcs_bucket: str,
+ gcs_path: str,
+ gcs_storage_client: storage.Client) -> None:
+ cromwell_snapshots_path = cromwell_path.parent / "cromwell_snapshots"
+
+ repo = git.Repo(cromwell_path)
+ if repo.is_dirty():
+ raise Exception("Unable to upload local checkout to GCS: repository is dirty - need to do check in first.")
+
+    zip_file_name = "cromwell_code.zip"
+ zip_file_path = Path(cromwell_snapshots_path / zip_file_name)
+ if not os.path.exists(zip_file_path):
+ current_snapshot_path = __create_snapshot_of_local_repo(repo, cromwell_snapshots_path)
+ __create_zip_file(zip_file_path, current_snapshot_path)
+
+ upload_blob(gcs_bucket, zip_file_path.read_bytes(), f"{gcs_path}/{zip_file_name}", gcs_storage_client, logger)
+
+
+def upload_local_config(config_path: Path, gcs_bucket: str, gcs_path: str, gcs_storage_client: storage.Client):
+ configuration_file_name = "cromwell.conf"
+ upload_blob(gcs_bucket, config_path.read_text(), f"{gcs_path}/{configuration_file_name}", gcs_storage_client, logger)
+
+
+def fetch_raw_workflow_metadata(cromwell_url: str, workflow: str) -> (bytes, JsonObject):
+    """Fetches workflow metadata for a workflow. Returns the raw response content and the dict parsed from JSON."""
+ url = f'{cromwell_url}/api/workflows/v1/{workflow}/metadata?expandSubWorkflows=true'
+ logger.info(f'Fetching Cromwell metadata from {url}...')
+ result = requests.get(url)
+ return result.content, result.json()
+
+
+def upload_workflow_metadata_json(bucket_name: str,
+ raw_workflow_metadata: bytes,
+ workflow_gcs_base_path: str,
+ gcs_storage_client: storage.Client) -> None:
+ workflow_gcs_metadata_upload_path = f'{workflow_gcs_base_path}/metadata.json'
+ upload_blob(bucket_name, raw_workflow_metadata, workflow_gcs_metadata_upload_path, gcs_storage_client, logger)
+
+
+def upload_operations_metadata_json(bucket_name: str,
+ operation_id: str,
+ operations_metadata: Mapping[str, Any],
+ workflow_gcs_base_path: str,
+ gcs_storage_client: storage.Client) -> None:
+ """Uploads metadata to cloud storage, as json"""
+ operation_upload_path = f'{workflow_gcs_base_path}/operations/{get_operation_id_number(operation_id)}.json'
+ formatted_metadata = json.dumps(operations_metadata, indent=2)
+ upload_blob(bucket_name, bytes(formatted_metadata, 'utf-8'), operation_upload_path, gcs_storage_client, logger)
+
+
+def find_operation_ids_in_metadata(json_metadata: JsonObject) -> Sequence[AnyStr]:
+ """Finds all instances of PAPI operations IDs in a workflow"""
+ # Eg given:
+ # {
+ # "calls": {
+ # "workflow_name.task_name": [
+ # {
+ # "jobId": "projects/broad-dsde-cromwell-dev/operations/01234567891011121314",
+ # ...
+ #
+ # We want to extract "projects/broad-dsde-cromwell-dev/operations/01234567891011121314"
+ def call_fn(acc: List[AnyStr],
+ operation_id: OperationId,
+ call_name_sequence: CallNameSequence,
+ attempt: JsonObject) -> None:
+ acc.append(operation_id)
+
+ return visit_papi_operations(json_metadata, call_fn, initial_accumulator=[])
+
+
+def process_workflow(cromwell_url: str,
+ gcs_bucket: str,
+ gcs_path: str,
+ gcs_storage_client: storage.Client,
+ papi_clients: PapiClients,
+ workflow: str) -> None:
+ raw_metadata, json_metadata = fetch_raw_workflow_metadata(cromwell_url, workflow)
+ workflow_gcs_base_path = f'{gcs_path}/{workflow}/extractor'
+
+ operation_ids = find_operation_ids_in_metadata(json_metadata)
+    for operation_id in operation_ids:
+        operation_metadata = papi_clients.request_operation_metadata(operation_id)
+        upload_operations_metadata_json(gcs_bucket, operation_id, operation_metadata, workflow_gcs_base_path, gcs_storage_client)
+ upload_workflow_metadata_json(gcs_bucket, raw_metadata, workflow_gcs_base_path, gcs_storage_client)
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser(
+ description='Extract metadata and operation details for workflows and upload to GCS')
+ parser.add_argument('-v', '--verbose', action='store_true')
+ parser.add_argument('cromwell_url', metavar='CROMWELL', type=str, nargs=1,
+ help='Cromwell host')
+ parser.add_argument('gcs_path', metavar='GCSPATH', type=gcs_path_regex_validator, nargs=1,
+ help='GCS path to upload to')
+ parser.add_argument('workflows', metavar='WORKFLOW', type=workflow_regex_validator, nargs='+',
+ help='Workflows to process')
+ parser.add_argument('cromwell_checkout_path', metavar='CROMWELLCHECKOUTPATH', type=Path,
+ help='Path to Cromwell git checkout used to run workflows')
+ parser.add_argument('cromwell_config_path', metavar='CROMWELLCONFIGPATH', type=Path,
+ help='Path to Cromwell configuration file used to run workflows')
+
+ args = parser.parse_args()
+ set_log_verbosity(args.verbose)
+ quieten_chatty_imports()
+
+ cromwell_url = args.cromwell_url[0]
+ gcs_bucket, gcs_path = args.gcs_path[0]
+ workflows = args.workflows
+
+ credentials, project_id = google.auth.default()
+ storage_client = storage.Client(credentials=credentials)
+ papi_clients = PapiClients(credentials)
+
+ logger.info(f'cromwell: {cromwell_url}')
+ logger.info(f'gcs_bucket: {gcs_bucket}; gcs_path: {gcs_path}')
+ logger.info(f'workflows: {workflows}')
+
+ for workflow in workflows:
+ process_workflow(cromwell_url, gcs_bucket, gcs_path, storage_client, papi_clients, workflow)
+
+ if args.cromwell_checkout_path:
+ upload_local_checkout(args.cromwell_checkout_path, gcs_bucket, gcs_path, storage_client)
+ if args.cromwell_config_path:
+ upload_local_config(args.cromwell_config_path, gcs_bucket, gcs_path, storage_client)
+
+ logger.info('Extractor operation completed successfully.')
diff --git a/scripts/metadata_comparison/metadata_comparison/lib/__init__.py b/scripts/metadata_comparison/metadata_comparison/lib/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/scripts/metadata_comparison/metadata_comparison/lib/argument_regex.py b/scripts/metadata_comparison/metadata_comparison/lib/argument_regex.py
new file mode 100644
index 00000000000..6cee2009243
--- /dev/null
+++ b/scripts/metadata_comparison/metadata_comparison/lib/argument_regex.py
@@ -0,0 +1,66 @@
+#!/usr/bin/env python3
+
+import argparse
+import re
+
+
+def workflow_regex_validator(value: str) -> str:
+ """Makes sure that a value is a valid Cromwell workflow ID then returns the workflow ID"""
+    workflow_regex = re.compile('^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$')
+ if not workflow_regex.match(value):
+ msg = f'Invalid workflow ID {value}. Expected {workflow_regex.pattern}'
+ raise argparse.ArgumentTypeError(msg)
+ else:
+ return value
+
+
+def url_regex_validator(value: str) -> str:
+ """
+    Validates then extracts the root of the Cromwell URL from the various URL strings which might be provided.
+ Deliberately flexible because it's tedious to remember which script requires which type of format.
+ eg:
+ 'http://localhost' => 'http://localhost'
+ 'http://localhost:8000' => 'http://localhost:8000'
+ 'http://localhost:8000/' => 'http://localhost:8000'
+ 'http://localhost:8000/api/workflows/' => 'http://localhost:8000'
+ 'http://localhost:8000/custom/prefix/api/workflows/' => 'http://localhost:8000/custom/prefix'
+ """
+ url_regex = re.compile('(http(s?)://((?!/api).)*[^/])(/(api.*)?)?')
+ m = url_regex.match(value)
+ if m:
+ return m.group(1)
+ else:
+ msg = f'Invalid Cromwell URL {value}. Expected {url_regex.pattern}'
+ raise argparse.ArgumentTypeError(msg)
+
+
+def gcs_path_regex_validator(value: str) -> (str, str):
+ """
+ Validates then extracts the bucket and object-path from a GS string. Returned as a pair.
+ eg:
+ 'gs://bucket/path/to/directory/' -> ('bucket', 'path/to/directory')
+ or
+ 'gs://bucket/path/to/file.ext' -> ('bucket', 'path/to/file.ext')
+ """
+ bucket_class = 'a-zA-Z0-9-'
+ object_class = '_\\.' + bucket_class
+    gcs_regex = re.compile(f'^gs://(?P<bucket>[{bucket_class}]+)/(?P