Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ChainHelper] reduce complexity, pool based noncemanager #2332

Merged
merged 17 commits into from
Dec 8, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/dal.test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jobs:
REDIS_HOST: "localhost"
REDIS_PORT: "6379"
KAIA_PROVIDER_URL: "https://public-en-kairos.node.kaia.io"
KAIA_WEBSOCKET_URL: "wss://public-en-baobab.klaytn.net/ws"
KAIA_WEBSOCKET_URL: "wss://public-en-kairos.node.kaia.io/ws"
SUBMISSION_PROXY_CONTRACT: "0x35bA1102A4954147272782302856BD8440227B85"
SIGNER_PK: ${{ secrets.TEST_DELEGATOR_REPORTER_PK}}
API_KEY: "MikoTestApiKey"
2 changes: 1 addition & 1 deletion .github/workflows/reporter.test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ jobs:
DATABASE_URL: "postgresql://postgres:postgres@localhost:5432/miko-test?search_path=public"
REDIS_HOST: "localhost"
REDIS_PORT: "6379"
KAIA_WEBSOCKET_URL: "wss://public-en-baobab.klaytn.net/ws"
KAIA_WEBSOCKET_URL: "wss://public-en-kairos.node.kaia.io/ws"
KAIA_PROVIDER_URL: "https://public-en-kairos.node.kaia.io"
KAIA_REPORTER_PK: ${{ secrets.TEST_DELEGATOR_REPORTER_PK}}
SUBMISSION_PROXY_CONTRACT: "0xE2bA32d006e6FEd86AeA2502C561540c28CF65e9"
Expand Down
186 changes: 35 additions & 151 deletions node/pkg/chain/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ package helper
import (
"context"
"crypto/ecdsa"
"errors"
"math/big"
"os"
"strings"

"bisonai.com/miko/node/pkg/chain/noncemanager"
"bisonai.com/miko/node/pkg/chain/noncemanagerv2"
nick-bisonai marked this conversation as resolved.
Show resolved Hide resolved
"bisonai.com/miko/node/pkg/chain/utils"
errorSentinel "bisonai.com/miko/node/pkg/error"
"bisonai.com/miko/node/pkg/secrets"
Expand Down Expand Up @@ -60,8 +59,7 @@ func setProviderAndReporter(config *ChainHelperConfig, blockchainType Blockchain

func NewChainHelper(ctx context.Context, opts ...ChainHelperOption) (*ChainHelper, error) {
config := &ChainHelperConfig{
BlockchainType: Kaia,
UseAdditionalProviderUrls: true,
BlockchainType: Kaia,
}
for _, opt := range opts {
opt(config)
Expand All @@ -82,49 +80,25 @@ func NewChainHelper(ctx context.Context, opts ...ChainHelperOption) (*ChainHelpe
log.Error().Err(err).Msg("failed to get chain id based on:" + config.ProviderUrl)
return nil, err
}
clients := make([]utils.ClientInterface, 0)
clients = append(clients, primaryClient)

if config.UseAdditionalProviderUrls {
providerUrls, providerUrlLoadErr := utils.LoadProviderUrls(ctx, int(chainID.Int64()))
if providerUrlLoadErr != nil {
log.Warn().Err(providerUrlLoadErr).Msg("failed to load additional provider urls")
}

for _, url := range providerUrls {
subClient, subClientErr := dialFuncs[config.BlockchainType](url)
if subClientErr != nil {
log.Error().Err(subClientErr).Msg("failed to dial sub client")
continue
}
clients = append(clients, subClient)
}
}

wallet := strings.TrimPrefix(config.ReporterPk, "0x")
nonce, err := utils.GetNonceFromPk(ctx, wallet, primaryClient)
if err != nil {
return nil, err
}
noncemanager.Set(wallet, nonce)

nick-bisonai marked this conversation as resolved.
Show resolved Hide resolved
nonceManager := noncemanagerv2.New(primaryClient)
go nonceManager.StartAutoRefill(ctx)
nick-bisonai marked this conversation as resolved.
Show resolved Hide resolved

delegatorUrl := os.Getenv(EnvDelegatorUrl)
if delegatorUrl == "" {
log.Warn().Msg("delegator url not set")
}

return &ChainHelper{
clients: clients,
client: primaryClient,
wallet: wallet,
chainID: chainID,
delegatorUrl: delegatorUrl,
noncemanager: nonceManager,
}, nil
}

func (t *ChainHelper) Close() {
for _, helperClient := range t.clients {
helperClient.Close()
}
t.client.Close()
}

func (t *ChainHelper) GetSignedFromDelegator(tx *types.Transaction) (*types.Transaction, error) {
Expand Down Expand Up @@ -154,39 +128,20 @@ func (t *ChainHelper) GetSignedFromDelegator(tx *types.Transaction) (*types.Tran
}

func (t *ChainHelper) MakeDirectTx(ctx context.Context, contractAddressHex string, functionString string, args ...interface{}) (*types.Transaction, error) {
var result *types.Transaction

nonce, err := utils.GetNonceFromPk(ctx, t.wallet, t.clients[0])
nonce, err := t.noncemanager.GetNonce(ctx, t.wallet)
if err != nil {
return nil, err
}

job := func(c utils.ClientInterface) error {
tmp, err := utils.MakeDirectTx(ctx, c, contractAddressHex, t.wallet, functionString, t.chainID, nonce, args...)
if err == nil {
result = tmp
}
return err
}
err = t.retryOnJsonRpcFailure(ctx, job)
return result, err
return utils.MakeDirectTx(ctx, t.client, contractAddressHex, t.wallet, functionString, t.chainID, nonce, args...)
}

func (t *ChainHelper) Submit(ctx context.Context, tx *types.Transaction) error {
return utils.SubmitRawTx(ctx, t.clients[0], tx)
return utils.SubmitRawTx(ctx, t.client, tx)
}

func (t *ChainHelper) MakeFeeDelegatedTx(ctx context.Context, contractAddressHex string, functionString string, nonce uint64, args ...interface{}) (*types.Transaction, error) {
var result *types.Transaction
job := func(c utils.ClientInterface) error {
tmp, err := utils.MakeFeeDelegatedTx(ctx, c, contractAddressHex, t.wallet, functionString, t.chainID, nonce, args...)
if err == nil {
result = tmp
}
return err
}
err := t.retryOnJsonRpcFailure(ctx, job)
return result, err
return utils.MakeFeeDelegatedTx(ctx, t.client, contractAddressHex, t.wallet, functionString, t.chainID, nonce, args...)
}

// SignTxByFeePayer: used for testing purpose
Expand All @@ -195,26 +150,13 @@ func (t *ChainHelper) SignTxByFeePayer(ctx context.Context, tx *types.Transactio
}

func (t *ChainHelper) ReadContract(ctx context.Context, contractAddressHex string, functionString string, args ...interface{}) (interface{}, error) {
var result interface{}
job := func(c utils.ClientInterface) error {
tmp, err := utils.ReadContract(ctx, c, functionString, contractAddressHex, args...)
if err == nil {
result = tmp
}
return err
}
err := t.retryOnJsonRpcFailure(ctx, job)
return result, err
return utils.ReadContract(ctx, t.client, functionString, contractAddressHex, args...)
}

func (t *ChainHelper) ChainID() *big.Int {
return t.chainID
}

func (t *ChainHelper) NumClients() int {
return len(t.clients)
}

func (t *ChainHelper) PublicAddress() (common.Address, error) {
// should get the public address of next reporter yet not move the index
result := common.Address{}
Expand Down Expand Up @@ -242,98 +184,40 @@ func (t *ChainHelper) PublicAddressString() (string, error) {
return address.Hex(), nil
}

func (t *ChainHelper) SubmitDelegatedFallbackDirect(ctx context.Context, contractAddress string, functionString string, maxRetrial int, args ...interface{}) error {
var err error
var tx *types.Transaction

clientIndex := 0

nonce, err := noncemanager.GetAndIncrementNonce(t.wallet)
func (t *ChainHelper) SubmitDelegatedFallbackDirect(ctx context.Context, contractAddress, functionString string, args ...interface{}) error {
nonce, err := t.noncemanager.GetNonce(ctx, t.wallet)
if err != nil {
return err
}
log.Debug().Uint64("nonce", nonce).Msg("nonce")

if t.delegatorUrl != "" {
for i := 0; i < maxRetrial; i++ {
tx, err = utils.MakeFeeDelegatedTx(ctx, t.clients[clientIndex], contractAddress, t.wallet, functionString, t.chainID, nonce, args...)
if err != nil {
if utils.ShouldRetryWithSwitchedJsonRPC(err) {
clientIndex = (clientIndex + 1) % len(t.clients)
}
continue
}

tx, err = t.GetSignedFromDelegator(tx)
if err != nil {
break // if delegator signing fails, try direct transaction
}

err = utils.SubmitRawTx(ctx, t.clients[clientIndex], tx)
if err != nil {
if utils.ShouldRetryWithSwitchedJsonRPC(err) {
clientIndex = (clientIndex + 1) % len(t.clients)
} else if errors.Is(err, errorSentinel.ErrChainTransactionFail) {
return err // if transaction fails, the data will probably be too old to retry
} else if utils.IsNonceError(err) || err == context.DeadlineExceeded {
err = noncemanager.ResetNonce(ctx, t.wallet, t.clients[clientIndex])
if err != nil {
return err
}
nonce, err = noncemanager.GetAndIncrementNonce(t.wallet)
if err != nil {
return err
}
}
continue
}
return nil
}
tx, err := utils.MakeFeeDelegatedTx(ctx, t.client, contractAddress, t.wallet, functionString, t.chainID, nonce, args...)
if err != nil {
return err
}

for i := 0; i < maxRetrial; i++ {
tx, err = utils.MakeDirectTx(ctx, t.clients[clientIndex], contractAddress, t.wallet, functionString, t.chainID, nonce, args...)
tx, err = t.GetSignedFromDelegator(tx)
if err != nil {
tx, err = utils.MakeDirectTx(ctx, t.client, contractAddress, t.wallet, functionString, t.chainID, nonce, args...)
if err != nil {
if utils.ShouldRetryWithSwitchedJsonRPC(err) {
clientIndex = (clientIndex + 1) % len(t.clients)
}
continue
return err
}

err = utils.SubmitRawTx(ctx, t.clients[clientIndex], tx)
if err != nil {
if utils.ShouldRetryWithSwitchedJsonRPC(err) {
clientIndex = (clientIndex + 1) % len(t.clients)
} else if errors.Is(err, errorSentinel.ErrChainTransactionFail) {
return err // if transaction fails, the data will probably be too old to retry
} else if utils.IsNonceError(err) || err == context.DeadlineExceeded {
err = noncemanager.ResetNonce(ctx, t.wallet, t.clients[clientIndex])
if err != nil {
return err
}
nonce, err = noncemanager.GetAndIncrementNonce(t.wallet)
if err != nil {
return err
}
}
continue
}
return nil
return utils.SubmitRawTx(ctx, t.client, tx)
}

return err
return utils.SubmitRawTx(ctx, t.client, tx)
}

func (t *ChainHelper) retryOnJsonRpcFailure(ctx context.Context, job func(c utils.ClientInterface) error) error {
for _, client := range t.clients {
err := job(client)
if err != nil {
if utils.ShouldRetryWithSwitchedJsonRPC(err) {
log.Error().Err(err).Msg("Error on retrying on JsonRpcFailure")
continue
}
return err
}
break
func (t *ChainHelper) SubmitDirect(ctx context.Context, contractAddress, functionString string, args ...interface{}) error {
tx, err := t.MakeDirectTx(ctx, contractAddress, functionString, args...)
if err != nil {
return err
}
return nil

return utils.SubmitRawTx(ctx, t.client, tx)
}

func (t *ChainHelper) FlushNoncePool(ctx context.Context) error {
return t.noncemanager.Refill(ctx, t.wallet)
}
3 changes: 1 addition & 2 deletions node/pkg/chain/helper/signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,5 @@ func (s *Signer) Renew(ctx context.Context, newPK *ecdsa.PrivateKey, newPkHex st
}

func (s *Signer) signerUpdate(ctx context.Context, newAddr common.Address) error {
return s.chainHelper.SubmitDelegatedFallbackDirect(ctx, s.submissionProxyContractAddr, UpdateSignerFuncSignature, maxTxSubmissionRetries, newAddr)

return s.chainHelper.SubmitDelegatedFallbackDirect(ctx, s.submissionProxyContractAddr, UpdateSignerFuncSignature, newAddr)
}
10 changes: 3 additions & 7 deletions node/pkg/chain/helper/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@ import (
"time"

"bisonai.com/miko/node/pkg/chain/eth_client"
"bisonai.com/miko/node/pkg/chain/noncemanagerv2"
"bisonai.com/miko/node/pkg/chain/utils"
"github.com/klaytn/klaytn/client"
)

type ChainHelper struct {
clients []utils.ClientInterface
client utils.ClientInterface
wallet string
chainID *big.Int
delegatorUrl string
noncemanager *noncemanagerv2.NonceManagerV2
}

type ChainHelperConfig struct {
Expand Down Expand Up @@ -45,12 +47,6 @@ func WithBlockchainType(t BlockchainType) ChainHelperOption {
}
}

func WithoutAdditionalProviderUrls() ChainHelperOption {
return func(c *ChainHelperConfig) {
c.UseAdditionalProviderUrls = false
}
}

type Signer struct {
PK *ecdsa.PrivateKey
chainHelper *ChainHelper
Expand Down
Loading
Loading