Skip to content

Commit

Permalink
ignore redundant tx & check pending tx even if it timed out (#78)
Browse files Browse the repository at this point in the history
* ignore redundant tx & check pending tx even if it timed out

* handle tx failed

* minor fix if statement

* ci: use ubuntu-22.04 and remove cache

---------

Co-authored-by: Wasin Watthanasrisong <wasinwatt@gmail.com>
  • Loading branch information
sh-cha and WasinWatt authored Feb 7, 2025
1 parent 0c61dcd commit e4ebae9
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 54 deletions.
4 changes: 1 addition & 3 deletions .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ env:
jobs:
opinitd:
name: opinitd
runs-on: ubuntu-latest
runs-on: ubuntu-22.04

permissions:
contents: read
Expand Down Expand Up @@ -68,5 +68,3 @@ jobs:
platforms: linux/amd64,linux/arm64
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha
92 changes: 41 additions & 51 deletions node/broadcaster/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package broadcaster
import (
"encoding/hex"
"fmt"
"regexp"
"slices"
"sync"
"time"
Expand All @@ -22,6 +23,8 @@ import (
"github.com/initia-labs/opinit-bots/types"
)

var txNotFoundRegex = regexp.MustCompile("Internal error: tx ([A-Fa-f0-9]+) not found")

type Broadcaster struct {
cfg btypes.BroadcasterConfig

Expand Down Expand Up @@ -143,57 +146,42 @@ func (b *Broadcaster) loadPendingTxs(ctx types.Context, stage types.BasicDB, las
}
ctx.Logger().Debug("load pending txs", zap.Int("count", len(pendingTxs)))

if len(pendingTxs) == 0 {
return nil
}

pendingTxTime := time.Unix(0, pendingTxs[0].Timestamp).UTC()

// if we have pending txs, wait until timeout
if timeoutTime := pendingTxTime.Add(b.cfg.TxTimeout); lastBlockTime.Before(timeoutTime) {
waitingTime := timeoutTime.Sub(lastBlockTime)
timer := time.NewTimer(waitingTime)
defer timer.Stop()

ctx.Logger().Info("waiting for pending txs to be processed", zap.Duration("waiting_time", waitingTime))
pollingTimer := time.NewTicker(ctx.PollingInterval())
defer pollingTimer.Stop()

pollingTimer := time.NewTicker(ctx.PollingInterval())
defer pollingTimer.Stop()
reProcessingTxs := make([]btypes.PendingTxInfo, 0)

WAITLOOP:
for {
if len(pendingTxs) == 0 {
return nil
}

select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
break WAITLOOP
case <-pollingTimer.C:
}
for txIndex := 0; txIndex < len(pendingTxs); {
select {
case <-ctx.Done():
return ctx.Err()
case <-pollingTimer.C:
}

txHash, err := hex.DecodeString(pendingTxs[0].TxHash)
if err != nil {
return err
}
txHash, err := hex.DecodeString(pendingTxs[txIndex].TxHash)
if err != nil {
return err
}

res, err := b.rpcClient.QueryTx(ctx, txHash)
if err == nil && res != nil && res.TxResult.Code == 0 {
ctx.Logger().Debug("transaction successfully included",
zap.String("hash", pendingTxs[0].TxHash),
zap.Int64("height", res.Height))
err = DeletePendingTx(b.db, pendingTxs[0])
if err != nil {
return err
}
pendingTxs = pendingTxs[1:]
} else if err == nil && res != nil {
ctx.Logger().Warn("transaction failed",
zap.String("hash", pendingTxs[0].TxHash),
zap.Uint32("code", res.TxResult.Code),
zap.String("log", res.TxResult.Log))
res, err := b.rpcClient.QueryTx(ctx, txHash)
if err == nil && res != nil && res.TxResult.Code == 0 {
ctx.Logger().Debug("transaction successfully included",
zap.String("hash", pendingTxs[txIndex].TxHash),
zap.Int64("height", res.Height))
txIndex++
} else if err == nil && res != nil {
ctx.Logger().Warn("transaction failed",
zap.String("hash", pendingTxs[txIndex].TxHash),
zap.Uint32("code", res.TxResult.Code),
zap.String("log", res.TxResult.Log))
reProcessingTxs = append(reProcessingTxs, pendingTxs[txIndex])
txIndex++
} else if err != nil && txNotFoundRegex.FindStringSubmatch(err.Error()) != nil {
pendingTxTime := time.Unix(0, pendingTxs[txIndex].Timestamp).UTC()
timeoutTime := pendingTxTime.Add(b.cfg.TxTimeout)
if lastBlockTime.After(timeoutTime) {
reProcessingTxs = append(reProcessingTxs, pendingTxs[txIndex:]...)
break
}
}
}
Expand All @@ -203,11 +191,13 @@ func (b *Broadcaster) loadPendingTxs(ctx types.Context, stage types.BasicDB, las
return err
}

processedMsgsBatch, err := b.pendingTxsToProcessedMsgsBatch(ctx, pendingTxs)
if err != nil {
return err
if len(reProcessingTxs) != 0 {
processedMsgsBatch, err := b.pendingTxsToProcessedMsgsBatch(ctx, reProcessingTxs)
if err != nil {
return err
}
b.pendingProcessedMsgsBatch = append(b.pendingProcessedMsgsBatch, processedMsgsBatch...)
}
b.pendingProcessedMsgsBatch = append(b.pendingProcessedMsgsBatch, processedMsgsBatch...)
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions node/broadcaster/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ var ignoringErrors = []error{
opchildtypes.ErrOracleValidatorsNotRegistered,
opchildtypes.ErrInvalidOracleHeight,
opchildtypes.ErrInvalidOracleTimestamp,

opchildtypes.ErrRedundantTx,
}
var accountSeqRegex = regexp.MustCompile("account sequence mismatch, expected ([0-9]+), got ([0-9]+)")
var outputIndexRegex = regexp.MustCompile("expected ([0-9]+), got ([0-9]+): invalid output index")
Expand Down

0 comments on commit e4ebae9

Please sign in to comment.