Skip to content

Commit

Permalink
Make use of ObserverHandler during roll forward
Browse files Browse the repository at this point in the history
Simplify ObserverHandler and ExplorerState types.
  • Loading branch information
ffakenz committed Jan 12, 2024
1 parent 30ee731 commit e2ec89a
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 33 deletions.
60 changes: 35 additions & 25 deletions hydra-chain-observer/src/Hydra/ChainObserver.hs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ import Ouroboros.Network.Protocol.ChainSync.Client (
ClientStNext (..),
)

type ObserverHandler = ChainPoint -> [(TxId, HeadObservation)] -> IO ()
type ObserverHandler m = [HeadObservation] -> m ()

defaultObserverHandler :: ObserverHandler
defaultObserverHandler = const . const $ pure ()
defaultObserverHandler :: Applicative m => ObserverHandler m
defaultObserverHandler = const $ pure ()

main :: ObserverHandler -> IO ()
main :: ObserverHandler IO -> IO ()
main observerHandler = do
Options{networkId, nodeSocket, startChainFrom} <- execParser hydraChainObserverOptions
withTracer (Verbose "hydra-chain-observer") $ \tracer -> do
Expand All @@ -70,7 +70,7 @@ main observerHandler = do
traceWith tracer StartObservingFrom{chainPoint}
connectToLocalNode
(connectInfo nodeSocket networkId)
(clientProtocols tracer networkId chainPoint)
(clientProtocols tracer networkId chainPoint observerHandler)

type ChainObserverLog :: Type
data ChainObserverLog
Expand Down Expand Up @@ -107,10 +107,11 @@ clientProtocols ::
Tracer IO ChainObserverLog ->
NetworkId ->
ChainPoint ->
ObserverHandler IO ->
LocalNodeClientProtocols BlockType ChainPoint ChainTip slot tx txid txerr query IO
clientProtocols tracer networkId startingPoint =
clientProtocols tracer networkId startingPoint observerHandler =
LocalNodeClientProtocols
{ localChainSyncClient = LocalChainSyncClient $ chainSyncClient tracer networkId startingPoint
{ localChainSyncClient = LocalChainSyncClient $ chainSyncClient tracer networkId startingPoint observerHandler
, localTxSubmissionClient = Nothing
, localStateQueryClient = Nothing
, localTxMonitoringClient = Nothing
Expand All @@ -134,8 +135,9 @@ chainSyncClient ::
Tracer m ChainObserverLog ->
NetworkId ->
ChainPoint ->
ObserverHandler m ->
ChainSyncClient BlockType ChainPoint ChainTip m ()
chainSyncClient tracer networkId startingPoint =
chainSyncClient tracer networkId startingPoint observerHandler =
ChainSyncClient $
pure $
SendMsgFindIntersect [startingPoint] clientStIntersect
Expand All @@ -159,35 +161,43 @@ chainSyncClient tracer networkId startingPoint =
case blockInMode of
BlockInMode _ (Block _header txs) BabbageEraInCardanoMode -> do
let point = chainTipToChainPoint tip
traceWith tracer RollForward{point, receivedTxIds = getTxId . getTxBody <$> txs}
let (utxo', logs) = observeAll networkId utxo txs
forM_ logs (traceWith tracer)
let receivedTxIds = getTxId . getTxBody <$> txs
traceWith tracer RollForward{point, receivedTxIds}
let (utxo', observations) = observeAll networkId utxo txs
-- FIXME we should be exposing OnChainTx instead of working around NoHeadTx.
forM_ observations (maybe (pure ()) (traceWith tracer) . logObservation)
observerHandler observations
pure $ clientStIdle utxo'
_ -> pure $ clientStIdle utxo
, recvMsgRollBackward = \point _tip -> ChainSyncClient $ do
traceWith tracer Rollback{point}
pure $ clientStIdle utxo
}

observeTx :: NetworkId -> UTxO -> Tx -> (UTxO, Maybe ChainObserverLog)
logObservation :: HeadObservation -> Maybe ChainObserverLog
logObservation = \case
NoHeadTx -> Nothing
Init InitObservation{headId} -> pure $ HeadInitTx{headId}
Commit CommitObservation{headId} -> pure $ HeadCommitTx{headId}
CollectCom CollectComObservation{headId} -> pure $ HeadCollectComTx{headId}
Close CloseObservation{headId} -> pure $ HeadCloseTx{headId}
Fanout FanoutObservation{headId} -> pure $ HeadFanoutTx{headId}
Abort AbortObservation{headId} -> pure $ HeadAbortTx{headId}
Contest ContestObservation{headId} -> pure $ HeadContestTx{headId}

observeTx :: NetworkId -> UTxO -> Tx -> (UTxO, Maybe HeadObservation)
observeTx networkId utxo tx =
let utxo' = adjustUTxO tx utxo
in case observeHeadTx networkId utxo tx of
NoHeadTx -> (utxo, Nothing)
Init InitObservation{headId} -> (utxo', pure $ HeadInitTx{headId})
Commit CommitObservation{headId} -> (utxo', pure $ HeadCommitTx{headId})
CollectCom CollectComObservation{headId} -> (utxo', pure $ HeadCollectComTx{headId})
Close CloseObservation{headId} -> (utxo', pure $ HeadCloseTx{headId})
Fanout FanoutObservation{headId} -> (utxo', pure $ HeadFanoutTx{headId})
Abort AbortObservation{headId} -> (utxo', pure $ HeadAbortTx{headId})
Contest ContestObservation{headId} -> (utxo', pure $ HeadContestTx{headId})

observeAll :: NetworkId -> UTxO -> [Tx] -> (UTxO, [ChainObserverLog])
observation -> (utxo', pure observation)

observeAll :: NetworkId -> UTxO -> [Tx] -> (UTxO, [HeadObservation])
observeAll networkId utxo txs =
second reverse $ foldr go (utxo, []) txs
where
go :: Tx -> (UTxO, [ChainObserverLog]) -> (UTxO, [ChainObserverLog])
go tx (utxo'', logs) =
go :: Tx -> (UTxO, [HeadObservation]) -> (UTxO, [HeadObservation])
go tx (utxo'', observations) =
case observeTx networkId utxo'' tx of
(utxo', Nothing) -> (utxo', logs)
(utxo', Just logEntry) -> (utxo', logEntry : logs)
(utxo', Nothing) -> (utxo', observations)
(utxo', Just observation) -> (utxo', observation : observations)
27 changes: 19 additions & 8 deletions hydra-explorer/src/Hydra/Explorer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ import Hydra.Network (PortNumber)
import Control.Concurrent.Class.MonadSTM (modifyTVar', newTVarIO)
import Data.Aeson qualified as Aeson
import Data.List qualified as List
import Data.Map.Strict qualified as Map
import Hydra.API.APIServerLog (APIServerLog (..), Method (..), PathInfo (..))
import Hydra.Cardano.Api (ChainPoint, TxId)
import Hydra.Chain.Direct.Tx (HeadObservation)
import Hydra.Chain.Direct.Tx (AbortObservation (..), CloseObservation (..), CollectComObservation (..), CommitObservation (..), ContestObservation (..), FanoutObservation (..), HeadObservation (..), InitObservation (..))
import Network.HTTP.Types (status200)
import Network.HTTP.Types.Header (HeaderName)
import Network.HTTP.Types.Status (status404, status500)
Expand All @@ -30,12 +28,12 @@ import Network.Wai (
import Network.Wai.Handler.Warp qualified as Warp
import System.Environment (withArgs)

type ExplorerState = Map ChainPoint [(TxId, HeadObservation)]
type ExplorerState = [HeadObservation]

observerHandler :: TVar IO ExplorerState -> ChainPoint -> [(TxId, HeadObservation)] -> IO ()
observerHandler explorerState point observations =
observerHandler :: TVar IO ExplorerState -> ExplorerState -> IO ()
observerHandler explorerState observations =
atomically $
modifyTVar' explorerState (Map.insert point observations)
modifyTVar' explorerState (<> observations)

main :: IO ()
main = do
Expand Down Expand Up @@ -69,7 +67,20 @@ main = do
getHeadIdsReadModel :: TVar IO ExplorerState -> IO GetHeadIds
getHeadIdsReadModel tv = atomically $ do
currentState <- readTVar tv
pure $ pure []
let headIds =
mapMaybe
( \case
NoHeadTx -> Nothing
Init InitObservation{headId} -> Just headId
Abort AbortObservation{headId} -> Just headId
Commit CommitObservation{headId} -> Just headId
CollectCom CollectComObservation{headId} -> Just headId
Close CloseObservation{headId} -> Just headId
Contest ContestObservation{headId} -> Just headId
Fanout FanoutObservation{headId} -> Just headId
)
currentState
pure $ pure headIds

type GetHeadIds = IO [HeadId]

Expand Down

0 comments on commit e2ec89a

Please sign in to comment.