Skip to content

Commit

Permalink
handle ProcessFromHeight errors
Browse files Browse the repository at this point in the history
  • Loading branch information
cam-schultz committed Jan 17, 2024
1 parent f0c2558 commit 2da8eb8
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 22 deletions.
8 changes: 0 additions & 8 deletions main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,6 @@ func runRelayer(
zap.String("originBlockchainID", sourceSubnetInfo.BlockchainID),
)

// Marks when the relayer has finished the catch-up process on startup.
// Until that time, we do not know the order in which messages are processed,
// since the catch-up process occurs concurrently with normal message processing
// via the subscriber's Subscribe method. As a result, we cannot safely write the
// latest processed block to the database without risking missing a block in a fault
// scenario.
doneCatchingUp := make(chan bool, 1)
relayer, err := relayer.NewRelayer(
logger,
metrics,
Expand All @@ -244,7 +237,6 @@ func runRelayer(
destinationClients,
messageCreator,
shouldProcessMissedBlocks,
doneCatchingUp,
relayerHealth,
)
if err != nil {
Expand Down
31 changes: 25 additions & 6 deletions relayer/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Relayer struct {
rpcEndpoint string
apiNodeURI string
messageCreator message.Creator
catchUpResultChan chan bool
healthStatus *atomic.Bool
}

Expand All @@ -75,7 +76,6 @@ func NewRelayer(
destinationClients map[ids.ID]vms.DestinationClient,
messageCreator message.Creator,
shouldProcessMissedBlocks bool,
doneProcessingMissedBlocks chan bool,
relayerHealth *atomic.Bool,
) (*Relayer, error) {
sub := vms.NewSubscriber(logger, sourceSubnetInfo)
Expand Down Expand Up @@ -127,6 +127,14 @@ func NewRelayer(
rpcEndpoint := sourceSubnetInfo.GetNodeRPCEndpoint()
uri := utils.StripFromString(rpcEndpoint, "/ext")

// Marks when the relayer has finished the catch-up process on startup.
// Until that time, we do not know the order in which messages are processed,
// since the catch-up process occurs concurrently with normal message processing
// via the subscriber's Subscribe method. As a result, we cannot safely write the
// latest processed block to the database without risking missing a block in a fault
// scenario.
catchUpResultChan := make(chan bool, 1)

logger.Info(
"Creating relayer",
zap.String("subnetID", subnetID.String()),
Expand All @@ -152,6 +160,7 @@ func NewRelayer(
rpcEndpoint: rpcEndpoint,
apiNodeURI: uri,
messageCreator: messageCreator,
catchUpResultChan: catchUpResultChan,
healthStatus: relayerHealth,
}

Expand All @@ -178,8 +187,7 @@ func NewRelayer(
// Process historical blocks asynchronously so that the main processing loop can
// start processing new blocks as soon as possible. Otherwise, it's possible for
// ProcessFromHeight to overload the message queue and cause a deadlock.
// TODONOW: Handle the error case here
go sub.ProcessFromHeight(big.NewInt(0).SetUint64(height), doneProcessingMissedBlocks)
go sub.ProcessFromHeight(big.NewInt(0).SetUint64(height), r.catchUpResultChan)
} else {
err = r.setProcessedBlockHeightToLatest()
if err != nil {
Expand All @@ -190,7 +198,7 @@ func NewRelayer(
)
return nil, err
}
doneProcessingMissedBlocks <- true
r.catchUpResultChan <- true
}

return &r, nil
Expand All @@ -200,8 +208,20 @@ func NewRelayer(
// On subscriber error, attempts to reconnect and errors if unable.
// Exits if context is cancelled by another goroutine.
func (r *Relayer) ProcessLogs(ctx context.Context) error {
doneCatchingUp := false
for {
select {
case catchUpResult := <-r.catchUpResultChan:
if !catchUpResult {
r.healthStatus.Store(false)
r.logger.Error(
"Failed to catch up on historical blocks. Exiting relayer goroutine.",
zap.String("originChainId", r.sourceBlockchainID.String()),
)
return fmt.Errorf("failed to catch up on historical blocks")
} else {
doneCatchingUp = true
}
case txLog := <-r.Subscriber.Logs():
r.logger.Info(
"Handling Teleporter submit message log.",
Expand All @@ -211,8 +231,7 @@ func (r *Relayer) ProcessLogs(ctx context.Context) error {
)

// Relay the message to the destination chain. Continue on failure.
// TODONOW: set the bool here only once we've finished catching up
err := r.RelayMessage(&txLog, true)
err := r.RelayMessage(&txLog, doneCatchingUp)
if err != nil {
r.logger.Error(
"Error relaying message",
Expand Down
15 changes: 9 additions & 6 deletions vms/evm/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,21 @@ func (s *subscriber) forwardLogs() {
// number of blocks retrieved in a single eth_getLogs request to
// `MaxBlocksPerRequest`; if processing more than that, multiple eth_getLogs
// requests will be made.
func (s *subscriber) ProcessFromHeight(height *big.Int, done chan bool) error {
// Writes true to the done channel when finished, or false if an error occurs.
func (s *subscriber) ProcessFromHeight(height *big.Int, done chan bool) {
s.logger.Info(
"Processing historical logs",
zap.String("fromBlockHeight", height.String()),
zap.String("blockchainID", s.blockchainID.String()),
)
if height == nil {
return fmt.Errorf("cannot process logs from nil height")
s.logger.Error("cannot process logs from nil height")
done <- false
}
ethClient, err := s.dial(s.nodeWSURL)
if err != nil {
return err
s.logger.Error("failed to dial eth client", zap.Error(err))
done <- false
}

// Grab the latest block before filtering logs so we don't miss any before updating the db
Expand All @@ -149,7 +152,7 @@ func (s *subscriber) ProcessFromHeight(height *big.Int, done chan bool) error {
zap.String("blockchainID", s.blockchainID.String()),
zap.Error(err),
)
return err
done <- false
}

bigLatestBlockHeight := big.NewInt(0).SetUint64(latestBlockHeight)
Expand All @@ -167,11 +170,11 @@ func (s *subscriber) ProcessFromHeight(height *big.Int, done chan bool) error {

err = s.processBlockRange(ethClient, fromBlock, toBlock)
if err != nil {
return err
s.logger.Error("failed to process block range", zap.Error(err))
done <- false
}
}
done <- true
return nil
}

// Filter logs from the latest processed block to the latest block
Expand Down
5 changes: 3 additions & 2 deletions vms/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import (
// channel returned by Logs() are assumed to be in block order. Logs within individual blocks
// may be in any order.
type Subscriber interface {
// ProcessFromHeight processes events from {height} to the latest block
ProcessFromHeight(height *big.Int, done chan bool) error
// ProcessFromHeight processes events from {height} to the latest block.
// Writes true to the channel on success, or false on failure.
ProcessFromHeight(height *big.Int, done chan bool)

// Subscribe registers a subscription. After Subscribe is called,
// log events that match [filter] are written to the channel returned
Expand Down

0 comments on commit 2da8eb8

Please sign in to comment.