diff --git a/config/config.go b/config/config.go index 149d3022..29867534 100644 --- a/config/config.go +++ b/config/config.go @@ -250,6 +250,7 @@ func setQuorumNumerator(cfgNumerator uint64) uint64 { // Helper to retrieve the Warp Quorum from the chain config. // Differentiates between subnet-evm and coreth RPC internally func getWarpQuorum( + blockchainID ids.ID, subnetID ids.ID, client ethclient.Client, ) (WarpQuorum, error) { @@ -263,7 +264,7 @@ func getWarpQuorum( // Fetch the subnet's chain config chainConfig, err := client.ChainConfig(context.Background()) if err != nil { - return WarpQuorum{}, fmt.Errorf("failed to fetch chain config for subnet %s: %v", subnetID, err) + return WarpQuorum{}, fmt.Errorf("failed to fetch chain config for blockchain %s: %v", blockchainID, err) } // First, check the list of precompile upgrades to get the most up to date Warp config @@ -286,7 +287,7 @@ func getWarpQuorum( QuorumDenominator: params.WarpQuorumDenominator, }, nil } - return WarpQuorum{}, fmt.Errorf("failed to find warp config for subnet %s", subnetID) + return WarpQuorum{}, fmt.Errorf("failed to find warp config for blockchain %s", blockchainID) } func (c *Config) initializeWarpQuorum() error { @@ -294,6 +295,10 @@ func (c *Config) initializeWarpQuorum() error { // Fetch the Warp quorum values for each source subnet for _, sourceSubnet := range c.SourceSubnets { + blockchainID, err := ids.FromString(sourceSubnet.BlockchainID) + if err != nil { + return fmt.Errorf("invalid blockchainID in configuration. error: %v", err) + } subnetID, err := ids.FromString(sourceSubnet.SubnetID) if err != nil { return fmt.Errorf("invalid subnetID in configuration. error: %v", err) @@ -304,37 +309,41 @@ func (c *Config) initializeWarpQuorum() error { return fmt.Errorf("failed to dial source subnet %s: %v", subnetID, err) } defer client.Close() - quorum, err := getWarpQuorum(subnetID, client) + quorum, err := getWarpQuorum(blockchainID, subnetID, client) if err != nil { return err } - c.warpQuorum[subnetID] = quorum + c.warpQuorum[blockchainID] = quorum } // Fetch the Warp quorum values for each destination subnet. // We do this to properly handle Warp messages originating from the primary network for _, destinationSubnet := range c.DestinationSubnets { + blockchainID, err := ids.FromString(destinationSubnet.BlockchainID) + if err != nil { + return fmt.Errorf("invalid blockchainID in configuration. error: %v", err) + } subnetID, err := ids.FromString(destinationSubnet.SubnetID) if err != nil { return fmt.Errorf("invalid subnetID in configuration. error: %v", err) } - if _, ok := c.warpQuorum[subnetID]; ok { + if _, ok := c.warpQuorum[blockchainID]; ok { // We already fetched the quorum for this subnet continue } client, err := ethclient.Dial(destinationSubnet.GetNodeRPCEndpoint()) if err != nil { - return fmt.Errorf("failed to dial destination subnet %s: %v", subnetID, err) + return fmt.Errorf("failed to dial destination blockchain %s: %v", blockchainID, err) } defer client.Close() - quorum, err := getWarpQuorum(subnetID, client) + quorum, err := getWarpQuorum(blockchainID, subnetID, client) if err != nil { return err } - c.warpQuorum[subnetID] = quorum + c.warpQuorum[blockchainID] = quorum } return nil } diff --git a/config/config_test.go b/config/config_test.go index ae8d22f4..4bf43606 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -417,11 +417,13 @@ func TestGetRelayerAccountInfoSkipChainConfigCheckCompatible(t *testing.T) { } func TestGetWarpQuorum(t *testing.T) { - subnetID, err := ids.FromString("p433wpuXyJiDhyazPYyZMJeaoPSW76CBZ2x7wrVPLgvokotXz") + blockchainID, err := ids.FromString("p433wpuXyJiDhyazPYyZMJeaoPSW76CBZ2x7wrVPLgvokotXz") require.NoError(t, err) + subnetID, err := ids.FromString("2PsShLjrFFwR51DMcAh8pyuwzLn1Ym3zRhuXLTmLCR1STk2mL6") testCases := []struct { name string + blockchainID ids.ID subnetID ids.ID chainConfig params.ChainConfig getChainConfigCalls int @@ -430,6 +432,7 @@ func TestGetWarpQuorum(t *testing.T) { }{ { name: "primary network", + blockchainID: blockchainID, subnetID: ids.Empty, getChainConfigCalls: 0, expectedError: nil, @@ -440,6 +443,7 @@ func TestGetWarpQuorum(t *testing.T) { }, { name: "subnet genesis precompile", + blockchainID: blockchainID, subnetID: subnetID, getChainConfigCalls: 1, chainConfig: params.ChainConfig{ @@ -457,6 +461,7 @@ func TestGetWarpQuorum(t *testing.T) { }, { name: "subnet genesis precompile non-default", + blockchainID: blockchainID, subnetID: subnetID, getChainConfigCalls: 1, chainConfig: params.ChainConfig{ @@ -474,6 +479,7 @@ func TestGetWarpQuorum(t *testing.T) { }, { name: "subnet upgrade precompile", + blockchainID: blockchainID, subnetID: subnetID, getChainConfigCalls: 1, chainConfig: params.ChainConfig{ @@ -495,6 +501,7 @@ func TestGetWarpQuorum(t *testing.T) { }, { name: "subnet upgrade precompile non-default", + blockchainID: blockchainID, subnetID: subnetID, getChainConfigCalls: 1, chainConfig: params.ChainConfig{ @@ -523,7 +530,7 @@ func TestGetWarpQuorum(t *testing.T) { client.EXPECT().ChainConfig(gomock.Any()).Return(&testCase.chainConfig, nil).Times(testCase.getChainConfigCalls), ) - quorum, err := getWarpQuorum(testCase.subnetID, client) + quorum, err := getWarpQuorum(testCase.blockchainID, testCase.subnetID, client) require.Equal(t, testCase.expectedError, err) require.Equal(t, testCase.expectedQuorum, quorum) }) diff --git a/main/main.go b/main/main.go index 041c0fe4..32d579be 100644 --- a/main/main.go +++ b/main/main.go @@ -207,13 +207,6 @@ func runRelayer(logger logging.Logger, zap.String("blockchainID", sourceSubnetInfo.BlockchainID), ) - subnetID, err := ids.FromString(sourceSubnetInfo.SubnetID) - if err != nil { - // The subnetID should have already been validated - panic(err) - } - quorum := cfg.GetWarpQuorum()[subnetID] - relayer, subscriber, err := relayer.NewRelayer( logger, db, @@ -223,7 +216,6 @@ func runRelayer(logger logging.Logger, responseChan, destinationClients, cfg.ProcessMissedBlocks, - quorum, ) if err != nil { logger.Error( @@ -249,7 +241,7 @@ func runRelayer(logger logging.Logger, ) // Relay the message to the destination chain. Continue on failure. - err = relayer.RelayMessage(&txLog, metrics, messageCreator) + err = relayer.RelayMessage(&txLog, metrics, messageCreator, &cfg) if err != nil { logger.Error( "Error relaying message", diff --git a/relayer/message_relayer.go b/relayer/message_relayer.go index ccb2bef1..7c44c3a4 100644 --- a/relayer/message_relayer.go +++ b/relayer/message_relayer.go @@ -20,6 +20,7 @@ import ( "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/vms/platformvm/warp" avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp" + "github.com/ava-labs/awm-relayer/config" "github.com/ava-labs/awm-relayer/messages" "github.com/ava-labs/awm-relayer/peers" "github.com/ava-labs/awm-relayer/utils" @@ -56,6 +57,7 @@ type messageRelayer struct { relayer *Relayer warpMessage *warp.UnsignedMessage destinationBlockchainID ids.ID + warpQuorum config.WarpQuorum messageResponseChan chan message.InboundMessage logger logging.Logger messageCreator message.Creator @@ -68,6 +70,7 @@ func newMessageRelayer( relayer *Relayer, warpMessage *warp.UnsignedMessage, destinationBlockchainID ids.ID, + warpQuorum config.WarpQuorum, messageResponseChan chan message.InboundMessage, messageCreator message.Creator, ) *messageRelayer { @@ -75,6 +78,7 @@ func newMessageRelayer( relayer: relayer, warpMessage: warpMessage, destinationBlockchainID: destinationBlockchainID, + warpQuorum: warpQuorum, messageResponseChan: messageResponseChan, logger: logger, metrics: metrics, @@ -182,7 +186,7 @@ func (r *messageRelayer) createSignedMessage() (*warp.Message, error) { signedWarpMessageBytes, err = warpClient.GetMessageAggregateSignature( context.Background(), r.warpMessage.ID(), - r.relayer.warpQuorum.QuorumNumerator, + r.warpQuorum.QuorumNumerator, signingSubnetID.String(), ) if err == nil { @@ -413,8 +417,8 @@ func (r *messageRelayer) createSignedMessageAppRequest(requestID uint32) (*warp. if utils.CheckStakeWeightExceedsThreshold( accumulatedSignatureWeight, totalValidatorWeight, - r.relayer.warpQuorum.QuorumNumerator, - r.relayer.warpQuorum.QuorumDenominator, + r.warpQuorum.QuorumNumerator, + r.warpQuorum.QuorumDenominator, ) { aggSig, vdrBitSet, err := r.aggregateSignatures(signatureMap) if err != nil { diff --git a/relayer/relayer.go b/relayer/relayer.go index 1c22b1d5..a80d495e 100644 --- a/relayer/relayer.go +++ b/relayer/relayer.go @@ -41,7 +41,6 @@ type Relayer struct { db database.RelayerDatabase supportedDestinations set.Set[ids.ID] apiNodeURI string - warpQuorum config.WarpQuorum } func NewRelayer( @@ -53,7 +52,6 @@ func NewRelayer( responseChan chan message.InboundMessage, destinationClients map[ids.ID]vms.DestinationClient, shouldProcessMissedBlocks bool, - warpQuorum config.WarpQuorum, ) (*Relayer, vms.Subscriber, error) { sub := vms.NewSubscriber(logger, sourceSubnetInfo, db) @@ -124,7 +122,6 @@ func NewRelayer( db: db, supportedDestinations: supportedDestinationsBlockchainIDs, apiNodeURI: uri, - warpQuorum: warpQuorum, } // Open the subscription. We must do this before processing any missed messages, otherwise we may miss an incoming message @@ -221,7 +218,12 @@ func (r *Relayer) processMissedBlocks( } // RelayMessage relays a single warp message to the destination chain. Warp message relay requests from the same origin chain are processed serially -func (r *Relayer) RelayMessage(warpLogInfo *vmtypes.WarpLogInfo, metrics *MessageRelayerMetrics, messageCreator message.Creator) error { +func (r *Relayer) RelayMessage( + warpLogInfo *vmtypes.WarpLogInfo, + metrics *MessageRelayerMetrics, + messageCreator message.Creator, + cfg *config.Config, +) error { r.logger.Info( "Relaying message", zap.String("blockchainID", r.sourceBlockchainID.String()), @@ -274,7 +276,16 @@ func (r *Relayer) RelayMessage(warpLogInfo *vmtypes.WarpLogInfo, metrics *Messag } // Create and run the message relayer to attempt to deliver the message to the destination chain - messageRelayer := newMessageRelayer(r.logger, metrics, r, warpMessageInfo.WarpUnsignedMessage, destinationBlockchainID, r.responseChan, messageCreator) + messageRelayer := newMessageRelayer( + r.logger, + metrics, + r, + warpMessageInfo.WarpUnsignedMessage, + destinationBlockchainID, + cfg.GetWarpQuorum()[destinationBlockchainID], + r.responseChan, + messageCreator, + ) if err != nil { r.logger.Error( "Failed to create message relayer",