Skip to content

Commit

Permalink
feat: add retry pattern to improve reliability
Browse files Browse the repository at this point in the history
  • Loading branch information
Ja7ad committed Oct 2, 2024
1 parent 995bc1c commit 0d42e1e
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 13 deletions.
97 changes: 88 additions & 9 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"
"net/url"
"sync"
"time"
)

type client struct {
Expand All @@ -19,6 +20,18 @@ type client struct {
bufferPool *sync.Pool
encoder encoder
contentEncoding ContentEncoding
retryOnStatus map[int]bool
disableRetry bool
maxRetries uint8
retryBackoff func(attempt uint8) time.Duration
}

type clientConfig struct {
contentEncoding ContentEncoding
encodingCompressionLevel EncodingCompressionLevel
retryOnStatus map[int]bool
disableRetry bool
maxRetries uint8
}

type internalRequest struct {
Expand All @@ -34,7 +47,7 @@ type internalRequest struct {
functionName string
}

func newClient(cli *http.Client, host, apiKey string, ce ContentEncoding, cl EncodingCompressionLevel) *client {
func newClient(cli *http.Client, host, apiKey string, cfg clientConfig) *client {
c := &client{
client: cli,
host: host,
Expand All @@ -44,11 +57,28 @@ func newClient(cli *http.Client, host, apiKey string, ce ContentEncoding, cl Enc
return new(bytes.Buffer)
},
},
disableRetry: cfg.disableRetry,
maxRetries: cfg.maxRetries,
retryOnStatus: cfg.retryOnStatus,
}

if !ce.IsZero() {
c.contentEncoding = ce
c.encoder = newEncoding(ce, cl)
if c.retryOnStatus == nil {
c.retryOnStatus = map[int]bool{
502: true,
503: true,
504: true,
}
}

if !c.disableRetry && c.retryBackoff == nil {
c.retryBackoff = func(attempt uint8) time.Duration {
return time.Second * time.Duration(attempt)
}
}

if !cfg.contentEncoding.IsZero() {
c.contentEncoding = cfg.contentEncoding
c.encoder = newEncoding(cfg.contentEncoding, cfg.encodingCompressionLevel)
}

return c
Expand Down Expand Up @@ -197,12 +227,9 @@ func (c *client) sendRequest(

request.Header.Set("User-Agent", GetQualifiedVersion())

resp, err := c.client.Do(request)
resp, err := c.do(request, internalError)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return nil, internalError.WithErrCode(MeilisearchTimeoutError, err)
}
return nil, internalError.WithErrCode(MeilisearchCommunicationError, err)
return nil, err
}

if body != nil {
Expand All @@ -213,6 +240,58 @@ func (c *client) sendRequest(
return resp, nil
}

func (c *client) do(req *http.Request, internalError *Error) (resp *http.Response, err error) {
retriesCount := uint8(0)

for {
resp, err = c.client.Do(req)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return nil, internalError.WithErrCode(MeilisearchTimeoutError, err)
}
return nil, internalError.WithErrCode(MeilisearchCommunicationError, err)
}

// Exit if retries are disabled
if c.disableRetry {
break
}

// Check if response status is retryable and we haven't exceeded max retries
if c.retryOnStatus[resp.StatusCode] && retriesCount < c.maxRetries {
retriesCount++

// Close response body to prevent memory leaks
resp.Body.Close()

// Handle backoff with context cancellation support
backoff := c.retryBackoff(retriesCount)
timer := time.NewTimer(backoff)

select {
case <-req.Context().Done():
err := req.Context().Err()
timer.Stop()
return nil, internalError.WithErrCode(MeilisearchTimeoutError, err)
case <-timer.C:
// Retry after backoff
timer.Stop()
}

continue
}

break
}

// Return error if retries exceeded the maximum limit
if !c.disableRetry && retriesCount >= c.maxRetries {
return nil, internalError.WithErrCode(MeilisearchMaxRetriesExceeded, nil)
}

return resp, nil
}

func (c *client) handleStatusCode(req *internalRequest, statusCode int, body []byte, internalError *Error) error {
if req.acceptedStatusCodes != nil {

Expand Down
86 changes: 84 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/http/httptest"
"strings"
"testing"
"time"
)

// Mock structures for testing
Expand All @@ -26,6 +27,8 @@ type mockJsonMarshaller struct {
}

func TestExecuteRequest(t *testing.T) {
retryCount := 0

// Create a mock server
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet && r.URL.Path == "/test-get" {
Expand Down Expand Up @@ -99,6 +102,15 @@ func TestExecuteRequest(t *testing.T) {
} else if r.URL.Path == "/io-reader" {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"message":"io reader"}`))
} else if r.URL.Path == "/failed-retry" {
w.WriteHeader(http.StatusBadGateway)
} else if r.URL.Path == "/success-retry" {
if retryCount == 2 {
w.WriteHeader(http.StatusOK)
return
}
w.WriteHeader(http.StatusBadGateway)
retryCount++
} else {
w.WriteHeader(http.StatusNotFound)
}
Expand All @@ -110,6 +122,8 @@ func TestExecuteRequest(t *testing.T) {
internalReq *internalRequest
expectedResp interface{}
contentEncoding ContentEncoding
withTimeout bool
disableRetry bool
wantErr bool
}{
{
Expand Down Expand Up @@ -342,13 +356,81 @@ func TestExecuteRequest(t *testing.T) {
contentEncoding: GzipEncoding,
wantErr: false,
},
{
name: "Test successful retries",
internalReq: &internalRequest{
endpoint: "/success-retry",
method: http.MethodGet,
withResponse: nil,
withRequest: nil,
acceptedStatusCodes: []int{http.StatusOK},
},
expectedResp: nil,
wantErr: false,
},
{
name: "Test failed retries",
internalReq: &internalRequest{
endpoint: "/failed-retry",
method: http.MethodGet,
withResponse: nil,
withRequest: nil,
acceptedStatusCodes: []int{http.StatusOK},
},
expectedResp: nil,
wantErr: true,
},
{
name: "Test disable retries",
internalReq: &internalRequest{
endpoint: "/test-get",
method: http.MethodGet,
withResponse: nil,
withRequest: nil,
acceptedStatusCodes: []int{http.StatusOK},
},
expectedResp: nil,
disableRetry: true,
wantErr: false,
},
{
name: "Test request timeout on retries",
internalReq: &internalRequest{
endpoint: "/failed-retry",
method: http.MethodGet,
withResponse: nil,
withRequest: nil,
acceptedStatusCodes: []int{http.StatusOK},
},
expectedResp: nil,
withTimeout: true,
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := newClient(&http.Client{}, ts.URL, "testApiKey", tt.contentEncoding, DefaultCompression)
c := newClient(&http.Client{}, ts.URL, "testApiKey", clientConfig{
contentEncoding: tt.contentEncoding,
encodingCompressionLevel: DefaultCompression,
maxRetries: 3,
disableRetry: tt.disableRetry,
retryOnStatus: map[int]bool{
502: true,
503: true,
504: true,
},
})

ctx := context.Background()

if tt.withTimeout {
timeoutCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
ctx = timeoutCtx
defer cancel()
}

err := c.executeRequest(context.Background(), tt.internalReq)
err := c.executeRequest(ctx, tt.internalReq)
if tt.wantErr {
require.Error(t, err)
} else {
Expand Down
5 changes: 5 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const (
MeilisearchTimeoutError
// MeilisearchCommunicationError impossible execute a request
MeilisearchCommunicationError
// MeilisearchMaxRetriesExceeded used max retries and exceeded
MeilisearchMaxRetriesExceeded
)

const (
Expand All @@ -35,6 +37,7 @@ const (
rawStringMeilisearchApiErrorWithoutMessage = `unaccepted status code found: ${statusCode} expected: ${statusCodeExpected}, MeilisearchApiError Message: ${message}`
rawStringMeilisearchTimeoutError = `MeilisearchTimeoutError`
rawStringMeilisearchCommunicationError = `MeilisearchCommunicationError unable to execute request`
rawStringMeilisearchMaxRetriesExceeded = "failed to request and max retries exceeded"
)

func (e ErrCode) rawMessage() string {
Expand All @@ -51,6 +54,8 @@ func (e ErrCode) rawMessage() string {
return rawStringMeilisearchTimeoutError + " " + rawStringCtx
case MeilisearchCommunicationError:
return rawStringMeilisearchCommunicationError + " " + rawStringCtx
case MeilisearchMaxRetriesExceeded:
return rawStringMeilisearchMaxRetriesExceeded + " " + rawStringCtx
default:
return rawStringCtx
}
Expand Down
9 changes: 7 additions & 2 deletions meilisearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,13 @@ func New(host string, options ...Option) ServiceManager {
defOpt.client,
host,
defOpt.apiKey,
defOpt.contentEncoding.encodingType,
defOpt.contentEncoding.level,
clientConfig{
contentEncoding: defOpt.contentEncoding.encodingType,
encodingCompressionLevel: defOpt.contentEncoding.level,
disableRetry: defOpt.disableRetry,
retryOnStatus: defOpt.retryOnStatus,
maxRetries: defOpt.maxRetries,
},
),
}
}
Expand Down
33 changes: 33 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,23 @@ var (
contentEncoding: &encodingOpt{
level: DefaultCompression,
},
retryOnStatus: map[int]bool{
502: true,
503: true,
504: true,
},
disableRetry: false,
maxRetries: 3,
}
)

type meiliOpt struct {
client *http.Client
apiKey string
contentEncoding *encodingOpt
retryOnStatus map[int]bool
disableRetry bool
maxRetries uint8
}

type encodingOpt struct {
Expand Down Expand Up @@ -70,6 +80,29 @@ func WithContentEncoding(encodingType ContentEncoding, level EncodingCompression
}
}

// WithCustomRetries set retry on specific http error code and max retries (min: 1, max: 255)
func WithCustomRetries(retryOnStatus []int, maxRetries uint8) Option {
return func(opt *meiliOpt) {
opt.retryOnStatus = make(map[int]bool)
for _, status := range retryOnStatus {
opt.retryOnStatus[status] = true
}

if maxRetries == 0 {
maxRetries = 1
}

opt.maxRetries = maxRetries
}
}

// DisableRetries disable retry logic in client
func DisableRetries() Option {
return func(opt *meiliOpt) {
opt.disableRetry = true
}
}

func baseTransport() *http.Transport {
return &http.Transport{
Proxy: http.ProxyFromEnvironment,
Expand Down
19 changes: 19 additions & 0 deletions options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,22 @@ func TestOptions_WithContentEncoding(t *testing.T) {
require.Equal(t, m.client.contentEncoding, GzipEncoding)
require.NotNil(t, m.client.encoder)
}

func TestOptions_WithCustomRetries(t *testing.T) {
meili := setup(t, "", WithCustomRetries([]int{http.StatusInternalServerError}, 10))
require.NotNil(t, meili)

m, ok := meili.(*meilisearch)
require.True(t, ok)
require.True(t, m.client.retryOnStatus[http.StatusInternalServerError])
require.Equal(t, m.client.maxRetries, uint8(10))
}

func TestOptions_DisableRetries(t *testing.T) {
meili := setup(t, "", DisableRetries())
require.NotNil(t, meili)

m, ok := meili.(*meilisearch)
require.True(t, ok)
require.Equal(t, m.client.disableRetry, true)
}

0 comments on commit 0d42e1e

Please sign in to comment.