Skip to content

Commit

Permalink
Merge pull request #1235 from input-output-hk/head-explorer-backend
Browse files Browse the repository at this point in the history
Create a hydra-explorer executable that can track all heads on-chain
  • Loading branch information
ffakenz authored Jan 29, 2024
2 parents 3659f50 + 594558b commit b5ef1fa
Show file tree
Hide file tree
Showing 21 changed files with 1,300 additions and 55 deletions.
1 change: 1 addition & 0 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ packages:
hydra-tui
hydraw
hydra-chain-observer
hydra-explorer

-- Compile more things in parallel
package *
Expand Down
3 changes: 2 additions & 1 deletion hydra-chain-observer/exe/Main.hs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
module Main where

import Hydra.ChainObserver (defaultObserverHandler)
import Hydra.ChainObserver qualified
import Hydra.Prelude

main :: IO ()
main = Hydra.ChainObserver.main
main = Hydra.ChainObserver.main defaultObserverHandler
71 changes: 44 additions & 27 deletions hydra-chain-observer/src/Hydra/ChainObserver.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import Hydra.Cardano.Api (
SocketPath,
Tx,
UTxO,
chainTipToChainPoint,
connectToLocalNode,
getTxBody,
getTxId,
Expand Down Expand Up @@ -52,8 +53,13 @@ import Ouroboros.Network.Protocol.ChainSync.Client (
ClientStNext (..),
)

main :: IO ()
main = do
type ObserverHandler m = [HeadObservation] -> m ()

defaultObserverHandler :: Applicative m => ObserverHandler m
defaultObserverHandler = const $ pure ()

main :: ObserverHandler IO -> IO ()
main observerHandler = do
Options{networkId, nodeSocket, startChainFrom} <- execParser hydraChainObserverOptions
withTracer (Verbose "hydra-chain-observer") $ \tracer -> do
traceWith tracer KnownScripts{scriptInfo = Contract.scriptInfo}
Expand All @@ -64,7 +70,7 @@ main = do
traceWith tracer StartObservingFrom{chainPoint}
connectToLocalNode
(connectInfo nodeSocket networkId)
(clientProtocols tracer networkId chainPoint)
(clientProtocols tracer networkId chainPoint observerHandler)

type ChainObserverLog :: Type
data ChainObserverLog
Expand All @@ -79,7 +85,7 @@ data ChainObserverLog
| HeadAbortTx {headId :: HeadId}
| HeadContestTx {headId :: HeadId}
| Rollback {point :: ChainPoint}
| RollForward {receivedTxIds :: [TxId]}
| RollForward {point :: ChainPoint, receivedTxIds :: [TxId]}
deriving stock (Eq, Show, Generic)
deriving anyclass (ToJSON)

Expand All @@ -101,10 +107,11 @@ clientProtocols ::
Tracer IO ChainObserverLog ->
NetworkId ->
ChainPoint ->
ObserverHandler IO ->
LocalNodeClientProtocols BlockType ChainPoint ChainTip slot tx txid txerr query IO
clientProtocols tracer networkId startingPoint =
clientProtocols tracer networkId startingPoint observerHandler =
LocalNodeClientProtocols
{ localChainSyncClient = LocalChainSyncClient $ chainSyncClient tracer networkId startingPoint
{ localChainSyncClient = LocalChainSyncClient $ chainSyncClient tracer networkId startingPoint observerHandler
, localTxSubmissionClient = Nothing
, localStateQueryClient = Nothing
, localTxMonitoringClient = Nothing
Expand All @@ -128,8 +135,9 @@ chainSyncClient ::
Tracer m ChainObserverLog ->
NetworkId ->
ChainPoint ->
ObserverHandler m ->
ChainSyncClient BlockType ChainPoint ChainTip m ()
chainSyncClient tracer networkId startingPoint =
chainSyncClient tracer networkId startingPoint observerHandler =
ChainSyncClient $
pure $
SendMsgFindIntersect [startingPoint] clientStIntersect
Expand All @@ -143,44 +151,53 @@ chainSyncClient tracer networkId startingPoint =
ChainSyncClient $ throwIO (IntersectionNotFound startingPoint)
}

clientStIdle :: UTxO -> ClientStIdle BlockType ChainPoint tip m ()
clientStIdle :: UTxO -> ClientStIdle BlockType ChainPoint ChainTip m ()
clientStIdle utxo = SendMsgRequestNext (clientStNext utxo) (pure $ clientStNext utxo)

clientStNext :: UTxO -> ClientStNext BlockType ChainPoint tip m ()
clientStNext :: UTxO -> ClientStNext BlockType ChainPoint ChainTip m ()
clientStNext utxo =
ClientStNext
{ recvMsgRollForward = \blockInMode _tip -> ChainSyncClient $ do
{ recvMsgRollForward = \blockInMode tip -> ChainSyncClient $ do
case blockInMode of
BlockInMode _ (Block _header txs) BabbageEraInCardanoMode -> do
traceWith tracer RollForward{receivedTxIds = getTxId . getTxBody <$> txs}
let (utxo', logs) = observeAll networkId utxo txs
forM_ logs (traceWith tracer)
let point = chainTipToChainPoint tip
let receivedTxIds = getTxId . getTxBody <$> txs
traceWith tracer RollForward{point, receivedTxIds}
let (utxo', observations) = observeAll networkId utxo txs
-- FIXME we should be exposing OnChainTx instead of working around NoHeadTx.
forM_ observations (maybe (pure ()) (traceWith tracer) . logObservation)
observerHandler observations
pure $ clientStIdle utxo'
_ -> pure $ clientStIdle utxo
, recvMsgRollBackward = \point _tip -> ChainSyncClient $ do
traceWith tracer Rollback{point}
pure $ clientStIdle utxo
}

observeTx :: NetworkId -> UTxO -> Tx -> (UTxO, Maybe ChainObserverLog)
logObservation :: HeadObservation -> Maybe ChainObserverLog
logObservation = \case
NoHeadTx -> Nothing
Init InitObservation{headId} -> pure $ HeadInitTx{headId}
Commit CommitObservation{headId} -> pure $ HeadCommitTx{headId}
CollectCom CollectComObservation{headId} -> pure $ HeadCollectComTx{headId}
Close CloseObservation{headId} -> pure $ HeadCloseTx{headId}
Fanout FanoutObservation{headId} -> pure $ HeadFanoutTx{headId}
Abort AbortObservation{headId} -> pure $ HeadAbortTx{headId}
Contest ContestObservation{headId} -> pure $ HeadContestTx{headId}

observeTx :: NetworkId -> UTxO -> Tx -> (UTxO, Maybe HeadObservation)
observeTx networkId utxo tx =
let utxo' = adjustUTxO tx utxo
in case observeHeadTx networkId utxo tx of
NoHeadTx -> (utxo, Nothing)
Init InitObservation{headId} -> (utxo', pure $ HeadInitTx{headId})
Commit CommitObservation{headId} -> (utxo', pure $ HeadCommitTx{headId})
CollectCom CollectComObservation{headId} -> (utxo', pure $ HeadCollectComTx{headId})
Close CloseObservation{headId} -> (utxo', pure $ HeadCloseTx{headId})
Fanout FanoutObservation{headId} -> (utxo', pure $ HeadFanoutTx{headId})
Abort AbortObservation{headId} -> (utxo', pure $ HeadAbortTx{headId})
Contest ContestObservation{headId} -> (utxo', pure $ HeadContestTx{headId})

observeAll :: NetworkId -> UTxO -> [Tx] -> (UTxO, [ChainObserverLog])
observation -> (utxo', pure observation)

observeAll :: NetworkId -> UTxO -> [Tx] -> (UTxO, [HeadObservation])
observeAll networkId utxo txs =
second reverse $ foldr go (utxo, []) txs
where
go :: Tx -> (UTxO, [ChainObserverLog]) -> (UTxO, [ChainObserverLog])
go tx (utxo'', logs) =
go :: Tx -> (UTxO, [HeadObservation]) -> (UTxO, [HeadObservation])
go tx (utxo'', observations) =
case observeTx networkId utxo'' tx of
(utxo', Nothing) -> (utxo', logs)
(utxo', Just logEntry) -> (utxo', logEntry : logs)
(utxo', Nothing) -> (utxo', observations)
(utxo', Just observation) -> (utxo', observation : observations)
17 changes: 9 additions & 8 deletions hydra-chain-observer/test/Hydra/ChainObserverSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import Test.Hydra.Prelude
import Hydra.Chain.Direct.Fixture (testNetworkId)
import Hydra.Chain.Direct.State (HasKnownUTxO (getKnownUTxO), genChainStateWithTx)
import Hydra.Chain.Direct.State qualified as Transition
import Hydra.ChainObserver (ChainObserverLog (..), observeAll, observeTx)
import Hydra.Chain.Direct.Tx (HeadObservation (..))
import Hydra.ChainObserver (observeAll, observeTx)
import Hydra.Ledger.Cardano (genSequenceOfSimplePaymentTransactions)
import Test.QuickCheck (counterexample, forAll, forAllBlind, property, (=/=), (===))
import Test.QuickCheck.Property (checkCoverage)
Expand All @@ -21,13 +22,13 @@ spec =
counterexample (show transition) $
let utxo = getKnownUTxO st
in case snd $ observeTx testNetworkId utxo tx of
Just (HeadInitTx{}) -> transition === Transition.Init
Just (HeadCommitTx{}) -> transition === Transition.Commit
Just (HeadCollectComTx{}) -> transition === Transition.Collect
Just (HeadAbortTx{}) -> transition === Transition.Abort
Just (HeadCloseTx{}) -> transition === Transition.Close
Just (HeadContestTx{}) -> transition === Transition.Contest
Just (HeadFanoutTx{}) -> transition === Transition.Fanout
Just (Init{}) -> transition === Transition.Init
Just (Commit{}) -> transition === Transition.Commit
Just (CollectCom{}) -> transition === Transition.Collect
Just (Abort{}) -> transition === Transition.Abort
Just (Close{}) -> transition === Transition.Close
Just (Contest{}) -> transition === Transition.Contest
Just (Fanout{}) -> transition === Transition.Fanout
_ -> property False

prop "Updates UTxO state given transaction part of Head lifecycle" $
Expand Down
4 changes: 4 additions & 0 deletions hydra-cluster/hydra-cluster.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ test-suite tests
Test.Hydra.Cluster.CardanoCliSpec
Test.Hydra.Cluster.FaucetSpec
Test.Hydra.Cluster.MithrilSpec
Test.HydraExplorerSpec
Test.OfflineChainSpec

build-depends:
Expand All @@ -168,6 +169,8 @@ test-suite tests
, directory
, filepath
, hspec
, http-client
, http-conduit
, hydra-cardano-api
, hydra-cluster
, hydra-node:{hydra-node, testlib}
Expand All @@ -185,6 +188,7 @@ test-suite tests
build-tool-depends:
, hspec-discover:hspec-discover
, hydra-chain-observer:hydra-chain-observer
, hydra-explorer:hydra-explorer
, hydra-node:hydra-node

ghc-options: -threaded -rtsopts
Expand Down
16 changes: 5 additions & 11 deletions hydra-cluster/test/Test/ChainObserverSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import Test.Hydra.Prelude
import CardanoClient (RunningNode (..), submitTx)
import CardanoNode (NodeLog, withCardanoNodeDevnet)
import Control.Concurrent.Class.MonadSTM (modifyTVar', newTVarIO, readTVarIO)
import Control.Exception (IOException)
import Control.Lens ((^?))
import Data.Aeson as Aeson
import Data.Aeson.Lens (key, _String)
Expand Down Expand Up @@ -112,16 +111,11 @@ data ChainObserverLog
-- | Starts a 'hydra-chain-observer' on some Cardano network.
withChainObserver :: RunningNode -> (ChainObserverHandle -> IO ()) -> IO ()
withChainObserver cardanoNode action =
-- XXX: If this throws an IOException, 'withFile' invocations around mislead
-- to the file path opened (e.g. the cardano-node log file) in the test
-- failure output. Print the exception here to have some debuggability at
-- least.
handle (\(e :: IOException) -> print e >> throwIO e) $
withCreateProcess process{std_out = CreatePipe} $ \_in (Just out) _err _ph ->
action
ChainObserverHandle
{ awaitNext = awaitNext out
}
withCreateProcess process{std_out = CreatePipe} $ \_in (Just out) _err _ph ->
action
ChainObserverHandle
{ awaitNext = awaitNext out
}
where
awaitNext :: Handle -> IO Aeson.Value
awaitNext out = do
Expand Down
134 changes: 134 additions & 0 deletions hydra-cluster/test/Test/HydraExplorerSpec.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
{-# LANGUAGE DeriveAnyClass #-}

-- | Integration tests for the 'hydra-explorer' executable. These will run
-- also 'hydra-node' on a devnet and assert correct observation.
module Test.HydraExplorerSpec where

import Hydra.Prelude hiding (get)
import Test.Hydra.Prelude

import CardanoClient (RunningNode (..))
import CardanoNode (NodeLog, withCardanoNodeDevnet)
import Control.Lens ((^.), (^?))
import Data.Aeson as Aeson
import Data.Aeson.Lens (key, nth, _Array, _String)
import Hydra.Cardano.Api (NetworkId (..), NetworkMagic (..), unFile)
import Hydra.Cluster.Faucet (FaucetLog, publishHydraScriptsAs, seedFromFaucet_)
import Hydra.Cluster.Fixture (Actor (..), aliceSk, bobSk, cperiod)
import Hydra.Cluster.Util (chainConfigFor, keysFor)
import Hydra.Logging (showLogsOnFailure)
import HydraNode (HydraNodeLog, input, send, waitMatch, withHydraNode)
import Network.HTTP.Client (responseBody)
import Network.HTTP.Simple (httpJSON, parseRequestThrow)
import System.Process (CreateProcess (..), StdStream (..), proc, withCreateProcess)

spec :: Spec
spec = do
it "can observe hydra transactions created by multiple hydra-nodes" $
failAfter 60 $
showLogsOnFailure "HydraExplorerSpec" $ \tracer -> do
withTempDir "hydra-explorer-history" $ \tmpDir -> do
withCardanoNodeDevnet (contramap FromCardanoNode tracer) tmpDir $ \cardanoNode@RunningNode{nodeSocket} -> do
let hydraTracer = contramap FromHydraNode tracer
hydraScriptsTxId <- publishHydraScriptsAs cardanoNode Faucet

let initHead hydraNode = do
send hydraNode $ input "Init" []
waitMatch 5 hydraNode $ \v -> do
guard $ v ^? key "tag" == Just "HeadIsInitializing"
v ^? key "headId" . _String

(aliceCardanoVk, _aliceCardanoSk) <- keysFor Alice
aliceChainConfig <- chainConfigFor Alice tmpDir nodeSocket hydraScriptsTxId [] cperiod
seedFromFaucet_ cardanoNode aliceCardanoVk 25_000_000 (contramap FromFaucet tracer)
aliceHeadId <- withHydraNode hydraTracer aliceChainConfig tmpDir 1 aliceSk [] [1] initHead

(bobCardanoVk, _bobCardanoSk) <- keysFor Bob
bobChainConfig <- chainConfigFor Bob tmpDir nodeSocket hydraScriptsTxId [] cperiod
seedFromFaucet_ cardanoNode bobCardanoVk 25_000_000 (contramap FromFaucet tracer)
bobHeadId <- withHydraNode hydraTracer bobChainConfig tmpDir 2 bobSk [] [2] initHead

withHydraExplorer cardanoNode $ \explorer -> do
allHeads <- getHeads explorer
length (allHeads ^. _Array) `shouldBe` 2
allHeads ^. nth 0 . key "headId" . _String `shouldBe` aliceHeadId
allHeads ^. nth 0 . key "status" . _String `shouldBe` "Initializing"
allHeads ^. nth 1 . key "headId" . _String `shouldBe` bobHeadId
allHeads ^. nth 1 . key "status" . _String `shouldBe` "Initializing"

it "can query for all hydra heads observed" $
failAfter 60 $
showLogsOnFailure "HydraExplorerSpec" $ \tracer -> do
withTempDir "hydra-explorer-get-heads" $ \tmpDir -> do
withCardanoNodeDevnet (contramap FromCardanoNode tracer) tmpDir $ \cardanoNode@RunningNode{nodeSocket} -> do
let hydraTracer = contramap FromHydraNode tracer
hydraScriptsTxId <- publishHydraScriptsAs cardanoNode Faucet
withHydraExplorer cardanoNode $ \explorer -> do
(aliceCardanoVk, _aliceCardanoSk) <- keysFor Alice
aliceChainConfig <- chainConfigFor Alice tmpDir nodeSocket hydraScriptsTxId [] cperiod
seedFromFaucet_ cardanoNode aliceCardanoVk 25_000_000 (contramap FromFaucet tracer)
aliceHeadId <- withHydraNode hydraTracer aliceChainConfig tmpDir 1 aliceSk [] [1] $ \hydraNode -> do
send hydraNode $ input "Init" []

waitMatch 5 hydraNode $ \v -> do
guard $ v ^? key "tag" == Just "HeadIsInitializing"
v ^? key "headId" . _String

(bobCardanoVk, _bobCardanoSk) <- keysFor Bob
bobChainConfig <- chainConfigFor Bob tmpDir nodeSocket hydraScriptsTxId [] cperiod
seedFromFaucet_ cardanoNode bobCardanoVk 25_000_000 (contramap FromFaucet tracer)
bobHeadId <- withHydraNode hydraTracer bobChainConfig tmpDir 2 bobSk [] [2] $ \hydraNode -> do
send hydraNode $ input "Init" []

bobHeadId <- waitMatch 5 hydraNode $ \v -> do
guard $ v ^? key "tag" == Just "HeadIsInitializing"
v ^? key "headId" . _String

send hydraNode $ input "Abort" []

waitMatch 5 hydraNode $ \v -> do
guard $ v ^? key "tag" == Just "HeadIsAborted"
guard $ v ^? key "headId" . _String == Just bobHeadId

pure bobHeadId

allHeads <- getHeads explorer
length (allHeads ^. _Array) `shouldBe` 2
allHeads ^. nth 0 . key "headId" . _String `shouldBe` aliceHeadId
allHeads ^. nth 0 . key "status" . _String `shouldBe` "Initializing"
allHeads ^. nth 1 . key "headId" . _String `shouldBe` bobHeadId
allHeads ^. nth 1 . key "status" . _String `shouldBe` "Aborted"

newtype HydraExplorerHandle = HydraExplorerHandle {getHeads :: IO Value}

data HydraExplorerLog
= FromCardanoNode NodeLog
| FromHydraNode HydraNodeLog
| FromFaucet FaucetLog
deriving (Eq, Show, Generic)
deriving anyclass (ToJSON)

-- | Starts a 'hydra-explorer' on some Cardano network.
withHydraExplorer :: RunningNode -> (HydraExplorerHandle -> IO ()) -> IO ()
withHydraExplorer cardanoNode action =
withCreateProcess process{std_out = CreatePipe, std_err = CreatePipe} $
\_in _stdOut err processHandle ->
race
(checkProcessHasNotDied "hydra-explorer" processHandle err)
( -- XXX: wait for the http server to be listening on port
threadDelay 3
*> action HydraExplorerHandle{getHeads}
)
<&> either absurd id
where
getHeads = responseBody <$> (parseRequestThrow "http://127.0.0.1:9090/heads" >>= httpJSON)

process =
proc
"hydra-explorer"
$ ["--node-socket", unFile nodeSocket]
<> case networkId of
Mainnet -> ["--mainnet"]
Testnet (NetworkMagic magic) -> ["--testnet-magic", show magic]

RunningNode{nodeSocket, networkId} = cardanoNode
Loading

0 comments on commit b5ef1fa

Please sign in to comment.