Skip to content

Commit

Permalink
Merge pull request #613 from ava-labs/timeout-backoff-ops
Browse files Browse the repository at this point in the history
Add timeout to remote ops
  • Loading branch information
cam-schultz authored Jan 6, 2025
2 parents 7d1e095 + 990eb2c commit 740eeb2
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 7 deletions.
7 changes: 5 additions & 2 deletions relayer/application_relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import (
)

const (
retryTimeout = 10 * time.Second
retryTimeout = 10 * time.Second
warpAPITimeout = utils.DefaultRPCTimeout + peers.DefaultAppRequestTimeout
)

// Errors
Expand Down Expand Up @@ -270,9 +271,11 @@ func (r *ApplicationRelayer) createSignedMessage(
signedWarpMessageBytes hexutil.Bytes
err error
)
cctx, cancel := context.WithTimeout(context.Background(), warpAPITimeout)
defer cancel()
operation := func() error {
return r.sourceWarpSignatureClient.CallContext(
context.Background(),
cctx,
&signedWarpMessageBytes,
"warp_getMessageAggregateSignature",
unsignedMessage.ID(),
Expand Down
3 changes: 2 additions & 1 deletion utils/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
)

// WithRetriesTimeout uses an exponential backoff to run the operation until it
// succeeds or timeout limit has been reached.
// succeeds or timeout limit has been reached. It is the caller's responsibility
// to ensure {operation} returns. It is safe for {operation} to take longer than {timeout}.
func WithRetriesTimeout(
logger logging.Logger,
operation backoff.Operation,
Expand Down
14 changes: 10 additions & 4 deletions vms/evm/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,14 @@ func (s *subscriber) processBlockRange(
}

func (s *subscriber) getHeaderByNumberRetryable(headerNumber *big.Int) (*types.Header, error) {
var err error
var header *types.Header
var (
err error
header *types.Header
)
cctx, cancel := context.WithTimeout(context.Background(), utils.DefaultRPCTimeout)
defer cancel()
operation := func() (err error) {
header, err = s.rpcClient.HeaderByNumber(context.Background(), headerNumber)
header, err = s.rpcClient.HeaderByNumber(cctx, headerNumber)
return err
}
err = utils.WithRetriesTimeout(s.logger, operation, utils.DefaultRPCTimeout)
Expand Down Expand Up @@ -161,8 +165,10 @@ func (s *subscriber) Subscribe(retryTimeout time.Duration) error {
// subscribe until it succeeds or reached timeout.
func (s *subscriber) subscribe(retryTimeout time.Duration) error {
var sub interfaces.Subscription
cctx, cancel := context.WithTimeout(context.Background(), utils.DefaultRPCTimeout)
defer cancel()
operation := func() (err error) {
sub, err = s.wsClient.SubscribeNewHead(context.Background(), s.headers)
sub, err = s.wsClient.SubscribeNewHead(cctx, s.headers)
return err
}
err := utils.WithRetriesTimeout(s.logger, operation, retryTimeout)
Expand Down

0 comments on commit 740eeb2

Please sign in to comment.