Skip to content

Commit

Permalink
use destination warp quorum to gather signatures
Browse files Browse the repository at this point in the history
  • Loading branch information
cam-schultz committed Jan 8, 2024
1 parent 86870da commit 2e57834
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 27 deletions.
25 changes: 17 additions & 8 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ func setQuorumNumerator(cfgNumerator uint64) uint64 {
// Helper to retrieve the Warp Quorum from the chain config.
// Differentiates between subnet-evm and coreth RPC internally
func getWarpQuorum(
blockchainID ids.ID,
subnetID ids.ID,
client ethclient.Client,
) (WarpQuorum, error) {
Expand All @@ -263,7 +264,7 @@ func getWarpQuorum(
// Fetch the subnet's chain config
chainConfig, err := client.ChainConfig(context.Background())

Check failure on line 265 in config/config.go

View workflow job for this annotation

GitHub Actions / Unit tests

client.ChainConfig undefined (type ethclient.Client has no field or method ChainConfig)
if err != nil {
return WarpQuorum{}, fmt.Errorf("failed to fetch chain config for subnet %s: %v", subnetID, err)
return WarpQuorum{}, fmt.Errorf("failed to fetch chain config for blockchain %s: %v", blockchainID, err)
}

// First, check the list of precompile upgrades to get the most up to date Warp config
Expand All @@ -286,14 +287,18 @@ func getWarpQuorum(
QuorumDenominator: params.WarpQuorumDenominator,
}, nil
}
return WarpQuorum{}, fmt.Errorf("failed to find warp config for subnet %s", subnetID)
return WarpQuorum{}, fmt.Errorf("failed to find warp config for blockchain %s", blockchainID)
}

func (c *Config) initializeWarpQuorum() error {
c.warpQuorum = make(map[ids.ID]WarpQuorum)

// Fetch the Warp quorum values for each source subnet
for _, sourceSubnet := range c.SourceSubnets {
blockchainID, err := ids.FromString(sourceSubnet.BlockchainID)
if err != nil {
return fmt.Errorf("invalid blockchainID in configuration. error: %v", err)
}
subnetID, err := ids.FromString(sourceSubnet.SubnetID)
if err != nil {
return fmt.Errorf("invalid subnetID in configuration. error: %v", err)
Expand All @@ -304,37 +309,41 @@ func (c *Config) initializeWarpQuorum() error {
return fmt.Errorf("failed to dial source subnet %s: %v", subnetID, err)
}
defer client.Close()
quorum, err := getWarpQuorum(subnetID, client)
quorum, err := getWarpQuorum(blockchainID, subnetID, client)
if err != nil {
return err
}

c.warpQuorum[subnetID] = quorum
c.warpQuorum[blockchainID] = quorum
}

// Fetch the Warp quorum values for each destination subnet.
// We do this to properly handle Warp messages originating from the primary network
for _, destinationSubnet := range c.DestinationSubnets {
blockchainID, err := ids.FromString(destinationSubnet.BlockchainID)
if err != nil {
return fmt.Errorf("invalid blockchainID in configuration. error: %v", err)
}
subnetID, err := ids.FromString(destinationSubnet.SubnetID)
if err != nil {
return fmt.Errorf("invalid subnetID in configuration. error: %v", err)
}
if _, ok := c.warpQuorum[subnetID]; ok {
if _, ok := c.warpQuorum[blockchainID]; ok {
// We already fetched the quorum for this subnet
continue
}

client, err := ethclient.Dial(destinationSubnet.GetNodeRPCEndpoint())
if err != nil {
return fmt.Errorf("failed to dial destination subnet %s: %v", subnetID, err)
return fmt.Errorf("failed to dial destination blockchain %s: %v", blockchainID, err)
}
defer client.Close()
quorum, err := getWarpQuorum(subnetID, client)
quorum, err := getWarpQuorum(blockchainID, subnetID, client)
if err != nil {
return err
}

c.warpQuorum[subnetID] = quorum
c.warpQuorum[blockchainID] = quorum
}
return nil
}
Expand Down
11 changes: 9 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,11 +417,13 @@ func TestGetRelayerAccountInfoSkipChainConfigCheckCompatible(t *testing.T) {
}

func TestGetWarpQuorum(t *testing.T) {
subnetID, err := ids.FromString("p433wpuXyJiDhyazPYyZMJeaoPSW76CBZ2x7wrVPLgvokotXz")
blockchainID, err := ids.FromString("p433wpuXyJiDhyazPYyZMJeaoPSW76CBZ2x7wrVPLgvokotXz")
require.NoError(t, err)
subnetID, err := ids.FromString("2PsShLjrFFwR51DMcAh8pyuwzLn1Ym3zRhuXLTmLCR1STk2mL6")

testCases := []struct {
name string
blockchainID ids.ID
subnetID ids.ID
chainConfig params.ChainConfig
getChainConfigCalls int
Expand All @@ -430,6 +432,7 @@ func TestGetWarpQuorum(t *testing.T) {
}{
{
name: "primary network",
blockchainID: blockchainID,
subnetID: ids.Empty,
getChainConfigCalls: 0,
expectedError: nil,
Expand All @@ -440,6 +443,7 @@ func TestGetWarpQuorum(t *testing.T) {
},
{
name: "subnet genesis precompile",
blockchainID: blockchainID,
subnetID: subnetID,
getChainConfigCalls: 1,
chainConfig: params.ChainConfig{
Expand All @@ -457,6 +461,7 @@ func TestGetWarpQuorum(t *testing.T) {
},
{
name: "subnet genesis precompile non-default",
blockchainID: blockchainID,
subnetID: subnetID,
getChainConfigCalls: 1,
chainConfig: params.ChainConfig{
Expand All @@ -474,6 +479,7 @@ func TestGetWarpQuorum(t *testing.T) {
},
{
name: "subnet upgrade precompile",
blockchainID: blockchainID,
subnetID: subnetID,
getChainConfigCalls: 1,
chainConfig: params.ChainConfig{
Expand All @@ -495,6 +501,7 @@ func TestGetWarpQuorum(t *testing.T) {
},
{
name: "subnet upgrade precompile non-default",
blockchainID: blockchainID,
subnetID: subnetID,
getChainConfigCalls: 1,
chainConfig: params.ChainConfig{
Expand Down Expand Up @@ -523,7 +530,7 @@ func TestGetWarpQuorum(t *testing.T) {
client.EXPECT().ChainConfig(gomock.Any()).Return(&testCase.chainConfig, nil).Times(testCase.getChainConfigCalls),
)

quorum, err := getWarpQuorum(testCase.subnetID, client)
quorum, err := getWarpQuorum(testCase.blockchainID, testCase.subnetID, client)
require.Equal(t, testCase.expectedError, err)
require.Equal(t, testCase.expectedQuorum, quorum)
})
Expand Down
10 changes: 1 addition & 9 deletions main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,6 @@ func runRelayer(logger logging.Logger,
zap.String("blockchainID", sourceSubnetInfo.BlockchainID),
)

subnetID, err := ids.FromString(sourceSubnetInfo.SubnetID)
if err != nil {
// The subnetID should have already been validated
panic(err)
}
quorum := cfg.GetWarpQuorum()[subnetID]

relayer, subscriber, err := relayer.NewRelayer(
logger,
db,
Expand All @@ -223,7 +216,6 @@ func runRelayer(logger logging.Logger,
responseChan,
destinationClients,
cfg.ProcessMissedBlocks,
quorum,
)
if err != nil {
logger.Error(
Expand All @@ -249,7 +241,7 @@ func runRelayer(logger logging.Logger,
)

// Relay the message to the destination chain. Continue on failure.
err = relayer.RelayMessage(&txLog, metrics, messageCreator)
err = relayer.RelayMessage(&txLog, metrics, messageCreator, &cfg)
if err != nil {
logger.Error(
"Error relaying message",
Expand Down
10 changes: 7 additions & 3 deletions relayer/message_relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/vms/platformvm/warp"
avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp"
"github.com/ava-labs/awm-relayer/config"
"github.com/ava-labs/awm-relayer/messages"
"github.com/ava-labs/awm-relayer/peers"
"github.com/ava-labs/awm-relayer/utils"
Expand Down Expand Up @@ -56,6 +57,7 @@ type messageRelayer struct {
relayer *Relayer
warpMessage *warp.UnsignedMessage
destinationBlockchainID ids.ID
warpQuorum config.WarpQuorum
messageResponseChan chan message.InboundMessage
logger logging.Logger
messageCreator message.Creator
Expand All @@ -68,13 +70,15 @@ func newMessageRelayer(
relayer *Relayer,
warpMessage *warp.UnsignedMessage,
destinationBlockchainID ids.ID,
warpQuorum config.WarpQuorum,
messageResponseChan chan message.InboundMessage,
messageCreator message.Creator,
) *messageRelayer {
return &messageRelayer{
relayer: relayer,
warpMessage: warpMessage,
destinationBlockchainID: destinationBlockchainID,
warpQuorum: warpQuorum,
messageResponseChan: messageResponseChan,
logger: logger,
metrics: metrics,
Expand Down Expand Up @@ -182,7 +186,7 @@ func (r *messageRelayer) createSignedMessage() (*warp.Message, error) {
signedWarpMessageBytes, err = warpClient.GetMessageAggregateSignature(
context.Background(),
r.warpMessage.ID(),
r.relayer.warpQuorum.QuorumNumerator,
r.warpQuorum.QuorumNumerator,
signingSubnetID.String(),
)
if err == nil {
Expand Down Expand Up @@ -413,8 +417,8 @@ func (r *messageRelayer) createSignedMessageAppRequest(requestID uint32) (*warp.
if utils.CheckStakeWeightExceedsThreshold(
accumulatedSignatureWeight,
totalValidatorWeight,
r.relayer.warpQuorum.QuorumNumerator,
r.relayer.warpQuorum.QuorumDenominator,
r.warpQuorum.QuorumNumerator,
r.warpQuorum.QuorumDenominator,
) {
aggSig, vdrBitSet, err := r.aggregateSignatures(signatureMap)
if err != nil {
Expand Down
21 changes: 16 additions & 5 deletions relayer/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type Relayer struct {
db database.RelayerDatabase
supportedDestinations set.Set[ids.ID]
apiNodeURI string
warpQuorum config.WarpQuorum
}

func NewRelayer(
Expand All @@ -53,7 +52,6 @@ func NewRelayer(
responseChan chan message.InboundMessage,
destinationClients map[ids.ID]vms.DestinationClient,
shouldProcessMissedBlocks bool,
warpQuorum config.WarpQuorum,
) (*Relayer, vms.Subscriber, error) {
sub := vms.NewSubscriber(logger, sourceSubnetInfo, db)

Expand Down Expand Up @@ -124,7 +122,6 @@ func NewRelayer(
db: db,
supportedDestinations: supportedDestinationsBlockchainIDs,
apiNodeURI: uri,
warpQuorum: warpQuorum,
}

// Open the subscription. We must do this before processing any missed messages, otherwise we may miss an incoming message
Expand Down Expand Up @@ -221,7 +218,12 @@ func (r *Relayer) processMissedBlocks(
}

// RelayMessage relays a single warp message to the destination chain. Warp message relay requests from the same origin chain are processed serially
func (r *Relayer) RelayMessage(warpLogInfo *vmtypes.WarpLogInfo, metrics *MessageRelayerMetrics, messageCreator message.Creator) error {
func (r *Relayer) RelayMessage(
warpLogInfo *vmtypes.WarpLogInfo,
metrics *MessageRelayerMetrics,
messageCreator message.Creator,
cfg *config.Config,
) error {
r.logger.Info(
"Relaying message",
zap.String("blockchainID", r.sourceBlockchainID.String()),
Expand Down Expand Up @@ -274,7 +276,16 @@ func (r *Relayer) RelayMessage(warpLogInfo *vmtypes.WarpLogInfo, metrics *Messag
}

// Create and run the message relayer to attempt to deliver the message to the destination chain
messageRelayer := newMessageRelayer(r.logger, metrics, r, warpMessageInfo.WarpUnsignedMessage, destinationBlockchainID, r.responseChan, messageCreator)
messageRelayer := newMessageRelayer(
r.logger,
metrics,
r,
warpMessageInfo.WarpUnsignedMessage,
destinationBlockchainID,
cfg.GetWarpQuorum()[destinationBlockchainID],
r.responseChan,
messageCreator,
)
if err != nil {
r.logger.Error(
"Failed to create message relayer",
Expand Down

0 comments on commit 2e57834

Please sign in to comment.