diff --git a/pkg/api/controller.go b/pkg/api/controller.go index f927053d565..b8327bb2d9e 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -216,6 +216,7 @@ func (c *Controller) CreatePresignMultipartUpload(w http.ResponseWriter, r *http // create a new multipart upload mpuResp, err := c.BlockAdapter.CreateMultiPartUpload(ctx, block.ObjectPointer{ + StorageID: repo.StorageID, StorageNamespace: repo.StorageNamespace, IdentifierType: block.IdentifierTypeRelative, Identifier: address, @@ -229,6 +230,7 @@ func (c *Controller) CreatePresignMultipartUpload(w http.ResponseWriter, r *http for i := 0; i < swag.IntValue(params.Parts); i++ { // generate a pre-signed PUT url for the given request preSignedURL, err := c.BlockAdapter.GetPresignUploadPartURL(ctx, block.ObjectPointer{ + StorageID: repo.StorageID, StorageNamespace: repo.StorageNamespace, Identifier: address, IdentifierType: block.IdentifierTypeRelative, @@ -300,6 +302,7 @@ func (c *Controller) AbortPresignMultipartUpload(w http.ResponseWriter, r *http. } if err := c.BlockAdapter.AbortMultiPartUpload(ctx, block.ObjectPointer{ + StorageID: repo.StorageID, StorageNamespace: repo.StorageNamespace, IdentifierType: block.IdentifierTypeRelative, Identifier: physicalAddress, @@ -372,6 +375,7 @@ func (c *Controller) CompletePresignMultipartUpload(w http.ResponseWriter, r *ht } mpuResp, err := c.BlockAdapter.CompleteMultiPartUpload(ctx, block.ObjectPointer{ + StorageID: repo.StorageID, StorageNamespace: repo.StorageNamespace, IdentifierType: block.IdentifierTypeRelative, Identifier: physicalAddress, @@ -702,6 +706,7 @@ func (c *Controller) GetPhysicalAddress(w http.ResponseWriter, r *http.Request, if swag.BoolValue(params.Presign) { // generate a pre-signed PUT url for the given request preSignedURL, expiry, err := c.BlockAdapter.GetPreSignedURL(ctx, block.ObjectPointer{ + StorageID: repo.StorageID, StorageNamespace: repo.StorageNamespace, Identifier: address, IdentifierType: block.IdentifierTypeRelative, @@ -2092,6 +2097,7 @@ func (c *Controller) ensureStorageNamespace(ctx context.Context, storageNamespac // this serves two purposes, first, we maintain safety check for older lakeFS version. // second, in scenarios where lakeFS shouldn't have access to the root namespace (i.e pre-sign URL only). if c.Config.GetBaseConfig().Graveler.EnsureReadableRootNamespace { + // TODO (gilo): ObjectPointer init - add StorageID here rootObj := block.ObjectPointer{ StorageNamespace: storageNamespace, IdentifierType: block.IdentifierTypeRelative, @@ -2109,6 +2115,7 @@ func (c *Controller) ensureStorageNamespace(ctx context.Context, storageNamespac // check if the dummy file exists obj := block.ObjectPointer{ + // TODO (gilo): ObjectPointer init - add StorageID here StorageNamespace: storageNamespace, IdentifierType: block.IdentifierTypeRelative, Identifier: dummyKey, @@ -2559,6 +2566,7 @@ func (c *Controller) GetRunHookOutput(w http.ResponseWriter, r *http.Request, re logPath := taskResult.LogPath() reader, err := c.BlockAdapter.Get(ctx, block.ObjectPointer{ + StorageID: repo.StorageID, StorageNamespace: repo.StorageNamespace, IdentifierType: block.IdentifierTypeRelative, Identifier: logPath, @@ -3206,13 +3214,19 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi writeError(w, r, http.StatusInternalServerError, err) return } + opts := block.PutOpts{StorageClass: params.StorageClass} var blob *upload.Blob if mediaType != "multipart/form-data" { // handle non-multipart, direct content upload address := c.PathProvider.NewPath() - blob, err = upload.WriteBlob(ctx, c.BlockAdapter, repo.StorageNamespace, address, r.Body, r.ContentLength, - block.PutOpts{StorageClass: params.StorageClass}) + objectPointer := block.ObjectPointer{ + StorageID: repo.StorageID, + StorageNamespace: repo.StorageNamespace, + IdentifierType: block.IdentifierTypeRelative, + Identifier: address, + } + blob, err = upload.WriteBlob(ctx, c.BlockAdapter, objectPointer, r.Body, r.ContentLength, opts) if err != nil { writeError(w, r, http.StatusInternalServerError, err) return @@ -3240,8 +3254,13 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi partName := part.FormName() if partName == "content" { // upload the first "content" and exit the loop - address := c.PathProvider.NewPath() - blob, err = upload.WriteBlob(ctx, c.BlockAdapter, repo.StorageNamespace, address, part, -1, block.PutOpts{StorageClass: params.StorageClass}) + objectPointer := block.ObjectPointer{ + StorageID: repo.StorageID, + StorageNamespace: repo.StorageNamespace, + IdentifierType: block.IdentifierTypeRelative, + Identifier: c.PathProvider.NewPath(), + } + blob, err = upload.WriteBlob(ctx, c.BlockAdapter, objectPointer, part, -1, opts) if err != nil { _ = part.Close() writeError(w, r, http.StatusInternalServerError, err) @@ -3626,6 +3645,7 @@ func (c *Controller) PrepareGarbageCollectionCommits(w http.ResponseWriter, r *h if c.handleAPIError(ctx, w, r, err) { return } + // TODO (gilo): ObjectPointer init - add StorageID here presignedURL, _, err := c.BlockAdapter.GetPreSignedURL(ctx, block.ObjectPointer{ Identifier: gcRunMetadata.CommitsCSVLocation, IdentifierType: block.IdentifierTypeFull, @@ -3856,6 +3876,7 @@ func (c *Controller) DumpRefs(w http.ResponseWriter, r *http.Request, repository return } _, err = c.BlockAdapter.Put(ctx, block.ObjectPointer{ + StorageID: repo.StorageID, StorageNamespace: repo.StorageNamespace, IdentifierType: block.IdentifierTypeRelative, Identifier: fmt.Sprintf("%s/refs_manifest.json", c.Config.GetBaseConfig().Committed.BlockStoragePrefix), @@ -4196,6 +4217,7 @@ func writeSymlink(ctx context.Context, repo *catalog.Repository, branch, path st address := fmt.Sprintf("%s/%s/%s/%s/symlink.txt", lakeFSPrefix, repo.Name, branch, path) data := strings.Join(addresses, "\n") _, err := adapter.Put(ctx, block.ObjectPointer{ + StorageID: repo.StorageID, StorageNamespace: repo.StorageNamespace, IdentifierType: block.IdentifierTypeRelative, Identifier: address, @@ -4403,6 +4425,7 @@ func (c *Controller) GetMetadataObject(w http.ResponseWriter, r *http.Request, r // if pre-sign, return a redirect pointer := block.ObjectPointer{ + StorageID: repo.StorageID, StorageNamespace: repo.StorageNamespace, IdentifierType: block.IdentifierTypeRelative, Identifier: objPath, @@ -4482,6 +4505,7 @@ func (c *Controller) GetObject(w http.ResponseWriter, r *http.Request, repositor // if pre-sign, return a redirect pointer := block.ObjectPointer{ + StorageID: repo.StorageID, StorageNamespace: repo.StorageNamespace, IdentifierType: entry.AddressType.ToIdentifierType(), Identifier: entry.PhysicalAddress, @@ -4628,6 +4652,7 @@ func (c *Controller) ListObjects(w http.ResponseWriter, r *http.Request, reposit if authResponse.Allowed { var expiry time.Time objStat.PhysicalAddress, expiry, err = c.BlockAdapter.GetPreSignedURL(ctx, block.ObjectPointer{ + StorageID: repo.StorageID, StorageNamespace: repo.StorageNamespace, IdentifierType: entry.AddressType.ToIdentifierType(), Identifier: entry.PhysicalAddress, @@ -4710,6 +4735,7 @@ func (c *Controller) StatObject(w http.ResponseWriter, r *http.Request, reposito } else if swag.BoolValue(params.Presign) { // need to pre-sign the physical address preSignedURL, expiry, err := c.BlockAdapter.GetPreSignedURL(ctx, block.ObjectPointer{ + StorageID: repo.StorageID, StorageNamespace: repo.StorageNamespace, IdentifierType: entry.AddressType.ToIdentifierType(), Identifier: entry.PhysicalAddress, @@ -4771,6 +4797,7 @@ func (c *Controller) GetUnderlyingProperties(w http.ResponseWriter, r *http.Requ // read object properties from underlying storage properties, err := c.BlockAdapter.GetProperties(ctx, block.ObjectPointer{ + StorageID: repo.StorageID, StorageNamespace: repo.StorageNamespace, IdentifierType: entry.AddressType.ToIdentifierType(), Identifier: entry.PhysicalAddress, diff --git a/pkg/api/controller_test.go b/pkg/api/controller_test.go index 849fc314e34..9e319989c40 100644 --- a/pkg/api/controller_test.go +++ b/pkg/api/controller_test.go @@ -2549,8 +2549,13 @@ func TestController_ObjectsHeadObjectHandler(t *testing.T) { buf := new(bytes.Buffer) buf.WriteString("this is file content made up of bytes") - address := upload.DefaultPathProvider.NewPath() - blob, err := upload.WriteBlob(context.Background(), deps.blocks, onBlock(deps, "ns1"), address, buf, 37, block.PutOpts{StorageClass: &expensiveString}) + objectPointer := block.ObjectPointer{ + StorageID: "", + StorageNamespace: onBlock(deps, "ns1"), + IdentifierType: block.IdentifierTypeRelative, + Identifier: upload.DefaultPathProvider.NewPath(), + } + blob, err := upload.WriteBlob(context.Background(), deps.blocks, objectPointer, buf, 37, block.PutOpts{StorageClass: &expensiveString}) if err != nil { t.Fatal(err) } @@ -2625,8 +2630,13 @@ func TestController_ObjectsGetObjectHandler(t *testing.T) { buf := new(bytes.Buffer) buf.WriteString("this is file content made up of bytes") - address := upload.DefaultPathProvider.NewPath() - blob, err := upload.WriteBlob(context.Background(), deps.blocks, onBlock(deps, "ns1"), address, buf, 37, block.PutOpts{StorageClass: &expensiveString}) + objectPointer := block.ObjectPointer{ + StorageID: "", + StorageNamespace: onBlock(deps, "ns1"), + IdentifierType: block.IdentifierTypeRelative, + Identifier: upload.DefaultPathProvider.NewPath(), + } + blob, err := upload.WriteBlob(context.Background(), deps.blocks, objectPointer, buf, 37, block.PutOpts{StorageClass: &expensiveString}) if err != nil { t.Fatal(err) } diff --git a/pkg/block/adapter.go b/pkg/block/adapter.go index defc543bb17..7f6db15b180 100644 --- a/pkg/block/adapter.go +++ b/pkg/block/adapter.go @@ -63,6 +63,7 @@ const DefaultPreSignExpiryDuration = 15 * time.Minute // ObjectPointer is a unique identifier of an object in the object // store: the store is a 1:1 mapping between pointers and objects. type ObjectPointer struct { + StorageID string StorageNamespace string Identifier string diff --git a/pkg/block/azure/adapter.go b/pkg/block/azure/adapter.go index 4d14adb446c..898b07c2bb6 100644 --- a/pkg/block/azure/adapter.go +++ b/pkg/block/azure/adapter.go @@ -139,6 +139,7 @@ func ResolveBlobURLInfoFromURL(pathURL *url.URL) (BlobURLInfo, error) { func resolveBlobURLInfo(obj block.ObjectPointer) (BlobURLInfo, error) { key := obj.Identifier + // we're in the context of a specific storage here, so there's no need for StorageID. defaultNamespace := obj.StorageNamespace var qk BlobURLInfo // check if the key is fully qualified diff --git a/pkg/block/blocktest/adapter.go b/pkg/block/blocktest/adapter.go index 9db4c0f18a8..5fa7f8010de 100644 --- a/pkg/block/blocktest/adapter.go +++ b/pkg/block/blocktest/adapter.go @@ -39,6 +39,7 @@ func testAdapterGetRange(t *testing.T, adapter block.Adapter, storageNamespace s part1 := "this is the first part " part2 := "this is the last part" _, err := adapter.Put(ctx, block.ObjectPointer{ + StorageID: "", StorageNamespace: storageNamespace, Identifier: "test_file", IdentifierType: block.IdentifierTypeRelative, @@ -63,6 +64,7 @@ func testAdapterGetRange(t *testing.T, adapter block.Adapter, storageNamespace s for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { reader, err := adapter.GetRange(ctx, block.ObjectPointer{ + StorageID: "", StorageNamespace: storageNamespace, Identifier: "test_file", IdentifierType: block.IdentifierTypeRelative, @@ -89,6 +91,7 @@ func testAdapterWalker(t *testing.T, adapter block.Adapter, storageNamespace str for i := 0; i < filesAndFolders; i++ { for j := 0; j < filesAndFolders; j++ { _, err := adapter.Put(ctx, block.ObjectPointer{ + StorageID: "", StorageNamespace: storageNamespace, Identifier: fmt.Sprintf("%s/folder_%d/test_file_%d", testPrefix, filesAndFolders-i-1, filesAndFolders-j-1), IdentifierType: block.IdentifierTypeRelative, @@ -98,6 +101,7 @@ func testAdapterWalker(t *testing.T, adapter block.Adapter, storageNamespace str } _, err := adapter.Put(ctx, block.ObjectPointer{ + StorageID: "", StorageNamespace: storageNamespace, Identifier: fmt.Sprintf("%s/folder_0.txt", testPrefix), IdentifierType: block.IdentifierTypeRelative, diff --git a/pkg/block/blocktest/basic_suite.go b/pkg/block/blocktest/basic_suite.go index e4541be51b2..f96d5efb98a 100644 --- a/pkg/block/blocktest/basic_suite.go +++ b/pkg/block/blocktest/basic_suite.go @@ -39,6 +39,7 @@ func testAdapterPutGet(t *testing.T, adapter block.Adapter, storageNamespace, ex for _, c := range cases { t.Run(c.name, func(t *testing.T) { obj := block.ObjectPointer{ + StorageID: "", StorageNamespace: storageNamespace, Identifier: c.path, IdentifierType: c.identifierType, @@ -64,11 +65,13 @@ func testAdapterCopy(t *testing.T, adapter block.Adapter, storageNamespace strin ctx := context.Background() contents := "foo bar baz quux" src := block.ObjectPointer{ + StorageID: "", StorageNamespace: storageNamespace, Identifier: "src", IdentifierType: block.IdentifierTypeRelative, } dst := block.ObjectPointer{ + StorageID: "", StorageNamespace: storageNamespace, Identifier: "export/to/dst", IdentifierType: block.IdentifierTypeRelative, @@ -130,6 +133,7 @@ func testAdapterRemove(t *testing.T, adapter block.Adapter, storageNamespace str envObjects = append(envObjects, tt.path) for _, p := range envObjects { obj := block.ObjectPointer{ + StorageID: "", StorageNamespace: storageNamespace, Identifier: tt.name + "/" + p, IdentifierType: block.IdentifierTypeRelative, @@ -140,6 +144,7 @@ func testAdapterRemove(t *testing.T, adapter block.Adapter, storageNamespace str // test Remove obj := block.ObjectPointer{ + StorageID: "", StorageNamespace: storageNamespace, Identifier: tt.name + "/" + tt.path, IdentifierType: block.IdentifierTypeRelative, @@ -165,6 +170,7 @@ func testAdapterExists(t *testing.T, adapter block.Adapter, storageNamespace str const contents = "exists" ctx := context.Background() _, err := adapter.Put(ctx, block.ObjectPointer{ + StorageID: "", StorageNamespace: storageNamespace, Identifier: contents, IdentifierType: block.IdentifierTypeRelative, @@ -172,6 +178,7 @@ func testAdapterExists(t *testing.T, adapter block.Adapter, storageNamespace str require.NoError(t, err) _, err = adapter.Put(ctx, block.ObjectPointer{ + StorageID: "", StorageNamespace: storageNamespace, Identifier: "nested/and/" + contents, IdentifierType: block.IdentifierTypeRelative, @@ -192,6 +199,7 @@ func testAdapterExists(t *testing.T, adapter block.Adapter, storageNamespace str for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { ok, err := adapter.Exists(ctx, block.ObjectPointer{ + StorageID: "", StorageNamespace: storageNamespace, Identifier: tt.path, IdentifierType: block.IdentifierTypeRelative, diff --git a/pkg/block/blocktest/multipart_suite.go b/pkg/block/blocktest/multipart_suite.go index 2959a087fce..a8961bd95fb 100644 --- a/pkg/block/blocktest/multipart_suite.go +++ b/pkg/block/blocktest/multipart_suite.go @@ -41,6 +41,7 @@ func testAdapterMultipartUpload(t *testing.T, adapter block.Adapter, storageName t.Run(c.name, func(t *testing.T) { blockstoreType := adapter.BlockstoreType() obj := block.ObjectPointer{ + StorageID: "", StorageNamespace: storageNamespace, Identifier: c.path, IdentifierType: block.IdentifierTypeRelative, @@ -296,12 +297,14 @@ func requireEqualBigByteSlice(t *testing.T, exp, actual []byte) { func objPointers(storageNamespace string) (block.ObjectPointer, block.ObjectPointer) { var obj = block.ObjectPointer{ + StorageID: "", StorageNamespace: storageNamespace, Identifier: "abc", IdentifierType: block.IdentifierTypeRelative, } var objCopy = block.ObjectPointer{ + StorageID: "", StorageNamespace: storageNamespace, Identifier: "abcCopy", IdentifierType: block.IdentifierTypeRelative, diff --git a/pkg/block/local/adapter.go b/pkg/block/local/adapter.go index 121c63d661d..f89291da51b 100644 --- a/pkg/block/local/adapter.go +++ b/pkg/block/local/adapter.go @@ -246,7 +246,12 @@ func (l *Adapter) UploadCopyPart(ctx context.Context, sourceObj, destinationObj } md5Read := block.NewHashingReader(r, block.HashFunctionMD5) fName := uploadID + fmt.Sprintf("-%05d", partNumber) - _, err = l.Put(ctx, block.ObjectPointer{StorageNamespace: destinationObj.StorageNamespace, Identifier: fName}, -1, md5Read, block.PutOpts{}) + objectPointer := block.ObjectPointer{ + StorageID: destinationObj.StorageID, + StorageNamespace: destinationObj.StorageNamespace, + Identifier: fName, + } + _, err = l.Put(ctx, objectPointer, -1, md5Read, block.PutOpts{}) if err != nil { return nil, fmt.Errorf("copy put: %w", err) } @@ -266,7 +271,12 @@ func (l *Adapter) UploadCopyPartRange(ctx context.Context, sourceObj, destinatio } md5Read := block.NewHashingReader(r, block.HashFunctionMD5) fName := uploadID + fmt.Sprintf("-%05d", partNumber) - _, err = l.Put(ctx, block.ObjectPointer{StorageNamespace: destinationObj.StorageNamespace, Identifier: fName}, -1, md5Read, block.PutOpts{}) + objectPointer := block.ObjectPointer{ + StorageID: destinationObj.StorageID, + StorageNamespace: destinationObj.StorageNamespace, + Identifier: fName, + } + _, err = l.Put(ctx, objectPointer, -1, md5Read, block.PutOpts{}) if err != nil { return nil, fmt.Errorf("copy range put: %w", err) } @@ -398,7 +408,12 @@ func (l *Adapter) UploadPart(ctx context.Context, obj block.ObjectPointer, _ int } md5Read := block.NewHashingReader(reader, block.HashFunctionMD5) fName := uploadID + fmt.Sprintf("-%05d", partNumber) - _, err := l.Put(ctx, block.ObjectPointer{StorageNamespace: obj.StorageNamespace, Identifier: fName}, -1, md5Read, block.PutOpts{}) + objectPointer := block.ObjectPointer{ + StorageID: obj.StorageID, + StorageNamespace: obj.StorageNamespace, + Identifier: fName, + } + _, err := l.Put(ctx, objectPointer, -1, md5Read, block.PutOpts{}) etag := hex.EncodeToString(md5Read.Md5.Sum(nil)) return &block.UploadPartResponse{ ETag: etag, @@ -505,6 +520,7 @@ func (l *Adapter) removePartFiles(files []string) error { func (l *Adapter) getPartFiles(uploadID string, obj block.ObjectPointer) ([]string, error) { newObj := block.ObjectPointer{ + StorageID: obj.StorageID, StorageNamespace: obj.StorageNamespace, Identifier: uploadID, } @@ -544,6 +560,7 @@ func (l *Adapter) ResolveNamespace(storageNamespace, key string, identifierType } // Check if path allowed and return error if path is not allowed + // TODO (gilo): ObjectPointer init - add StorageID here _, err = l.extractParamsFromObj(block.ObjectPointer{ StorageNamespace: storageNamespace, Identifier: key, diff --git a/pkg/block/s3/adapter.go b/pkg/block/s3/adapter.go index 2591dcdfc9d..d0e96e79cb2 100644 --- a/pkg/block/s3/adapter.go +++ b/pkg/block/s3/adapter.go @@ -411,6 +411,7 @@ func (a *Adapter) GetPreSignedURL(ctx context.Context, obj block.ObjectPointer, log := a.log(ctx).WithFields(logging.Fields{ "operation": "GetPreSignedURL", + "storage_id": obj.StorageID, "namespace": obj.StorageNamespace, "identifier": obj.Identifier, "ttl": time.Until(expiry), @@ -466,6 +467,7 @@ func (a *Adapter) GetPresignUploadPartURL(ctx context.Context, obj block.ObjectP log := a.log(ctx).WithFields(logging.Fields{ "operation": "GetPresignUploadPartURL", + "storage_id": obj.StorageID, "namespace": obj.StorageNamespace, "identifier": obj.Identifier, }) diff --git a/pkg/catalog/actions_output_writer.go b/pkg/catalog/actions_output_writer.go index e9fc56f44d9..7c3b96e0b56 100644 --- a/pkg/catalog/actions_output_writer.go +++ b/pkg/catalog/actions_output_writer.go @@ -18,6 +18,7 @@ func NewActionsOutputWriter(blockAdapter block.Adapter) *ActionsOutputWriter { } func (o *ActionsOutputWriter) OutputWrite(ctx context.Context, storageNamespace, name string, reader io.Reader, size int64) error { + // TODO (gilo): ObjectPointer init - add StorageID here _, err := o.adapter.Put(ctx, block.ObjectPointer{ StorageNamespace: storageNamespace, IdentifierType: block.IdentifierTypeRelative, diff --git a/pkg/catalog/actions_source.go b/pkg/catalog/actions_source.go index db0f59852d3..95a9f5e1912 100644 --- a/pkg/catalog/actions_source.go +++ b/pkg/catalog/actions_source.go @@ -93,6 +93,7 @@ func (s *ActionsSource) load(ctx context.Context, record graveler.HookRecord, na // get action address blockAdapter := s.catalog.BlockAdapter reader, err := blockAdapter.Get(ctx, block.ObjectPointer{ + StorageID: repo.StorageID, StorageNamespace: repo.StorageNamespace, IdentifierType: block.IdentifierTypeRelative, Identifier: ent.PhysicalAddress, diff --git a/pkg/catalog/catalog.go b/pkg/catalog/catalog.go index 8dc65fc29a4..009204130e6 100644 --- a/pkg/catalog/catalog.go +++ b/pkg/catalog/catalog.go @@ -2630,7 +2630,7 @@ type UncommittedParquetObject struct { CreationDate int64 `parquet:"name=creation_date, type=INT64, convertedtype=INT_64"` } -func (c *Catalog) uploadFile(ctx context.Context, ns graveler.StorageNamespace, location string, fd *os.File, size int64) (string, error) { +func (c *Catalog) uploadFile(ctx context.Context, repo *graveler.RepositoryRecord, location string, fd *os.File, size int64) (string, error) { _, err := fd.Seek(0, 0) if err != nil { return "", err @@ -2642,7 +2642,8 @@ func (c *Catalog) uploadFile(ctx context.Context, ns graveler.StorageNamespace, return "", err } obj := block.ObjectPointer{ - StorageNamespace: ns.String(), + StorageID: repo.StorageID.String(), + StorageNamespace: repo.StorageNamespace.String(), Identifier: identifier, IdentifierType: block.IdentifierTypeFull, } @@ -2705,7 +2706,7 @@ func (c *Catalog) PrepareGCUncommitted(ctx context.Context, repositoryID string, return nil, err } - name, err = c.uploadFile(ctx, repository.StorageNamespace, uncommittedLocation, fd, uw.Size()) + name, err = c.uploadFile(ctx, repository, uncommittedLocation, fd, uw.Size()) if err != nil { return nil, err } @@ -2757,11 +2758,13 @@ func (c *Catalog) CopyEntry(ctx context.Context, srcRepository, srcRef, srcPath, } srcObject := block.ObjectPointer{ + StorageID: srcRepo.StorageID, StorageNamespace: srcRepo.StorageNamespace, IdentifierType: srcEntry.AddressType.ToIdentifierType(), Identifier: srcEntry.PhysicalAddress, } destObj := block.ObjectPointer{ + StorageID: destRepo.StorageID, StorageNamespace: destRepo.StorageNamespace, IdentifierType: dstEntry.AddressType.ToIdentifierType(), Identifier: dstEntry.PhysicalAddress, diff --git a/pkg/catalog/catalog_test.go b/pkg/catalog/catalog_test.go index 0a1679837aa..8c786bb9396 100644 --- a/pkg/catalog/catalog_test.go +++ b/pkg/catalog/catalog_test.go @@ -908,8 +908,9 @@ func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, num func readPhysicalAddressesFromParquetObject(t *testing.T, repositoryID string, ctx context.Context, c *catalog.Catalog, obj string) []string { objReader, err := c.BlockAdapter.Get(ctx, block.ObjectPointer{ - Identifier: obj, - IdentifierType: block.IdentifierTypeFull, + Identifier: obj, + IdentifierType: block.IdentifierTypeFull, + // in the context of an adapter here, so no need to specify StorageID. StorageNamespace: "mem://" + repositoryID, }) require.NoError(t, err) diff --git a/pkg/gateway/operations/deleteobject.go b/pkg/gateway/operations/deleteobject.go index c76a1519fc8..2e10099f65c 100644 --- a/pkg/gateway/operations/deleteobject.go +++ b/pkg/gateway/operations/deleteobject.go @@ -42,6 +42,7 @@ func (controller *DeleteObject) HandleAbortMultipartUpload(w http.ResponseWriter req = req.WithContext(logging.AddFields(ctx, logging.Fields{logging.UploadIDFieldKey: uploadID})) err = o.BlockStore.AbortMultiPartUpload(ctx, block.ObjectPointer{ + StorageID: o.Repository.StorageID, StorageNamespace: o.Repository.StorageNamespace, IdentifierType: block.IdentifierTypeRelative, Identifier: mpu.PhysicalAddress, diff --git a/pkg/gateway/operations/getobject.go b/pkg/gateway/operations/getobject.go index 22f054bccfc..54abc44978c 100644 --- a/pkg/gateway/operations/getobject.go +++ b/pkg/gateway/operations/getobject.go @@ -110,6 +110,7 @@ func (controller *GetObject) Handle(w http.ResponseWriter, req *http.Request, o contentLength := entry.Size contentRange := "" objectPointer := block.ObjectPointer{ + StorageID: o.Repository.StorageID, StorageNamespace: o.Repository.StorageNamespace, IdentifierType: entry.AddressType.ToIdentifierType(), Identifier: entry.PhysicalAddress, @@ -208,6 +209,7 @@ func handleListParts(w http.ResponseWriter, req *http.Request, o *PathOperation) } partsResp, err := o.BlockStore.ListParts(req.Context(), block.ObjectPointer{ + StorageID: o.Repository.StorageID, StorageNamespace: o.Repository.StorageNamespace, IdentifierType: block.IdentifierTypeRelative, Identifier: multiPart.PhysicalAddress, diff --git a/pkg/gateway/operations/postobject.go b/pkg/gateway/operations/postobject.go index d3cd11ab0f3..07038a4d4fc 100644 --- a/pkg/gateway/operations/postobject.go +++ b/pkg/gateway/operations/postobject.go @@ -54,6 +54,7 @@ func (controller *PostObject) HandleCreateMultipartUpload(w http.ResponseWriter, storageClass := StorageClassFromHeader(req.Header) opts := block.CreateMultiPartUploadOpts{StorageClass: storageClass} resp, err := o.BlockStore.CreateMultiPartUpload(req.Context(), block.ObjectPointer{ + StorageID: o.Repository.StorageID, StorageNamespace: o.Repository.StorageNamespace, IdentifierType: block.IdentifierTypeRelative, Identifier: address, @@ -130,6 +131,7 @@ func (controller *PostObject) HandleCompleteMultipartUpload(w http.ResponseWrite normalizeMultipartUploadCompletion(&multipartList) resp, err := o.BlockStore.CompleteMultiPartUpload(req.Context(), block.ObjectPointer{ + StorageID: o.Repository.StorageID, StorageNamespace: o.Repository.StorageNamespace, IdentifierType: block.IdentifierTypeRelative, Identifier: objName, diff --git a/pkg/gateway/operations/putobject.go b/pkg/gateway/operations/putobject.go index aeacec0442b..4206544b5d7 100644 --- a/pkg/gateway/operations/putobject.go +++ b/pkg/gateway/operations/putobject.go @@ -178,12 +178,14 @@ func handleUploadPart(w http.ResponseWriter, req *http.Request, o *PathOperation } src := block.ObjectPointer{ + StorageID: srcRepo.StorageID, StorageNamespace: srcRepo.StorageNamespace, IdentifierType: ent.AddressType.ToIdentifierType(), Identifier: ent.PhysicalAddress, } dst := block.ObjectPointer{ + StorageID: o.Repository.StorageID, StorageNamespace: o.Repository.StorageNamespace, IdentifierType: block.IdentifierTypeRelative, Identifier: multiPart.PhysicalAddress, @@ -225,6 +227,7 @@ func handleUploadPart(w http.ResponseWriter, req *http.Request, o *PathOperation byteSize := req.ContentLength resp, err := o.BlockStore.UploadPart(req.Context(), block.ObjectPointer{ + StorageID: o.Repository.StorageID, StorageNamespace: o.Repository.StorageNamespace, IdentifierType: block.IdentifierTypeRelative, Identifier: multiPart.PhysicalAddress, @@ -315,8 +318,13 @@ func handlePut(w http.ResponseWriter, req *http.Request, o *PathOperation) { return } } - address := o.PathProvider.NewPath() - blob, err := upload.WriteBlob(req.Context(), o.BlockStore, o.Repository.StorageNamespace, address, req.Body, req.ContentLength, opts) + objectPointer := block.ObjectPointer{ + StorageID: o.Repository.StorageID, + StorageNamespace: o.Repository.StorageNamespace, + IdentifierType: block.IdentifierTypeRelative, + Identifier: o.PathProvider.NewPath(), + } + blob, err := upload.WriteBlob(req.Context(), o.BlockStore, objectPointer, req.Body, req.ContentLength, opts) if err != nil { o.Log(req).WithError(err).Error("could not write request body to block adapter") _ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrInternalError)) diff --git a/pkg/gateway/operations/putobject_test.go b/pkg/gateway/operations/putobject_test.go index 373d8d352ee..69feef1a94f 100644 --- a/pkg/gateway/operations/putobject_test.go +++ b/pkg/gateway/operations/putobject_test.go @@ -15,8 +15,8 @@ import ( ) const ( - bucketName = "test" - ObjectBlockSize = 1024 * 3 + storageNamespace = "test" + ObjectBlockSize = 1024 * 3 expensiveString = "EXPENSIVE" cheapString = "CHEAP" @@ -45,16 +45,21 @@ func TestWriteBlob(t *testing.T) { } reader := bytes.NewReader(data) adapter := testutil.NewMockAdapter() + objectPointer := block.ObjectPointer{ + StorageID: "", + StorageNamespace: storageNamespace, + IdentifierType: block.IdentifierTypeRelative, + Identifier: upload.DefaultPathProvider.NewPath(), + } opts := block.PutOpts{StorageClass: tc.storageClass} - address := upload.DefaultPathProvider.NewPath() - blob, err := upload.WriteBlob(context.Background(), adapter, bucketName, address, reader, tc.size, opts) + blob, err := upload.WriteBlob(context.Background(), adapter, objectPointer, reader, tc.size, opts) if err != nil { t.Fatal(err) } - // test bucketName - if adapter.LastBucket != bucketName && tc.size != 0 { - t.Fatalf("write to wrong bucket: expected:%s got:%s", bucketName, adapter.LastBucket) + // test storageNamespace + if adapter.LastStorageNamespace != storageNamespace && tc.size != 0 { + t.Fatalf("write to wrong storageNamespace: expected:%s got:%s", storageNamespace, adapter.LastStorageNamespace) } // test data size expectedSize := int64(len(data)) diff --git a/pkg/graveler/retention/garbage_collection_manager.go b/pkg/graveler/retention/garbage_collection_manager.go index 89ad027b4de..de7580f54d1 100644 --- a/pkg/graveler/retention/garbage_collection_manager.go +++ b/pkg/graveler/retention/garbage_collection_manager.go @@ -82,6 +82,7 @@ func (m *GarbageCollectionManager) SaveGarbageCollectionUncommitted(ctx context. } location += filename _, err = m.blockAdapter.Put(ctx, block.ObjectPointer{ + // TODO (gilo): ObjectPointer init - add StorageID here? Identifier: location, IdentifierType: block.IdentifierTypeFull, }, stat.Size(), fd, block.PutOpts{}) @@ -115,6 +116,7 @@ func NewGarbageCollectionManager(blockAdapter block.Adapter, refManager graveler } func (m *GarbageCollectionManager) GetRules(ctx context.Context, storageNamespace graveler.StorageNamespace) (*graveler.GarbageCollectionRules, error) { + // TODO (gilo): ObjectPointer init - add StorageID here? objectPointer := block.ObjectPointer{ StorageNamespace: string(storageNamespace), Identifier: fmt.Sprintf(configFileSuffixTemplate, m.committedBlockStoragePrefix), @@ -151,6 +153,7 @@ func (m *GarbageCollectionManager) SaveRules(ctx context.Context, storageNamespa if err != nil { return err } + // TODO (gilo): ObjectPointer init - add StorageID here? _, err = m.blockAdapter.Put(ctx, block.ObjectPointer{ StorageNamespace: string(storageNamespace), Identifier: fmt.Sprintf(configFileSuffixTemplate, m.committedBlockStoragePrefix), @@ -206,6 +209,7 @@ func (m *GarbageCollectionManager) SaveGarbageCollectionCommits(ctx context.Cont return "", err } _, err = m.blockAdapter.Put(ctx, block.ObjectPointer{ + // TODO (gilo): ObjectPointer init - add StorageID here? Identifier: csvLocation, IdentifierType: block.IdentifierTypeFull, }, int64(len(commitsStr)), strings.NewReader(commitsStr), block.PutOpts{}) diff --git a/pkg/graveler/retention/garbage_collection_manager_test.go b/pkg/graveler/retention/garbage_collection_manager_test.go index d620a3be2c7..181e348586a 100644 --- a/pkg/graveler/retention/garbage_collection_manager_test.go +++ b/pkg/graveler/retention/garbage_collection_manager_test.go @@ -70,6 +70,7 @@ func TestGarbageCollectionManager_SaveGarbageCollectionUncommitted(t *testing.T) err = gc.SaveGarbageCollectionUncommitted(ctx, &repositoryRec, filename, runID) require.NoError(t, err) reader, err := blockAdapter.Get(ctx, block.ObjectPointer{ + StorageID: "", StorageNamespace: "", Identifier: fmt.Sprintf("%s%s", location, filename), IdentifierType: block.IdentifierTypeFull, diff --git a/pkg/pyramid/tier_fs.go b/pkg/pyramid/tier_fs.go index 3c01d94ba38..b2c2bb7e96c 100644 --- a/pkg/pyramid/tier_fs.go +++ b/pkg/pyramid/tier_fs.go @@ -419,6 +419,7 @@ func (tfs *TierFS) newLocalFileRef(namespace, nsPath, filename string) localFile } func (tfs *TierFS) objPointer(namespace, filename string) block.ObjectPointer { + // TODO (gilo): ObjectPointer init - add StorageID here return block.ObjectPointer{ StorageNamespace: namespace, IdentifierType: block.IdentifierTypeRelative, diff --git a/pkg/samplerepo/samplecontent.go b/pkg/samplerepo/samplecontent.go index b35a36cb21e..56786c24dfb 100644 --- a/pkg/samplerepo/samplecontent.go +++ b/pkg/samplerepo/samplecontent.go @@ -80,8 +80,13 @@ func PopulateSampleRepo(ctx context.Context, repo *catalog.Repository, cat *cata } // write file to storage - address := pathProvider.NewPath() - blob, err := upload.WriteBlob(ctx, blockAdapter, repo.StorageNamespace, address, contentReader, contentSize, block.PutOpts{}) + objectPointer := block.ObjectPointer{ + StorageID: repo.StorageID, + StorageNamespace: repo.StorageNamespace, + IdentifierType: block.IdentifierTypeRelative, + Identifier: pathProvider.NewPath(), + } + blob, err := upload.WriteBlob(ctx, blockAdapter, objectPointer, contentReader, contentSize, block.PutOpts{}) if err != nil { return err } diff --git a/pkg/testutil/adapter.go b/pkg/testutil/adapter.go index 3a6f72e998d..442ab22193b 100644 --- a/pkg/testutil/adapter.go +++ b/pkg/testutil/adapter.go @@ -12,10 +12,10 @@ import ( ) type MockAdapter struct { - TotalSize int64 - Count int - LastBucket string - LastStorageClass *string + TotalSize int64 + Count int + LastStorageNamespace string + LastStorageClass *string blockstoreMetadata *block.BlockstoreMetadata namespaceRegion *string @@ -68,7 +68,7 @@ func (a *MockAdapter) Put(_ context.Context, obj block.ObjectPointer, _ int64, r } a.TotalSize += int64(len(data)) a.Count++ - a.LastBucket = obj.StorageNamespace + a.LastStorageNamespace = obj.StorageNamespace a.LastStorageClass = opts.StorageClass return &block.PutResponse{}, nil } diff --git a/pkg/upload/write_blob.go b/pkg/upload/write_blob.go index db0b594e3b0..02182ea5dab 100644 --- a/pkg/upload/write_blob.go +++ b/pkg/upload/write_blob.go @@ -17,20 +17,16 @@ type Blob struct { CreationDate time.Time } -func WriteBlob(ctx context.Context, adapter block.Adapter, bucketName, address string, body io.Reader, contentLength int64, opts block.PutOpts) (*Blob, error) { +func WriteBlob(ctx context.Context, adapter block.Adapter, objectPointer block.ObjectPointer, body io.Reader, contentLength int64, opts block.PutOpts) (*Blob, error) { // handle the upload itself hashReader := block.NewHashingReader(body, block.HashFunctionMD5, block.HashFunctionSHA256) - res, err := adapter.Put(ctx, block.ObjectPointer{ - StorageNamespace: bucketName, - IdentifierType: block.IdentifierTypeRelative, - Identifier: address, - }, contentLength, hashReader, opts) + res, err := adapter.Put(ctx, objectPointer, contentLength, hashReader, opts) if err != nil { return nil, err } checksum := hex.EncodeToString(hashReader.Md5.Sum(nil)) return &Blob{ - PhysicalAddress: address, + PhysicalAddress: objectPointer.Identifier, RelativePath: true, Checksum: checksum, Size: hashReader.CopiedSize,