From 69ced6ed8b35771ce8792eb8f40f1cc04afe1150 Mon Sep 17 00:00:00 2001 From: Nir Ozery Date: Tue, 28 Jan 2025 12:44:31 -0500 Subject: [PATCH] StorageConfigList response changes --- cmd/lakefs/cmd/root.go | 2 +- modules/block/factory/build.go | 2 +- pkg/api/controller.go | 95 +++++++----- pkg/block/adapter.go | 2 +- pkg/block/azure/adapter.go | 4 +- pkg/block/azure/adapter_test.go | 2 +- pkg/block/factory/build.go | 3 +- pkg/block/gs/adapter.go | 4 +- pkg/block/gs/adapter_test.go | 3 +- pkg/block/local/adapter.go | 4 +- pkg/block/local/adapter_test.go | 4 +- pkg/block/mem/adapter.go | 6 +- pkg/block/metrics.go | 2 +- pkg/block/params/block.go | 9 -- pkg/block/s3/adapter.go | 4 +- pkg/block/s3/adapter_test.go | 3 +- pkg/block/transient/adapter.go | 4 +- pkg/catalog/catalog.go | 3 +- pkg/config/config.go | 257 ++++++++++++++++++-------------- pkg/ingest/store/factory.go | 5 +- pkg/stats/metadata.go | 2 +- pkg/testutil/adapter.go | 4 +- 22 files changed, 237 insertions(+), 187 deletions(-) diff --git a/cmd/lakefs/cmd/root.go b/cmd/lakefs/cmd/root.go index bf87214868c..9d1f427e74f 100644 --- a/cmd/lakefs/cmd/root.go +++ b/cmd/lakefs/cmd/root.go @@ -96,7 +96,7 @@ func newConfig() (config.Config, error) { if name == config.QuickstartConfiguration { validateQuickstartEnv(cfg.GetBaseConfig()) } - return cfg.GetBaseConfig(), nil + return cfg, nil } func loadConfig() config.Config { diff --git a/modules/block/factory/build.go b/modules/block/factory/build.go index 2d18d253c16..d1327c11251 100644 --- a/modules/block/factory/build.go +++ b/modules/block/factory/build.go @@ -10,7 +10,7 @@ import ( ) func BuildBlockAdapter(ctx context.Context, statsCollector stats.Collector, c config.Config) (block.Adapter, error) { - adapter, err := factory.BuildBlockAdapter(ctx, statsCollector, c.GetBaseConfig()) + adapter, err := factory.BuildBlockAdapter(ctx, statsCollector, c.StorageConfig().GetStorageByID("")) if err != nil { return nil, err } diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 76c77b7bb86..2e0f9adce40 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -777,7 +777,12 @@ func (c *Controller) LinkPhysicalAddress(w http.ResponseWriter, r *http.Request, ifAbsent = true } - blockStoreType := c.BlockAdapter.BlockstoreType() + storage := c.Config.StorageConfig().GetStorageByID(repo.StorageID) + if storage == nil { + c.handleAPIError(ctx, w, r, fmt.Errorf("no storage namespace info found for id: %s: %w", repo.StorageID, block.ErrInvalidAddress)) + return + } + blockStoreType := storage.BlockstoreType() expectedType := qk.GetStorageType().BlockstoreType() if expectedType != blockStoreType { c.Logger.WithContext(ctx).WithFields(logging.Fields{ @@ -1863,15 +1868,10 @@ func (c *Controller) GetConfig(w http.ResponseWriter, r *http.Request) { } } - // TODO (gilo): is StorageID relevant here? - storageCfg, err := c.getStorageConfig("") - if c.handleAPIError(r.Context(), w, r, err) { - return - } - // TODO (niro): Needs to be populated - storageListCfg := apigen.StorageConfigList{} + storageCfg, _ := c.getStorageConfig(config.SingleBlockstoreID) + storageListCfg := c.getStorageConfigList() versionConfig := c.getVersionConfig() - writeResponse(w, r, http.StatusOK, apigen.Config{StorageConfig: &storageCfg, VersionConfig: &versionConfig, StorageConfigList: &storageListCfg}) + writeResponse(w, r, http.StatusOK, apigen.Config{StorageConfig: storageCfg, VersionConfig: &versionConfig, StorageConfigList: &storageListCfg}) } func (c *Controller) GetStorageConfig(w http.ResponseWriter, r *http.Request) { @@ -1884,27 +1884,31 @@ func (c *Controller) GetStorageConfig(w http.ResponseWriter, r *http.Request) { return } - // TODO (gilo): is StorageID relevant here? - storageConfig, err := c.getStorageConfig("") - if c.handleAPIError(r.Context(), w, r, err) { - return - } - - writeResponse(w, r, http.StatusOK, storageConfig) + storageCfg, _ := c.getStorageConfig(config.SingleBlockstoreID) + writeResponse(w, r, http.StatusOK, storageCfg) } -func (c *Controller) getStorageConfig(storageID string) (apigen.StorageConfig, error) { - info, err := c.BlockAdapter.GetStorageNamespaceInfo(storageID) - if err != nil { - return apigen.StorageConfig{}, err +func (c *Controller) getStorageConfig(storageID string) (*apigen.StorageConfig, error) { + storage := c.Config.StorageConfig().GetStorageByID(storageID).(*config.Blockstore) + if storage == nil { + return nil, config.ErrNoStorageConfig + } + info := c.BlockAdapter.GetStorageNamespaceInfo(storageID) + if info == nil { + c.Logger.Error("no storage namespace info found for id: %s", storageID) + return nil, config.ErrNoStorageConfig } defaultNamespacePrefix := swag.String(info.DefaultNamespacePrefix) - if c.Config.GetBaseConfig().Blockstore.DefaultNamespacePrefix != nil { - defaultNamespacePrefix = c.Config.GetBaseConfig().Blockstore.DefaultNamespacePrefix + if storage.DefaultNamespacePrefix != nil { + defaultNamespacePrefix = storage.DefaultNamespacePrefix } - storageConfig := apigen.StorageConfig{ - BlockstoreType: c.Config.GetBaseConfig().Blockstore.Type, - BlockstoreNamespaceValidityRegex: info.ValidityRegex, + return &apigen.StorageConfig{ + BlockstoreDescription: swag.String(storage.BlockstoreDescription()), + BlockstoreExtras: &apigen.StorageConfig_BlockstoreExtras{ + AdditionalProperties: storage.GetBlockstoreExtras(), + }, + BlockstoreType: storage.Type, + BlockstoreNamespaceValidityRegex: info.DefaultNamespacePrefix, BlockstoreNamespaceExample: info.Example, DefaultNamespacePrefix: defaultNamespacePrefix, PreSignSupport: info.PreSignSupport, @@ -1912,8 +1916,21 @@ func (c *Controller) getStorageConfig(storageID string) (apigen.StorageConfig, e ImportSupport: info.ImportSupport, ImportValidityRegex: info.ImportValidityRegex, PreSignMultipartUpload: swag.Bool(info.PreSignSupportMultipart), + }, nil +} + +func (c *Controller) getStorageConfigList() apigen.StorageConfigList { + configList := apigen.StorageConfigList{} + for _, id := range c.Config.StorageConfig().GetStorageIDs() { + info, err := c.getStorageConfig(id) + if info == nil { + c.Logger.WithError(err).Error("no storage config found for id: %s", id) + continue + } + + configList = append(configList, *info) } - return storageConfig, nil + return configList } func (c *Controller) HealthCheck(w http.ResponseWriter, r *http.Request) { @@ -2026,7 +2043,7 @@ func (c *Controller) CreateRepository(w http.ResponseWriter, r *http.Request, bo retErr = err reason = "bad_url" case errors.Is(err, block.ErrInvalidAddress): - retErr = fmt.Errorf("%w, must match: %s", err, c.BlockAdapter.BlockstoreType()) + retErr = fmt.Errorf("%w, must match: %s", err, c.Config.StorageConfig().GetStorageByID(storageID).BlockstoreType()) reason = "invalid_namespace" case errors.Is(err, ErrStorageNamespaceInUse): retErr = err @@ -2102,11 +2119,11 @@ func (c *Controller) CreateRepository(w http.ResponseWriter, r *http.Request, bo } func (c *Controller) validateStorageNamespace(storageID, storageNamespace string) error { - namespaceInfo, err := c.BlockAdapter.GetStorageNamespaceInfo(storageID) - if err != nil { - return fmt.Errorf("failed to get storage namespace info: %w", err) + info := c.BlockAdapter.GetStorageNamespaceInfo(storageID) + if info == nil { + return fmt.Errorf("no storage namespace info found for id %s: %w", storageID, config.ErrNoStorageConfig) } - validRegex := namespaceInfo.ValidityRegex + validRegex := info.ValidityRegex storagePrefixRegex, err := regexp.Compile(validRegex) if err != nil { return fmt.Errorf("failed to compile validity regex %s: %w", validRegex, block.ErrInvalidNamespace) @@ -2754,7 +2771,8 @@ func (c *Controller) handleAPIErrorCallback(ctx context.Context, w http.Response case errors.Is(err, graveler.ErrNotFound), errors.Is(err, actions.ErrNotFound), errors.Is(err, auth.ErrNotFound), - errors.Is(err, kv.ErrNotFound): + errors.Is(err, kv.ErrNotFound), + errors.Is(err, config.ErrNoStorageConfig): log.Debug("Not found") cb(w, r, http.StatusNotFound, err) @@ -3384,14 +3402,17 @@ func (c *Controller) StageObject(w http.ResponseWriter, r *http.Request, body ap } // see what storage type this is and whether it fits our configuration - namespaceInfo, err := c.BlockAdapter.GetStorageNamespaceInfo(repo.StorageID) - if c.handleAPIError(ctx, w, r, err) { + info := c.BlockAdapter.GetStorageNamespaceInfo(repo.StorageID) + if info == nil { + writeError(w, r, http.StatusNotFound, fmt.Sprintf("no storage namespace info for storage id: %s", + repo.StorageID, + )) return } - uriRegex := namespaceInfo.ValidityRegex + uriRegex := info.ValidityRegex if match, err := regexp.MatchString(uriRegex, body.PhysicalAddress); err != nil || !match { writeError(w, r, http.StatusBadRequest, fmt.Sprintf("physical address is not valid for block adapter: %s", - c.BlockAdapter.BlockstoreType(), + c.Config.StorageConfig().GetStorageByID(repo.StorageID).BlockstoreType(), )) return } @@ -5179,7 +5200,7 @@ func (c *Controller) SetupCommPrefs(w http.ResponseWriter, r *http.Request, body InstallationID: installationID, FeatureUpdates: commPrefs.FeatureUpdates, SecurityUpdates: commPrefs.SecurityUpdates, - BlockstoreType: c.Config.GetBaseConfig().BlockstoreType(), + BlockstoreType: c.BlockAdapter.BlockstoreType(), } // collect comm prefs go c.Collector.CollectCommPrefs(commPrefsED) diff --git a/pkg/block/adapter.go b/pkg/block/adapter.go index fa70226f8a0..2a4383d623a 100644 --- a/pkg/block/adapter.go +++ b/pkg/block/adapter.go @@ -196,7 +196,7 @@ type Adapter interface { BlockstoreType() string BlockstoreMetadata(ctx context.Context) (*BlockstoreMetadata, error) - GetStorageNamespaceInfo(storageID string) (StorageNamespaceInfo, error) + GetStorageNamespaceInfo(storageID string) *StorageNamespaceInfo ResolveNamespace(storageID, storageNamespace, key string, identifierType IdentifierType) (QualifiedKey, error) // GetRegion storageID is not actively used, and it's here mainly for completeness diff --git a/pkg/block/azure/adapter.go b/pkg/block/azure/adapter.go index 7e43989d5f6..760be24e896 100644 --- a/pkg/block/azure/adapter.go +++ b/pkg/block/azure/adapter.go @@ -606,7 +606,7 @@ func (a *Adapter) CompleteMultiPartUpload(ctx context.Context, obj block.ObjectP return completeMultipart(ctx, multipartList.Part, *containerURL, qualifiedKey.BlobURL) } -func (a *Adapter) GetStorageNamespaceInfo(_ string) (block.StorageNamespaceInfo, error) { +func (a *Adapter) GetStorageNamespaceInfo(string) *block.StorageNamespaceInfo { info := block.DefaultStorageNamespaceInfo(block.BlockstoreTypeAzure) info.ImportValidityRegex = fmt.Sprintf(`^https?://[a-z0-9_-]+\.%s`, a.clientCache.params.Domain) @@ -619,7 +619,7 @@ func (a *Adapter) GetStorageNamespaceInfo(_ string) (block.StorageNamespaceInfo, if !(a.disablePreSignedUI || a.disablePreSigned) { info.PreSignSupportUI = true } - return info, nil + return &info } func (a *Adapter) ResolveNamespace(storageID, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) { diff --git a/pkg/block/azure/adapter_test.go b/pkg/block/azure/adapter_test.go index a4d6a3babd0..32a0efa4801 100644 --- a/pkg/block/azure/adapter_test.go +++ b/pkg/block/azure/adapter_test.go @@ -135,7 +135,7 @@ func TestAdapterNamespace(t *testing.T) { } require.NoError(t, err, "create new adapter") - namespaceInfo, _ := adapter.GetStorageNamespaceInfo("") + namespaceInfo := adapter.GetStorageNamespaceInfo("") expr, err := regexp.Compile(namespaceInfo.ValidityRegex) require.NoError(t, err) diff --git a/pkg/block/factory/build.go b/pkg/block/factory/build.go index 641e718a6b1..de4d134f7f0 100644 --- a/pkg/block/factory/build.go +++ b/pkg/block/factory/build.go @@ -15,6 +15,7 @@ import ( "github.com/treeverse/lakefs/pkg/block/params" s3a "github.com/treeverse/lakefs/pkg/block/s3" "github.com/treeverse/lakefs/pkg/block/transient" + "github.com/treeverse/lakefs/pkg/config" "github.com/treeverse/lakefs/pkg/logging" "github.com/treeverse/lakefs/pkg/stats" "golang.org/x/oauth2/google" @@ -26,7 +27,7 @@ const ( googleAuthCloudPlatform = "https://www.googleapis.com/auth/cloud-platform" ) -func BuildBlockAdapter(ctx context.Context, statsCollector stats.Collector, c params.AdapterConfig) (block.Adapter, error) { +func BuildBlockAdapter(ctx context.Context, statsCollector stats.Collector, c config.AdapterConfig) (block.Adapter, error) { blockstore := strings.ToLower(c.BlockstoreType()) logging.FromContext(ctx). WithField("type", blockstore). diff --git a/pkg/block/gs/adapter.go b/pkg/block/gs/adapter.go index cfc59f9ea61..8f7d5ceb991 100644 --- a/pkg/block/gs/adapter.go +++ b/pkg/block/gs/adapter.go @@ -658,7 +658,7 @@ func (a *Adapter) BlockstoreMetadata(_ context.Context) (*block.BlockstoreMetada return nil, block.ErrOperationNotSupported } -func (a *Adapter) GetStorageNamespaceInfo(_ string) (block.StorageNamespaceInfo, error) { +func (a *Adapter) GetStorageNamespaceInfo(string) *block.StorageNamespaceInfo { info := block.DefaultStorageNamespaceInfo(block.BlockstoreTypeGS) if a.disablePreSigned { info.PreSignSupport = false @@ -666,7 +666,7 @@ func (a *Adapter) GetStorageNamespaceInfo(_ string) (block.StorageNamespaceInfo, if !(a.disablePreSignedUI || a.disablePreSigned) { info.PreSignSupportUI = true } - return info, nil + return &info } func (a *Adapter) extractParamsFromObj(obj block.ObjectPointer) (string, string, error) { diff --git a/pkg/block/gs/adapter_test.go b/pkg/block/gs/adapter_test.go index c6bed26be20..252cef86613 100644 --- a/pkg/block/gs/adapter_test.go +++ b/pkg/block/gs/adapter_test.go @@ -36,8 +36,7 @@ func TestAdapterNamespace(t *testing.T) { require.NoError(t, adapter.Close()) }() - namespaceInfo, _ := adapter.GetStorageNamespaceInfo("") - expr, err := regexp.Compile(namespaceInfo.ValidityRegex) + expr, err := regexp.Compile(adapter.GetStorageNamespaceInfo("").ValidityRegex) require.NoError(t, err) tests := []struct { diff --git a/pkg/block/local/adapter.go b/pkg/block/local/adapter.go index d4aaeeda80a..5896fd2c548 100644 --- a/pkg/block/local/adapter.go +++ b/pkg/block/local/adapter.go @@ -545,12 +545,12 @@ func (l *Adapter) BlockstoreMetadata(_ context.Context) (*block.BlockstoreMetada return nil, block.ErrOperationNotSupported } -func (l *Adapter) GetStorageNamespaceInfo(_ string) (block.StorageNamespaceInfo, error) { +func (l *Adapter) GetStorageNamespaceInfo(string) *block.StorageNamespaceInfo { info := block.DefaultStorageNamespaceInfo(block.BlockstoreTypeLocal) info.PreSignSupport = false info.DefaultNamespacePrefix = DefaultNamespacePrefix info.ImportSupport = l.importEnabled - return info, nil + return &info } func (l *Adapter) ResolveNamespace(storageID, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) { diff --git a/pkg/block/local/adapter_test.go b/pkg/block/local/adapter_test.go index ac78c91f2c7..a45ee299d9e 100644 --- a/pkg/block/local/adapter_test.go +++ b/pkg/block/local/adapter_test.go @@ -31,9 +31,7 @@ func TestAdapterNamespace(t *testing.T) { localPath := path.Join(tmpDir, "lakefs") adapter, err := local.NewAdapter(localPath, local.WithRemoveEmptyDir(false)) require.NoError(t, err, "create new adapter") - namespaceInfo, err := adapter.GetStorageNamespaceInfo("") - require.NoError(t, err) - expr, err := regexp.Compile(namespaceInfo.ValidityRegex) + expr, err := regexp.Compile(adapter.GetStorageNamespaceInfo("").ValidityRegex) require.NoError(t, err) tests := []struct { diff --git a/pkg/block/mem/adapter.go b/pkg/block/mem/adapter.go index 022627ca425..8d316a09222 100644 --- a/pkg/block/mem/adapter.go +++ b/pkg/block/mem/adapter.go @@ -343,14 +343,14 @@ func (a *Adapter) BlockstoreMetadata(_ context.Context) (*block.BlockstoreMetada return nil, block.ErrOperationNotSupported } -func (a *Adapter) GetStorageNamespaceInfo(_ string) (block.StorageNamespaceInfo, error) { +func (a *Adapter) GetStorageNamespaceInfo(string) *block.StorageNamespaceInfo { info := block.DefaultStorageNamespaceInfo(block.BlockstoreTypeMem) info.PreSignSupport = false info.ImportSupport = false - return info, nil + return &info } -func (a *Adapter) ResolveNamespace(storageID, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) { +func (a *Adapter) ResolveNamespace(_, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) { return block.DefaultResolveNamespace(storageNamespace, key, identifierType) } diff --git a/pkg/block/metrics.go b/pkg/block/metrics.go index 75eae719f3b..a97b9da7814 100644 --- a/pkg/block/metrics.go +++ b/pkg/block/metrics.go @@ -115,7 +115,7 @@ func (m *MetricsAdapter) BlockstoreMetadata(ctx context.Context) (*BlockstoreMet return m.adapter.BlockstoreMetadata(ctx) } -func (m *MetricsAdapter) GetStorageNamespaceInfo(storageID string) (StorageNamespaceInfo, error) { +func (m *MetricsAdapter) GetStorageNamespaceInfo(storageID string) *StorageNamespaceInfo { return m.adapter.GetStorageNamespaceInfo(storageID) } diff --git a/pkg/block/params/block.go b/pkg/block/params/block.go index 335a259c808..ef7aec14758 100644 --- a/pkg/block/params/block.go +++ b/pkg/block/params/block.go @@ -4,15 +4,6 @@ import ( "time" ) -// AdapterConfig configures a block adapter. -type AdapterConfig interface { - BlockstoreType() string - BlockstoreLocalParams() (Local, error) - BlockstoreS3Params() (S3, error) - BlockstoreGSParams() (GS, error) - BlockstoreAzureParams() (Azure, error) -} - type Mem struct{} type Local struct { diff --git a/pkg/block/s3/adapter.go b/pkg/block/s3/adapter.go index c28c1322e46..944c063d16f 100644 --- a/pkg/block/s3/adapter.go +++ b/pkg/block/s3/adapter.go @@ -863,7 +863,7 @@ func (a *Adapter) BlockstoreMetadata(ctx context.Context) (*block.BlockstoreMeta return &block.BlockstoreMetadata{Region: ®ion}, nil } -func (a *Adapter) GetStorageNamespaceInfo(_ string) (block.StorageNamespaceInfo, error) { +func (a *Adapter) GetStorageNamespaceInfo(string) *block.StorageNamespaceInfo { info := block.DefaultStorageNamespaceInfo(block.BlockstoreTypeS3) if a.disablePreSigned { info.PreSignSupport = false @@ -874,7 +874,7 @@ func (a *Adapter) GetStorageNamespaceInfo(_ string) (block.StorageNamespaceInfo, if !a.disablePreSignedMultipart && info.PreSignSupport { info.PreSignSupportMultipart = true } - return info, nil + return &info } func resolveNamespace(obj block.ObjectPointer) (block.CommonQualifiedKey, error) { diff --git a/pkg/block/s3/adapter_test.go b/pkg/block/s3/adapter_test.go index 905f6e6b038..f0e4aa406d7 100644 --- a/pkg/block/s3/adapter_test.go +++ b/pkg/block/s3/adapter_test.go @@ -66,8 +66,7 @@ func TestS3AdapterPresignedOverride(t *testing.T) { // TestAdapterNamespace tests the namespace validity regex with various paths func TestAdapterNamespace(t *testing.T) { adapter := getS3BlockAdapter(t, nil) - namespaceInfo, _ := adapter.GetStorageNamespaceInfo("") - expr, err := regexp.Compile(namespaceInfo.ValidityRegex) + expr, err := regexp.Compile(adapter.GetStorageNamespaceInfo("").ValidityRegex) require.NoError(t, err) tests := []struct { diff --git a/pkg/block/transient/adapter.go b/pkg/block/transient/adapter.go index 1b349dcf2d6..d316b91a826 100644 --- a/pkg/block/transient/adapter.go +++ b/pkg/block/transient/adapter.go @@ -151,12 +151,12 @@ func (a *Adapter) BlockstoreMetadata(_ context.Context) (*block.BlockstoreMetada return nil, block.ErrOperationNotSupported } -func (a *Adapter) GetStorageNamespaceInfo(_ string) (block.StorageNamespaceInfo, error) { +func (a *Adapter) GetStorageNamespaceInfo(string) *block.StorageNamespaceInfo { info := block.DefaultStorageNamespaceInfo(block.BlockstoreTypeTransient) info.PreSignSupport = false info.PreSignSupportUI = false info.ImportSupport = false - return info, nil + return &info } func (a *Adapter) ResolveNamespace(storageID, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) { diff --git a/pkg/catalog/catalog.go b/pkg/catalog/catalog.go index 47faf43a33e..afb1f43424d 100644 --- a/pkg/catalog/catalog.go +++ b/pkg/catalog/catalog.go @@ -312,9 +312,10 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) { cancelFn() return nil, fmt.Errorf("build block adapter: %w", err) } + // TODO (niro): This part will break using multiple blockstores. We should change the catalog logic to get the walker from the adapter if cfg.WalkerFactory == nil { // TODO(niro): Walkfer factory should be removed from catalog. This is a WA which relies on Blockstore configuration - cfg.WalkerFactory = store.NewFactory(cfg.Config.GetBaseConfig()) + cfg.WalkerFactory = store.NewFactory(&cfg.Config.GetBaseConfig().Blockstore) } tierFSParams, err := pyramidparams.NewCommittedTierFSParams(cfg.Config.GetBaseConfig(), adapter) diff --git a/pkg/config/config.go b/pkg/config/config.go index a2fd73a95a3..2d48dc5f2fb 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -23,6 +23,7 @@ var ( ErrMissingRequiredKeys = fmt.Errorf("%w: missing required keys", ErrBadConfiguration) ErrBadGCPCSEKValue = fmt.Errorf("value of customer-supplied server side encryption is not a valid %d bytes AES key", gcpAESKeyLength) ErrGCPEncryptKeyConflict = errors.New("setting both kms and customer supplied encryption will result failure when reading/writing object") + ErrNoStorageConfig = errors.New("no storage config") ) // UseLocalConfiguration set to true will add defaults that enable a lakeFS run @@ -30,6 +31,8 @@ var ( const ( UseLocalConfiguration = "local-settings" QuickstartConfiguration = "quickstart" + + SingleBlockstoreID = "" ) type OIDC struct { @@ -149,6 +152,17 @@ type ApproximatelyCorrectOwnership struct { Acquire time.Duration `mapstructure:"acquire"` } +// AdapterConfig configures a blockstore adapter. +type AdapterConfig interface { + BlockstoreType() string + BlockstoreDescription() string + BlockstoreLocalParams() (blockparams.Local, error) + BlockstoreS3Params() (blockparams.S3, error) + BlockstoreGSParams() (blockparams.GS, error) + BlockstoreAzureParams() (blockparams.Azure, error) + GetBlockstoreExtras() map[string]string +} + type Blockstore struct { Signing struct { SecretKey SecureString `mapstructure:"secret_key"` @@ -211,12 +225,143 @@ type Blockstore struct { } `mapstructure:"gs"` } +func (b *Blockstore) GetStorageByID(id string) AdapterConfig { + if id != "" { + return nil + } + + return b +} + +func (b *Blockstore) BlockstoreType() string { + return b.Type +} + +func (b *Blockstore) BlockstoreS3Params() (blockparams.S3, error) { + var webIdentity *blockparams.S3WebIdentity + if b.S3.WebIdentity != nil { + webIdentity = &blockparams.S3WebIdentity{ + SessionDuration: b.S3.WebIdentity.SessionDuration, + SessionExpiryWindow: b.S3.WebIdentity.SessionExpiryWindow, + } + } + + var creds blockparams.S3Credentials + if b.S3.Credentials != nil { + creds.AccessKeyID = b.S3.Credentials.AccessKeyID.SecureValue() + creds.SecretAccessKey = b.S3.Credentials.SecretAccessKey.SecureValue() + creds.SessionToken = b.S3.Credentials.SessionToken.SecureValue() + } + + return blockparams.S3{ + Region: b.S3.Region, + Profile: b.S3.Profile, + CredentialsFile: b.S3.CredentialsFile, + Credentials: creds, + MaxRetries: b.S3.MaxRetries, + Endpoint: b.S3.Endpoint, + ForcePathStyle: b.S3.ForcePathStyle, + DiscoverBucketRegion: b.S3.DiscoverBucketRegion, + SkipVerifyCertificateTestOnly: b.S3.SkipVerifyCertificateTestOnly, + ServerSideEncryption: b.S3.ServerSideEncryption, + ServerSideEncryptionKmsKeyID: b.S3.ServerSideEncryptionKmsKeyID, + PreSignedExpiry: b.S3.PreSignedExpiry, + PreSignedEndpoint: b.S3.PreSignedEndpoint, + DisablePreSigned: b.S3.DisablePreSigned, + DisablePreSignedUI: b.S3.DisablePreSignedUI, + DisablePreSignedMultipart: b.S3.DisablePreSignedMultipart, + ClientLogRetries: b.S3.ClientLogRetries, + ClientLogRequest: b.S3.ClientLogRequest, + WebIdentity: webIdentity, + }, nil +} + +func (b *Blockstore) BlockstoreLocalParams() (blockparams.Local, error) { + localPath := b.Local.Path + path, err := homedir.Expand(localPath) + if err != nil { + return blockparams.Local{}, fmt.Errorf("parse blockstore location URI %s: %w", localPath, err) + } + + params := blockparams.Local(*b.Local) + params.Path = path + return params, nil +} + +func (b *Blockstore) BlockstoreGSParams() (blockparams.GS, error) { + var customerSuppliedKey []byte = nil + if b.GS.ServerSideEncryptionCustomerSupplied != "" { + v, err := hex.DecodeString(b.GS.ServerSideEncryptionCustomerSupplied) + if err != nil { + return blockparams.GS{}, err + } + if len(v) != gcpAESKeyLength { + return blockparams.GS{}, ErrBadGCPCSEKValue + } + customerSuppliedKey = v + if b.GS.ServerSideEncryptionKmsKeyID != "" { + return blockparams.GS{}, ErrGCPEncryptKeyConflict + } + } + + credPath, err := homedir.Expand(b.GS.CredentialsFile) + if err != nil { + return blockparams.GS{}, fmt.Errorf("parse GS credentials path '%s': %w", b.GS.CredentialsFile, err) + } + return blockparams.GS{ + CredentialsFile: credPath, + CredentialsJSON: b.GS.CredentialsJSON, + PreSignedExpiry: b.GS.PreSignedExpiry, + DisablePreSigned: b.GS.DisablePreSigned, + DisablePreSignedUI: b.GS.DisablePreSignedUI, + ServerSideEncryptionCustomerSupplied: customerSuppliedKey, + ServerSideEncryptionKmsKeyID: b.GS.ServerSideEncryptionKmsKeyID, + }, nil +} + +func (b *Blockstore) BlockstoreAzureParams() (blockparams.Azure, error) { + if b.Azure.AuthMethod != "" { + logging.ContextUnavailable().Warn("blockstore.azure.auth_method is deprecated. Value is no longer used.") + } + if b.Azure.ChinaCloudDeprecated { + logging.ContextUnavailable().Warn("blockstore.azure.china_cloud is deprecated. Value is no longer used. Please pass Domain = 'blob.core.chinacloudapi.cn'") + b.Azure.Domain = "blob.core.chinacloudapi.cn" + } + return blockparams.Azure{ + StorageAccount: b.Azure.StorageAccount, + StorageAccessKey: b.Azure.StorageAccessKey, + TryTimeout: b.Azure.TryTimeout, + PreSignedExpiry: b.Azure.PreSignedExpiry, + TestEndpointURL: b.Azure.TestEndpointURL, + Domain: b.Azure.Domain, + DisablePreSigned: b.Azure.DisablePreSigned, + DisablePreSignedUI: b.Azure.DisablePreSignedUI, + }, nil +} + +func (b *Blockstore) BlockstoreDescription() string { + return "" +} + +func (b *Blockstore) GetStorageIDs() []string { + return nil +} + +func (b *Blockstore) GetBlockstoreExtras() map[string]string { + return nil +} + type Config interface { GetBaseConfig() *BaseConfig - StorageConfig() interface{} + StorageConfig() StorageConfig Validate() error } +type StorageConfig interface { + GetStorageByID(storageID string) AdapterConfig + GetStorageIDs() []string +} + // BaseConfig - Output struct of configuration, used to validate. If you read a key using a viper accessor // rather than accessing a field of this struct, that key will *not* be validated. So don't // do that. @@ -480,116 +625,10 @@ func (c *BaseConfig) Validate() error { return ValidateBlockstore(&c.Blockstore) } -func (c *BaseConfig) BlockstoreType() string { - return c.Blockstore.Type -} - -func (c *BaseConfig) BlockstoreS3Params() (blockparams.S3, error) { - var webIdentity *blockparams.S3WebIdentity - if c.Blockstore.S3.WebIdentity != nil { - webIdentity = &blockparams.S3WebIdentity{ - SessionDuration: c.Blockstore.S3.WebIdentity.SessionDuration, - SessionExpiryWindow: c.Blockstore.S3.WebIdentity.SessionExpiryWindow, - } - } - - var creds blockparams.S3Credentials - if c.Blockstore.S3.Credentials != nil { - creds.AccessKeyID = c.Blockstore.S3.Credentials.AccessKeyID.SecureValue() - creds.SecretAccessKey = c.Blockstore.S3.Credentials.SecretAccessKey.SecureValue() - creds.SessionToken = c.Blockstore.S3.Credentials.SessionToken.SecureValue() - } - - return blockparams.S3{ - Region: c.Blockstore.S3.Region, - Profile: c.Blockstore.S3.Profile, - CredentialsFile: c.Blockstore.S3.CredentialsFile, - Credentials: creds, - MaxRetries: c.Blockstore.S3.MaxRetries, - Endpoint: c.Blockstore.S3.Endpoint, - ForcePathStyle: c.Blockstore.S3.ForcePathStyle, - DiscoverBucketRegion: c.Blockstore.S3.DiscoverBucketRegion, - SkipVerifyCertificateTestOnly: c.Blockstore.S3.SkipVerifyCertificateTestOnly, - ServerSideEncryption: c.Blockstore.S3.ServerSideEncryption, - ServerSideEncryptionKmsKeyID: c.Blockstore.S3.ServerSideEncryptionKmsKeyID, - PreSignedExpiry: c.Blockstore.S3.PreSignedExpiry, - PreSignedEndpoint: c.Blockstore.S3.PreSignedEndpoint, - DisablePreSigned: c.Blockstore.S3.DisablePreSigned, - DisablePreSignedUI: c.Blockstore.S3.DisablePreSignedUI, - DisablePreSignedMultipart: c.Blockstore.S3.DisablePreSignedMultipart, - ClientLogRetries: c.Blockstore.S3.ClientLogRetries, - ClientLogRequest: c.Blockstore.S3.ClientLogRequest, - WebIdentity: webIdentity, - }, nil -} - -func (c *BaseConfig) BlockstoreLocalParams() (blockparams.Local, error) { - localPath := c.Blockstore.Local.Path - path, err := homedir.Expand(localPath) - if err != nil { - return blockparams.Local{}, fmt.Errorf("parse blockstore location URI %s: %w", localPath, err) - } - - params := blockparams.Local(*c.Blockstore.Local) - params.Path = path - return params, nil -} - const ( gcpAESKeyLength = 32 ) -func (c *BaseConfig) BlockstoreGSParams() (blockparams.GS, error) { - var customerSuppliedKey []byte = nil - if c.Blockstore.GS.ServerSideEncryptionCustomerSupplied != "" { - v, err := hex.DecodeString(c.Blockstore.GS.ServerSideEncryptionCustomerSupplied) - if err != nil { - return blockparams.GS{}, err - } - if len(v) != gcpAESKeyLength { - return blockparams.GS{}, ErrBadGCPCSEKValue - } - customerSuppliedKey = v - if c.Blockstore.GS.ServerSideEncryptionKmsKeyID != "" { - return blockparams.GS{}, ErrGCPEncryptKeyConflict - } - } - - credPath, err := homedir.Expand(c.Blockstore.GS.CredentialsFile) - if err != nil { - return blockparams.GS{}, fmt.Errorf("parse GS credentials path '%s': %w", c.Blockstore.GS.CredentialsFile, err) - } - return blockparams.GS{ - CredentialsFile: credPath, - CredentialsJSON: c.Blockstore.GS.CredentialsJSON, - PreSignedExpiry: c.Blockstore.GS.PreSignedExpiry, - DisablePreSigned: c.Blockstore.GS.DisablePreSigned, - DisablePreSignedUI: c.Blockstore.GS.DisablePreSignedUI, - ServerSideEncryptionCustomerSupplied: customerSuppliedKey, - ServerSideEncryptionKmsKeyID: c.Blockstore.GS.ServerSideEncryptionKmsKeyID, - }, nil -} - -func (c *BaseConfig) BlockstoreAzureParams() (blockparams.Azure, error) { - if c.Blockstore.Azure.AuthMethod != "" { - logging.ContextUnavailable().Warn("blockstore.azure.auth_method is deprecated. Value is no longer used.") - } - if c.Blockstore.Azure.ChinaCloudDeprecated { - logging.ContextUnavailable().Warn("blockstore.azure.china_cloud is deprecated. Value is no longer used. Please pass Domain = 'blob.core.chinacloudapi.cn'") - c.Blockstore.Azure.Domain = "blob.core.chinacloudapi.cn" - } - return blockparams.Azure{ - StorageAccount: c.Blockstore.Azure.StorageAccount, - StorageAccessKey: c.Blockstore.Azure.StorageAccessKey, - TryTimeout: c.Blockstore.Azure.TryTimeout, - PreSignedExpiry: c.Blockstore.Azure.PreSignedExpiry, - TestEndpointURL: c.Blockstore.Azure.TestEndpointURL, - Domain: c.Blockstore.Azure.Domain, - DisablePreSigned: c.Blockstore.Azure.DisablePreSigned, - DisablePreSignedUI: c.Blockstore.Azure.DisablePreSignedUI, - }, nil -} - const ( AuthRBACNone = "none" AuthRBACSimplified = "simplified" @@ -644,6 +683,6 @@ func (c *BaseConfig) GetBaseConfig() *BaseConfig { return c } -func (c *BaseConfig) StorageConfig() interface{} { - return c.Blockstore +func (c *BaseConfig) StorageConfig() StorageConfig { + return &c.Blockstore } diff --git a/pkg/ingest/store/factory.go b/pkg/ingest/store/factory.go index b65c0a981be..a236714c9ea 100644 --- a/pkg/ingest/store/factory.go +++ b/pkg/ingest/store/factory.go @@ -17,6 +17,7 @@ import ( "github.com/treeverse/lakefs/pkg/block/local" "github.com/treeverse/lakefs/pkg/block/params" "github.com/treeverse/lakefs/pkg/block/s3" + "github.com/treeverse/lakefs/pkg/config" ) var ErrNotSupported = errors.New("no storage adapter found") @@ -52,10 +53,10 @@ func (ww *WalkerWrapper) GetSkippedEntries() []block.ObjectStoreEntry { } type WalkerFactory struct { - params params.AdapterConfig + params config.AdapterConfig } -func NewFactory(params params.AdapterConfig) *WalkerFactory { +func NewFactory(params config.AdapterConfig) *WalkerFactory { return &WalkerFactory{params: params} } diff --git a/pkg/stats/metadata.go b/pkg/stats/metadata.go index 81ff245aecc..0cda877cd17 100644 --- a/pkg/stats/metadata.go +++ b/pkg/stats/metadata.go @@ -60,7 +60,7 @@ func BuildMetadataProvider(logger logging.Logger, c *config.BaseConfig) cloud.Me case block.BlockstoreTypeGS: return gcp.NewMetadataProvider(logger) case block.BlockstoreTypeS3: - s3Params, err := c.BlockstoreS3Params() + s3Params, err := c.Blockstore.BlockstoreS3Params() if err != nil { logger.WithError(err).Warn("Failed to create S3 client for MetadataProvider") return &noopMetadataProvider{} diff --git a/pkg/testutil/adapter.go b/pkg/testutil/adapter.go index ebd7b19eb69..620b9b71a39 100644 --- a/pkg/testutil/adapter.go +++ b/pkg/testutil/adapter.go @@ -141,11 +141,11 @@ func (a *MockAdapter) BlockstoreMetadata(_ context.Context) (*block.BlockstoreMe } } -func (a *MockAdapter) GetStorageNamespaceInfo(_ string) (block.StorageNamespaceInfo, error) { +func (a *MockAdapter) GetStorageNamespaceInfo(string) *block.StorageNamespaceInfo { info := block.DefaultStorageNamespaceInfo("s3") info.PreSignSupport = false info.ImportSupport = false - return info, nil + return &info } func (a *MockAdapter) ResolveNamespace(storageID, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) {