diff --git a/README.md b/README.md
index 64a0380a..14dcb18d 100644
--- a/README.md
+++ b/README.md
@@ -316,6 +316,54 @@ The relayer consists of the following components:
+### API
+
+#### `/relay`
+- Used to manually relay a Warp message. The body of the request must contain the following JSON:
+```json
+{
+ "blockchain-id": "",
+ "message-id": "",
+ "block-num": ""
+}
+```
+- If successful, the endpoint will return the following JSON:
+```json
+{
+ "transaction-hash": ""
+}
+```
+
+#### `/relay/message`
+- Used to manually relay a warp message. The body of the request must contain the following JSON:
+```json
+{
+ "unsigned-message-bytes": "",
+ "source-address": ""
+}
+```
+- If successful, the endpoint will return the following JSON:
+```json
+{
+ "transaction-hash": "",
+}
+```
+
+#### `/health`
+- Takes no arguments. Returns a `200` status code if all Application Relayers are healthy. Returns a `503` status if any of the Application Relayers have experienced an unrecoverable error. Here is an example return body:
+```json
+{
+ "status": "down",
+ "details": {
+ "relayers-all": {
+ "status": "down",
+ "timestamp": "2024-06-01T05:06:07.685522Z",
+ "error": ""
+ }
+ }
+}
+```
+
## Testing
### Unit Tests
diff --git a/api/health_check.go b/api/health_check.go
new file mode 100644
index 00000000..1fafe3d8
--- /dev/null
+++ b/api/health_check.go
@@ -0,0 +1,42 @@
+package api
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+
+ "github.com/alexliesenfeld/health"
+ "github.com/ava-labs/avalanchego/ids"
+ "github.com/ava-labs/avalanchego/utils/logging"
+ "go.uber.org/atomic"
+ "go.uber.org/zap"
+)
+
+const HealthAPIPath = "/health"
+
+func HandleHealthCheck(logger logging.Logger, relayerHealth map[ids.ID]*atomic.Bool) {
+ http.Handle(HealthAPIPath, healthCheckHandler(logger, relayerHealth))
+}
+
+func healthCheckHandler(logger logging.Logger, relayerHealth map[ids.ID]*atomic.Bool) http.Handler {
+ return health.NewHandler(health.NewChecker(
+ health.WithCheck(health.Check{
+ Name: "relayers-all",
+ Check: func(context.Context) error {
+ // Store the IDs as the cb58 encoding
+ var unhealthyRelayers []string
+ for id, health := range relayerHealth {
+ if !health.Load() {
+ unhealthyRelayers = append(unhealthyRelayers, id.String())
+ }
+ }
+
+ if len(unhealthyRelayers) > 0 {
+ logger.Fatal("Relayers are unhealthy for blockchains", zap.Strings("blockchains", unhealthyRelayers))
+ return fmt.Errorf("relayers are unhealthy for blockchains %v", unhealthyRelayers)
+ }
+ return nil
+ },
+ }),
+ ))
+}
diff --git a/api/relay_message.go b/api/relay_message.go
new file mode 100644
index 00000000..164b2426
--- /dev/null
+++ b/api/relay_message.go
@@ -0,0 +1,143 @@
+package api
+
+import (
+ "encoding/json"
+ "math/big"
+ "net/http"
+
+ "github.com/ava-labs/avalanchego/ids"
+ "github.com/ava-labs/avalanchego/utils/logging"
+ "github.com/ava-labs/awm-relayer/relayer"
+ "github.com/ava-labs/awm-relayer/types"
+ relayerTypes "github.com/ava-labs/awm-relayer/types"
+ "github.com/ethereum/go-ethereum/common"
+ "go.uber.org/zap"
+)
+
+const (
+ RelayAPIPath = "/relay"
+ RelayMessageAPIPath = RelayAPIPath + "/message"
+)
+
+type RelayMessageRequest struct {
+ // Required. cb58 encoding of the source blockchain ID for the message
+ BlockchainID string `json:"blockchain-id"`
+ // Required. cb58 encoding of the warp message ID
+ MessageID string `json:"message-id"`
+ // Required. Block number that the message was sent in
+ BlockNum uint64 `json:"block-num"`
+}
+
+type RelayMessageResponse struct {
+ // hex encoding of the transaction hash containing the processed message
+ TransactionHash string `json:"transaction-hash"`
+}
+
+// Defines a manual warp message to be sent from the relayer through the API.
+type ManualWarpMessageRequest struct {
+ UnsignedMessageBytes []byte `json:"unsigned-message-bytes"`
+ SourceAddress string `json:"source-address"`
+}
+
+func HandleRelayMessage(logger logging.Logger, messageCoordinator *relayer.MessageCoordinator) {
+ http.Handle(RelayAPIPath, relayAPIHandler(logger, messageCoordinator))
+}
+
+func HandleRelay(logger logging.Logger, messageCoordinator *relayer.MessageCoordinator) {
+ http.Handle(RelayMessageAPIPath, relayMessageAPIHandler(logger, messageCoordinator))
+}
+
+func relayMessageAPIHandler(logger logging.Logger, messageCoordinator *relayer.MessageCoordinator) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ var req ManualWarpMessageRequest
+ err := json.NewDecoder(r.Body).Decode(&req)
+ if err != nil {
+ logger.Warn("Could not decode request body")
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+
+ unsignedMessage, err := types.UnpackWarpMessage(req.UnsignedMessageBytes)
+ if err != nil {
+ logger.Warn("Error unpacking warp message", zap.Error(err))
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+
+ warpMessageInfo := &relayerTypes.WarpMessageInfo{
+ SourceAddress: common.HexToAddress(req.SourceAddress),
+ UnsignedMessage: unsignedMessage,
+ }
+
+ txHash, err := messageCoordinator.ProcessWarpMessage(warpMessageInfo)
+ if err != nil {
+ logger.Error("Error processing message", zap.Error(err))
+ http.Error(w, "error processing message: "+err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ resp, err := json.Marshal(
+ RelayMessageResponse{
+ TransactionHash: txHash.Hex(),
+ },
+ )
+ if err != nil {
+ logger.Error("Error marshaling response", zap.Error(err))
+ http.Error(w, "error marshaling response: "+err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ _, err = w.Write(resp)
+ if err != nil {
+ logger.Error("Error writing response", zap.Error(err))
+ }
+ })
+}
+
+func relayAPIHandler(logger logging.Logger, messageCoordinator *relayer.MessageCoordinator) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ var req RelayMessageRequest
+ err := json.NewDecoder(r.Body).Decode(&req)
+ if err != nil {
+ logger.Warn("Could not decode request body")
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+
+ blockchainID, err := ids.FromString(req.BlockchainID)
+ if err != nil {
+ logger.Warn("Invalid blockchainID", zap.String("blockchainID", req.BlockchainID))
+ http.Error(w, "invalid blockchainID: "+err.Error(), http.StatusBadRequest)
+ return
+ }
+ messageID, err := ids.FromString(req.MessageID)
+ if err != nil {
+ logger.Warn("Invalid messageID", zap.String("messageID", req.MessageID))
+ http.Error(w, "invalid messageID: "+err.Error(), http.StatusBadRequest)
+ return
+ }
+
+ txHash, err := messageCoordinator.ProcessMessageID(blockchainID, messageID, new(big.Int).SetUint64(req.BlockNum))
+ if err != nil {
+ logger.Error("Error processing message", zap.Error(err))
+ http.Error(w, "error processing message: "+err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ resp, err := json.Marshal(
+ RelayMessageResponse{
+ TransactionHash: txHash.Hex(),
+ },
+ )
+ if err != nil {
+ logger.Error("Error marshalling response", zap.Error(err))
+ http.Error(w, "error marshalling response: "+err.Error(), http.StatusInternalServerError)
+ return
+ }
+
+ _, err = w.Write(resp)
+ if err != nil {
+ logger.Error("Error writing response", zap.Error(err))
+ }
+ })
+}
diff --git a/config/config.go b/config/config.go
index 4ae1fe00..4712feaa 100644
--- a/config/config.go
+++ b/config/config.go
@@ -58,7 +58,6 @@ type Config struct {
SourceBlockchains []*SourceBlockchain `mapstructure:"source-blockchains" json:"source-blockchains"`
DestinationBlockchains []*DestinationBlockchain `mapstructure:"destination-blockchains" json:"destination-blockchains"`
ProcessMissedBlocks bool `mapstructure:"process-missed-blocks" json:"process-missed-blocks"`
- ManualWarpMessages []*ManualWarpMessage `mapstructure:"manual-warp-messages" json:"manual-warp-messages"`
// convenience field to fetch a blockchain's subnet ID
blockchainIDToSubnetID map[ids.ID]ids.ID
@@ -71,7 +70,7 @@ func DisplayUsageText() {
// Validates the configuration
// Does not modify the public fields as derived from the configuration passed to the application,
-// but does initialize private fields available through getters
+// but does initialize private fields available through getters.
func (c *Config) Validate() error {
if len(c.SourceBlockchains) == 0 {
return errors.New("relayer not configured to relay from any subnets. A list of source subnets must be provided in the configuration file")
@@ -120,13 +119,6 @@ func (c *Config) Validate() error {
}
c.blockchainIDToSubnetID = blockchainIDToSubnetID
- // Validate the manual warp messages
- for i, msg := range c.ManualWarpMessages {
- if err := msg.Validate(); err != nil {
- return fmt.Errorf("invalid manual warp message at index %d: %w", i, err)
- }
- }
-
return nil
}
diff --git a/config/manual_warp_message.go b/config/manual_warp_message.go
deleted file mode 100644
index db5b26b0..00000000
--- a/config/manual_warp_message.go
+++ /dev/null
@@ -1,76 +0,0 @@
-package config
-
-import (
- "encoding/hex"
- "errors"
-
- "github.com/ava-labs/avalanchego/ids"
- "github.com/ava-labs/awm-relayer/utils"
- "github.com/ethereum/go-ethereum/common"
-)
-
-// Defines a manual warp message to be sent from the relayer on startup.
-type ManualWarpMessage struct {
- UnsignedMessageBytes string `mapstructure:"unsigned-message-bytes" json:"unsigned-message-bytes"`
- SourceBlockchainID string `mapstructure:"source-blockchain-id" json:"source-blockchain-id"`
- DestinationBlockchainID string `mapstructure:"destination-blockchain-id" json:"destination-blockchain-id"`
- SourceAddress string `mapstructure:"source-address" json:"source-address"`
- DestinationAddress string `mapstructure:"destination-address" json:"destination-address"`
-
- // convenience fields to access the values after initialization
- unsignedMessageBytes []byte
- sourceBlockchainID ids.ID
- destinationBlockchainID ids.ID
- sourceAddress common.Address
- destinationAddress common.Address
-}
-
-// Validates the manual Warp message configuration.
-// Does not modify the public fields as derived from the configuration passed to the application,
-// but does initialize private fields available through getters
-func (m *ManualWarpMessage) Validate() error {
- unsignedMsg, err := hex.DecodeString(utils.SanitizeHexString(m.UnsignedMessageBytes))
- if err != nil {
- return err
- }
- sourceBlockchainID, err := ids.FromString(m.SourceBlockchainID)
- if err != nil {
- return err
- }
- if !common.IsHexAddress(m.SourceAddress) {
- return errors.New("invalid source address in manual warp message configuration")
- }
- destinationBlockchainID, err := ids.FromString(m.DestinationBlockchainID)
- if err != nil {
- return err
- }
- if !common.IsHexAddress(m.DestinationAddress) {
- return errors.New("invalid destination address in manual warp message configuration")
- }
- m.unsignedMessageBytes = unsignedMsg
- m.sourceBlockchainID = sourceBlockchainID
- m.sourceAddress = common.HexToAddress(m.SourceAddress)
- m.destinationBlockchainID = destinationBlockchainID
- m.destinationAddress = common.HexToAddress(m.DestinationAddress)
- return nil
-}
-
-func (m *ManualWarpMessage) GetUnsignedMessageBytes() []byte {
- return m.unsignedMessageBytes
-}
-
-func (m *ManualWarpMessage) GetSourceBlockchainID() ids.ID {
- return m.sourceBlockchainID
-}
-
-func (m *ManualWarpMessage) GetSourceAddress() common.Address {
- return m.sourceAddress
-}
-
-func (m *ManualWarpMessage) GetDestinationBlockchainID() ids.ID {
- return m.destinationBlockchainID
-}
-
-func (m *ManualWarpMessage) GetDestinationAddress() common.Address {
- return m.destinationAddress
-}
diff --git a/main/main.go b/main/main.go
index e33c5d8e..0ae77aa0 100644
--- a/main/main.go
+++ b/main/main.go
@@ -5,25 +5,25 @@ package main
import (
"context"
- "encoding/hex"
"fmt"
"log"
"net/http"
"os"
"strings"
- "github.com/alexliesenfeld/health"
"github.com/ava-labs/avalanchego/api/metrics"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/message"
"github.com/ava-labs/avalanchego/utils/constants"
"github.com/ava-labs/avalanchego/utils/logging"
+ "github.com/ava-labs/awm-relayer/api"
"github.com/ava-labs/awm-relayer/config"
"github.com/ava-labs/awm-relayer/database"
+ "github.com/ava-labs/awm-relayer/messages"
+ offchainregistry "github.com/ava-labs/awm-relayer/messages/off-chain-registry"
+ "github.com/ava-labs/awm-relayer/messages/teleporter"
"github.com/ava-labs/awm-relayer/peers"
"github.com/ava-labs/awm-relayer/relayer"
- "github.com/ava-labs/awm-relayer/types"
- relayerTypes "github.com/ava-labs/awm-relayer/types"
"github.com/ava-labs/awm-relayer/utils"
"github.com/ava-labs/awm-relayer/vms"
"github.com/ava-labs/subnet-evm/ethclient"
@@ -102,24 +102,27 @@ func main() {
logger.Info("Initializing destination clients")
destinationClients, err := vms.CreateDestinationClients(logger, cfg)
if err != nil {
- logger.Error(
- "Failed to create destination clients",
- zap.Error(err),
- )
+ logger.Fatal("Failed to create destination clients", zap.Error(err))
+ panic(err)
+ }
+
+ // Initialize all source clients
+ logger.Info("Initializing source clients")
+ sourceClients, err := createSourceClients(context.Background(), logger, &cfg)
+ if err != nil {
+ logger.Fatal("Failed to create source clients", zap.Error(err))
panic(err)
}
// Initialize metrics gathered through prometheus
gatherer, registerer, err := initializeMetrics()
if err != nil {
- logger.Fatal("Failed to set up prometheus metrics",
- zap.Error(err))
+ logger.Fatal("Failed to set up prometheus metrics", zap.Error(err))
panic(err)
}
// Initialize the global app request network
logger.Info("Initializing app request network")
-
// The app request network generates P2P networking logs that are verbose at the info level.
// Unless the log level is debug or lower, set the network log level to error to avoid spamming the logs.
// We do not collect metrics for the network.
@@ -133,51 +136,15 @@ func main() {
&cfg,
)
if err != nil {
- logger.Error(
- "Failed to create app request network",
- zap.Error(err),
- )
+ logger.Fatal("Failed to create app request network", zap.Error(err))
panic(err)
}
- // Each goroutine will have an atomic bool that it can set to false if it ever disconnects from its subscription.
- relayerHealth := make(map[ids.ID]*atomic.Bool)
-
- checker := health.NewChecker(
- health.WithCheck(health.Check{
- Name: "relayers-all",
- Check: func(context.Context) error {
- // Store the IDs as the cb58 encoding
- var unhealthyRelayers []string
- for id, health := range relayerHealth {
- if !health.Load() {
- unhealthyRelayers = append(unhealthyRelayers, id.String())
- }
- }
-
- if len(unhealthyRelayers) > 0 {
- return fmt.Errorf("relayers are unhealthy for blockchains %v", unhealthyRelayers)
- }
- return nil
- },
- }),
- )
-
- http.Handle("/health", health.NewHandler(checker))
-
- // start the health check server
- go func() {
- log.Fatalln(http.ListenAndServe(fmt.Sprintf(":%d", cfg.APIPort), nil))
- }()
-
startMetricsServer(logger, gatherer, cfg.MetricsPort)
- metrics, err := relayer.NewApplicationRelayerMetrics(registerer)
+ relayerMetrics, err := relayer.NewApplicationRelayerMetrics(registerer)
if err != nil {
- logger.Error(
- "Failed to create application relayer metrics",
- zap.Error(err),
- )
+ logger.Fatal("Failed to create application relayer metrics", zap.Error(err))
panic(err)
}
@@ -191,20 +158,14 @@ func main() {
constants.DefaultNetworkMaximumInboundTimeout,
)
if err != nil {
- logger.Error(
- "Failed to create message creator",
- zap.Error(err),
- )
+ logger.Fatal("Failed to create message creator", zap.Error(err))
panic(err)
}
// Initialize the database
db, err := database.NewDatabase(logger, &cfg)
if err != nil {
- logger.Error(
- "Failed to create database",
- zap.Error(err),
- )
+ logger.Fatal("Failed to create database", zap.Error(err))
panic(err)
}
@@ -212,159 +173,194 @@ func main() {
ticker := utils.NewTicker(cfg.DBWriteIntervalSeconds)
go ticker.Run()
- // Gather manual Warp messages specified in the configuration
- manualWarpMessages := make(map[ids.ID][]*relayerTypes.WarpMessageInfo)
- for _, msg := range cfg.ManualWarpMessages {
- sourceBlockchainID := msg.GetSourceBlockchainID()
- unsignedMsg, err := types.UnpackWarpMessage(msg.GetUnsignedMessageBytes())
- if err != nil {
- logger.Error(
- "Failed to unpack manual Warp message",
- zap.String("warpMessageBytes", hex.EncodeToString(msg.GetUnsignedMessageBytes())),
- zap.Error(err),
- )
- panic(err)
- }
- warpLogInfo := relayerTypes.WarpMessageInfo{
- SourceAddress: msg.GetSourceAddress(),
- UnsignedMessage: unsignedMsg,
- }
- manualWarpMessages[sourceBlockchainID] = append(manualWarpMessages[sourceBlockchainID], &warpLogInfo)
+ relayerHealth := createHealthTrackers(&cfg)
+
+ messageHandlerFactories, err := createMessageHandlerFactories(logger, &cfg)
+ if err != nil {
+ logger.Fatal("Failed to create message handler factories", zap.Error(err))
+ panic(err)
+ }
+
+ applicationRelayers, minHeights, err := createApplicationRelayers(
+ context.Background(),
+ logger,
+ relayerMetrics,
+ db,
+ ticker,
+ network,
+ messageCreator,
+ &cfg,
+ sourceClients,
+ destinationClients,
+ )
+ if err != nil {
+ logger.Fatal("Failed to create application relayers", zap.Error(err))
+ panic(err)
}
+ messageCoordinator := relayer.NewMessageCoordinator(logger, messageHandlerFactories, applicationRelayers, sourceClients)
+
+ // Each Listener goroutine will have an atomic bool that it can set to false to indicate an unrecoverable error
+ api.HandleHealthCheck(logger, relayerHealth)
+ api.HandleRelay(logger, messageCoordinator)
+ api.HandleRelayMessage(logger, messageCoordinator)
+
+ // start the health check server
+ go func() {
+ log.Fatalln(http.ListenAndServe(fmt.Sprintf(":%d", cfg.APIPort), nil))
+ }()
// Create listeners for each of the subnets configured as a source
errGroup, ctx := errgroup.WithContext(context.Background())
for _, s := range cfg.SourceBlockchains {
- blockchainID, err := ids.FromString(s.BlockchainID)
- if err != nil {
- logger.Error(
- "Invalid subnetID in configuration",
- zap.Error(err),
- )
- panic(err)
- }
sourceBlockchain := s
- health := atomic.NewBool(true)
- relayerHealth[blockchainID] = health
-
// errgroup will cancel the context when the first goroutine returns an error
errGroup.Go(func() error {
- // Dial the eth client
- ethClient, err := utils.NewEthClientWithConfig(
- context.Background(),
- sourceBlockchain.RPCEndpoint.BaseURL,
- sourceBlockchain.RPCEndpoint.HTTPHeaders,
- sourceBlockchain.RPCEndpoint.QueryParams,
- )
- if err != nil {
- logger.Error(
- "Failed to connect to node via RPC",
- zap.String("blockchainID", sourceBlockchain.BlockchainID),
- zap.Error(err),
- )
- return err
- }
-
- // Create the ApplicationRelayers
- applicationRelayers, minHeight, err := createApplicationRelayers(
- ctx,
- logger,
- metrics,
- db,
- ticker,
- *sourceBlockchain,
- network,
- messageCreator,
- &cfg,
- ethClient,
- destinationClients,
- )
- if err != nil {
- logger.Error(
- "Failed to create application relayers",
- zap.String("blockchainID", sourceBlockchain.BlockchainID),
- zap.Error(err),
- )
- return err
- }
- logger.Info(
- "Created application relayers",
- zap.String("blockchainID", sourceBlockchain.BlockchainID),
- )
-
// runListener runs until it errors or the context is cancelled by another goroutine
- return runListener(
+ return relayer.RunListener(
ctx,
logger,
*sourceBlockchain,
- health,
- manualWarpMessages[blockchainID],
- &cfg,
- ethClient,
- applicationRelayers,
- minHeight,
+ sourceClients[sourceBlockchain.GetBlockchainID()],
+ relayerHealth[sourceBlockchain.GetBlockchainID()],
+ cfg.ProcessMissedBlocks,
+ minHeights[sourceBlockchain.GetBlockchainID()],
+ messageCoordinator,
)
})
}
err = errGroup.Wait()
- logger.Error(
- "Relayer exiting.",
- zap.Error(err),
- )
+ logger.Error("Relayer exiting.", zap.Error(err))
}
-// runListener creates a Listener instance and the ApplicationRelayers for a subnet.
-// The Listener listens for warp messages on that subnet, and the ApplicationRelayers handle delivery to the destination
-func runListener(
- ctx context.Context,
+func createMessageHandlerFactories(
logger logging.Logger,
- sourceBlockchain config.SourceBlockchain,
- relayerHealth *atomic.Bool,
- manualWarpMessages []*relayerTypes.WarpMessageInfo,
globalConfig *config.Config,
- ethClient ethclient.Client,
- applicationRelayers map[common.Hash]*relayer.ApplicationRelayer,
- minHeight uint64,
-) error {
- // Create the Listener
- listener, err := relayer.NewListener(
- logger,
- sourceBlockchain,
- relayerHealth,
- globalConfig,
- applicationRelayers,
- minHeight,
- ethClient,
- )
- if err != nil {
- return fmt.Errorf("failed to create listener instance: %w", err)
+) (map[ids.ID]map[common.Address]messages.MessageHandlerFactory, error) {
+ messageHandlerFactories := make(map[ids.ID]map[common.Address]messages.MessageHandlerFactory)
+ for _, sourceBlockchain := range globalConfig.SourceBlockchains {
+ messageHandlerFactoriesForSource := make(map[common.Address]messages.MessageHandlerFactory)
+ // Create message handler factories for each supported message protocol
+ for addressStr, cfg := range sourceBlockchain.MessageContracts {
+ address := common.HexToAddress(addressStr)
+ format := cfg.MessageFormat
+ var (
+ m messages.MessageHandlerFactory
+ err error
+ )
+ switch config.ParseMessageProtocol(format) {
+ case config.TELEPORTER:
+ m, err = teleporter.NewMessageHandlerFactory(
+ logger,
+ address,
+ cfg,
+ )
+ case config.OFF_CHAIN_REGISTRY:
+ m, err = offchainregistry.NewMessageHandlerFactory(
+ logger,
+ cfg,
+ )
+ default:
+ m, err = nil, fmt.Errorf("invalid message format %s", format)
+ }
+ if err != nil {
+ logger.Error("Failed to create message handler factory", zap.Error(err))
+ return nil, err
+ }
+ messageHandlerFactoriesForSource[address] = m
+ }
+ messageHandlerFactories[sourceBlockchain.GetBlockchainID()] = messageHandlerFactoriesForSource
}
- logger.Info(
- "Created listener",
- zap.String("blockchainID", sourceBlockchain.BlockchainID),
- )
- err = listener.ProcessManualWarpMessages(logger, manualWarpMessages, sourceBlockchain)
- if err != nil {
- logger.Error(
- "Failed to process manual Warp messages",
- zap.String("blockchainID", sourceBlockchain.BlockchainID),
- zap.Error(err),
+ return messageHandlerFactories, nil
+}
+
+func createSourceClients(
+ ctx context.Context,
+ logger logging.Logger,
+ cfg *config.Config,
+) (map[ids.ID]ethclient.Client, error) {
+ var err error
+ clients := make(map[ids.ID]ethclient.Client)
+
+ for _, sourceBlockchain := range cfg.SourceBlockchains {
+ clients[sourceBlockchain.GetBlockchainID()], err = utils.NewEthClientWithConfig(
+ ctx,
+ sourceBlockchain.RPCEndpoint.BaseURL,
+ sourceBlockchain.RPCEndpoint.HTTPHeaders,
+ sourceBlockchain.RPCEndpoint.QueryParams,
)
+ if err != nil {
+ logger.Error(
+ "Failed to connect to node via RPC",
+ zap.String("blockchainID", sourceBlockchain.BlockchainID),
+ zap.Error(err),
+ )
+ return nil, err
+ }
}
+ return clients, nil
+}
- logger.Info(
- "Listener initialized. Listening for messages to relay.",
- zap.String("originBlockchainID", sourceBlockchain.BlockchainID),
- )
+// Returns a map of application relayers, as well as a map of source blockchain IDs to starting heights.
+func createApplicationRelayers(
+ ctx context.Context,
+ logger logging.Logger,
+ relayerMetrics *relayer.ApplicationRelayerMetrics,
+ db database.RelayerDatabase,
+ ticker *utils.Ticker,
+ network *peers.AppRequestNetwork,
+ messageCreator message.Creator,
+ cfg *config.Config,
+ sourceClients map[ids.ID]ethclient.Client,
+ destinationClients map[ids.ID]vms.DestinationClient,
+) (map[common.Hash]*relayer.ApplicationRelayer, map[ids.ID]uint64, error) {
+ applicationRelayers := make(map[common.Hash]*relayer.ApplicationRelayer)
+ minHeights := make(map[ids.ID]uint64)
+ for _, sourceBlockchain := range cfg.SourceBlockchains {
+ currentHeight, err := sourceClients[sourceBlockchain.GetBlockchainID()].BlockNumber(ctx)
+ if err != nil {
+ logger.Error("Failed to get current block height", zap.Error(err))
+ return nil, nil, err
+ }
- // Wait for logs from the subscribed node
- // Will only return on error or context cancellation
- return listener.ProcessLogs(ctx)
+ // Create the ApplicationRelayers
+ applicationRelayersForSource, minHeight, err := createApplicationRelayersForSourceChain(
+ ctx,
+ logger,
+ relayerMetrics,
+ db,
+ ticker,
+ *sourceBlockchain,
+ network,
+ messageCreator,
+ cfg,
+ currentHeight,
+ destinationClients,
+ )
+ if err != nil {
+ logger.Error(
+ "Failed to create application relayers",
+ zap.String("blockchainID", sourceBlockchain.BlockchainID),
+ zap.Error(err),
+ )
+ return nil, nil, err
+ }
+
+ for relayerID, applicationRelayer := range applicationRelayersForSource {
+ applicationRelayers[relayerID] = applicationRelayer
+ }
+ minHeights[sourceBlockchain.GetBlockchainID()] = minHeight
+
+ logger.Info(
+ "Created application relayers",
+ zap.String("blockchainID", sourceBlockchain.BlockchainID),
+ )
+ }
+ return applicationRelayers, minHeights, nil
}
// createApplicationRelayers creates Application Relayers for a given source blockchain.
-func createApplicationRelayers(
+func createApplicationRelayersForSourceChain(
ctx context.Context,
logger logging.Logger,
metrics *relayer.ApplicationRelayerMetrics,
@@ -374,7 +370,7 @@ func createApplicationRelayers(
network *peers.AppRequestNetwork,
messageCreator message.Creator,
cfg *config.Config,
- srcEthClient ethclient.Client,
+ currentHeight uint64,
destinationClients map[ids.ID]vms.DestinationClient,
) (map[common.Hash]*relayer.ApplicationRelayer, uint64, error) {
// Create the ApplicationRelayers
@@ -384,17 +380,8 @@ func createApplicationRelayers(
)
applicationRelayers := make(map[common.Hash]*relayer.ApplicationRelayer)
- currentHeight, err := srcEthClient.BlockNumber(context.Background())
- if err != nil {
- logger.Error(
- "Failed to get current block height",
- zap.Error(err),
- )
- return nil, 0, err
- }
-
// Each ApplicationRelayer determines its starting height based on the database state.
- // The Listener begins processing messages starting from the minimum height across all of the ApplicationRelayers
+ // The Listener begins processing messages starting from the minimum height across all the ApplicationRelayers
minHeight := uint64(0)
for _, relayerID := range database.GetSourceBlockchainRelayerIDs(&sourceBlockchain) {
height, err := database.CalculateStartingBlockHeight(
@@ -441,6 +428,14 @@ func createApplicationRelayers(
return applicationRelayers, minHeight, nil
}
+func createHealthTrackers(cfg *config.Config) map[ids.ID]*atomic.Bool {
+ healthTrackers := make(map[ids.ID]*atomic.Bool, len(cfg.SourceBlockchains))
+ for _, sourceBlockchain := range cfg.SourceBlockchains {
+ healthTrackers[sourceBlockchain.GetBlockchainID()] = atomic.NewBool(true)
+ }
+ return healthTrackers
+}
+
func startMetricsServer(logger logging.Logger, gatherer prometheus.Gatherer, port uint16) {
http.Handle("/metrics", promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{}))
diff --git a/messages/message_handler.go b/messages/message_handler.go
index 1ebfc04c..3beac8bc 100644
--- a/messages/message_handler.go
+++ b/messages/message_handler.go
@@ -27,7 +27,8 @@ type MessageHandler interface {
// SendMessage sends the signed message to the destination chain. The payload parsed according to
// the VM rules is also passed in, since MessageManager does not assume any particular VM
- SendMessage(signedMessage *warp.Message, destinationClient vms.DestinationClient) error
+ // returns the transaction hash if the transaction is successful.
+ SendMessage(signedMessage *warp.Message, destinationClient vms.DestinationClient) (common.Hash, error)
// GetMessageRoutingInfo returns the source chain ID, origin sender address, destination chain ID, and destination address
GetMessageRoutingInfo() (
diff --git a/messages/off-chain-registry/message_handler.go b/messages/off-chain-registry/message_handler.go
index 8abd17e4..bf6ca351 100644
--- a/messages/off-chain-registry/message_handler.go
+++ b/messages/off-chain-registry/message_handler.go
@@ -144,7 +144,7 @@ func (m *messageHandler) ShouldSendMessage(destinationClient vms.DestinationClie
return false, nil
}
-func (m *messageHandler) SendMessage(signedMessage *warp.Message, destinationClient vms.DestinationClient) error {
+func (m *messageHandler) SendMessage(signedMessage *warp.Message, destinationClient vms.DestinationClient) (common.Hash, error) {
// Construct the transaction call data to call the TeleporterRegistry contract.
// Only one off-chain registry Warp message is sent at a time, so we hardcode the index to 0 in the call.
callData, err := teleporterregistry.PackAddProtocolVersion(0)
@@ -154,10 +154,10 @@ func (m *messageHandler) SendMessage(signedMessage *warp.Message, destinationCli
zap.String("destinationBlockchainID", destinationClient.DestinationBlockchainID().String()),
zap.String("warpMessageID", signedMessage.ID().String()),
)
- return err
+ return common.Hash{}, err
}
- _, err = destinationClient.SendTx(signedMessage, m.factory.registryAddress.Hex(), addProtocolVersionGasLimit, callData)
+ txHash, err := destinationClient.SendTx(signedMessage, m.factory.registryAddress.Hex(), addProtocolVersionGasLimit, callData)
if err != nil {
m.logger.Error(
"Failed to send tx.",
@@ -165,14 +165,14 @@ func (m *messageHandler) SendMessage(signedMessage *warp.Message, destinationCli
zap.String("warpMessageID", signedMessage.ID().String()),
zap.Error(err),
)
- return err
+ return common.Hash{}, err
}
m.logger.Info(
"Sent message to destination chain",
zap.String("destinationBlockchainID", destinationClient.DestinationBlockchainID().String()),
zap.String("warpMessageID", signedMessage.ID().String()),
)
- return nil
+ return txHash, nil
}
func (m *messageHandler) GetMessageRoutingInfo() (
diff --git a/messages/teleporter/message_handler.go b/messages/teleporter/message_handler.go
index 806d254a..0e58e044 100644
--- a/messages/teleporter/message_handler.go
+++ b/messages/teleporter/message_handler.go
@@ -172,7 +172,7 @@ func (m *messageHandler) ShouldSendMessage(destinationClient vms.DestinationClie
// SendMessage extracts the gasLimit and packs the call data to call the receiveCrossChainMessage method of the Teleporter contract,
// and dispatches transaction construction and broadcast to the destination client
-func (m *messageHandler) SendMessage(signedMessage *warp.Message, destinationClient vms.DestinationClient) error {
+func (m *messageHandler) SendMessage(signedMessage *warp.Message, destinationClient vms.DestinationClient) (common.Hash, error) {
destinationBlockchainID := destinationClient.DestinationBlockchainID()
teleporterMessageID, err := teleporterUtils.CalculateMessageID(
m.factory.protocolAddress,
@@ -181,7 +181,7 @@ func (m *messageHandler) SendMessage(signedMessage *warp.Message, destinationCli
m.teleporterMessage.MessageNonce,
)
if err != nil {
- return fmt.Errorf("failed to calculate Teleporter message ID: %w", err)
+ return common.Hash{}, fmt.Errorf("failed to calculate Teleporter message ID: %w", err)
}
m.logger.Info(
@@ -198,7 +198,7 @@ func (m *messageHandler) SendMessage(signedMessage *warp.Message, destinationCli
zap.String("warpMessageID", signedMessage.ID().String()),
zap.String("teleporterMessageID", teleporterMessageID.String()),
)
- return err
+ return common.Hash{}, err
}
gasLimit, err := gasUtils.CalculateReceiveMessageGasLimit(
@@ -215,7 +215,7 @@ func (m *messageHandler) SendMessage(signedMessage *warp.Message, destinationCli
zap.String("warpMessageID", signedMessage.ID().String()),
zap.String("teleporterMessageID", teleporterMessageID.String()),
)
- return err
+ return common.Hash{}, err
}
// Construct the transaction call data to call the receive cross chain message method of the receiver precompile.
callData, err := teleportermessenger.PackReceiveCrossChainMessage(0, common.HexToAddress(m.factory.messageConfig.RewardAddress))
@@ -226,7 +226,7 @@ func (m *messageHandler) SendMessage(signedMessage *warp.Message, destinationCli
zap.String("warpMessageID", signedMessage.ID().String()),
zap.String("teleporterMessageID", teleporterMessageID.String()),
)
- return err
+ return common.Hash{}, err
}
txHash, err := destinationClient.SendTx(signedMessage, m.factory.protocolAddress.Hex(), gasLimit, callData)
@@ -238,13 +238,13 @@ func (m *messageHandler) SendMessage(signedMessage *warp.Message, destinationCli
zap.String("teleporterMessageID", teleporterMessageID.String()),
zap.Error(err),
)
- return err
+ return common.Hash{}, err
}
// Wait for the message to be included in a block before returning
err = m.waitForReceipt(signedMessage, destinationClient, txHash, teleporterMessageID)
if err != nil {
- return err
+ return common.Hash{}, err
}
m.logger.Info(
@@ -254,7 +254,7 @@ func (m *messageHandler) SendMessage(signedMessage *warp.Message, destinationCli
zap.String("teleporterMessageID", teleporterMessageID.String()),
zap.String("txHash", txHash.String()),
)
- return nil
+ return txHash, nil
}
func (m *messageHandler) waitForReceipt(signedMessage *warp.Message, destinationClient vms.DestinationClient, txHash common.Hash, teleporterMessageID ids.ID) error {
diff --git a/relayer/application_relayer.go b/relayer/application_relayer.go
index c1f59600..d2ba8c08 100644
--- a/relayer/application_relayer.go
+++ b/relayer/application_relayer.go
@@ -31,6 +31,7 @@ import (
coreEthMsg "github.com/ava-labs/coreth/plugin/evm/message"
msg "github.com/ava-labs/subnet-evm/plugin/evm/message"
"github.com/ava-labs/subnet-evm/rpc"
+ "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"golang.org/x/sync/errgroup"
@@ -158,7 +159,8 @@ func (r *ApplicationRelayer) ProcessHeight(height uint64, handlers []messages.Me
// Once we upgrade to Go 1.22, we can use the loop variable directly in the goroutine
h := handler
eg.Go(func() error {
- return r.ProcessMessage(h)
+ _, err := r.ProcessMessage(h)
+ return err
})
}
if err := eg.Wait(); err != nil {
@@ -182,29 +184,26 @@ func (r *ApplicationRelayer) ProcessHeight(height uint64, handlers []messages.Me
}
// Relays a message to the destination chain. Does not checkpoint the height.
-func (r *ApplicationRelayer) ProcessMessage(handler messages.MessageHandler) error {
+// returns the transaction hash if the message is successfully relayed.
+func (r *ApplicationRelayer) ProcessMessage(handler messages.MessageHandler) (common.Hash, error) {
// Increment the request ID. Make sure we don't hold the lock while we relay the message.
r.lock.Lock()
r.currentRequestID++
reqID := r.currentRequestID
r.lock.Unlock()
- err := r.relayMessage(
- reqID,
- handler,
- )
-
- return err
+ return r.relayMessage(reqID, handler)
}
func (r *ApplicationRelayer) RelayerID() database.RelayerID {
return r.relayerID
}
+// returns the transaction hash if the message is successfully relayed.
func (r *ApplicationRelayer) relayMessage(
requestID uint32,
handler messages.MessageHandler,
-) error {
+) (common.Hash, error) {
r.logger.Debug(
"Relaying message",
zap.Uint32("requestID", requestID),
@@ -218,11 +217,11 @@ func (r *ApplicationRelayer) relayMessage(
zap.Error(err),
)
r.incFailedRelayMessageCount("failed to check if message should be sent")
- return err
+ return common.Hash{}, err
}
if !shouldSend {
r.logger.Info("Message should not be sent")
- return nil
+ return common.Hash{}, nil
}
unsignedMessage := handler.GetUnsignedMessage()
@@ -240,7 +239,7 @@ func (r *ApplicationRelayer) relayMessage(
zap.Error(err),
)
r.incFailedRelayMessageCount("failed to create signed warp message via AppRequest network")
- return err
+ return common.Hash{}, err
}
} else {
r.incFetchSignatureRPCCount()
@@ -251,29 +250,30 @@ func (r *ApplicationRelayer) relayMessage(
zap.Error(err),
)
r.incFailedRelayMessageCount("failed to create signed warp message via RPC")
- return err
+ return common.Hash{}, err
}
}
// create signed message latency (ms)
r.setCreateSignedMessageLatencyMS(float64(time.Since(startCreateSignedMessageTime).Milliseconds()))
- err = handler.SendMessage(signedMessage, r.destinationClient)
+ txHash, err := handler.SendMessage(signedMessage, r.destinationClient)
if err != nil {
r.logger.Error(
"Failed to send warp message",
zap.Error(err),
)
r.incFailedRelayMessageCount("failed to send warp message")
- return err
+ return common.Hash{}, err
}
r.logger.Info(
"Finished relaying message to destination chain",
zap.String("destinationBlockchainID", r.relayerID.DestinationBlockchainID.String()),
+ zap.String("txHash", txHash.Hex()),
)
r.incSuccessfulRelayMessageCount()
- return nil
+ return txHash, nil
}
// createSignedMessage fetches the signed Warp message from the source chain via RPC.
diff --git a/relayer/listener.go b/relayer/listener.go
index 345e79b9..6a9948fd 100644
--- a/relayer/listener.go
+++ b/relayer/listener.go
@@ -8,20 +8,13 @@ import (
"fmt"
"math/big"
"math/rand"
- "sync"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/awm-relayer/config"
- "github.com/ava-labs/awm-relayer/database"
- "github.com/ava-labs/awm-relayer/messages"
- offchainregistry "github.com/ava-labs/awm-relayer/messages/off-chain-registry"
- "github.com/ava-labs/awm-relayer/messages/teleporter"
- relayerTypes "github.com/ava-labs/awm-relayer/types"
"github.com/ava-labs/awm-relayer/utils"
- vms "github.com/ava-labs/awm-relayer/vms"
+ "github.com/ava-labs/awm-relayer/vms"
"github.com/ava-labs/subnet-evm/ethclient"
- "github.com/ethereum/go-ethereum/common"
"go.uber.org/atomic"
"go.uber.org/zap"
)
@@ -35,28 +28,63 @@ const (
// Listener handles all messages sent from a given source chain
type Listener struct {
- Subscriber vms.Subscriber
- requestIDLock *sync.Mutex
- currentRequestID uint32
- contractMessage vms.ContractMessage
- messageHandlerFactories map[common.Address]messages.MessageHandlerFactory
- logger logging.Logger
- sourceBlockchain config.SourceBlockchain
- catchUpResultChan chan bool
- healthStatus *atomic.Bool
- globalConfig *config.Config
- applicationRelayers map[common.Hash]*ApplicationRelayer
- ethClient ethclient.Client
+ Subscriber vms.Subscriber
+ currentRequestID uint32
+ contractMessage vms.ContractMessage
+ logger logging.Logger
+ sourceBlockchain config.SourceBlockchain
+ catchUpResultChan chan bool
+ healthStatus *atomic.Bool
+ ethClient ethclient.Client
+ messageCoordinator *MessageCoordinator
}
-func NewListener(
+// runListener creates a Listener instance and the ApplicationRelayers for a subnet.
+// The Listener listens for warp messages on that subnet, and the ApplicationRelayers handle delivery to the destination
+func RunListener(
+ ctx context.Context,
logger logging.Logger,
sourceBlockchain config.SourceBlockchain,
+ ethRPCClient ethclient.Client,
relayerHealth *atomic.Bool,
- globalConfig *config.Config,
- applicationRelayers map[common.Hash]*ApplicationRelayer,
+ processMissedBlocks bool,
+ minHeight uint64,
+ messageCoordinator *MessageCoordinator,
+) error {
+ // Create the Listener
+ listener, err := newListener(
+ ctx,
+ logger,
+ sourceBlockchain,
+ ethRPCClient,
+ relayerHealth,
+ processMissedBlocks,
+ minHeight,
+ messageCoordinator,
+ )
+ if err != nil {
+ return fmt.Errorf("failed to create listener instance: %w", err)
+ }
+
+ logger.Info(
+ "Listener initialized. Listening for messages to relay.",
+ zap.String("originBlockchainID", sourceBlockchain.BlockchainID),
+ )
+
+ // Wait for logs from the subscribed node
+ // Will only return on error or context cancellation
+ return listener.processLogs(ctx)
+}
+
+func newListener(
+ ctx context.Context,
+ logger logging.Logger,
+ sourceBlockchain config.SourceBlockchain,
+ ethRPCClient ethclient.Client,
+ relayerHealth *atomic.Bool,
+ processMissedBlocks bool,
startingHeight uint64,
- ethClient ethclient.Client,
+ messageCoordinator *MessageCoordinator,
) (*Listener, error) {
blockchainID, err := ids.FromString(sourceBlockchain.BlockchainID)
if err != nil {
@@ -66,8 +94,9 @@ func NewListener(
)
return nil, err
}
+
ethWSClient, err := utils.NewEthClientWithConfig(
- context.Background(),
+ ctx,
sourceBlockchain.WSEndpoint.BaseURL,
sourceBlockchain.WSEndpoint.HTTPHeaders,
sourceBlockchain.WSEndpoint.QueryParams,
@@ -82,40 +111,6 @@ func NewListener(
}
sub := vms.NewSubscriber(logger, config.ParseVM(sourceBlockchain.VM), blockchainID, ethWSClient)
- // Create message managers for each supported message protocol
- messageHandlerFactories := make(map[common.Address]messages.MessageHandlerFactory)
- for addressStr, cfg := range sourceBlockchain.MessageContracts {
- address := common.HexToAddress(addressStr)
- format := cfg.MessageFormat
- var (
- m messages.MessageHandlerFactory
- err error
- )
- switch config.ParseMessageProtocol(format) {
- case config.TELEPORTER:
- m, err = teleporter.NewMessageHandlerFactory(
- logger,
- address,
- cfg,
- )
- case config.OFF_CHAIN_REGISTRY:
- m, err = offchainregistry.NewMessageHandlerFactory(
- logger,
- cfg,
- )
- default:
- m, err = nil, fmt.Errorf("invalid message format %s", format)
- }
- if err != nil {
- logger.Error(
- "Failed to create message manager",
- zap.Error(err),
- )
- return nil, err
- }
- messageHandlerFactories[address] = m
- }
-
// Marks when the listener has finished the catch-up process on startup.
// Until that time, we do not know the order in which messages are processed,
// since the catch-up process occurs concurrently with normal message processing
@@ -132,18 +127,15 @@ func NewListener(
zap.String("blockchainIDHex", sourceBlockchain.GetBlockchainID().Hex()),
)
lstnr := Listener{
- Subscriber: sub,
- requestIDLock: &sync.Mutex{},
- currentRequestID: rand.Uint32(), // Initialize to a random value to mitigate requestID collision
- contractMessage: vms.NewContractMessage(logger, sourceBlockchain),
- messageHandlerFactories: messageHandlerFactories,
- logger: logger,
- sourceBlockchain: sourceBlockchain,
- catchUpResultChan: catchUpResultChan,
- healthStatus: relayerHealth,
- globalConfig: globalConfig,
- applicationRelayers: applicationRelayers,
- ethClient: ethClient,
+ Subscriber: sub,
+ currentRequestID: rand.Uint32(), // Initialize to a random value to mitigate requestID collision
+ contractMessage: vms.NewContractMessage(logger, sourceBlockchain),
+ logger: logger,
+ sourceBlockchain: sourceBlockchain,
+ catchUpResultChan: catchUpResultChan,
+ healthStatus: relayerHealth,
+ ethClient: ethRPCClient,
+ messageCoordinator: messageCoordinator,
}
// Open the subscription. We must do this before processing any missed messages, otherwise we may miss an incoming message
@@ -157,7 +149,7 @@ func NewListener(
return nil, err
}
- if lstnr.globalConfig.ProcessMissedBlocks {
+ if processMissedBlocks {
// Process historical blocks in a separate goroutine so that the main processing loop can
// start processing new blocks as soon as possible. Otherwise, it's possible for
// ProcessFromHeight to overload the message queue and cause a deadlock.
@@ -176,7 +168,7 @@ func NewListener(
// Listens to the Subscriber logs channel to process them.
// On subscriber error, attempts to reconnect and errors if unable.
// Exits if context is cancelled by another goroutine.
-func (lstnr *Listener) ProcessLogs(ctx context.Context) error {
+func (lstnr *Listener) processLogs(ctx context.Context) error {
// Error channel for application relayer errors
errChan := make(chan error)
for {
@@ -211,52 +203,7 @@ func (lstnr *Listener) ProcessLogs(ctx context.Context) error {
return fmt.Errorf("failed to catch up on historical blocks")
}
case blockHeader := <-lstnr.Subscriber.Headers():
- // Parse the logs in the block, and group by application relayer
-
- block, err := relayerTypes.NewWarpBlockInfo(blockHeader, lstnr.ethClient)
- if err != nil {
- lstnr.logger.Error(
- "Failed to create Warp block info",
- zap.Error(err),
- )
- continue
- }
-
- // Relay the messages in the block to the destination chains. Continue on failure.
- lstnr.logger.Debug(
- "Processing block",
- zap.String("sourceBlockchainID", lstnr.sourceBlockchain.GetBlockchainID().String()),
- zap.Uint64("blockNumber", block.BlockNumber),
- )
-
- // Register each message in the block with the appropriate application relayer
- messageHandlers := make(map[common.Hash][]messages.MessageHandler)
- for _, warpLogInfo := range block.Messages {
- appRelayer, handler, err := lstnr.GetAppRelayerMessageHandler(warpLogInfo)
- if err != nil {
- lstnr.logger.Error(
- "Failed to parse message",
- zap.String("blockchainID", lstnr.sourceBlockchain.GetBlockchainID().String()),
- zap.Error(err),
- )
- continue
- }
- if appRelayer == nil {
- lstnr.logger.Debug("Application relayer not found. Skipping message relay")
- continue
- }
- messageHandlers[appRelayer.relayerID.ID] = append(messageHandlers[appRelayer.relayerID.ID], handler)
- }
- // Initiate message relay of all registered messages
- for _, appRelayer := range lstnr.applicationRelayers {
- // Dispatch all messages in the block to the appropriate application relayer.
- // An empty slice is still a valid argument to ProcessHeight; in this case the height is immediately committed.
- handlers := messageHandlers[appRelayer.relayerID.ID]
-
- // Process the height async. This is safe because the ApplicationRelayer maintains the threadsafe
- // invariant that heights are committed to the database one at a time, in order, with no gaps.
- go appRelayer.ProcessHeight(block.BlockNumber, handlers, errChan)
- }
+ go lstnr.messageCoordinator.ProcessBlock(blockHeader, lstnr.ethClient, errChan)
case err := <-lstnr.Subscriber.Err():
lstnr.healthStatus.Store(false)
lstnr.logger.Error(
@@ -298,162 +245,3 @@ func (lstnr *Listener) reconnectToSubscriber() error {
lstnr.healthStatus.Store(true)
return nil
}
-
-// Unpacks the Warp message and fetches the appropriate application relayer
-// Checks for the following registered keys. At most one of these keys should be registered.
-// 1. An exact match on sourceBlockchainID, destinationBlockchainID, originSenderAddress, and destinationAddress
-// 2. A match on sourceBlockchainID and destinationBlockchainID, with a specific originSenderAddress and any destinationAddress
-// 3. A match on sourceBlockchainID and destinationBlockchainID, with any originSenderAddress and a specific destinationAddress
-// 4. A match on sourceBlockchainID and destinationBlockchainID, with any originSenderAddress and any destinationAddress
-func (lstnr *Listener) getApplicationRelayer(
- sourceBlockchainID ids.ID,
- originSenderAddress common.Address,
- destinationBlockchainID ids.ID,
- destinationAddress common.Address,
-) *ApplicationRelayer {
- // Check for an exact match
- applicationRelayerID := database.CalculateRelayerID(
- sourceBlockchainID,
- destinationBlockchainID,
- originSenderAddress,
- destinationAddress,
- )
- if applicationRelayer, ok := lstnr.applicationRelayers[applicationRelayerID]; ok {
- return applicationRelayer
- }
-
- // Check for a match on sourceBlockchainID and destinationBlockchainID, with a specific originSenderAddress and any destinationAddress
- applicationRelayerID = database.CalculateRelayerID(
- sourceBlockchainID,
- destinationBlockchainID,
- originSenderAddress,
- database.AllAllowedAddress,
- )
- if applicationRelayer, ok := lstnr.applicationRelayers[applicationRelayerID]; ok {
- return applicationRelayer
- }
-
- // Check for a match on sourceBlockchainID and destinationBlockchainID, with any originSenderAddress and a specific destinationAddress
- applicationRelayerID = database.CalculateRelayerID(
- sourceBlockchainID,
- destinationBlockchainID,
- database.AllAllowedAddress,
- destinationAddress,
- )
- if applicationRelayer, ok := lstnr.applicationRelayers[applicationRelayerID]; ok {
- return applicationRelayer
- }
-
- // Check for a match on sourceBlockchainID and destinationBlockchainID, with any originSenderAddress and any destinationAddress
- applicationRelayerID = database.CalculateRelayerID(
- sourceBlockchainID,
- destinationBlockchainID,
- database.AllAllowedAddress,
- database.AllAllowedAddress,
- )
- if applicationRelayer, ok := lstnr.applicationRelayers[applicationRelayerID]; ok {
- return applicationRelayer
- }
- lstnr.logger.Debug(
- "Application relayer not found. Skipping message relay.",
- zap.String("blockchainID", lstnr.sourceBlockchain.GetBlockchainID().String()),
- zap.String("destinationBlockchainID", destinationBlockchainID.String()),
- zap.String("originSenderAddress", originSenderAddress.String()),
- zap.String("destinationAddress", destinationAddress.String()),
- )
- return nil
-}
-
-// Returns the ApplicationRelayer that is configured to handle this message, as well as a one-time MessageHandler
-// instance that the ApplicationRelayer uses to relay this specific message.
-// The MessageHandler and ApplicationRelayer are decoupled to support batch workflows in which a single ApplicationRelayer
-// processes multiple messages (using their corresponding MessageHandlers) in a single shot.
-func (lstnr *Listener) GetAppRelayerMessageHandler(warpMessageInfo *relayerTypes.WarpMessageInfo) (
- *ApplicationRelayer,
- messages.MessageHandler,
- error,
-) {
- // Check that the warp message is from a supported message protocol contract address.
- messageHandlerFactory, supportedMessageProtocol := lstnr.messageHandlerFactories[warpMessageInfo.SourceAddress]
- if !supportedMessageProtocol {
- // Do not return an error here because it is expected for there to be messages from other contracts
- // than just the ones supported by a single listener instance.
- lstnr.logger.Debug(
- "Warp message from unsupported message protocol address. Not relaying.",
- zap.String("protocolAddress", warpMessageInfo.SourceAddress.Hex()),
- )
- return nil, nil, nil
- }
- messageHandler, err := messageHandlerFactory.NewMessageHandler(warpMessageInfo.UnsignedMessage)
- if err != nil {
- lstnr.logger.Error(
- "Failed to create message handler",
- zap.Error(err),
- )
- return nil, nil, err
- }
-
- // Fetch the message delivery data
- sourceBlockchainID, originSenderAddress, destinationBlockchainID, destinationAddress, err := messageHandler.GetMessageRoutingInfo()
- if err != nil {
- lstnr.logger.Error(
- "Failed to get message routing information",
- zap.Error(err),
- )
- return nil, nil, err
- }
-
- lstnr.logger.Info(
- "Unpacked warp message",
- zap.String("sourceBlockchainID", sourceBlockchainID.String()),
- zap.String("originSenderAddress", originSenderAddress.String()),
- zap.String("destinationBlockchainID", destinationBlockchainID.String()),
- zap.String("destinationAddress", destinationAddress.String()),
- zap.String("warpMessageID", warpMessageInfo.UnsignedMessage.ID().String()),
- )
-
- appRelayer := lstnr.getApplicationRelayer(
- sourceBlockchainID,
- originSenderAddress,
- destinationBlockchainID,
- destinationAddress,
- )
- if appRelayer == nil {
- return nil, nil, nil
- }
- return appRelayer, messageHandler, nil
-}
-
-func (lstnr *Listener) ProcessManualWarpMessages(
- logger logging.Logger,
- manualWarpMessages []*relayerTypes.WarpMessageInfo,
- sourceBlockchain config.SourceBlockchain,
-) error {
- // Send any messages that were specified in the configuration
- for _, warpMessage := range manualWarpMessages {
- logger.Info(
- "Relaying manual Warp message",
- zap.String("blockchainID", sourceBlockchain.BlockchainID),
- zap.String("warpMessageID", warpMessage.UnsignedMessage.ID().String()),
- )
- appRelayer, handler, err := lstnr.GetAppRelayerMessageHandler(warpMessage)
- if err != nil {
- logger.Error(
- "Failed to parse manual Warp message.",
- zap.Error(err),
- zap.String("warpMessageID", warpMessage.UnsignedMessage.ID().String()),
- )
- return err
- }
- err = appRelayer.ProcessMessage(handler)
- if err != nil {
- logger.Error(
- "Failed to process manual Warp message",
- zap.String("blockchainID", sourceBlockchain.BlockchainID),
- zap.String("warpMessageID", warpMessage.UnsignedMessage.ID().String()),
- )
- return err
- }
- }
- return nil
-}
diff --git a/relayer/message_coordinator.go b/relayer/message_coordinator.go
new file mode 100644
index 00000000..49b2c37d
--- /dev/null
+++ b/relayer/message_coordinator.go
@@ -0,0 +1,259 @@
+// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package relayer
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "math/big"
+
+ "github.com/ava-labs/avalanchego/ids"
+ "github.com/ava-labs/avalanchego/utils/logging"
+ "github.com/ava-labs/awm-relayer/database"
+ "github.com/ava-labs/awm-relayer/messages"
+ relayerTypes "github.com/ava-labs/awm-relayer/types"
+ "github.com/ava-labs/subnet-evm/core/types"
+ "github.com/ava-labs/subnet-evm/ethclient"
+ "github.com/ava-labs/subnet-evm/interfaces"
+ "github.com/ava-labs/subnet-evm/precompile/contracts/warp"
+ "github.com/ethereum/go-ethereum/common"
+ "go.uber.org/zap"
+)
+
+// MessageCoordinator contains all the logic required to process messages in the relayer.
+// Other components such as the listeners or the API should pass messages to the MessageCoordinator
+// so that it can parse the message(s) and pass them the the proper ApplicationRelayer.
+type MessageCoordinator struct {
+ logger logging.Logger
+ // Maps Source blockchain ID and protocol address to a Message Handler Factory
+ messageHandlerFactories map[ids.ID]map[common.Address]messages.MessageHandlerFactory
+ applicationRelayers map[common.Hash]*ApplicationRelayer
+ sourceClients map[ids.ID]ethclient.Client
+}
+
+func NewMessageCoordinator(
+ logger logging.Logger,
+ messageHandlerFactories map[ids.ID]map[common.Address]messages.MessageHandlerFactory,
+ applicationRelayers map[common.Hash]*ApplicationRelayer,
+ sourceClients map[ids.ID]ethclient.Client,
+) *MessageCoordinator {
+ return &MessageCoordinator{
+ logger: logger,
+ messageHandlerFactories: messageHandlerFactories,
+ applicationRelayers: applicationRelayers,
+ sourceClients: sourceClients,
+ }
+}
+
+// getAppRelayerMessageHandler returns the ApplicationRelayer that is configured to handle this message, as well as a
+// one-time MessageHandler instance that the ApplicationRelayer uses to relay this specific message.
+// The MessageHandler and ApplicationRelayer are decoupled to support batch workflows in which a single ApplicationRelayer
+// processes multiple messages (using their corresponding MessageHandlers) in a single shot.
+func (mc *MessageCoordinator) getAppRelayerMessageHandler(
+ warpMessageInfo *relayerTypes.WarpMessageInfo,
+) (
+ *ApplicationRelayer,
+ messages.MessageHandler,
+ error,
+) {
+ // Check that the warp message is from a supported message protocol contract address.
+ messageHandlerFactory, supportedMessageProtocol := mc.messageHandlerFactories[warpMessageInfo.UnsignedMessage.SourceChainID][warpMessageInfo.SourceAddress]
+ if !supportedMessageProtocol {
+ // Do not return an error here because it is expected for there to be messages from other contracts
+ // than just the ones supported by a single listener instance.
+ mc.logger.Debug(
+ "Warp message from unsupported message protocol address. Not relaying.",
+ zap.String("protocolAddress", warpMessageInfo.SourceAddress.Hex()),
+ )
+ return nil, nil, nil
+ }
+ messageHandler, err := messageHandlerFactory.NewMessageHandler(warpMessageInfo.UnsignedMessage)
+ if err != nil {
+ mc.logger.Error("Failed to create message handler", zap.Error(err))
+ return nil, nil, err
+ }
+
+ // Fetch the message delivery data
+ sourceBlockchainID, originSenderAddress, destinationBlockchainID, destinationAddress, err := messageHandler.GetMessageRoutingInfo()
+ if err != nil {
+ mc.logger.Error("Failed to get message routing information", zap.Error(err))
+ return nil, nil, err
+ }
+
+ mc.logger.Info(
+ "Unpacked warp message",
+ zap.String("sourceBlockchainID", sourceBlockchainID.String()),
+ zap.String("originSenderAddress", originSenderAddress.String()),
+ zap.String("destinationBlockchainID", destinationBlockchainID.String()),
+ zap.String("destinationAddress", destinationAddress.String()),
+ zap.String("warpMessageID", warpMessageInfo.UnsignedMessage.ID().String()),
+ )
+
+ appRelayer := mc.getApplicationRelayer(
+ sourceBlockchainID,
+ originSenderAddress,
+ destinationBlockchainID,
+ destinationAddress,
+ )
+ if appRelayer == nil {
+ return nil, nil, nil
+ }
+ return appRelayer, messageHandler, nil
+}
+
+// Unpacks the Warp message and fetches the appropriate application relayer
+// Checks for the following registered keys. At most one of these keys should be registered.
+// 1. An exact match on sourceBlockchainID, destinationBlockchainID, originSenderAddress, and destinationAddress
+// 2. A match on sourceBlockchainID and destinationBlockchainID, with a specific originSenderAddress and any destinationAddress
+// 3. A match on sourceBlockchainID and destinationBlockchainID, with any originSenderAddress and a specific destinationAddress
+// 4. A match on sourceBlockchainID and destinationBlockchainID, with any originSenderAddress and any destinationAddress
+func (mc *MessageCoordinator) getApplicationRelayer(
+ sourceBlockchainID ids.ID,
+ originSenderAddress common.Address,
+ destinationBlockchainID ids.ID,
+ destinationAddress common.Address,
+) *ApplicationRelayer {
+ // Check for an exact match
+ applicationRelayerID := database.CalculateRelayerID(
+ sourceBlockchainID,
+ destinationBlockchainID,
+ originSenderAddress,
+ destinationAddress,
+ )
+ if applicationRelayer, ok := mc.applicationRelayers[applicationRelayerID]; ok {
+ return applicationRelayer
+ }
+
+ // Check for a match on sourceBlockchainID and destinationBlockchainID, with a specific originSenderAddress and any destinationAddress
+ applicationRelayerID = database.CalculateRelayerID(
+ sourceBlockchainID,
+ destinationBlockchainID,
+ originSenderAddress,
+ database.AllAllowedAddress,
+ )
+ if applicationRelayer, ok := mc.applicationRelayers[applicationRelayerID]; ok {
+ return applicationRelayer
+ }
+
+ // Check for a match on sourceBlockchainID and destinationBlockchainID, with any originSenderAddress and a specific destinationAddress
+ applicationRelayerID = database.CalculateRelayerID(
+ sourceBlockchainID,
+ destinationBlockchainID,
+ database.AllAllowedAddress,
+ destinationAddress,
+ )
+ if applicationRelayer, ok := mc.applicationRelayers[applicationRelayerID]; ok {
+ return applicationRelayer
+ }
+
+ // Check for a match on sourceBlockchainID and destinationBlockchainID, with any originSenderAddress and any destinationAddress
+ applicationRelayerID = database.CalculateRelayerID(
+ sourceBlockchainID,
+ destinationBlockchainID,
+ database.AllAllowedAddress,
+ database.AllAllowedAddress,
+ )
+ if applicationRelayer, ok := mc.applicationRelayers[applicationRelayerID]; ok {
+ return applicationRelayer
+ }
+ mc.logger.Debug(
+ "Application relayer not found. Skipping message relay.",
+ zap.String("blockchainID", sourceBlockchainID.String()),
+ zap.String("destinationBlockchainID", destinationBlockchainID.String()),
+ zap.String("originSenderAddress", originSenderAddress.String()),
+ zap.String("destinationAddress", destinationAddress.String()),
+ )
+ return nil
+}
+
+func (mc *MessageCoordinator) ProcessWarpMessage(warpMessage *relayerTypes.WarpMessageInfo) (common.Hash, error) {
+ appRelayer, handler, err := mc.getAppRelayerMessageHandler(warpMessage)
+ if err != nil {
+ mc.logger.Error(
+ "Failed to parse Warp message.",
+ zap.Error(err),
+ zap.String("warpMessageID", warpMessage.UnsignedMessage.ID().String()),
+ )
+ return common.Hash{}, err
+ }
+ if appRelayer == nil {
+ mc.logger.Error("Application relayer not found")
+ return common.Hash{}, errors.New("application relayer not found")
+ }
+
+ return appRelayer.ProcessMessage(handler)
+}
+
+func (mc *MessageCoordinator) ProcessMessageID(blockchainID ids.ID, messageID ids.ID, blockNum *big.Int) (common.Hash, error) {
+ ethClient, ok := mc.sourceClients[blockchainID]
+ if !ok {
+ mc.logger.Error("Source client not found", zap.String("blockchainID", blockchainID.String()))
+ return common.Hash{}, fmt.Errorf("source client not set for blockchain: %s", blockchainID.String())
+ }
+
+ warpMessage, err := FetchWarpMessage(ethClient, messageID, blockNum)
+ if err != nil {
+ mc.logger.Error("Failed to fetch warp from blockchain", zap.String("blockchainID", blockchainID.String()), zap.Error(err))
+ return common.Hash{}, fmt.Errorf("could not fetch warp message from ID: %w", err)
+ }
+
+ return mc.ProcessWarpMessage(warpMessage)
+}
+
+// Meant to be ran asynchronously. Errors should be sent to errChan.
+func (mc *MessageCoordinator) ProcessBlock(blockHeader *types.Header, ethClient ethclient.Client, errChan chan error) {
+ // Parse the logs in the block, and group by application relayer
+ block, err := relayerTypes.NewWarpBlockInfo(blockHeader, ethClient)
+ if err != nil {
+ mc.logger.Error("Failed to create Warp block info", zap.Error(err))
+ errChan <- err
+ return
+ }
+
+ // Register each message in the block with the appropriate application relayer
+ messageHandlers := make(map[common.Hash][]messages.MessageHandler)
+ for _, warpLogInfo := range block.Messages {
+ appRelayer, handler, err := mc.getAppRelayerMessageHandler(warpLogInfo)
+ if err != nil {
+ mc.logger.Error(
+ "Failed to parse message",
+ zap.String("blockchainID", warpLogInfo.UnsignedMessage.SourceChainID.String()),
+ zap.String("protocolAddress", warpLogInfo.SourceAddress.String()),
+ zap.Error(err),
+ )
+ continue
+ }
+ if appRelayer == nil {
+ mc.logger.Debug("Application relayer not found. Skipping message relay")
+ continue
+ }
+ messageHandlers[appRelayer.relayerID.ID] = append(messageHandlers[appRelayer.relayerID.ID], handler)
+ }
+ // Initiate message relay of all registered messages
+ for _, appRelayer := range mc.applicationRelayers {
+ // Dispatch all messages in the block to the appropriate application relayer.
+ // An empty slice is still a valid argument to ProcessHeight; in this case the height is immediately committed.
+ handlers := messageHandlers[appRelayer.relayerID.ID]
+
+ go appRelayer.ProcessHeight(block.BlockNumber, handlers, errChan)
+ }
+}
+
+func FetchWarpMessage(ethClient ethclient.Client, warpID ids.ID, blockNum *big.Int) (*relayerTypes.WarpMessageInfo, error) {
+ logs, err := ethClient.FilterLogs(context.Background(), interfaces.FilterQuery{
+ Topics: [][]common.Hash{{relayerTypes.WarpPrecompileLogFilter}, nil, {common.Hash(warpID)}},
+ Addresses: []common.Address{warp.ContractAddress},
+ FromBlock: blockNum,
+ ToBlock: blockNum,
+ })
+ if err != nil {
+ return nil, fmt.Errorf("could not fetch logs: %w", err)
+ }
+ if len(logs) != 1 {
+ return nil, fmt.Errorf("found more than 1 log: %d", len(logs))
+ }
+
+ return relayerTypes.NewWarpMessageInfo(logs[0])
+}
diff --git a/tests/basic_relay.go b/tests/basic_relay.go
index 1c2a52df..28bd9735 100644
--- a/tests/basic_relay.go
+++ b/tests/basic_relay.go
@@ -52,6 +52,8 @@ func BasicRelay(network interfaces.LocalNetwork) {
fundedAddress,
relayerKey,
)
+ // The config needs to be validated in order to be passed to database.GetConfigRelayerIDs
+ relayerConfig.Validate()
relayerConfigPath := testUtils.WriteRelayerConfig(relayerConfig, testUtils.DefaultRelayerCfgFname)
@@ -114,8 +116,10 @@ func BasicRelay(network interfaces.LocalNetwork) {
relayerIDA := database.CalculateRelayerID(subnetAInfo.BlockchainID, subnetBInfo.BlockchainID, database.AllAllowedAddress, database.AllAllowedAddress)
relayerIDB := database.CalculateRelayerID(subnetBInfo.BlockchainID, subnetAInfo.BlockchainID, database.AllAllowedAddress, database.AllAllowedAddress)
// Modify the JSON database to force the relayer to re-process old blocks
- jsonDB.Put(relayerIDA, database.LatestProcessedBlockKey, []byte("0"))
- jsonDB.Put(relayerIDB, database.LatestProcessedBlockKey, []byte("0"))
+ err = jsonDB.Put(relayerIDA, database.LatestProcessedBlockKey, []byte("0"))
+ Expect(err).Should(BeNil())
+ err = jsonDB.Put(relayerIDB, database.LatestProcessedBlockKey, []byte("0"))
+ Expect(err).Should(BeNil())
// Subscribe to the destination chain
newHeadsB := make(chan *types.Header, 10)
diff --git a/tests/e2e_test.go b/tests/e2e_test.go
index a587c874..be3b9768 100644
--- a/tests/e2e_test.go
+++ b/tests/e2e_test.go
@@ -21,9 +21,7 @@ const (
warpGenesisFile = "./tests/utils/warp-genesis.json"
)
-var (
- localNetworkInstance *local.LocalNetwork
-)
+var localNetworkInstance *local.LocalNetwork
func TestE2E(t *testing.T) {
if os.Getenv("RUN_E2E") == "" {
@@ -68,9 +66,6 @@ var _ = ginkgo.Describe("[AWM Relayer Integration Tests", func() {
ginkgo.It("Basic Relay", func() {
BasicRelay(localNetworkInstance)
})
- ginkgo.It("Teleporter Registry", func() {
- TeleporterRegistry(localNetworkInstance)
- })
ginkgo.It("Shared Database", func() {
SharedDatabaseAccess(localNetworkInstance)
})
@@ -80,6 +75,9 @@ var _ = ginkgo.Describe("[AWM Relayer Integration Tests", func() {
ginkgo.It("Batch Message", func() {
BatchRelay(localNetworkInstance)
})
+ ginkgo.It("Relay Message API", func() {
+ RelayMessageAPI(localNetworkInstance)
+ })
ginkgo.It("Warp API", func() {
WarpAPIRelay(localNetworkInstance)
})
diff --git a/tests/manual_message.go b/tests/manual_message.go
index 9a487d98..63a4a214 100644
--- a/tests/manual_message.go
+++ b/tests/manual_message.go
@@ -4,35 +4,45 @@
package tests
import (
+ "bytes"
"context"
- "encoding/hex"
+ "encoding/json"
+ "fmt"
+ "math/big"
+ "net/http"
"time"
- avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp"
- "github.com/ava-labs/awm-relayer/config"
+ runner_sdk "github.com/ava-labs/avalanche-network-runner/client"
+ "github.com/ava-labs/awm-relayer/api"
+ offchainregistry "github.com/ava-labs/awm-relayer/messages/off-chain-registry"
testUtils "github.com/ava-labs/awm-relayer/tests/utils"
"github.com/ava-labs/subnet-evm/accounts/abi/bind"
- "github.com/ava-labs/subnet-evm/core/types"
- subnetEvmInterfaces "github.com/ava-labs/subnet-evm/interfaces"
- "github.com/ava-labs/subnet-evm/precompile/contracts/warp"
"github.com/ava-labs/teleporter/tests/interfaces"
- "github.com/ava-labs/teleporter/tests/utils"
+ teleporterTestUtils "github.com/ava-labs/teleporter/tests/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
-
. "github.com/onsi/gomega"
)
-// This tests relaying a message manually provided in the relayer config
+// Tests relayer support for off-chain Teleporter Registry updates
+// - Configures the relayer to send an off-chain message to the Teleporter Registry
+// - Verifies that the Teleporter Registry is updated
func ManualMessage(network interfaces.LocalNetwork) {
- subnetAInfo := network.GetPrimaryNetworkInfo()
- subnetBInfo, _ := utils.GetTwoSubnets(network)
+ cChainInfo := network.GetPrimaryNetworkInfo()
+ subnetAInfo, subnetBInfo := teleporterTestUtils.GetTwoSubnets(network)
fundedAddress, fundedKey := network.GetFundedAccountInfo()
teleporterContractAddress := network.GetTeleporterContractAddress()
err := testUtils.ClearRelayerStorage()
Expect(err).Should(BeNil())
+ //
+ // Get the current Teleporter Registry version
+ //
+ currentVersion, err := cChainInfo.TeleporterRegistry.LatestVersion(&bind.CallOpts{})
+ Expect(err).Should(BeNil())
+ expectedNewVersion := currentVersion.Add(currentVersion, big.NewInt(1))
+
//
// Fund the relayer address on all subnets
//
@@ -41,89 +51,86 @@ func ManualMessage(network interfaces.LocalNetwork) {
log.Info("Funding relayer address on all subnets")
relayerKey, err := crypto.GenerateKey()
Expect(err).Should(BeNil())
- testUtils.FundRelayers(ctx, []interfaces.SubnetTestInfo{subnetAInfo, subnetBInfo}, fundedKey, relayerKey)
+ testUtils.FundRelayers(ctx, []interfaces.SubnetTestInfo{cChainInfo}, fundedKey, relayerKey)
//
- // Send two Teleporter message on Subnet A, before the relayer is running
+ // Define the off-chain Warp message
//
+ log.Info("Creating off-chain Warp message")
+ newProtocolAddress := common.HexToAddress("0x0123456789abcdef0123456789abcdef01234567")
+ networkID := network.GetNetworkID()
- log.Info("Sending two teleporter messages on subnet A")
- // This message will be delivered by the relayer
- receipt1, _, id1 := testUtils.SendBasicTeleporterMessage(ctx, subnetAInfo, subnetBInfo, fundedKey, fundedAddress)
- msg1 := getWarpMessageFromLog(ctx, receipt1, subnetAInfo)
-
- // This message will not be delivered by the relayer
- _, _, id2 := testUtils.SendBasicTeleporterMessage(ctx, subnetAInfo, subnetBInfo, fundedKey, fundedAddress)
+ //
+ // Set up the nodes to accept the off-chain message
+ //
+ // Create chain config file with off chain message for each chain
+ unsignedMessage, warpEnabledChainConfigC := teleporterTestUtils.InitOffChainMessageChainConfig(networkID, cChainInfo, newProtocolAddress, 2)
+ _, warpEnabledChainConfigA := teleporterTestUtils.InitOffChainMessageChainConfig(networkID, subnetAInfo, newProtocolAddress, 2)
+ _, warpEnabledChainConfigB := teleporterTestUtils.InitOffChainMessageChainConfig(networkID, subnetBInfo, newProtocolAddress, 2)
+
+ // Create chain config with off chain messages
+ chainConfigs := make(map[string]string)
+ teleporterTestUtils.SetChainConfig(chainConfigs, cChainInfo, warpEnabledChainConfigC)
+ teleporterTestUtils.SetChainConfig(chainConfigs, subnetBInfo, warpEnabledChainConfigB)
+ teleporterTestUtils.SetChainConfig(chainConfigs, subnetAInfo, warpEnabledChainConfigA)
+
+ // Restart nodes with new chain config
+ nodeNames := network.GetAllNodeNames()
+ log.Info("Restarting nodes with new chain config")
+ network.RestartNodes(ctx, nodeNames, runner_sdk.WithChainConfigs(chainConfigs))
+ // Refresh the subnet info to get the new clients
+ cChainInfo = network.GetPrimaryNetworkInfo()
//
- // Set up relayer config to deliver one of the two previously sent messages
+ // Set up relayer config
//
relayerConfig := testUtils.CreateDefaultRelayerConfig(
- []interfaces.SubnetTestInfo{subnetAInfo, subnetBInfo},
- []interfaces.SubnetTestInfo{subnetAInfo, subnetBInfo},
+ []interfaces.SubnetTestInfo{cChainInfo},
+ []interfaces.SubnetTestInfo{cChainInfo},
teleporterContractAddress,
fundedAddress,
relayerKey,
)
- relayerConfig.ManualWarpMessages = []*config.ManualWarpMessage{
- {
- UnsignedMessageBytes: hex.EncodeToString(msg1.Bytes()),
- SourceBlockchainID: subnetAInfo.BlockchainID.String(),
- DestinationBlockchainID: subnetBInfo.BlockchainID.String(),
- SourceAddress: teleporterContractAddress.Hex(),
- DestinationAddress: teleporterContractAddress.Hex(),
- },
- }
-
relayerConfigPath := testUtils.WriteRelayerConfig(relayerConfig, testUtils.DefaultRelayerCfgFname)
- //
- // Run the Relayer. On startup, we should deliver the message provided in the config
- //
-
- // Subscribe to the destination chain
- newHeadsB := make(chan *types.Header, 10)
- sub, err := subnetBInfo.WSClient.SubscribeNewHead(ctx, newHeadsB)
- Expect(err).Should(BeNil())
- defer sub.Unsubscribe()
-
log.Info("Starting the relayer")
relayerCleanup := testUtils.BuildAndRunRelayerExecutable(ctx, relayerConfigPath)
defer relayerCleanup()
- log.Info("Waiting for a new block confirmation on subnet B")
- <-newHeadsB
- delivered1, err := subnetBInfo.TeleporterMessenger.MessageReceived(
- &bind.CallOpts{}, id1,
- )
- Expect(err).Should(BeNil())
- Expect(delivered1).Should(BeTrue())
+ // Sleep for some time to make sure relayer has started up and subscribed.
+ log.Info("Waiting for the relayer to start up")
+ time.Sleep(15 * time.Second)
- log.Info("Waiting for 10s to ensure no new block confirmations on destination chain")
- Consistently(newHeadsB, 10*time.Second, 500*time.Millisecond).ShouldNot(Receive())
+ reqBody := api.ManualWarpMessageRequest{
+ UnsignedMessageBytes: unsignedMessage.Bytes(),
+ SourceAddress: offchainregistry.OffChainRegistrySourceAddress.Hex(),
+ }
- delivered2, err := subnetBInfo.TeleporterMessenger.MessageReceived(
- &bind.CallOpts{}, id2,
- )
- Expect(err).Should(BeNil())
- Expect(delivered2).Should(BeFalse())
-}
+ client := http.Client{
+ Timeout: 30 * time.Second,
+ }
-func getWarpMessageFromLog(ctx context.Context, receipt *types.Receipt, source interfaces.SubnetTestInfo) *avalancheWarp.UnsignedMessage {
- log.Info("Fetching relevant warp logs from the newly produced block")
- logs, err := source.RPCClient.FilterLogs(ctx, subnetEvmInterfaces.FilterQuery{
- BlockHash: &receipt.BlockHash,
- Addresses: []common.Address{warp.Module.Address},
- })
- Expect(err).Should(BeNil())
- Expect(len(logs)).Should(Equal(1))
+ requestURL := fmt.Sprintf("http://localhost:%d%s", relayerConfig.APIPort, api.RelayMessageAPIPath)
- // Check for relevant warp log from subscription and ensure that it matches
- // the log extracted from the last block.
- txLog := logs[0]
- log.Info("Parsing logData as unsigned warp message")
- unsignedMsg, err := warp.UnpackSendWarpEventDataToMessage(txLog.Data)
- Expect(err).Should(BeNil())
+ // Send request to API
+ {
+ b, err := json.Marshal(reqBody)
+ Expect(err).Should(BeNil())
+ bodyReader := bytes.NewReader(b)
+
+ req, err := http.NewRequest(http.MethodPost, requestURL, bodyReader)
+ Expect(err).Should(BeNil())
+ req.Header.Set("Content-Type", "application/json")
- return unsignedMsg
+ res, err := client.Do(req)
+ Expect(err).Should(BeNil())
+ Expect(res.Status).Should(Equal("200 OK"))
+
+ // Wait for all nodes to see new transaction
+ time.Sleep(1 * time.Second)
+
+ newVersion, err := cChainInfo.TeleporterRegistry.LatestVersion(&bind.CallOpts{})
+ Expect(err).Should(BeNil())
+ Expect(newVersion.Uint64()).Should(Equal(expectedNewVersion.Uint64()))
+ }
}
diff --git a/tests/relay_message_api.go b/tests/relay_message_api.go
new file mode 100644
index 00000000..bdb5852b
--- /dev/null
+++ b/tests/relay_message_api.go
@@ -0,0 +1,156 @@
+// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package tests
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "time"
+
+ "github.com/ava-labs/avalanchego/ids"
+ avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp"
+ "github.com/ava-labs/awm-relayer/api"
+ testUtils "github.com/ava-labs/awm-relayer/tests/utils"
+ "github.com/ava-labs/subnet-evm/core/types"
+ subnetEvmInterfaces "github.com/ava-labs/subnet-evm/interfaces"
+ "github.com/ava-labs/subnet-evm/precompile/contracts/warp"
+ "github.com/ava-labs/teleporter/tests/interfaces"
+ "github.com/ava-labs/teleporter/tests/utils"
+ teleporterTestUtils "github.com/ava-labs/teleporter/tests/utils"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/log"
+ . "github.com/onsi/gomega"
+)
+
+func RelayMessageAPI(network interfaces.LocalNetwork) {
+ ctx := context.Background()
+ subnetAInfo := network.GetPrimaryNetworkInfo()
+ subnetBInfo, _ := utils.GetTwoSubnets(network)
+ fundedAddress, fundedKey := network.GetFundedAccountInfo()
+ teleporterContractAddress := network.GetTeleporterContractAddress()
+ err := testUtils.ClearRelayerStorage()
+ Expect(err).Should(BeNil())
+
+ log.Info("Funding relayer address on all subnets")
+ relayerKey, err := crypto.GenerateKey()
+ Expect(err).Should(BeNil())
+ testUtils.FundRelayers(ctx, []interfaces.SubnetTestInfo{subnetAInfo, subnetBInfo}, fundedKey, relayerKey)
+
+ log.Info("Sending teleporter message")
+ receipt, _, teleporterMessageID := testUtils.SendBasicTeleporterMessage(ctx, subnetAInfo, subnetBInfo, fundedKey, fundedAddress)
+ warpMessage := getWarpMessageFromLog(ctx, receipt, subnetAInfo)
+
+ // Set up relayer config
+ relayerConfig := testUtils.CreateDefaultRelayerConfig(
+ []interfaces.SubnetTestInfo{subnetAInfo, subnetBInfo},
+ []interfaces.SubnetTestInfo{subnetAInfo, subnetBInfo},
+ teleporterContractAddress,
+ fundedAddress,
+ relayerKey,
+ )
+ // Don't process missed blocks, so we can manually relay
+ relayerConfig.ProcessMissedBlocks = false
+
+ relayerConfigPath := testUtils.WriteRelayerConfig(relayerConfig, testUtils.DefaultRelayerCfgFname)
+
+ log.Info("Starting the relayer")
+ relayerCleanup := testUtils.BuildAndRunRelayerExecutable(ctx, relayerConfigPath)
+ defer relayerCleanup()
+
+ // Sleep for some time to make sure relayer has started up and subscribed.
+ log.Info("Waiting for the relayer to start up")
+ time.Sleep(15 * time.Second)
+
+ reqBody := api.RelayMessageRequest{
+ BlockchainID: subnetAInfo.BlockchainID.String(),
+ MessageID: warpMessage.ID().String(),
+ BlockNum: receipt.BlockNumber.Uint64(),
+ }
+
+ client := http.Client{
+ Timeout: 30 * time.Second,
+ }
+
+ requestURL := fmt.Sprintf("http://localhost:%d%s", relayerConfig.APIPort, api.RelayAPIPath)
+
+ // Send request to API
+ {
+ b, err := json.Marshal(reqBody)
+ Expect(err).Should(BeNil())
+ bodyReader := bytes.NewReader(b)
+
+ req, err := http.NewRequest(http.MethodPost, requestURL, bodyReader)
+ Expect(err).Should(BeNil())
+ req.Header.Set("Content-Type", "application/json")
+
+ res, err := client.Do(req)
+ Expect(err).Should(BeNil())
+ Expect(res.Status).Should(Equal("200 OK"))
+
+ defer res.Body.Close()
+ body, err := io.ReadAll(res.Body)
+ Expect(err).Should(BeNil())
+
+ var response api.RelayMessageResponse
+ err = json.Unmarshal(body, &response)
+ Expect(err).Should(BeNil())
+
+ receipt, err := subnetBInfo.RPCClient.TransactionReceipt(ctx, common.HexToHash(response.TransactionHash))
+ Expect(err).Should(BeNil())
+ receiveEvent, err := teleporterTestUtils.GetEventFromLogs(receipt.Logs, subnetBInfo.TeleporterMessenger.ParseReceiveCrossChainMessage)
+ Expect(err).Should(BeNil())
+ Expect(ids.ID(receiveEvent.MessageID)).Should(Equal(teleporterMessageID))
+ }
+
+ // Send the same request to ensure the correct response.
+ {
+ b, err := json.Marshal(reqBody)
+ Expect(err).Should(BeNil())
+ bodyReader := bytes.NewReader(b)
+
+ req, err := http.NewRequest(http.MethodPost, requestURL, bodyReader)
+ Expect(err).Should(BeNil())
+ req.Header.Set("Content-Type", "application/json")
+
+ res, err := client.Do(req)
+ Expect(err).Should(BeNil())
+ Expect(res.Status).Should(Equal("200 OK"))
+
+ defer res.Body.Close()
+ body, err := io.ReadAll(res.Body)
+ Expect(err).Should(BeNil())
+
+ var response api.RelayMessageResponse
+ err = json.Unmarshal(body, &response)
+ Expect(err).Should(BeNil())
+ Expect(response.TransactionHash).Should(Equal("0x0000000000000000000000000000000000000000000000000000000000000000"))
+ }
+
+ // Cancel the command and stop the relayer
+ relayerCleanup()
+}
+
+func getWarpMessageFromLog(ctx context.Context, receipt *types.Receipt, source interfaces.SubnetTestInfo) *avalancheWarp.UnsignedMessage {
+ log.Info("Fetching relevant warp logs from the newly produced block")
+ logs, err := source.RPCClient.FilterLogs(ctx, subnetEvmInterfaces.FilterQuery{
+ BlockHash: &receipt.BlockHash,
+ Addresses: []common.Address{warp.Module.Address},
+ })
+ Expect(err).Should(BeNil())
+ Expect(len(logs)).Should(Equal(1))
+
+ // Check for relevant warp log from subscription and ensure that it matches
+ // the log extracted from the last block.
+ txLog := logs[0]
+ log.Info("Parsing logData as unsigned warp message")
+ unsignedMsg, err := warp.UnpackSendWarpEventDataToMessage(txLog.Data)
+ Expect(err).Should(BeNil())
+
+ return unsignedMsg
+}
diff --git a/tests/teleporter_registry.go b/tests/teleporter_registry.go
deleted file mode 100644
index 13c4698e..00000000
--- a/tests/teleporter_registry.go
+++ /dev/null
@@ -1,122 +0,0 @@
-// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
-// See the file LICENSE for licensing terms.
-
-package tests
-
-import (
- "context"
- "encoding/hex"
- "math/big"
-
- runner_sdk "github.com/ava-labs/avalanche-network-runner/client"
- "github.com/ava-labs/awm-relayer/config"
- offchainregistry "github.com/ava-labs/awm-relayer/messages/off-chain-registry"
- testUtils "github.com/ava-labs/awm-relayer/tests/utils"
- "github.com/ava-labs/subnet-evm/accounts/abi/bind"
- "github.com/ava-labs/subnet-evm/core/types"
- "github.com/ava-labs/teleporter/tests/interfaces"
- teleporterTestUtils "github.com/ava-labs/teleporter/tests/utils"
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/crypto"
- "github.com/ethereum/go-ethereum/log"
- . "github.com/onsi/gomega"
-)
-
-// Tests relayer support for off-chain Teleporter Registry updates
-// - Configures the relayer to send an off-chain message to the Teleporter Registry
-// - Verifies that the Teleporter Registry is updated
-func TeleporterRegistry(network interfaces.LocalNetwork) {
- cChainInfo := network.GetPrimaryNetworkInfo()
- subnetAInfo, subnetBInfo := teleporterTestUtils.GetTwoSubnets(network)
- fundedAddress, fundedKey := network.GetFundedAccountInfo()
- teleporterContractAddress := network.GetTeleporterContractAddress()
- err := testUtils.ClearRelayerStorage()
- Expect(err).Should(BeNil())
-
- //
- // Get the current Teleporter Registry version
- //
- currentVersion, err := cChainInfo.TeleporterRegistry.LatestVersion(&bind.CallOpts{})
- Expect(err).Should(BeNil())
- expectedNewVersion := currentVersion.Add(currentVersion, big.NewInt(1))
-
- //
- // Fund the relayer address on all subnets
- //
- ctx := context.Background()
-
- log.Info("Funding relayer address on all subnets")
- relayerKey, err := crypto.GenerateKey()
- Expect(err).Should(BeNil())
- testUtils.FundRelayers(ctx, []interfaces.SubnetTestInfo{cChainInfo}, fundedKey, relayerKey)
-
- //
- // Define the off-chain Warp message
- //
- log.Info("Creating off-chain Warp message")
- newProtocolAddress := common.HexToAddress("0x0123456789abcdef0123456789abcdef01234567")
- networkID := network.GetNetworkID()
-
- //
- // Set up the nodes to accept the off-chain message
- //
- // Create chain config file with off chain message for each chain
- unsignedMessage, warpEnabledChainConfigC := teleporterTestUtils.InitOffChainMessageChainConfig(networkID, cChainInfo, newProtocolAddress, 2)
- _, warpEnabledChainConfigA := teleporterTestUtils.InitOffChainMessageChainConfig(networkID, subnetAInfo, newProtocolAddress, 2)
- _, warpEnabledChainConfigB := teleporterTestUtils.InitOffChainMessageChainConfig(networkID, subnetBInfo, newProtocolAddress, 2)
-
- // Create chain config with off chain messages
- chainConfigs := make(map[string]string)
- teleporterTestUtils.SetChainConfig(chainConfigs, cChainInfo, warpEnabledChainConfigC)
- teleporterTestUtils.SetChainConfig(chainConfigs, subnetBInfo, warpEnabledChainConfigB)
- teleporterTestUtils.SetChainConfig(chainConfigs, subnetAInfo, warpEnabledChainConfigA)
-
- // Restart nodes with new chain config
- nodeNames := network.GetAllNodeNames()
- log.Info("Restarting nodes with new chain config")
- network.RestartNodes(ctx, nodeNames, runner_sdk.WithChainConfigs(chainConfigs))
- // Refresh the subnet info to get the new clients
- cChainInfo = network.GetPrimaryNetworkInfo()
-
- //
- // Set up relayer config
- //
- relayerConfig := testUtils.CreateDefaultRelayerConfig(
- []interfaces.SubnetTestInfo{cChainInfo},
- []interfaces.SubnetTestInfo{cChainInfo},
- teleporterContractAddress,
- fundedAddress,
- relayerKey,
- )
- relayerConfig.ManualWarpMessages = []*config.ManualWarpMessage{
- {
- UnsignedMessageBytes: hex.EncodeToString(unsignedMessage.Bytes()),
- SourceBlockchainID: cChainInfo.BlockchainID.String(),
- DestinationBlockchainID: cChainInfo.BlockchainID.String(),
- SourceAddress: offchainregistry.OffChainRegistrySourceAddress.Hex(),
- DestinationAddress: cChainInfo.TeleporterRegistryAddress.Hex(),
- },
- }
- relayerConfigPath := testUtils.WriteRelayerConfig(relayerConfig, testUtils.DefaultRelayerCfgFname)
- //
- // Run the Relayer. On startup, we should deliver the message provided in the config
- //
-
- // Subscribe to the destination chain
- newHeadsC := make(chan *types.Header, 10)
- sub, err := cChainInfo.WSClient.SubscribeNewHead(ctx, newHeadsC)
- Expect(err).Should(BeNil())
- defer sub.Unsubscribe()
-
- log.Info("Starting the relayer")
- relayerCleanup := testUtils.BuildAndRunRelayerExecutable(ctx, relayerConfigPath)
- defer relayerCleanup()
-
- log.Info("Waiting for a new block confirmation on the C-Chain")
- <-newHeadsC
-
- log.Info("Verifying that the Teleporter Registry was updated")
- newVersion, err := cChainInfo.TeleporterRegistry.LatestVersion(&bind.CallOpts{})
- Expect(err).Should(BeNil())
- Expect(newVersion.Cmp(expectedNewVersion)).Should(Equal(0))
-}
diff --git a/tests/utils/utils.go b/tests/utils/utils.go
index 3d8fd8d1..170b16b8 100644
--- a/tests/utils/utils.go
+++ b/tests/utils/utils.go
@@ -204,6 +204,7 @@ func CreateDefaultRelayerConfig(
MetricsPort: 9090,
SourceBlockchains: sources,
DestinationBlockchains: destinations,
+ APIPort: 8080,
}
}
diff --git a/types/types.go b/types/types.go
index 735115d2..b71a6805 100644
--- a/types/types.go
+++ b/types/types.go
@@ -16,12 +16,13 @@ import (
"github.com/ethereum/go-ethereum/common"
)
-var WarpPrecompileLogFilter = warp.WarpABI.Events["SendWarpMessage"].ID
-var ErrInvalidLog = errors.New("invalid warp message log")
+var (
+ WarpPrecompileLogFilter = warp.WarpABI.Events["SendWarpMessage"].ID
+ ErrInvalidLog = errors.New("invalid warp message log")
+)
// WarpBlockInfo describes the block height and logs needed to process Warp messages.
-// WarpBlockInfo instances are populated by the subscriber, and forwared to the
-// Listener to process
+// WarpBlockInfo instances are populated by the subscriber, and forwarded to the Listener to process.
type WarpBlockInfo struct {
BlockNumber uint64
Messages []*WarpMessageInfo
diff --git a/vms/destination_client.go b/vms/destination_client.go
index e417a458..12e2c895 100644
--- a/vms/destination_client.go
+++ b/vms/destination_client.go
@@ -20,7 +20,7 @@ import (
// DestinationClient is the interface for the destination chain client. Methods that interact with the destination chain
// should generally be implemented in a thread safe way, as they will be called concurrently by the application relayers.
type DestinationClient interface {
- // SendTx contructs the transaction from warp primitives, and sends to the configured destination chain endpoint.
+ // SendTx constructs the transaction from warp primitives, and sends to the configured destination chain endpoint.
// Returns the hash of the sent transaction.
// TODO: Make generic for any VM.
SendTx(signedMessage *warp.Message, toAddress string, gasLimit uint64, callData []byte) (common.Hash, error)
@@ -52,7 +52,7 @@ func CreateDestinationClients(logger logging.Logger, relayerConfig config.Config
if err != nil {
logger.Error(
"Failed to decode base-58 encoded source chain ID",
- zap.String("blockchainID", blockchainID.String()),
+ zap.String("blockchainID", subnetInfo.BlockchainID),
zap.Error(err),
)
return nil, err