Skip to content

Commit

Permalink
Add metrics for sent messages.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 716746491
  • Loading branch information
Spferical authored and copybara-github committed Jan 17, 2025
1 parent 789e3b3 commit 199ad1d
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 1 deletion.
12 changes: 11 additions & 1 deletion fleetspeak/src/inttesting/integrationtest/frr.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ const numClients = 5

// statsCounter is a simple server.StatsCollector. It only counts messages for the "FRR" service.
type statsCounter struct {
messagesIngested, payloadBytesSaved, messagesProcessed, messagesErrored, messagesDropped, clientPolls, datastoreOperations int64
messagesIngested, messagesSent, payloadBytesSaved, messagesProcessed, messagesErrored, messagesDropped, clientPolls, datastoreOperations int64
}

func (c *statsCounter) MessageIngested(backlogged bool, m *fspb.Message, cd *db.ClientData) {
Expand All @@ -71,6 +71,12 @@ func (c *statsCounter) MessageIngested(backlogged bool, m *fspb.Message, cd *db.
}
}

func (c *statsCounter) MessageSent(m *fspb.Message) {
if m.Source.ServiceName == "FRR" {
atomic.AddInt64(&c.messagesSent, 1)
}
}

func (c *statsCounter) MessageSaved(forClient bool, m *fspb.Message, cd *db.ClientData) {
if m.Destination.ServiceName == "FRR" {
savedPayloadBytes := 0
Expand Down Expand Up @@ -315,6 +321,10 @@ func FRRIntegrationTest(t *testing.T, ds db.Store, streaming bool) {
t.Errorf("Got %v messages processed, expected %v <= x <= %v", stats.messagesProcessed, 20*numClients+1, 40*numClients+1)
}

if stats.messagesSent < numClients || stats.messagesSent > 2*numClients {
t.Errorf("Got %v messages sent, expected %v <= x <= %v", stats.messagesSent, numClients, 2*numClients)
}

// Each client should have produced at least 20 and 40, and each should have been sorted at least once.
if stats.messagesIngested < 20*numClients {
t.Errorf("Got %v messages sorted, expected at least %v", stats.messagesIngested, 20*numClients+1)
Expand Down
21 changes: 21 additions & 0 deletions fleetspeak/src/server/components/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@ var (
[]string{"backlogged", "source_service", "destination_service", "message_type", "client_labels"},
)

messagesSent = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "fleetspeak_messages_sent_total",
Help: "The total number of messages sent to clients by Fleetspeak server",
},
[]string{"source_service", "destination_service", "message_type", "client_labels"},
)

messagesSentSize = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "fleetspeak_messages_sent_payload_bytes_size",
Help: "The total payload size of messages sent to clients by Fleetspeak server (in bytes)",
},
[]string{"source_service", "destination_service", "message_type", "client_labels"},
)

messagesSaved = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "fleetspeak_messages_saved_total",
Help: "The total number of messages saved by Fleetspeak server",
Expand Down Expand Up @@ -213,6 +227,13 @@ func (s StatsCollector) MessageIngested(backlogged bool, m *fspb.Message, cd *db
messagesIngestedSize.WithLabelValues(strconv.FormatBool(backlogged), m.Source.ServiceName, m.Destination.ServiceName, m.MessageType, clientLabels(cd)).Add(float64(payloadBytes))
}

func (s StatsCollector) MessageSent(m *fspb.Message) {
messagesSent.WithLabelValues(m.Source.ServiceName, m.Destination.ServiceName, m.MessageType).Inc()
payloadBytes := calculatePayloadBytes(m)
messagesSentSize.WithLabelValues(m.Source.ServiceName, m.Destination.ServiceName, m.MessageType).Add(float64(payloadBytes))

}

func calculatePayloadBytes(m *fspb.Message) int {
payloadBytes := 0
if m.Data != nil {
Expand Down
3 changes: 3 additions & 0 deletions fleetspeak/src/server/https/message_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ func (s messageServer) ServeHTTP(res http.ResponseWriter, req *http.Request) {
http.Error(res, fmt.Sprintf("error preparing messages: %v", err), pi.Status)
return
}
for _, msg := range toSend.Messages {
s.fs.StatsCollector().MessageSent(msg)
}

res.Header().Set("Content-Type", "application/octet-stream")
res.WriteHeader(http.StatusOK)
Expand Down
3 changes: 3 additions & 0 deletions fleetspeak/src/server/https/streaming_message_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,9 @@ func (m *streamManager) writeLoop() {
if len(cd.Messages) > 0 {
m.s.fs.StatsCollector().ClientPoll(pi)
}
for _, msg := range cd.Messages {
m.s.fs.StatsCollector().MessageSent(msg)
}
case <-m.ctx.Done():
return
}
Expand Down
3 changes: 3 additions & 0 deletions fleetspeak/src/server/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type noopStatsCollector struct{}
func (s noopStatsCollector) MessageIngested(backlogged bool, m *fspb.Message, cd *db.ClientData) {
}

func (s noopStatsCollector) MessageSent(m *fspb.Message) {
}

func (s noopStatsCollector) MessageSaved(forClient bool, m *fspb.Message, cd *db.ClientData) {
}

Expand Down
3 changes: 3 additions & 0 deletions fleetspeak/src/server/stats/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ type Collector interface {
// a backlogged message from the datastore.
MessageIngested(backlogged bool, m *fspb.Message, cd *db.ClientData)

// MessageSent is called when a message is sent to a client.
MessageSent(m *fspb.Message)

// MessageSaved is called when a message is first saved to the database.
// m.Data will have been set to nil for fully processed messages.
MessageSaved(forClient bool, m *fspb.Message, cd *db.ClientData)
Expand Down

0 comments on commit 199ad1d

Please sign in to comment.