Skip to content

Commit

Permalink
Add support for Mailer and Data Seeder for Lambda deployment
Browse files Browse the repository at this point in the history
  • Loading branch information
vknaisl committed Jan 12, 2024
1 parent 2879cf3 commit 2c67232
Show file tree
Hide file tree
Showing 13 changed files with 125 additions and 20 deletions.
3 changes: 3 additions & 0 deletions registry-server/src/Registry/Model/Context/ContextLenses.hs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ instance HasField "sentry'" ServerConfig ServerConfigSentry where
instance HasField "cloud'" ServerConfig ServerConfigCloud where
getField = (.cloud)

instance HasField "persistentCommand'" ServerConfig ServerConfigPersistentCommand where
getField = (.persistentCommand)

instance HasField "dbPool'" AppContext (Pool Connection) where
getField = (.dbPool)

Expand Down
13 changes: 13 additions & 0 deletions registry-server/src/Registry/Worker/CronWorkers.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import Registry.Model.Context.BaseContext
import Registry.Model.Context.ContextLenses ()
import Registry.Service.PersistentCommand.PersistentCommandService
import Shared.Common.Model.Config.ServerConfig
import Shared.PersistentCommand.Service.PersistentCommand.PersistentCommandService
import Shared.Worker.Model.Worker.CronWorker

workers :: [CronWorker BaseContext AppContextM]
workers =
[ persistentCommandRetryWorker
, persistentCommandRetryLambdaWorker
]

-- ------------------------------------------------------------------
Expand All @@ -24,3 +26,14 @@ persistentCommandRetryWorker =
, function = runPersistentCommands'
, wrapInTransaction = False
}

persistentCommandRetryLambdaWorker :: CronWorker BaseContext AppContextM
persistentCommandRetryLambdaWorker =
CronWorker
{ name = "persistentCommandRetryLambdaWorker"
, condition = (.serverConfig.persistentCommand.retryLambdaJob.enabled)
, cronDefault = "* * * * *"
, cron = (.serverConfig.persistentCommand.retryLambdaJob.cron)
, function = retryPersistentCommandsForLambda
, wrapInTransaction = False
}
10 changes: 9 additions & 1 deletion shared-common/src/Shared/Common/Model/Config/ServerConfig.hs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,20 @@ data ServerConfigCronWorker = ServerConfigCronWorker
deriving (Generic, Show)

data ServerConfigPersistentCommand = ServerConfigPersistentCommand
{ listenerJob :: ServerConfigPersistentCommandListenerJob
{ lambdaFunctions :: [ServerConfigPersistentCommandLambda]
, listenerJob :: ServerConfigPersistentCommandListenerJob
, retryJob :: ServerConfigCronWorker
, retryLambdaJob :: ServerConfigCronWorker
}
deriving (Generic, Show)

data ServerConfigPersistentCommandListenerJob = ServerConfigPersistentCommandListenerJob
{ enabled :: Bool
}
deriving (Generic, Show)

data ServerConfigPersistentCommandLambda = ServerConfigPersistentCommandLambda
{ component :: String
, functionArn :: String
}
deriving (Generic, Show)
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ defaultCloud =
defaultPersistentCommand :: ServerConfigPersistentCommand
defaultPersistentCommand =
ServerConfigPersistentCommand
{ listenerJob = defaultPersistentCommandListenerJob
{ lambdaFunctions = []
, listenerJob = defaultPersistentCommandListenerJob
, retryJob = defaultPersistentCommandRetryJob
, retryLambdaJob = defaultPersistentCommandRetryLambdaJob
}

defaultPersistentCommandListenerJob :: ServerConfigPersistentCommandListenerJob
Expand All @@ -59,3 +61,7 @@ defaultPersistentCommandListenerJob =
defaultPersistentCommandRetryJob :: ServerConfigCronWorker
defaultPersistentCommandRetryJob =
ServerConfigCronWorker {enabled = True, cron = "* * * * *"}

defaultPersistentCommandRetryLambdaJob :: ServerConfigCronWorker
defaultPersistentCommandRetryLambdaJob =
ServerConfigCronWorker {enabled = True, cron = "* * * * *"}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ instance FromEnv ServerConfigPersistentCommand where
[ \c -> applyEnvVariable "PERSISTENT_COMMAND_LISTENER_JOB_ENABLED" c.listenerJob.enabled (\x -> c {listenerJob = c.listenerJob {enabled = x}} :: ServerConfigPersistentCommand)
, \c -> applyEnvVariable "PERSISTENT_COMMAND_RETRY_JOB_ENABLED" c.retryJob.enabled (\x -> c {retryJob = c.retryJob {enabled = x}} :: ServerConfigPersistentCommand)
, \c -> applyStringEnvVariable "PERSISTENT_COMMAND_RETRY_JOB_CRON" c.retryJob.cron (\x -> c {retryJob = c.retryJob {cron = x}} :: ServerConfigPersistentCommand)
, \c -> applyEnvVariable "PERSISTENT_COMMAND_RETRY_LAMBDA_JOB_ENABLED" c.retryLambdaJob.enabled (\x -> c {retryLambdaJob = c.retryLambdaJob {enabled = x}} :: ServerConfigPersistentCommand)
, \c -> applyStringEnvVariable "PERSISTENT_COMMAND_RETRY_LAMBDA_JOB_CRON" c.retryLambdaJob.cron (\x -> c {retryLambdaJob = c.retryLambdaJob {cron = x}} :: ServerConfigPersistentCommand)
]

instance FromEnv ServerConfigLogging where
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,20 @@ instance FromJSON ServerConfigCronWorker where

instance FromJSON ServerConfigPersistentCommand where
parseJSON (Object o) = do
lambdaFunctions <- o .:? "lambdaFunctions" .!= []
listenerJob <- o .:? "listenerJob" .!= defaultPersistentCommand.listenerJob
retryJob <- o .:? "retryJob" .!= defaultPersistentCommand.retryJob
retryLambdaJob <- o .:? "retryLambdaJob" .!= defaultPersistentCommand.retryLambdaJob
return ServerConfigPersistentCommand {..}
parseJSON _ = mzero

instance FromJSON ServerConfigPersistentCommandLambda where
parseJSON (Object o) = do
component <- o .: "component"
functionArn <- o .: "functionArn"
return ServerConfigPersistentCommandLambda {..}
parseJSON _ = mzero

instance FromJSON ServerConfigPersistentCommandListenerJob where
parseJSON (Object o) = do
enabled <- o .:? "enabled" .!= defaultPersistentCommandListenerJob.enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class
, HasField "serverConfig'" context sc
, HasField "cloud'" sc ServerConfigCloud
, HasField "s3'" sc ServerConfigS3
, HasField "persistentCommand'" sc ServerConfigPersistentCommand
, HasField "sentry'" sc ServerConfigSentry
, HasField "buildInfoConfig'" context BuildInfoConfig
, HasField "httpClientManager'" context Manager
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module Shared.PersistentCommand.Database.DAO.PersistentCommand.PersistentCommandDAO where

import Control.Monad.Reader (ask)
import qualified Data.ByteString.Char8 as BS
import qualified Data.List as L
import Data.String (fromString)
import qualified Data.UUID as U
import Database.PostgreSQL.Simple
Expand All @@ -14,13 +16,15 @@ import Shared.Common.Database.DAO.Common
import Shared.Common.Model.Common.Page
import Shared.Common.Model.Common.Pageable
import Shared.Common.Model.Common.Sort
import Shared.Common.Model.Config.ServerConfig
import Shared.Common.Model.Context.AppContext
import Shared.Common.Util.Logger
import Shared.Common.Util.String (trim)
import Shared.Common.Util.String (f'', trim)
import Shared.PersistentCommand.Database.Mapping.PersistentCommand.PersistentCommand ()
import Shared.PersistentCommand.Database.Mapping.PersistentCommand.PersistentCommandSimple ()
import Shared.PersistentCommand.Model.PersistentCommand.PersistentCommand
import Shared.PersistentCommand.Model.PersistentCommand.PersistentCommandSimple
import Shared.PersistentCommand.Service.PersistentCommand.PersistentCommandMapper

entityName = "persistent_command"

Expand All @@ -40,19 +44,35 @@ findPersistentCommandsPage states pageable sort = do
_ -> f' "WHERE state in (%s) " [generateQuestionMarks states]
createFindEntitiesPageableQuerySortFn entityName pageLabel pageable sort "*" condition states

findPersistentCommandsByStates :: (AppContextC s sc m, FromField identity) => m [PersistentCommandSimple identity]
findPersistentCommandsByStates = do
findPersistentCommandsForRetryByStates :: (AppContextC s sc m, FromField identity) => m [PersistentCommandSimple identity]
findPersistentCommandsForRetryByStates = findPersistentCommandsByStates True []

findPersistentCommandsForLambdaByStates :: (AppContextC s sc m, FromField identity) => [String] -> m [PersistentCommandSimple identity]
findPersistentCommandsForLambdaByStates = findPersistentCommandsByStates False

findPersistentCommandsByStates :: (AppContextC s sc m, FromField identity) => Bool -> [String] -> m [PersistentCommandSimple identity]
findPersistentCommandsByStates internal components = do
let componentCondition =
case components of
[] -> ""
_ -> f' "AND component IN (%s) " [generateQuestionMarks components]
let sql =
"SELECT uuid, destination, tenant_uuid, created_by \
\FROM persistent_command \
\WHERE (state = 'NewPersistentCommandState' \
\ OR (state = 'ErrorPersistentCommandState' AND attempts < max_attempts AND updated_at < (now() - (2 ^ attempts - 1) * INTERVAL '1 min'))) \
\ AND internal = true \
\ORDER BY created_at \
\LIMIT 5 \
\FOR UPDATE"
logInfoI _CMP_DATABASE (trim sql)
let action conn = query_ conn (fromString sql)
fromString $
f''
"SELECT uuid, destination, component, tenant_uuid, created_by \
\FROM persistent_command \
\WHERE (state = 'NewPersistentCommandState' \
\ OR (state = 'ErrorPersistentCommandState' AND attempts < max_attempts AND updated_at < (now() - (2 ^ attempts - 1) * INTERVAL '1 min'))) \
\ AND internal = ${internal} ${componentCondition} \
\ORDER BY created_at \
\LIMIT 5 \
\FOR UPDATE"
[ ("internal", show internal)
, ("componentCondition", componentCondition)
]
let params = components
logQuery sql params
let action conn = query conn sql params
runDB action

findPersistentCommandByUuid :: (AppContextC s sc m, FromField identity) => U.UUID -> m (PersistentCommand identity)
Expand All @@ -68,9 +88,11 @@ findPersistentCommandSimpleByUuid uuid =
insertPersistentCommand :: (AppContextC s sc m, ToField identity) => PersistentCommand identity -> m Int64
insertPersistentCommand command = do
createInsertFn entityName command
if command.internal
then notifyPersistentCommandQueue
else notifySpecificPersistentCommandQueue command
context <- ask
case (command.internal, L.find (\lf -> lf.component == command.component) (context.serverConfig'.persistentCommand'.lambdaFunctions)) of
(True, _) -> notifyPersistentCommandQueue
(False, Nothing) -> notifySpecificPersistentCommandQueue command
(False, Just lf) -> invokeLambdaFunction (toSimple command) lf

updatePersistentCommandByUuid :: (AppContextC s sc m, ToField identity) => PersistentCommand identity -> m Int64
updatePersistentCommandByUuid command = do
Expand Down Expand Up @@ -125,3 +147,11 @@ notifySpecificPersistentCommandQueue command = do
logInfoI _CMP_DATABASE (trim sql)
let action conn = execute_ conn (fromString sql)
runDB action

invokeLambdaFunction :: AppContextC s sc m => PersistentCommandSimple identity -> ServerConfigPersistentCommandLambda -> m Int64
invokeLambdaFunction command lf = do
let sql = f' "SELECT '1', '2' FROM aws_lambda.invoke('%s', '{}'::json, 'eu-central-1', 'Event');" [lf.functionArn]
logInfoI _CMP_DATABASE (trim sql)
let action conn = query_ conn (fromString sql)
runDB action :: AppContextC s sc m => m [String]
return 1
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import GHC.Generics
data PersistentCommandSimple identity = PersistentCommandSimple
{ uuid :: U.UUID
, destination :: Maybe String
, component :: String
, tenantUuid :: U.UUID
, createdBy :: Maybe identity
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ toSimple command =
PersistentCommandSimple
{ uuid = command.uuid
, destination = command.destination
, component = command.component
, tenantUuid = command.tenantUuid
, createdBy = command.createdBy
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
module Shared.PersistentCommand.Service.PersistentCommand.PersistentCommandService where

import qualified Control.Exception.Base as E
import Control.Monad (forever, unless, when)
import Control.Monad (forever, unless, void, when)
import Control.Monad.Reader (ask, liftIO)
import Data.Aeson (Value (..), toJSON)
import Data.Foldable (traverse_)
import qualified Data.HashMap.Strict as HashMap
import qualified Data.List as L
import Data.Maybe (fromMaybe)
import qualified Data.Text as T
import Data.Time
Expand Down Expand Up @@ -34,7 +35,7 @@ runPersistentCommands
-> m ()
runPersistentCommands runAppContextWithAppContext' updateContext createPersistentCommand execute = do
checkPermission _DEV_PERM
commands <- findPersistentCommandsByStates
commands <- findPersistentCommandsForRetryByStates
unless
(null commands)
( do
Expand Down Expand Up @@ -142,6 +143,20 @@ runPersistentCommandChannelListener runAppContextWithAppContext updateContext cr
_ <- getChannelNotification
runPersistentCommands runAppContextWithAppContext updateContext createPersistentCommand execute

retryPersistentCommandsForLambda :: AppContextC s sc m => m ()
retryPersistentCommandsForLambda = do
context <- ask
let components = fmap (\lf -> lf.component) context.serverConfig'.persistentCommand'.lambdaFunctions
persistentCommands <- findPersistentCommandsForLambdaByStates components
traverse_ retryPersistentCommandForLambda persistentCommands

retryPersistentCommandForLambda :: AppContextC s sc m => PersistentCommandSimple U.UUID -> m ()
retryPersistentCommandForLambda command = do
context <- ask
case L.find (\lf -> lf.component == command.component) (context.serverConfig'.persistentCommand'.lambdaFunctions) of
Just lf -> void $ invokeLambdaFunction command lf
Nothing -> logWarnI _CMP_DATABASE (f' "No lambda function found for persistent command '%s'" [U.toString command.uuid])

-- --------------------------------
-- PRIVATE
-- --------------------------------
Expand Down
3 changes: 3 additions & 0 deletions wizard-server/src/Wizard/Model/Context/ContextLenses.hs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ instance HasField "sentry'" ServerConfig ServerConfigSentry where
instance HasField "cloud'" ServerConfig ServerConfigCloud where
getField = (.cloud)

instance HasField "persistentCommand'" ServerConfig ServerConfigPersistentCommand where
getField = (.persistentCommand)

instance HasField "dbPool'" AppContext (Pool Connection) where
getField = (.dbPool)

Expand Down
13 changes: 13 additions & 0 deletions wizard-server/src/Wizard/Worker/CronWorkers.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module Wizard.Worker.CronWorkers where

import Shared.Common.Model.Config.ServerConfig
import Shared.PersistentCommand.Service.PersistentCommand.PersistentCommandService
import Shared.Worker.Model.Worker.CronWorker
import Wizard.Cache.CacheUtil
import Wizard.Model.Cache.ServerCache
Expand Down Expand Up @@ -29,6 +30,7 @@ workers =
, documentWorker
, feedbackWorker
, persistentCommandRetryWorker
, persistentCommandRetryLambdaWorker
, cleanQuestionnaireWorker
, recomputeQuestionnaireIndicationWorker
, squashQuestionnaireEventsWorker
Expand Down Expand Up @@ -106,6 +108,17 @@ persistentCommandRetryWorker =
, wrapInTransaction = False
}

persistentCommandRetryLambdaWorker :: CronWorker BaseContext AppContextM
persistentCommandRetryLambdaWorker =
CronWorker
{ name = "persistentCommandRetryLambdaWorker"
, condition = (.serverConfig.persistentCommand.retryLambdaJob.enabled)
, cronDefault = "* * * * *"
, cron = (.serverConfig.persistentCommand.retryLambdaJob.cron)
, function = retryPersistentCommandsForLambda
, wrapInTransaction = False
}

cleanQuestionnaireWorker :: CronWorker BaseContext AppContextM
cleanQuestionnaireWorker =
CronWorker
Expand Down

0 comments on commit 2c67232

Please sign in to comment.