Skip to content

Commit

Permalink
AWS EFS support BA-5827 (broadinstitute#5070)
Browse files Browse the repository at this point in the history
* checkpoint

* checkpoint

* Revert "Checkpoint"

This reverts commit cee158d.

* support for EFS in AWS backend - checkpoint 1

*  AWS Parallel File system(EFS etc) support

*  Fixed typos

* code cleanup

* code review suggestions incorporated

*  revert inadvertent checkin

* revert premature checkin

* incorporated code review suggestions

*  code review suggestions incorporated

*  removed extra blank lines
  • Loading branch information
vanajasmy authored and aednichols committed Sep 6, 2019
1 parent 3627834 commit d42bacb
Show file tree
Hide file tree
Showing 17 changed files with 231 additions and 121 deletions.
48 changes: 37 additions & 11 deletions .../aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import cromwell.backend.io.DirectoryFunctions
import cromwell.backend.standard.{StandardAsyncExecutionActor, StandardAsyncExecutionActorParams, StandardAsyncJob}
import cromwell.core._
import cromwell.core.path.{DefaultPathBuilder, Path}
import cromwell.core.io.DefaultIoCommandBuilder
import cromwell.core.retry.SimpleExponentialBackoff
import cromwell.filesystems.s3.S3Path
import cromwell.filesystems.s3.batch.S3BatchCommandBuilder
Expand Down Expand Up @@ -79,7 +80,10 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
extends BackendJobLifecycleActor with StandardAsyncExecutionActor with AwsBatchJobCachingActorHelper
with KvClient with AskSupport {

override lazy val ioCommandBuilder = S3BatchCommandBuilder
// IO command builder for the configured file system: S3 uses S3BatchCommandBuilder,
// anything else uses DefaultIoCommandBuilder.
override lazy val ioCommandBuilder =
  if (configuration.fileSystem == AWSBatchStorageSystems.s3) S3BatchCommandBuilder
  else DefaultIoCommandBuilder

val backendSingletonActor: ActorRef =
standardParams.backendSingletonActorOption.getOrElse(
Expand All @@ -104,8 +108,11 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar

override lazy val dockerImageUsed: Option[String] = Option(jobDockerImage)

private lazy val jobScriptMountPath =
AwsBatchWorkingDisk.MountPoint.resolve(jobPaths.script.pathWithoutScheme.stripPrefix("/")).pathAsString
// Job script path as a string. On S3 the script path (scheme and leading "/" removed) is
// resolved under the working-disk mount point; otherwise the scheme-less path is used as-is.
private lazy val jobScriptMountPath =
  if (configuration.fileSystem == AWSBatchStorageSystems.s3) {
    val relativeScript = jobPaths.script.pathWithoutScheme.stripPrefix("/")
    AwsBatchWorkingDisk.MountPoint.resolve(relativeScript).pathAsString
  } else {
    jobPaths.script.pathWithoutScheme
  }


private lazy val execScript =
s"""|#!$jobShell
Expand Down Expand Up @@ -195,7 +202,12 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
private def relativeLocalizationPath(file: WomFile): WomFile = {
file.mapFile(value =>
getPath(value) match {
case Success(path) => path.pathWithoutScheme
case Success(path) => {
configuration.fileSystem match {
case AWSBatchStorageSystems.s3 => path.pathWithoutScheme
case _ => path.toString
}
}
case _ => value
}
)
Expand Down Expand Up @@ -247,8 +259,15 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
* @throws Exception if the `path` does not live in one of the supplied `disks`
*/
private def relativePathAndVolume(path: String, disks: Seq[AwsBatchVolume]): (Path, AwsBatchVolume) = {

// Anchors a relative path: under the working-disk mount point on S3, otherwise under the
// configured backend root.
def getAbsolutePath(path: Path) =
  if (configuration.fileSystem == AWSBatchStorageSystems.s3) AwsBatchWorkingDisk.MountPoint.resolve(path)
  else DefaultPathBuilder.get(configuration.root).resolve(path)
val absolutePath = DefaultPathBuilder.get(path) match {
case p if !p.isAbsolute => AwsBatchWorkingDisk.MountPoint.resolve(p)
case p if !p.isAbsolute => getAbsolutePath(p)
case p => p
}

Expand Down Expand Up @@ -339,14 +358,21 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
)
}

override lazy val commandDirectory: Path = AwsBatchWorkingDisk.MountPoint

override def globParentDirectory(womGlobFile: WomGlobFile): Path = {
val (_, disk) = relativePathAndVolume(womGlobFile.value, runtimeAttributes.disks)
disk.mountPoint
// Command directory: the working-disk mount point on S3, otherwise the call execution root.
override lazy val commandDirectory: Path =
  if (configuration.fileSystem == AWSBatchStorageSystems.s3) AwsBatchWorkingDisk.MountPoint
  else jobPaths.callExecutionRoot

override def isTerminal(runStatus: RunStatus): Boolean = {
// Parent directory for a glob: on S3 it is the mount point of the volume the glob path
// resolves to; for any other file system it is the command directory.
override def globParentDirectory(womGlobFile: WomGlobFile): Path =
  if (configuration.fileSystem == AWSBatchStorageSystems.s3) {
    val (_, volume) = relativePathAndVolume(womGlobFile.value, runtimeAttributes.disks)
    volume.mountPoint
  } else {
    commandDirectory
  }

override def isTerminal(runStatus: RunStatus): Boolean = {
runStatus match {
case _: TerminalRunStatus => true
case _ => false
Expand Down
28 changes: 23 additions & 5 deletions supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAttributes.scala
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,15 @@ import org.slf4j.{Logger, LoggerFactory}

import scala.collection.JavaConverters._

case class AwsBatchAttributes(auth: AwsAuthMode,
/**
  * Backend-level attributes for the AWS Batch backend, validated from the backend configuration.
  *
  * @param fileSystem "s3" when the backend config has a `filesystems.s3` section, otherwise "local"
  * @param auth auth mode resolved from the name stored under `<filesystems.s3|filesystems.local>.auth`
  * @param executionBucket value of the backend config's `root` key
  * @param duplicationStrategy cache-hit duplication strategy read from
  *                            `<filesystems.s3|filesystems.local>.caching.duplication-strategy`
  *                            ("copy" when the key is absent)
  * @param submitAttempts positive attempt count; presumably for job submission (TODO confirm
  *                       against the parsing code, which is not fully visible here)
  * @param createDefinitionAttempts positive attempt count; presumably for job-definition
  *                                 creation (TODO confirm)
  */
case class AwsBatchAttributes(fileSystem: String,
auth: AwsAuthMode,
executionBucket: String,
duplicationStrategy: AwsBatchCacheHitDuplicationStrategy,
submitAttempts: Int Refined Positive,
createDefinitionAttempts: Int Refined Positive)

object AwsBatchAttributes {
lazy val Logger = LoggerFactory.getLogger("AwsBatchAttributes")
lazy val Logger = LoggerFactory.getLogger(this.getClass)

private val availableConfigKeys = Set(
"concurrent-job-limit",
Expand All @@ -69,8 +70,10 @@ object AwsBatchAttributes {
"dockerhub.account",
"dockerhub.token",
"filesystems",
"filesystems.local.auth",
"filesystems.s3.auth",
"filesystems.s3.caching.duplication-strategy",
"filesystems.local.caching.duplication-strategy",
"default-runtime-attributes",
"default-runtime-attributes.disks",
"default-runtime-attributes.memory",
Expand Down Expand Up @@ -102,16 +105,30 @@ object AwsBatchAttributes {
warnDeprecated(configKeys, deprecatedAwsBatchKeys, context, Logger)

val executionBucket: ErrorOr[String] = validate { backendConfig.as[String]("root") }
val filesystemAuthMode: ErrorOr[AwsAuthMode] =

// "s3" when the backend config declares a filesystems.s3 section, otherwise "local".
val fileSysStr: ErrorOr[String] = validate {
  if (backendConfig.hasPath("filesystems.s3")) "s3" else "local"
}

// Config prefix under which the selected file system's settings (auth, caching) are read.
val fileSysPath =
  if (backendConfig.hasPath("filesystems.s3")) "filesystems.s3" else "filesystems.local"
val filesystemAuthMode: ErrorOr[AwsAuthMode] = {
(for {
authName <- validate { backendConfig.as[String]("filesystems.s3.auth") }.toEither
authName <- validate {
backendConfig.as[String](s"${fileSysPath}.auth")
}.toEither
validAuth <- awsConfig.auth(authName).toEither
} yield validAuth).toValidated
}


val duplicationStrategy: ErrorOr[AwsBatchCacheHitDuplicationStrategy] =
validate {
backendConfig.
as[Option[String]]("filesystems.s3.caching.duplication-strategy").
as[Option[String]](s"${fileSysPath}.caching.duplication-strategy").
getOrElse("copy") match {
case "copy" => CopyCachedOutputs
case "reference" => UseOriginalCachedOutputs
Expand All @@ -120,6 +137,7 @@ object AwsBatchAttributes {
}

(
fileSysStr,
filesystemAuthMode,
executionBucket,
duplicationStrategy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,11 @@ case class AwsBatchBackendInitializationData
// TODO: We'll need something specific for batch probably, but I need to
// understand more about the genomics node first
//genomics: Genomics
) extends StandardInitializationData(workflowPaths, runtimeAttributesBuilder, classOf[AwsBatchExpressionFunctions])
) extends StandardInitializationData(workflowPaths, runtimeAttributesBuilder, AwsBatchBackendInitializationDataUtility.getExpressionFunctionsClass(configuration.fileSystem))

/** Selects the expression-functions class handed to StandardInitializationData. */
object AwsBatchBackendInitializationDataUtility {
  // S3 keeps AwsBatchExpressionFunctions; every other file system gets
  // AwsBatchExpressionFunctionsForFS.
  def getExpressionFunctionsClass(fs: String) =
    if (fs == AWSBatchStorageSystems.s3) classOf[AwsBatchExpressionFunctions]
    else classOf[AwsBatchExpressionFunctionsForFS]
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,11 @@ import cromwell.backend.{BackendConfigurationDescriptor, BackendInitializationDa
import cromwell.backend.standard.{StandardAsyncExecutionActor, StandardFinalizationActor, StandardFinalizationActorParams, StandardInitializationActor, StandardInitializationActorParams, StandardLifecycleActorFactory}
import cromwell.core.CallOutputs
import wom.graph.CommandCallNode
import org.slf4j.LoggerFactory

case class AwsBatchBackendLifecycleActorFactory(
name: String,
configurationDescriptor: BackendConfigurationDescriptor)
extends StandardLifecycleActorFactory {
lazy val Log = LoggerFactory.getLogger(AwsBatchBackendLifecycleActorFactory.getClass)
override lazy val initializationActorClass: Class[_ <: StandardInitializationActor]
= classOf[AwsBatchInitializationActor]

Expand All @@ -63,7 +61,6 @@ case class AwsBatchBackendLifecycleActorFactory(
calls: Set[CommandCallNode],
serviceRegistryActor: ActorRef,
restart: Boolean): StandardInitializationActorParams = {
Log.debug("Initializing AwsBatchBackendLifecycleActorFactory")
AwsBatchInitializationActorParams(workflowDescriptor, ioActor, calls, configuration, serviceRegistryActor, restart)
}

Expand Down
20 changes: 19 additions & 1 deletion supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchConfiguration.scala
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ package cromwell.backend.impl.aws
import cromwell.filesystems.s3.S3PathBuilderFactory
import cromwell.backend.BackendConfigurationDescriptor
import cromwell.core.{BackendDockerConfiguration}
import cromwell.core.path.PathBuilderFactory
import cromwell.cloudsupport.aws.AwsConfiguration

class AwsBatchConfiguration(val configurationDescriptor: BackendConfigurationDescriptor) {
Expand All @@ -45,5 +46,22 @@ class AwsBatchConfiguration(val configurationDescriptor: BackendConfigurationDes
val batchAttributes = AwsBatchAttributes.fromConfigs(awsConfig, configurationDescriptor.backendConfig)
val awsAuth = batchAttributes.auth
val dockerCredentials = BackendDockerConfiguration.build(configurationDescriptor.backendConfig).dockerCredentials
val pathBuilderFactory = S3PathBuilderFactory(configurationDescriptor.globalConfig, configurationDescriptor.backendConfig)
// Storage system this backend runs against: S3 when the backend config declares a
// `filesystems.s3` section, otherwise the local file system. The values are taken from
// AWSBatchStorageSystems (rather than repeated string literals) so they cannot drift from
// the constants that consumers of `fileSystem` match against.
val fileSystem: String =
  if (configurationDescriptor.backendConfig.hasPath("filesystems.s3")) AWSBatchStorageSystems.s3
  else AWSBatchStorageSystems.local
// Path builder factory for the configured file system. With a `filesystems.s3` section an
// S3PathBuilderFactory is built from the global and backend configs.
// NOTE(review): the non-S3 branch evaluates to the bare `PathBuilderFactory` identifier
// rather than a factory constructed from configuration, which also widens the inferred type
// of this val — confirm this is intended and that no caller relies on a concrete factory here.
val pathBuilderFactory = configurationDescriptor.backendConfig.hasPath("filesystems.s3") match {
case true => S3PathBuilderFactory(configurationDescriptor.globalConfig, configurationDescriptor.backendConfig)
case false =>
PathBuilderFactory
}
}

/**
  * String identifiers for storage systems. `AwsBatchConfiguration.fileSystem` is compared
  * against these values to choose between S3-specific and generic file-system behaviour.
  */
object AWSBatchStorageSystems {
  val s3: String    = "s3"
  val efs: String   = "efs"
  val ebs: String   = "ebs"
  val local: String = "local"
}

Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,8 @@ class AwsBatchExpressionFunctions(standardParams: StandardExpressionFunctionsPar
}
}
}

/**
  * Expression functions for non-S3 file systems. Adds nothing on top of
  * StandardExpressionFunctions; it exists so a distinct class can be selected when the
  * backend is not running against S3.
  */
class AwsBatchExpressionFunctionsForFS(standardParams: StandardExpressionFunctionsParams)
  extends StandardExpressionFunctions(standardParams)
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import cromwell.backend._
import cromwell.backend.standard.{StandardFinalizationActor, StandardFinalizationActorParams}
import cromwell.core.CallOutputs
import cromwell.core.io.AsyncIoActorClient
import cromwell.core.io.DefaultIoCommandBuilder
import wom.graph.CommandCallNode
import cromwell.filesystems.s3.batch.S3BatchCommandBuilder

Expand All @@ -57,7 +58,10 @@ class AwsBatchFinalizationActor(val params: AwsBatchFinalizationActorParams)

lazy val configuration: AwsBatchConfiguration = params.configuration

override lazy val ioCommandBuilder = S3BatchCommandBuilder
// IO command builder for the configured file system: S3 uses S3BatchCommandBuilder,
// anything else uses DefaultIoCommandBuilder.
override lazy val ioCommandBuilder =
  if (configuration.fileSystem == AWSBatchStorageSystems.s3) S3BatchCommandBuilder
  else DefaultIoCommandBuilder

override def ioActor: ActorRef = params.ioActor
}
13 changes: 11 additions & 2 deletions ...edBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchInitializationActor.scala
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ import cromwell.backend.standard.{StandardInitializationActor,
StandardValidatedRuntimeAttributesBuilder}
import cromwell.backend.{BackendConfigurationDescriptor,
BackendWorkflowDescriptor}
import cromwell.core.io.DefaultIoCommandBuilder
import cromwell.core.io.AsyncIoActorClient
import cromwell.core.path.Path
import wom.graph.CommandCallNode

import scala.concurrent.Future

case class AwsBatchInitializationActorParams
Expand Down Expand Up @@ -86,5 +86,14 @@ class AwsBatchInitializationActor(params: AwsBatchInitializationActorParams)
creds <- credentials
} yield AwsBatchBackendInitializationData(workflowPaths, runtimeAttributesBuilder, configuration, creds)

override lazy val ioCommandBuilder = S3BatchCommandBuilder
// IO command builder for the configured file system.
// NOTE(review): the null guard implies `configuration` may still be unset when this is first
// evaluated; in that case a fresh AwsBatchConfiguration is built from the actor params.
override lazy val ioCommandBuilder = {
  val effectiveConfiguration =
    Option(configuration).getOrElse(new AwsBatchConfiguration(params.configurationDescriptor))

  if (effectiveConfiguration.fileSystem == AWSBatchStorageSystems.s3) S3BatchCommandBuilder
  else DefaultIoCommandBuilder
}
}
6 changes: 5 additions & 1 deletion supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,13 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
implicit async: Async[F],
timer: Timer[F]): Aws[F, String] = ReaderT { awsBatchAttributes =>
val jobDefinitionBuilder = StandardAwsBatchJobDefinitionBuilder
// Script handed to the job definition: the reconfigured script on S3, the plain script for
// any other file system.
val commandStr =
  if (awsBatchAttributes.fileSystem == AWSBatchStorageSystems.s3) reconfiguredScript
  else script
val jobDefinitionContext = AwsBatchJobDefinitionContext(runtimeAttributes,
taskId,
reconfiguredScript,
commandStr,
dockerRc,
dockerStdout,
dockerStderr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
package cromwell.backend.impl.aws

import akka.actor.Actor
import cromwell.backend.impl.aws.io.{AwsBatchVolume, AwsBatchWorkingDisk}
import cromwell.backend.impl.aws.io.AwsBatchVolume
import cromwell.backend.impl.aws.io.AwsBatchWorkingDisk
import cromwell.backend.standard.StandardCachingActorHelper
import cromwell.core.logging.JobLogging
import cromwell.core.path.Path
Expand All @@ -51,11 +52,14 @@ trait AwsBatchJobCachingActorHelper extends StandardCachingActorHelper {

lazy val runtimeAttributes = AwsBatchRuntimeAttributes(validatedRuntimeAttributes, configuration.runtimeConfig)

lazy val workingDisk: AwsBatchVolume = runtimeAttributes.disks.find(_.name == AwsBatchWorkingDisk.Name).get
/**
  * The volume treated as the working disk.
  *
  * On S3 this is the disk named `AwsBatchWorkingDisk.Name`; for any other file system it is
  * the first disk whose mount point is a prefix of the configured root.
  *
  * @throws NoSuchElementException if none of the runtime-attribute disks qualifies; the
  *                                message lists the file system and the disks considered
  */
lazy val workingDisk: AwsBatchVolume = {
  // Choose the selection rule once instead of re-matching the file system for every disk.
  val isWorkingDisk: AwsBatchVolume => Boolean =
    if (configuration.fileSystem == AWSBatchStorageSystems.s3) { disk =>
      disk.name == AwsBatchWorkingDisk.Name
    } else { disk =>
      configuration.root.startsWith(disk.mountPoint.pathAsString)
    }

  // Same exception type as the previous bare `.get`, but with an actionable message.
  runtimeAttributes.disks.find(isWorkingDisk).getOrElse(
    throw new NoSuchElementException(
      s"No working disk found for file system '${configuration.fileSystem}' among disks: " +
        runtimeAttributes.disks.map(_.name).mkString(", ")
    )
  )
}


lazy val callRootPath: Path = callPaths.callExecutionRoot
lazy val returnCodeFilename: String = callPaths.returnCodeFilename
// lazy val returnCodePath: Path = callPaths.returnCode

lazy val attributes: AwsBatchAttributes = configuration.batchAttributes
}
Loading

0 comments on commit d42bacb

Please sign in to comment.