Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add StorageID to adapter GetStorageNamespaceInfo and ResolveNamespace #8551

Merged
merged 14 commits into from
Jan 29, 2025
104 changes: 69 additions & 35 deletions pkg/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,18 +167,22 @@ func (c *Controller) CreatePresignMultipartUpload(w http.ResponseWriter, r *http
ctx := r.Context()
c.LogAction(ctx, "create_presign_multipart_upload", r, repository, branch, "")

// check if api is supported
storageConfig := c.getStorageConfig()
if !swag.BoolValue(storageConfig.PreSignMultipartUpload) {
writeError(w, r, http.StatusNotImplemented, "presign multipart upload API is not supported")
repo, err := c.Catalog.GetRepository(ctx, repository)
if c.handleAPIError(ctx, w, r, err) {
return
}

repo, err := c.Catalog.GetRepository(ctx, repository)
storageConfig, err := c.getStorageConfig(repo.StorageID)
if c.handleAPIError(ctx, w, r, err) {
return
}

// check if api is supported
if !swag.BoolValue(storageConfig.PreSignMultipartUpload) {
writeError(w, r, http.StatusNotImplemented, "presign multipart upload API is not supported")
return
}

// check if the branch exists - it is still possible for a branch to be deleted later, but we don't want to
// upload to start and fail at the end when the branch was not there in the first place
branchExists, err := c.Catalog.BranchExists(ctx, repository, branch)
Expand Down Expand Up @@ -211,7 +215,7 @@ func (c *Controller) CreatePresignMultipartUpload(w http.ResponseWriter, r *http
return
}

qk, err := c.BlockAdapter.ResolveNamespace(repo.StorageNamespace, address, block.IdentifierTypeRelative)
qk, err := c.BlockAdapter.ResolveNamespace(repo.StorageID, repo.StorageNamespace, address, block.IdentifierTypeRelative)
if err != nil {
writeError(w, r, http.StatusInternalServerError, err)
return
Expand Down Expand Up @@ -267,8 +271,17 @@ func (c *Controller) AbortPresignMultipartUpload(w http.ResponseWriter, r *http.
ctx := r.Context()
c.LogAction(ctx, "abort_presign_multipart_upload", r, repository, branch, "")

repo, err := c.Catalog.GetRepository(ctx, repository)
if c.handleAPIError(ctx, w, r, err) {
return
}

storageConfig, err := c.getStorageConfig(repo.StorageID)
if c.handleAPIError(ctx, w, r, err) {
return
}

// check if api is supported
storageConfig := c.getStorageConfig()
if !swag.BoolValue(storageConfig.PreSignMultipartUpload) {
writeError(w, r, http.StatusNotImplemented, "presign multipart upload API is not supported")
return
Expand All @@ -288,11 +301,6 @@ func (c *Controller) AbortPresignMultipartUpload(w http.ResponseWriter, r *http.
return
}

repo, err := c.Catalog.GetRepository(ctx, repository)
if c.handleAPIError(ctx, w, r, err) {
return
}

// verify physical address
physicalAddress, addressType := normalizePhysicalAddress(repo.StorageNamespace, body.PhysicalAddress)
if addressType != catalog.AddressTypeRelative {
Expand Down Expand Up @@ -327,8 +335,17 @@ func (c *Controller) CompletePresignMultipartUpload(w http.ResponseWriter, r *ht
ctx := r.Context()
c.LogAction(ctx, "complete_presign_multipart_upload", r, repository, branch, "")

repo, err := c.Catalog.GetRepository(ctx, repository)
if c.handleAPIError(ctx, w, r, err) {
return
}

storageConfig, err := c.getStorageConfig(repo.StorageID)
if c.handleAPIError(ctx, w, r, err) {
return
}

// check if api is supported
storageConfig := c.getStorageConfig()
if !swag.BoolValue(storageConfig.PreSignMultipartUpload) {
writeError(w, r, http.StatusNotImplemented, "presign multipart upload API is not supported")
return
Expand All @@ -353,11 +370,6 @@ func (c *Controller) CompletePresignMultipartUpload(w http.ResponseWriter, r *ht
}

// verify physical address
repo, err := c.Catalog.GetRepository(ctx, repository)
if c.handleAPIError(ctx, w, r, err) {
return
}

physicalAddress, addressType := normalizePhysicalAddress(repo.StorageNamespace, body.PhysicalAddress)
if addressType != catalog.AddressTypeRelative {
writeError(w, r, http.StatusBadRequest, "physical address must be relative to the storage namespace")
Expand Down Expand Up @@ -696,7 +708,7 @@ func (c *Controller) GetPhysicalAddress(w http.ResponseWriter, r *http.Request,
return
}

qk, err := c.BlockAdapter.ResolveNamespace(repo.StorageNamespace, address, block.IdentifierTypeRelative)
qk, err := c.BlockAdapter.ResolveNamespace(repo.StorageID, repo.StorageNamespace, address, block.IdentifierTypeRelative)
if err != nil {
writeError(w, r, http.StatusInternalServerError, err)
return
Expand Down Expand Up @@ -750,7 +762,7 @@ func (c *Controller) LinkPhysicalAddress(w http.ResponseWriter, r *http.Request,
return
}
// write metadata
qk, err := c.BlockAdapter.ResolveNamespace(repo.StorageNamespace, params.Path, block.IdentifierTypeRelative)
qk, err := c.BlockAdapter.ResolveNamespace(repo.StorageID, repo.StorageNamespace, params.Path, block.IdentifierTypeRelative)
if err != nil {
writeError(w, r, http.StatusInternalServerError, err)
return
Expand Down Expand Up @@ -1851,7 +1863,11 @@ func (c *Controller) GetConfig(w http.ResponseWriter, r *http.Request) {
}
}

storageCfg := c.getStorageConfig()
// TODO (gilo): is StorageID relevant here?
storageCfg, err := c.getStorageConfig("")
if c.handleAPIError(r.Context(), w, r, err) {
return
}
// TODO (niro): Needs to be populated
storageListCfg := apigen.StorageConfigList{}
versionConfig := c.getVersionConfig()
Expand All @@ -1868,16 +1884,25 @@ func (c *Controller) GetStorageConfig(w http.ResponseWriter, r *http.Request) {
return
}

writeResponse(w, r, http.StatusOK, c.getStorageConfig())
// TODO (gilo): is StorageID relevant here?
storageConfig, err := c.getStorageConfig("")
if c.handleAPIError(r.Context(), w, r, err) {
return
}

writeResponse(w, r, http.StatusOK, storageConfig)
}

func (c *Controller) getStorageConfig() apigen.StorageConfig {
info := c.BlockAdapter.GetStorageNamespaceInfo()
func (c *Controller) getStorageConfig(storageID string) (apigen.StorageConfig, error) {
info, err := c.BlockAdapter.GetStorageNamespaceInfo(storageID)
if err != nil {
return apigen.StorageConfig{}, err
}
defaultNamespacePrefix := swag.String(info.DefaultNamespacePrefix)
if c.Config.GetBaseConfig().Blockstore.DefaultNamespacePrefix != nil {
defaultNamespacePrefix = c.Config.GetBaseConfig().Blockstore.DefaultNamespacePrefix
}
return apigen.StorageConfig{
storageConfig := apigen.StorageConfig{
BlockstoreType: c.Config.GetBaseConfig().Blockstore.Type,
BlockstoreNamespaceValidityRegex: info.ValidityRegex,
BlockstoreNamespaceExample: info.Example,
Expand All @@ -1888,6 +1913,7 @@ func (c *Controller) getStorageConfig() apigen.StorageConfig {
ImportValidityRegex: info.ImportValidityRegex,
PreSignMultipartUpload: swag.Bool(info.PreSignSupportMultipart),
}
return storageConfig, nil
}

func (c *Controller) HealthCheck(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -1971,7 +1997,7 @@ func (c *Controller) CreateRepository(w http.ResponseWriter, r *http.Request, bo
c.LogAction(ctx, "repo_sample_data", r, body.Name, "", "")
}

if err := c.validateStorageNamespace(storageNamespace); err != nil {
if err := c.validateStorageNamespace(storageID, storageNamespace); err != nil {
writeError(w, r, http.StatusBadRequest, err)
return
}
Expand Down Expand Up @@ -2075,8 +2101,12 @@ func (c *Controller) CreateRepository(w http.ResponseWriter, r *http.Request, bo
writeResponse(w, r, http.StatusCreated, response)
}

func (c *Controller) validateStorageNamespace(storageNamespace string) error {
validRegex := c.BlockAdapter.GetStorageNamespaceInfo().ValidityRegex
func (c *Controller) validateStorageNamespace(storageID, storageNamespace string) error {
namespaceInfo, err := c.BlockAdapter.GetStorageNamespaceInfo(storageID)
if err != nil {
return fmt.Errorf("failed to get storage namespace info: %w", err)
}
validRegex := namespaceInfo.ValidityRegex
storagePrefixRegex, err := regexp.Compile(validRegex)
if err != nil {
return fmt.Errorf("failed to compile validity regex %s: %w", validRegex, block.ErrInvalidNamespace)
Expand Down Expand Up @@ -3312,7 +3342,7 @@ func (c *Controller) UploadObject(w http.ResponseWriter, r *http.Request, reposi
identifierType = block.IdentifierTypeRelative
}

qk, err := c.BlockAdapter.ResolveNamespace(repo.StorageNamespace, blob.PhysicalAddress, identifierType)
qk, err := c.BlockAdapter.ResolveNamespace(repo.StorageID, repo.StorageNamespace, blob.PhysicalAddress, identifierType)
if err != nil {
writeError(w, r, http.StatusInternalServerError, err)
return
Expand Down Expand Up @@ -3348,13 +3378,17 @@ func (c *Controller) StageObject(w http.ResponseWriter, r *http.Request, body ap
return
}
// write metadata
qk, err := c.BlockAdapter.ResolveNamespace(repo.StorageNamespace, body.PhysicalAddress, block.IdentifierTypeFull)
qk, err := c.BlockAdapter.ResolveNamespace(repo.StorageID, repo.StorageNamespace, body.PhysicalAddress, block.IdentifierTypeFull)
if c.handleAPIError(ctx, w, r, err) {
return
}

// see what storage type this is and whether it fits our configuration
uriRegex := c.BlockAdapter.GetStorageNamespaceInfo().ValidityRegex
namespaceInfo, err := c.BlockAdapter.GetStorageNamespaceInfo(repo.StorageID)
if c.handleAPIError(ctx, w, r, err) {
return
}
uriRegex := namespaceInfo.ValidityRegex
if match, err := regexp.MatchString(uriRegex, body.PhysicalAddress); err != nil || !match {
writeError(w, r, http.StatusBadRequest, fmt.Sprintf("physical address is not valid for block adapter: %s",
c.BlockAdapter.BlockstoreType(),
Expand Down Expand Up @@ -3453,7 +3487,7 @@ func (c *Controller) CopyObject(w http.ResponseWriter, r *http.Request, body api
return
}

qk, err := c.BlockAdapter.ResolveNamespace(repo.StorageNamespace, entry.PhysicalAddress, block.IdentifierTypeRelative)
qk, err := c.BlockAdapter.ResolveNamespace(repo.StorageID, repo.StorageNamespace, entry.PhysicalAddress, block.IdentifierTypeRelative)
if err != nil {
writeError(w, r, http.StatusInternalServerError, err)
return
Expand Down Expand Up @@ -4177,7 +4211,7 @@ func (c *Controller) CreateSymlinkFile(w http.ResponseWriter, r *http.Request, r
}
// loop all entries enter to map[path] physicalAddress
for _, entry := range entries {
qk, err := c.BlockAdapter.ResolveNamespace(repo.StorageNamespace, entry.PhysicalAddress, entry.AddressType.ToIdentifierType())
qk, err := c.BlockAdapter.ResolveNamespace(repo.StorageID, repo.StorageNamespace, entry.PhysicalAddress, entry.AddressType.ToIdentifierType())
if err != nil {
writeError(w, r, http.StatusInternalServerError, fmt.Sprintf("error while resolving address: %s", err))
return
Expand Down Expand Up @@ -4610,7 +4644,7 @@ func (c *Controller) ListObjects(w http.ResponseWriter, r *http.Request, reposit

objList := make([]apigen.ObjectStats, 0, len(res))
for _, entry := range res {
qk, err := c.BlockAdapter.ResolveNamespace(repo.StorageNamespace, entry.PhysicalAddress, entry.AddressType.ToIdentifierType())
qk, err := c.BlockAdapter.ResolveNamespace(repo.StorageID, repo.StorageNamespace, entry.PhysicalAddress, entry.AddressType.ToIdentifierType())
if err != nil {
writeError(w, r, http.StatusInternalServerError, err)
return
Expand Down Expand Up @@ -4708,7 +4742,7 @@ func (c *Controller) StatObject(w http.ResponseWriter, r *http.Request, reposito
return
}

qk, err := c.BlockAdapter.ResolveNamespace(repo.StorageNamespace, entry.PhysicalAddress, entry.AddressType.ToIdentifierType())
qk, err := c.BlockAdapter.ResolveNamespace(repo.StorageID, repo.StorageNamespace, entry.PhysicalAddress, entry.AddressType.ToIdentifierType())
if c.handleAPIError(ctx, w, r, err) {
return
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/block/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ type Adapter interface {

BlockstoreType() string
BlockstoreMetadata(ctx context.Context) (*BlockstoreMetadata, error)
GetStorageNamespaceInfo() StorageNamespaceInfo
ResolveNamespace(storageNamespace, key string, identifierType IdentifierType) (QualifiedKey, error)
GetStorageNamespaceInfo(storageID string) (StorageNamespaceInfo, error)
ResolveNamespace(storageID, storageNamespace, key string, identifierType IdentifierType) (QualifiedKey, error)

// GetRegion storageID is not actively used, and it's here mainly for completeness
GetRegion(ctx context.Context, storageID, storageNamespace string) (string, error)
Expand Down
6 changes: 3 additions & 3 deletions pkg/block/azure/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ func (a *Adapter) CompleteMultiPartUpload(ctx context.Context, obj block.ObjectP
return completeMultipart(ctx, multipartList.Part, *containerURL, qualifiedKey.BlobURL)
}

func (a *Adapter) GetStorageNamespaceInfo() block.StorageNamespaceInfo {
func (a *Adapter) GetStorageNamespaceInfo(_ string) (block.StorageNamespaceInfo, error) {
info := block.DefaultStorageNamespaceInfo(block.BlockstoreTypeAzure)

info.ImportValidityRegex = fmt.Sprintf(`^https?://[a-z0-9_-]+\.%s`, a.clientCache.params.Domain)
Expand All @@ -619,10 +619,10 @@ func (a *Adapter) GetStorageNamespaceInfo() block.StorageNamespaceInfo {
if !(a.disablePreSignedUI || a.disablePreSigned) {
info.PreSignSupportUI = true
}
return info
return info, nil
}

func (a *Adapter) ResolveNamespace(storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) {
func (a *Adapter) ResolveNamespace(storageID, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) {
return block.DefaultResolveNamespace(storageNamespace, key, identifierType)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/block/azure/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestAdapterNamespace(t *testing.T) {
}
require.NoError(t, err, "create new adapter")

namespaceInfo := adapter.GetStorageNamespaceInfo()
namespaceInfo, _ := adapter.GetStorageNamespaceInfo("")
expr, err := regexp.Compile(namespaceInfo.ValidityRegex)
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion pkg/block/blocktest/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func testAdapterWalker(t *testing.T, adapter block.Adapter, storageNamespace str
},
}
for _, tt := range cases {
qk, err := adapter.ResolveNamespace(storageNamespace, filepath.Join(testPrefix, tt.prefix), block.IdentifierTypeRelative)
qk, err := adapter.ResolveNamespace("", storageNamespace, filepath.Join(testPrefix, tt.prefix), block.IdentifierTypeRelative)
require.NoError(t, err)
uri, err := url.Parse(qk.Format())
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/block/blocktest/basic_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func testAdapterRemove(t *testing.T, adapter block.Adapter, storageNamespace str
t.Errorf("Remove() error = %v, wantErr %v", err, tt.wantErr)
}

qk, err := adapter.ResolveNamespace(storageNamespace, tt.name, block.IdentifierTypeRelative)
qk, err := adapter.ResolveNamespace("", storageNamespace, tt.name, block.IdentifierTypeRelative)
require.NoError(t, err)

tree := dumpPathTree(t, ctx, adapter, qk)
Expand Down
8 changes: 4 additions & 4 deletions pkg/block/gs/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,19 +658,19 @@ func (a *Adapter) BlockstoreMetadata(_ context.Context) (*block.BlockstoreMetada
return nil, block.ErrOperationNotSupported
}

func (a *Adapter) GetStorageNamespaceInfo() block.StorageNamespaceInfo {
func (a *Adapter) GetStorageNamespaceInfo(_ string) (block.StorageNamespaceInfo, error) {
info := block.DefaultStorageNamespaceInfo(block.BlockstoreTypeGS)
if a.disablePreSigned {
info.PreSignSupport = false
}
if !(a.disablePreSignedUI || a.disablePreSigned) {
info.PreSignSupportUI = true
}
return info
return info, nil
}

func (a *Adapter) extractParamsFromObj(obj block.ObjectPointer) (string, string, error) {
qk, err := a.ResolveNamespace(obj.StorageNamespace, obj.Identifier, obj.IdentifierType)
qk, err := a.ResolveNamespace(obj.StorageID, obj.StorageNamespace, obj.Identifier, obj.IdentifierType)
if err != nil {
return "", "", err
}
Expand All @@ -682,7 +682,7 @@ func (a *Adapter) extractParamsFromObj(obj block.ObjectPointer) (string, string,
return bucket, key, nil
}

func (a *Adapter) ResolveNamespace(storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) {
func (a *Adapter) ResolveNamespace(storageID, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) {
qualifiedKey, err := block.DefaultResolveNamespace(storageNamespace, key, identifierType)
if err != nil {
return qualifiedKey, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/block/gs/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ func TestAdapterNamespace(t *testing.T) {
require.NoError(t, adapter.Close())
}()

expr, err := regexp.Compile(adapter.GetStorageNamespaceInfo().ValidityRegex)
namespaceInfo, _ := adapter.GetStorageNamespaceInfo("")
expr, err := regexp.Compile(namespaceInfo.ValidityRegex)
require.NoError(t, err)

tests := []struct {
Expand Down
6 changes: 3 additions & 3 deletions pkg/block/local/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,15 +545,15 @@ func (l *Adapter) BlockstoreMetadata(_ context.Context) (*block.BlockstoreMetada
return nil, block.ErrOperationNotSupported
}

func (l *Adapter) GetStorageNamespaceInfo() block.StorageNamespaceInfo {
func (l *Adapter) GetStorageNamespaceInfo(_ string) (block.StorageNamespaceInfo, error) {
info := block.DefaultStorageNamespaceInfo(block.BlockstoreTypeLocal)
info.PreSignSupport = false
info.DefaultNamespacePrefix = DefaultNamespacePrefix
info.ImportSupport = l.importEnabled
return info
return info, nil
}

func (l *Adapter) ResolveNamespace(storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) {
func (l *Adapter) ResolveNamespace(storageID, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) {
qk, err := block.DefaultResolveNamespace(storageNamespace, key, identifierType)
if err != nil {
return nil, err
Expand Down
4 changes: 3 additions & 1 deletion pkg/block/local/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ func TestAdapterNamespace(t *testing.T) {
localPath := path.Join(tmpDir, "lakefs")
adapter, err := local.NewAdapter(localPath, local.WithRemoveEmptyDir(false))
require.NoError(t, err, "create new adapter")
expr, err := regexp.Compile(adapter.GetStorageNamespaceInfo().ValidityRegex)
namespaceInfo, err := adapter.GetStorageNamespaceInfo("")
require.NoError(t, err)
expr, err := regexp.Compile(namespaceInfo.ValidityRegex)
require.NoError(t, err)

tests := []struct {
Expand Down
Loading
Loading