Skip to content

Commit

Permalink
db access helpers
Browse files Browse the repository at this point in the history
  • Loading branch information
cam-schultz committed Jan 17, 2024
1 parent 2da8eb8 commit eedb6be
Showing 1 changed file with 27 additions and 19 deletions.
46 changes: 27 additions & 19 deletions relayer/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,7 @@ func (r *Relayer) ProcessLogs(ctx context.Context) error {
// 2) The database has been configured for the chain, but does not contain the latest processed block data
// - In this case, we return the configured start block height
func (r *Relayer) calculateStartingBlockHeight(startBlockHeight uint64) (uint64, error) {
// Attempt to get the latest processed block height from the database.
// Note that there may be unrelayed messages in the latest processed block
// because it is updated as soon as a single message from that block is relayed,
// and there may be multiple message in the same block.
latestProcessedBlockData, err := r.db.Get(r.sourceBlockchainID, []byte(database.LatestProcessedBlockKey))
latestProcessedBlock, err := r.getLatestProcessedBlockHeight()
if errors.Is(err, database.ErrChainNotFound) || errors.Is(err, database.ErrKeyNotFound) {
// The database does not contain the latest processed block data for the chain, so use the configured StartBlockHeight instead
if startBlockHeight == 0 {
Expand All @@ -303,11 +299,6 @@ func (r *Relayer) calculateStartingBlockHeight(startBlockHeight uint64) (uint64,

// If the database does contain the latest processed block data for the chain,
// use the max of the latest processed block and the configured start block height (if it was provided)
latestProcessedBlock, err := strconv.ParseUint(string(latestProcessedBlockData), 10, 64)
if err != nil {
r.logger.Error("failed to parse Uint from the database", zap.Error(err))
return 0, err
}
if latestProcessedBlock > startBlockHeight {
r.logger.Info(
"Processing historical blocks from the latest processed block in the DB",
Expand Down Expand Up @@ -352,7 +343,7 @@ func (r *Relayer) setProcessedBlockHeightToLatest() error {
zap.Uint64("latestBlock", latestBlock),
)

err = r.db.Put(r.sourceBlockchainID, []byte(database.LatestProcessedBlockKey), []byte(strconv.FormatUint(latestBlock, 10)))
err = r.storeProcessedBlockHeight(latestBlock)
if err != nil {
r.logger.Error(
fmt.Sprintf("failed to put %s into database", database.LatestProcessedBlockKey),
Expand Down Expand Up @@ -464,7 +455,7 @@ func (r *Relayer) RelayMessage(warpLogInfo *vmtypes.WarpLogInfo, storeProcessedH
// Update the database with the latest processed block height
// First, check that the stored height is less than the current block height
// This is necessary because the relayer may be processing blocks out of order on startup
latestProcessedBlockData, err := r.db.Get(r.sourceBlockchainID, []byte(database.LatestProcessedBlockKey))
latestProcessedBlock, err := r.getLatestProcessedBlockHeight()
if err != nil && !errors.Is(err, database.ErrChainNotFound) && !errors.Is(err, database.ErrKeyNotFound) {
r.logger.Error(
"Encountered an unknown error while getting latest processed block from database",
Expand All @@ -473,13 +464,9 @@ func (r *Relayer) RelayMessage(warpLogInfo *vmtypes.WarpLogInfo, storeProcessedH
)
return err
}
latestProcessedBlock, success := new(big.Int).SetString(string(latestProcessedBlockData), 10)
if !success {
r.logger.Error("failed to convert latest block to big.Int", zap.Error(err))
return err
}
if warpLogInfo.BlockNumber > latestProcessedBlock.Uint64() {
err = r.db.Put(r.sourceBlockchainID, []byte(database.LatestProcessedBlockKey), []byte(strconv.FormatUint(warpLogInfo.BlockNumber, 10)))

if warpLogInfo.BlockNumber > latestProcessedBlock {
err = r.storeProcessedBlockHeight(warpLogInfo.BlockNumber)
if err != nil {
r.logger.Error(
fmt.Sprintf("failed to put %s into database", database.LatestProcessedBlockKey),
Expand All @@ -497,3 +484,24 @@ func (r *Relayer) RelayMessage(warpLogInfo *vmtypes.WarpLogInfo, storeProcessedH
func (r *Relayer) CheckSupportedDestination(destinationBlockchainID ids.ID) bool {
return len(r.supportedDestinations) == 0 || r.supportedDestinations.Contains(destinationBlockchainID)
}

// Get the latest processed block height from the database.
// Note that there may be unrelayed messages in the latest processed block
// because it is updated as soon as a single message from that block is relayed,
// and there may be multiple message in the same block.
func (r *Relayer) getLatestProcessedBlockHeight() (uint64, error) {
latestProcessedBlockData, err := r.db.Get(r.sourceBlockchainID, []byte(database.LatestProcessedBlockKey))
if err != nil {
return 0, err
}
latestProcessedBlock, err := strconv.ParseUint(string(latestProcessedBlockData), 10, 64)
if err != nil {
return 0, err
}
return latestProcessedBlock, nil
}

// Store the block height in the database. Does not check against the current latest processed block height.
func (r *Relayer) storeProcessedBlockHeight(height uint64) error {
return r.db.Put(r.sourceBlockchainID, []byte(database.LatestProcessedBlockKey), []byte(strconv.FormatUint(height, 10)))
}

0 comments on commit eedb6be

Please sign in to comment.