diff --git a/pkg/catalog/catalog.go b/pkg/catalog/catalog.go index 009204130e6..47faf43a33e 100644 --- a/pkg/catalog/catalog.go +++ b/pkg/catalog/catalog.go @@ -23,9 +23,9 @@ import ( "github.com/hashicorp/go-multierror" lru "github.com/hnlq715/golang-lru" "github.com/rs/xid" + blockfactory "github.com/treeverse/lakefs/modules/block/factory" "github.com/treeverse/lakefs/pkg/batch" "github.com/treeverse/lakefs/pkg/block" - blockfactory "github.com/treeverse/lakefs/pkg/block/factory" "github.com/treeverse/lakefs/pkg/config" "github.com/treeverse/lakefs/pkg/graveler" "github.com/treeverse/lakefs/pkg/graveler/branch" @@ -214,7 +214,7 @@ type WriteRangeRequest struct { } type Config struct { - Config *config.BaseConfig + Config config.Config KVStore kv.Store WalkerFactory WalkerFactory SettingsManagerOption settings.ManagerOption @@ -313,10 +313,11 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) { return nil, fmt.Errorf("build block adapter: %w", err) } if cfg.WalkerFactory == nil { - cfg.WalkerFactory = store.NewFactory(cfg.Config) + // TODO(niro): Walkfer factory should be removed from catalog. This is a WA which relies on Blockstore configuration + cfg.WalkerFactory = store.NewFactory(cfg.Config.GetBaseConfig()) } - tierFSParams, err := pyramidparams.NewCommittedTierFSParams(cfg.Config, adapter) + tierFSParams, err := pyramidparams.NewCommittedTierFSParams(cfg.Config.GetBaseConfig(), adapter) if err != nil { cancelFn() return nil, fmt.Errorf("configure tiered FS for committed: %w", err) @@ -347,11 +348,12 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) { sstableManager := sstable.NewPebbleSSTableRangeManager(pebbleSSTableCache, rangeFS, hashAlg) sstableMetaManager := sstable.NewPebbleSSTableRangeManager(pebbleSSTableCache, metaRangeFS, hashAlg) + baseCfg := cfg.Config.GetBaseConfig() committedParams := committed.Params{ - MinRangeSizeBytes: cfg.Config.Committed.Permanent.MinRangeSizeBytes, - MaxRangeSizeBytes: cfg.Config.Committed.Permanent.MaxRangeSizeBytes, - RangeSizeEntriesRaggedness: cfg.Config.Committed.Permanent.RangeRaggednessEntries, - MaxUploaders: cfg.Config.Committed.LocalCache.MaxUploadersPerWriter, + MinRangeSizeBytes: baseCfg.Committed.Permanent.MinRangeSizeBytes, + MaxRangeSizeBytes: baseCfg.Committed.Permanent.MaxRangeSizeBytes, + RangeSizeEntriesRaggedness: baseCfg.Committed.Permanent.RangeRaggednessEntries, + MaxUploaders: baseCfg.Committed.LocalCache.MaxUploadersPerWriter, } sstableMetaRangeManager, err := committed.NewMetaRangeManager( committedParams, @@ -369,7 +371,7 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) { go executor.Run(ctx) // Setup rate limiter used for background operations - limiter := newLimiter(cfg.Config.Graveler.Background.RateLimit) + limiter := newLimiter(baseCfg.Graveler.Background.RateLimit) storeLimiter := kv.NewStoreLimiter(cfg.KVStore, limiter) addressProvider := ident.NewHexAddressProvider() @@ -379,21 +381,21 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) { KVStore: cfg.KVStore, KVStoreLimited: storeLimiter, AddressProvider: addressProvider, - RepositoryCacheConfig: ref.CacheConfig(cfg.Config.Graveler.RepositoryCache), - CommitCacheConfig: ref.CacheConfig(cfg.Config.Graveler.CommitCache), - MaxBatchDelay: cfg.Config.Graveler.MaxBatchDelay, - BranchApproximateOwnershipParams: makeBranchApproximateOwnershipParams(cfg.Config.Graveler.BranchOwnership), + RepositoryCacheConfig: ref.CacheConfig(baseCfg.Graveler.RepositoryCache), + CommitCacheConfig: ref.CacheConfig(baseCfg.Graveler.CommitCache), + MaxBatchDelay: baseCfg.Graveler.MaxBatchDelay, + BranchApproximateOwnershipParams: makeBranchApproximateOwnershipParams(baseCfg.Graveler.BranchOwnership), }) - gcManager := retention.NewGarbageCollectionManager(tierFSParams.Adapter, refManager, cfg.Config.Committed.BlockStoragePrefix) + gcManager := retention.NewGarbageCollectionManager(tierFSParams.Adapter, refManager, baseCfg.Committed.BlockStoragePrefix) settingManager := settings.NewManager(refManager, cfg.KVStore) if cfg.SettingsManagerOption != nil { cfg.SettingsManagerOption(settingManager) } protectedBranchesManager := branch.NewProtectionManager(settingManager) - stagingManager := staging.NewManager(ctx, cfg.KVStore, storeLimiter, cfg.Config.Graveler.BatchDBIOTransactionMarkers, executor) + stagingManager := staging.NewManager(ctx, cfg.KVStore, storeLimiter, baseCfg.Graveler.BatchDBIOTransactionMarkers, executor) var deleteSensor *graveler.DeleteSensor - if cfg.Config.Graveler.CompactionSensorThreshold > 0 { + if baseCfg.Graveler.CompactionSensorThreshold > 0 { cb := func(repositoryID graveler.RepositoryID, branchID graveler.BranchID, stagingTokenID graveler.StagingToken, inGrace bool) { logging.FromContext(ctx).WithFields(logging.Fields{ "repositoryID": repositoryID, @@ -402,7 +404,7 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) { "inGrace": inGrace, }).Info("Delete sensor callback") } - deleteSensor = graveler.NewDeleteSensor(cfg.Config.Graveler.CompactionSensorThreshold, cb) + deleteSensor = graveler.NewDeleteSensor(baseCfg.Graveler.CompactionSensorThreshold, cb) } gStore := graveler.NewGraveler(committedManager, stagingManager, refManager, gcManager, protectedBranchesManager, deleteSensor) @@ -412,8 +414,8 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) { return &Catalog{ BlockAdapter: tierFSParams.Adapter, Store: gStore, - UGCPrepareMaxFileSize: cfg.Config.UGC.PrepareMaxFileSize, - UGCPrepareInterval: cfg.Config.UGC.PrepareInterval, + UGCPrepareMaxFileSize: baseCfg.UGC.PrepareMaxFileSize, + UGCPrepareInterval: baseCfg.UGC.PrepareInterval, PathProvider: cfg.PathProvider, BackgroundLimiter: limiter, walkerFactory: cfg.WalkerFactory, @@ -423,7 +425,8 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) { KVStoreLimited: storeLimiter, addressProvider: addressProvider, deleteSensor: deleteSensor, - signingKey: cfg.Config.Blockstore.Signing.SecretKey, + // TODO(niro): This should be removed - we need to return the signing key dynamically from the blockAdapter + signingKey: baseCfg.Blockstore.Signing.SecretKey, }, nil }