Skip to content

Commit

Permalink
Option to exclude groups from workflow pickup [BW-1061] (broadinstitu…
Browse files Browse the repository at this point in the history
  • Loading branch information
aednichols authored Feb 11, 2022
1 parent bfc1134 commit bfef756
Show file tree
Hide file tree
Showing 15 changed files with 194 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ trait WorkflowStoreSlickDatabase extends WorkflowStoreSqlDatabase {
heartbeatTimestampTo: Timestamp,
workflowStateFrom: String,
workflowStateTo: String,
workflowStateExcluded: String)
workflowStateExcluded: String,
excludedGroups: Set[String])
(implicit ec: ExecutionContext): Future[Seq[WorkflowStoreEntry]] = {

def updateForFetched(cromwellId: String,
Expand Down Expand Up @@ -89,7 +90,7 @@ trait WorkflowStoreSlickDatabase extends WorkflowStoreSqlDatabase {

val action = for {
workflowStoreEntries <- dataAccess.fetchStartableWorkflows(
(limit.toLong, heartbeatTimestampTimedOut, workflowStateExcluded)
limit.toLong, heartbeatTimestampTimedOut, workflowStateExcluded, excludedGroups
).result
_ <- DBIO.sequence(
workflowStoreEntries map updateForFetched(cromwellId, heartbeatTimestampTo, workflowStateFrom, workflowStateTo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ trait WorkflowStoreEntryComponent {
/**
* Returns up to "limit" startable workflows, sorted by submission time.
*/
val fetchStartableWorkflows = Compiled(
(limit: ConstColumn[Long],
heartbeatTimestampTimedOut: ConstColumn[Timestamp],
excludeWorkflowState: Rep[String]
) => {
def fetchStartableWorkflows(limit: Long,
heartbeatTimestampTimedOut: Timestamp,
excludeWorkflowState: String,
excludedGroups: Set[String]
): Query[WorkflowStoreEntries, WorkflowStoreEntry, Seq] = {
val query = for {
row <- workflowStoreEntries
/*
Expand All @@ -93,11 +93,11 @@ trait WorkflowStoreEntryComponent {
transaction that we know will impact those readers.
*/
if (row.heartbeatTimestamp.isEmpty || row.heartbeatTimestamp < heartbeatTimestampTimedOut) &&
(row.workflowState =!= excludeWorkflowState)
(row.workflowState =!= excludeWorkflowState) &&
!(row.hogGroup inSet excludedGroups)
} yield row
query.forUpdate.sortBy(_.submissionTime.asc).take(limit)
}
)

/**
* Useful for counting workflows in a given state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ ____ __ ____ ______ .______ __ ___ _______ __ ______
heartbeatTimestampTo: Timestamp,
workflowStateFrom: String,
workflowStateTo: String,
workflowStateExcluded: String)
workflowStateExcluded: String,
excludedGroups: Set[String])
(implicit ec: ExecutionContext): Future[Seq[WorkflowStoreEntry]]

def writeWorkflowHeartbeats(workflowExecutionUuids: Seq[String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class WorkflowManagerActor(params: WorkflowManagerActorParams)
Determine the number of available workflow slots and request the smaller of that number and maxWorkflowsToLaunch.
*/
val maxNewWorkflows = maxWorkflowsToLaunch min (maxWorkflowsRunning - stateData.workflows.size - stateData.subWorkflows.size)
params.workflowStore ! WorkflowStoreActor.FetchRunnableWorkflows(maxNewWorkflows)
params.workflowStore ! WorkflowStoreActor.FetchRunnableWorkflows(maxNewWorkflows, excludedGroups = Set.empty)
stay()
case Event(WorkflowStoreEngineActor.NoNewWorkflowsToStart, _) =>
log.debug("WorkflowStore provided no new workflows to start")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cromwell.engine.workflow.workflowstore

import java.time.OffsetDateTime

import cats.data.NonEmptyList
import cromwell.core.{HogGroup, WorkflowId, WorkflowSourceFilesCollection}
import cromwell.engine.workflow.workflowstore.SqlWorkflowStore.WorkflowStoreAbortResponse.WorkflowStoreAbortResponse
Expand Down Expand Up @@ -32,7 +31,10 @@ class InMemoryWorkflowStore extends WorkflowStore {
* Retrieves up to n workflows which have not already been pulled into the engine and sets their pickedUp
* flag to true
*/
override def fetchStartableWorkflows(n: Int, cromwellId: String, heartbeatTtl: FiniteDuration)(implicit ec: ExecutionContext): Future[List[WorkflowToStart]] = {
override def fetchStartableWorkflows(n: Int, cromwellId: String, heartbeatTtl: FiniteDuration, excludedGroups: Set[String])(implicit ec: ExecutionContext): Future[List[WorkflowToStart]] = {
if (excludedGroups.nonEmpty)
throw new UnsupportedOperationException("Programmer Error: group filtering not supported for single-tenant/in-memory workflow store")

val startableWorkflows = workflowStore filter { _._2 == WorkflowStoreState.Submitted } take n
val updatedWorkflows = startableWorkflows map { _._1 -> WorkflowStoreState.Running }
workflowStore = workflowStore ++ updatedWorkflows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ case class SqlWorkflowStore(sqlDatabase: WorkflowStoreSqlDatabase, metadataSqlDa
* Retrieves up to n workflows which have not already been pulled into the engine and sets their pickedUp
* flag to true
*/
override def fetchStartableWorkflows(n: Int, cromwellId: String, heartbeatTtl: FiniteDuration)(implicit ec: ExecutionContext): Future[List[WorkflowToStart]] = {
override def fetchStartableWorkflows(n: Int, cromwellId: String, heartbeatTtl: FiniteDuration, excludedGroups: Set[String])(implicit ec: ExecutionContext): Future[List[WorkflowToStart]] = {
import cats.syntax.traverse._
import common.validation.Validation._
sqlDatabase.fetchWorkflowsInState(
Expand All @@ -106,7 +106,8 @@ case class SqlWorkflowStore(sqlDatabase: WorkflowStoreSqlDatabase, metadataSqlDa
OffsetDateTime.now.toSystemTimestamp,
WorkflowStoreState.Submitted.toString,
WorkflowStoreState.Running.toString,
WorkflowStoreState.OnHold.toString
WorkflowStoreState.OnHold.toString,
excludedGroups: Set[String]
) map {
// .get on purpose here to fail the future if something went wrong
_.toList.traverse(fromWorkflowStoreEntry).toTry.get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ trait WorkflowStore {
* Retrieves up to n workflows which have not already been pulled into the engine and sets their pickedUp
* flag to true
*/
def fetchStartableWorkflows(n: Int, cromwellId: String, heartbeatTtl: FiniteDuration)(implicit ec: ExecutionContext): Future[List[WorkflowToStart]]
def fetchStartableWorkflows(n: Int, cromwellId: String, heartbeatTtl: FiniteDuration, excludedGroups: Set[String])(implicit ec: ExecutionContext): Future[List[WorkflowToStart]]

def writeWorkflowHeartbeats(workflowIds: Set[(WorkflowId, OffsetDateTime)],
heartbeatDateTime: OffsetDateTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ sealed trait WorkflowStoreAccess {
heartbeatDateTime: OffsetDateTime)
(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[Int]

def fetchStartableWorkflows(maxWorkflows: Int, cromwellId: String, heartbeatTtl: FiniteDuration)
def fetchStartableWorkflows(maxWorkflows: Int, cromwellId: String, heartbeatTtl: FiniteDuration, excludedGroups: Set[String])
(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[List[WorkflowToStart]]

def abort(workflowId: WorkflowId)
Expand All @@ -50,9 +50,9 @@ case class UncoordinatedWorkflowStoreAccess(store: WorkflowStore) extends Workfl
store.writeWorkflowHeartbeats(workflowIds.toVector.toSet, heartbeatDateTime)
}

override def fetchStartableWorkflows(maxWorkflows: Int, cromwellId: String, heartbeatTtl: FiniteDuration)
override def fetchStartableWorkflows(maxWorkflows: Int, cromwellId: String, heartbeatTtl: FiniteDuration, excludedGroups: Set[String])
(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[List[WorkflowToStart]] = {
store.fetchStartableWorkflows(maxWorkflows, cromwellId, heartbeatTtl)
store.fetchStartableWorkflows(maxWorkflows, cromwellId, heartbeatTtl, excludedGroups)
}

override def deleteFromStore(workflowId: WorkflowId)(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[Int] = {
Expand All @@ -79,9 +79,9 @@ case class CoordinatedWorkflowStoreAccess(coordinatedWorkflowStoreAccessActor: A
)
}

override def fetchStartableWorkflows(maxWorkflows: Int, cromwellId: String, heartbeatTtl: FiniteDuration)
override def fetchStartableWorkflows(maxWorkflows: Int, cromwellId: String, heartbeatTtl: FiniteDuration, excludedGroups: Set[String])
(implicit actorSystem: ActorSystem, ec: ExecutionContext): Future[List[WorkflowToStart]] = {
val message = WorkflowStoreCoordinatedAccessActor.FetchStartableWorkflows(maxWorkflows, cromwellId, heartbeatTtl)
val message = WorkflowStoreCoordinatedAccessActor.FetchStartableWorkflows(maxWorkflows, cromwellId, heartbeatTtl, excludedGroups)
withRetryForTransactionRollback(
() => coordinatedWorkflowStoreAccessActor.ask(message).mapTo[List[WorkflowToStart]]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ final case class WorkflowStoreActor private(

object WorkflowStoreActor {
sealed trait WorkflowStoreActorEngineCommand
final case class FetchRunnableWorkflows(n: Int) extends WorkflowStoreActorEngineCommand
final case class FetchRunnableWorkflows(n: Int, excludedGroups: Set[String]) extends WorkflowStoreActorEngineCommand
final case class AbortWorkflowCommand(id: WorkflowId) extends WorkflowStoreActorEngineCommand
final case class WorkflowOnHoldToSubmittedCommand(id: WorkflowId) extends WorkflowStoreActorEngineCommand
case object InitializerCommand extends WorkflowStoreActorEngineCommand
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class WorkflowStoreCoordinatedAccessActor(workflowStore: WorkflowStore) extends
override def receive: Receive = {
case WriteHeartbeats(ids, heartbeatDateTime) =>
workflowStore.writeWorkflowHeartbeats(ids.toVector.toSet, heartbeatDateTime) |> run
case FetchStartableWorkflows(count, cromwellId, heartbeatTtl) =>
workflowStore.fetchStartableWorkflows(count, cromwellId, heartbeatTtl) |> run
case FetchStartableWorkflows(count, cromwellId, heartbeatTtl, excludedGroups) =>
workflowStore.fetchStartableWorkflows(count, cromwellId, heartbeatTtl, excludedGroups) |> run
case DeleteFromStore(workflowId) =>
workflowStore.deleteFromStore(workflowId) |> run
case Abort(workflowId) =>
Expand All @@ -45,7 +45,7 @@ class WorkflowStoreCoordinatedAccessActor(workflowStore: WorkflowStore) extends
object WorkflowStoreCoordinatedAccessActor {
final case class WriteHeartbeats(workflowIds: NonEmptyVector[(WorkflowId, OffsetDateTime)],
heartbeatDateTime: OffsetDateTime)
final case class FetchStartableWorkflows(count: Int, cromwellId: String, heartbeatTtl: FiniteDuration)
final case class FetchStartableWorkflows(count: Int, cromwellId: String, heartbeatTtl: FiniteDuration, excludedGroups: Set[String])
final case class DeleteFromStore(workflowId: WorkflowId)
final case class Abort(workflowId: WorkflowId)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ final case class WorkflowStoreEngineActor private(store: WorkflowStore,

private def startNewWork(command: WorkflowStoreActorEngineCommand, sndr: ActorRef, nextData: WorkflowStoreActorData) = {
val work: Future[Any] = command match {
case FetchRunnableWorkflows(count) =>
newWorkflowMessage(count) map { response =>
case FetchRunnableWorkflows(count, excludedGroups) =>
newWorkflowMessage(count, excludedGroups) map { response =>
response match {
case NewWorkflowsToStart(workflows) =>
val workflowsIds = workflows.map(_.id).toList
Expand Down Expand Up @@ -185,10 +185,10 @@ final case class WorkflowStoreEngineActor private(store: WorkflowStore,
/**
* Fetches at most n workflows, and builds the correct response message based on if there were any workflows or not
*/
private def newWorkflowMessage(maxWorkflows: Int): Future[WorkflowStoreEngineActorResponse] = {
private def newWorkflowMessage(maxWorkflows: Int, excludedGroups: Set[String]): Future[WorkflowStoreEngineActorResponse] = {
def fetchStartableWorkflowsIfNeeded = {
if (maxWorkflows > 0) {
workflowStoreAccess.fetchStartableWorkflows(maxWorkflows, workflowHeartbeatConfig.cromwellId, workflowHeartbeatConfig.ttl)
workflowStoreAccess.fetchStartableWorkflows(maxWorkflows, workflowHeartbeatConfig.cromwellId, workflowHeartbeatConfig.ttl, excludedGroups)
} else {
Future.successful(List.empty[WorkflowToStart])
}
Expand Down
Loading

0 comments on commit bfef756

Please sign in to comment.