Skip to content

Commit

Permalink
Adding circuit breaker (#64)
Browse files Browse the repository at this point in the history
* adding configurable http client timeout

* added circuit breaker

* added gobreaker license

* vendored gobreaker

* addressed all the pr suggestions

* moved the circuit breaker from api to processor

* added a test for the circuit breaker opening

* changed the trip condition from consecutive failures to total failures
  • Loading branch information
lhelman authored Feb 9, 2021
1 parent 140b289 commit b45a6b5
Show file tree
Hide file tree
Showing 15 changed files with 652 additions and 38 deletions.
1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ go-spew,github.com/davecgh/go-spew,ISC,"Copyright (c) 2012-2016 Dave Collins <da
go-jmespath,github.com/jmespath/go-jmespath,Apache-2.0,"Copyright 2015 James Saryerwinnie"
go-difflib,github.com/pmezard/go-difflib,BSD-3-Clause,"Copyright (c) 2013, Patrick Mezard. All rights reserved."
testify,github.com/stretchr/testify,MIT,"Copyright (c) 2012-2018 Mat Ryer and Tyler Bunnell"
gobreaker,github.com/sony/gobreaker,MIT,"Copyright 2015 Sony Corporation"
16 changes: 16 additions & 0 deletions ddlambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,21 @@ type (
DDTraceEnabled bool
// MergeXrayTraces will cause Datadog traces to be merged with traces from AWS X-Ray.
MergeXrayTraces bool
// HttpClientTimeout specifies a time limit for requests to the API. It defaults to 5s.
HttpClientTimeout time.Duration
// CircuitBreakerInterval is the cyclic period of the closed state
// for the CircuitBreaker to clear the internal Counts.
// default: 30s
CircuitBreakerInterval time.Duration
// CircuitBreakerTimeout is the period of the open state,
// after which the state of the CircuitBreaker becomes half-open.
// default: 60s
CircuitBreakerTimeout time.Duration
// CircuitBreakerTotalFailures is the number of failed requests tolerated
// in the closed state; once the failure count exceeds it, the state becomes open.
// The counter is reset every CircuitBreakerInterval.
// default: 4
CircuitBreakerTotalFailures uint32
}
)

Expand Down Expand Up @@ -191,6 +206,7 @@ func (cfg *Config) toMetricsConfig() metrics.Config {
mc.KMSAPIKey = cfg.KMSAPIKey
mc.Site = cfg.Site
mc.ShouldUseLogForwarder = cfg.ShouldUseLogForwarder
mc.HttpClientTimeout = cfg.HttpClientTimeout
}

if mc.Site == "" {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/google/uuid v1.1.2 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/philhofer/fwd v1.0.0 // indirect
github.com/sony/gobreaker v0.4.1
github.com/stretchr/testify v1.3.0
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
Expand Down
14 changes: 8 additions & 6 deletions internal/metrics/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ type (

// APIClientOptions contains instantiation options from creating an APIClient.
APIClientOptions struct {
baseAPIURL string
apiKey string
kmsAPIKey string
decrypter Decrypter
baseAPIURL string
apiKey string
kmsAPIKey string
decrypter Decrypter
httpClientTimeout time.Duration
}

postMetricsModel struct {
Expand All @@ -51,7 +52,7 @@ type (
// MakeAPIClient creates a new API client with the given api and app keys
func MakeAPIClient(ctx context.Context, options APIClientOptions) *APIClient {
httpClient := &http.Client{
Timeout: time.Second * 5,
Timeout: options.httpClientTimeout,
}
client := &APIClient{
apiKey: options.apiKey,
Expand Down Expand Up @@ -113,7 +114,8 @@ func (cl *APIClient) SendMetrics(metrics []APIMetric) error {
}
return fmt.Errorf("Failed to send metrics to API. Status Code %d, Body %s", resp.StatusCode, body)
}
return nil

return err
}

func (cl *APIClient) decryptAPIKey(decrypter Decrypter, kmsAPIKey string) <-chan string {
Expand Down
12 changes: 8 additions & 4 deletions internal/metrics/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ package metrics
import "time"

const (
apiKeyParam = "api_key"
appKeyParam = "application_key"
defaultRetryInterval = time.Millisecond * 250
defaultBatchInterval = time.Second * 15
apiKeyParam = "api_key"
appKeyParam = "application_key"
defaultRetryInterval = time.Millisecond * 250
defaultBatchInterval = time.Second * 15
defaultHttpClientTimeout = time.Second * 5
defaultCircuitBreakerInterval = time.Second * 30
defaultCircuitBreakerTimeout = time.Second * 60
defaultCircuitBreakerTotalFailures = 4
)

// MetricType enumerates all the available metric types
Expand Down
41 changes: 29 additions & 12 deletions internal/metrics/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,17 @@ type (

// Config gives options for how the listener should work
Config struct {
APIKey string
KMSAPIKey string
Site string
ShouldRetryOnFailure bool
ShouldUseLogForwarder bool
BatchInterval time.Duration
EnhancedMetrics bool
APIKey string
KMSAPIKey string
Site string
ShouldRetryOnFailure bool
ShouldUseLogForwarder bool
BatchInterval time.Duration
EnhancedMetrics bool
HttpClientTimeout time.Duration
CircuitBreakerInterval time.Duration
CircuitBreakerTimeout time.Duration
CircuitBreakerTotalFailures uint32
}

logMetric struct {
Expand All @@ -66,11 +70,24 @@ const (
func MakeListener(config Config) Listener {

apiClient := MakeAPIClient(context.Background(), APIClientOptions{
baseAPIURL: config.Site,
apiKey: config.APIKey,
decrypter: MakeKMSDecrypter(),
kmsAPIKey: config.KMSAPIKey,
baseAPIURL: config.Site,
apiKey: config.APIKey,
decrypter: MakeKMSDecrypter(),
kmsAPIKey: config.KMSAPIKey,
httpClientTimeout: config.HttpClientTimeout,
})
if config.HttpClientTimeout <= 0 {
config.HttpClientTimeout = defaultHttpClientTimeout
}
if config.CircuitBreakerInterval <= 0 {
config.CircuitBreakerInterval = defaultCircuitBreakerInterval
}
if config.CircuitBreakerTimeout <= 0 {
config.CircuitBreakerTimeout = defaultCircuitBreakerTimeout
}
if config.CircuitBreakerTotalFailures <= 0 {
config.CircuitBreakerTotalFailures = defaultCircuitBreakerTotalFailures
}
if config.BatchInterval <= 0 {
config.BatchInterval = defaultBatchInterval
}
Expand Down Expand Up @@ -102,7 +119,7 @@ func (l *Listener) HandlerStarted(ctx context.Context, msg json.RawMessage) cont
}

ts := MakeTimeService()
pr := MakeProcessor(ctx, l.apiClient, ts, l.config.BatchInterval, l.config.ShouldRetryOnFailure)
pr := MakeProcessor(ctx, l.apiClient, ts, l.config.BatchInterval, l.config.ShouldRetryOnFailure, l.config.CircuitBreakerInterval, l.config.CircuitBreakerTimeout, l.config.CircuitBreakerTotalFailures)
l.processor = pr

ctx = AddListener(ctx, l)
Expand Down
49 changes: 37 additions & 12 deletions internal/metrics/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/DataDog/datadog-lambda-go/internal/logger"
"github.com/cenkalti/backoff"
"github.com/sony/gobreaker"
)

type (
Expand All @@ -41,13 +42,16 @@ type (
batcher *Batcher
shouldRetryOnFail bool
isProcessing bool
breaker *gobreaker.CircuitBreaker
}
)

// MakeProcessor creates a new metrics context
func MakeProcessor(ctx context.Context, client Client, timeService TimeService, batchInterval time.Duration, shouldRetryOnFail bool) Processor {
func MakeProcessor(ctx context.Context, client Client, timeService TimeService, batchInterval time.Duration, shouldRetryOnFail bool, circuitBreakerInterval time.Duration, circuitBreakerTimeout time.Duration, circuitBreakerTotalFailures uint32) Processor {
batcher := MakeBatcher(batchInterval)

breaker := MakeCircuitBreaker(circuitBreakerInterval, circuitBreakerTimeout, circuitBreakerTotalFailures)

return &processor{
context: ctx,
metricsChan: make(chan Metric, 2000),
Expand All @@ -58,7 +62,22 @@ func MakeProcessor(ctx context.Context, client Client, timeService TimeService,
shouldRetryOnFail: shouldRetryOnFail,
timeService: timeService,
isProcessing: false,
breaker: breaker,
}
}

// MakeCircuitBreaker builds the circuit breaker that guards metric submissions.
//
// circuitBreakerInterval is passed as the breaker's Interval and
// circuitBreakerTimeout as its Timeout. The breaker reports itself ready to
// trip once the total number of failures it has counted is strictly greater
// than circuitBreakerTotalFailures.
func MakeCircuitBreaker(circuitBreakerInterval time.Duration, circuitBreakerTimeout time.Duration, circuitBreakerTotalFailures uint32) *gobreaker.CircuitBreaker {
	settings := gobreaker.Settings{
		Name:     "post distribution_points",
		Interval: circuitBreakerInterval,
		Timeout:  circuitBreakerTimeout,
		// Trip only after the failure count has gone past the configured threshold.
		ReadyToTrip: func(c gobreaker.Counts) bool {
			return circuitBreakerTotalFailures < c.TotalFailures
		},
	}
	return gobreaker.NewCircuitBreaker(settings)
}

func (p *processor) AddMetric(metric Metric) {
Expand Down Expand Up @@ -125,18 +144,24 @@ func (p *processor) processMetrics() {
}

if shouldSendBatch {
if shouldExit && p.shouldRetryOnFail {
// If we are shutting down, and we just failed to send our last batch, do a retry
bo := backoff.WithMaxRetries(backoff.NewConstantBackOff(defaultRetryInterval), 2)
err := backoff.Retry(p.sendMetricsBatch, bo)
if err != nil {
logger.Error(fmt.Errorf("failed to flush metrics to datadog API after retry: %v", err))
}
} else {
err := p.sendMetricsBatch()
if err != nil {
logger.Error(fmt.Errorf("failed to flush metrics to datadog API: %v", err))
_, err := p.breaker.Execute(func() (interface{}, error) {
if shouldExit && p.shouldRetryOnFail {
// If we are shutting down, and we just failed to send our last batch, do a retry
bo := backoff.WithMaxRetries(backoff.NewConstantBackOff(defaultRetryInterval), 2)
err := backoff.Retry(p.sendMetricsBatch, bo)
if err != nil {
return nil, fmt.Errorf("after retry: %v", err)
}
} else {
err := p.sendMetricsBatch()
if err != nil {
return nil, fmt.Errorf("with no retry: %v", err)
}
}
return nil, nil
})
if err != nil {
logger.Error(fmt.Errorf("failed to flush metrics to datadog API: %v", err))
}
}
}
Expand Down
35 changes: 31 additions & 4 deletions internal/metrics/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package metrics
import (
"context"
"errors"
"math"
"testing"
"time"

Expand Down Expand Up @@ -67,7 +68,7 @@ func TestProcessorBatches(t *testing.T) {
mts.now, _ = time.Parse(time.RFC3339, "2006-01-02T15:04:05Z")
nowUnix := float64(mts.now.Unix())

processor := MakeProcessor(context.Background(), &mc, &mts, 1000, false)
processor := MakeProcessor(context.Background(), &mc, &mts, 1000, false, time.Hour*1000, time.Hour*1000, math.MaxUint32)

d1 := Distribution{
Name: "metric-1",
Expand Down Expand Up @@ -113,7 +114,7 @@ func TestProcessorBatchesPerTick(t *testing.T) {
secondTimeUnix := float64(secondTime.Unix())
mts.now = firstTime

processor := MakeProcessor(context.Background(), &mc, &mts, 1000, false)
processor := MakeProcessor(context.Background(), &mc, &mts, 1000, false, time.Hour*1000, time.Hour*1000, math.MaxUint32)

d1 := Distribution{
Name: "metric-1",
Expand Down Expand Up @@ -188,7 +189,7 @@ func TestProcessorPerformsRetry(t *testing.T) {
mts.now, _ = time.Parse(time.RFC3339, "2006-01-02T15:04:05Z")

shouldRetry := true
processor := MakeProcessor(context.Background(), &mc, &mts, 1000, shouldRetry)
processor := MakeProcessor(context.Background(), &mc, &mts, 1000, shouldRetry, time.Hour*1000, time.Hour*1000, math.MaxUint32)

d1 := Distribution{
Name: "metric-1",
Expand All @@ -213,7 +214,7 @@ func TestProcessorCancelsWithContext(t *testing.T) {

shouldRetry := true
ctx, cancelFunc := context.WithCancel(context.Background())
processor := MakeProcessor(ctx, &mc, &mts, 1000, shouldRetry)
processor := MakeProcessor(ctx, &mc, &mts, 1000, shouldRetry, time.Hour*1000, time.Hour*1000, math.MaxUint32)

d1 := Distribution{
Name: "metric-1",
Expand All @@ -230,3 +231,29 @@ func TestProcessorCancelsWithContext(t *testing.T) {

assert.Equal(t, 0, mc.sendMetricsCalledCount)
}

// TestProcessorBatchesWithOpeningCircuitBreaker pushes one distribution through
// a processor whose client always fails and whose circuit breaker threshold is
// 1, then checks how many times the client was asked to send.
func TestProcessorBatchesWithOpeningCircuitBreaker(t *testing.T) {
	client := makeMockClient()
	// Every send attempt made through the mock client reports this error.
	client.err = errors.New("Some error")

	clock := makeMockTimeService()
	clock.now, _ = time.Parse(time.RFC3339, "2006-01-02T15:04:05Z")

	// The breaker is ready to trip once total failures are greater than this value.
	const failureThreshold uint32 = 1
	const retryOnFail = false
	processor := MakeProcessor(context.Background(), &client, &clock, 1000, retryOnFail, time.Hour*1000, time.Hour*1000, failureThreshold)

	processor.AddMetric(&Distribution{
		Name: "metric-1",
		Tags: []string{"a", "b", "c"},
		Values: []MetricValue{
			{Timestamp: clock.now, Value: 1},
			{Timestamp: clock.now, Value: 2},
			{Timestamp: clock.now, Value: 3},
		},
	})

	processor.FinishProcessing()

	// Retry-on-failure is disabled above, so a single send attempt is expected.
	// NOTE(review): one failed attempt does not exceed a threshold of 1, so the
	// breaker presumably stays closed here — confirm whether this test really
	// exercises the open state.
	assert.Equal(t, 1, client.sendMetricsCalledCount)
}
15 changes: 15 additions & 0 deletions vendor/github.com/sony/gobreaker/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions vendor/github.com/sony/gobreaker/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b45a6b5

Please sign in to comment.