From dcd596a8d2db0f82977149d06fc07bfe6b9a9090 Mon Sep 17 00:00:00 2001 From: coreone Date: Fri, 8 Jan 2016 15:19:10 -0500 Subject: [PATCH 01/31] Refactor proxy-compose.yaml.ctmpl to use multiple Vault keys --- src/main/config/cromwell-compose.yaml.ctmpl | 47 +++++++++++++-------- 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/src/main/config/cromwell-compose.yaml.ctmpl b/src/main/config/cromwell-compose.yaml.ctmpl index bf63b053568..eaff76fa0ce 100644 --- a/src/main/config/cromwell-compose.yaml.ctmpl +++ b/src/main/config/cromwell-compose.yaml.ctmpl @@ -1,33 +1,44 @@ -{{with $environment := env "ENVIRONMENT"}} -{{$keyname := printf "secret/dsde/%s/cromwell/cromwell-compose.yaml" $environment}} -{{with vault $keyname}} +{{with $environment := env "ENVIRONMENT"}}{{$cromkey := printf "secret/dsde/%s/cromwell/cromwell-compose.yaml" $environment}}{{with vault $cromkey}} +{{$cromwell_image := .Data.cromwell_image}} +{{$cromwell_dns := .Data.cromwell_dns}} +{{$env_log_driver := .Data.env_log_driver}} +{{$env_java_opts := .Data.env_java_opts}} +{{$cromwell_volumes := .Data.cromwell_volumes}} +{{$proxy_hostname := .Data.proxy_hostname}} +{{$proxy_log_driver := .Data.proxy_log_driver}} +{{$proxy_volumes := .Data.proxy_volumes}} +{{$env_callback_uri := .Data.env_callback_uri}} +{{$env_log_level := .Data.env_log_level}} +{{$env_server_name := .Data.env_server_name}} +{{$commonkey := printf "secret/dsde/%s/common/proxy-ldap" $environment}}{{with vault $commonkey}} +{{$proxy_ldap_group := .Data.proxy_ldap_group}} +{{$proxy_ldap_url := .Data.proxy_ldap_url}} app: - image: {{.Data.cromwell_image}} - {{.Data.cromwell_dns}} - log_driver: "{{.Data.env_log_driver}}" + image: {{ $cromwell_image }} + {{ $cromwell_dns }} + log_driver: "{{ $env_log_driver }}" environment: - JAVA_OPTS: {{.Data.env_java_opts}} - {{.Data.cromwell_volumes}} + JAVA_OPTS: {{ $env_java_opts }} + {{ $cromwell_volumes }} proxy: image: broadinstitute/openidc-proxy:latest - hostname: {{.Data.proxy_hostname}} - log_driver: "{{.Data.proxy_log_driver}}" + hostname: {{ $proxy_hostname }} + log_driver: "{{ $proxy_log_driver }}" links: - app:app ports: - "80:80" - "443:443" - {{.Data.proxy_volumes}} + {{ $proxy_volumes }} environment: - CALLBACK_URI: {{.Data.env_callback_uri}} - LOG_LEVEL: {{.Data.env_log_level}} + CALLBACK_URI: {{ $env_callback_uri }} + LOG_LEVEL: {{ $env_log_level }} PROXY_URL: http://app:8000/ PROXY_URL2: http://app:8000/api - SERVER_NAME: {{.Data.env_server_name}} - AUTH_REQUIRE2: Require ldap-group {{.Data.proxy_ldap_group}} - AUTH_LDAP_URL2: 'AuthLDAPURL "{{.Data.proxy_ldap_url}}"' + SERVER_NAME: {{ $env_server_name }} + AUTH_REQUIRE2: Require ldap-group {{ $proxy_ldap_group }} + AUTH_LDAP_URL2: 'AuthLDAPURL "{{ $proxy_ldap_url }}"' AUTH_LDAP_GROUP_ATTR2: 'AuthLDAPGroupAttribute member' REMOTE_USER_CLAIM: sub -{{end}} -{{end}} +{{end}}{{end}}{{end}} From d13975db21efb0b45906d13179df2bcf816d5553 Mon Sep 17 00:00:00 2001 From: coreone Date: Fri, 8 Jan 2016 15:47:59 -0500 Subject: [PATCH 02/31] Add config to require LDAP auth to query LDAP membership in proxy --- src/main/config/cromwell-compose.yaml.ctmpl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/config/cromwell-compose.yaml.ctmpl b/src/main/config/cromwell-compose.yaml.ctmpl index eaff76fa0ce..1d865707d94 100644 --- a/src/main/config/cromwell-compose.yaml.ctmpl +++ b/src/main/config/cromwell-compose.yaml.ctmpl @@ -10,10 +10,11 @@ {{$env_callback_uri := .Data.env_callback_uri}} {{$env_log_level := .Data.env_log_level}} {{$env_server_name := .Data.env_server_name}} - {{$commonkey := printf "secret/dsde/%s/common/proxy-ldap" $environment}}{{with vault $commonkey}} {{$proxy_ldap_group := .Data.proxy_ldap_group}} {{$proxy_ldap_url := .Data.proxy_ldap_url}} +{{$proxy_ldap_bind_dn := .Data.proxy_ldap_bind_dn}} +{{$proxy_ldap_bind_password := .Data.proxy_ldap_bind_password}} app: image: {{ $cromwell_image }} {{ $cromwell_dns }} @@ -40,5 +41,7 @@ proxy: AUTH_REQUIRE2: Require ldap-group {{ $proxy_ldap_group }} AUTH_LDAP_URL2: 'AuthLDAPURL "{{ $proxy_ldap_url }}"' AUTH_LDAP_GROUP_ATTR2: 'AuthLDAPGroupAttribute member' + AUTH_LDAP_BIND_DN2: 'AuthLDAPBindDN "{{ $proxy_ldap_bind_dn }}"' + AUTH_LDAP_BIND_PASSWORD2: 'AuthLDAPBindPassword {{ $proxy_ldap_bind_password }}' REMOTE_USER_CLAIM: sub {{end}}{{end}}{{end}} From fe30beef0e81861e186d62abceee45698940b0fc Mon Sep 17 00:00:00 2001 From: geoffjentry Date: Mon, 11 Jan 2016 15:01:37 -0500 Subject: [PATCH 03/31] add gitter badge --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 2f4efb9a71a..4f8231e3b1b 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,7 @@ [![Build Status](https://travis-ci.org/broadinstitute/cromwell.svg?branch=develop)](https://travis-ci.org/broadinstitute/cromwell?branch=develop) [![Coverage Status](https://coveralls.io/repos/broadinstitute/cromwell/badge.svg?branch=develop)](https://coveralls.io/r/broadinstitute/cromwell?branch=develop) +[![Join the chat at https://gitter.im/broadinstitute/cromwell](https://badges.gitter.im/broadinstitute/cromwell.svg)](https://gitter.im/broadinstitute/cromwell?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) + Cromwell ======== From 13f59c8fb255fa9513dedf49c1c9bb8c180ccb3d Mon Sep 17 00:00:00 2001 From: Scott Frazer Date: Mon, 11 Jan 2016 15:06:45 -0500 Subject: [PATCH 04/31] version bump --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index afc05b9017e..dc609b6061b 100644 --- a/build.sbt +++ b/build.sbt @@ -3,7 +3,7 @@ import sbtrelease.ReleasePlugin._ name := "cromwell" -version := "0.16" +version := "0.17" organization := "org.broadinstitute" From 72481fb2e3b9dbd33caa2f7758173db9f5a1e4d3 Mon Sep 17 00:00:00 2001 From: Khalid Shakir Date: Sat, 19 Dec 2015 10:40:19 -0500 Subject: [PATCH 05/31] `CromwellServer` startup now checks that there are no additional args. The server also exits with a 1 if the server does not start up, using a synchronous error message. Still could use clean termination handling (via lenthall?). Refactored the `WorkflowManagerSystem` passing into `Main`, including ensuring that during tests the internal actor system does get shut down, but not prematurely. Made the `PromiseActor` more generic. Eventually may move to lenthall, and possibly make it even easier to use, as right now it requires using `tell` instead of `!` to ensure the `sender` is set correctly. `sys.exit()` is only called in the `object Main`, not conditionally in the `class Main`. Moved `getAction()` from the `Main` class to the object. --- src/main/scala/cromwell/Main.scala | 90 +++++++++---------- .../cromwell/server/CromwellServer.scala | 24 ++++- .../server/WorkflowManagerSystem.scala | 13 ++- .../scala/cromwell/CromwellTestkitSpec.scala | 19 ++++ src/test/scala/cromwell/MainSpec.scala | 65 +++++++------- .../engine/backend/jes/JesBackendSpec.scala | 18 ++-- .../engine/db/slick/SlickDataAccessSpec.scala | 10 ++- .../cromwell/logging/WorkflowLoggerSpec.scala | 9 +- .../CromwellApiServiceIntegrationSpec.scala | 5 ++ 9 files changed, 160 insertions(+), 93 deletions(-) diff --git a/src/main/scala/cromwell/Main.scala b/src/main/scala/cromwell/Main.scala index 59b1d6cb769..3259a759b10 100644 --- a/src/main/scala/cromwell/Main.scala +++ b/src/main/scala/cromwell/Main.scala @@ -3,7 +3,7 @@ package cromwell import java.io.{File => JFile} import java.nio.file.{Files, Path, Paths} -import akka.actor.{Actor, ActorRef, Props, Status} +import akka.actor.{Actor, Props, Status} import better.files._ import wdl4s.formatter.{AnsiSyntaxHighlighter, HtmlSyntaxHighlighter, SyntaxFormatter} import wdl4s.{AstTools, _} @@ -17,7 +17,7 @@ import org.slf4j.LoggerFactory import spray.json._ import scala.concurrent.duration._ -import scala.concurrent.{Await, Promise} +import scala.concurrent.{Future, Await, Promise} import scala.language.postfixOps import scala.util.{Failure, Success, Try} @@ -43,7 +43,7 @@ object Main extends App { * Also now passing args to runAction instead of the constructor, as even sbt seemed to have issues with the args * array becoming null in "new Main(args)" when used with: sbt 'run run ...' */ - new Main().runAction(args) + sys.exit(new Main().runAction(args)) /** * If a cromwell server is going to be run, makes adjustments to the default logback configuration. @@ -55,48 +55,55 @@ object Main extends App { * @param args The command line arguments. */ private def setupServerLogging(args: Array[String]): Unit = { - args.headOption.map(_.capitalize) match { - case Some("Server") => sys.props.getOrElseUpdate("LOG_MODE", "STANDARD") + getAction(args) match { + case Some(Actions.Server) => sys.props.getOrElseUpdate("LOG_MODE", "STANDARD") case _ => } } + + private def getAction(args: Seq[String]): Option[Actions.Value] = for { + arg <- args.headOption + argCapitalized = arg.capitalize + action <- Actions.values find (_.toString == argCapitalized) + } yield action } /** A simplified version of the Akka `PromiseActorRef` that doesn't time out. */ -private class PromiseWorkflowActor(promise: Promise[Any], runner: ActorRef) extends Actor { - runner ! RunWorkflow - +private class PromiseActor(promise: Promise[Any]) extends Actor { override def receive = { - case Status.Failure(f) => promise.tryFailure(f) - case success => promise.trySuccess(success) + case Status.Failure(f) => + promise.tryFailure(f) + context.stop(self) + case success => + promise.trySuccess(success) + context.stop(self) } } - -class Main private[cromwell](enableTermination: Boolean, managerSystem: () => WorkflowManagerSystem) { +class Main private[cromwell](managerSystem: WorkflowManagerSystem) { private[cromwell] val initLoggingReturnCode = initLogging() lazy val Log = LoggerFactory.getLogger("cromwell") Monitor.start() - def this() = this(enableTermination = true, managerSystem = () => new WorkflowManagerSystem {}) + def this() = this(managerSystem = new WorkflowManagerSystem {}) // CromwellServer still doesn't clean up... so => Any - def runAction(args: Seq[String]): Any = { - getAction(args.headOption) match { + def runAction(args: Seq[String]): Int = { + Main.getAction(args) match { case Some(x) if x == Actions.Validate => validate(args.tail) case Some(x) if x == Actions.Highlight => highlight(args.tail) case Some(x) if x == Actions.Inputs => inputs(args.tail) case Some(x) if x == Actions.Run => run(args.tail) case Some(x) if x == Actions.Parse => parse(args.tail) - case Some(x) if x == Actions.Server => CromwellServer + case Some(x) if x == Actions.Server => runServer(args.tail) case _ => usageAndExit() } } def validate(args: Seq[String]): Int = { continueIf(args.length == 1) { - loadWdl(args.head) { _ => exit(0) } + loadWdl(args.head) { _ => 0 } } } @@ -105,7 +112,7 @@ class Main private[cromwell](enableTermination: Boolean, managerSystem: () => Wo loadWdl(args.head) { namespace => val formatter = new SyntaxFormatter(if (args(1) == "html") HtmlSyntaxHighlighter else AnsiSyntaxHighlighter) println(formatter.format(namespace)) - exit(0) + 0 } } } @@ -118,11 +125,15 @@ class Main private[cromwell](enableTermination: Boolean, managerSystem: () => Wo case x: NamespaceWithWorkflow => println(x.workflow.inputs.toJson.prettyPrint) case _ => println("WDL does not have a local workflow") } - exit(0) + 0 } } } + def runServer(args: Seq[String]): Int = { + continueIf(args.isEmpty)(waitAndExit(CromwellServer.run(), CromwellServer)) + } + /* Begin .run() method and utilities */ val WdlLabel = "WDL file" @@ -147,7 +158,7 @@ class Main private[cromwell](enableTermination: Boolean, managerSystem: () => Wo case Success(workflowSourceFiles) => runWorkflow(workflowSourceFiles, metadataPath) case Failure(ex) => Console.err.println(ex.getMessage) - exit(1) + 1 } } } @@ -176,23 +187,27 @@ class Main private[cromwell](enableTermination: Boolean, managerSystem: () => Wo } private[this] def runWorkflow(workflowSourceFiles: WorkflowSourceFiles, metadataPath: Option[Path]): Int = { - val workflowManagerSystem = managerSystem() + val workflowManagerSystem = managerSystem val runnerProps = SingleWorkflowRunnerActor.props(workflowSourceFiles, metadataPath, workflowManagerSystem.workflowManagerActor) val runner = workflowManagerSystem.actorSystem.actorOf(runnerProps, "SingleWorkflowRunnerActor") val promise = Promise[Any]() - workflowManagerSystem.actorSystem.actorOf(Props(classOf[PromiseWorkflowActor], promise, runner)) - val futureResult = promise.future + val promiseActor = workflowManagerSystem.actorSystem.actorOf(Props(classOf[PromiseActor], promise)) + runner.tell(RunWorkflow, promiseActor) + waitAndExit(promise.future, workflowManagerSystem) + } + + private[this] def waitAndExit(futureResult: Future[Any], workflowManagerSystem: WorkflowManagerSystem): Int = { Await.ready(futureResult, Duration.Inf) - if (enableTermination) workflowManagerSystem.actorSystem.shutdown() + workflowManagerSystem.shutdownActorSystem() futureResult.value.get match { - case Success(_) => exit(0) + case Success(_) => 0 case Failure(e) => Console.err.println(e.getMessage) - exit(1) + 1 } } @@ -260,7 +275,7 @@ class Main private[cromwell](enableTermination: Boolean, managerSystem: () => Wo def parse(args: Seq[String]): Int = { continueIf(args.length == 1) { println(AstTools.getAst(new JFile(args.head)).toPrettyString) - exit(0) + 0 } } @@ -314,7 +329,7 @@ class Main private[cromwell](enableTermination: Boolean, managerSystem: () => Wo | will output colorized text to the terminal | """.stripMargin) - exit(-1) + -1 } private[this] def initLogging(): Int = { @@ -337,33 +352,18 @@ class Main private[cromwell](enableTermination: Boolean, managerSystem: () => Wo case e: Throwable => Console.err.println(s"Could not create log directory: $logRoot") e.printStackTrace() - exit(1) + 1 } } private[this] def continueIf(valid: => Boolean)(block: => Int): Int = if (valid) block else usageAndExit() - private[this] def getAction(firstArg: Option[String]): Option[Actions.Value] = for { - arg <- firstArg - argCapitalized = arg.capitalize - a <- Actions.values find (_.toString == argCapitalized) - } yield a - private[this] def loadWdl(path: String)(f: WdlNamespace => Int): Int = { Try(WdlNamespace.load(new JFile(path))) match { case Success(namespace) => f(namespace) case Failure(t) => println(t.getMessage) - exit(1) - } - } - - private[this] def exit(returnCode: Int): Int = { - if (enableTermination) { - // $COVERAGE-OFF$Exit not allowed during tests - sys.exit(returnCode) - // $COVERAGE-ON$ + 1 } - returnCode } } diff --git a/src/main/scala/cromwell/server/CromwellServer.scala b/src/main/scala/cromwell/server/CromwellServer.scala index b0a790db1e7..8a825f6fd6f 100644 --- a/src/main/scala/cromwell/server/CromwellServer.scala +++ b/src/main/scala/cromwell/server/CromwellServer.scala @@ -5,7 +5,9 @@ import com.typesafe.config.ConfigFactory import cromwell.webservice.CromwellApiServiceActor import lenthall.spray.SprayCanHttpService._ +import scala.concurrent.Future import scala.concurrent.duration._ +import scala.util.{Failure, Success} // Note that as per the language specification, this is instantiated lazily and only used when necessary (i.e. server mode) object CromwellServer extends WorkflowManagerSystem { @@ -15,7 +17,25 @@ object CromwellServer extends WorkflowManagerSystem { val conf = ConfigFactory.load() val service = actorSystem.actorOf(CromwellApiServiceActor.props(workflowManagerActor, conf), "cromwell-service") val webserviceConf = conf.getConfig("webservice") - service.bindOrShutdown(interface = webserviceConf.getString("interface"), port = webserviceConf.getInt("port")) onSuccess { - case _ => actorSystem.log.info("Cromwell service started...") + + def run(): Future[Any] = { + val interface = webserviceConf.getString("interface") + val port = webserviceConf.getInt("port") + val futureBind = service.bind(interface = interface, port = port) + futureBind andThen { + case Success(_) => + actorSystem.log.info("Cromwell service started...") + actorSystem.awaitTermination() + case Failure(throwable) => + /* + TODO: + If/when CromwellServer behaves like a better async citizen, we may be less paranoid about our async log messages + not appearing due to the actor system shutdown. For now, synchronously print to the stderr so that the user has + some idea of why the server failed to start up. + */ + Console.err.println(s"Binding failed interface $interface port $port") + throwable.printStackTrace(Console.err) + shutdownActorSystem() + } } } diff --git a/src/main/scala/cromwell/server/WorkflowManagerSystem.scala b/src/main/scala/cromwell/server/WorkflowManagerSystem.scala index f2f9a32b258..b1e5b1b3f4c 100644 --- a/src/main/scala/cromwell/server/WorkflowManagerSystem.scala +++ b/src/main/scala/cromwell/server/WorkflowManagerSystem.scala @@ -2,15 +2,22 @@ package cromwell.server import akka.actor.ActorSystem import com.typesafe.config.ConfigFactory -import cromwell.engine.backend.{CromwellBackend, Backend} +import cromwell.engine.backend.{Backend, CromwellBackend} import cromwell.engine.workflow.WorkflowManagerActor -import cromwell.engine.backend.BackendType trait WorkflowManagerSystem { protected def systemName = "cromwell-system" + protected def newActorSystem(): ActorSystem = ActorSystem(systemName) - implicit final val actorSystem = newActorSystem() + + implicit final lazy val actorSystem = newActorSystem() + + def shutdownActorSystem(): Unit = { + actorSystem.shutdown() + } + def backendType: String = ConfigFactory.load.getConfig("backend").getString("backend") + lazy val backend: Backend = CromwellBackend.initBackend(backendType, actorSystem) // For now there's only one WorkflowManagerActor so no need to dynamically name it lazy val workflowManagerActor = actorSystem.actorOf(WorkflowManagerActor.props(backend), "WorkflowManagerActor") diff --git a/src/test/scala/cromwell/CromwellTestkitSpec.scala b/src/test/scala/cromwell/CromwellTestkitSpec.scala index f96a1b16325..35936f910fb 100644 --- a/src/test/scala/cromwell/CromwellTestkitSpec.scala +++ b/src/test/scala/cromwell/CromwellTestkitSpec.scala @@ -57,6 +57,25 @@ object CromwellTestkitSpec { override protected def newActorSystem() = ActorSystem(systemName, ConfigFactory.parseString(CromwellTestkitSpec.ConfigText)) override val backendType = "local" backend // Force initialization + /** + * Do NOT shut down the test actor system inside the normal flow. + * The actor system will be externally shutdown outside the block. + */ + override def shutdownActorSystem() = {} + + def shutdownTestActorSystem() = super.shutdownActorSystem() + } + + /** + * Loans a test actor system. NOTE: This should be run OUTSIDE of a wait block, never within one. + */ + def withTestWorkflowManagerSystem[T](block: WorkflowManagerSystem => T): T = { + val testWorkflowManagerSystem = new CromwellTestkitSpec.TestWorkflowManagerSystem + try { + block(testWorkflowManagerSystem) + } finally { + TestKit.shutdownActorSystem(testWorkflowManagerSystem.actorSystem, timeoutDuration) + } } /** diff --git a/src/test/scala/cromwell/MainSpec.scala b/src/test/scala/cromwell/MainSpec.scala index e8364079e85..921fe0f32df 100644 --- a/src/test/scala/cromwell/MainSpec.scala +++ b/src/test/scala/cromwell/MainSpec.scala @@ -4,7 +4,7 @@ import java.io.{ByteArrayOutputStream, OutputStream} import java.nio.file.Path import java.text.SimpleDateFormat import java.util.Date -import akka.testkit.TestKit + import akka.util.Timeout import better.files._ import cromwell.util.FileUtil._ @@ -14,6 +14,7 @@ import org.apache.commons.io.output.TeeOutputStream import org.scalatest.concurrent.TimeLimitedTests import org.scalatest.time.Span import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + import scala.concurrent.duration._ import scala.language.postfixOps @@ -371,12 +372,13 @@ object MainSpec { * @return The return code of run. */ def traceInfoRun(args: String*)(pattern: String): Int = { - val workflowManagerSystem = new CromwellTestkitSpec.TestWorkflowManagerSystem - waitForInfo(pattern)( - printBlock("run", args) { - new Main(enableTermination = false, () => workflowManagerSystem).run(args) - } - )(workflowManagerSystem.actorSystem) + withTestWorkflowManagerSystem { workflowManagerSystem => + waitForInfo(pattern)( + printBlock("run", args) { + new Main(workflowManagerSystem).run(args) + } + )(workflowManagerSystem.actorSystem) + } } /** @@ -388,17 +390,13 @@ object MainSpec { * @return The return code of run. */ def traceErrorWithExceptionRun(args: String*)(pattern: String, throwableClass: Class[_ <: Throwable] = classOf[Throwable]): Int = { - val workflowManagerSystem = new TestWorkflowManagerSystem - val result = waitForErrorWithException(pattern, throwableClass = throwableClass)( - printBlock("run", args) { - // Explicitly disable shutting down the actor system from within Main, there's a race condition to deliver - // log messages to the TestKit filter before the system is torn down. Wait until we see the message we - // want, then shutdown the system ourselves. - new Main(enableTermination = false, () => workflowManagerSystem).run(args) - } - )(workflowManagerSystem.actorSystem) - TestKit.shutdownActorSystem(workflowManagerSystem.actorSystem, timeoutDuration) - result + withTestWorkflowManagerSystem { workflowManagerSystem => + waitForErrorWithException(pattern, throwableClass = throwableClass)( + printBlock("run", args) { + new Main(workflowManagerSystem).run(args) + } + )(workflowManagerSystem.actorSystem) + } } /** @@ -409,14 +407,15 @@ object MainSpec { * @return The return code of run. */ def traceInfoAction(args: String*)(pattern: String): Int = { - val workflowManagerSystem = new CromwellTestkitSpec.TestWorkflowManagerSystem - waitForInfo(pattern)( - printBlock("runAction", args) { - new Main(enableTermination = false, () => workflowManagerSystem).runAction(args) match { - case status: Int => status + withTestWorkflowManagerSystem { workflowManagerSystem => + waitForInfo(pattern)( + printBlock("runAction", args) { + new Main(workflowManagerSystem).runAction(args) match { + case status: Int => status + } } - } - )(workflowManagerSystem.actorSystem) + )(workflowManagerSystem.actorSystem) + } } /** @@ -427,16 +426,18 @@ object MainSpec { * @return return code plus Console.out/Console.err during the block. */ def traceMain(block: Main => Int): TraceResult = { - val outStream = TeeStream(Console.out) - val errStream = TeeStream(Console.err) - val status = { - Console.withOut(outStream.teed) { - Console.withErr(errStream.teed) { - block(new Main(enableTermination = false, () => new CromwellTestkitSpec.TestWorkflowManagerSystem)) + withTestWorkflowManagerSystem { workflowManagerSystem => + val outStream = TeeStream(Console.out) + val errStream = TeeStream(Console.err) + val status = { + Console.withOut(outStream.teed) { + Console.withErr(errStream.teed) { + block(new Main(workflowManagerSystem)) + } } } + TraceResult(status, outStream.captured, errStream.captured) } - TraceResult(status, outStream.captured, errStream.captured) } /** diff --git a/src/test/scala/cromwell/engine/backend/jes/JesBackendSpec.scala b/src/test/scala/cromwell/engine/backend/jes/JesBackendSpec.scala index 4b1f6456fcd..8af06064d33 100644 --- a/src/test/scala/cromwell/engine/backend/jes/JesBackendSpec.scala +++ b/src/test/scala/cromwell/engine/backend/jes/JesBackendSpec.scala @@ -15,18 +15,22 @@ import cromwell.engine.backend.jes.authentication._ import cromwell.engine.io.gcs.{GoogleConfiguration, Refresh, ServiceAccountMode, SimpleClientSecrets} import cromwell.engine.workflow.{CallKey, WorkflowOptions} import cromwell.util.EncryptionSpec -import org.scalatest.{FlatSpec, Matchers} +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} import org.specs2.mock.Mockito import scala.util.Success -object JesBackendSpec { - val ActorSystem = new CromwellTestkitSpec.TestWorkflowManagerSystem().actorSystem -} +class JesBackendSpec extends FlatSpec with Matchers with Mockito with BeforeAndAfterAll { + val testWorkflowManagerSystem = new CromwellTestkitSpec.TestWorkflowManagerSystem() + val actorSystem = testWorkflowManagerSystem.actorSystem + + override protected def afterAll() = { + testWorkflowManagerSystem.shutdownTestActorSystem() + super.afterAll() + } -class JesBackendSpec extends FlatSpec with Matchers with Mockito { val clientSecrets = SimpleClientSecrets("id", "secrets") - val jesBackend = new JesBackend(JesBackendSpec.ActorSystem) { + val jesBackend = new JesBackend(actorSystem) { private val anyString = "" private val anyURL: URL = null override lazy val jesConf = new JesAttributes( @@ -55,7 +59,7 @@ class JesBackendSpec extends FlatSpec with Matchers with Mockito { gcsFileKey -> gcsFileVal ) - val mappedInputs: CallInputs = new JesBackend(JesBackendSpec.ActorSystem).adjustInputPaths(ignoredCall, emptyRuntimeAttributes, inputs, mock[WorkflowDescriptor]) + val mappedInputs: CallInputs = new JesBackend(actorSystem).adjustInputPaths(ignoredCall, emptyRuntimeAttributes, inputs, mock[WorkflowDescriptor]) mappedInputs.get(stringKey).get match { case WdlString(v) => assert(v.equalsIgnoreCase(stringVal.value)) diff --git a/src/test/scala/cromwell/engine/db/slick/SlickDataAccessSpec.scala b/src/test/scala/cromwell/engine/db/slick/SlickDataAccessSpec.scala index 7b050223709..c9ded80282c 100644 --- a/src/test/scala/cromwell/engine/db/slick/SlickDataAccessSpec.scala +++ b/src/test/scala/cromwell/engine/db/slick/SlickDataAccessSpec.scala @@ -27,8 +27,9 @@ import org.scalactic.StringNormalizations._ import org.scalatest.concurrent.ScalaFutures import org.scalatest.prop.TableDrivenPropertyChecks import org.scalatest.time.{Millis, Seconds, Span} -import org.scalatest.{FlatSpec, Matchers} import org.scalatest.PartialFunctionValues._ +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + import scala.concurrent.{ExecutionContext, Future} import Hashing._ @@ -37,12 +38,17 @@ object SlickDataAccessSpec { val AllowTrue = Seq(webservice.QueryParameter("allow", "true")) } -class SlickDataAccessSpec extends FlatSpec with Matchers with ScalaFutures { +class SlickDataAccessSpec extends FlatSpec with Matchers with ScalaFutures with BeforeAndAfterAll { import TableDrivenPropertyChecks._ val workflowManagerSystem = new TestWorkflowManagerSystem + override protected def afterAll() = { + workflowManagerSystem.shutdownTestActorSystem() + super.afterAll() + } + implicit val ec = ExecutionContext.global implicit val defaultPatience = PatienceConfig(timeout = Span(5, Seconds), interval = Span(100, Millis)) diff --git a/src/test/scala/cromwell/logging/WorkflowLoggerSpec.scala b/src/test/scala/cromwell/logging/WorkflowLoggerSpec.scala index 6800dd7f875..c7e7311b3ae 100644 --- a/src/test/scala/cromwell/logging/WorkflowLoggerSpec.scala +++ b/src/test/scala/cromwell/logging/WorkflowLoggerSpec.scala @@ -7,11 +7,16 @@ import wdl4s.values.WdlValue import cromwell.engine.backend.local.{LocalBackend, LocalBackendCall} import cromwell.engine.workflow.CallKey import cromwell.engine.{AbortRegistrationFunction, WorkflowDescriptor, WorkflowId, WorkflowSourceFiles} -import org.scalatest.{FlatSpec, Matchers} +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} -class WorkflowLoggerSpec extends FlatSpec with Matchers { +class WorkflowLoggerSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val testWorkflowManagerSystem = new CromwellTestkitSpec.TestWorkflowManagerSystem() + override protected def afterAll() = { + testWorkflowManagerSystem.shutdownTestActorSystem() + super.afterAll() + } + val descriptor = WorkflowDescriptor( WorkflowId(UUID.fromString("fc6cfad9-65e9-4eb7-853f-7e08c1c8cf8e")), WorkflowSourceFiles( diff --git a/src/test/scala/cromwell/webservice/CromwellApiServiceIntegrationSpec.scala b/src/test/scala/cromwell/webservice/CromwellApiServiceIntegrationSpec.scala index a8e447b2dfa..15101feebdf 100644 --- a/src/test/scala/cromwell/webservice/CromwellApiServiceIntegrationSpec.scala +++ b/src/test/scala/cromwell/webservice/CromwellApiServiceIntegrationSpec.scala @@ -17,6 +17,11 @@ class CromwellApiServiceIntegrationSpec extends FlatSpec with CromwellApiService override val workflowManager = TestActorRef(new WorkflowManagerActor(new LocalBackend(actorRefFactory))) val version = "v1" + override protected def afterAll() = { + testWorkflowManagerSystem.shutdownTestActorSystem() + super.afterAll() + } + it should "return 400 for a malformed WDL " in { Post(s"/workflows/$version", FormData(Seq("wdlSource" -> CromwellApiServiceSpec.MalformedWdl, "workflowInputs" -> HelloWorld.rawInputs.toJson.toString()))) ~> submitRoute ~> From 8543c8cfe5ac1d6e1ae0f79421edaf7aed7e23ab Mon Sep 17 00:00:00 2001 From: Thibault Jeandet Date: Wed, 23 Dec 2015 15:48:44 -0500 Subject: [PATCH 06/31] Copy workflow outputs using final calls. --- README.md | 13 +- build.sbt | 1 + .../scala/cromwell/engine/CallActor.scala | 3 - .../cromwell/engine/CopyWorkflowOutputs.scala | 19 ++ .../scala/cromwell/engine/FinalCall.scala | 37 ++ .../cromwell/engine/WorkflowDescriptor.scala | 99 +++++- .../engine/backend/jes/JesBackend.scala | 2 +- .../backend/jes/JesEngineFunctions.scala | 8 +- .../backend/local/LocalEngineFunctions.scala | 4 +- .../backend/local/SharedFileSystem.scala | 7 +- .../engine/db/slick/SlickDataAccess.scala | 3 +- .../engine/io/gcs/GcsFileAttributes.scala | 16 + .../engine/io/gcs/GcsFileSystem.scala | 59 ++++ .../engine/io/gcs/GcsFileSystemProvider.scala | 136 ++++++++ .../engine/io/gcs/GoogleCloudStorage.scala | 22 +- .../cromwell/engine/io/gcs/NioGcsPath.scala | 158 +++++++++ .../shared/SharedFileSystemIoInterface.scala | 3 +- src/main/scala/cromwell/engine/package.scala | 26 +- .../engine/workflow/ExecutionStoreKey.scala | 6 +- .../engine/workflow/WorkflowActor.scala | 65 +++- .../engine/workflow/WorkflowOptions.scala | 4 +- src/main/scala/cromwell/util/PathUtil.scala | 11 - src/main/scala/cromwell/util/TryUtil.scala | 6 +- .../cromwell/CopyWorkflowOutputsSpec.scala | 47 +++ .../scala/cromwell/CromwellTestkitSpec.scala | 3 +- .../engine/WorkflowDescriptorSpec.scala | 32 +- .../engine/io/gcs/NioGcsPathSpec.scala | 318 ++++++++++++++++++ src/test/scala/cromwell/util/SampleWdl.scala | 69 ++++ 28 files changed, 1104 insertions(+), 73 deletions(-) create mode 100644 src/main/scala/cromwell/engine/CopyWorkflowOutputs.scala create mode 100644 src/main/scala/cromwell/engine/FinalCall.scala create mode 100644 src/main/scala/cromwell/engine/io/gcs/GcsFileAttributes.scala create mode 100644 src/main/scala/cromwell/engine/io/gcs/GcsFileSystem.scala create mode 100644 src/main/scala/cromwell/engine/io/gcs/GcsFileSystemProvider.scala create mode 100644 src/main/scala/cromwell/engine/io/gcs/NioGcsPath.scala delete mode 100644 src/main/scala/cromwell/util/PathUtil.scala create mode 100644 src/test/scala/cromwell/CopyWorkflowOutputsSpec.scala create mode 100644 src/test/scala/cromwell/engine/io/gcs/NioGcsPathSpec.scala diff --git a/README.md b/README.md index 9ebaff91610..c558c149151 100644 --- a/README.md +++ b/README.md @@ -221,16 +221,6 @@ Only a few workflow options are available currently and are all to be used with $ java -jar cromwell.jar run my_jes_wf.wdl my_jes_wf.json wf_options.json ``` -Where `wf_options.json` would contain: - -``` -{ - "jes_gcs_root": "gs://my-bucket/workflows", - "google_project": "my_google_project", - "refresh_token": "1/Fjf8gfJr5fdfNf9dk26fdn23FDm4x" -} -``` - The fourth, optional parameter to the 'run' subcommand is a path where the workflow metadata will be written. By default, no workflow metadata will be written. ``` @@ -955,7 +945,6 @@ Example workflow options file: ```json { - "default_backend": "jes", "jes_gcs_root": "gs://my-bucket/workflows", "google_project": "my_google_project", "refresh_token": "1/Fjf8gfJr5fdfNf9dk26fdn23FDm4x" @@ -964,9 +953,9 @@ Example workflow options file: Valid keys and their meanings: -* **default_backend** - Backend to use to run this workflow. Accepts values `jes`, `local`, or `sge`. * **write_to_cache** - Accepts values `true` or `false`. If `false`, the completed calls from this workflow will not be added to the cache. See the [Call Caching](#call-caching) section for more details. * **read_from_cache** - Accepts values `true` or `false`. If `false`, Cromwell will not search the cache when invoking a call (i.e. every call will be executed unconditionally). See the [Call Caching](#call-caching) section for more details. +* **outputs_path** - Specifies a path where final workflow outputs will be written. If this is not specified, workflow outputs will not be copied out of the Cromwell workflow execution directory/path. * **jes_gcs_root** - (JES backend only) Specifies where outputs of the workflow will be written. Expects this to be a GCS URL (e.g. `gs://my-bucket/workflows`). If this is not set, this defaults to the value within `backend.jes.baseExecutionBucket` in the [configuration](#configuring-cromwell). * **google_project** - (JES backend only) Specifies which google project to execute this workflow. * **refresh_token** - (JES backend only) Only used if `localizeWithRefreshToken` is specified in the [configuration file](#configuring-cromwell). See the [Data Localization](#data-localization) section below for more details. diff --git a/build.sbt b/build.sbt index c1501ebe3ac..584d7717089 100644 --- a/build.sbt +++ b/build.sbt @@ -59,6 +59,7 @@ libraryDependencies ++= Seq( "com.google.api-client" % "google-api-client-java6" % googleClientApiV, "com.google.api-client" % "google-api-client-jackson2" % googleClientApiV, "com.google.oauth-client" % "google-oauth-client" % googleClientApiV, + "com.google.cloud.bigdataoss" % "gcsio" % "1.4.3", "mysql" % "mysql-connector-java" % "5.1.36", "org.scalaz" % "scalaz-core_2.11" % "7.1.3", "com.github.pathikrit" %% "better-files" % "2.13.0", diff --git a/src/main/scala/cromwell/engine/CallActor.scala b/src/main/scala/cromwell/engine/CallActor.scala index 42a6c091ac8..d9ffb6c008c 100644 --- a/src/main/scala/cromwell/engine/CallActor.scala +++ b/src/main/scala/cromwell/engine/CallActor.scala @@ -9,7 +9,6 @@ import wdl4s.values.WdlValue import cromwell.engine.CallActor.{CallActorData, CallActorState} import cromwell.engine.CallExecutionActor.CallExecutionActorMessage import cromwell.engine.backend._ -import cromwell.engine.db.slick.Execution import cromwell.engine.workflow.{CallKey, WorkflowActor} import cromwell.instrumentation.Instrumentation.Monitor import cromwell.logging.WorkflowLogger @@ -18,8 +17,6 @@ import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import scala.language.postfixOps -import scala.util.{Try, Success, Failure} - object CallActor { diff --git a/src/main/scala/cromwell/engine/CopyWorkflowOutputs.scala b/src/main/scala/cromwell/engine/CopyWorkflowOutputs.scala new file mode 100644 index 00000000000..10c17040a31 --- /dev/null +++ b/src/main/scala/cromwell/engine/CopyWorkflowOutputs.scala @@ -0,0 +1,19 @@ +package cromwell.engine + + +import wdl4s.Workflow + +import scala.concurrent.{ExecutionContext, Future} + +object CopyWorkflowOutputs { + val Name: LocallyQualifiedName = "$final_call$copy_workflow_outputs" +} + +/** + * Final call implementation that copies workflow outputs to a specified destination. + */ +case class CopyWorkflowOutputs(workflow: WorkflowDescriptor) extends FinalCall { + override def unqualifiedName = CopyWorkflowOutputs.Name + override def rootWorkflow: Workflow = workflow.namespace.workflow + override def execute(implicit ec: ExecutionContext): Future[Unit] = workflow.copyWorkflowOutputs +} diff --git a/src/main/scala/cromwell/engine/FinalCall.scala b/src/main/scala/cromwell/engine/FinalCall.scala new file mode 100644 index 00000000000..14566e24156 --- /dev/null +++ b/src/main/scala/cromwell/engine/FinalCall.scala @@ -0,0 +1,37 @@ +package cromwell.engine + +import cromwell.engine.workflow.{ExecutionStoreKey, FinalCallKey} +import wdl4s.Scope + +import scala.concurrent.{ExecutionContext, Future} + +object FinalCall { + implicit class FinalCallString(val fqn: FullyQualifiedName) extends AnyVal { + /** Does this FQN conform to a final call? */ + def isFinalCall = fqn startsWith "$final_call" + /** Convert this FQN to a `FinalCall` or throw if it can't be converted. */ + def toStoreKey(workflow: WorkflowDescriptor): ExecutionStoreKey = { + fqn match { + case CopyWorkflowOutputs.Name => FinalCallKey(CopyWorkflowOutputs(workflow)) + case _ => throw new IllegalArgumentException(s"Unrecognized call key $fqn") + } + } + } +} + +/** Scope representing a "final call" that is inserted after all other workflow executions. */ +trait FinalCall extends Scope { + def execute(implicit ec: ExecutionContext): Future[Unit] + + def prerequisiteCallNames: Set[LocallyQualifiedName] = throw new UnsupportedOperationException("prerequisiteCallNames not supported for FinalCalls") + + /** + * Note: This implementation makes final calls dependent on all "real" scopes in the workflow. For a system with + * at most one final call this should be fine, but if there are > 1 final calls should they all be made runnable + * at the same time (consuming multiple slots in the thread pool with potentially time consuming work) or run + * serially? This implementation starts all FinalCalls at the same time. + */ + def prerequisiteScopes: Set[Scope] = rootWorkflow.children.toSet + + val parent: Option[Scope] = None +} diff --git a/src/main/scala/cromwell/engine/WorkflowDescriptor.scala b/src/main/scala/cromwell/engine/WorkflowDescriptor.scala index 957e59d3e13..d53c21e8992 100644 --- a/src/main/scala/cromwell/engine/WorkflowDescriptor.scala +++ b/src/main/scala/cromwell/engine/WorkflowDescriptor.scala @@ -1,26 +1,33 @@ package cromwell.engine -import java.nio.file.{Path, Paths} +import java.nio.file.{FileAlreadyExistsException, Files, Path, Paths} import ch.qos.logback.classic.encoder.PatternLayoutEncoder import ch.qos.logback.classic.spi.ILoggingEvent import ch.qos.logback.classic.{Level, LoggerContext} import ch.qos.logback.core.FileAppender import com.typesafe.config.{Config, ConfigFactory} -import cromwell.engine.backend.runtimeattributes.CromwellRuntimeAttributes -import wdl4s._ -import wdl4s.values.WdlFile -import cromwell.engine.backend.{BackendType, CromwellBackend, Backend} import cromwell.engine.backend.jes.JesBackend -import cromwell.engine.io.{IoInterface, IoManager} -import cromwell.engine.io.gcs.GoogleCloudStorage +import cromwell.engine.backend.runtimeattributes.CromwellRuntimeAttributes +import cromwell.engine.backend.{Backend, BackendType, CromwellBackend} +import cromwell.engine.db.DataAccess._ +import cromwell.engine.io.gcs.{GcsFileSystem, GoogleCloudStorage} import cromwell.engine.io.shared.SharedFileSystemIoInterface +import cromwell.engine.io.{IoInterface, IoManager} import cromwell.engine.workflow.WorkflowOptions +import cromwell.logging.WorkflowLogger +import cromwell.util.TryUtil import lenthall.config.ScalaConfig._ import org.slf4j.helpers.NOPLogger import org.slf4j.{Logger, LoggerFactory} import spray.json.{JsObject, _} +import wdl4s._ +import wdl4s.values.{WdlFile, WdlSingleFile} +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future} +import scala.language.postfixOps import scala.util.{Failure, Success, Try} import scalaz.Scalaz._ @@ -44,10 +51,30 @@ case class WorkflowDescriptor(id: WorkflowId, val name = namespace.workflow.unqualifiedName val actualInputs: WorkflowCoercedInputs = coercedInputs ++ declarations val props = sys.props + val relativeWorkflowRootPath = s"$name/$id" + private val log = WorkflowLogger("WorkflowDescriptor", this) + val workflowOutputsPath = workflowOptions.get("outputs_path") recover { case e: IllegalArgumentException => + log.warn("outputs_path expected to be of type String", e) + throw e + } lazy val fileHasher: FileHasher = { wdlFile: WdlFile => SymbolHash(ioManager.hash(wdlFile.value)) } - private lazy val optionCacheWriting = workflowOptions.getBoolean("write_to_cache") getOrElse configCallCaching - private lazy val optionCacheReading = workflowOptions.getBoolean("read_from_cache") getOrElse configCallCaching + // GCS FS with the workflow working directory as root + val gcsFilesystem = for { + interface <- gcsInterface + fs <- Try(GcsFileSystem.instance(interface, wfContext.root)) + } yield fs + + // GCS FS with the workflow outputs directory as root + val gcsOutputsFilesystem = for { + root <- workflowOutputsPath + interface <- gcsInterface + fs <- Try(GcsFileSystem.instance(interface, root)) + } yield fs + + + private lazy val optionCacheWriting = workflowOptions getBoolean "write_to_cache" getOrElse configCallCaching + private lazy val optionCacheReading = workflowOptions getBoolean "read_from_cache" getOrElse configCallCaching if (!configCallCaching) { if (optionCacheWriting) logWriteDisabled() @@ -66,6 +93,58 @@ case class WorkflowDescriptor(id: WorkflowId, case _ => NOPLogger.NOP_LOGGER } + def copyWorkflowOutputs(implicit executionContext: ExecutionContext): Future[Unit] = { + // Try to copy outputs to final destination + workflowOutputsPath map copyOutputFiles getOrElse Future.successful(()) + } + + private def copyOutputFiles(destDirectory: String)(implicit executionContext: ExecutionContext): Future[Unit] = { + import PathString._ + val logger = backend.workflowLogger(this) + + def copyFile(file: WdlFile): Try[Unit] = { + val src = file.valueString.toPath(workflowLogger, gcsFilesystem) + val wfPath = wfContext.root.toPath(workflowLogger, gcsFilesystem).toAbsolutePath + val relativeFilePath = src.subpath(wfPath.getNameCount, src.getNameCount) + val dest = destDirectory.toPath(workflowLogger, gcsOutputsFilesystem).resolve(relativeWorkflowRootPath).resolve(relativeFilePath) + + def copy(): Unit = { + logger.info(s"Trying to copy output file $src to $dest") + Files.createDirectories(dest.getParent) + Files.copy(src, dest) + } + + TryUtil.retryBlock( + fn = (retries: Option[Unit]) => copy(), + retryLimit = Option(5), + pollingInterval = 5 seconds, + pollingBackOffFactor = 1, + maxPollingInterval = 10 seconds, + logger = logger, + failMessage = Option(s"Failed to copy file $src to $dest"), + fatalExceptions = Seq(classOf[FileAlreadyExistsException]) + ) recover { + case _: FileAlreadyExistsException => logger.info(s"Tried to copy the same file multiple times. Skipping subsequent copies for $src") + } + } + + def processOutputs(outputs: Traversable[SymbolStoreEntry]): Unit = { + // All outputs should have wdl values at this point, if they don't there's nothing we can do here + val copies = TryUtil.sequence(outputs map { o => Try(o.wdlValue.get) } toSeq) match { + case Success(wdlValues) => wdlValues flatMap { _ collectAsSeq { case f: WdlSingleFile => f } } map copyFile + case Failure(e) => throw new Throwable(s"Unable to resolve the following workflow outputs for workflow $id: ${e.getMessage}") + } + + // Throw an exception if one or more of the copies failed. + TryUtil.sequence(copies) match { + case Success(_) => () + case Failure(e) => throw new Throwable(s"Output copy failed for the following files:\n ${e.getMessage}") + } + } + + globalDataAccess.getWorkflowOutputs(id) map processOutputs + } + private def makeFileLogger(root: Path, name: String, level: Level): Logger = { val ctx = LoggerFactory.getILoggerFactory.asInstanceOf[LoggerContext] val encoder = new PatternLayoutEncoder() @@ -229,4 +308,4 @@ object WorkflowDescriptor { value <- config.getBooleanOption(key) } yield value) getOrElse default } -} \ No newline at end of file +} diff --git a/src/main/scala/cromwell/engine/backend/jes/JesBackend.scala b/src/main/scala/cromwell/engine/backend/jes/JesBackend.scala index 919fc40af5a..cd99d5e5744 100644 --- a/src/main/scala/cromwell/engine/backend/jes/JesBackend.scala +++ b/src/main/scala/cromwell/engine/backend/jes/JesBackend.scala @@ -2,7 +2,7 @@ package cromwell.engine.backend.jes import java.math.BigInteger import java.net.SocketTimeoutException -import java.nio.file.{Path, Paths} +import java.nio.file.{Files, Path, Paths} import akka.actor.ActorSystem import com.google.api.services.genomics.model.Parameter diff --git a/src/main/scala/cromwell/engine/backend/jes/JesEngineFunctions.scala b/src/main/scala/cromwell/engine/backend/jes/JesEngineFunctions.scala index 5fb88f7f5b2..d57db35ed3b 100644 --- a/src/main/scala/cromwell/engine/backend/jes/JesEngineFunctions.scala +++ b/src/main/scala/cromwell/engine/backend/jes/JesEngineFunctions.scala @@ -1,12 +1,12 @@ package cromwell.engine.backend.jes +import cromwell.engine.Hashing._ +import cromwell.engine.PathString._ +import cromwell.engine._ import cromwell.engine.io.IoInterface import wdl4s.values.WdlValue -import cromwell.engine._ -import cromwell.util.PathUtil._ -import cromwell.engine.Hashing._ -import scala.util.Try +import scala.util.Try class JesWorkflowEngineFunctions(interface: IoInterface, context: WorkflowContext) extends WorkflowEngineFunctions(interface, context) { override def globPath(glob: String) = s"${context.root}/glob-${glob.md5Sum}/" diff --git a/src/main/scala/cromwell/engine/backend/local/LocalEngineFunctions.scala b/src/main/scala/cromwell/engine/backend/local/LocalEngineFunctions.scala index da1a1c1f1ff..4fd518166dd 100644 --- a/src/main/scala/cromwell/engine/backend/local/LocalEngineFunctions.scala +++ b/src/main/scala/cromwell/engine/backend/local/LocalEngineFunctions.scala @@ -3,7 +3,7 @@ package cromwell.engine.backend.local import java.nio.file.Paths import wdl4s.values.WdlValue import cromwell.engine.io.IoInterface -import cromwell.engine.{CallContext, CallEngineFunctions, WorkflowContext, WorkflowEngineFunctions} +import cromwell.engine.{CallContext, CallEngineFunctions, WorkflowContext, WorkflowEngineFunctions, _} import scala.language.postfixOps import scala.util.Try @@ -11,7 +11,7 @@ import scala.util.Try class LocalWorkflowEngineFunctions(interface: IoInterface, context: WorkflowContext) extends WorkflowEngineFunctions(interface, context) class LocalCallEngineFunctions(interface: IoInterface, context: CallContext) extends LocalWorkflowEngineFunctions(interface ,context) with CallEngineFunctions { - import cromwell.util.PathUtil._ + import PathString._ override def fileContentsToString(path: String) = { if (!Paths.get(path).isAbsolute && !path.isUriWithProtocol) diff --git a/src/main/scala/cromwell/engine/backend/local/SharedFileSystem.scala b/src/main/scala/cromwell/engine/backend/local/SharedFileSystem.scala index 16b04ef5b60..e9c0d5561ed 100644 --- a/src/main/scala/cromwell/engine/backend/local/SharedFileSystem.scala +++ b/src/main/scala/cromwell/engine/backend/local/SharedFileSystem.scala @@ -6,7 +6,7 @@ import java.nio.file.{Files, Path, Paths} import better.files.{File => ScalaFile, _} import com.typesafe.config.ConfigFactory import cromwell.engine.backend.runtimeattributes.CromwellRuntimeAttributes -import wdl4s._ +import wdl4s.{Call, TaskOutput} import wdl4s.types.{WdlArrayType, WdlFileType, WdlMapType} import wdl4s.values.{WdlValue, _} import cromwell.engine.ExecutionIndex.ExecutionIndex @@ -14,7 +14,6 @@ import cromwell.engine.backend.{CallLogs, LocalFileSystemBackendCall, _} import cromwell.engine.io.IoInterface import cromwell.engine.io.gcs.{GcsPath, GoogleCloudStorage} import cromwell.engine.workflow.{CallKey, WorkflowOptions} -import cromwell.engine.{WorkflowContext, WorkflowDescriptor, WorkflowEngineFunctions, WorkflowId, _} import cromwell.engine._ import cromwell.util.TryUtil import org.apache.commons.io.FileUtils @@ -48,7 +47,7 @@ object SharedFileSystem { }).+:(localizeFromGcs _) private def localizeFromGcs(originalPath: String, executionPath: Path, descriptor: WorkflowDescriptor): Try[Unit] = Try { - import cromwell.util.PathUtil._ + import PathString._ assert(originalPath.isGcsUrl) val content = descriptor.gcsInterface.get.downloadObject(GcsPath(originalPath)) new ScalaFile(executionPath).createIfNotExists().write(content) @@ -175,7 +174,7 @@ trait SharedFileSystem { runtimeAttributes: CromwellRuntimeAttributes, inputs: CallInputs, workflowDescriptor: WorkflowDescriptor): CallInputs = { - import cromwell.util.PathUtil._ + import PathString._ val strategies = if (runtimeAttributes.docker.isDefined) DockerLocalizers else Localizers diff --git a/src/main/scala/cromwell/engine/db/slick/SlickDataAccess.scala b/src/main/scala/cromwell/engine/db/slick/SlickDataAccess.scala index fbd1b071679..ed3420ead5f 100644 --- a/src/main/scala/cromwell/engine/db/slick/SlickDataAccess.scala +++ b/src/main/scala/cromwell/engine/db/slick/SlickDataAccess.scala @@ -18,7 +18,7 @@ import cromwell.engine.backend.local.LocalBackend import cromwell.engine.backend.sge.SgeBackend import cromwell.engine.backend.{Backend, WorkflowQueryResult} import cromwell.engine.db._ -import cromwell.engine.workflow.{CallKey, ExecutionStoreKey, OutputKey, ScatterKey} +import cromwell.engine.workflow._ import cromwell.engine.{SymbolHash, CallOutput, WorkflowOutputs} import cromwell.webservice.{CallCachingParameters, WorkflowQueryParameters, WorkflowQueryResponse} import lenthall.config.ScalaConfig._ @@ -187,6 +187,7 @@ class SlickDataAccess(databaseConfig: Config) extends DataAccess { val scopeKeys: Traversable[ExecutionStoreKey] = scopes collect { case call: Call => CallKey(call, None) case scatter: Scatter => ScatterKey(scatter, None) + case finalCall: FinalCall => FinalCallKey(finalCall) } val action = for { diff --git a/src/main/scala/cromwell/engine/io/gcs/GcsFileAttributes.scala b/src/main/scala/cromwell/engine/io/gcs/GcsFileAttributes.scala new file mode 100644 index 00000000000..fa34aaffd19 --- /dev/null +++ b/src/main/scala/cromwell/engine/io/gcs/GcsFileAttributes.scala @@ -0,0 +1,16 @@ +package cromwell.engine.io.gcs + +import java.nio.file.Path +import java.nio.file.attribute.{FileTime, BasicFileAttributes} + +class GcsFileAttributes(path: Path) extends BasicFileAttributes { + override def fileKey(): AnyRef = throw new NotImplementedError("To be implemented when/if needed") + override def isRegularFile: Boolean = throw new NotImplementedError("To be implemented when/if needed") + override def isOther: Boolean = throw new NotImplementedError("To be implemented when/if needed") + override def lastModifiedTime(): FileTime = throw new NotImplementedError("To be implemented when/if needed") + override def size(): Long = throw new NotImplementedError("To be implemented when/if needed") + override def isDirectory: Boolean = false + override def isSymbolicLink: Boolean = false + override def creationTime(): FileTime = throw new NotImplementedError("To be implemented when/if needed") + override def lastAccessTime(): FileTime = throw new NotImplementedError("To be implemented when/if needed") +} diff --git a/src/main/scala/cromwell/engine/io/gcs/GcsFileSystem.scala b/src/main/scala/cromwell/engine/io/gcs/GcsFileSystem.scala new file mode 100644 index 00000000000..561c8d8433b --- /dev/null +++ b/src/main/scala/cromwell/engine/io/gcs/GcsFileSystem.scala @@ -0,0 +1,59 @@ +package cromwell.engine.io.gcs + +import java.lang.Iterable +import java.nio.file._ +import java.nio.file.attribute.UserPrincipalLookupService +import java.nio.file.spi.FileSystemProvider +import java.util.{Collections, Set => JSet} + +import cromwell.engine.PathString + +import scala.concurrent.ExecutionContext + +object GcsFileSystem { + import PathString._ + def instance(interface: GoogleCloudStorage, root: String)(implicit executionContext: ExecutionContext) = { + if (root.isGcsUrl) new GcsFileSystem(GcsFileSystemProvider.instance(interface), root) + else throw new IllegalArgumentException(s"$root is not am absolute GCS path") + } + val Separator = "/" + private [io] val Protocol = "gs://" + private val GsUriRegex = s"""$Protocol(.*)""".r + private val AttributeViews = Collections.singleton("basic") +} + +/** + * Implements the java.nio.FileSystem interface for GoogleCloudStorage. + * + */ +class GcsFileSystem private (gcsFileSystemProvider: GcsFileSystemProvider, gcsRoot: String) extends FileSystem { + + import GcsFileSystem._ + + val root = gcsRoot match { + case GsUriRegex(chunks) => new NioGcsPath(chunks.split(Separator), true)(this) + case _ => throw new InvalidPathException(gcsRoot, s"Root of GCS file system must be an absolute GCS path: $gcsRoot") + } + + override def supportedFileAttributeViews(): JSet[String] = AttributeViews + override def getSeparator: String = Separator + override def getRootDirectories: Iterable[Path] = Collections.singleton(root) + override def newWatchService(): WatchService = throw new NotImplementedError("GCS FS does not support Watch Service at this time") + override def getFileStores: Iterable[FileStore] = Collections.emptyList() + override def isReadOnly: Boolean = false + override def provider(): FileSystemProvider = gcsFileSystemProvider + override def isOpen: Boolean = true + override def close(): Unit = throw new UnsupportedOperationException("GCS FS cannot be closed") + override def getPathMatcher(syntaxAndPattern: String): PathMatcher = FileSystems.getDefault.getPathMatcher(syntaxAndPattern) + override def getUserPrincipalLookupService: UserPrincipalLookupService = throw new UnsupportedOperationException() + override def getPath(first: String, more: String*): Path = { + first match { + case GsUriRegex(chunks) => + val path = new NioGcsPath(chunks.split(Separator) ++ more.toArray[String], true)(this) + if (path.startsWith(root)) path + else throw new InvalidPathException(first, s"Path $path has a different root than this filesystem: $root") + case empty if empty.isEmpty => new NioGcsPath(Array.empty[String] ++ more.toArray[String], false)(this) + case _ => new NioGcsPath(first.split(Separator) ++ more.toArray[String], false)(this) + } + } +} diff --git a/src/main/scala/cromwell/engine/io/gcs/GcsFileSystemProvider.scala b/src/main/scala/cromwell/engine/io/gcs/GcsFileSystemProvider.scala new file mode 100644 index 00000000000..fa612fd35cb --- /dev/null +++ b/src/main/scala/cromwell/engine/io/gcs/GcsFileSystemProvider.scala @@ -0,0 +1,136 @@ +package cromwell.engine.io.gcs + +import java.io.{FileNotFoundException, OutputStream} +import java.net.URI +import java.nio.channels.{Channels, SeekableByteChannel} +import java.nio.file.DirectoryStream.Filter +import java.nio.file._ +import java.nio.file.attribute.{BasicFileAttributes, FileAttribute, FileAttributeView} +import java.nio.file.spi.FileSystemProvider +import java.util +import java.util.Collections +import java.util.concurrent.{AbstractExecutorService, TimeUnit} + +import com.google.api.services.storage.model.StorageObject +import com.google.cloud.hadoop.gcsio.{GoogleCloudStorageReadChannel, GoogleCloudStorageWriteChannel, ObjectWriteConditions} +import com.google.cloud.hadoop.util.{ApiErrorExtractor, AsyncWriteChannelOptions, ClientRequestHelper} + +import scala.collection.JavaConverters._ +import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService} + +object GcsFileSystemProvider { + def instance(gcsInterface: GoogleCloudStorage)(implicit executionContext: ExecutionContext) = new GcsFileSystemProvider(gcsInterface, executionContext) +} + +/** + * Converts a Scala ExecutionContext to a Java ExecutorService. + * https://groups.google.com/forum/#!topic/scala-user/ZyHrfzD7eX8 + */ +object ExecutionContextExecutorServiceBridge { + def apply(ec: ExecutionContext): ExecutionContextExecutorService = ec match { + case null => throw new Throwable("Execution context cannot be null") + case eces: ExecutionContextExecutorService => eces + case executionContext => new AbstractExecutorService with ExecutionContextExecutorService { + override def prepare(): ExecutionContext = executionContext + override def isShutdown = false + override def isTerminated = false + override def shutdown() = () + override def shutdownNow() = Collections.emptyList[Runnable] + override def execute(runnable: Runnable): Unit = executionContext execute runnable + override def reportFailure(t: Throwable): Unit = executionContext reportFailure t + override def awaitTermination(length: Long,unit: TimeUnit): Boolean = false + } + } +} + +/** + * Implements java.nio.FileSystemProvider for GoogleCloudStorage + * This implementation is not complete and mostly a proof of concept that it's possible to *copy* around files from/to local/gcs. + * Copying is the only functionality that has been successfully tested (same and cross filesystems). + * @param googleCloudStorage must be properly set up (credentials) according to the context. Might be absorbed by this class eventually. + * @param executionContext executionContext, will be used to perform async writes to GCS after being converted to a Java execution service + */ +class GcsFileSystemProvider private (googleCloudStorage: GoogleCloudStorage, executionContext: ExecutionContext) extends FileSystemProvider { + + private val executionService = ExecutionContextExecutorServiceBridge(executionContext) + private val errorExtractor = new ApiErrorExtractor() + + private def checkExists(path: Path) = { + if (!googleCloudStorage.exists(path.toString)) throw new FileNotFoundException(path.toString) + } + + /** + * Note: options and attributes are not honored. + */ + override def newByteChannel(path: Path, options: util.Set[_ <: OpenOption], attrs: FileAttribute[_]*): SeekableByteChannel = { + path match { + case gcsPath: NioGcsPath => + new GoogleCloudStorageReadChannel(googleCloudStorage.client, + gcsPath.bucket, + gcsPath.objectName, + errorExtractor, + new ClientRequestHelper[StorageObject]() + ) + case _ => throw new UnsupportedOperationException("Only Gcs paths are supported.") + } + } + + /** + * Overrides the default implementation to provide a writable channel (which newByteChannel doesn't). + * NOTE: options are not honored. + */ + override def newOutputStream(path: Path, options: OpenOption*): OutputStream = { + def initializeOutputStream(gcsPath: NioGcsPath) = { + val channel = new GoogleCloudStorageWriteChannel( + executionService, + googleCloudStorage.client, + new ClientRequestHelper[StorageObject](), + gcsPath.bucket, + gcsPath.objectName, + AsyncWriteChannelOptions.newBuilder().build(), + new ObjectWriteConditions(), + Map.empty[String, String].asJava, + "text/plain") + channel.initialize() + Channels.newOutputStream(channel) + } + + path match { + case gcsPath: NioGcsPath => initializeOutputStream(gcsPath) + case _ => throw new UnsupportedOperationException("Only Gcs paths are supported.") + } + } + + override def copy(source: Path, target: Path, options: CopyOption*): Unit = { + (source, target) match { + case (s: NioGcsPath, d: NioGcsPath) => googleCloudStorage.copy(s, d) + case _ => throw new UnsupportedOperationException(s"Can only copy from GCS to GCS: $source or $target is not a GCS path") + } + } + + override def delete(path: Path): Unit = { + path match { + case gcs: GcsPath => googleCloudStorage.deleteObject(gcs) + } + } + + override def readAttributes[A <: BasicFileAttributes](path: Path, `type`: Class[A], options: LinkOption*): A = { + checkExists(path) + new GcsFileAttributes(path).asInstanceOf[A] + } + + override def move(source: Path, target: Path, options: CopyOption*): Unit = throw new NotImplementedError() + override def checkAccess(path: Path, modes: AccessMode*): Unit = {checkExists(path)} + override def createDirectory(dir: Path, attrs: FileAttribute[_]*): Unit = {} + override def getFileSystem(uri: URI): FileSystem = throw new UnsupportedOperationException() + override def isHidden(path: Path): Boolean = throw new NotImplementedError() + override def newDirectoryStream(dir: Path, filter: Filter[_ >: Path]): DirectoryStream[Path] = throw new NotImplementedError() + override def setAttribute(path: Path, attribute: String, value: scala.Any, options: LinkOption*): Unit = throw new NotImplementedError() + override def getPath(uri: URI): Path = throw new NotImplementedError() + override def newFileSystem(uri: URI, env: util.Map[String, _]): FileSystem = throw new NotImplementedError() + override def readAttributes(path: Path, attributes: String, options: LinkOption*): util.Map[String, AnyRef] = throw new NotImplementedError() + override def isSameFile(path: Path, path2: Path): Boolean = throw new NotImplementedError() + override def getFileAttributeView[V <: FileAttributeView](path: Path, `type`: Class[V], options: LinkOption*): V = throw new NotImplementedError() + override def getFileStore(path: Path): FileStore = throw new NotImplementedError() + override def getScheme: String = GcsFileSystem.Protocol +} diff --git a/src/main/scala/cromwell/engine/io/gcs/GoogleCloudStorage.scala b/src/main/scala/cromwell/engine/io/gcs/GoogleCloudStorage.scala index d487078b40d..0eae69fc131 100644 --- a/src/main/scala/cromwell/engine/io/gcs/GoogleCloudStorage.scala +++ b/src/main/scala/cromwell/engine/io/gcs/GoogleCloudStorage.scala @@ -11,12 +11,12 @@ import com.google.api.client.util.DateTime import com.google.api.services.storage.Storage import com.google.api.services.storage.model.Bucket.Owner import com.google.api.services.storage.model.{Bucket, StorageObject} +import cromwell.engine.PathString import cromwell.engine.io.IoInterface import cromwell.engine.workflow.WorkflowOptions import cromwell.util.TryUtil import cromwell.util.google.GoogleCredentialFactory - import scala.collection.JavaConverters._ import scala.concurrent.duration._ import scala.language.postfixOps @@ -52,7 +52,7 @@ object GoogleCloudStorage { case class GoogleCloudStorage private(client: Storage) extends IoInterface { import GcsPath._ - import cromwell.util.PathUtil._ + import PathString._ def isValidPath(path: String) = path.isGcsUrl @@ -120,6 +120,18 @@ case class GoogleCloudStorage private(client: Storage) extends IoInterface { client.objects.copy(from.bucket, from.objectName, to.bucket, to.objectName, storageObject).execute } + /** + * Copy file from one GCS path to another + * + * @param from - source GCS path (must exist and point to a file) + * @param to - destination GCS path + * @return a Try[StorageObject] which is a result of the call to Storage.Objects.copy() + */ + def copy(from: NioGcsPath, to: NioGcsPath): Try[StorageObject] = Try { + val storageObject = client.objects.get(from.bucket, from.objectName).execute + client.objects.copy(from.bucket, from.objectName, to.bucket, to.objectName, storageObject).execute + } + /** * Copy files with prefix `from` to files with prefix `to` * @@ -187,9 +199,9 @@ case class GoogleCloudStorage private(client: Storage) extends IoInterface { insertObject.execute() } - def deleteObject(gcsPath: GcsPath): Unit = { - client.objects.delete(gcsPath.bucket, gcsPath.objectName).execute() - } + def deleteObject(gcsPath: GcsPath): Unit = client.objects.delete(gcsPath.bucket, gcsPath.objectName).execute() + + def deleteObject(gcsPath: NioGcsPath): Unit = client.objects.delete(gcsPath.bucket, gcsPath.objectName).execute() def downloadObject(gcsPath: GcsPath): Array[Byte] = { val outputStream: ByteArrayOutputStream = new ByteArrayOutputStream() diff --git a/src/main/scala/cromwell/engine/io/gcs/NioGcsPath.scala b/src/main/scala/cromwell/engine/io/gcs/NioGcsPath.scala new file mode 100644 index 00000000000..7dd698b6814 --- /dev/null +++ b/src/main/scala/cromwell/engine/io/gcs/NioGcsPath.scala @@ -0,0 +1,158 @@ +package cromwell.engine.io.gcs + +import java.io.File +import java.net.URI +import java.nio.file.WatchEvent.{Kind, Modifier} +import java.nio.file._ +import java.util + +import scala.collection.JavaConverters._ +import scala.language.implicitConversions +import scala.language.postfixOps +import scala.util.Try + +object NioGcsPath { + def apply(path: String)(implicit gcsFileSystem: GcsFileSystem)= gcsFileSystem.getPath(path) + + implicit class PathEnhanced(val path: Path) extends AnyVal { + def asGcsPath(implicit gcsFileSystem: GcsFileSystem) = path match { + case gcsPath: NioGcsPath => gcsPath + case otherPath: Path => gcsFileSystem.getPath(otherPath.toString).asInstanceOf[NioGcsPath] + case _ => throw new IllegalArgumentException("Only GcsPaths are supported.") + } + } + + val Protocol = GcsFileSystem.Protocol +} + +/** + * NOTE: Currently called NioGcsPath so it can exist alongside the current GcsPath class. + * If this approach was to be validated the current GcsPath class would be replaced by this one. + * This class proposes an implementation of the java.nio.Path interface for GoogleCloudStorage. + * The following methods are yet to be implemented: + * relativize + * compareTo + * @param chunks array containing all parts of the path in between separators - except the protocol (gs://) + * eg: gs://path/to/resource.txt -> chunks = [path, to, resource.txt] + * @param absolute true if this path is to be considered absolute. + * Only absolute GCS paths can be used to actually locate resources. + * Calling methods on an absolute path can return a relative paths (eg subpath). + * @param gcsFileSystem the gcsFileSystem to be used when performing operations on this path + */ +class NioGcsPath(private val chunks: Array[String], absolute: Boolean)(implicit gcsFileSystem: GcsFileSystem) extends Path { + import NioGcsPath._ + + private val separator = GcsFileSystem.Separator + + private val objectChunks = chunks match { + case values if isAbsolute && values.nonEmpty => values.tail + case _ => chunks + } + + private val fullPath = chunksToString(chunks) + + lazy val bucket: String = chunks match { + case values if values.isEmpty && isAbsolute => throw new IllegalStateException("An absolute gcs path cannot be empty") + case _ => if(isAbsolute) chunks.head else gcsFileSystem.root.asGcsPath.bucket + } + + val objectName = chunksToString(objectChunks) + + private def chunksToString(chunksArray: Array[String]): String = chunksArray.mkString(separator) + + override def subpath(beginIndex: Int, endIndex: Int): Path = { + new NioGcsPath(chunks.slice(beginIndex, endIndex), isAbsolute && beginIndex == 0) + } + + override def toFile: File = throw new UnsupportedOperationException("A GCS path cannot be converted to a File.") + + override def resolveSibling(other: Path): Path = new NioGcsPath(getParent.asGcsPath.chunks ++ other.asGcsPath.chunks, isAbsolute) + + override def resolveSibling(other: String): Path = new NioGcsPath(getParent.asGcsPath.chunks ++ gcsFileSystem.getPath(other).asGcsPath.chunks, isAbsolute) + + override def getFileSystem: FileSystem = gcsFileSystem + + override def getName(index: Int): Path = new NioGcsPath(Array(chunks(index)), isAbsolute && index == 0) + + override def getParent: Path = chunks match { + case values if values.isEmpty || values.length == 1 => null + case values => new NioGcsPath(values.init, isAbsolute) + } + + override def toAbsolutePath: Path = if (isAbsolute) this else gcsFileSystem.root.resolve(this) + + override def relativize(other: Path): Path = throw new NotImplementedError() + + override def getNameCount: Int = chunks.length + + override def toUri: URI = throw new UnsupportedOperationException() + + override def compareTo(other: Path): Int = throw new NotImplementedError() + + override def register(watcher: WatchService, events: Array[Kind[_]], modifiers: Modifier*): WatchKey = throw new UnsupportedOperationException() + + override def register(watcher: WatchService, events: Kind[_]*): WatchKey = throw new UnsupportedOperationException() + + override def getFileName: Path = chunks match { + case values if values.isEmpty => null + case _ => new NioGcsPath(Array(chunks.last), isAbsolute && chunks.length == 1) + } + + override def getRoot: Path = new NioGcsPath(Array(bucket), true) + + override def iterator(): util.Iterator[Path] = (chunks map { elt => new NioGcsPath(Array(elt), false).asInstanceOf[Path] } iterator).asJava + + override def normalize(): Path = if (isAbsolute) this else throw new UnsupportedOperationException("Cannot normalize a relative GCS path.") + + override def endsWith(other: Path): Boolean = { + other match { + case rel: NioGcsPath if !isAbsolute && rel.isAbsolute => false + case _: NioGcsPath => chunks.endsWith(other.asGcsPath.chunks) + case _ => false + } + } + + override def endsWith(other: String): Boolean = { + Try(gcsFileSystem.getPath(other)) map { + case rel: NioGcsPath if !isAbsolute && rel.isAbsolute => false + case path@(_: NioGcsPath) => chunks.endsWith(path.asGcsPath.chunks) + case _ => false + } getOrElse false + } + + override def resolve(other: Path): Path = { + if (other.isAbsolute) other + else new NioGcsPath(chunks ++ other.asGcsPath.chunks, isAbsolute) + } + + override def resolve(other: String): Path = { + val otherPath = gcsFileSystem.getPath(other) + if (otherPath.isAbsolute) otherPath + else new NioGcsPath(chunks ++ otherPath.asGcsPath.chunks, isAbsolute) + } + + override def toRealPath(options: LinkOption*): Path = this + + override def startsWith(other: Path): Boolean = { + other match { + case rel: NioGcsPath if !isAbsolute && rel.isAbsolute => false + case _: NioGcsPath => chunks.startsWith(other.asGcsPath.chunks) + case _ => false + } + } + + override def startsWith(other: String): Boolean = { + Try(gcsFileSystem.getPath(other)) map { + case rel: NioGcsPath if !isAbsolute && rel.isAbsolute => false + case path@(_: NioGcsPath) => chunks.startsWith(path.asGcsPath.chunks) + case _ => false + } getOrElse false + } + + override def toString: String = { + if (absolute) s"$Protocol$fullPath" + else fullPath + } + + override def isAbsolute: Boolean = absolute +} diff --git a/src/main/scala/cromwell/engine/io/shared/SharedFileSystemIoInterface.scala b/src/main/scala/cromwell/engine/io/shared/SharedFileSystemIoInterface.scala index a571382f734..d0b142378df 100644 --- a/src/main/scala/cromwell/engine/io/shared/SharedFileSystemIoInterface.scala +++ b/src/main/scala/cromwell/engine/io/shared/SharedFileSystemIoInterface.scala @@ -2,6 +2,7 @@ package cromwell.engine.io.shared import java.nio.file.{Files, Paths} +import cromwell.engine.PathString import cromwell.engine.io.IoInterface import scala.language.postfixOps @@ -13,8 +14,8 @@ object SharedFileSystemIoInterface { class SharedFileSystemIoInterface private() extends IoInterface { + import PathString._ import better.files._ - import cromwell.util.PathUtil._ override def readFile(path: String): String = Paths.get(path).contentAsString diff --git a/src/main/scala/cromwell/engine/package.scala b/src/main/scala/cromwell/engine/package.scala index 8a6a85e560f..3e3e41d42ae 100644 --- a/src/main/scala/cromwell/engine/package.scala +++ b/src/main/scala/cromwell/engine/package.scala @@ -1,10 +1,15 @@ package cromwell -import wdl4s._ +import java.nio.file.{Path, Paths} + +import cromwell.engine.io.gcs.GcsFileSystem import org.joda.time.DateTime -import wdl4s.values.{WdlValue, WdlFile} +import org.slf4j.Logger +import wdl4s._ +import wdl4s.values.{WdlFile, WdlValue} import scala.language.implicitConversions +import scala.util.{Failure, Try} import scalaz.ValidationNel package object engine { @@ -52,4 +57,21 @@ package object engine { case (k, CallOutput(wdlValue, hash)) => (k, wdlValue) } } + + object PathString { + implicit class UriString(val str: String) extends AnyVal { + def isGcsUrl: Boolean = str.startsWith("gs://") + + def isUriWithProtocol: Boolean = "^[a-z]+://".r.findFirstIn(str).nonEmpty + + def toPath(workflowLogger: Logger, gcsFileSystem: Try[GcsFileSystem] = Failure(new Throwable("No GCS Filesystem"))): Path = { + str match { + case path if path.isGcsUrl && gcsFileSystem.isSuccess => gcsFileSystem.get.getPath(str) + case path if path.isGcsUrl => throw new Throwable(s"Unable to parse GCS path $path: ${gcsFileSystem.failed.get.getMessage}") + case path if !path.isUriWithProtocol => Paths.get(path) + case path => throw new Throwable(s"Unable to parse $path") + } + } + } + } } diff --git a/src/main/scala/cromwell/engine/workflow/ExecutionStoreKey.scala b/src/main/scala/cromwell/engine/workflow/ExecutionStoreKey.scala index fe025b10caa..9363d170717 100644 --- a/src/main/scala/cromwell/engine/workflow/ExecutionStoreKey.scala +++ b/src/main/scala/cromwell/engine/workflow/ExecutionStoreKey.scala @@ -1,7 +1,7 @@ package cromwell.engine.workflow import wdl4s._ -import cromwell.engine.ExecutionStatus +import cromwell.engine.{FinalCall, ExecutionStatus} import cromwell.engine.workflow.WorkflowActor.ExecutionStore import scala.language.postfixOps @@ -46,3 +46,7 @@ case class ScatterKey(scope: Scatter, index: Option[Int]) extends ExecutionStore } } } + +case class FinalCallKey(scope: FinalCall) extends OutputKey { + override def index: Option[Int] = None +} diff --git a/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala b/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala index cefeca919f7..3e9205c1582 100644 --- a/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala +++ b/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala @@ -223,8 +223,10 @@ case class WorkflowActor(workflow: WorkflowDescriptor, backend: Backend) def createWorkflow(inputs: HostInputs): Future[Unit] = { val symbolStoreEntries = buildSymbolStoreEntries(workflow, inputs) symbolCache = symbolStoreEntries.groupBy(entry => SymbolCacheKey(entry.scope, entry.isInput)) + // Currently assumes there is at most one possible final call, a `CopyWorkflowOutputs`. + val finalCall = workflow.workflowOutputsPath.toOption map { _ => CopyWorkflowOutputs(workflow) } globalDataAccess.createWorkflow( - workflow, symbolStoreEntries, workflow.namespace.workflow.children, backend) + workflow, symbolStoreEntries, workflow.namespace.workflow.children ++ finalCall, backend) } // This is passed as an implicit parameter to methods of classes in the companion object. @@ -408,11 +410,31 @@ case class WorkflowActor(workflow: WorkflowDescriptor, backend: Backend) } } + private def handleFinalCallStarted(finalCallKey: FinalCallKey): State = { + executionStore += finalCallKey -> ExecutionStatus.Running + val finalCallWork = for { + _ <- persistStatus(finalCallKey, ExecutionStatus.Running) + _ <- finalCallKey.scope.execute + _ = self ! CallCompleted(finalCallKey, callOutputs = Map.empty, executionEvents = Seq.empty, returnCode = 0, hash = None, resultsClonedFrom = None) + } yield () + + finalCallWork onFailure { + case t => + log.error("Final call work failed", t) + scheduleTransition(WorkflowFailed) + } + + val updatedData = stateData.addPersisting(finalCallKey, ExecutionStatus.Running) + stay() using updatedData + } + when(WorkflowRunning) { case Event(StartRunnableCalls, data) => val updatedData = startRunnableCalls(data) if (isWorkflowDone) scheduleTransition(WorkflowSucceeded) stay() using updatedData + case Event(CallStarted(finalCallKey: FinalCallKey), _) => + handleFinalCallStarted(finalCallKey) case Event(CallStarted(callKey), data) if !data.isPending(callKey) => executionStore += callKey -> ExecutionStatus.Running persistStatus(callKey, ExecutionStatus.Running) @@ -421,6 +443,12 @@ case class WorkflowActor(workflow: WorkflowDescriptor, backend: Backend) case Event(message: CallStarted, _) => resendDueToPendingExecutionWrites(message) stay() + case Event(message @ CallCompleted(finalCallKey: FinalCallKey, _, _, _, _, _), data) if !data.isPersistedRunning(finalCallKey) => + // Unlike "real" calls which are managed by a CallActor, the final call completion message is triggered + // internally and there isn't an automatic retry if the message isn't processed. So if the message can't be + // processed due to pending writes, schedule it to be resent here. + resendDueToPendingExecutionWrites(message) + stay() case Event(message @ CallCompleted(callKey, outputs, executionEvents, returnCode, hash, resultsClonedFrom), data) if data.isPersistedRunning(callKey) => handleCallCompleted(callKey, outputs, executionEvents, returnCode, message, hash, resultsClonedFrom, data) case Event(message @ CallCompleted(collectorKey: CollectorKey, outputs, executionEvents, returnCode, hash, resultsClonedFrom), data) if !data.isPending(collectorKey) => @@ -489,6 +517,12 @@ case class WorkflowActor(workflow: WorkflowDescriptor, backend: Backend) } when(WorkflowAborting) { + case Event(message @ CallCompleted(finalCallKey: FinalCallKey, _, _, _, _, _), data) if !data.isPersistedRunning(finalCallKey) => + // Unlike "real" calls which are managed by a CallActor, the final call completion message is triggered + // internally and there isn't an automatic retry if the message isn't processed. So if the message can't be + // processed due to pending writes, schedule it to be resent here. + resendDueToPendingExecutionWrites(message) + stay() case Event(message @ CallCompleted(callKey, outputs, executionEvents, returnCode, hash, resultsClonedFrom), data) if data.isPersistedRunning(callKey) => handleCallCompleted(callKey, outputs, executionEvents, returnCode, message, hash, resultsClonedFrom, data) case Event(message @ CallCompleted(collectorKey: CollectorKey, outputs, executionEvents, returnCode, hash, resultsClonedFrom), data) if !data.isPending(collectorKey) => @@ -688,9 +722,7 @@ case class WorkflowActor(workflow: WorkflowDescriptor, backend: Backend) status == ExecutionStatus.NotStarted && arePrerequisitesDone(key) } - def findRunnableEntries: Traversable[ExecutionStoreEntry] = executionStore filter isRunnable - - val runnableEntries = findRunnableEntries + val runnableEntries = executionStore filter isRunnable val runnableCalls = runnableEntries collect { case(k: CallKey, v) => k.scope } if (runnableCalls.nonEmpty) @@ -700,6 +732,7 @@ case class WorkflowActor(workflow: WorkflowDescriptor, backend: Backend) case (k: ScatterKey, _) => processRunnableScatter(k) case (k: CollectorKey, _) => processRunnableCollector(k) case (k: CallKey, _) => processRunnableCall(k) + case (k: FinalCallKey, _) => processRunnableFinalCall(k) case (k, v) => val message = s"Unknown entry in execution store:\nKEY: $k\nVALUE:$v" logger.error(message) @@ -880,15 +913,21 @@ case class WorkflowActor(workflow: WorkflowDescriptor, backend: Backend) private def createCaches: Future[(ExecutionStore, SymbolCache)] = { def isInScatterBlock(c: Call) = c.ancestry.exists(_.isInstanceOf[Scatter]) + import FinalCall._ val futureExecutionCache = globalDataAccess.getExecutionStatuses(workflow.id) map { statuses => statuses map { case (k, v) => - val key: ExecutionStoreKey = (workflow.namespace.resolve(k.fqn), k.index) match { - case (Some(c: Call), Some(i)) => CallKey(c, Option(i)) - case (Some(c: Call), None) if isInScatterBlock(c) => CollectorKey(c) - case (Some(c: Call), None) => CallKey(c, None) - case (Some(s: Scatter), None) => ScatterKey(s, None) - case _ => throw new UnsupportedOperationException(s"Execution entry invalid: $k -> $v") + val key: ExecutionStoreKey = if (k.fqn.isFinalCall) { + // Final calls are not part of the workflow namespace, handle these differently from other keys. + k.fqn.toStoreKey(workflow) + } else { + (workflow.namespace.resolve(k.fqn), k.index) match { + case (Some(c: Call), Some(i)) => CallKey(c, Option(i)) + case (Some(c: Call), None) if isInScatterBlock(c) => CollectorKey(c) + case (Some(c: Call), None) => CallKey(c, None) + case (Some(s: Scatter), None) => ScatterKey(s, None) + case _ => throw new UnsupportedOperationException(s"Execution entry invalid: $k -> $v") + } } key -> v.executionStatus } @@ -988,6 +1027,12 @@ case class WorkflowActor(workflow: WorkflowDescriptor, backend: Backend) Success(ExecutionStartResult(Set(StartEntry(collector, ExecutionStatus.Starting)))) } + private def processRunnableFinalCall(finalCallKey: FinalCallKey): Try[ExecutionStartResult] = { + executionStore += finalCallKey -> ExecutionStatus.Starting + persistStatus(finalCallKey, ExecutionStatus.Starting) map { _ => self ! CallStarted(finalCallKey) } + Success(ExecutionStartResult(Set(StartEntry(finalCallKey, ExecutionStatus.Starting)))) + } + private def sendStartMessage(callKey: CallKey, callInputs: Map[String, WdlValue]) = { def registerAbortFunction(abortFunction: AbortFunction): Unit = {} val backendCall = backend.bindCall(workflow, callKey, callInputs, AbortRegistrationFunction(registerAbortFunction)) diff --git a/src/main/scala/cromwell/engine/workflow/WorkflowOptions.scala b/src/main/scala/cromwell/engine/workflow/WorkflowOptions.scala index 5f9075f3b83..88358efc907 100644 --- a/src/main/scala/cromwell/engine/workflow/WorkflowOptions.scala +++ b/src/main/scala/cromwell/engine/workflow/WorkflowOptions.scala @@ -100,13 +100,13 @@ case class WorkflowOptions(jsObject: JsObject) { def get(key: String): Try[String] = jsObject.fields.get(key) match { case Some(jsStr: JsString) => Success(jsStr.value) case Some(jsObj: JsObject) if isEncryptedField(jsObj) => decryptField(jsObj) - case Some(jsVal: JsValue) => Failure(new Throwable(s"Unsupported value as JsValue: $jsVal")) + case Some(jsVal: JsValue) => Failure(new IllegalArgumentException(s"Unsupported value as JsValue: $jsVal")) case None => Failure(new Throwable(s"Field not found: $key")) } def getBoolean(key: String): Try[Boolean] = jsObject.fields.get(key) match { case Some(jsBool: JsBoolean) => Success(jsBool.value) - case Some(jsVal: JsValue) => Failure(new Throwable(s"Unsupported JsValue as JsBoolean: $jsVal")) + case Some(jsVal: JsValue) => Failure(new IllegalArgumentException(s"Unsupported JsValue as JsBoolean: $jsVal")) case None => Failure(new Throwable(s"Field not found: $key")) } diff --git a/src/main/scala/cromwell/util/PathUtil.scala b/src/main/scala/cromwell/util/PathUtil.scala deleted file mode 100644 index c9e04fa9a0a..00000000000 --- a/src/main/scala/cromwell/util/PathUtil.scala +++ /dev/null @@ -1,11 +0,0 @@ -package cromwell.util - - -object PathUtil { - - implicit class UriString(val str: String) extends AnyVal { - def isGcsUrl: Boolean = str.startsWith("gs://") - def isUriWithProtocol: Boolean = "^[a-z]+://".r.findFirstIn(str).nonEmpty - } - -} diff --git a/src/main/scala/cromwell/util/TryUtil.scala b/src/main/scala/cromwell/util/TryUtil.scala index 97f72f40d65..554be3281c6 100644 --- a/src/main/scala/cromwell/util/TryUtil.scala +++ b/src/main/scala/cromwell/util/TryUtil.scala @@ -4,7 +4,7 @@ import java.io.{PrintWriter, StringWriter} import cromwell.logging.WorkflowLogger -import scala.concurrent.duration.Duration +import scala.concurrent.duration.{Duration, _} import scala.language.postfixOps import scala.util.{Failure, Success, Try} @@ -53,7 +53,8 @@ object TryUtil { maxPollingInterval: Duration, logger: WorkflowLogger, failMessage: Option[String] = None, - priorValue: Option[T] = None): Try[T] = { + priorValue: Option[T] = None, + fatalExceptions: Seq[Class[_ <: Throwable]] = Seq.empty): Try[T] = { def logFailures(attempt: Try[T]): Unit = { attempt recover { @@ -63,6 +64,7 @@ object TryUtil { Try { fn(priorValue) } match { case Success(x) if isSuccess(x) => Success(x) + case Failure(f) if fatalExceptions.contains(f.getClass) => Failure(f) case value if (retryLimit.isDefined && retryLimit.get > 1) || retryLimit.isEmpty => logFailures(value) val retryCountMessage = if (retryLimit.getOrElse(0) > 0) s" (${retryLimit.getOrElse(0) - 1} more retries) " else "" diff --git a/src/test/scala/cromwell/CopyWorkflowOutputsSpec.scala b/src/test/scala/cromwell/CopyWorkflowOutputsSpec.scala new file mode 100644 index 00000000000..a754baf213f --- /dev/null +++ b/src/test/scala/cromwell/CopyWorkflowOutputsSpec.scala @@ -0,0 +1,47 @@ +package cromwell + +import java.nio.file.{Files, Paths} + +import akka.testkit.EventFilter +import cromwell.util.SampleWdl +import org.scalatest.prop.TableDrivenPropertyChecks._ +import org.scalatest.prop.Tables.Table + +import scala.language.postfixOps + + +class CopyWorkflowOutputsSpec extends CromwellTestkitSpec { + + "CopyWorkflowOutputs" should { + "copy workflow outputs" in { + val workflowOutputsPath = "copy-workflow-outputs" + + val tmpDir = Files.createTempDirectory(workflowOutputsPath).toAbsolutePath + + val outputs = Table( + ("call", "file"), + ("call-A", "out"), + ("call-A", "out2"), + ("call-B", "out"), + ("call-B", "out2") + ) + + val workflowId = runWdlAndAssertOutputs( + sampleWdl = SampleWdl.WorkflowOutputsWithFiles, + eventFilter = EventFilter.info(pattern = "transitioning from Running to Succeeded.", occurrences = 1), + runtime = "", + workflowOptions = s""" { "outputs_path": "$tmpDir" } """, + expectedOutputs = Seq("A.out", "A.out2", "B.outs") map { o => ("wfoutputs." + o) -> CromwellTestkitSpec.AnyValueIsFine } toMap, + allowOtherOutputs = false + ) + + forAll(outputs) { (call, file) => + val path = tmpDir.resolve(Paths.get("wfoutputs", workflowId.id.toString, call, file)) + Files.exists(path) shouldBe true + } + val path = tmpDir.resolve(Paths.get("wfoutputs", workflowId.id.toString, "call-C", "out")) + Files.exists(path) shouldBe false + } + } + +} diff --git a/src/test/scala/cromwell/CromwellTestkitSpec.scala b/src/test/scala/cromwell/CromwellTestkitSpec.scala index f96a1b16325..2ad4d8cc84d 100644 --- a/src/test/scala/cromwell/CromwellTestkitSpec.scala +++ b/src/test/scala/cromwell/CromwellTestkitSpec.scala @@ -215,7 +215,7 @@ with DefaultTimeout with ImplicitSender with WordSpecLike with Matchers with Bef runtime: String = "", workflowOptions: String = "{}", allowOtherOutputs: Boolean = true, - terminalState: WorkflowState = WorkflowSucceeded): Unit = { + terminalState: WorkflowState = WorkflowSucceeded): WorkflowId = { val wma = buildWorkflowManagerActor(sampleWdl, runtime) val wfSources = sampleWdl.asWorkflowSources(runtime, workflowOptions) val submitMessage = WorkflowManagerActor.SubmitWorkflow(wfSources) @@ -238,6 +238,7 @@ with DefaultTimeout with ImplicitSender with WordSpecLike with Matchers with Bef if (expectedValue != AnyValueIsFine) validateOutput(actualValue.wdlValue, expectedValue) } } + workflowId } } } diff --git a/src/test/scala/cromwell/engine/WorkflowDescriptorSpec.scala b/src/test/scala/cromwell/engine/WorkflowDescriptorSpec.scala index e4656a01ae9..87ddff19186 100644 --- a/src/test/scala/cromwell/engine/WorkflowDescriptorSpec.scala +++ b/src/test/scala/cromwell/engine/WorkflowDescriptorSpec.scala @@ -1,11 +1,15 @@ package cromwell.engine -import com.typesafe.config.{Config, ConfigFactory} +import java.nio.file.{Files, Paths} + +import com.typesafe.config.ConfigFactory +import cromwell.util.SampleWdl import org.scalatest.prop.TableDrivenPropertyChecks._ import org.scalatest.prop.Tables.Table import org.scalatest.{FlatSpec, Matchers} import scala.collection.JavaConverters._ +import scala.util.{Failure, Success} class WorkflowDescriptorSpec extends FlatSpec with Matchers { @@ -69,4 +73,30 @@ class WorkflowDescriptorSpec extends FlatSpec with Matchers { } + it should "copy workflow outputs to their final (local) destination" in { + import scala.concurrent.ExecutionContext.Implicits.global + + val tmpDir = Files.createTempDirectory("wf-outputs").toAbsolutePath + val sources = WorkflowSourceFiles(SampleWdl.WorkflowOutputsWithFiles.wdlSource(), "{}", s"""{ "outputs_path": "$tmpDir" }""") + + val outputs = Table( + ("call", "file"), + ("call-A", "out"), + ("call-A", "out2"), + ("call-B", "out"), + ("call-B", "out2") + ) + + val descriptor = WorkflowDescriptor(WorkflowId.randomId(), sources, ConfigFactory.load) + + descriptor.copyWorkflowOutputs onComplete { + case Success(_) => + forAll(outputs) { (call, file) => + val path = tmpDir.resolve(Paths.get(descriptor.name, descriptor.id.toString, call, file)) + Files.exists(path) shouldBe true + } + case Failure(f) => fail(f) + } + } + } diff --git a/src/test/scala/cromwell/engine/io/gcs/NioGcsPathSpec.scala b/src/test/scala/cromwell/engine/io/gcs/NioGcsPathSpec.scala new file mode 100644 index 00000000000..e93b01e0ef1 --- /dev/null +++ b/src/test/scala/cromwell/engine/io/gcs/NioGcsPathSpec.scala @@ -0,0 +1,318 @@ +package cromwell.engine.io.gcs + +import java.nio.file.Path + +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.TableDrivenPropertyChecks._ +import org.scalatest.prop.Tables.Table +import org.scalatest.{FlatSpec, Matchers} + +import scala.concurrent.ExecutionContext + +class NioGcsPathSpec extends FlatSpec with Matchers with MockitoSugar { + + behavior of "NioGcsPath" + + implicit val GCSFs = GcsFileSystem.instance(mock[GoogleCloudStorage], "gs://absolute")(mock[ExecutionContext]) + + it should "implement toString" in { + val absPath1 = new NioGcsPath(Array("absolute", "path", "to", "somewhere"), true) + val relPath1 = new NioGcsPath(Array("some", "relative", "path"), false) + + absPath1.toString shouldBe "gs://absolute/path/to/somewhere" + relPath1.toString shouldBe "some/relative/path" + } + + it should "implement subpath" in { + val absPath1 = new NioGcsPath(Array("absolute", "path", "to", "somewhere"), true) + val relPath1 = new NioGcsPath(Array("some", "relative", "path"), false) + + val absSub1 = absPath1.subpath(0, 2) + absSub1.isAbsolute shouldBe true + absSub1.toString shouldBe "gs://absolute/path" + + val absSub2 = absPath1.subpath(1, 2) + absSub2.isAbsolute shouldBe false + absSub2.toString shouldBe "path" + + val relSub1 = relPath1.subpath(0, 2) + relSub1.isAbsolute shouldBe false + relSub1.toString shouldBe "some/relative" + } + + it should "implement resolveSibling" in { + val absPath1 = new NioGcsPath(Array("absolute", "path", "to", "somewhere"), true) + val absPath2 = new NioGcsPath(Array("absolute", "location"), true) + val relPath1 = new NioGcsPath(Array("some", "relative", "path"), false) + val relPath2 = new NioGcsPath(Array("another", "relative", "resource", "path"), false) + + val absSibling = absPath1.resolveSibling("somewhere else") + absSibling.isAbsolute shouldBe true + absSibling.toString shouldBe "gs://absolute/path/to/somewhere else" + + val absSiblingPath = absPath1.resolveSibling(relPath1) + absSiblingPath.isAbsolute shouldBe true + absSiblingPath.toString shouldBe "gs://absolute/path/to/some/relative/path" + + val absRel = relPath1.resolveSibling("other path") + absRel.isAbsolute shouldBe false + absRel.toString shouldBe "some/relative/other path" + + val absRelPath = relPath1.resolveSibling(relPath2) + absRelPath.isAbsolute shouldBe false + absRelPath.toString shouldBe "some/relative/another/relative/resource/path" + } + + it should "implement resolve" in { + val absPath1 = new NioGcsPath(Array("absolute", "path", "to", "somewhere"), true) + val absPath2 = new NioGcsPath(Array("absolute", "location"), true) + val relPath1 = new NioGcsPath(Array("some", "relative", "path"), false) + val relPath2 = new NioGcsPath(Array("another", "relative", "resource", "path"), false) + + val absToRel = absPath1.resolve(relPath1) + absToRel.isAbsolute shouldBe true + absToRel.toString shouldBe "gs://absolute/path/to/somewhere/some/relative/path" + + val absToAbs = absPath1.resolve(absPath2) + absToAbs.isAbsolute shouldBe true + absToAbs.toString shouldBe "gs://absolute/location" + + val relToAbs = relPath1.resolve(absPath1) + relToAbs.isAbsolute shouldBe true + relToAbs.toString shouldBe "gs://absolute/path/to/somewhere" + + val relToRel = relPath1.resolve(relPath2) + relToRel.isAbsolute shouldBe false + relToRel.toString shouldBe "some/relative/path/another/relative/resource/path" + } + + it should "implement getName" in { + val absPath1 = new NioGcsPath(Array("absolute", "path", "to", "somewhere"), true) + val relPath1 = new NioGcsPath(Array("some", "relative", "path"), false) + + val nameAbs1 = absPath1.getName(0) + nameAbs1.isAbsolute shouldBe true + nameAbs1.toString shouldBe "gs://absolute" + + val nameAbs2 = absPath1.getName(1) + nameAbs2.isAbsolute shouldBe false + nameAbs2.toString shouldBe "path" + + val nameRel1 = relPath1.getName(0) + nameRel1.isAbsolute shouldBe false + nameRel1.toString shouldBe "some" + + val nameRel2 = relPath1.getName(1) + nameRel2.isAbsolute shouldBe false + nameRel2.toString shouldBe "relative" + } + + it should "implement getParent" in { + val empty = new NioGcsPath(Array.empty[String], true) + val singleton = new NioGcsPath(Array("singleton"), true) + val absPath1 = new NioGcsPath(Array("absolute", "path", "to", "somewhere"), true) + val relPath1 = new NioGcsPath(Array("some", "relative", "path"), false) + + val parentAbs1 = absPath1.getParent + parentAbs1.isAbsolute shouldBe true + parentAbs1.toString shouldBe "gs://absolute/path/to" + + empty.getParent shouldBe null + singleton.getParent shouldBe null + + val nameRel1 = relPath1.getParent + nameRel1.isAbsolute shouldBe false + nameRel1.toString shouldBe "some/relative" + } + + it should "implement toAbsolutePath" in { + val absPath1 = new NioGcsPath(Array("absolute", "path", "to", "somewhere"), true) + val relPath1 = new NioGcsPath(Array("some", "relative", "path"), false) + + val abs = absPath1.toAbsolutePath + abs.isAbsolute shouldBe true + abs.toString shouldBe "gs://absolute/path/to/somewhere" + + val rel = relPath1.toAbsolutePath + rel.isAbsolute shouldBe true + rel.toString shouldBe "gs://absolute/some/relative/path" + } + + it should "implement getNameCount" in { + val empty = new NioGcsPath(Array.empty[String], true) + val singleton = new NioGcsPath(Array("singleton"), true) + val absPath1 = new NioGcsPath(Array("absolute", "path", "to", "somewhere"), true) + val relPath1 = new NioGcsPath(Array("some", "relative", "path"), false) + + absPath1.getNameCount shouldBe 4 + relPath1.getNameCount shouldBe 3 + empty.getNameCount shouldBe 0 + singleton.getNameCount shouldBe 1 + } + + it should "implement getFileName" in { + val empty = new NioGcsPath(Array.empty[String], true) + val singletonAbs = new NioGcsPath(Array("singleton"), true) + val singletonRel = new NioGcsPath(Array("singleton"), false) + val absPath1 = new NioGcsPath(Array("absolute", "path", "to", "somewhere"), true) + val relPath1 = new NioGcsPath(Array("some", "relative", "path"), false) + + val emptyFileName = empty.getFileName + emptyFileName shouldBe null + + val singletonAbsFileName = singletonAbs.getFileName + singletonAbsFileName.isAbsolute shouldBe true + singletonAbsFileName.toString shouldBe "gs://singleton" + + val singletonRelFileName = singletonRel.getFileName + singletonRelFileName.isAbsolute shouldBe false + singletonRelFileName.toString shouldBe "singleton" + + val relFileName = relPath1.getFileName + relFileName.isAbsolute shouldBe false + relFileName.toString shouldBe "path" + + val absFileName = absPath1.getFileName + absFileName.isAbsolute shouldBe false + absFileName.toString shouldBe "somewhere" + } + + it should "implement getRoot" in { + val empty = new NioGcsPath(Array.empty[String], true) + val singletonAbs = new NioGcsPath(Array("singleton"), true) + val singletonRel = new NioGcsPath(Array("singleton"), false) + val absPath1 = new NioGcsPath(Array("absolute", "path", "to", "somewhere"), true) + val relPath1 = new NioGcsPath(Array("some", "relative", "path"), false) + + an[IllegalStateException] shouldBe thrownBy(empty.getRoot) + + val singletonAbsFileName = singletonAbs.getRoot + singletonAbsFileName.isAbsolute shouldBe true + singletonAbsFileName.toString shouldBe "gs://singleton" + + val singletonRelFileName = singletonRel.getRoot + singletonRelFileName.isAbsolute shouldBe true + singletonRelFileName.toString shouldBe "gs://absolute" + + val relFileName = relPath1.getRoot + relFileName.isAbsolute shouldBe true + relFileName.toString shouldBe "gs://absolute" + + val absFileName = absPath1.getRoot + absFileName.isAbsolute shouldBe true + absFileName.toString shouldBe "gs://absolute" + } + + it should "implement getIterator" in { + val empty = new NioGcsPath(Array.empty[String], true) + val singletonAbs = new NioGcsPath(Array("singleton"), true) + val singletonRel = new NioGcsPath(Array("singleton"), false) + val absPath1 = new NioGcsPath(Array("absolute", "path", "to", "somewhere"), true) + val relPath1 = new NioGcsPath(Array("some", "relative", "path"), false) + + empty.iterator().hasNext shouldBe false + + val singletonAbsIterator = singletonAbs.iterator() + val nextAbsSingleton: Path = singletonAbsIterator.next() + nextAbsSingleton.isAbsolute shouldBe false + nextAbsSingleton.toString shouldBe "singleton" + singletonAbsIterator.hasNext shouldBe false + + val singletonRelIterator = singletonRel.iterator() + val nextRelSingleton: Path = singletonRelIterator.next() + nextRelSingleton.isAbsolute shouldBe false + nextRelSingleton.toString shouldBe "singleton" + singletonRelIterator.hasNext shouldBe false + + val relIterator = relPath1.iterator() + val nextRel: Path = relIterator.next() + nextRel.isAbsolute shouldBe false + nextRel.toString shouldBe "some" + relIterator.next().toString shouldBe "relative" + relIterator.next().toString shouldBe "path" + relIterator.hasNext shouldBe false + + val absIterator = absPath1.iterator() + val absRel: Path = absIterator.next() + absRel.isAbsolute shouldBe false + absRel.toString shouldBe "absolute" + absIterator.next().toString shouldBe "path" + absIterator.next().toString shouldBe "to" + absIterator.next().toString shouldBe "somewhere" + absIterator.hasNext shouldBe false + } + + it should "implement startsWith" in { + val empty = new NioGcsPath(Array.empty[String], false) + val singletonAbs = new NioGcsPath(Array("absolute"), true) + val singletonRel = new NioGcsPath(Array("absolute"), false) + + val absPath = new NioGcsPath(Array("absolute", "path", "to", "somewhere"), true) + val startsWithAbsPath = new NioGcsPath(Array("absolute", "path", "to"), true) + val doesntStartsWithAbsPath = new NioGcsPath(Array("absolute", "path", "to", "another", "place"), true) + val absPathStartingLikeRel = new NioGcsPath(Array("some", "relative", "path"), true) + + val relPath = new NioGcsPath(Array("some", "relative", "path"), false) + val startsWithRelPath = new NioGcsPath(Array("some", "relative"), false) + val doesntStartsWithRelPath = new NioGcsPath(Array("some", "relative", "other", "path"), false) + val relPathStartingLikeAbs = new NioGcsPath(Array("absolute", "path", "to"), false) + + val paths = Table( + ("path1", "path2", "result"), + (empty, empty, true), + (empty, absPath, false), + (singletonAbs, singletonAbs, true), + (absPath, startsWithAbsPath, true), + (absPath, doesntStartsWithAbsPath, false), + (absPath, relPathStartingLikeAbs, true), + (absPath, relPath, false), + (relPath, startsWithRelPath, true), + (relPath, doesntStartsWithRelPath, false), + (relPath, absPathStartingLikeRel, false), + (relPath, absPath, false) + ) + + forAll(paths) { (p1, p2, res) => + val startsWith: Boolean = p1.startsWith(p2) + startsWith shouldBe res + val startsWith1: Boolean = p1.startsWith(p2.toString) + startsWith1 shouldBe res + } + } + + it should "implement endsWith" in { + val empty = new NioGcsPath(Array.empty[String], false) + val singletonAbs = new NioGcsPath(Array("absolute"), true) + val singletonRel = new NioGcsPath(Array("absolute"), false) + + val absPath = new NioGcsPath(Array("absolute", "path", "to", "somewhere"), true) + val doesntEndWithAbsPath = new NioGcsPath(Array("absolute", "path", "to", "another", "place"), true) + val absPathEndingLikeRel = new NioGcsPath(Array("relative", "path"), true) + + val relPath = new NioGcsPath(Array("some", "relative", "path"), false) + val endsWithRelPath = new NioGcsPath(Array("relative", "path"), false) + val doesntStartsWithRelPath = new NioGcsPath(Array("relative", "other", "path"), false) + val relPathEndingLikeAbs = new NioGcsPath(Array("path", "to", "somewhere"), false) + + val paths = Table( + ("path1", "path2", "result"), + (empty, empty, true), + (empty, absPath, false), + (singletonAbs, singletonAbs, true), + (absPath, absPath, true), + (absPath, doesntEndWithAbsPath, false), + (absPath, relPathEndingLikeAbs, true), + (absPath, relPath, false), + (relPath, endsWithRelPath, true), + (relPath, doesntStartsWithRelPath, false), + (relPath, absPathEndingLikeRel, false), + (relPath, absPath, false) + ) + + forAll(paths) { (p1, p2, res) => + p1.endsWith(p2) shouldBe res + p1.endsWith(p2.toString) shouldBe res + } + } + +} diff --git a/src/test/scala/cromwell/util/SampleWdl.scala b/src/test/scala/cromwell/util/SampleWdl.scala index b3df1cac692..f36b55e7b1f 100644 --- a/src/test/scala/cromwell/util/SampleWdl.scala +++ b/src/test/scala/cromwell/util/SampleWdl.scala @@ -298,6 +298,75 @@ object SampleWdl { override lazy val rawInputs = Map(ThreeStep.PatternKey -> "." * 10000) } + object WorkflowOutputsWithFiles extends SampleWdl { + // ASCII art from http://www.chris.com/ascii/joan/www.geocities.com/SoHo/7373/flag.html with pipes + // replaced by exclamation points to keep stripMargin from removing the flagpole. + override def wdlSource(runtime: String = "") = + """ + task A { + command { + echo "Enfin un peu de francais pour contrer ce raz-de-marée anglais !" > out + echo "Jacques Chirac fait du jetski sur la Seine en costume traditionnel russe" > out2 + } + output { + File out = "out" + File out2 = "out2" + } + } + task B { + command { + echo "Je contre avec un bonnet peruvien et tire une carte chance" > out + echo "Kamoulox !" > out2 + } + output { + Array[File] outs = ["out", "out2"] + } + } + task C { + command { + cat > out < Date: Tue, 5 Jan 2016 18:30:10 -0500 Subject: [PATCH 07/31] Outputs decls can reference previous outputs --- .../cromwell/engine/backend/Backend.scala | 17 ++++- .../cromwell/engine/backend/BackendCall.scala | 5 +- .../engine/backend/jes/JesBackend.scala | 64 ++++++++++--------- .../backend/local/SharedFileSystem.scala | 31 +++++---- src/main/scala/cromwell/engine/package.scala | 1 - .../ReferencingPreviousInputsAndOutputs.scala | 24 +++++++ src/test/scala/cromwell/util/SampleWdl.scala | 29 +++++++++ 7 files changed, 126 insertions(+), 45 deletions(-) create mode 100644 src/test/scala/cromwell/ReferencingPreviousInputsAndOutputs.scala diff --git a/src/main/scala/cromwell/engine/backend/Backend.scala b/src/main/scala/cromwell/engine/backend/Backend.scala index d048e980a82..bfe525613de 100644 --- a/src/main/scala/cromwell/engine/backend/Backend.scala +++ b/src/main/scala/cromwell/engine/backend/Backend.scala @@ -2,7 +2,6 @@ package cromwell.engine.backend import akka.actor.ActorSystem import com.typesafe.config.Config -import cromwell.engine.{CallInputs, CallOutputs} import cromwell.engine.backend.runtimeattributes.CromwellRuntimeAttributes import wdl4s._ import cromwell.engine.ExecutionIndex.ExecutionIndex @@ -17,9 +16,11 @@ import cromwell.engine.{HostInputs, CallOutputs} import cromwell.logging.WorkflowLogger import cromwell.util.docker.SprayDockerRegistryApiClient import org.slf4j.LoggerFactory +import wdl4s.values.WdlValue +import scala.language.postfixOps import scala.concurrent.{ExecutionContext, Future} -import scala.util.Try +import scala.util.{Success, Try} object Backend { class StdoutStderrException(message: String) extends RuntimeException(message) @@ -53,6 +54,18 @@ object Backend { trait JobKey +final case class AttemptedLookupResult(name: String, value: Try[WdlValue]) { + def toPair = name -> value +} + +object AttemptedLookupResult { + implicit class AugmentedAttemptedLookupSequence(s: Seq[AttemptedLookupResult]) { + def toLookupMap: Map[String, WdlValue] = s collect { + case AttemptedLookupResult(name, Success(value)) => (name, value) + } toMap + } +} + /** * Trait to be implemented by concrete backends. */ diff --git a/src/main/scala/cromwell/engine/backend/BackendCall.scala b/src/main/scala/cromwell/engine/backend/BackendCall.scala index 7556c953d52..b1ad32e3921 100644 --- a/src/main/scala/cromwell/engine/backend/BackendCall.scala +++ b/src/main/scala/cromwell/engine/backend/BackendCall.scala @@ -106,7 +106,10 @@ trait BackendCall { * expression `read_lines(my_file_var)` would have to call lookupFunction()("my_file_var") * during expression evaluation */ - def lookupFunction: String => WdlValue = WdlExpression.standardLookupFunction(locallyQualifiedInputs, key.scope.task.declarations, engineFunctions) + def lookupFunction(evaluatedValues: Map[String, WdlValue]): String => WdlValue = { + val currentlyKnownValues = locallyQualifiedInputs ++ evaluatedValues + WdlExpression.standardLookupFunction(currentlyKnownValues, key.scope.task.declarations, engineFunctions) + } /** Initiate execution, callers can invoke `poll` once this `Future` completes successfully. */ def execute(implicit ec: ExecutionContext): Future[ExecutionHandle] diff --git a/src/main/scala/cromwell/engine/backend/jes/JesBackend.scala b/src/main/scala/cromwell/engine/backend/jes/JesBackend.scala index cd99d5e5744..ec48cd00961 100644 --- a/src/main/scala/cromwell/engine/backend/jes/JesBackend.scala +++ b/src/main/scala/cromwell/engine/backend/jes/JesBackend.scala @@ -8,7 +8,8 @@ import akka.actor.ActorSystem import com.google.api.services.genomics.model.Parameter import com.typesafe.scalalogging.LazyLogging import cromwell.engine.backend.runtimeattributes.CromwellRuntimeAttributes -import wdl4s.{Scatter, UnsatisfiedInputsException, Call, CallInputs} +import wdl4s.CallInputs +import wdl4s._ import wdl4s.expression.NoFunctions import wdl4s.values._ import cromwell.engine.ExecutionIndex.{ExecutionIndex, IndexEnhancedInt} @@ -355,7 +356,7 @@ case class JesBackend(actorSystem: ActorSystem) def generateJesOutputs(backendCall: BackendCall): Seq[JesOutput] = { val log = workflowLoggerWithCall(backendCall) val wdlFileOutputs = backendCall.call.task.outputs flatMap { taskOutput => - taskOutput.expression.evaluateFiles(backendCall.lookupFunction, NoFunctions, taskOutput.wdlType) match { + taskOutput.expression.evaluateFiles(backendCall.lookupFunction(Map.empty), NoFunctions, taskOutput.wdlType) match { case Success(wdlFiles) => wdlFiles map gcsPathToLocal case Failure(ex) => log.warn(s"Could not evaluate $taskOutput: ${ex.getMessage}") @@ -457,8 +458,8 @@ case class JesBackend(actorSystem: ActorSystem) } } - private def customLookupFunction(backendCall: BackendCall) = { toBeLookedUp: String => - val originalLookup = backendCall.lookupFunction + private def customLookupFunction(backendCall: BackendCall, alreadyGeneratedOutputs: Map[String, WdlValue]): String => WdlValue = toBeLookedUp => { + val originalLookup = backendCall.lookupFunction(alreadyGeneratedOutputs) gcsInputToGcsOutput(backendCall, originalLookup(toBeLookedUp)) } @@ -475,31 +476,9 @@ case class JesBackend(actorSystem: ActorSystem) } def postProcess(backendCall: BackendCall): Try[CallOutputs] = { - val outputMappings = backendCall.call.task.outputs.map({ taskOutput => - /** - * This will evaluate the task output expression and coerces it to the task output's type. - * If the result is a WdlFile, then attempt to find the JesOutput with the same path and - * return a WdlFile that represents the GCS path and not the local path. For example, - * - *
-        * output {
-        *   File x = "out" + ".txt"
-        * }
-        * 
- * - * "out" + ".txt" is evaluated to WdlString("out.txt") and then coerced into a WdlFile("out.txt") - * Then, via wdlFileToGcsPath(), we attempt to find the JesOutput with .name == "out.txt". - * If it is found, then WdlFile("gs://some_bucket/out.txt") will be returned. - */ - val attemptedValue = for { - wdlValue <- taskOutput.expression.evaluate(customLookupFunction(backendCall), backendCall.engineFunctions) - coercedValue <- taskOutput.wdlType.coerceRawValue(wdlValue) - value = wdlValueToGcsPath(generateJesOutputs(backendCall))(coercedValue) - } yield value - - taskOutput.name -> attemptedValue - }).toMap - + val outputs = backendCall.call.task.outputs + val outputFoldingFunction = getOutputFoldingFunction(backendCall) + val outputMappings = outputs.foldLeft(Seq.empty[AttemptedLookupResult])(outputFoldingFunction).map(_.toPair).toMap TryUtil.sequenceMap(outputMappings) map { outputMap => outputMap mapValues { v => CallOutput(v, v.getHash(backendCall.workflowDescriptor)) @@ -507,6 +486,33 @@ case class JesBackend(actorSystem: ActorSystem) } } + private def getOutputFoldingFunction(backendCall: BackendCall): (Seq[AttemptedLookupResult], TaskOutput) => Seq[AttemptedLookupResult] = { + (currentList: Seq[AttemptedLookupResult], taskOutput: TaskOutput) => { + currentList ++ Seq(AttemptedLookupResult(taskOutput.name, outputLookup(taskOutput, backendCall, currentList))) + } + } + + private def outputLookup(taskOutput: TaskOutput, backendCall: BackendCall, currentList: Seq[AttemptedLookupResult]) = for { + /** + * This will evaluate the task output expression and coerces it to the task output's type. + * If the result is a WdlFile, then attempt to find the JesOutput with the same path and + * return a WdlFile that represents the GCS path and not the local path. For example, + * + *
+    * output {
+    *   File x = "out" + ".txt"
+    * }
+    * 
+ * + * "out" + ".txt" is evaluated to WdlString("out.txt") and then coerced into a WdlFile("out.txt") + * Then, via wdlFileToGcsPath(), we attempt to find the JesOutput with .name == "out.txt". + * If it is found, then WdlFile("gs://some_bucket/out.txt") will be returned. + */ + wdlValue <- taskOutput.expression.evaluate(customLookupFunction(backendCall, currentList.toLookupMap), backendCall.engineFunctions) + coercedValue <- taskOutput.wdlType.coerceRawValue(wdlValue) + value = wdlValueToGcsPath(generateJesOutputs(backendCall))(coercedValue) + } yield value + def executionResult(status: RunStatus, handle: JesPendingExecutionHandle)(implicit ec: ExecutionContext): Future[ExecutionHandle] = Future { val log = workflowLoggerWithCall(handle.backendCall) diff --git a/src/main/scala/cromwell/engine/backend/local/SharedFileSystem.scala b/src/main/scala/cromwell/engine/backend/local/SharedFileSystem.scala index e9c0d5561ed..accf3b70cee 100644 --- a/src/main/scala/cromwell/engine/backend/local/SharedFileSystem.scala +++ b/src/main/scala/cromwell/engine/backend/local/SharedFileSystem.scala @@ -6,7 +6,7 @@ import java.nio.file.{Files, Path, Paths} import better.files.{File => ScalaFile, _} import com.typesafe.config.ConfigFactory import cromwell.engine.backend.runtimeattributes.CromwellRuntimeAttributes -import wdl4s.{Call, TaskOutput} +import wdl4s.{CallInputs, Call, TaskOutput} import wdl4s.types.{WdlArrayType, WdlFileType, WdlMapType} import wdl4s.values.{WdlValue, _} import cromwell.engine.ExecutionIndex.ExecutionIndex @@ -14,6 +14,7 @@ import cromwell.engine.backend.{CallLogs, LocalFileSystemBackendCall, _} import cromwell.engine.io.IoInterface import cromwell.engine.io.gcs.{GcsPath, GoogleCloudStorage} import cromwell.engine.workflow.{CallKey, WorkflowOptions} +import cromwell.engine.{WorkflowContext, WorkflowDescriptor, WorkflowEngineFunctions, WorkflowId} import cromwell.engine._ import cromwell.util.TryUtil import org.apache.commons.io.FileUtils @@ -24,6 +25,7 @@ import scala.concurrent.{ExecutionContext, Future} import scala.language.postfixOps import scala.util.{Failure, Success, Try} import Hashing._ +import cromwell.engine.backend.AttemptedLookupResult object SharedFileSystem { type LocalizationStrategy = (String, Path, WorkflowDescriptor) => Try[Unit] @@ -121,19 +123,12 @@ trait SharedFileSystem { def postProcess(backendCall: LocalFileSystemBackendCall): Try[CallOutputs] = { implicit val hasher = backendCall.workflowDescriptor.fileHasher - // Evaluate output expressions, performing conversions from String -> File where required. - val outputMappings = backendCall.call.task.outputs map { taskOutput => - val tryConvertedValue = - for { - expressionValue <- taskOutput.expression.evaluate(backendCall.lookupFunction, backendCall.engineFunctions) - convertedValue <- outputAutoConversion(backendCall, taskOutput, expressionValue) - pathAdjustedValue <- Success(absolutizeOutputWdlFile(convertedValue, backendCall.callRootPath)) - } yield pathAdjustedValue - taskOutput.name -> tryConvertedValue - } - val taskOutputFailures = outputMappings filter { _._2.isFailure } + val outputs = backendCall.call.task.outputs + val outputFoldingFunction = getOutputFoldingFunction(backendCall) + val outputMappings = outputs.foldLeft(Seq.empty[AttemptedLookupResult])(outputFoldingFunction).map(_.toPair).toMap + val taskOutputFailures = outputMappings filter { _._2.isFailure } if (taskOutputFailures.isEmpty) { val unwrappedMap = outputMappings collect { case (name, Success(wdlValue)) => name -> CallOutput(wdlValue, wdlValue.getHash(backendCall.workflowDescriptor)) @@ -145,6 +140,18 @@ trait SharedFileSystem { } } + private def getOutputFoldingFunction(backendCall: LocalFileSystemBackendCall): (Seq[AttemptedLookupResult], TaskOutput) => Seq[AttemptedLookupResult] = { + (currentList: Seq[AttemptedLookupResult], taskOutput: TaskOutput) => { + currentList ++ Seq(AttemptedLookupResult(taskOutput.name, outputLookup(taskOutput, backendCall, currentList))) + } + } + + private def outputLookup(taskOutput: TaskOutput, backendCall: LocalFileSystemBackendCall, currentList: Seq[AttemptedLookupResult]) = for { + expressionValue <- taskOutput.expression.evaluate(backendCall.lookupFunction(currentList.toLookupMap), backendCall.engineFunctions) + convertedValue <- outputAutoConversion(backendCall, taskOutput, expressionValue) + pathAdjustedValue <- Success(absolutizeOutputWdlFile(convertedValue, backendCall.callRootPath)) + } yield pathAdjustedValue + def adjustOutputPaths(call: Call, outputs: CallOutputs): CallOutputs = outputs def stdoutStderr(descriptor: WorkflowDescriptor, callName: String, index: ExecutionIndex): CallLogs = { diff --git a/src/main/scala/cromwell/engine/package.scala b/src/main/scala/cromwell/engine/package.scala index 3e3e41d42ae..bf532699783 100644 --- a/src/main/scala/cromwell/engine/package.scala +++ b/src/main/scala/cromwell/engine/package.scala @@ -39,7 +39,6 @@ package object engine { type WorkflowOutputs = Map[FullyQualifiedName, CallOutput] type FullyQualifiedName = String type LocallyQualifiedName = String - type CallInputs = Map[String, WdlValue] case class CallOutput(wdlValue: WdlValue, hash: Option[SymbolHash]) type CallOutputs = Map[LocallyQualifiedName, CallOutput] type HostInputs = Map[String, WdlValue] diff --git a/src/test/scala/cromwell/ReferencingPreviousInputsAndOutputs.scala b/src/test/scala/cromwell/ReferencingPreviousInputsAndOutputs.scala new file mode 100644 index 00000000000..cfa934746b4 --- /dev/null +++ b/src/test/scala/cromwell/ReferencingPreviousInputsAndOutputs.scala @@ -0,0 +1,24 @@ +package cromwell + +import akka.testkit._ +import cromwell.util.SampleWdl +import wdl4s.values.WdlFloat + +import scala.language.postfixOps + +class ReferencingPreviousInputsAndOutputs extends CromwellTestkitSpec { + "A task with outputs which reference other outputs" should { + "run without let or hindrance" in { + runWdlAndAssertOutputs( + sampleWdl = SampleWdl.ReferencingPreviousInputsAndOutputs, + eventFilter = EventFilter.info(pattern = s"starting calls: wf.golden_pie", occurrences = 1), + expectedOutputs = Map( + "wf.golden_pie.Au" -> WdlFloat(1.6180339887), + "wf.golden_pie.doubleAu" -> WdlFloat(1.6180339887 * 2), + "wf.golden_pie.tauValue" -> WdlFloat(3.1415926 * 2), + "wf.golden_pie.goldenPie" -> WdlFloat(3.1415926 * 1.6180339887) + ) + ) + } + } +} diff --git a/src/test/scala/cromwell/util/SampleWdl.scala b/src/test/scala/cromwell/util/SampleWdl.scala index f36b55e7b1f..32d4223f70a 100644 --- a/src/test/scala/cromwell/util/SampleWdl.scala +++ b/src/test/scala/cromwell/util/SampleWdl.scala @@ -1737,4 +1737,33 @@ object SampleWdl { """.stripMargin override val rawInputs = Map.empty[String, String] } + + /** + * Inputs referencing other inputs and outputs referencing other outputs. + */ + object ReferencingPreviousInputsAndOutputs extends SampleWdl { + override def wdlSource(runtime: String = "") = + """task golden_pie { + | Float pi = 3.1415926 + | Float tau = pi + pi + | + | command { + | echo 1.6180339887 + | echo ${tau} 1>&2 + | } + | + | output { + | Float Au = read_float(stdout()) + | Float doubleAu = Au + Au + | Float tauValue = read_float(stderr()) + | Float goldenPie = Au * pi + | } + |} + | + |workflow wf { + | call golden_pie + |} + """.stripMargin + override val rawInputs = Map.empty[FullyQualifiedName, Any] + } } From de4911037d166abd1ced152df711d68191b40867 Mon Sep 17 00:00:00 2001 From: Miguel Covarrubias Date: Wed, 13 Jan 2016 05:55:29 -0500 Subject: [PATCH 08/31] warn not debug on blocked async failures --- src/main/scala/cromwell/engine/workflow/WorkflowActor.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala b/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala index 3e9205c1582..4fce13cb561 100644 --- a/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala +++ b/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala @@ -609,10 +609,10 @@ case class WorkflowActor(workflow: WorkflowDescriptor, backend: Backend) scheduleTransition(WorkflowFailed) stay() case Event(message @ AsyncFailure(t), _) => - // This is the unusual combination of debug + throwable logging since the expectation is that this will eventually + // This is the unusual combination of warn + throwable logging since the expectation is that this will eventually // be logged in the case above as an error, but if for some weird reason this actor never ends up in that // state we don't want to be completely blind to the cause of the AsyncFailure. - logger.debug(t.getMessage, t) + logger.warn(t.getMessage, t) resendDueToPendingExecutionWrites(message) stay() case Event(Terminate, data) if data.pendingExecutions.isEmpty && stateName.isTerminal => From 4f19751fdcde487b2687d1fc4b96f34252d81e70 Mon Sep 17 00:00:00 2001 From: Kristian Cibulskis Date: Wed, 13 Jan 2016 12:49:28 -0500 Subject: [PATCH 09/31] added stanza related to workflow option encryption --- src/main/config/cromwell.conf.ctmpl | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/main/config/cromwell.conf.ctmpl b/src/main/config/cromwell.conf.ctmpl index 8bac5d4f191..10c95b9359a 100644 --- a/src/main/config/cromwell.conf.ctmpl +++ b/src/main/config/cromwell.conf.ctmpl @@ -13,6 +13,14 @@ backend { } } +workflow-options { + // These workflow options will be encrypted when stored in the database + encrypted-fields: ["refresh_token"] + + // AES-256 key to use to encrypt the values in `encrypted-fields` + base64-encryption-key: "{{.Data.workflow_options_encryption_key}}" +} + docker { dockerAccount = "{{.Data.docker_account}}" dockerToken = "{{.Data.docker_token}}" From 7fdf4441aea303561c6300087ecf8bd4a75b9846 Mon Sep 17 00:00:00 2001 From: Miguel Covarrubias Date: Wed, 13 Jan 2016 05:52:07 -0500 Subject: [PATCH 10/31] Fix abort. --- .../cromwell/engine/backend/BackendCall.scala | 5 ++++ .../engine/backend/jes/JesBackend.scala | 27 +++++++++---------- .../engine/workflow/WorkflowActor.scala | 6 ++--- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/src/main/scala/cromwell/engine/backend/BackendCall.scala b/src/main/scala/cromwell/engine/backend/BackendCall.scala index b1ad32e3921..dd29c99f1a5 100644 --- a/src/main/scala/cromwell/engine/backend/BackendCall.scala +++ b/src/main/scala/cromwell/engine/backend/BackendCall.scala @@ -70,6 +70,11 @@ final case class FailedExecutionHandle(throwable: Throwable, returnCode: Option[ override val result = FailedExecution(throwable, returnCode) } +case object AbortedExecutionHandle extends ExecutionHandle { + override def isDone: Boolean = true + override def result: ExecutionResult = AbortedExecution +} + trait BackendCall { /** * The Workflow and Call to invoke. It is assumed that in the creation diff --git a/src/main/scala/cromwell/engine/backend/jes/JesBackend.scala b/src/main/scala/cromwell/engine/backend/jes/JesBackend.scala index ec48cd00961..a6b5349c4d1 100644 --- a/src/main/scala/cromwell/engine/backend/jes/JesBackend.scala +++ b/src/main/scala/cromwell/engine/backend/jes/JesBackend.scala @@ -2,39 +2,36 @@ package cromwell.engine.backend.jes import java.math.BigInteger import java.net.SocketTimeoutException -import java.nio.file.{Files, Path, Paths} +import java.nio.file.{Path, Paths} import akka.actor.ActorSystem import com.google.api.services.genomics.model.Parameter import com.typesafe.scalalogging.LazyLogging -import cromwell.engine.backend.runtimeattributes.CromwellRuntimeAttributes -import wdl4s.CallInputs -import wdl4s._ -import wdl4s.expression.NoFunctions -import wdl4s.values._ import cromwell.engine.ExecutionIndex.{ExecutionIndex, IndexEnhancedInt} import cromwell.engine.ExecutionStatus.ExecutionStatus -import cromwell.engine.backend._ +import cromwell.engine.Hashing._ +import cromwell.engine.backend.{BackendType, _} import cromwell.engine.backend.jes.JesBackend._ import cromwell.engine.backend.jes.Run.RunStatus import cromwell.engine.backend.jes.authentication._ +import cromwell.engine.backend.runtimeattributes.CromwellRuntimeAttributes import cromwell.engine.db.DataAccess.globalDataAccess import cromwell.engine.db.ExecutionDatabaseKey import cromwell.engine.db.slick.Execution import cromwell.engine.io.IoInterface import cromwell.engine.io.gcs._ import cromwell.engine.workflow.{CallKey, WorkflowOptions} -import cromwell.engine.{AbortRegistrationFunction, _} -import cromwell.engine.{HostInputs, CallOutput, CallOutputs} +import cromwell.engine.{AbortRegistrationFunction, CallOutput, CallOutputs, HostInputs, _} import cromwell.logging.WorkflowLogger -import cromwell.engine.backend.BackendType import cromwell.util.{AggregatedException, TryUtil} +import wdl4s.{CallInputs, _} +import wdl4s.expression.NoFunctions +import wdl4s.values._ import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} import scala.language.postfixOps import scala.util.{Failure, Success, Try} -import Hashing._ object JesBackend { /* @@ -539,12 +536,12 @@ case class JesBackend(actorSystem: ActorSystem) case Run.Success(events) => backendCall.hash map { h => handleSuccess(outputMappings, backendCall.workflowDescriptor, events, returnCode.get, h, handle) } case Run.Failed(errorCode, errorMessage) => - val throwable = if (errorMessage contains "Operation canceled at") { - new TaskAbortedException() + if (errorMessage contains "Operation canceled at") { + AbortedExecutionHandle.future } else { - new Throwable(s"Task ${backendCall.workflowDescriptor.id}:${backendCall.call.unqualifiedName} failed: error code $errorCode. Message: $errorMessage") + val e = new Throwable(s"Task ${backendCall.workflowDescriptor.id}:${backendCall.call.unqualifiedName} failed: error code $errorCode. Message: $errorMessage") + FailedExecutionHandle(e, Option(errorCode)).future } - FailedExecutionHandle(throwable, Option(errorCode)).future } } catch { case e: Exception => diff --git a/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala b/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala index 4fce13cb561..7374534f4d1 100644 --- a/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala +++ b/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala @@ -466,6 +466,7 @@ case class WorkflowActor(workflow: WorkflowDescriptor, backend: Backend) // Something funky's going on if aborts are coming through while the workflow's still running. But don't second-guess // by transitioning the whole workflow - the message is either still in the queue or this command was maybe // cancelled by some external system. + executionStore += callKey -> ExecutionStatus.Aborted persistStatusThenAck(callKey, ExecutionStatus.Aborted, sender(), message) logger.warn(s"Call ${callKey.scope.unqualifiedName} was aborted but the workflow should still be running.") val updatedData = data.addPersisting(callKey, ExecutionStatus.Aborted) @@ -529,6 +530,7 @@ case class WorkflowActor(workflow: WorkflowDescriptor, backend: Backend) // Collector keys are weird internal things and never go to Running state. handleCallCompleted(collectorKey, outputs, executionEvents, returnCode, message, hash, resultsClonedFrom, data) case Event(message @ CallAborted(callKey), data) if data.isPersistedRunning(callKey) => + executionStore += callKey -> ExecutionStatus.Aborted persistStatusThenAck(callKey, ExecutionStatus.Aborted, sender(), message) val updatedData = data.addPersisting(callKey, ExecutionStatus.Aborted) if (isWorkflowAborted) scheduleTransition(WorkflowAborted) @@ -545,10 +547,6 @@ case class WorkflowActor(workflow: WorkflowDescriptor, backend: Backend) if (isWorkflowAborted) scheduleTransition(WorkflowAborted) val updatedData = data.removePersisting(callKey, ExecutionStatus.Done) stay() using updatedData - case Event(m, _) => - logger.error("Unexpected message in Aborting state: " + m.getClass.getSimpleName) - if (isWorkflowAborted) scheduleTransition(WorkflowAborted) - stay() } /** From 8767cc4c4dca859427464858a9cf62030d65cd52 Mon Sep 17 00:00:00 2001 From: Chris Llanwarne Date: Wed, 13 Jan 2016 14:29:17 -0500 Subject: [PATCH 11/31] Robustity of workflowTiming diagrams improved --- .../workflowTimings/workflowTimings.html | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/main/resources/workflowTimings/workflowTimings.html b/src/main/resources/workflowTimings/workflowTimings.html index 1d7150fe83a..80e04885918 100644 --- a/src/main/resources/workflowTimings/workflowTimings.html +++ b/src/main/resources/workflowTimings/workflowTimings.html @@ -7,6 +7,14 @@