diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 0cdf4ea..3dafd9e 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -25,7 +25,7 @@ env: jobs: opinitd: name: opinitd - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 permissions: contents: read @@ -68,5 +68,3 @@ jobs: platforms: linux/amd64,linux/arm64 tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} - cache-from: type=gha - cache-to: type=gha diff --git a/node/broadcaster/broadcaster.go b/node/broadcaster/broadcaster.go index 7ca30a4..cc2514e 100644 --- a/node/broadcaster/broadcaster.go +++ b/node/broadcaster/broadcaster.go @@ -3,6 +3,7 @@ package broadcaster import ( "encoding/hex" "fmt" + "regexp" "slices" "sync" "time" @@ -22,6 +23,8 @@ import ( "github.com/initia-labs/opinit-bots/types" ) +var txNotFoundRegex = regexp.MustCompile("Internal error: tx ([A-Fa-f0-9]+) not found") + type Broadcaster struct { cfg btypes.BroadcasterConfig @@ -143,57 +146,42 @@ func (b *Broadcaster) loadPendingTxs(ctx types.Context, stage types.BasicDB, las } ctx.Logger().Debug("load pending txs", zap.Int("count", len(pendingTxs))) - if len(pendingTxs) == 0 { - return nil - } - - pendingTxTime := time.Unix(0, pendingTxs[0].Timestamp).UTC() - - // if we have pending txs, wait until timeout - if timeoutTime := pendingTxTime.Add(b.cfg.TxTimeout); lastBlockTime.Before(timeoutTime) { - waitingTime := timeoutTime.Sub(lastBlockTime) - timer := time.NewTimer(waitingTime) - defer timer.Stop() - - ctx.Logger().Info("waiting for pending txs to be processed", zap.Duration("waiting_time", waitingTime)) + pollingTimer := time.NewTicker(ctx.PollingInterval()) + defer pollingTimer.Stop() - pollingTimer := time.NewTicker(ctx.PollingInterval()) - defer pollingTimer.Stop() + reProcessingTxs := make([]btypes.PendingTxInfo, 0) - WAITLOOP: - for { - if len(pendingTxs) == 0 { - return nil - } - - select { - case <-ctx.Done(): - return ctx.Err() - case <-timer.C: - break WAITLOOP - case <-pollingTimer.C: - } + for txIndex := 0; txIndex < len(pendingTxs); { + select { + case <-ctx.Done(): + return ctx.Err() + case <-pollingTimer.C: + } - txHash, err := hex.DecodeString(pendingTxs[0].TxHash) - if err != nil { - return err - } + txHash, err := hex.DecodeString(pendingTxs[txIndex].TxHash) + if err != nil { + return err + } - res, err := b.rpcClient.QueryTx(ctx, txHash) - if err == nil && res != nil && res.TxResult.Code == 0 { - ctx.Logger().Debug("transaction successfully included", - zap.String("hash", pendingTxs[0].TxHash), - zap.Int64("height", res.Height)) - err = DeletePendingTx(b.db, pendingTxs[0]) - if err != nil { - return err - } - pendingTxs = pendingTxs[1:] - } else if err == nil && res != nil { - ctx.Logger().Warn("transaction failed", - zap.String("hash", pendingTxs[0].TxHash), - zap.Uint32("code", res.TxResult.Code), - zap.String("log", res.TxResult.Log)) + res, err := b.rpcClient.QueryTx(ctx, txHash) + if err == nil && res != nil && res.TxResult.Code == 0 { + ctx.Logger().Debug("transaction successfully included", + zap.String("hash", pendingTxs[txIndex].TxHash), + zap.Int64("height", res.Height)) + txIndex++ + } else if err == nil && res != nil { + ctx.Logger().Warn("transaction failed", + zap.String("hash", pendingTxs[txIndex].TxHash), + zap.Uint32("code", res.TxResult.Code), + zap.String("log", res.TxResult.Log)) + reProcessingTxs = append(reProcessingTxs, pendingTxs[txIndex]) + txIndex++ + } else if err != nil && txNotFoundRegex.FindStringSubmatch(err.Error()) != nil { + pendingTxTime := time.Unix(0, pendingTxs[txIndex].Timestamp).UTC() + timeoutTime := pendingTxTime.Add(b.cfg.TxTimeout) + if lastBlockTime.After(timeoutTime) { + reProcessingTxs = append(reProcessingTxs, pendingTxs[txIndex:]...) + break } } } @@ -203,11 +191,13 @@ func (b *Broadcaster) loadPendingTxs(ctx types.Context, stage types.BasicDB, las return err } - processedMsgsBatch, err := b.pendingTxsToProcessedMsgsBatch(ctx, pendingTxs) - if err != nil { - return err + if len(reProcessingTxs) != 0 { + processedMsgsBatch, err := b.pendingTxsToProcessedMsgsBatch(ctx, reProcessingTxs) + if err != nil { + return err + } + b.pendingProcessedMsgsBatch = append(b.pendingProcessedMsgsBatch, processedMsgsBatch...) } - b.pendingProcessedMsgsBatch = append(b.pendingProcessedMsgsBatch, processedMsgsBatch...) return nil } diff --git a/node/broadcaster/tx.go b/node/broadcaster/tx.go index d4b755e..a382797 100644 --- a/node/broadcaster/tx.go +++ b/node/broadcaster/tx.go @@ -21,6 +21,8 @@ var ignoringErrors = []error{ opchildtypes.ErrOracleValidatorsNotRegistered, opchildtypes.ErrInvalidOracleHeight, opchildtypes.ErrInvalidOracleTimestamp, + + opchildtypes.ErrRedundantTx, } var accountSeqRegex = regexp.MustCompile("account sequence mismatch, expected ([0-9]+), got ([0-9]+)") var outputIndexRegex = regexp.MustCompile("expected ([0-9]+), got ([0-9]+): invalid output index")