Skip to content

Commit

Permalink
Limit the number of failed cache hit copy attempts [BA-6430] (#5514)
Browse files Browse the repository at this point in the history
  • Loading branch information
mcovarr authored May 29, 2020
1 parent 4e8ba56 commit fc6ad11
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ import cromwell.services.CallCaching.CallCachingEntryId
object BackendCacheHitCopyingActor {
final case class CopyOutputsCommand(womValueSimpletons: Seq[WomValueSimpleton], jobDetritusFiles: Map[String, String], cacheHit: CallCachingEntryId, returnCode: Option[Int])

final case class CopyingOutputsFailedResponse(jobKey: JobKey, cacheCopyAttempt: Int, failure: CacheCopyError)
final case class CopyingOutputsFailedResponse(jobKey: JobKey, cacheCopyAttempt: Int, failure: CacheCopyFailure)

sealed trait CacheCopyError
final case class LoggableCacheCopyError(failure: Throwable) extends CacheCopyError
final case class MetricableCacheCopyError(failureCategory: MetricableCacheCopyErrorCategory) extends CacheCopyError
sealed trait CacheCopyFailure
/** A cache hit copy was attempted but failed. */
final case class CopyAttemptError(failure: Throwable) extends CacheCopyFailure
/** Copying was requested for a cache hit, but the cache hit copying actor found that the hit (or its bucket) had
* already been blacklisted, so no new copy attempt was made. */
final case class BlacklistSkip(failureCategory: MetricableCacheCopyErrorCategory) extends CacheCopyFailure
}

object MetricableCacheCopyErrorCategory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,10 @@ abstract class StandardCacheHitCopyingActor(val standardParams: StandardCacheHit
val (nextState, cacheReadType) =
if (isSourceBlacklisted(cacheHit)) {
// We don't want to log this because blacklisting is a common and expected occurrence.
(failAndStop(MetricableCacheCopyError(MetricableCacheCopyErrorCategory.HitBlacklisted)), ReadHitOnly)
(failAndStop(BlacklistSkip(MetricableCacheCopyErrorCategory.HitBlacklisted)), ReadHitOnly)
} else if (isSourceBlacklisted(command)) {
// We don't want to log this because blacklisting is a common and expected occurrence.
(failAndStop(MetricableCacheCopyError(MetricableCacheCopyErrorCategory.BucketBlacklisted)), ReadHitAndBucket)
(failAndStop(BlacklistSkip(MetricableCacheCopyErrorCategory.BucketBlacklisted)), ReadHitAndBucket)
} else {
// Try to make a Path of the callRootPath from the detritus
val next = lookupSourceCallRootPath(jobDetritus) match {
Expand All @@ -172,7 +172,7 @@ abstract class StandardCacheHitCopyingActor(val standardParams: StandardCacheHit
case Some(Success(_)) => succeedAndStop(returnCode, destinationCallOutputs, destinationDetritus)
case Some(Failure(failure)) =>
// Something went wrong in the custom duplication code. We consider this loggable because it's most likely a user-permission error:
failAndStop(LoggableCacheCopyError(failure))
failAndStop(CopyAttemptError(failure))
// Otherwise send the first round of IoCommands (file outputs and detritus) if any
case None if detritusAndOutputsIoCommands.nonEmpty =>
detritusAndOutputsIoCommands foreach sendIoCommand
Expand All @@ -186,11 +186,11 @@ abstract class StandardCacheHitCopyingActor(val standardParams: StandardCacheHit
}

// Something went wrong in generating duplication commands. We consider this a loggable error because we don't expect this to happen:
case Failure(failure) => failAndStop(LoggableCacheCopyError(failure))
case Failure(failure) => failAndStop(CopyAttemptError(failure))
}

// Something went wrong in looking up the call root... loggable because we don't expect this to happen:
case Failure(failure) => failAndStop(LoggableCacheCopyError(failure))
case Failure(failure) => failAndStop(CopyAttemptError(failure))
}
(next, ReadHitAndBucket)
}
Expand All @@ -217,14 +217,14 @@ abstract class StandardCacheHitCopyingActor(val standardParams: StandardCacheHit
handleBlacklistingForForbidden(
path = f.forbiddenPath,
// Loggable because this is an attempt-specific problem:
andThen = failAndAwaitPendingResponses(LoggableCacheCopyError(f.failure), f.command, data)
andThen = failAndAwaitPendingResponses(CopyAttemptError(f.failure), f.command, data)
)
case Event(IoFailAck(command: IoCommand[_], failure), Some(data)) =>
handleBlacklistingForGenericFailure()
// Loggable because this is an attempt-specific problem:
failAndAwaitPendingResponses(LoggableCacheCopyError(failure), command, data)
failAndAwaitPendingResponses(CopyAttemptError(failure), command, data)
// Should not be possible
case Event(IoFailAck(_: IoCommand[_], failure), None) => failAndStop(LoggableCacheCopyError(failure))
case Event(IoFailAck(_: IoCommand[_], failure), None) => failAndStop(CopyAttemptError(failure))
}

when(FailedState) {
Expand Down Expand Up @@ -282,15 +282,15 @@ abstract class StandardCacheHitCopyingActor(val standardParams: StandardCacheHit
stay()
}

def failAndStop(failure: CacheCopyError): State = {
def failAndStop(failure: CacheCopyFailure): State = {
context.parent ! CopyingOutputsFailedResponse(jobDescriptor.key, standardParams.cacheCopyAttempt, failure)
context stop self
stay()
}

/** If there are no responses pending this behaves like `failAndStop`, otherwise this goes to `FailedState` and waits
* for all the pending responses to come back before stopping. */
def failAndAwaitPendingResponses(failure: CacheCopyError, command: IoCommand[_], data: StandardCacheHitCopyingActorData): State = {
def failAndAwaitPendingResponses(failure: CacheCopyFailure, command: IoCommand[_], data: StandardCacheHitCopyingActorData): State = {
context.parent ! CopyingOutputsFailedResponse(jobDescriptor.key, standardParams.cacheCopyAttempt, failure)

val (newData, commandState) = data.commandComplete(command)
Expand Down Expand Up @@ -388,7 +388,7 @@ abstract class StandardCacheHitCopyingActor(val standardParams: StandardCacheHit
}

// Loggable because this is attempt-specific:
failAndStop(LoggableCacheCopyError(new TimeoutException(exceptionMessage)))
failAndStop(CopyAttemptError(new TimeoutException(exceptionMessage)))
()
}

Expand Down
3 changes: 3 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,9 @@ call-caching {
# to fail for external reasons which should not invalidate the cache (e.g. auth differences between users):
# (default: true)
invalidate-bad-cache-results = true

# The maximum number of failed cache hit copy attempts Cromwell will allow before giving up and running the job.
max-failed-copy-attempts = 1000000
}

google {
Expand Down
3 changes: 3 additions & 0 deletions cromwell.example.backends/cromwell.examples.conf
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ call-caching {
# (default: true)
#invalidate-bad-cache-results = true

# The maximum number of failed cache hit copy attempts Cromwell will allow before giving up and running the job.
#max-failed-copy-attempts = 1000000

# blacklist-cache {
# # The call caching blacklist cache is off by default. This cache is used to blacklist cache hits based on cache
# # hit ids or buckets of cache hit paths that Cromwell has previously failed to copy for permissions reasons.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,16 @@ case class WorkflowExecutionActor(params: WorkflowExecutionActorParams)
val ejeaName = s"${workflowDescriptor.id}-EngineJobExecutionActor-${jobKey.tag}"
val backendName = backendLifecycleActorFactory.name
val backendSingleton = params.backendSingletonCollection.backendSingletonActors(backendName)

val callCachingParameters = EngineJobExecutionActor.CallCachingParameters(
mode = workflowDescriptor.callCachingMode,
readActor = params.callCacheReadActor,
writeActor = params.callCacheWriteActor,
fileHashCacheActor = params.fileHashCacheActor,
maxFailedCopyAttempts = params.rootConfig.getInt("call-caching.max-failed-copy-attempts"),
blacklistCache = params.blacklistCache
)

val ejeaProps = EngineJobExecutionActor.props(
self,
jobKey,
Expand All @@ -631,15 +641,11 @@ case class WorkflowExecutionActor(params: WorkflowExecutionActorParams)
serviceRegistryActor = serviceRegistryActor,
ioActor = params.ioActor,
jobStoreActor = params.jobStoreActor,
callCacheReadActor = params.callCacheReadActor,
callCacheWriteActor = params.callCacheWriteActor,
workflowDockerLookupActor = params.workflowDockerLookupActor,
jobTokenDispenserActor = params.jobTokenDispenserActor,
backendSingleton,
workflowDescriptor.callCachingMode,
command,
fileHashCacheActor = params.fileHashCacheActor,
blacklistCache = params.blacklistCache
callCachingParameters
)

val ejeaRef = context.actorOf(ejeaProps, ejeaName)
Expand Down
Loading

0 comments on commit fc6ad11

Please sign in to comment.