diff --git a/README.md b/README.md index 64a0380a..14dcb18d 100644 --- a/README.md +++ b/README.md @@ -316,6 +316,54 @@ The relayer consists of the following components: +### API + +#### `/relay` +- Used to manually relay a Warp message. The body of the request must contain the following JSON: +```json +{ + "blockchain-id": "", + "message-id": "", + "block-num": "" +} +``` +- If successful, the endpoint will return the following JSON: +```json +{ + "transaction-hash": "" +} +``` + +#### `/relay/message` +- Used to manually relay a warp message. The body of the request must contain the following JSON: +```json +{ + "unsigned-message-bytes": "", + "source-address": "" +} +``` +- If successful, the endpoint will return the following JSON: +```json +{ + "transaction-hash": "", +} +``` + +#### `/health` +- Takes no arguments. Returns a `200` status code if all Application Relayers are healthy. Returns a `503` status if any of the Application Relayers have experienced an unrecoverable error. Here is an example return body: +```json +{ + "status": "down", + "details": { + "relayers-all": { + "status": "down", + "timestamp": "2024-06-01T05:06:07.685522Z", + "error": "" + } + } +} +``` + ## Testing ### Unit Tests diff --git a/api/health_check.go b/api/health_check.go new file mode 100644 index 00000000..1fafe3d8 --- /dev/null +++ b/api/health_check.go @@ -0,0 +1,42 @@ +package api + +import ( + "context" + "fmt" + "net/http" + + "github.com/alexliesenfeld/health" + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/logging" + "go.uber.org/atomic" + "go.uber.org/zap" +) + +const HealthAPIPath = "/health" + +func HandleHealthCheck(logger logging.Logger, relayerHealth map[ids.ID]*atomic.Bool) { + http.Handle(HealthAPIPath, healthCheckHandler(logger, relayerHealth)) +} + +func healthCheckHandler(logger logging.Logger, relayerHealth map[ids.ID]*atomic.Bool) http.Handler { + return health.NewHandler(health.NewChecker( + health.WithCheck(health.Check{ + Name: "relayers-all", + Check: func(context.Context) error { + // Store the IDs as the cb58 encoding + var unhealthyRelayers []string + for id, health := range relayerHealth { + if !health.Load() { + unhealthyRelayers = append(unhealthyRelayers, id.String()) + } + } + + if len(unhealthyRelayers) > 0 { + logger.Fatal("Relayers are unhealthy for blockchains", zap.Strings("blockchains", unhealthyRelayers)) + return fmt.Errorf("relayers are unhealthy for blockchains %v", unhealthyRelayers) + } + return nil + }, + }), + )) +} diff --git a/api/relay_message.go b/api/relay_message.go new file mode 100644 index 00000000..164b2426 --- /dev/null +++ b/api/relay_message.go @@ -0,0 +1,143 @@ +package api + +import ( + "encoding/json" + "math/big" + "net/http" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/awm-relayer/relayer" + "github.com/ava-labs/awm-relayer/types" + relayerTypes "github.com/ava-labs/awm-relayer/types" + "github.com/ethereum/go-ethereum/common" + "go.uber.org/zap" +) + +const ( + RelayAPIPath = "/relay" + RelayMessageAPIPath = RelayAPIPath + "/message" +) + +type RelayMessageRequest struct { + // Required. cb58 encoding of the source blockchain ID for the message + BlockchainID string `json:"blockchain-id"` + // Required. cb58 encoding of the warp message ID + MessageID string `json:"message-id"` + // Required. Block number that the message was sent in + BlockNum uint64 `json:"block-num"` +} + +type RelayMessageResponse struct { + // hex encoding of the transaction hash containing the processed message + TransactionHash string `json:"transaction-hash"` +} + +// Defines a manual warp message to be sent from the relayer through the API. +type ManualWarpMessageRequest struct { + UnsignedMessageBytes []byte `json:"unsigned-message-bytes"` + SourceAddress string `json:"source-address"` +} + +func HandleRelayMessage(logger logging.Logger, messageCoordinator *relayer.MessageCoordinator) { + http.Handle(RelayAPIPath, relayAPIHandler(logger, messageCoordinator)) +} + +func HandleRelay(logger logging.Logger, messageCoordinator *relayer.MessageCoordinator) { + http.Handle(RelayMessageAPIPath, relayMessageAPIHandler(logger, messageCoordinator)) +} + +func relayMessageAPIHandler(logger logging.Logger, messageCoordinator *relayer.MessageCoordinator) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var req ManualWarpMessageRequest + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + logger.Warn("Could not decode request body") + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + unsignedMessage, err := types.UnpackWarpMessage(req.UnsignedMessageBytes) + if err != nil { + logger.Warn("Error unpacking warp message", zap.Error(err)) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + warpMessageInfo := &relayerTypes.WarpMessageInfo{ + SourceAddress: common.HexToAddress(req.SourceAddress), + UnsignedMessage: unsignedMessage, + } + + txHash, err := messageCoordinator.ProcessWarpMessage(warpMessageInfo) + if err != nil { + logger.Error("Error processing message", zap.Error(err)) + http.Error(w, "error processing message: "+err.Error(), http.StatusInternalServerError) + return + } + + resp, err := json.Marshal( + RelayMessageResponse{ + TransactionHash: txHash.Hex(), + }, + ) + if err != nil { + logger.Error("Error marshaling response", zap.Error(err)) + http.Error(w, "error marshaling response: "+err.Error(), http.StatusInternalServerError) + return + } + + _, err = w.Write(resp) + if err != nil { + logger.Error("Error writing response", zap.Error(err)) + } + }) +} + +func relayAPIHandler(logger logging.Logger, messageCoordinator *relayer.MessageCoordinator) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var req RelayMessageRequest + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + logger.Warn("Could not decode request body") + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + blockchainID, err := ids.FromString(req.BlockchainID) + if err != nil { + logger.Warn("Invalid blockchainID", zap.String("blockchainID", req.BlockchainID)) + http.Error(w, "invalid blockchainID: "+err.Error(), http.StatusBadRequest) + return + } + messageID, err := ids.FromString(req.MessageID) + if err != nil { + logger.Warn("Invalid messageID", zap.String("messageID", req.MessageID)) + http.Error(w, "invalid messageID: "+err.Error(), http.StatusBadRequest) + return + } + + txHash, err := messageCoordinator.ProcessMessageID(blockchainID, messageID, new(big.Int).SetUint64(req.BlockNum)) + if err != nil { + logger.Error("Error processing message", zap.Error(err)) + http.Error(w, "error processing message: "+err.Error(), http.StatusInternalServerError) + return + } + + resp, err := json.Marshal( + RelayMessageResponse{ + TransactionHash: txHash.Hex(), + }, + ) + if err != nil { + logger.Error("Error marshalling response", zap.Error(err)) + http.Error(w, "error marshalling response: "+err.Error(), http.StatusInternalServerError) + return + } + + _, err = w.Write(resp) + if err != nil { + logger.Error("Error writing response", zap.Error(err)) + } + }) +} diff --git a/config/config.go b/config/config.go index 4ae1fe00..4712feaa 100644 --- a/config/config.go +++ b/config/config.go @@ -58,7 +58,6 @@ type Config struct { SourceBlockchains []*SourceBlockchain `mapstructure:"source-blockchains" json:"source-blockchains"` DestinationBlockchains []*DestinationBlockchain `mapstructure:"destination-blockchains" json:"destination-blockchains"` ProcessMissedBlocks bool `mapstructure:"process-missed-blocks" json:"process-missed-blocks"` - ManualWarpMessages []*ManualWarpMessage `mapstructure:"manual-warp-messages" json:"manual-warp-messages"` // convenience field to fetch a blockchain's subnet ID blockchainIDToSubnetID map[ids.ID]ids.ID @@ -71,7 +70,7 @@ func DisplayUsageText() { // Validates the configuration // Does not modify the public fields as derived from the configuration passed to the application, -// but does initialize private fields available through getters +// but does initialize private fields available through getters. func (c *Config) Validate() error { if len(c.SourceBlockchains) == 0 { return errors.New("relayer not configured to relay from any subnets. A list of source subnets must be provided in the configuration file") @@ -120,13 +119,6 @@ func (c *Config) Validate() error { } c.blockchainIDToSubnetID = blockchainIDToSubnetID - // Validate the manual warp messages - for i, msg := range c.ManualWarpMessages { - if err := msg.Validate(); err != nil { - return fmt.Errorf("invalid manual warp message at index %d: %w", i, err) - } - } - return nil } diff --git a/config/manual_warp_message.go b/config/manual_warp_message.go deleted file mode 100644 index db5b26b0..00000000 --- a/config/manual_warp_message.go +++ /dev/null @@ -1,76 +0,0 @@ -package config - -import ( - "encoding/hex" - "errors" - - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/awm-relayer/utils" - "github.com/ethereum/go-ethereum/common" -) - -// Defines a manual warp message to be sent from the relayer on startup. -type ManualWarpMessage struct { - UnsignedMessageBytes string `mapstructure:"unsigned-message-bytes" json:"unsigned-message-bytes"` - SourceBlockchainID string `mapstructure:"source-blockchain-id" json:"source-blockchain-id"` - DestinationBlockchainID string `mapstructure:"destination-blockchain-id" json:"destination-blockchain-id"` - SourceAddress string `mapstructure:"source-address" json:"source-address"` - DestinationAddress string `mapstructure:"destination-address" json:"destination-address"` - - // convenience fields to access the values after initialization - unsignedMessageBytes []byte - sourceBlockchainID ids.ID - destinationBlockchainID ids.ID - sourceAddress common.Address - destinationAddress common.Address -} - -// Validates the manual Warp message configuration. -// Does not modify the public fields as derived from the configuration passed to the application, -// but does initialize private fields available through getters -func (m *ManualWarpMessage) Validate() error { - unsignedMsg, err := hex.DecodeString(utils.SanitizeHexString(m.UnsignedMessageBytes)) - if err != nil { - return err - } - sourceBlockchainID, err := ids.FromString(m.SourceBlockchainID) - if err != nil { - return err - } - if !common.IsHexAddress(m.SourceAddress) { - return errors.New("invalid source address in manual warp message configuration") - } - destinationBlockchainID, err := ids.FromString(m.DestinationBlockchainID) - if err != nil { - return err - } - if !common.IsHexAddress(m.DestinationAddress) { - return errors.New("invalid destination address in manual warp message configuration") - } - m.unsignedMessageBytes = unsignedMsg - m.sourceBlockchainID = sourceBlockchainID - m.sourceAddress = common.HexToAddress(m.SourceAddress) - m.destinationBlockchainID = destinationBlockchainID - m.destinationAddress = common.HexToAddress(m.DestinationAddress) - return nil -} - -func (m *ManualWarpMessage) GetUnsignedMessageBytes() []byte { - return m.unsignedMessageBytes -} - -func (m *ManualWarpMessage) GetSourceBlockchainID() ids.ID { - return m.sourceBlockchainID -} - -func (m *ManualWarpMessage) GetSourceAddress() common.Address { - return m.sourceAddress -} - -func (m *ManualWarpMessage) GetDestinationBlockchainID() ids.ID { - return m.destinationBlockchainID -} - -func (m *ManualWarpMessage) GetDestinationAddress() common.Address { - return m.destinationAddress -} diff --git a/main/main.go b/main/main.go index e33c5d8e..0ae77aa0 100644 --- a/main/main.go +++ b/main/main.go @@ -5,25 +5,25 @@ package main import ( "context" - "encoding/hex" "fmt" "log" "net/http" "os" "strings" - "github.com/alexliesenfeld/health" "github.com/ava-labs/avalanchego/api/metrics" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/message" "github.com/ava-labs/avalanchego/utils/constants" "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/awm-relayer/api" "github.com/ava-labs/awm-relayer/config" "github.com/ava-labs/awm-relayer/database" + "github.com/ava-labs/awm-relayer/messages" + offchainregistry "github.com/ava-labs/awm-relayer/messages/off-chain-registry" + "github.com/ava-labs/awm-relayer/messages/teleporter" "github.com/ava-labs/awm-relayer/peers" "github.com/ava-labs/awm-relayer/relayer" - "github.com/ava-labs/awm-relayer/types" - relayerTypes "github.com/ava-labs/awm-relayer/types" "github.com/ava-labs/awm-relayer/utils" "github.com/ava-labs/awm-relayer/vms" "github.com/ava-labs/subnet-evm/ethclient" @@ -102,24 +102,27 @@ func main() { logger.Info("Initializing destination clients") destinationClients, err := vms.CreateDestinationClients(logger, cfg) if err != nil { - logger.Error( - "Failed to create destination clients", - zap.Error(err), - ) + logger.Fatal("Failed to create destination clients", zap.Error(err)) + panic(err) + } + + // Initialize all source clients + logger.Info("Initializing source clients") + sourceClients, err := createSourceClients(context.Background(), logger, &cfg) + if err != nil { + logger.Fatal("Failed to create source clients", zap.Error(err)) panic(err) } // Initialize metrics gathered through prometheus gatherer, registerer, err := initializeMetrics() if err != nil { - logger.Fatal("Failed to set up prometheus metrics", - zap.Error(err)) + logger.Fatal("Failed to set up prometheus metrics", zap.Error(err)) panic(err) } // Initialize the global app request network logger.Info("Initializing app request network") - // The app request network generates P2P networking logs that are verbose at the info level. // Unless the log level is debug or lower, set the network log level to error to avoid spamming the logs. // We do not collect metrics for the network. @@ -133,51 +136,15 @@ func main() { &cfg, ) if err != nil { - logger.Error( - "Failed to create app request network", - zap.Error(err), - ) + logger.Fatal("Failed to create app request network", zap.Error(err)) panic(err) } - // Each goroutine will have an atomic bool that it can set to false if it ever disconnects from its subscription. - relayerHealth := make(map[ids.ID]*atomic.Bool) - - checker := health.NewChecker( - health.WithCheck(health.Check{ - Name: "relayers-all", - Check: func(context.Context) error { - // Store the IDs as the cb58 encoding - var unhealthyRelayers []string - for id, health := range relayerHealth { - if !health.Load() { - unhealthyRelayers = append(unhealthyRelayers, id.String()) - } - } - - if len(unhealthyRelayers) > 0 { - return fmt.Errorf("relayers are unhealthy for blockchains %v", unhealthyRelayers) - } - return nil - }, - }), - ) - - http.Handle("/health", health.NewHandler(checker)) - - // start the health check server - go func() { - log.Fatalln(http.ListenAndServe(fmt.Sprintf(":%d", cfg.APIPort), nil)) - }() - startMetricsServer(logger, gatherer, cfg.MetricsPort) - metrics, err := relayer.NewApplicationRelayerMetrics(registerer) + relayerMetrics, err := relayer.NewApplicationRelayerMetrics(registerer) if err != nil { - logger.Error( - "Failed to create application relayer metrics", - zap.Error(err), - ) + logger.Fatal("Failed to create application relayer metrics", zap.Error(err)) panic(err) } @@ -191,20 +158,14 @@ func main() { constants.DefaultNetworkMaximumInboundTimeout, ) if err != nil { - logger.Error( - "Failed to create message creator", - zap.Error(err), - ) + logger.Fatal("Failed to create message creator", zap.Error(err)) panic(err) } // Initialize the database db, err := database.NewDatabase(logger, &cfg) if err != nil { - logger.Error( - "Failed to create database", - zap.Error(err), - ) + logger.Fatal("Failed to create database", zap.Error(err)) panic(err) } @@ -212,159 +173,194 @@ func main() { ticker := utils.NewTicker(cfg.DBWriteIntervalSeconds) go ticker.Run() - // Gather manual Warp messages specified in the configuration - manualWarpMessages := make(map[ids.ID][]*relayerTypes.WarpMessageInfo) - for _, msg := range cfg.ManualWarpMessages { - sourceBlockchainID := msg.GetSourceBlockchainID() - unsignedMsg, err := types.UnpackWarpMessage(msg.GetUnsignedMessageBytes()) - if err != nil { - logger.Error( - "Failed to unpack manual Warp message", - zap.String("warpMessageBytes", hex.EncodeToString(msg.GetUnsignedMessageBytes())), - zap.Error(err), - ) - panic(err) - } - warpLogInfo := relayerTypes.WarpMessageInfo{ - SourceAddress: msg.GetSourceAddress(), - UnsignedMessage: unsignedMsg, - } - manualWarpMessages[sourceBlockchainID] = append(manualWarpMessages[sourceBlockchainID], &warpLogInfo) + relayerHealth := createHealthTrackers(&cfg) + + messageHandlerFactories, err := createMessageHandlerFactories(logger, &cfg) + if err != nil { + logger.Fatal("Failed to create message handler factories", zap.Error(err)) + panic(err) + } + + applicationRelayers, minHeights, err := createApplicationRelayers( + context.Background(), + logger, + relayerMetrics, + db, + ticker, + network, + messageCreator, + &cfg, + sourceClients, + destinationClients, + ) + if err != nil { + logger.Fatal("Failed to create application relayers", zap.Error(err)) + panic(err) } + messageCoordinator := relayer.NewMessageCoordinator(logger, messageHandlerFactories, applicationRelayers, sourceClients) + + // Each Listener goroutine will have an atomic bool that it can set to false to indicate an unrecoverable error + api.HandleHealthCheck(logger, relayerHealth) + api.HandleRelay(logger, messageCoordinator) + api.HandleRelayMessage(logger, messageCoordinator) + + // start the health check server + go func() { + log.Fatalln(http.ListenAndServe(fmt.Sprintf(":%d", cfg.APIPort), nil)) + }() // Create listeners for each of the subnets configured as a source errGroup, ctx := errgroup.WithContext(context.Background()) for _, s := range cfg.SourceBlockchains { - blockchainID, err := ids.FromString(s.BlockchainID) - if err != nil { - logger.Error( - "Invalid subnetID in configuration", - zap.Error(err), - ) - panic(err) - } sourceBlockchain := s - health := atomic.NewBool(true) - relayerHealth[blockchainID] = health - // errgroup will cancel the context when the first goroutine returns an error errGroup.Go(func() error { - // Dial the eth client - ethClient, err := utils.NewEthClientWithConfig( - context.Background(), - sourceBlockchain.RPCEndpoint.BaseURL, - sourceBlockchain.RPCEndpoint.HTTPHeaders, - sourceBlockchain.RPCEndpoint.QueryParams, - ) - if err != nil { - logger.Error( - "Failed to connect to node via RPC", - zap.String("blockchainID", sourceBlockchain.BlockchainID), - zap.Error(err), - ) - return err - } - - // Create the ApplicationRelayers - applicationRelayers, minHeight, err := createApplicationRelayers( - ctx, - logger, - metrics, - db, - ticker, - *sourceBlockchain, - network, - messageCreator, - &cfg, - ethClient, - destinationClients, - ) - if err != nil { - logger.Error( - "Failed to create application relayers", - zap.String("blockchainID", sourceBlockchain.BlockchainID), - zap.Error(err), - ) - return err - } - logger.Info( - "Created application relayers", - zap.String("blockchainID", sourceBlockchain.BlockchainID), - ) - // runListener runs until it errors or the context is cancelled by another goroutine - return runListener( + return relayer.RunListener( ctx, logger, *sourceBlockchain, - health, - manualWarpMessages[blockchainID], - &cfg, - ethClient, - applicationRelayers, - minHeight, + sourceClients[sourceBlockchain.GetBlockchainID()], + relayerHealth[sourceBlockchain.GetBlockchainID()], + cfg.ProcessMissedBlocks, + minHeights[sourceBlockchain.GetBlockchainID()], + messageCoordinator, ) }) } err = errGroup.Wait() - logger.Error( - "Relayer exiting.", - zap.Error(err), - ) + logger.Error("Relayer exiting.", zap.Error(err)) } -// runListener creates a Listener instance and the ApplicationRelayers for a subnet. -// The Listener listens for warp messages on that subnet, and the ApplicationRelayers handle delivery to the destination -func runListener( - ctx context.Context, +func createMessageHandlerFactories( logger logging.Logger, - sourceBlockchain config.SourceBlockchain, - relayerHealth *atomic.Bool, - manualWarpMessages []*relayerTypes.WarpMessageInfo, globalConfig *config.Config, - ethClient ethclient.Client, - applicationRelayers map[common.Hash]*relayer.ApplicationRelayer, - minHeight uint64, -) error { - // Create the Listener - listener, err := relayer.NewListener( - logger, - sourceBlockchain, - relayerHealth, - globalConfig, - applicationRelayers, - minHeight, - ethClient, - ) - if err != nil { - return fmt.Errorf("failed to create listener instance: %w", err) +) (map[ids.ID]map[common.Address]messages.MessageHandlerFactory, error) { + messageHandlerFactories := make(map[ids.ID]map[common.Address]messages.MessageHandlerFactory) + for _, sourceBlockchain := range globalConfig.SourceBlockchains { + messageHandlerFactoriesForSource := make(map[common.Address]messages.MessageHandlerFactory) + // Create message handler factories for each supported message protocol + for addressStr, cfg := range sourceBlockchain.MessageContracts { + address := common.HexToAddress(addressStr) + format := cfg.MessageFormat + var ( + m messages.MessageHandlerFactory + err error + ) + switch config.ParseMessageProtocol(format) { + case config.TELEPORTER: + m, err = teleporter.NewMessageHandlerFactory( + logger, + address, + cfg, + ) + case config.OFF_CHAIN_REGISTRY: + m, err = offchainregistry.NewMessageHandlerFactory( + logger, + cfg, + ) + default: + m, err = nil, fmt.Errorf("invalid message format %s", format) + } + if err != nil { + logger.Error("Failed to create message handler factory", zap.Error(err)) + return nil, err + } + messageHandlerFactoriesForSource[address] = m + } + messageHandlerFactories[sourceBlockchain.GetBlockchainID()] = messageHandlerFactoriesForSource } - logger.Info( - "Created listener", - zap.String("blockchainID", sourceBlockchain.BlockchainID), - ) - err = listener.ProcessManualWarpMessages(logger, manualWarpMessages, sourceBlockchain) - if err != nil { - logger.Error( - "Failed to process manual Warp messages", - zap.String("blockchainID", sourceBlockchain.BlockchainID), - zap.Error(err), + return messageHandlerFactories, nil +} + +func createSourceClients( + ctx context.Context, + logger logging.Logger, + cfg *config.Config, +) (map[ids.ID]ethclient.Client, error) { + var err error + clients := make(map[ids.ID]ethclient.Client) + + for _, sourceBlockchain := range cfg.SourceBlockchains { + clients[sourceBlockchain.GetBlockchainID()], err = utils.NewEthClientWithConfig( + ctx, + sourceBlockchain.RPCEndpoint.BaseURL, + sourceBlockchain.RPCEndpoint.HTTPHeaders, + sourceBlockchain.RPCEndpoint.QueryParams, ) + if err != nil { + logger.Error( + "Failed to connect to node via RPC", + zap.String("blockchainID", sourceBlockchain.BlockchainID), + zap.Error(err), + ) + return nil, err + } } + return clients, nil +} - logger.Info( - "Listener initialized. Listening for messages to relay.", - zap.String("originBlockchainID", sourceBlockchain.BlockchainID), - ) +// Returns a map of application relayers, as well as a map of source blockchain IDs to starting heights. +func createApplicationRelayers( + ctx context.Context, + logger logging.Logger, + relayerMetrics *relayer.ApplicationRelayerMetrics, + db database.RelayerDatabase, + ticker *utils.Ticker, + network *peers.AppRequestNetwork, + messageCreator message.Creator, + cfg *config.Config, + sourceClients map[ids.ID]ethclient.Client, + destinationClients map[ids.ID]vms.DestinationClient, +) (map[common.Hash]*relayer.ApplicationRelayer, map[ids.ID]uint64, error) { + applicationRelayers := make(map[common.Hash]*relayer.ApplicationRelayer) + minHeights := make(map[ids.ID]uint64) + for _, sourceBlockchain := range cfg.SourceBlockchains { + currentHeight, err := sourceClients[sourceBlockchain.GetBlockchainID()].BlockNumber(ctx) + if err != nil { + logger.Error("Failed to get current block height", zap.Error(err)) + return nil, nil, err + } - // Wait for logs from the subscribed node - // Will only return on error or context cancellation - return listener.ProcessLogs(ctx) + // Create the ApplicationRelayers + applicationRelayersForSource, minHeight, err := createApplicationRelayersForSourceChain( + ctx, + logger, + relayerMetrics, + db, + ticker, + *sourceBlockchain, + network, + messageCreator, + cfg, + currentHeight, + destinationClients, + ) + if err != nil { + logger.Error( + "Failed to create application relayers", + zap.String("blockchainID", sourceBlockchain.BlockchainID), + zap.Error(err), + ) + return nil, nil, err + } + + for relayerID, applicationRelayer := range applicationRelayersForSource { + applicationRelayers[relayerID] = applicationRelayer + } + minHeights[sourceBlockchain.GetBlockchainID()] = minHeight + + logger.Info( + "Created application relayers", + zap.String("blockchainID", sourceBlockchain.BlockchainID), + ) + } + return applicationRelayers, minHeights, nil } // createApplicationRelayers creates Application Relayers for a given source blockchain. -func createApplicationRelayers( +func createApplicationRelayersForSourceChain( ctx context.Context, logger logging.Logger, metrics *relayer.ApplicationRelayerMetrics, @@ -374,7 +370,7 @@ func createApplicationRelayers( network *peers.AppRequestNetwork, messageCreator message.Creator, cfg *config.Config, - srcEthClient ethclient.Client, + currentHeight uint64, destinationClients map[ids.ID]vms.DestinationClient, ) (map[common.Hash]*relayer.ApplicationRelayer, uint64, error) { // Create the ApplicationRelayers @@ -384,17 +380,8 @@ func createApplicationRelayers( ) applicationRelayers := make(map[common.Hash]*relayer.ApplicationRelayer) - currentHeight, err := srcEthClient.BlockNumber(context.Background()) - if err != nil { - logger.Error( - "Failed to get current block height", - zap.Error(err), - ) - return nil, 0, err - } - // Each ApplicationRelayer determines its starting height based on the database state. - // The Listener begins processing messages starting from the minimum height across all of the ApplicationRelayers + // The Listener begins processing messages starting from the minimum height across all the ApplicationRelayers minHeight := uint64(0) for _, relayerID := range database.GetSourceBlockchainRelayerIDs(&sourceBlockchain) { height, err := database.CalculateStartingBlockHeight( @@ -441,6 +428,14 @@ func createApplicationRelayers( return applicationRelayers, minHeight, nil } +func createHealthTrackers(cfg *config.Config) map[ids.ID]*atomic.Bool { + healthTrackers := make(map[ids.ID]*atomic.Bool, len(cfg.SourceBlockchains)) + for _, sourceBlockchain := range cfg.SourceBlockchains { + healthTrackers[sourceBlockchain.GetBlockchainID()] = atomic.NewBool(true) + } + return healthTrackers +} + func startMetricsServer(logger logging.Logger, gatherer prometheus.Gatherer, port uint16) { http.Handle("/metrics", promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{})) diff --git a/messages/message_handler.go b/messages/message_handler.go index 1ebfc04c..3beac8bc 100644 --- a/messages/message_handler.go +++ b/messages/message_handler.go @@ -27,7 +27,8 @@ type MessageHandler interface { // SendMessage sends the signed message to the destination chain. The payload parsed according to // the VM rules is also passed in, since MessageManager does not assume any particular VM - SendMessage(signedMessage *warp.Message, destinationClient vms.DestinationClient) error + // returns the transaction hash if the transaction is successful. + SendMessage(signedMessage *warp.Message, destinationClient vms.DestinationClient) (common.Hash, error) // GetMessageRoutingInfo returns the source chain ID, origin sender address, destination chain ID, and destination address GetMessageRoutingInfo() ( diff --git a/messages/off-chain-registry/message_handler.go b/messages/off-chain-registry/message_handler.go index 8abd17e4..bf6ca351 100644 --- a/messages/off-chain-registry/message_handler.go +++ b/messages/off-chain-registry/message_handler.go @@ -144,7 +144,7 @@ func (m *messageHandler) ShouldSendMessage(destinationClient vms.DestinationClie return false, nil } -func (m *messageHandler) SendMessage(signedMessage *warp.Message, destinationClient vms.DestinationClient) error { +func (m *messageHandler) SendMessage(signedMessage *warp.Message, destinationClient vms.DestinationClient) (common.Hash, error) { // Construct the transaction call data to call the TeleporterRegistry contract. // Only one off-chain registry Warp message is sent at a time, so we hardcode the index to 0 in the call. callData, err := teleporterregistry.PackAddProtocolVersion(0) @@ -154,10 +154,10 @@ func (m *messageHandler) SendMessage(signedMessage *warp.Message, destinationCli zap.String("destinationBlockchainID", destinationClient.DestinationBlockchainID().String()), zap.String("warpMessageID", signedMessage.ID().String()), ) - return err + return common.Hash{}, err } - _, err = destinationClient.SendTx(signedMessage, m.factory.registryAddress.Hex(), addProtocolVersionGasLimit, callData) + txHash, err := destinationClient.SendTx(signedMessage, m.factory.registryAddress.Hex(), addProtocolVersionGasLimit, callData) if err != nil { m.logger.Error( "Failed to send tx.", @@ -165,14 +165,14 @@ func (m *messageHandler) SendMessage(signedMessage *warp.Message, destinationCli zap.String("warpMessageID", signedMessage.ID().String()), zap.Error(err), ) - return err + return common.Hash{}, err } m.logger.Info( "Sent message to destination chain", zap.String("destinationBlockchainID", destinationClient.DestinationBlockchainID().String()), zap.String("warpMessageID", signedMessage.ID().String()), ) - return nil + return txHash, nil } func (m *messageHandler) GetMessageRoutingInfo() ( diff --git a/messages/teleporter/message_handler.go b/messages/teleporter/message_handler.go index 806d254a..0e58e044 100644 --- a/messages/teleporter/message_handler.go +++ b/messages/teleporter/message_handler.go @@ -172,7 +172,7 @@ func (m *messageHandler) ShouldSendMessage(destinationClient vms.DestinationClie // SendMessage extracts the gasLimit and packs the call data to call the receiveCrossChainMessage method of the Teleporter contract, // and dispatches transaction construction and broadcast to the destination client -func (m *messageHandler) SendMessage(signedMessage *warp.Message, destinationClient vms.DestinationClient) error { +func (m *messageHandler) SendMessage(signedMessage *warp.Message, destinationClient vms.DestinationClient) (common.Hash, error) { destinationBlockchainID := destinationClient.DestinationBlockchainID() teleporterMessageID, err := teleporterUtils.CalculateMessageID( m.factory.protocolAddress, @@ -181,7 +181,7 @@ func (m *messageHandler) SendMessage(signedMessage *warp.Message, destinationCli m.teleporterMessage.MessageNonce, ) if err != nil { - return fmt.Errorf("failed to calculate Teleporter message ID: %w", err) + return common.Hash{}, fmt.Errorf("failed to calculate Teleporter message ID: %w", err) } m.logger.Info( @@ -198,7 +198,7 @@ func (m *messageHandler) SendMessage(signedMessage *warp.Message, destinationCli zap.String("warpMessageID", signedMessage.ID().String()), zap.String("teleporterMessageID", teleporterMessageID.String()), ) - return err + return common.Hash{}, err } gasLimit, err := gasUtils.CalculateReceiveMessageGasLimit( @@ -215,7 +215,7 @@ func (m *messageHandler) SendMessage(signedMessage *warp.Message, destinationCli zap.String("warpMessageID", signedMessage.ID().String()), zap.String("teleporterMessageID", teleporterMessageID.String()), ) - return err + return common.Hash{}, err } // Construct the transaction call data to call the receive cross chain message method of the receiver precompile. callData, err := teleportermessenger.PackReceiveCrossChainMessage(0, common.HexToAddress(m.factory.messageConfig.RewardAddress)) @@ -226,7 +226,7 @@ func (m *messageHandler) SendMessage(signedMessage *warp.Message, destinationCli zap.String("warpMessageID", signedMessage.ID().String()), zap.String("teleporterMessageID", teleporterMessageID.String()), ) - return err + return common.Hash{}, err } txHash, err := destinationClient.SendTx(signedMessage, m.factory.protocolAddress.Hex(), gasLimit, callData) @@ -238,13 +238,13 @@ func (m *messageHandler) SendMessage(signedMessage *warp.Message, destinationCli zap.String("teleporterMessageID", teleporterMessageID.String()), zap.Error(err), ) - return err + return common.Hash{}, err } // Wait for the message to be included in a block before returning err = m.waitForReceipt(signedMessage, destinationClient, txHash, teleporterMessageID) if err != nil { - return err + return common.Hash{}, err } m.logger.Info( @@ -254,7 +254,7 @@ func (m *messageHandler) SendMessage(signedMessage *warp.Message, destinationCli zap.String("teleporterMessageID", teleporterMessageID.String()), zap.String("txHash", txHash.String()), ) - return nil + return txHash, nil } func (m *messageHandler) waitForReceipt(signedMessage *warp.Message, destinationClient vms.DestinationClient, txHash common.Hash, teleporterMessageID ids.ID) error { diff --git a/relayer/application_relayer.go b/relayer/application_relayer.go index c1f59600..d2ba8c08 100644 --- a/relayer/application_relayer.go +++ b/relayer/application_relayer.go @@ -31,6 +31,7 @@ import ( coreEthMsg "github.com/ava-labs/coreth/plugin/evm/message" msg "github.com/ava-labs/subnet-evm/plugin/evm/message" "github.com/ava-labs/subnet-evm/rpc" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "golang.org/x/sync/errgroup" @@ -158,7 +159,8 @@ func (r *ApplicationRelayer) ProcessHeight(height uint64, handlers []messages.Me // Once we upgrade to Go 1.22, we can use the loop variable directly in the goroutine h := handler eg.Go(func() error { - return r.ProcessMessage(h) + _, err := r.ProcessMessage(h) + return err }) } if err := eg.Wait(); err != nil { @@ -182,29 +184,26 @@ func (r *ApplicationRelayer) ProcessHeight(height uint64, handlers []messages.Me } // Relays a message to the destination chain. Does not checkpoint the height. -func (r *ApplicationRelayer) ProcessMessage(handler messages.MessageHandler) error { +// returns the transaction hash if the message is successfully relayed. +func (r *ApplicationRelayer) ProcessMessage(handler messages.MessageHandler) (common.Hash, error) { // Increment the request ID. Make sure we don't hold the lock while we relay the message. r.lock.Lock() r.currentRequestID++ reqID := r.currentRequestID r.lock.Unlock() - err := r.relayMessage( - reqID, - handler, - ) - - return err + return r.relayMessage(reqID, handler) } func (r *ApplicationRelayer) RelayerID() database.RelayerID { return r.relayerID } +// returns the transaction hash if the message is successfully relayed. func (r *ApplicationRelayer) relayMessage( requestID uint32, handler messages.MessageHandler, -) error { +) (common.Hash, error) { r.logger.Debug( "Relaying message", zap.Uint32("requestID", requestID), @@ -218,11 +217,11 @@ func (r *ApplicationRelayer) relayMessage( zap.Error(err), ) r.incFailedRelayMessageCount("failed to check if message should be sent") - return err + return common.Hash{}, err } if !shouldSend { r.logger.Info("Message should not be sent") - return nil + return common.Hash{}, nil } unsignedMessage := handler.GetUnsignedMessage() @@ -240,7 +239,7 @@ func (r *ApplicationRelayer) relayMessage( zap.Error(err), ) r.incFailedRelayMessageCount("failed to create signed warp message via AppRequest network") - return err + return common.Hash{}, err } } else { r.incFetchSignatureRPCCount() @@ -251,29 +250,30 @@ func (r *ApplicationRelayer) relayMessage( zap.Error(err), ) r.incFailedRelayMessageCount("failed to create signed warp message via RPC") - return err + return common.Hash{}, err } } // create signed message latency (ms) r.setCreateSignedMessageLatencyMS(float64(time.Since(startCreateSignedMessageTime).Milliseconds())) - err = handler.SendMessage(signedMessage, r.destinationClient) + txHash, err := handler.SendMessage(signedMessage, r.destinationClient) if err != nil { r.logger.Error( "Failed to send warp message", zap.Error(err), ) r.incFailedRelayMessageCount("failed to send warp message") - return err + return common.Hash{}, err } r.logger.Info( "Finished relaying message to destination chain", zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()), + zap.String("txHash", txHash.Hex()), ) r.incSuccessfulRelayMessageCount() - return nil + return txHash, nil } // createSignedMessage fetches the signed Warp message from the source chain via RPC. diff --git a/relayer/listener.go b/relayer/listener.go index 345e79b9..6a9948fd 100644 --- a/relayer/listener.go +++ b/relayer/listener.go @@ -8,20 +8,13 @@ import ( "fmt" "math/big" "math/rand" - "sync" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/awm-relayer/config" - "github.com/ava-labs/awm-relayer/database" - "github.com/ava-labs/awm-relayer/messages" - offchainregistry "github.com/ava-labs/awm-relayer/messages/off-chain-registry" - "github.com/ava-labs/awm-relayer/messages/teleporter" - relayerTypes "github.com/ava-labs/awm-relayer/types" "github.com/ava-labs/awm-relayer/utils" - vms "github.com/ava-labs/awm-relayer/vms" + "github.com/ava-labs/awm-relayer/vms" "github.com/ava-labs/subnet-evm/ethclient" - "github.com/ethereum/go-ethereum/common" "go.uber.org/atomic" "go.uber.org/zap" ) @@ -35,28 +28,63 @@ const ( // Listener handles all messages sent from a given source chain type Listener struct { - Subscriber vms.Subscriber - requestIDLock *sync.Mutex - currentRequestID uint32 - contractMessage vms.ContractMessage - messageHandlerFactories map[common.Address]messages.MessageHandlerFactory - logger logging.Logger - sourceBlockchain config.SourceBlockchain - catchUpResultChan chan bool - healthStatus *atomic.Bool - globalConfig *config.Config - applicationRelayers map[common.Hash]*ApplicationRelayer - ethClient ethclient.Client + Subscriber vms.Subscriber + currentRequestID uint32 + contractMessage vms.ContractMessage + logger logging.Logger + sourceBlockchain config.SourceBlockchain + catchUpResultChan chan bool + healthStatus *atomic.Bool + ethClient ethclient.Client + messageCoordinator *MessageCoordinator } -func NewListener( +// runListener creates a Listener instance and the ApplicationRelayers for a subnet. +// The Listener listens for warp messages on that subnet, and the ApplicationRelayers handle delivery to the destination +func RunListener( + ctx context.Context, logger logging.Logger, sourceBlockchain config.SourceBlockchain, + ethRPCClient ethclient.Client, relayerHealth *atomic.Bool, - globalConfig *config.Config, - applicationRelayers map[common.Hash]*ApplicationRelayer, + processMissedBlocks bool, + minHeight uint64, + messageCoordinator *MessageCoordinator, +) error { + // Create the Listener + listener, err := newListener( + ctx, + logger, + sourceBlockchain, + ethRPCClient, + relayerHealth, + processMissedBlocks, + minHeight, + messageCoordinator, + ) + if err != nil { + return fmt.Errorf("failed to create listener instance: %w", err) + } + + logger.Info( + "Listener initialized. Listening for messages to relay.", + zap.String("originBlockchainID", sourceBlockchain.BlockchainID), + ) + + // Wait for logs from the subscribed node + // Will only return on error or context cancellation + return listener.processLogs(ctx) +} + +func newListener( + ctx context.Context, + logger logging.Logger, + sourceBlockchain config.SourceBlockchain, + ethRPCClient ethclient.Client, + relayerHealth *atomic.Bool, + processMissedBlocks bool, startingHeight uint64, - ethClient ethclient.Client, + messageCoordinator *MessageCoordinator, ) (*Listener, error) { blockchainID, err := ids.FromString(sourceBlockchain.BlockchainID) if err != nil { @@ -66,8 +94,9 @@ func NewListener( ) return nil, err } + ethWSClient, err := utils.NewEthClientWithConfig( - context.Background(), + ctx, sourceBlockchain.WSEndpoint.BaseURL, sourceBlockchain.WSEndpoint.HTTPHeaders, sourceBlockchain.WSEndpoint.QueryParams, @@ -82,40 +111,6 @@ func NewListener( } sub := vms.NewSubscriber(logger, config.ParseVM(sourceBlockchain.VM), blockchainID, ethWSClient) - // Create message managers for each supported message protocol - messageHandlerFactories := make(map[common.Address]messages.MessageHandlerFactory) - for addressStr, cfg := range sourceBlockchain.MessageContracts { - address := common.HexToAddress(addressStr) - format := cfg.MessageFormat - var ( - m messages.MessageHandlerFactory - err error - ) - switch config.ParseMessageProtocol(format) { - case config.TELEPORTER: - m, err = teleporter.NewMessageHandlerFactory( - logger, - address, - cfg, - ) - case config.OFF_CHAIN_REGISTRY: - m, err = offchainregistry.NewMessageHandlerFactory( - logger, - cfg, - ) - default: - m, err = nil, fmt.Errorf("invalid message format %s", format) - } - if err != nil { - logger.Error( - "Failed to create message manager", - zap.Error(err), - ) - return nil, err - } - messageHandlerFactories[address] = m - } - // Marks when the listener has finished the catch-up process on startup. // Until that time, we do not know the order in which messages are processed, // since the catch-up process occurs concurrently with normal message processing @@ -132,18 +127,15 @@ func NewListener( zap.String("blockchainIDHex", sourceBlockchain.GetBlockchainID().Hex()), ) lstnr := Listener{ - Subscriber: sub, - requestIDLock: &sync.Mutex{}, - currentRequestID: rand.Uint32(), // Initialize to a random value to mitigate requestID collision - contractMessage: vms.NewContractMessage(logger, sourceBlockchain), - messageHandlerFactories: messageHandlerFactories, - logger: logger, - sourceBlockchain: sourceBlockchain, - catchUpResultChan: catchUpResultChan, - healthStatus: relayerHealth, - globalConfig: globalConfig, - applicationRelayers: applicationRelayers, - ethClient: ethClient, + Subscriber: sub, + currentRequestID: rand.Uint32(), // Initialize to a random value to mitigate requestID collision + contractMessage: vms.NewContractMessage(logger, sourceBlockchain), + logger: logger, + sourceBlockchain: sourceBlockchain, + catchUpResultChan: catchUpResultChan, + healthStatus: relayerHealth, + ethClient: ethRPCClient, + messageCoordinator: messageCoordinator, } // Open the subscription. We must do this before processing any missed messages, otherwise we may miss an incoming message @@ -157,7 +149,7 @@ func NewListener( return nil, err } - if lstnr.globalConfig.ProcessMissedBlocks { + if processMissedBlocks { // Process historical blocks in a separate goroutine so that the main processing loop can // start processing new blocks as soon as possible. Otherwise, it's possible for // ProcessFromHeight to overload the message queue and cause a deadlock. @@ -176,7 +168,7 @@ func NewListener( // Listens to the Subscriber logs channel to process them. // On subscriber error, attempts to reconnect and errors if unable. // Exits if context is cancelled by another goroutine. -func (lstnr *Listener) ProcessLogs(ctx context.Context) error { +func (lstnr *Listener) processLogs(ctx context.Context) error { // Error channel for application relayer errors errChan := make(chan error) for { @@ -211,52 +203,7 @@ func (lstnr *Listener) ProcessLogs(ctx context.Context) error { return fmt.Errorf("failed to catch up on historical blocks") } case blockHeader := <-lstnr.Subscriber.Headers(): - // Parse the logs in the block, and group by application relayer - - block, err := relayerTypes.NewWarpBlockInfo(blockHeader, lstnr.ethClient) - if err != nil { - lstnr.logger.Error( - "Failed to create Warp block info", - zap.Error(err), - ) - continue - } - - // Relay the messages in the block to the destination chains. Continue on failure. - lstnr.logger.Debug( - "Processing block", - zap.String("sourceBlockchainID", lstnr.sourceBlockchain.GetBlockchainID().String()), - zap.Uint64("blockNumber", block.BlockNumber), - ) - - // Register each message in the block with the appropriate application relayer - messageHandlers := make(map[common.Hash][]messages.MessageHandler) - for _, warpLogInfo := range block.Messages { - appRelayer, handler, err := lstnr.GetAppRelayerMessageHandler(warpLogInfo) - if err != nil { - lstnr.logger.Error( - "Failed to parse message", - zap.String("blockchainID", lstnr.sourceBlockchain.GetBlockchainID().String()), - zap.Error(err), - ) - continue - } - if appRelayer == nil { - lstnr.logger.Debug("Application relayer not found. Skipping message relay") - continue - } - messageHandlers[appRelayer.relayerID.ID] = append(messageHandlers[appRelayer.relayerID.ID], handler) - } - // Initiate message relay of all registered messages - for _, appRelayer := range lstnr.applicationRelayers { - // Dispatch all messages in the block to the appropriate application relayer. - // An empty slice is still a valid argument to ProcessHeight; in this case the height is immediately committed. - handlers := messageHandlers[appRelayer.relayerID.ID] - - // Process the height async. This is safe because the ApplicationRelayer maintains the threadsafe - // invariant that heights are committed to the database one at a time, in order, with no gaps. - go appRelayer.ProcessHeight(block.BlockNumber, handlers, errChan) - } + go lstnr.messageCoordinator.ProcessBlock(blockHeader, lstnr.ethClient, errChan) case err := <-lstnr.Subscriber.Err(): lstnr.healthStatus.Store(false) lstnr.logger.Error( @@ -298,162 +245,3 @@ func (lstnr *Listener) reconnectToSubscriber() error { lstnr.healthStatus.Store(true) return nil } - -// Unpacks the Warp message and fetches the appropriate application relayer -// Checks for the following registered keys. At most one of these keys should be registered. -// 1. An exact match on sourceBlockchainID, destinationBlockchainID, originSenderAddress, and destinationAddress -// 2. A match on sourceBlockchainID and destinationBlockchainID, with a specific originSenderAddress and any destinationAddress -// 3. A match on sourceBlockchainID and destinationBlockchainID, with any originSenderAddress and a specific destinationAddress -// 4. A match on sourceBlockchainID and destinationBlockchainID, with any originSenderAddress and any destinationAddress -func (lstnr *Listener) getApplicationRelayer( - sourceBlockchainID ids.ID, - originSenderAddress common.Address, - destinationBlockchainID ids.ID, - destinationAddress common.Address, -) *ApplicationRelayer { - // Check for an exact match - applicationRelayerID := database.CalculateRelayerID( - sourceBlockchainID, - destinationBlockchainID, - originSenderAddress, - destinationAddress, - ) - if applicationRelayer, ok := lstnr.applicationRelayers[applicationRelayerID]; ok { - return applicationRelayer - } - - // Check for a match on sourceBlockchainID and destinationBlockchainID, with a specific originSenderAddress and any destinationAddress - applicationRelayerID = database.CalculateRelayerID( - sourceBlockchainID, - destinationBlockchainID, - originSenderAddress, - database.AllAllowedAddress, - ) - if applicationRelayer, ok := lstnr.applicationRelayers[applicationRelayerID]; ok { - return applicationRelayer - } - - // Check for a match on sourceBlockchainID and destinationBlockchainID, with any originSenderAddress and a specific destinationAddress - applicationRelayerID = database.CalculateRelayerID( - sourceBlockchainID, - destinationBlockchainID, - database.AllAllowedAddress, - destinationAddress, - ) - if applicationRelayer, ok := lstnr.applicationRelayers[applicationRelayerID]; ok { - return applicationRelayer - } - - // Check for a match on sourceBlockchainID and destinationBlockchainID, with any originSenderAddress and any destinationAddress - applicationRelayerID = database.CalculateRelayerID( - sourceBlockchainID, - destinationBlockchainID, - database.AllAllowedAddress, - database.AllAllowedAddress, - ) - if applicationRelayer, ok := lstnr.applicationRelayers[applicationRelayerID]; ok { - return applicationRelayer - } - lstnr.logger.Debug( - "Application relayer not found. Skipping message relay.", - zap.String("blockchainID", lstnr.sourceBlockchain.GetBlockchainID().String()), - zap.String("destinationBlockchainID", destinationBlockchainID.String()), - zap.String("originSenderAddress", originSenderAddress.String()), - zap.String("destinationAddress", destinationAddress.String()), - ) - return nil -} - -// Returns the ApplicationRelayer that is configured to handle this message, as well as a one-time MessageHandler -// instance that the ApplicationRelayer uses to relay this specific message. -// The MessageHandler and ApplicationRelayer are decoupled to support batch workflows in which a single ApplicationRelayer -// processes multiple messages (using their corresponding MessageHandlers) in a single shot. -func (lstnr *Listener) GetAppRelayerMessageHandler(warpMessageInfo *relayerTypes.WarpMessageInfo) ( - *ApplicationRelayer, - messages.MessageHandler, - error, -) { - // Check that the warp message is from a supported message protocol contract address. - messageHandlerFactory, supportedMessageProtocol := lstnr.messageHandlerFactories[warpMessageInfo.SourceAddress] - if !supportedMessageProtocol { - // Do not return an error here because it is expected for there to be messages from other contracts - // than just the ones supported by a single listener instance. - lstnr.logger.Debug( - "Warp message from unsupported message protocol address. Not relaying.", - zap.String("protocolAddress", warpMessageInfo.SourceAddress.Hex()), - ) - return nil, nil, nil - } - messageHandler, err := messageHandlerFactory.NewMessageHandler(warpMessageInfo.UnsignedMessage) - if err != nil { - lstnr.logger.Error( - "Failed to create message handler", - zap.Error(err), - ) - return nil, nil, err - } - - // Fetch the message delivery data - sourceBlockchainID, originSenderAddress, destinationBlockchainID, destinationAddress, err := messageHandler.GetMessageRoutingInfo() - if err != nil { - lstnr.logger.Error( - "Failed to get message routing information", - zap.Error(err), - ) - return nil, nil, err - } - - lstnr.logger.Info( - "Unpacked warp message", - zap.String("sourceBlockchainID", sourceBlockchainID.String()), - zap.String("originSenderAddress", originSenderAddress.String()), - zap.String("destinationBlockchainID", destinationBlockchainID.String()), - zap.String("destinationAddress", destinationAddress.String()), - zap.String("warpMessageID", warpMessageInfo.UnsignedMessage.ID().String()), - ) - - appRelayer := lstnr.getApplicationRelayer( - sourceBlockchainID, - originSenderAddress, - destinationBlockchainID, - destinationAddress, - ) - if appRelayer == nil { - return nil, nil, nil - } - return appRelayer, messageHandler, nil -} - -func (lstnr *Listener) ProcessManualWarpMessages( - logger logging.Logger, - manualWarpMessages []*relayerTypes.WarpMessageInfo, - sourceBlockchain config.SourceBlockchain, -) error { - // Send any messages that were specified in the configuration - for _, warpMessage := range manualWarpMessages { - logger.Info( - "Relaying manual Warp message", - zap.String("blockchainID", sourceBlockchain.BlockchainID), - zap.String("warpMessageID", warpMessage.UnsignedMessage.ID().String()), - ) - appRelayer, handler, err := lstnr.GetAppRelayerMessageHandler(warpMessage) - if err != nil { - logger.Error( - "Failed to parse manual Warp message.", - zap.Error(err), - zap.String("warpMessageID", warpMessage.UnsignedMessage.ID().String()), - ) - return err - } - err = appRelayer.ProcessMessage(handler) - if err != nil { - logger.Error( - "Failed to process manual Warp message", - zap.String("blockchainID", sourceBlockchain.BlockchainID), - zap.String("warpMessageID", warpMessage.UnsignedMessage.ID().String()), - ) - return err - } - } - return nil -} diff --git a/relayer/message_coordinator.go b/relayer/message_coordinator.go new file mode 100644 index 00000000..49b2c37d --- /dev/null +++ b/relayer/message_coordinator.go @@ -0,0 +1,259 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package relayer + +import ( + "context" + "errors" + "fmt" + "math/big" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/awm-relayer/database" + "github.com/ava-labs/awm-relayer/messages" + relayerTypes "github.com/ava-labs/awm-relayer/types" + "github.com/ava-labs/subnet-evm/core/types" + "github.com/ava-labs/subnet-evm/ethclient" + "github.com/ava-labs/subnet-evm/interfaces" + "github.com/ava-labs/subnet-evm/precompile/contracts/warp" + "github.com/ethereum/go-ethereum/common" + "go.uber.org/zap" +) + +// MessageCoordinator contains all the logic required to process messages in the relayer. +// Other components such as the listeners or the API should pass messages to the MessageCoordinator +// so that it can parse the message(s) and pass them the the proper ApplicationRelayer. +type MessageCoordinator struct { + logger logging.Logger + // Maps Source blockchain ID and protocol address to a Message Handler Factory + messageHandlerFactories map[ids.ID]map[common.Address]messages.MessageHandlerFactory + applicationRelayers map[common.Hash]*ApplicationRelayer + sourceClients map[ids.ID]ethclient.Client +} + +func NewMessageCoordinator( + logger logging.Logger, + messageHandlerFactories map[ids.ID]map[common.Address]messages.MessageHandlerFactory, + applicationRelayers map[common.Hash]*ApplicationRelayer, + sourceClients map[ids.ID]ethclient.Client, +) *MessageCoordinator { + return &MessageCoordinator{ + logger: logger, + messageHandlerFactories: messageHandlerFactories, + applicationRelayers: applicationRelayers, + sourceClients: sourceClients, + } +} + +// getAppRelayerMessageHandler returns the ApplicationRelayer that is configured to handle this message, as well as a +// one-time MessageHandler instance that the ApplicationRelayer uses to relay this specific message. +// The MessageHandler and ApplicationRelayer are decoupled to support batch workflows in which a single ApplicationRelayer +// processes multiple messages (using their corresponding MessageHandlers) in a single shot. +func (mc *MessageCoordinator) getAppRelayerMessageHandler( + warpMessageInfo *relayerTypes.WarpMessageInfo, +) ( + *ApplicationRelayer, + messages.MessageHandler, + error, +) { + // Check that the warp message is from a supported message protocol contract address. + messageHandlerFactory, supportedMessageProtocol := mc.messageHandlerFactories[warpMessageInfo.UnsignedMessage.SourceChainID][warpMessageInfo.SourceAddress] + if !supportedMessageProtocol { + // Do not return an error here because it is expected for there to be messages from other contracts + // than just the ones supported by a single listener instance. + mc.logger.Debug( + "Warp message from unsupported message protocol address. Not relaying.", + zap.String("protocolAddress", warpMessageInfo.SourceAddress.Hex()), + ) + return nil, nil, nil + } + messageHandler, err := messageHandlerFactory.NewMessageHandler(warpMessageInfo.UnsignedMessage) + if err != nil { + mc.logger.Error("Failed to create message handler", zap.Error(err)) + return nil, nil, err + } + + // Fetch the message delivery data + sourceBlockchainID, originSenderAddress, destinationBlockchainID, destinationAddress, err := messageHandler.GetMessageRoutingInfo() + if err != nil { + mc.logger.Error("Failed to get message routing information", zap.Error(err)) + return nil, nil, err + } + + mc.logger.Info( + "Unpacked warp message", + zap.String("sourceBlockchainID", sourceBlockchainID.String()), + zap.String("originSenderAddress", originSenderAddress.String()), + zap.String("destinationBlockchainID", destinationBlockchainID.String()), + zap.String("destinationAddress", destinationAddress.String()), + zap.String("warpMessageID", warpMessageInfo.UnsignedMessage.ID().String()), + ) + + appRelayer := mc.getApplicationRelayer( + sourceBlockchainID, + originSenderAddress, + destinationBlockchainID, + destinationAddress, + ) + if appRelayer == nil { + return nil, nil, nil + } + return appRelayer, messageHandler, nil +} + +// Unpacks the Warp message and fetches the appropriate application relayer +// Checks for the following registered keys. At most one of these keys should be registered. +// 1. An exact match on sourceBlockchainID, destinationBlockchainID, originSenderAddress, and destinationAddress +// 2. A match on sourceBlockchainID and destinationBlockchainID, with a specific originSenderAddress and any destinationAddress +// 3. A match on sourceBlockchainID and destinationBlockchainID, with any originSenderAddress and a specific destinationAddress +// 4. A match on sourceBlockchainID and destinationBlockchainID, with any originSenderAddress and any destinationAddress +func (mc *MessageCoordinator) getApplicationRelayer( + sourceBlockchainID ids.ID, + originSenderAddress common.Address, + destinationBlockchainID ids.ID, + destinationAddress common.Address, +) *ApplicationRelayer { + // Check for an exact match + applicationRelayerID := database.CalculateRelayerID( + sourceBlockchainID, + destinationBlockchainID, + originSenderAddress, + destinationAddress, + ) + if applicationRelayer, ok := mc.applicationRelayers[applicationRelayerID]; ok { + return applicationRelayer + } + + // Check for a match on sourceBlockchainID and destinationBlockchainID, with a specific originSenderAddress and any destinationAddress + applicationRelayerID = database.CalculateRelayerID( + sourceBlockchainID, + destinationBlockchainID, + originSenderAddress, + database.AllAllowedAddress, + ) + if applicationRelayer, ok := mc.applicationRelayers[applicationRelayerID]; ok { + return applicationRelayer + } + + // Check for a match on sourceBlockchainID and destinationBlockchainID, with any originSenderAddress and a specific destinationAddress + applicationRelayerID = database.CalculateRelayerID( + sourceBlockchainID, + destinationBlockchainID, + database.AllAllowedAddress, + destinationAddress, + ) + if applicationRelayer, ok := mc.applicationRelayers[applicationRelayerID]; ok { + return applicationRelayer + } + + // Check for a match on sourceBlockchainID and destinationBlockchainID, with any originSenderAddress and any destinationAddress + applicationRelayerID = database.CalculateRelayerID( + sourceBlockchainID, + destinationBlockchainID, + database.AllAllowedAddress, + database.AllAllowedAddress, + ) + if applicationRelayer, ok := mc.applicationRelayers[applicationRelayerID]; ok { + return applicationRelayer + } + mc.logger.Debug( + "Application relayer not found. Skipping message relay.", + zap.String("blockchainID", sourceBlockchainID.String()), + zap.String("destinationBlockchainID", destinationBlockchainID.String()), + zap.String("originSenderAddress", originSenderAddress.String()), + zap.String("destinationAddress", destinationAddress.String()), + ) + return nil +} + +func (mc *MessageCoordinator) ProcessWarpMessage(warpMessage *relayerTypes.WarpMessageInfo) (common.Hash, error) { + appRelayer, handler, err := mc.getAppRelayerMessageHandler(warpMessage) + if err != nil { + mc.logger.Error( + "Failed to parse Warp message.", + zap.Error(err), + zap.String("warpMessageID", warpMessage.UnsignedMessage.ID().String()), + ) + return common.Hash{}, err + } + if appRelayer == nil { + mc.logger.Error("Application relayer not found") + return common.Hash{}, errors.New("application relayer not found") + } + + return appRelayer.ProcessMessage(handler) +} + +func (mc *MessageCoordinator) ProcessMessageID(blockchainID ids.ID, messageID ids.ID, blockNum *big.Int) (common.Hash, error) { + ethClient, ok := mc.sourceClients[blockchainID] + if !ok { + mc.logger.Error("Source client not found", zap.String("blockchainID", blockchainID.String())) + return common.Hash{}, fmt.Errorf("source client not set for blockchain: %s", blockchainID.String()) + } + + warpMessage, err := FetchWarpMessage(ethClient, messageID, blockNum) + if err != nil { + mc.logger.Error("Failed to fetch warp from blockchain", zap.String("blockchainID", blockchainID.String()), zap.Error(err)) + return common.Hash{}, fmt.Errorf("could not fetch warp message from ID: %w", err) + } + + return mc.ProcessWarpMessage(warpMessage) +} + +// Meant to be ran asynchronously. Errors should be sent to errChan. +func (mc *MessageCoordinator) ProcessBlock(blockHeader *types.Header, ethClient ethclient.Client, errChan chan error) { + // Parse the logs in the block, and group by application relayer + block, err := relayerTypes.NewWarpBlockInfo(blockHeader, ethClient) + if err != nil { + mc.logger.Error("Failed to create Warp block info", zap.Error(err)) + errChan <- err + return + } + + // Register each message in the block with the appropriate application relayer + messageHandlers := make(map[common.Hash][]messages.MessageHandler) + for _, warpLogInfo := range block.Messages { + appRelayer, handler, err := mc.getAppRelayerMessageHandler(warpLogInfo) + if err != nil { + mc.logger.Error( + "Failed to parse message", + zap.String("blockchainID", warpLogInfo.UnsignedMessage.SourceChainID.String()), + zap.String("protocolAddress", warpLogInfo.SourceAddress.String()), + zap.Error(err), + ) + continue + } + if appRelayer == nil { + mc.logger.Debug("Application relayer not found. Skipping message relay") + continue + } + messageHandlers[appRelayer.relayerID.ID] = append(messageHandlers[appRelayer.relayerID.ID], handler) + } + // Initiate message relay of all registered messages + for _, appRelayer := range mc.applicationRelayers { + // Dispatch all messages in the block to the appropriate application relayer. + // An empty slice is still a valid argument to ProcessHeight; in this case the height is immediately committed. + handlers := messageHandlers[appRelayer.relayerID.ID] + + go appRelayer.ProcessHeight(block.BlockNumber, handlers, errChan) + } +} + +func FetchWarpMessage(ethClient ethclient.Client, warpID ids.ID, blockNum *big.Int) (*relayerTypes.WarpMessageInfo, error) { + logs, err := ethClient.FilterLogs(context.Background(), interfaces.FilterQuery{ + Topics: [][]common.Hash{{relayerTypes.WarpPrecompileLogFilter}, nil, {common.Hash(warpID)}}, + Addresses: []common.Address{warp.ContractAddress}, + FromBlock: blockNum, + ToBlock: blockNum, + }) + if err != nil { + return nil, fmt.Errorf("could not fetch logs: %w", err) + } + if len(logs) != 1 { + return nil, fmt.Errorf("found more than 1 log: %d", len(logs)) + } + + return relayerTypes.NewWarpMessageInfo(logs[0]) +} diff --git a/tests/basic_relay.go b/tests/basic_relay.go index 1c2a52df..28bd9735 100644 --- a/tests/basic_relay.go +++ b/tests/basic_relay.go @@ -52,6 +52,8 @@ func BasicRelay(network interfaces.LocalNetwork) { fundedAddress, relayerKey, ) + // The config needs to be validated in order to be passed to database.GetConfigRelayerIDs + relayerConfig.Validate() relayerConfigPath := testUtils.WriteRelayerConfig(relayerConfig, testUtils.DefaultRelayerCfgFname) @@ -114,8 +116,10 @@ func BasicRelay(network interfaces.LocalNetwork) { relayerIDA := database.CalculateRelayerID(subnetAInfo.BlockchainID, subnetBInfo.BlockchainID, database.AllAllowedAddress, database.AllAllowedAddress) relayerIDB := database.CalculateRelayerID(subnetBInfo.BlockchainID, subnetAInfo.BlockchainID, database.AllAllowedAddress, database.AllAllowedAddress) // Modify the JSON database to force the relayer to re-process old blocks - jsonDB.Put(relayerIDA, database.LatestProcessedBlockKey, []byte("0")) - jsonDB.Put(relayerIDB, database.LatestProcessedBlockKey, []byte("0")) + err = jsonDB.Put(relayerIDA, database.LatestProcessedBlockKey, []byte("0")) + Expect(err).Should(BeNil()) + err = jsonDB.Put(relayerIDB, database.LatestProcessedBlockKey, []byte("0")) + Expect(err).Should(BeNil()) // Subscribe to the destination chain newHeadsB := make(chan *types.Header, 10) diff --git a/tests/e2e_test.go b/tests/e2e_test.go index a587c874..be3b9768 100644 --- a/tests/e2e_test.go +++ b/tests/e2e_test.go @@ -21,9 +21,7 @@ const ( warpGenesisFile = "./tests/utils/warp-genesis.json" ) -var ( - localNetworkInstance *local.LocalNetwork -) +var localNetworkInstance *local.LocalNetwork func TestE2E(t *testing.T) { if os.Getenv("RUN_E2E") == "" { @@ -68,9 +66,6 @@ var _ = ginkgo.Describe("[AWM Relayer Integration Tests", func() { ginkgo.It("Basic Relay", func() { BasicRelay(localNetworkInstance) }) - ginkgo.It("Teleporter Registry", func() { - TeleporterRegistry(localNetworkInstance) - }) ginkgo.It("Shared Database", func() { SharedDatabaseAccess(localNetworkInstance) }) @@ -80,6 +75,9 @@ var _ = ginkgo.Describe("[AWM Relayer Integration Tests", func() { ginkgo.It("Batch Message", func() { BatchRelay(localNetworkInstance) }) + ginkgo.It("Relay Message API", func() { + RelayMessageAPI(localNetworkInstance) + }) ginkgo.It("Warp API", func() { WarpAPIRelay(localNetworkInstance) }) diff --git a/tests/manual_message.go b/tests/manual_message.go index 9a487d98..63a4a214 100644 --- a/tests/manual_message.go +++ b/tests/manual_message.go @@ -4,35 +4,45 @@ package tests import ( + "bytes" "context" - "encoding/hex" + "encoding/json" + "fmt" + "math/big" + "net/http" "time" - avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp" - "github.com/ava-labs/awm-relayer/config" + runner_sdk "github.com/ava-labs/avalanche-network-runner/client" + "github.com/ava-labs/awm-relayer/api" + offchainregistry "github.com/ava-labs/awm-relayer/messages/off-chain-registry" testUtils "github.com/ava-labs/awm-relayer/tests/utils" "github.com/ava-labs/subnet-evm/accounts/abi/bind" - "github.com/ava-labs/subnet-evm/core/types" - subnetEvmInterfaces "github.com/ava-labs/subnet-evm/interfaces" - "github.com/ava-labs/subnet-evm/precompile/contracts/warp" "github.com/ava-labs/teleporter/tests/interfaces" - "github.com/ava-labs/teleporter/tests/utils" + teleporterTestUtils "github.com/ava-labs/teleporter/tests/utils" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" - . "github.com/onsi/gomega" ) -// This tests relaying a message manually provided in the relayer config +// Tests relayer support for off-chain Teleporter Registry updates +// - Configures the relayer to send an off-chain message to the Teleporter Registry +// - Verifies that the Teleporter Registry is updated func ManualMessage(network interfaces.LocalNetwork) { - subnetAInfo := network.GetPrimaryNetworkInfo() - subnetBInfo, _ := utils.GetTwoSubnets(network) + cChainInfo := network.GetPrimaryNetworkInfo() + subnetAInfo, subnetBInfo := teleporterTestUtils.GetTwoSubnets(network) fundedAddress, fundedKey := network.GetFundedAccountInfo() teleporterContractAddress := network.GetTeleporterContractAddress() err := testUtils.ClearRelayerStorage() Expect(err).Should(BeNil()) + // + // Get the current Teleporter Registry version + // + currentVersion, err := cChainInfo.TeleporterRegistry.LatestVersion(&bind.CallOpts{}) + Expect(err).Should(BeNil()) + expectedNewVersion := currentVersion.Add(currentVersion, big.NewInt(1)) + // // Fund the relayer address on all subnets // @@ -41,89 +51,86 @@ func ManualMessage(network interfaces.LocalNetwork) { log.Info("Funding relayer address on all subnets") relayerKey, err := crypto.GenerateKey() Expect(err).Should(BeNil()) - testUtils.FundRelayers(ctx, []interfaces.SubnetTestInfo{subnetAInfo, subnetBInfo}, fundedKey, relayerKey) + testUtils.FundRelayers(ctx, []interfaces.SubnetTestInfo{cChainInfo}, fundedKey, relayerKey) // - // Send two Teleporter message on Subnet A, before the relayer is running + // Define the off-chain Warp message // + log.Info("Creating off-chain Warp message") + newProtocolAddress := common.HexToAddress("0x0123456789abcdef0123456789abcdef01234567") + networkID := network.GetNetworkID() - log.Info("Sending two teleporter messages on subnet A") - // This message will be delivered by the relayer - receipt1, _, id1 := testUtils.SendBasicTeleporterMessage(ctx, subnetAInfo, subnetBInfo, fundedKey, fundedAddress) - msg1 := getWarpMessageFromLog(ctx, receipt1, subnetAInfo) - - // This message will not be delivered by the relayer - _, _, id2 := testUtils.SendBasicTeleporterMessage(ctx, subnetAInfo, subnetBInfo, fundedKey, fundedAddress) + // + // Set up the nodes to accept the off-chain message + // + // Create chain config file with off chain message for each chain + unsignedMessage, warpEnabledChainConfigC := teleporterTestUtils.InitOffChainMessageChainConfig(networkID, cChainInfo, newProtocolAddress, 2) + _, warpEnabledChainConfigA := teleporterTestUtils.InitOffChainMessageChainConfig(networkID, subnetAInfo, newProtocolAddress, 2) + _, warpEnabledChainConfigB := teleporterTestUtils.InitOffChainMessageChainConfig(networkID, subnetBInfo, newProtocolAddress, 2) + + // Create chain config with off chain messages + chainConfigs := make(map[string]string) + teleporterTestUtils.SetChainConfig(chainConfigs, cChainInfo, warpEnabledChainConfigC) + teleporterTestUtils.SetChainConfig(chainConfigs, subnetBInfo, warpEnabledChainConfigB) + teleporterTestUtils.SetChainConfig(chainConfigs, subnetAInfo, warpEnabledChainConfigA) + + // Restart nodes with new chain config + nodeNames := network.GetAllNodeNames() + log.Info("Restarting nodes with new chain config") + network.RestartNodes(ctx, nodeNames, runner_sdk.WithChainConfigs(chainConfigs)) + // Refresh the subnet info to get the new clients + cChainInfo = network.GetPrimaryNetworkInfo() // - // Set up relayer config to deliver one of the two previously sent messages + // Set up relayer config // relayerConfig := testUtils.CreateDefaultRelayerConfig( - []interfaces.SubnetTestInfo{subnetAInfo, subnetBInfo}, - []interfaces.SubnetTestInfo{subnetAInfo, subnetBInfo}, + []interfaces.SubnetTestInfo{cChainInfo}, + []interfaces.SubnetTestInfo{cChainInfo}, teleporterContractAddress, fundedAddress, relayerKey, ) - relayerConfig.ManualWarpMessages = []*config.ManualWarpMessage{ - { - UnsignedMessageBytes: hex.EncodeToString(msg1.Bytes()), - SourceBlockchainID: subnetAInfo.BlockchainID.String(), - DestinationBlockchainID: subnetBInfo.BlockchainID.String(), - SourceAddress: teleporterContractAddress.Hex(), - DestinationAddress: teleporterContractAddress.Hex(), - }, - } - relayerConfigPath := testUtils.WriteRelayerConfig(relayerConfig, testUtils.DefaultRelayerCfgFname) - // - // Run the Relayer. On startup, we should deliver the message provided in the config - // - - // Subscribe to the destination chain - newHeadsB := make(chan *types.Header, 10) - sub, err := subnetBInfo.WSClient.SubscribeNewHead(ctx, newHeadsB) - Expect(err).Should(BeNil()) - defer sub.Unsubscribe() - log.Info("Starting the relayer") relayerCleanup := testUtils.BuildAndRunRelayerExecutable(ctx, relayerConfigPath) defer relayerCleanup() - log.Info("Waiting for a new block confirmation on subnet B") - <-newHeadsB - delivered1, err := subnetBInfo.TeleporterMessenger.MessageReceived( - &bind.CallOpts{}, id1, - ) - Expect(err).Should(BeNil()) - Expect(delivered1).Should(BeTrue()) + // Sleep for some time to make sure relayer has started up and subscribed. + log.Info("Waiting for the relayer to start up") + time.Sleep(15 * time.Second) - log.Info("Waiting for 10s to ensure no new block confirmations on destination chain") - Consistently(newHeadsB, 10*time.Second, 500*time.Millisecond).ShouldNot(Receive()) + reqBody := api.ManualWarpMessageRequest{ + UnsignedMessageBytes: unsignedMessage.Bytes(), + SourceAddress: offchainregistry.OffChainRegistrySourceAddress.Hex(), + } - delivered2, err := subnetBInfo.TeleporterMessenger.MessageReceived( - &bind.CallOpts{}, id2, - ) - Expect(err).Should(BeNil()) - Expect(delivered2).Should(BeFalse()) -} + client := http.Client{ + Timeout: 30 * time.Second, + } -func getWarpMessageFromLog(ctx context.Context, receipt *types.Receipt, source interfaces.SubnetTestInfo) *avalancheWarp.UnsignedMessage { - log.Info("Fetching relevant warp logs from the newly produced block") - logs, err := source.RPCClient.FilterLogs(ctx, subnetEvmInterfaces.FilterQuery{ - BlockHash: &receipt.BlockHash, - Addresses: []common.Address{warp.Module.Address}, - }) - Expect(err).Should(BeNil()) - Expect(len(logs)).Should(Equal(1)) + requestURL := fmt.Sprintf("http://localhost:%d%s", relayerConfig.APIPort, api.RelayMessageAPIPath) - // Check for relevant warp log from subscription and ensure that it matches - // the log extracted from the last block. - txLog := logs[0] - log.Info("Parsing logData as unsigned warp message") - unsignedMsg, err := warp.UnpackSendWarpEventDataToMessage(txLog.Data) - Expect(err).Should(BeNil()) + // Send request to API + { + b, err := json.Marshal(reqBody) + Expect(err).Should(BeNil()) + bodyReader := bytes.NewReader(b) + + req, err := http.NewRequest(http.MethodPost, requestURL, bodyReader) + Expect(err).Should(BeNil()) + req.Header.Set("Content-Type", "application/json") - return unsignedMsg + res, err := client.Do(req) + Expect(err).Should(BeNil()) + Expect(res.Status).Should(Equal("200 OK")) + + // Wait for all nodes to see new transaction + time.Sleep(1 * time.Second) + + newVersion, err := cChainInfo.TeleporterRegistry.LatestVersion(&bind.CallOpts{}) + Expect(err).Should(BeNil()) + Expect(newVersion.Uint64()).Should(Equal(expectedNewVersion.Uint64())) + } } diff --git a/tests/relay_message_api.go b/tests/relay_message_api.go new file mode 100644 index 00000000..bdb5852b --- /dev/null +++ b/tests/relay_message_api.go @@ -0,0 +1,156 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package tests + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "github.com/ava-labs/avalanchego/ids" + avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp" + "github.com/ava-labs/awm-relayer/api" + testUtils "github.com/ava-labs/awm-relayer/tests/utils" + "github.com/ava-labs/subnet-evm/core/types" + subnetEvmInterfaces "github.com/ava-labs/subnet-evm/interfaces" + "github.com/ava-labs/subnet-evm/precompile/contracts/warp" + "github.com/ava-labs/teleporter/tests/interfaces" + "github.com/ava-labs/teleporter/tests/utils" + teleporterTestUtils "github.com/ava-labs/teleporter/tests/utils" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + . "github.com/onsi/gomega" +) + +func RelayMessageAPI(network interfaces.LocalNetwork) { + ctx := context.Background() + subnetAInfo := network.GetPrimaryNetworkInfo() + subnetBInfo, _ := utils.GetTwoSubnets(network) + fundedAddress, fundedKey := network.GetFundedAccountInfo() + teleporterContractAddress := network.GetTeleporterContractAddress() + err := testUtils.ClearRelayerStorage() + Expect(err).Should(BeNil()) + + log.Info("Funding relayer address on all subnets") + relayerKey, err := crypto.GenerateKey() + Expect(err).Should(BeNil()) + testUtils.FundRelayers(ctx, []interfaces.SubnetTestInfo{subnetAInfo, subnetBInfo}, fundedKey, relayerKey) + + log.Info("Sending teleporter message") + receipt, _, teleporterMessageID := testUtils.SendBasicTeleporterMessage(ctx, subnetAInfo, subnetBInfo, fundedKey, fundedAddress) + warpMessage := getWarpMessageFromLog(ctx, receipt, subnetAInfo) + + // Set up relayer config + relayerConfig := testUtils.CreateDefaultRelayerConfig( + []interfaces.SubnetTestInfo{subnetAInfo, subnetBInfo}, + []interfaces.SubnetTestInfo{subnetAInfo, subnetBInfo}, + teleporterContractAddress, + fundedAddress, + relayerKey, + ) + // Don't process missed blocks, so we can manually relay + relayerConfig.ProcessMissedBlocks = false + + relayerConfigPath := testUtils.WriteRelayerConfig(relayerConfig, testUtils.DefaultRelayerCfgFname) + + log.Info("Starting the relayer") + relayerCleanup := testUtils.BuildAndRunRelayerExecutable(ctx, relayerConfigPath) + defer relayerCleanup() + + // Sleep for some time to make sure relayer has started up and subscribed. + log.Info("Waiting for the relayer to start up") + time.Sleep(15 * time.Second) + + reqBody := api.RelayMessageRequest{ + BlockchainID: subnetAInfo.BlockchainID.String(), + MessageID: warpMessage.ID().String(), + BlockNum: receipt.BlockNumber.Uint64(), + } + + client := http.Client{ + Timeout: 30 * time.Second, + } + + requestURL := fmt.Sprintf("http://localhost:%d%s", relayerConfig.APIPort, api.RelayAPIPath) + + // Send request to API + { + b, err := json.Marshal(reqBody) + Expect(err).Should(BeNil()) + bodyReader := bytes.NewReader(b) + + req, err := http.NewRequest(http.MethodPost, requestURL, bodyReader) + Expect(err).Should(BeNil()) + req.Header.Set("Content-Type", "application/json") + + res, err := client.Do(req) + Expect(err).Should(BeNil()) + Expect(res.Status).Should(Equal("200 OK")) + + defer res.Body.Close() + body, err := io.ReadAll(res.Body) + Expect(err).Should(BeNil()) + + var response api.RelayMessageResponse + err = json.Unmarshal(body, &response) + Expect(err).Should(BeNil()) + + receipt, err := subnetBInfo.RPCClient.TransactionReceipt(ctx, common.HexToHash(response.TransactionHash)) + Expect(err).Should(BeNil()) + receiveEvent, err := teleporterTestUtils.GetEventFromLogs(receipt.Logs, subnetBInfo.TeleporterMessenger.ParseReceiveCrossChainMessage) + Expect(err).Should(BeNil()) + Expect(ids.ID(receiveEvent.MessageID)).Should(Equal(teleporterMessageID)) + } + + // Send the same request to ensure the correct response. + { + b, err := json.Marshal(reqBody) + Expect(err).Should(BeNil()) + bodyReader := bytes.NewReader(b) + + req, err := http.NewRequest(http.MethodPost, requestURL, bodyReader) + Expect(err).Should(BeNil()) + req.Header.Set("Content-Type", "application/json") + + res, err := client.Do(req) + Expect(err).Should(BeNil()) + Expect(res.Status).Should(Equal("200 OK")) + + defer res.Body.Close() + body, err := io.ReadAll(res.Body) + Expect(err).Should(BeNil()) + + var response api.RelayMessageResponse + err = json.Unmarshal(body, &response) + Expect(err).Should(BeNil()) + Expect(response.TransactionHash).Should(Equal("0x0000000000000000000000000000000000000000000000000000000000000000")) + } + + // Cancel the command and stop the relayer + relayerCleanup() +} + +func getWarpMessageFromLog(ctx context.Context, receipt *types.Receipt, source interfaces.SubnetTestInfo) *avalancheWarp.UnsignedMessage { + log.Info("Fetching relevant warp logs from the newly produced block") + logs, err := source.RPCClient.FilterLogs(ctx, subnetEvmInterfaces.FilterQuery{ + BlockHash: &receipt.BlockHash, + Addresses: []common.Address{warp.Module.Address}, + }) + Expect(err).Should(BeNil()) + Expect(len(logs)).Should(Equal(1)) + + // Check for relevant warp log from subscription and ensure that it matches + // the log extracted from the last block. + txLog := logs[0] + log.Info("Parsing logData as unsigned warp message") + unsignedMsg, err := warp.UnpackSendWarpEventDataToMessage(txLog.Data) + Expect(err).Should(BeNil()) + + return unsignedMsg +} diff --git a/tests/teleporter_registry.go b/tests/teleporter_registry.go deleted file mode 100644 index 13c4698e..00000000 --- a/tests/teleporter_registry.go +++ /dev/null @@ -1,122 +0,0 @@ -// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package tests - -import ( - "context" - "encoding/hex" - "math/big" - - runner_sdk "github.com/ava-labs/avalanche-network-runner/client" - "github.com/ava-labs/awm-relayer/config" - offchainregistry "github.com/ava-labs/awm-relayer/messages/off-chain-registry" - testUtils "github.com/ava-labs/awm-relayer/tests/utils" - "github.com/ava-labs/subnet-evm/accounts/abi/bind" - "github.com/ava-labs/subnet-evm/core/types" - "github.com/ava-labs/teleporter/tests/interfaces" - teleporterTestUtils "github.com/ava-labs/teleporter/tests/utils" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/log" - . "github.com/onsi/gomega" -) - -// Tests relayer support for off-chain Teleporter Registry updates -// - Configures the relayer to send an off-chain message to the Teleporter Registry -// - Verifies that the Teleporter Registry is updated -func TeleporterRegistry(network interfaces.LocalNetwork) { - cChainInfo := network.GetPrimaryNetworkInfo() - subnetAInfo, subnetBInfo := teleporterTestUtils.GetTwoSubnets(network) - fundedAddress, fundedKey := network.GetFundedAccountInfo() - teleporterContractAddress := network.GetTeleporterContractAddress() - err := testUtils.ClearRelayerStorage() - Expect(err).Should(BeNil()) - - // - // Get the current Teleporter Registry version - // - currentVersion, err := cChainInfo.TeleporterRegistry.LatestVersion(&bind.CallOpts{}) - Expect(err).Should(BeNil()) - expectedNewVersion := currentVersion.Add(currentVersion, big.NewInt(1)) - - // - // Fund the relayer address on all subnets - // - ctx := context.Background() - - log.Info("Funding relayer address on all subnets") - relayerKey, err := crypto.GenerateKey() - Expect(err).Should(BeNil()) - testUtils.FundRelayers(ctx, []interfaces.SubnetTestInfo{cChainInfo}, fundedKey, relayerKey) - - // - // Define the off-chain Warp message - // - log.Info("Creating off-chain Warp message") - newProtocolAddress := common.HexToAddress("0x0123456789abcdef0123456789abcdef01234567") - networkID := network.GetNetworkID() - - // - // Set up the nodes to accept the off-chain message - // - // Create chain config file with off chain message for each chain - unsignedMessage, warpEnabledChainConfigC := teleporterTestUtils.InitOffChainMessageChainConfig(networkID, cChainInfo, newProtocolAddress, 2) - _, warpEnabledChainConfigA := teleporterTestUtils.InitOffChainMessageChainConfig(networkID, subnetAInfo, newProtocolAddress, 2) - _, warpEnabledChainConfigB := teleporterTestUtils.InitOffChainMessageChainConfig(networkID, subnetBInfo, newProtocolAddress, 2) - - // Create chain config with off chain messages - chainConfigs := make(map[string]string) - teleporterTestUtils.SetChainConfig(chainConfigs, cChainInfo, warpEnabledChainConfigC) - teleporterTestUtils.SetChainConfig(chainConfigs, subnetBInfo, warpEnabledChainConfigB) - teleporterTestUtils.SetChainConfig(chainConfigs, subnetAInfo, warpEnabledChainConfigA) - - // Restart nodes with new chain config - nodeNames := network.GetAllNodeNames() - log.Info("Restarting nodes with new chain config") - network.RestartNodes(ctx, nodeNames, runner_sdk.WithChainConfigs(chainConfigs)) - // Refresh the subnet info to get the new clients - cChainInfo = network.GetPrimaryNetworkInfo() - - // - // Set up relayer config - // - relayerConfig := testUtils.CreateDefaultRelayerConfig( - []interfaces.SubnetTestInfo{cChainInfo}, - []interfaces.SubnetTestInfo{cChainInfo}, - teleporterContractAddress, - fundedAddress, - relayerKey, - ) - relayerConfig.ManualWarpMessages = []*config.ManualWarpMessage{ - { - UnsignedMessageBytes: hex.EncodeToString(unsignedMessage.Bytes()), - SourceBlockchainID: cChainInfo.BlockchainID.String(), - DestinationBlockchainID: cChainInfo.BlockchainID.String(), - SourceAddress: offchainregistry.OffChainRegistrySourceAddress.Hex(), - DestinationAddress: cChainInfo.TeleporterRegistryAddress.Hex(), - }, - } - relayerConfigPath := testUtils.WriteRelayerConfig(relayerConfig, testUtils.DefaultRelayerCfgFname) - // - // Run the Relayer. On startup, we should deliver the message provided in the config - // - - // Subscribe to the destination chain - newHeadsC := make(chan *types.Header, 10) - sub, err := cChainInfo.WSClient.SubscribeNewHead(ctx, newHeadsC) - Expect(err).Should(BeNil()) - defer sub.Unsubscribe() - - log.Info("Starting the relayer") - relayerCleanup := testUtils.BuildAndRunRelayerExecutable(ctx, relayerConfigPath) - defer relayerCleanup() - - log.Info("Waiting for a new block confirmation on the C-Chain") - <-newHeadsC - - log.Info("Verifying that the Teleporter Registry was updated") - newVersion, err := cChainInfo.TeleporterRegistry.LatestVersion(&bind.CallOpts{}) - Expect(err).Should(BeNil()) - Expect(newVersion.Cmp(expectedNewVersion)).Should(Equal(0)) -} diff --git a/tests/utils/utils.go b/tests/utils/utils.go index 3d8fd8d1..170b16b8 100644 --- a/tests/utils/utils.go +++ b/tests/utils/utils.go @@ -204,6 +204,7 @@ func CreateDefaultRelayerConfig( MetricsPort: 9090, SourceBlockchains: sources, DestinationBlockchains: destinations, + APIPort: 8080, } } diff --git a/types/types.go b/types/types.go index 735115d2..b71a6805 100644 --- a/types/types.go +++ b/types/types.go @@ -16,12 +16,13 @@ import ( "github.com/ethereum/go-ethereum/common" ) -var WarpPrecompileLogFilter = warp.WarpABI.Events["SendWarpMessage"].ID -var ErrInvalidLog = errors.New("invalid warp message log") +var ( + WarpPrecompileLogFilter = warp.WarpABI.Events["SendWarpMessage"].ID + ErrInvalidLog = errors.New("invalid warp message log") +) // WarpBlockInfo describes the block height and logs needed to process Warp messages. -// WarpBlockInfo instances are populated by the subscriber, and forwared to the -// Listener to process +// WarpBlockInfo instances are populated by the subscriber, and forwarded to the Listener to process. type WarpBlockInfo struct { BlockNumber uint64 Messages []*WarpMessageInfo diff --git a/vms/destination_client.go b/vms/destination_client.go index e417a458..12e2c895 100644 --- a/vms/destination_client.go +++ b/vms/destination_client.go @@ -20,7 +20,7 @@ import ( // DestinationClient is the interface for the destination chain client. Methods that interact with the destination chain // should generally be implemented in a thread safe way, as they will be called concurrently by the application relayers. type DestinationClient interface { - // SendTx contructs the transaction from warp primitives, and sends to the configured destination chain endpoint. + // SendTx constructs the transaction from warp primitives, and sends to the configured destination chain endpoint. // Returns the hash of the sent transaction. // TODO: Make generic for any VM. SendTx(signedMessage *warp.Message, toAddress string, gasLimit uint64, callData []byte) (common.Hash, error) @@ -52,7 +52,7 @@ func CreateDestinationClients(logger logging.Logger, relayerConfig config.Config if err != nil { logger.Error( "Failed to decode base-58 encoded source chain ID", - zap.String("blockchainID", blockchainID.String()), + zap.String("blockchainID", subnetInfo.BlockchainID), zap.Error(err), ) return nil, err