Skip to content

Commit

Permalink
catch up height cfg
Browse files Browse the repository at this point in the history
  • Loading branch information
cam-schultz committed Dec 15, 2023
1 parent 91e254f commit a68fd4c
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 21 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type SourceSubnet struct {
WSEndpoint string `mapstructure:"ws-endpoint" json:"ws-endpoint"`
MessageContracts map[string]MessageProtocolConfig `mapstructure:"message-contracts" json:"message-contracts"`
SupportedDestinations []string `mapstructure:"supported-destinations" json:"supported-destinations"`
CatchUpBlockHeight *big.Int `mapstructure:"catch-up-block-height" json:"catch-up-block-height"`

// convenience field to access the supported destinations after initialization
supportedDestinations set.Set[ids.ID]
Expand Down
54 changes: 33 additions & 21 deletions relayer/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/ava-labs/awm-relayer/database"
"github.com/ava-labs/awm-relayer/messages"
"github.com/ava-labs/awm-relayer/peers"
"github.com/ava-labs/awm-relayer/utils"
vms "github.com/ava-labs/awm-relayer/vms"
"github.com/ava-labs/awm-relayer/vms/vmtypes"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -131,7 +132,7 @@ func NewRelayer(
}

if shouldProcessMissedBlocks {
err = r.processMissedBlocks(sub)
err = r.processMissedBlocks(sub, sourceSubnetInfo.CatchUpBlockHeight)
if err != nil {
logger.Error(
"Failed to process historical blocks mined during relayer downtime",
Expand All @@ -155,27 +156,47 @@ func NewRelayer(

func (r *Relayer) processMissedBlocks(
sub vms.Subscriber,
catchUpBlockHeight *big.Int,
) error {
// Get the latest processed block height from the database.
// Attempt to get the latest processed block height from the database.
// Note that the retrieved latest processed block may have already been partially (or fully) processed by the relayer on a previous run. When
// processing a warp message in real time, which is when we update the latest processed block in the database, we have no way of knowing
// if that is the last warp message in the block
latestProcessedBlockData, err := r.db.Get(r.sourceBlockchainID, []byte(database.LatestProcessedBlockKey))

// The following cases are treated as successful:
// First, determine the height to process from. There are two cases:
// 1) The database contains the latest processed block data for the chain
// - In this case, we parse the block height and process warp logs from that height to the current block
// - In this case, we process from the maximum of the latest processed block and the catch up block height to the latest block
// 2) The database has been configured for the chain, but does not contain the latest processed block data
// - In this case, we save the current block height in the database, but do not process any historical warp logs
// - In this case, if a catch up block height is provided, we process from the catch up block height to the latest block
// - Otherwise, we save the current block height in the database, but do not process any historical warp logs
var height *big.Int = nil
if err == nil {
// If the database contains the latest processed block data, then back-process all warp messages from that block to the latest block
// Note that the retrieved latest processed block may have already been partially (or fully) processed by the relayer on a previous run. When
// processing a warp message in real time, which is when we update the latest processed block in the database, we have no way of knowing
// if that is the last warp message in the block
// Use the max of the latest processed block and the catch up block height
latestProcessedBlock, success := new(big.Int).SetString(string(latestProcessedBlockData), 10)
if !success {
r.logger.Error("failed to convert latest block to big.Int", zap.Error(err))
return err
}
height = utils.MaxBigInt(latestProcessedBlock, catchUpBlockHeight)
} else if errors.Is(err, database.ErrChainNotFound) || errors.Is(err, database.ErrKeyNotFound) {
// If the database does not contain the latest processed block data, then we check if a catch up block height is provided.
if catchUpBlockHeight != nil {
height = catchUpBlockHeight
}
} else {
// Otherwise, we've encountered an unknown database error
r.logger.Warn(
"failed to get latest block from database",
zap.String("blockchainID", r.sourceBlockchainID.String()),
zap.Error(err),
)
return err
}

err = sub.ProcessFromHeight(latestProcessedBlock)
// If we've determined a height to process from, then we process from that height to the latest block.
if height != nil {
err = sub.ProcessFromHeight(height)
if err != nil {
r.logger.Warn(
"Encountered an error when processing historical blocks. Continuing to normal relaying operation.",
Expand All @@ -184,9 +205,8 @@ func (r *Relayer) processMissedBlocks(
)
}
return nil
}
if errors.Is(err, database.ErrChainNotFound) || errors.Is(err, database.ErrKeyNotFound) {
// Otherwise, latestProcessedBlock is nil, so we instead store the latest block height.
} else {
// Otherwise, do not process any historical blocks, and store the latest block height
r.logger.Info(
"Latest processed block not found in database. Starting from latest block.",
zap.String("blockchainID", r.sourceBlockchainID.String()),
Expand All @@ -202,14 +222,6 @@ func (r *Relayer) processMissedBlocks(
}
return nil
}

// If neither of the above conditions are met, then we return an error
r.logger.Warn(
"failed to get latest block from database",
zap.String("blockchainID", r.sourceBlockchainID.String()),
zap.Error(err),
)
return err
}

// RelayMessage relays a single warp message to the destination chain. Warp message relay requests from the same origin chain are processed serially
Expand Down
7 changes: 7 additions & 0 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,10 @@ func SanitizeHexString(hex string) string {
}
return hex
}

func MaxBigInt(a, b *big.Int) *big.Int {
if a.Cmp(b) > 0 {
return new(big.Int).Set(a)
}
return new(big.Int).Set(b)
}

0 comments on commit a68fd4c

Please sign in to comment.