diff --git a/nil/internal/consensus/ibft/ibft.go b/nil/internal/consensus/ibft/ibft.go
index 0885f392d..c8419ded3 100644
--- a/nil/internal/consensus/ibft/ibft.go
+++ b/nil/internal/consensus/ibft/ibft.go
@@ -50,6 +50,7 @@ type backendIBFT struct {
 	transport       transport
 	signer          *Signer
 	validatorsCache *validatorsMap
+	mh              *MetricsHandler
 }
 
 var _ core.Backend = &backendIBFT{}
@@ -63,6 +64,9 @@ func (i *backendIBFT) unmarshalProposal(raw []byte) (*execution.ProposalSSZ, err
 }
 
 func (i *backendIBFT) BuildProposal(view *protoIBFT.View) []byte {
+	i.mh.StartBuildProposalMeasurement(i.transportCtx, view.Round)
+	defer i.mh.EndBuildProposalMeasurement(i.transportCtx)
+
 	proposal, err := i.validator.BuildProposal(i.ctx)
 	if err != nil {
 		i.logger.Error().Err(err).Msg("failed to build proposal")
@@ -142,6 +146,9 @@ func (i *backendIBFT) buildSignature(committedSeals []*messages.CommittedSeal, h
 }
 
 func (i *backendIBFT) InsertProposal(proposal *protoIBFT.Proposal, committedSeals []*messages.CommittedSeal) {
+	i.mh.StartInsertProposalMeasurement(i.transportCtx, proposal.Round)
+	defer i.mh.EndInsertProposalMeasurement(i.transportCtx)
+
 	proposalBlock, err := i.unmarshalProposal(proposal.RawProposal)
 	if err != nil {
 		i.logger.Error().Err(err).Msg("failed to unmarshal proposal")
@@ -186,12 +193,19 @@ func (i *backendIBFT) isActiveValidator() bool {
 	return true
 }
 
-func NewConsensus(cfg *ConsensusParams) *backendIBFT {
+func NewConsensus(cfg *ConsensusParams) (*backendIBFT, error) {
 	logger := logging.NewLogger("consensus").With().Stringer(logging.FieldShardId, cfg.ShardId).Logger()
 	l := &ibftLogger{
 		logger: logger.With().CallerWithSkipFrameCount(3).Logger(),
 	}
 
+	const mhName = "github.com/NilFoundation/nil/nil/internal/consensus"
+	mh, err := NewMetricsHandler(mhName, cfg.ShardId)
+	if err != nil {
+		logger.Error().Err(err).Msg("Failed to create metrics handler")
+		return nil, err
+	}
+
 	backend := &backendIBFT{
 		shardId:         cfg.ShardId,
 		validator:       cfg.Validator,
@@ -199,9 +213,10 @@ func NewConsensus(cfg *ConsensusParams) *backendIBFT {
 		nm:              cfg.NetManager,
 		signer:          NewSigner(cfg.PrivateKey),
 		validatorsCache: newValidatorsMap(cfg.Db, cfg.ShardId),
+		mh:              mh,
 	}
 	backend.consensus = core.NewIBFT(l, backend, backend)
-	return backend
+	return backend, nil
 }
 
 func (i *backendIBFT) Init(ctx context.Context) error {
@@ -223,14 +238,19 @@ func (i *backendIBFT) GetVotingPowers(height uint64) (map[string]*big.Int, error
 		return nil, err
 	}
 
-	result := make(map[string]*big.Int, len(validators))
+	count := len(validators)
+	result := make(map[string]*big.Int, count)
 	for _, v := range validators {
 		result[string(v.PublicKey[:])] = big.NewInt(1)
 	}
+	i.mh.SetValidatorsCount(i.transportCtx, count)
 	return result, nil
 }
 
 func (i *backendIBFT) RunSequence(ctx context.Context, height uint64) error {
+	i.mh.StartSequenceMeasurement(ctx, height)
+	defer i.mh.EndSequenceMeasurement(ctx)
+
 	i.ctx = ctx
 	i.consensus.RunSequence(ctx, height)
 	return nil
diff --git a/nil/internal/consensus/ibft/metrics.go b/nil/internal/consensus/ibft/metrics.go
new file mode 100644
index 000000000..317265375
--- /dev/null
+++ b/nil/internal/consensus/ibft/metrics.go
@@ -0,0 +1,142 @@
+package ibft
+
+import (
+	"context"
+
+	"github.com/NilFoundation/nil/nil/internal/telemetry"
+	"github.com/NilFoundation/nil/nil/internal/telemetry/telattr"
+	"github.com/NilFoundation/nil/nil/internal/types"
+	"go.opentelemetry.io/otel/metric"
+)
+
+// MetricsHandler holds the telemetry instruments of the IBFT consensus backend.
+// Histogram and counter recordings are tagged with the shard id through option.
+type MetricsHandler struct {
+	option metric.MeasurementOption
+
+	buildProposalMeasurer  *telemetry.Measurer
+	insertProposalMeasurer *telemetry.Measurer
+	sequenceMeasurer       *telemetry.Measurer
+
+	height           telemetry.Histogram
+	round            telemetry.Histogram
+	validatorsCount  telemetry.Histogram
+	sentMessages     telemetry.Counter
+	receivedMessages telemetry.Counter
+}
+
+// NewMetricsHandler creates the measurers, histograms and counters for shardId on the
+// meter called name. It returns the first error hit while creating an instrument.
+func NewMetricsHandler(name string, shardId types.ShardId) (*MetricsHandler, error) {
+	meter := telemetry.NewMeter(name)
+	buildProposalMeasurer, err := telemetry.NewMeasurer(
+		meter, "build_proposal", telattr.ShardId(shardId),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	insertProposalMeasurer, err := telemetry.NewMeasurer(
+		meter, "insert_proposal", telattr.ShardId(shardId),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	sequenceMeasurer, err := telemetry.NewMeasurer(
+		meter, "sequence", telattr.ShardId(shardId),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	handler := &MetricsHandler{
+		buildProposalMeasurer:  buildProposalMeasurer,
+		insertProposalMeasurer: insertProposalMeasurer,
+		sequenceMeasurer:       sequenceMeasurer,
+		option:                 telattr.With(telattr.ShardId(shardId)),
+	}
+
+	if err := handler.initMetrics(meter); err != nil {
+		return nil, err
+	}
+
+	return handler, nil
+}
+
+// initMetrics creates the histograms and counters on meter, stopping at the first error.
+func (mh *MetricsHandler) initMetrics(meter metric.Meter) error {
+	var err error
+
+	if mh.validatorsCount, err = meter.Int64Histogram("validators_count"); err != nil {
+		return err
+	}
+
+	if mh.height, err = meter.Int64Histogram("height"); err != nil {
+		return err
+	}
+
+	if mh.round, err = meter.Int64Histogram("round"); err != nil {
+		return err
+	}
+
+	if mh.sentMessages, err = meter.Int64Counter("sent_messages"); err != nil {
+		return err
+	}
+
+	if mh.receivedMessages, err = meter.Int64Counter("received_messages"); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// StartBuildProposalMeasurement records round and restarts the build_proposal measurer.
+func (mh *MetricsHandler) StartBuildProposalMeasurement(ctx context.Context, round uint64) {
+	mh.round.Record(ctx, int64(round), mh.option)
+	mh.buildProposalMeasurer.Restart()
+}
+
+// EndBuildProposalMeasurement calls Measure on the build_proposal measurer (presumably recording the time since Restart).
+func (mh *MetricsHandler) EndBuildProposalMeasurement(ctx context.Context) {
+	mh.buildProposalMeasurer.Measure(ctx)
+}
+
+// StartInsertProposalMeasurement records round and restarts the insert_proposal measurer.
+func (mh *MetricsHandler) StartInsertProposalMeasurement(ctx context.Context, round uint64) {
+	mh.round.Record(ctx, int64(round), mh.option)
+	mh.insertProposalMeasurer.Restart()
+}
+
+// EndInsertProposalMeasurement calls Measure on the insert_proposal measurer; it takes only ctx, like the other End* methods.
+func (mh *MetricsHandler) EndInsertProposalMeasurement(ctx context.Context) {
+	mh.insertProposalMeasurer.Measure(ctx)
+}
+
+// StartSequenceMeasurement records height, records 0 in the round histogram and
+// restarts the sequence measurer.
+func (mh *MetricsHandler) StartSequenceMeasurement(ctx context.Context, height uint64) {
+	mh.height.Record(ctx, int64(height), mh.option)
+	mh.round.Record(ctx, 0, mh.option)
+	mh.sequenceMeasurer.Restart()
+}
+
+// EndSequenceMeasurement calls Measure on the sequence measurer (presumably recording the time since Restart).
+func (mh *MetricsHandler) EndSequenceMeasurement(ctx context.Context) {
+	mh.sequenceMeasurer.Measure(ctx)
+}
+
+// SetValidatorsCount records count in the validators_count histogram.
+func (mh *MetricsHandler) SetValidatorsCount(ctx context.Context, count int) {
+	mh.validatorsCount.Record(ctx, int64(count), mh.option)
+}
+
+// IncSentMessages adds one to the sent_messages counter, tagged with message type t.
+func (mh *MetricsHandler) IncSentMessages(ctx context.Context, t string) {
+	mh.sentMessages.Add(ctx, 1, mh.option, telattr.With(telattr.Type(t)))
+}
+
+// IncReceivedMessages adds one to the received_messages counter, tagged with message type t.
+func (mh *MetricsHandler) IncReceivedMessages(ctx context.Context, t string) {
+	mh.receivedMessages.Add(ctx, 1, mh.option, telattr.With(telattr.Type(t)))
+}
diff --git a/nil/internal/consensus/ibft/transport.go b/nil/internal/consensus/ibft/transport.go
index 672b92bd9..3bc9b04a4 100644
--- a/nil/internal/consensus/ibft/transport.go
+++ b/nil/internal/consensus/ibft/transport.go
@@ -32,6 +32,7 @@ func (i *backendIBFT) Multicast(msg *proto.IbftMessage) {
 	if err := i.transport.Multicast(msg); err != nil {
 		i.logger.Error().Err(err).Msg("Fail to gossip")
 	}
+	i.mh.IncSentMessages(i.transportCtx, msg.Type.String())
 }
 
 func (i *backendIBFT) getProto() string {
@@ -90,6 +91,7 @@ func (i *backendIBFT) setupTransport(ctx context.Context) error {
 				event.Msg("Validator message received")
 
 				i.consensus.AddMessage(msg)
+				i.mh.IncReceivedMessages(ctx, msg.Type.String())
 			}
 		}
 	}(ctx)
diff --git a/nil/internal/telemetry/internal/resource.go b/nil/internal/telemetry/internal/resource.go
index 48ed1eef6..e0af6f366 100644
--- a/nil/internal/telemetry/internal/resource.go
+++ b/nil/internal/telemetry/internal/resource.go
@@ -2,7 +2,7 @@ package internal
 
 import (
 	"go.opentelemetry.io/otel/sdk/resource"
-	semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
+	semconv "go.opentelemetry.io/otel/semconv/v1.27.0"
 )
 
 func NewResource(config *Config) (*resource.Resource, error) {
diff --git a/nil/internal/telemetry/telattr/attributes.go b/nil/internal/telemetry/telattr/attributes.go
index 8112fb202..eba607643 100644
--- a/nil/internal/telemetry/telattr/attributes.go
+++ b/nil/internal/telemetry/telattr/attributes.go
@@ -27,3 +27,8 @@ func ProtocolId(id protocol.ID) attribute.KeyValue {
 func Topic(topic string) attribute.KeyValue {
 	return attribute.String(logging.FieldTopic, topic)
 }
+
+// Type returns a string attribute keyed by logging.FieldType with value t.
+func Type(t string) attribute.KeyValue {
+	return attribute.String(logging.FieldType, t)
+}
diff --git a/nil/services/nilservice/service.go b/nil/services/nilservice/service.go
index 590d04681..877c1cbbb 100644
--- a/nil/services/nilservice/service.go
+++ b/nil/services/nilservice/service.go
@@ -601,13 +601,16 @@ func createShards(
 			return nil, err
 		}
 
-		consensus := ibft.NewConsensus(&ibft.ConsensusParams{
+		consensus, err := ibft.NewConsensus(&ibft.ConsensusParams{
 			ShardId:    shardId,
 			Db:         database,
 			Validator:  validators[i],
 			NetManager: networkManager,
 			PrivateKey: pKey,
 		})
+		if err != nil {
+			return nil, err
+		}
 
 		collator := collate.NewScheduler(validators[i], database, consensus, networkManager)
 		funcs = append(funcs, func(ctx context.Context) error {
diff --git a/nix/nil.nix b/nix/nil.nix
index 235124921..b8a73de5b 100644
--- a/nix/nil.nix
+++ b/nix/nil.nix
@@ -49,7 +49,7 @@ buildGo124Module rec {
   ];
 
   # to obtain run `nix build` with vendorHash = "";
-  vendorHash = "sha256-X7eLWC27qd5hqwLHsaVQ7WzEj6D3kRwGiUgZg+TuInU=";
+  vendorHash = "sha256-7bYfaDiTejpoec67um+RQmpejmLGUj+i6DOhoZwipHQ=";
   hardeningDisable = [ "all" ];
 
   postInstall = ''