Skip to content

Commit

Permalink
Sync manager to use HTTP client with retries (#7815)
Browse files Browse the repository at this point in the history
* Sync manager to use HTTP client with retries

* With Presign uploads too
  • Loading branch information
itaiad200 authored May 29, 2024
1 parent c0b0d60 commit 0ee3512
Show file tree
Hide file tree
Showing 9 changed files with 21 additions and 21 deletions.
2 changes: 1 addition & 1 deletion cmd/lakectl/cmd/fs_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ var fsDownloadCmd = &cobra.Command{
}
}()

s := local.NewSyncManager(ctx, client, syncFlags)
s := local.NewSyncManager(ctx, client, getHTTPClient(), syncFlags)
err := s.Sync(dest, remote, ch)
if err != nil {
DieErr(err)
Expand Down
4 changes: 2 additions & 2 deletions cmd/lakectl/cmd/fs_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var fsUploadCmd = &cobra.Command{
c <- change
}
}()
s := local.NewSyncManager(ctx, client, syncFlags)
s := local.NewSyncManager(ctx, client, getHTTPClient(), syncFlags)
fullPath, err := filepath.Abs(source)
if err != nil {
DieErr(err)
Expand All @@ -87,7 +87,7 @@ func upload(ctx context.Context, client apigen.ClientWithResponsesInterface, sou
}()
objectPath := apiutil.Value(destURI.Path)
if syncFlags.Presign {
return helpers.ClientUploadPreSign(ctx, client, destURI.Repository, destURI.Ref, objectPath, nil, contentType, fp, syncFlags.PresignMultipart)
return helpers.ClientUploadPreSign(ctx, client, getHTTPClient(), destURI.Repository, destURI.Ref, objectPath, nil, contentType, fp, syncFlags.PresignMultipart)
}
return helpers.ClientUpload(ctx, client, destURI.Repository, destURI.Ref, objectPath, nil, contentType, fp)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lakectl/cmd/local_checkout.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func localCheckout(cmd *cobra.Command, localPath string, specifiedRef string, co
currentBase := remote.WithRef(idx.AtHead)
diffs := local.Undo(localDiff(cmd.Context(), client, currentBase, idx.LocalPath()))
sigCtx := localHandleSyncInterrupt(cmd.Context(), idx, string(checkoutOperation))
syncMgr := local.NewSyncManager(sigCtx, client, syncFlags)
syncMgr := local.NewSyncManager(sigCtx, client, getHTTPClient(), syncFlags)
// confirm on local changes
if confirmByFlag && len(diffs) > 0 {
fmt.Println("Uncommitted changes exist, the operation will revert all changes on local directory.")
Expand Down
2 changes: 1 addition & 1 deletion cmd/lakectl/cmd/local_clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ var localCloneCmd = &cobra.Command{
DieErr(err)
}
sigCtx := localHandleSyncInterrupt(ctx, idx, string(cloneOperation))
s := local.NewSyncManager(sigCtx, client, syncFlags)
s := local.NewSyncManager(sigCtx, client, getHTTPClient(), syncFlags)
err = s.Sync(localPath, stableRemote, ch)
if err != nil {
DieErr(err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/lakectl/cmd/local_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ var localCommitCmd = &cobra.Command{
}

sigCtx := localHandleSyncInterrupt(cmd.Context(), idx, string(commitOperation))
s := local.NewSyncManager(sigCtx, client, syncFlags)
s := local.NewSyncManager(sigCtx, client, getHTTPClient(), syncFlags)
err = s.Sync(idx.LocalPath(), remote, c)
if err != nil {
DieErr(err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/lakectl/cmd/local_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ var localPullCmd = &cobra.Command{
return nil
})
sigCtx := localHandleSyncInterrupt(cmd.Context(), idx, string(pullOperation))
s := local.NewSyncManager(sigCtx, client, syncFlags)
s := local.NewSyncManager(sigCtx, client, getHTTPClient(), syncFlags)
err = s.Sync(idx.LocalPath(), newBase, c)
if err != nil {
DieErr(err)
Expand Down
14 changes: 7 additions & 7 deletions cmd/lakectl/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,21 +435,21 @@ func sendStats(ctx context.Context, client apigen.ClientWithResponsesInterface,
}
}

func getClient() *apigen.ClientWithResponses {
func getHTTPClient() *http.Client {
// Override MaxIdleConnsPerHost to allow highly concurrent access to our API client.
// This is done to avoid accumulating many sockets in `TIME_WAIT` status that were closed
// only to be immediately reopened.
// see: https://stackoverflow.com/a/39834253
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.MaxIdleConnsPerHost = DefaultMaxIdleConnsPerHost
var httpClient *http.Client
if !cfg.Server.Retries.Enabled {
httpClient = &http.Client{
Transport: transport,
}
} else {
httpClient = NewRetryClient(cfg.Server.Retries, transport)
return &http.Client{Transport: transport}
}
return NewRetryClient(cfg.Server.Retries, transport)
}

func getClient() *apigen.ClientWithResponses {
httpClient := getHTTPClient()

accessKeyID := cfg.Credentials.AccessKeyID
secretAccessKey := cfg.Credentials.SecretAccessKey
Expand Down
8 changes: 4 additions & 4 deletions pkg/api/helpers/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ type presignUpload struct {
numParts int
}

func NewPreSignUploader(client apigen.ClientWithResponsesInterface, multipartSupport bool) *PreSignUploader {
func NewPreSignUploader(client apigen.ClientWithResponsesInterface, httpClient *http.Client, multipartSupport bool) *PreSignUploader {
return &PreSignUploader{
Concurrency: DefaultUploadConcurrency,
HTTPClient: http.DefaultClient,
HTTPClient: httpClient,
Client: client,
MultipartSupport: multipartSupport,
}
Expand Down Expand Up @@ -403,9 +403,9 @@ func (u *presignUpload) Upload(ctx context.Context) (*apigen.ObjectStats, error)
return u.uploadObject(ctx)
}

func ClientUploadPreSign(ctx context.Context, client apigen.ClientWithResponsesInterface, repoID, branchID, objPath string, metadata map[string]string, contentType string, contents io.ReadSeeker, presignMultipartSupport bool) (*apigen.ObjectStats, error) {
func ClientUploadPreSign(ctx context.Context, client apigen.ClientWithResponsesInterface, httpClient *http.Client, repoID, branchID, objPath string, metadata map[string]string, contentType string, contents io.ReadSeeker, presignMultipartSupport bool) (*apigen.ObjectStats, error) {
// upload loop, retry on conflict
uploader := NewPreSignUploader(client, presignMultipartSupport)
uploader := NewPreSignUploader(client, httpClient, presignMultipartSupport)
for {
stats, err := uploader.Upload(ctx, repoID, branchID, objPath, contents, contentType, metadata)
if err == nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/local/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ type SyncManager struct {
tasks Tasks
}

func NewSyncManager(ctx context.Context, client *apigen.ClientWithResponses, flags SyncFlags) *SyncManager {
func NewSyncManager(ctx context.Context, client *apigen.ClientWithResponses, httpClient *http.Client, flags SyncFlags) *SyncManager {
return &SyncManager{
ctx: ctx,
client: client,
httpClient: http.DefaultClient,
httpClient: httpClient,
progressBar: NewProgressPool(),
flags: flags,
}
Expand Down Expand Up @@ -277,7 +277,7 @@ func (s *SyncManager) upload(ctx context.Context, rootPath string, remote *uri.U
}
if s.flags.Presign {
_, err = helpers.ClientUploadPreSign(
ctx, s.client, remote.Repository, remote.Ref, dest, metadata, "", reader, s.flags.PresignMultipart)
ctx, s.client, s.httpClient, remote.Repository, remote.Ref, dest, metadata, "", reader, s.flags.PresignMultipart)
return err
}
// not pre-signed
Expand Down

0 comments on commit 0ee3512

Please sign in to comment.