diff --git a/pkg/catalog/catalog_test.go b/pkg/catalog/catalog_test.go index af91b660286..fa095176ddc 100644 --- a/pkg/catalog/catalog_test.go +++ b/pkg/catalog/catalog_test.go @@ -533,6 +533,7 @@ func TestCatalog_PrepareGCUncommitted(t *testing.T) { numRecords int expectedCalls int expectedForUncommitted int + compactBranch bool }{ { name: "no branches", @@ -560,73 +561,91 @@ func TestCatalog_PrepareGCUncommitted(t *testing.T) { }, } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - const repositoryID = "repo1" - g, expectedRecords := createPrepareUncommittedTestScenario(t, repositoryID, tt.numBranch, tt.numRecords, tt.expectedCalls) - blockAdapter := testutil.NewBlockAdapterByType(t, block.BlockstoreTypeMem) - c := &catalog.Catalog{ - Store: g.Sut, - BlockAdapter: blockAdapter, - UGCPrepareMaxFileSize: 500 * 1024, - KVStore: g.KVStore, - } - - var ( - mark *catalog.GCUncommittedMark - runID string - allRecords []string - ) - for { - result, err := c.PrepareGCUncommitted(ctx, repositoryID, mark) - require.NoError(t, err) - - // keep or check run id match previous calls - if runID == "" { - runID = result.RunID - } else { - require.Equal(t, runID, result.RunID) + for _, compactBranch := range []bool{false, true} { + t.Run(tt.name, func(t *testing.T) { + const repositoryID = "repo1" + g, expectedRecords := createPrepareUncommittedTestScenario(t, repositoryID, tt.numBranch, tt.numRecords, tt.expectedCalls, compactBranch) + blockAdapter := testutil.NewBlockAdapterByType(t, block.BlockstoreTypeMem) + c := &catalog.Catalog{ + Store: g.Sut, + BlockAdapter: blockAdapter, + UGCPrepareMaxFileSize: 500 * 1024, + KVStore: g.KVStore, } - if tt.numRecords == 0 { - require.Equal(t, "", result.Location) - require.Equal(t, "", result.Filename) - } else { - // read parquet information if data was stored to location - objLocation, err := url.JoinPath(result.Location, result.Filename) + var ( + mark *catalog.GCUncommittedMark + runID string + allRecords []string + ) 
+ for { + result, err := c.PrepareGCUncommitted(ctx, repositoryID, mark) require.NoError(t, err) - addresses := readPhysicalAddressesFromParquetObject(t, repositoryID, ctx, c, objLocation) - allRecords = append(allRecords, addresses...) - } - mark = result.Mark - if mark == nil { - break + // keep or check run id match previous calls + if runID == "" { + runID = result.RunID + } else { + require.Equal(t, runID, result.RunID) + } + + if tt.numRecords == 0 { + require.Equal(t, "", result.Location) + require.Equal(t, "", result.Filename) + } else { + // read parquet information if data was stored to location + objLocation, err := url.JoinPath(result.Location, result.Filename) + require.NoError(t, err) + addresses := readPhysicalAddressesFromParquetObject(t, repositoryID, ctx, c, objLocation) + allRecords = append(allRecords, addresses...) + } + + mark = result.Mark + if mark == nil { + break + } + require.Equal(t, runID, result.Mark.RunID) } - require.Equal(t, runID, result.Mark.RunID) - } - // match expected records found in parquet data - sort.Strings(allRecords) - if diff := deep.Equal(allRecords, expectedRecords); diff != nil { - t.Errorf("Found diff in expected records: %s", diff) - } - }) + // match expected records found in parquet data + sort.Strings(allRecords) + if diff := deep.Equal(allRecords, expectedRecords); diff != nil { + t.Errorf("Found diff in expected records: %s", diff) + } + }) + } } } -func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, numBranches, numRecords, expectedCalls int) (*gUtils.GravelerTest, []string) { +func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, numBranches, numRecords, expectedCalls int, compact bool) (*gUtils.GravelerTest, []string) { t.Helper() test := gUtils.InitGravelerTest(t) records := make([][]*graveler.ValueRecord, numBranches) + diffs := make([][]graveler.Diff, numBranches) var branches []*graveler.BranchRecord var expectedRecords []string + if numBranches > 0 { + 
test.RefManager.EXPECT().GetCommit(gomock.Any(), gomock.Any(), gomock.Any()).MinTimes(1).Return(&graveler.Commit{}, nil) + test.CommittedManager.EXPECT().List(gomock.Any(), gomock.Any(), gomock.Any()).MinTimes(1).Return(cUtils.NewFakeValueIterator([]*graveler.ValueRecord{}), nil) + } for i := 0; i < numBranches; i++ { branchID := graveler.BranchID(fmt.Sprintf("branch%04d", i)) token := graveler.StagingToken(fmt.Sprintf("%s_st%04d", branchID, i)) - branches = append(branches, &graveler.BranchRecord{BranchID: branchID, Branch: &graveler.Branch{StagingToken: token}}) + branch := &graveler.BranchRecord{BranchID: branchID, Branch: &graveler.Branch{StagingToken: token}} + compactBaseMetaRangeID := graveler.MetaRangeID(fmt.Sprintf("base%04d", i)) + commitID := graveler.CommitID(fmt.Sprintf("commit%04d", i)) + if !compact { + test.RefManager.EXPECT().GetBranch(gomock.Any(), gomock.Any(), branchID).MinTimes(1).Return(&graveler.Branch{StagingToken: token}, nil) + } else { + branch.CompactedBaseMetaRangeID = compactBaseMetaRangeID + branch.Branch.CommitID = commitID + test.RefManager.EXPECT().GetBranch(gomock.Any(), gomock.Any(), branchID).MinTimes(1).Return(&graveler.Branch{StagingToken: token, CommitID: commitID, CompactedBaseMetaRangeID: compactBaseMetaRangeID}, nil) + } + branches = append(branches, branch) records[i] = make([]*graveler.ValueRecord, 0, numRecords) + diffs[i] = make([]graveler.Diff, 0, numRecords) for j := 0; j < numRecords; j++ { var ( addressType catalog.Entry_AddressType @@ -661,6 +680,30 @@ func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, num }) // we always keep the relative path - as the prepared uncommitted will trim the storage namespace expectedRecords = append(expectedRecords, fmt.Sprintf("%s_record%04d", branchID, j)) + + if compact { + diffs[i] = append(diffs[i], graveler.Diff{ + Type: graveler.DiffTypeAdded, + Key: []byte(e.Address), + Value: &graveler.Value{ + Identity: []byte("dont care"), + Data: v, + }, + }) // 
record in compaction and in staging so no need to add it again to expected records + e.Address = fmt.Sprintf("%s_%s", e.Address, "compacted") + v, err = proto.Marshal(&e) + require.NoError(t, err) + diffs[i] = append(diffs[i], graveler.Diff{ + Type: graveler.DiffTypeAdded, + Key: []byte(e.Address), + Value: &graveler.Value{ + Identity: []byte("dont care"), + Data: v, + }, + }) + // record in compaction but not in staging + expectedRecords = append(expectedRecords, fmt.Sprintf("%s_record%04d_%s", branchID, j, "compacted")) + } } // Add tombstone @@ -669,6 +712,11 @@ func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, num Value: nil, }) + diffs[i] = append(diffs[i], graveler.Diff{ + Type: graveler.DiffTypeRemoved, + Key: []byte(fmt.Sprintf("%s_tombstone", branchID)), + }) + // Add external address e := catalog.Entry{ Address: fmt.Sprintf("external/address/object_%s", branchID), @@ -688,6 +736,15 @@ func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, num Data: v, }, }) + + diffs[i] = append(diffs[i], graveler.Diff{ + Type: graveler.DiffTypeAdded, + Key: []byte(e.Address), + Value: &graveler.Value{ + Identity: []byte("dont care"), + Data: v, + }, + }) } test.GarbageCollectionManager.EXPECT().NewID().Return("TestRunID") @@ -699,21 +756,27 @@ func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, num DefaultBranchID: "main", }, } - test.RefManager.EXPECT().GetRepository(gomock.Any(), graveler.RepositoryID(repositoryID)).Times(expectedCalls).Return(repository, nil) + test.RefManager.EXPECT().GetRepository(gomock.Any(), graveler.RepositoryID(repositoryID)).MinTimes(1).Return(repository, nil) // expect tracked addresses does not list branches, so remove one and keep at least the first - test.RefManager.EXPECT().ListBranches(gomock.Any(), gomock.Any()).Times(expectedCalls).Return(gUtils.NewFakeBranchIterator(branches), nil) + test.RefManager.EXPECT().ListBranches(gomock.Any(), 
gomock.Any()).MinTimes(1).Return(gUtils.NewFakeBranchIterator(branches), nil) for i := 0; i < len(branches); i++ { sort.Slice(records[i], func(ii, jj int) bool { return bytes.Compare(records[i][ii].Key, records[i][jj].Key) < 0 }) - test.StagingManager.EXPECT().List(gomock.Any(), branches[i].StagingToken, gomock.Any()).AnyTimes().Return(cUtils.NewFakeValueIterator(records[i])) + test.StagingManager.EXPECT().List(gomock.Any(), branches[i].StagingToken, gomock.Any()).MinTimes(1).Return(cUtils.NewFakeValueIterator(records[i])) + if compact { + sort.Slice(diffs[i], func(ii, jj int) bool { + return bytes.Compare(diffs[i][ii].Key, diffs[i][jj].Key) < 0 + }) + test.CommittedManager.EXPECT().Diff(gomock.Any(), repository.StorageNamespace, gomock.Any(), branches[i].CompactedBaseMetaRangeID).MinTimes(1).Return(gUtils.NewDiffIter(diffs[i]), nil) + } } if numRecords > 0 { test.GarbageCollectionManager.EXPECT(). GetUncommittedLocation(gomock.Any(), gomock.Any()). - Times(expectedCalls). + MinTimes(1). 
DoAndReturn(func(runID string, sn graveler.StorageNamespace) (string, error) { return fmt.Sprintf("%s/retention/gc/uncommitted/%s/uncommitted/", "_lakefs", runID), nil }) @@ -736,7 +799,6 @@ func readPhysicalAddressesFromParquetObject(t *testing.T, repositoryID string, c require.NoError(t, err) bufferFile := buffer.NewBufferFileFromBytes(data) defer func() { _ = bufferFile.Close() }() - pr, err := reader.NewParquetReader(bufferFile, new(catalog.UncommittedParquetObject), 4) require.NoError(t, err) diff --git a/pkg/catalog/fake_graveler_test.go b/pkg/catalog/fake_graveler_test.go index 2c52a3d6e8c..aa7e2ab2e23 100644 --- a/pkg/catalog/fake_graveler_test.go +++ b/pkg/catalog/fake_graveler_test.go @@ -14,7 +14,6 @@ type FakeGraveler struct { KeyValue map[string]*graveler.Value Err error ListIteratorFactory func() graveler.ValueIterator - ListStagingIteratorFactory func(token graveler.StagingToken) graveler.ValueIterator DiffIteratorFactory func() graveler.DiffIterator RepositoryIteratorFactory func() graveler.RepositoryIterator BranchIteratorFactory func() graveler.BranchIterator @@ -125,13 +124,6 @@ func (g *FakeGraveler) DeleteBatch(ctx context.Context, repository *graveler.Rep return nil } -func (g *FakeGraveler) ListStaging(_ context.Context, b *graveler.Branch, _ int) (graveler.ValueIterator, error) { - if g.Err != nil { - return nil, g.Err - } - return g.ListStagingIteratorFactory(b.StagingToken), nil -} - func (g *FakeGraveler) List(_ context.Context, _ *graveler.RepositoryRecord, _ graveler.Ref, _ int) (graveler.ValueIterator, error) { if g.Err != nil { return nil, g.Err diff --git a/pkg/catalog/gc_write_uncommitted.go b/pkg/catalog/gc_write_uncommitted.go index 67aa3117678..099f41451b1 100644 --- a/pkg/catalog/gc_write_uncommitted.go +++ b/pkg/catalog/gc_write_uncommitted.go @@ -17,74 +17,115 @@ func gcWriteUncommitted(ctx context.Context, store Store, repository *graveler.R } pw.CompressionType = parquet.CompressionCodec_GZIP - // write uncommitted data from 
branches - it, err := NewUncommittedIterator(ctx, store, repository) + branchIterator, err := store.ListBranches(ctx, repository) if err != nil { return nil, false, err } - defer it.Close() + defer branchIterator.Close() - if mark != nil { - it.SeekGE(mark.BranchID, mark.Path) - } + normalizedStorageNamespace := normalizeStorageNamespace(string(repository.StorageNamespace)) - normalizedStorageNamespace := string(repository.StorageNamespace) - if !strings.HasSuffix(normalizedStorageNamespace, DefaultPathDelimiter) { - normalizedStorageNamespace += DefaultPathDelimiter + if mark != nil { + branchIterator.SeekGE(mark.BranchID) } count := 0 + var nextMark *GCUncommittedMark + hasData := false startTime := time.Now() + for branchIterator.Next() { + nextMark, count, err = processBranch(ctx, store, repository, branchIterator.Value().BranchID, runID, pw, normalizedStorageNamespace, maxFileSize, prepareDuration, w, count, mark, startTime) + if err != nil { + return nil, false, err + } + if nextMark != nil { + break + } + } + if branchIterator.Err() != nil { + return nil, false, branchIterator.Err() + } + + if err := pw.WriteStop(); err != nil { + return nil, false, err + } + + if count > 0 { + hasData = true + } + return nextMark, hasData, err +} + +func normalizeStorageNamespace(namespace string) string { + if !strings.HasSuffix(namespace, DefaultPathDelimiter) { + namespace += DefaultPathDelimiter + } + return namespace +} + +func processBranch(ctx context.Context, store Store, repository *graveler.RepositoryRecord, branchID graveler.BranchID, runID string, parquetWriter *writer.ParquetWriter, normalizedStorageNamespace string, maxFileSize int64, prepareDuration time.Duration, writer *UncommittedWriter, count int, mark *GCUncommittedMark, startTime time.Time) (*GCUncommittedMark, int, error) { + diffIterator, err := store.DiffUncommitted(ctx, repository, branchID) + if err != nil { + return nil, 0, err + } + defer diffIterator.Close() + var nextMark *GCUncommittedMark - 
for it.Next() { - entry := it.Value() - // Skip if entry is tombstone - if entry.Entry == nil { + + if mark != nil && mark.BranchID == branchID && mark.Path != "" { + diffIterator.SeekGE(graveler.Key(mark.Path)) + } + + for diffIterator.Next() { + diff := diffIterator.Value() + + // Skip tombstones + if diff.Type == graveler.DiffTypeRemoved { continue } - // Skip non-relative that address outside the storage namespace + + entry, err := ValueToEntry(diff.Value) + if err != nil { + return nil, 0, err + } + + // Skip non-relative addresses outside the storage namespace entryAddress := entry.Address - if entry.Entry.AddressType != Entry_RELATIVE { + if entry.AddressType != Entry_RELATIVE { if !strings.HasPrefix(entry.Address, normalizedStorageNamespace) { continue } entryAddress = entryAddress[len(normalizedStorageNamespace):] } - count += 1 + count++ if count%gcPeriodicCheckSize == 0 { - if err := pw.Flush(true); err != nil { - return nil, false, err + if err := parquetWriter.Flush(true); err != nil { + return nil, 0, err } } + // check if we need to stop - based on max file size or prepare duration. // prepare duration is optional, if 0 it will be ignored. // prepare duration is used to stop the process in cases we scan a lot of data, and we want to stop // so the api call will not time out. 
- if w.Size() > maxFileSize || (prepareDuration > 0 && time.Since(startTime) > prepareDuration) { + if writer.Size() > maxFileSize || (prepareDuration > 0 && time.Since(startTime) > prepareDuration) { nextMark = &GCUncommittedMark{ RunID: runID, - BranchID: entry.branchID, - Path: entry.Path, + BranchID: branchID, + Path: Path(diff.Key.String()), } break } - if err = pw.Write(UncommittedParquetObject{ + + err = parquetWriter.Write(UncommittedParquetObject{ PhysicalAddress: entryAddress, CreationDate: entry.LastModified.AsTime().Unix(), - }); err != nil { - return nil, false, err + }) + if err != nil { + return nil, 0, err } } - if err := it.Err(); err != nil { - return nil, false, err - } - // stop writer before we return - if err := pw.WriteStop(); err != nil { - return nil, false, err - } - // Finished reading all staging area - return marker to switch processing tracked physical addresses - hasData := count > 0 - return nextMark, hasData, nil + return nextMark, count, diffIterator.Err() } diff --git a/pkg/catalog/uncommitted_iterator.go b/pkg/catalog/uncommitted_iterator.go deleted file mode 100644 index 265f3b9e408..00000000000 --- a/pkg/catalog/uncommitted_iterator.go +++ /dev/null @@ -1,134 +0,0 @@ -package catalog - -import ( - "context" - - "github.com/treeverse/lakefs/pkg/graveler" -) - -type UncommittedIterator struct { - store Store - ctx context.Context - err error - branchItr graveler.BranchIterator - entryItr *valueEntryIterator - branch *graveler.BranchRecord - entry *UncommittedRecord -} - -type UncommittedRecord struct { - branchID graveler.BranchID - *EntryRecord -} - -func NewUncommittedIterator(ctx context.Context, store Store, repository *graveler.RepositoryRecord) (*UncommittedIterator, error) { - bItr, err := store.ListBranches(ctx, repository) - if err != nil { - return nil, err - } - return &UncommittedIterator{ - store: store, - ctx: ctx, - branchItr: bItr, - }, nil -} - -// nextStaging reads the next branch staging area into entryItr 
-func (u *UncommittedIterator) nextStaging() bool { - if u.entryItr != nil { - u.entryItr.Close() - } - u.branch = u.branchItr.Value() - vItr, err := u.store.ListStaging(u.ctx, u.branch.Branch, 0) - if err != nil { - u.err = err - return false - } - u.entryItr = NewValueToEntryIterator(vItr) - return true -} - -// next Sets iterators to provide the next entry. Handles dependency between branch and value iterators. -// Sets value and returns true if next entry available - false otherwise -func (u *UncommittedIterator) next() bool { - u.entry = nil // will stay nil as long as no new value found - for u.entry == nil { - if !u.branchItr.Next() || !u.nextStaging() { - return false - } - if u.entryItr.Next() { - u.entry = &UncommittedRecord{ - branchID: u.branchItr.Value().BranchID, - } - u.entry.EntryRecord = u.entryItr.Value() - return true - } - u.err = u.entryItr.Err() - u.entryItr.Close() - if u.err != nil { - return false - } - } - return false // not reachable -} - -// Next returns the next entry - if entryItr is still valid - gets the next value from it otherwise call u.next -func (u *UncommittedIterator) Next() bool { - if u.Err() != nil { - return false - } - - if u.entryItr == nil { - return u.next() - } - if u.entryItr.Next() { - u.entry = &UncommittedRecord{ - branchID: u.branchItr.Value().BranchID, - } - u.entry.EntryRecord = u.entryItr.Value() - return true - } - u.err = u.entryItr.Err() - u.entryItr.Close() - if u.err != nil { - return false - } - return u.next() -} - -func (u *UncommittedIterator) SeekGE(branchID graveler.BranchID, id Path) { - if u.Err() != nil { - return - } - u.entry = nil - if u.branch == nil || branchID != u.branch.BranchID { - u.branchItr.SeekGE(branchID) - if u.branchItr.Next() && u.nextStaging() { - u.entryItr.SeekGE(id) - } - } -} - -func (u *UncommittedIterator) Value() *UncommittedRecord { - if u.Err() != nil { - return nil - } - return u.entry -} - -func (u *UncommittedIterator) Err() error { - if u.entryItr != nil && 
u.entryItr.Err() != nil { - return u.entryItr.Err() - } - if u.branchItr.Err() != nil { - return u.branchItr.Err() - } - return u.err -} - -func (u *UncommittedIterator) Close() { - u.branchItr.Close() - if u.entryItr != nil { - u.entryItr.Close() - } -} diff --git a/pkg/catalog/uncommitted_iterator_test.go b/pkg/catalog/uncommitted_iterator_test.go deleted file mode 100644 index 7a1b2e06160..00000000000 --- a/pkg/catalog/uncommitted_iterator_test.go +++ /dev/null @@ -1,268 +0,0 @@ -package catalog_test - -import ( - "context" - "math/rand" - "testing" - "time" - - "github.com/stretchr/testify/require" - "github.com/treeverse/lakefs/pkg/catalog" - "github.com/treeverse/lakefs/pkg/graveler" - "github.com/treeverse/lakefs/pkg/graveler/testutil" - "google.golang.org/protobuf/types/known/timestamppb" -) - -var ( - now = time.Now() - uncommittedBranchRecords = []*graveler.ValueRecord{ - {Key: graveler.Key("file1"), Value: catalog.MustEntryToValue(&catalog.Entry{Address: "file1", LastModified: timestamppb.New(now), Size: 1, ETag: "01"})}, - {Key: graveler.Key("file2"), Value: catalog.MustEntryToValue(&catalog.Entry{Address: "file2", LastModified: timestamppb.New(now), Size: 2, ETag: "02"})}, - {Key: graveler.Key("file3"), Value: catalog.MustEntryToValue(&catalog.Entry{Address: "file3", LastModified: timestamppb.New(now), Size: 3, ETag: "03"})}, - {Key: graveler.Key("h/file1"), Value: catalog.MustEntryToValue(&catalog.Entry{Address: "h/file1", LastModified: timestamppb.New(now), Size: 1, ETag: "01"})}, - {Key: graveler.Key("h/file2"), Value: catalog.MustEntryToValue(&catalog.Entry{Address: "h/file2", LastModified: timestamppb.New(now), Size: 2, ETag: "02"})}, - } -) - -func TestUncommittedIterator(t *testing.T) { - ctx := context.Background() - gravelerMock := &catalog.FakeGraveler{ - ListIteratorFactory: catalog.NewFakeValueIteratorFactory(uncommittedBranchRecords), - } - const repoID = "uncommitted-iterator" - repository := &graveler.RepositoryRecord{ - RepositoryID: 
repoID, - Repository: &graveler.Repository{ - StorageNamespace: "mem://" + repoID, - CreationDate: time.Now(), - DefaultBranchID: "main", - }, - } - tests := []struct { - name string - branches []*graveler.BranchRecord - records map[graveler.StagingToken][]*graveler.ValueRecord - }{ - { - name: "no branches", - branches: []*graveler.BranchRecord{}, - }, - { - name: "no values", - branches: []*graveler.BranchRecord{ - { - BranchID: "b1", - Branch: &graveler.Branch{ - StagingToken: "bst1", - }, - }, - { - BranchID: "b2", - Branch: &graveler.Branch{ - StagingToken: "bst2", - }, - }, - }, - }, - { - name: "first branch no staging", - branches: []*graveler.BranchRecord{ - { - BranchID: "b1", - Branch: &graveler.Branch{ - StagingToken: "bst1", - }, - }, - { - BranchID: "b2", - Branch: &graveler.Branch{ - StagingToken: "bst2", - }, - }, - { - BranchID: "b3", - Branch: &graveler.Branch{}, - }, - { - BranchID: "b4", - Branch: &graveler.Branch{ - StagingToken: "bst4", - }, - }, - }, - records: map[graveler.StagingToken][]*graveler.ValueRecord{ - "bst2": uncommittedBranchRecords, - "bst4": uncommittedBranchRecords, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - gravelerMock.BranchIteratorFactory = testutil.NewFakeBranchIteratorFactory(tt.branches) - gravelerMock.ListStagingIteratorFactory = catalog.NewFakeStagingIteratorFactory(tt.records) - itr, err := catalog.NewUncommittedIterator(ctx, gravelerMock, repository) - require.NoError(t, err) - count := 0 - for itr.Next() { - require.Equal(t, uncommittedBranchRecords[count%len(uncommittedBranchRecords)].Key.String(), itr.Value().Path.String()) - count += 1 - } - expectedCount := len(tt.records) * len(uncommittedBranchRecords) - require.Equal(t, expectedCount, count) - require.NoError(t, itr.Err()) - require.False(t, itr.Next()) - require.NoError(t, itr.Err()) - itr.Close() - require.NoError(t, itr.Err()) - }) - } -} - -func TestUncommittedIterator_SeekGE(t *testing.T) { - ctx := 
context.Background() - gravelerMock := &catalog.FakeGraveler{ - ListIteratorFactory: catalog.NewFakeValueIteratorFactory(uncommittedBranchRecords), - } - const repoID = "uncommitted-iter-seek-ge" - repository := &graveler.RepositoryRecord{ - RepositoryID: repoID, - Repository: &graveler.Repository{ - StorageNamespace: "mem://" + repoID, - CreationDate: time.Now(), - DefaultBranchID: "main", - }, - } - - tests := []struct { - name string - branches []*graveler.BranchRecord - records map[graveler.StagingToken][]*graveler.ValueRecord - expectedCount int - }{ - { - name: "no branches", - branches: []*graveler.BranchRecord{}, - expectedCount: 0, - }, - { - name: "no values", - branches: []*graveler.BranchRecord{ - { - BranchID: "b1", - Branch: &graveler.Branch{ - StagingToken: "bst1", - }, - }, - { - BranchID: "b2", - Branch: &graveler.Branch{ - StagingToken: "bst2", - }, - }, - }, - expectedCount: 0, - }, - { - name: "basic seek", - branches: []*graveler.BranchRecord{ - { - BranchID: "b1", - Branch: &graveler.Branch{ - StagingToken: "bst1", - }, - }, - { - BranchID: "b2", - Branch: &graveler.Branch{ - StagingToken: "bst2", - }, - }, - { - BranchID: "b3", - Branch: &graveler.Branch{ - StagingToken: "bst3", - }, - }, - { - BranchID: "b4", - Branch: &graveler.Branch{ - StagingToken: "bst4", - }, - }, - }, - records: map[graveler.StagingToken][]*graveler.ValueRecord{ - "bst1": uncommittedBranchRecords, - "bst3": uncommittedBranchRecords, - "bst4": uncommittedBranchRecords, - }, - expectedCount: 2 * len(uncommittedBranchRecords), - }, - { - name: "seek next key after branch", - branches: []*graveler.BranchRecord{ - { - BranchID: "b1", - Branch: &graveler.Branch{ - StagingToken: "bst1", - }, - }, - { - BranchID: "b2", - Branch: &graveler.Branch{ - StagingToken: "bst2", - }, - }, - { - BranchID: "b3", - Branch: &graveler.Branch{ - StagingToken: "bst3", - }, - }, - { - BranchID: "b4", - Branch: &graveler.Branch{ - StagingToken: "bst4", - }, - }, - }, - records: 
map[graveler.StagingToken][]*graveler.ValueRecord{ - "bst1": uncommittedBranchRecords, - "bst4": uncommittedBranchRecords, - }, - expectedCount: len(uncommittedBranchRecords), - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - gravelerMock.BranchIteratorFactory = testutil.NewFakeBranchIteratorFactory(tt.branches) - gravelerMock.ListStagingIteratorFactory = catalog.NewFakeStagingIteratorFactory(tt.records) - skipNum := rand.Intn(len(uncommittedBranchRecords)) - itr, err := catalog.NewUncommittedIterator(ctx, gravelerMock, repository) - require.NoError(t, err) - itr.SeekGE("b3", catalog.Path(uncommittedBranchRecords[skipNum].Key)) - require.NoError(t, err) - count := 0 - offset := 0 - if len(tt.records["bst3"]) > 0 { - offset = skipNum - } - for itr.Next() { - require.Equal(t, uncommittedBranchRecords[(count+offset)%len(uncommittedBranchRecords)].Key.String(), itr.Value().Path.String()) - count += 1 - } - expectedCount := tt.expectedCount - expectedCount -= offset - - require.Equal(t, expectedCount, count) - require.NoError(t, itr.Err()) - require.False(t, itr.Next()) - - itr.SeekGE("b3", catalog.Path(uncommittedBranchRecords[skipNum].Key)) - require.NoError(t, itr.Err()) - itr.Close() - require.NoError(t, itr.Err()) - }) - } -} diff --git a/pkg/graveler/combined_iterator_test.go b/pkg/graveler/combined_iterator_test.go index de6d9ff306c..1ccae68f487 100644 --- a/pkg/graveler/combined_iterator_test.go +++ b/pkg/graveler/combined_iterator_test.go @@ -39,14 +39,14 @@ func TestCombinedIterator_NextValue(t *testing.T) { { Key: []byte("iterA/one"), Value: &graveler.Value{ - Identity: []byte("id"), + Identity: []byte("id1"), Data: nil, }, }, { Key: []byte("iterA/two"), Value: &graveler.Value{ - Identity: []byte("id"), + Identity: []byte("id2"), Data: nil, }, }, @@ -59,14 +59,14 @@ func TestCombinedIterator_NextValue(t *testing.T) { { Key: []byte("iterA/one"), Value: &graveler.Value{ - Identity: []byte("id"), + Identity: []byte("id1"), Data: 
nil, }, }, { Key: []byte("iterA/two"), Value: &graveler.Value{ - Identity: []byte("id"), + Identity: []byte("id2"), Data: nil, }, }, @@ -80,14 +80,14 @@ func TestCombinedIterator_NextValue(t *testing.T) { { Key: []byte("iterA/one"), Value: &graveler.Value{ - Identity: []byte("id"), + Identity: []byte("id1"), Data: nil, }, }, { Key: []byte("iterA/two"), Value: &graveler.Value{ - Identity: []byte("id"), + Identity: []byte("id2"), Data: nil, }, }, @@ -99,14 +99,14 @@ func TestCombinedIterator_NextValue(t *testing.T) { { Key: []byte("iterA/one"), Value: &graveler.Value{ - Identity: []byte("id"), + Identity: []byte("id1"), Data: nil, }, }, { Key: []byte("iterA/two"), Value: &graveler.Value{ - Identity: []byte("id"), + Identity: []byte("id2"), Data: nil, }, }, @@ -121,14 +121,14 @@ func TestCombinedIterator_NextValue(t *testing.T) { { Key: []byte("iterA/one"), Value: &graveler.Value{ - Identity: []byte("id"), + Identity: []byte("id1"), Data: nil, }, }, { Key: []byte("iterA/two"), Value: &graveler.Value{ - Identity: []byte("id"), + Identity: []byte("id2"), Data: nil, }, }, @@ -140,14 +140,14 @@ func TestCombinedIterator_NextValue(t *testing.T) { { Key: []byte("iterA/one"), Value: &graveler.Value{ - Identity: []byte("id"), + Identity: []byte("id1"), Data: nil, }, }, { Key: []byte("iterA/two"), Value: &graveler.Value{ - Identity: []byte("id"), + Identity: []byte("id2"), Data: nil, }, }, @@ -161,7 +161,7 @@ func TestCombinedIterator_NextValue(t *testing.T) { { Key: []byte("iterA/two"), Value: &graveler.Value{ - Identity: []byte("id"), + Identity: []byte("id2"), Data: nil, }, }, @@ -170,7 +170,7 @@ func TestCombinedIterator_NextValue(t *testing.T) { { Key: []byte("iterA/one"), Value: &graveler.Value{ - Identity: []byte("id"), + Identity: []byte("id1"), Data: nil, }, }, @@ -182,14 +182,14 @@ func TestCombinedIterator_NextValue(t *testing.T) { { Key: []byte("iterA/one"), Value: &graveler.Value{ - Identity: []byte("id"), + Identity: []byte("id1"), Data: nil, }, }, { Key: 
[]byte("iterA/two"), Value: &graveler.Value{ - Identity: []byte("id"), + Identity: []byte("id2"), Data: nil, }, }, @@ -207,7 +207,7 @@ func TestCombinedIterator_NextValue(t *testing.T) { { Key: []byte("path/two"), Value: &graveler.Value{ - Identity: []byte("id"), + Identity: []byte("id2"), Data: nil, }, }, @@ -216,7 +216,7 @@ func TestCombinedIterator_NextValue(t *testing.T) { { Key: []byte("path/one"), Value: &graveler.Value{ - Identity: []byte("id"), + Identity: []byte("id1"), Data: nil, }, }, @@ -228,7 +228,7 @@ func TestCombinedIterator_NextValue(t *testing.T) { { Key: []byte("path/two"), Value: &graveler.Value{ - Identity: []byte("id"), + Identity: []byte("id2"), Data: nil, }, }, diff --git a/pkg/graveler/graveler.go b/pkg/graveler/graveler.go index c07cf819566..3e1bf83ca21 100644 --- a/pkg/graveler/graveler.go +++ b/pkg/graveler/graveler.go @@ -416,6 +416,8 @@ type Branch struct { StagingToken StagingToken // SealedTokens - Staging tokens are appended to the front, this allows building the diff iterator easily SealedTokens []StagingToken + // CompactedBaseMetaRangeID - the MetaRangeID of the last compaction's base metarange; empty when the branch has no compacted base + CompactedBaseMetaRangeID MetaRangeID } // BranchRecord holds BranchID with the associated Branch data @@ -501,9 +503,6 @@ type KeyValueStore interface { // List lists values on repository / ref List(ctx context.Context, repository *RepositoryRecord, ref Ref, batchSize int) (ValueIterator, error) - - // ListStaging returns ValueIterator for branch staging area. Exposed to be used by X in PrepareGCUncommitted - ListStaging(ctx context.Context, branch *Branch, batchSize int) (ValueIterator, error) } type VersionController interface { @@ -1284,7 +1283,7 @@ func (g *Graveler) monitorRetries(ctx context.Context, retries int, repositoryID // information on the algorithm used.
func (g *Graveler) prepareForCommitIDUpdate(ctx context.Context, repository *RepositoryRecord, branchID BranchID, operation string) error { return g.retryBranchUpdate(ctx, repository, branchID, func(currBranch *Branch) (*Branch, error) { - empty, err := g.isStagingEmpty(ctx, repository, currBranch) + empty, err := g.isUncommittedEmpty(ctx, repository, currBranch) if err != nil { return nil, err } @@ -1601,18 +1600,24 @@ func (g *Graveler) Get(ctx context.Context, repository *RepositoryRecord, ref Re return nil, err } + var updatedValue *Value if reference.StagingToken != "" { // try to get from staging, if not found proceed to committed - value, err := g.getFromStagingArea(ctx, reference.Branch, key) + updatedValue, err = g.getFromStagingArea(ctx, reference.Branch, key) if err != nil && !errors.Is(err, ErrNotFound) { return nil, err } - if err == nil { - if value == nil { - // tombstone - the entry was deleted on the branch => doesn't exist - return nil, ErrNotFound - } - return value, nil + // tombstone - the entry was deleted on the branch => doesn't exist + if err == nil && updatedValue == nil { + return nil, ErrNotFound + } + } + + if updatedValue == nil && reference.CompactedBaseMetaRangeID != "" { + updatedValue, err = g.CommittedManager.Get(ctx, repository.StorageNamespace, reference.CompactedBaseMetaRangeID, key) + // no need to check for ErrNotFound, since if the key is not found in the compacted base, it will not be found in the committed, and we already checked the staging area + if err != nil { + return nil, err } } @@ -1620,12 +1625,30 @@ func (g *Graveler) Get(ctx context.Context, repository *RepositoryRecord, ref Re for _, opt := range opts { opt(&options) } + + commitID := reference.CommitID if options.StageOnly { - return nil, ErrNotFound - } + if updatedValue == nil { + return nil, ErrNotFound + } + commit, err := g.RefManager.GetCommit(ctx, repository, commitID) + if err != nil { + return nil, err + } + committedVal, err := 
g.CommittedManager.Get(ctx, repository.StorageNamespace, commit.MetaRangeID, key) + if err != nil && !errors.Is(err, ErrNotFound) { + return nil, err + } + // the key we found is committed, return not found in staging + if committedVal != nil && bytes.Equal(committedVal.Identity, updatedValue.Identity) { + return nil, ErrNotFound + } + } + if updatedValue != nil { + return updatedValue, nil + } // If key is not found in staging area (or reference is not a branch), return the key from committed - commitID := reference.CommitID commit, err := g.RefManager.GetCommit(ctx, repository, commitID) if err != nil { return nil, err @@ -1756,7 +1779,7 @@ func (g *Graveler) Delete(ctx context.Context, repository *RepositoryRecord, bra log := g.log(ctx).WithFields(logging.Fields{"key": key, "operation": "delete"}) err = g.safeBranchWrite(ctx, log, repository, branchID, safeBranchWriteOptions{}, func(branch *Branch) error { - return g.deleteUnsafe(ctx, repository, key, nil, BranchRecord{branchID, branch}) + return g.deleteUnsafe(ctx, repository, key, BranchRecord{branchID, branch}) }, "delete") return err } @@ -1784,9 +1807,8 @@ func (g *Graveler) DeleteBatch(ctx context.Context, repository *RepositoryRecord var m *multierror.Error log := g.log(ctx).WithField("operation", "delete_keys") err = g.safeBranchWrite(ctx, log, repository, branchID, safeBranchWriteOptions{}, func(branch *Branch) error { - var cachedMetaRangeID MetaRangeID // used to cache the committed branch metarange ID for _, key := range keys { - err := g.deleteUnsafe(ctx, repository, key, &cachedMetaRangeID, BranchRecord{branchID, branch}) + err := g.deleteUnsafe(ctx, repository, key, BranchRecord{branchID, branch}) if err != nil { m = multierror.Append(m, &DeleteError{Key: key, Err: err}) } @@ -1796,7 +1818,7 @@ func (g *Graveler) DeleteBatch(ctx context.Context, repository *RepositoryRecord return err } -func (g *Graveler) deleteUnsafe(ctx context.Context, repository *RepositoryRecord, key Key, cachedMetaRangeID 
*MetaRangeID, branchRecord BranchRecord) error { +func (g *Graveler) deleteUnsafe(ctx context.Context, repository *RepositoryRecord, key Key, branchRecord BranchRecord) error { // First attempt to update on staging token err := g.deleteAndNotify(ctx, repository.RepositoryID, branchRecord, key, true) if !errors.Is(err, kv.ErrPredicateFailed) { @@ -1805,8 +1827,8 @@ func (g *Graveler) deleteUnsafe(ctx context.Context, repository *RepositoryRecor // check key in committed - do we need tombstone? var metaRangeID MetaRangeID - if cachedMetaRangeID != nil && *cachedMetaRangeID != "" { - metaRangeID = *cachedMetaRangeID + if branchRecord.Branch.CompactedBaseMetaRangeID != "" { + metaRangeID = branchRecord.Branch.CompactedBaseMetaRangeID } else { commit, err := g.RefManager.GetCommit(ctx, repository, branchRecord.Branch.CommitID) if err != nil { @@ -1843,14 +1865,10 @@ func (g *Graveler) deleteUnsafe(ctx context.Context, repository *RepositoryRecor return nil } -// ListStaging Exposing listStagingArea to catalog for PrepareGCUncommitted -func (g *Graveler) ListStaging(ctx context.Context, branch *Branch, batchSize int) (ValueIterator, error) { - return g.listStagingArea(ctx, branch, batchSize) -} - -// listStagingArea Returns an iterator which is an aggregation of all changes on all the branch's staging area (staging + sealed) +// listStagingAreaWithoutCompaction Returns an iterator which is an aggregation of all changes on all the branch's staging area (staging + sealed) // for each key in the staging area it will return the latest update for that key (the value that appears in the newest token) -func (g *Graveler) listStagingArea(ctx context.Context, b *Branch, batchSize int) (ValueIterator, error) { +// listStagingAreaWithoutCompaction will not return changes that were already compacted and saved in the CompactedBaseMetaRangeID +func (g *Graveler) listStagingAreaWithoutCompaction(ctx context.Context, b *Branch, batchSize int) (ValueIterator, error) { if b.StagingToken 
== "" { return nil, ErrNotFound } @@ -1887,7 +1905,9 @@ func (g *Graveler) List(ctx context.Context, repository *RepositoryRecord, ref R return nil, err } var metaRangeID MetaRangeID - if reference.CommitID != "" { + if reference.CompactedBaseMetaRangeID != "" { + metaRangeID = reference.CompactedBaseMetaRangeID + } else if reference.CommitID != "" { commit, err := g.RefManager.GetCommit(ctx, repository, reference.CommitID) if err != nil { return nil, err @@ -1900,7 +1920,7 @@ func (g *Graveler) List(ctx context.Context, repository *RepositoryRecord, ref R return nil, err } if reference.StagingToken != "" { - stagingList, err := g.listStagingArea(ctx, reference.BranchRecord.Branch, batchSize) + stagingList, err := g.listStagingAreaWithoutCompaction(ctx, reference.BranchRecord.Branch, batchSize) if err != nil { listing.Close() return nil, err @@ -1934,7 +1954,7 @@ func (g *Graveler) Commit(ctx context.Context, repository *RepositoryRecord, bra err = g.RefManager.BranchUpdate(ctx, repository, branchID, func(branch *Branch) (*Branch, error) { if params.SourceMetaRange != nil { - empty, err := g.isStagingEmpty(ctx, repository, branch) + empty, err := g.isUncommittedEmpty(ctx, repository, branch) if err != nil { return nil, fmt.Errorf("checking empty branch: %w", err) } @@ -2175,15 +2195,14 @@ func (g *Graveler) addCommitNoLock(ctx context.Context, repository *RepositoryRe return commitID, nil } -func (g *Graveler) isStagingEmpty(ctx context.Context, repository *RepositoryRecord, branch *Branch) (bool, error) { - itr, err := g.listStagingArea(ctx, branch, 1) +func (g *Graveler) isUncommittedEmpty(ctx context.Context, repository *RepositoryRecord, branch *Branch) (bool, error) { + diffIt, err := g.diffUncommitted(ctx, repository, branch) if err != nil { return false, err } - defer itr.Close() + defer diffIt.Close() - // Iterating over staging area (staging + sealed) of the branch and check for entries - return g.checkEmpty(ctx, repository, branch, itr) + return 
!diffIt.Next(), nil } // checkEmpty - staging iterator is not considered empty IFF it contains any non-tombstone entry @@ -2335,7 +2354,7 @@ func (g *Graveler) deleteAndNotify(ctx context.Context, repositoryID RepositoryI // resetKey resets given key on branch // Since we cannot (will not) modify sealed tokens data, we overwrite changes done on entry on a new staging token, effectively reverting it // to the current state in the branch committed data. If entry is not committed return an error -func (g *Graveler) resetKey(ctx context.Context, repository *RepositoryRecord, branchID BranchID, branch *Branch, key Key, stagedValue *Value, st StagingToken) error { +func (g *Graveler) resetKey(ctx context.Context, repository *RepositoryRecord, branchID BranchID, branch *Branch, key Key, uncommittedValue *Value, st StagingToken) error { isCommitted := true committed, err := g.Get(ctx, repository, branch.CommitID.Ref(), key) if err != nil { @@ -2346,13 +2365,13 @@ func (g *Graveler) resetKey(ctx context.Context, repository *RepositoryRecord, b } if isCommitted { // entry committed and changed in staging area => override with entry from commit - if stagedValue != nil && bytes.Equal(committed.Identity, stagedValue.Identity) { + if uncommittedValue != nil && bytes.Equal(committed.Identity, uncommittedValue.Identity) { return nil // No change } return g.StagingManager.Set(ctx, st, key, committed, false) // entry not committed and changed in staging area => override with tombstone // If not committed and staging == tombstone => ignore - } else if !isCommitted && stagedValue != nil { + } else if !isCommitted && uncommittedValue != nil { return g.deleteAndNotify(ctx, repository.RepositoryID, BranchRecord{branchID, branch}, key, false) } @@ -2378,15 +2397,24 @@ func (g *Graveler) ResetKey(ctx context.Context, repository *RepositoryRecord, b return fmt.Errorf("getting branch: %w", err) } - staged, err := g.getFromStagingArea(ctx, branch, key) + uncommittedValue, err := 
g.getFromStagingArea(ctx, branch, key) if err != nil { - if errors.Is(err, ErrNotFound) { // If key is not in staging => nothing to do + if !errors.Is(err, ErrNotFound) { + return err + } + if branch.CompactedBaseMetaRangeID != "" { + uncommittedValue, err = g.CommittedManager.Get(ctx, repository.StorageNamespace, branch.CompactedBaseMetaRangeID, key) + if err != nil && !errors.Is(err, ErrNotFound) { + return err + } + } + // If key is not in staging nor compacted => nothing to do + if uncommittedValue == nil { return nil } - return err } - err = g.resetKey(ctx, repository, branchID, branch, key, staged, branch.StagingToken) + err = g.resetKey(ctx, repository, branchID, branch, key, uncommittedValue, branch.StagingToken) if err != nil { if !errors.Is(err, ErrNotFound) { // Not found in staging => ignore return err @@ -2422,8 +2450,8 @@ func (g *Graveler) ResetPrefix(ctx context.Context, repository *RepositoryRecord newSealedTokens = []StagingToken{branch.StagingToken} newSealedTokens = append(newSealedTokens, branch.SealedTokens...) 
- // Reset keys by prefix on the new staging token - itr, err := g.listStagingArea(ctx, branch, 0) + // Reset keys by prefix on the uncommitted entries + itr, err := g.DiffUncommitted(ctx, repository, branchID) if err != nil { return nil, err } @@ -2948,28 +2976,36 @@ func (g *Graveler) DiffUncommitted(ctx context.Context, repository *RepositoryRe if err != nil { return nil, err } - var metaRangeID MetaRangeID - if branch.CommitID != "" { - commit, err := g.RefManager.GetCommit(ctx, repository, branch.CommitID) - if err != nil { - return nil, err - } - metaRangeID = commit.MetaRangeID + return g.diffUncommitted(ctx, repository, branch) +} + +func (g *Graveler) diffUncommitted(ctx context.Context, repository *RepositoryRecord, branch *Branch) (DiffIterator, error) { + commit, err := g.RefManager.GetCommit(ctx, repository, branch.CommitID) + if err != nil { + return nil, err } + metaRangeID := commit.MetaRangeID - valueIterator, err := g.listStagingArea(ctx, branch, 0) + valueIterator, err := g.listStagingAreaWithoutCompaction(ctx, branch, 0) if err != nil { return nil, err } - var committedValueIterator ValueIterator - if metaRangeID != "" { - committedValueIterator, err = g.CommittedManager.List(ctx, repository.StorageNamespace, metaRangeID) - if err != nil { - valueIterator.Close() - return nil, err - } + committedValueIterator, err := g.CommittedManager.List(ctx, repository.StorageNamespace, metaRangeID) + if err != nil { + valueIterator.Close() + return nil, err + } + if branch.CompactedBaseMetaRangeID == "" { + return NewUncommittedDiffIterator(ctx, committedValueIterator, valueIterator), nil + } + // return the diff of staging + sealed from committed on top of the diff of compacted from committed + diffCommitAndCompacted, err := g.CommittedManager.Diff(ctx, repository.StorageNamespace, metaRangeID, branch.CompactedBaseMetaRangeID) + if err != nil { + valueIterator.Close() + committedValueIterator.Close() + return nil, err } - return 
NewUncommittedDiffIterator(ctx, committedValueIterator, valueIterator), nil + return NewJoinedDiffIterator(NewUncommittedDiffIterator(ctx, committedValueIterator, valueIterator), diffCommitAndCompacted), nil } // dereferenceCommit will dereference and load the commit record based on 'ref'. @@ -3023,12 +3059,22 @@ func (g *Graveler) Diff(ctx context.Context, repository *RepositoryRecord, left, leftValueIterator.Close() return nil, err } - stagingIterator, err := g.listStagingArea(ctx, rightBranch, 0) + stagingIterator, err := g.listStagingAreaWithoutCompaction(ctx, rightBranch, 0) + if err != nil { + leftValueIterator.Close() + return nil, err + } + if rightBranch.CompactedBaseMetaRangeID == "" { + return NewCombinedDiffIterator(diff, leftValueIterator, stagingIterator), nil + } + diff.Close() + compactedDiffIterator, err := g.CommittedManager.Diff(ctx, repository.StorageNamespace, leftCommit.MetaRangeID, rightBranch.CompactedBaseMetaRangeID) if err != nil { leftValueIterator.Close() + stagingIterator.Close() return nil, err } - return NewCombinedDiffIterator(diff, leftValueIterator, stagingIterator), nil + return NewCombinedDiffIterator(compactedDiffIterator, leftValueIterator, stagingIterator), nil } func (g *Graveler) FindMergeBase(ctx context.Context, repository *RepositoryRecord, from Ref, to Ref) (*CommitRecord, *CommitRecord, *Commit, error) { diff --git a/pkg/graveler/graveler_test.go b/pkg/graveler/graveler_test.go index 622b74bd472..887074bc48c 100644 --- a/pkg/graveler/graveler_test.go +++ b/pkg/graveler/graveler_test.go @@ -167,6 +167,14 @@ func TestGraveler_List(t *testing.T) { ), expected: []*graveler.ValueRecord{{Key: graveler.Key("bar"), Value: &graveler.Value{}}, {Key: graveler.Key("foo"), Value: &graveler.Value{}}}, }, + { + name: "one compacted one staged no paths", + r: newGraveler(t, &testutil.CommittedFake{ValueIterator: testutil.NewValueIteratorFake([]graveler.ValueRecord{{Key: graveler.Key("foo"), Value: &graveler.Value{}}}), MetaRangeID: 
"mr1"}, + &testutil.StagingFake{ValueIterator: testutil.NewValueIteratorFake([]graveler.ValueRecord{{Key: graveler.Key("bar"), Value: &graveler.Value{}}})}, + &testutil.RefsFake{RefType: graveler.ReferenceTypeBranch, StagingToken: "token", Commits: map[graveler.CommitID]*graveler.Commit{"": {}}, BaseMetaRangeID: "mr1"}, nil, testutil.NewProtectedBranchesManagerFake(), + ), + expected: []*graveler.ValueRecord{{Key: graveler.Key("bar"), Value: &graveler.Value{}}, {Key: graveler.Key("foo"), Value: &graveler.Value{}}}, + }, { name: "same path different file", r: newGraveler(t, &testutil.CommittedFake{ValueIterator: testutil.NewValueIteratorFake([]graveler.ValueRecord{{Key: graveler.Key("foo"), Value: &graveler.Value{Identity: []byte("original")}}})}, @@ -175,6 +183,14 @@ func TestGraveler_List(t *testing.T) { ), expected: []*graveler.ValueRecord{{Key: graveler.Key("foo"), Value: &graveler.Value{Identity: []byte("other")}}}, }, + { + name: "same path different file compacted", + r: newGraveler(t, &testutil.CommittedFake{ValueIterator: testutil.NewValueIteratorFake([]graveler.ValueRecord{{Key: graveler.Key("foo"), Value: &graveler.Value{Identity: []byte("original")}}}), MetaRangeID: "mr1"}, + &testutil.StagingFake{ValueIterator: testutil.NewValueIteratorFake([]graveler.ValueRecord{{Key: graveler.Key("foo"), Value: &graveler.Value{Identity: []byte("other")}}})}, + &testutil.RefsFake{RefType: graveler.ReferenceTypeBranch, StagingToken: "token", Commits: map[graveler.CommitID]*graveler.Commit{"": {}}, BaseMetaRangeID: "mr1"}, nil, testutil.NewProtectedBranchesManagerFake(), + ), + expected: []*graveler.ValueRecord{{Key: graveler.Key("foo"), Value: &graveler.Value{Identity: []byte("other")}}}, + }, { name: "one committed one staged no paths - with prefix", r: newGraveler(t, &testutil.CommittedFake{ValueIterator: testutil.NewValueIteratorFake([]graveler.ValueRecord{{Key: graveler.Key("prefix/foo"), Value: &graveler.Value{}}})}, @@ -183,6 +199,14 @@ func TestGraveler_List(t 
*testing.T) { ), expected: []*graveler.ValueRecord{{Key: graveler.Key("prefix/bar"), Value: &graveler.Value{}}, {Key: graveler.Key("prefix/foo"), Value: &graveler.Value{}}}, }, + { + name: "one compacted one staged no paths - with prefix", + r: newGraveler(t, &testutil.CommittedFake{ValueIterator: testutil.NewValueIteratorFake([]graveler.ValueRecord{{Key: graveler.Key("prefix/foo"), Value: &graveler.Value{}}}), MetaRangeID: "mr1"}, + &testutil.StagingFake{ValueIterator: testutil.NewValueIteratorFake([]graveler.ValueRecord{{Key: graveler.Key("prefix/bar"), Value: &graveler.Value{}}})}, + &testutil.RefsFake{RefType: graveler.ReferenceTypeBranch, StagingToken: "token", Commits: map[graveler.CommitID]*graveler.Commit{"": {}}, BaseMetaRangeID: "mr1"}, nil, testutil.NewProtectedBranchesManagerFake(), + ), + expected: []*graveler.ValueRecord{{Key: graveler.Key("prefix/bar"), Value: &graveler.Value{}}, {Key: graveler.Key("prefix/foo"), Value: &graveler.Value{}}}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -267,6 +291,34 @@ func TestGraveler_Get(t *testing.T) { ), expectedErr: errTest, }, + { + name: "branch - only compacted", + r: newGraveler(t, &testutil.CommittedFake{ValuesByKey: map[string]*graveler.Value{"key": {Identity: []byte("compacted")}}, MetaRangeID: "mir1"}, &testutil.StagingFake{Err: graveler.ErrNotFound}, + &testutil.RefsFake{RefType: graveler.ReferenceTypeBranch, StagingToken: "token1", Commits: map[graveler.CommitID]*graveler.Commit{"": {}}, BaseMetaRangeID: "mir1"}, nil, testutil.NewProtectedBranchesManagerFake(), + ), + expectedValueResult: graveler.Value{Identity: []byte("compacted")}, + }, + { + name: "branch - staged and compacted", + r: newGraveler(t, &testutil.CommittedFake{ValuesByKey: map[string]*graveler.Value{"key": {Identity: []byte("compacted")}}, MetaRangeID: "mir1"}, &testutil.StagingFake{Value: &graveler.Value{Identity: []byte("staged")}}, + &testutil.RefsFake{RefType: graveler.ReferenceTypeBranch, StagingToken: 
"token1", Commits: map[graveler.CommitID]*graveler.Commit{"": {}}, BaseMetaRangeID: "mir1"}, nil, testutil.NewProtectedBranchesManagerFake(), + ), + expectedValueResult: graveler.Value{Identity: []byte("staged")}, + }, + { + name: "branch - deleted from staged, exists in compaction", + r: newGraveler(t, &testutil.CommittedFake{ValuesByKey: map[string]*graveler.Value{"key": {Identity: []byte("compacted")}}, MetaRangeID: "mir1"}, &testutil.StagingFake{Value: nil}, + &testutil.RefsFake{RefType: graveler.ReferenceTypeBranch, StagingToken: "token1", Commits: map[graveler.CommitID]*graveler.Commit{"": {}}, BaseMetaRangeID: "mir1"}, nil, testutil.NewProtectedBranchesManagerFake(), + ), + expectedErr: graveler.ErrNotFound, + }, + { + name: "branch - exists in staging and not in compaction", + r: newGraveler(t, &testutil.CommittedFake{MetaRangeID: "mir1"}, &testutil.StagingFake{Value: &graveler.Value{Identity: []byte("staged")}}, + &testutil.RefsFake{RefType: graveler.ReferenceTypeBranch, StagingToken: "token1", Commits: map[graveler.CommitID]*graveler.Commit{"": {}}, BaseMetaRangeID: "mir1"}, nil, testutil.NewProtectedBranchesManagerFake(), + ), + expectedValueResult: graveler.Value{Identity: []byte("staged")}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -809,6 +861,127 @@ func TestGraveler_Diff(t *testing.T) { }, }), }, + { + name: "with changes compacted", + r: newGraveler(t, &testutil.CommittedFake{ + Values: map[string]graveler.ValueIterator{ + "mri1": testutil.NewValueIteratorFake([]graveler.ValueRecord{ + {Key: graveler.Key("foo/delete"), Value: &graveler.Value{ + Identity: []byte("deleted"), + Data: []byte("deleted"), + }}, + {Key: graveler.Key("foo/modified_committed"), Value: &graveler.Value{ + Identity: []byte("DECAF"), + Data: []byte("BAD"), + }}, + {Key: graveler.Key("foo/modify"), Value: &graveler.Value{ + Identity: []byte("DECAF"), + Data: []byte("BAD"), + }}, + }), + "mri2": testutil.NewValueIteratorFake([]graveler.ValueRecord{ + 
{Key: graveler.Key("foo/delete"), Value: &graveler.Value{ + Identity: []byte("deleted"), + Data: []byte("deleted"), + }}, + {Key: graveler.Key("foo/modified_committed"), Value: &graveler.Value{ + Identity: []byte("committed"), + Data: []byte("committed"), + }}, + {Key: graveler.Key("foo/modify"), Value: &graveler.Value{ + Identity: []byte("DECAF"), + Data: []byte("BAD"), + }}, + }), + }, + DiffIterator: testutil.NewDiffIter([]graveler.Diff{ + { + Key: graveler.Key("foo/modified_committed"), + Type: graveler.DiffTypeChanged, + Value: &graveler.Value{ + Identity: []byte("committed"), + Data: []byte("committed"), + }, + LeftIdentity: []byte("DECAF"), + }, + }), + }, + &testutil.StagingFake{Values: map[string]map[string]*graveler.Value{ + "token": { + "foo/add": &graveler.Value{}, + }, + "token1": { + "foo/delete": nil, + }, + "token2": { + "foo/modify": &graveler.Value{ + Identity: []byte("test"), + Data: []byte("test"), + }, + }, + }}, + &testutil.RefsFake{ + Branch: &graveler.Branch{CommitID: "c1", StagingToken: "token", SealedTokens: []graveler.StagingToken{"token1", "token2"}, CompactedBaseMetaRangeID: "mri2"}, + Commits: map[graveler.CommitID]*graveler.Commit{"c1": {MetaRangeID: "mri1"}}, + Refs: map[graveler.Ref]*graveler.ResolvedRef{ + "b1": { + Type: graveler.ReferenceTypeBranch, + ResolvedBranchModifier: graveler.ResolvedBranchModifierStaging, + BranchRecord: graveler.BranchRecord{ + BranchID: "b1", + Branch: &graveler.Branch{ + CommitID: "c1", + StagingToken: "token", + SealedTokens: []graveler.StagingToken{"token1", "token2"}, + }, + }, + }, + "ref1": { + Type: graveler.ReferenceTypeCommit, + BranchRecord: graveler.BranchRecord{ + Branch: &graveler.Branch{ + CommitID: "c1", + }, + }, + }, + }, + }, nil, testutil.NewProtectedBranchesManagerFake(), + ), + expectedDiff: testutil.NewDiffIter([]graveler.Diff{ + { + Key: graveler.Key("foo/add"), + Type: graveler.DiffTypeAdded, + Value: &graveler.Value{}, + }, + { + Key: graveler.Key("foo/delete"), + Type: 
graveler.DiffTypeRemoved, + Value: &graveler.Value{ + Identity: []byte("deleted"), + Data: []byte("deleted"), + }, + LeftIdentity: []byte("deleted"), + }, + { + Key: graveler.Key("foo/modified_committed"), + Type: graveler.DiffTypeChanged, + Value: &graveler.Value{ + Identity: []byte("committed"), + Data: []byte("committed"), + }, + LeftIdentity: []byte("DECAF"), + }, + { + Key: graveler.Key("foo/modify"), + Type: graveler.DiffTypeChanged, + Value: &graveler.Value{ + Identity: []byte("test"), + Data: []byte("test"), + }, + LeftIdentity: []byte("DECAF"), + }, + }), + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -921,6 +1094,118 @@ func TestGraveler_DiffUncommitted(t *testing.T) { Type: graveler.DiffTypeRemoved, }}), }, + { + name: "diff with compacted", + r: newGraveler(t, + &testutil.CommittedFake{ + Values: map[string]graveler.ValueIterator{ + "mri1": testutil.NewValueIteratorFake([]graveler.ValueRecord{ + {Key: graveler.Key("foo/a"), Value: &graveler.Value{ + Identity: []byte("BAD"), + Data: []byte("BAD"), + }}, + {Key: graveler.Key("foo/b"), Value: &graveler.Value{ + Identity: []byte("BAD"), + Data: []byte("BAD"), + }}, + {Key: graveler.Key("foo/c"), Value: &graveler.Value{ + Identity: []byte("BAD"), + Data: []byte("BAD"), + }}, + {Key: graveler.Key("foo/d"), Value: &graveler.Value{ + Identity: []byte("BAD"), + Data: []byte("BAD"), + }}, + {Key: graveler.Key("foo/e"), Value: &graveler.Value{ + Identity: []byte("BAD"), + Data: []byte("BAD"), + }}, + {Key: graveler.Key("foo/g"), Value: &graveler.Value{ + Identity: []byte("BAD"), + Data: []byte("BAD"), + }}, + }), + }, + DiffIterator: testutil.NewDiffIter([]graveler.Diff{ + { + Key: graveler.Key("foo/a"), + Type: graveler.DiffTypeRemoved, + Value: &graveler.Value{}, + }, + { + Key: graveler.Key("foo/b"), + Type: graveler.DiffTypeChanged, + Value: &graveler.Value{Identity: []byte("compacted"), Data: []byte("compacted")}, + }, + { + Key: graveler.Key("foo/c"), + Type: 
graveler.DiffTypeRemoved, + }, + { + Key: graveler.Key("foo/d"), + Type: graveler.DiffTypeRemoved, + }, + { + Key: graveler.Key("foo/e"), + Type: graveler.DiffTypeAdded, + Value: &graveler.Value{Identity: []byte("BAD"), Data: []byte("BAD")}, + }, + { + Key: graveler.Key("foo/f"), + Type: graveler.DiffTypeChanged, + Value: &graveler.Value{Identity: []byte("BAD"), Data: []byte("BAD")}, + }, + { + Key: graveler.Key("foo/g"), + Type: graveler.DiffTypeChanged, + Value: &graveler.Value{Identity: []byte("BAD"), Data: []byte("BAD")}, + }, + }), + }, + &testutil.StagingFake{ValueIterator: testutil.NewValueIteratorFake([]graveler.ValueRecord{ + {Key: graveler.Key("foo/d"), Value: &graveler.Value{Identity: []byte("staged"), Data: []byte("staged")}}, + {Key: graveler.Key("foo/e"), Value: &graveler.Value{Identity: []byte("staged"), Data: []byte("staged")}}, + {Key: graveler.Key("foo/f"), Value: &graveler.Value{Identity: []byte("staged"), Data: []byte("staged")}}, + {Key: graveler.Key("foo/g"), Value: nil}, + })}, + &testutil.RefsFake{Branch: &graveler.Branch{CommitID: "c1", StagingToken: "token", CompactedBaseMetaRangeID: "mri2"}, Commits: map[graveler.CommitID]*graveler.Commit{"c1": {MetaRangeID: "mri1"}}}, nil, testutil.NewProtectedBranchesManagerFake(), + ), + expectedDiff: testutil.NewDiffIter([]graveler.Diff{ + { + Key: graveler.Key("foo/a"), + Type: graveler.DiffTypeRemoved, + Value: &graveler.Value{}, + }, + { + Key: graveler.Key("foo/b"), + Type: graveler.DiffTypeChanged, + Value: &graveler.Value{Identity: []byte("compacted"), Data: []byte("compacted")}, + }, + { + Key: graveler.Key("foo/c"), + Type: graveler.DiffTypeRemoved, + }, + { + Key: graveler.Key("foo/d"), + Type: graveler.DiffTypeChanged, + Value: &graveler.Value{Identity: []byte("staged"), Data: []byte("staged")}, + }, + { + Key: graveler.Key("foo/e"), + Type: graveler.DiffTypeChanged, + Value: &graveler.Value{Identity: []byte("staged"), Data: []byte("staged")}, + }, + { + Key: graveler.Key("foo/f"), + Type: 
graveler.DiffTypeAdded, + Value: &graveler.Value{Identity: []byte("staged"), Data: []byte("staged")}, + }, + { + Key: graveler.Key("foo/g"), + Type: graveler.DiffTypeRemoved, + }, + }), + }, } for _, tt := range tests { @@ -1167,7 +1452,7 @@ func TestGravelerCommit(t *testing.T) { expectedErr: nil, }, { - name: "commit with source metarange an non-empty staging", + name: "commit with source metarange and non-empty staging", fields: fields{ CommittedManager: &testutil.CommittedFake{MetaRangeID: expectedRangeID}, StagingManager: &testutil.StagingFake{ValueIterator: testutils.NewFakeValueIterator([]*graveler.ValueRecord{{ @@ -1190,6 +1475,28 @@ func TestGravelerCommit(t *testing.T) { values: values, expectedErr: graveler.ErrCommitMetaRangeDirtyBranch, }, + { + name: "commit with source metarange and non-empty compaction", + fields: fields{ + CommittedManager: &testutil.CommittedFake{MetaRangeID: expectedRangeID, DiffIterator: testutil.NewDiffIter([]graveler.Diff{{Key: key1, Type: graveler.DiffTypeRemoved}})}, + StagingManager: &testutil.StagingFake{ValueIterator: testutils.NewFakeValueIterator([]*graveler.ValueRecord{})}, + RefManager: &testutil.RefsFake{ + CommitID: expectedCommitID, + Branch: &graveler.Branch{CommitID: expectedCommitID, StagingToken: "token1", CompactedBaseMetaRangeID: mr2ID}, + Commits: map[graveler.CommitID]*graveler.Commit{expectedCommitID: {MetaRangeID: expectedRangeID}}, + }, + }, + args: args{ + ctx: nil, + branchID: "branch", + committer: "committer", + message: "a message", + metadata: graveler.Metadata{}, + sourceMetarange: &expectedRangeID, + }, + values: values, + expectedErr: graveler.ErrCommitMetaRangeDirtyBranch, + }, { name: "fail on apply", fields: fields{ @@ -1567,6 +1874,29 @@ func TestGravelerDelete(t *testing.T) { }, expectedErr: nil, }, + { + name: "exists only in compacted", + fields: fields{ + CommittedManager: &testutil.CommittedFake{ + ValuesByKey: map[string]*graveler.Value{"key": {}}, + }, + StagingManager: 
&testutil.StagingFake{ + Err: graveler.ErrNotFound, + }, + RefManager: &testutil.RefsFake{ + Branch: &graveler.Branch{CommitID: "c1", CompactedBaseMetaRangeID: "mr2"}, + Commits: map[graveler.CommitID]*graveler.Commit{"c1": {MetaRangeID: "mr1"}}, + }, + }, + args: args{ + key: []byte("key"), + }, + expectedSetValue: &graveler.ValueRecord{ + Key: []byte("key"), + Value: nil, + }, + expectedErr: nil, + }, { name: "exists in committed and in staging", fields: fields{ @@ -1604,6 +1934,43 @@ func TestGravelerDelete(t *testing.T) { }, expectedErr: nil, }, + { + name: "exists in compacted and in staging", + fields: fields{ + CommittedManager: &testutil.CommittedFake{ + ValuesByKey: map[string]*graveler.Value{"key1": {}}, + }, + StagingManager: &testutil.StagingFake{ + Values: map[string]map[string]*graveler.Value{ + "token": { + "key2": &graveler.Value{ + Identity: []byte("BAD"), + Data: []byte("BEEF"), + }, + }, + "token2": { + "key1": &graveler.Value{ + Identity: []byte("test"), + Data: []byte("test"), + }, + }, + }, + Value: &graveler.Value{}, + }, + RefManager: &testutil.RefsFake{ + Branch: &graveler.Branch{CommitID: "c1", StagingToken: "token", SealedTokens: []graveler.StagingToken{"token", "token2"}, CompactedBaseMetaRangeID: "mr2"}, + Commits: map[graveler.CommitID]*graveler.Commit{"c1": {MetaRangeID: "mr1"}}, + }, + }, + args: args{ + key: []byte("key1"), + }, + expectedSetValue: &graveler.ValueRecord{ + Key: []byte("key1"), + Value: nil, + }, + expectedErr: nil, + }, { name: "exists in committed tombstone in staging", fields: fields{ @@ -1632,6 +1999,34 @@ func TestGravelerDelete(t *testing.T) { args: args{key: []byte("key1")}, expectedErr: nil, }, + { + name: "exists in compacted tombstone in staging", + fields: fields{ + CommittedManager: &testutil.CommittedFake{ + ValuesByKey: map[string]*graveler.Value{"key1": {}}, + }, + StagingManager: &testutil.StagingFake{ + Values: map[string]map[string]*graveler.Value{ + "token": { + "key1": nil, + }, + "token2": { + 
"key1": &graveler.Value{ + Identity: []byte("BAD"), + Data: []byte("BEEF"), + }, + }, + }, + Value: nil, + }, + RefManager: &testutil.RefsFake{ + Branch: &graveler.Branch{CommitID: "c1", StagingToken: "token", SealedTokens: []graveler.StagingToken{"token", "token2"}, CompactedBaseMetaRangeID: "mr2"}, + Commits: map[graveler.CommitID]*graveler.Commit{"c1": {MetaRangeID: "mr1"}}, + }, + }, + args: args{key: []byte("key1")}, + expectedErr: nil, + }, { name: "exists only in staging - commits", fields: fields{ @@ -1656,6 +2051,30 @@ func TestGravelerDelete(t *testing.T) { expectedRemovedKey: []byte("key1"), expectedErr: nil, }, + { + name: "exists in staging and not in compaction", + fields: fields{ + CommittedManager: &testutil.CommittedFake{ + Err: graveler.ErrNotFound, + }, + StagingManager: &testutil.StagingFake{ + Values: map[string]map[string]*graveler.Value{"token": {"key1": &graveler.Value{ + Identity: []byte("test"), + Data: []byte("test"), + }}}, + Value: nil, + }, + RefManager: &testutil.RefsFake{ + Branch: &graveler.Branch{CommitID: "c1", StagingToken: "token", CompactedBaseMetaRangeID: "mr2"}, + Commits: map[graveler.CommitID]*graveler.Commit{"c1": {MetaRangeID: "mr1"}}, + }, + }, + args: args{ + key: []byte("key1"), + }, + expectedRemovedKey: []byte("key1"), + expectedErr: nil, + }, { name: "not in committed not in staging", fields: fields{ @@ -1673,6 +2092,23 @@ func TestGravelerDelete(t *testing.T) { args: args{}, expectedErr: nil, }, + { + name: "not in compacted not in staging", + fields: fields{ + CommittedManager: &testutil.CommittedFake{ + Err: graveler.ErrNotFound, + }, + StagingManager: &testutil.StagingFake{ + Err: graveler.ErrNotFound, + }, + RefManager: &testutil.RefsFake{ + Branch: &graveler.Branch{CompactedBaseMetaRangeID: "mr2"}, + Commits: map[graveler.CommitID]*graveler.Commit{"": {MetaRangeID: "mr1"}}, + }, + }, + args: args{}, + expectedErr: nil, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -2469,6 +2905,51 @@ 
func TestGraveler_Revert(t *testing.T) { expectedErr: graveler.ErrDirtyBranch, expectedVal: "", }, + { + name: "fail on compacted data", + deps: deps{ + CommittedManager: &testutil.CommittedFake{DiffIterator: testutil.NewDiffIter([]graveler.Diff{{Key: key1, Type: graveler.DiffTypeRemoved}})}, + RefManager: &testutil.RefsFake{ + Branch: &graveler.Branch{CommitID: "c1", StagingToken: "token", CompactedBaseMetaRangeID: "mri2"}, + Commits: map[graveler.CommitID]*graveler.Commit{ + "c1": {MetaRangeID: "mri1"}, + }, + Refs: map[graveler.Ref]*graveler.ResolvedRef{ + "dirty-b1": { + Type: graveler.ReferenceTypeBranch, + ResolvedBranchModifier: 0, + BranchRecord: graveler.BranchRecord{ + BranchID: "dirty-b1", + Branch: &graveler.Branch{ + CommitID: "c1", + StagingToken: "token", + CompactedBaseMetaRangeID: "mri2", + }, + }, + }, + "c1": { + Type: graveler.ReferenceTypeCommit, + ResolvedBranchModifier: 0, + BranchRecord: graveler.BranchRecord{ + Branch: &graveler.Branch{ + CommitID: "c1", + }, + }, + }, + }, + }, + StagingManager: &testutil.StagingFake{Value: value1, ValueIterator: testutil.NewValueIteratorFake( + []graveler.ValueRecord{})}, + }, + revertArgs: args{ + branchID: "b1", + ref: graveler.Ref("c1"), + parentNumber: 0, + allowEmpty: false, + }, + expectedErr: graveler.ErrDirtyBranch, + expectedVal: "", + }, { name: "valid revert", deps: deps{ diff --git a/pkg/graveler/graveler_v2_test.go b/pkg/graveler/graveler_v2_test.go index 1a036bf62b5..10c2d6b125c 100644 --- a/pkg/graveler/graveler_v2_test.go +++ b/pkg/graveler/graveler_v2_test.go @@ -19,6 +19,7 @@ var ( repoID = graveler.RepositoryID("repo1") branch1ID = graveler.BranchID("branch1") branch2ID = graveler.BranchID("branch2") + branch3ID = graveler.BranchID("branch3") commit1ID = graveler.CommitID("commit1") commit2ID = graveler.CommitID("commit2") commit3ID = graveler.CommitID("commit3") @@ -53,12 +54,19 @@ var ( StagingToken: stagingToken1, SealedTokens: []graveler.StagingToken{stagingToken2, stagingToken3}, } 
+ branch3 = graveler.Branch{ + CommitID: commit1ID, + StagingToken: stagingToken1, + SealedTokens: []graveler.StagingToken{stagingToken2, stagingToken3}, + CompactedBaseMetaRangeID: mr2ID, + } commit1 = graveler.Commit{MetaRangeID: mr1ID, Parents: []graveler.CommitID{commit4ID}} commit2 = graveler.Commit{MetaRangeID: mr2ID, Parents: []graveler.CommitID{commit4ID}} commit3 = graveler.Commit{MetaRangeID: mr3ID} commit4 = graveler.Commit{MetaRangeID: mr4ID} rawRefBranch = graveler.RawRef{BaseRef: string(branch1ID)} + rawRefBranch3 = graveler.RawRef{BaseRef: string(branch3ID)} rawRefCommit1 = graveler.RawRef{BaseRef: string(commit1ID)} rawRefCommit2 = graveler.RawRef{BaseRef: string(commit2ID)} rawRefCommit4 = graveler.RawRef{BaseRef: string(commit4ID)} @@ -111,6 +119,129 @@ func TestGravelerGet(t *testing.T) { require.Equal(t, value1, val) }) + t.Run("get from branch - staging only flag", func(t *testing.T) { + test := testutil.InitGravelerTest(t) + setupGetFromBranch(test) + + test.StagingManager.EXPECT().Get(ctx, stagingToken1, key1).Times(1).Return(nil, graveler.ErrNotFound) + test.StagingManager.EXPECT().Get(ctx, stagingToken2, key1).Times(1).Return(nil, graveler.ErrNotFound) + test.StagingManager.EXPECT().Get(ctx, stagingToken3, key1).Times(1).Return(nil, graveler.ErrNotFound) + + val, err := test.Sut.Get(ctx, repository, graveler.Ref(branch1ID), key1, graveler.WithStageOnly(true)) + + require.Error(t, graveler.ErrNotFound, err) + require.Nil(t, val) + }) + + t.Run("get from branch - staging only flag different in commit", func(t *testing.T) { + test := testutil.InitGravelerTest(t) + setupGetFromBranch(test) + + test.StagingManager.EXPECT().Get(ctx, stagingToken1, key1).Times(1).Return(value1, nil) + + test.RefManager.EXPECT().GetCommit(ctx, repository, commit1ID).Times(1).Return(&commit1, nil) + test.CommittedManager.EXPECT().Get(ctx, repository.StorageNamespace, commit1.MetaRangeID, key1).Times(1).Return(value2, nil) + + val, err := test.Sut.Get(ctx, 
repository, graveler.Ref(branch1ID), key1, graveler.WithStageOnly(true)) + + require.NoError(t, err) + require.NotNil(t, val) + require.Equal(t, value1, val) + }) + + t.Run("get from branch - staging only flag same in staging and in commit", func(t *testing.T) { + test := testutil.InitGravelerTest(t) + setupGetFromBranch(test) + + test.StagingManager.EXPECT().Get(ctx, stagingToken1, key1).Times(1).Return(value1, nil) + + test.RefManager.EXPECT().GetCommit(ctx, repository, commit1ID).Times(1).Return(&commit1, nil) + test.CommittedManager.EXPECT().Get(ctx, repository.StorageNamespace, commit1.MetaRangeID, key1).Times(1).Return(value1, nil) + + val, err := test.Sut.Get(ctx, repository, graveler.Ref(branch1ID), key1, graveler.WithStageOnly(true)) + + require.Error(t, graveler.ErrNotFound, err) + require.Nil(t, val) + }) + + t.Run("get from branch - compacted", func(t *testing.T) { + test := testutil.InitGravelerTest(t) + test.RefManager.EXPECT().ParseRef(graveler.Ref(branch3ID)).Times(1).Return(rawRefBranch3, nil) + test.RefManager.EXPECT().ResolveRawRef(ctx, repository, rawRefBranch3).Times(1).Return(&graveler.ResolvedRef{Type: graveler.ReferenceTypeBranch, BranchRecord: graveler.BranchRecord{BranchID: branch3ID, Branch: &branch3}}, nil) + + test.StagingManager.EXPECT().Get(ctx, stagingToken1, key1).Times(1).Return(nil, graveler.ErrNotFound) + test.StagingManager.EXPECT().Get(ctx, stagingToken2, key1).Times(1).Return(nil, graveler.ErrNotFound) + test.StagingManager.EXPECT().Get(ctx, stagingToken3, key1).Times(1).Return(nil, graveler.ErrNotFound) + + test.CommittedManager.EXPECT().Get(ctx, repository.StorageNamespace, mr2ID, key1).Times(1).Return(value1, nil) + + val, err := test.Sut.Get(ctx, repository, graveler.Ref(branch3ID), key1) + + require.NoError(t, err) + require.NotNil(t, val) + require.Equal(t, value1, val) + }) + + t.Run("get from branch - compacted with staging only flag when get object committed", func(t *testing.T) { + test := 
testutil.InitGravelerTest(t) + test.RefManager.EXPECT().ParseRef(graveler.Ref(branch3ID)).Times(1).Return(rawRefBranch3, nil) + test.RefManager.EXPECT().ResolveRawRef(ctx, repository, rawRefBranch3).Times(1).Return(&graveler.ResolvedRef{Type: graveler.ReferenceTypeBranch, BranchRecord: graveler.BranchRecord{BranchID: branch3ID, Branch: &branch3}}, nil) + + test.StagingManager.EXPECT().Get(ctx, stagingToken1, key1).Times(1).Return(nil, graveler.ErrNotFound) + test.StagingManager.EXPECT().Get(ctx, stagingToken2, key1).Times(1).Return(nil, graveler.ErrNotFound) + test.StagingManager.EXPECT().Get(ctx, stagingToken3, key1).Times(1).Return(nil, graveler.ErrNotFound) + + test.CommittedManager.EXPECT().Get(ctx, repository.StorageNamespace, mr2ID, key1).Times(1).Return(value1, nil) + + test.RefManager.EXPECT().GetCommit(ctx, repository, commit1ID).Times(1).Return(&commit1, nil) + test.CommittedManager.EXPECT().Get(ctx, repository.StorageNamespace, commit1.MetaRangeID, key1).Times(1).Return(value1, nil) + + val, err := test.Sut.Get(ctx, repository, graveler.Ref(branch3ID), key1, graveler.WithStageOnly(true)) + + require.Error(t, graveler.ErrNotFound, err) + require.Nil(t, val) + }) + + t.Run("get from branch - compacted with staging only flag when get object different in commit", func(t *testing.T) { + test := testutil.InitGravelerTest(t) + test.RefManager.EXPECT().ParseRef(graveler.Ref(branch3ID)).Times(1).Return(rawRefBranch3, nil) + test.RefManager.EXPECT().ResolveRawRef(ctx, repository, rawRefBranch3).Times(1).Return(&graveler.ResolvedRef{Type: graveler.ReferenceTypeBranch, BranchRecord: graveler.BranchRecord{BranchID: branch3ID, Branch: &branch3}}, nil) + + test.StagingManager.EXPECT().Get(ctx, stagingToken1, key1).Times(1).Return(nil, graveler.ErrNotFound) + test.StagingManager.EXPECT().Get(ctx, stagingToken2, key1).Times(1).Return(nil, graveler.ErrNotFound) + test.StagingManager.EXPECT().Get(ctx, stagingToken3, key1).Times(1).Return(nil, graveler.ErrNotFound) + + 
test.CommittedManager.EXPECT().Get(ctx, repository.StorageNamespace, mr2ID, key1).Times(1).Return(value1, nil) + test.RefManager.EXPECT().GetCommit(ctx, repository, commit1ID).Times(1).Return(&commit1, nil) + + test.CommittedManager.EXPECT().Get(ctx, repository.StorageNamespace, commit1.MetaRangeID, key1).Times(1).Return(value2, nil) + val, err := test.Sut.Get(ctx, repository, graveler.Ref(branch3ID), key1, graveler.WithStageOnly(true)) + + require.NoError(t, err) + require.NotNil(t, val) + require.Equal(t, value1, val) + }) + + t.Run("get from branch - compacted with staging only flag when object is not committed", func(t *testing.T) { + test := testutil.InitGravelerTest(t) + test.RefManager.EXPECT().ParseRef(graveler.Ref(branch3ID)).Times(1).Return(rawRefBranch3, nil) + test.RefManager.EXPECT().ResolveRawRef(ctx, repository, rawRefBranch3).Times(1).Return(&graveler.ResolvedRef{Type: graveler.ReferenceTypeBranch, BranchRecord: graveler.BranchRecord{BranchID: branch3ID, Branch: &branch3}}, nil) + + test.StagingManager.EXPECT().Get(ctx, stagingToken1, key1).Times(1).Return(nil, graveler.ErrNotFound) + test.StagingManager.EXPECT().Get(ctx, stagingToken2, key1).Times(1).Return(nil, graveler.ErrNotFound) + test.StagingManager.EXPECT().Get(ctx, stagingToken3, key1).Times(1).Return(nil, graveler.ErrNotFound) + + test.CommittedManager.EXPECT().Get(ctx, repository.StorageNamespace, mr2ID, key1).Times(1).Return(value1, nil) + test.RefManager.EXPECT().GetCommit(ctx, repository, commit1ID).Times(1).Return(&commit1, nil) + + test.CommittedManager.EXPECT().Get(ctx, repository.StorageNamespace, commit1.MetaRangeID, key1).Times(1).Return(nil, graveler.ErrNotFound) + val, err := test.Sut.Get(ctx, repository, graveler.Ref(branch3ID), key1, graveler.WithStageOnly(true)) + + require.NoError(t, err) + require.NotNil(t, val) + require.Equal(t, value1, val) + }) + t.Run("get from branch - staging tombstone", func(t *testing.T) { test := testutil.InitGravelerTest(t) 
setupGetFromBranch(test) @@ -124,6 +255,23 @@ func TestGravelerGet(t *testing.T) { require.Nil(t, val) }) + t.Run("get from branch - not found in compacted", func(t *testing.T) { + test := testutil.InitGravelerTest(t) + test.RefManager.EXPECT().ParseRef(graveler.Ref(branch3ID)).Times(1).Return(rawRefBranch3, nil) + test.RefManager.EXPECT().ResolveRawRef(ctx, repository, rawRefBranch3).Times(1).Return(&graveler.ResolvedRef{Type: graveler.ReferenceTypeBranch, BranchRecord: graveler.BranchRecord{BranchID: branch3ID, Branch: &branch3}}, nil) + + test.StagingManager.EXPECT().Get(ctx, stagingToken1, key1).Times(1).Return(nil, graveler.ErrNotFound) + test.StagingManager.EXPECT().Get(ctx, stagingToken2, key1).Times(1).Return(nil, graveler.ErrNotFound) + test.StagingManager.EXPECT().Get(ctx, stagingToken3, key1).Times(1).Return(nil, graveler.ErrNotFound) + + test.CommittedManager.EXPECT().Get(ctx, repository.StorageNamespace, mr2ID, key1).Times(1).Return(nil, graveler.ErrNotFound) + + val, err := test.Sut.Get(ctx, repository, graveler.Ref(branch3ID), key1) + + require.Error(t, graveler.ErrNotFound, err) + require.Nil(t, val) + }) + t.Run("get from branch - not found", func(t *testing.T) { test := testutil.InitGravelerTest(t) setupGetFromBranch(test) @@ -264,6 +412,31 @@ func TestGravelerMerge(t *testing.T) { require.Equal(t, graveler.CommitID(""), val) }) + t.Run("merge dirty compacted", func(t *testing.T) { + test := testutil.InitGravelerTest(t) + + test.RefManager.EXPECT().BranchUpdate(ctx, repository, branch3ID, gomock.Any()). 
+ DoAndReturn(func(_ context.Context, _ *graveler.RepositoryRecord, _ graveler.BranchID, f graveler.BranchUpdateFunc) error { + branchTest := branch3 + updatedBranch, err := f(&branchTest) + require.Error(t, err) + require.Nil(t, updatedBranch) + return err + }).Times(1) + test.RefManager.EXPECT().GetCommit(ctx, repository, commit1ID).Times(1).Return(&commit1, nil) + test.CommittedManager.EXPECT().List(ctx, repository.StorageNamespace, mr1ID).Times(1).Return(testutils.NewFakeValueIterator(nil), nil) + test.CommittedManager.EXPECT().Diff(ctx, repository.StorageNamespace, mr1ID, mr2ID).Times(1).Return(testutil.NewDiffIter([]graveler.Diff{{Key: key1, Type: graveler.DiffTypeRemoved}}), nil) + + test.StagingManager.EXPECT().List(ctx, stagingToken1, gomock.Any()).Times(1).Return(testutils.NewFakeValueIterator(nil)) + test.StagingManager.EXPECT().List(ctx, stagingToken2, gomock.Any()).Times(1).Return(testutils.NewFakeValueIterator(nil)) + test.StagingManager.EXPECT().List(ctx, stagingToken3, gomock.Any()).Times(1).Return(testutils.NewFakeValueIterator(nil)) + + val, err := test.Sut.Merge(ctx, repository, branch3ID, graveler.Ref(branch2ID), graveler.CommitParams{Metadata: graveler.Metadata{}}, "") + + require.Equal(t, graveler.ErrDirtyBranch, err) + require.Equal(t, graveler.CommitID(""), val) + }) + t.Run("merge successful with branchUpdate retry", func(t *testing.T) { test := testutil.InitGravelerTest(t) diff --git a/pkg/graveler/joined_diff_iterator.go b/pkg/graveler/joined_diff_iterator.go new file mode 100644 index 00000000000..40823252f6a --- /dev/null +++ b/pkg/graveler/joined_diff_iterator.go @@ -0,0 +1,136 @@ +package graveler + +import ( + "bytes" + "errors" + + "github.com/treeverse/lakefs/pkg/logging" +) + +// JoinedDiffIterator calculate the union diff between 2 iterators. +// The output iterator yields a single result for each key. +// If a key exist in the 2 iterators, iteratorA value prevails. 
+type JoinedDiffIterator struct { + iterA DiffIterator + iterAHasMore bool + iterB DiffIterator + iterBHasMore bool + // the current iterator that has the value to return (iterA or iterB) + currentIter DiffIterator + started bool + log logging.Logger +} + +func NewJoinedDiffIterator(iterA DiffIterator, iterB DiffIterator) *JoinedDiffIterator { + return &JoinedDiffIterator{ + iterA: iterA, + iterAHasMore: true, + iterB: iterB, + iterBHasMore: true, + currentIter: nil, + } +} + +// progressIterByKey advances the iterators to the next key when both iterators still has more keys +func (c *JoinedDiffIterator) progressIterByKey(keyA Key, keyB Key) { + compareResult := bytes.Compare(keyA, keyB) + switch { + case compareResult == 0: + // key exists in both iterators + c.iterAHasMore = c.iterA.Next() + c.iterBHasMore = c.iterB.Next() + case compareResult < 0: + // value of iterA > value of iterB + c.iterAHasMore = c.iterA.Next() + default: + // value of iterA < value of iterB + c.iterBHasMore = c.iterB.Next() + } +} + +func (c *JoinedDiffIterator) Next() bool { + switch { + case !c.started: + // first + c.iterAHasMore = c.iterA.Next() + c.iterBHasMore = c.iterB.Next() + c.started = true + case !c.iterAHasMore && !c.iterBHasMore: + // last + return false + case !c.iterAHasMore: + // iterA is done + c.currentIter = c.iterB + c.iterBHasMore = c.iterB.Next() + case !c.iterBHasMore: + // iterB is done + c.currentIter = c.iterA + c.iterAHasMore = c.iterA.Next() + default: + // both iterators has more keys- progress by key + c.progressIterByKey(c.iterA.Value().Key, c.iterB.Value().Key) + } + if c.iterA.Err() != nil { + c.currentIter = c.iterA + c.iterAHasMore = false + c.iterBHasMore = false + return false + } + if c.iterB.Err() != nil { + c.currentIter = c.iterB + c.iterAHasMore = false + c.iterBHasMore = false + return false + } + + // set c.currentIter to be the next (smaller) value + + // if one of the iterators is done, set the other one as the current iterator and return + 
if !c.iterAHasMore { + c.currentIter = c.iterB + return c.iterBHasMore + } else if !c.iterBHasMore { + c.currentIter = c.iterA + return true + } + + // if both iterators has more keys, set the iterator with the smaller key as the current iterator + if bytes.Compare(c.iterA.Value().Key, c.iterB.Value().Key) <= 0 { + c.currentIter = c.iterA + } else { + c.currentIter = c.iterB + } + return true +} + +func (c *JoinedDiffIterator) Value() *Diff { + if c.currentIter == nil { + c.log.Errorf("current iterator is nil") + return nil + } + return c.currentIter.Value() +} + +func (c *JoinedDiffIterator) Err() error { + if c.iterA.Err() != nil && c.iterB.Err() != nil { + return errors.Join(c.iterA.Err(), c.iterB.Err()) + } + if c.iterA.Err() != nil { + return c.iterA.Err() + } + if c.iterB.Err() != nil { + return c.iterB.Err() + } + return nil +} + +func (c *JoinedDiffIterator) Close() { + c.iterA.Close() + c.iterB.Close() +} + +func (c *JoinedDiffIterator) SeekGE(id Key) { + c.currentIter = nil + c.iterA.SeekGE(id) + c.iterB.SeekGE(id) +} diff --git a/pkg/graveler/joined_diff_iterator_test.go b/pkg/graveler/joined_diff_iterator_test.go new file mode 100644 index 00000000000..e7080756d8f --- /dev/null +++ b/pkg/graveler/joined_diff_iterator_test.go @@ -0,0 +1,523 @@ +package graveler_test + +import ( + "errors" + "testing" + + "github.com/go-test/deep" + "github.com/stretchr/testify/require" + "github.com/treeverse/lakefs/pkg/graveler" + "github.com/treeverse/lakefs/pkg/graveler/testutil" +) + +func TestJoinedDiffIterator_NextValue(t *testing.T) { + type fields struct { + iterA graveler.DiffIterator + iterB graveler.DiffIterator + } + tests := []struct { + name string + fields fields + wantValue []*graveler.Diff + }{ + { + name: "empty iterators", + fields: fields{ + iterA: testutil.NewDiffIter([]graveler.Diff{}), + iterB: testutil.NewDiffIter([]graveler.Diff{}), + }, + wantValue: nil, + }, + { + name: "only first iterator", + fields: fields{ + iterA: 
testutil.NewDiffIter([]graveler.Diff{ + { + Type: graveler.DiffTypeAdded, + Key: []byte("iterA/a"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeChanged, + Key: []byte("iterA/b"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeRemoved, + Key: []byte("iterA/c"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeConflict, + Key: []byte("iterA/d"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + }), + iterB: testutil.NewDiffIter([]graveler.Diff{}), + }, + wantValue: []*graveler.Diff{ + { + Type: graveler.DiffTypeAdded, + Key: []byte("iterA/a"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeChanged, + Key: []byte("iterA/b"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeRemoved, + Key: []byte("iterA/c"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeConflict, + Key: []byte("iterA/d"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + }, + }, + { + name: "only second iterator", + fields: fields{ + iterA: testutil.NewDiffIter([]graveler.Diff{}), + iterB: testutil.NewDiffIter([]graveler.Diff{ + { + Type: graveler.DiffTypeAdded, + Key: []byte("iterA/a"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeChanged, + Key: []byte("iterA/b"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeRemoved, + Key: []byte("iterA/c"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeConflict, + Key: []byte("iterA/d"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + }), + }, + wantValue: []*graveler.Diff{ + 
{ + Type: graveler.DiffTypeAdded, + Key: []byte("iterA/a"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeChanged, + Key: []byte("iterA/b"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeRemoved, + Key: []byte("iterA/c"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeConflict, + Key: []byte("iterA/d"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + }, + }, + { + name: "one from each", + fields: fields{ + iterA: testutil.NewDiffIter([]graveler.Diff{ + { + Type: graveler.DiffTypeAdded, + Key: []byte("iterA/a"), + Value: &graveler.Value{ + Identity: []byte("id"), + }, + }, + }), + iterB: testutil.NewDiffIter([]graveler.Diff{ + { + Type: graveler.DiffTypeAdded, + Key: []byte("iterA/b"), + Value: &graveler.Value{ + Identity: []byte("id"), + }, + }, + }), + }, + wantValue: []*graveler.Diff{ + { + Type: graveler.DiffTypeAdded, + Key: []byte("iterA/a"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeAdded, + Key: []byte("iterA/b"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + }, + }, + { + name: "one from each different order", + fields: fields{ + iterA: testutil.NewDiffIter([]graveler.Diff{ + { + Type: graveler.DiffTypeAdded, + Key: []byte("iterA/b"), + Value: &graveler.Value{ + Identity: []byte("id"), + }, + }, + }), + iterB: testutil.NewDiffIter([]graveler.Diff{ + { + Type: graveler.DiffTypeAdded, + Key: []byte("iterA/a"), + Value: &graveler.Value{ + Identity: []byte("id"), + }, + }, + }), + }, + wantValue: []*graveler.Diff{ + { + Type: graveler.DiffTypeAdded, + Key: []byte("iterA/a"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeAdded, + Key: []byte("iterA/b"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: 
nil, + }, + }, + }, + }, + { + name: "taking from first iterator before second", + fields: fields{ + iterA: testutil.NewDiffIter([]graveler.Diff{ + { + Type: graveler.DiffTypeAdded, + Key: []byte("iterA/a"), + Value: &graveler.Value{ + Identity: []byte("id"), + }, + }, + { + Type: graveler.DiffTypeAdded, + Key: []byte("iterA/b"), + Value: &graveler.Value{ + Identity: []byte("id"), + }, + }, + { + Type: graveler.DiffTypeAdded, + Key: []byte("iterA/d"), + Value: &graveler.Value{ + Identity: []byte("id"), + }, + }, + }), + iterB: testutil.NewDiffIter([]graveler.Diff{ + { + Type: graveler.DiffTypeAdded, + Key: []byte("iterA/a"), + Value: &graveler.Value{ + Identity: []byte("wrong"), + }, + }, + { + Type: graveler.DiffTypeAdded, + Key: []byte("iterA/c"), + Value: &graveler.Value{ + Identity: []byte("id"), + }, + }, + }), + }, + wantValue: []*graveler.Diff{ + { + Type: graveler.DiffTypeAdded, + Key: []byte("iterA/a"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeAdded, + Key: []byte("iterA/b"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeAdded, + Key: []byte("iterA/c"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeAdded, + Key: []byte("iterA/d"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + }, + }, + { + name: "same values", + fields: fields{ + iterA: testutil.NewDiffIter([]graveler.Diff{ + { + Type: graveler.DiffTypeAdded, + Key: []byte("iterA/a"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeChanged, + Key: []byte("iterA/b"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeRemoved, + Key: []byte("iterA/c"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeConflict, + Key: []byte("iterA/d"), + 
Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + }), + iterB: testutil.NewDiffIter([]graveler.Diff{ + { + Type: graveler.DiffTypeAdded, + Key: []byte("iterA/a"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeChanged, + Key: []byte("iterA/b"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeRemoved, + Key: []byte("iterA/c"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeConflict, + Key: []byte("iterA/d"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + }), + }, + wantValue: []*graveler.Diff{ + { + Type: graveler.DiffTypeAdded, + Key: []byte("iterA/a"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeChanged, + Key: []byte("iterA/b"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeRemoved, + Key: []byte("iterA/c"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + { + Type: graveler.DiffTypeConflict, + Key: []byte("iterA/d"), + Value: &graveler.Value{ + Identity: []byte("id"), + Data: nil, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var it graveler.DiffIterator + it = graveler.NewJoinedDiffIterator(tt.fields.iterA, tt.fields.iterB) + defer it.Close() + + var got []*graveler.Diff + for it.Next() { + got = append(got, it.Value()) + } + require.NoError(t, it.Err()) + // verify that what we produced is what we got from the iterator + for i := range got { + println("got", got[i].Key.String()) + } + for i := range tt.wantValue { + println("want", tt.wantValue[i].Key.String()) + } + if diff := deep.Equal(got, tt.wantValue); diff != nil { + t.Fatal("JoinedDiffIterator iterator found diff:", diff) + } + }) + } +} + +func TestJoinedDiffIterator_Error(t *testing.T) { + 
tests := []struct { + name string + iterAErr error + iterBErr error + expectedErr error + }{ + { + name: "iterA error", + iterAErr: errors.New("iterA error"), + iterBErr: nil, + expectedErr: errors.New("iterA error"), + }, + { + name: "iterB error", + iterAErr: nil, + iterBErr: errors.New("iterB error"), + expectedErr: errors.New("iterB error"), + }, + { + name: "iterA and iterB error", + iterAErr: errors.New("iterA error"), + iterBErr: errors.New("iterB error"), + expectedErr: errors.Join(errors.New("iterA error"), errors.New("iterB error")), + }, + { + name: "no error", + + iterAErr: nil, + iterBErr: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + itA := testutil.NewDiffIter([]graveler.Diff{}) + itB := testutil.NewDiffIter([]graveler.Diff{}) + if tt.iterAErr != nil { + itA.SetErr(tt.iterAErr) + } + if tt.iterBErr != nil { + itB.SetErr(tt.iterBErr) + } + combinedIter := graveler.NewJoinedDiffIterator(itA, itB) + + if tt.expectedErr != nil { + require.Equal(t, tt.expectedErr, combinedIter.Err()) + } else { + require.NoError(t, combinedIter.Err()) + } + }) + } +} diff --git a/pkg/graveler/mock/graveler.go b/pkg/graveler/mock/graveler.go index d895c95239a..00f5373e689 100644 --- a/pkg/graveler/mock/graveler.go +++ b/pkg/graveler/mock/graveler.go @@ -139,21 +139,6 @@ func (mr *MockKeyValueStoreMockRecorder) List(ctx, repository, ref, batchSize in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "List", reflect.TypeOf((*MockKeyValueStore)(nil).List), ctx, repository, ref, batchSize) } -// ListStaging mocks base method. -func (m *MockKeyValueStore) ListStaging(ctx context.Context, branch *graveler.Branch, batchSize int) (graveler.ValueIterator, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListStaging", ctx, branch, batchSize) - ret0, _ := ret[0].(graveler.ValueIterator) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ListStaging indicates an expected call of ListStaging. 
-func (mr *MockKeyValueStoreMockRecorder) ListStaging(ctx, branch, batchSize interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListStaging", reflect.TypeOf((*MockKeyValueStore)(nil).ListStaging), ctx, branch, batchSize) -} - // Set mocks base method. func (m *MockKeyValueStore) Set(ctx context.Context, repository *graveler.RepositoryRecord, branchID graveler.BranchID, key graveler.Key, value graveler.Value, opts ...graveler.SetOptionsFunc) error { m.ctrl.T.Helper() diff --git a/pkg/graveler/testutil/fakes.go b/pkg/graveler/testutil/fakes.go index 7b51c2a581b..8de2212ab3a 100644 --- a/pkg/graveler/testutil/fakes.go +++ b/pkg/graveler/testutil/fakes.go @@ -252,6 +252,7 @@ type RefsFake struct { Commits map[graveler.CommitID]*graveler.Commit StagingToken graveler.StagingToken SealedTokens []graveler.StagingToken + BaseMetaRangeID graveler.MetaRangeID } func (m *RefsFake) CreateBranch(_ context.Context, _ *graveler.RepositoryRecord, _ graveler.BranchID, branch graveler.Branch) error { @@ -302,10 +303,12 @@ func (m *RefsFake) ResolveRawRef(_ context.Context, _ *graveler.RepositoryRecord var branch graveler.BranchID var stagingToken graveler.StagingToken var sealedTokens []graveler.StagingToken + var baseMetaRangeID graveler.MetaRangeID if m.RefType == graveler.ReferenceTypeBranch { branch = DefaultBranchID stagingToken = m.StagingToken sealedTokens = m.SealedTokens + baseMetaRangeID = m.BaseMetaRangeID } return &graveler.ResolvedRef{ @@ -313,9 +316,10 @@ func (m *RefsFake) ResolveRawRef(_ context.Context, _ *graveler.RepositoryRecord BranchRecord: graveler.BranchRecord{ BranchID: branch, Branch: &graveler.Branch{ - CommitID: m.CommitID, - StagingToken: stagingToken, - SealedTokens: sealedTokens, + CommitID: m.CommitID, + StagingToken: stagingToken, + SealedTokens: sealedTokens, + CompactedBaseMetaRangeID: baseMetaRangeID, }, }, }, nil @@ -466,6 +470,10 @@ func (r *diffIter) Err() error { return r.err } +func 
(r *diffIter) SetErr(err error) { + r.err = err +} + func (r *diffIter) Close() {} type valueIteratorFake struct {