Skip to content

Commit

Permalink
StorageConfigList response changes
Browse files Browse the repository at this point in the history
  • Loading branch information
N-o-Z committed Jan 30, 2025
1 parent 8ec58b3 commit 69ced6e
Show file tree
Hide file tree
Showing 22 changed files with 237 additions and 187 deletions.
2 changes: 1 addition & 1 deletion cmd/lakefs/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func newConfig() (config.Config, error) {
if name == config.QuickstartConfiguration {
validateQuickstartEnv(cfg.GetBaseConfig())
}
return cfg.GetBaseConfig(), nil
return cfg, nil
}

func loadConfig() config.Config {
Expand Down
2 changes: 1 addition & 1 deletion modules/block/factory/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func BuildBlockAdapter(ctx context.Context, statsCollector stats.Collector, c config.Config) (block.Adapter, error) {
adapter, err := factory.BuildBlockAdapter(ctx, statsCollector, c.GetBaseConfig())
adapter, err := factory.BuildBlockAdapter(ctx, statsCollector, c.StorageConfig().GetStorageByID(""))
if err != nil {
return nil, err
}
Expand Down
95 changes: 58 additions & 37 deletions pkg/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,12 @@ func (c *Controller) LinkPhysicalAddress(w http.ResponseWriter, r *http.Request,
ifAbsent = true
}

blockStoreType := c.BlockAdapter.BlockstoreType()
storage := c.Config.StorageConfig().GetStorageByID(repo.StorageID)
if storage == nil {
c.handleAPIError(ctx, w, r, fmt.Errorf("no storage namespace info found for id: %s: %w", repo.StorageID, block.ErrInvalidAddress))
return
}
blockStoreType := storage.BlockstoreType()
expectedType := qk.GetStorageType().BlockstoreType()
if expectedType != blockStoreType {
c.Logger.WithContext(ctx).WithFields(logging.Fields{
Expand Down Expand Up @@ -1863,15 +1868,10 @@ func (c *Controller) GetConfig(w http.ResponseWriter, r *http.Request) {
}
}

// TODO (gilo): is StorageID relevant here?
storageCfg, err := c.getStorageConfig("")
if c.handleAPIError(r.Context(), w, r, err) {
return
}
// TODO (niro): Needs to be populated
storageListCfg := apigen.StorageConfigList{}
storageCfg, _ := c.getStorageConfig(config.SingleBlockstoreID)
storageListCfg := c.getStorageConfigList()
versionConfig := c.getVersionConfig()
writeResponse(w, r, http.StatusOK, apigen.Config{StorageConfig: &storageCfg, VersionConfig: &versionConfig, StorageConfigList: &storageListCfg})
writeResponse(w, r, http.StatusOK, apigen.Config{StorageConfig: storageCfg, VersionConfig: &versionConfig, StorageConfigList: &storageListCfg})
}

func (c *Controller) GetStorageConfig(w http.ResponseWriter, r *http.Request) {
Expand All @@ -1884,36 +1884,53 @@ func (c *Controller) GetStorageConfig(w http.ResponseWriter, r *http.Request) {
return
}

// TODO (gilo): is StorageID relevant here?
storageConfig, err := c.getStorageConfig("")
if c.handleAPIError(r.Context(), w, r, err) {
return
}

writeResponse(w, r, http.StatusOK, storageConfig)
storageCfg, _ := c.getStorageConfig(config.SingleBlockstoreID)
writeResponse(w, r, http.StatusOK, storageCfg)
}

func (c *Controller) getStorageConfig(storageID string) (apigen.StorageConfig, error) {
info, err := c.BlockAdapter.GetStorageNamespaceInfo(storageID)
if err != nil {
return apigen.StorageConfig{}, err
func (c *Controller) getStorageConfig(storageID string) (*apigen.StorageConfig, error) {
storage := c.Config.StorageConfig().GetStorageByID(storageID).(*config.Blockstore)
if storage == nil {
return nil, config.ErrNoStorageConfig
}
info := c.BlockAdapter.GetStorageNamespaceInfo(storageID)
if info == nil {
c.Logger.Error("no storage namespace info found for id: %s", storageID)
return nil, config.ErrNoStorageConfig
}
defaultNamespacePrefix := swag.String(info.DefaultNamespacePrefix)
if c.Config.GetBaseConfig().Blockstore.DefaultNamespacePrefix != nil {
defaultNamespacePrefix = c.Config.GetBaseConfig().Blockstore.DefaultNamespacePrefix
if storage.DefaultNamespacePrefix != nil {
defaultNamespacePrefix = storage.DefaultNamespacePrefix
}
storageConfig := apigen.StorageConfig{
BlockstoreType: c.Config.GetBaseConfig().Blockstore.Type,
BlockstoreNamespaceValidityRegex: info.ValidityRegex,
return &apigen.StorageConfig{
BlockstoreDescription: swag.String(storage.BlockstoreDescription()),
BlockstoreExtras: &apigen.StorageConfig_BlockstoreExtras{
AdditionalProperties: storage.GetBlockstoreExtras(),
},
BlockstoreType: storage.Type,
BlockstoreNamespaceValidityRegex: info.DefaultNamespacePrefix,
BlockstoreNamespaceExample: info.Example,
DefaultNamespacePrefix: defaultNamespacePrefix,
PreSignSupport: info.PreSignSupport,
PreSignSupportUi: info.PreSignSupportUI,
ImportSupport: info.ImportSupport,
ImportValidityRegex: info.ImportValidityRegex,
PreSignMultipartUpload: swag.Bool(info.PreSignSupportMultipart),
}, nil
}

func (c *Controller) getStorageConfigList() apigen.StorageConfigList {
configList := apigen.StorageConfigList{}
for _, id := range c.Config.StorageConfig().GetStorageIDs() {
info, err := c.getStorageConfig(id)
if info == nil {
c.Logger.WithError(err).Error("no storage config found for id: %s", id)
continue
}

configList = append(configList, *info)
}
return storageConfig, nil
return configList
}

func (c *Controller) HealthCheck(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -2026,7 +2043,7 @@ func (c *Controller) CreateRepository(w http.ResponseWriter, r *http.Request, bo
retErr = err
reason = "bad_url"
case errors.Is(err, block.ErrInvalidAddress):
retErr = fmt.Errorf("%w, must match: %s", err, c.BlockAdapter.BlockstoreType())
retErr = fmt.Errorf("%w, must match: %s", err, c.Config.StorageConfig().GetStorageByID(storageID).BlockstoreType())
reason = "invalid_namespace"
case errors.Is(err, ErrStorageNamespaceInUse):
retErr = err
Expand Down Expand Up @@ -2102,11 +2119,11 @@ func (c *Controller) CreateRepository(w http.ResponseWriter, r *http.Request, bo
}

func (c *Controller) validateStorageNamespace(storageID, storageNamespace string) error {
namespaceInfo, err := c.BlockAdapter.GetStorageNamespaceInfo(storageID)
if err != nil {
return fmt.Errorf("failed to get storage namespace info: %w", err)
info := c.BlockAdapter.GetStorageNamespaceInfo(storageID)
if info == nil {
return fmt.Errorf("no storage namespace info found for id %s: %w", storageID, config.ErrNoStorageConfig)
}
validRegex := namespaceInfo.ValidityRegex
validRegex := info.ValidityRegex
storagePrefixRegex, err := regexp.Compile(validRegex)
if err != nil {
return fmt.Errorf("failed to compile validity regex %s: %w", validRegex, block.ErrInvalidNamespace)
Expand Down Expand Up @@ -2754,7 +2771,8 @@ func (c *Controller) handleAPIErrorCallback(ctx context.Context, w http.Response
case errors.Is(err, graveler.ErrNotFound),
errors.Is(err, actions.ErrNotFound),
errors.Is(err, auth.ErrNotFound),
errors.Is(err, kv.ErrNotFound):
errors.Is(err, kv.ErrNotFound),
errors.Is(err, config.ErrNoStorageConfig):
log.Debug("Not found")
cb(w, r, http.StatusNotFound, err)

Expand Down Expand Up @@ -3384,14 +3402,17 @@ func (c *Controller) StageObject(w http.ResponseWriter, r *http.Request, body ap
}

// see what storage type this is and whether it fits our configuration
namespaceInfo, err := c.BlockAdapter.GetStorageNamespaceInfo(repo.StorageID)
if c.handleAPIError(ctx, w, r, err) {
info := c.BlockAdapter.GetStorageNamespaceInfo(repo.StorageID)
if info == nil {
writeError(w, r, http.StatusNotFound, fmt.Sprintf("no storage namespace info for storage id: %s",
repo.StorageID,
))
return
}
uriRegex := namespaceInfo.ValidityRegex
uriRegex := info.ValidityRegex
if match, err := regexp.MatchString(uriRegex, body.PhysicalAddress); err != nil || !match {
writeError(w, r, http.StatusBadRequest, fmt.Sprintf("physical address is not valid for block adapter: %s",
c.BlockAdapter.BlockstoreType(),
c.Config.StorageConfig().GetStorageByID(repo.StorageID).BlockstoreType(),
))
return
}
Expand Down Expand Up @@ -5179,7 +5200,7 @@ func (c *Controller) SetupCommPrefs(w http.ResponseWriter, r *http.Request, body
InstallationID: installationID,
FeatureUpdates: commPrefs.FeatureUpdates,
SecurityUpdates: commPrefs.SecurityUpdates,
BlockstoreType: c.Config.GetBaseConfig().BlockstoreType(),
BlockstoreType: c.BlockAdapter.BlockstoreType(),
}
// collect comm prefs
go c.Collector.CollectCommPrefs(commPrefsED)
Expand Down
2 changes: 1 addition & 1 deletion pkg/block/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ type Adapter interface {

BlockstoreType() string
BlockstoreMetadata(ctx context.Context) (*BlockstoreMetadata, error)
GetStorageNamespaceInfo(storageID string) (StorageNamespaceInfo, error)
GetStorageNamespaceInfo(storageID string) *StorageNamespaceInfo
ResolveNamespace(storageID, storageNamespace, key string, identifierType IdentifierType) (QualifiedKey, error)

// GetRegion storageID is not actively used, and it's here mainly for completeness
Expand Down
4 changes: 2 additions & 2 deletions pkg/block/azure/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ func (a *Adapter) CompleteMultiPartUpload(ctx context.Context, obj block.ObjectP
return completeMultipart(ctx, multipartList.Part, *containerURL, qualifiedKey.BlobURL)
}

func (a *Adapter) GetStorageNamespaceInfo(_ string) (block.StorageNamespaceInfo, error) {
func (a *Adapter) GetStorageNamespaceInfo(string) *block.StorageNamespaceInfo {
info := block.DefaultStorageNamespaceInfo(block.BlockstoreTypeAzure)

info.ImportValidityRegex = fmt.Sprintf(`^https?://[a-z0-9_-]+\.%s`, a.clientCache.params.Domain)
Expand All @@ -619,7 +619,7 @@ func (a *Adapter) GetStorageNamespaceInfo(_ string) (block.StorageNamespaceInfo,
if !(a.disablePreSignedUI || a.disablePreSigned) {
info.PreSignSupportUI = true
}
return info, nil
return &info
}

func (a *Adapter) ResolveNamespace(storageID, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/block/azure/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestAdapterNamespace(t *testing.T) {
}
require.NoError(t, err, "create new adapter")

namespaceInfo, _ := adapter.GetStorageNamespaceInfo("")
namespaceInfo := adapter.GetStorageNamespaceInfo("")
expr, err := regexp.Compile(namespaceInfo.ValidityRegex)
require.NoError(t, err)

Expand Down
3 changes: 2 additions & 1 deletion pkg/block/factory/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/treeverse/lakefs/pkg/block/params"
s3a "github.com/treeverse/lakefs/pkg/block/s3"
"github.com/treeverse/lakefs/pkg/block/transient"
"github.com/treeverse/lakefs/pkg/config"
"github.com/treeverse/lakefs/pkg/logging"
"github.com/treeverse/lakefs/pkg/stats"
"golang.org/x/oauth2/google"
Expand All @@ -26,7 +27,7 @@ const (
googleAuthCloudPlatform = "https://www.googleapis.com/auth/cloud-platform"
)

func BuildBlockAdapter(ctx context.Context, statsCollector stats.Collector, c params.AdapterConfig) (block.Adapter, error) {
func BuildBlockAdapter(ctx context.Context, statsCollector stats.Collector, c config.AdapterConfig) (block.Adapter, error) {
blockstore := strings.ToLower(c.BlockstoreType())
logging.FromContext(ctx).
WithField("type", blockstore).
Expand Down
4 changes: 2 additions & 2 deletions pkg/block/gs/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,15 +658,15 @@ func (a *Adapter) BlockstoreMetadata(_ context.Context) (*block.BlockstoreMetada
return nil, block.ErrOperationNotSupported
}

func (a *Adapter) GetStorageNamespaceInfo(_ string) (block.StorageNamespaceInfo, error) {
func (a *Adapter) GetStorageNamespaceInfo(string) *block.StorageNamespaceInfo {
info := block.DefaultStorageNamespaceInfo(block.BlockstoreTypeGS)
if a.disablePreSigned {
info.PreSignSupport = false
}
if !(a.disablePreSignedUI || a.disablePreSigned) {
info.PreSignSupportUI = true
}
return info, nil
return &info
}

func (a *Adapter) extractParamsFromObj(obj block.ObjectPointer) (string, string, error) {
Expand Down
3 changes: 1 addition & 2 deletions pkg/block/gs/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ func TestAdapterNamespace(t *testing.T) {
require.NoError(t, adapter.Close())
}()

namespaceInfo, _ := adapter.GetStorageNamespaceInfo("")
expr, err := regexp.Compile(namespaceInfo.ValidityRegex)
expr, err := regexp.Compile(adapter.GetStorageNamespaceInfo("").ValidityRegex)
require.NoError(t, err)

tests := []struct {
Expand Down
4 changes: 2 additions & 2 deletions pkg/block/local/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,12 +545,12 @@ func (l *Adapter) BlockstoreMetadata(_ context.Context) (*block.BlockstoreMetada
return nil, block.ErrOperationNotSupported
}

func (l *Adapter) GetStorageNamespaceInfo(_ string) (block.StorageNamespaceInfo, error) {
func (l *Adapter) GetStorageNamespaceInfo(string) *block.StorageNamespaceInfo {
info := block.DefaultStorageNamespaceInfo(block.BlockstoreTypeLocal)
info.PreSignSupport = false
info.DefaultNamespacePrefix = DefaultNamespacePrefix
info.ImportSupport = l.importEnabled
return info, nil
return &info
}

func (l *Adapter) ResolveNamespace(storageID, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) {
Expand Down
4 changes: 1 addition & 3 deletions pkg/block/local/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ func TestAdapterNamespace(t *testing.T) {
localPath := path.Join(tmpDir, "lakefs")
adapter, err := local.NewAdapter(localPath, local.WithRemoveEmptyDir(false))
require.NoError(t, err, "create new adapter")
namespaceInfo, err := adapter.GetStorageNamespaceInfo("")
require.NoError(t, err)
expr, err := regexp.Compile(namespaceInfo.ValidityRegex)
expr, err := regexp.Compile(adapter.GetStorageNamespaceInfo("").ValidityRegex)
require.NoError(t, err)

tests := []struct {
Expand Down
6 changes: 3 additions & 3 deletions pkg/block/mem/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,14 +343,14 @@ func (a *Adapter) BlockstoreMetadata(_ context.Context) (*block.BlockstoreMetada
return nil, block.ErrOperationNotSupported
}

func (a *Adapter) GetStorageNamespaceInfo(_ string) (block.StorageNamespaceInfo, error) {
func (a *Adapter) GetStorageNamespaceInfo(string) *block.StorageNamespaceInfo {
info := block.DefaultStorageNamespaceInfo(block.BlockstoreTypeMem)
info.PreSignSupport = false
info.ImportSupport = false
return info, nil
return &info
}

func (a *Adapter) ResolveNamespace(storageID, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) {
func (a *Adapter) ResolveNamespace(_, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) {
return block.DefaultResolveNamespace(storageNamespace, key, identifierType)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/block/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (m *MetricsAdapter) BlockstoreMetadata(ctx context.Context) (*BlockstoreMet
return m.adapter.BlockstoreMetadata(ctx)
}

func (m *MetricsAdapter) GetStorageNamespaceInfo(storageID string) (StorageNamespaceInfo, error) {
func (m *MetricsAdapter) GetStorageNamespaceInfo(storageID string) *StorageNamespaceInfo {
return m.adapter.GetStorageNamespaceInfo(storageID)
}

Expand Down
9 changes: 0 additions & 9 deletions pkg/block/params/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,6 @@ import (
"time"
)

// AdapterConfig configures a block adapter.
type AdapterConfig interface {
BlockstoreType() string
BlockstoreLocalParams() (Local, error)
BlockstoreS3Params() (S3, error)
BlockstoreGSParams() (GS, error)
BlockstoreAzureParams() (Azure, error)
}

type Mem struct{}

type Local struct {
Expand Down
4 changes: 2 additions & 2 deletions pkg/block/s3/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,7 @@ func (a *Adapter) BlockstoreMetadata(ctx context.Context) (*block.BlockstoreMeta
return &block.BlockstoreMetadata{Region: &region}, nil
}

func (a *Adapter) GetStorageNamespaceInfo(_ string) (block.StorageNamespaceInfo, error) {
func (a *Adapter) GetStorageNamespaceInfo(string) *block.StorageNamespaceInfo {
info := block.DefaultStorageNamespaceInfo(block.BlockstoreTypeS3)
if a.disablePreSigned {
info.PreSignSupport = false
Expand All @@ -874,7 +874,7 @@ func (a *Adapter) GetStorageNamespaceInfo(_ string) (block.StorageNamespaceInfo,
if !a.disablePreSignedMultipart && info.PreSignSupport {
info.PreSignSupportMultipart = true
}
return info, nil
return &info
}

func resolveNamespace(obj block.ObjectPointer) (block.CommonQualifiedKey, error) {
Expand Down
3 changes: 1 addition & 2 deletions pkg/block/s3/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ func TestS3AdapterPresignedOverride(t *testing.T) {
// TestAdapterNamespace tests the namespace validity regex with various paths
func TestAdapterNamespace(t *testing.T) {
adapter := getS3BlockAdapter(t, nil)
namespaceInfo, _ := adapter.GetStorageNamespaceInfo("")
expr, err := regexp.Compile(namespaceInfo.ValidityRegex)
expr, err := regexp.Compile(adapter.GetStorageNamespaceInfo("").ValidityRegex)
require.NoError(t, err)

tests := []struct {
Expand Down
4 changes: 2 additions & 2 deletions pkg/block/transient/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,12 @@ func (a *Adapter) BlockstoreMetadata(_ context.Context) (*block.BlockstoreMetada
return nil, block.ErrOperationNotSupported
}

func (a *Adapter) GetStorageNamespaceInfo(_ string) (block.StorageNamespaceInfo, error) {
func (a *Adapter) GetStorageNamespaceInfo(string) *block.StorageNamespaceInfo {
info := block.DefaultStorageNamespaceInfo(block.BlockstoreTypeTransient)
info.PreSignSupport = false
info.PreSignSupportUI = false
info.ImportSupport = false
return info, nil
return &info
}

func (a *Adapter) ResolveNamespace(storageID, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,10 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
cancelFn()
return nil, fmt.Errorf("build block adapter: %w", err)
}
// TODO (niro): This part will break using multiple blockstores. We should change the catalog logic to get the walker from the adapter
if cfg.WalkerFactory == nil {
// TODO(niro): Walkfer factory should be removed from catalog. This is a WA which relies on Blockstore configuration
cfg.WalkerFactory = store.NewFactory(cfg.Config.GetBaseConfig())
cfg.WalkerFactory = store.NewFactory(&cfg.Config.GetBaseConfig().Blockstore)
}

tierFSParams, err := pyramidparams.NewCommittedTierFSParams(cfg.Config.GetBaseConfig(), adapter)
Expand Down
Loading

0 comments on commit 69ced6e

Please sign in to comment.