Skip to content

Commit

Permalink
Add support for Mailer and Data Seeder for Lambda deployment
Browse files Browse the repository at this point in the history
  • Loading branch information
vknaisl committed Jan 11, 2024
1 parent 2879cf3 commit a2ff246
Show file tree
Hide file tree
Showing 10 changed files with 89 additions and 20 deletions.
3 changes: 3 additions & 0 deletions registry-server/src/Registry/Model/Context/ContextLenses.hs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ instance HasField "sentry'" ServerConfig ServerConfigSentry where
instance HasField "cloud'" ServerConfig ServerConfigCloud where
getField = (.cloud)

instance HasField "persistentCommand'" ServerConfig ServerConfigPersistentCommand where
getField = (.persistentCommand)

instance HasField "dbPool'" AppContext (Pool Connection) where
getField = (.dbPool)

Expand Down
9 changes: 8 additions & 1 deletion shared-common/src/Shared/Common/Model/Config/ServerConfig.hs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ data ServerConfigCronWorker = ServerConfigCronWorker
deriving (Generic, Show)

data ServerConfigPersistentCommand = ServerConfigPersistentCommand
{ listenerJob :: ServerConfigPersistentCommandListenerJob
{ lambdaFunctions :: [ServerConfigPersistentCommandLambda]
, listenerJob :: ServerConfigPersistentCommandListenerJob
, retryJob :: ServerConfigCronWorker
}
deriving (Generic, Show)
Expand All @@ -62,3 +63,9 @@ data ServerConfigPersistentCommandListenerJob = ServerConfigPersistentCommandLis
{ enabled :: Bool
}
deriving (Generic, Show)

data ServerConfigPersistentCommandLambda = ServerConfigPersistentCommandLambda
{ component :: String
, functionArn :: String
}
deriving (Generic, Show)
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ defaultCloud =
defaultPersistentCommand :: ServerConfigPersistentCommand
defaultPersistentCommand =
ServerConfigPersistentCommand
{ listenerJob = defaultPersistentCommandListenerJob
{ lambdaFunctions = []
, listenerJob = defaultPersistentCommandListenerJob
, retryJob = defaultPersistentCommandRetryJob
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,19 @@ instance FromJSON ServerConfigCronWorker where

instance FromJSON ServerConfigPersistentCommand where
parseJSON (Object o) = do
lambdaFunctions <- o .:? "lambdaFunctions" .!= []
listenerJob <- o .:? "listenerJob" .!= defaultPersistentCommand.listenerJob
retryJob <- o .:? "retryJob" .!= defaultPersistentCommand.retryJob
return ServerConfigPersistentCommand {..}
parseJSON _ = mzero

instance FromJSON ServerConfigPersistentCommandLambda where
parseJSON (Object o) = do
component <- o .: "component"
functionArn <- o .: "functionArn"
return ServerConfigPersistentCommandLambda {..}
parseJSON _ = mzero

instance FromJSON ServerConfigPersistentCommandListenerJob where
parseJSON (Object o) = do
enabled <- o .:? "enabled" .!= defaultPersistentCommandListenerJob.enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class
, HasField "serverConfig'" context sc
, HasField "cloud'" sc ServerConfigCloud
, HasField "s3'" sc ServerConfigS3
, HasField "persistentCommand'" sc ServerConfigPersistentCommand
, HasField "sentry'" sc ServerConfigSentry
, HasField "buildInfoConfig'" context BuildInfoConfig
, HasField "httpClientManager'" context Manager
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module Shared.PersistentCommand.Database.DAO.PersistentCommand.PersistentCommandDAO where

import Control.Monad.Reader (ask)
import qualified Data.ByteString.Char8 as BS
import qualified Data.List as L
import Data.String (fromString)
import qualified Data.UUID as U
import Database.PostgreSQL.Simple
Expand All @@ -14,13 +16,15 @@ import Shared.Common.Database.DAO.Common
import Shared.Common.Model.Common.Page
import Shared.Common.Model.Common.Pageable
import Shared.Common.Model.Common.Sort
import Shared.Common.Model.Config.ServerConfig
import Shared.Common.Model.Context.AppContext
import Shared.Common.Util.Logger
import Shared.Common.Util.String (trim)
import Shared.Common.Util.String (f'', trim)
import Shared.PersistentCommand.Database.Mapping.PersistentCommand.PersistentCommand ()
import Shared.PersistentCommand.Database.Mapping.PersistentCommand.PersistentCommandSimple ()
import Shared.PersistentCommand.Model.PersistentCommand.PersistentCommand
import Shared.PersistentCommand.Model.PersistentCommand.PersistentCommandSimple
import Shared.PersistentCommand.Service.PersistentCommand.PersistentCommandMapper

entityName = "persistent_command"

Expand All @@ -40,19 +44,35 @@ findPersistentCommandsPage states pageable sort = do
_ -> f' "WHERE state in (%s) " [generateQuestionMarks states]
createFindEntitiesPageableQuerySortFn entityName pageLabel pageable sort "*" condition states

findPersistentCommandsByStates :: (AppContextC s sc m, FromField identity) => m [PersistentCommandSimple identity]
findPersistentCommandsByStates = do
findPersistentCommandsForRetryByStates :: (AppContextC s sc m, FromField identity) => m [PersistentCommandSimple identity]
findPersistentCommandsForRetryByStates = findPersistentCommandsByStates True []

findPersistentCommandsForLambdaByStates :: (AppContextC s sc m, FromField identity) => [String] -> m [PersistentCommandSimple identity]
findPersistentCommandsForLambdaByStates = findPersistentCommandsByStates False

findPersistentCommandsByStates :: (AppContextC s sc m, FromField identity) => Bool -> [String] -> m [PersistentCommandSimple identity]
findPersistentCommandsByStates internal components = do
let componentCondition =
case components of
[] -> ""
_ -> f' "AND component IN (%s) " [generateQuestionMarks components]
let sql =
"SELECT uuid, destination, tenant_uuid, created_by \
\FROM persistent_command \
\WHERE (state = 'NewPersistentCommandState' \
\ OR (state = 'ErrorPersistentCommandState' AND attempts < max_attempts AND updated_at < (now() - (2 ^ attempts - 1) * INTERVAL '1 min'))) \
\ AND internal = true \
\ORDER BY created_at \
\LIMIT 5 \
\FOR UPDATE"
logInfoI _CMP_DATABASE (trim sql)
let action conn = query_ conn (fromString sql)
fromString $
f''
"SELECT uuid, destination, component, tenant_uuid, created_by \
\FROM persistent_command \
\WHERE (state = 'NewPersistentCommandState' \
\ OR (state = 'ErrorPersistentCommandState' AND attempts < max_attempts AND updated_at < (now() - (2 ^ attempts - 1) * INTERVAL '1 min'))) \
\ AND internal = ${internal} ${componentCondition} \
\ORDER BY created_at \
\LIMIT 5 \
\FOR UPDATE"
[ ("internal", show internal)
, ("componentCondition", componentCondition)
]
let params = components
logQuery sql params
let action conn = query conn sql params
runDB action

findPersistentCommandByUuid :: (AppContextC s sc m, FromField identity) => U.UUID -> m (PersistentCommand identity)
Expand All @@ -68,9 +88,11 @@ findPersistentCommandSimpleByUuid uuid =
insertPersistentCommand :: (AppContextC s sc m, ToField identity) => PersistentCommand identity -> m Int64
insertPersistentCommand command = do
createInsertFn entityName command
if command.internal
then notifyPersistentCommandQueue
else notifySpecificPersistentCommandQueue command
context <- ask
case (command.internal, L.find (\lf -> lf.component == command.component) (context.serverConfig'.persistentCommand'.lambdaFunctions)) of
(True, _) -> notifyPersistentCommandQueue
(False, Nothing) -> notifySpecificPersistentCommandQueue command
(False, Just lf) -> invokeLambdaFunction (toSimple command) lf

updatePersistentCommandByUuid :: (AppContextC s sc m, ToField identity) => PersistentCommand identity -> m Int64
updatePersistentCommandByUuid command = do
Expand Down Expand Up @@ -125,3 +147,10 @@ notifySpecificPersistentCommandQueue command = do
logInfoI _CMP_DATABASE (trim sql)
let action conn = execute_ conn (fromString sql)
runDB action

invokeLambdaFunction :: AppContextC s sc m => PersistentCommandSimple identity -> ServerConfigPersistentCommandLambda -> m Int64
invokeLambdaFunction command lf = do
let sql = f' "SELECT * FROM aws_lambda.invoke('%s', '{}'::json, 'eu-central-1', 'Event');" [lf.functionArn]
logInfoI _CMP_DATABASE (trim sql)
let action conn = execute_ conn (fromString sql)
runDB action
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import GHC.Generics
data PersistentCommandSimple identity = PersistentCommandSimple
{ uuid :: U.UUID
, destination :: Maybe String
, component :: String
, tenantUuid :: U.UUID
, createdBy :: Maybe identity
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ toSimple command =
PersistentCommandSimple
{ uuid = command.uuid
, destination = command.destination
, component = command.component
, tenantUuid = command.tenantUuid
, createdBy = command.createdBy
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
module Shared.PersistentCommand.Service.PersistentCommand.PersistentCommandService where

import qualified Control.Exception.Base as E
import Control.Monad (forever, unless, when)
import Control.Monad (forever, unless, void, when)
import Control.Monad.Reader (ask, liftIO)
import Data.Aeson (Value (..), toJSON)
import Data.Foldable (traverse_)
import qualified Data.HashMap.Strict as HashMap
import qualified Data.List as L
import Data.Maybe (fromMaybe)
import qualified Data.Text as T
import Data.Time
Expand Down Expand Up @@ -34,7 +35,7 @@ runPersistentCommands
-> m ()
runPersistentCommands runAppContextWithAppContext' updateContext createPersistentCommand execute = do
checkPermission _DEV_PERM
commands <- findPersistentCommandsByStates
commands <- findPersistentCommandsForRetryByStates
unless
(null commands)
( do
Expand Down Expand Up @@ -142,6 +143,20 @@ runPersistentCommandChannelListener runAppContextWithAppContext updateContext cr
_ <- getChannelNotification
runPersistentCommands runAppContextWithAppContext updateContext createPersistentCommand execute

retryPersistentCommandsForLambda :: AppContextC s sc m => m ()
retryPersistentCommandsForLambda = do
context <- ask
let components = fmap (\lf -> lf.component) context.serverConfig'.persistentCommand'.lambdaFunctions
persistentCommands <- findPersistentCommandsForLambdaByStates components
traverse_ retryPersistentCommandForLambda persistentCommands

retryPersistentCommandForLambda :: AppContextC s sc m => PersistentCommandSimple U.UUID -> m ()
retryPersistentCommandForLambda command = do
context <- ask
case L.find (\lf -> lf.component == command.component) (context.serverConfig'.persistentCommand'.lambdaFunctions) of
Just lf -> void $ invokeLambdaFunction command lf
Nothing -> logWarnI _CMP_DATABASE (f' "No lambda function found for persistent command '%s'" [U.toString command.uuid])

-- --------------------------------
-- PRIVATE
-- --------------------------------
Expand Down
3 changes: 3 additions & 0 deletions wizard-server/src/Wizard/Model/Context/ContextLenses.hs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ instance HasField "sentry'" ServerConfig ServerConfigSentry where
instance HasField "cloud'" ServerConfig ServerConfigCloud where
getField = (.cloud)

instance HasField "persistentCommand'" ServerConfig ServerConfigPersistentCommand where
getField = (.persistentCommand)

instance HasField "dbPool'" AppContext (Pool Connection) where
getField = (.dbPool)

Expand Down

0 comments on commit a2ff246

Please sign in to comment.