From 0cc8a8f214830df10c9a587c52c56343f7cc990c Mon Sep 17 00:00:00 2001 From: Romain Marcadier Date: Fri, 20 Dec 2024 12:17:15 +0100 Subject: [PATCH] chore: exponential backoff in flaky kafka test (#479) The segmentio_kafka test has been falking due to the test container not having the topic and partition ready by the time the test tries to produce messages. This is possiblye due to the test waiting up to 1 second, while the container has been observed to need almost 2 seconds to transition the initial partition to available. --- .../tests/ibm_sarama/ibm_sarama.go | 41 +++-- .../segmentio_kafka.v0/segmentio_kafka.go | 41 ++--- _integration-tests/utils/backoff/backoff.go | 105 ++++++++++++ .../utils/backoff/backoff_test.go | 160 ++++++++++++++++++ _integration-tests/utils/backoff/constant.go | 20 +++ .../utils/backoff/exponential.go | 49 ++++++ .../utils/backoff/exponential_test.go | 24 +++ 7 files changed, 395 insertions(+), 45 deletions(-) create mode 100644 _integration-tests/utils/backoff/backoff.go create mode 100644 _integration-tests/utils/backoff/backoff_test.go create mode 100644 _integration-tests/utils/backoff/constant.go create mode 100644 _integration-tests/utils/backoff/exponential.go create mode 100644 _integration-tests/utils/backoff/exponential_test.go diff --git a/_integration-tests/tests/ibm_sarama/ibm_sarama.go b/_integration-tests/tests/ibm_sarama/ibm_sarama.go index f64f8584..0320d93c 100644 --- a/_integration-tests/tests/ibm_sarama/ibm_sarama.go +++ b/_integration-tests/tests/ibm_sarama/ibm_sarama.go @@ -8,11 +8,14 @@ package ibm_sarama import ( + "context" + "errors" "fmt" "testing" "time" "datadoghq.dev/orchestrion/_integration-tests/utils" + "datadoghq.dev/orchestrion/_integration-tests/utils/backoff" "datadoghq.dev/orchestrion/_integration-tests/validator/trace" "github.com/IBM/sarama" "github.com/stretchr/testify/assert" @@ -46,30 +49,26 @@ func (tc *TestCase) Setup(t *testing.T) { func produceMessage(t *testing.T, addrs []string, cfg *sarama.Config) { t.Helper() - createProducer := func() (_ sarama.SyncProducer, err error) { - defer func() { - if r := recover(); r != nil && err == nil { - var ok bool - if err, ok = r.(error); !ok { - err = fmt.Errorf("panic: %v", r) + var producer sarama.SyncProducer + err := backoff.Retry( + context.Background(), + backoff.NewConstantStrategy(50*time.Millisecond), + func() (err error) { + defer func() { + if r := recover(); r != nil && err == nil { + var ok bool + if err, ok = r.(error); !ok { + err = errors.Join(err, fmt.Errorf("panic: %v", r)) + } } - } - }() - return sarama.NewSyncProducer(addrs, cfg) - } + }() - var ( - producer sarama.SyncProducer - err error + producer, err = sarama.NewSyncProducer(addrs, cfg) + return err + }, + &backoff.RetryOptions{MaxAttempts: 3}, ) - for attemptsLeft := 3; attemptsLeft > 0; attemptsLeft-- { - producer, err = createProducer() - if err != nil { - time.Sleep(50 * time.Millisecond) - continue - } - break - } + require.NoError(t, err, "failed to create producer") defer func() { assert.NoError(t, producer.Close(), "failed to close producer") }() diff --git a/_integration-tests/tests/segmentio_kafka.v0/segmentio_kafka.go b/_integration-tests/tests/segmentio_kafka.v0/segmentio_kafka.go index b8faba86..6fed60d1 100644 --- a/_integration-tests/tests/segmentio_kafka.v0/segmentio_kafka.go +++ b/_integration-tests/tests/segmentio_kafka.v0/segmentio_kafka.go @@ -13,15 +13,14 @@ import ( "testing" "time" + "datadoghq.dev/orchestrion/_integration-tests/utils" + "datadoghq.dev/orchestrion/_integration-tests/utils/backoff" + "datadoghq.dev/orchestrion/_integration-tests/validator/trace" "github.com/segmentio/kafka-go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" kafkatest "github.com/testcontainers/testcontainers-go/modules/kafka" - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" - - "datadoghq.dev/orchestrion/_integration-tests/utils" - "datadoghq.dev/orchestrion/_integration-tests/validator/trace" ) const ( @@ -86,27 +85,21 @@ func (tc *TestCase) produce(t *testing.T) { Value: []byte("Third message"), }, } - const ( - maxRetries = 10 - retryDelay = 100 * time.Millisecond - ) - var ( - retryCount int - err error + err := backoff.Retry( + ctx, + backoff.NewExponentialStrategy(100*time.Millisecond, 2, 5*time.Second), + func() error { return tc.writer.WriteMessages(ctx, messages...) }, + &backoff.RetryOptions{ + MaxAttempts: 10, + ShouldRetry: func(err error, attempt int, delay time.Duration) bool { + if !errors.Is(err, kafka.UnknownTopicOrPartition) { + return false + } + t.Logf("failed to produce kafka messages, will retry in %s (attempt left: %d)", delay, 10-attempt) + return true + }, + }, ) - for retryCount < maxRetries { - err = tc.writer.WriteMessages(ctx, messages...) - if err == nil { - break - } - // This error happens sometimes with brand-new topics, as there is a delay between when the topic is created - // on the broker, and when the topic can actually be written to. - if errors.Is(err, kafka.UnknownTopicOrPartition) { - retryCount++ - t.Logf("failed to produce kafka messages, will retry in %s (retryCount: %d)", retryDelay, retryCount) - time.Sleep(retryDelay) - } - } require.NoError(t, err) require.NoError(t, tc.writer.Close()) } diff --git a/_integration-tests/utils/backoff/backoff.go b/_integration-tests/utils/backoff/backoff.go new file mode 100644 index 00000000..f983cb48 --- /dev/null +++ b/_integration-tests/utils/backoff/backoff.go @@ -0,0 +1,105 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2023-present Datadog, Inc. + +// Package backoff provides utilities to retry operations when encoutering +// transient errors. It is used by integration tests to allow for testcontainers +// enough time to become fully ready, avoiding tests flaking because the CI +// resources are constrained enough to cause containers to not be "immediately" +// ready to serve traffic. +package backoff + +import ( + "context" + "errors" + "math" + "time" +) + +type Strategy interface { + // Next returns the back-off delay to wait for before making the next attempt. + Next() time.Duration +} + +const ( + defaultMaxAttempts = 10 +) + +// RetryAllErrors is the default function used by [RetryOptions.ShouldRetry]. It +// returns [true] regardless of its arguments. +func RetryAllErrors(error, int, time.Duration) bool { + return true +} + +type RetryOptions struct { + // MaxAttempts is the maximum number of attempts to make before giving up. If + // it is negative, there is no limit to the number of attempts (it will be set + // to [math.MaxInt]); if it is zero, the default value of 10 will be used. It + // is fine (although a little silly) to set [RetryOptions.MaxAttempts] to 1. + MaxAttempts int + // ShouldRetry is called with the error returned by the action, the attempt + // number, and the delay before the next attempt could be made. If it returns + // [true], the next attempt will be made; otherwise, the [Retry] function will + // immediately return. If [nil], the default [RetryAllErrors] function will be + // used. + ShouldRetry func(err error, attempt int, delay time.Duration) bool + // Sleep is the function used to wait in between attempts. It is intended to + // be used in testing. If [nil], the default [time.Sleep] function will be + // used. + Sleep func(time.Duration) +} + +// Retry makes up to [RetryOptions.MaxAttempts] at calling the [action] +// function. It uses the [Strategy] to determine how much time to wait between +// attempts. The [RetryOptions.ShouldRetry] function is called with all +// non-[nil] errors returned by [action], the attempt number, and the delay +// before the next attempt. If it returns [true], the [RetryOptions.Sleep] +// function is called with the delay, and the next attempt is made. Otherwise, +// [Retry] returns immediately. +func Retry( + ctx context.Context, + strategy Strategy, + action func() error, + opts *RetryOptions, +) error { + var ( + maxAttempts = defaultMaxAttempts + shouldRetry = RetryAllErrors + sleep = time.Sleep + ) + if opts != nil { + if opts.MaxAttempts > 0 { + maxAttempts = opts.MaxAttempts + } else if opts.MaxAttempts < 0 { + maxAttempts = math.MaxInt + } + if opts.ShouldRetry != nil { + shouldRetry = opts.ShouldRetry + } + if opts.Sleep != nil { + sleep = opts.Sleep + } + } + + var errs error + for attempt, delay := 0, time.Duration(0); attempt < maxAttempts && ctx.Err() == nil; attempt, delay = attempt+1, strategy.Next() { + if delay > 0 { + sleep(delay) + } + + err := action() + if err == nil { + // Success! + return nil + } + + // Accumulate this error on top of the others we have observed so far. + errs = errors.Join(errs, err) + + if shouldRetry != nil && !shouldRetry(err, attempt, delay) { + break + } + } + return errors.Join(errs, ctx.Err()) +} diff --git a/_integration-tests/utils/backoff/backoff_test.go b/_integration-tests/utils/backoff/backoff_test.go new file mode 100644 index 00000000..9a3a5e88 --- /dev/null +++ b/_integration-tests/utils/backoff/backoff_test.go @@ -0,0 +1,160 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2023-present Datadog, Inc. + +package backoff + +import ( + "context" + "fmt" + "math/rand" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRetry(t *testing.T) { + // The sequence of delays observed for 10 attempts using an exponential + // backoff strategy with initial delay of 100ms, factor of 2, max delay of 5s. + delaySequence := []time.Duration{ + /* attempt 1 */ // Immediate + /* attempt 2 */ 100 * time.Millisecond, + /* attempt 3 */ 200 * time.Millisecond, + /* attempt 4 */ 400 * time.Millisecond, + /* attempt 5 */ 800 * time.Millisecond, + /* attempt 6 */ 1600 * time.Millisecond, + /* attempt 7 */ 3200 * time.Millisecond, + /* attempt 8 */ 5 * time.Second, + /* attempt 9 */ 5 * time.Second, + /* attempt 10 */ 5 * time.Second, + } + + t.Run("no-success", func(t *testing.T) { + ctx := context.Background() + strategy := NewExponentialStrategy(100*time.Millisecond, 2, 5*time.Second) + maxAttempts := 10 + expectedErrs := make([]error, 0, maxAttempts) + action := func() error { + err := fmt.Errorf("Error number %d", len(expectedErrs)+1) + expectedErrs = append(expectedErrs, err) + return err + } + delays := make([]time.Duration, 0, maxAttempts) + timeSleep := func(d time.Duration) { + delays = append(delays, d) + } + + err := Retry(ctx, strategy, action, &RetryOptions{MaxAttempts: maxAttempts, Sleep: timeSleep}) + require.Error(t, err) + assert.Equal(t, delaySequence, delays) + for _, expectedErr := range expectedErrs { + assert.ErrorIs(t, err, expectedErr) + } + }) + + t.Run("non-retryable error", func(t *testing.T) { + ctx := context.Background() + strategy := NewExponentialStrategy(100*time.Millisecond, 2, 5*time.Second) + maxAttempts := 10 + shouldRetry := func(err error, _ int, _ time.Duration) bool { + return !strings.Contains(err.Error(), "3") + } + expectedErrs := make([]error, 0, maxAttempts) + action := func() error { + err := fmt.Errorf("Error number %d", len(expectedErrs)+1) + expectedErrs = append(expectedErrs, err) + return err + } + delays := make([]time.Duration, 0, maxAttempts) + timeSleep := func(d time.Duration) { + delays = append(delays, d) + } + + err := Retry(ctx, strategy, action, &RetryOptions{MaxAttempts: maxAttempts, ShouldRetry: shouldRetry, Sleep: timeSleep}) + require.Error(t, err) + // We hit the non-retryable error at the 3rd attempt. + assert.Equal(t, delaySequence[:2], delays) + for _, expectedErr := range expectedErrs { + assert.ErrorIs(t, err, expectedErr) + } + }) + + t.Run("context cancelled", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + strategy := NewExponentialStrategy(100*time.Millisecond, 2, 5*time.Second) + maxAttempts := 10 + expectedErrs := make([]error, 0, maxAttempts) + action := func() error { + err := fmt.Errorf("Error number %d", len(expectedErrs)+1) + expectedErrs = append(expectedErrs, err) + return err + } + delays := make([]time.Duration, 0, maxAttempts) + timeSleep := func(d time.Duration) { + delays = append(delays, d) + + // Simulate context deadline after 1 second. + var ttl time.Duration + for _, delay := range delays { + ttl += delay + } + if ttl >= time.Second { + cancel() + } + } + + err := Retry(ctx, strategy, action, &RetryOptions{MaxAttempts: maxAttempts, Sleep: timeSleep}) + require.Error(t, err) + // We reach the 1 second total waited during the 4th back-off. + assert.Equal(t, delaySequence[:4], delays) + for _, expectedErr := range expectedErrs { + require.ErrorIs(t, err, expectedErr) + } + require.ErrorIs(t, err, context.Canceled) + }) + + t.Run("unlimited retries", func(t *testing.T) { + ctx := context.Background() + strategy := NewConstantStrategy(100 * time.Millisecond) + var attempts int + action := func() error { + attempts++ + // At least 20 errors, then flip a coin... but no more than 100 attempts. + if attempts < 20 || (attempts < 100 && rand.Int()%2 == 0) { + return fmt.Errorf("Error number %d", attempts) + } + return nil + } + var delayCount int + timeSleep := func(time.Duration) { + delayCount++ + } + + err := Retry(ctx, strategy, action, &RetryOptions{MaxAttempts: -1, Sleep: timeSleep}) + require.NoError(t, err) + // We should have waited as many times as we attempted, except for the initial attempt. + assert.Equal(t, delayCount, attempts-1) + }) + + t.Run("immediate success", func(t *testing.T) { + ctx := context.Background() + strategy := NewExponentialStrategy(100*time.Millisecond, 2, 5*time.Second) + maxAttempts := 10 + shouldRetry := func(error, int, time.Duration) bool { return false } + action := func() error { return nil } + delays := make([]time.Duration, 0, maxAttempts) + timeSleep := func(d time.Duration) { + delays = append(delays, d) + } + + err := Retry(ctx, strategy, action, &RetryOptions{MaxAttempts: maxAttempts, ShouldRetry: shouldRetry, Sleep: timeSleep}) + require.NoError(t, err) + assert.Empty(t, delays) + }) +} diff --git a/_integration-tests/utils/backoff/constant.go b/_integration-tests/utils/backoff/constant.go new file mode 100644 index 00000000..700c4518 --- /dev/null +++ b/_integration-tests/utils/backoff/constant.go @@ -0,0 +1,20 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2023-present Datadog, Inc. + +package backoff + +import "time" + +type constantStrategy time.Duration + +// NewConstantStrategy returns a constant backoff strategy, waiting for the +// specified delay between each attempt. +func NewConstantStrategy(delay time.Duration) Strategy { + return constantStrategy(delay) +} + +func (c constantStrategy) Next() time.Duration { + return time.Duration(c) +} diff --git a/_integration-tests/utils/backoff/exponential.go b/_integration-tests/utils/backoff/exponential.go new file mode 100644 index 00000000..3b6bd7d6 --- /dev/null +++ b/_integration-tests/utils/backoff/exponential.go @@ -0,0 +1,49 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2023-present Datadog, Inc. + +package backoff + +import ( + "fmt" + "time" +) + +type exponentialStrategy struct { + next time.Duration + factor int + max time.Duration +} + +// NewExponentialStrategy returns a new exponential back-off strategy that +// starts with the specified [initial] delay, multiplies it by [factor] after +// each attempt, while capping the delay to [max]. Panics if [initial] is +// greater than [max]; or if factor is <=1. +func NewExponentialStrategy(initial time.Duration, factor int, max time.Duration) Strategy { + if initial > max { + panic(fmt.Errorf("invalid exponential back-off strategy: initial delay %s is greater than max delay %s", initial, max)) + } + if factor <= 1 { + panic(fmt.Errorf("invalid exponential back-off strategy: factor %d must be greater than 0", factor)) + } + return &exponentialStrategy{next: initial, factor: factor, max: max} +} + +func (e *exponentialStrategy) Next() time.Duration { + defer e.inc() + return e.next +} + +func (e *exponentialStrategy) inc() { + if e.next == e.max { + // Capped, we have nothing to do anymore. + return + } + // Multiply the current value by the factor... + e.next *= time.Duration(e.factor) + // If we exceeded the cap, truncate to the cap. + if e.next > e.max { + e.next = e.max + } +} diff --git a/_integration-tests/utils/backoff/exponential_test.go b/_integration-tests/utils/backoff/exponential_test.go new file mode 100644 index 00000000..d3ee828d --- /dev/null +++ b/_integration-tests/utils/backoff/exponential_test.go @@ -0,0 +1,24 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2023-present Datadog, Inc. + +package backoff + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestExponentialStrategy(t *testing.T) { + t.Run("invalid", func(t *testing.T) { + // Invalid factor (must be >= 2) + require.Panics(t, func() { NewExponentialStrategy(time.Second, -1, time.Minute) }) + require.Panics(t, func() { NewExponentialStrategy(time.Second, 0, time.Minute) }) + require.Panics(t, func() { NewExponentialStrategy(time.Second, 1, time.Minute) }) + // Invalid initial/cap (initial must be <= cap) + require.Panics(t, func() { NewExponentialStrategy(time.Minute, 2, time.Second) }) + }) +}