Skip to content

Commit

Permalink
BT-684 Initial Blob Storage Impl (#6810)
Browse files Browse the repository at this point in the history
BT-684 Initial Blob Storage Implementation

Merge the initial PathBuilder and PathBuilderFactory for accessing blob storage
  • Loading branch information
kraefrei authored Jul 26, 2022
1 parent 56d3d1d commit 95c550e
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 0 deletions.
9 changes: 9 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ lazy val cloudSupport = project
.dependsOn(common)
.dependsOn(common % "test->test")

// Azure Blob Storage filesystem module, rooted at filesystems/blob.
// Compiles against `core`; its tests additionally reuse the test classes of `core` and `common`.
lazy val azureBlobFileSystem = (project in file("filesystems/blob"))
.withLibrarySettings("cromwell-azure-blobFileSystem", blobFileSystemDependencies)
.dependsOn(core)
.dependsOn(core % "test->test")
.dependsOn(common % "test->test")

lazy val awsS3FileSystem = (project in file("filesystems/s3"))
.withLibrarySettings("cromwell-aws-s3filesystem", s3FileSystemDependencies)
.dependsOn(core)
Expand Down Expand Up @@ -249,10 +255,12 @@ lazy val engine = project
.dependsOn(drsFileSystem)
.dependsOn(sraFileSystem)
.dependsOn(awsS3FileSystem)
.dependsOn(azureBlobFileSystem)
.dependsOn(awsS3FileSystem % "test->test")
.dependsOn(drsFileSystem % "test->test")
.dependsOn(httpFileSystem % "test->test")
.dependsOn(ftpFileSystem % "test->test")
.dependsOn(azureBlobFileSystem % "test->test")
.dependsOn(`cloud-nio-spi`)
.dependsOn(languageFactoryCore)
.dependsOn(cwlV1_0LanguageFactory % "test->test")
Expand Down Expand Up @@ -391,6 +399,7 @@ lazy val root = (project in file("."))
.aggregate(`cromwell-drs-localizer`)
.aggregate(awsBackend)
.aggregate(awsS3FileSystem)
.aggregate(azureBlobFileSystem)
.aggregate(backend)
.aggregate(centaur)
.aggregate(centaurCwlRunner)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package cromwell.filesystems.blob

import com.azure.core.credential.AzureSasCredential
import com.azure.storage.blob.nio.AzureFileSystem
import com.google.common.net.UrlEscapers
import cromwell.core.path.NioPath
import cromwell.core.path.Path
import cromwell.core.path.PathBuilder
import cromwell.filesystems.blob.BlobPathBuilder._

import java.net.MalformedURLException
import java.net.URI
import java.nio.file.FileSystems
import scala.jdk.CollectionConverters._
import scala.language.postfixOps
import scala.util.Failure
import scala.util.Try

object BlobPathBuilder {
  import java.util.regex.Pattern

  /** Result of checking a string against the container and endpoint a builder is configured for. */
  sealed trait BlobPathValidation
  /** The string matched; `path` is the URI path with the leading `/{container}` removed. */
  case class ValidBlobPath(path: String) extends BlobPathValidation
  /** The string did not match, or could not be parsed; `errorMessage` carries the reason. */
  case class UnparsableBlobPath(errorMessage: Throwable) extends BlobPathValidation

  /** Message used when a URL does not belong to the given container and endpoint. */
  def invalidBlobPathMessage(container: String, endpoint: String): String = s"Malformed Blob URL for this builder. Expecting a URL for a container $container and endpoint $endpoint"

  /** Escapes the string with Guava's URL fragment escaper and parses the result as a URI. */
  def parseURI(string: String): URI = URI.create(UrlEscapers.urlFragmentEscaper().escape(string))

  /**
    * Extracts the storage account name, i.e. the first non-empty dot-separated label of the URI's host.
    *
    * @return None when the URI has no host (e.g. a relative or opaque URI) or the host has no non-empty label
    */
  def parseStorageAccount(uri: URI): Option[String] =
    // URI.getHost returns null when the host is undefined, so wrap it before splitting.
    Option(uri.getHost).flatMap(_.split("\\.").find(_.nonEmpty))

  /**
    * Validates that a path from a string is a valid BlobPath of the format:
    * {endpoint}/{containerName}/{pathToFile}
    *
    * with an endpoint for a particular storage account typically given by:
    * https://{storageAccountName}.blob.core.windows.net/
    *
    * For example, a path string we might expect to receive might look like:
    * https://appexternalstorage.blob.core.windows.net/inputs/test/testFile.wdl
    *
    * In this example
    * storageAccountName -> appexternalstorage
    * endpoint -> https://{storageAccountName}.blob.core.windows.net/
    * container -> inputs
    * pathToFile -> test/testFile.wdl
    *
    * If the configured container and storage account do not match, the string is considered unparsable.
    * Only the storage account (first host label) of the endpoint is compared, not the full host name.
    *
    * @param string    the candidate blob URL
    * @param container the container name this builder is configured for
    * @param endpoint  the endpoint URL this builder is configured for
    * @return a ValidBlobPath holding the path below the container, or an UnparsableBlobPath carrying either a
    *         MalformedURLException (mismatch) or the exception raised while parsing
    */
  def validateBlobPath(string: String, container: String, endpoint: String): BlobPathValidation = {
    val validation: Try[BlobPathValidation] = Try {
      val uri = parseURI(string)
      val expectedAccount = parseStorageAccount(parseURI(endpoint))
      // URI.getPath is null for opaque URIs; treat that the same as an empty path.
      val uriPath = Option(uri.getPath).getOrElse("")
      val hasContainer = uriPath.split("/").find(_.nonEmpty).contains(container)
      val hasEndpoint = expectedAccount.exists(account => parseStorageAccount(uri).contains(account))
      if (hasContainer && hasEndpoint) {
        // Quote the prefix so the container name is matched literally rather than as a regex.
        ValidBlobPath(uriPath.replaceFirst(Pattern.quote("/" + container), ""))
      } else {
        UnparsableBlobPath(new MalformedURLException(invalidBlobPathMessage(container, endpoint)))
      }
    }
    validation.fold[BlobPathValidation](t => UnparsableBlobPath(t), identity)
  }
}

/**
  * Builds [[BlobPath]]s for a single container on a single blob endpoint.
  *
  * @param credential SAS credential handed to the Azure NIO file system
  * @param container  container name; also passed to the file system as its file store
  * @param endpoint   blob endpoint URL the builder accepts paths for
  */
class BlobPathBuilder(credential: AzureSasCredential, container: String, endpoint: String) extends PathBuilder {
  import java.nio.file.{FileSystem, FileSystemAlreadyExistsException, FileSystemNotFoundException}

  /** Configuration map supplied to the Azure file system provider when a file system is opened. */
  val fileSystemConfig: Map[String, Object] = Map((AzureFileSystem.AZURE_STORAGE_SAS_TOKEN_CREDENTIAL, credential),
    (AzureFileSystem.AZURE_STORAGE_FILE_STORES, container))

  /**
    * Returns the file system for this builder's endpoint, opening it only if it is not open yet.
    *
    * FileSystems.newFileSystem fails with FileSystemAlreadyExistsException when a file system for the
    * same URI already exists, so calling it unconditionally would break every build after the first.
    *
    * NOTE(review): a reused file system keeps whatever configuration it was opened with - confirm this is
    * acceptable if SAS tokens are expected to change between builders for the same endpoint.
    */
  private def retrieveFileSystem(): FileSystem = {
    val fileSystemUri = new URI("azb://?endpoint=" + endpoint)
    try FileSystems.getFileSystem(fileSystemUri)
    catch {
      case _: FileSystemNotFoundException =>
        try FileSystems.newFileSystem(fileSystemUri, fileSystemConfig.asJava)
        catch {
          // Another caller opened the file system between our lookup and our attempt to create it.
          case _: FileSystemAlreadyExistsException => FileSystems.getFileSystem(fileSystemUri)
        }
    }
  }

  /**
    * Validates the string against the configured container and endpoint and resolves it to a BlobPath.
    *
    * @return Success with the path, or Failure with the validation error or any exception raised
    *         while obtaining the file system or resolving the path
    */
  def build(string: String): Try[BlobPath] = {
    validateBlobPath(string, container, endpoint) match {
      case ValidBlobPath(path) =>
        Try {
          val blobStoragePath = retrieveFileSystem().getPath(path)
          BlobPath(blobStoragePath, endpoint, container)
        }
      case UnparsableBlobPath(errorMessage: Throwable) => Failure(errorMessage)
    }
  }

  override def name: String = "Azure Blob Storage"
}

/**
  * A [[Path]] wrapping an NIO path together with the endpoint and container strings it was built for.
  * The constructor is restricted to the `blob` package.
  */
case class BlobPath private[blob](nioPath: NioPath, endpoint: String, container: String) extends Path {
  /** Wraps another NIO path, carrying over this path's endpoint and container. */
  override protected def newPath(nioPath: NioPath): Path = new BlobPath(nioPath, endpoint, container)

  /** Endpoint, container and NIO path string joined with "/". */
  override def pathAsString: String = s"$endpoint/$container/${nioPath.toString}"

  /** Same as the full path string, but starting from the endpoint's host instead of the endpoint itself. */
  override def pathWithoutScheme: String = {
    val endpointHost = parseURI(endpoint).getHost
    s"$endpointHost/$container/${nioPath.toString}"
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package cromwell.filesystems.blob

import akka.actor.ActorSystem
import com.azure.core.credential.AzureSasCredential
import com.typesafe.config.Config
import cromwell.core.WorkflowOptions
import cromwell.core.path.PathBuilderFactory
import cromwell.filesystems.blob.BlobPathBuilder

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

/**
  * Creates [[BlobPathBuilder]]s from the `sasToken`, `store` and `endpoint` keys of the instance configuration.
  */
final case class BlobPathBuilderFactory(globalConfig: Config, instanceConfig: Config) extends PathBuilderFactory {
  /**
    * Reads the three settings on the calling thread, then constructs the builder inside a Future
    * running on the supplied execution context. The workflow options are not consulted.
    */
  override def withOptions(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[BlobPathBuilder] = {
    def setting(key: String): String = instanceConfig.getString(key)

    val token = setting("sasToken")
    val storeName = setting("store")
    val blobEndpoint = setting("endpoint")
    Future(new BlobPathBuilder(new AzureSasCredential(token), storeName, blobEndpoint))
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package cromwell.filesystems.blob

import com.azure.core.credential.AzureSasCredential
import cromwell.filesystems.blob.BlobPathBuilder
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.nio.file.Files

object BlobPathBuilderSpec {
  /** Builds the https blob endpoint URL (no trailing slash) for the given storage account name. */
  def buildEndpoint(storageAccount: String): String =
    "https://" + storageAccount + ".blob.core.windows.net"
}

/** Unit tests for BlobPathBuilder URL validation, plus an ignored integration test that needs a real SAS token. */
class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{

  it should "parse a URI into a path" in {
    val endpoint = BlobPathBuilderSpec.buildEndpoint("storageAccount")
    val container = "container"
    val evalPath = "/path/to/file"
    val testString = endpoint + "/" + container + evalPath
    BlobPathBuilder.validateBlobPath(testString, container, endpoint) match {
      case BlobPathBuilder.ValidBlobPath(path) => path should equal(evalPath)
      case BlobPathBuilder.UnparsableBlobPath(errorMessage) => fail(errorMessage)
    }
  }

  it should "bad storage account fails causes URI to fail parse into a path" in {
    val endpoint = BlobPathBuilderSpec.buildEndpoint("storageAccount")
    val container = "container"
    val evalPath = "/path/to/file"
    // Keep the "/" before the container so the container matches and only the storage account differs.
    val testString = BlobPathBuilderSpec.buildEndpoint("badStorageAccount") + "/" + container + evalPath
    BlobPathBuilder.validateBlobPath(testString, container, endpoint) match {
      case BlobPathBuilder.ValidBlobPath(path) => fail(s"Valid path: $path found when verifying mismatched storage account")
      case BlobPathBuilder.UnparsableBlobPath(errorMessage) => errorMessage.getMessage() should equal(BlobPathBuilder.invalidBlobPathMessage(container, endpoint))
    }
  }

  it should "bad container fails causes URI to fail parse into a path" in {
    val endpoint = BlobPathBuilderSpec.buildEndpoint("storageAccount")
    val container = "container"
    val evalPath = "/path/to/file"
    // Keep the "/" after the endpoint so the host matches and only the container differs.
    val testString = endpoint + "/" + "badContainer" + evalPath
    BlobPathBuilder.validateBlobPath(testString, container, endpoint) match {
      case BlobPathBuilder.ValidBlobPath(path) => fail(s"Valid path: $path found when verifying mismatched container")
      case BlobPathBuilder.UnparsableBlobPath(errorMessage) => errorMessage.getMessage() should equal(BlobPathBuilder.invalidBlobPathMessage(container, endpoint))
    }
  }

  // Ignored: requires a real storage account and a SAS token substituted for the placeholder below.
  ignore should "build a blob path from a test string and read a file" in {
    val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage")
    val endpointHost = BlobPathBuilder.parseURI(endpoint).getHost
    val store = "inputs"
    val evalPath = "/test/inputFile.txt"
    val sas = "{SAS TOKEN HERE}"
    val testString = endpoint + "/" + store + evalPath
    val blobPath: BlobPath = new BlobPathBuilder(new AzureSasCredential(sas), store, endpoint).build(testString).getOrElse(fail())
    blobPath.container should equal(store)
    blobPath.endpoint should equal(endpoint)
    blobPath.pathAsString should equal(testString)
    blobPath.pathWithoutScheme should equal(endpointHost + "/" + store + evalPath)
    // Close the stream when done and decode the bytes as UTF-8 rather than one char per byte.
    val fileText = scala.util.Using.resource(Files.newInputStream(blobPath.nioPath)) { stream =>
      new String(stream.readAllBytes, java.nio.charset.StandardCharsets.UTF_8)
    }
    fileText should include ("This is my test file!!!! Did it work?")
  }
}
6 changes: 6 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ object Dependencies {
// We would like to use the BOM to manage Azure SDK versions, but SBT doesn't support it.
// https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/boms/azure-sdk-bom
// https://github.com/sbt/sbt/issues/4531
// Version of com.azure:azure-storage-blob-nio (a beta release) referenced from azureDependencies.
private val azureStorageBlobNioV = "12.0.0-beta.18"
private val azureIdentitySdkV = "1.4.2"
private val azureKeyVaultSdkV = "4.3.7"
private val betterFilesV = "3.9.1"
Expand Down Expand Up @@ -181,6 +182,9 @@ object Dependencies {
)

val azureDependencies: List[ModuleID] = List(
"com.azure" % "azure-storage-blob-nio" % azureStorageBlobNioV
exclude("jakarta.xml.bind", "jakarta.xml.bind-api")
exclude("jakarta.activation", "jakarta.activation-api"),
"com.azure" % "azure-identity" % azureIdentitySdkV
exclude("jakarta.xml.bind", "jakarta.xml.bind-api")
exclude("jakarta.activation", "jakarta.activation-api"),
Expand Down Expand Up @@ -393,6 +397,8 @@ object Dependencies {
List("scalatest", "mysql", "mariadb", "postgresql")
.map(name => "com.dimafeng" %% s"testcontainers-scala-$name" % testContainersScalaV % Test)

// Library dependencies of the Azure blob filesystem module: exactly the shared Azure SDK dependency list.
val blobFileSystemDependencies: List[ModuleID] = azureDependencies

val s3FileSystemDependencies: List[ModuleID] = junitDependencies

val gcsFileSystemDependencies: List[ModuleID] = akkaHttpDependencies
Expand Down

0 comments on commit 95c550e

Please sign in to comment.