Skip to content

Commit

Permalink
Refactor metadata builder inside the metadata service [BA-5842] (broa…
Browse files Browse the repository at this point in the history
  • Loading branch information
cjllanwarne authored Sep 10, 2019
1 parent d48e80b commit e1f95bf
Show file tree
Hide file tree
Showing 28 changed files with 1,079 additions and 812 deletions.
2 changes: 1 addition & 1 deletion common/src/main/scala/common/validation/ErrorOr.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ object ErrorOr {
type ErrorOr[+A] = Validated[NonEmptyList[String], A]

implicit class EnhancedErrorOr[A](val eoa: ErrorOr[A]) extends AnyVal {
def contextualizeErrors(s: String): ErrorOr[A] = eoa.leftMap { errors =>
def contextualizeErrors(s: => String): ErrorOr[A] = eoa.leftMap { errors =>
val total = errors.size
errors.zipWithIndex map { case (e, i) => s"Failed to $s (reason ${i + 1} of $total): $e" }
}
Expand Down
8 changes: 5 additions & 3 deletions core/src/test/scala/cromwell/core/TestKitSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package cromwell.core

import java.util.UUID

import akka.actor.{ActorSystem, Props}
import akka.testkit.TestKit
import akka.actor.ActorSystem
import akka.testkit.{TestActors, TestKit}
import com.typesafe.config.{Config, ConfigFactory}
import org.scalatest.{BeforeAndAfterAll, Suite}

Expand All @@ -21,7 +21,9 @@ abstract class TestKitSuite(actorSystemName: String = TestKitSuite.randomName,
shutdown()
}

val emptyActor = system.actorOf(Props.empty, "TestKitSuiteEmptyActor")
// 'BlackHoleActor' swallows messages without logging them (thus reduces log file overhead):
val emptyActor = system.actorOf(TestActors.blackholeProps, "TestKitSuiteEmptyActor")

val mockIoActor = system.actorOf(MockIoActor.props(), "TestKitSuiteMockIoActor")
val simpleIoActor = system.actorOf(SimpleIoActor.props, "TestKitSuiteSimpleIoActor")
val failIoActor = system.actorOf(FailIoActor.props(), "TestKitSuiteFailIoActor")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package cromwell.engine.workflow

import java.util.UUID

import akka.actor.FSM.{CurrentState, Transition}
import akka.actor._
import akka.stream.ActorMaterializer
Expand All @@ -23,9 +21,8 @@ import cromwell.engine.workflow.workflowstore.{InMemoryWorkflowStore, WorkflowSt
import cromwell.jobstore.EmptyJobStoreActor
import cromwell.server.CromwellRootActor
import cromwell.services.metadata.MetadataService.{GetSingleWorkflowMetadataAction, GetStatus, ListenToMetadataWriteActor, WorkflowOutputs}
import cromwell.services.metadata.impl.builder.MetadataBuilderActor.{BuiltMetadataResponse, FailedMetadataResponse}
import cromwell.subworkflowstore.EmptySubWorkflowStoreActor
import cromwell.webservice.metadata.MetadataBuilderActor
import cromwell.webservice.metadata.MetadataBuilderActor.{BuiltMetadataResponse, FailedMetadataResponse}
import spray.json._

import scala.concurrent.ExecutionContext.Implicits.global
Expand Down Expand Up @@ -79,18 +76,18 @@ class SingleWorkflowRunnerActor(source: WorkflowSourceFilesCollection,
case Event(IssuePollRequest, RunningSwraData(_, id)) =>
requestStatus(id)
stay()
case Event(BuiltMetadataResponse(jsObject: JsObject), RunningSwraData(_, _)) if !jsObject.state.isTerminal =>
case Event(BuiltMetadataResponse(_, jsObject: JsObject), RunningSwraData(_, _)) if !jsObject.state.isTerminal =>
schedulePollRequest()
stay()
case Event(BuiltMetadataResponse(jsObject: JsObject), RunningSwraData(replyTo, id)) if jsObject.state == WorkflowSucceeded =>
case Event(BuiltMetadataResponse(_, jsObject: JsObject), RunningSwraData(replyTo, id)) if jsObject.state == WorkflowSucceeded =>
log.info(s"$Tag workflow finished with status '$WorkflowSucceeded'.")
serviceRegistryActor ! ListenToMetadataWriteActor
goto(WaitingForFlushedMetadata) using SucceededSwraData(replyTo, id)
case Event(BuiltMetadataResponse(jsObject: JsObject), RunningSwraData(replyTo, id)) if jsObject.state == WorkflowFailed =>
case Event(BuiltMetadataResponse(_, jsObject: JsObject), RunningSwraData(replyTo, id)) if jsObject.state == WorkflowFailed =>
log.info(s"$Tag workflow finished with status '$WorkflowFailed'.")
serviceRegistryActor ! ListenToMetadataWriteActor
goto(WaitingForFlushedMetadata) using FailedSwraData(replyTo, id, new RuntimeException(s"Workflow $id transitioned to state $WorkflowFailed"))
case Event(BuiltMetadataResponse(jsObject: JsObject), RunningSwraData(replyTo, id)) if jsObject.state == WorkflowAborted =>
case Event(BuiltMetadataResponse(_, jsObject: JsObject), RunningSwraData(replyTo, id)) if jsObject.state == WorkflowAborted =>
log.info(s"$Tag workflow finished with status '$WorkflowAborted'.")
serviceRegistryActor ! ListenToMetadataWriteActor
goto(WaitingForFlushedMetadata) using AbortedSwraData(replyTo, id)
Expand All @@ -99,22 +96,21 @@ class SingleWorkflowRunnerActor(source: WorkflowSourceFilesCollection,
when (WaitingForFlushedMetadata) {
case Event(QueueWeight(weight), _) if weight > 0 => stay()
case Event(QueueWeight(_), data: SucceededSwraData) =>
val metadataBuilder = context.actorOf(MetadataBuilderActor.props(serviceRegistryActor),
s"CompleteRequest-Workflow-${data.id}-request-${UUID.randomUUID()}")
metadataBuilder ! WorkflowOutputs(data.id)

serviceRegistryActor ! WorkflowOutputs(data.id)
goto(RequestingOutputs)
case Event(QueueWeight(_), data : TerminalSwraData) =>
requestMetadataOrIssueReply(data)
}

when (RequestingOutputs) {
case Event(BuiltMetadataResponse(outputs: JsObject), data: TerminalSwraData) =>
case Event(BuiltMetadataResponse(_, outputs: JsObject), data: TerminalSwraData) =>
outputOutputs(outputs)
requestMetadataOrIssueReply(data)
}

when (RequestingMetadata) {
case Event(BuiltMetadataResponse(metadata: JsObject), data: TerminalSwraData) =>
case Event(BuiltMetadataResponse(_, metadata: JsObject), data: TerminalSwraData) =>
outputMetadata(metadata)
issueReply(data)
}
Expand All @@ -128,7 +124,7 @@ class SingleWorkflowRunnerActor(source: WorkflowSourceFilesCollection,
case Event(r: WorkflowAbortFailureResponse, data) => failAndFinish(r.failure, data)
case Event(Failure(e), data) => failAndFinish(e, data)
case Event(Status.Failure(e), data) => failAndFinish(e, data)
case Event(FailedMetadataResponse(e), data) => failAndFinish(e, data)
case Event(FailedMetadataResponse(_, e), data) => failAndFinish(e, data)
case Event(CurrentState(_, _) | Transition(_, _, _), _) =>
// ignore uninteresting current state and transition messages
stay()
Expand All @@ -144,8 +140,7 @@ class SingleWorkflowRunnerActor(source: WorkflowSourceFilesCollection,
private def requestMetadataOrIssueReply(newData: TerminalSwraData) = if (metadataOutputPath.isDefined) requestMetadata(newData) else issueReply(newData)

private def requestMetadata(newData: TerminalSwraData): State = {
val metadataBuilder = context.actorOf(MetadataBuilderActor.props(serviceRegistryActor), s"MetadataRequest-Workflow-${newData.id}")
metadataBuilder ! GetSingleWorkflowMetadataAction(newData.id, None, None, expandSubWorkflows = true)
serviceRegistryActor ! GetSingleWorkflowMetadataAction(newData.id, None, None, expandSubWorkflows = true)
goto (RequestingMetadata) using newData
}

Expand All @@ -159,8 +154,7 @@ class SingleWorkflowRunnerActor(source: WorkflowSourceFilesCollection,
// This requests status via the metadata service rather than instituting an FSM watch on the underlying workflow actor.
// Cromwell's eventual consistency means it isn't safe to use an FSM transition to a terminal state as the signal for
// when outputs or metadata have stabilized.
val metadataBuilder = context.actorOf(MetadataBuilderActor.props(serviceRegistryActor), s"StatusRequest-Workflow-$id-request-${UUID.randomUUID()}")
metadataBuilder ! GetStatus(id)
serviceRegistryActor ! GetStatus(id)
}

private def issueSuccessReply(replyTo: ActorRef): State = {
Expand Down
Loading

0 comments on commit e1f95bf

Please sign in to comment.