Skip to content

Commit

Permalink
[consensus] add metrics
Browse files Browse the repository at this point in the history
This patch adds some minimal telemetry to the consensus subsystem.
Right now it covers BuildProposal/InsertProposal/RunSequence time measurements,
the number of validators for a specific height, and counters for sent and
received messages.
  • Loading branch information
olegrok committed Mar 4, 2025
1 parent 8156896 commit bada5da
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 6 deletions.
26 changes: 23 additions & 3 deletions nil/internal/consensus/ibft/ibft.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type backendIBFT struct {
transport transport
signer *Signer
validatorsCache *validatorsMap
mh *MetricsHandler
}

var _ core.Backend = &backendIBFT{}
Expand All @@ -63,6 +64,9 @@ func (i *backendIBFT) unmarshalProposal(raw []byte) (*execution.ProposalSSZ, err
}

func (i *backendIBFT) BuildProposal(view *protoIBFT.View) []byte {
i.mh.StartBuildProposalMeasurement(i.transportCtx, view.Round)
defer i.mh.EndBuildProposalMeasurement(i.transportCtx)

proposal, err := i.validator.BuildProposal(i.ctx)
if err != nil {
i.logger.Error().Err(err).Msg("failed to build proposal")
Expand Down Expand Up @@ -142,6 +146,9 @@ func (i *backendIBFT) buildSignature(committedSeals []*messages.CommittedSeal, h
}

func (i *backendIBFT) InsertProposal(proposal *protoIBFT.Proposal, committedSeals []*messages.CommittedSeal) {
i.mh.StartBuildProposalMeasurement(i.transportCtx, proposal.Round)
defer i.mh.EndBuildProposalMeasurement(i.transportCtx)

proposalBlock, err := i.unmarshalProposal(proposal.RawProposal)
if err != nil {
i.logger.Error().Err(err).Msg("failed to unmarshal proposal")
Expand Down Expand Up @@ -186,22 +193,30 @@ func (i *backendIBFT) isActiveValidator() bool {
return true
}

func NewConsensus(cfg *ConsensusParams) *backendIBFT {
func NewConsensus(cfg *ConsensusParams) (*backendIBFT, error) {
logger := logging.NewLogger("consensus").With().Stringer(logging.FieldShardId, cfg.ShardId).Logger()
l := &ibftLogger{
logger: logger.With().CallerWithSkipFrameCount(3).Logger(),
}

const mhName = "github.com/NilFoundation/nil/nil/internal/consensus"
mh, err := NewMetricsHandler(mhName, cfg.ShardId)
if err != nil {
logger.Error().Err(err).Msg("Failed to create metrics handler")
return nil, err
}

backend := &backendIBFT{
shardId: cfg.ShardId,
validator: cfg.Validator,
logger: logger,
nm: cfg.NetManager,
signer: NewSigner(cfg.PrivateKey),
validatorsCache: newValidatorsMap(cfg.Db, cfg.ShardId),
mh: mh,
}
backend.consensus = core.NewIBFT(l, backend, backend)
return backend
return backend, nil
}

func (i *backendIBFT) Init(ctx context.Context) error {
Expand All @@ -223,14 +238,19 @@ func (i *backendIBFT) GetVotingPowers(height uint64) (map[string]*big.Int, error
return nil, err
}

result := make(map[string]*big.Int, len(validators))
count := len(validators)
result := make(map[string]*big.Int, count)
for _, v := range validators {
result[string(v.PublicKey[:])] = big.NewInt(1)
}
i.mh.SetValidatorsCount(i.transportCtx, count)
return result, nil
}

func (i *backendIBFT) RunSequence(ctx context.Context, height uint64) error {
i.mh.StartSequenceMeasurement(ctx, height)
defer i.mh.EndSequenceMeasurement(ctx)

i.ctx = ctx
i.consensus.RunSequence(ctx, height)
return nil
Expand Down
127 changes: 127 additions & 0 deletions nil/internal/consensus/ibft/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package ibft

import (
"context"

"github.com/NilFoundation/nil/nil/internal/telemetry"
"github.com/NilFoundation/nil/nil/internal/telemetry/telattr"
"github.com/NilFoundation/nil/nil/internal/types"
"go.opentelemetry.io/otel/metric"
)

type MetricsHandler struct {
option metric.MeasurementOption

buildProposalMeasurer *telemetry.Measurer
insertProposalMeasurer *telemetry.Measurer
sequenceMeasurer *telemetry.Measurer

height telemetry.Histogram
round telemetry.Histogram
validatorsCount telemetry.Histogram
sentMessages telemetry.Counter
receivedMessages telemetry.Counter
}

// NewMetricsHandler builds the consensus metrics handler for one shard.
//
// name is handed to telemetry.NewMeter. shardId is attached as an attribute to
// the "build_proposal", "insert_proposal" and "sequence" measurers and is also
// used to build the measurement option shared by the histograms and counters.
//
// The first error reported while creating a measurer or an instrument stops
// construction and is returned unchanged together with a nil handler.
func NewMetricsHandler(name string, shardId types.ShardId) (*MetricsHandler, error) {
	meter := telemetry.NewMeter(name)

	// All measurers share the meter and the shard attribute; only the
	// metric name differs.
	newMeasurer := func(metricName string) (*telemetry.Measurer, error) {
		return telemetry.NewMeasurer(meter, metricName, telattr.ShardId(shardId))
	}

	buildProposal, err := newMeasurer("build_proposal")
	if err != nil {
		return nil, err
	}

	insertProposal, err := newMeasurer("insert_proposal")
	if err != nil {
		return nil, err
	}

	sequence, err := newMeasurer("sequence")
	if err != nil {
		return nil, err
	}

	mh := &MetricsHandler{
		option:                 telattr.With(telattr.ShardId(shardId)),
		buildProposalMeasurer:  buildProposal,
		insertProposalMeasurer: insertProposal,
		sequenceMeasurer:       sequence,
	}

	err = mh.initMetrics(meter)
	if err != nil {
		return nil, err
	}

	return mh, nil
}

// initMetrics creates the histograms and counters on meter and stores them in
// the handler. Instruments are created in a fixed order (validators_count,
// height, round, sent_messages, received_messages); the first failure is
// returned immediately and the remaining instruments are left unset.
func (mh *MetricsHandler) initMetrics(meter metric.Meter) error {
	var err error

	mh.validatorsCount, err = meter.Int64Histogram("validators_count")
	if err != nil {
		return err
	}

	mh.height, err = meter.Int64Histogram("height")
	if err != nil {
		return err
	}

	mh.round, err = meter.Int64Histogram("round")
	if err != nil {
		return err
	}

	mh.sentMessages, err = meter.Int64Counter("sent_messages")
	if err != nil {
		return err
	}

	mh.receivedMessages, err = meter.Int64Counter("received_messages")
	return err
}

// StartBuildProposalMeasurement records round in the round histogram and then
// restarts the build-proposal measurer. Pair it with
// EndBuildProposalMeasurement.
func (mh *MetricsHandler) StartBuildProposalMeasurement(ctx context.Context, round uint64) {
	currentRound := int64(round)
	mh.round.Record(ctx, currentRound, mh.option)

	measurer := mh.buildProposalMeasurer
	measurer.Restart()
}

// EndBuildProposalMeasurement calls Measure on the build-proposal measurer.
// It is the counterpart of StartBuildProposalMeasurement.
func (mh *MetricsHandler) EndBuildProposalMeasurement(ctx context.Context) {
	measurer := mh.buildProposalMeasurer
	measurer.Measure(ctx)
}

// StartInsertProposalMeasurement records round in the round histogram and then
// restarts the insert-proposal measurer. Pair it with
// EndInsertProposalMeasurement.
func (mh *MetricsHandler) StartInsertProposalMeasurement(ctx context.Context, round uint64) {
	currentRound := int64(round)
	mh.round.Record(ctx, currentRound, mh.option)

	measurer := mh.insertProposalMeasurer
	measurer.Restart()
}

// EndInsertProposalMeasurement calls Measure on the insert-proposal measurer.
// It is the counterpart of StartInsertProposalMeasurement.
//
// NOTE(review): height and round are accepted but not used by the body; they
// are kept so the signature stays unchanged for callers.
func (mh *MetricsHandler) EndInsertProposalMeasurement(ctx context.Context, height, round uint64) {
	measurer := mh.insertProposalMeasurer
	measurer.Measure(ctx)
}

// StartSequenceMeasurement records height in the height histogram, records a
// round value of 0 in the round histogram, and restarts the sequence
// measurer. Pair it with EndSequenceMeasurement.
func (mh *MetricsHandler) StartSequenceMeasurement(ctx context.Context, height uint64) {
	// The round histogram is fed a literal zero whenever a sequence starts.
	const initialRound int64 = 0

	currentHeight := int64(height)
	mh.height.Record(ctx, currentHeight, mh.option)
	mh.round.Record(ctx, initialRound, mh.option)

	mh.sequenceMeasurer.Restart()
}

// EndSequenceMeasurement calls Measure on the sequence measurer. It is the
// counterpart of StartSequenceMeasurement.
func (mh *MetricsHandler) EndSequenceMeasurement(ctx context.Context) {
	measurer := mh.sequenceMeasurer
	measurer.Measure(ctx)
}

// SetValidatorsCount records count as one sample in the validators_count
// histogram, tagged with the handler's measurement option.
func (mh *MetricsHandler) SetValidatorsCount(ctx context.Context, count int) {
	sample := int64(count)
	mh.validatorsCount.Record(ctx, sample, mh.option)
}

// IncSentMessages adds 1 to the sent_messages counter. The sample carries the
// handler's measurement option plus a type attribute built from t.
func (mh *MetricsHandler) IncSentMessages(ctx context.Context, t string) {
	typeOption := telattr.With(telattr.Type(t))
	mh.sentMessages.Add(ctx, 1, mh.option, typeOption)
}

// IncReceivedMessages adds 1 to the received_messages counter. The sample
// carries the handler's measurement option plus a type attribute built from t.
func (mh *MetricsHandler) IncReceivedMessages(ctx context.Context, t string) {
	typeOption := telattr.With(telattr.Type(t))
	mh.receivedMessages.Add(ctx, 1, mh.option, typeOption)
}
2 changes: 2 additions & 0 deletions nil/internal/consensus/ibft/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (i *backendIBFT) Multicast(msg *proto.IbftMessage) {
if err := i.transport.Multicast(msg); err != nil {
i.logger.Error().Err(err).Msg("Fail to gossip")
}
i.mh.IncSentMessages(i.transportCtx, msg.Type.String())
}

func (i *backendIBFT) getProto() string {
Expand Down Expand Up @@ -90,6 +91,7 @@ func (i *backendIBFT) setupTransport(ctx context.Context) error {
event.Msg("Validator message received")

i.consensus.AddMessage(msg)
i.mh.IncReceivedMessages(ctx, msg.Type.String())
}
}
}(ctx)
Expand Down
2 changes: 1 addition & 1 deletion nil/internal/telemetry/internal/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package internal

import (
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
semconv "go.opentelemetry.io/otel/semconv/v1.27.0"
)

func NewResource(config *Config) (*resource.Resource, error) {
Expand Down
4 changes: 4 additions & 0 deletions nil/internal/telemetry/telattr/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,7 @@ func ProtocolId(id protocol.ID) attribute.KeyValue {
// Topic returns a string attribute keyed by logging.FieldTopic whose value is
// topic.
func Topic(topic string) attribute.KeyValue {
	key := attribute.Key(logging.FieldTopic)
	return key.String(topic)
}

// Type returns a string attribute keyed by logging.FieldType whose value is t.
func Type(t string) attribute.KeyValue {
	key := attribute.Key(logging.FieldType)
	return key.String(t)
}
5 changes: 4 additions & 1 deletion nil/services/nilservice/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,13 +601,16 @@ func createShards(
return nil, err
}

consensus := ibft.NewConsensus(&ibft.ConsensusParams{
consensus, err := ibft.NewConsensus(&ibft.ConsensusParams{
ShardId: shardId,
Db: database,
Validator: validators[i],
NetManager: networkManager,
PrivateKey: pKey,
})
if err != nil {
return nil, err
}
collator := collate.NewScheduler(validators[i], database, consensus, networkManager)

funcs = append(funcs, func(ctx context.Context) error {
Expand Down
2 changes: 1 addition & 1 deletion nix/nil.nix
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ buildGo124Module rec {
];

# to obtain run `nix build` with vendorHash = "";
vendorHash = "sha256-X7eLWC27qd5hqwLHsaVQ7WzEj6D3kRwGiUgZg+TuInU=";
vendorHash = "sha256-7bYfaDiTejpoec67um+RQmpejmLGUj+i6DOhoZwipHQ=";
hardeningDisable = [ "all" ];

postInstall = ''
Expand Down

0 comments on commit bada5da

Please sign in to comment.