From eeb4c51591c97d6ac4ca665854a34a85b8efa0ab Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Mon, 25 Mar 2024 12:01:16 +0800 Subject: [PATCH 01/33] Add log --- internal/state/execution.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/state/execution.go b/internal/state/execution.go index 83ccec304..7df3943f2 100644 --- a/internal/state/execution.go +++ b/internal/state/execution.go @@ -98,10 +98,13 @@ func (blockExec *BlockExecutor) CreateProposalBlock( evidence, evSize := blockExec.evpool.PendingEvidence(state.ConsensusParams.Evidence.MaxBytes) + fmt.Printf("[Debug] Creating proposal block height %d, mempool size: %d \n", height, blockExec.mempool.Size()) // Fetch a limited amount of valid txs maxDataBytes := types.MaxDataBytes(maxBytes, evSize, state.Validators.Size()) txs := blockExec.mempool.ReapMaxBytesMaxGas(maxDataBytes, maxGas) + fmt.Printf("[Debug] Reap %d txs for height %d, mempool size: %d \n", len(txs), height, blockExec.mempool.Size()) + commit := lastExtCommit.ToCommit() block := state.MakeBlock(height, txs, commit, evidence, proposerAddr) rpp, err := blockExec.appClient.PrepareProposal( From 36b2abb83ad5329a0b2b24c13e45f283cb4c2c36 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Mon, 25 Mar 2024 16:05:43 +0800 Subject: [PATCH 02/33] Add log for state --- internal/consensus/state.go | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 636152de6..b26786026 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -45,7 +45,13 @@ var ( ErrAddingVote = errors.New("error adding vote") ErrSignatureFoundInPastBlocks = errors.New("found signature from the same key") - errPubKeyIsNotSet = errors.New("pubkey is not set. Look for \"Can't get private validator pubkey\" errors") + errPubKeyIsNotSet = errors.New("pubkey is not set. Look for \"Can't get private validator pubkey\" errors") + ENTER_NEW_ROUND_TIME = time.Now() + ENTER_PROPOSE_TIME = time.Now() + ENTER_PREVOTE_TIME = time.Now() + ENTER_PRECOMMIT_TIME = time.Now() + ENTER_COMMIT_TIME = time.Now() + ENTER_FINALIZE_COMMIT_TIME = time.Now() ) var msgQueueSize = 1000 @@ -1295,6 +1301,9 @@ func (cs *State) enterNewRound(ctx context.Context, height int64, round int32, e logger.Debug("entering new round", "current", fmt.Sprintf("%v/%v/%v", cs.roundState.Height(), cs.roundState.Round(), cs.roundState.Step())) + ENTER_NEW_ROUND_TIME = time.Now() + fmt.Printf("[Debug] Finalize commit latency for block %d is: %s \n", cs.roundState.Height(), time.Since(ENTER_FINALIZE_COMMIT_TIME)) + // increment validators if necessary validators := cs.roundState.Validators() if cs.roundState.Round() < round { @@ -1400,6 +1409,8 @@ func (cs *State) enterPropose(ctx context.Context, height int64, round int32, en } logger.Debug("entering propose step", "current", fmt.Sprintf("%v/%v/%v", cs.roundState.Height(), cs.roundState.Round(), cs.roundState.Step())) + ENTER_PROPOSE_TIME = time.Now() + fmt.Printf("[Debug] New round latency for block %d is: %s \n", cs.roundState.Height(), time.Since(ENTER_NEW_ROUND_TIME)) defer func() { // Done enterPropose: @@ -1614,6 +1625,8 @@ func (cs *State) enterPrevote(ctx context.Context, height int64, round int32, en }() logger.Debug("entering prevote step", "current", fmt.Sprintf("%v/%v/%v", cs.roundState.Height(), cs.roundState.Round(), cs.roundState.Step()), "time", time.Now().UnixMilli()) + ENTER_PREVOTE_TIME = time.Now() + fmt.Printf("[Debug] Propose latency for block %d is: %s \n", cs.roundState.Height(), time.Since(ENTER_PROPOSE_TIME)) // Sign and broadcast vote as necessary cs.doPrevote(ctx, height, round) @@ -1857,6 +1870,9 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32, logger.Debug("entering precommit step", "current", fmt.Sprintf("%v/%v/%v", cs.roundState.Height(), cs.roundState.Round(), cs.roundState.Step()), "time", time.Now().UnixMilli()) + ENTER_PRECOMMIT_TIME = time.Now() + fmt.Printf("[Debug] Prevote latency for block %d is: %s \n", cs.roundState.Height(), time.Since(ENTER_PREVOTE_TIME)) + defer func() { // Done enterPrecommit: cs.updateRoundStep(round, cstypes.RoundStepPrecommit) @@ -2013,6 +2029,9 @@ func (cs *State) enterCommit(ctx context.Context, height int64, commitRound int3 logger.Debug("entering commit step", "current", fmt.Sprintf("%v/%v/%v", cs.roundState.Height(), cs.roundState.Round(), cs.roundState.Step()), "time", time.Now().UnixMilli()) + ENTER_COMMIT_TIME = time.Now() + fmt.Printf("[Debug] Precommit latency for block %d is: %s \n", cs.roundState.Height(), time.Since(ENTER_PRECOMMIT_TIME)) + defer func() { // Done enterCommit: // keep cs.Round the same, commitRound points to the right Precommits set. @@ -2127,6 +2146,8 @@ func (cs *State) finalizeCommit(ctx context.Context, height int64) { panic(fmt.Errorf("+2/3 committed an invalid block: %w", err)) } + ENTER_FINALIZE_COMMIT_TIME = time.Now() + fmt.Printf("[Debug] Commit latency for block %d is: %s \n", cs.roundState.Height(), time.Since(ENTER_COMMIT_TIME)) logger.Info( "finalizing commit of block", "hash", block.Hash(), From 2aa549a52e74a2c03e3f4b36fa3da1e896d554bf Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Mon, 25 Mar 2024 16:41:02 +0800 Subject: [PATCH 03/33] Add more logs --- internal/consensus/state.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/consensus/state.go b/internal/consensus/state.go index b26786026..75da16717 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -2157,6 +2157,10 @@ func (cs *State) finalizeCommit(ctx context.Context, height int64) { ) logger.Debug(fmt.Sprintf("%v", block)) + finalizeStartTime := time.Now() + defer func() { + fmt.Printf("[Debug] ApplyBlock latency for block %d took %s \n", height, time.Since(finalizeStartTime)) + }() // Save to blockStore. if cs.blockStore.Height() < block.Height { // NOTE: the seenCommit is local justification to commit this block, From 2cacbbe93f7b73b860a2eae6b760d6cc73030185 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Mon, 25 Mar 2024 16:49:30 +0800 Subject: [PATCH 04/33] Add log --- internal/consensus/state.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 75da16717..20e955716 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -2157,10 +2157,6 @@ func (cs *State) finalizeCommit(ctx context.Context, height int64) { ) logger.Debug(fmt.Sprintf("%v", block)) - finalizeStartTime := time.Now() - defer func() { - fmt.Printf("[Debug] ApplyBlock latency for block %d took %s \n", height, time.Since(finalizeStartTime)) - }() // Save to blockStore. if cs.blockStore.Height() < block.Height { // NOTE: the seenCommit is local justification to commit this block, @@ -2209,7 +2205,7 @@ func (cs *State) finalizeCommit(ctx context.Context, height int64) { // Execute and commit the block, update and save the state, and update the mempool. // NOTE The block.AppHash won't reflect these txs until the next block. - + finalizeStartTime := time.Now() stateCopy, err := cs.blockExec.ApplyBlock(spanCtx, stateCopy, types.BlockID{ @@ -2223,6 +2219,7 @@ func (cs *State) finalizeCommit(ctx context.Context, height int64) { logger.Error("failed to apply block", "err", err) return } + fmt.Printf("[Debug] ApplyBlock latency for block %d took %s \n", height, time.Since(finalizeStartTime)) // must be called before we update state cs.RecordMetrics(height, block) From 556420306c8c2181fedc6d8f225efcd7dc891e66 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Mon, 25 Mar 2024 17:17:38 +0800 Subject: [PATCH 05/33] log --- internal/state/execution.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/internal/state/execution.go b/internal/state/execution.go index 7df3943f2..444c8f413 100644 --- a/internal/state/execution.go +++ b/internal/state/execution.go @@ -230,6 +230,8 @@ func (blockExec *BlockExecutor) ApplyBlock( ctx context.Context, state State, blockID types.BlockID, block *types.Block, tracer otrace.Tracer) (State, error) { + + startTime := time.Now() if tracer != nil { spanCtx, span := tracer.Start(ctx, "cs.state.ApplyBlock") ctx = spanCtx @@ -239,7 +241,6 @@ func (blockExec *BlockExecutor) ApplyBlock( if err := blockExec.ValidateBlock(ctx, state, block); err != nil { return state, ErrInvalidBlock(err) } - startTime := time.Now() defer func() { blockExec.metrics.BlockProcessingTime.Observe(time.Since(startTime).Seconds()) }() @@ -280,14 +281,15 @@ func (blockExec *BlockExecutor) ApplyBlock( } blockExec.logger.Info( - "finalized block", + "[Debug] finalized block", "height", block.Height, - "latency_ms", time.Now().Sub(startTime).Milliseconds(), + "latency_ms", time.Since(startTime).Milliseconds(), "num_txs_res", len(fBlockRes.TxResults), "num_val_updates", len(fBlockRes.ValidatorUpdates), "block_app_hash", fmt.Sprintf("%X", fBlockRes.AppHash), ) + saveResponseTime := time.Now() // Save the results before we commit. err = blockExec.store.SaveFinalizeBlockResponses(block.Height, fBlockRes) if err != nil && !errors.Is(err, ErrNoFinalizeBlockResponsesForHeight{block.Height}) { @@ -330,6 +332,11 @@ func (blockExec *BlockExecutor) ApplyBlock( _, commitSpan = tracer.Start(ctx, "cs.state.ApplyBlock.Commit") defer commitSpan.End() } + + fmt.Printf("[Debug] Save finalize response took %s \n", time.Since(saveResponseTime)) + + commitStartTime := time.Now() + // Lock mempool, commit app state, update mempoool. retainHeight, err := blockExec.Commit(ctx, state, block, fBlockRes.TxResults) if err != nil { @@ -338,6 +345,9 @@ func (blockExec *BlockExecutor) ApplyBlock( if commitSpan != nil { commitSpan.End() } + fmt.Printf("[Debug] blockExec Commit took %s \n", time.Since(commitStartTime)) + + blockSaveTime := time.Now() // Update evpool with the latest state. blockExec.evpool.Update(ctx, state, block.Evidence) @@ -347,6 +357,9 @@ func (blockExec *BlockExecutor) ApplyBlock( if err := blockExec.store.Save(state); err != nil { return state, err } + fmt.Printf("[Debug] blockExec save took %s \n", time.Since(blockSaveTime)) + + pruneTime := time.Now() // Prune old heights, if requested by ABCI app. if retainHeight > 0 { @@ -364,6 +377,7 @@ func (blockExec *BlockExecutor) ApplyBlock( // Events are fired after everything else. // NOTE: if we crash between Commit and Save, events wont be fired during replay FireEvents(blockExec.logger, blockExec.eventBus, block, blockID, fBlockRes, validatorUpdates) + fmt.Printf("[Debug] prune block took %s \n", time.Since(pruneTime)) return state, nil } From 0882254d5f7fe52b427e22cb2a007df994e04189 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Mon, 25 Mar 2024 17:39:12 +0800 Subject: [PATCH 06/33] Fix log --- internal/state/execution.go | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/internal/state/execution.go b/internal/state/execution.go index 444c8f413..93d246b6b 100644 --- a/internal/state/execution.go +++ b/internal/state/execution.go @@ -289,7 +289,6 @@ func (blockExec *BlockExecutor) ApplyBlock( "block_app_hash", fmt.Sprintf("%X", fBlockRes.AppHash), ) - saveResponseTime := time.Now() // Save the results before we commit. err = blockExec.store.SaveFinalizeBlockResponses(block.Height, fBlockRes) if err != nil && !errors.Is(err, ErrNoFinalizeBlockResponsesForHeight{block.Height}) { @@ -333,10 +332,6 @@ func (blockExec *BlockExecutor) ApplyBlock( defer commitSpan.End() } - fmt.Printf("[Debug] Save finalize response took %s \n", time.Since(saveResponseTime)) - - commitStartTime := time.Now() - // Lock mempool, commit app state, update mempoool. retainHeight, err := blockExec.Commit(ctx, state, block, fBlockRes.TxResults) if err != nil { @@ -345,9 +340,6 @@ func (blockExec *BlockExecutor) ApplyBlock( if commitSpan != nil { commitSpan.End() } - fmt.Printf("[Debug] blockExec Commit took %s \n", time.Since(commitStartTime)) - - blockSaveTime := time.Now() // Update evpool with the latest state. blockExec.evpool.Update(ctx, state, block.Evidence) @@ -357,10 +349,8 @@ func (blockExec *BlockExecutor) ApplyBlock( if err := blockExec.store.Save(state); err != nil { return state, err } - fmt.Printf("[Debug] blockExec save took %s \n", time.Since(blockSaveTime)) - - pruneTime := time.Now() + pruneStartTime := time.Now() // Prune old heights, if requested by ABCI app. if retainHeight > 0 { pruned, err := blockExec.pruneBlocks(retainHeight) @@ -370,14 +360,16 @@ func (blockExec *BlockExecutor) ApplyBlock( blockExec.logger.Debug("pruned blocks", "pruned", pruned, "retain_height", retainHeight) } } + fmt.Printf("[Debug] Prune block took %s\n", time.Since(pruneStartTime)) // reset the verification cache blockExec.cache = make(map[string]struct{}) + fireEventsStartTime := time.Now() // Events are fired after everything else. // NOTE: if we crash between Commit and Save, events wont be fired during replay FireEvents(blockExec.logger, blockExec.eventBus, block, blockID, fBlockRes, validatorUpdates) - fmt.Printf("[Debug] prune block took %s \n", time.Since(pruneTime)) + fmt.Printf("[Debug] FireEvents took %s\n", time.Since(fireEventsStartTime)) return state, nil } From 6463cbc1230a523ee1089ad288b450ded4c86785 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Mon, 25 Mar 2024 18:59:37 +0800 Subject: [PATCH 07/33] Add log --- internal/state/execution.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/internal/state/execution.go b/internal/state/execution.go index 93d246b6b..6f2088ffa 100644 --- a/internal/state/execution.go +++ b/internal/state/execution.go @@ -704,6 +704,8 @@ func FireEvents( finalizeBlockResponse *abci.ResponseFinalizeBlock, validatorUpdates []*types.Validator, ) { + + startTime := time.Now() if err := eventBus.PublishEventNewBlock(types.EventDataNewBlock{ Block: block, BlockID: blockID, @@ -711,7 +713,9 @@ func FireEvents( }); err != nil { logger.Error("failed publishing new block", "err", err) } + fmt.Printf("[Debug] Publish New block event took %s\n", time.Since(startTime)) + startBlockHeaderTime := time.Now() if err := eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{ Header: block.Header, NumTxs: int64(len(block.Txs)), @@ -719,7 +723,9 @@ func FireEvents( }); err != nil { logger.Error("failed publishing new block header", "err", err) } + fmt.Printf("[Debug] Publish Block header took %s\n", time.Since(startBlockHeaderTime)) + startEvidenceTime := time.Now() if len(block.Evidence) != 0 { for _, ev := range block.Evidence { if err := eventBus.PublishEventNewEvidence(types.EventDataNewEvidence{ @@ -730,6 +736,7 @@ func FireEvents( } } } + fmt.Printf("[Debug] Publish Evidence took %s\n", time.Since(startEvidenceTime)) // sanity check if len(finalizeBlockResponse.TxResults) != len(block.Data.Txs) { @@ -737,6 +744,7 @@ func FireEvents( len(block.Data.Txs), len(finalizeBlockResponse.TxResults))) } + startPublishTxTime := time.Now() for i, tx := range block.Data.Txs { if err := eventBus.PublishEventTx(types.EventDataTx{ TxResult: abci.TxResult{ @@ -749,6 +757,7 @@ func FireEvents( logger.Error("failed publishing event TX", "err", err) } } + fmt.Printf("[Debug] Publish %d event tx took %s\n", len(block.Data.Txs), time.Since(startPublishTxTime)) if len(finalizeBlockResponse.ValidatorUpdates) > 0 { if err := eventBus.PublishEventValidatorSetUpdates( From 2a382f899e11999071dce6bc8e94abe143b86ba4 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Mon, 25 Mar 2024 23:56:36 +0800 Subject: [PATCH 08/33] Add --- internal/eventbus/event_bus.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/eventbus/event_bus.go b/internal/eventbus/event_bus.go index aa096aa17..af4a561ec 100644 --- a/internal/eventbus/event_bus.go +++ b/internal/eventbus/event_bus.go @@ -13,7 +13,7 @@ import ( "github.com/tendermint/tendermint/types" ) -var DefaultBufferCapacity = 100 +var DefaultBufferCapacity = 10 // Subscription is a proxy interface for a pubsub Subscription. type Subscription interface { From 25546bd39da4060c79ec23ab77890eec219bc1b4 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Mon, 25 Mar 2024 13:56:42 -0400 Subject: [PATCH 09/33] add latency log for index --- internal/state/indexer/tx/kv/kv.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/internal/state/indexer/tx/kv/kv.go b/internal/state/indexer/tx/kv/kv.go index 47488e54e..c2f3094d2 100644 --- a/internal/state/indexer/tx/kv/kv.go +++ b/internal/state/indexer/tx/kv/kv.go @@ -7,6 +7,7 @@ import ( "regexp" "strconv" "strings" + "time" "github.com/gogo/protobuf/proto" "github.com/google/orderedcode" @@ -65,6 +66,7 @@ func (txi *TxIndex) Get(hash []byte) (*abci.TxResult, error) { // respective attribute's key delimited by a "." (eg. "account.number"). // Any event with an empty type is not indexed. func (txi *TxIndex) Index(results []*abci.TxResult) error { + startTime := time.Now() b := txi.store.NewBatch() defer b.Close() @@ -94,7 +96,10 @@ func (txi *TxIndex) Index(results []*abci.TxResult) error { } } - return b.WriteSync() + fmt.Printf("PERF TxIndex.Index latency=%dms, txs=%d\n", time.Since(startTime).Milliseconds(), len(results)) + err := b.WriteSync() + + return err } func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store dbm.Batch) error { From a464d2af207e466572a7b270bdb31b3d21669f50 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Mon, 25 Mar 2024 14:00:16 -0400 Subject: [PATCH 10/33] add [Debug] to line --- internal/state/indexer/tx/kv/kv.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/state/indexer/tx/kv/kv.go b/internal/state/indexer/tx/kv/kv.go index c2f3094d2..5e127bc26 100644 --- a/internal/state/indexer/tx/kv/kv.go +++ b/internal/state/indexer/tx/kv/kv.go @@ -96,7 +96,7 @@ func (txi *TxIndex) Index(results []*abci.TxResult) error { } } - fmt.Printf("PERF TxIndex.Index latency=%dms, txs=%d\n", time.Since(startTime).Milliseconds(), len(results)) + fmt.Printf("[Debug] TxIndex.Index latency=%dms, txs=%d\n", time.Since(startTime).Milliseconds(), len(results)) err := b.WriteSync() return err From baf0e2119240169b7f8e5ad936cc93168b98c98e Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Mon, 25 Mar 2024 14:16:27 -0400 Subject: [PATCH 11/33] add timer --- debugutil/timer.go | 22 ++++++++++++++++++++++ internal/consensus/state.go | 9 +++++++++ internal/state/indexer/tx/kv/kv.go | 17 +++++++---------- 3 files changed, 38 insertions(+), 10 deletions(-) create mode 100644 debugutil/timer.go diff --git a/debugutil/timer.go b/debugutil/timer.go new file mode 100644 index 000000000..f60bf4dc2 --- /dev/null +++ b/debugutil/timer.go @@ -0,0 +1,22 @@ +package debugutil + +import ( + "fmt" + "time" +) + +type Timer struct { + name string + startTime time.Time +} + +func NewTimer(name string) *Timer { + return &Timer{ + name: name, + startTime: time.Now(), + } +} + +func (t *Timer) Stop() { + fmt.Printf("[Debug] %s took %s\n", t.name, time.Since(t.startTime)) +} diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 20e955716..6fd7fd6d2 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/tendermint/tendermint/debugutil" "io" "os" "runtime/debug" @@ -1850,6 +1851,9 @@ func (cs *State) enterPrevoteWait(height int64, round int32) { // Lock & precommit the ProposalBlock if we have enough prevotes for it (a POL in this round) // else, precommit nil otherwise. func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32, entryLabel string) { + t := debugutil.NewTimer("enterPrecommit") + defer t.Stop() + _, span := cs.tracer.Start(cs.getTracingCtx(ctx), "cs.state.enterPrecommit") span.SetAttributes(attribute.Int("round", int(round))) span.SetAttributes(attribute.String("entry", entryLabel)) @@ -2011,6 +2015,9 @@ func (cs *State) enterPrecommitWait(height int64, round int32) { // Enter: +2/3 precommits for block func (cs *State) enterCommit(ctx context.Context, height int64, commitRound int32, entryLabel string) { + t := debugutil.NewTimer("enterCommit") + defer t.Stop() + spanCtx, span := cs.tracer.Start(cs.getTracingCtx(ctx), "cs.state.enterCommit") span.SetAttributes(attribute.Int("round", int(commitRound))) span.SetAttributes(attribute.String("entry", entryLabel)) @@ -2632,6 +2639,8 @@ func (cs *State) addVote( peerID types.NodeID, handleVoteMsgSpan otrace.Span, ) (added bool, err error) { + t := debugutil.NewTimer("State.addVote") + defer t.Stop() cs.logger.Debug( "adding vote", "vote_height", vote.Height, diff --git a/internal/state/indexer/tx/kv/kv.go b/internal/state/indexer/tx/kv/kv.go index 5e127bc26..aa19085ac 100644 --- a/internal/state/indexer/tx/kv/kv.go +++ b/internal/state/indexer/tx/kv/kv.go @@ -4,14 +4,13 @@ import ( "context" "encoding/hex" "fmt" - "regexp" - "strconv" - "strings" - "time" - "github.com/gogo/protobuf/proto" "github.com/google/orderedcode" + "github.com/tendermint/tendermint/debugutil" dbm "github.com/tendermint/tm-db" + "regexp" + "strconv" + "strings" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/internal/pubsub/query" @@ -66,7 +65,8 @@ func (txi *TxIndex) Get(hash []byte) (*abci.TxResult, error) { // respective attribute's key delimited by a "." (eg. "account.number"). // Any event with an empty type is not indexed. func (txi *TxIndex) Index(results []*abci.TxResult) error { - startTime := time.Now() + t := debugutil.NewTimer(fmt.Sprintf("TxIndex.Index (txs=%d)", len(results))) + defer t.Stop() b := txi.store.NewBatch() defer b.Close() @@ -96,10 +96,7 @@ func (txi *TxIndex) Index(results []*abci.TxResult) error { } } - fmt.Printf("[Debug] TxIndex.Index latency=%dms, txs=%d\n", time.Since(startTime).Milliseconds(), len(results)) - err := b.WriteSync() - - return err + return b.WriteSync() } func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store dbm.Batch) error { From 458780ec9a817380e72f5ac94ebe8caf5975c811 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Mon, 25 Mar 2024 14:27:09 -0400 Subject: [PATCH 12/33] add vote logs --- internal/consensus/state.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 6fd7fd6d2..70c3c43ef 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -2752,6 +2752,7 @@ func (cs *State) addVote( switch vote.Type { case tmproto.PrevoteType: + t := debugutil.NewTimer("State.addVote case tmproto.PrevoteType") prevotes := cs.roundState.Votes().Prevotes(vote.Round) cs.logger.Debug("added vote to prevote", "vote", vote, "prevotes", prevotes.StringShort()) @@ -2812,8 +2813,10 @@ func (cs *State) addVote( cs.enterPrevote(ctx, height, cs.roundState.Round(), "prevote-future") } } + t.Stop() case tmproto.PrecommitType: + t := debugutil.NewTimer("State.addVote case tmproto.PrecommitType") precommits := cs.roundState.Votes().Precommits(vote.Round) cs.logger.Debug("added vote to precommit", "height", vote.Height, @@ -2841,6 +2844,7 @@ func (cs *State) addVote( cs.enterNewRound(ctx, height, vote.Round, "precommit-two-thirds-any") cs.enterPrecommitWait(height, vote.Round) } + t.Stop() default: panic(fmt.Sprintf("unexpected vote type %v", vote.Type)) From b994a1afb0646923e9ab1307af928012e65710a2 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Mon, 25 Mar 2024 14:43:45 -0400 Subject: [PATCH 13/33] vote timings --- internal/consensus/state.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 70c3c43ef..2f46a87bd 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -2825,24 +2825,42 @@ func (cs *State) addVote( "vote_timestamp", vote.Timestamp, "data", precommits.LogString()) + t2 := debugutil.NewTimer("State.addVote case tmproto.PrecommitType TwoThirdsMajority") blockID, ok := precommits.TwoThirdsMajority() + t2.Stop() handleVoteMsgSpan.End() if ok { // Executed as TwoThirdsMajority could be from a higher round + + t3 := debugutil.NewTimer("State.addVote case tmproto.PrecommitType enterNewRound precommit-two-thirds") cs.enterNewRound(ctx, height, vote.Round, "precommit-two-thirds") + t3.Stop() + + t4 := debugutil.NewTimer("State.addVote case tmproto.PrecommitType enterPrecommit precommit-two-thirds") cs.enterPrecommit(ctx, height, vote.Round, "precommit-two-thirds") + t4.Stop() if !blockID.IsNil() { + t5 := debugutil.NewTimer("State.addVote case tmproto.PrecommitType enterCommit precommit-two-thirds") cs.enterCommit(ctx, height, vote.Round, "precommit-two-thirds") + t5.Stop() if cs.bypassCommitTimeout() && precommits.HasAll() { + t6 := debugutil.NewTimer("State.addVote case tmproto.PrecommitType enterNewRound precommit-skip-round") cs.enterNewRound(ctx, cs.roundState.Height(), 0, "precommit-skip-round") + t6.Stop() } } else { + t7 := debugutil.NewTimer("State.addVote case tmproto.PrecommitType enterPrecommitWait") cs.enterPrecommitWait(height, vote.Round) + t7.Stop() } } else if cs.roundState.Round() <= vote.Round && precommits.HasTwoThirdsAny() { + t8 := debugutil.NewTimer("State.addVote case tmproto.PrecommitType enterNewRound precommit-two-thirds-any") cs.enterNewRound(ctx, height, vote.Round, "precommit-two-thirds-any") + t8.Stop() + t9 := debugutil.NewTimer("State.addVote case tmproto.PrecommitType enterPrecommitWait precommit-two-thirds-any") cs.enterPrecommitWait(height, vote.Round) + t9.Stop() } t.Stop() From 79f169f93349be97c773aea82c8937e7002e6cb0 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Mon, 25 Mar 2024 14:52:45 -0400 Subject: [PATCH 14/33] moar logs --- internal/consensus/state.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 2f46a87bd..b3884f4f5 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -2051,7 +2051,9 @@ func (cs *State) enterCommit(ctx context.Context, height int64, commitRound int3 cs.tryFinalizeCommit(spanCtx, height) }() + t2 := debugutil.NewTimer("enterCommit cs.roundState.Votes().Precommits(commitRound).TwoThirdsMajority()") blockID, ok := cs.roundState.Votes().Precommits(commitRound).TwoThirdsMajority() + t2.Stop() if !ok { panic("RunActionCommit() expects +2/3 precommits") } @@ -2061,8 +2063,14 @@ func (cs *State) enterCommit(ctx context.Context, height int64, commitRound int3 // otherwise they'll be cleared in updateToState. if cs.roundState.LockedBlock().HashesTo(blockID.Hash) { logger.Info("commit is for a locked block; set ProposalBlock=LockedBlock", "block_hash", blockID.Hash) + + t := debugutil.NewTimer("enterCommit cs.roundState.SetProposalBlock(cs.roundState.LockedBlock())") cs.roundState.SetProposalBlock(cs.roundState.LockedBlock()) + t.Stop() + + t2 := debugutil.NewTimer("enterCommit cs.roundState.SetProposalBlockParts(cs.roundState.LockedBlockParts())") cs.roundState.SetProposalBlockParts(cs.roundState.LockedBlockParts()) + t2.Stop() } // If we don't have the block being committed, set up to get it. @@ -2076,16 +2084,29 @@ func (cs *State) enterCommit(ctx context.Context, height int64, commitRound int3 // We're getting the wrong block. // Set up ProposalBlockParts and keep waiting. + t := debugutil.NewTimer("enterCommit cs.roundState.SetProposalBlock(cs.roundState.LockedBlock())") cs.roundState.SetProposalBlock(nil) + t.Stop() + cs.metrics.MarkBlockGossipStarted() + + t = debugutil.NewTimer("enterCommit cs.roundState.SetProposalBlockParts(types.NewPartSetFromHeader(blockID.PartSetHeader))") cs.roundState.SetProposalBlockParts(types.NewPartSetFromHeader(blockID.PartSetHeader)) + t.Stop() + t = debugutil.NewTimer("enterCommit cs.eventBus.PublishEventValidBlock(cs.roundState.RoundStateEvent())") if err := cs.eventBus.PublishEventValidBlock(cs.roundState.RoundStateEvent()); err != nil { logger.Error("failed publishing valid block", "err", err) } + t.Stop() + t = debugutil.NewTimer("enterCommit cs.roundState.CopyInternal()") roundState := cs.roundState.CopyInternal() + t.Stop() + + t = debugutil.NewTimer("enterCommit cs.evsw.FireEvent(types.EventValidBlockValue, roundState)") cs.evsw.FireEvent(types.EventValidBlockValue, roundState) + t.Stop() } } } From 891a38be692125704930feef856aabee931da082 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Mon, 25 Mar 2024 15:03:15 -0400 Subject: [PATCH 15/33] add more logs --- debugutil/timer.go | 17 +++++++++++++++++ internal/state/indexer/tx/kv/kv.go | 6 +++++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/debugutil/timer.go b/debugutil/timer.go index f60bf4dc2..d299de318 100644 --- a/debugutil/timer.go +++ b/debugutil/timer.go @@ -20,3 +20,20 @@ func NewTimer(name string) *Timer { func (t *Timer) Stop() { fmt.Printf("[Debug] %s took %s\n", t.name, time.Since(t.startTime)) } + +func PrintStats(msg string, durations []time.Duration) { + var sum time.Duration + var max time.Duration + var min time.Duration + for i, d := range durations { + sum += d + if d > max { + max = d + } + if i == 0 || d < min { + min = d + } + } + avg := sum / time.Duration(len(durations)) + fmt.Printf("[Debug] %s count=%d sum=%s avg=%s max=%s min=%s\n", msg, len(durations), sum, avg, max, min) +} diff --git a/internal/state/indexer/tx/kv/kv.go b/internal/state/indexer/tx/kv/kv.go index aa19085ac..2ed1c449b 100644 --- a/internal/state/indexer/tx/kv/kv.go +++ b/internal/state/indexer/tx/kv/kv.go @@ -11,6 +11,7 @@ import ( "regexp" "strconv" "strings" + "time" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/internal/pubsub/query" @@ -100,6 +101,7 @@ func (txi *TxIndex) Index(results []*abci.TxResult) error { } func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store dbm.Batch) error { + var durations []time.Duration for _, event := range result.Result.Events { // only index events with a non-empty type if len(event.Type) == 0 { @@ -118,14 +120,16 @@ func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store dbm.Ba return fmt.Errorf("event type and attribute key \"%s\" is reserved; please use a different key", compositeTag) } if attr.GetIndex() { + start := time.Now() err := store.Set(keyFromEvent(compositeTag, string(attr.Value), result), hash) + durations = append(durations, time.Since(start)) if err != nil { return err } } } } - + debugutil.PrintStats("indexEvents", durations) return nil } From e39f8b60c540027d1dc8bff3b0ec8081fa413588 Mon Sep 17 00:00:00 2001 From: Kartik Bhat Date: Mon, 25 Mar 2024 15:04:52 -0400 Subject: [PATCH 16/33] More Preccomit logs (#216) --- internal/consensus/state.go | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/internal/consensus/state.go b/internal/consensus/state.go index b3884f4f5..4ef8dbe5e 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -6,7 +6,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/tendermint/tendermint/debugutil" "io" "os" "runtime/debug" @@ -15,6 +14,8 @@ import ( "sync" "time" + "github.com/tendermint/tendermint/debugutil" + "github.com/gogo/protobuf/proto" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/trace" @@ -1884,10 +1885,13 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32, }() // check for a polka + t = debugutil.NewTimer("checkPolka") blockID, ok := cs.roundState.Votes().Prevotes(round).TwoThirdsMajority() + t.Stop() // If we don't have a polka, we must precommit nil. if !ok { + t = debugutil.NewTimer("noPolkaPreccomitNil") if cs.roundState.LockedBlock() != nil { logger.Info("precommit step; no +2/3 prevotes during enterPrecommit while we are locked; precommitting nil") } else { @@ -1895,44 +1899,56 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32, } cs.signAddVote(ctx, tmproto.PrecommitType, nil, types.PartSetHeader{}) + t.Stop() return } // At this point +2/3 prevoted for a particular block or nil. + t = debugutil.NewTimer("publishEventPolka") if err := cs.eventBus.PublishEventPolka(cs.roundState.RoundStateEvent()); err != nil { logger.Error("failed publishing polka", "err", err) } + t.Stop() // the latest POLRound should be this round. + t = debugutil.NewTimer("polInfo") polRound, _ := cs.roundState.Votes().POLInfo() if polRound < round { panic(fmt.Sprintf("this POLRound should be %v but got %v", round, polRound)) } + t.Stop() // +2/3 prevoted nil. Precommit nil. if blockID.IsNil() { + t = debugutil.NewTimer("prevoteNilPreccomitNil") logger.Info("precommit step: +2/3 prevoted for nil; precommitting nil") cs.signAddVote(ctx, tmproto.PrecommitType, nil, types.PartSetHeader{}) + t.Stop() return } // At this point, +2/3 prevoted for a particular block. // If we never received a proposal for this block, we must precommit nil if cs.roundState.Proposal() == nil || cs.roundState.ProposalBlock() == nil { + t = debugutil.NewTimer("noBlockProposalPreccomitNil") logger.Info("precommit step; did not receive proposal, precommitting nil") cs.signAddVote(ctx, tmproto.PrecommitType, nil, types.PartSetHeader{}) + t.Stop() return } // If the proposal time does not match the block time, precommit nil. if !cs.roundState.Proposal().Timestamp.Equal(cs.roundState.ProposalBlock().Header.Time) { logger.Info("precommit step: proposal timestamp not equal; precommitting nil") + t = debugutil.NewTimer("proposalNotMatchBlockTimePreccomitNil") cs.signAddVote(ctx, tmproto.PrecommitType, nil, types.PartSetHeader{}) + t.Stop() return } // If we're already locked on that block, precommit it, and update the LockedRound if cs.roundState.LockedBlock().HashesTo(blockID.Hash) { + t = debugutil.NewTimer("lockedBlockPreccomit") logger.Info("precommit step: +2/3 prevoted locked block; relocking") cs.roundState.SetLockedRound(round) @@ -1941,6 +1957,7 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32, } cs.signAddVote(ctx, tmproto.PrecommitType, blockID.Hash, blockID.PartSetHeader) + t.Stop() return } @@ -1948,6 +1965,7 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32, // the proposed block, update our locked block to this block and issue a // precommit vote for it. if cs.roundState.ProposalBlock().HashesTo(blockID.Hash) { + t = debugutil.NewTimer("updatedLockedBlockPreccomit") logger.Info("precommit step: +2/3 prevoted proposal block; locking", "hash", blockID.Hash) // Validate the block. @@ -1964,6 +1982,7 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32, } cs.signAddVote(ctx, tmproto.PrecommitType, blockID.Hash, blockID.PartSetHeader) + t.Stop() return } @@ -1971,6 +1990,7 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32, // Fetch that block, and precommit nil. logger.Info("precommit step: +2/3 prevotes for a block we do not have; voting nil", "block_id", blockID) + t = debugutil.NewTimer("polkaNotHavePreccomitNil") if !cs.roundState.ProposalBlockParts().HasHeader(blockID.PartSetHeader) { cs.roundState.SetProposalBlock(nil) cs.metrics.MarkBlockGossipStarted() @@ -1978,6 +1998,7 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32, } cs.signAddVote(ctx, tmproto.PrecommitType, nil, types.PartSetHeader{}) + t.Stop() } // Enter: any +2/3 precommits for next round. From b46ea5857090b4e5206833d378ef791a5b802c55 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Mon, 25 Mar 2024 15:06:26 -0400 Subject: [PATCH 17/33] fix --- debugutil/timer.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/debugutil/timer.go b/debugutil/timer.go index d299de318..1719440f0 100644 --- a/debugutil/timer.go +++ b/debugutil/timer.go @@ -22,6 +22,10 @@ func (t *Timer) Stop() { } func PrintStats(msg string, durations []time.Duration) { + if len(durations) == 0 { + fmt.Printf("[Debug] %s count=0 (no stats)\n", msg) + return + } var sum time.Duration var max time.Duration var min time.Duration From 100f53abff0c3aef2e23cd9879d1beff0c5aab86 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Mon, 25 Mar 2024 15:14:55 -0400 Subject: [PATCH 18/33] add more logs for txindex --- debugutil/timer.go | 5 ++++- internal/state/indexer/tx/kv/kv.go | 10 ++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/debugutil/timer.go b/debugutil/timer.go index 1719440f0..0c4084ba6 100644 --- a/debugutil/timer.go +++ b/debugutil/timer.go @@ -39,5 +39,8 @@ func PrintStats(msg string, durations []time.Duration) { } } avg := sum / time.Duration(len(durations)) - fmt.Printf("[Debug] %s count=%d sum=%s avg=%s max=%s min=%s\n", msg, len(durations), sum, avg, max, min) + // only show the slow ones + if sum > 500*time.Millisecond { + fmt.Printf("[Debug] %s count=%d sum=%s avg=%s max=%s min=%s\n", msg, len(durations), sum, avg, max, min) + } } diff --git a/internal/state/indexer/tx/kv/kv.go b/internal/state/indexer/tx/kv/kv.go index 2ed1c449b..d8c11f685 100644 --- a/internal/state/indexer/tx/kv/kv.go +++ b/internal/state/indexer/tx/kv/kv.go @@ -71,6 +71,7 @@ func (txi *TxIndex) Index(results []*abci.TxResult) error { b := txi.store.NewBatch() defer b.Close() + var durations []time.Duration for _, result := range results { hash := types.Tx(result.Tx).Hash() @@ -91,13 +92,18 @@ func (txi *TxIndex) Index(results []*abci.TxResult) error { return err } // index by hash (always) + start := time.Now() err = b.Set(primaryKey(hash), rawBytes) + durations = append(durations, time.Since(start)) if err != nil { return err } } - - return b.WriteSync() + debugutil.PrintStats("TxIndex.Index b.Set(primaryKey(hash), rawBytes)", durations) + t2 := debugutil.NewTimer(fmt.Sprintf("TxIndex.Index WriteSync (txs=%d)", len(results))) + err := b.WriteSync() + t2.Stop() + return err } func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store dbm.Batch) error { From 1729fe1add2644799ffe4bba4dedf07df3ac6847 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Mon, 25 Mar 2024 15:25:55 -0400 Subject: [PATCH 19/33] add store stats --- internal/state/indexer/tx/kv/kv.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/internal/state/indexer/tx/kv/kv.go b/internal/state/indexer/tx/kv/kv.go index d8c11f685..2621a3868 100644 --- a/internal/state/indexer/tx/kv/kv.go +++ b/internal/state/indexer/tx/kv/kv.go @@ -32,6 +32,13 @@ type TxIndex struct { // NewTxIndex creates new KV indexer. func NewTxIndex(store dbm.DB) *TxIndex { + ticker := time.NewTicker(10 * time.Second) + go func() { + for { + <-ticker.C + fmt.Printf("[Debug] TxIndex store stats: %v\n", store.Stats()) + } + }() return &TxIndex{ store: store, } From 24d9862275fd65c9f93b0dfb8a452191ebb530a5 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Mon, 25 Mar 2024 15:29:58 -0400 Subject: [PATCH 20/33] try better stats --- internal/state/indexer/tx/kv/kv.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/state/indexer/tx/kv/kv.go b/internal/state/indexer/tx/kv/kv.go index 2621a3868..4c70b1b13 100644 --- a/internal/state/indexer/tx/kv/kv.go +++ b/internal/state/indexer/tx/kv/kv.go @@ -36,7 +36,11 @@ func NewTxIndex(store dbm.DB) *TxIndex { go func() { for { <-ticker.C - fmt.Printf("[Debug] TxIndex store stats: %v\n", store.Stats()) + var vals []string + for k, v := range store.Stats() { + vals = append(vals, fmt.Sprintf("%s=%v", k, v)) + } + fmt.Printf("[Debug] TxIndex store stats: %s\n", strings.Join(vals, " ")) } }() return &TxIndex{ From 3b181dd025b77413dc647aaa457266273bbbf7c4 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Mon, 25 Mar 2024 15:37:27 -0400 Subject: [PATCH 21/33] cleanup stats --- internal/state/indexer/tx/kv/kv.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/internal/state/indexer/tx/kv/kv.go b/internal/state/indexer/tx/kv/kv.go index 4c70b1b13..544351c46 100644 --- a/internal/state/indexer/tx/kv/kv.go +++ b/internal/state/indexer/tx/kv/kv.go @@ -36,11 +36,7 @@ func NewTxIndex(store dbm.DB) *TxIndex { go func() { for { <-ticker.C - var vals []string - for k, v := range store.Stats() { - vals = append(vals, fmt.Sprintf("%s=%v", k, v)) - } - fmt.Printf("[Debug] TxIndex store stats: %s\n", strings.Join(vals, " ")) + fmt.Printf("[Debug] leveldb.stats: %s", store.Stats()["leveldb.stats"]) } }() return &TxIndex{ From 6ecb91a2c656b31ee07d14563c7ea3d422615302 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Mon, 25 Mar 2024 15:44:37 -0400 Subject: [PATCH 22/33] try regular write --- internal/state/indexer/tx/kv/kv.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/state/indexer/tx/kv/kv.go b/internal/state/indexer/tx/kv/kv.go index 544351c46..19d6831d7 100644 --- a/internal/state/indexer/tx/kv/kv.go +++ b/internal/state/indexer/tx/kv/kv.go @@ -107,8 +107,8 @@ func (txi *TxIndex) Index(results []*abci.TxResult) error { } } debugutil.PrintStats("TxIndex.Index b.Set(primaryKey(hash), rawBytes)", durations) - t2 := debugutil.NewTimer(fmt.Sprintf("TxIndex.Index WriteSync (txs=%d)", len(results))) - err := b.WriteSync() + t2 := debugutil.NewTimer(fmt.Sprintf("TxIndex.Index Write (txs=%d)", len(results))) + err := b.Write() t2.Stop() return err } From 7bc8b774338fd25723728adcb6c839c5fe99150c Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Mon, 25 Mar 2024 16:06:56 -0400 Subject: [PATCH 23/33] go back to writesync --- internal/state/indexer/tx/kv/kv.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/state/indexer/tx/kv/kv.go b/internal/state/indexer/tx/kv/kv.go index 19d6831d7..544351c46 100644 --- a/internal/state/indexer/tx/kv/kv.go +++ b/internal/state/indexer/tx/kv/kv.go @@ -107,8 +107,8 @@ func (txi *TxIndex) Index(results []*abci.TxResult) error { } } debugutil.PrintStats("TxIndex.Index b.Set(primaryKey(hash), rawBytes)", durations) - t2 := debugutil.NewTimer(fmt.Sprintf("TxIndex.Index Write (txs=%d)", len(results))) - err := b.Write() + t2 := debugutil.NewTimer(fmt.Sprintf("TxIndex.Index WriteSync (txs=%d)", len(results))) + err := b.WriteSync() t2.Stop() return err } From b6c7ba88f0061038ed3f5b353cecb09124872a7a Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Tue, 26 Mar 2024 13:51:07 +0800 Subject: [PATCH 24/33] Change to write --- internal/eventbus/event_bus.go | 2 +- internal/state/indexer/tx/kv/kv.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/eventbus/event_bus.go b/internal/eventbus/event_bus.go index af4a561ec..aa096aa17 100644 --- a/internal/eventbus/event_bus.go +++ b/internal/eventbus/event_bus.go @@ -13,7 +13,7 @@ import ( "github.com/tendermint/tendermint/types" ) -var DefaultBufferCapacity = 10 +var DefaultBufferCapacity = 100 // Subscription is a proxy interface for a pubsub Subscription. type Subscription interface { diff --git a/internal/state/indexer/tx/kv/kv.go b/internal/state/indexer/tx/kv/kv.go index 544351c46..90f5edff2 100644 --- a/internal/state/indexer/tx/kv/kv.go +++ b/internal/state/indexer/tx/kv/kv.go @@ -108,7 +108,7 @@ func (txi *TxIndex) Index(results []*abci.TxResult) error { } debugutil.PrintStats("TxIndex.Index b.Set(primaryKey(hash), rawBytes)", durations) t2 := debugutil.NewTimer(fmt.Sprintf("TxIndex.Index WriteSync (txs=%d)", len(results))) - err := b.WriteSync() + err := b.Write() t2.Stop() return err } From a24698addb83046ad353230616b71f477dbc16c0 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Tue, 26 Mar 2024 14:09:32 +0800 Subject: [PATCH 25/33] cap to 10 --- internal/eventbus/event_bus.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/eventbus/event_bus.go b/internal/eventbus/event_bus.go index aa096aa17..af4a561ec 100644 --- a/internal/eventbus/event_bus.go +++ b/internal/eventbus/event_bus.go @@ -13,7 +13,7 @@ import ( "github.com/tendermint/tendermint/types" ) -var DefaultBufferCapacity = 100 +var DefaultBufferCapacity = 10 // Subscription is a proxy interface for a pubsub Subscription. type Subscription interface { From f50175af2d441497e0b22b9ec9e4ed626699518f Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Tue, 26 Mar 2024 14:16:24 +0800 Subject: [PATCH 26/33] Fix --- internal/state/indexer/tx/kv/kv.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/state/indexer/tx/kv/kv.go b/internal/state/indexer/tx/kv/kv.go index 90f5edff2..544351c46 100644 --- a/internal/state/indexer/tx/kv/kv.go +++ b/internal/state/indexer/tx/kv/kv.go @@ -108,7 +108,7 @@ func (txi *TxIndex) Index(results []*abci.TxResult) error { } debugutil.PrintStats("TxIndex.Index b.Set(primaryKey(hash), rawBytes)", durations) t2 := debugutil.NewTimer(fmt.Sprintf("TxIndex.Index WriteSync (txs=%d)", len(results))) - err := b.Write() + err := b.WriteSync() t2.Stop() return err } From a8d025bf6fc776abdda3302f53f879f55d3c420f Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Tue, 26 Mar 2024 18:23:00 +0800 Subject: [PATCH 27/33] Add goroutine --- internal/state/indexer/indexer_service.go | 26 +++++++++++++---------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/internal/state/indexer/indexer_service.go b/internal/state/indexer/indexer_service.go index e73e4a3ba..dab6d45b2 100644 --- a/internal/state/indexer/indexer_service.go +++ b/internal/state/indexer/indexer_service.go @@ -95,17 +95,21 @@ func (is *Service) publish(msg pubsub.Message) error { } if curr.Size() != 0 { - start := time.Now() - err := sink.IndexTxEvents(curr.Ops) - if err != nil { - is.logger.Error("failed to index block txs", - "height", is.currentBlock.height, "err", err) - } else { - is.metrics.TxEventsSeconds.Observe(time.Since(start).Seconds()) - is.metrics.TransactionsIndexed.Add(float64(curr.Size())) - is.logger.Debug("indexed txs", - "height", is.currentBlock.height, "sink", sink.Type()) - } + go func() { + ops := curr.Ops + start := time.Now() + err := sink.IndexTxEvents(ops) + if err != nil { + is.logger.Error("failed to index block txs", + "height", is.currentBlock.height, "err", err) + } else { + is.metrics.TxEventsSeconds.Observe(time.Since(start).Seconds()) + is.metrics.TransactionsIndexed.Add(float64(curr.Size())) + is.logger.Debug("indexed txs", + "height", is.currentBlock.height, "sink", sink.Type()) + } + }() + } } is.currentBlock.batch = nil // return to the WAIT state for the next block From 4d0625f32ca4f7e081ad175cdc311d15837ceb8f Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Tue, 26 Mar 2024 18:49:07 +0800 Subject: [PATCH 28/33] Not index --- internal/state/indexer/indexer_service.go | 28 +++++++++++------------ 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/internal/state/indexer/indexer_service.go b/internal/state/indexer/indexer_service.go index dab6d45b2..c8fb084a1 100644 --- a/internal/state/indexer/indexer_service.go +++ b/internal/state/indexer/indexer_service.go @@ -95,20 +95,20 @@ func (is *Service) publish(msg pubsub.Message) error { } if curr.Size() != 0 { - go func() { - ops := curr.Ops - start := time.Now() - err := sink.IndexTxEvents(ops) - if err != nil { - is.logger.Error("failed to index block txs", - "height", is.currentBlock.height, "err", err) - } else { - is.metrics.TxEventsSeconds.Observe(time.Since(start).Seconds()) - is.metrics.TransactionsIndexed.Add(float64(curr.Size())) - is.logger.Debug("indexed txs", - "height", is.currentBlock.height, "sink", sink.Type()) - } - }() + //go func() { + // ops := curr.Ops + // start := time.Now() + // err := sink.IndexTxEvents(ops[:1]) + // if err != nil { + // is.logger.Error("failed to index block txs", + // "height", is.currentBlock.height, "err", err) + // } else { + // is.metrics.TxEventsSeconds.Observe(time.Since(start).Seconds()) + // is.metrics.TransactionsIndexed.Add(float64(curr.Size())) + // is.logger.Debug("indexed txs", + // "height", is.currentBlock.height, "sink", sink.Type()) + // } + //}() } } From 57a2ace437f71cf526b2e4ac1de1725fe3e28c52 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Tue, 26 Mar 2024 18:51:59 +0800 Subject: [PATCH 29/33] Fix --- internal/state/indexer/indexer_service.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/state/indexer/indexer_service.go b/internal/state/indexer/indexer_service.go index c8fb084a1..a3aa2e7b6 100644 --- a/internal/state/indexer/indexer_service.go +++ b/internal/state/indexer/indexer_service.go @@ -109,7 +109,6 @@ func (is *Service) publish(msg pubsub.Message) error { // "height", is.currentBlock.height, "sink", sink.Type()) // } //}() - } } is.currentBlock.batch = nil // return to the WAIT state for the next block From 4b58623b891858cd64ed4fbc628c477b999e3071 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Fri, 29 Mar 2024 15:44:51 +0800 Subject: [PATCH 30/33] Remove some metric --- internal/consensus/state.go | 22 ---------------------- internal/state/execution.go | 7 ------- 2 files changed, 29 deletions(-) diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 4ef8dbe5e..7da141ffa 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -1852,8 +1852,6 @@ func (cs *State) enterPrevoteWait(height int64, round int32) { // Lock & precommit the ProposalBlock if we have enough prevotes for it (a POL in this round) // else, precommit nil otherwise. func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32, entryLabel string) { - t := debugutil.NewTimer("enterPrecommit") - defer t.Stop() _, span := cs.tracer.Start(cs.getTracingCtx(ctx), "cs.state.enterPrecommit") span.SetAttributes(attribute.Int("round", int(round))) @@ -1885,13 +1883,10 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32, }() // check for a polka - t = debugutil.NewTimer("checkPolka") blockID, ok := cs.roundState.Votes().Prevotes(round).TwoThirdsMajority() - t.Stop() // If we don't have a polka, we must precommit nil. if !ok { - t = debugutil.NewTimer("noPolkaPreccomitNil") if cs.roundState.LockedBlock() != nil { logger.Info("precommit step; no +2/3 prevotes during enterPrecommit while we are locked; precommitting nil") } else { @@ -1899,56 +1894,44 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32, } cs.signAddVote(ctx, tmproto.PrecommitType, nil, types.PartSetHeader{}) - t.Stop() return } // At this point +2/3 prevoted for a particular block or nil. - t = debugutil.NewTimer("publishEventPolka") if err := cs.eventBus.PublishEventPolka(cs.roundState.RoundStateEvent()); err != nil { logger.Error("failed publishing polka", "err", err) } - t.Stop() // the latest POLRound should be this round. - t = debugutil.NewTimer("polInfo") polRound, _ := cs.roundState.Votes().POLInfo() if polRound < round { panic(fmt.Sprintf("this POLRound should be %v but got %v", round, polRound)) } - t.Stop() // +2/3 prevoted nil. Precommit nil. if blockID.IsNil() { - t = debugutil.NewTimer("prevoteNilPreccomitNil") logger.Info("precommit step: +2/3 prevoted for nil; precommitting nil") cs.signAddVote(ctx, tmproto.PrecommitType, nil, types.PartSetHeader{}) - t.Stop() return } // At this point, +2/3 prevoted for a particular block. // If we never received a proposal for this block, we must precommit nil if cs.roundState.Proposal() == nil || cs.roundState.ProposalBlock() == nil { - t = debugutil.NewTimer("noBlockProposalPreccomitNil") logger.Info("precommit step; did not receive proposal, precommitting nil") cs.signAddVote(ctx, tmproto.PrecommitType, nil, types.PartSetHeader{}) - t.Stop() return } // If the proposal time does not match the block time, precommit nil. if !cs.roundState.Proposal().Timestamp.Equal(cs.roundState.ProposalBlock().Header.Time) { logger.Info("precommit step: proposal timestamp not equal; precommitting nil") - t = debugutil.NewTimer("proposalNotMatchBlockTimePreccomitNil") cs.signAddVote(ctx, tmproto.PrecommitType, nil, types.PartSetHeader{}) - t.Stop() return } // If we're already locked on that block, precommit it, and update the LockedRound if cs.roundState.LockedBlock().HashesTo(blockID.Hash) { - t = debugutil.NewTimer("lockedBlockPreccomit") logger.Info("precommit step: +2/3 prevoted locked block; relocking") cs.roundState.SetLockedRound(round) @@ -1957,7 +1940,6 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32, } cs.signAddVote(ctx, tmproto.PrecommitType, blockID.Hash, blockID.PartSetHeader) - t.Stop() return } @@ -1965,7 +1947,6 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32, // the proposed block, update our locked block to this block and issue a // precommit vote for it. if cs.roundState.ProposalBlock().HashesTo(blockID.Hash) { - t = debugutil.NewTimer("updatedLockedBlockPreccomit") logger.Info("precommit step: +2/3 prevoted proposal block; locking", "hash", blockID.Hash) // Validate the block. @@ -1982,7 +1963,6 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32, } cs.signAddVote(ctx, tmproto.PrecommitType, blockID.Hash, blockID.PartSetHeader) - t.Stop() return } @@ -1990,7 +1970,6 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32, // Fetch that block, and precommit nil. logger.Info("precommit step: +2/3 prevotes for a block we do not have; voting nil", "block_id", blockID) - t = debugutil.NewTimer("polkaNotHavePreccomitNil") if !cs.roundState.ProposalBlockParts().HasHeader(blockID.PartSetHeader) { cs.roundState.SetProposalBlock(nil) cs.metrics.MarkBlockGossipStarted() @@ -1998,7 +1977,6 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32, } cs.signAddVote(ctx, tmproto.PrecommitType, nil, types.PartSetHeader{}) - t.Stop() } // Enter: any +2/3 precommits for next round. diff --git a/internal/state/execution.go b/internal/state/execution.go index 6f2088ffa..75b17763f 100644 --- a/internal/state/execution.go +++ b/internal/state/execution.go @@ -705,7 +705,6 @@ func FireEvents( validatorUpdates []*types.Validator, ) { - startTime := time.Now() if err := eventBus.PublishEventNewBlock(types.EventDataNewBlock{ Block: block, BlockID: blockID, @@ -713,9 +712,7 @@ func FireEvents( }); err != nil { logger.Error("failed publishing new block", "err", err) } - fmt.Printf("[Debug] Publish New block event took %s\n", time.Since(startTime)) - startBlockHeaderTime := time.Now() if err := eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{ Header: block.Header, NumTxs: int64(len(block.Txs)), @@ -723,9 +720,7 @@ func FireEvents( }); err != nil { logger.Error("failed publishing new block header", "err", err) } - fmt.Printf("[Debug] Publish Block header took %s\n", time.Since(startBlockHeaderTime)) - startEvidenceTime := time.Now() if len(block.Evidence) != 0 { for _, ev := range block.Evidence { if err := eventBus.PublishEventNewEvidence(types.EventDataNewEvidence{ @@ -736,8 +731,6 @@ func FireEvents( } } } - fmt.Printf("[Debug] Publish Evidence took %s\n", time.Since(startEvidenceTime)) - // sanity check if len(finalizeBlockResponse.TxResults) != len(block.Data.Txs) { panic(fmt.Sprintf("number of TXs (%d) and ABCI TX responses (%d) do not match", From 8b3447ab70ec50683e616aa5b05e0174deb4d4cb Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Fri, 29 Mar 2024 15:48:04 +0800 Subject: [PATCH 31/33] Add block time --- internal/consensus/state.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 7da141ffa..a239572ca 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -2355,6 +2355,7 @@ func (cs *State) RecordMetrics(height int64, block *types.Block) { block.Time.Sub(lastBlockMeta.Header.Time).Seconds(), ) } + fmt.Printf("[Debug] Block time for height %d is: %s\n", block.Height, block.Time.Sub(lastBlockMeta.Header.Time)) } roundState := cs.GetRoundState() From a17df43b995c984a2e2dcb40b9f2f3ee495afa2a Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Fri, 29 Mar 2024 15:51:22 +0800 Subject: [PATCH 32/33] Fix --- internal/state/execution.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/state/execution.go b/internal/state/execution.go index 75b17763f..7297519c1 100644 --- a/internal/state/execution.go +++ b/internal/state/execution.go @@ -98,12 +98,12 @@ func (blockExec *BlockExecutor) CreateProposalBlock( evidence, evSize := blockExec.evpool.PendingEvidence(state.ConsensusParams.Evidence.MaxBytes) - fmt.Printf("[Debug] Creating proposal block height %d, mempool size: %d \n", height, blockExec.mempool.Size()) + startTime := time.Now() // Fetch a limited amount of valid txs maxDataBytes := types.MaxDataBytes(maxBytes, evSize, state.Validators.Size()) txs := blockExec.mempool.ReapMaxBytesMaxGas(maxDataBytes, maxGas) - fmt.Printf("[Debug] Reap %d txs for height %d, mempool size: %d \n", len(txs), height, blockExec.mempool.Size()) + fmt.Printf("[Debug] Reap %d txs for height %d, mempool size: %d took %s \n", len(txs), height, blockExec.mempool.Size(), time.Since(startTime)) commit := lastExtCommit.ToCommit() block := state.MakeBlock(height, txs, commit, evidence, proposerAddr) From b8605377cd022db5e95210ede6064583331dde01 Mon Sep 17 00:00:00 2001 From: yzang2019 Date: Fri, 29 Mar 2024 16:31:26 +0800 Subject: [PATCH 33/33] Add log --- internal/state/execution.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/state/execution.go b/internal/state/execution.go index 7297519c1..7773dfef6 100644 --- a/internal/state/execution.go +++ b/internal/state/execution.go @@ -289,6 +289,7 @@ func (blockExec *BlockExecutor) ApplyBlock( "block_app_hash", fmt.Sprintf("%X", fBlockRes.AppHash), ) + saveStartTime := time.Now() // Save the results before we commit. err = blockExec.store.SaveFinalizeBlockResponses(block.Height, fBlockRes) if err != nil && !errors.Is(err, ErrNoFinalizeBlockResponsesForHeight{block.Height}) { @@ -349,6 +350,7 @@ func (blockExec *BlockExecutor) ApplyBlock( if err := blockExec.store.Save(state); err != nil { return state, err } + fmt.Printf("[Debug] Save block took %s\n", time.Since(saveStartTime)) pruneStartTime := time.Now() // Prune old heights, if requested by ABCI app.