Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Yzang/debug tps #215

Closed
wants to merge 33 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
eeb4c51
Add log
yzang2019 Mar 25, 2024
36b2abb
Add log for state
yzang2019 Mar 25, 2024
2aa549a
Add more logs
yzang2019 Mar 25, 2024
2cacbbe
Add log
yzang2019 Mar 25, 2024
5564203
log
yzang2019 Mar 25, 2024
0882254
Fix log
yzang2019 Mar 25, 2024
6463cbc
Add log
yzang2019 Mar 25, 2024
2a382f8
Add
yzang2019 Mar 25, 2024
25546bd
add latency log for index
stevenlanders Mar 25, 2024
a464d2a
add [Debug] to line
stevenlanders Mar 25, 2024
baf0e21
add timer
stevenlanders Mar 25, 2024
458780e
add vote logs
stevenlanders Mar 25, 2024
b994a1a
vote timings
stevenlanders Mar 25, 2024
79f169f
moar logs
stevenlanders Mar 25, 2024
891a38b
add more logs
stevenlanders Mar 25, 2024
e39f8b6
More Preccomit logs (#216)
Kbhat1 Mar 25, 2024
b46ea58
fix
stevenlanders Mar 25, 2024
100f53a
add more logs for txindex
stevenlanders Mar 25, 2024
1729fe1
add store stats
stevenlanders Mar 25, 2024
24d9862
try better stats
stevenlanders Mar 25, 2024
3b181dd
cleanup stats
stevenlanders Mar 25, 2024
6ecb91a
try regular write
stevenlanders Mar 25, 2024
7bc8b77
go back to writesync
stevenlanders Mar 25, 2024
b6c7ba8
Change to write
yzang2019 Mar 26, 2024
a24698a
cap to 10
yzang2019 Mar 26, 2024
f50175a
Fix
yzang2019 Mar 26, 2024
a8d025b
Add goroutine
yzang2019 Mar 26, 2024
4d0625f
Not index
yzang2019 Mar 26, 2024
57a2ace
Fix
yzang2019 Mar 26, 2024
4b58623
Remove some metric
yzang2019 Mar 29, 2024
8b3447a
Add block time
yzang2019 Mar 29, 2024
a17df43
Fix
yzang2019 Mar 29, 2024
b860537
Add log
yzang2019 Mar 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions debugutil/timer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package debugutil

import (
"fmt"
"time"
)

type Timer struct {
name string
startTime time.Time
}

func NewTimer(name string) *Timer {
return &Timer{
name: name,
startTime: time.Now(),
}
}

func (t *Timer) Stop() {
fmt.Printf("[Debug] %s took %s\n", t.name, time.Since(t.startTime))
}

func PrintStats(msg string, durations []time.Duration) {
if len(durations) == 0 {
fmt.Printf("[Debug] %s count=0 (no stats)\n", msg)
return
}
var sum time.Duration
var max time.Duration
var min time.Duration
for i, d := range durations {
sum += d
if d > max {
max = d
}
if i == 0 || d < min {
min = d
}
}
avg := sum / time.Duration(len(durations))
// only show the slow ones
if sum > 500*time.Millisecond {
fmt.Printf("[Debug] %s count=%d sum=%s avg=%s max=%s min=%s\n", msg, len(durations), sum, avg, max, min)
}
}
78 changes: 76 additions & 2 deletions internal/consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"sync"
"time"

"github.com/tendermint/tendermint/debugutil"

"github.com/gogo/protobuf/proto"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/trace"
Expand Down Expand Up @@ -45,7 +47,13 @@ var (
ErrAddingVote = errors.New("error adding vote")
ErrSignatureFoundInPastBlocks = errors.New("found signature from the same key")

errPubKeyIsNotSet = errors.New("pubkey is not set. Look for \"Can't get private validator pubkey\" errors")
errPubKeyIsNotSet = errors.New("pubkey is not set. Look for \"Can't get private validator pubkey\" errors")
ENTER_NEW_ROUND_TIME = time.Now()
ENTER_PROPOSE_TIME = time.Now()
ENTER_PREVOTE_TIME = time.Now()
ENTER_PRECOMMIT_TIME = time.Now()
ENTER_COMMIT_TIME = time.Now()
ENTER_FINALIZE_COMMIT_TIME = time.Now()
)

var msgQueueSize = 1000
Expand Down Expand Up @@ -1295,6 +1303,9 @@ func (cs *State) enterNewRound(ctx context.Context, height int64, round int32, e

logger.Debug("entering new round", "current", fmt.Sprintf("%v/%v/%v", cs.roundState.Height(), cs.roundState.Round(), cs.roundState.Step()))

ENTER_NEW_ROUND_TIME = time.Now()
fmt.Printf("[Debug] Finalize commit latency for block %d is: %s \n", cs.roundState.Height(), time.Since(ENTER_FINALIZE_COMMIT_TIME))

// increment validators if necessary
validators := cs.roundState.Validators()
if cs.roundState.Round() < round {
Expand Down Expand Up @@ -1400,6 +1411,8 @@ func (cs *State) enterPropose(ctx context.Context, height int64, round int32, en
}

logger.Debug("entering propose step", "current", fmt.Sprintf("%v/%v/%v", cs.roundState.Height(), cs.roundState.Round(), cs.roundState.Step()))
ENTER_PROPOSE_TIME = time.Now()
fmt.Printf("[Debug] New round latency for block %d is: %s \n", cs.roundState.Height(), time.Since(ENTER_NEW_ROUND_TIME))

defer func() {
// Done enterPropose:
Expand Down Expand Up @@ -1614,6 +1627,8 @@ func (cs *State) enterPrevote(ctx context.Context, height int64, round int32, en
}()

logger.Debug("entering prevote step", "current", fmt.Sprintf("%v/%v/%v", cs.roundState.Height(), cs.roundState.Round(), cs.roundState.Step()), "time", time.Now().UnixMilli())
ENTER_PREVOTE_TIME = time.Now()
fmt.Printf("[Debug] Propose latency for block %d is: %s \n", cs.roundState.Height(), time.Since(ENTER_PROPOSE_TIME))

// Sign and broadcast vote as necessary
cs.doPrevote(ctx, height, round)
Expand Down Expand Up @@ -1837,6 +1852,7 @@ func (cs *State) enterPrevoteWait(height int64, round int32) {
// Lock & precommit the ProposalBlock if we have enough prevotes for it (a POL in this round)
// else, precommit nil otherwise.
func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32, entryLabel string) {

_, span := cs.tracer.Start(cs.getTracingCtx(ctx), "cs.state.enterPrecommit")
span.SetAttributes(attribute.Int("round", int(round)))
span.SetAttributes(attribute.String("entry", entryLabel))
Expand All @@ -1857,6 +1873,9 @@ func (cs *State) enterPrecommit(ctx context.Context, height int64, round int32,

logger.Debug("entering precommit step", "current", fmt.Sprintf("%v/%v/%v", cs.roundState.Height(), cs.roundState.Round(), cs.roundState.Step()), "time", time.Now().UnixMilli())

ENTER_PRECOMMIT_TIME = time.Now()
fmt.Printf("[Debug] Prevote latency for block %d is: %s \n", cs.roundState.Height(), time.Since(ENTER_PREVOTE_TIME))

defer func() {
// Done enterPrecommit:
cs.updateRoundStep(round, cstypes.RoundStepPrecommit)
Expand Down Expand Up @@ -1995,6 +2014,9 @@ func (cs *State) enterPrecommitWait(height int64, round int32) {

// Enter: +2/3 precommits for block
func (cs *State) enterCommit(ctx context.Context, height int64, commitRound int32, entryLabel string) {
t := debugutil.NewTimer("enterCommit")
defer t.Stop()

spanCtx, span := cs.tracer.Start(cs.getTracingCtx(ctx), "cs.state.enterCommit")
span.SetAttributes(attribute.Int("round", int(commitRound)))
span.SetAttributes(attribute.String("entry", entryLabel))
Expand All @@ -2013,6 +2035,9 @@ func (cs *State) enterCommit(ctx context.Context, height int64, commitRound int3

logger.Debug("entering commit step", "current", fmt.Sprintf("%v/%v/%v", cs.roundState.Height(), cs.roundState.Round(), cs.roundState.Step()), "time", time.Now().UnixMilli())

ENTER_COMMIT_TIME = time.Now()
fmt.Printf("[Debug] Precommit latency for block %d is: %s \n", cs.roundState.Height(), time.Since(ENTER_PRECOMMIT_TIME))

defer func() {
// Done enterCommit:
// keep cs.Round the same, commitRound points to the right Precommits set.
Expand All @@ -2025,7 +2050,9 @@ func (cs *State) enterCommit(ctx context.Context, height int64, commitRound int3
cs.tryFinalizeCommit(spanCtx, height)
}()

t2 := debugutil.NewTimer("enterCommit cs.roundState.Votes().Precommits(commitRound).TwoThirdsMajority()")
blockID, ok := cs.roundState.Votes().Precommits(commitRound).TwoThirdsMajority()
t2.Stop()
if !ok {
panic("RunActionCommit() expects +2/3 precommits")
}
Expand All @@ -2035,8 +2062,14 @@ func (cs *State) enterCommit(ctx context.Context, height int64, commitRound int3
// otherwise they'll be cleared in updateToState.
if cs.roundState.LockedBlock().HashesTo(blockID.Hash) {
logger.Info("commit is for a locked block; set ProposalBlock=LockedBlock", "block_hash", blockID.Hash)

t := debugutil.NewTimer("enterCommit cs.roundState.SetProposalBlock(cs.roundState.LockedBlock())")
cs.roundState.SetProposalBlock(cs.roundState.LockedBlock())
t.Stop()

t2 := debugutil.NewTimer("enterCommit cs.roundState.SetProposalBlockParts(cs.roundState.LockedBlockParts())")
cs.roundState.SetProposalBlockParts(cs.roundState.LockedBlockParts())
t2.Stop()
}

// If we don't have the block being committed, set up to get it.
Expand All @@ -2050,16 +2083,29 @@ func (cs *State) enterCommit(ctx context.Context, height int64, commitRound int3

// We're getting the wrong block.
// Set up ProposalBlockParts and keep waiting.
t := debugutil.NewTimer("enterCommit cs.roundState.SetProposalBlock(cs.roundState.LockedBlock())")
cs.roundState.SetProposalBlock(nil)
t.Stop()

cs.metrics.MarkBlockGossipStarted()

t = debugutil.NewTimer("enterCommit cs.roundState.SetProposalBlockParts(types.NewPartSetFromHeader(blockID.PartSetHeader))")
cs.roundState.SetProposalBlockParts(types.NewPartSetFromHeader(blockID.PartSetHeader))
t.Stop()

t = debugutil.NewTimer("enterCommit cs.eventBus.PublishEventValidBlock(cs.roundState.RoundStateEvent())")
if err := cs.eventBus.PublishEventValidBlock(cs.roundState.RoundStateEvent()); err != nil {
logger.Error("failed publishing valid block", "err", err)
}
t.Stop()

t = debugutil.NewTimer("enterCommit cs.roundState.CopyInternal()")
roundState := cs.roundState.CopyInternal()
t.Stop()

t = debugutil.NewTimer("enterCommit cs.evsw.FireEvent(types.EventValidBlockValue, roundState)")
cs.evsw.FireEvent(types.EventValidBlockValue, roundState)
t.Stop()
}
}
}
Expand Down Expand Up @@ -2127,6 +2173,8 @@ func (cs *State) finalizeCommit(ctx context.Context, height int64) {
panic(fmt.Errorf("+2/3 committed an invalid block: %w", err))
}

ENTER_FINALIZE_COMMIT_TIME = time.Now()
fmt.Printf("[Debug] Commit latency for block %d is: %s \n", cs.roundState.Height(), time.Since(ENTER_COMMIT_TIME))
logger.Info(
"finalizing commit of block",
"hash", block.Hash(),
Expand Down Expand Up @@ -2184,7 +2232,7 @@ func (cs *State) finalizeCommit(ctx context.Context, height int64) {

// Execute and commit the block, update and save the state, and update the mempool.
// NOTE The block.AppHash won't reflect these txs until the next block.

finalizeStartTime := time.Now()
stateCopy, err := cs.blockExec.ApplyBlock(spanCtx,
stateCopy,
types.BlockID{
Expand All @@ -2198,6 +2246,7 @@ func (cs *State) finalizeCommit(ctx context.Context, height int64) {
logger.Error("failed to apply block", "err", err)
return
}
fmt.Printf("[Debug] ApplyBlock latency for block %d took %s \n", height, time.Since(finalizeStartTime))

// must be called before we update state
cs.RecordMetrics(height, block)
Expand Down Expand Up @@ -2306,6 +2355,7 @@ func (cs *State) RecordMetrics(height int64, block *types.Block) {
block.Time.Sub(lastBlockMeta.Header.Time).Seconds(),
)
}
fmt.Printf("[Debug] Block time for height %d is: %s\n", block.Height, block.Time.Sub(lastBlockMeta.Header.Time))
}

roundState := cs.GetRoundState()
Expand Down Expand Up @@ -2610,6 +2660,8 @@ func (cs *State) addVote(
peerID types.NodeID,
handleVoteMsgSpan otrace.Span,
) (added bool, err error) {
t := debugutil.NewTimer("State.addVote")
defer t.Stop()
cs.logger.Debug(
"adding vote",
"vote_height", vote.Height,
Expand Down Expand Up @@ -2721,6 +2773,7 @@ func (cs *State) addVote(

switch vote.Type {
case tmproto.PrevoteType:
t := debugutil.NewTimer("State.addVote case tmproto.PrevoteType")
prevotes := cs.roundState.Votes().Prevotes(vote.Round)
cs.logger.Debug("added vote to prevote", "vote", vote, "prevotes", prevotes.StringShort())

Expand Down Expand Up @@ -2781,8 +2834,10 @@ func (cs *State) addVote(
cs.enterPrevote(ctx, height, cs.roundState.Round(), "prevote-future")
}
}
t.Stop()

case tmproto.PrecommitType:
t := debugutil.NewTimer("State.addVote case tmproto.PrecommitType")
precommits := cs.roundState.Votes().Precommits(vote.Round)
cs.logger.Debug("added vote to precommit",
"height", vote.Height,
Expand All @@ -2791,25 +2846,44 @@ func (cs *State) addVote(
"vote_timestamp", vote.Timestamp,
"data", precommits.LogString())

t2 := debugutil.NewTimer("State.addVote case tmproto.PrecommitType TwoThirdsMajority")
blockID, ok := precommits.TwoThirdsMajority()
t2.Stop()
handleVoteMsgSpan.End()
if ok {
// Executed as TwoThirdsMajority could be from a higher round

t3 := debugutil.NewTimer("State.addVote case tmproto.PrecommitType enterNewRound precommit-two-thirds")
cs.enterNewRound(ctx, height, vote.Round, "precommit-two-thirds")
t3.Stop()

t4 := debugutil.NewTimer("State.addVote case tmproto.PrecommitType enterPrecommit precommit-two-thirds")
cs.enterPrecommit(ctx, height, vote.Round, "precommit-two-thirds")
t4.Stop()

if !blockID.IsNil() {
t5 := debugutil.NewTimer("State.addVote case tmproto.PrecommitType enterCommit precommit-two-thirds")
cs.enterCommit(ctx, height, vote.Round, "precommit-two-thirds")
t5.Stop()
if cs.bypassCommitTimeout() && precommits.HasAll() {
t6 := debugutil.NewTimer("State.addVote case tmproto.PrecommitType enterNewRound precommit-skip-round")
cs.enterNewRound(ctx, cs.roundState.Height(), 0, "precommit-skip-round")
t6.Stop()
}
} else {
t7 := debugutil.NewTimer("State.addVote case tmproto.PrecommitType enterPrecommitWait")
cs.enterPrecommitWait(height, vote.Round)
t7.Stop()
}
} else if cs.roundState.Round() <= vote.Round && precommits.HasTwoThirdsAny() {
t8 := debugutil.NewTimer("State.addVote case tmproto.PrecommitType enterNewRound precommit-two-thirds-any")
cs.enterNewRound(ctx, height, vote.Round, "precommit-two-thirds-any")
t8.Stop()
t9 := debugutil.NewTimer("State.addVote case tmproto.PrecommitType enterPrecommitWait precommit-two-thirds-any")
cs.enterPrecommitWait(height, vote.Round)
t9.Stop()
}
t.Stop()

default:
panic(fmt.Sprintf("unexpected vote type %v", vote.Type))
Expand Down
2 changes: 1 addition & 1 deletion internal/eventbus/event_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/tendermint/tendermint/types"
)

var DefaultBufferCapacity = 100
var DefaultBufferCapacity = 10

// Subscription is a proxy interface for a pubsub Subscription.
type Subscription interface {
Expand Down
Loading
Loading