From e1f95bfc8afee21ae24dfaec09b2258cee17ba78 Mon Sep 17 00:00:00 2001 From: Chris Llanwarne Date: Tue, 10 Sep 2019 12:11:50 -0400 Subject: [PATCH] Refactor metadata builder inside the metadata service [BA-5842] (#5150) --- .../scala/common/validation/ErrorOr.scala | 2 +- .../scala/cromwell/core/TestKitSuite.scala | 8 +- .../workflow/SingleWorkflowRunnerActor.scala | 30 +- .../callcaching/CallCacheDiffActor.scala | 428 ++++++++---------- .../CallCacheDiffActorJsonFormatting.scala | 27 ++ .../webservice/LabelsManagerActor.scala | 25 +- .../webservice/WorkflowJsonSupport.scala | 2 - .../MetadataBuilderRegulatorActor.scala | 57 --- .../routes/CromwellApiService.scala | 15 +- .../routes/MetadataRouteSupport.scala | 34 +- .../routes/wes/WesRouteSupport.scala | 9 +- .../webservice/routes/wes/WesState.scala | 11 +- .../callcaching/CallCacheDiffActorSpec.scala | 298 +++++++----- .../webservice/MetadataBuilderActorSpec.scala | 64 +-- .../routes/CromwellApiServiceSpec.scala | 42 +- .../scala/cromwell/CromwellTestKitSpec.scala | 48 +- .../engine/WorkflowStoreActorSpec.scala | 19 +- .../services/metadata/MetadataQuery.scala | 4 +- .../services/metadata/MetadataService.scala | 42 +- .../metadata/impl/MetadataServiceActor.scala | 12 +- .../ReadDatabaseMetadataWorkerActor.scala | 104 +++++ .../metadata/impl/ReadMetadataActor.scala | 96 ---- .../impl/ReadMetadataRegulatorActor.scala | 85 ++++ .../impl/builder}/MetadataBuilderActor.scala | 251 +++++----- .../impl/builder}/MetadataComponent.scala | 2 +- .../services/metadata/MetadataQuerySpec.scala | 58 +++ ...yForWorkflowsMatchingParametersSpec.scala} | 2 +- .../impl/MetadataServiceActorSpec.scala | 116 +++-- 28 files changed, 1079 insertions(+), 812 deletions(-) create mode 100644 engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheDiffActorJsonFormatting.scala delete mode 100644 engine/src/main/scala/cromwell/webservice/metadata/MetadataBuilderRegulatorActor.scala create mode 100644 services/src/main/scala/cromwell/services/metadata/impl/ReadDatabaseMetadataWorkerActor.scala delete mode 100644 services/src/main/scala/cromwell/services/metadata/impl/ReadMetadataActor.scala create mode 100644 services/src/main/scala/cromwell/services/metadata/impl/ReadMetadataRegulatorActor.scala rename {engine/src/main/scala/cromwell/webservice/metadata => services/src/main/scala/cromwell/services/metadata/impl/builder}/MetadataBuilderActor.scala (63%) rename {engine/src/main/scala/cromwell/webservice/metadata => services/src/main/scala/cromwell/services/metadata/impl/builder}/MetadataComponent.scala (99%) create mode 100644 services/src/test/scala/cromwell/services/metadata/MetadataQuerySpec.scala rename services/src/test/scala/cromwell/services/metadata/{WorkflowQueryParametersSpec.scala => QueryForWorkflowsMatchingParametersSpec.scala} (99%) diff --git a/common/src/main/scala/common/validation/ErrorOr.scala b/common/src/main/scala/common/validation/ErrorOr.scala index 95eb9bab63e..d01a6a15174 100644 --- a/common/src/main/scala/common/validation/ErrorOr.scala +++ b/common/src/main/scala/common/validation/ErrorOr.scala @@ -10,7 +10,7 @@ object ErrorOr { type ErrorOr[+A] = Validated[NonEmptyList[String], A] implicit class EnhancedErrorOr[A](val eoa: ErrorOr[A]) extends AnyVal { - def contextualizeErrors(s: String): ErrorOr[A] = eoa.leftMap { errors => + def contextualizeErrors(s: => String): ErrorOr[A] = eoa.leftMap { errors => val total = errors.size errors.zipWithIndex map { case (e, i) => s"Failed to $s (reason ${i + 1} of $total): $e" } } diff --git a/core/src/test/scala/cromwell/core/TestKitSuite.scala b/core/src/test/scala/cromwell/core/TestKitSuite.scala index e8443a4d5f7..df1317deae3 100644 --- a/core/src/test/scala/cromwell/core/TestKitSuite.scala +++ b/core/src/test/scala/cromwell/core/TestKitSuite.scala @@ -2,8 +2,8 @@ package cromwell.core import java.util.UUID -import akka.actor.{ActorSystem, Props} -import akka.testkit.TestKit +import akka.actor.ActorSystem +import akka.testkit.{TestActors, TestKit} import com.typesafe.config.{Config, ConfigFactory} import org.scalatest.{BeforeAndAfterAll, Suite} @@ -21,7 +21,9 @@ abstract class TestKitSuite(actorSystemName: String = TestKitSuite.randomName, shutdown() } - val emptyActor = system.actorOf(Props.empty, "TestKitSuiteEmptyActor") + // 'BlackHoleActor' swallows messages without logging them (thus reduces log file overhead): + val emptyActor = system.actorOf(TestActors.blackholeProps, "TestKitSuiteEmptyActor") + val mockIoActor = system.actorOf(MockIoActor.props(), "TestKitSuiteMockIoActor") val simpleIoActor = system.actorOf(SimpleIoActor.props, "TestKitSuiteSimpleIoActor") val failIoActor = system.actorOf(FailIoActor.props(), "TestKitSuiteFailIoActor") diff --git a/engine/src/main/scala/cromwell/engine/workflow/SingleWorkflowRunnerActor.scala b/engine/src/main/scala/cromwell/engine/workflow/SingleWorkflowRunnerActor.scala index 1ee8e8cbb76..c17ef5339d5 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/SingleWorkflowRunnerActor.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/SingleWorkflowRunnerActor.scala @@ -1,7 +1,5 @@ package cromwell.engine.workflow -import java.util.UUID - import akka.actor.FSM.{CurrentState, Transition} import akka.actor._ import akka.stream.ActorMaterializer @@ -23,9 +21,8 @@ import cromwell.engine.workflow.workflowstore.{InMemoryWorkflowStore, WorkflowSt import cromwell.jobstore.EmptyJobStoreActor import cromwell.server.CromwellRootActor import cromwell.services.metadata.MetadataService.{GetSingleWorkflowMetadataAction, GetStatus, ListenToMetadataWriteActor, WorkflowOutputs} +import cromwell.services.metadata.impl.builder.MetadataBuilderActor.{BuiltMetadataResponse, FailedMetadataResponse} import cromwell.subworkflowstore.EmptySubWorkflowStoreActor -import cromwell.webservice.metadata.MetadataBuilderActor -import cromwell.webservice.metadata.MetadataBuilderActor.{BuiltMetadataResponse, FailedMetadataResponse} import spray.json._ import scala.concurrent.ExecutionContext.Implicits.global @@ -79,18 +76,18 @@ class SingleWorkflowRunnerActor(source: WorkflowSourceFilesCollection, case Event(IssuePollRequest, RunningSwraData(_, id)) => requestStatus(id) stay() - case Event(BuiltMetadataResponse(jsObject: JsObject), RunningSwraData(_, _)) if !jsObject.state.isTerminal => + case Event(BuiltMetadataResponse(_, jsObject: JsObject), RunningSwraData(_, _)) if !jsObject.state.isTerminal => schedulePollRequest() stay() - case Event(BuiltMetadataResponse(jsObject: JsObject), RunningSwraData(replyTo, id)) if jsObject.state == WorkflowSucceeded => + case Event(BuiltMetadataResponse(_, jsObject: JsObject), RunningSwraData(replyTo, id)) if jsObject.state == WorkflowSucceeded => log.info(s"$Tag workflow finished with status '$WorkflowSucceeded'.") serviceRegistryActor ! ListenToMetadataWriteActor goto(WaitingForFlushedMetadata) using SucceededSwraData(replyTo, id) - case Event(BuiltMetadataResponse(jsObject: JsObject), RunningSwraData(replyTo, id)) if jsObject.state == WorkflowFailed => + case Event(BuiltMetadataResponse(_, jsObject: JsObject), RunningSwraData(replyTo, id)) if jsObject.state == WorkflowFailed => log.info(s"$Tag workflow finished with status '$WorkflowFailed'.") serviceRegistryActor ! ListenToMetadataWriteActor goto(WaitingForFlushedMetadata) using FailedSwraData(replyTo, id, new RuntimeException(s"Workflow $id transitioned to state $WorkflowFailed")) - case Event(BuiltMetadataResponse(jsObject: JsObject), RunningSwraData(replyTo, id)) if jsObject.state == WorkflowAborted => + case Event(BuiltMetadataResponse(_, jsObject: JsObject), RunningSwraData(replyTo, id)) if jsObject.state == WorkflowAborted => log.info(s"$Tag workflow finished with status '$WorkflowAborted'.") serviceRegistryActor ! ListenToMetadataWriteActor goto(WaitingForFlushedMetadata) using AbortedSwraData(replyTo, id) @@ -99,22 +96,21 @@ class SingleWorkflowRunnerActor(source: WorkflowSourceFilesCollection, when (WaitingForFlushedMetadata) { case Event(QueueWeight(weight), _) if weight > 0 => stay() case Event(QueueWeight(_), data: SucceededSwraData) => - val metadataBuilder = context.actorOf(MetadataBuilderActor.props(serviceRegistryActor), - s"CompleteRequest-Workflow-${data.id}-request-${UUID.randomUUID()}") - metadataBuilder ! WorkflowOutputs(data.id) + + serviceRegistryActor ! WorkflowOutputs(data.id) goto(RequestingOutputs) case Event(QueueWeight(_), data : TerminalSwraData) => requestMetadataOrIssueReply(data) } when (RequestingOutputs) { - case Event(BuiltMetadataResponse(outputs: JsObject), data: TerminalSwraData) => + case Event(BuiltMetadataResponse(_, outputs: JsObject), data: TerminalSwraData) => outputOutputs(outputs) requestMetadataOrIssueReply(data) } when (RequestingMetadata) { - case Event(BuiltMetadataResponse(metadata: JsObject), data: TerminalSwraData) => + case Event(BuiltMetadataResponse(_, metadata: JsObject), data: TerminalSwraData) => outputMetadata(metadata) issueReply(data) } @@ -128,7 +124,7 @@ class SingleWorkflowRunnerActor(source: WorkflowSourceFilesCollection, case Event(r: WorkflowAbortFailureResponse, data) => failAndFinish(r.failure, data) case Event(Failure(e), data) => failAndFinish(e, data) case Event(Status.Failure(e), data) => failAndFinish(e, data) - case Event(FailedMetadataResponse(e), data) => failAndFinish(e, data) + case Event(FailedMetadataResponse(_, e), data) => failAndFinish(e, data) case Event(CurrentState(_, _) | Transition(_, _, _), _) => // ignore uninteresting current state and transition messages stay() @@ -144,8 +140,7 @@ class SingleWorkflowRunnerActor(source: WorkflowSourceFilesCollection, private def requestMetadataOrIssueReply(newData: TerminalSwraData) = if (metadataOutputPath.isDefined) requestMetadata(newData) else issueReply(newData) private def requestMetadata(newData: TerminalSwraData): State = { - val metadataBuilder = context.actorOf(MetadataBuilderActor.props(serviceRegistryActor), s"MetadataRequest-Workflow-${newData.id}") - metadataBuilder ! GetSingleWorkflowMetadataAction(newData.id, None, None, expandSubWorkflows = true) + serviceRegistryActor ! GetSingleWorkflowMetadataAction(newData.id, None, None, expandSubWorkflows = true) goto (RequestingMetadata) using newData } @@ -159,8 +154,7 @@ class SingleWorkflowRunnerActor(source: WorkflowSourceFilesCollection, // This requests status via the metadata service rather than instituting an FSM watch on the underlying workflow actor. // Cromwell's eventual consistency means it isn't safe to use an FSM transition to a terminal state as the signal for // when outputs or metadata have stabilized. - val metadataBuilder = context.actorOf(MetadataBuilderActor.props(serviceRegistryActor), s"StatusRequest-Workflow-$id-request-${UUID.randomUUID()}") - metadataBuilder ! GetStatus(id) + serviceRegistryActor ! GetStatus(id) } private def issueSuccessReply(replyTo: ActorRef): State = { diff --git a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheDiffActor.scala b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheDiffActor.scala index 688a9a06185..2cb7b0708a4 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheDiffActor.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheDiffActor.scala @@ -3,52 +3,19 @@ package cromwell.engine.workflow.lifecycle.execution.callcaching import akka.actor.{ActorRef, LoggingFSM, Props} import cats.data.NonEmptyList import cats.instances.list._ -import cats.syntax.foldable._ +import cats.syntax.apply._ +import cats.syntax.traverse._ +import cats.syntax.validated._ +import common.exception.AggregatedMessageException +import common.validation.ErrorOr._ +import common.validation.Validation._ import cromwell.core.Dispatcher.EngineDispatcher import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheDiffActor.{CallCacheDiffActorData, _} import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheDiffQueryParameter.CallCacheDiffQueryCall -import cromwell.services.metadata.CallMetadataKeys.CallCachingKeys -import cromwell.services.metadata.MetadataService.{GetMetadataQueryAction, MetadataLookupResponse, MetadataServiceKeyLookupFailed} +import cromwell.services.metadata.MetadataService.GetMetadataAction import cromwell.services.metadata._ -import cromwell.webservice.metadata.MetadataComponent._ -import cromwell.webservice.metadata._ -import spray.json.JsObject - -import scala.language.postfixOps -import scala.util.{Failure, Success, Try} - -object CallCacheDiffActor { - private val PlaceholderMissingHashValue = MetadataPrimitive(MetadataValue("Error: there is a hash entry for this key but the value is null !")) - - final case class CachedCallNotFoundException(message: String) extends Exception { - override def getMessage = message - } - - // Exceptions when calls exist but have no hashes in their metadata, indicating they were run pre-28 - private val HashesForCallAAndBNotFoundException = new Exception("callA and callB have not finished yet, or were run on a previous version of Cromwell on which this endpoint was not supported.") - private val HashesForCallANotFoundException = new Exception("callA has not finished yet, or was run on a previous version of Cromwell on which this endpoint was not supported.") - private val HashesForCallBNotFoundException = new Exception("callB has not finished yet, or was run on a previous version of Cromwell on which this endpoint was not supported.") - - sealed trait CallCacheDiffActorState - case object Idle extends CallCacheDiffActorState - case object WaitingForMetadata extends CallCacheDiffActorState - - sealed trait CallCacheDiffActorData - case object CallCacheDiffNoData extends CallCacheDiffActorData - case class CallCacheDiffWithRequest(queryA: MetadataQuery, - queryB: MetadataQuery, - responseA: Option[MetadataLookupResponse], - responseB: Option[MetadataLookupResponse], - replyTo: ActorRef - ) extends CallCacheDiffActorData - - sealed abstract class CallCacheDiffActorResponse - case class BuiltCallCacheDiffResponse(response: JsObject) extends CallCacheDiffActorResponse - case class FailedCallCacheDiffResponse(reason: Throwable) extends CallCacheDiffActorResponse - - - def props(serviceRegistryActor: ActorRef) = Props(new CallCacheDiffActor(serviceRegistryActor)).withDispatcher(EngineDispatcher) -} +import cromwell.services.metadata.impl.builder.MetadataBuilderActor.{BuiltMetadataResponse, FailedMetadataResponse} +import spray.json.{JsArray, JsBoolean, JsNumber, JsObject, JsString, JsValue} class CallCacheDiffActor(serviceRegistryActor: ActorRef) extends LoggingFSM[CallCacheDiffActorState, CallCacheDiffActorData] { startWith(Idle, CallCacheDiffNoData) @@ -57,76 +24,63 @@ class CallCacheDiffActor(serviceRegistryActor: ActorRef) extends LoggingFSM[Call case Event(CallCacheDiffQueryParameter(callA, callB), CallCacheDiffNoData) => val queryA = makeMetadataQuery(callA) val queryB = makeMetadataQuery(callB) - serviceRegistryActor ! GetMetadataQueryAction(queryA) - serviceRegistryActor ! GetMetadataQueryAction(queryB) + serviceRegistryActor ! GetMetadataAction(queryA) + serviceRegistryActor ! GetMetadataAction(queryB) goto(WaitingForMetadata) using CallCacheDiffWithRequest(queryA, queryB, None, None, sender()) } when(WaitingForMetadata) { // First Response // Response A - case Event(response: MetadataLookupResponse, data @ CallCacheDiffWithRequest(queryA, _, None, None, _)) if queryA == response.query => - stay() using data.copy(responseA = Option(response)) + case Event(BuiltMetadataResponse(GetMetadataAction(originalQuery), responseJson), data@CallCacheDiffWithRequest(queryA, _, None, None, _)) if queryA == originalQuery => + stay() using data.copy(responseA = Option(WorkflowMetadataJson(responseJson))) // Response B - case Event(response: MetadataLookupResponse, data @ CallCacheDiffWithRequest(_, queryB, None, None, _)) if queryB == response.query => - stay() using data.copy(responseB = Option(response)) + case Event(BuiltMetadataResponse(GetMetadataAction(originalQuery), responseJson), data@CallCacheDiffWithRequest(_, queryB, None, None, _)) if queryB == originalQuery => + stay() using data.copy(responseB = Option(WorkflowMetadataJson(responseJson))) // Second Response // Response A - case Event(response: MetadataLookupResponse, CallCacheDiffWithRequest(queryA, queryB, None, Some(responseB), replyTo)) if queryA == response.query => - buildDiffAndRespond(queryA, queryB, response, responseB, replyTo) + case Event(BuiltMetadataResponse(GetMetadataAction(originalQuery), responseJson), CallCacheDiffWithRequest(queryA, queryB, None, Some(responseB), replyTo)) if queryA == originalQuery => + buildDiffAndRespond(queryA, queryB, WorkflowMetadataJson(responseJson), responseB, replyTo) // Response B - case Event(response: MetadataLookupResponse, CallCacheDiffWithRequest(queryA, queryB, Some(responseA), None, replyTo)) if queryB == response.query => - buildDiffAndRespond(queryA, queryB, responseA, response, replyTo) - case Event(MetadataServiceKeyLookupFailed(_, failure), data: CallCacheDiffWithRequest) => + case Event(BuiltMetadataResponse(GetMetadataAction(originalQuery), responseJson), CallCacheDiffWithRequest(queryA, queryB, Some(responseA), None, replyTo)) if queryB == originalQuery => + buildDiffAndRespond(queryA, queryB, responseA, WorkflowMetadataJson(responseJson), replyTo) + case Event(FailedMetadataResponse(_, failure), data: CallCacheDiffWithRequest) => data.replyTo ! FailedCallCacheDiffResponse(failure) context stop self stay() } - /** - * Builds a response and sends it back as Json. - * The response is structured in the following way - * { - * "callA": { - * -- information about call A -- - * }, - * "callB": { - * -- information about call B -- - * }, - * "hashDifferential": [ - * { - * "hash key": { - * "callA": -- hash value for call A, or null --, - * "callB": -- hash value for call B, or null -- - * } - * }, - * ... - * ] - * } - */ + whenUnhandled { + case Event(oops, oopsData) => + log.error(s"Programmer Error: Unexpected event received by ${this.getClass.getSimpleName}: $oops / $oopsData (in state $stateName)") + stay() + + } + private def buildDiffAndRespond(queryA: MetadataQuery, queryB: MetadataQuery, - responseA: MetadataLookupResponse, - responseB: MetadataLookupResponse, + responseA: WorkflowMetadataJson, + responseB: WorkflowMetadataJson, replyTo: ActorRef) = { - lazy val buildResponse = { - diffHashes(responseA.eventList, responseB.eventList) match { - case Success(diff) => - val diffObject = MetadataObject(Map( - "callA" -> makeCallInfo(queryA, responseA.eventList), - "callB" -> makeCallInfo(queryB, responseB.eventList), - "hashDifferential" -> diff - )) - - BuiltCallCacheDiffResponse(metadataComponentJsonWriter.write(diffObject).asJsObject) - case Failure(f) => FailedCallCacheDiffResponse(f) - } - } + def describeCallFromQuery(query: MetadataQuery): String = s"${query.workflowId} / ${query.jobKey.map(_.callFqn).getOrElse("<>")}:${query.jobKey.map(_.index.getOrElse(-1)).getOrElse("<>")}" + + val callACachingMetadata = extractCallMetadata(queryA, responseA).contextualizeErrors(s"extract relevant metadata for call A (${describeCallFromQuery(queryA)})") + val callBCachingMetadata = extractCallMetadata(queryB, responseB).contextualizeErrors(s"extract relevant metadata for call B (${describeCallFromQuery(queryB)})") + + val response = (callACachingMetadata, callBCachingMetadata) flatMapN { case (callA, callB) => - val response = checkCallsExistence(queryA, queryB, responseA, responseB) match { - case Some(msg) => FailedCallCacheDiffResponse(CachedCallNotFoundException(msg)) - case None => buildResponse + val callADetails = extractCallDetails(queryA, callA) + val callBDetails = extractCallDetails(queryB, callB) + + (callADetails, callBDetails) mapN { (cad, cbd) => + val callAHashes = callA.callCachingMetadataJson.hashes + val callBHashes = callB.callCachingMetadataJson.hashes + + SuccessfulCallCacheDiffResponse(cad, cbd, calculateHashDifferential(callAHashes, callBHashes)) + } + } valueOr { + e => FailedCallCacheDiffResponse(AggregatedMessageException("Failed to calculate diff for call A and call B", e.toList)) } replyTo ! response @@ -134,172 +88,178 @@ class CallCacheDiffActor(serviceRegistryActor: ActorRef) extends LoggingFSM[Call context stop self stay() } +} - /** - * Returns an error message if one or both of the calls are not found, or None if it does - */ - private def checkCallsExistence(queryA: MetadataQuery, - queryB: MetadataQuery, - responseA: MetadataLookupResponse, - responseB: MetadataLookupResponse): Option[String] = { - import cromwell.core.ExecutionIndex._ - - def makeTag(query: MetadataQuery) = { - s"${query.workflowId}:${query.jobKey.get.callFqn}:${query.jobKey.get.index.fromIndex}" - } - def makeNotFoundMessage(queries: NonEmptyList[MetadataQuery]) = { - val plural = if (queries.tail.nonEmpty) "s" else "" - s"Cannot find call$plural ${queries.map(makeTag).toList.mkString(", ")}" - } +object CallCacheDiffActor { - (responseA.eventList, responseB.eventList) match { - case (a, b) if a.isEmpty && b.isEmpty => Option(makeNotFoundMessage(NonEmptyList.of(queryA, queryB))) - case (a, _) if a.isEmpty => Option(makeNotFoundMessage(NonEmptyList.of(queryA))) - case (_, b) if b.isEmpty => Option(makeNotFoundMessage(NonEmptyList.of(queryB))) - case _ => None - } + final case class CachedCallNotFoundException(message: String) extends Exception { + override def getMessage = message } - /** - * Generates the "info" section of callA or callB - */ - private def makeCallInfo(query: MetadataQuery, eventList: Seq[MetadataEvent]): MetadataComponent = { - val callKey = MetadataObject(Map( - "workflowId" -> MetadataPrimitive(MetadataValue(query.workflowId.toString)), - "callFqn" -> MetadataPrimitive(MetadataValue(query.jobKey.get.callFqn)), - "jobIndex" -> MetadataPrimitive(MetadataValue(query.jobKey.get.index.getOrElse(-1))) - )) + sealed trait CallCacheDiffActorState + case object Idle extends CallCacheDiffActorState + case object WaitingForMetadata extends CallCacheDiffActorState - val allowResultReuse = attributeToComponent(eventList, { _ == CallCachingKeys.AllowReuseMetadataKey }, { _ => "allowResultReuse" }) - val executionStatus = attributeToComponent(eventList, { _ == CallMetadataKeys.ExecutionStatus }) + sealed trait CallCacheDiffActorData + case object CallCacheDiffNoData extends CallCacheDiffActorData + case class CallCacheDiffWithRequest(queryA: MetadataQuery, + queryB: MetadataQuery, + responseA: Option[WorkflowMetadataJson], + responseB: Option[WorkflowMetadataJson], + replyTo: ActorRef + ) extends CallCacheDiffActorData - List(callKey, allowResultReuse, executionStatus) combineAll - } + sealed abstract class CallCacheDiffActorResponse - /** - * Collects events from the list for which the keys verify the keyFilter predicate - * and apply keyModifier to the event's key - */ - private def collectEvents(events: Seq[MetadataEvent], - keyFilter: (String => Boolean), - keyModifier: (String => String)) = events collect { - case event @ MetadataEvent(metadataKey @ MetadataKey(_, _, key), _, _) if keyFilter(key) => - event.copy(key = metadataKey.copy(key = keyModifier(key))) - } + case class FailedCallCacheDiffResponse(reason: Throwable) extends CallCacheDiffActorResponse + final case class SuccessfulCallCacheDiffResponse(callA: CallDetails, callB: CallDetails, hashDifferential: List[HashDifference]) extends CallCacheDiffActorResponse + def props(serviceRegistryActor: ActorRef) = Props(new CallCacheDiffActor(serviceRegistryActor)).withDispatcher(EngineDispatcher) + + final case class CallDetails(executionStatus: String, allowResultReuse: Boolean, callFqn: String, jobIndex: Int, workflowId: String) + final case class HashDifference(hashKey: String, callA: Option[String], callB: Option[String]) - /** - * Given a list of events, a keyFilter and a keyModifier, returns the associated MetadataComponent. - * Ensures that events are properly aggregated together (CRDTs and latest timestamp rule) - */ - private def attributeToComponent(events: Seq[MetadataEvent], keyFilter: (String => Boolean), keyModifier: (String => String) = identity[String]) = { - MetadataComponent(collectEvents(events, keyFilter, keyModifier)) - } /** - * Makes a diff object out of a key and a pair of values. - * Values are Option[Option[MetadataValue]] for the following reason: - * - * The outer option represents whether or not this key had a corresponding hash metadata entry for the given call - * If the above is true, the inner value is the metadata value for this entry, which is nullable, hence an Option. - * The first outer option will determine whether the resulting json value will be null (no hash entry for this key), - * or the actual value. - * If the metadata value (inner option) happens to be None, it's an error, as we don't expect to publish null hash values. - * In that case we replace it with the placeholderMissingHashValue. + * Create a Metadata query from a CallCacheDiffQueryCall */ - private def makeHashDiffObject(key: String, valueA: Option[Option[MetadataValue]], valueB: Option[Option[MetadataValue]]) = { - def makeFinalValue(value: Option[Option[MetadataValue]]) = value match { - case Some(Some(metadataValue)) => MetadataPrimitive(metadataValue) - case Some(None) => PlaceholderMissingHashValue - case None => MetadataNullComponent - } + def makeMetadataQuery(call: CallCacheDiffQueryCall) = MetadataQuery( + workflowId = call.workflowId, + // jobAttempt None will return keys for all attempts + jobKey = Option(MetadataQueryJobKey(call.callFqn, call.jobIndex, None)), + key = None, + includeKeysOption = Option(NonEmptyList.of("callCaching", "executionStatus")), + excludeKeysOption = Option(NonEmptyList.of("callCaching:hitFailures")), + expandSubWorkflows = false + ) - MetadataObject( - "hashKey" -> MetadataPrimitive(MetadataValue(key.trim, MetadataString)), - "callA" -> makeFinalValue(valueA), - "callB" -> makeFinalValue(valueB) - ) + // These simple case classes are just to help apply a little type safety to input and output types: + final case class WorkflowMetadataJson(value: JsObject) extends AnyVal + final case class CallMetadataJson(rawValue: JsObject, jobKey: MetadataQueryJobKey, callCachingMetadataJson: CallCachingMetadataJson) + final case class CallCachingMetadataJson(rawValue: JsObject, hashes: Map[String, String]) + + + /* + * Takes in the JsObject returned from a metadata query and filters out only the appropriate call's callCaching section + */ + def extractCallMetadata(query: MetadataQuery, response: WorkflowMetadataJson): ErrorOr[CallMetadataJson] = { + + for { + // Sanity Checks: + _ <- response.value.checkFieldValue("id", s""""${query.workflowId}"""") + jobKey <- query.jobKey.toErrorOr("Call is required in call cache diff query") + + // Unpack the JSON: + allCalls <- response.value.fieldAsObject("calls") + callShards <- allCalls.fieldAsArray(jobKey.callFqn) + onlyShardElement <- callShards.elementWithHighestAttemptField + _ <- onlyShardElement.checkFieldValue("shardIndex", jobKey.index.getOrElse(-1).toString) + callCachingElement <- onlyShardElement.fieldAsObject(CallMetadataKeys.CallCaching) + hashes <- extractHashes(callCachingElement) + } yield CallMetadataJson(onlyShardElement, jobKey, CallCachingMetadataJson(callCachingElement, hashes)) } - /** - * Creates the hash differential between 2 list of events - */ - private def diffHashes(eventsA: Seq[MetadataEvent], eventsB: Seq[MetadataEvent]): Try[MetadataComponent] = { - val hashesKey = CallCachingKeys.HashesKey + MetadataKey.KeySeparator - // Collect hashes events and map their key to only keep the meaningful part of the key - // Then map the result to get a Map of hashKey -> Option[MetadataValue]. This will allow for fast lookup when - // comparing the 2 hash sets. - // Note that it's an Option[MetadataValue] because metadata values can be null, although for this particular - // case we don't expect it to be (we should never publish a hash metadata event with a null value) - // If that happens we will place a placeholder value in place of the hash to signify of the unexpected absence of it - def collectHashes(events: Seq[MetadataEvent]) = { - collectEvents(events, { _.startsWith(hashesKey) }, { _.stripPrefix(hashesKey) }) map { - case MetadataEvent(MetadataKey(_, _, keyA), valueA, _) => keyA -> valueA - } toMap + def extractHashes(callCachingMetadataJson: JsObject): ErrorOr[Map[String, String]] = { + def processField(keyPrefix: String)(fieldValue: (String, JsValue)): ErrorOr[Map[String, String]] = fieldValue match { + case (key, hashString: JsString) => Map(keyPrefix + key -> hashString.value).validNel + case (key, subObject: JsObject) => extractHashEntries(key + ":", subObject) + case (key, otherValue) => s"Cannot extract hashes for $key. Expected JsString or JsObject but got ${otherValue.getClass.getSimpleName} $otherValue".invalidNel } - val hashesA: Map[String, Option[MetadataValue]] = collectHashes(eventsA) - val hashesB: Map[String, Option[MetadataValue]] = collectHashes(eventsB) + def extractHashEntries(keyPrefix: String, jsObject: JsObject): ErrorOr[Map[String, String]] = { + val traversed = jsObject.fields.toList.traverse(processField(keyPrefix)) + traversed.map(_.flatten.toMap) + } - (hashesA.isEmpty, hashesB.isEmpty) match { - case (true, true) => Failure(HashesForCallAAndBNotFoundException) - case (true, false) => Failure(HashesForCallANotFoundException) - case (false, true) => Failure(HashesForCallBNotFoundException) - case (false, false) => Success(diffHashEvents(hashesA, hashesB)) + for { + hashesSection <- callCachingMetadataJson.fieldAsObject("hashes") + entries <- extractHashEntries("", hashesSection) + } yield entries + } + + def calculateHashDifferential(hashesA: Map[String, String], hashesB: Map[String, String]): List[HashDifference] = { + val hashesInANotMatchedInB: List[HashDifference] = hashesA.toList collect { + case (key, value) if hashesB.get(key) != Option(value) => HashDifference(key, Option(value), hashesB.get(key)) + } + val hashesUniqueToB: List[HashDifference] = hashesB.toList.collect { + case (key, value) if !hashesA.keySet.contains(key) => HashDifference(key, None, Option(value)) } + hashesInANotMatchedInB ++ hashesUniqueToB + } + def extractCallDetails(query: MetadataQuery, callMetadataJson: CallMetadataJson): ErrorOr[CallDetails] = { + val executionStatus = callMetadataJson.rawValue.fieldAsString("executionStatus") + val allowResultReuse = callMetadataJson.callCachingMetadataJson.rawValue.fieldAsBoolean("allowResultReuse") + + (executionStatus, allowResultReuse) mapN { (es, arr) => + CallDetails( + executionStatus = es.value, + allowResultReuse = arr.value, + callFqn = callMetadataJson.jobKey.callFqn, + jobIndex = callMetadataJson.jobKey.index.getOrElse(-1), + workflowId = query.workflowId.toString + ) + } } - private def diffHashEvents(hashesA: Map[String, Option[MetadataValue]], hashesB: Map[String, Option[MetadataValue]]) = { - val hashesUniqueToB: Map[String, Option[MetadataValue]] = hashesB.filterNot({ case (k, _) => hashesA.keySet.contains(k) }) - - val hashDiff: List[MetadataComponent] = { - // Start with all hashes in A - hashesA - // Try to find the corresponding pair in B. - // We end up with a - // List[(Option[String, Option[MetadataValue], Option[String, Option[MetadataValue])] - // ^ ^ ^ ^ - // hashKey hashValue hashKey hashValue - // for for for for - // A A B B - // |____________________________________| |___________________________________| - // hashPair for A hashPair for B - // - // HashPairs are Some or None depending on whether or not they have a metadata entry for the corresponding hashKey - // At this stage we only have Some(hashPair) for A, and either Some(hashPair) or None for B depending on if we found it in hashesB - .map({ - hashPairA => Option(hashPairA) -> hashesB.find(_._1 == hashPairA._1) - }) - // Add the missing hashes that are in B but not in A. The left hashPair is therefore None - .++(hashesUniqueToB.map(None -> Option(_))) - .collect({ - // Both have a value but they're different. We can assume the keys are the same (if we did our job right until here) - case (Some((keyA, valueA)), Some((_, valueB))) if valueA != valueB => - makeHashDiffObject(keyA, Option(valueA), Option(valueB)) - // Key is in A but not in B - case (Some((keyA, valueA)), None) => - makeHashDiffObject(keyA, Option(valueA), None) - // Key is in B but not in A - case (None, Some((keyB, valueB))) => - makeHashDiffObject(keyB, None, Option(valueB)) - }) - .toList + implicit class EnhancedJsObject(val jsObject: JsObject) extends AnyVal { + def getField(field: String): ErrorOr[JsValue] = jsObject.fields.get(field).toErrorOr(s"No '$field' field found") + def fieldAsObject(field: String): ErrorOr[JsObject] = jsObject.getField(field) flatMap { _.mapToJsObject } + def fieldAsArray(field: String): ErrorOr[JsArray] = jsObject.getField(field) flatMap { _.mapToJsArray } + def fieldAsString(field: String): ErrorOr[JsString] = jsObject.getField(field) flatMap { _.mapToJsString } + def fieldAsNumber(field: String): ErrorOr[JsNumber] = jsObject.getField(field) flatMap { _.mapToJsNumber } + def fieldAsBoolean(field: String): ErrorOr[JsBoolean] = jsObject.getField(field) flatMap { _.mapToJsBoolean } + def checkFieldValue(field: String, expectation: String): ErrorOr[Unit] = jsObject.getField(field) flatMap { + case v: JsValue if v.toString == expectation => ().validNel + case other => s"Unexpected metadata field '$field'. Expected '$expectation' but got ${other.toString}".invalidNel } + } - MetadataList(hashDiff) + implicit class EnhancedJsArray(val jsArray: JsArray) extends AnyVal { + + def elementWithHighestAttemptField: ErrorOr[JsObject] = { + def extractAttemptAndObject(value: JsValue): ErrorOr[(Int, JsObject)] = for { + asObject <- value.mapToJsObject + attempt <- asObject.fieldAsNumber("attempt") + } yield (attempt.value.intValue(), asObject) + + def foldFunction(accumulator: ErrorOr[(Int, JsObject)], nextElement: JsValue): ErrorOr[(Int, JsObject)] = { + (accumulator, extractAttemptAndObject(nextElement)) mapN { case ((previousHighestAttempt, previousJsObject), (nextAttempt, nextJsObject)) => + if (previousHighestAttempt > nextAttempt) { + (previousHighestAttempt, previousJsObject) + } else { + (nextAttempt, nextJsObject) + } + } + } + + for { + attemptListNel <- NonEmptyList.fromList(jsArray.elements.toList).toErrorOr("Expected at least one attempt but found 0") + highestAttempt <- attemptListNel.toList.foldLeft(extractAttemptAndObject(attemptListNel.head))(foldFunction) + } yield highestAttempt._2 + } } - /** - * Create a Metadata query from a CallCacheDiffQueryCall - */ - private def makeMetadataQuery(call: CallCacheDiffQueryCall) = MetadataQuery( - call.workflowId, - // jobAttempt None will return keys for all attempts - Option(MetadataQueryJobKey(call.callFqn, call.jobIndex, None)), - None, - Option(NonEmptyList.of("callCaching", "executionStatus")), - None, - expandSubWorkflows = false - ) + implicit class EnhancedJsValue(val jsValue: JsValue) extends AnyVal { + def mapToJsObject: ErrorOr[JsObject] = jsValue match { + case obj: JsObject => obj.validNel + case other => s"Invalid value type. Expected JsObject but got ${other.getClass.getSimpleName}: ${other.prettyPrint}".invalidNel + } + def mapToJsArray: ErrorOr[JsArray] = jsValue match { + case arr: JsArray => arr.validNel + case other => s"Invalid value type. Expected JsArray but got ${other.getClass.getSimpleName}: ${other.prettyPrint}".invalidNel + } + def mapToJsString: ErrorOr[JsString] = jsValue match { + case str: JsString => str.validNel + case other => s"Invalid value type. Expected JsString but got ${other.getClass.getSimpleName}: ${other.prettyPrint}".invalidNel + } + def mapToJsBoolean: ErrorOr[JsBoolean] = jsValue match { + case boo: JsBoolean => boo.validNel + case other => s"Invalid value type. Expected JsBoolean but got ${other.getClass.getSimpleName}: ${other.prettyPrint}".invalidNel + } + def mapToJsNumber: ErrorOr[JsNumber] = jsValue match { + case boo: JsNumber => boo.validNel + case other => s"Invalid value type. Expected JsNumber but got ${other.getClass.getSimpleName}: ${other.prettyPrint}".invalidNel + } + } } diff --git a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheDiffActorJsonFormatting.scala b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheDiffActorJsonFormatting.scala new file mode 100644 index 00000000000..11ba8bfc3a4 --- /dev/null +++ b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheDiffActorJsonFormatting.scala @@ -0,0 +1,27 @@ +package cromwell.engine.workflow.lifecycle.execution.callcaching + +import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheDiffActor.{CallDetails, HashDifference, SuccessfulCallCacheDiffResponse} +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport +import org.apache.commons.lang3.NotImplementedException +import spray.json._ + +object CallCacheDiffActorJsonFormatting extends SprayJsonSupport with DefaultJsonProtocol { + + implicit val callDetailsJsonFormatter = jsonFormat5(CallDetails) + + // Note: This json format is written out longform to get the non-standard Option behavior (the default omits 'None' fields altogether) + implicit val hashDifferenceJsonFormatter = new RootJsonFormat[HashDifference] { + override def write(hashDifference: HashDifference): JsValue = { + def fromOption(opt: Option[String]) = opt.map(JsString.apply).getOrElse(JsNull) + JsObject(Map( + "hashKey" -> JsString(hashDifference.hashKey), + "callA" -> fromOption(hashDifference.callA), + "callB" -> fromOption(hashDifference.callB) + )) + } + override def read(json: JsValue): HashDifference = + throw new NotImplementedException("Programmer Error: No reader for HashDifferentials written. It was not expected to be required") + } + + implicit val successfulResponseJsonFormatter = jsonFormat3(SuccessfulCallCacheDiffResponse) +} diff --git a/engine/src/main/scala/cromwell/webservice/LabelsManagerActor.scala b/engine/src/main/scala/cromwell/webservice/LabelsManagerActor.scala index f3a21135860..b0be24f7248 100644 --- a/engine/src/main/scala/cromwell/webservice/LabelsManagerActor.scala +++ b/engine/src/main/scala/cromwell/webservice/LabelsManagerActor.scala @@ -1,11 +1,11 @@ package cromwell.webservice import akka.actor.{Actor, ActorLogging, ActorRef, Props} -import common.collections.EnhancedCollections._ import cromwell.core._ -import cromwell.core.labels.Labels +import cromwell.core.labels.{Label, Labels} import cromwell.services.metadata.MetadataEvent import cromwell.services.metadata.MetadataService._ +import cromwell.services.metadata.impl.builder.MetadataBuilderActor.BuiltMetadataResponse import cromwell.webservice.LabelsManagerActor._ import spray.json.{DefaultJsonProtocol, JsObject, JsString} @@ -24,13 +24,6 @@ object LabelsManagerActor { sealed trait LabelsResponse extends LabelsMessage - def processLabelsResponse(workflowId: WorkflowId, labels: Map[String, String]): JsObject = { - JsObject(Map( - WorkflowMetadataKeys.Id -> JsString(workflowId.toString), - WorkflowMetadataKeys.Labels -> JsObject(labels safeMapValues JsString.apply) - )) - } - sealed abstract class LabelsManagerActorResponse final case class BuiltLabelsManagerResponse(response: JsObject) extends LabelsManagerActorResponse final case class FailedLabelsManagerResponse(reason: Throwable) extends LabelsManagerActorResponse @@ -59,7 +52,7 @@ class LabelsManagerActor(serviceRegistryActor: ActorRef) extends Actor with Acto At this point in the actor lifecycle, wfId has already been filled out so the .get is safe */ serviceRegistryActor ! GetLabels(wfId.get) - case LabelLookupResponse(id, origLabels) => + case BuiltMetadataResponse(_, jsObject) => /* There's some trickery going on here. We've updated the labels in the metadata store but almost certainly when the store received the GetLabels request above the summarizer will not have been run so our new values are @@ -73,8 +66,16 @@ class LabelsManagerActor(serviceRegistryActor: ActorRef) extends Actor with Acto At this point in the actor lifecycle, newLabels will have been filled in so the .get is safe */ - val updated = origLabels ++ newLabels.get.asMap - target ! BuiltLabelsManagerResponse(processLabelsResponse(id, updated)) + + def replaceOrAddLabel(originalJson: JsObject, label: Label): JsObject = { + val labels = originalJson.fields.get("labels").map(_.asJsObject.fields).getOrElse(Map.empty) + val updatedLabels = labels + (label.key -> JsString(label.value)) + + JsObject(originalJson.fields + ("labels" -> JsObject(updatedLabels))) + } + + val updatedJson = newLabels.get.value.foldLeft(jsObject)(replaceOrAddLabel) + target ! BuiltLabelsManagerResponse(updatedJson) context stop self case f: MetadataServiceFailure => /* diff --git a/engine/src/main/scala/cromwell/webservice/WorkflowJsonSupport.scala b/engine/src/main/scala/cromwell/webservice/WorkflowJsonSupport.scala index d7215362079..4d8bc77c071 100644 --- a/engine/src/main/scala/cromwell/webservice/WorkflowJsonSupport.scala +++ b/engine/src/main/scala/cromwell/webservice/WorkflowJsonSupport.scala @@ -10,7 +10,6 @@ import cromwell.engine._ import cromwell.services.healthmonitor.ProtoHealthMonitorServiceActor.{StatusCheckResponse, SubsystemStatus} import cromwell.services.metadata.MetadataService._ import cromwell.util.JsonFormatting.WomValueJsonFormatter._ -import cromwell.webservice.metadata.MetadataBuilderActor.BuiltMetadataResponse import cromwell.webservice.routes.CromwellApiService.BackendResponse import spray.json.{DefaultJsonProtocol, JsString, JsValue, JsonFormat, RootJsonFormat} @@ -22,7 +21,6 @@ object WorkflowJsonSupport extends DefaultJsonProtocol { implicit val callOutputResponseProtocol = jsonFormat3(CallOutputResponse) implicit val engineStatsProtocol = jsonFormat2(EngineStatsActor.EngineStats) implicit val BackendResponseFormat = jsonFormat2(BackendResponse) - implicit val BuiltStatusResponseFormat = jsonFormat1(BuiltMetadataResponse) implicit val callAttempt = jsonFormat2(CallAttempt) implicit val workflowOptionsFormatter: JsonFormat[WorkflowOptions] = new JsonFormat[WorkflowOptions] { diff --git a/engine/src/main/scala/cromwell/webservice/metadata/MetadataBuilderRegulatorActor.scala b/engine/src/main/scala/cromwell/webservice/metadata/MetadataBuilderRegulatorActor.scala deleted file mode 100644 index 530158c8404..00000000000 --- a/engine/src/main/scala/cromwell/webservice/metadata/MetadataBuilderRegulatorActor.scala +++ /dev/null @@ -1,57 +0,0 @@ -package cromwell.webservice.metadata - -import akka.actor.{Actor, ActorLogging, ActorRef, Props} -import cromwell.core.Dispatcher.ApiDispatcher -import cromwell.services.metadata.MetadataService.MetadataServiceAction -import cromwell.webservice.metadata.MetadataBuilderActor.MetadataBuilderActorResponse - -import scala.collection.mutable - -class MetadataBuilderRegulatorActor(serviceRegistryActor: ActorRef) extends Actor with ActorLogging { - // This actor tracks all requests coming in from the API service and spins up new builders as needed to service them. - // If the processing of an identical request is already in flight the requester will be added to a set of requesters - // to notify when the response from the first request becomes available. - - // Map from requests (MetadataServiceActions) to requesters. - val apiRequests = new mutable.HashMap[MetadataServiceAction, Set[ActorRef]]() - // Map from ActorRefs of MetadataBuilderActors to requests. When a response comes back from a MetadataBuilderActor its - // ActorRef is used as the lookup key in this Map. The result of that lookup yields the request which in turn is used - // as the lookup key for requesters in the above Map. - val builderRequests = new mutable.HashMap[ActorRef, MetadataServiceAction]() - - override def receive: Receive = { - case action: MetadataServiceAction => - val currentRequesters = apiRequests.getOrElse(action, Set.empty) - apiRequests.put(action, currentRequesters + sender()) - if (currentRequesters.isEmpty) { - val metadataBuilderActor = context.actorOf( - MetadataBuilderActor.props(serviceRegistryActor).withDispatcher(ApiDispatcher), MetadataBuilderActor.uniqueActorName) - builderRequests.put(metadataBuilderActor, action) - metadataBuilderActor ! action - } - case response: MetadataBuilderActorResponse => - val sndr = sender() - builderRequests.get(sndr) match { - case Some(action) => - apiRequests.get(action) match { - case Some(requesters) => - apiRequests.remove(action) - requesters foreach { _ ! response} - case None => - // unpossible: there had to have been a request that corresponded to this response - log.error(s"MetadataBuilderRegulatorActor unpossible error: no requesters found for action: $action") - } - builderRequests.remove(sndr) - () - case None => - // unpossible: this actor should know about all the child MetadataBuilderActors it has begotten - log.error(s"MetadataBuilderRegulatorActor unpossible error: unrecognized sender $sndr") - } - } -} - -object MetadataBuilderRegulatorActor { - def props(serviceRegistryActor: ActorRef): Props = { - Props(new MetadataBuilderRegulatorActor(serviceRegistryActor)) - } -} diff --git a/engine/src/main/scala/cromwell/webservice/routes/CromwellApiService.scala b/engine/src/main/scala/cromwell/webservice/routes/CromwellApiService.scala index 8b6e662eddc..819ef7a66ce 100644 --- a/engine/src/main/scala/cromwell/webservice/routes/CromwellApiService.scala +++ b/engine/src/main/scala/cromwell/webservice/routes/CromwellApiService.scala @@ -3,6 +3,7 @@ package cromwell.webservice.routes import java.util.UUID import akka.actor.{ActorRef, ActorRefFactory} +import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheDiffActorJsonFormatting.successfulResponseJsonFormatter import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import akka.http.scaladsl.marshalling.ToResponseMarshallable import akka.http.scaladsl.model._ @@ -22,7 +23,7 @@ import cromwell.core.{path => _, _} import cromwell.engine.backend.BackendConfiguration import cromwell.engine.instrumentation.HttpInstrumentation import cromwell.engine.workflow.WorkflowManagerActor.WorkflowNotFoundException -import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheDiffActor.{BuiltCallCacheDiffResponse, CachedCallNotFoundException, CallCacheDiffActorResponse, FailedCallCacheDiffResponse} +import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheDiffActor.{CachedCallNotFoundException, CallCacheDiffActorResponse, FailedCallCacheDiffResponse, SuccessfulCallCacheDiffResponse} import cromwell.engine.workflow.lifecycle.execution.callcaching.{CallCacheDiffActor, CallCacheDiffQueryParameter} import cromwell.engine.workflow.workflowstore.SqlWorkflowStore.NotInOnHoldStateException import cromwell.engine.workflow.workflowstore.{WorkflowStoreActor, WorkflowStoreEngineActor, WorkflowStoreSubmitActor} @@ -30,7 +31,7 @@ import cromwell.server.CromwellShutdown import cromwell.services.healthmonitor.ProtoHealthMonitorServiceActor.{GetCurrentStatus, StatusCheckResponse} import cromwell.services.metadata.MetadataService._ import cromwell.webservice._ -import cromwell.webservice.metadata.MetadataBuilderActor.{BuiltMetadataResponse, FailedMetadataResponse, MetadataBuilderActorResponse} +import cromwell.services.metadata.impl.builder.MetadataBuilderActor.{BuiltMetadataResponse, FailedMetadataResponse, MetadataBuilderActorResponse} import cromwell.webservice.WorkflowJsonSupport._ import cromwell.webservice.WebServiceUtils import cromwell.webservice.WebServiceUtils.EnhancedThrowable @@ -88,7 +89,7 @@ trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport w case Valid(queryParameter) => val diffActor = actorRefFactory.actorOf(CallCacheDiffActor.props(serviceRegistryActor), "CallCacheDiffActor-" + UUID.randomUUID()) onComplete(diffActor.ask(queryParameter).mapTo[CallCacheDiffActorResponse]) { - case Success(r: BuiltCallCacheDiffResponse) => complete(r.response) + case Success(r: SuccessfulCallCacheDiffResponse) => complete(r) case Success(r: FailedCallCacheDiffResponse) => r.reason.errorRequest(StatusCodes.InternalServerError) case Failure(_: AskTimeoutException) if CromwellShutdown.shutdownInProgress() => serviceShuttingDownResponse case Failure(e: CachedCallNotFoundException) => e.errorRequest(StatusCodes.NotFound) @@ -161,19 +162,19 @@ trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport w val includeKeys = NonEmptyList.of("start", "end", "executionStatus", "executionEvents", "subWorkflowMetadata") val readMetadataRequest = (w: WorkflowId) => GetSingleWorkflowMetadataAction(w, Option(includeKeys), None, expandSubWorkflows = true) - metadataBuilderRegulatorActor.ask(readMetadataRequest(workflowId)).mapTo[MetadataBuilderActorResponse] + serviceRegistryActor.ask(readMetadataRequest(workflowId)).mapTo[MetadataBuilderActorResponse] } private def completeTimingRouteResponse(metadataResponse: Future[MetadataBuilderActorResponse]) = { onComplete(metadataResponse) { - case Success(r: BuiltMetadataResponse) => { + case Success(r: BuiltMetadataResponse) => + Try(Source.fromResource("workflowTimings/workflowTimings.html").mkString) match { case Success(wfTimingsContent) => - val response = HttpResponse(entity = wfTimingsContent.replace("\"{{REPLACE_THIS_WITH_METADATA}}\"", r.response.toString)) + val response = HttpResponse(entity = wfTimingsContent.replace("\"{{REPLACE_THIS_WITH_METADATA}}\"", r.responseJson.toString)) complete(response.withEntity(response.entity.withContentType(`text/html(UTF-8)`))) case Failure(e) => completeResponse(StatusCodes.InternalServerError, APIResponse.fail(new RuntimeException("Error while loading workflowTimings.html", e)), Seq.empty) } - } case Success(r: FailedMetadataResponse) => r.reason.errorRequest(StatusCodes.InternalServerError) case Failure(_: AskTimeoutException) if CromwellShutdown.shutdownInProgress() => serviceShuttingDownResponse case Failure(e: TimeoutException) => e.failRequest(StatusCodes.ServiceUnavailable) diff --git a/engine/src/main/scala/cromwell/webservice/routes/MetadataRouteSupport.scala b/engine/src/main/scala/cromwell/webservice/routes/MetadataRouteSupport.scala index 96a87d14844..0b9e1009300 100644 --- a/engine/src/main/scala/cromwell/webservice/routes/MetadataRouteSupport.scala +++ b/engine/src/main/scala/cromwell/webservice/routes/MetadataRouteSupport.scala @@ -16,10 +16,9 @@ import cromwell.core.{WorkflowId, path => _} import cromwell.engine.instrumentation.HttpInstrumentation import cromwell.server.CromwellShutdown import cromwell.services.metadata.MetadataService._ +import cromwell.services.metadata.impl.builder.MetadataBuilderActor.{BuiltMetadataResponse, FailedMetadataResponse, MetadataBuilderActorResponse} import cromwell.webservice.LabelsManagerActor import cromwell.webservice.LabelsManagerActor._ -import cromwell.webservice.metadata.MetadataBuilderRegulatorActor -import cromwell.webservice.metadata.MetadataBuilderActor.{BuiltMetadataResponse, FailedMetadataResponse, MetadataBuilderActorResponse} import cromwell.webservice.routes.CromwellApiService.{InvalidWorkflowException, UnrecognizedWorkflowException, serviceShuttingDownResponse, validateWorkflowIdInMetadata, validateWorkflowIdInMetadataSummaries} import cromwell.webservice.routes.MetadataRouteSupport._ import cromwell.webservice.WebServiceUtils.EnhancedThrowable @@ -37,27 +36,25 @@ trait MetadataRouteSupport extends HttpInstrumentation { implicit val timeout: Timeout - lazy val metadataBuilderRegulatorActor = actorRefFactory.actorOf(MetadataBuilderRegulatorActor.props(serviceRegistryActor)) - val metadataRoutes = concat( path("workflows" / Segment / Segment / "status") { (_, possibleWorkflowId) => get { instrumentRequest { - metadataLookup(possibleWorkflowId, (w: WorkflowId) => GetStatus(w), serviceRegistryActor, metadataBuilderRegulatorActor) + metadataLookup(possibleWorkflowId, (w: WorkflowId) => GetStatus(w), serviceRegistryActor) } } }, path("workflows" / Segment / Segment / "outputs") { (_, possibleWorkflowId) => get { instrumentRequest { - metadataLookup(possibleWorkflowId, (w: WorkflowId) => WorkflowOutputs(w), serviceRegistryActor, metadataBuilderRegulatorActor) + metadataLookup(possibleWorkflowId, (w: WorkflowId) => WorkflowOutputs(w), serviceRegistryActor) } } }, path("workflows" / Segment / Segment / "logs") { (_, possibleWorkflowId) => get { instrumentRequest { - metadataLookup(possibleWorkflowId, (w: WorkflowId) => GetLogs(w), serviceRegistryActor, metadataBuilderRegulatorActor) + metadataLookup(possibleWorkflowId, (w: WorkflowId) => GetLogs(w), serviceRegistryActor) } } }, @@ -71,8 +68,7 @@ trait MetadataRouteSupport extends HttpInstrumentation { metadataLookup(possibleWorkflowId, (w: WorkflowId) => GetSingleWorkflowMetadataAction(w, includeKeysOption, excludeKeysOption, expandSubWorkflows), - serviceRegistryActor, - metadataBuilderRegulatorActor) + serviceRegistryActor) } } } @@ -81,7 +77,7 @@ trait MetadataRouteSupport extends HttpInstrumentation { concat( get { instrumentRequest { - metadataLookup(possibleWorkflowId, (w: WorkflowId) => GetLabels(w), serviceRegistryActor, metadataBuilderRegulatorActor) + metadataLookup(possibleWorkflowId, (w: WorkflowId) => GetLabels(w), serviceRegistryActor) } }, patch { @@ -131,12 +127,11 @@ trait MetadataRouteSupport extends HttpInstrumentation { object MetadataRouteSupport { def metadataLookup(possibleWorkflowId: String, - request: WorkflowId => ReadAction, - serviceRegistryActor: ActorRef, - metadataBuilderRegulatorActor: ActorRef) + request: WorkflowId => MetadataReadAction, + serviceRegistryActor: ActorRef) (implicit timeout: Timeout, ec: ExecutionContext): Route = { - completeMetadataBuilderResponse(metadataBuilderActorRequest(possibleWorkflowId, request, serviceRegistryActor, metadataBuilderRegulatorActor)) + completeMetadataBuilderResponse(metadataBuilderActorRequest(possibleWorkflowId, request, serviceRegistryActor)) } def queryMetadata(parameters: Seq[(String, String)], @@ -145,17 +140,16 @@ object MetadataRouteSupport { } def metadataBuilderActorRequest(possibleWorkflowId: String, - request: WorkflowId => ReadAction, - serviceRegistryActor: ActorRef, - metadataBuilderRegulatorActor: ActorRef) + request: WorkflowId => MetadataReadAction, + serviceRegistryActor: ActorRef) (implicit timeout: Timeout, ec: ExecutionContext): Future[MetadataBuilderActorResponse] = { - validateWorkflowIdInMetadata(possibleWorkflowId, serviceRegistryActor) flatMap { w => metadataBuilderRegulatorActor.ask(request(w)).mapTo[MetadataBuilderActorResponse] } + validateWorkflowIdInMetadata(possibleWorkflowId, serviceRegistryActor) flatMap { w => serviceRegistryActor.ask(request(w)).mapTo[MetadataBuilderActorResponse] } } def completeMetadataBuilderResponse(response: Future[MetadataBuilderActorResponse]): Route = { onComplete(response) { - case Success(r: BuiltMetadataResponse) => complete(r.response) + case Success(r: BuiltMetadataResponse) => complete(r.responseJson) case Success(r: FailedMetadataResponse) => r.reason.errorRequest(StatusCodes.InternalServerError) case Failure(_: AskTimeoutException) if CromwellShutdown.shutdownInProgress() => serviceShuttingDownResponse case Failure(e: UnrecognizedWorkflowException) => e.failRequest(StatusCodes.NotFound) @@ -167,7 +161,7 @@ object MetadataRouteSupport { def metadataQueryRequest(parameters: Seq[(String, String)], serviceRegistryActor: ActorRef)(implicit timeout: Timeout): Future[MetadataQueryResponse] = { - serviceRegistryActor.ask(WorkflowQuery(parameters)).mapTo[MetadataQueryResponse] + serviceRegistryActor.ask(QueryForWorkflowsMatchingParameters(parameters)).mapTo[MetadataQueryResponse] } def completeMetadataQueryResponse(response: Future[MetadataQueryResponse]): Route = { diff --git a/engine/src/main/scala/cromwell/webservice/routes/wes/WesRouteSupport.scala b/engine/src/main/scala/cromwell/webservice/routes/wes/WesRouteSupport.scala index a7bfd1709ec..4f6f53ed4e5 100644 --- a/engine/src/main/scala/cromwell/webservice/routes/wes/WesRouteSupport.scala +++ b/engine/src/main/scala/cromwell/webservice/routes/wes/WesRouteSupport.scala @@ -6,7 +6,7 @@ import akka.http.scaladsl.server.Route import akka.pattern.{AskTimeoutException, ask} import akka.util.Timeout import cromwell.engine.instrumentation.HttpInstrumentation -import cromwell.services.metadata.MetadataService.{GetStatus, MetadataServiceResponse, StatusLookupFailed, StatusLookupResponse} +import cromwell.services.metadata.MetadataService.{GetStatus, MetadataServiceResponse, StatusLookupFailed} import cromwell.webservice.routes.CromwellApiService.{UnrecognizedWorkflowException, validateWorkflowIdInMetadata} import cromwell.webservice.WebServiceUtils.EnhancedThrowable @@ -19,6 +19,7 @@ import WesRouteSupport._ import cromwell.core.abort.SuccessfulAbortResponse import cromwell.engine.workflow.WorkflowManagerActor.WorkflowNotFoundException import cromwell.server.CromwellShutdown +import cromwell.services.metadata.impl.builder.MetadataBuilderActor.BuiltMetadataResponse import cromwell.webservice.routes.CromwellApiService trait WesRouteSupport extends HttpInstrumentation { @@ -56,9 +57,9 @@ trait WesRouteSupport extends HttpInstrumentation { val response = validateWorkflowIdInMetadata(possibleWorkflowId, serviceRegistryActor).flatMap(w => serviceRegistryActor.ask(GetStatus(w)).mapTo[MetadataServiceResponse]) // WES can also return a 401 or a 403 but that requires user auth knowledge which Cromwell doesn't currently have onComplete(response) { - case Success(s: StatusLookupResponse) => - val wesState = WesState.fromCromwellStatus(s.status) - complete(WesRunStatus(s.workflowId.toString, wesState)) + case Success(BuiltMetadataResponse(_, jsObject)) => + val wesState = WesState.fromCromwellStatusJson(jsObject) + complete(WesRunStatus(possibleWorkflowId, wesState)) case Success(r: StatusLookupFailed) => r.reason.errorRequest(StatusCodes.InternalServerError) case Success(m: MetadataServiceResponse) => // This should never happen, but .... diff --git a/engine/src/main/scala/cromwell/webservice/routes/wes/WesState.scala b/engine/src/main/scala/cromwell/webservice/routes/wes/WesState.scala index 15799ca5a53..3f468156a50 100644 --- a/engine/src/main/scala/cromwell/webservice/routes/wes/WesState.scala +++ b/engine/src/main/scala/cromwell/webservice/routes/wes/WesState.scala @@ -2,7 +2,7 @@ package cromwell.webservice.routes.wes import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport import cromwell.core._ -import spray.json.{DefaultJsonProtocol, JsString, JsValue, RootJsonFormat} +import spray.json.{DefaultJsonProtocol, JsObject, JsString, JsValue, RootJsonFormat} object WesState { sealed trait WesState extends Product with Serializable { val name: String } @@ -30,6 +30,15 @@ object WesState { } } + def fromCromwellStatusJson(jsonResponse: JsObject): WesState = { + + val statusString = jsonResponse.fields.get("status").collect { + case str: JsString => str.value + }.getOrElse(throw new IllegalArgumentException(s"Could not coerce Cromwell status response ${jsonResponse.compactPrint} into a valid WES status")) + + fromCromwellStatus(WorkflowState.withName(statusString)) + } + def fromString(status: String): WesState = { status match { case Unknown.name => Unknown diff --git a/engine/src/test/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheDiffActorSpec.scala b/engine/src/test/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheDiffActorSpec.scala index ac35f6d017e..3b3c6fd6e4b 100644 --- a/engine/src/test/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheDiffActorSpec.scala +++ b/engine/src/test/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCacheDiffActorSpec.scala @@ -5,10 +5,13 @@ import cats.data.NonEmptyList import cromwell.core._ import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheDiffActor._ import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheDiffQueryParameter.CallCacheDiffQueryCall -import cromwell.services.metadata.MetadataService.{GetMetadataQueryAction, MetadataLookupResponse, MetadataServiceKeyLookupFailed} -import cromwell.services.metadata._ +import cromwell.services.metadata.MetadataService.GetMetadataAction +import cromwell.services.metadata.{MetadataService, _} +import cromwell.services.metadata.impl.builder.MetadataBuilderActor +import cromwell.services.metadata.impl.builder.MetadataBuilderActor.{BuiltMetadataResponse, FailedMetadataResponse} import org.scalatest.concurrent.Eventually import org.scalatest.{FlatSpecLike, Matchers} +import spray.json.JsObject class CallCacheDiffActorSpec extends TestKitSuite with FlatSpecLike with Matchers with ImplicitSender with Eventually { @@ -19,70 +22,77 @@ class CallCacheDiffActorSpec extends TestKitSuite with FlatSpecLike with Matcher val callFqnA = "callFqnA" val callFqnB = "callFqnB" - + val metadataJobKeyA = Option(MetadataJobKey(callFqnA, Option(1), 1)) val metadataJobKeyB = Option(MetadataJobKey(callFqnB, None, 1)) - + val callA = CallCacheDiffQueryCall(workflowIdA, callFqnA, Option(1)) val callB = CallCacheDiffQueryCall(workflowIdB, callFqnB, None) val queryA = MetadataQuery( - workflowIdA, - Option(MetadataQueryJobKey(callFqnA, Option(1), None)), - None, - Option(NonEmptyList.of("callCaching", "executionStatus")), - None, + workflowId = workflowIdA, + jobKey = Option(MetadataQueryJobKey(callFqnA, Option(1), None)), + key = None, + includeKeysOption = Option(NonEmptyList.of("callCaching", "executionStatus")), + excludeKeysOption = Option(NonEmptyList.of("callCaching:hitFailures")), expandSubWorkflows = false ) val queryB = MetadataQuery( - workflowIdB, - Option(MetadataQueryJobKey(callFqnB, None, None)), - None, - Option(NonEmptyList.of("callCaching", "executionStatus")), - None, + workflowId = workflowIdB, + jobKey = Option(MetadataQueryJobKey(callFqnB, None, None)), + key = None, + includeKeysOption = Option(NonEmptyList.of("callCaching", "executionStatus")), + excludeKeysOption = Option(NonEmptyList.of("callCaching:hitFailures")), expandSubWorkflows = false ) - + val eventsA = List( MetadataEvent(MetadataKey(workflowIdA, metadataJobKeyA, "executionStatus"), MetadataValue("Done")), MetadataEvent(MetadataKey(workflowIdA, metadataJobKeyA, "callCaching:allowResultReuse"), MetadataValue(true)), - MetadataEvent(MetadataKey(workflowIdA, metadataJobKeyA, "callCaching:hashes: hash in only in A"), MetadataValue("hello")), - MetadataEvent(MetadataKey(workflowIdA, metadataJobKeyA, "callCaching:hashes: hash in A and B with same value"), MetadataValue(1)), - MetadataEvent(MetadataKey(workflowIdA, metadataJobKeyA, "callCaching:hashes: hash in A and B with different value"), MetadataValue("I'm the hash for A !")) + MetadataEvent(MetadataKey(workflowIdA, metadataJobKeyA, "callCaching:hashes:hash in only in A"), MetadataValue("hello from A")), + MetadataEvent(MetadataKey(workflowIdA, metadataJobKeyA, "callCaching:hashes:hash in A and B with same value"), MetadataValue("we are thinking the same thought")), + MetadataEvent(MetadataKey(workflowIdA, metadataJobKeyA, "callCaching:hashes:hash in A and B with different value"), MetadataValue("I'm the hash for A !")) ) + val workflowMetadataA: JsObject = MetadataBuilderActor.workflowMetadataResponse(workflowIdA, eventsA, includeCallsIfEmpty = false, Map.empty) + val responseForA = BuiltMetadataResponse(MetadataService.GetMetadataAction(queryA), workflowMetadataA) val eventsB = List( MetadataEvent(MetadataKey(workflowIdB, metadataJobKeyB, "executionStatus"), MetadataValue("Failed")), MetadataEvent(MetadataKey(workflowIdB, metadataJobKeyB, "callCaching:allowResultReuse"), MetadataValue(false)), - MetadataEvent(MetadataKey(workflowIdB, metadataJobKeyB, "callCaching:hashes: hash in only in B"), MetadataValue("hello")), - MetadataEvent(MetadataKey(workflowIdB, metadataJobKeyB, "callCaching:hashes: hash in A and B with same value"), MetadataValue(1)), - MetadataEvent(MetadataKey(workflowIdA, metadataJobKeyA, "callCaching:hashes: hash in A and B with different value"), MetadataValue("I'm the hash for B !")) + MetadataEvent(MetadataKey(workflowIdB, metadataJobKeyB, "callCaching:hashes:hash in only in B"), MetadataValue("hello from B")), + MetadataEvent(MetadataKey(workflowIdB, metadataJobKeyB, "callCaching:hashes:hash in A and B with same value"), MetadataValue("we are thinking the same thought")), + MetadataEvent(MetadataKey(workflowIdB, metadataJobKeyB, "callCaching:hashes:hash in A and B with different value"), MetadataValue("I'm the hash for B !")) ) - + val workflowMetadataB: JsObject = MetadataBuilderActor.workflowMetadataResponse(workflowIdB, eventsB, includeCallsIfEmpty = false, Map.empty) + val responseForB = BuiltMetadataResponse(MetadataService.GetMetadataAction(queryB), workflowMetadataB) + it should "send correct queries to MetadataService when receiving a CallCacheDiffRequest" in { val mockServiceRegistryActor = TestProbe() val actor = TestFSMRef(new CallCacheDiffActor(mockServiceRegistryActor.ref)) actor ! CallCacheDiffQueryParameter(callA, callB) - mockServiceRegistryActor.expectMsg(GetMetadataQueryAction(queryA)) - mockServiceRegistryActor.expectMsg(GetMetadataQueryAction(queryB)) + mockServiceRegistryActor.expectMsg(GetMetadataAction(queryA)) + mockServiceRegistryActor.expectMsg(GetMetadataAction(queryB)) + + system.stop(actor) } it should "save response for callA and wait for callB" in { val mockServiceRegistryActor = TestProbe() val actor = TestFSMRef(new CallCacheDiffActor(mockServiceRegistryActor.ref)) - + actor.setState(WaitingForMetadata, CallCacheDiffWithRequest(queryA, queryB, None, None, self)) - val response = MetadataLookupResponse(queryA, eventsA) - actor ! response - + actor ! responseForA + eventually { - actor.stateData shouldBe CallCacheDiffWithRequest(queryA, queryB, Some(response), None, self) + actor.stateData shouldBe CallCacheDiffWithRequest(queryA, queryB, Some(WorkflowMetadataJson(workflowMetadataA)), None, self) actor.stateName shouldBe WaitingForMetadata } + + system.stop(actor) } it should "save response for callB and wait for callA" in { @@ -91,24 +101,24 @@ class CallCacheDiffActorSpec extends TestKitSuite with FlatSpecLike with Matcher actor.setState(WaitingForMetadata, CallCacheDiffWithRequest(queryA, queryB, None, None, self)) - val response = MetadataLookupResponse(queryB, eventsB) - actor ! response + actor ! responseForB eventually { - actor.stateData shouldBe CallCacheDiffWithRequest(queryA, queryB, None, Some(response), self) + actor.stateData shouldBe CallCacheDiffWithRequest(queryA, queryB, None, Some(WorkflowMetadataJson(workflowMetadataB)), self) actor.stateName shouldBe WaitingForMetadata } + + system.stop(actor) } it should "build the response when receiving response for A and already has B" in { val mockServiceRegistryActor = TestProbe() val actor = TestFSMRef(new CallCacheDiffActor(mockServiceRegistryActor.ref)) watch(actor) - val responseB = MetadataLookupResponse(queryB, eventsB) - actor.setState(WaitingForMetadata, CallCacheDiffWithRequest(queryA, queryB, None, Option(responseB), self)) + actor.setState(WaitingForMetadata, CallCacheDiffWithRequest(queryA, queryB, None, Some(WorkflowMetadataJson(workflowMetadataB)), self)) - actor ! MetadataLookupResponse(queryA, eventsA) + actor ! responseForA expectMsgClass(classOf[CallCacheDiffActorResponse]) expectTerminated(actor) @@ -118,150 +128,192 @@ class CallCacheDiffActorSpec extends TestKitSuite with FlatSpecLike with Matcher val mockServiceRegistryActor = TestProbe() val actor = TestFSMRef(new CallCacheDiffActor(mockServiceRegistryActor.ref)) watch(actor) - val responseA = MetadataLookupResponse(queryA, eventsA) - actor.setState(WaitingForMetadata, CallCacheDiffWithRequest(queryA, queryB, Option(responseA), None, self)) + actor.setState(WaitingForMetadata, CallCacheDiffWithRequest(queryA, queryB, Some(WorkflowMetadataJson(workflowMetadataA)), None, self)) - actor ! MetadataLookupResponse(queryB, eventsB) + actor ! responseForB expectMsgClass(classOf[CallCacheDiffActorResponse]) expectTerminated(actor) } - it should "build a correct response" in { - import cromwell.services.metadata.MetadataService.MetadataLookupResponse + val correctCallCacheDiff = { import spray.json._ - - val mockServiceRegistryActor = TestProbe() - val actor = TestFSMRef(new CallCacheDiffActor(mockServiceRegistryActor.ref)) - watch(actor) - actor.setState(WaitingForMetadata, CallCacheDiffWithRequest(queryA, queryB, None, None, self)) - actor ! MetadataLookupResponse(queryB, eventsB) - actor ! MetadataLookupResponse(queryA, eventsA) - - val expectedJson: JsObject = - s""" - |{ - | "callA":{ - | "executionStatus": "Done", - | "allowResultReuse": true, - | "callFqn": "callFqnA", - | "jobIndex": 1, - | "workflowId": "971652a6-139c-4ef3-96b5-aeb611a40dbf" - | }, - | "callB":{ - | "executionStatus": "Failed", - | "allowResultReuse": false, - | "callFqn": "callFqnB", - | "jobIndex": -1, - | "workflowId": "bb85b3ec-e179-4f12-b90f-5191216da598" - | }, - | "hashDifferential":[ - | { - | "hashKey": "hash in only in A", - | "callA":"hello", - | "callB":null - | }, - | { - | "hashKey": "hash in A and B with different value", - | "callA":"I'm the hash for A !", - | "callB":"I'm the hash for B !" - | }, - | { - | "hashKey": "hash in only in B", - | "callA":null, - | "callB":"hello" - | } - | ] - |} + s""" + |{ + | "callA":{ + | "executionStatus": "Done", + | "allowResultReuse": true, + | "callFqn": "callFqnA", + | "jobIndex": 1, + | "workflowId": "971652a6-139c-4ef3-96b5-aeb611a40dbf" + | }, + | "callB":{ + | "executionStatus": "Failed", + | "allowResultReuse": false, + | "callFqn": "callFqnB", + | "jobIndex": -1, + | "workflowId": "bb85b3ec-e179-4f12-b90f-5191216da598" + | }, + | "hashDifferential":[ + | { + | "hashKey": "hash in only in A", + | "callA":"hello from A", + | "callB":null + | }, + | { + | "hashKey": "hash in A and B with different value", + | "callA":"I'm the hash for A !", + | "callB":"I'm the hash for B !" + | }, + | { + | "hashKey": "hash in only in B", + | "callA":null, + | "callB":"hello from B" + | } + | ] + |} """.stripMargin.parseJson.asJsObject - - val expectedResponse = BuiltCallCacheDiffResponse(expectedJson) - - expectMsg(expectedResponse) - expectTerminated(actor) } - it should "fail properly" in { - import scala.concurrent.duration._ - import scala.language.postfixOps + it should "build a correct response" in { + import spray.json._ + import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheDiffActorJsonFormatting.successfulResponseJsonFormatter val mockServiceRegistryActor = TestProbe() val actor = TestFSMRef(new CallCacheDiffActor(mockServiceRegistryActor.ref)) watch(actor) - val exception = new Exception("Query lookup failed - but it's ok ! this is a test !") - val responseA = MetadataServiceKeyLookupFailed(queryA, exception) - actor.setState(WaitingForMetadata, CallCacheDiffWithRequest(queryA, queryB, None, None, self)) - actor ! responseA - - expectMsgPF(1 second) { - case FailedCallCacheDiffResponse(e: Throwable) => - e.getMessage shouldBe "Query lookup failed - but it's ok ! this is a test !" + actor ! responseForB + actor ! responseForA + + expectMsgPF() { + case r: SuccessfulCallCacheDiffResponse => + withClue(s""" + |Expected: + |${correctCallCacheDiff.prettyPrint} + | + |Actual: + |${r.toJson.prettyPrint}""".stripMargin) { + r.toJson should be(correctCallCacheDiff) + } + case other => fail(s"Expected SuccessfulCallCacheDiffResponse but got $other") } - expectTerminated(actor) } - it should "Respond with an appropriate message if hashes are missing" in { - import scala.concurrent.duration._ - import scala.language.postfixOps + it should "build a correct response from multiple attempts" in { + import spray.json._ + import cromwell.engine.workflow.lifecycle.execution.callcaching.CallCacheDiffActorJsonFormatting.successfulResponseJsonFormatter val mockServiceRegistryActor = TestProbe() val actor = TestFSMRef(new CallCacheDiffActor(mockServiceRegistryActor.ref)) watch(actor) - val responseB = MetadataLookupResponse(queryB, eventsB.filterNot(_.key.key.contains("hashes"))) - - actor.setState(WaitingForMetadata, CallCacheDiffWithRequest(queryA, queryB, None, Option(responseB), self)) - - actor ! MetadataLookupResponse(queryA, eventsA.filterNot(_.key.key.contains("hashes"))) + actor.setState(WaitingForMetadata, CallCacheDiffWithRequest(queryA, queryB, None, None, self)) - expectMsgPF(1 second) { - case FailedCallCacheDiffResponse(e) => - e.getMessage shouldBe "callA and callB have not finished yet, or were run on a previous version of Cromwell on which this endpoint was not supported." + // Create a set of "failed" events for attempt 1: + val eventsAAttempt1 = List( + MetadataEvent(MetadataKey(workflowIdA, metadataJobKeyA, "executionStatus"), MetadataValue("Failed")), + MetadataEvent(MetadataKey(workflowIdA, metadataJobKeyA, "callCaching:allowResultReuse"), MetadataValue(false)), + MetadataEvent(MetadataKey(workflowIdA, metadataJobKeyA, "callCaching:hashes:hash in only in A"), MetadataValue("ouch!")), + MetadataEvent(MetadataKey(workflowIdA, metadataJobKeyA, "callCaching:hashes:hash in A and B with same value"), MetadataValue("ouch!")), + MetadataEvent(MetadataKey(workflowIdA, metadataJobKeyA, "callCaching:hashes:hash in A and B with different value"), MetadataValue("ouch!")) + ) + // And update the old "eventsA" to represent attempt 2: + val eventsAAttempt2 = eventsA.map(event => event.copy(key = event.key.copy(jobKey = event.key.jobKey.map(_.copy(attempt = 2))))) + + val modifiedEventsA = eventsAAttempt1 ++ eventsAAttempt2 + + val workflowMetadataA: JsObject = MetadataBuilderActor.workflowMetadataResponse(workflowIdA, modifiedEventsA, includeCallsIfEmpty = false, Map.empty) + val responseForA = BuiltMetadataResponse(MetadataService.GetMetadataAction(queryA), workflowMetadataA) + + + actor ! responseForB + actor ! responseForA + + expectMsgPF() { + case r: SuccessfulCallCacheDiffResponse => + withClue(s""" + |Expected: + |${correctCallCacheDiff.prettyPrint} + | + |Actual: + |${r.toJson.prettyPrint}""".stripMargin) { + r.toJson should be(correctCallCacheDiff) + } + case other => + expectTerminated(actor) + fail(s"Expected SuccessfulCallCacheDiffResponse but got $other") } + expectTerminated(actor) } - it should "Respond with CachedCallNotFoundException if a call is missing" in { + it should "fail properly" in { import scala.concurrent.duration._ import scala.language.postfixOps val mockServiceRegistryActor = TestProbe() val actor = TestFSMRef(new CallCacheDiffActor(mockServiceRegistryActor.ref)) watch(actor) - val responseB = MetadataLookupResponse(queryB, eventsB.filterNot(_.key.key.contains("hashes"))) + val exception = new Exception("Query lookup failed - but it's ok ! this is a test !") + val responseA = FailedMetadataResponse(GetMetadataAction(queryA), exception) - actor.setState(WaitingForMetadata, CallCacheDiffWithRequest(queryA, queryB, None, Option(responseB), self)) + actor.setState(WaitingForMetadata, CallCacheDiffWithRequest(queryA, queryB, None, None, self)) - actor ! MetadataLookupResponse(queryA, List.empty) + actor ! responseA expectMsgPF(1 second) { - case FailedCallCacheDiffResponse(e) => - e.getMessage shouldBe "Cannot find call 971652a6-139c-4ef3-96b5-aeb611a40dbf:callFqnA:1" + case FailedCallCacheDiffResponse(e: Throwable) => + e.getMessage shouldBe "Query lookup failed - but it's ok ! this is a test !" } + expectTerminated(actor) } - it should "Respond with CachedCallNotFoundException if both calls are missing" in { + it should "respond with an appropriate error if calls' hashes are missing" in { + testExpectedErrorForModifiedMetadata( + metadataFilter = _.key.key.contains("hashes"), + error = s"""Failed to calculate diff for call A and call B: + |Failed to extract relevant metadata for call A (971652a6-139c-4ef3-96b5-aeb611a40dbf / callFqnA:1) (reason 1 of 1): No 'hashes' field found + |Failed to extract relevant metadata for call B (bb85b3ec-e179-4f12-b90f-5191216da598 / callFqnB:-1) (reason 1 of 1): No 'hashes' field found""".stripMargin + ) + } + + it should "respond with an appropriate error if both calls are missing" in { + testExpectedErrorForModifiedMetadata( + metadataFilter = _.key.jobKey.nonEmpty, + error = s"""Failed to calculate diff for call A and call B: + |Failed to extract relevant metadata for call A (971652a6-139c-4ef3-96b5-aeb611a40dbf / callFqnA:1) (reason 1 of 1): No 'calls' field found + |Failed to extract relevant metadata for call B (bb85b3ec-e179-4f12-b90f-5191216da598 / callFqnB:-1) (reason 1 of 1): No 'calls' field found""".stripMargin + ) + } + + def testExpectedErrorForModifiedMetadata(metadataFilter: MetadataEvent => Boolean, error: String) = { import scala.concurrent.duration._ import scala.language.postfixOps val mockServiceRegistryActor = TestProbe() val actor = TestFSMRef(new CallCacheDiffActor(mockServiceRegistryActor.ref)) watch(actor) - val responseB = MetadataLookupResponse(queryB, List.empty) - actor.setState(WaitingForMetadata, CallCacheDiffWithRequest(queryA, queryB, None, Option(responseB), self)) + def getModifiedResponse(workflowId: WorkflowId, query: MetadataQuery, events: Seq[MetadataEvent]): BuiltMetadataResponse = { + val modifiedEvents = events.filterNot(metadataFilter) // filters out any "call" level metadata + val modifiedWorkflowMetadata = MetadataBuilderActor.workflowMetadataResponse(workflowId, modifiedEvents, includeCallsIfEmpty = false, Map.empty) + BuiltMetadataResponse(MetadataService.GetMetadataAction(query), modifiedWorkflowMetadata) + } + + actor.setState(WaitingForMetadata, CallCacheDiffWithRequest(queryA, queryB, None, None, self)) - actor ! MetadataLookupResponse(queryA, List.empty) + actor ! getModifiedResponse(workflowIdA, queryA, eventsA) + actor ! getModifiedResponse(workflowIdB, queryB, eventsB) expectMsgPF(1 second) { - case FailedCallCacheDiffResponse(e) => - e.getMessage shouldBe "Cannot find calls 971652a6-139c-4ef3-96b5-aeb611a40dbf:callFqnA:1, bb85b3ec-e179-4f12-b90f-5191216da598:callFqnB:-1" + case FailedCallCacheDiffResponse(e) => e.getMessage shouldBe error } expectTerminated(actor) } + } diff --git a/engine/src/test/scala/cromwell/webservice/MetadataBuilderActorSpec.scala b/engine/src/test/scala/cromwell/webservice/MetadataBuilderActorSpec.scala index 5306fffa8f1..694e4f9d45d 100644 --- a/engine/src/test/scala/cromwell/webservice/MetadataBuilderActorSpec.scala +++ b/engine/src/test/scala/cromwell/webservice/MetadataBuilderActorSpec.scala @@ -9,12 +9,13 @@ import akka.util.Timeout import cromwell.core._ import cromwell.services.metadata.MetadataService._ import cromwell.services.metadata._ -import cromwell.webservice.metadata.MetadataBuilderActor -import cromwell.webservice.metadata.MetadataBuilderActor.{BuiltMetadataResponse, MetadataBuilderActorResponse} +import cromwell.services.metadata.impl.builder.MetadataBuilderActor +import cromwell.services.metadata.impl.builder.MetadataBuilderActor.{BuiltMetadataResponse, MetadataBuilderActorResponse} import org.scalatest.prop.TableDrivenPropertyChecks import org.scalatest.{Assertion, AsyncFlatSpecLike, Matchers, Succeeded} import org.specs2.mock.Mockito import spray.json._ +import cromwell.util.AkkaTestUtil.EnhancedTestProbe import scala.concurrent.Future import scala.concurrent.duration._ @@ -33,13 +34,16 @@ class MetadataBuilderActorSpec extends TestKitSuite("Metadata") with AsyncFlatSp queryReply: MetadataQuery, events: Seq[MetadataEvent], expectedRes: String): Future[Assertion] = { - val mockServiceRegistry = TestProbe() - val mba = system.actorOf(MetadataBuilderActor.props(mockServiceRegistry.ref)) + val mockReadMetadataWorkerActor = TestProbe() + def readMetadataWorkerMaker = () => mockReadMetadataWorkerActor.props + + + val mba = system.actorOf(MetadataBuilderActor.props(readMetadataWorkerMaker)) val response = mba.ask(action).mapTo[MetadataBuilderActorResponse] - mockServiceRegistry.expectMsg(defaultTimeout, action) - mockServiceRegistry.reply(MetadataLookupResponse(queryReply, events)) + mockReadMetadataWorkerActor.expectMsg(defaultTimeout, action) + mockReadMetadataWorkerActor.reply(MetadataLookupResponse(queryReply, events)) response map { r => r shouldBe a [BuiltMetadataResponse] } - response.mapTo[BuiltMetadataResponse] map { b => b.response shouldBe expectedRes.parseJson} + response.mapTo[BuiltMetadataResponse] map { b => b.responseJson shouldBe expectedRes.parseJson} } it should "build workflow scope tree from metadata events" in { @@ -96,7 +100,7 @@ class MetadataBuilderActorSpec extends TestKitSuite("Metadata") with AsyncFlatSp |}""".stripMargin val mdQuery = MetadataQuery(workflowA, None, None, None, None, expandSubWorkflows = false) - val queryAction = GetMetadataQueryAction(mdQuery) + val queryAction = GetMetadataAction(mdQuery) assertMetadataResponse(queryAction, mdQuery, workflowAEvents, expectedRes) } @@ -350,7 +354,7 @@ class MetadataBuilderActorSpec extends TestKitSuite("Metadata") with AsyncFlatSp """.stripMargin val mdQuery = MetadataQuery(workflowId, None, None, None, None, expandSubWorkflows = false) - val queryAction = GetMetadataQueryAction(mdQuery) + val queryAction = GetMetadataAction(mdQuery) assertMetadataResponse(queryAction, mdQuery, events, expectedResponse) } @@ -371,7 +375,7 @@ class MetadataBuilderActorSpec extends TestKitSuite("Metadata") with AsyncFlatSp """.stripMargin val mdQuery = MetadataQuery(workflowId, None, None, None, None, expandSubWorkflows = false) - val queryAction = GetMetadataQueryAction(mdQuery) + val queryAction = GetMetadataAction(mdQuery) assertMetadataResponse(queryAction, mdQuery, events, expectedResponse) } @@ -391,14 +395,14 @@ class MetadataBuilderActorSpec extends TestKitSuite("Metadata") with AsyncFlatSp """.stripMargin val mdQuery = MetadataQuery(workflowId, None, None, None, None, expandSubWorkflows = false) - val queryAction = GetMetadataQueryAction(mdQuery) + val queryAction = GetMetadataAction(mdQuery) assertMetadataResponse(queryAction, mdQuery, events, expectedResponse) } it should "render empty Json" in { val workflowId = WorkflowId.randomId() val mdQuery = MetadataQuery(workflowId, None, None, None, None, expandSubWorkflows = false) - val queryAction = GetMetadataQueryAction(mdQuery) + val queryAction = GetMetadataAction(mdQuery) val expectedEmptyResponse = """{}""" assertMetadataResponse(queryAction, mdQuery, List.empty, expectedEmptyResponse) } @@ -428,7 +432,7 @@ class MetadataBuilderActorSpec extends TestKitSuite("Metadata") with AsyncFlatSp """.stripMargin val mdQuery = MetadataQuery(workflowId, None, None, None, None, expandSubWorkflows = false) - val queryAction = GetMetadataQueryAction(mdQuery) + val queryAction = GetMetadataAction(mdQuery) assertMetadataResponse(queryAction, mdQuery, emptyEvents, expectedEmptyResponse) val expectedNonEmptyResponse = @@ -456,20 +460,22 @@ class MetadataBuilderActorSpec extends TestKitSuite("Metadata") with AsyncFlatSp ) val mainQuery = MetadataQuery(mainWorkflowId, None, None, None, None, expandSubWorkflows = true) - val mainQueryAction = GetMetadataQueryAction(mainQuery) + val mainQueryAction = GetMetadataAction(mainQuery) val subQuery = MetadataQuery(subWorkflowId, None, None, None, None, expandSubWorkflows = true) - val subQueryAction = GetMetadataQueryAction(subQuery) + val subQueryAction = GetMetadataAction(subQuery) val parentProbe = TestProbe() - val mockServiceRegistry = TestProbe() - val metadataBuilder = TestActorRef(MetadataBuilderActor.props(mockServiceRegistry.ref), parentProbe.ref, s"MetadataActor-${UUID.randomUUID()}") + val mockReadMetadataWorkerActor = TestProbe() + def readMetadataWorkerMaker = () => mockReadMetadataWorkerActor.props + + val metadataBuilder = TestActorRef(MetadataBuilderActor.props(readMetadataWorkerMaker), parentProbe.ref, s"MetadataActor-${UUID.randomUUID()}") val response = metadataBuilder.ask(mainQueryAction).mapTo[MetadataBuilderActorResponse] - mockServiceRegistry.expectMsg(defaultTimeout, mainQueryAction) - mockServiceRegistry.reply(MetadataLookupResponse(mainQuery, mainEvents)) - mockServiceRegistry.expectMsg(defaultTimeout, subQueryAction) - mockServiceRegistry.reply(MetadataLookupResponse(subQuery, subEvents)) + mockReadMetadataWorkerActor.expectMsg(defaultTimeout, mainQueryAction) + mockReadMetadataWorkerActor.reply(MetadataLookupResponse(mainQuery, mainEvents)) + mockReadMetadataWorkerActor.expectMsg(defaultTimeout, subQueryAction) + mockReadMetadataWorkerActor.reply(MetadataLookupResponse(subQuery, subEvents)) val expandedRes = s""" @@ -493,7 +499,7 @@ class MetadataBuilderActorSpec extends TestKitSuite("Metadata") with AsyncFlatSp response map { r => r shouldBe a [BuiltMetadataResponse] } val bmr = response.mapTo[BuiltMetadataResponse] - bmr map { b => b.response shouldBe expandedRes.parseJson} + bmr map { b => b.responseJson shouldBe expandedRes.parseJson} } it should "NOT expand sub workflow metadata when NOT asked for" in { @@ -505,15 +511,17 @@ class MetadataBuilderActorSpec extends TestKitSuite("Metadata") with AsyncFlatSp ) val queryNoExpand = MetadataQuery(mainWorkflowId, None, None, None, None, expandSubWorkflows = false) - val queryNoExpandAction = GetMetadataQueryAction(queryNoExpand) + val queryNoExpandAction = GetMetadataAction(queryNoExpand) val parentProbe = TestProbe() - val mockServiceRegistry = TestProbe() - val metadataBuilder = TestActorRef(MetadataBuilderActor.props(mockServiceRegistry.ref), parentProbe.ref, s"MetadataActor-${UUID.randomUUID()}") + val mockReadMetadataWorkerActor = TestProbe() + def readMetadataWorkerMaker= () => mockReadMetadataWorkerActor.props + + val metadataBuilder = TestActorRef(MetadataBuilderActor.props(readMetadataWorkerMaker), parentProbe.ref, s"MetadataActor-${UUID.randomUUID()}") val response = metadataBuilder.ask(queryNoExpandAction).mapTo[MetadataBuilderActorResponse] - mockServiceRegistry.expectMsg(defaultTimeout, queryNoExpandAction) - mockServiceRegistry.reply(MetadataLookupResponse(queryNoExpand, mainEvents)) + mockReadMetadataWorkerActor.expectMsg(defaultTimeout, queryNoExpandAction) + mockReadMetadataWorkerActor.reply(MetadataLookupResponse(queryNoExpand, mainEvents)) val nonExpandedRes = @@ -534,7 +542,7 @@ class MetadataBuilderActorSpec extends TestKitSuite("Metadata") with AsyncFlatSp response map { r => r shouldBe a [BuiltMetadataResponse] } val bmr = response.mapTo[BuiltMetadataResponse] - bmr map { b => b.response shouldBe nonExpandedRes.parseJson} + bmr map { b => b.responseJson shouldBe nonExpandedRes.parseJson} } diff --git a/engine/src/test/scala/cromwell/webservice/routes/CromwellApiServiceSpec.scala b/engine/src/test/scala/cromwell/webservice/routes/CromwellApiServiceSpec.scala index a805774e1a6..e819671e81b 100644 --- a/engine/src/test/scala/cromwell/webservice/routes/CromwellApiServiceSpec.scala +++ b/engine/src/test/scala/cromwell/webservice/routes/CromwellApiServiceSpec.scala @@ -19,6 +19,8 @@ import cromwell.services.healthmonitor.ProtoHealthMonitorServiceActor.{GetCurren import cromwell.services.instrumentation.InstrumentationService.InstrumentationServiceMessage import cromwell.services.metadata.MetadataService._ import cromwell.services.metadata._ +import cromwell.services.metadata.impl.builder.MetadataBuilderActor +import cromwell.services.metadata.impl.builder.MetadataBuilderActor.BuiltMetadataResponse import cromwell.services.womtool.WomtoolServiceMessages.{DescribeFailure, DescribeRequest, DescribeSuccess} import cromwell.services.womtool.models.WorkflowDescription import cromwell.util.SampleWdl.HelloWorld @@ -543,11 +545,13 @@ object CromwellApiServiceSpec { ) } - def responseMetadataValues(workflowId: WorkflowId, withKeys: List[String], withoutKeys: List[String]) = { + def responseMetadataValues(workflowId: WorkflowId, withKeys: List[String], withoutKeys: List[String]): JsObject = { def keyFilter(keys: List[String])(m: MetadataEvent) = keys.exists(k => m.key.key.startsWith(k)) - fullMetadataResponse(workflowId) + val events = fullMetadataResponse(workflowId) .filter(m => withKeys.isEmpty || keyFilter(withKeys)(m)) .filter(m => withoutKeys.isEmpty || !keyFilter(withoutKeys)(m)) + + MetadataBuilderActor.workflowMetadataResponse(workflowId, events, includeCallsIfEmpty = false, Map.empty) } def metadataQuery(workflowId: WorkflowId) = @@ -565,7 +569,7 @@ object CromwellApiServiceSpec { import MockServiceRegistryActor._ override def receive = { - case WorkflowQuery(parameters) => + case QueryForWorkflowsMatchingParameters(parameters) => val labels: Option[Map[String, String]] = { parameters.contains(("additionalQueryResultFields", "labels")).option( Map("key1" -> "label1", "key2" -> "label2")) @@ -585,22 +589,28 @@ object CromwellApiServiceSpec { ok = true, systems = Map( "Engine Database" -> SubsystemStatus(ok = true, messages = None))) - case GetStatus(id) if id == OnHoldWorkflowId => sender ! StatusLookupResponse(id, WorkflowOnHold) - case GetStatus(id) if id == RunningWorkflowId => sender ! StatusLookupResponse(id, WorkflowRunning) - case GetStatus(id) if id == AbortingWorkflowId => sender ! StatusLookupResponse(id, WorkflowAborting) - case GetStatus(id) if id == AbortedWorkflowId => sender ! StatusLookupResponse(id, WorkflowAborted) - case GetStatus(id) if id == SucceededWorkflowId => sender ! StatusLookupResponse(id, WorkflowSucceeded) - case GetStatus(id) if id == FailedWorkflowId => sender ! StatusLookupResponse(id, WorkflowFailed) - case GetStatus(id) => sender ! StatusLookupResponse(id, WorkflowSubmitted) - case GetLabels(id) => sender ! LabelLookupResponse(id, Map("key1" -> "label1", "key2" -> "label2")) - case WorkflowOutputs(id) => + case request @ GetStatus(id) => + val status = id match { + case OnHoldWorkflowId => WorkflowOnHold + case RunningWorkflowId => WorkflowRunning + case AbortingWorkflowId => WorkflowAborting + case AbortedWorkflowId => WorkflowAborted + case SucceededWorkflowId => WorkflowSucceeded + case FailedWorkflowId => WorkflowFailed + case _ => WorkflowSubmitted + } + sender ! BuiltMetadataResponse(request, MetadataBuilderActor.processStatusResponse(id, status)) + case request @ GetLabels(id) => + sender ! BuiltMetadataResponse(request, MetadataBuilderActor.processLabelsResponse(id, Map("key1" -> "label1", "key2" -> "label2"))) + case request @ WorkflowOutputs(id) => val event = Vector(MetadataEvent(MetadataKey(id, None, "outputs:test.hello.salutation"), MetadataValue("Hello foo!", MetadataString))) - sender ! WorkflowOutputsResponse(id, event) - case GetLogs(id) => sender ! LogsResponse(id, logsEvents(id)) - case GetSingleWorkflowMetadataAction(id, withKeys, withoutKeys, _) => + sender ! BuiltMetadataResponse(request, MetadataBuilderActor.processOutputsResponse(id, event)) + case request @ GetLogs(id) => + sender ! BuiltMetadataResponse(request, MetadataBuilderActor.workflowMetadataResponse(id, logsEvents(id), includeCallsIfEmpty = false, Map.empty)) + case request @ GetMetadataAction(MetadataQuery(id, _, _, withKeys, withoutKeys, _)) => val withKeysList = withKeys.map(_.toList).getOrElse(List.empty) val withoutKeysList = withoutKeys.map(_.toList).getOrElse(List.empty) - sender ! MetadataLookupResponse(metadataQuery(id), responseMetadataValues(id, withKeysList, withoutKeysList)) + sender ! BuiltMetadataResponse(request, responseMetadataValues(id, withKeysList, withoutKeysList)) case PutMetadataActionAndRespond(events, _, _) => events.head.key.workflowId match { case CromwellApiServiceSpec.ExistingWorkflowId => sender ! MetadataWriteSuccess(events) diff --git a/server/src/test/scala/cromwell/CromwellTestKitSpec.scala b/server/src/test/scala/cromwell/CromwellTestKitSpec.scala index 68bb814faa2..9eb606a2d76 100644 --- a/server/src/test/scala/cromwell/CromwellTestKitSpec.scala +++ b/server/src/test/scala/cromwell/CromwellTestKitSpec.scala @@ -27,8 +27,7 @@ import cromwell.services.ServiceRegistryActor import cromwell.services.metadata.MetadataService._ import cromwell.subworkflowstore.EmptySubWorkflowStoreActor import cromwell.util.SampleWdl -import cromwell.webservice.metadata.MetadataBuilderActor -import cromwell.webservice.metadata.MetadataBuilderActor.{BuiltMetadataResponse, FailedMetadataResponse, MetadataBuilderActorResponse} +import cromwell.services.metadata.impl.builder.MetadataBuilderActor.{BuiltMetadataResponse, FailedMetadataResponse, MetadataBuilderActorResponse} import org.scalactic.Equality import org.scalatest._ import org.scalatest.concurrent.{Eventually, ScalaFutures} @@ -47,6 +46,7 @@ case class OutputNotFoundException(outputFqn: String, actualOutputs: String) ext case class LogNotFoundException(log: String) extends RuntimeException(s"Expected log $log was not found") object CromwellTestKitSpec { + val ConfigText = """ |akka { @@ -158,6 +158,7 @@ object CromwellTestKitSpec { } } + /** * Special case for validating outputs. Used when the test wants to check that an output exists, but doesn't care what * the actual value was. @@ -327,7 +328,9 @@ abstract class CromwellTestKitSpec(val twms: TestWorkflowManagerSystem = default labels = customLabels ) val workflowId = rootActor.underlyingActor.submitWorkflow(sources) - eventually { verifyWorkflowState(rootActor.underlyingActor.serviceRegistryActor, workflowId, terminalState) } (config = patienceConfig, pos = implicitly[org.scalactic.source.Position]) + eventually { verifyWorkflowComplete(rootActor.underlyingActor.serviceRegistryActor, workflowId) } (config = patienceConfig, pos = implicitly[org.scalactic.source.Position]) + verifyWorkflowState(rootActor.underlyingActor.serviceRegistryActor, workflowId, terminalState) + val outcome = getWorkflowOutputsFromMetadata(workflowId, rootActor.underlyingActor.serviceRegistryActor) system.stop(rootActor) // And return the outcome: @@ -348,7 +351,8 @@ abstract class CromwellTestKitSpec(val twms: TestWorkflowManagerSystem = default val sources = sampleWdl.asWorkflowSources(runtime, workflowOptions) val workflowId = rootActor.underlyingActor.submitWorkflow(sources) - eventually { verifyWorkflowState(rootActor.underlyingActor.serviceRegistryActor, workflowId, terminalState) } (config = patienceConfig, pos = implicitly[org.scalactic.source.Position]) + eventually { verifyWorkflowComplete(rootActor.underlyingActor.serviceRegistryActor, workflowId) } (config = patienceConfig, pos = implicitly[org.scalactic.source.Position]) + verifyWorkflowState(rootActor.underlyingActor.serviceRegistryActor, workflowId, WorkflowSucceeded) val outputs = getWorkflowOutputsFromMetadata(workflowId, rootActor.underlyingActor.serviceRegistryActor) val actualOutputNames = outputs.keys mkString ", " @@ -369,32 +373,38 @@ abstract class CromwellTestKitSpec(val twms: TestWorkflowManagerSystem = default workflowId } + private def getWorkflowState(workflowId: WorkflowId, serviceRegistryActor: ActorRef)(implicit ec: ExecutionContext): WorkflowState = { + val statusResponse = serviceRegistryActor.ask(GetStatus(workflowId))(TimeoutDuration).collect { + case BuiltMetadataResponse(_, jsObject) => WorkflowState.withName(jsObject.fields("status").asInstanceOf[JsString].value) + case f => throw new RuntimeException(s"Unexpected status response for $workflowId: $f") + } + Await.result(statusResponse, TimeoutDuration) + } + /** - * Verifies that a state is correct. // TODO: There must be a better way...? + * Verifies that a workflow is complete */ - protected def verifyWorkflowState(serviceRegistryActor: ActorRef, workflowId: WorkflowId, expectedState: WorkflowState)(implicit ec: ExecutionContext): Unit = { - def getWorkflowState(workflowId: WorkflowId, serviceRegistryActor: ActorRef)(implicit ec: ExecutionContext): WorkflowState = { - val statusResponse = serviceRegistryActor.ask(GetStatus(workflowId))(TimeoutDuration).collect { - case StatusLookupResponse(_, state) => state - case f => throw new RuntimeException(s"Unexpected status response for $workflowId: $f") - } - Await.result(statusResponse, TimeoutDuration) - } + protected def verifyWorkflowComplete(serviceRegistryActor: ActorRef, workflowId: WorkflowId)(implicit ec: ExecutionContext): Unit = { + List(WorkflowSucceeded, WorkflowFailed, WorkflowAborted) should contain(getWorkflowState(workflowId, serviceRegistryActor)) + () + } + /** + * Verifies that a state is correct. + */ + protected def verifyWorkflowState(serviceRegistryActor: ActorRef, workflowId: WorkflowId, expectedState: WorkflowState)(implicit ec: ExecutionContext): Unit = { getWorkflowState(workflowId, serviceRegistryActor) should equal (expectedState) () } private def getWorkflowOutputsFromMetadata(id: WorkflowId, serviceRegistryActor: ActorRef): Map[FullyQualifiedName, WomValue] = { - val mba = system.actorOf(MetadataBuilderActor.props(serviceRegistryActor)) - val response = mba.ask(WorkflowOutputs(id)).mapTo[MetadataBuilderActorResponse] collect { - case BuiltMetadataResponse(r) => r - case FailedMetadataResponse(e) => throw e + + val response = serviceRegistryActor.ask(WorkflowOutputs(id)).mapTo[MetadataBuilderActorResponse] collect { + case BuiltMetadataResponse(_, r) => r + case FailedMetadataResponse(_, e) => throw e } val jsObject = Await.result(response, TimeoutDuration) - system.stop(mba) - jsObject.getFields(WorkflowMetadataKeys.Outputs).toList match { case head::_ => head.asInstanceOf[JsObject].fields.map( x => (x._1, jsValueToWdlValue(x._2))) case _ => Map.empty diff --git a/server/src/test/scala/cromwell/engine/WorkflowStoreActorSpec.scala b/server/src/test/scala/cromwell/engine/WorkflowStoreActorSpec.scala index d512006fed0..4626977b069 100644 --- a/server/src/test/scala/cromwell/engine/WorkflowStoreActorSpec.scala +++ b/server/src/test/scala/cromwell/engine/WorkflowStoreActorSpec.scala @@ -1,6 +1,7 @@ package cromwell.engine import java.time.OffsetDateTime +import java.util.UUID import akka.testkit._ import cats.data.{NonEmptyList, NonEmptyVector} @@ -20,8 +21,8 @@ import cromwell.engine.workflow.workflowstore._ import cromwell.services.EngineServicesStore import cromwell.services.ServicesStore.EnhancedSqlDatabase import cromwell.services.metadata.MetadataQuery -import cromwell.services.metadata.MetadataService.{GetMetadataQueryAction, MetadataLookupResponse} -import cromwell.services.metadata.impl.ReadMetadataActor +import cromwell.services.metadata.MetadataService.{GetMetadataAction, MetadataLookupResponse} +import cromwell.services.metadata.impl.ReadDatabaseMetadataWorkerActor import cromwell.util.EncryptionSpec import cromwell.util.SampleWdl.HelloWorld import cromwell.{CromwellTestKitSpec, CromwellTestKitWordSpec} @@ -185,10 +186,6 @@ class WorkflowStoreActorSpec extends CromwellTestKitWordSpec with CoordinatedWor ), "WorkflowStoreActor-FetchEncryptedWorkflowOptions" ) - val readMetadataActor = system.actorOf( - ReadMetadataActor.props(metadataReadTimeout = 30 seconds), - "ReadMetadataActor-FetchEncryptedOptions" - ) storeActor ! BatchSubmitWorkflows(NonEmptyList.of(optionedSourceFiles)) val insertedIds = expectMsgType[WorkflowsBatchSubmittedToStore](10 seconds).workflowIds.toList @@ -213,8 +210,14 @@ class WorkflowStoreActorSpec extends CromwellTestKitWordSpec with CoordinatedWor Seq("iv", "ciphertext") // We need to wait for workflow metadata to be flushed before we can successfully query for it - eventually(timeout(15 seconds), interval(5 seconds)) { - readMetadataActor ! GetMetadataQueryAction(MetadataQuery.forWorkflow(id)) + eventually(timeout(15.seconds.dilated), interval(500.millis.dilated)) { + val actorNameUniquificationString = UUID.randomUUID().toString.take(7) + val readMetadataActor = system.actorOf( + ReadDatabaseMetadataWorkerActor.props(metadataReadTimeout = 30 seconds), + s"ReadMetadataActor-FetchEncryptedOptions-$actorNameUniquificationString" + ) + + readMetadataActor ! GetMetadataAction(MetadataQuery.forWorkflow(id)) expectMsgPF(10 seconds) { case MetadataLookupResponse(_, eventList) => val optionsEvent = eventList.find(_.key.key == "submittedFiles:options").get diff --git a/services/src/main/scala/cromwell/services/metadata/MetadataQuery.scala b/services/src/main/scala/cromwell/services/metadata/MetadataQuery.scala index 2a91afea3e3..d3264b839c3 100644 --- a/services/src/main/scala/cromwell/services/metadata/MetadataQuery.scala +++ b/services/src/main/scala/cromwell/services/metadata/MetadataQuery.scala @@ -94,7 +94,9 @@ object MetadataQueryJobKey { def forMetadataJobKey(jobKey: MetadataJobKey) = MetadataQueryJobKey(jobKey.callFqn, jobKey.index, Option(jobKey.attempt)) } -case class MetadataQuery(workflowId: WorkflowId, jobKey: Option[MetadataQueryJobKey], key: Option[String], +case class MetadataQuery(workflowId: WorkflowId, + jobKey: Option[MetadataQueryJobKey], + key: Option[String], includeKeysOption: Option[NonEmptyList[String]], excludeKeysOption: Option[NonEmptyList[String]], expandSubWorkflows: Boolean) diff --git a/services/src/main/scala/cromwell/services/metadata/MetadataService.scala b/services/src/main/scala/cromwell/services/metadata/MetadataService.scala index 9fb078de984..bad99484d91 100644 --- a/services/src/main/scala/cromwell/services/metadata/MetadataService.scala +++ b/services/src/main/scala/cromwell/services/metadata/MetadataService.scala @@ -2,6 +2,7 @@ package cromwell.services.metadata import java.time.OffsetDateTime +import io.circe.Json import akka.actor.ActorRef import cats.data.NonEmptyList import cromwell.core._ @@ -12,7 +13,6 @@ import wom.values._ import scala.util.Random - object MetadataService { final val MetadataServiceName = "MetadataService" @@ -38,7 +38,12 @@ object MetadataService { trait MetadataServiceAction extends MetadataServiceMessage with ServiceRegistryMessage { def serviceName = MetadataServiceName } - trait ReadAction extends MetadataServiceAction + trait MetadataReadAction extends MetadataServiceAction + + trait WorkflowMetadataReadAction extends MetadataReadAction { + def workflowId: WorkflowId + } + object PutMetadataAction { def apply(event: MetadataEvent, others: MetadataEvent*) = new PutMetadataAction(List(event) ++ others) } @@ -82,17 +87,25 @@ object MetadataService { final case object ListenToMetadataWriteActor extends MetadataServiceAction with ListenToMessage - final case class GetSingleWorkflowMetadataAction(workflowId: WorkflowId, - includeKeysOption: Option[NonEmptyList[String]], - excludeKeysOption: Option[NonEmptyList[String]], - expandSubWorkflows: Boolean) - extends ReadAction - final case class GetMetadataQueryAction(key: MetadataQuery) extends ReadAction - final case class GetStatus(workflowId: WorkflowId) extends ReadAction - final case class GetLabels(workflowId: WorkflowId) extends ReadAction - final case class WorkflowQuery(parameters: Seq[(String, String)]) extends ReadAction - final case class WorkflowOutputs(workflowId: WorkflowId) extends ReadAction - final case class GetLogs(workflowId: WorkflowId) extends ReadAction + // Utility object to get GetMetadataAction's for a workflow-only query: + object GetSingleWorkflowMetadataAction { + def apply(workflowId: WorkflowId, + includeKeysOption: Option[NonEmptyList[String]], + excludeKeysOption: Option[NonEmptyList[String]], + expandSubWorkflows: Boolean): WorkflowMetadataReadAction = { + GetMetadataAction(MetadataQuery(workflowId, None, None, includeKeysOption, excludeKeysOption, expandSubWorkflows)) + } + } + + + final case class GetMetadataAction(key: MetadataQuery) extends WorkflowMetadataReadAction { + override def workflowId: WorkflowId = key.workflowId + } + final case class GetStatus(workflowId: WorkflowId) extends WorkflowMetadataReadAction + final case class GetLabels(workflowId: WorkflowId) extends WorkflowMetadataReadAction + final case class QueryForWorkflowsMatchingParameters(parameters: Seq[(String, String)]) extends MetadataReadAction + final case class WorkflowOutputs(workflowId: WorkflowId) extends WorkflowMetadataReadAction + final case class GetLogs(workflowId: WorkflowId) extends WorkflowMetadataReadAction case object RefreshSummary extends MetadataServiceAction trait ValidationCallback { def onMalformed(possibleWorkflowId: String): Unit @@ -112,6 +125,9 @@ object MetadataService { def reason: Throwable } + final case class MetadataLookupJsonResponse(query: MetadataQuery, result: Json) extends MetadataServiceResponse + final case class MetadataLookupFailed(query: MetadataQuery, reason: Throwable) + final case class MetadataLookupResponse(query: MetadataQuery, eventList: Seq[MetadataEvent]) extends MetadataServiceResponse final case class MetadataServiceKeyLookupFailed(query: MetadataQuery, reason: Throwable) extends MetadataServiceFailure diff --git a/services/src/main/scala/cromwell/services/metadata/impl/MetadataServiceActor.scala b/services/src/main/scala/cromwell/services/metadata/impl/MetadataServiceActor.scala index 56c727504ba..fc5bec6b920 100644 --- a/services/src/main/scala/cromwell/services/metadata/impl/MetadataServiceActor.scala +++ b/services/src/main/scala/cromwell/services/metadata/impl/MetadataServiceActor.scala @@ -10,6 +10,7 @@ import cromwell.core.{LoadConfig, WorkflowId} import cromwell.services.MetadataServicesStore import cromwell.services.metadata.MetadataService._ import cromwell.services.metadata.impl.MetadataSummaryRefreshActor.{MetadataSummaryFailure, MetadataSummarySuccess, SummarizeMetadata} +import cromwell.services.metadata.impl.builder.MetadataBuilderActor import cromwell.util.GracefulShutdownHelper import cromwell.util.GracefulShutdownHelper.ShutdownCommand import net.ceedubs.ficus.Ficus._ @@ -24,7 +25,7 @@ object MetadataServiceActor { def props(serviceConfig: Config, globalConfig: Config, serviceRegistryActor: ActorRef) = Props(MetadataServiceActor(serviceConfig, globalConfig, serviceRegistryActor)).withDispatcher(ServiceDispatcher) } -final case class MetadataServiceActor(serviceConfig: Config, globalConfig: Config, serviceRegistryActor: ActorRef) +case class MetadataServiceActor(serviceConfig: Config, globalConfig: Config, serviceRegistryActor: ActorRef) extends Actor with ActorLogging with MetadataDatabaseAccess with MetadataServicesStore with GracefulShutdownHelper { private val decider: Decider = { @@ -35,7 +36,7 @@ final case class MetadataServiceActor(serviceConfig: Config, globalConfig: Confi override val supervisorStrategy = new OneForOneStrategy()(decider) { override def logFailure(context: ActorContext, child: ActorRef, cause: Throwable, decision: Directive) = { val childName = if (child == readActor) "Read" else "Write" - log.error(s"The $childName Metadata Actor died unexpectedly, metadata events might have been lost. Restarting it...", cause) + log.error(cause, s"The $childName Metadata Actor died unexpectedly, metadata events might have been lost. Restarting it...") } } @@ -49,7 +50,10 @@ final case class MetadataServiceActor(serviceConfig: Config, globalConfig: Confi private val metadataReadTimeout: Duration = serviceConfig.getOrElse[Duration]("metadata-read-query-timeout", Duration.Inf) - val readActor = context.actorOf(ReadMetadataActor.props(metadataReadTimeout), "read-metadata-actor") + def readMetadataWorkerActorProps(): Props = ReadDatabaseMetadataWorkerActor.props(metadataReadTimeout).withDispatcher(ServiceDispatcher) + def metadataBuilderActorProps(): Props = MetadataBuilderActor.props(readMetadataWorkerActorProps).withDispatcher(ServiceDispatcher) + + val readActor = context.actorOf(ReadMetadataRegulatorActor.props(metadataBuilderActorProps, readMetadataWorkerActorProps), "singleton-ReadMetadataRegulatorActor") val dbFlushRate = serviceConfig.getOrElse("db-flush-rate", 5.seconds) val dbBatchSize = serviceConfig.getOrElse("db-batch-size", 200) @@ -109,7 +113,7 @@ final case class MetadataServiceActor(serviceConfig: Config, globalConfig: Confi case listen: Listen => writeActor forward listen case v: ValidateWorkflowIdInMetadata => validateWorkflowIdInMetadata(v.possibleWorkflowId, sender()) case v: ValidateWorkflowIdInMetadataSummaries => validateWorkflowIdInMetadataSummaries(v.possibleWorkflowId, sender()) - case action: ReadAction => readActor forward action + case action: MetadataReadAction => readActor forward action case RefreshSummary => summaryActor foreach { _ ! SummarizeMetadata(metadataSummaryRefreshLimit, sender()) } case MetadataSummarySuccess => scheduleSummary() case MetadataSummaryFailure(t) => diff --git a/services/src/main/scala/cromwell/services/metadata/impl/ReadDatabaseMetadataWorkerActor.scala b/services/src/main/scala/cromwell/services/metadata/impl/ReadDatabaseMetadataWorkerActor.scala new file mode 100644 index 00000000000..b07490af68d --- /dev/null +++ b/services/src/main/scala/cromwell/services/metadata/impl/ReadDatabaseMetadataWorkerActor.scala @@ -0,0 +1,104 @@ +package cromwell.services.metadata.impl + +import akka.actor.{Actor, ActorLogging, ActorRef, PoisonPill, Props} +import cromwell.core.Dispatcher.ServiceDispatcher +import cromwell.core.{WorkflowId, WorkflowSubmitted} +import cromwell.services.MetadataServicesStore +import cromwell.services.metadata.MetadataService._ +import cromwell.services.metadata.{MetadataQuery, WorkflowQueryParameters} + +import scala.concurrent.Future +import scala.concurrent.duration.Duration +import scala.util.Try + +object ReadDatabaseMetadataWorkerActor { + def props(metadataReadTimeout: Duration) = Props(new ReadDatabaseMetadataWorkerActor(metadataReadTimeout)).withDispatcher(ServiceDispatcher) +} + +class ReadDatabaseMetadataWorkerActor(metadataReadTimeout: Duration) extends Actor with ActorLogging with MetadataDatabaseAccess with MetadataServicesStore { + + implicit val ec = context.dispatcher + + def receive = { + case GetMetadataAction(query@MetadataQuery(_, _, _, _, _, _)) => evaluateRespondAndStop(sender(), getMetadata(query)) + case GetStatus(workflowId) => evaluateRespondAndStop(sender(), getStatus(workflowId)) + case GetLabels(workflowId) => evaluateRespondAndStop(sender(), queryLabelsAndRespond(workflowId)) + case GetLogs(workflowId) => evaluateRespondAndStop(sender(), queryLogsAndRespond(workflowId)) + case query: QueryForWorkflowsMatchingParameters => evaluateRespondAndStop(sender(), queryWorkflowsAndRespond(query.parameters)) + case WorkflowOutputs(id) => evaluateRespondAndStop(sender(), queryWorkflowOutputsAndRespond(id)) + } + + private def evaluateRespondAndStop(sndr: ActorRef, f: Future[Any]) = { + f map { result => + sndr ! result + } andThen { + case _ => self ! PoisonPill + } recover { + case t => log.error(t, s"Programmer Error! Unexpected error fall-through to 'evaluateRespondAndStop in ${getClass.getSimpleName}'") + } + () + } + + private def getMetadata(query: MetadataQuery): Future[MetadataServiceResponse] = { + + queryMetadataEvents(query, metadataReadTimeout) map { + m => MetadataLookupResponse(query, m) + } recover { + case t => MetadataServiceKeyLookupFailed(query, t) + } + } + + private def getStatus(id: WorkflowId): Future[MetadataServiceResponse] = { + + getWorkflowStatus(id) map { + case Some(s) => StatusLookupResponse(id, s) + // There's a workflow existence check at the API layer. If the request has made it this far in the system + // then the workflow exists but it must not have generated a status yet. + case None => StatusLookupResponse(id, WorkflowSubmitted) + } recover { + case t => StatusLookupFailed(id, t) + } + } + + private def queryLabelsAndRespond(id: WorkflowId): Future[MetadataServiceResponse] = { + + getWorkflowLabels(id) map { + ls => LabelLookupResponse(id, ls) + } recover { + case t => LabelLookupFailed(id, t) + } + } + + private def queryWorkflowsAndRespond(rawParameters: Seq[(String, String)]): Future[MetadataServiceResponse] = { + def queryWorkflows: Future[(WorkflowQueryResponse, Option[QueryMetadata])] = { + for { + // Future/Try to wrap the exception that might be thrown from WorkflowQueryParameters.apply. + parameters <- Future.fromTry(Try(WorkflowQueryParameters(rawParameters))) + response <- queryWorkflowSummaries(parameters) + } yield response + } + + queryWorkflows map { + case (response, metadata) => WorkflowQuerySuccess(response, metadata) + } recover { + case t => WorkflowQueryFailure(t) + } + } + + private def queryWorkflowOutputsAndRespond(id: WorkflowId): Future[MetadataServiceResponse] = { + queryWorkflowOutputs(id, metadataReadTimeout) map { + o => WorkflowOutputsResponse(id, o) + } recover { + case t => WorkflowOutputsFailure(id, t) + } + } + + private def queryLogsAndRespond(id: WorkflowId): Future[MetadataServiceResponse] = { + queryLogs(id, metadataReadTimeout) map { + s => LogsResponse(id, s) + } recover { + case t => LogsFailure(id, t) + } + } + +} diff --git a/services/src/main/scala/cromwell/services/metadata/impl/ReadMetadataActor.scala b/services/src/main/scala/cromwell/services/metadata/impl/ReadMetadataActor.scala deleted file mode 100644 index c755f321538..00000000000 --- a/services/src/main/scala/cromwell/services/metadata/impl/ReadMetadataActor.scala +++ /dev/null @@ -1,96 +0,0 @@ -package cromwell.services.metadata.impl - -import akka.actor.{Actor, ActorLogging, Props} -import cromwell.core.Dispatcher.ApiDispatcher -import cromwell.core.{WorkflowId, WorkflowSubmitted} -import cromwell.services.MetadataServicesStore -import cromwell.services.metadata.MetadataService._ -import cromwell.services.metadata.{CallMetadataKeys, MetadataQuery, WorkflowQueryParameters} - -import scala.concurrent.duration.Duration -import scala.concurrent.Future -import scala.util.{Failure, Success, Try} - -object ReadMetadataActor { - def props(metadataReadTimeout: Duration) = Props(new ReadMetadataActor(metadataReadTimeout)).withDispatcher(ApiDispatcher) -} - -class ReadMetadataActor(metadataReadTimeout: Duration) extends Actor with ActorLogging with MetadataDatabaseAccess with MetadataServicesStore { - - implicit val ec = context.dispatcher - - def receive = { - case GetSingleWorkflowMetadataAction(workflowId, includeKeysOption, excludeKeysOption, expandSubWorkflows) => - val includeKeys = if (expandSubWorkflows) { - includeKeysOption map { _ :+ CallMetadataKeys.SubWorkflowId } - } else includeKeysOption - queryAndRespond(MetadataQuery(workflowId, None, None, includeKeys, excludeKeysOption, expandSubWorkflows)) - case GetMetadataQueryAction(query@MetadataQuery(_, _, _, _, _, _)) => queryAndRespond(query) - case GetStatus(workflowId) => queryStatusAndRespond(workflowId) - case GetLabels(workflowId) => queryLabelsAndRespond(workflowId) - case GetLogs(workflowId) => queryLogsAndRespond(workflowId) - case query: WorkflowQuery => queryWorkflowsAndRespond(query.parameters) - case WorkflowOutputs(id) => queryWorkflowOutputsAndRespond(id) - } - - private def queryAndRespond(query: MetadataQuery): Unit = { - val sndr = sender() - queryMetadataEvents(query, metadataReadTimeout) onComplete { - case Success(m) => sndr ! MetadataLookupResponse(query, m) - case Failure(t) => sndr ! MetadataServiceKeyLookupFailed(query, t) - } - } - - private def queryStatusAndRespond(id: WorkflowId): Unit = { - val sndr = sender() - getWorkflowStatus(id) onComplete { - case Success(Some(s)) => sndr ! StatusLookupResponse(id, s) - // There's a workflow existence check at the API layer. If the request has made it this far in the system - // then the workflow exists but it must not have generated a status yet. - case Success(None) => sndr ! StatusLookupResponse(id, WorkflowSubmitted) - case Failure(t) => sndr ! StatusLookupFailed(id, t) - } - } - - private def queryLabelsAndRespond(id: WorkflowId): Unit = { - val sndr = sender() - getWorkflowLabels(id) onComplete { - case Success(ls) => sndr ! LabelLookupResponse(id, ls) - case Failure(t) => sndr ! LabelLookupFailed(id, t) - } - } - - private def queryWorkflowsAndRespond(rawParameters: Seq[(String, String)]): Unit = { - def queryWorkflows: Future[(WorkflowQueryResponse, Option[QueryMetadata])] = { - for { - // Future/Try to wrap the exception that might be thrown from WorkflowQueryParameters.apply. - parameters <- Future.fromTry(Try(WorkflowQueryParameters(rawParameters))) - response <- queryWorkflowSummaries(parameters) - } yield response - } - - val sndr = sender() - - queryWorkflows onComplete { - case Success((response, metadata)) => sndr ! WorkflowQuerySuccess(response, metadata) - case Failure(t) => sndr ! WorkflowQueryFailure(t) - } - } - - private def queryWorkflowOutputsAndRespond(id: WorkflowId): Unit = { - val replyTo = sender() - queryWorkflowOutputs(id, metadataReadTimeout) onComplete { - case Success(o) => replyTo ! WorkflowOutputsResponse(id, o) - case Failure(t) => replyTo ! WorkflowOutputsFailure(id, t) - } - } - - private def queryLogsAndRespond(id: WorkflowId): Unit = { - val replyTo = sender() - queryLogs(id, metadataReadTimeout) onComplete { - case Success(s) => replyTo ! LogsResponse(id, s) - case Failure(t) => replyTo ! LogsFailure(id, t) - } - } - -} diff --git a/services/src/main/scala/cromwell/services/metadata/impl/ReadMetadataRegulatorActor.scala b/services/src/main/scala/cromwell/services/metadata/impl/ReadMetadataRegulatorActor.scala new file mode 100644 index 00000000000..998e04f1864 --- /dev/null +++ b/services/src/main/scala/cromwell/services/metadata/impl/ReadMetadataRegulatorActor.scala @@ -0,0 +1,85 @@ +package cromwell.services.metadata.impl + +import java.util.UUID + +import akka.actor.{Actor, ActorLogging, ActorRef, Props} +import cromwell.core.Dispatcher.ApiDispatcher +import cromwell.services.metadata.MetadataService +import cromwell.services.metadata.MetadataService.{MetadataQueryResponse, MetadataReadAction, MetadataServiceAction, MetadataServiceResponse, WorkflowMetadataReadAction} +import cromwell.services.metadata.impl.ReadMetadataRegulatorActor.ReadMetadataWorkerMaker +import cromwell.services.metadata.impl.builder.MetadataBuilderActor +import cromwell.services.metadata.impl.builder.MetadataBuilderActor.MetadataBuilderActorResponse + +import scala.collection.mutable + +class ReadMetadataRegulatorActor(metadataBuilderActorProps: ReadMetadataWorkerMaker, readMetadataWorkerProps: ReadMetadataWorkerMaker) extends Actor with ActorLogging { + // This actor tracks all requests coming in from the API service and spins up new builders as needed to service them. + // If the processing of an identical request is already in flight the requester will be added to a set of requesters + // to notify when the response from the first request becomes available. + + // Map from requests (MetadataServiceActions) to requesters. + val apiRequests = new mutable.HashMap[MetadataServiceAction, Set[ActorRef]]() + // Map from ActorRefs of MetadataBuilderActors to requests. When a response comes back from a MetadataBuilderActor its + // ActorRef is used as the lookup key in this Map. The result of that lookup yields the request which in turn is used + // as the lookup key for requesters in the above Map. + val builderRequests = new mutable.HashMap[ActorRef, MetadataServiceAction]() + + override def receive: Receive = { + // This indirection via 'MetadataReadAction' lets the compiler make sure we cover all cases in the sealed trait: + case action: MetadataReadAction => + action match { + case singleWorkflowAction: WorkflowMetadataReadAction => + val currentRequesters = apiRequests.getOrElse(singleWorkflowAction, Set.empty) + apiRequests.put(singleWorkflowAction, currentRequesters + sender()) + if (currentRequesters.isEmpty) { + + val builderActor = context.actorOf(metadataBuilderActorProps().withDispatcher(ApiDispatcher), MetadataBuilderActor.uniqueActorName(singleWorkflowAction.workflowId.toString)) + builderRequests.put(builderActor, singleWorkflowAction) + builderActor ! singleWorkflowAction + } + case crossWorkflowAction: MetadataService.QueryForWorkflowsMatchingParameters => + val currentRequesters = apiRequests.getOrElse(crossWorkflowAction, Set.empty) + apiRequests.put(crossWorkflowAction, currentRequesters + sender()) + if (currentRequesters.isEmpty) { + val readMetadataActor = context.actorOf(readMetadataWorkerProps.apply().withDispatcher(ApiDispatcher), s"MetadataQueryWorker-${UUID.randomUUID()}") + builderRequests.put(readMetadataActor, crossWorkflowAction) + readMetadataActor ! crossWorkflowAction + } + } + case serviceResponse: MetadataServiceResponse => + serviceResponse match { + case response: MetadataBuilderActorResponse => handleResponseFromMetadataWorker(response) + case response: MetadataQueryResponse => handleResponseFromMetadataWorker(response) + } + case other => log.error(s"Programmer Error: Unexpected message $other received from $sender") + } + + def handleResponseFromMetadataWorker(response: Any): Unit = { + val sndr = sender() + builderRequests.get(sndr) match { + case Some(action) => + apiRequests.get(action) match { + case Some(requesters) => + apiRequests.remove(action) + requesters foreach { _ ! response} + case None => + // unpossible: there had to have been a request that corresponded to this response + log.error(s"Programmer Error: MetadataBuilderRegulatorActor has no registered requesters found for action: $action") + } + builderRequests.remove(sndr) + () + case None => + // unpossible: this actor should know about all the child MetadataBuilderActors it has begotten + log.error(s"Programmer Error: MetadataBuilderRegulatorActor received a metadata response from an unrecognized sender $sndr") + } + } +} + +object ReadMetadataRegulatorActor { + + type ReadMetadataWorkerMaker = () => Props + + def props(metadataBuilderActorProps: ReadMetadataWorkerMaker, readMetadataWorkerProps: ReadMetadataWorkerMaker): Props = { + Props(new ReadMetadataRegulatorActor(metadataBuilderActorProps, readMetadataWorkerProps)) + } +} diff --git a/engine/src/main/scala/cromwell/webservice/metadata/MetadataBuilderActor.scala b/services/src/main/scala/cromwell/services/metadata/impl/builder/MetadataBuilderActor.scala similarity index 63% rename from engine/src/main/scala/cromwell/webservice/metadata/MetadataBuilderActor.scala rename to services/src/main/scala/cromwell/services/metadata/impl/builder/MetadataBuilderActor.scala index f54c36630ee..80c6860a370 100644 --- a/engine/src/main/scala/cromwell/webservice/metadata/MetadataBuilderActor.scala +++ b/services/src/main/scala/cromwell/services/metadata/impl/builder/MetadataBuilderActor.scala @@ -1,18 +1,16 @@ -package cromwell.webservice.metadata +package cromwell.services.metadata.impl.builder import java.time.OffsetDateTime -import java.util.UUID +import java.util.concurrent.atomic.AtomicLong -import akka.actor.{ActorRef, LoggingFSM, Props} +import akka.actor.{ActorRef, LoggingFSM, PoisonPill, Props} import common.collections.EnhancedCollections._ -import cromwell.webservice.metadata.MetadataComponent._ -import cromwell.core.Dispatcher.ApiDispatcher +import cromwell.services.metadata.impl.builder.MetadataComponent._ import cromwell.core.ExecutionIndex.ExecutionIndex import cromwell.core._ -import cromwell.services.ServiceRegistryActor.ServiceRegistryFailure import cromwell.services.metadata.MetadataService._ import cromwell.services.metadata._ -import cromwell.webservice.metadata.MetadataBuilderActor._ +import cromwell.services.metadata.impl.builder.MetadataBuilderActor._ import mouse.all._ import org.slf4j.LoggerFactory import spray.json._ @@ -21,21 +19,26 @@ import scala.language.postfixOps object MetadataBuilderActor { - sealed abstract class MetadataBuilderActorResponse - case class BuiltMetadataResponse(response: JsObject) extends MetadataBuilderActorResponse - case class FailedMetadataResponse(reason: Throwable) extends MetadataBuilderActorResponse + sealed trait MetadataBuilderActorResponse extends MetadataServiceResponse { def originalRequest: MetadataReadAction } + final case class BuiltMetadataResponse(originalRequest: MetadataReadAction, responseJson: JsObject) extends MetadataBuilderActorResponse + final case class FailedMetadataResponse(originalRequest: MetadataReadAction, reason: Throwable) extends MetadataBuilderActorResponse sealed trait MetadataBuilderActorState case object Idle extends MetadataBuilderActorState case object WaitingForMetadataService extends MetadataBuilderActorState case object WaitingForSubWorkflows extends MetadataBuilderActorState - case class MetadataBuilderActorData( - originalQuery: MetadataQuery, - originalEvents: Seq[MetadataEvent], - subWorkflowsMetadata: Map[String, JsValue], - waitFor: Int - ) { + sealed trait MetadataBuilderActorData + + case object IdleData extends MetadataBuilderActorData + final case class HasWorkData(target: ActorRef, + originalRequest: MetadataReadAction) extends MetadataBuilderActorData + final case class HasReceivedEventsData(target: ActorRef, + originalRequest: MetadataReadAction, + originalQuery: MetadataQuery, + originalEvents: Seq[MetadataEvent], + subWorkflowsMetadata: Map[String, JsValue], + waitFor: Int) extends MetadataBuilderActorData { def withSubWorkflow(id: String, metadata: JsValue) = { this.copy(subWorkflowsMetadata = subWorkflowsMetadata + ((id, metadata))) } @@ -43,8 +46,8 @@ object MetadataBuilderActor { def isComplete = subWorkflowsMetadata.size == waitFor } - def props(serviceRegistryActor: ActorRef) = { - Props(new MetadataBuilderActor(serviceRegistryActor)).withDispatcher(ApiDispatcher) + def props(readMetadataWorkerMaker: () => Props) = { + Props(new MetadataBuilderActor(readMetadataWorkerMaker)) } val log = LoggerFactory.getLogger("MetadataBuilder") @@ -137,7 +140,9 @@ object MetadataBuilderActor { JsObject(events.groupBy(_.key.workflowId.toString) safeMapValues parseWorkflowEvents(includeCallsIfEmpty = true, expandedValues)) } - def uniqueActorName: String = List("MetadataBuilderActor", UUID.randomUUID()).mkString("-") + val actorIdIterator = new AtomicLong(0) + + def uniqueActorName(workflowId: String): String = s"${getClass.getSimpleName}.${actorIdIterator.getAndIncrement()}-for-$workflowId" case class JobKeyAndGrouping(jobKey: MetadataJobKey, grouping: String) @@ -194,100 +199,149 @@ object MetadataBuilderActor { val tupledGrouper = (makeSyntheticGroupedExecutionEvents _).tupled nonExecutionEvents ++ ungroupedExecutionEvents.values.toList.flatten ++ (groupedExecutionEventsByGrouping.toList flatMap tupledGrouper) } + + + + def processMetadataEvents(query: MetadataQuery, eventsList: Seq[MetadataEvent], expandedValues: Map[String, JsValue]): JsObject = { + // Should we send back some message ? Or even fail the request instead ? + if (eventsList.isEmpty) JsObject(Map.empty[String, JsValue]) + else { + query match { + case MetadataQuery(w, _, _, _, _, _) => workflowMetadataResponse(w, eventsList, includeCallsIfEmpty = true, expandedValues) + case _ => MetadataBuilderActor.parse(eventsList, expandedValues) + } + } + } + + def processStatusResponse(workflowId: WorkflowId, status: WorkflowState): JsObject = { + JsObject(Map( + WorkflowMetadataKeys.Status -> JsString(status.toString), + WorkflowMetadataKeys.Id -> JsString(workflowId.toString) + )) + } + + def processLabelsResponse(workflowId: WorkflowId, labels: Map[String, String]): JsObject = { + val jsLabels = labels map { case (k, v) => k -> JsString(v) } + JsObject(Map( + WorkflowMetadataKeys.Id -> JsString(workflowId.toString), + WorkflowMetadataKeys.Labels -> JsObject(jsLabels) + )) + } + + def processOutputsResponse(id: WorkflowId, events: Seq[MetadataEvent]): JsObject = { + // Add in an empty output event if there aren't already any output events. + val hasOutputs = events exists { _.key.key.startsWith(WorkflowMetadataKeys.Outputs + ":") } + val updatedEvents = if (hasOutputs) events else MetadataEvent.empty(MetadataKey(id, None, WorkflowMetadataKeys.Outputs)) +: events + + workflowMetadataResponse(id, updatedEvents, includeCallsIfEmpty = false, Map.empty) + } + + def workflowMetadataResponse(workflowId: WorkflowId, + eventsList: Seq[MetadataEvent], + includeCallsIfEmpty: Boolean, + expandedValues: Map[String, JsValue]): JsObject = { + JsObject(MetadataBuilderActor.parseWorkflowEvents(includeCallsIfEmpty, expandedValues)(eventsList).fields + ("id" -> JsString(workflowId.toString))) + } } -class MetadataBuilderActor(serviceRegistryActor: ActorRef) extends LoggingFSM[MetadataBuilderActorState, Option[MetadataBuilderActorData]] - with DefaultJsonProtocol { - import MetadataBuilderActor._ +class MetadataBuilderActor(readMetadataWorkerMaker: () => Props) + extends LoggingFSM[MetadataBuilderActorState, MetadataBuilderActorData] with DefaultJsonProtocol { - private var target: ActorRef = ActorRef.noSender + import MetadataBuilderActor._ - startWith(Idle, None) + startWith(Idle, IdleData) val tag = self.path.name when(Idle) { - case Event(action: MetadataServiceAction, _) => - target = sender() - serviceRegistryActor ! action - goto(WaitingForMetadataService) + case Event(action: MetadataReadAction, IdleData) => + + val readActor = context.actorOf(readMetadataWorkerMaker.apply()) + + readActor ! action + goto(WaitingForMetadataService) using HasWorkData(sender(), action) } - private def allDone = { + private def allDone() = { context stop self stay() } when(WaitingForMetadataService) { - case Event(StatusLookupResponse(w, status), _) => - target ! BuiltMetadataResponse(processStatusResponse(w, status)) - allDone - case Event(LabelLookupResponse(w, labels), _) => - target ! BuiltMetadataResponse(processLabelsResponse(w, labels)) - allDone - case Event(WorkflowOutputsResponse(id, events), _) => - // Add in an empty output event if there aren't already any output events. - val hasOutputs = events exists { _.key.key.startsWith(WorkflowMetadataKeys.Outputs + ":") } - val updatedEvents = if (hasOutputs) events else MetadataEvent.empty(MetadataKey(id, None, WorkflowMetadataKeys.Outputs)) +: events - target ! BuiltMetadataResponse(workflowMetadataResponse(id, updatedEvents, includeCallsIfEmpty = false, Map.empty)) - allDone - case Event(LogsResponse(w, l), _) => - target ! BuiltMetadataResponse(workflowMetadataResponse(w, l, includeCallsIfEmpty = false, Map.empty)) - allDone - case Event(MetadataLookupResponse(query, metadata), None) => processMetadataResponse(query, metadata) - case Event(_: ServiceRegistryFailure, _) => - target ! FailedMetadataResponse(new RuntimeException("Can't find metadata service")) - allDone - case Event(failure: MetadataServiceFailure, _) => - target ! FailedMetadataResponse(failure.reason) - allDone - case Event(unexpectedMessage, stateData) => - target ! FailedMetadataResponse(new RuntimeException(s"MetadataBuilderActor $tag(WaitingForMetadataService, $stateData) got an unexpected message: $unexpectedMessage")) - context stop self - stay() + case Event(StatusLookupResponse(w, status), HasWorkData(target, originalRequest)) => + target ! BuiltMetadataResponse(originalRequest, processStatusResponse(w, status)) + allDone() + case Event(LabelLookupResponse(w, labels), HasWorkData(target, originalRequest)) => + target ! BuiltMetadataResponse(originalRequest, processLabelsResponse(w, labels)) + allDone() + case Event(WorkflowOutputsResponse(id, events), HasWorkData(target, originalRequest)) => + target ! BuiltMetadataResponse(originalRequest, processOutputsResponse(id, events)) + allDone() + case Event(LogsResponse(w, l), HasWorkData(target, originalRequest)) => + target ! BuiltMetadataResponse(originalRequest, workflowMetadataResponse(w, l, includeCallsIfEmpty = false, Map.empty)) + allDone() + case Event(MetadataLookupResponse(query, metadata), HasWorkData(target, originalRequest)) => + processMetadataResponse(query, metadata, target, originalRequest) + case Event(failure: MetadataServiceFailure, HasWorkData(target, originalRequest)) => + target ! FailedMetadataResponse(originalRequest, failure.reason) + allDone() } when(WaitingForSubWorkflows) { - case Event(mbr: MetadataBuilderActorResponse, Some(data)) => + case Event(mbr: MetadataBuilderActorResponse, data: HasReceivedEventsData) => processSubWorkflowMetadata(mbr, data) + case Event(failure: MetadataServiceFailure, data: HasReceivedEventsData) => + data.target ! FailedMetadataResponse(data.originalRequest, failure.reason) + allDone() } whenUnhandled { - case Event(message, data) => - log.error(s"Received unexpected message $message in state $stateName with data $data") + case Event(message, IdleData) => + log.error(s"Received unexpected message $message in state $stateName with $IdleData") stay() + case Event(message, HasWorkData(target, _)) => + log.error(s"Received unexpected message $message in state $stateName with target: $target") + self ! PoisonPill + stay + case Event(message, MetadataBuilderActor.HasReceivedEventsData(target, _, _, _, _, _)) => + log.error(s"Received unexpected message $message in state $stateName with target: $target") + self ! PoisonPill + stay } - def processSubWorkflowMetadata(metadataResponse: MetadataBuilderActorResponse, data: MetadataBuilderActorData) = { + def processSubWorkflowMetadata(metadataResponse: MetadataBuilderActorResponse, data: HasReceivedEventsData) = { metadataResponse match { - case BuiltMetadataResponse(js) => - js.fields.get(WorkflowMetadataKeys.Id) match { - case Some(subId: JsString) => - val newData = data.withSubWorkflow(subId.value, js) - - if (newData.isComplete) { - buildAndStop(data.originalQuery, data.originalEvents, newData.subWorkflowsMetadata) - } else { - stay() using Option(newData) - } - case _ => failAndDie(new RuntimeException("Received unexpected response while waiting for sub workflow metadata.")) + case BuiltMetadataResponse(GetMetadataAction(queryKey), js) => + val subId: WorkflowId = queryKey.workflowId + val newData = data.withSubWorkflow(subId.toString, js) + + if (newData.isComplete) { + buildAndStop(data.originalQuery, data.originalEvents, newData.subWorkflowsMetadata, data.target, data.originalRequest) + } else { + stay() using newData } - case FailedMetadataResponse(e) => failAndDie(new RuntimeException("Failed to retrieve metadata for a sub workflow.", e)) + case FailedMetadataResponse(originalRequest, e) => + failAndDie(new RuntimeException(s"Failed to retrieve metadata for a sub workflow ($originalRequest)", e), data.target, data.originalRequest) + + case other => + val message = s"Programmer Error: MetadataBuilderActor expected subworkflow metadata response type but got ${other.getClass.getSimpleName}" + log.error(message) + failAndDie(new Exception(message), data.target, data.originalRequest) } } - def failAndDie(reason: Throwable) = { - target ! FailedMetadataResponse(reason) + def failAndDie(reason: Throwable, target: ActorRef, originalRequest: MetadataReadAction) = { + target ! FailedMetadataResponse(originalRequest, reason) context stop self stay() } - def buildAndStop(query: MetadataQuery, eventsList: Seq[MetadataEvent], expandedValues: Map[String, JsValue]) = { + def buildAndStop(query: MetadataQuery, eventsList: Seq[MetadataEvent], expandedValues: Map[String, JsValue], target: ActorRef, originalRequest: MetadataReadAction) = { val groupedEvents = groupEvents(eventsList) - target ! BuiltMetadataResponse(processMetadataEvents(query, groupedEvents, expandedValues)) - allDone + target ! BuiltMetadataResponse(originalRequest, processMetadataEvents(query, groupedEvents, expandedValues)) + allDone() } - def processMetadataResponse(query: MetadataQuery, eventsList: Seq[MetadataEvent]) = { + def processMetadataResponse(query: MetadataQuery, eventsList: Seq[MetadataEvent], target: ActorRef, originalRequest: MetadataReadAction) = { if (query.expandSubWorkflows) { // Scan events for sub workflow ids val subWorkflowIds = eventsList.collect({ @@ -295,50 +349,17 @@ class MetadataBuilderActor(serviceRegistryActor: ActorRef) extends LoggingFSM[Me }).flatten.distinct // If none is found just proceed to build metadata - if (subWorkflowIds.isEmpty) buildAndStop(query, eventsList, Map.empty) + if (subWorkflowIds.isEmpty) buildAndStop(query, eventsList, Map.empty, target, originalRequest) else { // Otherwise spin up a metadata builder actor for each sub workflow subWorkflowIds foreach { subId => - val subMetadataBuilder = context.actorOf(MetadataBuilderActor.props(serviceRegistryActor), uniqueActorName) - subMetadataBuilder ! GetMetadataQueryAction(query.copy(workflowId = WorkflowId.fromString(subId))) + val subMetadataBuilder = context.actorOf(MetadataBuilderActor.props(readMetadataWorkerMaker), uniqueActorName(subId)) + subMetadataBuilder ! GetMetadataAction(query.copy(workflowId = WorkflowId.fromString(subId))) } - goto(WaitingForSubWorkflows) using Option(MetadataBuilderActorData(query, eventsList, Map.empty, subWorkflowIds.size)) + goto(WaitingForSubWorkflows) using HasReceivedEventsData(target, originalRequest, query, eventsList, Map.empty, subWorkflowIds.size) } } else { - buildAndStop(query, eventsList, Map.empty) - } - } - - def processMetadataEvents(query: MetadataQuery, eventsList: Seq[MetadataEvent], expandedValues: Map[String, JsValue]): JsObject = { - // Should we send back some message ? Or even fail the request instead ? - if (eventsList.isEmpty) JsObject(Map.empty[String, JsValue]) - else { - query match { - case MetadataQuery(w, _, _, _, _, _) => workflowMetadataResponse(w, eventsList, includeCallsIfEmpty = true, expandedValues) - case _ => MetadataBuilderActor.parse(eventsList, expandedValues) - } + buildAndStop(query, eventsList, Map.empty, target, originalRequest) } } - - def processStatusResponse(workflowId: WorkflowId, status: WorkflowState): JsObject = { - JsObject(Map( - WorkflowMetadataKeys.Status -> JsString(status.toString), - WorkflowMetadataKeys.Id -> JsString(workflowId.toString) - )) - } - - def processLabelsResponse(workflowId: WorkflowId, labels: Map[String, String]): JsObject = { - val jsLabels = labels map { case (k, v) => k -> JsString(v) } - JsObject(Map( - WorkflowMetadataKeys.Id -> JsString(workflowId.toString), - WorkflowMetadataKeys.Labels -> JsObject(jsLabels) - )) - } - - private def workflowMetadataResponse(workflowId: WorkflowId, - eventsList: Seq[MetadataEvent], - includeCallsIfEmpty: Boolean, - expandedValues: Map[String, JsValue]): JsObject = { - JsObject(MetadataBuilderActor.parseWorkflowEvents(includeCallsIfEmpty, expandedValues)(eventsList).fields + ("id" -> JsString(workflowId.toString))) - } } diff --git a/engine/src/main/scala/cromwell/webservice/metadata/MetadataComponent.scala b/services/src/main/scala/cromwell/services/metadata/impl/builder/MetadataComponent.scala similarity index 99% rename from engine/src/main/scala/cromwell/webservice/metadata/MetadataComponent.scala rename to services/src/main/scala/cromwell/services/metadata/impl/builder/MetadataComponent.scala index d0173a50fe3..63241d8c6d9 100644 --- a/engine/src/main/scala/cromwell/webservice/metadata/MetadataComponent.scala +++ b/services/src/main/scala/cromwell/services/metadata/impl/builder/MetadataComponent.scala @@ -1,4 +1,4 @@ -package cromwell.webservice.metadata +package cromwell.services.metadata.impl.builder import cats.instances.list._ import cats.instances.map._ diff --git a/services/src/test/scala/cromwell/services/metadata/MetadataQuerySpec.scala b/services/src/test/scala/cromwell/services/metadata/MetadataQuerySpec.scala new file mode 100644 index 00000000000..9a00f79ad63 --- /dev/null +++ b/services/src/test/scala/cromwell/services/metadata/MetadataQuerySpec.scala @@ -0,0 +1,58 @@ +package cromwell.services.metadata + +import akka.actor.{Actor, ActorRef, Props} +import akka.testkit.TestProbe +import com.typesafe.config.{Config, ConfigFactory} +import cromwell.core.TestKitSuite +import cromwell.services.metadata.MetadataQuerySpec.{CannedResponseReadMetadataWorker, MetadataServiceActor_CustomizeRead} +import cromwell.services.metadata.MetadataService.{MetadataReadAction, MetadataServiceResponse, QueryForWorkflowsMatchingParameters, WorkflowQueryResponse, WorkflowQuerySuccess} +import cromwell.services.metadata.impl.{MetadataServiceActor, MetadataServiceActorSpec} +import org.scalatest.{FlatSpecLike, Matchers} + +class MetadataQuerySpec extends TestKitSuite("MetadataQuerySpec") with FlatSpecLike with Matchers { + + it should "correctly forward requests to read workers and responses back to requesters" in { + + val request = QueryForWorkflowsMatchingParameters( + parameters = List(("paramName1", "paramValue1")) + ) + + val response = WorkflowQuerySuccess( + response = WorkflowQueryResponse(Seq.empty, 0), + meta = None + ) + + val requester = TestProbe("MetadataServiceClientProbe") + def readWorkerProps() = Props(new CannedResponseReadMetadataWorker(Map(request -> response))) + val serviceRegistry = TestProbe("ServiceRegistryProbe") + val metadataService = system.actorOf(MetadataServiceActor_CustomizeRead.props(readWorkerProps, serviceRegistry), "MetadataServiceUnderTest") + + requester.send(metadataService, request) + requester.expectMsg(response) + + } + +} + + +object MetadataQuerySpec { + final class MetadataServiceActor_CustomizeRead(config: Config, serviceRegistryActor: ActorRef, readWorkerMaker: () => Props) + extends MetadataServiceActor(MetadataServiceActorSpec.globalConfigToMetadataServiceConfig(config), config, serviceRegistryActor) { + + override def readMetadataWorkerActorProps(): Props = readWorkerMaker.apply.withDispatcher(cromwell.core.Dispatcher.ServiceDispatcher) + } + + object MetadataServiceActor_CustomizeRead { + val config = ConfigFactory.parseString(MetadataServiceActorSpec.ConfigWithoutSummarizer) + + def props(readActorProps: () => Props, serviceRegistryProbe: TestProbe) = + Props(new MetadataServiceActor_CustomizeRead(config, serviceRegistryProbe.ref, readActorProps)) + } + + + final class CannedResponseReadMetadataWorker(cannedResponses: Map[MetadataReadAction, MetadataServiceResponse]) extends Actor { + override def receive = { + case msg: MetadataReadAction => sender ! cannedResponses.getOrElse(msg, throw new Exception(s"Unexpected inbound message: $msg")) + } + } +} diff --git a/services/src/test/scala/cromwell/services/metadata/WorkflowQueryParametersSpec.scala b/services/src/test/scala/cromwell/services/metadata/QueryForWorkflowsMatchingParametersSpec.scala similarity index 99% rename from services/src/test/scala/cromwell/services/metadata/WorkflowQueryParametersSpec.scala rename to services/src/test/scala/cromwell/services/metadata/QueryForWorkflowsMatchingParametersSpec.scala index dd0545b2082..d0deb2713a8 100644 --- a/services/src/test/scala/cromwell/services/metadata/WorkflowQueryParametersSpec.scala +++ b/services/src/test/scala/cromwell/services/metadata/QueryForWorkflowsMatchingParametersSpec.scala @@ -7,7 +7,7 @@ import cromwell.core.labels.Label import cromwell.services.metadata.WorkflowQueryKey._ import org.scalatest.{Matchers, WordSpec} -class WorkflowQueryParametersSpec extends WordSpec with Matchers { +class QueryForWorkflowsMatchingParametersSpec extends WordSpec with Matchers { val StartDateString = "2015-11-01T11:11:11Z" val EndDateString = "2015-11-01T12:12:12Z" diff --git a/services/src/test/scala/cromwell/services/metadata/impl/MetadataServiceActorSpec.scala b/services/src/test/scala/cromwell/services/metadata/impl/MetadataServiceActorSpec.scala index 80d0a2cad5f..97dfb3bce88 100644 --- a/services/src/test/scala/cromwell/services/metadata/impl/MetadataServiceActorSpec.scala +++ b/services/src/test/scala/cromwell/services/metadata/impl/MetadataServiceActorSpec.scala @@ -4,20 +4,25 @@ import java.time.OffsetDateTime import akka.pattern._ import akka.testkit.TestProbe -import com.typesafe.config.ConfigFactory +import com.typesafe.config.{Config, ConfigFactory} import cromwell.core._ import cromwell.services.ServicesSpec import cromwell.services.metadata.MetadataService._ import cromwell.services.metadata._ +import cromwell.services.metadata.impl.builder.MetadataBuilderActor.BuiltMetadataResponse +import cromwell.services.metadata.impl.MetadataServiceActorSpec._ + +import scala.concurrent.Await import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.PatienceConfiguration.{Interval, Timeout} +import spray.json._ import scala.concurrent.duration._ class MetadataServiceActorSpec extends ServicesSpec("Metadata") { import MetadataServiceActorSpec.Config val config = ConfigFactory.parseString(Config) - val actor = system.actorOf(MetadataServiceActor.props(config, config, TestProbe().ref)) + val actor = system.actorOf(MetadataServiceActor.props(config, globalConfigToMetadataServiceConfig(config), TestProbe().ref), "MetadataServiceActor-for-MetadataServiceActorSpec") val workflowId = WorkflowId.randomId() @@ -37,41 +42,85 @@ class MetadataServiceActorSpec extends ServicesSpec("Metadata") { val event3_1 = MetadataEvent(key3, Option(MetadataValue("value3")), moment.plusSeconds(4)) val event3_2 = MetadataEvent(key3, None, moment.plusSeconds(5)) - "MetadataServiceActor" should { - "Store values for different keys and then retrieve those values" in { - val putAction1 = PutMetadataAction(event1_1) - val putAction2 = PutMetadataAction(event1_2) - val putAction3 = PutMetadataAction(event2_1, event3_1, event3_2) + override def beforeAll: Unit = { + + // Even though event1_1 arrives second, the older timestamp should mean it does not replace event1_2: + val putAction2 = PutMetadataAction(event1_2) + val putAction1 = PutMetadataAction(event1_1) + val putAction3 = PutMetadataAction(event2_1, event3_1, event3_2) - actor ! putAction1 - actor ! putAction2 - actor ! putAction3 + actor ! putAction1 + actor ! putAction2 + actor ! putAction3 + } + + val query1 = MetadataQuery.forKey(key1) + val query2 = MetadataQuery.forKey(key2) + val query3 = MetadataQuery.forKey(key3) + val query4 = MetadataQuery.forWorkflow(workflowId) + val query5 = MetadataQuery.forJob(workflowId, supJob) - val query1 = MetadataQuery.forKey(key1) - val query2 = MetadataQuery.forKey(key2) - val query3 = MetadataQuery.forKey(key3) - val query4 = MetadataQuery.forWorkflow(workflowId) - val query5 = MetadataQuery.forJob(workflowId, supJob) + def expectConstructedMetadata(query: MetadataQuery, expectation: String) = { - eventually(Timeout(10.seconds), Interval(2.seconds)) { - (for { - response1 <- (actor ? GetMetadataQueryAction(query1)).mapTo[MetadataServiceResponse] - _ = response1 shouldBe MetadataLookupResponse(query1, Seq(event1_1, event1_2)) + } - response2 <- (actor ? GetMetadataQueryAction(query2)).mapTo[MetadataServiceResponse] - _ = response2 shouldBe MetadataLookupResponse(query2, Seq(event2_1)) + val testCases = List[(String, MetadataQuery, String)] ( + ("query1", query1, s"""{ + | "key1": "value2", + | "calls": {}, + | "id": "$workflowId" + |}""".stripMargin), + ("query2", query2, s"""{ + | "key2": "value1", + | "calls": {}, + | "id": "$workflowId" + |}""".stripMargin), + ("query3", query3, s"""{ + | "calls": { + | "sup.sup": [{ + | "dog": {}, + | "attempt": 1, + | "shardIndex": -1 + | }] + | }, + | "id": "$workflowId" + |}""".stripMargin), + ("query4", query4, s"""{ + | "key1": "value2", + | "key2": "value1", + | "calls": { + | "sup.sup": [{ + | "dog": {}, + | "attempt": 1, + | "shardIndex": -1 + | }] + | }, + | "id": "$workflowId" + |}""".stripMargin), + ("query5", query5, s"""{ + | "calls": { + | "sup.sup": [{ + | "dog": {}, + | "attempt": 1, + | "shardIndex": -1 + | }] + | }, + | "id": "$workflowId" + |}""".stripMargin) + ) - response3 <- (actor ? GetMetadataQueryAction(query3)).mapTo[MetadataServiceResponse] - _ = response3 shouldBe MetadataLookupResponse(query3, Seq(event3_1, event3_2)) + "MetadataServiceActor" should { - response4 <- (actor ? GetMetadataQueryAction(query4)).mapTo[MetadataServiceResponse] - _ = response4 shouldBe MetadataLookupResponse(query4, Seq(event1_1, event1_2, event2_1, event3_1, event3_2)) + testCases foreach { case (name, query, expectation) => - response5 <- (actor ? GetMetadataQueryAction(query5)).mapTo[MetadataServiceResponse] - _ = response5 shouldBe MetadataLookupResponse(query5, Seq(event3_1, event3_2)) + s"perform $name correctly" in { + eventually(Timeout(10.seconds), Interval(2.seconds)) { + val response = Await.result((actor ? GetMetadataAction(query)).mapTo[BuiltMetadataResponse], 1.seconds) - } yield ()).futureValue + response.responseJson shouldBe expectation.parseJson + } } + } } } @@ -82,4 +131,15 @@ object MetadataServiceActorSpec { |services.MetadataService.db-batch-size = 3 |services.MetadataService.db-flush-rate = 100 millis """.stripMargin + + val ConfigWithoutSummarizer = Config + """ + |services.MetadataService.config.metadata-summary-refresh-interval = "Inf" + """.stripMargin + + // Use this to convert the above "global" configs into metadata service specific "service config"s: + def globalConfigToMetadataServiceConfig(config: Config): Config = if (config.hasPath("services.MetadataService.config")) { + config.getConfig("services.MetadataService.config") + } else { + ConfigFactory.empty() + } }