From d5225a81cc05fb78244e90ffebb2cd493b96ef67 Mon Sep 17 00:00:00 2001 From: Philipp Matthes Date: Mon, 29 Apr 2024 16:07:37 +0200 Subject: [PATCH] Add metrics test --- monitor/map.go | 8 +- monitor/map_test.go | 4 +- monitor/metrics.go | 76 +++++++++++++---- monitor/metrics_test.go | 176 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 240 insertions(+), 24 deletions(-) create mode 100644 monitor/metrics_test.go diff --git a/monitor/map.go b/monitor/map.go index f50a7e5..0a2771c 100644 --- a/monitor/map.go +++ b/monitor/map.go @@ -15,10 +15,10 @@ import ( ) // Interface to overwrite for testing purposes. -var getAllThings = things.Things.Range +var getAllThingsForMap = things.Things.Range // Interface to overwrite for testing purposes. -var getCurrentPrediction = predictions.GetCurrentPrediction +var getCurrentPredictionForMap = predictions.GetCurrentPrediction // Write geojson data that can be used to visualize the predictions. // The geojson file is written to the static directory. @@ -26,7 +26,7 @@ func WriteGeoJSONMap() { // Write the geojson to the file. locationFeatureCollection := geojson.NewFeatureCollection() // Locations of traffic lights. laneFeatureCollection := geojson.NewFeatureCollection() // Lanes of traffic lights. - getAllThings(func(key, value interface{}) bool { + getAllThingsForMap(func(key, value interface{}) bool { thingName := key.(string) thing := value.(things.Thing) @@ -39,7 +39,7 @@ func WriteGeoJSONMap() { lat, lng := coordinate[1], coordinate[0] // Check if there is a prediction for this thing. - prediction, predictionOk := getCurrentPrediction(thingName) + prediction, predictionOk := getCurrentPredictionForMap(thingName) // Build the properties. properties := make(map[string]interface{}) if predictionOk { diff --git a/monitor/map_test.go b/monitor/map_test.go index 899b7c4..a75692f 100644 --- a/monitor/map_test.go +++ b/monitor/map_test.go @@ -46,7 +46,7 @@ func TestWriteGeoJSONMap(t *testing.T) { }, }, } - getAllThings = func(callback func(key, value interface{}) bool) { + getAllThingsForMap = func(callback func(key, value interface{}) bool) { callback( "1337_1", things.Thing{ Name: "1337_1", @@ -71,7 +71,7 @@ func TestWriteGeoJSONMap(t *testing.T) { ThenQuality: []byte{100, 100, 100, 100, 100, 100, 100, 100, 100, 100}, ReferenceTime: time.Unix(0, 0), } - getCurrentPrediction = func(thingName string) (predictions.Prediction, bool) { + getCurrentPredictionForMap = func(thingName string) (predictions.Prediction, bool) { return mockPrediction, true } diff --git a/monitor/metrics.go b/monitor/metrics.go index ba250be..c6f8bb4 100644 --- a/monitor/metrics.go +++ b/monitor/metrics.go @@ -40,6 +40,48 @@ type MetricsEntry struct { // This is to gobally protect concurrent access to the same file. var metricsFileLock = &sync.Mutex{} +// Interfaces to overwrite for testing purposes. +var getAllThingsForMetrics = things.Things.Range +var getCurrentPrimarySignalForMetrics = observations.GetCurrentPrimarySignal +var getCurrentProgramForMetrics = observations.GetCurrentProgram +var getCurrentPredictionForMetrics = predictions.GetCurrentPrediction +var getLastPredictionTimeForMetrics = predictions.GetLastPredictionTime +var getObservationsReceivedByTopic = func(callback func(dsType string, count uint64)) { + // Lock for async access. + observations.ObservationsReceivedByTopicLock.RLock() + defer observations.ObservationsReceivedByTopicLock.RUnlock() + for dsType, count := range observations.ObservationsReceivedByTopic { + callback(dsType, count) + } +} +var getObservationsReceived = func() uint64 { + return observations.ObservationsReceived +} +var getObservationsProcessed = func() uint64 { + return observations.ObservationsProcessed +} +var getObservationsDiscarded = func() uint64 { + return observations.ObservationsDiscarded +} +var getHistoryUpdatesRequested = func() uint64 { + return histories.HistoryUpdatesRequested +} +var getHistoryUpdatesProcessed = func() uint64 { + return histories.HistoryUpdatesProcessed +} +var getHistoryUpdatesDiscarded = func() uint64 { + return histories.HistoryUpdatesDiscarded +} +var getPredictionsChecked = func() uint64 { + return predictions.PredictionsChecked +} +var getPredictionsPublished = func() uint64 { + return predictions.PredictionsPublished +} +var getPredictionsDiscarded = func() uint64 { + return predictions.PredictionsDiscarded +} + func generateMetrics() Metrics { entries := []MetricsEntry{} @@ -49,20 +91,20 @@ func generateMetrics() Metrics { var delaySum float64 var delayCount int - things.Things.Range(func(key, _ interface{}) bool { + getAllThingsForMetrics(func(key, _ interface{}) bool { thingName := key.(string) entry := MetricsEntry{ThingName: thingName} defer func() { entries = append(entries, entry) }() // Get the current state of the thing. - primarySignalObservation, ok := observations.GetCurrentPrimarySignal(thingName) + primarySignalObservation, ok := getCurrentPrimarySignalForMetrics(thingName) if !ok { return true } entry.ActualColor = &primarySignalObservation.Result // Get the last running program of the thing. - if programObservation, ok := observations.GetCurrentProgram(thingName); ok { + if programObservation, ok := getCurrentProgramForMetrics(thingName); ok { entry.Program = &programObservation.Result } @@ -78,7 +120,7 @@ func generateMetrics() Metrics { // compare a delayed prediction with a delayed observation. nowWithDelay := time.Now().Add(-timeDelay) - if prediction, ok := predictions.GetCurrentPrediction(thingName); ok { + if prediction, ok := getCurrentPredictionForMetrics(thingName); ok { delayedTimeInPrediction := int(math.Abs( nowWithDelay.Sub(prediction.ReferenceTime).Seconds(), )) @@ -134,7 +176,7 @@ func generateMetrics() Metrics { } // Get the age of the prediction. - if lastPredictionTime, ok := predictions.GetLastPredictionTime(thingName); ok { + if lastPredictionTime, ok := getLastPredictionTimeForMetrics(thingName); ok { age := int(time.Since(lastPredictionTime).Abs().Seconds()) entry.PredictionAge = &age } @@ -181,24 +223,22 @@ func generatePrometheusMetrics(m Metrics) []string { lines = append(lines, fmt.Sprintf("predictor_mean_msg_delay %f", m.MeanMsgDelay)) // Add metrics for the observations. - lines = append(lines, fmt.Sprintf("predictor_observations{action=\"received\"} %d", observations.ObservationsReceived)) - lines = append(lines, fmt.Sprintf("predictor_observations{action=\"processed\"} %d", observations.ObservationsProcessed)) - lines = append(lines, fmt.Sprintf("predictor_observations{action=\"discarded\"} %d", observations.ObservationsDiscarded)) - observations.ObservationsReceivedByTopicLock.RLock() - for dsType, count := range observations.ObservationsReceivedByTopic { + lines = append(lines, fmt.Sprintf("predictor_observations{action=\"received\"} %d", getObservationsReceived())) + lines = append(lines, fmt.Sprintf("predictor_observations{action=\"processed\"} %d", getObservationsProcessed())) + lines = append(lines, fmt.Sprintf("predictor_observations{action=\"discarded\"} %d", getObservationsDiscarded())) + getObservationsReceivedByTopic(func(dsType string, count uint64) { lines = append(lines, fmt.Sprintf("predictor_observations_by_topic{topic=\"%s\"} %d", dsType, count)) - } - observations.ObservationsReceivedByTopicLock.RUnlock() + }) // Add metrics for the histories. - lines = append(lines, fmt.Sprintf("predictor_histories{action=\"requested\"} %d", histories.HistoryUpdatesRequested)) - lines = append(lines, fmt.Sprintf("predictor_histories{action=\"processed\"} %d", histories.HistoryUpdatesProcessed)) - lines = append(lines, fmt.Sprintf("predictor_histories{action=\"discarded\"} %d", histories.HistoryUpdatesDiscarded)) + lines = append(lines, fmt.Sprintf("predictor_histories{action=\"requested\"} %d", getHistoryUpdatesRequested())) + lines = append(lines, fmt.Sprintf("predictor_histories{action=\"processed\"} %d", getHistoryUpdatesProcessed())) + lines = append(lines, fmt.Sprintf("predictor_histories{action=\"discarded\"} %d", getHistoryUpdatesDiscarded())) // Add metrics for the predictions. - lines = append(lines, fmt.Sprintf("predictor_predictions{action=\"checked\"} %d", predictions.PredictionsChecked)) - lines = append(lines, fmt.Sprintf("predictor_predictions{action=\"published\"} %d", predictions.PredictionsPublished)) - lines = append(lines, fmt.Sprintf("predictor_predictions{action=\"discarded\"} %d", predictions.PredictionsDiscarded)) + lines = append(lines, fmt.Sprintf("predictor_predictions{action=\"checked\"} %d", getPredictionsChecked())) + lines = append(lines, fmt.Sprintf("predictor_predictions{action=\"published\"} %d", getPredictionsPublished())) + lines = append(lines, fmt.Sprintf("predictor_predictions{action=\"discarded\"} %d", getPredictionsDiscarded())) for bucket, value := range m.Deviations { // Add with trailing 0s to make the graph look nicer. diff --git a/monitor/metrics_test.go b/monitor/metrics_test.go new file mode 100644 index 0000000..8862d75 --- /dev/null +++ b/monitor/metrics_test.go @@ -0,0 +1,176 @@ +package monitor + +import ( + "predictor/observations" + "predictor/predictions" + "predictor/things" + "strconv" + "strings" + "testing" + "time" +) + +func prepareMocks() { + getAllThingsForMetrics = func(f func(key, value interface{}) bool) { + f("1337_1", things.Thing{}) // Thing can be empty, only name is needed + } + getCurrentPrimarySignalForMetrics = func(thingName string) (observations.Observation, bool) { + return observations.Observation{ + Result: 4, // RedAmber + ReceivedTime: time.Unix(1, 0), + PhenomenonTime: time.Unix(0, 0), // 1 second delay + }, true + } + getCurrentProgramForMetrics = func(thingName string) (observations.Observation, bool) { + return observations.Observation{ + Result: 10, // Program 10 + ReceivedTime: time.Unix(1, 0), + PhenomenonTime: time.Unix(0, 0), // 1 second delay + }, true + } + getCurrentPredictionForMetrics = func(thingName string) (predictions.Prediction, bool) { + return predictions.Prediction{ + ThingName: "1337_1", + Now: []byte{1, 1, 1, 1, 1, 3, 3, 3, 3, 3}, + NowQuality: []byte{100, 100, 100, 100, 100, 100, 100, 100, 100, 100}, + Then: []byte{4, 4, 4, 4, 4, 4, 4, 4, 4, 4}, + ThenQuality: []byte{100, 100, 100, 100, 100, 100, 100, 100, 100, 100}, + // Should point to the then array (now is >50yrs after) + // -> should result in a 0 seconds deviation from the actual data + ReferenceTime: time.Unix(0, 0), + }, true + } + getLastPredictionTimeForMetrics = func(thingName string) (time.Time, bool) { + return time.Unix(0, 0), true + } + getObservationsReceivedByTopic = func(callback func(dsType string, count uint64)) { + callback("primary_signal", 1) + callback("signal_program", 1) + callback("cycle_second", 1) + callback("detector_bike", 0) + callback("detector_car", 0) + } + getObservationsReceived = func() uint64 { + return 1 + } + getObservationsProcessed = func() uint64 { + return 1 + } + getObservationsDiscarded = func() uint64 { + return 1 + } + getHistoryUpdatesRequested = func() uint64 { + return 1 + } + getHistoryUpdatesProcessed = func() uint64 { + return 1 + } + getHistoryUpdatesDiscarded = func() uint64 { + return 1 + } + getPredictionsChecked = func() uint64 { + return 1 + } + getPredictionsPublished = func() uint64 { + return 1 + } + getPredictionsDiscarded = func() uint64 { + return 1 + } +} + +func TestGenerateMetrics(t *testing.T) { + prepareMocks() + metrics := generateMetrics() + + if len(metrics.Entries) != 1 { + t.Errorf("metrics has more or fewer entries than expected") + t.FailNow() + } + entry := metrics.Entries[0] + // Check the entry + if entry.ActualColor == nil || entry.PredictedColor == nil { + t.Errorf("actual color or predicted color is nil") + t.FailNow() + } + if *entry.ActualColor != 4 || *entry.PredictedColor != 4 { + t.Errorf("unexpected prediction color and actual color") + t.FailNow() + } + + // Check the metrics + if metrics.Deviations["0"] != 1 { + t.Errorf("deviations are not correct: %v", metrics.Deviations) + t.FailNow() + } + if metrics.MeanMsgDelay != 1 { + t.Errorf("unexpected message delay: %f", metrics.MeanMsgDelay) + t.FailNow() + } + if metrics.Correct != 1 || metrics.Verifiable != 1 { + t.Errorf("prediction for the given example should be both verifiable and correct") + t.FailNow() + } +} + +func TestGeneratePrometheusMetrics(t *testing.T) { + prepareMocks() + baseMetrics := generateMetrics() + prometheusMetrics := generatePrometheusMetrics(baseMetrics) + + var search = func(metricName string, expectedValue interface{}) bool { + for _, metric := range prometheusMetrics { + if !strings.Contains(metric, metricName) { + continue + } + // Found the metric we are searching for + strParts := strings.Split(metric, " ") + if len(strParts) < 2 { + t.Errorf("something is wrong with the metric's formatting: %s", metric) + t.FailNow() + } + valueAsStr := strParts[len(strParts)-1] + var actualValue interface{} + // Try to parse as int, then as float + actualValue, err := strconv.Atoi(valueAsStr) + if err != nil { + actualValue, err = strconv.ParseFloat(valueAsStr, 64) + if err != nil { + t.Errorf("could not parse value: %s", valueAsStr) + t.FailNow() + } + } + // Since we only compare floats or ints, its ok to use == + return expectedValue == actualValue + } + return false + } + + if !search("predictor_verifiable", 1) || !search("predictor_correct", 1) { + t.Errorf("prediction for the given example should be both verifiable and correct") + t.FailNow() + } + if !search("predictor_mean_msg_delay", 1.0) { + t.Errorf("unexpected message delay") + t.FailNow() + } + if !search("predictor_observations{action=\"received\"}", 1) || // + !search("predictor_observations{action=\"processed\"}", 1) || // + !search("predictor_observations{action=\"discarded\"}", 1) { + t.Errorf("unexpected metrics value") + t.FailNow() + } + if !search("predictor_histories{action=\"requested\"}", 1) || // + !search("predictor_histories{action=\"processed\"}", 1) || // + !search("predictor_histories{action=\"discarded\"}", 1) || // + !search("predictor_predictions{action=\"checked\"}", 1) || // + !search("predictor_predictions{action=\"published\"}", 1) || // + !search("predictor_predictions{action=\"discarded\"}", 1) { + t.Errorf("unexpected metrics value") + t.FailNow() + } + if !search("predictor_deviation{bucket=\"00\"}", 1) { + t.Errorf("unexpected metrics value") + t.FailNow() + } +}