Skip to content

Commit

Permalink
remove db from subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
cam-schultz committed Jan 12, 2024
1 parent d5d56d5 commit cfeb2c8
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 63 deletions.
50 changes: 47 additions & 3 deletions relayer/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package relayer

import (
"context"
"errors"
"fmt"
"math/big"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/ava-labs/awm-relayer/utils"
vms "github.com/ava-labs/awm-relayer/vms"
"github.com/ava-labs/awm-relayer/vms/vmtypes"
"github.com/ava-labs/coreth/ethclient"
"github.com/ethereum/go-ethereum/common"
"go.uber.org/zap"
)
Expand All @@ -40,6 +42,7 @@ type Relayer struct {
logger logging.Logger
db database.RelayerDatabase
supportedDestinations set.Set[ids.ID]
rpcEndpoint string
apiNodeURI string
}

Expand All @@ -53,7 +56,7 @@ func NewRelayer(
destinationClients map[ids.ID]vms.DestinationClient,
shouldProcessMissedBlocks bool,
) (*Relayer, vms.Subscriber, error) {
sub := vms.NewSubscriber(logger, sourceSubnetInfo, db)
sub := vms.NewSubscriber(logger, sourceSubnetInfo)

subnetID, err := ids.FromString(sourceSubnetInfo.SubnetID)
if err != nil {
Expand Down Expand Up @@ -99,7 +102,8 @@ func NewRelayer(
messageManagers[addressHash] = messageManager
}

uri := utils.StripFromString(sourceSubnetInfo.GetNodeRPCEndpoint(), "/ext")
rpcEndpoint := sourceSubnetInfo.GetNodeRPCEndpoint()
uri := utils.StripFromString(rpcEndpoint, "/ext")

logger.Info(
"Creating relayer",
Expand All @@ -121,6 +125,7 @@ func NewRelayer(
logger: logger,
db: db,
supportedDestinations: supportedDestinationsBlockchainIDs,
rpcEndpoint: rpcEndpoint,
apiNodeURI: uri,
}

Expand All @@ -146,7 +151,7 @@ func NewRelayer(
}
sub.ProcessFromHeight(height)
} else {
err = sub.SetProcessedBlockHeightToLatest()
err = r.setProcessedBlockHeightToLatest()
if err != nil {
logger.Warn(
"Failed to update latest processed block. Continuing to normal relaying operation",
Expand Down Expand Up @@ -231,6 +236,45 @@ func (r *Relayer) setProcessedBlockHeight(
return nil, height
}

func (r *Relayer) setProcessedBlockHeightToLatest() error {
ethClient, err := ethclient.Dial(r.rpcEndpoint)
if err != nil {
r.logger.Error(
"Failed to dial node",
zap.String("blockchainID", r.sourceBlockchainID.String()),
zap.Error(err),
)
return err
}

latestBlock, err := ethClient.BlockNumber(context.Background())
if err != nil {
r.logger.Error(
"Failed to get latest block",
zap.String("blockchainID", r.sourceBlockchainID.String()),
zap.Error(err),
)
return err
}

r.logger.Info(
"Updating latest processed block in database",
zap.String("blockchainID", r.sourceBlockchainID.String()),
zap.Uint64("latestBlock", latestBlock),
)

err = r.db.Put(r.sourceBlockchainID, []byte(database.LatestProcessedBlockKey), []byte(strconv.FormatUint(latestBlock, 10)))
if err != nil {
r.logger.Error(
fmt.Sprintf("failed to put %s into database", database.LatestProcessedBlockKey),
zap.String("blockchainID", r.sourceBlockchainID.String()),
zap.Error(err),
)
return err
}
return nil
}

// RelayMessage relays a single warp message to the destination chain. Warp message relay requests from the same origin chain are processed serially
func (r *Relayer) RelayMessage(warpLogInfo *vmtypes.WarpLogInfo, metrics *MessageRelayerMetrics, messageCreator message.Creator) error {
r.logger.Info(
Expand Down
45 changes: 1 addition & 44 deletions vms/evm/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@ import (
"fmt"
"math/big"
"sort"
"strconv"
"time"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/awm-relayer/config"
"github.com/ava-labs/awm-relayer/database"
"github.com/ava-labs/awm-relayer/vms/vmtypes"
"github.com/ava-labs/subnet-evm/core/types"
"github.com/ava-labs/subnet-evm/ethclient"
Expand Down Expand Up @@ -60,14 +58,13 @@ type subscriber struct {
sub interfaces.Subscription

logger logging.Logger
db database.RelayerDatabase

// seams for mock injection:
dial func(url string) (ethclient.Client, error)
}

// NewSubscriber returns a subscriber
func NewSubscriber(logger logging.Logger, subnetInfo config.SourceSubnet, db database.RelayerDatabase) *subscriber {
func NewSubscriber(logger logging.Logger, subnetInfo config.SourceSubnet) *subscriber {
blockchainID, err := ids.FromString(subnetInfo.BlockchainID)
if err != nil {
logger.Error(
Expand All @@ -84,7 +81,6 @@ func NewSubscriber(logger logging.Logger, subnetInfo config.SourceSubnet, db dat
nodeRPCURL: subnetInfo.GetNodeRPCEndpoint(),
blockchainID: blockchainID,
logger: logger,
db: db,
logsChan: logs,
dial: ethclient.Dial,
}
Expand Down Expand Up @@ -234,45 +230,6 @@ func (s *subscriber) processBlockRange(
return nil
}

func (s *subscriber) SetProcessedBlockHeightToLatest() error {
ethClient, err := s.dial(s.nodeWSURL)
if err != nil {
s.logger.Error(
"Failed to dial node",
zap.String("blockchainID", s.blockchainID.String()),
zap.Error(err),
)
return err
}

latestBlock, err := ethClient.BlockNumber(context.Background())
if err != nil {
s.logger.Error(
"Failed to get latest block",
zap.String("blockchainID", s.blockchainID.String()),
zap.Error(err),
)
return err
}

s.logger.Info(
"Updating latest processed block in database",
zap.String("blockchainID", s.blockchainID.String()),
zap.Uint64("latestBlock", latestBlock),
)

err = s.db.Put(s.blockchainID, []byte(database.LatestProcessedBlockKey), []byte(strconv.FormatUint(latestBlock, 10)))
if err != nil {
s.logger.Error(
fmt.Sprintf("failed to put %s into database", database.LatestProcessedBlockKey),
zap.String("blockchainID", s.blockchainID.String()),
zap.Error(err),
)
return err
}
return nil
}

func (s *subscriber) Subscribe() error {
// Retry subscribing until successful. Attempt to resubscribe maxResubscribeAttempts times
for attempt := 0; attempt < maxResubscribeAttempts; attempt++ {
Expand Down
11 changes: 1 addition & 10 deletions vms/evm/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,14 @@ import (
"os"
"testing"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/awm-relayer/config"
"github.com/ava-labs/awm-relayer/database"
mock_ethclient "github.com/ava-labs/awm-relayer/vms/evm/mocks"
"github.com/ava-labs/subnet-evm/core/types"
"github.com/ava-labs/subnet-evm/ethclient"
"github.com/ava-labs/subnet-evm/interfaces"
"github.com/ava-labs/subnet-evm/x/warp"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)

Expand All @@ -39,14 +36,8 @@ func makeSubscriberWithMockEthClient(t *testing.T) (*subscriber, *mock_ethclient
),
)

subnetId, err := ids.FromString(sourceSubnet.BlockchainID)
require.NoError(t, err, "Failed to create subnet ID")

db, err := database.NewJSONFileStorage(logger, t.TempDir(), []ids.ID{subnetId})
require.NoError(t, err, "Failed to create JSON file storage")

mockEthClient := mock_ethclient.NewMockClient(gomock.NewController(t))
subscriber := NewSubscriber(logger, sourceSubnet, db)
subscriber := NewSubscriber(logger, sourceSubnet)
subscriber.dial = func(_url string) (ethclient.Client, error) { return mockEthClient, nil }

return subscriber, mockEthClient
Expand Down
8 changes: 2 additions & 6 deletions vms/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/awm-relayer/config"
"github.com/ava-labs/awm-relayer/database"
"github.com/ava-labs/awm-relayer/vms/evm"
"github.com/ava-labs/awm-relayer/vms/vmtypes"
)
Expand All @@ -20,9 +19,6 @@ type Subscriber interface {
// ProcessFromHeight processes events from {height} to the latest block
ProcessFromHeight(height *big.Int) error

// SetProcessedBlockHeightToLatest retrieves the latest block from the chain and updates the database
SetProcessedBlockHeightToLatest() error

// Subscribe registers a subscription. After Subscribe is called,
// log events that match [filter] are written to the channel returned
// by Logs
Expand All @@ -40,10 +36,10 @@ type Subscriber interface {
}

// NewSubscriber returns a concrete Subscriber according to the VM specified by [subnetInfo]
func NewSubscriber(logger logging.Logger, subnetInfo config.SourceSubnet, db database.RelayerDatabase) Subscriber {
func NewSubscriber(logger logging.Logger, subnetInfo config.SourceSubnet) Subscriber {
switch config.ParseVM(subnetInfo.VM) {
case config.EVM:
return evm.NewSubscriber(logger, subnetInfo, db)
return evm.NewSubscriber(logger, subnetInfo)
default:
return nil
}
Expand Down

0 comments on commit cfeb2c8

Please sign in to comment.