Skip to content

Commit

Permalink
feat(client): retry internal retryable errors (#171)
Browse files Browse the repository at this point in the history
  • Loading branch information
go-to-k authored Jul 20, 2024
1 parent 0bbb9bc commit 4436bc2
Show file tree
Hide file tree
Showing 2 changed files with 372 additions and 30 deletions.
94 changes: 64 additions & 30 deletions pkg/client/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ package client
import (
"context"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/go-to-k/cls3/internal/io"
)

var SleepTimeSecForS3 = 10
Expand Down Expand Up @@ -79,42 +79,76 @@ func (s *S3) DeleteObjects(
objects []types.ObjectIdentifier,
region string,
) ([]types.Error, error) {
// Assuming that the number of objects received as an argument does not
// exceed 1000, so no loop processing and validation whether exceeds
// 1000 or not are good.
if len(objects) == 0 {
return []types.Error{}, nil
}
errors := []types.Error{}
retryCounts := 0

input := &s3.DeleteObjectsInput{
Bucket: bucketName,
Delete: &types.Delete{
Objects: objects,
Quiet: aws.Bool(true),
},
}
for {
// Assuming that the number of objects received as an argument does not
// exceed 1000, so no slice splitting and validation whether exceeds
// 1000 or not are good.
if len(objects) == 0 {
break
}

retryable := func(err error) bool {
isErrorRetryable := strings.Contains(err.Error(), "api error SlowDown")
if isErrorRetryable {
io.Logger.Debug().Msgf("Retry: %s", err.Error())
input := &s3.DeleteObjectsInput{
Bucket: bucketName,
Delete: &types.Delete{
Objects: objects,
Quiet: aws.Bool(true),
},
}
return isErrorRetryable
}
optFn := func(o *s3.Options) {
o.Retryer = NewRetryer(retryable, SleepTimeSecForS3)
o.Region = region
}

output, err := s.client.DeleteObjects(ctx, input, optFn)
if err != nil {
return []types.Error{}, &ClientError{
ResourceName: bucketName,
Err: err,
retryable := func(err error) bool {
isErrorRetryable := strings.Contains(err.Error(), "api error SlowDown")
return isErrorRetryable
}
retryer := NewRetryer(retryable, SleepTimeSecForS3)
optFn := func(o *s3.Options) {
o.Retryer = retryer
o.Region = region
}

output, err := s.client.DeleteObjects(ctx, input, optFn)
if err != nil {
return []types.Error{}, &ClientError{
ResourceName: bucketName,
Err: err,
}
}

if len(output.Errors) == 0 {
break
}

retryCounts++

if retryCounts > retryer.MaxAttempts() {
errors = append(errors, output.Errors...)
break
}

objects = []types.ObjectIdentifier{}
for _, err := range output.Errors {
// Error example:
// Code: InternalError
// Message: We encountered an internal error. Please try again.
if strings.Contains(*err.Message, "Please try again") {
objects = append(objects, types.ObjectIdentifier{
Key: err.Key,
VersionId: err.VersionId,
})
} else {
errors = append(errors, err)
}
}
// random sleep
if len(objects) > 0 {
sleepTime, _ := retryer.RetryDelay(0, nil)
time.Sleep(sleepTime)
}
}

return output.Errors, nil
return errors, nil
}

func (s *S3) ListObjectVersions(
Expand Down
Loading

0 comments on commit 4436bc2

Please sign in to comment.