From a68fd4ce8f5a48bc523329713ab7b7638eb11af3 Mon Sep 17 00:00:00 2001 From: cam-schultz Date: Fri, 15 Dec 2023 01:41:39 +0000 Subject: [PATCH] catch up height cfg --- config/config.go | 1 + relayer/relayer.go | 54 ++++++++++++++++++++++++++++------------------ utils/utils.go | 7 ++++++ 3 files changed, 41 insertions(+), 21 deletions(-) diff --git a/config/config.go b/config/config.go index b0bd8bfe..20ead58a 100644 --- a/config/config.go +++ b/config/config.go @@ -47,6 +47,7 @@ type SourceSubnet struct { WSEndpoint string `mapstructure:"ws-endpoint" json:"ws-endpoint"` MessageContracts map[string]MessageProtocolConfig `mapstructure:"message-contracts" json:"message-contracts"` SupportedDestinations []string `mapstructure:"supported-destinations" json:"supported-destinations"` + CatchUpBlockHeight *big.Int `mapstructure:"catch-up-block-height" json:"catch-up-block-height"` // convenience field to access the supported destinations after initialization supportedDestinations set.Set[ids.ID] diff --git a/relayer/relayer.go b/relayer/relayer.go index 218f0d53..8a67d6ea 100644 --- a/relayer/relayer.go +++ b/relayer/relayer.go @@ -19,6 +19,7 @@ import ( "github.com/ava-labs/awm-relayer/database" "github.com/ava-labs/awm-relayer/messages" "github.com/ava-labs/awm-relayer/peers" + "github.com/ava-labs/awm-relayer/utils" vms "github.com/ava-labs/awm-relayer/vms" "github.com/ava-labs/awm-relayer/vms/vmtypes" "github.com/ethereum/go-ethereum/common" @@ -131,7 +132,7 @@ func NewRelayer( } if shouldProcessMissedBlocks { - err = r.processMissedBlocks(sub) + err = r.processMissedBlocks(sub, sourceSubnetInfo.CatchUpBlockHeight) if err != nil { logger.Error( "Failed to process historical blocks mined during relayer downtime", @@ -155,27 +156,47 @@ func NewRelayer( func (r *Relayer) processMissedBlocks( sub vms.Subscriber, + catchUpBlockHeight *big.Int, ) error { - // Get the latest processed block height from the database. + // Attempt to get the latest processed block height from the database. + // Note that the retrieved latest processed block may have already been partially (or fully) processed by the relayer on a previous run. When + // processing a warp message in real time, which is when we update the latest processed block in the database, we have no way of knowing + // if that is the last warp message in the block latestProcessedBlockData, err := r.db.Get(r.sourceBlockchainID, []byte(database.LatestProcessedBlockKey)) - // The following cases are treated as successful: + // First, determine the height to process from. There are two cases: // 1) The database contains the latest processed block data for the chain - // - In this case, we parse the block height and process warp logs from that height to the current block + // - In this case, we process from the maximum of the latest processed block and the catch up block height to the latest block // 2) The database has been configured for the chain, but does not contain the latest processed block data - // - In this case, we save the current block height in the database, but do not process any historical warp logs + // - In this case, if a catch up block height is provided, we process from the catch up block height to the latest block + // - Otherwise, we save the current block height in the database, but do not process any historical warp logs + var height *big.Int = nil if err == nil { - // If the database contains the latest processed block data, then back-process all warp messages from that block to the latest block - // Note that the retrieved latest processed block may have already been partially (or fully) processed by the relayer on a previous run. When - // processing a warp message in real time, which is when we update the latest processed block in the database, we have no way of knowing - // if that is the last warp message in the block + // Use the max of the latest processed block and the catch up block height latestProcessedBlock, success := new(big.Int).SetString(string(latestProcessedBlockData), 10) if !success { r.logger.Error("failed to convert latest block to big.Int", zap.Error(err)) return err } + height = utils.MaxBigInt(latestProcessedBlock, catchUpBlockHeight) + } else if errors.Is(err, database.ErrChainNotFound) || errors.Is(err, database.ErrKeyNotFound) { + // If the database does not contain the latest processed block data, then we check if a catch up block height is provided. + if catchUpBlockHeight != nil { + height = catchUpBlockHeight + } + } else { + // Otherwise, we've encountered an unknown database error + r.logger.Warn( + "failed to get latest block from database", + zap.String("blockchainID", r.sourceBlockchainID.String()), + zap.Error(err), + ) + return err + } - err = sub.ProcessFromHeight(latestProcessedBlock) + // If we've determined a height to process from, then we process from that height to the latest block. + if height != nil { + err = sub.ProcessFromHeight(height) if err != nil { r.logger.Warn( "Encountered an error when processing historical blocks. Continuing to normal relaying operation.", @@ -184,9 +205,8 @@ func (r *Relayer) processMissedBlocks( ) } return nil - } - if errors.Is(err, database.ErrChainNotFound) || errors.Is(err, database.ErrKeyNotFound) { - // Otherwise, latestProcessedBlock is nil, so we instead store the latest block height. + } else { + // Otherwise, do not process any historical blocks, and store the latest block height r.logger.Info( "Latest processed block not found in database. Starting from latest block.", zap.String("blockchainID", r.sourceBlockchainID.String()), @@ -202,14 +222,6 @@ func (r *Relayer) processMissedBlocks( } return nil } - - // If neither of the above conditions are met, then we return an error - r.logger.Warn( - "failed to get latest block from database", - zap.String("blockchainID", r.sourceBlockchainID.String()), - zap.Error(err), - ) - return err } // RelayMessage relays a single warp message to the destination chain. Warp message relay requests from the same origin chain are processed serially diff --git a/utils/utils.go b/utils/utils.go index 29fa3f6f..c5cdb28e 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -97,3 +97,10 @@ func SanitizeHexString(hex string) string { } return hex } + +func MaxBigInt(a, b *big.Int) *big.Int { + if a.Cmp(b) > 0 { + return new(big.Int).Set(a) + } + return new(big.Int).Set(b) +}