Skip to content

Commit

Permalink
StorageConfigList response changes
Browse files Browse the repository at this point in the history
  • Loading branch information
N-o-Z committed Jan 28, 2025
1 parent f2b9264 commit bafe34f
Show file tree
Hide file tree
Showing 9 changed files with 243 additions and 145 deletions.
2 changes: 1 addition & 1 deletion modules/block/factory/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func BuildBlockAdapter(ctx context.Context, statsCollector stats.Collector, c config.Config) (block.Adapter, error) {
adapter, err := factory.BuildBlockAdapter(ctx, statsCollector, c.GetBaseConfig())
adapter, err := factory.BuildBlockAdapter(ctx, statsCollector, c.StorageConfig().GetStorageByID(""))
if err != nil {
return nil, err
}
Expand Down
72 changes: 52 additions & 20 deletions pkg/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/treeverse/lakefs/pkg/auth/setup"
"github.com/treeverse/lakefs/pkg/authentication"
"github.com/treeverse/lakefs/pkg/block"
blockfactory "github.com/treeverse/lakefs/pkg/block/factory"
"github.com/treeverse/lakefs/pkg/catalog"
"github.com/treeverse/lakefs/pkg/cloud"
"github.com/treeverse/lakefs/pkg/config"
Expand Down Expand Up @@ -765,7 +766,7 @@ func (c *Controller) LinkPhysicalAddress(w http.ResponseWriter, r *http.Request,
ifAbsent = true
}

blockStoreType := c.BlockAdapter.BlockstoreType()
blockStoreType := c.Config.StorageConfig().GetStorageByID(repo.StorageID).BlockstoreType()
expectedType := qk.GetStorageType().BlockstoreType()
if expectedType != blockStoreType {
c.Logger.WithContext(ctx).WithFields(logging.Fields{
Expand Down Expand Up @@ -1852,8 +1853,7 @@ func (c *Controller) GetConfig(w http.ResponseWriter, r *http.Request) {
}

storageCfg := c.getStorageConfig()
// TODO (niro): Needs to be populated
storageListCfg := apigen.StorageConfigList{}
storageListCfg := c.getStorageConfigList()
versionConfig := c.getVersionConfig()
writeResponse(w, r, http.StatusOK, apigen.Config{StorageConfig: &storageCfg, VersionConfig: &versionConfig, StorageConfigList: &storageListCfg})
}
Expand All @@ -1872,22 +1872,54 @@ func (c *Controller) GetStorageConfig(w http.ResponseWriter, r *http.Request) {
}

func (c *Controller) getStorageConfig() apigen.StorageConfig {
info := c.BlockAdapter.GetStorageNamespaceInfo()
defaultNamespacePrefix := swag.String(info.DefaultNamespacePrefix)
if c.Config.GetBaseConfig().Blockstore.DefaultNamespacePrefix != nil {
defaultNamespacePrefix = c.Config.GetBaseConfig().Blockstore.DefaultNamespacePrefix
}
storageConfig := block.GetStorageConfig(c.BlockAdapter, c.Config.GetBaseConfig().Blockstore.DefaultNamespacePrefix)
return apigen.StorageConfig{
BlockstoreType: c.Config.GetBaseConfig().Blockstore.Type,
BlockstoreNamespaceValidityRegex: info.ValidityRegex,
BlockstoreNamespaceExample: info.Example,
DefaultNamespacePrefix: defaultNamespacePrefix,
PreSignSupport: info.PreSignSupport,
PreSignSupportUi: info.PreSignSupportUI,
ImportSupport: info.ImportSupport,
ImportValidityRegex: info.ImportValidityRegex,
PreSignMultipartUpload: swag.Bool(info.PreSignSupportMultipart),
BlockstoreType: storageConfig.Type,
BlockstoreNamespaceValidityRegex: storageConfig.NamespaceValidityRegex,
BlockstoreNamespaceExample: storageConfig.NamespaceExample,
DefaultNamespacePrefix: swag.String(storageConfig.DefaultNamespacePrefix),
PreSignSupport: storageConfig.PreSignSupport,
PreSignSupportUi: storageConfig.PreSignSupportUI,
ImportSupport: storageConfig.ImportSupport,
ImportValidityRegex: storageConfig.ImportValidityRegex,
PreSignMultipartUpload: swag.Bool(storageConfig.PreSignMultipartUpload),
}
}

func (c *Controller) getStorageConfigList() apigen.StorageConfigList {
configList := apigen.StorageConfigList{}
storageConfig := c.Config.StorageConfig()
for _, id := range c.Config.StorageConfig().GetStorageIDs() {
storage := storageConfig.GetStorageByID(id)
if storage == nil {
c.Logger.Error("no storage found for id: %s", id)
continue
}
adapter, err := blockfactory.BuildBlockAdapter(context.Background(), nil, storage)
if err != nil {
c.Logger.WithError(err).WithField("storage_id", id).
Warn("Failed to initialize adapter")
continue
}
info := block.GetStorageConfig(adapter, storage.GetDefaultNamespacePrefix())
configList = append(configList, apigen.StorageConfig{
BlockstoreDescription: swag.String(storage.BlockstoreDescription()),
BlockstoreExtras: &apigen.StorageConfig_BlockstoreExtras{
AdditionalProperties: storage.GetBlockstoreExtras(),
},
BlockstoreId: swag.String(id),
BlockstoreNamespaceValidityRegex: info.NamespaceValidityRegex,
BlockstoreNamespaceExample: info.NamespaceExample,
BlockstoreType: info.Type,
DefaultNamespacePrefix: swag.String(info.DefaultNamespacePrefix),
ImportSupport: info.ImportSupport,
ImportValidityRegex: info.ImportValidityRegex,
PreSignMultipartUpload: swag.Bool(info.PreSignMultipartUpload),
PreSignSupport: info.PreSignSupport,
PreSignSupportUi: info.PreSignSupportUI,
})
}
return configList
}

func (c *Controller) HealthCheck(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -2000,7 +2032,7 @@ func (c *Controller) CreateRepository(w http.ResponseWriter, r *http.Request, bo
retErr = err
reason = "bad_url"
case errors.Is(err, block.ErrInvalidAddress):
retErr = fmt.Errorf("%w, must match: %s", err, c.BlockAdapter.BlockstoreType())
retErr = fmt.Errorf("%w, must match: %s", err, c.Config.StorageConfig().GetStorageByID(storageID).BlockstoreType())
reason = "invalid_namespace"
case errors.Is(err, ErrStorageNamespaceInUse):
retErr = err
Expand Down Expand Up @@ -3357,7 +3389,7 @@ func (c *Controller) StageObject(w http.ResponseWriter, r *http.Request, body ap
uriRegex := c.BlockAdapter.GetStorageNamespaceInfo().ValidityRegex
if match, err := regexp.MatchString(uriRegex, body.PhysicalAddress); err != nil || !match {
writeError(w, r, http.StatusBadRequest, fmt.Sprintf("physical address is not valid for block adapter: %s",
c.BlockAdapter.BlockstoreType(),
c.Config.StorageConfig().GetStorageByID(repo.StorageID).BlockstoreType(),
))
return
}
Expand Down Expand Up @@ -5145,7 +5177,7 @@ func (c *Controller) SetupCommPrefs(w http.ResponseWriter, r *http.Request, body
InstallationID: installationID,
FeatureUpdates: commPrefs.FeatureUpdates,
SecurityUpdates: commPrefs.SecurityUpdates,
BlockstoreType: c.Config.GetBaseConfig().BlockstoreType(),
BlockstoreType: c.BlockAdapter.BlockstoreType(),
}
// collect comm prefs
go c.Collector.CollectCommPrefs(commPrefsED)
Expand Down
31 changes: 31 additions & 0 deletions pkg/block/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,19 @@ func (r *PutResponse) GetMtime() time.Time {
return time.Now()
}

type StorageConfig struct {
ID string
Type string
NamespaceValidityRegex string
NamespaceExample string
DefaultNamespacePrefix string
PreSignSupport bool
PreSignSupportUI bool
ImportSupport bool
ImportValidityRegex string
PreSignMultipartUpload bool
}

// Adapter abstract Storage Adapter for persistence of version controlled data. The methods generally map to S3 API methods
// - Generally some type of Object Storage
// - Can also be block storage or even in-memory
Expand Down Expand Up @@ -204,3 +217,21 @@ type Adapter interface {

RuntimeStats() map[string]string
}

func GetStorageConfig(adapter Adapter, defaultNamespacePrefix *string) StorageConfig {
info := adapter.GetStorageNamespaceInfo()
if defaultNamespacePrefix != nil {
info.DefaultNamespacePrefix = *defaultNamespacePrefix
}
return StorageConfig{
Type: adapter.BlockstoreType(),
NamespaceValidityRegex: info.ValidityRegex,
NamespaceExample: info.Example,
DefaultNamespacePrefix: info.DefaultNamespacePrefix,
PreSignSupport: info.PreSignSupport,
PreSignSupportUI: info.PreSignSupportUI,
ImportSupport: info.ImportSupport,
ImportValidityRegex: info.ImportValidityRegex,
PreSignMultipartUpload: info.PreSignSupportMultipart,
}
}
3 changes: 2 additions & 1 deletion pkg/block/factory/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/treeverse/lakefs/pkg/block/params"
s3a "github.com/treeverse/lakefs/pkg/block/s3"
"github.com/treeverse/lakefs/pkg/block/transient"
"github.com/treeverse/lakefs/pkg/config"
"github.com/treeverse/lakefs/pkg/logging"
"github.com/treeverse/lakefs/pkg/stats"
"golang.org/x/oauth2/google"
Expand All @@ -26,7 +27,7 @@ const (
googleAuthCloudPlatform = "https://www.googleapis.com/auth/cloud-platform"
)

func BuildBlockAdapter(ctx context.Context, statsCollector stats.Collector, c params.AdapterConfig) (block.Adapter, error) {
func BuildBlockAdapter(ctx context.Context, statsCollector stats.Collector, c config.AdapterConfig) (block.Adapter, error) {
blockstore := strings.ToLower(c.BlockstoreType())
logging.FromContext(ctx).
WithField("type", blockstore).
Expand Down
9 changes: 0 additions & 9 deletions pkg/block/params/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,6 @@ import (
"time"
)

// AdapterConfig configures a block adapter.
type AdapterConfig interface {
BlockstoreType() string
BlockstoreLocalParams() (Local, error)
BlockstoreS3Params() (S3, error)
BlockstoreGSParams() (GS, error)
BlockstoreAzureParams() (Azure, error)
}

type Mem struct{}

type Local struct {
Expand Down
5 changes: 3 additions & 2 deletions pkg/catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
"github.com/hashicorp/go-multierror"
lru "github.com/hnlq715/golang-lru"
"github.com/rs/xid"
blockfactory "github.com/treeverse/lakefs/modules/block/factory"
"github.com/treeverse/lakefs/pkg/batch"
"github.com/treeverse/lakefs/pkg/block"
blockfactory "github.com/treeverse/lakefs/pkg/block/factory"
"github.com/treeverse/lakefs/pkg/config"
"github.com/treeverse/lakefs/pkg/graveler"
"github.com/treeverse/lakefs/pkg/graveler/branch"
Expand Down Expand Up @@ -312,8 +312,9 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) {
cancelFn()
return nil, fmt.Errorf("build block adapter: %w", err)
}
// TODO (niro): This part will break using multiple blockstores. We should change the catalog logic to get the walker from the adapter
if cfg.WalkerFactory == nil {
cfg.WalkerFactory = store.NewFactory(cfg.Config)
cfg.WalkerFactory = store.NewFactory(&cfg.Config.Blockstore)
}

tierFSParams, err := pyramidparams.NewCommittedTierFSParams(cfg.Config, adapter)
Expand Down
Loading

0 comments on commit bafe34f

Please sign in to comment.