Skip to content

Commit

Permalink
Add metrics test
Browse files Browse the repository at this point in the history
  • Loading branch information
PhilippMatthes committed Apr 29, 2024
1 parent 1b8e1e6 commit d5225a8
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 24 deletions.
8 changes: 4 additions & 4 deletions monitor/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@ import (
)

// Interface to overwrite for testing purposes.
var getAllThings = things.Things.Range
var getAllThingsForMap = things.Things.Range

// Interface to overwrite for testing purposes.
var getCurrentPrediction = predictions.GetCurrentPrediction
var getCurrentPredictionForMap = predictions.GetCurrentPrediction

// Write geojson data that can be used to visualize the predictions.
// The geojson file is written to the static directory.
func WriteGeoJSONMap() {
// Write the geojson to the file.
locationFeatureCollection := geojson.NewFeatureCollection() // Locations of traffic lights.
laneFeatureCollection := geojson.NewFeatureCollection() // Lanes of traffic lights.
getAllThings(func(key, value interface{}) bool {
getAllThingsForMap(func(key, value interface{}) bool {
thingName := key.(string)
thing := value.(things.Thing)

Expand All @@ -39,7 +39,7 @@ func WriteGeoJSONMap() {
lat, lng := coordinate[1], coordinate[0]

// Check if there is a prediction for this thing.
prediction, predictionOk := getCurrentPrediction(thingName)
prediction, predictionOk := getCurrentPredictionForMap(thingName)
// Build the properties.
properties := make(map[string]interface{})
if predictionOk {
Expand Down
4 changes: 2 additions & 2 deletions monitor/map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestWriteGeoJSONMap(t *testing.T) {
},
},
}
getAllThings = func(callback func(key, value interface{}) bool) {
getAllThingsForMap = func(callback func(key, value interface{}) bool) {
callback(
"1337_1", things.Thing{
Name: "1337_1",
Expand All @@ -71,7 +71,7 @@ func TestWriteGeoJSONMap(t *testing.T) {
ThenQuality: []byte{100, 100, 100, 100, 100, 100, 100, 100, 100, 100},
ReferenceTime: time.Unix(0, 0),
}
getCurrentPrediction = func(thingName string) (predictions.Prediction, bool) {
getCurrentPredictionForMap = func(thingName string) (predictions.Prediction, bool) {
return mockPrediction, true
}

Expand Down
76 changes: 58 additions & 18 deletions monitor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,48 @@ type MetricsEntry struct {
// This is to gobally protect concurrent access to the same file.
var metricsFileLock = &sync.Mutex{}

// Interfaces to overwrite for testing purposes.
var getAllThingsForMetrics = things.Things.Range
var getCurrentPrimarySignalForMetrics = observations.GetCurrentPrimarySignal
var getCurrentProgramForMetrics = observations.GetCurrentProgram
var getCurrentPredictionForMetrics = predictions.GetCurrentPrediction
var getLastPredictionTimeForMetrics = predictions.GetLastPredictionTime
var getObservationsReceivedByTopic = func(callback func(dsType string, count uint64)) {
// Lock for async access.
observations.ObservationsReceivedByTopicLock.RLock()
defer observations.ObservationsReceivedByTopicLock.RUnlock()
for dsType, count := range observations.ObservationsReceivedByTopic {
callback(dsType, count)
}
}
var getObservationsReceived = func() uint64 {
return observations.ObservationsReceived
}
var getObservationsProcessed = func() uint64 {
return observations.ObservationsProcessed
}
var getObservationsDiscarded = func() uint64 {
return observations.ObservationsDiscarded
}
var getHistoryUpdatesRequested = func() uint64 {
return histories.HistoryUpdatesRequested
}
var getHistoryUpdatesProcessed = func() uint64 {
return histories.HistoryUpdatesProcessed
}
var getHistoryUpdatesDiscarded = func() uint64 {
return histories.HistoryUpdatesDiscarded
}
var getPredictionsChecked = func() uint64 {
return predictions.PredictionsChecked
}
var getPredictionsPublished = func() uint64 {
return predictions.PredictionsPublished
}
var getPredictionsDiscarded = func() uint64 {
return predictions.PredictionsDiscarded
}

func generateMetrics() Metrics {
entries := []MetricsEntry{}

Expand All @@ -49,20 +91,20 @@ func generateMetrics() Metrics {
var delaySum float64
var delayCount int

things.Things.Range(func(key, _ interface{}) bool {
getAllThingsForMetrics(func(key, _ interface{}) bool {
thingName := key.(string)
entry := MetricsEntry{ThingName: thingName}
defer func() { entries = append(entries, entry) }()

// Get the current state of the thing.
primarySignalObservation, ok := observations.GetCurrentPrimarySignal(thingName)
primarySignalObservation, ok := getCurrentPrimarySignalForMetrics(thingName)
if !ok {
return true
}
entry.ActualColor = &primarySignalObservation.Result

// Get the last running program of the thing.
if programObservation, ok := observations.GetCurrentProgram(thingName); ok {
if programObservation, ok := getCurrentProgramForMetrics(thingName); ok {
entry.Program = &programObservation.Result
}

Expand All @@ -78,7 +120,7 @@ func generateMetrics() Metrics {
// compare a delayed prediction with a delayed observation.
nowWithDelay := time.Now().Add(-timeDelay)

if prediction, ok := predictions.GetCurrentPrediction(thingName); ok {
if prediction, ok := getCurrentPredictionForMetrics(thingName); ok {
delayedTimeInPrediction := int(math.Abs(
nowWithDelay.Sub(prediction.ReferenceTime).Seconds(),
))
Expand Down Expand Up @@ -134,7 +176,7 @@ func generateMetrics() Metrics {
}

// Get the age of the prediction.
if lastPredictionTime, ok := predictions.GetLastPredictionTime(thingName); ok {
if lastPredictionTime, ok := getLastPredictionTimeForMetrics(thingName); ok {
age := int(time.Since(lastPredictionTime).Abs().Seconds())
entry.PredictionAge = &age
}
Expand Down Expand Up @@ -181,24 +223,22 @@ func generatePrometheusMetrics(m Metrics) []string {
lines = append(lines, fmt.Sprintf("predictor_mean_msg_delay %f", m.MeanMsgDelay))

// Add metrics for the observations.
lines = append(lines, fmt.Sprintf("predictor_observations{action=\"received\"} %d", observations.ObservationsReceived))
lines = append(lines, fmt.Sprintf("predictor_observations{action=\"processed\"} %d", observations.ObservationsProcessed))
lines = append(lines, fmt.Sprintf("predictor_observations{action=\"discarded\"} %d", observations.ObservationsDiscarded))
observations.ObservationsReceivedByTopicLock.RLock()
for dsType, count := range observations.ObservationsReceivedByTopic {
lines = append(lines, fmt.Sprintf("predictor_observations{action=\"received\"} %d", getObservationsReceived()))
lines = append(lines, fmt.Sprintf("predictor_observations{action=\"processed\"} %d", getObservationsProcessed()))
lines = append(lines, fmt.Sprintf("predictor_observations{action=\"discarded\"} %d", getObservationsDiscarded()))
getObservationsReceivedByTopic(func(dsType string, count uint64) {
lines = append(lines, fmt.Sprintf("predictor_observations_by_topic{topic=\"%s\"} %d", dsType, count))
}
observations.ObservationsReceivedByTopicLock.RUnlock()
})

// Add metrics for the histories.
lines = append(lines, fmt.Sprintf("predictor_histories{action=\"requested\"} %d", histories.HistoryUpdatesRequested))
lines = append(lines, fmt.Sprintf("predictor_histories{action=\"processed\"} %d", histories.HistoryUpdatesProcessed))
lines = append(lines, fmt.Sprintf("predictor_histories{action=\"discarded\"} %d", histories.HistoryUpdatesDiscarded))
lines = append(lines, fmt.Sprintf("predictor_histories{action=\"requested\"} %d", getHistoryUpdatesRequested()))
lines = append(lines, fmt.Sprintf("predictor_histories{action=\"processed\"} %d", getHistoryUpdatesProcessed()))
lines = append(lines, fmt.Sprintf("predictor_histories{action=\"discarded\"} %d", getHistoryUpdatesDiscarded()))

// Add metrics for the predictions.
lines = append(lines, fmt.Sprintf("predictor_predictions{action=\"checked\"} %d", predictions.PredictionsChecked))
lines = append(lines, fmt.Sprintf("predictor_predictions{action=\"published\"} %d", predictions.PredictionsPublished))
lines = append(lines, fmt.Sprintf("predictor_predictions{action=\"discarded\"} %d", predictions.PredictionsDiscarded))
lines = append(lines, fmt.Sprintf("predictor_predictions{action=\"checked\"} %d", getPredictionsChecked()))
lines = append(lines, fmt.Sprintf("predictor_predictions{action=\"published\"} %d", getPredictionsPublished()))
lines = append(lines, fmt.Sprintf("predictor_predictions{action=\"discarded\"} %d", getPredictionsDiscarded()))

for bucket, value := range m.Deviations {
// Add with trailing 0s to make the graph look nicer.
Expand Down
176 changes: 176 additions & 0 deletions monitor/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package monitor

import (
"predictor/observations"
"predictor/predictions"
"predictor/things"
"strconv"
"strings"
"testing"
"time"
)

func prepareMocks() {
getAllThingsForMetrics = func(f func(key, value interface{}) bool) {
f("1337_1", things.Thing{}) // Thing can be empty, only name is needed
}
getCurrentPrimarySignalForMetrics = func(thingName string) (observations.Observation, bool) {
return observations.Observation{
Result: 4, // RedAmber
ReceivedTime: time.Unix(1, 0),
PhenomenonTime: time.Unix(0, 0), // 1 second delay
}, true
}
getCurrentProgramForMetrics = func(thingName string) (observations.Observation, bool) {
return observations.Observation{
Result: 10, // Program 10
ReceivedTime: time.Unix(1, 0),
PhenomenonTime: time.Unix(0, 0), // 1 second delay
}, true
}
getCurrentPredictionForMetrics = func(thingName string) (predictions.Prediction, bool) {
return predictions.Prediction{
ThingName: "1337_1",
Now: []byte{1, 1, 1, 1, 1, 3, 3, 3, 3, 3},
NowQuality: []byte{100, 100, 100, 100, 100, 100, 100, 100, 100, 100},
Then: []byte{4, 4, 4, 4, 4, 4, 4, 4, 4, 4},
ThenQuality: []byte{100, 100, 100, 100, 100, 100, 100, 100, 100, 100},
// Should point to the then array (now is >50yrs after)
// -> should result in a 0 seconds deviation from the actual data
ReferenceTime: time.Unix(0, 0),
}, true
}
getLastPredictionTimeForMetrics = func(thingName string) (time.Time, bool) {
return time.Unix(0, 0), true
}
getObservationsReceivedByTopic = func(callback func(dsType string, count uint64)) {
callback("primary_signal", 1)
callback("signal_program", 1)
callback("cycle_second", 1)
callback("detector_bike", 0)
callback("detector_car", 0)
}
getObservationsReceived = func() uint64 {
return 1
}
getObservationsProcessed = func() uint64 {
return 1
}
getObservationsDiscarded = func() uint64 {
return 1
}
getHistoryUpdatesRequested = func() uint64 {
return 1
}
getHistoryUpdatesProcessed = func() uint64 {
return 1
}
getHistoryUpdatesDiscarded = func() uint64 {
return 1
}
getPredictionsChecked = func() uint64 {
return 1
}
getPredictionsPublished = func() uint64 {
return 1
}
getPredictionsDiscarded = func() uint64 {
return 1
}
}

func TestGenerateMetrics(t *testing.T) {
prepareMocks()
metrics := generateMetrics()

if len(metrics.Entries) != 1 {
t.Errorf("metrics has more or fewer entries than expected")
t.FailNow()
}
entry := metrics.Entries[0]
// Check the entry
if entry.ActualColor == nil || entry.PredictedColor == nil {
t.Errorf("actual color or predicted color is nil")
t.FailNow()
}
if *entry.ActualColor != 4 || *entry.PredictedColor != 4 {
t.Errorf("unexpected prediction color and actual color")
t.FailNow()
}

// Check the metrics
if metrics.Deviations["0"] != 1 {
t.Errorf("deviations are not correct: %v", metrics.Deviations)
t.FailNow()
}
if metrics.MeanMsgDelay != 1 {
t.Errorf("unexpected message delay: %f", metrics.MeanMsgDelay)
t.FailNow()
}
if metrics.Correct != 1 || metrics.Verifiable != 1 {
t.Errorf("prediction for the given example should be both verifiable and correct")
t.FailNow()
}
}

func TestGeneratePrometheusMetrics(t *testing.T) {
prepareMocks()
baseMetrics := generateMetrics()
prometheusMetrics := generatePrometheusMetrics(baseMetrics)

var search = func(metricName string, expectedValue interface{}) bool {
for _, metric := range prometheusMetrics {
if !strings.Contains(metric, metricName) {
continue
}
// Found the metric we are searching for
strParts := strings.Split(metric, " ")
if len(strParts) < 2 {
t.Errorf("something is wrong with the metric's formatting: %s", metric)
t.FailNow()
}
valueAsStr := strParts[len(strParts)-1]
var actualValue interface{}
// Try to parse as int, then as float
actualValue, err := strconv.Atoi(valueAsStr)
if err != nil {
actualValue, err = strconv.ParseFloat(valueAsStr, 64)
if err != nil {
t.Errorf("could not parse value: %s", valueAsStr)
t.FailNow()
}
}
// Since we only compare floats or ints, its ok to use ==
return expectedValue == actualValue
}
return false
}

if !search("predictor_verifiable", 1) || !search("predictor_correct", 1) {
t.Errorf("prediction for the given example should be both verifiable and correct")
t.FailNow()
}
if !search("predictor_mean_msg_delay", 1.0) {
t.Errorf("unexpected message delay")
t.FailNow()
}
if !search("predictor_observations{action=\"received\"}", 1) || //
!search("predictor_observations{action=\"processed\"}", 1) || //
!search("predictor_observations{action=\"discarded\"}", 1) {
t.Errorf("unexpected metrics value")
t.FailNow()
}
if !search("predictor_histories{action=\"requested\"}", 1) || //
!search("predictor_histories{action=\"processed\"}", 1) || //
!search("predictor_histories{action=\"discarded\"}", 1) || //
!search("predictor_predictions{action=\"checked\"}", 1) || //
!search("predictor_predictions{action=\"published\"}", 1) || //
!search("predictor_predictions{action=\"discarded\"}", 1) {
t.Errorf("unexpected metrics value")
t.FailNow()
}
if !search("predictor_deviation{bucket=\"00\"}", 1) {
t.Errorf("unexpected metrics value")
t.FailNow()
}
}

0 comments on commit d5225a8

Please sign in to comment.