From 199ad1d2872a29593f67f30d944a9ea77446ca4b Mon Sep 17 00:00:00 2001 From: Matthew Pfeiffer Date: Fri, 17 Jan 2025 11:52:16 -0800 Subject: [PATCH] Add metrics for sent messages. PiperOrigin-RevId: 716746491 --- .../src/inttesting/integrationtest/frr.go | 12 ++++++++++- .../components/prometheus/prometheus.go | 21 +++++++++++++++++++ fleetspeak/src/server/https/message_server.go | 3 +++ .../server/https/streaming_message_server.go | 3 +++ fleetspeak/src/server/stats.go | 3 +++ fleetspeak/src/server/stats/collector.go | 3 +++ 6 files changed, 44 insertions(+), 1 deletion(-) diff --git a/fleetspeak/src/inttesting/integrationtest/frr.go b/fleetspeak/src/inttesting/integrationtest/frr.go index e792b646e..5bbc374dd 100644 --- a/fleetspeak/src/inttesting/integrationtest/frr.go +++ b/fleetspeak/src/inttesting/integrationtest/frr.go @@ -62,7 +62,7 @@ const numClients = 5 // statsCounter is a simple server.StatsCollector. It only counts messages for the "FRR" service. type statsCounter struct { - messagesIngested, payloadBytesSaved, messagesProcessed, messagesErrored, messagesDropped, clientPolls, datastoreOperations int64 + messagesIngested, messagesSent, payloadBytesSaved, messagesProcessed, messagesErrored, messagesDropped, clientPolls, datastoreOperations int64 } func (c *statsCounter) MessageIngested(backlogged bool, m *fspb.Message, cd *db.ClientData) { @@ -71,6 +71,12 @@ func (c *statsCounter) MessageIngested(backlogged bool, m *fspb.Message, cd *db. } } +func (c *statsCounter) MessageSent(m *fspb.Message) { + if m.Source.ServiceName == "FRR" { + atomic.AddInt64(&c.messagesSent, 1) + } +} + func (c *statsCounter) MessageSaved(forClient bool, m *fspb.Message, cd *db.ClientData) { if m.Destination.ServiceName == "FRR" { savedPayloadBytes := 0 @@ -315,6 +321,10 @@ func FRRIntegrationTest(t *testing.T, ds db.Store, streaming bool) { t.Errorf("Got %v messages processed, expected %v <= x <= %v", stats.messagesProcessed, 20*numClients+1, 40*numClients+1) } + if stats.messagesSent < numClients || stats.messagesSent > 2*numClients { + t.Errorf("Got %v messages sent, expected %v <= x <= %v", stats.messagesSent, numClients, 2*numClients) + } + // Each client should have produced at least 20 and 40, and each should have been sorted at least once. if stats.messagesIngested < 20*numClients { t.Errorf("Got %v messages sorted, expected at least %v", stats.messagesIngested, 20*numClients+1) diff --git a/fleetspeak/src/server/components/prometheus/prometheus.go b/fleetspeak/src/server/components/prometheus/prometheus.go index 1c7c5f736..c0b14a4a2 100644 --- a/fleetspeak/src/server/components/prometheus/prometheus.go +++ b/fleetspeak/src/server/components/prometheus/prometheus.go @@ -46,6 +46,20 @@ var ( []string{"backlogged", "source_service", "destination_service", "message_type", "client_labels"}, ) + messagesSent = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "fleetspeak_messages_sent_total", + Help: "The total number of messages sent to clients by Fleetspeak server", + }, + []string{"source_service", "destination_service", "message_type", "client_labels"}, + ) + + messagesSentSize = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "fleetspeak_messages_sent_payload_bytes_size", + Help: "The total payload size of messages sent to clients by Fleetspeak server (in bytes)", + }, + []string{"source_service", "destination_service", "message_type", "client_labels"}, + ) + messagesSaved = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "fleetspeak_messages_saved_total", Help: "The total number of messages saved by Fleetspeak server", @@ -213,6 +227,13 @@ func (s StatsCollector) MessageIngested(backlogged bool, m *fspb.Message, cd *db messagesIngestedSize.WithLabelValues(strconv.FormatBool(backlogged), m.Source.ServiceName, m.Destination.ServiceName, m.MessageType, clientLabels(cd)).Add(float64(payloadBytes)) } +func (s StatsCollector) MessageSent(m *fspb.Message) { + messagesSent.WithLabelValues(m.Source.ServiceName, m.Destination.ServiceName, m.MessageType).Inc() + payloadBytes := calculatePayloadBytes(m) + messagesSentSize.WithLabelValues(m.Source.ServiceName, m.Destination.ServiceName, m.MessageType).Add(float64(payloadBytes)) + +} + func calculatePayloadBytes(m *fspb.Message) int { payloadBytes := 0 if m.Data != nil { diff --git a/fleetspeak/src/server/https/message_server.go b/fleetspeak/src/server/https/message_server.go index c63518074..caa58dee9 100644 --- a/fleetspeak/src/server/https/message_server.go +++ b/fleetspeak/src/server/https/message_server.go @@ -174,6 +174,9 @@ func (s messageServer) ServeHTTP(res http.ResponseWriter, req *http.Request) { http.Error(res, fmt.Sprintf("error preparing messages: %v", err), pi.Status) return } + for _, msg := range toSend.Messages { + s.fs.StatsCollector().MessageSent(msg) + } res.Header().Set("Content-Type", "application/octet-stream") res.WriteHeader(http.StatusOK) diff --git a/fleetspeak/src/server/https/streaming_message_server.go b/fleetspeak/src/server/https/streaming_message_server.go index 91622441a..2153e315e 100644 --- a/fleetspeak/src/server/https/streaming_message_server.go +++ b/fleetspeak/src/server/https/streaming_message_server.go @@ -566,6 +566,9 @@ func (m *streamManager) writeLoop() { if len(cd.Messages) > 0 { m.s.fs.StatsCollector().ClientPoll(pi) } + for _, msg := range cd.Messages { + m.s.fs.StatsCollector().MessageSent(msg) + } case <-m.ctx.Done(): return } diff --git a/fleetspeak/src/server/stats.go b/fleetspeak/src/server/stats.go index cdc1ccd53..6d342a22d 100644 --- a/fleetspeak/src/server/stats.go +++ b/fleetspeak/src/server/stats.go @@ -36,6 +36,9 @@ type noopStatsCollector struct{} func (s noopStatsCollector) MessageIngested(backlogged bool, m *fspb.Message, cd *db.ClientData) { } +func (s noopStatsCollector) MessageSent(m *fspb.Message) { +} + func (s noopStatsCollector) MessageSaved(forClient bool, m *fspb.Message, cd *db.ClientData) { } diff --git a/fleetspeak/src/server/stats/collector.go b/fleetspeak/src/server/stats/collector.go index 5c00808fd..5f0812e8b 100644 --- a/fleetspeak/src/server/stats/collector.go +++ b/fleetspeak/src/server/stats/collector.go @@ -90,6 +90,9 @@ type Collector interface { // a backlogged message from the datastore. MessageIngested(backlogged bool, m *fspb.Message, cd *db.ClientData) + // MessageSent is called when a message is sent to a client. + MessageSent(m *fspb.Message) + // MessageSaved is called when a message is first saved to the database. // m.Data will have been set to nil for fully processed messages. MessageSaved(forClient bool, m *fspb.Message, cd *db.ClientData)