diff --git a/cmd/tendermint/commands/reindex_event.go b/cmd/tendermint/commands/reindex_event.go index 34d07fdd5..533388753 100644 --- a/cmd/tendermint/commands/reindex_event.go +++ b/cmd/tendermint/commands/reindex_event.go @@ -17,6 +17,7 @@ import ( "github.com/tendermint/tendermint/internal/state/indexer/sink/kv" "github.com/tendermint/tendermint/internal/state/indexer/sink/psql" "github.com/tendermint/tendermint/internal/store" + "github.com/tendermint/tendermint/libs/cli" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/os" "github.com/tendermint/tendermint/rpc/coretypes" @@ -52,6 +53,8 @@ either or both arguments. tendermint reindex-event --start-height 2 --end-height 10 `, RunE: func(cmd *cobra.Command, args []string) error { + home, err := cmd.Flags().GetString(cli.HomeFlag) + conf.RootDir = home bs, ss, err := loadStateAndBlockStore(conf) if err != nil { return fmt.Errorf("%s: %w", reindexFailed, err) diff --git a/config/config.go b/config/config.go index 2d78698ab..27085797e 100644 --- a/config/config.go +++ b/config/config.go @@ -250,6 +250,7 @@ func DefaultBaseConfig() BaseConfig { FilterPeers: false, DBBackend: "goleveldb", DBPath: "data", + RootDir: "/root/.sei", } } diff --git a/internal/consensus/replay.go b/internal/consensus/replay.go index 1c3673e39..1af02b10d 100644 --- a/internal/consensus/replay.go +++ b/internal/consensus/replay.go @@ -395,7 +395,11 @@ func (h *Handshaker) ReplayBlocks( return h.replayBlocks(ctx, state, appClient, appBlockHeight, storeBlockHeight, false) } else if appBlockHeight == storeBlockHeight { - // We're good! + // We're good! But we need to reindex events + err := h.replayEvents(appBlockHeight) + if err != nil { + return nil, err + } if err := checkAppHashEqualsOneFromState(appHash, state); err != nil { return nil, err } @@ -550,6 +554,22 @@ func (h *Handshaker) replayBlock( return state, nil } +// replayEvents will be called during restart to avoid tx missing to be indexed +func (h *Handshaker) replayEvents(height int64) error { + block := h.store.LoadBlock(height) + meta := h.store.LoadBlockMeta(height) + res, err := h.stateStore.LoadFinalizeBlockResponses(height) + if err != nil { + return err + } + validatorUpdates, err := types.PB2TM.ValidatorUpdates(res.ValidatorUpdates) + if err != nil { + return err + } + sm.FireEvents(h.logger, h.eventBus, block, meta.BlockID, res, validatorUpdates) + return nil +} + func checkAppHashEqualsOneFromBlock(appHash []byte, block *types.Block) error { if !bytes.Equal(appHash, block.AppHash) { return fmt.Errorf(`block.AppHash does not match AppHash after replay. Got '%X', expected '%X'. diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index c353fd6b8..4f70e9aa0 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -3,9 +3,9 @@ package mempool import ( "bytes" "context" - "encoding/json" "errors" "fmt" + "strings" "sync" "sync/atomic" "time" @@ -1036,17 +1036,16 @@ func (txmp *TxMempool) GetPeerFailedCheckTxCount(nodeID types.NodeID) uint64 { // AppendCheckTxErr wraps error message into an ABCIMessageLogs json string func (txmp *TxMempool) AppendCheckTxErr(existingLogs string, log string) string { - var logs []map[string]interface{} - json.Unmarshal([]byte(existingLogs), &logs) + var builder strings.Builder - // Append the new ABCIMessageLog to the slice - logs = append(logs, map[string]interface{}{ - "log": log, - }) + builder.WriteString(existingLogs) + // If there are already logs, append the new log with a separator + if builder.Len() > 0 { + builder.WriteString("; ") + } + builder.WriteString(log) - // Marshal the updated slice back into a JSON string - jsonData, _ := json.Marshal(logs) - return string(jsonData) + return builder.String() } func (txmp *TxMempool) handlePendingTransactions() { diff --git a/internal/mempool/mempool_test.go b/internal/mempool/mempool_test.go index 3a3d44025..d444cfb58 100644 --- a/internal/mempool/mempool_test.go +++ b/internal/mempool/mempool_test.go @@ -3,7 +3,6 @@ package mempool import ( "bytes" "context" - "encoding/json" "errors" "fmt" "math/rand" @@ -947,24 +946,17 @@ func TestAppendCheckTxErr(t *testing.T) { } t.Cleanup(client.Wait) txmp := setup(t, client, 500) - existingData := `[{"log":"existing error log"}]` + existingLogData := "existing error log" + newLogData := "sample error log" // Append new error - result := txmp.AppendCheckTxErr(existingData, "sample error msg") + actualResult := txmp.AppendCheckTxErr(existingLogData, newLogData) + expectedResult := fmt.Sprintf("%s; %s", existingLogData, newLogData) - // Unmarshal the result - var data []map[string]interface{} - err := json.Unmarshal([]byte(result), &data) - require.NoError(t, err) - require.Equal(t, len(data), 2) - require.Equal(t, data[1]["log"], "sample error msg") + require.Equal(t, expectedResult, actualResult) // Append new error to empty log - result = txmp.AppendCheckTxErr("", "sample error msg") + actualResult = txmp.AppendCheckTxErr("", newLogData) - // Unmarshal the result - err = json.Unmarshal([]byte(result), &data) - require.NoError(t, err) - require.Equal(t, len(data), 1) - require.Equal(t, data[0]["log"], "sample error msg") + require.Equal(t, newLogData, actualResult) } diff --git a/internal/state/execution.go b/internal/state/execution.go index 540ef51e2..83ccec304 100644 --- a/internal/state/execution.go +++ b/internal/state/execution.go @@ -360,7 +360,7 @@ func (blockExec *BlockExecutor) ApplyBlock( // Events are fired after everything else. // NOTE: if we crash between Commit and Save, events wont be fired during replay - fireEvents(blockExec.logger, blockExec.eventBus, block, blockID, fBlockRes, validatorUpdates) + FireEvents(blockExec.logger, blockExec.eventBus, block, blockID, fBlockRes, validatorUpdates) return state, nil } @@ -687,7 +687,7 @@ func (state State) Update( // Fire NewBlock, NewBlockHeader. // Fire TxEvent for every tx. // NOTE: if Tendermint crashes before commit, some or all of these events may be published again. -func fireEvents( +func FireEvents( logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, @@ -811,7 +811,7 @@ func ExecCommitBlock( } blockID := types.BlockID{Hash: block.Hash(), PartSetHeader: bps.Header()} - fireEvents(be.logger, be.eventBus, block, blockID, finalizeBlockResponse, validatorUpdates) + FireEvents(be.logger, be.eventBus, block, blockID, finalizeBlockResponse, validatorUpdates) } // Commit block