From 7663a6fbad85c74eea42b85012254d011c5147b5 Mon Sep 17 00:00:00 2001 From: Nathan Haim Date: Sun, 5 May 2024 22:14:06 +0200 Subject: [PATCH 01/15] ethclient: add local ethclient This Makes the eth client configurable with query-params and http-headers --- ethclient_utils/client.go | 51 ++++++++++++++++++++++++++++++++++ ethclient_utils/client_test.go | 45 ++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+) create mode 100644 ethclient_utils/client.go create mode 100644 ethclient_utils/client_test.go diff --git a/ethclient_utils/client.go b/ethclient_utils/client.go new file mode 100644 index 00000000..1f56470c --- /dev/null +++ b/ethclient_utils/client.go @@ -0,0 +1,51 @@ +// Copyright (C) 2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package ethclient_utils + +import ( + "context" + "errors" + "fmt" + "net/url" + + "github.com/ava-labs/subnet-evm/ethclient" + "github.com/ava-labs/subnet-evm/rpc" +) + +var ErrInvalidEndpoint = errors.New("invalid rpc endpoint") + +func DialWithConfig(ctx context.Context, endpoint string, httpHeaders, queryParams map[string]string) (ethclient.Client, error) { + endpoint, err := addQueryParams(endpoint, queryParams) + if err != nil { + return nil, err + } + client, err := rpc.DialOptions(ctx, endpoint, newClientOptions(httpHeaders)...) + if err != nil { + return nil, err + } + return ethclient.NewClient(client), nil +} + +// addQueryParams adds the query parameters to the url +func addQueryParams(endpoint string, queryParams map[string]string) (string, error) { + uri, err := url.ParseRequestURI(endpoint) + if err != nil { + return "", fmt.Errorf("%w: %v", ErrInvalidEndpoint, err) + } + values := uri.Query() + for key, value := range queryParams { + values.Add(key, value) + } + uri.RawQuery = values.Encode() + return uri.String(), nil +} + +// newClientOptions creates a ClientOption slice from httpHeaders +func newClientOptions(httpHeaders map[string]string) []rpc.ClientOption { + opts := make([]rpc.ClientOption, 0, len(httpHeaders)) + for key, value := range httpHeaders { + opts = append(opts, rpc.WithHeader(key, value)) + } + return opts +} diff --git a/ethclient_utils/client_test.go b/ethclient_utils/client_test.go new file mode 100644 index 00000000..cb53ab38 --- /dev/null +++ b/ethclient_utils/client_test.go @@ -0,0 +1,45 @@ +// Copyright (C) 2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package ethclient_utils + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestAddQueryParams(t *testing.T) { + t.Run("NoQueryParams", func(t *testing.T) { + newurl, err := addQueryParams("https://avalabs.com", nil) + require.NoError(t, err) + require.Equal(t, "https://avalabs.com", newurl) + }) + t.Run("TwoQueryParams", func(t *testing.T) { + newurl, err := addQueryParams("https://avalabs.com", map[string]string{ + "first": "value1", + "second": "value2", + }) + require.NoError(t, err) + require.Equal(t, "https://avalabs.com?first=value1&second=value2", newurl) + }) + t.Run("InvalidEndpoint", func(t *testing.T) { + _, err := addQueryParams("invalid-endpoint", nil) + require.True(t, errors.Is(err, ErrInvalidEndpoint)) + }) +} + +func TestNewClientOptions(t *testing.T) { + t.Run("NoHttpHeaders", func(t *testing.T) { + opts := newClientOptions(nil) + require.Len(t, opts, 0) + }) + t.Run("TwoHttpHeaders", func(t *testing.T) { + opts := newClientOptions(map[string]string{ + "first": "value1", + "second": "value2", + }) + require.Len(t, opts, 2) + }) +} From e8e34f12828050c37daa17557b69a970eb31e2d0 Mon Sep 17 00:00:00 2001 From: Nathan Haim Date: Sun, 5 May 2024 22:14:41 +0200 Subject: [PATCH 02/15] config: add new fields query-parameters & http-headers are added in the Source and Destination Blockchain Config Destination Warp Quorum initialization uses new config fields for the requests --- config/config.go | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/config/config.go b/config/config.go index c9c76659..a566a6c2 100644 --- a/config/config.go +++ b/config/config.go @@ -17,6 +17,7 @@ import ( "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/awm-relayer/utils" + "github.com/ava-labs/awm-relayer/ethclient_utils" "github.com/ava-labs/subnet-evm/ethclient" "github.com/ava-labs/subnet-evm/precompile/contracts/warp" @@ -86,6 +87,8 @@ type SourceBlockchain struct { BlockchainID string `mapstructure:"blockchain-id" json:"blockchain-id"` VM string `mapstructure:"vm" json:"vm"` RPCEndpoint string `mapstructure:"rpc-endpoint" json:"rpc-endpoint"` + HttpHeaders map[string]string `mapstructure:"http-headers" json:"http-headers"` + QueryParams map[string]string `mapstructure:"query-parameters" json:"query-parameters"` WSEndpoint string `mapstructure:"ws-endpoint" json:"ws-endpoint"` MessageContracts map[string]MessageProtocolConfig `mapstructure:"message-contracts" json:"message-contracts"` SupportedDestinations []*SupportedDestination `mapstructure:"supported-destinations" json:"supported-destinations"` @@ -100,13 +103,15 @@ type SourceBlockchain struct { // Destination blockchain configuration. Specifies how to connect to and issue transactions on the desination blockchain. type DestinationBlockchain struct { - SubnetID string `mapstructure:"subnet-id" json:"subnet-id"` - BlockchainID string `mapstructure:"blockchain-id" json:"blockchain-id"` - VM string `mapstructure:"vm" json:"vm"` - RPCEndpoint string `mapstructure:"rpc-endpoint" json:"rpc-endpoint"` - KMSKeyID string `mapstructure:"kms-key-id" json:"kms-key-id"` - KMSAWSRegion string `mapstructure:"kms-aws-region" json:"kms-aws-region"` - AccountPrivateKey string `mapstructure:"account-private-key" json:"account-private-key"` + SubnetID string `mapstructure:"subnet-id" json:"subnet-id"` + BlockchainID string `mapstructure:"blockchain-id" json:"blockchain-id"` + VM string `mapstructure:"vm" json:"vm"` + RPCEndpoint string `mapstructure:"rpc-endpoint" json:"rpc-endpoint"` + HttpHeaders map[string]string `mapstructure:"http-headers" json:"http-headers"` + QueryParams map[string]string `mapstructure:"query-parameters" json:"query-parameters"` + KMSKeyID string `mapstructure:"kms-key-id" json:"kms-key-id"` + KMSAWSRegion string `mapstructure:"kms-aws-region" json:"kms-aws-region"` + AccountPrivateKey string `mapstructure:"account-private-key" json:"account-private-key"` // Fetched from the chain after startup warpQuorum WarpQuorum @@ -597,7 +602,7 @@ func (s *DestinationBlockchain) initializeWarpQuorum() error { return fmt.Errorf("invalid subnetID in configuration. error: %w", err) } - client, err := ethclient.Dial(s.RPCEndpoint) + client, err := ethclient_utils.DialWithConfig(context.Background(), s.RPCEndpoint, s.HttpHeaders, s.QueryParams) if err != nil { return fmt.Errorf("failed to dial destination blockchain %s: %w", blockchainID, err) } From f2aad490bb895ddfd34d5d9e53899473cc23c660 Mon Sep 17 00:00:00 2001 From: Nathan Haim Date: Sun, 5 May 2024 22:15:41 +0200 Subject: [PATCH 03/15] relayer: use configurable ethclient --- relayer/application_relayer.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/relayer/application_relayer.go b/relayer/application_relayer.go index 29f8326e..19ef1a4f 100644 --- a/relayer/application_relayer.go +++ b/relayer/application_relayer.go @@ -23,11 +23,11 @@ import ( avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp" "github.com/ava-labs/awm-relayer/config" "github.com/ava-labs/awm-relayer/database" + "github.com/ava-labs/awm-relayer/ethclient_utils" "github.com/ava-labs/awm-relayer/messages" "github.com/ava-labs/awm-relayer/peers" "github.com/ava-labs/awm-relayer/utils" coreEthMsg "github.com/ava-labs/coreth/plugin/evm/message" - "github.com/ava-labs/subnet-evm/ethclient" msg "github.com/ava-labs/subnet-evm/plugin/evm/message" warpBackend "github.com/ava-labs/subnet-evm/warp" "go.uber.org/zap" @@ -639,7 +639,12 @@ func (r *applicationRelayer) calculateStartingBlockHeight(processHistoricalBlock // Gets the height of the chain head, writes it to the database, then returns it. func (r *applicationRelayer) setProcessedBlockHeightToLatest() (uint64, error) { - ethClient, err := ethclient.Dial(r.sourceBlockchain.RPCEndpoint) + ethClient, err := ethclient_utils.DialWithConfig( + context.Background(), + r.sourceBlockchain.RPCEndpoint, + r.sourceBlockchain.HttpHeaders, + r.sourceBlockchain.QueryParams, + ) if err != nil { r.logger.Error( "Failed to dial node", From 97a48ccaaac0ec28bc40ca4aad8cbee558cd4976 Mon Sep 17 00:00:00 2001 From: Nathan Haim Date: Sun, 5 May 2024 22:16:38 +0200 Subject: [PATCH 04/15] vms/evm: use configurable ethclient --- vms/evm/destination_client.go | 11 +++++++++-- vms/evm/subscriber.go | 13 +++++++++---- vms/evm/subscriber_test.go | 5 ++++- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/vms/evm/destination_client.go b/vms/evm/destination_client.go index cfed5ef6..94e5fc18 100644 --- a/vms/evm/destination_client.go +++ b/vms/evm/destination_client.go @@ -14,6 +14,7 @@ import ( "github.com/ava-labs/avalanchego/utils/logging" avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp" "github.com/ava-labs/awm-relayer/config" + "github.com/ava-labs/awm-relayer/ethclient_utils" "github.com/ava-labs/awm-relayer/vms/evm/signer" "github.com/ava-labs/subnet-evm/core/types" "github.com/ava-labs/subnet-evm/ethclient" @@ -48,7 +49,12 @@ type destinationClient struct { func NewDestinationClient(logger logging.Logger, destinationBlockchain *config.DestinationBlockchain) (*destinationClient, error) { // Dial the destination RPC endpoint - client, err := ethclient.Dial(destinationBlockchain.RPCEndpoint) + client, err := ethclient_utils.DialWithConfig( + context.Background(), + destinationBlockchain.RPCEndpoint, + destinationBlockchain.HttpHeaders, + destinationBlockchain.QueryParams, + ) if err != nil { logger.Error( "Failed to dial rpc endpoint", @@ -107,7 +113,8 @@ func NewDestinationClient(logger logging.Logger, destinationBlockchain *config.D func (c *destinationClient) SendTx(signedMessage *avalancheWarp.Message, toAddress string, gasLimit uint64, - callData []byte) error { + callData []byte, +) error { // Synchronize teleporter message requests to the same destination chain so that message ordering is preserved c.lock.Lock() defer c.lock.Unlock() diff --git a/vms/evm/subscriber.go b/vms/evm/subscriber.go index 432c8df9..7952e0f0 100644 --- a/vms/evm/subscriber.go +++ b/vms/evm/subscriber.go @@ -14,6 +14,7 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/awm-relayer/config" + "github.com/ava-labs/awm-relayer/ethclient_utils" "github.com/ava-labs/awm-relayer/vms/vmtypes" "github.com/ava-labs/subnet-evm/core/types" "github.com/ava-labs/subnet-evm/ethclient" @@ -49,6 +50,8 @@ var warpFilterQuery = interfaces.FilterQuery{ type subscriber struct { nodeWSURL string nodeRPCURL string + httpHeaders map[string]string + queryParams map[string]string blockchainID ids.ID logsChan chan vmtypes.WarpLogInfo evmLog <-chan types.Log @@ -57,7 +60,7 @@ type subscriber struct { logger logging.Logger // seams for mock injection: - dial func(url string) (ethclient.Client, error) + dial func(ctx context.Context, url string, httpHeaders, queryParams map[string]string) (ethclient.Client, error) } // NewSubscriber returns a subscriber @@ -76,10 +79,12 @@ func NewSubscriber(logger logging.Logger, subnetInfo config.SourceBlockchain) *s return &subscriber{ nodeWSURL: subnetInfo.WSEndpoint, nodeRPCURL: subnetInfo.RPCEndpoint, + httpHeaders: subnetInfo.HttpHeaders, + queryParams: subnetInfo.QueryParams, blockchainID: blockchainID, logger: logger, logsChan: logs, - dial: ethclient.Dial, + dial: ethclient_utils.DialWithConfig, } } @@ -140,7 +145,7 @@ func (s *subscriber) ProcessFromHeight(height *big.Int, done chan bool) { s.logger.Error("cannot process logs from nil height") done <- false } - ethClient, err := s.dial(s.nodeWSURL) + ethClient, err := s.dial(context.Background(), s.nodeWSURL, s.httpHeaders, s.queryParams) if err != nil { s.logger.Error("failed to dial eth client", zap.Error(err)) done <- false @@ -272,7 +277,7 @@ func (s *subscriber) Subscribe(maxResubscribeAttempts int) error { func (s *subscriber) dialAndSubscribe() error { // Dial the configured source chain endpoint // This needs to be a websocket - ethClient, err := s.dial(s.nodeWSURL) + ethClient, err := s.dial(context.Background(), s.nodeWSURL, s.httpHeaders, s.queryParams) if err != nil { return err } diff --git a/vms/evm/subscriber_test.go b/vms/evm/subscriber_test.go index cbbbe880..5ec48643 100644 --- a/vms/evm/subscriber_test.go +++ b/vms/evm/subscriber_test.go @@ -4,6 +4,7 @@ package evm import ( + "context" "math/big" "testing" @@ -29,7 +30,9 @@ func makeSubscriberWithMockEthClient(t *testing.T) (*subscriber, *mock_ethclient mockEthClient := mock_ethclient.NewMockClient(gomock.NewController(t)) subscriber := NewSubscriber(logger, sourceSubnet) - subscriber.dial = func(_url string) (ethclient.Client, error) { return mockEthClient, nil } + subscriber.dial = func(_ctx context.Context, _url string, _httpHeaders, _queryParams map[string]string) (ethclient.Client, error) { + return mockEthClient, nil + } return subscriber, mockEthClient } From aaa4e33de31b258794750e7db8cdb904ca297cf2 Mon Sep 17 00:00:00 2001 From: nathan haim Date: Thu, 9 May 2024 10:22:03 +0200 Subject: [PATCH 05/15] Apply suggestions from code review ethclient_utils: add comment for public factory function change comment date from 2023 to 2024 Co-authored-by: cam-schultz <78878559+cam-schultz@users.noreply.github.com> Signed-off-by: nathan haim --- ethclient_utils/client.go | 3 ++- ethclient_utils/client_test.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/ethclient_utils/client.go b/ethclient_utils/client.go index 1f56470c..d86a39ac 100644 --- a/ethclient_utils/client.go +++ b/ethclient_utils/client.go @@ -1,4 +1,4 @@ -// Copyright (C) 2023, Ava Labs, Inc. All rights reserved. +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. // See the file LICENSE for licensing terms. package ethclient_utils @@ -15,6 +15,7 @@ import ( var ErrInvalidEndpoint = errors.New("invalid rpc endpoint") +// DialWithContext returns an ethclient.Client with the internal RPC client configured with the provided options. func DialWithConfig(ctx context.Context, endpoint string, httpHeaders, queryParams map[string]string) (ethclient.Client, error) { endpoint, err := addQueryParams(endpoint, queryParams) if err != nil { diff --git a/ethclient_utils/client_test.go b/ethclient_utils/client_test.go index cb53ab38..bfadc86a 100644 --- a/ethclient_utils/client_test.go +++ b/ethclient_utils/client_test.go @@ -1,4 +1,4 @@ -// Copyright (C) 2023, Ava Labs, Inc. All rights reserved. +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. // See the file LICENSE for licensing terms. package ethclient_utils From 6f1dabb683ab59435fa171cf9d464ce894237dc1 Mon Sep 17 00:00:00 2001 From: Nathan Haim Date: Fri, 10 May 2024 10:55:46 +0200 Subject: [PATCH 06/15] rename packageethclient_utils as ethclient + resolve package name conflicts with subnet-evm/ethclient package --- config/config.go | 8 ++++---- {ethclient_utils => ethclient}/client.go | 2 +- {ethclient_utils => ethclient}/client_test.go | 2 +- messages/off-chain-registry/message_manager.go | 4 +--- relayer/application_relayer.go | 4 ++-- vms/evm/destination_client.go | 10 +++++----- vms/evm/subscriber.go | 10 +++++----- 7 files changed, 19 insertions(+), 21 deletions(-) rename {ethclient_utils => ethclient}/client.go (98%) rename {ethclient_utils => ethclient}/client_test.go (97%) diff --git a/config/config.go b/config/config.go index a566a6c2..85768522 100644 --- a/config/config.go +++ b/config/config.go @@ -17,8 +17,8 @@ import ( "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/awm-relayer/utils" - "github.com/ava-labs/awm-relayer/ethclient_utils" - "github.com/ava-labs/subnet-evm/ethclient" + "github.com/ava-labs/awm-relayer/ethclient" + evmethclient "github.com/ava-labs/subnet-evm/ethclient" "github.com/ava-labs/subnet-evm/precompile/contracts/warp" // Force-load precompiles to trigger registration @@ -360,7 +360,7 @@ func calculateQuorumNumerator(cfgNumerator uint64) uint64 { func getWarpQuorum( subnetID ids.ID, blockchainID ids.ID, - client ethclient.Client, + client evmethclient.Client, ) (WarpQuorum, error) { if subnetID == constants.PrimaryNetworkID { return WarpQuorum{ @@ -602,7 +602,7 @@ func (s *DestinationBlockchain) initializeWarpQuorum() error { return fmt.Errorf("invalid subnetID in configuration. error: %w", err) } - client, err := ethclient_utils.DialWithConfig(context.Background(), s.RPCEndpoint, s.HttpHeaders, s.QueryParams) + client, err := ethclient.DialWithConfig(context.Background(), s.RPCEndpoint, s.HttpHeaders, s.QueryParams) if err != nil { return fmt.Errorf("failed to dial destination blockchain %s: %w", blockchainID, err) } diff --git a/ethclient_utils/client.go b/ethclient/client.go similarity index 98% rename from ethclient_utils/client.go rename to ethclient/client.go index d86a39ac..86eb517e 100644 --- a/ethclient_utils/client.go +++ b/ethclient/client.go @@ -1,7 +1,7 @@ // Copyright (C) 2024, Ava Labs, Inc. All rights reserved. // See the file LICENSE for licensing terms. -package ethclient_utils +package ethclient import ( "context" diff --git a/ethclient_utils/client_test.go b/ethclient/client_test.go similarity index 97% rename from ethclient_utils/client_test.go rename to ethclient/client_test.go index bfadc86a..80b30b49 100644 --- a/ethclient_utils/client_test.go +++ b/ethclient/client_test.go @@ -1,7 +1,7 @@ // Copyright (C) 2024, Ava Labs, Inc. All rights reserved. // See the file LICENSE for licensing terms. -package ethclient_utils +package ethclient import ( "errors" diff --git a/messages/off-chain-registry/message_manager.go b/messages/off-chain-registry/message_manager.go index 8fbc2000..94062246 100644 --- a/messages/off-chain-registry/message_manager.go +++ b/messages/off-chain-registry/message_manager.go @@ -21,9 +21,7 @@ import ( "go.uber.org/zap" ) -var ( - OffChainRegistrySourceAddress = common.HexToAddress("0x0000000000000000000000000000000000000000") -) +var OffChainRegistrySourceAddress = common.HexToAddress("0x0000000000000000000000000000000000000000") const ( addProtocolVersionGasLimit uint64 = 500_000 diff --git a/relayer/application_relayer.go b/relayer/application_relayer.go index 19ef1a4f..6dee6503 100644 --- a/relayer/application_relayer.go +++ b/relayer/application_relayer.go @@ -23,7 +23,7 @@ import ( avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp" "github.com/ava-labs/awm-relayer/config" "github.com/ava-labs/awm-relayer/database" - "github.com/ava-labs/awm-relayer/ethclient_utils" + "github.com/ava-labs/awm-relayer/ethclient" "github.com/ava-labs/awm-relayer/messages" "github.com/ava-labs/awm-relayer/peers" "github.com/ava-labs/awm-relayer/utils" @@ -639,7 +639,7 @@ func (r *applicationRelayer) calculateStartingBlockHeight(processHistoricalBlock // Gets the height of the chain head, writes it to the database, then returns it. func (r *applicationRelayer) setProcessedBlockHeightToLatest() (uint64, error) { - ethClient, err := ethclient_utils.DialWithConfig( + ethClient, err := ethclient.DialWithConfig( context.Background(), r.sourceBlockchain.RPCEndpoint, r.sourceBlockchain.HttpHeaders, diff --git a/vms/evm/destination_client.go b/vms/evm/destination_client.go index 94e5fc18..2f14fc94 100644 --- a/vms/evm/destination_client.go +++ b/vms/evm/destination_client.go @@ -14,10 +14,10 @@ import ( "github.com/ava-labs/avalanchego/utils/logging" avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp" "github.com/ava-labs/awm-relayer/config" - "github.com/ava-labs/awm-relayer/ethclient_utils" + "github.com/ava-labs/awm-relayer/ethclient" "github.com/ava-labs/awm-relayer/vms/evm/signer" "github.com/ava-labs/subnet-evm/core/types" - "github.com/ava-labs/subnet-evm/ethclient" + evmethclient "github.com/ava-labs/subnet-evm/ethclient" "github.com/ava-labs/subnet-evm/precompile/contracts/warp" predicateutils "github.com/ava-labs/subnet-evm/predicate" "github.com/ethereum/go-ethereum/common" @@ -33,12 +33,12 @@ const ( // Client interface wraps the ethclient.Client interface for mocking purposes. type Client interface { - ethclient.Client + evmethclient.Client } // Implements DestinationClient type destinationClient struct { - client ethclient.Client + client evmethclient.Client lock *sync.Mutex destinationBlockchainID ids.ID signer signer.Signer @@ -49,7 +49,7 @@ type destinationClient struct { func NewDestinationClient(logger logging.Logger, destinationBlockchain *config.DestinationBlockchain) (*destinationClient, error) { // Dial the destination RPC endpoint - client, err := ethclient_utils.DialWithConfig( + client, err := ethclient.DialWithConfig( context.Background(), destinationBlockchain.RPCEndpoint, destinationBlockchain.HttpHeaders, diff --git a/vms/evm/subscriber.go b/vms/evm/subscriber.go index 7952e0f0..973f411b 100644 --- a/vms/evm/subscriber.go +++ b/vms/evm/subscriber.go @@ -14,10 +14,10 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/awm-relayer/config" - "github.com/ava-labs/awm-relayer/ethclient_utils" + "github.com/ava-labs/awm-relayer/ethclient" "github.com/ava-labs/awm-relayer/vms/vmtypes" "github.com/ava-labs/subnet-evm/core/types" - "github.com/ava-labs/subnet-evm/ethclient" + evmethclient "github.com/ava-labs/subnet-evm/ethclient" "github.com/ava-labs/subnet-evm/interfaces" "github.com/ava-labs/subnet-evm/precompile/contracts/warp" "github.com/ethereum/go-ethereum/common" @@ -60,7 +60,7 @@ type subscriber struct { logger logging.Logger // seams for mock injection: - dial func(ctx context.Context, url string, httpHeaders, queryParams map[string]string) (ethclient.Client, error) + dial func(ctx context.Context, url string, httpHeaders, queryParams map[string]string) (evmethclient.Client, error) } // NewSubscriber returns a subscriber @@ -84,7 +84,7 @@ func NewSubscriber(logger logging.Logger, subnetInfo config.SourceBlockchain) *s blockchainID: blockchainID, logger: logger, logsChan: logs, - dial: ethclient_utils.DialWithConfig, + dial: ethclient.DialWithConfig, } } @@ -188,7 +188,7 @@ func (s *subscriber) ProcessFromHeight(height *big.Int, done chan bool) { // Since initializationFilterQuery does not modify existing fields of warpFilterQuery, // we can safely reuse warpFilterQuery with only a shallow copy func (s *subscriber) processBlockRange( - ethClient ethclient.Client, + ethClient evmethclient.Client, fromBlock, toBlock *big.Int, ) error { initializationFilterQuery := interfaces.FilterQuery{ From e6b2bc601f9db19bb64fb3e6f8025e6c329a902b Mon Sep 17 00:00:00 2001 From: Nathan Haim Date: Fri, 10 May 2024 13:52:11 +0200 Subject: [PATCH 07/15] ethclient: modify factory + rename function --- ethclient/client.go | 14 ++++++++++---- ethclient/client_test.go | 4 ++-- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/ethclient/client.go b/ethclient/client.go index 86eb517e..d49ece16 100644 --- a/ethclient/client.go +++ b/ethclient/client.go @@ -15,13 +15,19 @@ import ( var ErrInvalidEndpoint = errors.New("invalid rpc endpoint") +type Config struct { + BaseURL string + QueryParams map[string]string + HTTPHeaders map[string]string +} + // DialWithContext returns an ethclient.Client with the internal RPC client configured with the provided options. -func DialWithConfig(ctx context.Context, endpoint string, httpHeaders, queryParams map[string]string) (ethclient.Client, error) { - endpoint, err := addQueryParams(endpoint, queryParams) +func DialWithConfig(ctx context.Context, cfg Config) (ethclient.Client, error) { + url, err := addQueryParams(cfg.BaseURL, cfg.QueryParams) if err != nil { return nil, err } - client, err := rpc.DialOptions(ctx, endpoint, newClientOptions(httpHeaders)...) + client, err := rpc.DialOptions(ctx, url, newClientHeaderOptions(cfg.HTTPHeaders)...) if err != nil { return nil, err } @@ -43,7 +49,7 @@ func addQueryParams(endpoint string, queryParams map[string]string) (string, err } // newClientOptions creates a ClientOption slice from httpHeaders -func newClientOptions(httpHeaders map[string]string) []rpc.ClientOption { +func newClientHeaderOptions(httpHeaders map[string]string) []rpc.ClientOption { opts := make([]rpc.ClientOption, 0, len(httpHeaders)) for key, value := range httpHeaders { opts = append(opts, rpc.WithHeader(key, value)) diff --git a/ethclient/client_test.go b/ethclient/client_test.go index 80b30b49..f31c16a6 100644 --- a/ethclient/client_test.go +++ b/ethclient/client_test.go @@ -32,11 +32,11 @@ func TestAddQueryParams(t *testing.T) { func TestNewClientOptions(t *testing.T) { t.Run("NoHttpHeaders", func(t *testing.T) { - opts := newClientOptions(nil) + opts := newClientHeaderOptions(nil) require.Len(t, opts, 0) }) t.Run("TwoHttpHeaders", func(t *testing.T) { - opts := newClientOptions(map[string]string{ + opts := newClientHeaderOptions(map[string]string{ "first": "value1", "second": "value2", }) From 42545247049f0ad672f6b5d395f5b16b36a580ff Mon Sep 17 00:00:00 2001 From: Nathan Haim Date: Fri, 10 May 2024 13:55:38 +0200 Subject: [PATCH 08/15] config: change RPCEndpoint and WSEndpoint type updated from string to APIConfig --- config/config.go | 36 ++++++++++++++++-------------------- config/config_test.go | 14 ++++++++++---- config/test_utils.go | 30 +++++++++++++++++++++--------- 3 files changed, 47 insertions(+), 33 deletions(-) diff --git a/config/config.go b/config/config.go index bef9d1c5..e6c9453f 100644 --- a/config/config.go +++ b/config/config.go @@ -86,10 +86,8 @@ type SourceBlockchain struct { SubnetID string `mapstructure:"subnet-id" json:"subnet-id"` BlockchainID string `mapstructure:"blockchain-id" json:"blockchain-id"` VM string `mapstructure:"vm" json:"vm"` - RPCEndpoint string `mapstructure:"rpc-endpoint" json:"rpc-endpoint"` - HttpHeaders map[string]string `mapstructure:"http-headers" json:"http-headers"` - QueryParams map[string]string `mapstructure:"query-parameters" json:"query-parameters"` - WSEndpoint string `mapstructure:"ws-endpoint" json:"ws-endpoint"` + RPCEndpoint APIConfig `mapstructure:"rpc-endpoint" json:"rpc-endpoint"` + WSEndpoint APIConfig `mapstructure:"ws-endpoint" json:"ws-endpoint"` MessageContracts map[string]MessageProtocolConfig `mapstructure:"message-contracts" json:"message-contracts"` SupportedDestinations []*SupportedDestination `mapstructure:"supported-destinations" json:"supported-destinations"` ProcessHistoricalBlocksFromHeight uint64 `mapstructure:"process-historical-blocks-from-height" json:"process-historical-blocks-from-height"` @@ -103,15 +101,13 @@ type SourceBlockchain struct { // Destination blockchain configuration. Specifies how to connect to and issue transactions on the desination blockchain. type DestinationBlockchain struct { - SubnetID string `mapstructure:"subnet-id" json:"subnet-id"` - BlockchainID string `mapstructure:"blockchain-id" json:"blockchain-id"` - VM string `mapstructure:"vm" json:"vm"` - RPCEndpoint string `mapstructure:"rpc-endpoint" json:"rpc-endpoint"` - HttpHeaders map[string]string `mapstructure:"http-headers" json:"http-headers"` - QueryParams map[string]string `mapstructure:"query-parameters" json:"query-parameters"` - KMSKeyID string `mapstructure:"kms-key-id" json:"kms-key-id"` - KMSAWSRegion string `mapstructure:"kms-aws-region" json:"kms-aws-region"` - AccountPrivateKey string `mapstructure:"account-private-key" json:"account-private-key"` + SubnetID string `mapstructure:"subnet-id" json:"subnet-id"` + BlockchainID string `mapstructure:"blockchain-id" json:"blockchain-id"` + VM string `mapstructure:"vm" json:"vm"` + RPCEndpoint APIConfig `mapstructure:"rpc-endpoint" json:"rpc-endpoint"` + KMSKeyID string `mapstructure:"kms-key-id" json:"kms-key-id"` + KMSAWSRegion string `mapstructure:"kms-aws-region" json:"kms-aws-region"` + AccountPrivateKey string `mapstructure:"account-private-key" json:"account-private-key"` // Fetched from the chain after startup warpQuorum WarpQuorum @@ -441,11 +437,11 @@ func (s *SourceBlockchain) Validate(destinationBlockchainIDs *set.Set[string]) e if _, err := ids.FromString(s.BlockchainID); err != nil { return fmt.Errorf("invalid blockchainID in source subnet configuration. Provided ID: %s", s.BlockchainID) } - if _, err := url.ParseRequestURI(s.WSEndpoint); err != nil { - return fmt.Errorf("invalid relayer subscribe URL in source subnet configuration: %w", err) + if err := s.RPCEndpoint.Validate(); err != nil { + return fmt.Errorf("invalid rpc-endpoint in source subnet configuration: %w", err) } - if _, err := url.ParseRequestURI(s.RPCEndpoint); err != nil { - return fmt.Errorf("invalid relayer RPC URL in source subnet configuration: %w", err) + if err := s.WSEndpoint.Validate(); err != nil { + return fmt.Errorf("invalid ws-endpoint in source subnet configuration: %w", err) } // Validate the VM specific settings @@ -548,8 +544,8 @@ func (s *DestinationBlockchain) Validate() error { if _, err := ids.FromString(s.BlockchainID); err != nil { return fmt.Errorf("invalid blockchainID in source subnet configuration. Provided ID: %s", s.BlockchainID) } - if _, err := url.ParseRequestURI(s.RPCEndpoint); err != nil { - return fmt.Errorf("invalid relayer broadcast URL: %w", err) + if err := s.RPCEndpoint.Validate(); err != nil { + return fmt.Errorf("invalid rpc-endpoint in destination subnet configuration: %w", err) } if s.KMSKeyID != "" { if s.KMSAWSRegion == "" { @@ -603,7 +599,7 @@ func (s *DestinationBlockchain) initializeWarpQuorum() error { return fmt.Errorf("invalid subnetID in configuration. error: %w", err) } - client, err := ethclient.DialWithConfig(context.Background(), s.RPCEndpoint, s.HttpHeaders, s.QueryParams) + client, err := ethclient.DialWithConfig(context.Background(), ethclient.Config(s.RPCEndpoint)) if err != nil { return fmt.Errorf("failed to dial destination blockchain %s: %w", blockchainID, err) } diff --git a/config/config_test.go b/config/config_test.go index e65984c9..ba889fbf 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -118,6 +118,7 @@ func TestGetRelayerAccountPrivateKey_set_pk_with_subnet_env(t *testing.T) { } runConfigModifierEnvVarTest(t, testCase) } + func TestGetRelayerAccountPrivateKey_set_pk_with_global_env(t *testing.T) { testCase := configMondifierEnvVarTestCase{ baseConfig: TestValidConfig, @@ -218,6 +219,7 @@ func TestEitherKMSOrAccountPrivateKey(t *testing.T) { } } } + func TestGetWarpQuorum(t *testing.T) { blockchainID, err := ids.FromString("p433wpuXyJiDhyazPYyZMJeaoPSW76CBZ2x7wrVPLgvokotXz") require.NoError(t, err) @@ -347,10 +349,14 @@ func TestGetWarpQuorum(t *testing.T) { func TestValidateSourceBlockchain(t *testing.T) { validSourceCfg := SourceBlockchain{ BlockchainID: testBlockchainID, - RPCEndpoint: fmt.Sprintf("http://test.avax.network/ext/bc/%s/rpc", testBlockchainID), - WSEndpoint: fmt.Sprintf("ws://test.avax.network/ext/bc/%s/ws", testBlockchainID), - SubnetID: testSubnetID, - VM: "evm", + RPCEndpoint: APIConfig{ + BaseURL: fmt.Sprintf("http://test.avax.network/ext/bc/%s/rpc", testBlockchainID), + }, + WSEndpoint: APIConfig{ + BaseURL: fmt.Sprintf("ws://test.avax.network/ext/bc/%s/ws", testBlockchainID), + }, + SubnetID: testSubnetID, + VM: "evm", SupportedDestinations: []*SupportedDestination{ { BlockchainID: testBlockchainID, diff --git a/config/test_utils.go b/config/test_utils.go index 603d8909..17e0e9b9 100644 --- a/config/test_utils.go +++ b/config/test_utils.go @@ -35,8 +35,12 @@ var ( }, SourceBlockchains: []*SourceBlockchain{ { - RPCEndpoint: fmt.Sprintf("http://test.avax.network/ext/bc/%s/rpc", testBlockchainID), - WSEndpoint: fmt.Sprintf("ws://test.avax.network/ext/bc/%s/ws", testBlockchainID), + RPCEndpoint: APIConfig{ + BaseURL: fmt.Sprintf("http://test.avax.network/ext/bc/%s/rpc", testBlockchainID), + }, + WSEndpoint: APIConfig{ + BaseURL: fmt.Sprintf("ws://test.avax.network/ext/bc/%s/ws", testBlockchainID), + }, BlockchainID: testBlockchainID, SubnetID: testSubnetID, VM: "evm", @@ -49,7 +53,9 @@ var ( }, DestinationBlockchains: []*DestinationBlockchain{ { - RPCEndpoint: fmt.Sprintf("http://test.avax.network/ext/bc/%s/rpc", testBlockchainID), + RPCEndpoint: APIConfig{ + BaseURL: fmt.Sprintf("http://test.avax.network/ext/bc/%s/rpc", testBlockchainID), + }, BlockchainID: testBlockchainID, SubnetID: testSubnetID, VM: "evm", @@ -58,8 +64,12 @@ var ( }, } TestValidSourceBlockchainConfig = SourceBlockchain{ - RPCEndpoint: "http://test.avax.network/ext/bc/C/rpc", - WSEndpoint: "ws://test.avax.network/ext/bc/C/ws", + RPCEndpoint: APIConfig{ + BaseURL: "http://test.avax.network/ext/bc/C/rpc", + }, + WSEndpoint: APIConfig{ + BaseURL: "ws://test.avax.network/ext/bc/C/ws", + }, BlockchainID: "S4mMqUXe7vHsGiRAma6bv3CKnyaLssyAxmQ2KvFpX1KEvfFCD", SubnetID: "2TGBXcnwx5PqiXWiqxAKUaNSqDguXNh1mxnp82jui68hxJSZAx", VM: "evm", @@ -70,10 +80,12 @@ var ( }, } TestValidDestinationBlockchainConfig = DestinationBlockchain{ - SubnetID: "2TGBXcnwx5PqiXWiqxAKUaNSqDguXNh1mxnp82jui68hxJSZAx", - BlockchainID: "S4mMqUXe7vHsGiRAma6bv3CKnyaLssyAxmQ2KvFpX1KEvfFCD", - VM: "evm", - RPCEndpoint: "http://test.avax.network/ext/bc/C/rpc", + SubnetID: "2TGBXcnwx5PqiXWiqxAKUaNSqDguXNh1mxnp82jui68hxJSZAx", + BlockchainID: "S4mMqUXe7vHsGiRAma6bv3CKnyaLssyAxmQ2KvFpX1KEvfFCD", + VM: "evm", + RPCEndpoint: APIConfig{ + BaseURL: "http://test.avax.network/ext/bc/C/rpc", + }, AccountPrivateKey: "1234567890123456789012345678901234567890123456789012345678901234", } ) From 3d415f462dda7073e1a7b7ebe5fcda61b45926b9 Mon Sep 17 00:00:00 2001 From: Nathan Haim Date: Fri, 10 May 2024 13:57:44 +0200 Subject: [PATCH 09/15] relayer: update new eth client usage --- relayer/application_relayer.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/relayer/application_relayer.go b/relayer/application_relayer.go index 6dee6503..e9e85f10 100644 --- a/relayer/application_relayer.go +++ b/relayer/application_relayer.go @@ -191,7 +191,7 @@ func (r *applicationRelayer) relayMessage( func (r *applicationRelayer) createSignedMessage(unsignedMessage *avalancheWarp.UnsignedMessage) (*avalancheWarp.Message, error) { r.logger.Info("Fetching aggregate signature from the source chain validators via API") // TODO: To properly support this, we should provide a dedicated Warp API endpoint in the config - uri := utils.StripFromString(r.sourceBlockchain.RPCEndpoint, "/ext") + uri := utils.StripFromString(r.sourceBlockchain.RPCEndpoint.BaseURL, "/ext") warpClient, err := warpBackend.NewClient(uri, r.sourceBlockchain.GetBlockchainID().String()) if err != nil { r.logger.Error( @@ -641,9 +641,7 @@ func (r *applicationRelayer) calculateStartingBlockHeight(processHistoricalBlock func (r *applicationRelayer) setProcessedBlockHeightToLatest() (uint64, error) { ethClient, err := ethclient.DialWithConfig( context.Background(), - r.sourceBlockchain.RPCEndpoint, - r.sourceBlockchain.HttpHeaders, - r.sourceBlockchain.QueryParams, + ethclient.Config(r.sourceBlockchain.RPCEndpoint), ) if err != nil { r.logger.Error( From 8a8843f59d9f8705a6ecbf0435be22cefbe35072 Mon Sep 17 00:00:00 2001 From: Nathan Haim Date: Fri, 10 May 2024 13:58:03 +0200 Subject: [PATCH 10/15] tests: update new eth client usage --- tests/utils/utils.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/tests/utils/utils.go b/tests/utils/utils.go index 1ee442ad..809159de 100644 --- a/tests/utils/utils.go +++ b/tests/utils/utils.go @@ -116,8 +116,12 @@ func CreateDefaultRelayerConfig( SubnetID: subnetInfo.SubnetID.String(), BlockchainID: subnetInfo.BlockchainID.String(), VM: config.EVM.String(), - RPCEndpoint: fmt.Sprintf("http://%s:%d/ext/bc/%s/rpc", host, port, subnetInfo.BlockchainID.String()), - WSEndpoint: fmt.Sprintf("ws://%s:%d/ext/bc/%s/ws", host, port, subnetInfo.BlockchainID.String()), + RPCEndpoint: config.APIConfig{ + BaseURL: fmt.Sprintf("http://%s:%d/ext/bc/%s/rpc", host, port, subnetInfo.BlockchainID.String()), + }, + WSEndpoint: config.APIConfig{ + BaseURL: fmt.Sprintf("ws://%s:%d/ext/bc/%s/ws", host, port, subnetInfo.BlockchainID.String()), + }, MessageContracts: map[string]config.MessageProtocolConfig{ teleporterContractAddress.Hex(): { @@ -149,10 +153,12 @@ func CreateDefaultRelayerConfig( Expect(err).Should(BeNil()) destinations[i] = &config.DestinationBlockchain{ - SubnetID: subnetInfo.SubnetID.String(), - BlockchainID: subnetInfo.BlockchainID.String(), - VM: config.EVM.String(), - RPCEndpoint: fmt.Sprintf("http://%s:%d/ext/bc/%s/rpc", host, port, subnetInfo.BlockchainID.String()), + SubnetID: subnetInfo.SubnetID.String(), + BlockchainID: subnetInfo.BlockchainID.String(), + VM: config.EVM.String(), + RPCEndpoint: config.APIConfig{ + BaseURL: fmt.Sprintf("http://%s:%d/ext/bc/%s/rpc", host, port, subnetInfo.BlockchainID.String()), + }, AccountPrivateKey: hex.EncodeToString(relayerKey.D.Bytes()), } From 6d90a872713366a4b8bf983a9a14df9eb0654a4f Mon Sep 17 00:00:00 2001 From: Nathan Haim Date: Fri, 10 May 2024 13:58:29 +0200 Subject: [PATCH 11/15] vms: update new eth client usage --- vms/evm/destination_client.go | 4 +--- vms/evm/destination_client_test.go | 10 +++++---- vms/evm/subscriber.go | 34 +++++++++++++----------------- vms/evm/subscriber_test.go | 9 +++++--- 4 files changed, 28 insertions(+), 29 deletions(-) diff --git a/vms/evm/destination_client.go b/vms/evm/destination_client.go index 2f14fc94..f5a763f4 100644 --- a/vms/evm/destination_client.go +++ b/vms/evm/destination_client.go @@ -51,9 +51,7 @@ func NewDestinationClient(logger logging.Logger, destinationBlockchain *config.D // Dial the destination RPC endpoint client, err := ethclient.DialWithConfig( context.Background(), - destinationBlockchain.RPCEndpoint, - destinationBlockchain.HttpHeaders, - destinationBlockchain.QueryParams, + ethclient.Config(destinationBlockchain.RPCEndpoint), ) if err != nil { logger.Error( diff --git a/vms/evm/destination_client_test.go b/vms/evm/destination_client_test.go index 4ac9aa1d..4608fe5d 100644 --- a/vms/evm/destination_client_test.go +++ b/vms/evm/destination_client_test.go @@ -19,10 +19,12 @@ import ( ) var destinationSubnet = config.DestinationBlockchain{ - SubnetID: "2TGBXcnwx5PqiXWiqxAKUaNSqDguXNh1mxnp82jui68hxJSZAx", - BlockchainID: "S4mMqUXe7vHsGiRAma6bv3CKnyaLssyAxmQ2KvFpX1KEvfFCD", - VM: config.EVM.String(), - RPCEndpoint: "https://subnets.avax.network/mysubnet/rpc", + SubnetID: "2TGBXcnwx5PqiXWiqxAKUaNSqDguXNh1mxnp82jui68hxJSZAx", + BlockchainID: "S4mMqUXe7vHsGiRAma6bv3CKnyaLssyAxmQ2KvFpX1KEvfFCD", + VM: config.EVM.String(), + RPCEndpoint: config.APIConfig{ + BaseURL: "https://subnets.avax.network/mysubnet/rpc", + }, AccountPrivateKey: "56289e99c94b6912bfc12adc093c9b51124f0dc54ac7a766b2bc5ccf558d8027", } diff --git a/vms/evm/subscriber.go b/vms/evm/subscriber.go index 973f411b..e3e44bdb 100644 --- a/vms/evm/subscriber.go +++ b/vms/evm/subscriber.go @@ -48,19 +48,17 @@ var warpFilterQuery = interfaces.FilterQuery{ // subscriber implements Subscriber type subscriber struct { - nodeWSURL string - nodeRPCURL string - httpHeaders map[string]string - queryParams map[string]string - blockchainID ids.ID - logsChan chan vmtypes.WarpLogInfo - evmLog <-chan types.Log - sub interfaces.Subscription + nodeWSEndpoint ethclient.Config + nodeRPCEndpoint ethclient.Config + blockchainID ids.ID + logsChan chan vmtypes.WarpLogInfo + evmLog <-chan types.Log + sub interfaces.Subscription logger logging.Logger // seams for mock injection: - dial func(ctx context.Context, url string, httpHeaders, queryParams map[string]string) (evmethclient.Client, error) + dial func(ctx context.Context, cfg ethclient.Config) (evmethclient.Client, error) } // NewSubscriber returns a subscriber @@ -77,14 +75,12 @@ func NewSubscriber(logger logging.Logger, subnetInfo config.SourceBlockchain) *s logs := make(chan vmtypes.WarpLogInfo, maxClientSubscriptionBuffer) return &subscriber{ - nodeWSURL: subnetInfo.WSEndpoint, - nodeRPCURL: subnetInfo.RPCEndpoint, - httpHeaders: subnetInfo.HttpHeaders, - queryParams: subnetInfo.QueryParams, - blockchainID: blockchainID, - logger: logger, - logsChan: logs, - dial: ethclient.DialWithConfig, + nodeWSEndpoint: ethclient.Config(subnetInfo.WSEndpoint), + nodeRPCEndpoint: ethclient.Config(subnetInfo.RPCEndpoint), + blockchainID: blockchainID, + logger: logger, + logsChan: logs, + dial: ethclient.DialWithConfig, } } @@ -145,7 +141,7 @@ func (s *subscriber) ProcessFromHeight(height *big.Int, done chan bool) { s.logger.Error("cannot process logs from nil height") done <- false } - ethClient, err := s.dial(context.Background(), s.nodeWSURL, s.httpHeaders, s.queryParams) + ethClient, err := s.dial(context.Background(), s.nodeWSEndpoint) if err != nil { s.logger.Error("failed to dial eth client", zap.Error(err)) done <- false @@ -277,7 +273,7 @@ func (s *subscriber) Subscribe(maxResubscribeAttempts int) error { func (s *subscriber) dialAndSubscribe() error { // Dial the configured source chain endpoint // This needs to be a websocket - ethClient, err := s.dial(context.Background(), s.nodeWSURL, s.httpHeaders, s.queryParams) + ethClient, err := s.dial(context.Background(), s.nodeWSEndpoint) if err != nil { return err } diff --git a/vms/evm/subscriber_test.go b/vms/evm/subscriber_test.go index 5ec48643..fd3734c9 100644 --- a/vms/evm/subscriber_test.go +++ b/vms/evm/subscriber_test.go @@ -10,9 +10,10 @@ import ( "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/awm-relayer/config" + "github.com/ava-labs/awm-relayer/ethclient" mock_ethclient "github.com/ava-labs/awm-relayer/vms/evm/mocks" "github.com/ava-labs/subnet-evm/core/types" - "github.com/ava-labs/subnet-evm/ethclient" + evmethclient "github.com/ava-labs/subnet-evm/ethclient" "github.com/ava-labs/subnet-evm/interfaces" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" @@ -23,14 +24,16 @@ func makeSubscriberWithMockEthClient(t *testing.T) (*subscriber, *mock_ethclient SubnetID: "2TGBXcnwx5PqiXWiqxAKUaNSqDguXNh1mxnp82jui68hxJSZAx", BlockchainID: "S4mMqUXe7vHsGiRAma6bv3CKnyaLssyAxmQ2KvFpX1KEvfFCD", VM: config.EVM.String(), - RPCEndpoint: "https://subnets.avax.network/mysubnet/rpc", + RPCEndpoint: config.APIConfig{ + BaseURL: "https://subnets.avax.network/mysubnet/rpc", + }, } logger := logging.NoLog{} mockEthClient := mock_ethclient.NewMockClient(gomock.NewController(t)) subscriber := NewSubscriber(logger, sourceSubnet) - subscriber.dial = func(_ctx context.Context, _url string, _httpHeaders, _queryParams map[string]string) (ethclient.Client, error) { + subscriber.dial = func(_ctx context.Context, cfg ethclient.Config) (evmethclient.Client, error) { return mockEthClient, nil } From 5e0b3795258ea15ee43c24785686ce6979c8de61 Mon Sep 17 00:00:00 2001 From: Nathan Haim Date: Fri, 10 May 2024 14:03:41 +0200 Subject: [PATCH 12/15] README: update to reflect changes of the config --- README.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 406a6420..d45135aa 100644 --- a/README.md +++ b/README.md @@ -210,13 +210,13 @@ The relayer is configured via a JSON file, the path to which is passed in via th - The VM type of the source blockchain. - `"rpc-endpoint": string` + `"rpc-endpoint": APIConfig` - - The RPC endpoint of the source blockchain's API node. + - The RPC endpoint configuration of the source blockchain's API node. - `"ws-endpoint": string` + `"ws-endpoint": APIConfig` - - The WebSocket endpoint of the source blockchain's API node. + - The WebSocket endpoint configuration of the source blockchain's API node. `"message-contracts": map[string]MessageProtocolConfig` @@ -250,9 +250,9 @@ The relayer is configured via a JSON file, the path to which is passed in via th - The VM type of the source blockchain. - `"rpc-endpoint": string` + `"rpc-endpoint": APIConfig` - - The RPC endpoint of the destination blockchains's API node. + - The RPC endpoint configuration of the destination blockchains's API node. `"account-private-key": string` From 7ebb710c61497483af89826a1a44c2348645a934 Mon Sep 17 00:00:00 2001 From: Nathan Haim Date: Fri, 10 May 2024 15:40:04 +0200 Subject: [PATCH 13/15] sample-relayer-config: update sample to reflect config structure changes --- sample-relayer-config.json | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/sample-relayer-config.json b/sample-relayer-config.json index 9f2d0254..129c81a4 100644 --- a/sample-relayer-config.json +++ b/sample-relayer-config.json @@ -10,8 +10,12 @@ "subnet-id": "11111111111111111111111111111111LpoYY", "blockchain-id": "yH8D7ThNJkxmtkuv2jgBa4P1Rn3Qpr4pPr7QYNfcdoS6k6HWp", "vm": "evm", - "rpc-endpoint": "https://api.avax-test.network/ext/bc/C/rpc", - "ws-endpoint": "wss://api.avax-test.network/ext/bc/C/ws", + "rpc-endpoint": { + "base-url": "https://api.avax-test.network/ext/bc/C/rpc" + }, + "ws-endpoint": { + "base-url": "wss://api.avax-test.network/ext/bc/C/ws" + }, "message-contracts": { "0x253b2784c75e510dD0fF1da844684a1aC0aa5fcf": { "message-format": "teleporter", @@ -27,7 +31,9 @@ "subnet-id": "7WtoAMPhrmh5KosDUsFL9yTcvw7YSxiKHPpdfs4JsgW47oZT5", "blockchain-id": "2D8RG4UpSXbPbvPCAWppNJyqTG2i2CAXSkTgmTBBvs7GKNZjsY", "vm": "evm", - "rpc-endpoint": "https://subnets.avax.network/dispatch/testnet/rpc", + "rpc-endpoint": { + "base-url": "https://subnets.avax.network/dispatch/testnet/rpc" + }, "account-private-key": "0x7493..." } ] From f768549e9bcaca3db9931e169401ea253bd9a4cb Mon Sep 17 00:00:00 2001 From: Nathan Haim Date: Mon, 13 May 2024 19:21:40 +0200 Subject: [PATCH 14/15] ethclient: change eth client factory args The factory definition is changed and everywhere it is used --- config/config.go | 2 +- ethclient/client.go | 12 +++--------- relayer/application_relayer.go | 4 +++- vms/evm/destination_client.go | 4 +++- vms/evm/subscriber.go | 14 +++++++------- vms/evm/subscriber_test.go | 3 +-- 6 files changed, 18 insertions(+), 21 deletions(-) diff --git a/config/config.go b/config/config.go index e6c9453f..a68440ec 100644 --- a/config/config.go +++ b/config/config.go @@ -599,7 +599,7 @@ func (s *DestinationBlockchain) initializeWarpQuorum() error { return fmt.Errorf("invalid subnetID in configuration. error: %w", err) } - client, err := ethclient.DialWithConfig(context.Background(), ethclient.Config(s.RPCEndpoint)) + client, err := ethclient.DialWithConfig(context.Background(), s.RPCEndpoint.BaseURL, s.RPCEndpoint.HTTPHeaders, s.RPCEndpoint.QueryParams) if err != nil { return fmt.Errorf("failed to dial destination blockchain %s: %w", blockchainID, err) } diff --git a/ethclient/client.go b/ethclient/client.go index d49ece16..0809a382 100644 --- a/ethclient/client.go +++ b/ethclient/client.go @@ -15,19 +15,13 @@ import ( var ErrInvalidEndpoint = errors.New("invalid rpc endpoint") -type Config struct { - BaseURL string - QueryParams map[string]string - HTTPHeaders map[string]string -} - // DialWithContext returns an ethclient.Client with the internal RPC client configured with the provided options. -func DialWithConfig(ctx context.Context, cfg Config) (ethclient.Client, error) { - url, err := addQueryParams(cfg.BaseURL, cfg.QueryParams) +func DialWithConfig(ctx context.Context, baseURL string, httpHeaders, queryParams map[string]string) (ethclient.Client, error) { + url, err := addQueryParams(baseURL, queryParams) if err != nil { return nil, err } - client, err := rpc.DialOptions(ctx, url, newClientHeaderOptions(cfg.HTTPHeaders)...) + client, err := rpc.DialOptions(ctx, url, newClientHeaderOptions(httpHeaders)...) if err != nil { return nil, err } diff --git a/relayer/application_relayer.go b/relayer/application_relayer.go index e9e85f10..5d689e9f 100644 --- a/relayer/application_relayer.go +++ b/relayer/application_relayer.go @@ -641,7 +641,9 @@ func (r *applicationRelayer) calculateStartingBlockHeight(processHistoricalBlock func (r *applicationRelayer) setProcessedBlockHeightToLatest() (uint64, error) { ethClient, err := ethclient.DialWithConfig( context.Background(), - ethclient.Config(r.sourceBlockchain.RPCEndpoint), + r.sourceBlockchain.RPCEndpoint.BaseURL, + r.sourceBlockchain.RPCEndpoint.HTTPHeaders, + r.sourceBlockchain.RPCEndpoint.QueryParams, ) if err != nil { r.logger.Error( diff --git a/vms/evm/destination_client.go b/vms/evm/destination_client.go index f5a763f4..cf9bfa0e 100644 --- a/vms/evm/destination_client.go +++ b/vms/evm/destination_client.go @@ -51,7 +51,9 @@ func NewDestinationClient(logger logging.Logger, destinationBlockchain *config.D // Dial the destination RPC endpoint client, err := ethclient.DialWithConfig( context.Background(), - ethclient.Config(destinationBlockchain.RPCEndpoint), + destinationBlockchain.RPCEndpoint.BaseURL, + destinationBlockchain.RPCEndpoint.HTTPHeaders, + destinationBlockchain.RPCEndpoint.QueryParams, ) if err != nil { logger.Error( diff --git a/vms/evm/subscriber.go b/vms/evm/subscriber.go index e3e44bdb..90ccc267 100644 --- a/vms/evm/subscriber.go +++ b/vms/evm/subscriber.go @@ -48,8 +48,8 @@ var warpFilterQuery = interfaces.FilterQuery{ // subscriber implements Subscriber type subscriber struct { - nodeWSEndpoint ethclient.Config - nodeRPCEndpoint ethclient.Config + nodeWSEndpoint config.APIConfig + nodeRPCEndpoint config.APIConfig blockchainID ids.ID logsChan chan vmtypes.WarpLogInfo evmLog <-chan types.Log @@ -58,7 +58,7 @@ type subscriber struct { logger logging.Logger // seams for mock injection: - dial func(ctx context.Context, cfg ethclient.Config) (evmethclient.Client, error) + dial func(ctx context.Context, url string, httpHeaders, queryParams map[string]string) (evmethclient.Client, error) } // NewSubscriber returns a subscriber @@ -75,8 +75,8 @@ func NewSubscriber(logger logging.Logger, subnetInfo config.SourceBlockchain) *s logs := make(chan vmtypes.WarpLogInfo, maxClientSubscriptionBuffer) return &subscriber{ - nodeWSEndpoint: ethclient.Config(subnetInfo.WSEndpoint), - nodeRPCEndpoint: ethclient.Config(subnetInfo.RPCEndpoint), + nodeWSEndpoint: subnetInfo.WSEndpoint, + nodeRPCEndpoint: subnetInfo.RPCEndpoint, blockchainID: blockchainID, logger: logger, logsChan: logs, @@ -141,7 +141,7 @@ func (s *subscriber) ProcessFromHeight(height *big.Int, done chan bool) { s.logger.Error("cannot process logs from nil height") done <- false } - ethClient, err := s.dial(context.Background(), s.nodeWSEndpoint) + ethClient, err := s.dial(context.Background(), s.nodeWSEndpoint.BaseURL, s.nodeWSEndpoint.HTTPHeaders, s.nodeWSEndpoint.QueryParams) if err != nil { s.logger.Error("failed to dial eth client", zap.Error(err)) done <- false @@ -273,7 +273,7 @@ func (s *subscriber) Subscribe(maxResubscribeAttempts int) error { func (s *subscriber) dialAndSubscribe() error { // Dial the configured source chain endpoint // This needs to be a websocket - ethClient, err := s.dial(context.Background(), s.nodeWSEndpoint) + ethClient, err := s.dial(context.Background(), s.nodeWSEndpoint.BaseURL, s.nodeWSEndpoint.HTTPHeaders, s.nodeWSEndpoint.QueryParams) if err != nil { return err } diff --git a/vms/evm/subscriber_test.go b/vms/evm/subscriber_test.go index fd3734c9..e660189e 100644 --- a/vms/evm/subscriber_test.go +++ b/vms/evm/subscriber_test.go @@ -10,7 +10,6 @@ import ( "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/awm-relayer/config" - "github.com/ava-labs/awm-relayer/ethclient" mock_ethclient "github.com/ava-labs/awm-relayer/vms/evm/mocks" "github.com/ava-labs/subnet-evm/core/types" evmethclient "github.com/ava-labs/subnet-evm/ethclient" @@ -33,7 +32,7 @@ func makeSubscriberWithMockEthClient(t *testing.T) (*subscriber, *mock_ethclient mockEthClient := mock_ethclient.NewMockClient(gomock.NewController(t)) subscriber := NewSubscriber(logger, sourceSubnet) - subscriber.dial = func(_ctx context.Context, cfg ethclient.Config) (evmethclient.Client, error) { + subscriber.dial = func(_ctx context.Context, _url string, _httpHeaders, _queryParams map[string]string) (evmethclient.Client, error) { return mockEthClient, nil } From 350904801fcea87f003cbb266efcef1140976b87 Mon Sep 17 00:00:00 2001 From: Nathan Haim Date: Mon, 13 May 2024 19:27:31 +0200 Subject: [PATCH 15/15] ethclient: define a local Client Every package that was using (subnet-evm)ethclient.Client is now using local ethclient.Client --- config/config.go | 3 +-- ethclient/client.go | 4 +++- messages/off-chain-registry/message_manager.go | 2 +- messages/teleporter/message_manager.go | 2 +- vms/evm/destination_client.go | 5 ++--- vms/evm/subscriber.go | 5 ++--- vms/evm/subscriber_test.go | 4 ++-- 7 files changed, 12 insertions(+), 13 deletions(-) diff --git a/config/config.go b/config/config.go index a68440ec..addd3fd1 100644 --- a/config/config.go +++ b/config/config.go @@ -18,7 +18,6 @@ import ( "github.com/ava-labs/awm-relayer/utils" "github.com/ava-labs/awm-relayer/ethclient" - evmethclient "github.com/ava-labs/subnet-evm/ethclient" "github.com/ava-labs/subnet-evm/precompile/contracts/warp" // Force-load precompiles to trigger registration @@ -357,7 +356,7 @@ func calculateQuorumNumerator(cfgNumerator uint64) uint64 { func getWarpQuorum( subnetID ids.ID, blockchainID ids.ID, - client evmethclient.Client, + client ethclient.Client, ) (WarpQuorum, error) { if subnetID == constants.PrimaryNetworkID { return WarpQuorum{ diff --git a/ethclient/client.go b/ethclient/client.go index 0809a382..807f2d1f 100644 --- a/ethclient/client.go +++ b/ethclient/client.go @@ -15,8 +15,10 @@ import ( var ErrInvalidEndpoint = errors.New("invalid rpc endpoint") +type Client ethclient.Client + // DialWithContext returns an ethclient.Client with the internal RPC client configured with the provided options. -func DialWithConfig(ctx context.Context, baseURL string, httpHeaders, queryParams map[string]string) (ethclient.Client, error) { +func DialWithConfig(ctx context.Context, baseURL string, httpHeaders, queryParams map[string]string) (Client, error) { url, err := addQueryParams(baseURL, queryParams) if err != nil { return nil, err diff --git a/messages/off-chain-registry/message_manager.go b/messages/off-chain-registry/message_manager.go index 94062246..4204d6e2 100644 --- a/messages/off-chain-registry/message_manager.go +++ b/messages/off-chain-registry/message_manager.go @@ -13,9 +13,9 @@ import ( "github.com/ava-labs/avalanchego/vms/platformvm/warp" warpPayload "github.com/ava-labs/avalanchego/vms/platformvm/warp/payload" "github.com/ava-labs/awm-relayer/config" + "github.com/ava-labs/awm-relayer/ethclient" "github.com/ava-labs/awm-relayer/vms" "github.com/ava-labs/subnet-evm/accounts/abi/bind" - "github.com/ava-labs/subnet-evm/ethclient" teleporterregistry "github.com/ava-labs/teleporter/abi-bindings/go/Teleporter/upgrades/TeleporterRegistry" "github.com/ethereum/go-ethereum/common" "go.uber.org/zap" diff --git a/messages/teleporter/message_manager.go b/messages/teleporter/message_manager.go index c4dc36fa..ef7323e4 100644 --- a/messages/teleporter/message_manager.go +++ b/messages/teleporter/message_manager.go @@ -13,9 +13,9 @@ import ( "github.com/ava-labs/avalanchego/vms/platformvm/warp" warpPayload "github.com/ava-labs/avalanchego/vms/platformvm/warp/payload" "github.com/ava-labs/awm-relayer/config" + "github.com/ava-labs/awm-relayer/ethclient" "github.com/ava-labs/awm-relayer/vms" "github.com/ava-labs/subnet-evm/accounts/abi/bind" - "github.com/ava-labs/subnet-evm/ethclient" teleportermessenger "github.com/ava-labs/teleporter/abi-bindings/go/Teleporter/TeleporterMessenger" gasUtils "github.com/ava-labs/teleporter/utils/gas-utils" teleporterUtils "github.com/ava-labs/teleporter/utils/teleporter-utils" diff --git a/vms/evm/destination_client.go b/vms/evm/destination_client.go index cf9bfa0e..9878f9dc 100644 --- a/vms/evm/destination_client.go +++ b/vms/evm/destination_client.go @@ -17,7 +17,6 @@ import ( "github.com/ava-labs/awm-relayer/ethclient" "github.com/ava-labs/awm-relayer/vms/evm/signer" "github.com/ava-labs/subnet-evm/core/types" - evmethclient "github.com/ava-labs/subnet-evm/ethclient" "github.com/ava-labs/subnet-evm/precompile/contracts/warp" predicateutils "github.com/ava-labs/subnet-evm/predicate" "github.com/ethereum/go-ethereum/common" @@ -33,12 +32,12 @@ const ( // Client interface wraps the ethclient.Client interface for mocking purposes. type Client interface { - evmethclient.Client + ethclient.Client } // Implements DestinationClient type destinationClient struct { - client evmethclient.Client + client ethclient.Client lock *sync.Mutex destinationBlockchainID ids.ID signer signer.Signer diff --git a/vms/evm/subscriber.go b/vms/evm/subscriber.go index 90ccc267..33a355b4 100644 --- a/vms/evm/subscriber.go +++ b/vms/evm/subscriber.go @@ -17,7 +17,6 @@ import ( "github.com/ava-labs/awm-relayer/ethclient" "github.com/ava-labs/awm-relayer/vms/vmtypes" "github.com/ava-labs/subnet-evm/core/types" - evmethclient "github.com/ava-labs/subnet-evm/ethclient" "github.com/ava-labs/subnet-evm/interfaces" "github.com/ava-labs/subnet-evm/precompile/contracts/warp" "github.com/ethereum/go-ethereum/common" @@ -58,7 +57,7 @@ type subscriber struct { logger logging.Logger // seams for mock injection: - dial func(ctx context.Context, url string, httpHeaders, queryParams map[string]string) (evmethclient.Client, error) + dial func(ctx context.Context, url string, httpHeaders, queryParams map[string]string) (ethclient.Client, error) } // NewSubscriber returns a subscriber @@ -184,7 +183,7 @@ func (s *subscriber) ProcessFromHeight(height *big.Int, done chan bool) { // Since initializationFilterQuery does not modify existing fields of warpFilterQuery, // we can safely reuse warpFilterQuery with only a shallow copy func (s *subscriber) processBlockRange( - ethClient evmethclient.Client, + ethClient ethclient.Client, fromBlock, toBlock *big.Int, ) error { initializationFilterQuery := interfaces.FilterQuery{ diff --git a/vms/evm/subscriber_test.go b/vms/evm/subscriber_test.go index e660189e..da1d2fc1 100644 --- a/vms/evm/subscriber_test.go +++ b/vms/evm/subscriber_test.go @@ -10,9 +10,9 @@ import ( "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/awm-relayer/config" + "github.com/ava-labs/awm-relayer/ethclient" mock_ethclient "github.com/ava-labs/awm-relayer/vms/evm/mocks" "github.com/ava-labs/subnet-evm/core/types" - evmethclient "github.com/ava-labs/subnet-evm/ethclient" "github.com/ava-labs/subnet-evm/interfaces" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" @@ -32,7 +32,7 @@ func makeSubscriberWithMockEthClient(t *testing.T) (*subscriber, *mock_ethclient mockEthClient := mock_ethclient.NewMockClient(gomock.NewController(t)) subscriber := NewSubscriber(logger, sourceSubnet) - subscriber.dial = func(_ctx context.Context, _url string, _httpHeaders, _queryParams map[string]string) (evmethclient.Client, error) { + subscriber.dial = func(_ctx context.Context, _url string, _httpHeaders, _queryParams map[string]string) (ethclient.Client, error) { return mockEthClient, nil }