From e7fb7b8f4d65d228f92c07ec79150a94ced954ca Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Wed, 23 Oct 2024 11:43:22 +0300 Subject: [PATCH 01/48] Revert "docs: clarify auditing availble on enterprise (#8289)" This reverts commit e12badc61311809390186938dc8d97847dce0a03. --- docs/reference/auditing.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/reference/auditing.md b/docs/reference/auditing.md index bcf7ad1a3f6..cfebd88fbfd 100644 --- a/docs/reference/auditing.md +++ b/docs/reference/auditing.md @@ -17,7 +17,10 @@ lakeFS Enterprise {: .label .label-purple } {: .note} -> Auditing is only available for [lakeFS Cloud]({% link cloud/index.md %}) and [lakeFS Enterprise]({% link enterprise/index.md %}). +> Auditing is only available for [lakeFS Cloud]({% link cloud/index.md %}). + +{: .warning } +> Please note, as of Jan 2024, the queryable interface within the lakeFS Cloud UI has been removed in favor of direct access to lakeFS audit logs. This document now describes how to set up and query this information using [AWS Glue](https://aws.amazon.com/glue/) as a reference. The lakeFS audit log allows you to view all relevant user action information in a clear and organized table, including when the action was performed, by whom, and what it was they did. From d9ea54065c94b3b507e729a653d72a3396d3079a Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Mon, 20 Jan 2025 16:37:05 +0200 Subject: [PATCH 02/48] no work no mess --- pkg/block/adapter.go | 11 +++++++- pkg/block/azure/adapter.go | 3 +++ pkg/block/gs/adapter.go | 3 +++ pkg/block/local/adapter.go | 3 +++ pkg/block/mem/adapter.go | 3 +++ pkg/block/metrics.go | 4 +++ pkg/block/s3/adapter.go | 42 +++++++++++++++++++++++++++++ pkg/block/transient/adapter.go | 3 +++ pkg/gateway/operations/getobject.go | 42 +++++++++++++++++++++++++++++ pkg/gateway/operations/putobject.go | 11 ++++---- pkg/gateway/serde/xml.go | 11 ++++++++ pkg/testutil/adapter.go | 4 ++- 12 files changed, 133 insertions(+), 7 deletions(-) diff --git a/pkg/block/adapter.go b/pkg/block/adapter.go index 77f82e6822b..1f640c073f4 100644 --- a/pkg/block/adapter.go +++ b/pkg/block/adapter.go @@ -15,6 +15,10 @@ type MultipartPart struct { LastModified time.Time Size int64 } +type Upload struct { + Key string + UploadID string +} // MultipartUploadCompletion parts described as part of complete multipart upload. Each part holds the part number and ETag received while calling part upload. // NOTE that S3 implementation and our S3 gateway accept and returns ETag value surrounded with double-quotes ("), while @@ -121,6 +125,10 @@ type ListPartsResponse struct { IsTruncated bool } +type ListMultipartUploadsResponse struct { + Uploads []Upload +} + // CreateMultiPartUploadOpts contains optional arguments for // CreateMultiPartUpload. These should be analogous to options on // some underlying storage layer. Missing arguments are mapped to the @@ -187,8 +195,9 @@ type Adapter interface { CreateMultiPartUpload(ctx context.Context, obj ObjectPointer, r *http.Request, opts CreateMultiPartUploadOpts) (*CreateMultiPartUploadResponse, error) UploadPart(ctx context.Context, obj ObjectPointer, sizeBytes int64, reader io.Reader, uploadID string, partNumber int) (*UploadPartResponse, error) - ListParts(ctx context.Context, obj ObjectPointer, uploadID string, opts ListPartsOpts) (*ListPartsResponse, error) UploadCopyPart(ctx context.Context, sourceObj, destinationObj ObjectPointer, uploadID string, partNumber int) (*UploadPartResponse, error) + ListParts(ctx context.Context, obj ObjectPointer, uploadID string, opts ListPartsOpts) (*ListPartsResponse, error) + ListMultipartUploads(ctx context.Context, obj ObjectPointer) (*ListMultipartUploadsResponse, error) UploadCopyPartRange(ctx context.Context, sourceObj, destinationObj ObjectPointer, uploadID string, partNumber int, startPosition, endPosition int64) (*UploadPartResponse, error) AbortMultiPartUpload(ctx context.Context, obj ObjectPointer, uploadID string) error CompleteMultiPartUpload(ctx context.Context, obj ObjectPointer, uploadID string, multipartList *MultipartUploadCompletion) (*CompleteMultiPartUploadResponse, error) diff --git a/pkg/block/azure/adapter.go b/pkg/block/azure/adapter.go index 3c9a332d6e8..7f6f13623e9 100644 --- a/pkg/block/azure/adapter.go +++ b/pkg/block/azure/adapter.go @@ -644,6 +644,9 @@ func (a *Adapter) GetPresignUploadPartURL(_ context.Context, _ block.ObjectPoint func (a *Adapter) ListParts(_ context.Context, _ block.ObjectPointer, _ string, _ block.ListPartsOpts) (*block.ListPartsResponse, error) { return nil, block.ErrOperationNotSupported } +func (a *Adapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer) (*block.ListMultipartUploadsResponse, error) { + return nil, block.ErrOperationNotSupported +} // ParseURL - parses url and extracts account name and domain. If either are not found returns an error func ParseURL(uri *url.URL) (accountName string, domain string, err error) { diff --git a/pkg/block/gs/adapter.go b/pkg/block/gs/adapter.go index 8695532d642..f4830b7c3ed 100644 --- a/pkg/block/gs/adapter.go +++ b/pkg/block/gs/adapter.go @@ -717,3 +717,6 @@ func (a *Adapter) GetPresignUploadPartURL(_ context.Context, _ block.ObjectPoint func (a *Adapter) ListParts(_ context.Context, _ block.ObjectPointer, _ string, _ block.ListPartsOpts) (*block.ListPartsResponse, error) { return nil, block.ErrOperationNotSupported } +func (a *Adapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer) (*block.ListMultipartUploadsResponse, error) { + return nil, block.ErrOperationNotSupported +} diff --git a/pkg/block/local/adapter.go b/pkg/block/local/adapter.go index 0b6fb31dd54..f14bb778b42 100644 --- a/pkg/block/local/adapter.go +++ b/pkg/block/local/adapter.go @@ -600,3 +600,6 @@ func (l *Adapter) GetPresignUploadPartURL(_ context.Context, _ block.ObjectPoint func (l *Adapter) ListParts(_ context.Context, _ block.ObjectPointer, _ string, _ block.ListPartsOpts) (*block.ListPartsResponse, error) { return nil, block.ErrOperationNotSupported } +func (a *Adapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer) (*block.ListMultipartUploadsResponse, error) { + return nil, block.ErrOperationNotSupported +} diff --git a/pkg/block/mem/adapter.go b/pkg/block/mem/adapter.go index d394945bbf2..267eade435c 100644 --- a/pkg/block/mem/adapter.go +++ b/pkg/block/mem/adapter.go @@ -369,3 +369,6 @@ func (a *Adapter) GetPresignUploadPartURL(_ context.Context, _ block.ObjectPoint func (a *Adapter) ListParts(_ context.Context, _ block.ObjectPointer, _ string, _ block.ListPartsOpts) (*block.ListPartsResponse, error) { return nil, block.ErrOperationNotSupported } +func (a *Adapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer) (*block.ListMultipartUploadsResponse, error) { + return nil, block.ErrOperationNotSupported +} diff --git a/pkg/block/metrics.go b/pkg/block/metrics.go index 36c697f330e..413a4fb5c3b 100644 --- a/pkg/block/metrics.go +++ b/pkg/block/metrics.go @@ -85,6 +85,10 @@ func (m *MetricsAdapter) ListParts(ctx context.Context, obj ObjectPointer, uploa ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType()) return m.adapter.ListParts(ctx, obj, uploadID, opts) } +func (m *MetricsAdapter) ListMultipartUploads(ctx context.Context, obj ObjectPointer) (*ListMultipartUploadsResponse, error) { + ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType()) + return m.adapter.ListMultipartUploads(ctx, obj) +} func (m *MetricsAdapter) UploadCopyPart(ctx context.Context, sourceObj, destinationObj ObjectPointer, uploadID string, partNumber int) (*UploadPartResponse, error) { ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType()) diff --git a/pkg/block/s3/adapter.go b/pkg/block/s3/adapter.go index 2f8ff3060c0..fb4fda43223 100644 --- a/pkg/block/s3/adapter.go +++ b/pkg/block/s3/adapter.go @@ -849,6 +849,48 @@ func (a *Adapter) ListParts(ctx context.Context, obj block.ObjectPointer, upload return &partsResp, nil } +func (a *Adapter) ListMultipartUploads(ctx context.Context, obj block.ObjectPointer) (*block.ListMultipartUploadsResponse, error) { + var err error + defer reportMetrics("ListParts", time.Now(), nil, &err) + bucket, _, qualifiedKey, err := a.extractParamsFromObj(obj) + if err != nil { + return nil, err + } + + input := &s3.ListMultipartUploadsInput{ + Bucket: aws.String(bucket), + Prefix: aws.String(""), + } + + lg := a.log(ctx).WithFields(logging.Fields{ + "qualified_ns": qualifiedKey.GetStorageNamespace(), + "qualified_key": qualifiedKey.GetKey(), + "key": obj.Identifier, + }) + client := a.clients.Get(ctx, bucket) + resp, err := client.ListMultipartUploads(ctx, input) + if err != nil { + lg.WithError(err).Error("ListParts failed") + return nil, err + } + + partsResp := block.ListMultipartUploadsResponse{ + Uploads: make([]block.Upload, len(resp.Uploads)), + } + for _, upload := range resp.Uploads { + // multipart.tracker.get(f) + // partsResp.Parts[i] = block.MultipartPart{ + // ETag: strings.Trim(aws.ToString(upload.ETag), `"`), + // PartNumber: int(aws.ToInt32(upload.PartNumber)), + // LastModified: aws.ToTime(upload.LastModified), + // Size: aws.ToInt64(upload.Size), + } + + // lg.WithField("num_parts", len(resp.Parts)).Debug("list multipart upload parts") + + return &partsResp, nil +} + func (a *Adapter) BlockstoreType() string { return block.BlockstoreTypeS3 } diff --git a/pkg/block/transient/adapter.go b/pkg/block/transient/adapter.go index 1d98dd1834b..92034093f5c 100644 --- a/pkg/block/transient/adapter.go +++ b/pkg/block/transient/adapter.go @@ -178,3 +178,6 @@ func (a *Adapter) GetPresignUploadPartURL(_ context.Context, _ block.ObjectPoint func (a *Adapter) ListParts(_ context.Context, _ block.ObjectPointer, _ string, _ block.ListPartsOpts) (*block.ListPartsResponse, error) { return nil, block.ErrOperationNotSupported } +func (a *Adapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer) (*block.ListMultipartUploadsResponse, error) { + return nil, block.ErrOperationNotSupported +} diff --git a/pkg/gateway/operations/getobject.go b/pkg/gateway/operations/getobject.go index 22f054bccfc..4025c74528d 100644 --- a/pkg/gateway/operations/getobject.go +++ b/pkg/gateway/operations/getobject.go @@ -65,6 +65,11 @@ func (controller *GetObject) Handle(w http.ResponseWriter, req *http.Request, o handleListParts(w, req, o) return } + // check if this is a list multipart uploads call + if query.Has(QueryParamListMultipart) { + handleListMultipartUploads(w, req, o) + return + } beforeMeta := time.Now() entry, err := o.Catalog.GetEntry(ctx, o.Repository.Name, o.Reference, o.Path, catalog.GetEntryParams{}) @@ -243,3 +248,40 @@ func handleListParts(w http.ResponseWriter, req *http.Request, o *PathOperation) o.EncodeResponse(w, req, resp, http.StatusOK) } + +func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *PathOperation) { + o.Incr("list_multipart_uploads", o.Principal, o.Repository.Name, o.Reference) + // query := req.URL.Query() + + resp := &serde.ListMultipartUploadsOutput{ + Bucket: o.Repository.Name, + } + + partsResp, err := o.BlockStore.ListMultipartUploads(req.Context(), block.ObjectPointer{ + StorageNamespace: o.Repository.StorageNamespace, + IdentifierType: block.IdentifierTypeRelative, + //Identifier: multiPart.PhysicalAddress, + }) + if err != nil { + o.Log(req).WithError(err).Error("list multipart uploads failed") + _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError)) + return + } + uploads := make([]serde.Upload, len(partsResp.Uploads)) + for i, upload := range partsResp.Uploads { + + multiPart, err := o.MultipartTracker.Get(req.Context(), upload.UploadID) + if err != nil { + o.Log(req).WithError(err).Error("could not read multipart record") + _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError)) + return + } + uploads[i] = serde.Upload{ + Key: multiPart.Path, + UploadID: upload.UploadID, + } + } + resp.Uploads = uploads + + o.EncodeResponse(w, req, resp, http.StatusOK) +} diff --git a/pkg/gateway/operations/putobject.go b/pkg/gateway/operations/putobject.go index aeacec0442b..5181fd44a5a 100644 --- a/pkg/gateway/operations/putobject.go +++ b/pkg/gateway/operations/putobject.go @@ -20,11 +20,12 @@ import ( ) const ( - IfNoneMatchHeader = "If-None-Match" - CopySourceHeader = "x-amz-copy-source" - CopySourceRangeHeader = "x-amz-copy-source-range" - QueryParamUploadID = "uploadId" - QueryParamPartNumber = "partNumber" + IfNoneMatchHeader = "If-None-Match" + CopySourceHeader = "x-amz-copy-source" + CopySourceRangeHeader = "x-amz-copy-source-range" + QueryParamUploadID = "uploadId" + QueryParamListMultipart = "uploads" + QueryParamPartNumber = "partNumber" ) type PutObject struct{} diff --git a/pkg/gateway/serde/xml.go b/pkg/gateway/serde/xml.go index 116bdab7364..94bbde6aeae 100644 --- a/pkg/gateway/serde/xml.go +++ b/pkg/gateway/serde/xml.go @@ -149,6 +149,11 @@ type MultipartUploadPart struct { Size int64 `xml:"Size"` } +type Upload struct { + Key string `xml:"Key"` + UploadID string `xml:"UploadId"` +} + type ListPartsOutput struct { XMLName xml.Name `xml:"ListPartsResult"` Bucket string `xml:"Bucket"` @@ -159,6 +164,12 @@ type ListPartsOutput struct { Parts []MultipartUploadPart `xml:"Part"` } +type ListMultipartUploadsOutput struct { + XMLName xml.Name `xml:"ListMultipartUploadsResult"` + Bucket string `xml:"Bucket"` + Uploads []Upload `xml:"Upload"` +} + type VersioningConfiguration struct { Enabled bool `xml:"Enabled,omitempty"` } diff --git a/pkg/testutil/adapter.go b/pkg/testutil/adapter.go index 80923afa4c9..f9e6a66a794 100644 --- a/pkg/testutil/adapter.go +++ b/pkg/testutil/adapter.go @@ -128,7 +128,9 @@ func (a *MockAdapter) UploadCopyPartRange(_ context.Context, _, _ block.ObjectPo func (a *MockAdapter) ListParts(_ context.Context, _ block.ObjectPointer, _ string, _ block.ListPartsOpts) (*block.ListPartsResponse, error) { panic("try to list parts in mock adapter") } - +func (a *MockAdapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer) (*block.ListMultipartUploadsResponse, error) { + panic("try to list multipart uploads in mock adapter") +} func (a *MockAdapter) BlockstoreType() string { return "s3" } From 7b1500d6a262311c9f6ea689d193ae5fa78411b2 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Tue, 21 Jan 2025 16:19:07 +0200 Subject: [PATCH 03/48] got it --- pkg/block/adapter.go | 4 ++- pkg/block/s3/adapter.go | 17 +++------- pkg/gateway/handler.go | 3 ++ pkg/gateway/operations/base.go | 8 ++++- pkg/gateway/operations/getobject.go | 42 ------------------------ pkg/gateway/operations/listobjects.go | 46 +++++++++++++++++++++++++-- 6 files changed, 62 insertions(+), 58 deletions(-) diff --git a/pkg/block/adapter.go b/pkg/block/adapter.go index 1f640c073f4..0556a07239f 100644 --- a/pkg/block/adapter.go +++ b/pkg/block/adapter.go @@ -6,6 +6,8 @@ import ( "net/http" "net/url" "time" + + "github.com/aws/aws-sdk-go-v2/service/s3/types" ) // MultipartPart single multipart information @@ -126,7 +128,7 @@ type ListPartsResponse struct { } type ListMultipartUploadsResponse struct { - Uploads []Upload + Uploads []types.MultipartUpload } // CreateMultiPartUploadOpts contains optional arguments for diff --git a/pkg/block/s3/adapter.go b/pkg/block/s3/adapter.go index fb4fda43223..1b0ae4a6dbc 100644 --- a/pkg/block/s3/adapter.go +++ b/pkg/block/s3/adapter.go @@ -851,7 +851,7 @@ func (a *Adapter) ListParts(ctx context.Context, obj block.ObjectPointer, upload func (a *Adapter) ListMultipartUploads(ctx context.Context, obj block.ObjectPointer) (*block.ListMultipartUploadsResponse, error) { var err error - defer reportMetrics("ListParts", time.Now(), nil, &err) + defer reportMetrics("ListMultipartUploads", time.Now(), nil, &err) bucket, _, qualifiedKey, err := a.extractParamsFromObj(obj) if err != nil { return nil, err @@ -859,7 +859,6 @@ func (a *Adapter) ListMultipartUploads(ctx context.Context, obj block.ObjectPoin input := &s3.ListMultipartUploadsInput{ Bucket: aws.String(bucket), - Prefix: aws.String(""), } lg := a.log(ctx).WithFields(logging.Fields{ @@ -875,19 +874,13 @@ func (a *Adapter) ListMultipartUploads(ctx context.Context, obj block.ObjectPoin } partsResp := block.ListMultipartUploadsResponse{ - Uploads: make([]block.Upload, len(resp.Uploads)), + Uploads: make([]types.MultipartUpload, len(resp.Uploads)), } for _, upload := range resp.Uploads { - // multipart.tracker.get(f) - // partsResp.Parts[i] = block.MultipartPart{ - // ETag: strings.Trim(aws.ToString(upload.ETag), `"`), - // PartNumber: int(aws.ToInt32(upload.PartNumber)), - // LastModified: aws.ToTime(upload.LastModified), - // Size: aws.ToInt64(upload.Size), + partsResp.Uploads = append(partsResp.Uploads, upload) + fmt.Println("key ", *upload.Key) + fmt.Println("key ", *upload.UploadId) } - - // lg.WithField("num_parts", len(resp.Parts)).Debug("list multipart upload parts") - return &partsResp, nil } diff --git a/pkg/gateway/handler.go b/pkg/gateway/handler.go index 242e6eaac4f..5b2efad7968 100644 --- a/pkg/gateway/handler.go +++ b/pkg/gateway/handler.go @@ -2,6 +2,7 @@ package gateway import ( "errors" + "fmt" "net/http" gohttputil "net/http/httputil" "net/url" @@ -198,6 +199,7 @@ func RepoOperationHandler(sc *ServerContext, handler operations.RepoOperationHan } func PathOperationHandler(sc *ServerContext, handler operations.PathOperationHandler) http.Handler { + fmt.Println("got to path operation handler") return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { ctx := req.Context() repo := ctx.Value(ContextKeyRepository).(*catalog.Repository) @@ -319,6 +321,7 @@ func setDefaultContentType(w http.ResponseWriter, req *http.Request) { } func unsupportedOperationHandler() http.Handler { + fmt.Println("got to where u thought") return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { o := &operations.Operation{} _ = o.EncodeError(w, req, nil, gatewayerrors.ERRLakeFSNotSupported.ToAPIErr()) diff --git a/pkg/gateway/operations/base.go b/pkg/gateway/operations/base.go index 98786e94bd7..3206a05f892 100644 --- a/pkg/gateway/operations/base.go +++ b/pkg/gateway/operations/base.go @@ -91,8 +91,14 @@ func (o *Operation) HandleUnsupported(w http.ResponseWriter, req *http.Request, return false } query := req.URL.Query() + fmt.Println(query) + fmt.Println(keys) + fmt.Println(o.OperationID) + // if query.Has("uploads") { + // return false + // } if slices.ContainsFunc(keys, query.Has) { - _ = o.EncodeError(w, req, nil, gwerrors.ERRLakeFSNotSupported.ToAPIErr()) + _ = o.EncodeError(w, req, nil, gwerrors.ErrAllAccessDisabled.ToAPIErr()) return true } return false diff --git a/pkg/gateway/operations/getobject.go b/pkg/gateway/operations/getobject.go index 4025c74528d..22f054bccfc 100644 --- a/pkg/gateway/operations/getobject.go +++ b/pkg/gateway/operations/getobject.go @@ -65,11 +65,6 @@ func (controller *GetObject) Handle(w http.ResponseWriter, req *http.Request, o handleListParts(w, req, o) return } - // check if this is a list multipart uploads call - if query.Has(QueryParamListMultipart) { - handleListMultipartUploads(w, req, o) - return - } beforeMeta := time.Now() entry, err := o.Catalog.GetEntry(ctx, o.Repository.Name, o.Reference, o.Path, catalog.GetEntryParams{}) @@ -248,40 +243,3 @@ func handleListParts(w http.ResponseWriter, req *http.Request, o *PathOperation) o.EncodeResponse(w, req, resp, http.StatusOK) } - -func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *PathOperation) { - o.Incr("list_multipart_uploads", o.Principal, o.Repository.Name, o.Reference) - // query := req.URL.Query() - - resp := &serde.ListMultipartUploadsOutput{ - Bucket: o.Repository.Name, - } - - partsResp, err := o.BlockStore.ListMultipartUploads(req.Context(), block.ObjectPointer{ - StorageNamespace: o.Repository.StorageNamespace, - IdentifierType: block.IdentifierTypeRelative, - //Identifier: multiPart.PhysicalAddress, - }) - if err != nil { - o.Log(req).WithError(err).Error("list multipart uploads failed") - _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError)) - return - } - uploads := make([]serde.Upload, len(partsResp.Uploads)) - for i, upload := range partsResp.Uploads { - - multiPart, err := o.MultipartTracker.Get(req.Context(), upload.UploadID) - if err != nil { - o.Log(req).WithError(err).Error("could not read multipart record") - _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError)) - return - } - uploads[i] = serde.Upload{ - Key: multiPart.Path, - UploadID: upload.UploadID, - } - } - resp.Uploads = uploads - - o.EncodeResponse(w, req, resp, http.StatusOK) -} diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index e124d75fecd..11d1f4be848 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -2,10 +2,12 @@ package operations import ( "errors" + "fmt" "net/http" "strconv" "strings" + "github.com/treeverse/lakefs/pkg/block" "github.com/treeverse/lakefs/pkg/catalog" gatewayerrors "github.com/treeverse/lakefs/pkg/gateway/errors" "github.com/treeverse/lakefs/pkg/gateway/path" @@ -355,7 +357,7 @@ func (controller *ListObjects) Handle(w http.ResponseWriter, req *http.Request, if o.HandleUnsupported(w, req, "inventory", "metrics", "publicAccessBlock", "ownershipControls", "intelligent-tiering", "analytics", "policy", "lifecycle", "encryption", "object-lock", "replication", "notification", "events", "acl", "cors", "website", "accelerate", - "requestPayment", "logging", "tagging", "uploads", "versions", "policyStatus") { + "requestPayment", "logging", "tagging", "versions", "policyStatus") { return } query := req.URL.Query() @@ -370,7 +372,12 @@ func (controller *ListObjects) Handle(w http.ResponseWriter, req *http.Request, o.EncodeResponse(w, req, response, http.StatusOK) return } - + // check if request is list-multipart-uploads + if query.Has("uploads") { + fmt.Println("itamar, you are the king") + handleListMultipartUploads(w, req, o) + return + } // getbucketversioing support if query.Has("versioning") { o.EncodeXMLBytes(w, req, []byte(serde.VersioningResponse), http.StatusOK) @@ -393,3 +400,38 @@ func (controller *ListObjects) Handle(w http.ResponseWriter, req *http.Request, _ = o.EncodeError(w, req, nil, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrBadRequest)) } } + +func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *RepoOperation) { + o.Incr("list_multipart_uploads", o.Principal, o.Repository.Name, "") + resp := &serde.ListMultipartUploadsOutput{ + Bucket: o.Repository.Name, + } + + partsResp, err := o.BlockStore.ListMultipartUploads(req.Context(), block.ObjectPointer{ + StorageNamespace: o.Repository.StorageNamespace, + IdentifierType: block.IdentifierTypeRelative, + }) + if err != nil { + o.Log(req).WithError(err).Error("list multipart uploads failed") + _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError)) + return + } + var uploads []serde.Upload + for _, upload := range partsResp.Uploads { + if upload.UploadId == nil { + continue + } + multiPart, err := o.MultipartTracker.Get(req.Context(), *upload.UploadId) + if err != nil { + o.Log(req).WithError(err).Error("could not read multipart record") + _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError)) + return + } + uploads = append(uploads, serde.Upload{ + Key: multiPart.Path, + UploadID: *upload.UploadId, + }) + } + resp.Uploads = uploads + o.EncodeResponse(w, req, resp, http.StatusOK) +} From 08ad93c7d28deb16738309877dbfcec1dc00af71 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Tue, 21 Jan 2025 16:30:08 +0200 Subject: [PATCH 04/48] f**** auditing --- docs/reference/auditing.md | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/docs/reference/auditing.md b/docs/reference/auditing.md index cfebd88fbfd..bcf7ad1a3f6 100644 --- a/docs/reference/auditing.md +++ b/docs/reference/auditing.md @@ -17,10 +17,7 @@ lakeFS Enterprise {: .label .label-purple } {: .note} -> Auditing is only available for [lakeFS Cloud]({% link cloud/index.md %}). - -{: .warning } -> Please note, as of Jan 2024, the queryable interface within the lakeFS Cloud UI has been removed in favor of direct access to lakeFS audit logs. This document now describes how to set up and query this information using [AWS Glue](https://aws.amazon.com/glue/) as a reference. +> Auditing is only available for [lakeFS Cloud]({% link cloud/index.md %}) and [lakeFS Enterprise]({% link enterprise/index.md %}). The lakeFS audit log allows you to view all relevant user action information in a clear and organized table, including when the action was performed, by whom, and what it was they did. From 1f7bf5662bff2ec09061660dc6367c903431dacd Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Tue, 21 Jan 2025 17:13:53 +0200 Subject: [PATCH 05/48] with tests --- esti/s3_gateway_test.go | 64 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index c1037d16d52..563c6b1c219 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -238,6 +238,70 @@ func TestMultipartUploadIfNoneMatch(t *testing.T) { } } } +func TestListMultipartUploads(t *testing.T) { + ctx, logger, repo := setupTest(t) + defer tearDownTest(repo) + s3Endpoint := viper.GetString("s3_endpoint") + s3Client := createS3Client(s3Endpoint, t) + multipartNumberOfParts := 7 + multipartPartSize := 5 * 1024 * 1024 + + obj1 := "main/object1" + obj2 := "main/object2" + + input1 := &s3.CreateMultipartUploadInput{ + Bucket: aws.String(repo), + Key: aws.String(obj1), + } + input2 := &s3.CreateMultipartUploadInput{ + Bucket: aws.String(repo), + Key: aws.String(obj2), + } + + resp1, err := s3Client.CreateMultipartUpload(ctx, input1) + require.NoError(t, err, "failed to create multipart upload") + + parts := make([][]byte, multipartNumberOfParts) + for i := 0; i < multipartNumberOfParts; i++ { + parts[i] = randstr.Bytes(multipartPartSize + i) + } + + completedParts1 := uploadMultipartParts(t, ctx, s3Client, logger, resp1, parts, 0) + + completeInput1 := &s3.CompleteMultipartUploadInput{ + Bucket: resp1.Bucket, + Key: resp1.Key, + UploadId: resp1.UploadId, + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: completedParts1, + }, + } + output, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) + require.Contains(t, output.Uploads, obj1) + + resp2, err := s3Client.CreateMultipartUpload(ctx, input2) + require.NoError(t, err, "failed to create multipart upload") + output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) + require.Contains(t, output.Uploads, obj1) + require.Contains(t, output.Uploads, obj2) + + _, err = s3Client.CompleteMultipartUpload(ctx, completeInput1) + output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) + require.NotContains(t, output.Uploads, obj1) + require.Contains(t, output.Uploads, obj2) + + abortInput2 := &s3.AbortMultipartUploadInput{ + Bucket: resp2.Bucket, + Key: resp2.Key, + UploadId: resp2.UploadId, + } + _, err = s3Client.AbortMultipartUpload(ctx, abortInput2) + + output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) + require.NotContains(t, output.Uploads, obj1) + require.NotContains(t, output.Uploads, obj2) + +} func setHTTPHeaders(ifNoneMatch string) func(*middleware.Stack) error { return func(stack *middleware.Stack) error { From 2fbe42a827310e3320dd2c122dc6607096b53648 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Tue, 21 Jan 2025 18:39:14 +0200 Subject: [PATCH 06/48] slight adjustment --- pkg/block/s3/adapter.go | 2 -- pkg/gateway/operations/listobjects.go | 2 -- 2 files changed, 4 deletions(-) diff --git a/pkg/block/s3/adapter.go b/pkg/block/s3/adapter.go index 1b0ae4a6dbc..eaa7c0eb74b 100644 --- a/pkg/block/s3/adapter.go +++ b/pkg/block/s3/adapter.go @@ -878,8 +878,6 @@ func (a *Adapter) ListMultipartUploads(ctx context.Context, obj block.ObjectPoin } for _, upload := range resp.Uploads { partsResp.Uploads = append(partsResp.Uploads, upload) - fmt.Println("key ", *upload.Key) - fmt.Println("key ", *upload.UploadId) } return &partsResp, nil } diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index 11d1f4be848..f682e923397 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -2,7 +2,6 @@ package operations import ( "errors" - "fmt" "net/http" "strconv" "strings" @@ -374,7 +373,6 @@ func (controller *ListObjects) Handle(w http.ResponseWriter, req *http.Request, } // check if request is list-multipart-uploads if query.Has("uploads") { - fmt.Println("itamar, you are the king") handleListMultipartUploads(w, req, o) return } From 11f8d466de1400a24417d554b0f6b3c2d28298d5 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Tue, 21 Jan 2025 19:11:13 +0200 Subject: [PATCH 07/48] cross ya fingaz --- esti/s3_gateway_test.go | 28 ++++++++++++++++++++-------- pkg/block/local/adapter.go | 2 +- pkg/block/s3/adapter.go | 5 ++--- 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index 563c6b1c219..e09c1c32792 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -277,18 +277,21 @@ func TestListMultipartUploads(t *testing.T) { }, } output, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) - require.Contains(t, output.Uploads, obj1) + str := concatKeys(*output) + require.Contains(t, str, obj1) resp2, err := s3Client.CreateMultipartUpload(ctx, input2) require.NoError(t, err, "failed to create multipart upload") output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) - require.Contains(t, output.Uploads, obj1) - require.Contains(t, output.Uploads, obj2) + str = concatKeys(*output) + require.Contains(t, str, obj1) + require.Contains(t, str, obj2) _, err = s3Client.CompleteMultipartUpload(ctx, completeInput1) output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) - require.NotContains(t, output.Uploads, obj1) - require.Contains(t, output.Uploads, obj2) + str = concatKeys(*output) + require.NotContains(t, str, obj1) + require.Contains(t, str, obj2) abortInput2 := &s3.AbortMultipartUploadInput{ Bucket: resp2.Bucket, @@ -298,11 +301,20 @@ func TestListMultipartUploads(t *testing.T) { _, err = s3Client.AbortMultipartUpload(ctx, abortInput2) output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) - require.NotContains(t, output.Uploads, obj1) - require.NotContains(t, output.Uploads, obj2) + str = concatKeys(*output) + require.NotContains(t, str, obj1) + require.NotContains(t, str, obj2) } - +func concatKeys(output s3.ListMultipartUploadsOutput) string { + var allKeys string + for _, upload := range output.Uploads { + if upload.Key != nil { + allKeys += *upload.Key + " " + } + } + return allKeys +} func setHTTPHeaders(ifNoneMatch string) func(*middleware.Stack) error { return func(stack *middleware.Stack) error { return stack.Build.Add(middleware.BuildMiddlewareFunc("AddIfNoneMatchHeader", func( diff --git a/pkg/block/local/adapter.go b/pkg/block/local/adapter.go index f14bb778b42..6d9f95234e3 100644 --- a/pkg/block/local/adapter.go +++ b/pkg/block/local/adapter.go @@ -600,6 +600,6 @@ func (l *Adapter) GetPresignUploadPartURL(_ context.Context, _ block.ObjectPoint func (l *Adapter) ListParts(_ context.Context, _ block.ObjectPointer, _ string, _ block.ListPartsOpts) (*block.ListPartsResponse, error) { return nil, block.ErrOperationNotSupported } -func (a *Adapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer) (*block.ListMultipartUploadsResponse, error) { +func (l *Adapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer) (*block.ListMultipartUploadsResponse, error) { return nil, block.ErrOperationNotSupported } diff --git a/pkg/block/s3/adapter.go b/pkg/block/s3/adapter.go index eaa7c0eb74b..67421a8cc7a 100644 --- a/pkg/block/s3/adapter.go +++ b/pkg/block/s3/adapter.go @@ -866,6 +866,7 @@ func (a *Adapter) ListMultipartUploads(ctx context.Context, obj block.ObjectPoin "qualified_key": qualifiedKey.GetKey(), "key": obj.Identifier, }) + client := a.clients.Get(ctx, bucket) resp, err := client.ListMultipartUploads(ctx, input) if err != nil { @@ -876,9 +877,7 @@ func (a *Adapter) ListMultipartUploads(ctx context.Context, obj block.ObjectPoin partsResp := block.ListMultipartUploadsResponse{ Uploads: make([]types.MultipartUpload, len(resp.Uploads)), } - for _, upload := range resp.Uploads { - partsResp.Uploads = append(partsResp.Uploads, upload) - } + partsResp.Uploads = append(partsResp.Uploads, resp.Uploads...) return &partsResp, nil } From 08977ec7142070eae6e24f2bec3109125a26a6dd Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Tue, 21 Jan 2025 22:16:57 +0200 Subject: [PATCH 08/48] yalla --- esti/s3_gateway_test.go | 27 ++++++--------------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index e09c1c32792..7e6daf14c81 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -277,21 +277,17 @@ func TestListMultipartUploads(t *testing.T) { }, } output, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) - str := concatKeys(*output) - require.Contains(t, str, obj1) + require.Contains(t, output.Uploads[0].Key, obj1) resp2, err := s3Client.CreateMultipartUpload(ctx, input2) require.NoError(t, err, "failed to create multipart upload") output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) - str = concatKeys(*output) - require.Contains(t, str, obj1) - require.Contains(t, str, obj2) + require.Contains(t, output.Uploads[0].Key, obj1) + require.Contains(t, output.Uploads[1].Key, obj2) _, err = s3Client.CompleteMultipartUpload(ctx, completeInput1) output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) - str = concatKeys(*output) - require.NotContains(t, str, obj1) - require.Contains(t, str, obj2) + require.Contains(t, output.Uploads[0].Key, obj2) abortInput2 := &s3.AbortMultipartUploadInput{ Bucket: resp2.Bucket, @@ -301,20 +297,9 @@ func TestListMultipartUploads(t *testing.T) { _, err = s3Client.AbortMultipartUpload(ctx, abortInput2) output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) - str = concatKeys(*output) - require.NotContains(t, str, obj1) - require.NotContains(t, str, obj2) - -} -func concatKeys(output s3.ListMultipartUploadsOutput) string { - var allKeys string - for _, upload := range output.Uploads { - if upload.Key != nil { - allKeys += *upload.Key + " " - } - } - return allKeys + require.True(t, len(output.Uploads) == 0) } + func setHTTPHeaders(ifNoneMatch string) func(*middleware.Stack) error { return func(stack *middleware.Stack) error { return stack.Build.Add(middleware.BuildMiddlewareFunc("AddIfNoneMatchHeader", func( From 749f305f063a4ad999f7a60f4290189aa46143a0 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Tue, 21 Jan 2025 22:28:24 +0200 Subject: [PATCH 09/48] yalla --- esti/s3_gateway_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index 7e6daf14c81..7ef0a63aad0 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -277,7 +277,7 @@ func TestListMultipartUploads(t *testing.T) { }, } output, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) - require.Contains(t, output.Uploads[0].Key, obj1) + require.Contains(t, *output.Uploads[0].Key, obj1) resp2, err := s3Client.CreateMultipartUpload(ctx, input2) require.NoError(t, err, "failed to create multipart upload") From 7d121bd37d389b9b31f444ef9174292c274429d1 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Wed, 22 Jan 2025 10:50:56 +0200 Subject: [PATCH 10/48] tests dont yet work --- esti/s3_gateway_test.go | 2 +- pkg/gateway/handler.go | 3 --- pkg/gateway/operations/base.go | 6 ------ 3 files changed, 1 insertion(+), 10 deletions(-) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index 7ef0a63aad0..7e6daf14c81 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -277,7 +277,7 @@ func TestListMultipartUploads(t *testing.T) { }, } output, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) - require.Contains(t, *output.Uploads[0].Key, obj1) + require.Contains(t, output.Uploads[0].Key, obj1) resp2, err := s3Client.CreateMultipartUpload(ctx, input2) require.NoError(t, err, "failed to create multipart upload") diff --git a/pkg/gateway/handler.go b/pkg/gateway/handler.go index 5b2efad7968..242e6eaac4f 100644 --- a/pkg/gateway/handler.go +++ b/pkg/gateway/handler.go @@ -2,7 +2,6 @@ package gateway import ( "errors" - "fmt" "net/http" gohttputil "net/http/httputil" "net/url" @@ -199,7 +198,6 @@ func RepoOperationHandler(sc *ServerContext, handler operations.RepoOperationHan } func PathOperationHandler(sc *ServerContext, handler operations.PathOperationHandler) http.Handler { - fmt.Println("got to path operation handler") return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { ctx := req.Context() repo := ctx.Value(ContextKeyRepository).(*catalog.Repository) @@ -321,7 +319,6 @@ func setDefaultContentType(w http.ResponseWriter, req *http.Request) { } func unsupportedOperationHandler() http.Handler { - fmt.Println("got to where u thought") return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { o := &operations.Operation{} _ = o.EncodeError(w, req, nil, gatewayerrors.ERRLakeFSNotSupported.ToAPIErr()) diff --git a/pkg/gateway/operations/base.go b/pkg/gateway/operations/base.go index 3206a05f892..d1f459b113d 100644 --- a/pkg/gateway/operations/base.go +++ b/pkg/gateway/operations/base.go @@ -91,12 +91,6 @@ func (o *Operation) HandleUnsupported(w http.ResponseWriter, req *http.Request, return false } query := req.URL.Query() - fmt.Println(query) - fmt.Println(keys) - fmt.Println(o.OperationID) - // if query.Has("uploads") { - // return false - // } if slices.ContainsFunc(keys, query.Has) { _ = o.EncodeError(w, req, nil, gwerrors.ErrAllAccessDisabled.ToAPIErr()) return true From e04cd1b145601a79214a9f67a97657ee643797a4 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Wed, 22 Jan 2025 10:52:44 +0200 Subject: [PATCH 11/48] check output --- esti/s3_gateway_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index 7e6daf14c81..f9711d964ed 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -18,6 +18,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/aws/smithy-go/middleware" smithyhttp "github.com/aws/smithy-go/transport/http" + "github.com/davecgh/go-spew/spew" "github.com/go-openapi/swag" "github.com/thanhpk/randstr" @@ -277,6 +278,9 @@ func TestListMultipartUploads(t *testing.T) { }, } output, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) + fmt.Println("output %+v/n: ", output) + spew.Dump(output) + require.Contains(t, output.Uploads[0].Key, obj1) resp2, err := s3Client.CreateMultipartUpload(ctx, input2) From dfc66997e1bab6b900416a19914743c8ee079737 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Wed, 22 Jan 2025 11:04:21 +0200 Subject: [PATCH 12/48] check output --- esti/s3_gateway_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index f9711d964ed..0bfc022c120 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -278,7 +278,7 @@ func TestListMultipartUploads(t *testing.T) { }, } output, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) - fmt.Println("output %+v/n: ", output) + require.NoError(t, err, "failed to create multipart upload") spew.Dump(output) require.Contains(t, output.Uploads[0].Key, obj1) From 8aa34f584865beca7af7627c38c14c81eaa849c1 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Wed, 22 Jan 2025 11:27:07 +0200 Subject: [PATCH 13/48] clean --- esti/s3_gateway_test.go | 2 -- pkg/block/adapter.go | 4 ---- pkg/gateway/operations/putobject.go | 11 +++++------ 3 files changed, 5 insertions(+), 12 deletions(-) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index 0bfc022c120..9aadc70020e 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -18,7 +18,6 @@ import ( "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/aws/smithy-go/middleware" smithyhttp "github.com/aws/smithy-go/transport/http" - "github.com/davecgh/go-spew/spew" "github.com/go-openapi/swag" "github.com/thanhpk/randstr" @@ -279,7 +278,6 @@ func TestListMultipartUploads(t *testing.T) { } output, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) require.NoError(t, err, "failed to create multipart upload") - spew.Dump(output) require.Contains(t, output.Uploads[0].Key, obj1) diff --git a/pkg/block/adapter.go b/pkg/block/adapter.go index 0556a07239f..1ed8fea37f4 100644 --- a/pkg/block/adapter.go +++ b/pkg/block/adapter.go @@ -17,10 +17,6 @@ type MultipartPart struct { LastModified time.Time Size int64 } -type Upload struct { - Key string - UploadID string -} // MultipartUploadCompletion parts described as part of complete multipart upload. Each part holds the part number and ETag received while calling part upload. // NOTE that S3 implementation and our S3 gateway accept and returns ETag value surrounded with double-quotes ("), while diff --git a/pkg/gateway/operations/putobject.go b/pkg/gateway/operations/putobject.go index 5181fd44a5a..aeacec0442b 100644 --- a/pkg/gateway/operations/putobject.go +++ b/pkg/gateway/operations/putobject.go @@ -20,12 +20,11 @@ import ( ) const ( - IfNoneMatchHeader = "If-None-Match" - CopySourceHeader = "x-amz-copy-source" - CopySourceRangeHeader = "x-amz-copy-source-range" - QueryParamUploadID = "uploadId" - QueryParamListMultipart = "uploads" - QueryParamPartNumber = "partNumber" + IfNoneMatchHeader = "If-None-Match" + CopySourceHeader = "x-amz-copy-source" + CopySourceRangeHeader = "x-amz-copy-source-range" + QueryParamUploadID = "uploadId" + QueryParamPartNumber = "partNumber" ) type PutObject struct{} From f8c82153399a69ae6f4b8596f6c4a5a3598cf77c Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Wed, 22 Jan 2025 13:38:37 +0200 Subject: [PATCH 14/48] no tests --- esti/s3_gateway_test.go | 63 ----------------------------------------- 1 file changed, 63 deletions(-) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index 9aadc70020e..c1037d16d52 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -238,69 +238,6 @@ func TestMultipartUploadIfNoneMatch(t *testing.T) { } } } -func TestListMultipartUploads(t *testing.T) { - ctx, logger, repo := setupTest(t) - defer tearDownTest(repo) - s3Endpoint := viper.GetString("s3_endpoint") - s3Client := createS3Client(s3Endpoint, t) - multipartNumberOfParts := 7 - multipartPartSize := 5 * 1024 * 1024 - - obj1 := "main/object1" - obj2 := "main/object2" - - input1 := &s3.CreateMultipartUploadInput{ - Bucket: aws.String(repo), - Key: aws.String(obj1), - } - input2 := &s3.CreateMultipartUploadInput{ - Bucket: aws.String(repo), - Key: aws.String(obj2), - } - - resp1, err := s3Client.CreateMultipartUpload(ctx, input1) - require.NoError(t, err, "failed to create multipart upload") - - parts := make([][]byte, multipartNumberOfParts) - for i := 0; i < multipartNumberOfParts; i++ { - parts[i] = randstr.Bytes(multipartPartSize + i) - } - - completedParts1 := uploadMultipartParts(t, ctx, s3Client, logger, resp1, parts, 0) - - completeInput1 := &s3.CompleteMultipartUploadInput{ - Bucket: resp1.Bucket, - Key: resp1.Key, - UploadId: resp1.UploadId, - MultipartUpload: &types.CompletedMultipartUpload{ - Parts: completedParts1, - }, - } - output, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) - require.NoError(t, err, "failed to create multipart upload") - - require.Contains(t, output.Uploads[0].Key, obj1) - - resp2, err := s3Client.CreateMultipartUpload(ctx, input2) - require.NoError(t, err, "failed to create multipart upload") - output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) - require.Contains(t, output.Uploads[0].Key, obj1) - require.Contains(t, output.Uploads[1].Key, obj2) - - _, err = s3Client.CompleteMultipartUpload(ctx, completeInput1) - output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) - require.Contains(t, output.Uploads[0].Key, obj2) - - abortInput2 := &s3.AbortMultipartUploadInput{ - Bucket: resp2.Bucket, - Key: resp2.Key, - UploadId: resp2.UploadId, - } - _, err = s3Client.AbortMultipartUpload(ctx, abortInput2) - - output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) - require.True(t, len(output.Uploads) == 0) -} func setHTTPHeaders(ifNoneMatch string) func(*middleware.Stack) error { return func(stack *middleware.Stack) error { From cff37f2af0c9ce79d6e100ec485e2d7eee07992b Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Thu, 23 Jan 2025 08:44:40 +0200 Subject: [PATCH 15/48] tests --- esti/s3_gateway_test.go | 83 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index c1037d16d52..6f157d6508d 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -289,6 +289,89 @@ func TestS3IfNoneMatch(t *testing.T) { } } +func TestListMultipartUploads(t *testing.T) { + ctx, logger, repo := setupTest(t) + defer tearDownTest(repo) + s3Endpoint := viper.GetString("s3_endpoint") + s3Client := createS3Client(s3Endpoint, t) + multipartNumberOfParts := 7 + multipartPartSize := 5 * 1024 * 1024 + + obj1 := "object1" + obj2 := "object2" + path1 := "main/" + obj1 + path2 := "main/" + obj2 + input1 := &s3.CreateMultipartUploadInput{ + Bucket: aws.String(repo), + Key: aws.String(path1), + } + input2 := &s3.CreateMultipartUploadInput{ + Bucket: aws.String(repo), + Key: aws.String(path2), + } + + resp1, err := s3Client.CreateMultipartUpload(ctx, input1) + require.NoError(t, err, "failed to create multipart upload") + + parts := make([][]byte, multipartNumberOfParts) + for i := 0; i < multipartNumberOfParts; i++ { + parts[i] = randstr.Bytes(multipartPartSize + i) + } + + completedParts1 := uploadMultipartParts(t, ctx, s3Client, logger, resp1, parts, 0) + + completeInput1 := &s3.CompleteMultipartUploadInput{ + Bucket: resp1.Bucket, + Key: resp1.Key, + UploadId: resp1.UploadId, + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: completedParts1, + }, + } + output, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) + str := concatKeys(output) + require.NoError(t, err, "failed to create multipart upload") + + require.Contains(t, str, obj1) + + resp2, err := s3Client.CreateMultipartUpload(ctx, input2) + require.NoError(t, err, "failed to create multipart upload") + output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) + str = concatKeys(output) + fmt.Println(str) + require.Contains(t, str, obj1) + require.Contains(t, str, obj2) + + _, err = s3Client.CompleteMultipartUpload(ctx, completeInput1) + output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) + str = concatKeys(output) + require.Contains(t, str, obj2) + + abortInput2 := &s3.AbortMultipartUploadInput{ + Bucket: resp2.Bucket, + Key: resp2.Key, + UploadId: resp2.UploadId, + } + _, err = s3Client.AbortMultipartUpload(ctx, abortInput2) + + output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) + str = concatKeys(output) + require.NotContains(t, str, obj1) + require.NotContains(t, str, obj2) +} + +func concatKeys(output *s3.ListMultipartUploadsOutput) string { + var allKeys string + for _, upload := range output.Uploads { + if upload.Key != nil { + allKeys += *upload.Key + " " + } else { + fmt.Println("upload.Key is nil") // Debug logging + } + } + return allKeys + "\n" +} + func verifyObjectInfo(t *testing.T, got minio.ObjectInfo, expectedSize int) { if got.Err != nil { t.Errorf("%s: %s", got.Key, got.Err) From 219d4f49b61ca15757f8b0da860e365ab6b6101a Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Thu, 23 Jan 2025 11:49:19 +0200 Subject: [PATCH 16/48] ooooo baby --- esti/s3_gateway_test.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index 6f157d6508d..cca95149d05 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -329,8 +329,8 @@ func TestListMultipartUploads(t *testing.T) { }, } output, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) + require.NoError(t, err, "error listing multiparts") str := concatKeys(output) - require.NoError(t, err, "failed to create multipart upload") require.Contains(t, str, obj1) @@ -344,6 +344,7 @@ func TestListMultipartUploads(t *testing.T) { _, err = s3Client.CompleteMultipartUpload(ctx, completeInput1) output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) + require.NoError(t, err, "error listing multiparts") str = concatKeys(output) require.Contains(t, str, obj2) @@ -355,12 +356,16 @@ func TestListMultipartUploads(t *testing.T) { _, err = s3Client.AbortMultipartUpload(ctx, abortInput2) output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) + require.NoError(t, err, "error listing multiparts") str = concatKeys(output) require.NotContains(t, str, obj1) require.NotContains(t, str, obj2) } func concatKeys(output *s3.ListMultipartUploadsOutput) string { + if output == nil { + return "" + } var allKeys string for _, upload := range output.Uploads { if upload.Key != nil { From 0a3725122cea8583d47619e6dabcd472010f9763 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Thu, 23 Jan 2025 12:04:26 +0200 Subject: [PATCH 17/48] moving further --- esti/s3_gateway_test.go | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index cca95149d05..1ba4cc945a8 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -334,7 +334,7 @@ func TestListMultipartUploads(t *testing.T) { require.Contains(t, str, obj1) - resp2, err := s3Client.CreateMultipartUpload(ctx, input2) + _, err = s3Client.CreateMultipartUpload(ctx, input2) require.NoError(t, err, "failed to create multipart upload") output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) str = concatKeys(output) @@ -343,23 +343,11 @@ func TestListMultipartUploads(t *testing.T) { require.Contains(t, str, obj2) _, err = s3Client.CompleteMultipartUpload(ctx, completeInput1) - output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) - require.NoError(t, err, "error listing multiparts") - str = concatKeys(output) - require.Contains(t, str, obj2) - - abortInput2 := &s3.AbortMultipartUploadInput{ - Bucket: resp2.Bucket, - Key: resp2.Key, - UploadId: resp2.UploadId, - } - _, err = s3Client.AbortMultipartUpload(ctx, abortInput2) - output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) require.NoError(t, err, "error listing multiparts") str = concatKeys(output) require.NotContains(t, str, obj1) - require.NotContains(t, str, obj2) + require.Contains(t, str, obj2) } func concatKeys(output *s3.ListMultipartUploadsOutput) string { From 52ef352738501d9e2950430a58cd048836e329c9 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Thu, 23 Jan 2025 12:48:33 +0200 Subject: [PATCH 18/48] check errr --- pkg/gateway/operations/listobjects.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index f682e923397..7c7835a7188 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -2,6 +2,7 @@ package operations import ( "errors" + "fmt" "net/http" "strconv" "strings" @@ -405,7 +406,7 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep Bucket: o.Repository.Name, } - partsResp, err := o.BlockStore.ListMultipartUploads(req.Context(), block.ObjectPointer{ + mpuResp, err := o.BlockStore.ListMultipartUploads(req.Context(), block.ObjectPointer{ StorageNamespace: o.Repository.StorageNamespace, IdentifierType: block.IdentifierTypeRelative, }) @@ -415,12 +416,13 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep return } var uploads []serde.Upload - for _, upload := range partsResp.Uploads { + for _, upload := range mpuResp.Uploads { if upload.UploadId == nil { continue } multiPart, err := o.MultipartTracker.Get(req.Context(), *upload.UploadId) if err != nil { + fmt.Println("err: ", err) o.Log(req).WithError(err).Error("could not read multipart record") _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError)) return From ea30b22852a2a4536785ee48e8c418493781c550 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Thu, 23 Jan 2025 12:49:40 +0200 Subject: [PATCH 19/48] check errr --- pkg/gateway/operations/listobjects.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index 7c7835a7188..3fac7b5bce4 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -422,6 +422,7 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep } multiPart, err := o.MultipartTracker.Get(req.Context(), *upload.UploadId) if err != nil { + fmt.Println("id: ", *upload.UploadId) fmt.Println("err: ", err) o.Log(req).WithError(err).Error("could not read multipart record") _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError)) From 62f9e4433934359e194f769d91376750cfc43616 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Thu, 23 Jan 2025 13:58:10 +0200 Subject: [PATCH 20/48] err not found --- pkg/gateway/operations/listobjects.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index 3fac7b5bce4..54320acad19 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -421,7 +421,9 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep continue } multiPart, err := o.MultipartTracker.Get(req.Context(), *upload.UploadId) - if err != nil { + if errors.Is(err, graveler.ErrNotFound) { + continue + } else if err != nil { fmt.Println("id: ", *upload.UploadId) fmt.Println("err: ", err) o.Log(req).WithError(err).Error("could not read multipart record") From 1bdd16fd971803f03be485c483f0e004c1fddc2e Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Thu, 23 Jan 2025 14:22:01 +0200 Subject: [PATCH 21/48] err not found --- pkg/gateway/operations/listobjects.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index 54320acad19..08588beea0e 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -426,9 +426,10 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep } else if err != nil { fmt.Println("id: ", *upload.UploadId) fmt.Println("err: ", err) - o.Log(req).WithError(err).Error("could not read multipart record") - _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError)) - return + continue + // o.Log(req).WithError(err).Error("could not read multipart record") + // _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError)) + // return } uploads = append(uploads, serde.Upload{ Key: multiPart.Path, From 644a3908ba7f79ab2f335c8446151a9d68ce5703 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Thu, 23 Jan 2025 15:24:53 +0200 Subject: [PATCH 22/48] yalla --- esti/s3_gateway_test.go | 4 ++++ pkg/gateway/operations/listobjects.go | 11 ++++------- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index 1ba4cc945a8..9859f4fee46 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -297,6 +297,10 @@ func TestListMultipartUploads(t *testing.T) { multipartNumberOfParts := 7 multipartPartSize := 5 * 1024 * 1024 + blockStoreType := viper.GetString(ViperBlockstoreType) + if blockStoreType != "s3" { + t.Skip("Skipping test - blockstore type is not s3") + } obj1 := "object1" obj2 := "object2" path1 := "main/" + obj1 diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index 08588beea0e..3fac7b5bce4 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -421,15 +421,12 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep continue } multiPart, err := o.MultipartTracker.Get(req.Context(), *upload.UploadId) - if errors.Is(err, graveler.ErrNotFound) { - continue - } else if err != nil { + if err != nil { fmt.Println("id: ", *upload.UploadId) fmt.Println("err: ", err) - continue - // o.Log(req).WithError(err).Error("could not read multipart record") - // _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError)) - // return + o.Log(req).WithError(err).Error("could not read multipart record") + _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError)) + return } uploads = append(uploads, serde.Upload{ Key: multiPart.Path, From 22c17568fbdce06edba4e8b12acf229fc9438d04 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Thu, 23 Jan 2025 17:03:43 +0200 Subject: [PATCH 23/48] check --- pkg/gateway/operations/listobjects.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index 3fac7b5bce4..2144eca35ea 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -410,6 +410,9 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep StorageNamespace: o.Repository.StorageNamespace, IdentifierType: block.IdentifierTypeRelative, }) + for _, up := range mpuResp.Uploads { + fmt.Println(*up.UploadId) + } if err != nil { o.Log(req).WithError(err).Error("list multipart uploads failed") _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError)) From 3d9225ee675c98ded051aaf961cf17919f71bcc7 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Thu, 23 Jan 2025 17:31:40 +0200 Subject: [PATCH 24/48] check --- pkg/gateway/operations/listobjects.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index 2144eca35ea..6351602cb64 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -14,6 +14,7 @@ import ( "github.com/treeverse/lakefs/pkg/gateway/serde" "github.com/treeverse/lakefs/pkg/graveler" "github.com/treeverse/lakefs/pkg/httputil" + "github.com/treeverse/lakefs/pkg/kv" "github.com/treeverse/lakefs/pkg/logging" "github.com/treeverse/lakefs/pkg/permissions" ) @@ -410,9 +411,6 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep StorageNamespace: o.Repository.StorageNamespace, IdentifierType: block.IdentifierTypeRelative, }) - for _, up := range mpuResp.Uploads { - fmt.Println(*up.UploadId) - } if err != nil { o.Log(req).WithError(err).Error("list multipart uploads failed") _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError)) @@ -425,6 +423,9 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep } multiPart, err := o.MultipartTracker.Get(req.Context(), *upload.UploadId) if err != nil { + if errors.Is(err, kv.ErrNotFound) { + continue + } fmt.Println("id: ", *upload.UploadId) fmt.Println("err: ", err) o.Log(req).WithError(err).Error("could not read multipart record") From b1d588b1231a41d466689bfc6551d23ff7e1f6d8 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Thu, 23 Jan 2025 17:46:39 +0200 Subject: [PATCH 25/48] schlafen --- esti/s3_gateway_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index 9859f4fee46..61a8772cf4c 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -316,7 +316,7 @@ func TestListMultipartUploads(t *testing.T) { resp1, err := s3Client.CreateMultipartUpload(ctx, input1) require.NoError(t, err, "failed to create multipart upload") - + time.Sleep(10 * time.Second) parts := make([][]byte, multipartNumberOfParts) for i := 0; i < multipartNumberOfParts; i++ { parts[i] = randstr.Bytes(multipartPartSize + i) @@ -339,6 +339,7 @@ func TestListMultipartUploads(t *testing.T) { require.Contains(t, str, obj1) _, err = s3Client.CreateMultipartUpload(ctx, input2) + time.Sleep(10 * time.Second) require.NoError(t, err, "failed to create multipart upload") output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) str = concatKeys(output) From f6d95c72fd7cd1d347ce66eed066e84cdadbaf0f Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Thu, 23 Jan 2025 17:47:40 +0200 Subject: [PATCH 26/48] adapter --- pkg/block/s3/adapter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/block/s3/adapter.go b/pkg/block/s3/adapter.go index 67421a8cc7a..5f02ea5058f 100644 --- a/pkg/block/s3/adapter.go +++ b/pkg/block/s3/adapter.go @@ -875,7 +875,7 @@ func (a *Adapter) ListMultipartUploads(ctx context.Context, obj block.ObjectPoin } partsResp := block.ListMultipartUploadsResponse{ - Uploads: make([]types.MultipartUpload, len(resp.Uploads)), + Uploads: []types.MultipartUpload{}, } partsResp.Uploads = append(partsResp.Uploads, resp.Uploads...) return &partsResp, nil From 9a4c5e123fcb03e8d03318d053dce331e4b6f221 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Sun, 26 Jan 2025 11:13:33 +0200 Subject: [PATCH 27/48] namespace --- esti/s3_gateway_test.go | 4 ++-- pkg/block/s3/adapter.go | 8 ++++++-- pkg/gateway/operations/listobjects.go | 3 +++ 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index 61a8772cf4c..4db0690e796 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -289,7 +289,7 @@ func TestS3IfNoneMatch(t *testing.T) { } } -func TestListMultipartUploads(t *testing.T) { +func TestListMultipartUploads17(t *testing.T) { ctx, logger, repo := setupTest(t) defer tearDownTest(repo) s3Endpoint := viper.GetString("s3_endpoint") @@ -351,7 +351,7 @@ func TestListMultipartUploads(t *testing.T) { output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) require.NoError(t, err, "error listing multiparts") str = concatKeys(output) - require.NotContains(t, str, obj1) + //require.NotContains(t, str, obj1) require.Contains(t, str, obj2) } diff --git a/pkg/block/s3/adapter.go b/pkg/block/s3/adapter.go index 5f02ea5058f..5444afc866b 100644 --- a/pkg/block/s3/adapter.go +++ b/pkg/block/s3/adapter.go @@ -852,13 +852,17 @@ func (a *Adapter) ListParts(ctx context.Context, obj block.ObjectPointer, upload func (a *Adapter) ListMultipartUploads(ctx context.Context, obj block.ObjectPointer) (*block.ListMultipartUploadsResponse, error) { var err error defer reportMetrics("ListMultipartUploads", time.Now(), nil, &err) - bucket, _, qualifiedKey, err := a.extractParamsFromObj(obj) + bucket, key, qualifiedKey, err := a.extractParamsFromObj(obj) if err != nil { return nil, err } - + fmt.Println("key: ", key) + fmt.Println("bucket: ", qualifiedKey.GetStorageNamespace()) + fmt.Println("bucket: ", qualifiedKey.GetKey()) + fmt.Println("bucket: ", bucket) input := &s3.ListMultipartUploadsInput{ Bucket: aws.String(bucket), + Prefix: aws.String(key), } lg := a.log(ctx).WithFields(logging.Fields{ diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index 6351602cb64..d58d00843ea 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -411,6 +411,9 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep StorageNamespace: o.Repository.StorageNamespace, IdentifierType: block.IdentifierTypeRelative, }) + fmt.Println("name ", o.Repository.Name) + fmt.Println("name space", o.Repository.StorageNamespace) + fmt.Println("storeagenid ", o.Repository.StorageID) if err != nil { o.Log(req).WithError(err).Error("list multipart uploads failed") _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError)) From 033784ada546cecca9b341503db670c40529f8e5 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Sun, 26 Jan 2025 11:51:33 +0200 Subject: [PATCH 28/48] thank yoou --- esti/s3_gateway_test.go | 17 +++++++++-------- pkg/block/s3/adapter.go | 12 ++++-------- pkg/gateway/operations/listobjects.go | 12 ++++-------- 3 files changed, 17 insertions(+), 24 deletions(-) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index 4db0690e796..e9f3ca859fa 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -294,17 +294,20 @@ func TestListMultipartUploads17(t *testing.T) { defer tearDownTest(repo) s3Endpoint := viper.GetString("s3_endpoint") s3Client := createS3Client(s3Endpoint, t) - multipartNumberOfParts := 7 + multipartNumberOfParts := 3 multipartPartSize := 5 * 1024 * 1024 blockStoreType := viper.GetString(ViperBlockstoreType) if blockStoreType != "s3" { t.Skip("Skipping test - blockstore type is not s3") } + + // create two objects for two mpus obj1 := "object1" obj2 := "object2" path1 := "main/" + obj1 path2 := "main/" + obj2 + input1 := &s3.CreateMultipartUploadInput{ Bucket: aws.String(repo), Key: aws.String(path1), @@ -313,10 +316,9 @@ func TestListMultipartUploads17(t *testing.T) { Bucket: aws.String(repo), Key: aws.String(path2), } - + // create first mpu resp1, err := s3Client.CreateMultipartUpload(ctx, input1) require.NoError(t, err, "failed to create multipart upload") - time.Sleep(10 * time.Second) parts := make([][]byte, multipartNumberOfParts) for i := 0; i < multipartNumberOfParts; i++ { parts[i] = randstr.Bytes(multipartPartSize + i) @@ -332,14 +334,14 @@ func TestListMultipartUploads17(t *testing.T) { Parts: completedParts1, }, } + // check first mpu appears output, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) require.NoError(t, err, "error listing multiparts") str := concatKeys(output) - require.Contains(t, str, obj1) + // create second mpu check both appear _, err = s3Client.CreateMultipartUpload(ctx, input2) - time.Sleep(10 * time.Second) require.NoError(t, err, "failed to create multipart upload") output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) str = concatKeys(output) @@ -347,11 +349,12 @@ func TestListMultipartUploads17(t *testing.T) { require.Contains(t, str, obj1) require.Contains(t, str, obj2) + // finish first mpu check only second appear _, err = s3Client.CompleteMultipartUpload(ctx, completeInput1) output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) require.NoError(t, err, "error listing multiparts") str = concatKeys(output) - //require.NotContains(t, str, obj1) + require.NotContains(t, str, obj1) require.Contains(t, str, obj2) } @@ -363,8 +366,6 @@ func concatKeys(output *s3.ListMultipartUploadsOutput) string { for _, upload := range output.Uploads { if upload.Key != nil { allKeys += *upload.Key + " " - } else { - fmt.Println("upload.Key is nil") // Debug logging } } return allKeys + "\n" diff --git a/pkg/block/s3/adapter.go b/pkg/block/s3/adapter.go index 5444afc866b..9fa231fddcc 100644 --- a/pkg/block/s3/adapter.go +++ b/pkg/block/s3/adapter.go @@ -856,10 +856,6 @@ func (a *Adapter) ListMultipartUploads(ctx context.Context, obj block.ObjectPoin if err != nil { return nil, err } - fmt.Println("key: ", key) - fmt.Println("bucket: ", qualifiedKey.GetStorageNamespace()) - fmt.Println("bucket: ", qualifiedKey.GetKey()) - fmt.Println("bucket: ", bucket) input := &s3.ListMultipartUploadsInput{ Bucket: aws.String(bucket), Prefix: aws.String(key), @@ -874,15 +870,15 @@ func (a *Adapter) ListMultipartUploads(ctx context.Context, obj block.ObjectPoin client := a.clients.Get(ctx, bucket) resp, err := client.ListMultipartUploads(ctx, input) if err != nil { - lg.WithError(err).Error("ListParts failed") + lg.WithError(err).Error("ListParts multipart uploads failed") return nil, err } - partsResp := block.ListMultipartUploadsResponse{ + mpuResp := block.ListMultipartUploadsResponse{ Uploads: []types.MultipartUpload{}, } - partsResp.Uploads = append(partsResp.Uploads, resp.Uploads...) - return &partsResp, nil + mpuResp.Uploads = append(mpuResp.Uploads, resp.Uploads...) + return &mpuResp, nil } func (a *Adapter) BlockstoreType() string { diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index d58d00843ea..3bc85013dbd 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -2,7 +2,6 @@ package operations import ( "errors" - "fmt" "net/http" "strconv" "strings" @@ -411,32 +410,29 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep StorageNamespace: o.Repository.StorageNamespace, IdentifierType: block.IdentifierTypeRelative, }) - fmt.Println("name ", o.Repository.Name) - fmt.Println("name space", o.Repository.StorageNamespace) - fmt.Println("storeagenid ", o.Repository.StorageID) + if err != nil { o.Log(req).WithError(err).Error("list multipart uploads failed") _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError)) return } + var uploads []serde.Upload for _, upload := range mpuResp.Uploads { if upload.UploadId == nil { continue } - multiPart, err := o.MultipartTracker.Get(req.Context(), *upload.UploadId) + mpu, err := o.MultipartTracker.Get(req.Context(), *upload.UploadId) if err != nil { if errors.Is(err, kv.ErrNotFound) { continue } - fmt.Println("id: ", *upload.UploadId) - fmt.Println("err: ", err) o.Log(req).WithError(err).Error("could not read multipart record") _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError)) return } uploads = append(uploads, serde.Upload{ - Key: multiPart.Path, + Key: mpu.Path, UploadID: *upload.UploadId, }) } From 25428554dfdf3b29e2492bc527f8e9b6c0b832fd Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Sun, 26 Jan 2025 15:38:47 +0200 Subject: [PATCH 29/48] names --- esti/s3_gateway_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index e9f3ca859fa..c119529fb87 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -289,7 +289,7 @@ func TestS3IfNoneMatch(t *testing.T) { } } -func TestListMultipartUploads17(t *testing.T) { +func TestListMultipartUploads(t *testing.T) { ctx, logger, repo := setupTest(t) defer tearDownTest(repo) s3Endpoint := viper.GetString("s3_endpoint") From c74b714803c49004f7ba79f14ccdc05ad148cd98 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Mon, 27 Jan 2025 16:05:14 +0200 Subject: [PATCH 30/48] refactor --- pkg/block/adapter.go | 6 +++++- pkg/block/azure/adapter.go | 2 +- pkg/block/gs/adapter.go | 2 +- pkg/block/local/adapter.go | 2 +- pkg/block/mem/adapter.go | 2 +- pkg/block/metrics.go | 4 ++-- pkg/block/s3/adapter.go | 12 ++++++------ pkg/block/transient/adapter.go | 2 +- pkg/gateway/operations/base.go | 2 +- 9 files changed, 19 insertions(+), 15 deletions(-) diff --git a/pkg/block/adapter.go b/pkg/block/adapter.go index 1ed8fea37f4..164cd82a235 100644 --- a/pkg/block/adapter.go +++ b/pkg/block/adapter.go @@ -145,6 +145,10 @@ type ListPartsOpts struct { PartNumberMarker *string } +type ListMultipartUploadsOpts struct { + MaxUploads *int32 +} + // Properties of an object stored on the underlying block store. // Refer to the actual underlying Adapter for which properties are // actually reported. @@ -195,7 +199,7 @@ type Adapter interface { UploadPart(ctx context.Context, obj ObjectPointer, sizeBytes int64, reader io.Reader, uploadID string, partNumber int) (*UploadPartResponse, error) UploadCopyPart(ctx context.Context, sourceObj, destinationObj ObjectPointer, uploadID string, partNumber int) (*UploadPartResponse, error) ListParts(ctx context.Context, obj ObjectPointer, uploadID string, opts ListPartsOpts) (*ListPartsResponse, error) - ListMultipartUploads(ctx context.Context, obj ObjectPointer) (*ListMultipartUploadsResponse, error) + ListMultipartUploads(ctx context.Context, obj ObjectPointer, opts ListMultipartUploadsOpts) (*ListMultipartUploadsResponse, error) UploadCopyPartRange(ctx context.Context, sourceObj, destinationObj ObjectPointer, uploadID string, partNumber int, startPosition, endPosition int64) (*UploadPartResponse, error) AbortMultiPartUpload(ctx context.Context, obj ObjectPointer, uploadID string) error CompleteMultiPartUpload(ctx context.Context, obj ObjectPointer, uploadID string, multipartList *MultipartUploadCompletion) (*CompleteMultiPartUploadResponse, error) diff --git a/pkg/block/azure/adapter.go b/pkg/block/azure/adapter.go index 7f6f13623e9..28dd498e59c 100644 --- a/pkg/block/azure/adapter.go +++ b/pkg/block/azure/adapter.go @@ -644,7 +644,7 @@ func (a *Adapter) GetPresignUploadPartURL(_ context.Context, _ block.ObjectPoint func (a *Adapter) ListParts(_ context.Context, _ block.ObjectPointer, _ string, _ block.ListPartsOpts) (*block.ListPartsResponse, error) { return nil, block.ErrOperationNotSupported } -func (a *Adapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer) (*block.ListMultipartUploadsResponse, error) { +func (a *Adapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer, _ block.ListMultipartUploadsOpts) (*block.ListMultipartUploadsResponse, error) { return nil, block.ErrOperationNotSupported } diff --git a/pkg/block/gs/adapter.go b/pkg/block/gs/adapter.go index f4830b7c3ed..bfbcda34c72 100644 --- a/pkg/block/gs/adapter.go +++ b/pkg/block/gs/adapter.go @@ -717,6 +717,6 @@ func (a *Adapter) GetPresignUploadPartURL(_ context.Context, _ block.ObjectPoint func (a *Adapter) ListParts(_ context.Context, _ block.ObjectPointer, _ string, _ block.ListPartsOpts) (*block.ListPartsResponse, error) { return nil, block.ErrOperationNotSupported } -func (a *Adapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer) (*block.ListMultipartUploadsResponse, error) { +func (a *Adapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer, _ block.ListMultipartUploadsOpts) (*block.ListMultipartUploadsResponse, error) { return nil, block.ErrOperationNotSupported } diff --git a/pkg/block/local/adapter.go b/pkg/block/local/adapter.go index 6d9f95234e3..aff12df760d 100644 --- a/pkg/block/local/adapter.go +++ b/pkg/block/local/adapter.go @@ -600,6 +600,6 @@ func (l *Adapter) GetPresignUploadPartURL(_ context.Context, _ block.ObjectPoint func (l *Adapter) ListParts(_ context.Context, _ block.ObjectPointer, _ string, _ block.ListPartsOpts) (*block.ListPartsResponse, error) { return nil, block.ErrOperationNotSupported } -func (l *Adapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer) (*block.ListMultipartUploadsResponse, error) { +func (l *Adapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer, _ block.ListMultipartUploadsOpts) (*block.ListMultipartUploadsResponse, error) { return nil, block.ErrOperationNotSupported } diff --git a/pkg/block/mem/adapter.go b/pkg/block/mem/adapter.go index 267eade435c..b461c4b0f5e 100644 --- a/pkg/block/mem/adapter.go +++ b/pkg/block/mem/adapter.go @@ -369,6 +369,6 @@ func (a *Adapter) GetPresignUploadPartURL(_ context.Context, _ block.ObjectPoint func (a *Adapter) ListParts(_ context.Context, _ block.ObjectPointer, _ string, _ block.ListPartsOpts) (*block.ListPartsResponse, error) { return nil, block.ErrOperationNotSupported } -func (a *Adapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer) (*block.ListMultipartUploadsResponse, error) { +func (a *Adapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer, _ block.ListMultipartUploadsOpts) (*block.ListMultipartUploadsResponse, error) { return nil, block.ErrOperationNotSupported } diff --git a/pkg/block/metrics.go b/pkg/block/metrics.go index 413a4fb5c3b..9f6f71ff629 100644 --- a/pkg/block/metrics.go +++ b/pkg/block/metrics.go @@ -85,9 +85,9 @@ func (m *MetricsAdapter) ListParts(ctx context.Context, obj ObjectPointer, uploa ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType()) return m.adapter.ListParts(ctx, obj, uploadID, opts) } -func (m *MetricsAdapter) ListMultipartUploads(ctx context.Context, obj ObjectPointer) (*ListMultipartUploadsResponse, error) { +func (m *MetricsAdapter) ListMultipartUploads(ctx context.Context, obj ObjectPointer, opts ListMultipartUploadsOpts) (*ListMultipartUploadsResponse, error) { ctx = httputil.SetClientTrace(ctx, m.adapter.BlockstoreType()) - return m.adapter.ListMultipartUploads(ctx, obj) + return m.adapter.ListMultipartUploads(ctx, obj, opts) } func (m *MetricsAdapter) UploadCopyPart(ctx context.Context, sourceObj, destinationObj ObjectPointer, uploadID string, partNumber int) (*UploadPartResponse, error) { diff --git a/pkg/block/s3/adapter.go b/pkg/block/s3/adapter.go index 9fa231fddcc..d743968e91c 100644 --- a/pkg/block/s3/adapter.go +++ b/pkg/block/s3/adapter.go @@ -849,7 +849,7 @@ func (a *Adapter) ListParts(ctx context.Context, obj block.ObjectPointer, upload return &partsResp, nil } -func (a *Adapter) ListMultipartUploads(ctx context.Context, obj block.ObjectPointer) (*block.ListMultipartUploadsResponse, error) { +func (a *Adapter) ListMultipartUploads(ctx context.Context, obj block.ObjectPointer, opts block.ListMultipartUploadsOpts) (*block.ListMultipartUploadsResponse, error) { var err error defer reportMetrics("ListMultipartUploads", time.Now(), nil, &err) bucket, key, qualifiedKey, err := a.extractParamsFromObj(obj) @@ -857,8 +857,9 @@ func (a *Adapter) ListMultipartUploads(ctx context.Context, obj block.ObjectPoin return nil, err } input := &s3.ListMultipartUploadsInput{ - Bucket: aws.String(bucket), - Prefix: aws.String(key), + Bucket: aws.String(bucket), + Prefix: aws.String(key), + MaxUploads: opts.MaxUploads, } lg := a.log(ctx).WithFields(logging.Fields{ @@ -870,14 +871,13 @@ func (a *Adapter) ListMultipartUploads(ctx context.Context, obj block.ObjectPoin client := a.clients.Get(ctx, bucket) resp, err := client.ListMultipartUploads(ctx, input) if err != nil { - lg.WithError(err).Error("ListParts multipart uploads failed") + lg.WithError(err).Error("List multipart uploads failed") return nil, err } mpuResp := block.ListMultipartUploadsResponse{ - Uploads: []types.MultipartUpload{}, + Uploads: resp.Uploads, } - mpuResp.Uploads = append(mpuResp.Uploads, resp.Uploads...) return &mpuResp, nil } diff --git a/pkg/block/transient/adapter.go b/pkg/block/transient/adapter.go index 92034093f5c..5318b49df4a 100644 --- a/pkg/block/transient/adapter.go +++ b/pkg/block/transient/adapter.go @@ -178,6 +178,6 @@ func (a *Adapter) GetPresignUploadPartURL(_ context.Context, _ block.ObjectPoint func (a *Adapter) ListParts(_ context.Context, _ block.ObjectPointer, _ string, _ block.ListPartsOpts) (*block.ListPartsResponse, error) { return nil, block.ErrOperationNotSupported } -func (a *Adapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer) (*block.ListMultipartUploadsResponse, error) { +func (a *Adapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer, _ block.ListMultipartUploadsOpts) (*block.ListMultipartUploadsResponse, error) { return nil, block.ErrOperationNotSupported } diff --git a/pkg/gateway/operations/base.go b/pkg/gateway/operations/base.go index d1f459b113d..98786e94bd7 100644 --- a/pkg/gateway/operations/base.go +++ b/pkg/gateway/operations/base.go @@ -92,7 +92,7 @@ func (o *Operation) HandleUnsupported(w http.ResponseWriter, req *http.Request, } query := req.URL.Query() if slices.ContainsFunc(keys, query.Has) { - _ = o.EncodeError(w, req, nil, gwerrors.ErrAllAccessDisabled.ToAPIErr()) + _ = o.EncodeError(w, req, nil, gwerrors.ERRLakeFSNotSupported.ToAPIErr()) return true } return false From deb1fd3bfd3a6073e05ba4d617486b735fdd4763 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Mon, 27 Jan 2025 16:09:49 +0200 Subject: [PATCH 31/48] refactored now --- esti/s3_gateway_test.go | 26 +++++++++++----------- pkg/gateway/operations/listobjects.go | 31 ++++++++++++++++++++------- pkg/testutil/adapter.go | 2 +- 3 files changed, 36 insertions(+), 23 deletions(-) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index c119529fb87..0a864a886a9 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -290,6 +290,10 @@ func TestS3IfNoneMatch(t *testing.T) { } func TestListMultipartUploads(t *testing.T) { + blockStoreType := viper.GetString(ViperBlockstoreType) + if blockStoreType != "s3" { + t.Skip("Skipping test - blockstore type is not s3") + } ctx, logger, repo := setupTest(t) defer tearDownTest(repo) s3Endpoint := viper.GetString("s3_endpoint") @@ -297,11 +301,6 @@ func TestListMultipartUploads(t *testing.T) { multipartNumberOfParts := 3 multipartPartSize := 5 * 1024 * 1024 - blockStoreType := viper.GetString(ViperBlockstoreType) - if blockStoreType != "s3" { - t.Skip("Skipping test - blockstore type is not s3") - } - // create two objects for two mpus obj1 := "object1" obj2 := "object2" @@ -337,15 +336,14 @@ func TestListMultipartUploads(t *testing.T) { // check first mpu appears output, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) require.NoError(t, err, "error listing multiparts") - str := concatKeys(output) + str := extractUploadKeys(output) require.Contains(t, str, obj1) // create second mpu check both appear _, err = s3Client.CreateMultipartUpload(ctx, input2) require.NoError(t, err, "failed to create multipart upload") output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) - str = concatKeys(output) - fmt.Println(str) + str = extractUploadKeys(output) require.Contains(t, str, obj1) require.Contains(t, str, obj2) @@ -353,22 +351,22 @@ func TestListMultipartUploads(t *testing.T) { _, err = s3Client.CompleteMultipartUpload(ctx, completeInput1) output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) require.NoError(t, err, "error listing multiparts") - str = concatKeys(output) + str = extractUploadKeys(output) require.NotContains(t, str, obj1) require.Contains(t, str, obj2) } -func concatKeys(output *s3.ListMultipartUploadsOutput) string { +func extractUploadKeys(output *s3.ListMultipartUploadsOutput) []string { if output == nil { - return "" + return nil } - var allKeys string + keys := make([]string, 0, len(output.Uploads)) for _, upload := range output.Uploads { if upload.Key != nil { - allKeys += *upload.Key + " " + keys = append(keys, *upload.Key) } } - return allKeys + "\n" + return keys } func verifyObjectInfo(t *testing.T, got minio.ObjectInfo, expectedSize int) { diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index 3bc85013dbd..420d63711d5 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -23,6 +23,7 @@ const ( // defaultBucketLocation used to identify if we need to specify the location constraint defaultBucketLocation = "us-east-1" + QueryParamMaxUploads = "max-uploads" ) type ListObjects struct{} @@ -401,15 +402,25 @@ func (controller *ListObjects) Handle(w http.ResponseWriter, req *http.Request, } func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *RepoOperation) { - o.Incr("list_multipart_uploads", o.Principal, o.Repository.Name, "") - resp := &serde.ListMultipartUploadsOutput{ - Bucket: o.Repository.Name, + query := req.URL.Query() + maxUploadsStr := query.Get(QueryParamMaxUploads) + opts := block.ListMultipartUploadsOpts{} + if maxUploadsStr != "" { + maxUploads, err := strconv.ParseInt(maxUploadsStr, 10, 32) + if err != nil { + o.Log(req).WithField("MaxParts", maxUploadsStr). + WithError(err).Error("malformed query parameter 'MaxParts'") + _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError)) + return + } + maxUploads32 := int32(maxUploads) + opts.MaxUploads = &maxUploads32 } - + // partNumberMarker := query.Get(QueryParamPartNumberMarker) mpuResp, err := o.BlockStore.ListMultipartUploads(req.Context(), block.ObjectPointer{ StorageNamespace: o.Repository.StorageNamespace, IdentifierType: block.IdentifierTypeRelative, - }) + }, opts) if err != nil { o.Log(req).WithError(err).Error("list multipart uploads failed") @@ -417,7 +428,7 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep return } - var uploads []serde.Upload + uploads := make([]serde.Upload, 0, len(mpuResp.Uploads)) for _, upload := range mpuResp.Uploads { if upload.UploadId == nil { continue @@ -427,7 +438,7 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep if errors.Is(err, kv.ErrNotFound) { continue } - o.Log(req).WithError(err).Error("could not read multipart record") + o.Log(req).WithError(err).Error("could not read multipart record %s", *&upload.UploadId) _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError)) return } @@ -436,6 +447,10 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep UploadID: *upload.UploadId, }) } - resp.Uploads = uploads + o.Incr("list_multipart_uploads", o.Principal, o.Repository.Name, "") + resp := &serde.ListMultipartUploadsOutput{ + Bucket: o.Repository.Name, + Uploads: uploads, + } o.EncodeResponse(w, req, resp, http.StatusOK) } diff --git a/pkg/testutil/adapter.go b/pkg/testutil/adapter.go index f9e6a66a794..1102ce28a8f 100644 --- a/pkg/testutil/adapter.go +++ b/pkg/testutil/adapter.go @@ -128,7 +128,7 @@ func (a *MockAdapter) UploadCopyPartRange(_ context.Context, _, _ block.ObjectPo func (a *MockAdapter) ListParts(_ context.Context, _ block.ObjectPointer, _ string, _ block.ListPartsOpts) (*block.ListPartsResponse, error) { panic("try to list parts in mock adapter") } -func (a *MockAdapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer) (*block.ListMultipartUploadsResponse, error) { +func (a *MockAdapter) ListMultipartUploads(_ context.Context, _ block.ObjectPointer, _ block.ListMultipartUploadsOpts) (*block.ListMultipartUploadsResponse, error) { panic("try to list multipart uploads in mock adapter") } func (a *MockAdapter) BlockstoreType() string { From 5922ef8bb26e408d1b8cd4c801794f35895c451d Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Mon, 27 Jan 2025 16:34:17 +0200 Subject: [PATCH 32/48] no error --- pkg/gateway/operations/listobjects.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index 420d63711d5..8c96c5333e3 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -438,7 +438,7 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep if errors.Is(err, kv.ErrNotFound) { continue } - o.Log(req).WithError(err).Error("could not read multipart record %s", *&upload.UploadId) + o.Log(req).WithError(err).Error("could not read multipart record %s", *upload.UploadId) _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError)) return } From 2f740f4373af6cbc452874973273f415742786e0 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Mon, 27 Jan 2025 17:32:01 +0200 Subject: [PATCH 33/48] trimmming --- pkg/gateway/operations/listobjects.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index 8c96c5333e3..0286c40433f 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -408,8 +408,8 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep if maxUploadsStr != "" { maxUploads, err := strconv.ParseInt(maxUploadsStr, 10, 32) if err != nil { - o.Log(req).WithField("MaxParts", maxUploadsStr). - WithError(err).Error("malformed query parameter 'MaxParts'") + o.Log(req).WithField("maxUploadsStr", maxUploadsStr). + WithError(err).Error("malformed query parameter 'maxUploadsStr'") _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError)) return } From f91771d1bad66114b9e43b7cd24ba440229bb9c0 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Mon, 27 Jan 2025 20:05:33 +0200 Subject: [PATCH 34/48] corrections --- esti/s3_gateway_test.go | 17 +++++++++-------- pkg/block/adapter.go | 8 ++++++-- pkg/block/s3/adapter.go | 12 ++++++++---- pkg/gateway/errors/errors.go | 6 ++++++ pkg/gateway/operations/listobjects.go | 25 ++++++++++++++++++------- pkg/gateway/serde/xml.go | 8 +++++--- 6 files changed, 52 insertions(+), 24 deletions(-) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index 0a864a886a9..59af6d03447 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -336,24 +336,25 @@ func TestListMultipartUploads(t *testing.T) { // check first mpu appears output, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) require.NoError(t, err, "error listing multiparts") - str := extractUploadKeys(output) - require.Contains(t, str, obj1) + keys := extractUploadKeys(output) + require.Contains(t, keys, obj1) // create second mpu check both appear _, err = s3Client.CreateMultipartUpload(ctx, input2) require.NoError(t, err, "failed to create multipart upload") output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) - str = extractUploadKeys(output) - require.Contains(t, str, obj1) - require.Contains(t, str, obj2) + keys = extractUploadKeys(output) + require.Contains(t, keys, obj1) + require.Contains(t, keys, obj2) // finish first mpu check only second appear _, err = s3Client.CompleteMultipartUpload(ctx, completeInput1) + require.NoError(t, err, "failed to complete multipart upload") output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) require.NoError(t, err, "error listing multiparts") - str = extractUploadKeys(output) - require.NotContains(t, str, obj1) - require.Contains(t, str, obj2) + keys = extractUploadKeys(output) + require.NotContains(t, keys, obj1) + require.Contains(t, keys, obj2) } func extractUploadKeys(output *s3.ListMultipartUploadsOutput) []string { diff --git a/pkg/block/adapter.go b/pkg/block/adapter.go index 164cd82a235..510730a8eb6 100644 --- a/pkg/block/adapter.go +++ b/pkg/block/adapter.go @@ -124,7 +124,9 @@ type ListPartsResponse struct { } type ListMultipartUploadsResponse struct { - Uploads []types.MultipartUpload + Uploads []types.MultipartUpload + NextUploadIdMarker *string + NextKeyMarker *string } // CreateMultiPartUploadOpts contains optional arguments for @@ -146,7 +148,9 @@ type ListPartsOpts struct { } type ListMultipartUploadsOpts struct { - MaxUploads *int32 + MaxUploads *int32 + UploadIdMarker *string + KeyMarker *string } // Properties of an object stored on the underlying block store. diff --git a/pkg/block/s3/adapter.go b/pkg/block/s3/adapter.go index d743968e91c..f83278f9360 100644 --- a/pkg/block/s3/adapter.go +++ b/pkg/block/s3/adapter.go @@ -857,9 +857,11 @@ func (a *Adapter) ListMultipartUploads(ctx context.Context, obj block.ObjectPoin return nil, err } input := &s3.ListMultipartUploadsInput{ - Bucket: aws.String(bucket), - Prefix: aws.String(key), - MaxUploads: opts.MaxUploads, + Bucket: aws.String(bucket), + Prefix: aws.String(key), + MaxUploads: opts.MaxUploads, + UploadIdMarker: opts.UploadIdMarker, + KeyMarker: opts.KeyMarker, } lg := a.log(ctx).WithFields(logging.Fields{ @@ -876,7 +878,9 @@ func (a *Adapter) ListMultipartUploads(ctx context.Context, obj block.ObjectPoin } mpuResp := block.ListMultipartUploadsResponse{ - Uploads: resp.Uploads, + Uploads: resp.Uploads, + NextUploadIdMarker: resp.NextUploadIdMarker, + NextKeyMarker: resp.NextKeyMarker, } return &mpuResp, nil } diff --git a/pkg/gateway/errors/errors.go b/pkg/gateway/errors/errors.go index b3c182afe53..c681d48cbe9 100644 --- a/pkg/gateway/errors/errors.go +++ b/pkg/gateway/errors/errors.go @@ -60,6 +60,7 @@ const ( ErrInternalError ErrInvalidAccessKeyID ErrInvalidBucketName + ErrInvalidArgument ErrInvalidDigest ErrInvalidRange ErrInvalidCopyPartRange @@ -204,6 +205,11 @@ func (a APIErrorCode) ToAPIErr() APIError { // Codes - error code to APIError structure, these fields carry respective // descriptions for all the error responses. var Codes = errorCodeMap{ + ErrInvalidArgument: { + Code: "InvalidArgument", + Description: "Argument max-uploads must be an integer between 0 and 2147483647", + HTTPStatusCode: http.StatusBadRequest, + }, ErrInvalidCopyDest: { Code: "InvalidRequest", Description: "This copy request is illegal because it is trying to copy an object to itself without changing the object's metadata, storage class, website redirect location or encryption attributes.", diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index 0286c40433f..1b6e6b948f0 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -22,8 +22,10 @@ const ( ListObjectMaxKeys = 1000 // defaultBucketLocation used to identify if we need to specify the location constraint - defaultBucketLocation = "us-east-1" - QueryParamMaxUploads = "max-uploads" + defaultBucketLocation = "us-east-1" + QueryParamMaxUploads = "max-uploads" + QueryParamUploadIdMarker = "upload-id-marker" + QueryParamKeyMarker = "key-marker" ) type ListObjects struct{} @@ -402,21 +404,29 @@ func (controller *ListObjects) Handle(w http.ResponseWriter, req *http.Request, } func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *RepoOperation) { + o.Incr("list_multipart_uploads", o.Principal, o.Repository.Name, "") query := req.URL.Query() maxUploadsStr := query.Get(QueryParamMaxUploads) + uploadIdMarker := query.Get(QueryParamUploadIdMarker) + keyMarker := query.Get(QueryParamKeyMarker) opts := block.ListMultipartUploadsOpts{} if maxUploadsStr != "" { maxUploads, err := strconv.ParseInt(maxUploadsStr, 10, 32) if err != nil { o.Log(req).WithField("maxUploadsStr", maxUploadsStr). WithError(err).Error("malformed query parameter 'maxUploadsStr'") - _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError)) + _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInvalidArgument)) return } maxUploads32 := int32(maxUploads) opts.MaxUploads = &maxUploads32 } - // partNumberMarker := query.Get(QueryParamPartNumberMarker) + if uploadIdMarker != "" { + opts.UploadIdMarker = &uploadIdMarker + } + if keyMarker != "" { + opts.KeyMarker = &keyMarker + } mpuResp, err := o.BlockStore.ListMultipartUploads(req.Context(), block.ObjectPointer{ StorageNamespace: o.Repository.StorageNamespace, IdentifierType: block.IdentifierTypeRelative, @@ -447,10 +457,11 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep UploadID: *upload.UploadId, }) } - o.Incr("list_multipart_uploads", o.Principal, o.Repository.Name, "") resp := &serde.ListMultipartUploadsOutput{ - Bucket: o.Repository.Name, - Uploads: uploads, + Bucket: o.Repository.Name, + Uploads: uploads, + NextKeyMarker: *mpuResp.NextKeyMarker, + NextUploadIdMarker: *mpuResp.NextUploadIdMarker, } o.EncodeResponse(w, req, resp, http.StatusOK) } diff --git a/pkg/gateway/serde/xml.go b/pkg/gateway/serde/xml.go index 94bbde6aeae..ee38db0c945 100644 --- a/pkg/gateway/serde/xml.go +++ b/pkg/gateway/serde/xml.go @@ -165,9 +165,11 @@ type ListPartsOutput struct { } type ListMultipartUploadsOutput struct { - XMLName xml.Name `xml:"ListMultipartUploadsResult"` - Bucket string `xml:"Bucket"` - Uploads []Upload `xml:"Upload"` + XMLName xml.Name `xml:"ListMultipartUploadsResult"` + Bucket string `xml:"Bucket"` + Uploads []Upload `xml:"Upload"` + NextKeyMarker string `xml:"NextKeyMarker"` + NextUploadIdMarker string `xml:"NextUploadIdMarker"` } type VersioningConfiguration struct { From b3e588492d432a0b2e457e59a08e5f71a54bb5d0 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Mon, 27 Jan 2025 20:23:19 +0200 Subject: [PATCH 35/48] fixin --- pkg/block/adapter.go | 4 ++-- pkg/block/s3/adapter.go | 4 ++-- pkg/gateway/operations/listobjects.go | 8 ++++---- pkg/gateway/serde/xml.go | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/block/adapter.go b/pkg/block/adapter.go index 510730a8eb6..e7c438c69d6 100644 --- a/pkg/block/adapter.go +++ b/pkg/block/adapter.go @@ -125,7 +125,7 @@ type ListPartsResponse struct { type ListMultipartUploadsResponse struct { Uploads []types.MultipartUpload - NextUploadIdMarker *string + NextUploadIDMarker *string NextKeyMarker *string } @@ -149,7 +149,7 @@ type ListPartsOpts struct { type ListMultipartUploadsOpts struct { MaxUploads *int32 - UploadIdMarker *string + UploadIDMarker *string KeyMarker *string } diff --git a/pkg/block/s3/adapter.go b/pkg/block/s3/adapter.go index f83278f9360..97b395b70ff 100644 --- a/pkg/block/s3/adapter.go +++ b/pkg/block/s3/adapter.go @@ -860,7 +860,7 @@ func (a *Adapter) ListMultipartUploads(ctx context.Context, obj block.ObjectPoin Bucket: aws.String(bucket), Prefix: aws.String(key), MaxUploads: opts.MaxUploads, - UploadIdMarker: opts.UploadIdMarker, + UploadIdMarker: opts.UploadIDMarker, KeyMarker: opts.KeyMarker, } @@ -879,7 +879,7 @@ func (a *Adapter) ListMultipartUploads(ctx context.Context, obj block.ObjectPoin mpuResp := block.ListMultipartUploadsResponse{ Uploads: resp.Uploads, - NextUploadIdMarker: resp.NextUploadIdMarker, + NextUploadIDMarker: resp.NextUploadIdMarker, NextKeyMarker: resp.NextKeyMarker, } return &mpuResp, nil diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index 1b6e6b948f0..c328968ce6a 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -24,7 +24,7 @@ const ( // defaultBucketLocation used to identify if we need to specify the location constraint defaultBucketLocation = "us-east-1" QueryParamMaxUploads = "max-uploads" - QueryParamUploadIdMarker = "upload-id-marker" + QueryParamUploadIDMarker = "upload-id-marker" QueryParamKeyMarker = "key-marker" ) @@ -407,7 +407,7 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep o.Incr("list_multipart_uploads", o.Principal, o.Repository.Name, "") query := req.URL.Query() maxUploadsStr := query.Get(QueryParamMaxUploads) - uploadIdMarker := query.Get(QueryParamUploadIdMarker) + uploadIdMarker := query.Get(QueryParamUploadIDMarker) keyMarker := query.Get(QueryParamKeyMarker) opts := block.ListMultipartUploadsOpts{} if maxUploadsStr != "" { @@ -422,7 +422,7 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep opts.MaxUploads = &maxUploads32 } if uploadIdMarker != "" { - opts.UploadIdMarker = &uploadIdMarker + opts.UploadIDMarker = &uploadIdMarker } if keyMarker != "" { opts.KeyMarker = &keyMarker @@ -461,7 +461,7 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep Bucket: o.Repository.Name, Uploads: uploads, NextKeyMarker: *mpuResp.NextKeyMarker, - NextUploadIdMarker: *mpuResp.NextUploadIdMarker, + NextUploadIDMarker: *mpuResp.NextUploadIDMarker, } o.EncodeResponse(w, req, resp, http.StatusOK) } diff --git a/pkg/gateway/serde/xml.go b/pkg/gateway/serde/xml.go index ee38db0c945..f45fe7da21d 100644 --- a/pkg/gateway/serde/xml.go +++ b/pkg/gateway/serde/xml.go @@ -169,7 +169,7 @@ type ListMultipartUploadsOutput struct { Bucket string `xml:"Bucket"` Uploads []Upload `xml:"Upload"` NextKeyMarker string `xml:"NextKeyMarker"` - NextUploadIdMarker string `xml:"NextUploadIdMarker"` + NextUploadIDMarker string `xml:"NextUploadIDMarker"` } type VersioningConfiguration struct { From 5a205686272c588e5e0fcb27a823fb3ee3794669 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Mon, 27 Jan 2025 20:32:21 +0200 Subject: [PATCH 36/48] yalla --- pkg/gateway/operations/listobjects.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index c328968ce6a..a1d21e99ec9 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -407,7 +407,7 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep o.Incr("list_multipart_uploads", o.Principal, o.Repository.Name, "") query := req.URL.Query() maxUploadsStr := query.Get(QueryParamMaxUploads) - uploadIdMarker := query.Get(QueryParamUploadIDMarker) + uploadIDMarker := query.Get(QueryParamUploadIDMarker) keyMarker := query.Get(QueryParamKeyMarker) opts := block.ListMultipartUploadsOpts{} if maxUploadsStr != "" { @@ -421,8 +421,8 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep maxUploads32 := int32(maxUploads) opts.MaxUploads = &maxUploads32 } - if uploadIdMarker != "" { - opts.UploadIDMarker = &uploadIdMarker + if uploadIDMarker != "" { + opts.UploadIDMarker = &uploadIDMarker } if keyMarker != "" { opts.KeyMarker = &keyMarker From 62f8bbb14504b1a68b72f7d4dc74274a63b4c910 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Tue, 28 Jan 2025 13:38:21 +0200 Subject: [PATCH 37/48] testing key marker --- esti/s3_gateway_test.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index 59af6d03447..a38f393442c 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -347,6 +347,22 @@ func TestListMultipartUploads(t *testing.T) { require.Contains(t, keys, obj1) require.Contains(t, keys, obj2) + maxUploads := aws.Int32(1) + output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket, MaxUploads: maxUploads}) + require.NoError(t, err, "failed to list multipart uploads") + keys = extractUploadKeys(output) + require.Contains(t, keys, obj1) + require.NotContains(t, keys, obj2) + + keyMarker := output.KeyMarker + uploadIDMarker := output.UploadIdMarker + + output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket, MaxUploads: maxUploads, KeyMarker: keyMarker, UploadIdMarker: uploadIDMarker}) + require.NoError(t, err, "failed to list multipart uploads") + keys = extractUploadKeys(output) + require.NotContains(t, keys, obj1) + require.Contains(t, keys, obj2) + // finish first mpu check only second appear _, err = s3Client.CompleteMultipartUpload(ctx, completeInput1) require.NoError(t, err, "failed to complete multipart upload") From 8f556055981b1759a09b78411967788dd5c8aa05 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Tue, 28 Jan 2025 15:01:57 +0200 Subject: [PATCH 38/48] is truncated --- esti/s3_gateway_test.go | 7 ++++--- pkg/block/adapter.go | 1 + pkg/block/s3/adapter.go | 1 + pkg/gateway/operations/listobjects.go | 1 + pkg/gateway/serde/xml.go | 1 + 5 files changed, 8 insertions(+), 3 deletions(-) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index a38f393442c..7059c32a693 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -347,6 +347,7 @@ func TestListMultipartUploads(t *testing.T) { require.Contains(t, keys, obj1) require.Contains(t, keys, obj2) + // testing maxuploads - only first upload should return maxUploads := aws.Int32(1) output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket, MaxUploads: maxUploads}) require.NoError(t, err, "failed to list multipart uploads") @@ -354,9 +355,9 @@ func TestListMultipartUploads(t *testing.T) { require.Contains(t, keys, obj1) require.NotContains(t, keys, obj2) - keyMarker := output.KeyMarker - uploadIDMarker := output.UploadIdMarker - + // testing key marker and upload id marker for pagination. only records after marker should return + keyMarker := output.NextKeyMarker + uploadIDMarker := output.NextUploadIdMarker output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket, MaxUploads: maxUploads, KeyMarker: keyMarker, UploadIdMarker: uploadIDMarker}) require.NoError(t, err, "failed to list multipart uploads") keys = extractUploadKeys(output) diff --git a/pkg/block/adapter.go b/pkg/block/adapter.go index e7c438c69d6..8815f4fe569 100644 --- a/pkg/block/adapter.go +++ b/pkg/block/adapter.go @@ -127,6 +127,7 @@ type ListMultipartUploadsResponse struct { Uploads []types.MultipartUpload NextUploadIDMarker *string NextKeyMarker *string + IsTruncated bool } // CreateMultiPartUploadOpts contains optional arguments for diff --git a/pkg/block/s3/adapter.go b/pkg/block/s3/adapter.go index 97b395b70ff..3a4b2e4fbba 100644 --- a/pkg/block/s3/adapter.go +++ b/pkg/block/s3/adapter.go @@ -881,6 +881,7 @@ func (a *Adapter) ListMultipartUploads(ctx context.Context, obj block.ObjectPoin Uploads: resp.Uploads, NextUploadIDMarker: resp.NextUploadIdMarker, NextKeyMarker: resp.NextKeyMarker, + IsTruncated: *resp.IsTruncated, } return &mpuResp, nil } diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index a1d21e99ec9..2a40b0578a3 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -462,6 +462,7 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep Uploads: uploads, NextKeyMarker: *mpuResp.NextKeyMarker, NextUploadIDMarker: *mpuResp.NextUploadIDMarker, + IsTruncted: mpuResp.IsTruncated, } o.EncodeResponse(w, req, resp, http.StatusOK) } diff --git a/pkg/gateway/serde/xml.go b/pkg/gateway/serde/xml.go index f45fe7da21d..e7fbae2ea03 100644 --- a/pkg/gateway/serde/xml.go +++ b/pkg/gateway/serde/xml.go @@ -170,6 +170,7 @@ type ListMultipartUploadsOutput struct { Uploads []Upload `xml:"Upload"` NextKeyMarker string `xml:"NextKeyMarker"` NextUploadIDMarker string `xml:"NextUploadIDMarker"` + IsTruncted bool `xml:"IsTruncted"` } type VersioningConfiguration struct { From 645e5cf5f360740c601ac185597b91dcf0efe593 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Tue, 28 Jan 2025 16:17:58 +0200 Subject: [PATCH 39/48] with error not implemented --- esti/s3_gateway_test.go | 11 +++++++++++ pkg/gateway/operations/listobjects.go | 11 +++++++++++ 2 files changed, 22 insertions(+) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index 7059c32a693..a2ff9140007 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -372,6 +372,17 @@ func TestListMultipartUploads(t *testing.T) { keys = extractUploadKeys(output) require.NotContains(t, keys, obj1) require.Contains(t, keys, obj2) + + // unsupported headers, expect error + delimiter := aws.String("/") + prefix := aws.String("prefix") + encodingType := types.EncodingTypeUrl + output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket, Delimiter: delimiter}) + require.ErrorIs(t, err, gtwerrors.ErrNotImplemented) + output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket, Prefix: prefix}) + require.ErrorIs(t, err, gtwerrors.ErrNotImplemented) + output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket, EncodingType: encodingType}) + require.ErrorIs(t, err, gtwerrors.ErrNotImplemented) } func extractUploadKeys(output *s3.ListMultipartUploadsOutput) []string { diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index 2a40b0578a3..2b014f28d65 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -26,6 +26,10 @@ const ( QueryParamMaxUploads = "max-uploads" QueryParamUploadIDMarker = "upload-id-marker" QueryParamKeyMarker = "key-marker" + // missing implementation - will return error + QueryParampPrefix = "prefix" + QueryParampEncodingType = "encoding-type" + QueryParampDelimiter = "delimiter" ) type ListObjects struct{} @@ -409,6 +413,13 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep maxUploadsStr := query.Get(QueryParamMaxUploads) uploadIDMarker := query.Get(QueryParamUploadIDMarker) keyMarker := query.Get(QueryParamKeyMarker) + prefix := query.Get(QueryParampPrefix) + delimiter := query.Get(QueryParampDelimiter) + encodingType := query.Get(QueryParampEncodingType) + if prefix != "" || delimiter != "" || encodingType != "" { + _ = o.EncodeError(w, req, gatewayerrors.ErrNotImplemented, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrNotImplemented)) + return + } opts := block.ListMultipartUploadsOpts{} if maxUploadsStr != "" { maxUploads, err := strconv.ParseInt(maxUploadsStr, 10, 32) From 4ecb69174888ee6dd0ddd90a6eaf56982979f103 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Tue, 28 Jan 2025 16:36:21 +0200 Subject: [PATCH 40/48] testts will now pass --- esti/s3_gateway_test.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index a2ff9140007..247adf354fa 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -377,12 +377,18 @@ func TestListMultipartUploads(t *testing.T) { delimiter := aws.String("/") prefix := aws.String("prefix") encodingType := types.EncodingTypeUrl + output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket, Delimiter: delimiter}) - require.ErrorIs(t, err, gtwerrors.ErrNotImplemented) + require.Error(t, err) + require.Contains(t, err.Error(), "NotImplemented") + output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket, Prefix: prefix}) - require.ErrorIs(t, err, gtwerrors.ErrNotImplemented) + require.Error(t, err) + require.Contains(t, err.Error(), "NotImplemented") + output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket, EncodingType: encodingType}) - require.ErrorIs(t, err, gtwerrors.ErrNotImplemented) + require.Error(t, err) + require.Contains(t, err.Error(), "NotImplemented") } func extractUploadKeys(output *s3.ListMultipartUploadsOutput) []string { From 58961d158548ba47873ae24a73762b8a984f2b44 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Tue, 28 Jan 2025 19:37:11 +0200 Subject: [PATCH 41/48] truncated for real --- pkg/gateway/operations/listobjects.go | 2 +- pkg/gateway/serde/xml.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index 2b014f28d65..1d377428ac4 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -473,7 +473,7 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep Uploads: uploads, NextKeyMarker: *mpuResp.NextKeyMarker, NextUploadIDMarker: *mpuResp.NextUploadIDMarker, - IsTruncted: mpuResp.IsTruncated, + IsTruncated: mpuResp.IsTruncated, } o.EncodeResponse(w, req, resp, http.StatusOK) } diff --git a/pkg/gateway/serde/xml.go b/pkg/gateway/serde/xml.go index e7fbae2ea03..55e0405bfd1 100644 --- a/pkg/gateway/serde/xml.go +++ b/pkg/gateway/serde/xml.go @@ -170,7 +170,7 @@ type ListMultipartUploadsOutput struct { Uploads []Upload `xml:"Upload"` NextKeyMarker string `xml:"NextKeyMarker"` NextUploadIDMarker string `xml:"NextUploadIDMarker"` - IsTruncted bool `xml:"IsTruncted"` + IsTruncated bool `xml:"IsTruncated"` } type VersioningConfiguration struct { From fa64a69c9d8b1b7cb88ba422746fb46a256fe47e Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Wed, 29 Jan 2025 15:48:13 +0200 Subject: [PATCH 42/48] tests --- esti/s3_gateway_test.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index 247adf354fa..f7d00ad6988 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -291,9 +291,6 @@ func TestS3IfNoneMatch(t *testing.T) { func TestListMultipartUploads(t *testing.T) { blockStoreType := viper.GetString(ViperBlockstoreType) - if blockStoreType != "s3" { - t.Skip("Skipping test - blockstore type is not s3") - } ctx, logger, repo := setupTest(t) defer tearDownTest(repo) s3Endpoint := viper.GetString("s3_endpoint") @@ -304,16 +301,17 @@ func TestListMultipartUploads(t *testing.T) { // create two objects for two mpus obj1 := "object1" obj2 := "object2" - path1 := "main/" + obj1 - path2 := "main/" + obj2 + keysPrefix := "main/" + key1 := keysPrefix + obj1 + key2 := keysPrefix + obj2 input1 := &s3.CreateMultipartUploadInput{ Bucket: aws.String(repo), - Key: aws.String(path1), + Key: aws.String(key1), } input2 := &s3.CreateMultipartUploadInput{ Bucket: aws.String(repo), - Key: aws.String(path2), + Key: aws.String(key2), } // create first mpu resp1, err := s3Client.CreateMultipartUpload(ctx, input1) @@ -335,6 +333,10 @@ func TestListMultipartUploads(t *testing.T) { } // check first mpu appears output, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) + if blockStoreType != "s3" { + require.Contains(t, err.Error(), "NotImplemented") + t.Skip("Skipping test - blockstore type is not s3") + } require.NoError(t, err, "error listing multiparts") keys := extractUploadKeys(output) require.Contains(t, keys, obj1) From 82f74a30aa30111ae5deb4146fa169499394ecfc Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Wed, 29 Jan 2025 22:29:16 +0200 Subject: [PATCH 43/48] final --- esti/s3_gateway_test.go | 21 ++++++++++++++++----- pkg/block/s3/adapter.go | 2 +- pkg/gateway/errors/errors.go | 5 ----- pkg/gateway/operations/listobjects.go | 9 +++++---- 4 files changed, 22 insertions(+), 15 deletions(-) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index f7d00ad6988..04ad8135640 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -335,7 +335,7 @@ func TestListMultipartUploads(t *testing.T) { output, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) if blockStoreType != "s3" { require.Contains(t, err.Error(), "NotImplemented") - t.Skip("Skipping test - blockstore type is not s3") + return } require.NoError(t, err, "error listing multiparts") keys := extractUploadKeys(output) @@ -375,20 +375,31 @@ func TestListMultipartUploads(t *testing.T) { require.NotContains(t, keys, obj1) require.Contains(t, keys, obj2) - // unsupported headers, expect error +} +func TestListMultipartUploadsUnsupported(t *testing.T) { + blockStoreType := viper.GetString(ViperBlockstoreType) + if blockStoreType != "s3" { + return + } + ctx, _, repo := setupTest(t) + defer tearDownTest(repo) + s3Endpoint := viper.GetString("s3_endpoint") + s3Client := createS3Client(s3Endpoint, t) + Bucket := aws.String(repo) + delimiter := aws.String("/") prefix := aws.String("prefix") encodingType := types.EncodingTypeUrl - output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket, Delimiter: delimiter}) + _, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: Bucket, Delimiter: delimiter}) require.Error(t, err) require.Contains(t, err.Error(), "NotImplemented") - output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket, Prefix: prefix}) + _, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: Bucket, Prefix: prefix}) require.Error(t, err) require.Contains(t, err.Error(), "NotImplemented") - output, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket, EncodingType: encodingType}) + _, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: Bucket, EncodingType: encodingType}) require.Error(t, err) require.Contains(t, err.Error(), "NotImplemented") } diff --git a/pkg/block/s3/adapter.go b/pkg/block/s3/adapter.go index 3a4b2e4fbba..941720e99ec 100644 --- a/pkg/block/s3/adapter.go +++ b/pkg/block/s3/adapter.go @@ -881,7 +881,7 @@ func (a *Adapter) ListMultipartUploads(ctx context.Context, obj block.ObjectPoin Uploads: resp.Uploads, NextUploadIDMarker: resp.NextUploadIdMarker, NextKeyMarker: resp.NextKeyMarker, - IsTruncated: *resp.IsTruncated, + IsTruncated: aws.ToBool(resp.IsTruncated), } return &mpuResp, nil } diff --git a/pkg/gateway/errors/errors.go b/pkg/gateway/errors/errors.go index c681d48cbe9..4106896d09d 100644 --- a/pkg/gateway/errors/errors.go +++ b/pkg/gateway/errors/errors.go @@ -205,11 +205,6 @@ func (a APIErrorCode) ToAPIErr() APIError { // Codes - error code to APIError structure, these fields carry respective // descriptions for all the error responses. var Codes = errorCodeMap{ - ErrInvalidArgument: { - Code: "InvalidArgument", - Description: "Argument max-uploads must be an integer between 0 and 2147483647", - HTTPStatusCode: http.StatusBadRequest, - }, ErrInvalidCopyDest: { Code: "InvalidRequest", Description: "This copy request is illegal because it is trying to copy an object to itself without changing the object's metadata, storage class, website redirect location or encryption attributes.", diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index 1d377428ac4..0837035c72e 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -6,6 +6,7 @@ import ( "strconv" "strings" + "github.com/aws/aws-sdk-go/aws" "github.com/treeverse/lakefs/pkg/block" "github.com/treeverse/lakefs/pkg/catalog" gatewayerrors "github.com/treeverse/lakefs/pkg/gateway/errors" @@ -426,7 +427,7 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep if err != nil { o.Log(req).WithField("maxUploadsStr", maxUploadsStr). WithError(err).Error("malformed query parameter 'maxUploadsStr'") - _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInvalidArgument)) + _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInvalidMaxUploads)) return } maxUploads32 := int32(maxUploads) @@ -445,7 +446,7 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep if err != nil { o.Log(req).WithError(err).Error("list multipart uploads failed") - _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInternalError)) + _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrNotImplemented)) return } @@ -471,8 +472,8 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep resp := &serde.ListMultipartUploadsOutput{ Bucket: o.Repository.Name, Uploads: uploads, - NextKeyMarker: *mpuResp.NextKeyMarker, - NextUploadIDMarker: *mpuResp.NextUploadIDMarker, + NextKeyMarker: aws.StringValue(mpuResp.NextKeyMarker), + NextUploadIDMarker: aws.StringValue(mpuResp.NextUploadIDMarker), IsTruncated: mpuResp.IsTruncated, } o.EncodeResponse(w, req, resp, http.StatusOK) From dfbee5cc09d04f9a11dc8dc36e21c7c0fb29fcb2 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Thu, 30 Jan 2025 10:32:34 +0200 Subject: [PATCH 44/48] kadima --- esti/s3_gateway_test.go | 25 ++++++++++++++++--------- pkg/gateway/operations/listobjects.go | 5 +++-- pkg/gateway/serde/xml.go | 6 +++--- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index 04ad8135640..cfc8be4840d 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -376,6 +376,7 @@ func TestListMultipartUploads(t *testing.T) { require.Contains(t, keys, obj2) } + func TestListMultipartUploadsUnsupported(t *testing.T) { blockStoreType := viper.GetString(ViperBlockstoreType) if blockStoreType != "s3" { @@ -391,17 +392,23 @@ func TestListMultipartUploadsUnsupported(t *testing.T) { prefix := aws.String("prefix") encodingType := types.EncodingTypeUrl - _, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: Bucket, Delimiter: delimiter}) - require.Error(t, err) - require.Contains(t, err.Error(), "NotImplemented") + t.Run("Delimiter", func(t *testing.T) { + _, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: Bucket, Delimiter: delimiter}) + require.Error(t, err) + require.Contains(t, err.Error(), "NotImplemented") + }) - _, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: Bucket, Prefix: prefix}) - require.Error(t, err) - require.Contains(t, err.Error(), "NotImplemented") + t.Run("Prefix", func(t *testing.T) { + _, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: Bucket, Prefix: prefix}) + require.Error(t, err) + require.Contains(t, err.Error(), "NotImplemented") + }) - _, err = s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: Bucket, EncodingType: encodingType}) - require.Error(t, err) - require.Contains(t, err.Error(), "NotImplemented") + t.Run("EncodingType", func(t *testing.T) { + _, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: Bucket, EncodingType: encodingType}) + require.Error(t, err) + require.Contains(t, err.Error(), "NotImplemented") + }) } func extractUploadKeys(output *s3.ListMultipartUploadsOutput) []string { diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index 0837035c72e..09b273b68fe 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -21,7 +21,8 @@ import ( const ( ListObjectMaxKeys = 1000 - + maxUploadsRange = 2147483647 + minUploadsRange = 0 // defaultBucketLocation used to identify if we need to specify the location constraint defaultBucketLocation = "us-east-1" QueryParamMaxUploads = "max-uploads" @@ -424,7 +425,7 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep opts := block.ListMultipartUploadsOpts{} if maxUploadsStr != "" { maxUploads, err := strconv.ParseInt(maxUploadsStr, 10, 32) - if err != nil { + if err != nil || maxUploads < minUploadsRange || maxUploads > maxUploadsRange { o.Log(req).WithField("maxUploadsStr", maxUploadsStr). WithError(err).Error("malformed query parameter 'maxUploadsStr'") _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInvalidMaxUploads)) diff --git a/pkg/gateway/serde/xml.go b/pkg/gateway/serde/xml.go index 55e0405bfd1..4f3dc03a768 100644 --- a/pkg/gateway/serde/xml.go +++ b/pkg/gateway/serde/xml.go @@ -168,9 +168,9 @@ type ListMultipartUploadsOutput struct { XMLName xml.Name `xml:"ListMultipartUploadsResult"` Bucket string `xml:"Bucket"` Uploads []Upload `xml:"Upload"` - NextKeyMarker string `xml:"NextKeyMarker"` - NextUploadIDMarker string `xml:"NextUploadIDMarker"` - IsTruncated bool `xml:"IsTruncated"` + NextKeyMarker string `xml:"NextKeyMarker,omitempty"` + NextUploadIDMarker string `xml:"NextUploadIDMarker,omitempty"` + IsTruncated bool `xml:"IsTruncated,omitempty"` } type VersioningConfiguration struct { From d21b831867c18900c1f1f031dc165b08ed5d1a63 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Thu, 30 Jan 2025 16:14:46 +0200 Subject: [PATCH 45/48] final adjusments --- esti/s3_gateway_test.go | 7 +++---- pkg/gateway/errors/errors.go | 1 - pkg/gateway/operations/listobjects.go | 5 ++--- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/esti/s3_gateway_test.go b/esti/s3_gateway_test.go index cfc8be4840d..b0271abb855 100644 --- a/esti/s3_gateway_test.go +++ b/esti/s3_gateway_test.go @@ -291,6 +291,9 @@ func TestS3IfNoneMatch(t *testing.T) { func TestListMultipartUploads(t *testing.T) { blockStoreType := viper.GetString(ViperBlockstoreType) + if blockStoreType != "s3" { + return + } ctx, logger, repo := setupTest(t) defer tearDownTest(repo) s3Endpoint := viper.GetString("s3_endpoint") @@ -333,10 +336,6 @@ func TestListMultipartUploads(t *testing.T) { } // check first mpu appears output, err := s3Client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{Bucket: resp1.Bucket}) - if blockStoreType != "s3" { - require.Contains(t, err.Error(), "NotImplemented") - return - } require.NoError(t, err, "error listing multiparts") keys := extractUploadKeys(output) require.Contains(t, keys, obj1) diff --git a/pkg/gateway/errors/errors.go b/pkg/gateway/errors/errors.go index 4106896d09d..b3c182afe53 100644 --- a/pkg/gateway/errors/errors.go +++ b/pkg/gateway/errors/errors.go @@ -60,7 +60,6 @@ const ( ErrInternalError ErrInvalidAccessKeyID ErrInvalidBucketName - ErrInvalidArgument ErrInvalidDigest ErrInvalidRange ErrInvalidCopyPartRange diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index 09b273b68fe..9ec13454034 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -23,6 +23,7 @@ const ( ListObjectMaxKeys = 1000 maxUploadsRange = 2147483647 minUploadsRange = 0 + // defaultBucketLocation used to identify if we need to specify the location constraint defaultBucketLocation = "us-east-1" QueryParamMaxUploads = "max-uploads" @@ -425,13 +426,11 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep opts := block.ListMultipartUploadsOpts{} if maxUploadsStr != "" { maxUploads, err := strconv.ParseInt(maxUploadsStr, 10, 32) + maxUploads32 := int32(maxUploads) if err != nil || maxUploads < minUploadsRange || maxUploads > maxUploadsRange { - o.Log(req).WithField("maxUploadsStr", maxUploadsStr). - WithError(err).Error("malformed query parameter 'maxUploadsStr'") _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInvalidMaxUploads)) return } - maxUploads32 := int32(maxUploads) opts.MaxUploads = &maxUploads32 } if uploadIDMarker != "" { From 8ea2f8e6f3785e60862bb7b277a1cc29063002c1 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Thu, 30 Jan 2025 17:38:37 +0200 Subject: [PATCH 46/48] max uploads added --- pkg/block/adapter.go | 1 + pkg/block/s3/adapter.go | 1 + pkg/gateway/operations/listobjects.go | 7 ++++--- pkg/gateway/serde/xml.go | 1 + 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/block/adapter.go b/pkg/block/adapter.go index 8815f4fe569..ad105c485ba 100644 --- a/pkg/block/adapter.go +++ b/pkg/block/adapter.go @@ -128,6 +128,7 @@ type ListMultipartUploadsResponse struct { NextUploadIDMarker *string NextKeyMarker *string IsTruncated bool + MaxUploads *int32 } // CreateMultiPartUploadOpts contains optional arguments for diff --git a/pkg/block/s3/adapter.go b/pkg/block/s3/adapter.go index 941720e99ec..cdead0dde13 100644 --- a/pkg/block/s3/adapter.go +++ b/pkg/block/s3/adapter.go @@ -882,6 +882,7 @@ func (a *Adapter) ListMultipartUploads(ctx context.Context, obj block.ObjectPoin NextUploadIDMarker: resp.NextUploadIdMarker, NextKeyMarker: resp.NextKeyMarker, IsTruncated: aws.ToBool(resp.IsTruncated), + MaxUploads: resp.MaxUploads, } return &mpuResp, nil } diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index 9ec13454034..9a30b5fe0ad 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -21,8 +21,8 @@ import ( const ( ListObjectMaxKeys = 1000 - maxUploadsRange = 2147483647 - minUploadsRange = 0 + maxUploadsListMPU = 2147483647 + minUploadsListMPU = 0 // defaultBucketLocation used to identify if we need to specify the location constraint defaultBucketLocation = "us-east-1" @@ -427,7 +427,7 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep if maxUploadsStr != "" { maxUploads, err := strconv.ParseInt(maxUploadsStr, 10, 32) maxUploads32 := int32(maxUploads) - if err != nil || maxUploads < minUploadsRange || maxUploads > maxUploadsRange { + if err != nil || maxUploads < minUploadsListMPU || maxUploads > maxUploadsListMPU { _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInvalidMaxUploads)) return } @@ -475,6 +475,7 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep NextKeyMarker: aws.StringValue(mpuResp.NextKeyMarker), NextUploadIDMarker: aws.StringValue(mpuResp.NextUploadIDMarker), IsTruncated: mpuResp.IsTruncated, + MaxUploads: aws.Int32Value(mpuResp.MaxUploads), } o.EncodeResponse(w, req, resp, http.StatusOK) } diff --git a/pkg/gateway/serde/xml.go b/pkg/gateway/serde/xml.go index 4f3dc03a768..fab5d0a8d29 100644 --- a/pkg/gateway/serde/xml.go +++ b/pkg/gateway/serde/xml.go @@ -171,6 +171,7 @@ type ListMultipartUploadsOutput struct { NextKeyMarker string `xml:"NextKeyMarker,omitempty"` NextUploadIDMarker string `xml:"NextUploadIDMarker,omitempty"` IsTruncated bool `xml:"IsTruncated,omitempty"` + MaxUploads int32 `xml:"MaxUploads,omitempty"` } type VersioningConfiguration struct { From eaf897239a9f20099eb3466440a38b6d1d6360b0 Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Thu, 30 Jan 2025 18:17:48 +0200 Subject: [PATCH 47/48] kadima --- pkg/gateway/operations/listobjects.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index 9a30b5fe0ad..beea3f497f1 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -21,8 +21,7 @@ import ( const ( ListObjectMaxKeys = 1000 - maxUploadsListMPU = 2147483647 - minUploadsListMPU = 0 + maxUploadsListMPU = 1000 // defaultBucketLocation used to identify if we need to specify the location constraint defaultBucketLocation = "us-east-1" @@ -427,10 +426,13 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep if maxUploadsStr != "" { maxUploads, err := strconv.ParseInt(maxUploadsStr, 10, 32) maxUploads32 := int32(maxUploads) - if err != nil || maxUploads < minUploadsListMPU || maxUploads > maxUploadsListMPU { + if err != nil || maxUploads < 0 { _ = o.EncodeError(w, req, err, gatewayerrors.Codes.ToAPIErr(gatewayerrors.ErrInvalidMaxUploads)) return } + if maxUploads > maxUploadsListMPU { + maxUploads = maxUploadsListMPU + } opts.MaxUploads = &maxUploads32 } if uploadIDMarker != "" { From ad126b753dbbcbf18a99c407c200e09e2b21b45e Mon Sep 17 00:00:00 2001 From: Itamar Yuran Date: Thu, 30 Jan 2025 18:31:22 +0200 Subject: [PATCH 48/48] nu --- pkg/gateway/operations/listobjects.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/gateway/operations/listobjects.go b/pkg/gateway/operations/listobjects.go index beea3f497f1..39b9227d3c1 100644 --- a/pkg/gateway/operations/listobjects.go +++ b/pkg/gateway/operations/listobjects.go @@ -431,7 +431,7 @@ func handleListMultipartUploads(w http.ResponseWriter, req *http.Request, o *Rep return } if maxUploads > maxUploadsListMPU { - maxUploads = maxUploadsListMPU + maxUploads32 = maxUploadsListMPU } opts.MaxUploads = &maxUploads32 }