Skip to content

Commit

Permalink
Add storageID to tier_fs
Browse files Browse the repository at this point in the history
  • Loading branch information
nadavsteindler committed Jan 26, 2025
1 parent 8f255b2 commit 46c153c
Show file tree
Hide file tree
Showing 14 changed files with 186 additions and 175 deletions.
3 changes: 2 additions & 1 deletion pkg/graveler/committed/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type iterator struct {
rng *Range // Decoded value at which rangeIt point
it graveler.ValueIterator // nil at start of range
err error
storageID StorageID
namespace Namespace
beforeRange bool
}
Expand All @@ -32,7 +33,7 @@ func NewIterator(ctx context.Context, manager RangeManager, namespace Namespace,
// loadIt loads rvi.it to start iterating over a new range. It returns false and sets rvi.err
// if it fails to open the new range.
func (rvi *iterator) loadIt() bool {
it, err := rvi.manager.NewRangeIterator(rvi.ctx, rvi.namespace, rvi.rng.ID)
it, err := rvi.manager.NewRangeIterator(rvi.ctx, rvi.storageID, rvi.namespace, rvi.rng.ID)
if err != nil {
rvi.err = fmt.Errorf("open range %s: %w", rvi.rng.ID, err)
return false
Expand Down
43 changes: 22 additions & 21 deletions pkg/graveler/committed/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ func NewCommittedManager(m MetaRangeManager, r RangeManager, p Params) graveler.
}
}

func (c *committedManager) Exists(ctx context.Context, ns graveler.StorageNamespace, id graveler.MetaRangeID) (bool, error) {
return c.metaRangeManager.Exists(ctx, ns, id)
func (c *committedManager) Exists(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, id graveler.MetaRangeID) (bool, error) {
return c.metaRangeManager.Exists(ctx, storageID, ns, id)
}

func (c *committedManager) Get(ctx context.Context, ns graveler.StorageNamespace, rangeID graveler.MetaRangeID, key graveler.Key) (*graveler.Value, error) {
it, err := c.metaRangeManager.NewMetaRangeIterator(ctx, ns, rangeID)
func (c *committedManager) Get(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, rangeID graveler.MetaRangeID, key graveler.Key) (*graveler.Value, error) {
it, err := c.metaRangeManager.NewMetaRangeIterator(ctx, storageID, ns, rangeID)
if err != nil {
return nil, err
}
Expand All @@ -53,8 +53,8 @@ func (c *committedManager) Get(ctx context.Context, ns graveler.StorageNamespace
return rec.Value, nil
}

func (c *committedManager) List(ctx context.Context, ns graveler.StorageNamespace, rangeID graveler.MetaRangeID) (graveler.ValueIterator, error) {
it, err := c.metaRangeManager.NewMetaRangeIterator(ctx, ns, rangeID)
func (c *committedManager) List(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, rangeID graveler.MetaRangeID) (graveler.ValueIterator, error) {
it, err := c.metaRangeManager.NewMetaRangeIterator(ctx, storageID, ns, rangeID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -170,20 +170,20 @@ func (c *committedManager) WriteMetaRangeByIterator(ctx context.Context, ns grav
return id, nil
}

func (c *committedManager) Diff(ctx context.Context, ns graveler.StorageNamespace, left, right graveler.MetaRangeID) (graveler.DiffIterator, error) {
leftIt, err := c.metaRangeManager.NewMetaRangeIterator(ctx, ns, left)
func (c *committedManager) Diff(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, left, right graveler.MetaRangeID) (graveler.DiffIterator, error) {
leftIt, err := c.metaRangeManager.NewMetaRangeIterator(ctx, storageID, ns, left)
if err != nil {
return nil, err
}
rightIt, err := c.metaRangeManager.NewMetaRangeIterator(ctx, ns, right)
rightIt, err := c.metaRangeManager.NewMetaRangeIterator(ctx, storageID, ns, right)
if err != nil {
return nil, err
}
return NewDiffValueIterator(ctx, leftIt, rightIt), nil
}

func (c *committedManager) Import(ctx context.Context, ns graveler.StorageNamespace, destination, source graveler.MetaRangeID, prefixes []graveler.Prefix, _ ...graveler.SetOptionsFunc) (graveler.MetaRangeID, error) {
destIt, err := c.metaRangeManager.NewMetaRangeIterator(ctx, ns, destination)
func (c *committedManager) Import(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, destination, source graveler.MetaRangeID, prefixes []graveler.Prefix, _ ...graveler.SetOptionsFunc) (graveler.MetaRangeID, error) {
destIt, err := c.metaRangeManager.NewMetaRangeIterator(ctx, storageID, ns, destination)
if err != nil {
return "", fmt.Errorf("get destination iterator: %w", err)
}
Expand Down Expand Up @@ -228,6 +228,7 @@ type mergeContext struct {
srcIt Iterator
baseIt Iterator
strategy graveler.MergeStrategy
storageID graveler.StorageID
ns graveler.StorageNamespace
destinationID graveler.MetaRangeID
sourceID graveler.MetaRangeID
Expand All @@ -238,7 +239,7 @@ func (c *committedManager) merge(ctx context.Context, mctx mergeContext) (gravel
var err error = nil
baseIt := mctx.baseIt
if baseIt == nil {
baseIt, err = c.metaRangeManager.NewMetaRangeIterator(ctx, mctx.ns, mctx.baseID)
baseIt, err = c.metaRangeManager.NewMetaRangeIterator(ctx, mctx.storageID, mctx.ns, mctx.baseID)
if err != nil {
return "", fmt.Errorf("get base iterator: %w", err)
}
Expand All @@ -247,7 +248,7 @@ func (c *committedManager) merge(ctx context.Context, mctx mergeContext) (gravel

destIt := mctx.destIt
if destIt == nil {
destIt, err = c.metaRangeManager.NewMetaRangeIterator(ctx, mctx.ns, mctx.destinationID)
destIt, err = c.metaRangeManager.NewMetaRangeIterator(ctx, mctx.storageID, mctx.ns, mctx.destinationID)
if err != nil {
return "", fmt.Errorf("get destination iterator: %w", err)
}
Expand All @@ -256,7 +257,7 @@ func (c *committedManager) merge(ctx context.Context, mctx mergeContext) (gravel

srcIt := mctx.srcIt
if srcIt == nil {
srcIt, err = c.metaRangeManager.NewMetaRangeIterator(ctx, mctx.ns, mctx.sourceID)
srcIt, err = c.metaRangeManager.NewMetaRangeIterator(ctx, mctx.storageID, mctx.ns, mctx.sourceID)
if err != nil {
return "", fmt.Errorf("get source iterator: %w", err)
}
Expand Down Expand Up @@ -285,15 +286,15 @@ func (c *committedManager) merge(ctx context.Context, mctx mergeContext) (gravel
return *newID, err
}

func (c *committedManager) Commit(ctx context.Context, ns graveler.StorageNamespace, baseMetaRangeID graveler.MetaRangeID, changes graveler.ValueIterator, allowEmpty bool, _ ...graveler.SetOptionsFunc) (graveler.MetaRangeID, graveler.DiffSummary, error) {
func (c *committedManager) Commit(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, baseMetaRangeID graveler.MetaRangeID, changes graveler.ValueIterator, allowEmpty bool, _ ...graveler.SetOptionsFunc) (graveler.MetaRangeID, graveler.DiffSummary, error) {
mwWriter := c.metaRangeManager.NewWriter(ctx, ns, nil)
defer func() {
err := mwWriter.Abort()
if err != nil {
logging.FromContext(ctx).WithError(err).Error("Abort failed after Commit")
}
}()
metaRangeIterator, err := c.metaRangeManager.NewMetaRangeIterator(ctx, ns, baseMetaRangeID)
metaRangeIterator, err := c.metaRangeManager.NewMetaRangeIterator(ctx, storageID, ns, baseMetaRangeID)
summary := graveler.DiffSummary{
Count: map[graveler.DiffType]int{},
}
Expand All @@ -315,12 +316,12 @@ func (c *committedManager) Commit(ctx context.Context, ns graveler.StorageNamesp
return *newID, summary, err
}

func (c *committedManager) Compare(ctx context.Context, ns graveler.StorageNamespace, destination, source, base graveler.MetaRangeID) (graveler.DiffIterator, error) {
diffIt, err := c.Diff(ctx, ns, destination, source)
func (c *committedManager) Compare(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, destination, source, base graveler.MetaRangeID) (graveler.DiffIterator, error) {
diffIt, err := c.Diff(ctx, storageID, ns, destination, source)
if err != nil {
return nil, fmt.Errorf("diff: %w", err)
}
baseIt, err := c.metaRangeManager.NewMetaRangeIterator(ctx, ns, base)
baseIt, err := c.metaRangeManager.NewMetaRangeIterator(ctx, storageID, ns, base)
if err != nil {
diffIt.Close()
return nil, fmt.Errorf("get base iterator: %w", err)
Expand All @@ -338,11 +339,11 @@ func (c *committedManager) GetRange(ctx context.Context, ns graveler.StorageName
return graveler.RangeAddress(uri), err
}

func (c *committedManager) GetRangeIDByKey(ctx context.Context, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (graveler.RangeID, error) {
func (c *committedManager) GetRangeIDByKey(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (graveler.RangeID, error) {
if id == "" {
return "", graveler.ErrNotFound
}
r, err := c.metaRangeManager.GetRangeByKey(ctx, ns, id, key)
r, err := c.metaRangeManager.GetRangeByKey(ctx, storageID, ns, id, key)
if err != nil {
return "", fmt.Errorf("get range for key: %w", err)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/graveler/committed/meta_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,17 @@ func (r RangeDiff) Copy() *RangeDiff {

// MetaRangeManager is an abstraction for a repository of MetaRanges that exposes operations on them
type MetaRangeManager interface {
Exists(ctx context.Context, ns graveler.StorageNamespace, id graveler.MetaRangeID) (bool, error)
Exists(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, id graveler.MetaRangeID) (bool, error)

// GetValue returns the matching in-range graveler.ValueRecord for key in the
// MetaRange with id.
GetValue(ctx context.Context, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (*graveler.ValueRecord, error)
GetValue(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (*graveler.ValueRecord, error)

// NewWriter returns a writer that is used for creating new MetaRanges
NewWriter(ctx context.Context, ns graveler.StorageNamespace, metadata graveler.Metadata) MetaRangeWriter

// NewMetaRangeIterator returns an Iterator over the MetaRange with id.
NewMetaRangeIterator(ctx context.Context, ns graveler.StorageNamespace, metaRangeID graveler.MetaRangeID) (Iterator, error)
NewMetaRangeIterator(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, metaRangeID graveler.MetaRangeID) (Iterator, error)

// GetMetaRangeURI returns a URI with an object representing metarange ID. It may
// return a URI that does not resolve (rather than an error) if ID does not exist.
Expand All @@ -94,7 +94,7 @@ type MetaRangeManager interface {
GetRangeURI(ctx context.Context, ns graveler.StorageNamespace, rangeID graveler.RangeID) (string, error)

// GetRangeByKey returns the Range that contains key in the MetaRange with id.
GetRangeByKey(ctx context.Context, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (*Range, error)
GetRangeByKey(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (*Range, error)
}

// MetaRangeWriter is an abstraction for creating new MetaRanges
Expand Down
18 changes: 9 additions & 9 deletions pkg/graveler/committed/meta_range_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,19 @@ func NewMetaRangeManager(params Params, metaManager, rangeManager RangeManager)
}, nil
}

func (m *metaRangeManager) Exists(ctx context.Context, ns graveler.StorageNamespace, id graveler.MetaRangeID) (bool, error) {
return m.metaManager.Exists(ctx, Namespace(ns), ID(id))
func (m *metaRangeManager) Exists(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, id graveler.MetaRangeID) (bool, error) {
return m.metaManager.Exists(ctx, StorageID(storageID), Namespace(ns), ID(id))
}

// GetValue finds the matching graveler.ValueRecord in the MetaRange with the rangeID
func (m *metaRangeManager) GetValue(ctx context.Context, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (*graveler.ValueRecord, error) {
func (m *metaRangeManager) GetValue(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (*graveler.ValueRecord, error) {
// Fetch range containing key.
rng, err := m.GetRangeByKey(ctx, ns, id, key)
rng, err := m.GetRangeByKey(ctx, storageID, ns, id, key)
if err != nil {
return nil, err
}

r, err := m.rangeManager.GetValue(ctx, Namespace(ns), rng.ID, Key(key))
r, err := m.rangeManager.GetValue(ctx, StorageID(storageID), Namespace(ns), rng.ID, Key(key))
if err != nil {
return nil, fmt.Errorf("get value in range %s of %s for %s: %w", rng.ID, id, key, err)
}
Expand All @@ -69,8 +69,8 @@ func (m *metaRangeManager) GetValue(ctx context.Context, ns graveler.StorageName
}, nil
}

func (m *metaRangeManager) GetRangeByKey(ctx context.Context, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (*Range, error) {
v, err := m.metaManager.GetValueGE(ctx, Namespace(ns), ID(id), Key(key))
func (m *metaRangeManager) GetRangeByKey(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, id graveler.MetaRangeID, key graveler.Key) (*Range, error) {
v, err := m.metaManager.GetValueGE(ctx, StorageID(storageID), Namespace(ns), ID(id), Key(key))
if errors.Is(err, ErrNotFound) {
return nil, err
}
Expand Down Expand Up @@ -100,11 +100,11 @@ func (m *metaRangeManager) NewWriter(ctx context.Context, ns graveler.StorageNam
return NewGeneralMetaRangeWriter(ctx, m.rangeManager, m.metaManager, &m.params, Namespace(ns), metadata)
}

func (m *metaRangeManager) NewMetaRangeIterator(ctx context.Context, ns graveler.StorageNamespace, id graveler.MetaRangeID) (Iterator, error) {
func (m *metaRangeManager) NewMetaRangeIterator(ctx context.Context, storageID graveler.StorageID, ns graveler.StorageNamespace, id graveler.MetaRangeID) (Iterator, error) {
if id == "" {
return NewEmptyIterator(), nil
}
rangesIt, err := m.metaManager.NewRangeIterator(ctx, Namespace(ns), ID(id))
rangesIt, err := m.metaManager.NewRangeIterator(ctx, StorageID(storageID), Namespace(ns), ID(id))
if err != nil {
return nil, fmt.Errorf("manage metarange %s: %w", id, err)
}
Expand Down
32 changes: 16 additions & 16 deletions pkg/graveler/committed/mock/meta_range.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 46c153c

Please sign in to comment.