Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ignore redundant tx & check pending tx even if it timed out #78

Merged
merged 4 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 41 additions & 51 deletions node/broadcaster/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package broadcaster
import (
"encoding/hex"
"fmt"
"regexp"
"slices"
"sync"
"time"
Expand All @@ -22,6 +23,8 @@ import (
"github.com/initia-labs/opinit-bots/types"
)

var txNotFoundRegex = regexp.MustCompile("Internal error: tx ([A-Fa-f0-9]+) not found")

type Broadcaster struct {
cfg btypes.BroadcasterConfig

Expand Down Expand Up @@ -143,57 +146,42 @@ func (b *Broadcaster) loadPendingTxs(ctx types.Context, stage types.BasicDB, las
}
ctx.Logger().Debug("load pending txs", zap.Int("count", len(pendingTxs)))

if len(pendingTxs) == 0 {
return nil
}

pendingTxTime := time.Unix(0, pendingTxs[0].Timestamp).UTC()

// if we have pending txs, wait until timeout
if timeoutTime := pendingTxTime.Add(b.cfg.TxTimeout); lastBlockTime.Before(timeoutTime) {
waitingTime := timeoutTime.Sub(lastBlockTime)
timer := time.NewTimer(waitingTime)
defer timer.Stop()

ctx.Logger().Info("waiting for pending txs to be processed", zap.Duration("waiting_time", waitingTime))
pollingTimer := time.NewTicker(ctx.PollingInterval())
defer pollingTimer.Stop()

pollingTimer := time.NewTicker(ctx.PollingInterval())
defer pollingTimer.Stop()
reProcessingTxs := make([]btypes.PendingTxInfo, 0)

WAITLOOP:
for {
if len(pendingTxs) == 0 {
return nil
}

select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
break WAITLOOP
case <-pollingTimer.C:
}
for txIndex := 0; txIndex < len(pendingTxs); {
select {
case <-ctx.Done():
return ctx.Err()
case <-pollingTimer.C:
}

txHash, err := hex.DecodeString(pendingTxs[0].TxHash)
if err != nil {
return err
}
txHash, err := hex.DecodeString(pendingTxs[txIndex].TxHash)
if err != nil {
return err
}

res, err := b.rpcClient.QueryTx(ctx, txHash)
if err == nil && res != nil && res.TxResult.Code == 0 {
ctx.Logger().Debug("transaction successfully included",
zap.String("hash", pendingTxs[0].TxHash),
zap.Int64("height", res.Height))
err = DeletePendingTx(b.db, pendingTxs[0])
if err != nil {
return err
}
pendingTxs = pendingTxs[1:]
} else if err == nil && res != nil {
ctx.Logger().Warn("transaction failed",
zap.String("hash", pendingTxs[0].TxHash),
zap.Uint32("code", res.TxResult.Code),
zap.String("log", res.TxResult.Log))
res, err := b.rpcClient.QueryTx(ctx, txHash)
if err == nil && res != nil && res.TxResult.Code == 0 {
ctx.Logger().Debug("transaction successfully included",
zap.String("hash", pendingTxs[txIndex].TxHash),
zap.Int64("height", res.Height))
txIndex++
} else if err == nil && res != nil {
ctx.Logger().Warn("transaction failed",
zap.String("hash", pendingTxs[txIndex].TxHash),
zap.Uint32("code", res.TxResult.Code),
zap.String("log", res.TxResult.Log))
reProcessingTxs = append(reProcessingTxs, pendingTxs[txIndex])
txIndex++
} else if err != nil && txNotFoundRegex.FindStringSubmatch(err.Error()) != nil {
pendingTxTime := time.Unix(0, pendingTxs[txIndex].Timestamp).UTC()
timeoutTime := pendingTxTime.Add(b.cfg.TxTimeout)
if lastBlockTime.After(timeoutTime) {
reProcessingTxs = append(reProcessingTxs, pendingTxs[txIndex:]...)
break
}
}
}
Expand All @@ -203,11 +191,13 @@ func (b *Broadcaster) loadPendingTxs(ctx types.Context, stage types.BasicDB, las
return err
}

processedMsgsBatch, err := b.pendingTxsToProcessedMsgsBatch(ctx, pendingTxs)
if err != nil {
return err
if len(reProcessingTxs) != 0 {
processedMsgsBatch, err := b.pendingTxsToProcessedMsgsBatch(ctx, reProcessingTxs)
if err != nil {
return err
}
b.pendingProcessedMsgsBatch = append(b.pendingProcessedMsgsBatch, processedMsgsBatch...)
}
b.pendingProcessedMsgsBatch = append(b.pendingProcessedMsgsBatch, processedMsgsBatch...)
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions node/broadcaster/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ var ignoringErrors = []error{
opchildtypes.ErrOracleValidatorsNotRegistered,
opchildtypes.ErrInvalidOracleHeight,
opchildtypes.ErrInvalidOracleTimestamp,

opchildtypes.ErrRedundantTx,
}
var accountSeqRegex = regexp.MustCompile("account sequence mismatch, expected ([0-9]+), got ([0-9]+)")
var outputIndexRegex = regexp.MustCompile("expected ([0-9]+), got ([0-9]+): invalid output index")
Expand Down
Loading