From 0d42e1e104e5d411a34f63b1dfc1e222ccb891e6 Mon Sep 17 00:00:00 2001 From: Javad Date: Wed, 2 Oct 2024 14:05:14 +0330 Subject: [PATCH] feat: add retry pattern to improve reliability --- client.go | 97 ++++++++++++++++++++++++++++++++++++++++++++----- client_test.go | 86 ++++++++++++++++++++++++++++++++++++++++++- error.go | 5 +++ meilisearch.go | 9 ++++- options.go | 33 +++++++++++++++++ options_test.go | 19 ++++++++++ 6 files changed, 236 insertions(+), 13 deletions(-) diff --git a/client.go b/client.go index abc400c1..73d3f707 100644 --- a/client.go +++ b/client.go @@ -10,6 +10,7 @@ import ( "net/http" "net/url" "sync" + "time" ) type client struct { @@ -19,6 +20,18 @@ type client struct { bufferPool *sync.Pool encoder encoder contentEncoding ContentEncoding + retryOnStatus map[int]bool + disableRetry bool + maxRetries uint8 + retryBackoff func(attempt uint8) time.Duration +} + +type clientConfig struct { + contentEncoding ContentEncoding + encodingCompressionLevel EncodingCompressionLevel + retryOnStatus map[int]bool + disableRetry bool + maxRetries uint8 } type internalRequest struct { @@ -34,7 +47,7 @@ type internalRequest struct { functionName string } -func newClient(cli *http.Client, host, apiKey string, ce ContentEncoding, cl EncodingCompressionLevel) *client { +func newClient(cli *http.Client, host, apiKey string, cfg clientConfig) *client { c := &client{ client: cli, host: host, @@ -44,11 +57,28 @@ func newClient(cli *http.Client, host, apiKey string, ce ContentEncoding, cl Enc return new(bytes.Buffer) }, }, + disableRetry: cfg.disableRetry, + maxRetries: cfg.maxRetries, + retryOnStatus: cfg.retryOnStatus, } - if !ce.IsZero() { - c.contentEncoding = ce - c.encoder = newEncoding(ce, cl) + if c.retryOnStatus == nil { + c.retryOnStatus = map[int]bool{ + 502: true, + 503: true, + 504: true, + } + } + + if !c.disableRetry && c.retryBackoff == nil { + c.retryBackoff = func(attempt uint8) time.Duration { + return time.Second * time.Duration(attempt) + } + } + + if !cfg.contentEncoding.IsZero() { + c.contentEncoding = cfg.contentEncoding + c.encoder = newEncoding(cfg.contentEncoding, cfg.encodingCompressionLevel) } return c @@ -197,12 +227,9 @@ func (c *client) sendRequest( request.Header.Set("User-Agent", GetQualifiedVersion()) - resp, err := c.client.Do(request) + resp, err := c.do(request, internalError) if err != nil { - if errors.Is(err, context.DeadlineExceeded) { - return nil, internalError.WithErrCode(MeilisearchTimeoutError, err) - } - return nil, internalError.WithErrCode(MeilisearchCommunicationError, err) + return nil, err } if body != nil { @@ -213,6 +240,58 @@ func (c *client) sendRequest( return resp, nil } +func (c *client) do(req *http.Request, internalError *Error) (resp *http.Response, err error) { + retriesCount := uint8(0) + + for { + resp, err = c.client.Do(req) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + return nil, internalError.WithErrCode(MeilisearchTimeoutError, err) + } + return nil, internalError.WithErrCode(MeilisearchCommunicationError, err) + } + + // Exit if retries are disabled + if c.disableRetry { + break + } + + // Check if response status is retryable and we haven't exceeded max retries + if c.retryOnStatus[resp.StatusCode] && retriesCount < c.maxRetries { + retriesCount++ + + // Close response body to prevent memory leaks + resp.Body.Close() + + // Handle backoff with context cancellation support + backoff := c.retryBackoff(retriesCount) + timer := time.NewTimer(backoff) + + select { + case <-req.Context().Done(): + err := req.Context().Err() + timer.Stop() + return nil, internalError.WithErrCode(MeilisearchTimeoutError, err) + case <-timer.C: + // Retry after backoff + timer.Stop() + } + + continue + } + + break + } + + // Return error if retries exceeded the maximum limit + if !c.disableRetry && retriesCount >= c.maxRetries { + return nil, internalError.WithErrCode(MeilisearchMaxRetriesExceeded, nil) + } + + return resp, nil +} + func (c *client) handleStatusCode(req *internalRequest, statusCode int, body []byte, internalError *Error) error { if req.acceptedStatusCodes != nil { diff --git a/client_test.go b/client_test.go index d984bc9e..f786f8b8 100644 --- a/client_test.go +++ b/client_test.go @@ -11,6 +11,7 @@ import ( "net/http/httptest" "strings" "testing" + "time" ) // Mock structures for testing @@ -26,6 +27,8 @@ type mockJsonMarshaller struct { } func TestExecuteRequest(t *testing.T) { + retryCount := 0 + // Create a mock server ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Method == http.MethodGet && r.URL.Path == "/test-get" { @@ -99,6 +102,15 @@ func TestExecuteRequest(t *testing.T) { } else if r.URL.Path == "/io-reader" { w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`{"message":"io reader"}`)) + } else if r.URL.Path == "/failed-retry" { + w.WriteHeader(http.StatusBadGateway) + } else if r.URL.Path == "/success-retry" { + if retryCount == 2 { + w.WriteHeader(http.StatusOK) + return + } + w.WriteHeader(http.StatusBadGateway) + retryCount++ } else { w.WriteHeader(http.StatusNotFound) } @@ -110,6 +122,8 @@ func TestExecuteRequest(t *testing.T) { internalReq *internalRequest expectedResp interface{} contentEncoding ContentEncoding + withTimeout bool + disableRetry bool wantErr bool }{ { @@ -342,13 +356,81 @@ func TestExecuteRequest(t *testing.T) { contentEncoding: GzipEncoding, wantErr: false, }, + { + name: "Test successful retries", + internalReq: &internalRequest{ + endpoint: "/success-retry", + method: http.MethodGet, + withResponse: nil, + withRequest: nil, + acceptedStatusCodes: []int{http.StatusOK}, + }, + expectedResp: nil, + wantErr: false, + }, + { + name: "Test failed retries", + internalReq: &internalRequest{ + endpoint: "/failed-retry", + method: http.MethodGet, + withResponse: nil, + withRequest: nil, + acceptedStatusCodes: []int{http.StatusOK}, + }, + expectedResp: nil, + wantErr: true, + }, + { + name: "Test disable retries", + internalReq: &internalRequest{ + endpoint: "/test-get", + method: http.MethodGet, + withResponse: nil, + withRequest: nil, + acceptedStatusCodes: []int{http.StatusOK}, + }, + expectedResp: nil, + disableRetry: true, + wantErr: false, + }, + { + name: "Test request timeout on retries", + internalReq: &internalRequest{ + endpoint: "/failed-retry", + method: http.MethodGet, + withResponse: nil, + withRequest: nil, + acceptedStatusCodes: []int{http.StatusOK}, + }, + expectedResp: nil, + withTimeout: true, + wantErr: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - c := newClient(&http.Client{}, ts.URL, "testApiKey", tt.contentEncoding, DefaultCompression) + c := newClient(&http.Client{}, ts.URL, "testApiKey", clientConfig{ + contentEncoding: tt.contentEncoding, + encodingCompressionLevel: DefaultCompression, + maxRetries: 3, + disableRetry: tt.disableRetry, + retryOnStatus: map[int]bool{ + 502: true, + 503: true, + 504: true, + }, + }) + + ctx := context.Background() + + if tt.withTimeout { + timeoutCtx, cancel := context.WithTimeout(ctx, 1*time.Second) + ctx = timeoutCtx + defer cancel() + } - err := c.executeRequest(context.Background(), tt.internalReq) + err := c.executeRequest(ctx, tt.internalReq) if tt.wantErr { require.Error(t, err) } else { diff --git a/error.go b/error.go index 81a5c011..20c28a6a 100644 --- a/error.go +++ b/error.go @@ -25,6 +25,8 @@ const ( MeilisearchTimeoutError // MeilisearchCommunicationError impossible execute a request MeilisearchCommunicationError + // MeilisearchMaxRetriesExceeded used max retries and exceeded + MeilisearchMaxRetriesExceeded ) const ( @@ -35,6 +37,7 @@ const ( rawStringMeilisearchApiErrorWithoutMessage = `unaccepted status code found: ${statusCode} expected: ${statusCodeExpected}, MeilisearchApiError Message: ${message}` rawStringMeilisearchTimeoutError = `MeilisearchTimeoutError` rawStringMeilisearchCommunicationError = `MeilisearchCommunicationError unable to execute request` + rawStringMeilisearchMaxRetriesExceeded = "failed to request and max retries exceeded" ) func (e ErrCode) rawMessage() string { @@ -51,6 +54,8 @@ func (e ErrCode) rawMessage() string { return rawStringMeilisearchTimeoutError + " " + rawStringCtx case MeilisearchCommunicationError: return rawStringMeilisearchCommunicationError + " " + rawStringCtx + case MeilisearchMaxRetriesExceeded: + return rawStringMeilisearchMaxRetriesExceeded + " " + rawStringCtx default: return rawStringCtx } diff --git a/meilisearch.go b/meilisearch.go index d80b51fd..2d85895b 100644 --- a/meilisearch.go +++ b/meilisearch.go @@ -182,8 +182,13 @@ func New(host string, options ...Option) ServiceManager { defOpt.client, host, defOpt.apiKey, - defOpt.contentEncoding.encodingType, - defOpt.contentEncoding.level, + clientConfig{ + contentEncoding: defOpt.contentEncoding.encodingType, + encodingCompressionLevel: defOpt.contentEncoding.level, + disableRetry: defOpt.disableRetry, + retryOnStatus: defOpt.retryOnStatus, + maxRetries: defOpt.maxRetries, + }, ), } } diff --git a/options.go b/options.go index ef89e25b..1a24a30b 100644 --- a/options.go +++ b/options.go @@ -15,6 +15,13 @@ var ( contentEncoding: &encodingOpt{ level: DefaultCompression, }, + retryOnStatus: map[int]bool{ + 502: true, + 503: true, + 504: true, + }, + disableRetry: false, + maxRetries: 3, } ) @@ -22,6 +29,9 @@ type meiliOpt struct { client *http.Client apiKey string contentEncoding *encodingOpt + retryOnStatus map[int]bool + disableRetry bool + maxRetries uint8 } type encodingOpt struct { @@ -70,6 +80,29 @@ func WithContentEncoding(encodingType ContentEncoding, level EncodingCompression } } +// WithCustomRetries set retry on specific http error code and max retries (min: 1, max: 255) +func WithCustomRetries(retryOnStatus []int, maxRetries uint8) Option { + return func(opt *meiliOpt) { + opt.retryOnStatus = make(map[int]bool) + for _, status := range retryOnStatus { + opt.retryOnStatus[status] = true + } + + if maxRetries == 0 { + maxRetries = 1 + } + + opt.maxRetries = maxRetries + } +} + +// DisableRetries disable retry logic in client +func DisableRetries() Option { + return func(opt *meiliOpt) { + opt.disableRetry = true + } +} + func baseTransport() *http.Transport { return &http.Transport{ Proxy: http.ProxyFromEnvironment, diff --git a/options_test.go b/options_test.go index 1b687074..b2368d40 100644 --- a/options_test.go +++ b/options_test.go @@ -51,3 +51,22 @@ func TestOptions_WithContentEncoding(t *testing.T) { require.Equal(t, m.client.contentEncoding, GzipEncoding) require.NotNil(t, m.client.encoder) } + +func TestOptions_WithCustomRetries(t *testing.T) { + meili := setup(t, "", WithCustomRetries([]int{http.StatusInternalServerError}, 10)) + require.NotNil(t, meili) + + m, ok := meili.(*meilisearch) + require.True(t, ok) + require.True(t, m.client.retryOnStatus[http.StatusInternalServerError]) + require.Equal(t, m.client.maxRetries, uint8(10)) +} + +func TestOptions_DisableRetries(t *testing.T) { + meili := setup(t, "", DisableRetries()) + require.NotNil(t, meili) + + m, ok := meili.(*meilisearch) + require.True(t, ok) + require.Equal(t, m.client.disableRetry, true) +}