Skip to content

Commit

Permalink
feat: uniform metric types, update tests, add outbound hist labels
Browse files Browse the repository at this point in the history
  • Loading branch information
denopink committed Feb 26, 2024
1 parent 50efda5 commit 382ab3c
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 73 deletions.
24 changes: 12 additions & 12 deletions ackDispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"os"
"time"

"github.com/go-kit/kit/metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/xmidt-org/webpa-common/v2/device"
"go.uber.org/zap"

Expand All @@ -26,10 +26,10 @@ type ackDispatcher struct {
hostname string
logger *zap.Logger
timeout time.Duration
AckSuccess metrics.Counter
AckFailure metrics.Counter
AckSuccessLatency metrics.Histogram
AckFailureLatency metrics.Histogram
AckSuccess CounterVec
AckFailure CounterVec
AckSuccessLatency HistogramVec
AckFailureLatency HistogramVec
}

// NewAckDispatcher is an ackDispatcher factory which processes outbound events
Expand Down Expand Up @@ -130,33 +130,33 @@ func (d *ackDispatcher) OnDeviceEvent(event *device.Event) {
p := dm.PartnerIDClaim()
t := m.Type.FriendlyName()
// Metric labels
ls := []string{qosLevelLabel, l.String(), partnerIDLabel, p, messageType, t}
ls := prometheus.Labels{qosLevelLabel: l.String(), partnerIDLabel: p, messageType: t}
ctx, cancel := context.WithTimeout(context.Background(), d.timeout)
defer cancel()

// Observe the latency of sending an ack to the source device
ackFailure := false
defer func(s time.Time) {
d.recordAckLatency(s, ackFailure, ls...)
d.recordAckLatency(s, ackFailure, ls)
}(time.Now())

if _, err := event.Device.Send(r.WithContext(ctx)); err != nil {
d.logger.Error("Error dispatching QOS ack", zap.Any("qosLevel", l), zap.Any("partnerID", p), zap.Any("messageType", t), zap.Error(err))
d.AckFailure.With(ls...).Add(1)
d.AckFailure.With(ls).Add(1)
ackFailure = true
return
}

d.AckSuccess.With(ls...).Add(1)
d.AckSuccess.With(ls).Add(1)
}

// recordAckLatency records the latency for both successful and failed acks
func (d *ackDispatcher) recordAckLatency(s time.Time, f bool, l ...string) {
func (d *ackDispatcher) recordAckLatency(s time.Time, f bool, l prometheus.Labels) {
switch {
case f:
d.AckFailureLatency.With(l...).Observe(time.Since(s).Seconds())
d.AckFailureLatency.With(l).Observe(time.Since(s).Seconds())

default:
d.AckSuccessLatency.With(l...).Observe(time.Since(s).Seconds())
d.AckSuccessLatency.With(l).Observe(time.Since(s).Seconds())
}
}
7 changes: 4 additions & 3 deletions ackDispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -46,7 +47,7 @@ func testAckDispatcherOnDeviceEventQOSEventFailure(t *testing.T) {
mAckFailureLatency := new(mockHistogram)
p, mt, qosl := failure_case, failure_case, failure_case
// Setup labels for metrics
l := []string{qosLevelLabel, qosl, partnerIDLabel, p, messageType, mt}
l := prometheus.Labels{qosLevelLabel: qosl, partnerIDLabel: p, messageType: mt}
om := OutboundMeasures{
AckSuccess: mAckSuccess,
AckFailure: mAckFailure,
Expand Down Expand Up @@ -169,7 +170,7 @@ func testAckDispatcherOnDeviceEventQOSDeviceFailure(t *testing.T) {
require.True(ok)
// Setup labels for metrics
dm := genTestMetadata()
l := []string{qosLevelLabel, m.QualityOfService.Level().String(), partnerIDLabel, dm.PartnerIDClaim(), messageType, m.Type.FriendlyName()}
l := prometheus.Labels{qosLevelLabel: m.QualityOfService.Level().String(), partnerIDLabel: dm.PartnerIDClaim(), messageType: m.Type.FriendlyName()}
// Setup metrics for the dispatcher
om := OutboundMeasures{
AckSuccess: mAckSuccess,
Expand Down Expand Up @@ -493,7 +494,7 @@ func testAckDispatcherOnDeviceEventQOSSuccess(t *testing.T) {
}

// Setup labels for metrics
l := []string{qosLevelLabel, qosl, partnerIDLabel, p, messageType, mt}
l := prometheus.Labels{qosLevelLabel: qosl, partnerIDLabel: p, messageType: mt}
// Setup metrics for the dispatcher
om := OutboundMeasures{
AckSuccess: mAckSuccess,
Expand Down
20 changes: 9 additions & 11 deletions eventDispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ type eventDispatcher struct {
source string
eventMap event.MultiMap
queueSize metrics.Gauge
droppedMessages metrics.Counter
outboundEvents *prometheus.CounterVec
droppedMessages CounterVec
outboundEvents CounterVec
outbounds chan<- outboundEnvelope
}

Expand Down Expand Up @@ -101,10 +101,9 @@ func (d *eventDispatcher) OnDeviceEvent(event *device.Event) {
d.logger.Debug("stacktrace from panic", zap.String("stacktrace", string(debug.Stack())), zap.Any("panic", r))
switch event.Type {
case device.Connect, device.Disconnect, device.MessageReceived:
labels := prometheus.Labels{eventLabel: eventType, reasonLabel: panicReason, urlLabel: url, outcomeLabel: failureOutcome}
d.logger.Error("Dropped message, event not sent", zap.String(eventLabel, eventType), zap.String(codeLabel, code), zap.String(reasonLabel, panicReason), zap.String(urlLabel, url), zap.Any("panic", r))
d.droppedMessages.With(eventLabel, eventType, codeLabel, code, reasonLabel, panicReason, urlLabel, url).Add(1.0)
d.outboundEvents.With(labels).Add(1.0)
d.droppedMessages.With(prometheus.Labels{eventLabel: eventType, codeLabel: code, reasonLabel: panicReason, urlLabel: url}).Add(1.0)
d.outboundEvents.With(prometheus.Labels{eventLabel: eventType, reasonLabel: panicReason, urlLabel: url, outcomeLabel: failureOutcome}).Add(1.0)

Check warning on line 106 in eventDispatcher.go

View check run for this annotation

Codecov / codecov/patch

eventDispatcher.go#L101-L106

Added lines #L101 - L106 were not covered by tests
}
}
}()
Expand Down Expand Up @@ -159,23 +158,22 @@ func (d *eventDispatcher) OnDeviceEvent(event *device.Event) {
}

Check warning on line 158 in eventDispatcher.go

View check run for this annotation

Codecov / codecov/patch

eventDispatcher.go#L153-L158

Added lines #L153 - L158 were not covered by tests
}

var labels prometheus.Labels
var outboundEventsLabels prometheus.Labels
if err != nil {
reason := getDroppedMessageReason(err)
labels = prometheus.Labels{eventLabel: eventType, reasonLabel: reason, urlLabel: url, outcomeLabel: failureOutcome}
outboundEventsLabels = prometheus.Labels{eventLabel: eventType, reasonLabel: reason, urlLabel: url, outcomeLabel: failureOutcome}
if errors.Is(err, ErrorUnsupportedEvent) {
d.logger.Error("Dropped message, event not sent", zap.String(eventLabel, eventType), zap.String(codeLabel, code), zap.String(reasonLabel, reason), zap.String(urlLabel, url), zap.Error(err))

Check warning on line 166 in eventDispatcher.go

View check run for this annotation

Codecov / codecov/patch

eventDispatcher.go#L166

Added line #L166 was not covered by tests
} else {
d.logger.Error("Dropped message, event not sent", zap.String(eventLabel, eventType), zap.String(codeLabel, code), zap.String(reasonLabel, reason), zap.String(urlLabel, url), zap.Error(err))
}

d.droppedMessages.With(eventLabel, eventType, codeLabel, code, reasonLabel, reason, urlLabel, url).Add(1.0)
d.outboundEvents.With(labels).Add(1.0)
d.droppedMessages.With(prometheus.Labels{eventLabel: eventType, codeLabel: code, reasonLabel: reason, urlLabel: url}).Add(1.0)
} else {
labels = prometheus.Labels{eventLabel: eventType, reasonLabel: noErrReason, urlLabel: url, outcomeLabel: successOutcome}
outboundEventsLabels = prometheus.Labels{eventLabel: eventType, reasonLabel: noErrReason, urlLabel: url, outcomeLabel: successOutcome}
}

d.outboundEvents.With(labels).Add(1.0)
d.outboundEvents.With(outboundEventsLabels).Add(1.0)
}

// send wraps the given request in an outboundEnvelope together with a cancellable context,
Expand Down
3 changes: 2 additions & 1 deletion eventDispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/xmidt-org/webpa-common/v2/convey"
Expand Down Expand Up @@ -256,7 +257,7 @@ func testEventDispatcherOnDeviceEventFullQueue(t *testing.T) {
require.NoError(err)

d.(*eventDispatcher).outbounds = make(chan outboundEnvelope)
dm.On("With", []string{eventLabel, "iot", codeLabel, messageDroppedCode, reasonLabel, fullQueueReason, urlLabel, "nowhere.com"}).Return().Once()
dm.On("With", prometheus.Labels{eventLabel: "iot", codeLabel: messageDroppedCode, reasonLabel: fullQueueReason, urlLabel: "nowhere.com"}).Return().Once()
dm.On("Add", 1.).Return().Once()
d.OnDeviceEvent(&device.Event{
Type: device.MessageReceived,
Expand Down
105 changes: 74 additions & 31 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,11 @@ func Metrics() []xmetrics.Metric {
Help: "The number of active, in-flight requests from devices",
},
{
Name: OutboundRequestDuration,
Type: "histogram",
Help: "The durations of outbound requests from devices",
Buckets: []float64{.25, .5, 1, 2.5, 5, 10},
Name: OutboundRequestDuration,
Type: "histogram",
Help: "The durations of outbound requests from devices",
LabelNames: []string{eventLabel, codeLabel, reasonLabel, urlLabel},
Buckets: []float64{.25, .5, 1, 2.5, 5, 10},
},
{
Name: OutboundRequestCounter,
Expand Down Expand Up @@ -189,70 +190,112 @@ func Metrics() []xmetrics.Metric {
}
}

type HistogramVec interface {
prometheus.Collector
With(prometheus.Labels) prometheus.Observer
CurryWith(prometheus.Labels) (prometheus.ObserverVec, error)
GetMetricWith(prometheus.Labels) (prometheus.Observer, error)
GetMetricWithLabelValues(...string) (prometheus.Observer, error)
MustCurryWith(labels prometheus.Labels) (o prometheus.ObserverVec)
WithLabelValues(lvs ...string) (o prometheus.Observer)
}

type CounterVec interface {
prometheus.Collector
With(prometheus.Labels) prometheus.Counter
}

type OutboundMeasures struct {
InFlight prometheus.Gauge
RequestDuration prometheus.Observer
RequestCounter *prometheus.CounterVec
OutboundEvents *prometheus.CounterVec
RequestDuration HistogramVec
RequestCounter CounterVec
OutboundEvents CounterVec
QueueSize metrics.Gauge
Retries metrics.Counter
DroppedMessages metrics.Counter
AckSuccess metrics.Counter
AckFailure metrics.Counter
AckSuccessLatency metrics.Histogram
AckFailureLatency metrics.Histogram
DroppedMessages CounterVec
AckSuccess CounterVec
AckFailure CounterVec
AckSuccessLatency HistogramVec
AckFailureLatency HistogramVec
}

func NewOutboundMeasures(r xmetrics.Registry) OutboundMeasures {
return OutboundMeasures{
InFlight: r.NewGaugeVec(OutboundInFlightGauge).WithLabelValues(),
RequestDuration: r.NewHistogramVec(OutboundRequestDuration).WithLabelValues(),
RequestDuration: r.NewHistogramVec(OutboundRequestDuration),
RequestCounter: r.NewCounterVec(OutboundRequestCounter),
OutboundEvents: r.NewCounterVec(TotalOutboundEvents),
QueueSize: r.NewGauge(OutboundQueueSize),
Retries: r.NewCounter(OutboundRetries),
DroppedMessages: r.NewCounter(OutboundDroppedMessageCounter),
AckSuccess: r.NewCounter(OutboundAckSuccessCounter),
AckFailure: r.NewCounter(OutboundAckFailureCounter),
DroppedMessages: r.NewCounterVec(OutboundDroppedMessageCounter),
AckSuccess: r.NewCounterVec(OutboundAckSuccessCounter),
AckFailure: r.NewCounterVec(OutboundAckFailureCounter),
// 0 is for the unused `buckets` argument in xmetrics.Registry.NewHistogram
AckSuccessLatency: r.NewHistogram(OutboundAckSuccessLatencyHistogram, 0),
AckSuccessLatency: r.NewHistogramVec(OutboundAckSuccessLatencyHistogram),
// 0 is for the unused `buckets` argument in xmetrics.Registry.NewHistogram
AckFailureLatency: r.NewHistogram(OutboundAckFailureLatencyHistogram, 0),
AckFailureLatency: r.NewHistogramVec(OutboundAckFailureLatencyHistogram),
}
}

func InstrumentOutboundDuration(obs prometheus.Observer, next http.RoundTripper) promhttp.RoundTripperFunc {
func InstrumentOutboundDuration(obs HistogramVec, next http.RoundTripper) promhttp.RoundTripperFunc {
return promhttp.RoundTripperFunc(func(request *http.Request) (*http.Response, error) {
eventType, ok := request.Context().Value(eventTypeContextKey{}).(string)
if !ok {
eventType = unknown
}

Check warning on line 245 in metrics.go

View check run for this annotation

Codecov / codecov/patch

metrics.go#L244-L245

Added lines #L244 - L245 were not covered by tests

start := time.Now()
response, err := next.RoundTrip(request)
if err == nil {
obs.Observe(time.Since(start).Seconds())
delta := time.Since(start).Seconds()

var labels prometheus.Labels
if err != nil {
code := genericDoReason
if response != nil {
code = strconv.Itoa(response.StatusCode)
}

Check warning on line 256 in metrics.go

View check run for this annotation

Codecov / codecov/patch

metrics.go#L253-L256

Added lines #L253 - L256 were not covered by tests

labels = prometheus.Labels{eventLabel: eventType, codeLabel: code, reasonLabel: getDoErrReason(err), urlLabel: response.Request.URL.String()}

Check warning on line 258 in metrics.go

View check run for this annotation

Codecov / codecov/patch

metrics.go#L258

Added line #L258 was not covered by tests

} else {
labels = prometheus.Labels{eventLabel: eventType, codeLabel: strconv.Itoa(response.StatusCode), reasonLabel: expectedCodeReason, urlLabel: response.Request.URL.String()}
if response.StatusCode != http.StatusAccepted {
labels[reasonLabel] = non202Code
}
}

obs.With(labels).Observe(delta)

return response, err
})
}

func InstrumentOutboundCounter(counter *prometheus.CounterVec, next http.RoundTripper) promhttp.RoundTripperFunc {
func InstrumentOutboundCounter(counter CounterVec, next http.RoundTripper) promhttp.RoundTripperFunc {
return promhttp.RoundTripperFunc(func(request *http.Request) (*http.Response, error) {
response, err := next.RoundTrip(request)

eventType, ok := request.Context().Value(eventTypeContextKey{}).(string)
if !ok {
eventType = unknown
}

Check warning on line 278 in metrics.go

View check run for this annotation

Codecov / codecov/patch

metrics.go#L277-L278

Added lines #L277 - L278 were not covered by tests
if err == nil {
labels := prometheus.Labels{eventLabel: eventType, codeLabel: strconv.Itoa(response.StatusCode), reasonLabel: expectedCodeReason, urlLabel: response.Request.URL.String()}
if response.StatusCode != http.StatusAccepted {
labels[reasonLabel] = non202Code

response, err := next.RoundTrip(request)

var labels prometheus.Labels
if err != nil {
code := genericDoReason
if response != nil {
code = strconv.Itoa(response.StatusCode)

Check warning on line 286 in metrics.go

View check run for this annotation

Codecov / codecov/patch

metrics.go#L284-L286

Added lines #L284 - L286 were not covered by tests
}

counter.With(labels).Inc()
labels = prometheus.Labels{eventLabel: eventType, codeLabel: code, reasonLabel: getDoErrReason(err), urlLabel: response.Request.URL.String()}

Check warning on line 289 in metrics.go

View check run for this annotation

Codecov / codecov/patch

metrics.go#L289

Added line #L289 was not covered by tests
} else {
labels := prometheus.Labels{eventLabel: eventType, codeLabel: strconv.Itoa(response.StatusCode), reasonLabel: getDoErrReason(err), urlLabel: response.Request.URL.String()}
counter.With(labels).Inc()
labels = prometheus.Labels{eventLabel: eventType, codeLabel: strconv.Itoa(response.StatusCode), reasonLabel: expectedCodeReason, urlLabel: response.Request.URL.String()}
if response.StatusCode != http.StatusAccepted {
labels[reasonLabel] = non202Code
}
}

counter.With(labels).Inc()

return response, err
})
}
Expand Down
Loading

0 comments on commit 382ab3c

Please sign in to comment.