-
Notifications
You must be signed in to change notification settings - Fork 367
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add StorageID to ObjectPointer #8522
Changes from 33 commits
c1d3587
4e51981
30b3849
b45d428
02e4404
74e2202
222b126
32b273f
65974c0
af9476e
4e8e423
5eebff2
478d498
13a771e
989a388
298bc0d
299c292
b81eff3
acb70e8
3dbd9d1
aa8180a
cecce8d
1ced59b
858812c
df0b2af
920cd0f
7386b7e
6f8d83a
b361b34
428f57a
3221718
a66dba0
789033d
b0a76ba
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -216,6 +216,7 @@ func (c *Controller) CreatePresignMultipartUpload(w http.ResponseWriter, r *http | |
|
||
// create a new multipart upload | ||
mpuResp, err := c.BlockAdapter.CreateMultiPartUpload(ctx, block.ObjectPointer{ | ||
StorageID: repo.StorageID, | ||
StorageNamespace: repo.StorageNamespace, | ||
IdentifierType: block.IdentifierTypeRelative, | ||
Identifier: address, | ||
|
@@ -229,6 +230,7 @@ func (c *Controller) CreatePresignMultipartUpload(w http.ResponseWriter, r *http | |
for i := 0; i < swag.IntValue(params.Parts); i++ { | ||
// generate a pre-signed PUT url for the given request | ||
preSignedURL, err := c.BlockAdapter.GetPresignUploadPartURL(ctx, block.ObjectPointer{ | ||
StorageID: repo.StorageID, | ||
StorageNamespace: repo.StorageNamespace, | ||
Identifier: address, | ||
IdentifierType: block.IdentifierTypeRelative, | ||
|
@@ -300,6 +302,7 @@ func (c *Controller) AbortPresignMultipartUpload(w http.ResponseWriter, r *http. | |
} | ||
|
||
if err := c.BlockAdapter.AbortMultiPartUpload(ctx, block.ObjectPointer{ | ||
StorageID: repo.StorageID, | ||
StorageNamespace: repo.StorageNamespace, | ||
IdentifierType: block.IdentifierTypeRelative, | ||
Identifier: physicalAddress, | ||
|
@@ -372,6 +375,7 @@ func (c *Controller) CompletePresignMultipartUpload(w http.ResponseWriter, r *ht | |
} | ||
|
||
mpuResp, err := c.BlockAdapter.CompleteMultiPartUpload(ctx, block.ObjectPointer{ | ||
StorageID: repo.StorageID, | ||
StorageNamespace: repo.StorageNamespace, | ||
IdentifierType: block.IdentifierTypeRelative, | ||
Identifier: physicalAddress, | ||
|
@@ -702,6 +706,7 @@ func (c *Controller) GetPhysicalAddress(w http.ResponseWriter, r *http.Request, | |
if swag.BoolValue(params.Presign) { | ||
// generate a pre-signed PUT url for the given request | ||
preSignedURL, expiry, err := c.BlockAdapter.GetPreSignedURL(ctx, block.ObjectPointer{ | ||
StorageID: repo.StorageID, | ||
StorageNamespace: repo.StorageNamespace, | ||
Identifier: address, | ||
IdentifierType: block.IdentifierTypeRelative, | ||
|
@@ -2091,6 +2096,7 @@ func (c *Controller) ensureStorageNamespace(ctx context.Context, storageNamespac | |
// this serves two purposes, first, we maintain safety check for older lakeFS version. | ||
// second, in scenarios where lakeFS shouldn't have access to the root namespace (i.e pre-sign URL only). | ||
if c.Config.Graveler.EnsureReadableRootNamespace { | ||
// TODO (gilo): ObjectPointer init - add StorageID here | ||
rootObj := block.ObjectPointer{ | ||
StorageNamespace: storageNamespace, | ||
IdentifierType: block.IdentifierTypeRelative, | ||
|
@@ -2108,6 +2114,7 @@ func (c *Controller) ensureStorageNamespace(ctx context.Context, storageNamespac | |
|
||
// check if the dummy file exists | ||
obj := block.ObjectPointer{ | ||
// TODO (gilo): ObjectPointer init - add StorageID here | ||
StorageNamespace: storageNamespace, | ||
IdentifierType: block.IdentifierTypeRelative, | ||
Identifier: dummyKey, | ||
|
@@ -2558,6 +2565,7 @@ func (c *Controller) GetRunHookOutput(w http.ResponseWriter, r *http.Request, re | |
|
||
logPath := taskResult.LogPath() | ||
reader, err := c.BlockAdapter.Get(ctx, block.ObjectPointer{ | ||
StorageID: repo.StorageID, | ||
StorageNamespace: repo.StorageNamespace, | ||
IdentifierType: block.IdentifierTypeRelative, | ||
Identifier: logPath, | ||
|
@@ -3205,13 +3213,19 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi | |
writeError(w, r, http.StatusInternalServerError, err) | ||
return | ||
} | ||
opts := block.PutOpts{StorageClass: params.StorageClass} | ||
|
||
var blob *upload.Blob | ||
if mediaType != "multipart/form-data" { | ||
// handle non-multipart, direct content upload | ||
address := c.PathProvider.NewPath() | ||
blob, err = upload.WriteBlob(ctx, c.BlockAdapter, repo.StorageNamespace, address, r.Body, r.ContentLength, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this code style change indicate a change in our best practices? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a good question. I don't think we have a code style guide for go, |
||
block.PutOpts{StorageClass: params.StorageClass}) | ||
objectPointer := block.ObjectPointer{ | ||
StorageID: repo.StorageID, | ||
StorageNamespace: repo.StorageNamespace, | ||
IdentifierType: block.IdentifierTypeRelative, | ||
Identifier: address, | ||
} | ||
blob, err = upload.WriteBlob(ctx, c.BlockAdapter, objectPointer, r.Body, r.ContentLength, opts) | ||
if err != nil { | ||
writeError(w, r, http.StatusInternalServerError, err) | ||
return | ||
|
@@ -3239,8 +3253,13 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi | |
partName := part.FormName() | ||
if partName == "content" { | ||
// upload the first "content" and exit the loop | ||
address := c.PathProvider.NewPath() | ||
blob, err = upload.WriteBlob(ctx, c.BlockAdapter, repo.StorageNamespace, address, part, -1, block.PutOpts{StorageClass: params.StorageClass}) | ||
objectPointer := block.ObjectPointer{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🎉 |
||
StorageID: repo.StorageID, | ||
StorageNamespace: repo.StorageNamespace, | ||
IdentifierType: block.IdentifierTypeRelative, | ||
Identifier: c.PathProvider.NewPath(), | ||
} | ||
blob, err = upload.WriteBlob(ctx, c.BlockAdapter, objectPointer, part, -1, opts) | ||
if err != nil { | ||
_ = part.Close() | ||
writeError(w, r, http.StatusInternalServerError, err) | ||
|
@@ -3625,6 +3644,7 @@ func (c *Controller) PrepareGarbageCollectionCommits(w http.ResponseWriter, r *h | |
if c.handleAPIError(ctx, w, r, err) { | ||
return | ||
} | ||
// TODO (gilo): ObjectPointer init - add StorageID here | ||
presignedURL, _, err := c.BlockAdapter.GetPreSignedURL(ctx, block.ObjectPointer{ | ||
Identifier: gcRunMetadata.CommitsCSVLocation, | ||
IdentifierType: block.IdentifierTypeFull, | ||
|
@@ -3855,6 +3875,7 @@ func (c *Controller) DumpRefs(w http.ResponseWriter, r *http.Request, repository | |
return | ||
} | ||
_, err = c.BlockAdapter.Put(ctx, block.ObjectPointer{ | ||
StorageID: repo.StorageID, | ||
StorageNamespace: repo.StorageNamespace, | ||
IdentifierType: block.IdentifierTypeRelative, | ||
Identifier: fmt.Sprintf("%s/refs_manifest.json", c.Config.Committed.BlockStoragePrefix), | ||
|
@@ -4195,6 +4216,7 @@ func writeSymlink(ctx context.Context, repo *catalog.Repository, branch, path st | |
address := fmt.Sprintf("%s/%s/%s/%s/symlink.txt", lakeFSPrefix, repo.Name, branch, path) | ||
data := strings.Join(addresses, "\n") | ||
_, err := adapter.Put(ctx, block.ObjectPointer{ | ||
StorageID: repo.StorageID, | ||
StorageNamespace: repo.StorageNamespace, | ||
IdentifierType: block.IdentifierTypeRelative, | ||
Identifier: address, | ||
|
@@ -4402,6 +4424,7 @@ func (c *Controller) GetMetadataObject(w http.ResponseWriter, r *http.Request, r | |
|
||
// if pre-sign, return a redirect | ||
pointer := block.ObjectPointer{ | ||
StorageID: repo.StorageID, | ||
StorageNamespace: repo.StorageNamespace, | ||
IdentifierType: block.IdentifierTypeRelative, | ||
Identifier: objPath, | ||
|
@@ -4481,6 +4504,7 @@ func (c *Controller) GetObject(w http.ResponseWriter, r *http.Request, repositor | |
|
||
// if pre-sign, return a redirect | ||
pointer := block.ObjectPointer{ | ||
StorageID: repo.StorageID, | ||
StorageNamespace: repo.StorageNamespace, | ||
IdentifierType: entry.AddressType.ToIdentifierType(), | ||
Identifier: entry.PhysicalAddress, | ||
|
@@ -4627,6 +4651,7 @@ func (c *Controller) ListObjects(w http.ResponseWriter, r *http.Request, reposit | |
if authResponse.Allowed { | ||
var expiry time.Time | ||
objStat.PhysicalAddress, expiry, err = c.BlockAdapter.GetPreSignedURL(ctx, block.ObjectPointer{ | ||
StorageID: repo.StorageID, | ||
StorageNamespace: repo.StorageNamespace, | ||
IdentifierType: entry.AddressType.ToIdentifierType(), | ||
Identifier: entry.PhysicalAddress, | ||
|
@@ -4709,6 +4734,7 @@ func (c *Controller) StatObject(w http.ResponseWriter, r *http.Request, reposito | |
} else if swag.BoolValue(params.Presign) { | ||
// need to pre-sign the physical address | ||
preSignedURL, expiry, err := c.BlockAdapter.GetPreSignedURL(ctx, block.ObjectPointer{ | ||
StorageID: repo.StorageID, | ||
StorageNamespace: repo.StorageNamespace, | ||
IdentifierType: entry.AddressType.ToIdentifierType(), | ||
Identifier: entry.PhysicalAddress, | ||
|
@@ -4770,6 +4796,7 @@ func (c *Controller) GetUnderlyingProperties(w http.ResponseWriter, r *http.Requ | |
|
||
// read object properties from underlying storage | ||
properties, err := c.BlockAdapter.GetProperties(ctx, block.ObjectPointer{ | ||
StorageID: repo.StorageID, | ||
StorageNamespace: repo.StorageNamespace, | ||
IdentifierType: entry.AddressType.ToIdentifierType(), | ||
Identifier: entry.PhysicalAddress, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -63,6 +63,7 @@ const DefaultPreSignExpiryDuration = 15 * time.Minute | |
// ObjectPointer is a unique identifier of an object in the object | ||
// store: the store is a 1:1 mapping between pointers and objects. | ||
type ObjectPointer struct { | ||
StorageID string | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Documentation: This is such an important core class class, but there is no explanation what sort of values these fields are ment to take. Is "S3" a valid storage ID? Or maybe a URL is expected like "https://bucketname.s3.amazonaws.com"? I have no idea when reading this code? Same goes for the other two fields There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is out of scope for this PR - we should however do this for the config part and document that well. |
||
StorageNamespace string | ||
Identifier string | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why was this change delayed? It's going in a different PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To reduce the scope of this one, and making it more reviewable (yet complete enough to be able to merge it in the end).