From 46c153cf3b2d037ae09863e8898da91877a4ba25 Mon Sep 17 00:00:00 2001 From: nadavsteindler Date: Sun, 26 Jan 2025 14:11:46 +0200 Subject: [PATCH] Add storageID to tier_fs --- pkg/graveler/committed/iterator.go | 3 +- pkg/graveler/committed/manager.go | 43 ++++++------- pkg/graveler/committed/meta_range.go | 8 +-- pkg/graveler/committed/meta_range_manager.go | 18 +++--- pkg/graveler/committed/mock/meta_range.go | 32 +++++----- pkg/graveler/committed/mock/range_manager.go | 32 +++++----- pkg/graveler/committed/range_manager.go | 11 ++-- pkg/graveler/graveler.go | 58 +++++++++--------- pkg/graveler/mock/graveler.go | 64 ++++++++++---------- pkg/graveler/sstable/range_manager.go | 26 ++++---- pkg/graveler/sstable/writer.go | 4 +- pkg/pyramid/mock/pyramid.go | 24 ++++---- pkg/pyramid/pyramid.go | 6 +- pkg/pyramid/tier_fs.go | 32 ++++++---- 14 files changed, 186 insertions(+), 175 deletions(-) diff --git a/pkg/graveler/committed/iterator.go b/pkg/graveler/committed/iterator.go index 87faf2d9b00..9b65b3fc878 100644 --- a/pkg/graveler/committed/iterator.go +++ b/pkg/graveler/committed/iterator.go @@ -16,6 +16,7 @@ type iterator struct { rng *Range // Decoded value at which rangeIt point it graveler.ValueIterator // nil at start of range err error + storageID StorageID namespace Namespace beforeRange bool } @@ -32,7 +33,7 @@ func NewIterator(ctx context.Context, manager RangeManager, namespace Namespace, // loadIt loads rvi.it to start iterating over a new range. It returns false and sets rvi.err // if it fails to open the new range. func (rvi *iterator) loadIt() bool { - it, err := rvi.manager.NewRangeIterator(rvi.ctx, rvi.namespace, rvi.rng.ID) + it, err := rvi.manager.NewRangeIterator(rvi.ctx, rvi.storageID, rvi.namespace, rvi.rng.ID) if err != nil { rvi.err = fmt.Errorf("open range %s: %w", rvi.rng.ID, err) return false diff --git a/pkg/graveler/committed/manager.go b/pkg/graveler/committed/manager.go index e5dc7858ddc..a00a4bfbfcf 100644 --- a/pkg/graveler/committed/manager.go +++ b/pkg/graveler/committed/manager.go @@ -25,12 +25,12 @@ func NewCommittedManager(m MetaRangeManager, r RangeManager, p Params) graveler. } } -func (c *committedManager) Exists(ctx context.Context, ns graveler.StorageNamespace, id graveler.MetaRangeID) (bool, error) { - return c.metaRangeManager.Exists(ctx, ns, id) +func (c *committedManager) Exists(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, id graveler.MetaRangeID) (bool, error) { + return c.metaRangeManager.Exists(ctx, storageID, ns, id) } -func (c *committedManager) Get(ctx context.Context, ns graveler.StorageNamespace, rangeID graveler.MetaRangeID, key graveler.Key) (*graveler.Value, error) { - it, err := c.metaRangeManager.NewMetaRangeIterator(ctx, ns, rangeID) +func (c *committedManager) Get(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, rangeID graveler.MetaRangeID, key graveler.Key) (*graveler.Value, error) { + it, err := c.metaRangeManager.NewMetaRangeIterator(ctx, storageID, ns, rangeID) if err != nil { return nil, err } @@ -53,8 +53,8 @@ func (c *committedManager) Get(ctx context.Context, ns graveler.StorageNamespace return rec.Value, nil } -func (c *committedManager) List(ctx context.Context, ns graveler.StorageNamespace, rangeID graveler.MetaRangeID) (graveler.ValueIterator, error) { - it, err := c.metaRangeManager.NewMetaRangeIterator(ctx, ns, rangeID) +func (c *committedManager) List(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, rangeID graveler.MetaRangeID) (graveler.ValueIterator, error) { + it, err := c.metaRangeManager.NewMetaRangeIterator(ctx, storageID, ns, rangeID) if err != nil { return nil, err } @@ -170,20 +170,20 @@ func (c *committedManager) WriteMetaRangeByIterator(ctx context.Context, ns grav return id, nil } -func (c *committedManager) Diff(ctx context.Context, ns graveler.StorageNamespace, left, right graveler.MetaRangeID) (graveler.DiffIterator, error) { - leftIt, err := c.metaRangeManager.NewMetaRangeIterator(ctx, ns, left) +func (c *committedManager) Diff(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, left, right graveler.MetaRangeID) (graveler.DiffIterator, error) { + leftIt, err := c.metaRangeManager.NewMetaRangeIterator(ctx, storageID, ns, left) if err != nil { return nil, err } - rightIt, err := c.metaRangeManager.NewMetaRangeIterator(ctx, ns, right) + rightIt, err := c.metaRangeManager.NewMetaRangeIterator(ctx, storageID, ns, right) if err != nil { return nil, err } return NewDiffValueIterator(ctx, leftIt, rightIt), nil } -func (c *committedManager) Import(ctx context.Context, ns graveler.StorageNamespace, destination, source graveler.MetaRangeID, prefixes []graveler.Prefix, _ ...graveler.SetOptionsFunc) (graveler.MetaRangeID, error) { - destIt, err := c.metaRangeManager.NewMetaRangeIterator(ctx, ns, destination) +func (c *committedManager) Import(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, destination, source graveler.MetaRangeID, prefixes []graveler.Prefix, _ ...graveler.SetOptionsFunc) (graveler.MetaRangeID, error) { + destIt, err := c.metaRangeManager.NewMetaRangeIterator(ctx, storageID, ns, destination) if err != nil { return "", fmt.Errorf("get destination iterator: %w", err) } @@ -228,6 +228,7 @@ type mergeContext struct { srcIt Iterator baseIt Iterator strategy graveler.MergeStrategy + storageID graveler.StorageID ns graveler.StorageNamespace destinationID graveler.MetaRangeID sourceID graveler.MetaRangeID @@ -238,7 +239,7 @@ func (c *committedManager) merge(ctx context.Context, mctx mergeContext) (gravel var err error = nil baseIt := mctx.baseIt if baseIt == nil { - baseIt, err = c.metaRangeManager.NewMetaRangeIterator(ctx, mctx.ns, mctx.baseID) + baseIt, err = c.metaRangeManager.NewMetaRangeIterator(ctx, mctx.storageID, mctx.ns, mctx.baseID) if err != nil { return "", fmt.Errorf("get base iterator: %w", err) } @@ -247,7 +248,7 @@ func (c *committedManager) merge(ctx context.Context, mctx mergeContext) (gravel destIt := mctx.destIt if destIt == nil { - destIt, err = c.metaRangeManager.NewMetaRangeIterator(ctx, mctx.ns, mctx.destinationID) + destIt, err = c.metaRangeManager.NewMetaRangeIterator(ctx, mctx.storageID, mctx.ns, mctx.destinationID) if err != nil { return "", fmt.Errorf("get destination iterator: %w", err) } @@ -256,7 +257,7 @@ func (c *committedManager) merge(ctx context.Context, mctx mergeContext) (gravel srcIt := mctx.srcIt if srcIt == nil { - srcIt, err = c.metaRangeManager.NewMetaRangeIterator(ctx, mctx.ns, mctx.sourceID) + srcIt, err = c.metaRangeManager.NewMetaRangeIterator(ctx, mctx.storageID, mctx.ns, mctx.sourceID) if err != nil { return "", fmt.Errorf("get source iterator: %w", err) } @@ -285,7 +286,7 @@ func (c *committedManager) merge(ctx context.Context, mctx mergeContext) (gravel return *newID, err } -func (c *committedManager) Commit(ctx context.Context, ns graveler.StorageNamespace, baseMetaRangeID graveler.MetaRangeID, changes graveler.ValueIterator, allowEmpty bool, _ ...graveler.SetOptionsFunc) (graveler.MetaRangeID, graveler.DiffSummary, error) { +func (c *committedManager) Commit(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, baseMetaRangeID graveler.MetaRangeID, changes graveler.ValueIterator, allowEmpty bool, _ ...graveler.SetOptionsFunc) (graveler.MetaRangeID, graveler.DiffSummary, error) { mwWriter := c.metaRangeManager.NewWriter(ctx, ns, nil) defer func() { err := mwWriter.Abort() @@ -293,7 +294,7 @@ func (c *committedManager) Commit(ctx context.Context, ns graveler.StorageNamesp logging.FromContext(ctx).WithError(err).Error("Abort failed after Commit") } }() - metaRangeIterator, err := c.metaRangeManager.NewMetaRangeIterator(ctx, ns, baseMetaRangeID) + metaRangeIterator, err := c.metaRangeManager.NewMetaRangeIterator(ctx, storageID, ns, baseMetaRangeID) summary := graveler.DiffSummary{ Count: map[graveler.DiffType]int{}, } @@ -315,12 +316,12 @@ func (c *committedManager) Commit(ctx context.Context, ns graveler.StorageNamesp return *newID, summary, err } -func (c *committedManager) Compare(ctx context.Context, ns graveler.StorageNamespace, destination, source, base graveler.MetaRangeID) (graveler.DiffIterator, error) { - diffIt, err := c.Diff(ctx, ns, destination, source) +func (c *committedManager) Compare(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, destination, source, base graveler.MetaRangeID) (graveler.DiffIterator, error) { + diffIt, err := c.Diff(ctx, storageID, ns, destination, source) if err != nil { return nil, fmt.Errorf("diff: %w", err) } - baseIt, err := c.metaRangeManager.NewMetaRangeIterator(ctx, ns, base) + baseIt, err := c.metaRangeManager.NewMetaRangeIterator(ctx, storageID, ns, base) if err != nil { diffIt.Close() return nil, fmt.Errorf("get base iterator: %w", err) @@ -338,11 +339,11 @@ func (c *committedManager) GetRange(ctx context.Context, ns graveler.StorageName return graveler.RangeAddress(uri), err } -func (c *committedManager) GetRangeIDByKey(ctx context.Context, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (graveler.RangeID, error) { +func (c *committedManager) GetRangeIDByKey(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (graveler.RangeID, error) { if id == "" { return "", graveler.ErrNotFound } - r, err := c.metaRangeManager.GetRangeByKey(ctx, ns, id, key) + r, err := c.metaRangeManager.GetRangeByKey(ctx, storageID, ns, id, key) if err != nil { return "", fmt.Errorf("get range for key: %w", err) } diff --git a/pkg/graveler/committed/meta_range.go b/pkg/graveler/committed/meta_range.go index 2ab315c19e8..aa1f88908de 100644 --- a/pkg/graveler/committed/meta_range.go +++ b/pkg/graveler/committed/meta_range.go @@ -73,17 +73,17 @@ func (r RangeDiff) Copy() *RangeDiff { // MetaRangeManager is an abstraction for a repository of MetaRanges that exposes operations on them type MetaRangeManager interface { - Exists(ctx context.Context, ns graveler.StorageNamespace, id graveler.MetaRangeID) (bool, error) + Exists(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, id graveler.MetaRangeID) (bool, error) // GetValue returns the matching in-range graveler.ValueRecord for key in the // MetaRange with id. - GetValue(ctx context.Context, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (*graveler.ValueRecord, error) + GetValue(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (*graveler.ValueRecord, error) // NewWriter returns a writer that is used for creating new MetaRanges NewWriter(ctx context.Context, ns graveler.StorageNamespace, metadata graveler.Metadata) MetaRangeWriter // NewMetaRangeIterator returns an Iterator over the MetaRange with id. - NewMetaRangeIterator(ctx context.Context, ns graveler.StorageNamespace, metaRangeID graveler.MetaRangeID) (Iterator, error) + NewMetaRangeIterator(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, metaRangeID graveler.MetaRangeID) (Iterator, error) // GetMetaRangeURI returns a URI with an object representing metarange ID. It may // return a URI that does not resolve (rather than an error) if ID does not exist. @@ -94,7 +94,7 @@ type MetaRangeManager interface { GetRangeURI(ctx context.Context, ns graveler.StorageNamespace, rangeID graveler.RangeID) (string, error) // GetRangeByKey returns the Range that contains key in the MetaRange with id. - GetRangeByKey(ctx context.Context, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (*Range, error) + GetRangeByKey(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (*Range, error) } // MetaRangeWriter is an abstraction for creating new MetaRanges diff --git a/pkg/graveler/committed/meta_range_manager.go b/pkg/graveler/committed/meta_range_manager.go index 6da96d85bf8..0153587c762 100644 --- a/pkg/graveler/committed/meta_range_manager.go +++ b/pkg/graveler/committed/meta_range_manager.go @@ -43,19 +43,19 @@ func NewMetaRangeManager(params Params, metaManager, rangeManager RangeManager) }, nil } -func (m *metaRangeManager) Exists(ctx context.Context, ns graveler.StorageNamespace, id graveler.MetaRangeID) (bool, error) { - return m.metaManager.Exists(ctx, Namespace(ns), ID(id)) +func (m *metaRangeManager) Exists(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, id graveler.MetaRangeID) (bool, error) { + return m.metaManager.Exists(ctx, StorageID(storageID), Namespace(ns), ID(id)) } // GetValue finds the matching graveler.ValueRecord in the MetaRange with the rangeID -func (m *metaRangeManager) GetValue(ctx context.Context, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (*graveler.ValueRecord, error) { +func (m *metaRangeManager) GetValue(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (*graveler.ValueRecord, error) { // Fetch range containing key. - rng, err := m.GetRangeByKey(ctx, ns, id, key) + rng, err := m.GetRangeByKey(ctx, storageID, ns, id, key) if err != nil { return nil, err } - r, err := m.rangeManager.GetValue(ctx, Namespace(ns), rng.ID, Key(key)) + r, err := m.rangeManager.GetValue(ctx, StorageID(storageID), Namespace(ns), rng.ID, Key(key)) if err != nil { return nil, fmt.Errorf("get value in range %s of %s for %s: %w", rng.ID, id, key, err) } @@ -69,8 +69,8 @@ func (m *metaRangeManager) GetValue(ctx context.Context, ns graveler.StorageName }, nil } -func (m *metaRangeManager) GetRangeByKey(ctx context.Context, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (*Range, error) { - v, err := m.metaManager.GetValueGE(ctx, Namespace(ns), ID(id), Key(key)) +func (m *metaRangeManager) GetRangeByKey(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (*Range, error) { + v, err := m.metaManager.GetValueGE(ctx, StorageID(storageID), Namespace(ns), ID(id), Key(key)) if errors.Is(err, ErrNotFound) { return nil, err } @@ -100,11 +100,11 @@ func (m *metaRangeManager) NewWriter(ctx context.Context, ns graveler.StorageNam return NewGeneralMetaRangeWriter(ctx, m.rangeManager, m.metaManager, &m.params, Namespace(ns), metadata) } -func (m *metaRangeManager) NewMetaRangeIterator(ctx context.Context, ns graveler.StorageNamespace, id graveler.MetaRangeID) (Iterator, error) { +func (m *metaRangeManager) NewMetaRangeIterator(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, id graveler.MetaRangeID) (Iterator, error) { if id == "" { return NewEmptyIterator(), nil } - rangesIt, err := m.metaManager.NewRangeIterator(ctx, Namespace(ns), ID(id)) + rangesIt, err := m.metaManager.NewRangeIterator(ctx, StorageID(storageID), Namespace(ns), ID(id)) if err != nil { return nil, fmt.Errorf("manage metarange %s: %w", id, err) } diff --git a/pkg/graveler/committed/mock/meta_range.go b/pkg/graveler/committed/mock/meta_range.go index f9cdca4ab30..7092b76bcb2 100644 --- a/pkg/graveler/committed/mock/meta_range.go +++ b/pkg/graveler/committed/mock/meta_range.go @@ -245,18 +245,18 @@ func (m *MockMetaRangeManager) EXPECT() *MockMetaRangeManagerMockRecorder { } // Exists mocks base method. -func (m *MockMetaRangeManager) Exists(ctx context.Context, ns graveler.StorageNamespace, id graveler.MetaRangeID) (bool, error) { +func (m *MockMetaRangeManager) Exists(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, id graveler.MetaRangeID) (bool, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Exists", ctx, ns, id) + ret := m.ctrl.Call(m, "Exists", ctx, storageID, ns, id) ret0, _ := ret[0].(bool) ret1, _ := ret[1].(error) return ret0, ret1 } // Exists indicates an expected call of Exists. -func (mr *MockMetaRangeManagerMockRecorder) Exists(ctx, ns, id interface{}) *gomock.Call { +func (mr *MockMetaRangeManagerMockRecorder) Exists(ctx, storageID, ns, id interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Exists", reflect.TypeOf((*MockMetaRangeManager)(nil).Exists), ctx, ns, id) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Exists", reflect.TypeOf((*MockMetaRangeManager)(nil).Exists), ctx, storageID, ns, id) } // GetMetaRangeURI mocks base method. @@ -275,18 +275,18 @@ func (mr *MockMetaRangeManagerMockRecorder) GetMetaRangeURI(ctx, ns, metaRangeID } // GetRangeByKey mocks base method. -func (m *MockMetaRangeManager) GetRangeByKey(ctx context.Context, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (*committed.Range, error) { +func (m *MockMetaRangeManager) GetRangeByKey(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (*committed.Range, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetRangeByKey", ctx, ns, id, key) + ret := m.ctrl.Call(m, "GetRangeByKey", ctx, storageID, ns, id, key) ret0, _ := ret[0].(*committed.Range) ret1, _ := ret[1].(error) return ret0, ret1 } // GetRangeByKey indicates an expected call of GetRangeByKey. -func (mr *MockMetaRangeManagerMockRecorder) GetRangeByKey(ctx, ns, id, key interface{}) *gomock.Call { +func (mr *MockMetaRangeManagerMockRecorder) GetRangeByKey(ctx, storageID, ns, id, key interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRangeByKey", reflect.TypeOf((*MockMetaRangeManager)(nil).GetRangeByKey), ctx, ns, id, key) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRangeByKey", reflect.TypeOf((*MockMetaRangeManager)(nil).GetRangeByKey), ctx, storageID, ns, id, key) } // GetRangeURI mocks base method. @@ -305,33 +305,33 @@ func (mr *MockMetaRangeManagerMockRecorder) GetRangeURI(ctx, ns, rangeID interfa } // GetValue mocks base method. -func (m *MockMetaRangeManager) GetValue(ctx context.Context, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (*graveler.ValueRecord, error) { +func (m *MockMetaRangeManager) GetValue(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (*graveler.ValueRecord, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetValue", ctx, ns, id, key) + ret := m.ctrl.Call(m, "GetValue", ctx, storageID, ns, id, key) ret0, _ := ret[0].(*graveler.ValueRecord) ret1, _ := ret[1].(error) return ret0, ret1 } // GetValue indicates an expected call of GetValue. -func (mr *MockMetaRangeManagerMockRecorder) GetValue(ctx, ns, id, key interface{}) *gomock.Call { +func (mr *MockMetaRangeManagerMockRecorder) GetValue(ctx, storageID, ns, id, key interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetValue", reflect.TypeOf((*MockMetaRangeManager)(nil).GetValue), ctx, ns, id, key) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetValue", reflect.TypeOf((*MockMetaRangeManager)(nil).GetValue), ctx, storageID, ns, id, key) } // NewMetaRangeIterator mocks base method. -func (m *MockMetaRangeManager) NewMetaRangeIterator(ctx context.Context, ns graveler.StorageNamespace, metaRangeID graveler.MetaRangeID) (committed.Iterator, error) { +func (m *MockMetaRangeManager) NewMetaRangeIterator(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, metaRangeID graveler.MetaRangeID) (committed.Iterator, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "NewMetaRangeIterator", ctx, ns, metaRangeID) + ret := m.ctrl.Call(m, "NewMetaRangeIterator", ctx, storageID, ns, metaRangeID) ret0, _ := ret[0].(committed.Iterator) ret1, _ := ret[1].(error) return ret0, ret1 } // NewMetaRangeIterator indicates an expected call of NewMetaRangeIterator. -func (mr *MockMetaRangeManagerMockRecorder) NewMetaRangeIterator(ctx, ns, metaRangeID interface{}) *gomock.Call { +func (mr *MockMetaRangeManagerMockRecorder) NewMetaRangeIterator(ctx, storageID, ns, metaRangeID interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewMetaRangeIterator", reflect.TypeOf((*MockMetaRangeManager)(nil).NewMetaRangeIterator), ctx, ns, metaRangeID) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewMetaRangeIterator", reflect.TypeOf((*MockMetaRangeManager)(nil).NewMetaRangeIterator), ctx, storageID, ns, metaRangeID) } // NewWriter mocks base method. diff --git a/pkg/graveler/committed/mock/range_manager.go b/pkg/graveler/committed/mock/range_manager.go index 325a561ae6a..edb94a8d955 100644 --- a/pkg/graveler/committed/mock/range_manager.go +++ b/pkg/graveler/committed/mock/range_manager.go @@ -126,18 +126,18 @@ func (m *MockRangeManager) EXPECT() *MockRangeManagerMockRecorder { } // Exists mocks base method. -func (m *MockRangeManager) Exists(ctx context.Context, ns committed.Namespace, id committed.ID) (bool, error) { +func (m *MockRangeManager) Exists(ctx context.Context, storageID committed.StorageID, ns committed.Namespace, id committed.ID) (bool, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Exists", ctx, ns, id) + ret := m.ctrl.Call(m, "Exists", ctx, storageID, ns, id) ret0, _ := ret[0].(bool) ret1, _ := ret[1].(error) return ret0, ret1 } // Exists indicates an expected call of Exists. -func (mr *MockRangeManagerMockRecorder) Exists(ctx, ns, id interface{}) *gomock.Call { +func (mr *MockRangeManagerMockRecorder) Exists(ctx, storageID, ns, id interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Exists", reflect.TypeOf((*MockRangeManager)(nil).Exists), ctx, ns, id) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Exists", reflect.TypeOf((*MockRangeManager)(nil).Exists), ctx, storageID, ns, id) } // GetURI mocks base method. @@ -156,33 +156,33 @@ func (mr *MockRangeManagerMockRecorder) GetURI(ctx, ns, id interface{}) *gomock. } // GetValue mocks base method. -func (m *MockRangeManager) GetValue(ctx context.Context, ns committed.Namespace, id committed.ID, key committed.Key) (*committed.Record, error) { +func (m *MockRangeManager) GetValue(ctx context.Context, storageID committed.StorageID, ns committed.Namespace, id committed.ID, key committed.Key) (*committed.Record, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetValue", ctx, ns, id, key) + ret := m.ctrl.Call(m, "GetValue", ctx, storageID, ns, id, key) ret0, _ := ret[0].(*committed.Record) ret1, _ := ret[1].(error) return ret0, ret1 } // GetValue indicates an expected call of GetValue. -func (mr *MockRangeManagerMockRecorder) GetValue(ctx, ns, id, key interface{}) *gomock.Call { +func (mr *MockRangeManagerMockRecorder) GetValue(ctx, storageID, ns, id, key interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetValue", reflect.TypeOf((*MockRangeManager)(nil).GetValue), ctx, ns, id, key) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetValue", reflect.TypeOf((*MockRangeManager)(nil).GetValue), ctx, storageID, ns, id, key) } // GetValueGE mocks base method. -func (m *MockRangeManager) GetValueGE(ctx context.Context, ns committed.Namespace, id committed.ID, key committed.Key) (*committed.Record, error) { +func (m *MockRangeManager) GetValueGE(ctx context.Context, storageID committed.StorageID, ns committed.Namespace, id committed.ID, key committed.Key) (*committed.Record, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetValueGE", ctx, ns, id, key) + ret := m.ctrl.Call(m, "GetValueGE", ctx, storageID, ns, id, key) ret0, _ := ret[0].(*committed.Record) ret1, _ := ret[1].(error) return ret0, ret1 } // GetValueGE indicates an expected call of GetValueGE. -func (mr *MockRangeManagerMockRecorder) GetValueGE(ctx, ns, id, key interface{}) *gomock.Call { +func (mr *MockRangeManagerMockRecorder) GetValueGE(ctx, storageID, ns, id, key interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetValueGE", reflect.TypeOf((*MockRangeManager)(nil).GetValueGE), ctx, ns, id, key) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetValueGE", reflect.TypeOf((*MockRangeManager)(nil).GetValueGE), ctx, storageID, ns, id, key) } // GetWriter mocks base method. @@ -201,18 +201,18 @@ func (mr *MockRangeManagerMockRecorder) GetWriter(ctx, ns, metadata interface{}) } // NewRangeIterator mocks base method. -func (m *MockRangeManager) NewRangeIterator(ctx context.Context, ns committed.Namespace, pid committed.ID) (committed.ValueIterator, error) { +func (m *MockRangeManager) NewRangeIterator(ctx context.Context, storageID committed.StorageID, ns committed.Namespace, pid committed.ID) (committed.ValueIterator, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "NewRangeIterator", ctx, ns, pid) + ret := m.ctrl.Call(m, "NewRangeIterator", ctx, storageID, ns, pid) ret0, _ := ret[0].(committed.ValueIterator) ret1, _ := ret[1].(error) return ret0, ret1 } // NewRangeIterator indicates an expected call of NewRangeIterator. -func (mr *MockRangeManagerMockRecorder) NewRangeIterator(ctx, ns, pid interface{}) *gomock.Call { +func (mr *MockRangeManagerMockRecorder) NewRangeIterator(ctx, storageID, ns, pid interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewRangeIterator", reflect.TypeOf((*MockRangeManager)(nil).NewRangeIterator), ctx, ns, pid) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewRangeIterator", reflect.TypeOf((*MockRangeManager)(nil).NewRangeIterator), ctx, storageID, ns, pid) } // MockRangeWriter is a mock of RangeWriter interface. diff --git a/pkg/graveler/committed/range_manager.go b/pkg/graveler/committed/range_manager.go index 2f46f976ad9..901b2e6e0f3 100644 --- a/pkg/graveler/committed/range_manager.go +++ b/pkg/graveler/committed/range_manager.go @@ -14,6 +14,9 @@ type ID string // Namespace is namespace for ID ranges type Namespace string +// StorageID is id for object storage +type StorageID string + // Key and Value types for to be stored in any Range of the MetaRange type Key []byte @@ -43,18 +46,18 @@ var ErrNotFound = graveler.ErrNotFound type RangeManager interface { // Exists returns true if id references a Range. - Exists(ctx context.Context, ns Namespace, id ID) (bool, error) + Exists(ctx context.Context, storageID StorageID, ns Namespace, id ID) (bool, error) // GetValue returns the value matching key in the Range referenced by id. If id not // found, it return (nil, ErrNotFound). - GetValue(ctx context.Context, ns Namespace, id ID, key Key) (*Record, error) + GetValue(ctx context.Context, storageID StorageID, ns Namespace, id ID, key Key) (*Record, error) // GetValueGE returns the first value keyed at or after key in the Range referenced by // id. If all values are keyed before key, it returns (nil, ErrNotFound). - GetValueGE(ctx context.Context, ns Namespace, id ID, key Key) (*Record, error) + GetValueGE(ctx context.Context, storageID StorageID, ns Namespace, id ID, key Key) (*Record, error) // NewRangeIterator returns an iterator over values in the Range with ID. - NewRangeIterator(ctx context.Context, ns Namespace, pid ID) (ValueIterator, error) + NewRangeIterator(ctx context.Context, storageID StorageID, ns Namespace, pid ID) (ValueIterator, error) // GetWriter returns a new Range writer instance GetWriter(ctx context.Context, ns Namespace, metadata graveler.Metadata) (RangeWriter, error) diff --git a/pkg/graveler/graveler.go b/pkg/graveler/graveler.go index c7e3389e59f..c4e186e77bc 100644 --- a/pkg/graveler/graveler.go +++ b/pkg/graveler/graveler.go @@ -1008,10 +1008,10 @@ type RefManager interface { // it is responsible for de-duping them, persisting them and providing basic diff, merge and list capabilities type CommittedManager interface { // Get returns the provided key, if exists, from the provided MetaRangeID - Get(ctx context.Context, ns StorageNamespace, rangeID MetaRangeID, key Key) (*Value, error) + Get(ctx context.Context, storageID StorageID, ns StorageNamespace, rangeID MetaRangeID, key Key) (*Value, error) // Exists returns true if a MetaRange matching ID exists in namespace ns. - Exists(ctx context.Context, ns StorageNamespace, id MetaRangeID) (bool, error) + Exists(ctx context.Context, storageID StorageID, ns StorageNamespace, id MetaRangeID) (bool, error) // WriteMetaRangeByIterator flushes the iterator to a new MetaRange and returns the created ID. WriteMetaRangeByIterator(ctx context.Context, ns StorageNamespace, it ValueIterator, metadata Metadata) (*MetaRangeID, error) @@ -1024,15 +1024,15 @@ type CommittedManager interface { WriteMetaRange(ctx context.Context, ns StorageNamespace, ranges []*RangeInfo) (*MetaRangeInfo, error) // List takes a given tree and returns an ValueIterator - List(ctx context.Context, ns StorageNamespace, rangeID MetaRangeID) (ValueIterator, error) + List(ctx context.Context, storageID StorageID, ns StorageNamespace, rangeID MetaRangeID) (ValueIterator, error) // Diff receives two metaRanges and returns a DiffIterator describing all differences between them. // This is similar to a two-dot diff in git (left..right) - Diff(ctx context.Context, ns StorageNamespace, left, right MetaRangeID) (DiffIterator, error) + Diff(ctx context.Context, storageID StorageID, ns StorageNamespace, left, right MetaRangeID) (DiffIterator, error) // Compare returns the difference between 'source' and 'destination', relative to a merge base 'base'. // This is similar to a three-dot diff in git. - Compare(ctx context.Context, ns StorageNamespace, destination, source, base MetaRangeID) (DiffIterator, error) + Compare(ctx context.Context, storageID StorageID, ns StorageNamespace, destination, source, base MetaRangeID) (DiffIterator, error) // Merge applies changes from 'source' to 'destination', relative to a merge base 'base' and // returns the ID of the new metarange. This is similar to a git merge operation. @@ -1041,12 +1041,12 @@ type CommittedManager interface { // Import sync changes from 'source' to 'destination'. All the given prefixes are completely overridden on the resulting metarange. Returns the ID of the new // metarange. - Import(ctx context.Context, ns StorageNamespace, destination, source MetaRangeID, prefixes []Prefix, opts ...SetOptionsFunc) (MetaRangeID, error) + Import(ctx context.Context, storageID StorageID, ns StorageNamespace, destination, source MetaRangeID, prefixes []Prefix, opts ...SetOptionsFunc) (MetaRangeID, error) // Commit is the act of taking an existing metaRange (snapshot) and applying a set of changes to it. // A change is either an entity to write/overwrite, or a tombstone to mark a deletion // it returns a new MetaRangeID that is expected to be immediately addressable - Commit(ctx context.Context, ns StorageNamespace, baseMetaRangeID MetaRangeID, changes ValueIterator, allowEmpty bool, opts ...SetOptionsFunc) (MetaRangeID, DiffSummary, error) + Commit(ctx context.Context, storageID StorageID, ns StorageNamespace, baseMetaRangeID MetaRangeID, changes ValueIterator, allowEmpty bool, opts ...SetOptionsFunc) (MetaRangeID, DiffSummary, error) // GetMetaRange returns information where metarangeID is stored. GetMetaRange(ctx context.Context, ns StorageNamespace, metaRangeID MetaRangeID) (MetaRangeAddress, error) @@ -1054,7 +1054,7 @@ type CommittedManager interface { GetRange(ctx context.Context, ns StorageNamespace, rangeID RangeID) (RangeAddress, error) // GetRangeIDByKey returns the RangeID that contains the given key. - GetRangeIDByKey(ctx context.Context, ns StorageNamespace, id MetaRangeID, key Key) (RangeID, error) + GetRangeIDByKey(ctx context.Context, storageID StorageID, ns StorageNamespace, id MetaRangeID, key Key) (RangeID, error) } // StagingManager manages entries in a staging area, denoted by a staging token @@ -1748,7 +1748,7 @@ func (g *Graveler) Get(ctx context.Context, repository *RepositoryRecord, ref Re } if updatedValue == nil && reference.CompactedBaseMetaRangeID != "" { - updatedValue, err = g.CommittedManager.Get(ctx, repository.StorageNamespace, reference.CompactedBaseMetaRangeID, key) + updatedValue, err = g.CommittedManager.Get(ctx, repository.StorageID, repository.StorageNamespace, reference.CompactedBaseMetaRangeID, key) // no need to check for ErrNotFound, since if the key is not found in the compacted base, it will not be found in the committed, and we already checked the staging area if err != nil { return nil, err @@ -1770,7 +1770,7 @@ func (g *Graveler) Get(ctx context.Context, repository *RepositoryRecord, ref Re if err != nil { return nil, err } - committedVal, err := g.CommittedManager.Get(ctx, repository.StorageNamespace, commit.MetaRangeID, key) + committedVal, err := g.CommittedManager.Get(ctx, repository.StorageID, repository.StorageNamespace, commit.MetaRangeID, key) if err != nil && !errors.Is(err, ErrNotFound) { return nil, err } @@ -1787,7 +1787,7 @@ func (g *Graveler) Get(ctx context.Context, repository *RepositoryRecord, ref Re if err != nil { return nil, err } - return g.CommittedManager.Get(ctx, repository.StorageNamespace, commit.MetaRangeID, key) + return g.CommittedManager.Get(ctx, repository.StorageID, repository.StorageNamespace, commit.MetaRangeID, key) } func (g *Graveler) GetByCommitID(ctx context.Context, repository *RepositoryRecord, commitID CommitID, key Key) (*Value, error) { @@ -1796,7 +1796,7 @@ func (g *Graveler) GetByCommitID(ctx context.Context, repository *RepositoryReco if err != nil { return nil, err } - return g.CommittedManager.Get(ctx, repository.StorageNamespace, commit.MetaRangeID, key) + return g.CommittedManager.Get(ctx, repository.StorageID, repository.StorageNamespace, commit.MetaRangeID, key) } func (g *Graveler) GetRangeIDByKey(ctx context.Context, repository *RepositoryRecord, commitID CommitID, key Key) (RangeID, error) { @@ -1804,7 +1804,7 @@ func (g *Graveler) GetRangeIDByKey(ctx context.Context, repository *RepositoryRe if err != nil { return "", err } - return g.CommittedManager.GetRangeIDByKey(ctx, repository.StorageNamespace, commit.MetaRangeID, key) + return g.CommittedManager.GetRangeIDByKey(ctx, repository.StorageID, repository.StorageNamespace, commit.MetaRangeID, key) } func (g *Graveler) Set(ctx context.Context, repository *RepositoryRecord, branchID BranchID, key Key, value Value, opts ...SetOptionsFunc) error { @@ -2011,7 +2011,7 @@ func (g *Graveler) deleteUnsafe(ctx context.Context, repository *RepositoryRecor metaRangeID = commit.MetaRangeID } - _, err = g.CommittedManager.Get(ctx, repository.StorageNamespace, metaRangeID, key) + _, err = g.CommittedManager.Get(ctx, repository.StorageID, repository.StorageNamespace, metaRangeID, key) if err == nil { // found in committed, set tombstone return g.deleteAndNotify(ctx, repository.RepositoryID, branchRecord, key, false) @@ -2089,7 +2089,7 @@ func (g *Graveler) List(ctx context.Context, repository *RepositoryRecord, ref R metaRangeID = commit.MetaRangeID } - listing, err := g.CommittedManager.List(ctx, repository.StorageNamespace, metaRangeID) + listing, err := g.CommittedManager.List(ctx, repository.StorageID, repository.StorageNamespace, metaRangeID) if err != nil { return nil, err } @@ -2205,7 +2205,7 @@ func (g *Graveler) Commit(ctx context.Context, repository *RepositoryRecord, bra } defer changes.Close() // returns err if the commit is empty (no changes) - commit.MetaRangeID, _, err = g.CommittedManager.Commit(ctx, storageNamespace, branchMetaRangeID, changes, params.AllowEmpty) + commit.MetaRangeID, _, err = g.CommittedManager.Commit(ctx, repository.StorageID, storageNamespace, branchMetaRangeID, changes, params.AllowEmpty) if err != nil { return nil, fmt.Errorf("commit: %w", err) } @@ -2353,7 +2353,7 @@ func (g *Graveler) AddCommit(ctx context.Context, repository *RepositoryRecord, // addCommitNoLock lower API used to add commit into a repository. It will verify that the commit meta-range is accessible but will not lock any metadata update. func (g *Graveler) addCommitNoLock(ctx context.Context, repository *RepositoryRecord, commit Commit) (CommitID, error) { // verify access to meta range - ok, err := g.CommittedManager.Exists(ctx, repository.StorageNamespace, commit.MetaRangeID) + ok, err := g.CommittedManager.Exists(ctx, repository.StorageID, repository.StorageNamespace, commit.MetaRangeID) if err != nil { return "", fmt.Errorf("checking for meta range %s: %w", commit.MetaRangeID, err) } @@ -2386,7 +2386,7 @@ func (g *Graveler) checkEmpty(ctx context.Context, repository *RepositoryRecord, if err != nil { return false, err } - committedList, err := g.CommittedManager.List(ctx, repository.StorageNamespace, commit.MetaRangeID) + committedList, err := g.CommittedManager.List(ctx, repository.StorageID, repository.StorageNamespace, commit.MetaRangeID) if err != nil { return false, err } @@ -2577,7 +2577,7 @@ func (g *Graveler) ResetKey(ctx context.Context, repository *RepositoryRecord, b return err } if branch.CompactedBaseMetaRangeID != "" { - uncommittedValue, err = g.CommittedManager.Get(ctx, repository.StorageNamespace, branch.CompactedBaseMetaRangeID, key) + uncommittedValue, err = g.CommittedManager.Get(ctx, repository.StorageID, repository.StorageNamespace, branch.CompactedBaseMetaRangeID, key) if err != nil && !errors.Is(err, ErrNotFound) { return err } @@ -3084,7 +3084,7 @@ func (g *Graveler) Import(ctx context.Context, repository *RepositoryRecord, des "destination_meta_range": toCommit.MetaRangeID, }).Trace("Import") - metaRangeID, err := g.CommittedManager.Import(ctx, storageNamespace, toCommit.MetaRangeID, source, prefixes) + metaRangeID, err := g.CommittedManager.Import(ctx, repository.StorageID, storageNamespace, toCommit.MetaRangeID, source, prefixes) if err != nil { if !errors.Is(err, ErrUserVisible) { err = fmt.Errorf("merge in CommitManager: %w", err) @@ -3188,7 +3188,7 @@ func (g *Graveler) diffUncommitted(ctx context.Context, repository *RepositoryRe if err != nil { return nil, err } - committedValueIterator, err := g.CommittedManager.List(ctx, repository.StorageNamespace, metaRangeID) + committedValueIterator, err := g.CommittedManager.List(ctx, repository.StorageID, repository.StorageNamespace, metaRangeID) if err != nil { valueIterator.Close() return nil, err @@ -3197,7 +3197,7 @@ func (g *Graveler) diffUncommitted(ctx context.Context, repository *RepositoryRe return NewUncommittedDiffIterator(ctx, committedValueIterator, valueIterator), nil } // return the diff of staging + sealed from committed on top of the diff of compacted from committed - diffCommitAndCompacted, err := g.CommittedManager.Diff(ctx, repository.StorageNamespace, metaRangeID, branch.CompactedBaseMetaRangeID) + diffCommitAndCompacted, err := g.CommittedManager.Diff(ctx, repository.StorageID, repository.StorageNamespace, metaRangeID, branch.CompactedBaseMetaRangeID) if err != nil { valueIterator.Close() committedValueIterator.Close() @@ -3240,14 +3240,14 @@ func (g *Graveler) Diff(ctx context.Context, repository *RepositoryRecord, left, if err != nil { return nil, err } - diff, err := g.CommittedManager.Diff(ctx, repository.StorageNamespace, leftCommit.MetaRangeID, rightCommit.MetaRangeID) + diff, err := g.CommittedManager.Diff(ctx, repository.StorageID, repository.StorageNamespace, leftCommit.MetaRangeID, rightCommit.MetaRangeID) if err != nil { return nil, err } if rightRawRef.ResolvedBranchModifier != ResolvedBranchModifierStaging { return diff, nil } - leftValueIterator, err := g.CommittedManager.List(ctx, repository.StorageNamespace, leftCommit.MetaRangeID) + leftValueIterator, err := g.CommittedManager.List(ctx, repository.StorageID, repository.StorageNamespace, leftCommit.MetaRangeID) if err != nil { return nil, err } @@ -3266,7 +3266,7 @@ func (g *Graveler) Diff(ctx context.Context, repository *RepositoryRecord, left, return NewCombinedDiffIterator(diff, leftValueIterator, stagingIterator), nil } diff.Close() - compactedDiffIterator, err := g.CommittedManager.Diff(ctx, repository.StorageNamespace, leftCommit.MetaRangeID, rightBranch.CompactedBaseMetaRangeID) + compactedDiffIterator, err := g.CommittedManager.Diff(ctx, repository.StorageID, repository.StorageNamespace, leftCommit.MetaRangeID, rightBranch.CompactedBaseMetaRangeID) if err != nil { leftValueIterator.Close() stagingIterator.Close() @@ -3299,7 +3299,7 @@ func (g *Graveler) Compare(ctx context.Context, repository *RepositoryRecord, le if err != nil { return nil, err } - return g.CommittedManager.Compare(ctx, repository.StorageNamespace, toCommit.MetaRangeID, fromCommit.MetaRangeID, baseCommit.MetaRangeID) + return g.CommittedManager.Compare(ctx, repository.StorageID, repository.StorageNamespace, toCommit.MetaRangeID, fromCommit.MetaRangeID, baseCommit.MetaRangeID) } func (g *Graveler) SetHooksHandler(handler HooksHandler) { @@ -3316,7 +3316,7 @@ func (g *Graveler) LoadCommits(ctx context.Context, repository *RepositoryRecord return ErrReadOnlyRepository } - iter, err := g.CommittedManager.List(ctx, repository.StorageNamespace, metaRangeID) + iter, err := g.CommittedManager.List(ctx, repository.StorageID, repository.StorageNamespace, metaRangeID) if err != nil { return err } @@ -3364,7 +3364,7 @@ func (g *Graveler) LoadBranches(ctx context.Context, repository *RepositoryRecor if repository.ReadOnly && !options.Force { return ErrReadOnlyRepository } - iter, err := g.CommittedManager.List(ctx, repository.StorageNamespace, metaRangeID) + iter, err := g.CommittedManager.List(ctx, repository.StorageID, repository.StorageNamespace, metaRangeID) if err != nil { return err } @@ -3397,7 +3397,7 @@ func (g *Graveler) LoadTags(ctx context.Context, repository *RepositoryRecord, m if repository.ReadOnly && !options.Force { return ErrReadOnlyRepository } - iter, err := g.CommittedManager.List(ctx, repository.StorageNamespace, metaRangeID) + iter, err := g.CommittedManager.List(ctx, repository.StorageID, repository.StorageNamespace, metaRangeID) if err != nil { return err } diff --git a/pkg/graveler/mock/graveler.go b/pkg/graveler/mock/graveler.go index 61c7523eac4..56497f4ee74 100644 --- a/pkg/graveler/mock/graveler.go +++ b/pkg/graveler/mock/graveler.go @@ -2602,9 +2602,9 @@ func (m *MockCommittedManager) EXPECT() *MockCommittedManagerMockRecorder { } // Commit mocks base method. -func (m *MockCommittedManager) Commit(ctx context.Context, ns graveler.StorageNamespace, baseMetaRangeID graveler.MetaRangeID, changes graveler.ValueIterator, allowEmpty bool, opts ...graveler.SetOptionsFunc) (graveler.MetaRangeID, graveler.DiffSummary, error) { +func (m *MockCommittedManager) Commit(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, baseMetaRangeID graveler.MetaRangeID, changes graveler.ValueIterator, allowEmpty bool, opts ...graveler.SetOptionsFunc) (graveler.MetaRangeID, graveler.DiffSummary, error) { m.ctrl.T.Helper() - varargs := []interface{}{ctx, ns, baseMetaRangeID, changes, allowEmpty} + varargs := []interface{}{ctx, storageID, ns, baseMetaRangeID, changes, allowEmpty} for _, a := range opts { varargs = append(varargs, a) } @@ -2616,70 +2616,70 @@ func (m *MockCommittedManager) Commit(ctx context.Context, ns graveler.StorageNa } // Commit indicates an expected call of Commit. -func (mr *MockCommittedManagerMockRecorder) Commit(ctx, ns, baseMetaRangeID, changes, allowEmpty interface{}, opts ...interface{}) *gomock.Call { +func (mr *MockCommittedManagerMockRecorder) Commit(ctx, storageID, ns, baseMetaRangeID, changes, allowEmpty interface{}, opts ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, ns, baseMetaRangeID, changes, allowEmpty}, opts...) + varargs := append([]interface{}{ctx, storageID, ns, baseMetaRangeID, changes, allowEmpty}, opts...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Commit", reflect.TypeOf((*MockCommittedManager)(nil).Commit), varargs...) } // Compare mocks base method. -func (m *MockCommittedManager) Compare(ctx context.Context, ns graveler.StorageNamespace, destination, source, base graveler.MetaRangeID) (graveler.DiffIterator, error) { +func (m *MockCommittedManager) Compare(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, destination, source, base graveler.MetaRangeID) (graveler.DiffIterator, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Compare", ctx, ns, destination, source, base) + ret := m.ctrl.Call(m, "Compare", ctx, storageID, ns, destination, source, base) ret0, _ := ret[0].(graveler.DiffIterator) ret1, _ := ret[1].(error) return ret0, ret1 } // Compare indicates an expected call of Compare. -func (mr *MockCommittedManagerMockRecorder) Compare(ctx, ns, destination, source, base interface{}) *gomock.Call { +func (mr *MockCommittedManagerMockRecorder) Compare(ctx, storageID, ns, destination, source, base interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Compare", reflect.TypeOf((*MockCommittedManager)(nil).Compare), ctx, ns, destination, source, base) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Compare", reflect.TypeOf((*MockCommittedManager)(nil).Compare), ctx, storageID, ns, destination, source, base) } // Diff mocks base method. -func (m *MockCommittedManager) Diff(ctx context.Context, ns graveler.StorageNamespace, left, right graveler.MetaRangeID) (graveler.DiffIterator, error) { +func (m *MockCommittedManager) Diff(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, left, right graveler.MetaRangeID) (graveler.DiffIterator, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Diff", ctx, ns, left, right) + ret := m.ctrl.Call(m, "Diff", ctx, storageID, ns, left, right) ret0, _ := ret[0].(graveler.DiffIterator) ret1, _ := ret[1].(error) return ret0, ret1 } // Diff indicates an expected call of Diff. -func (mr *MockCommittedManagerMockRecorder) Diff(ctx, ns, left, right interface{}) *gomock.Call { +func (mr *MockCommittedManagerMockRecorder) Diff(ctx, storageID, ns, left, right interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Diff", reflect.TypeOf((*MockCommittedManager)(nil).Diff), ctx, ns, left, right) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Diff", reflect.TypeOf((*MockCommittedManager)(nil).Diff), ctx, storageID, ns, left, right) } // Exists mocks base method. -func (m *MockCommittedManager) Exists(ctx context.Context, ns graveler.StorageNamespace, id graveler.MetaRangeID) (bool, error) { +func (m *MockCommittedManager) Exists(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, id graveler.MetaRangeID) (bool, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Exists", ctx, ns, id) + ret := m.ctrl.Call(m, "Exists", ctx, storageID, ns, id) ret0, _ := ret[0].(bool) ret1, _ := ret[1].(error) return ret0, ret1 } // Exists indicates an expected call of Exists. -func (mr *MockCommittedManagerMockRecorder) Exists(ctx, ns, id interface{}) *gomock.Call { +func (mr *MockCommittedManagerMockRecorder) Exists(ctx, storageID, ns, id interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Exists", reflect.TypeOf((*MockCommittedManager)(nil).Exists), ctx, ns, id) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Exists", reflect.TypeOf((*MockCommittedManager)(nil).Exists), ctx, storageID, ns, id) } // Get mocks base method. -func (m *MockCommittedManager) Get(ctx context.Context, ns graveler.StorageNamespace, rangeID graveler.MetaRangeID, key graveler.Key) (*graveler.Value, error) { +func (m *MockCommittedManager) Get(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, rangeID graveler.MetaRangeID, key graveler.Key) (*graveler.Value, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Get", ctx, ns, rangeID, key) + ret := m.ctrl.Call(m, "Get", ctx, storageID, ns, rangeID, key) ret0, _ := ret[0].(*graveler.Value) ret1, _ := ret[1].(error) return ret0, ret1 } // Get indicates an expected call of Get. -func (mr *MockCommittedManagerMockRecorder) Get(ctx, ns, rangeID, key interface{}) *gomock.Call { +func (mr *MockCommittedManagerMockRecorder) Get(ctx, storageID, ns, rangeID, key interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockCommittedManager)(nil).Get), ctx, ns, rangeID, key) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockCommittedManager)(nil).Get), ctx, storageID, ns, rangeID, key) } // GetMetaRange mocks base method. @@ -2713,24 +2713,24 @@ func (mr *MockCommittedManagerMockRecorder) GetRange(ctx, ns, rangeID interface{ } // GetRangeIDByKey mocks base method. -func (m *MockCommittedManager) GetRangeIDByKey(ctx context.Context, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (graveler.RangeID, error) { +func (m *MockCommittedManager) GetRangeIDByKey(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (graveler.RangeID, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetRangeIDByKey", ctx, ns, id, key) + ret := m.ctrl.Call(m, "GetRangeIDByKey", ctx, storageID, ns, id, key) ret0, _ := ret[0].(graveler.RangeID) ret1, _ := ret[1].(error) return ret0, ret1 } // GetRangeIDByKey indicates an expected call of GetRangeIDByKey. -func (mr *MockCommittedManagerMockRecorder) GetRangeIDByKey(ctx, ns, id, key interface{}) *gomock.Call { +func (mr *MockCommittedManagerMockRecorder) GetRangeIDByKey(ctx, storageID, ns, id, key interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRangeIDByKey", reflect.TypeOf((*MockCommittedManager)(nil).GetRangeIDByKey), ctx, ns, id, key) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRangeIDByKey", reflect.TypeOf((*MockCommittedManager)(nil).GetRangeIDByKey), ctx, storageID, ns, id, key) } // Import mocks base method. -func (m *MockCommittedManager) Import(ctx context.Context, ns graveler.StorageNamespace, destination, source graveler.MetaRangeID, prefixes []graveler.Prefix, opts ...graveler.SetOptionsFunc) (graveler.MetaRangeID, error) { +func (m *MockCommittedManager) Import(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, destination, source graveler.MetaRangeID, prefixes []graveler.Prefix, opts ...graveler.SetOptionsFunc) (graveler.MetaRangeID, error) { m.ctrl.T.Helper() - varargs := []interface{}{ctx, ns, destination, source, prefixes} + varargs := []interface{}{ctx, storageID, ns, destination, source, prefixes} for _, a := range opts { varargs = append(varargs, a) } @@ -2741,25 +2741,25 @@ func (m *MockCommittedManager) Import(ctx context.Context, ns graveler.StorageNa } // Import indicates an expected call of Import. -func (mr *MockCommittedManagerMockRecorder) Import(ctx, ns, destination, source, prefixes interface{}, opts ...interface{}) *gomock.Call { +func (mr *MockCommittedManagerMockRecorder) Import(ctx, storageID, ns, destination, source, prefixes interface{}, opts ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, ns, destination, source, prefixes}, opts...) + varargs := append([]interface{}{ctx, storageID, ns, destination, source, prefixes}, opts...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Import", reflect.TypeOf((*MockCommittedManager)(nil).Import), varargs...) } // List mocks base method. -func (m *MockCommittedManager) List(ctx context.Context, ns graveler.StorageNamespace, rangeID graveler.MetaRangeID) (graveler.ValueIterator, error) { +func (m *MockCommittedManager) List(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, rangeID graveler.MetaRangeID) (graveler.ValueIterator, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "List", ctx, ns, rangeID) + ret := m.ctrl.Call(m, "List", ctx, storageID, ns, rangeID) ret0, _ := ret[0].(graveler.ValueIterator) ret1, _ := ret[1].(error) return ret0, ret1 } // List indicates an expected call of List. -func (mr *MockCommittedManagerMockRecorder) List(ctx, ns, rangeID interface{}) *gomock.Call { +func (mr *MockCommittedManagerMockRecorder) List(ctx, storageID, ns, rangeID interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*MockCommittedManager)(nil).List), ctx, ns, rangeID) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*MockCommittedManager)(nil).List), ctx, storageID, ns, rangeID) } // Merge mocks base method. diff --git a/pkg/graveler/sstable/range_manager.go b/pkg/graveler/sstable/range_manager.go index fda6484d6a8..fdb050d68f4 100644 --- a/pkg/graveler/sstable/range_manager.go +++ b/pkg/graveler/sstable/range_manager.go @@ -14,7 +14,7 @@ import ( "github.com/treeverse/lakefs/pkg/pyramid" ) -type NewSSTableReaderFn func(ctx context.Context, ns committed.Namespace, id committed.ID) (*sstable.Reader, error) +type NewSSTableReaderFn func(ctx context.Context, storageID committed.StorageID, ns committed.Namespace, id committed.ID) (*sstable.Reader, error) type Unrefer interface { Unref() @@ -32,14 +32,14 @@ func NewPebbleSSTableRangeManager(cache *pebble.Cache, fs pyramid.FS, hash crypt cache.Ref() } opts := sstable.ReaderOptions{Cache: cache} - newReader := func(ctx context.Context, ns committed.Namespace, id committed.ID) (*sstable.Reader, error) { - return newReader(ctx, fs, ns, id, opts) + newReader := func(ctx context.Context, storageID committed.StorageID, ns committed.Namespace, id committed.ID) (*sstable.Reader, error) { + return newReader(ctx, fs, storageID, ns, id, opts) } return NewPebbleSSTableRangeManagerWithNewReader(newReader, opts.Cache, fs, hash) } -func newReader(ctx context.Context, fs pyramid.FS, ns committed.Namespace, id committed.ID, opts sstable.ReaderOptions) (*sstable.Reader, error) { - file, err := fs.Open(ctx, string(ns), string(id)) +func newReader(ctx context.Context, fs pyramid.FS, storageID committed.StorageID, ns committed.Namespace, id committed.ID, opts sstable.ReaderOptions) (*sstable.Reader, error) { + file, err := fs.Open(ctx, string(storageID), string(ns), string(id)) if err != nil { return nil, fmt.Errorf("open sstable file %s %s: %w", ns, id, err) } @@ -66,12 +66,12 @@ var ( _ committed.RangeManager = &RangeManager{} ) -func (m *RangeManager) Exists(ctx context.Context, ns committed.Namespace, id committed.ID) (bool, error) { - return m.fs.Exists(ctx, string(ns), string(id)) +func (m *RangeManager) Exists(ctx context.Context, storageID committed.StorageID, ns committed.Namespace, id committed.ID) (bool, error) { + return m.fs.Exists(ctx, string(storageID), string(ns), string(id)) } -func (m *RangeManager) GetValueGE(ctx context.Context, ns committed.Namespace, id committed.ID, lookup committed.Key) (*committed.Record, error) { - reader, err := m.newReader(ctx, ns, id) +func (m *RangeManager) GetValueGE(ctx context.Context, storageID committed.StorageID, ns committed.Namespace, id committed.ID, lookup committed.Key) (*committed.Record, error) { + reader, err := m.newReader(ctx, storageID, ns, id) if err != nil { return nil, err } @@ -105,8 +105,8 @@ func (m *RangeManager) GetValueGE(ctx context.Context, ns committed.Namespace, i // GetValue returns the Record matching the key in the SSTable referenced by the id. // If key is not found, (nil, ErrKeyNotFound) is returned. -func (m *RangeManager) GetValue(ctx context.Context, ns committed.Namespace, id committed.ID, lookup committed.Key) (*committed.Record, error) { - reader, err := m.newReader(ctx, ns, id) +func (m *RangeManager) GetValue(ctx context.Context, storageID committed.StorageID, ns committed.Namespace, id committed.ID, lookup committed.Key) (*committed.Record, error) { + reader, err := m.newReader(ctx, storageID, ns, id) if err != nil { return nil, err } @@ -146,8 +146,8 @@ func (m *RangeManager) GetValue(ctx context.Context, ns committed.Namespace, id } // NewRangeIterator takes a given SSTable and returns an EntryIterator seeked to >= "from" path -func (m *RangeManager) NewRangeIterator(ctx context.Context, ns committed.Namespace, tid committed.ID) (committed.ValueIterator, error) { - reader, err := m.newReader(ctx, ns, tid) +func (m *RangeManager) NewRangeIterator(ctx context.Context, storageID committed.StorageID, ns committed.Namespace, tid committed.ID) (committed.ValueIterator, error) { + reader, err := m.newReader(ctx, storageID, ns, tid) if err != nil { return nil, err } diff --git a/pkg/graveler/sstable/writer.go b/pkg/graveler/sstable/writer.go index dfdf61d6a1a..f4faeb9c67f 100644 --- a/pkg/graveler/sstable/writer.go +++ b/pkg/graveler/sstable/writer.go @@ -35,8 +35,8 @@ type DiskWriter struct { closed bool } -func NewDiskWriter(ctx context.Context, tierFS pyramid.FS, ns committed.Namespace, hash hash.Hash, metadata graveler.Metadata) (*DiskWriter, error) { - fh, err := tierFS.Create(ctx, string(ns)) +func NewDiskWriter(ctx context.Context, tierFS pyramid.FS, storageID committed.StorageID, ns committed.Namespace, hash hash.Hash, metadata graveler.Metadata) (*DiskWriter, error) { + fh, err := tierFS.Create(ctx, string(storageID), string(ns)) if err != nil { return nil, fmt.Errorf("opening file: %w", err) } diff --git a/pkg/pyramid/mock/pyramid.go b/pkg/pyramid/mock/pyramid.go index 398f74f1cc6..9b64c4c6aa5 100644 --- a/pkg/pyramid/mock/pyramid.go +++ b/pkg/pyramid/mock/pyramid.go @@ -37,33 +37,33 @@ func (m *MockFS) EXPECT() *MockFSMockRecorder { } // Create mocks base method. -func (m *MockFS) Create(ctx context.Context, namespace string) (pyramid.StoredFile, error) { +func (m *MockFS) Create(ctx context.Context, storageID, namespace string) (pyramid.StoredFile, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Create", ctx, namespace) + ret := m.ctrl.Call(m, "Create", ctx, storageID, namespace) ret0, _ := ret[0].(pyramid.StoredFile) ret1, _ := ret[1].(error) return ret0, ret1 } // Create indicates an expected call of Create. -func (mr *MockFSMockRecorder) Create(ctx, namespace interface{}) *gomock.Call { +func (mr *MockFSMockRecorder) Create(ctx, storageID, namespace interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockFS)(nil).Create), ctx, namespace) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Create", reflect.TypeOf((*MockFS)(nil).Create), ctx, storageID, namespace) } // Exists mocks base method. -func (m *MockFS) Exists(ctx context.Context, namespace, filename string) (bool, error) { +func (m *MockFS) Exists(ctx context.Context, storageID, namespace, filename string) (bool, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Exists", ctx, namespace, filename) + ret := m.ctrl.Call(m, "Exists", ctx, storageID, namespace, filename) ret0, _ := ret[0].(bool) ret1, _ := ret[1].(error) return ret0, ret1 } // Exists indicates an expected call of Exists. -func (mr *MockFSMockRecorder) Exists(ctx, namespace, filename interface{}) *gomock.Call { +func (mr *MockFSMockRecorder) Exists(ctx, storageID, namespace, filename interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Exists", reflect.TypeOf((*MockFS)(nil).Exists), ctx, namespace, filename) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Exists", reflect.TypeOf((*MockFS)(nil).Exists), ctx, storageID, namespace, filename) } // GetRemoteURI mocks base method. @@ -82,18 +82,18 @@ func (mr *MockFSMockRecorder) GetRemoteURI(ctx, namespace, filename interface{}) } // Open mocks base method. -func (m *MockFS) Open(ctx context.Context, namespace, filename string) (pyramid.File, error) { +func (m *MockFS) Open(ctx context.Context, storageID, namespace, filename string) (pyramid.File, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Open", ctx, namespace, filename) + ret := m.ctrl.Call(m, "Open", ctx, storageID, namespace, filename) ret0, _ := ret[0].(pyramid.File) ret1, _ := ret[1].(error) return ret0, ret1 } // Open indicates an expected call of Open. -func (mr *MockFSMockRecorder) Open(ctx, namespace, filename interface{}) *gomock.Call { +func (mr *MockFSMockRecorder) Open(ctx, storageID, namespace, filename interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Open", reflect.TypeOf((*MockFS)(nil).Open), ctx, namespace, filename) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Open", reflect.TypeOf((*MockFS)(nil).Open), ctx, storageID, namespace, filename) } // MockFile is a mock of File interface. diff --git a/pkg/pyramid/pyramid.go b/pkg/pyramid/pyramid.go index 260a4cab5b5..a003f0a1f28 100644 --- a/pkg/pyramid/pyramid.go +++ b/pkg/pyramid/pyramid.go @@ -14,14 +14,14 @@ import ( type FS interface { // Create creates a new file in the FS. // It will only be persistent after the returned file is stored. - Create(ctx context.Context, namespace string) (StoredFile, error) + Create(ctx context.Context, storageID string, namespace string) (StoredFile, error) // Open finds the referenced file and returns its read-only File. // If file isn't in the local disk, it is fetched from the block storage. - Open(ctx context.Context, namespace, filename string) (File, error) + Open(ctx context.Context, storageID string, namespace, filename string) (File, error) // Exists returns true if filename currently exists on block storage. - Exists(ctx context.Context, namespace, filename string) (bool, error) + Exists(ctx context.Context, storageID string, namespace, filename string) (bool, error) // GetRemoteURI returns the URI for filename on block storage. That URI might not // resolve if filename does not exist. diff --git a/pkg/pyramid/tier_fs.go b/pkg/pyramid/tier_fs.go index b2c2bb7e96c..a67e2227876 100644 --- a/pkg/pyramid/tier_fs.go +++ b/pkg/pyramid/tier_fs.go @@ -140,9 +140,10 @@ func (tfs *TierFS) removeFromLocalInternal(rPath params.RelativePath) { }() } -func (tfs *TierFS) store(ctx context.Context, namespace, originalPath, nsPath, filename string) error { +func (tfs *TierFS) store(ctx context.Context, storageID string, namespace, originalPath, nsPath, filename string) error { if tfs.logger.IsTracing() { tfs.log(ctx).WithFields(logging.Fields{ + "storageID": storageID, "namespace": namespace, "original_path": originalPath, "ns_path": nsPath, @@ -160,7 +161,7 @@ func (tfs *TierFS) store(ctx context.Context, namespace, originalPath, nsPath, f return fmt.Errorf("file stat %s: %w", originalPath, err) } - if _, err = tfs.adapter.Put(ctx, tfs.objPointer(namespace, filename), stat.Size(), f, block.PutOpts{}); err != nil { + if _, err = tfs.adapter.Put(ctx, tfs.objPointer(storageID, namespace, filename), stat.Size(), f, block.PutOpts{}); err != nil { return fmt.Errorf("adapter put %s %s: %w", namespace, filename, err) } @@ -168,7 +169,7 @@ func (tfs *TierFS) store(ctx context.Context, namespace, originalPath, nsPath, f return fmt.Errorf("closing file %s: %w", filename, err) } - fileRef := tfs.newLocalFileRef(namespace, nsPath, filename) + fileRef := tfs.newLocalFileRef(storageID, namespace, nsPath, filename) if tfs.eviction.Store(fileRef.fsRelativePath, stat.Size()) { // file was stored by the policy return tfs.syncDir.renameFile(originalPath, fileRef.fullPath) @@ -184,7 +185,7 @@ func (tfs *TierFS) GetRemoteURI(_ context.Context, _, filename string) (string, // Create creates a new file in TierFS. File isn't stored in TierFS until a successful close // operation. Open(namespace, filename) calls will return an error before the close was // called. Create only performs local operations so it ignores the context. -func (tfs *TierFS) Create(_ context.Context, namespace string) (StoredFile, error) { +func (tfs *TierFS) Create(_ context.Context, storageID string, namespace string) (StoredFile, error) { nsPath, err := parseNamespacePath(namespace) if err != nil { return nil, err @@ -203,7 +204,7 @@ func (tfs *TierFS) Create(_ context.Context, namespace string) (StoredFile, erro File: fh, logger: tfs.logger, store: func(ctx context.Context, filename string) error { - return tfs.store(ctx, namespace, tempPath, nsPath, filename) + return tfs.store(ctx, storageID, namespace, tempPath, nsPath, filename) }, abort: func(context.Context) error { return os.Remove(tempPath) @@ -213,7 +214,7 @@ func (tfs *TierFS) Create(_ context.Context, namespace string) (StoredFile, erro // Open returns a file descriptor to the local file. // If the file is missing from the local disk, it will try to fetch it from the block storage. -func (tfs *TierFS) Open(ctx context.Context, namespace, filename string) (File, error) { +func (tfs *TierFS) Open(ctx context.Context, storageID, namespace, filename string) (File, error) { nsPath, err := parseNamespacePath(namespace) if err != nil { return nil, err @@ -223,7 +224,7 @@ func (tfs *TierFS) Open(ctx context.Context, namespace, filename string) (File, } // check if file is there - without taking the lock - fileRef := tfs.newLocalFileRef(namespace, nsPath, filename) + fileRef := tfs.newLocalFileRef(storageID, namespace, nsPath, filename) fh, err := os.Open(fileRef.fullPath) if err == nil { if tfs.logger.IsTracing() { @@ -249,9 +250,9 @@ func (tfs *TierFS) Open(ctx context.Context, namespace, filename string) (File, return tfs.openFile(ctx, fileRef, fh) } -func (tfs *TierFS) Exists(ctx context.Context, namespace, filename string) (bool, error) { +func (tfs *TierFS) Exists(ctx context.Context, storageID, namespace, filename string) (bool, error) { cacheAccess.WithLabelValues(tfs.fsName, "Exists").Inc() - return tfs.adapter.Exists(ctx, tfs.objPointer(namespace, filename)) + return tfs.adapter.Exists(ctx, tfs.objPointer(storageID, namespace, filename)) } // openFile converts an os.File to pyramid.ROFile and updates the eviction control. @@ -283,6 +284,7 @@ func (tfs *TierFS) openWithLock(ctx context.Context, fileRef localFileRef) (*os. log := tfs.log(ctx) if tfs.logger.IsTracing() { log.WithFields(logging.Fields{ + "storageID": fileRef.storageID, "namespace": fileRef.namespace, "file": fileRef.filename, "fullpath": fileRef.fullPath, @@ -297,6 +299,7 @@ func (tfs *TierFS) openWithLock(ctx context.Context, fileRef localFileRef) (*os. if err == nil { if log.IsTracing() { log.WithFields(logging.Fields{ + "storageID": fileRef.storageID, "namespace": fileRef.namespace, "file": fileRef.filename, "fullpath": fileRef.fullPath, @@ -312,12 +315,13 @@ func (tfs *TierFS) openWithLock(ctx context.Context, fileRef localFileRef) (*os. if log.IsTracing() { log.WithFields(logging.Fields{ + "storageID": fileRef.storageID, "namespace": fileRef.namespace, "file": fileRef.filename, "fullpath": fileRef.fullPath, }).Trace("get file from block storage") } - reader, err := tfs.adapter.Get(ctx, tfs.objPointer(fileRef.namespace, fileRef.filename)) + reader, err := tfs.adapter.Get(ctx, tfs.objPointer(fileRef.storageID, fileRef.namespace, fileRef.filename)) if err != nil { return nil, fmt.Errorf("read from block storage: %w", err) } @@ -379,6 +383,7 @@ func validateFilename(filename string) error { // localFileRef consists of all possible local file references type localFileRef struct { + storageID string namespace string filename string fullPath string @@ -408,9 +413,10 @@ func (tfs *TierFS) storeLocalFile(rPath params.RelativePath, size int64) { } } -func (tfs *TierFS) newLocalFileRef(namespace, nsPath, filename string) localFileRef { +func (tfs *TierFS) newLocalFileRef(storageID, namespace, nsPath, filename string) localFileRef { rPath := path.Join(nsPath, filename) return localFileRef{ + storageID: storageID, namespace: namespace, filename: filename, fsRelativePath: params.RelativePath(rPath), @@ -418,9 +424,9 @@ func (tfs *TierFS) newLocalFileRef(namespace, nsPath, filename string) localFile } } -func (tfs *TierFS) objPointer(namespace, filename string) block.ObjectPointer { - // TODO (gilo): ObjectPointer init - add StorageID here +func (tfs *TierFS) objPointer(storageID, namespace, filename string) block.ObjectPointer { return block.ObjectPointer{ + StorageID: storageID, StorageNamespace: namespace, IdentifierType: block.IdentifierTypeRelative, Identifier: tfs.blockStoragePath(filepath.ToSlash(filename)),