Skip to content

Commit

Permalink
Read from branch with compaction data (#7701)
Browse files Browse the repository at this point in the history
  • Loading branch information
idanovo authored Jun 11, 2024
1 parent 2c727f4 commit 317c985
Show file tree
Hide file tree
Showing 13 changed files with 1,641 additions and 596 deletions.
166 changes: 114 additions & 52 deletions pkg/catalog/catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ func TestCatalog_PrepareGCUncommitted(t *testing.T) {
numRecords int
expectedCalls int
expectedForUncommitted int
compactBranch bool
}{
{
name: "no branches",
Expand Down Expand Up @@ -560,73 +561,91 @@ func TestCatalog_PrepareGCUncommitted(t *testing.T) {
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
const repositoryID = "repo1"
g, expectedRecords := createPrepareUncommittedTestScenario(t, repositoryID, tt.numBranch, tt.numRecords, tt.expectedCalls)
blockAdapter := testutil.NewBlockAdapterByType(t, block.BlockstoreTypeMem)
c := &catalog.Catalog{
Store: g.Sut,
BlockAdapter: blockAdapter,
UGCPrepareMaxFileSize: 500 * 1024,
KVStore: g.KVStore,
}

var (
mark *catalog.GCUncommittedMark
runID string
allRecords []string
)
for {
result, err := c.PrepareGCUncommitted(ctx, repositoryID, mark)
require.NoError(t, err)

// keep or check run id match previous calls
if runID == "" {
runID = result.RunID
} else {
require.Equal(t, runID, result.RunID)
for _, compactBranch := range []bool{false, true} {
t.Run(tt.name, func(t *testing.T) {
const repositoryID = "repo1"
g, expectedRecords := createPrepareUncommittedTestScenario(t, repositoryID, tt.numBranch, tt.numRecords, tt.expectedCalls, compactBranch)
blockAdapter := testutil.NewBlockAdapterByType(t, block.BlockstoreTypeMem)
c := &catalog.Catalog{
Store: g.Sut,
BlockAdapter: blockAdapter,
UGCPrepareMaxFileSize: 500 * 1024,
KVStore: g.KVStore,
}

if tt.numRecords == 0 {
require.Equal(t, "", result.Location)
require.Equal(t, "", result.Filename)
} else {
// read parquet information if data was stored to location
objLocation, err := url.JoinPath(result.Location, result.Filename)
var (
mark *catalog.GCUncommittedMark
runID string
allRecords []string
)
for {
result, err := c.PrepareGCUncommitted(ctx, repositoryID, mark)
require.NoError(t, err)
addresses := readPhysicalAddressesFromParquetObject(t, repositoryID, ctx, c, objLocation)
allRecords = append(allRecords, addresses...)
}

mark = result.Mark
if mark == nil {
break
// keep or check run id match previous calls
if runID == "" {
runID = result.RunID
} else {
require.Equal(t, runID, result.RunID)
}

if tt.numRecords == 0 {
require.Equal(t, "", result.Location)
require.Equal(t, "", result.Filename)
} else {
// read parquet information if data was stored to location
objLocation, err := url.JoinPath(result.Location, result.Filename)
require.NoError(t, err)
addresses := readPhysicalAddressesFromParquetObject(t, repositoryID, ctx, c, objLocation)
allRecords = append(allRecords, addresses...)
}

mark = result.Mark
if mark == nil {
break
}
require.Equal(t, runID, result.Mark.RunID)
}
require.Equal(t, runID, result.Mark.RunID)
}

// match expected records found in parquet data
sort.Strings(allRecords)
if diff := deep.Equal(allRecords, expectedRecords); diff != nil {
t.Errorf("Found diff in expected records: %s", diff)
}
})
// match expected records found in parquet data
sort.Strings(allRecords)
if diff := deep.Equal(allRecords, expectedRecords); diff != nil {
t.Errorf("Found diff in expected records: %s", diff)
}
})
}
}
}

func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, numBranches, numRecords, expectedCalls int) (*gUtils.GravelerTest, []string) {
func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, numBranches, numRecords, expectedCalls int, compact bool) (*gUtils.GravelerTest, []string) {
t.Helper()

test := gUtils.InitGravelerTest(t)
records := make([][]*graveler.ValueRecord, numBranches)
diffs := make([][]graveler.Diff, numBranches)
var branches []*graveler.BranchRecord
var expectedRecords []string
if numBranches > 0 {
test.RefManager.EXPECT().GetCommit(gomock.Any(), gomock.Any(), gomock.Any()).MinTimes(1).Return(&graveler.Commit{}, nil)
test.CommittedManager.EXPECT().List(gomock.Any(), gomock.Any(), gomock.Any()).MinTimes(1).Return(cUtils.NewFakeValueIterator([]*graveler.ValueRecord{}), nil)
}
for i := 0; i < numBranches; i++ {
branchID := graveler.BranchID(fmt.Sprintf("branch%04d", i))
token := graveler.StagingToken(fmt.Sprintf("%s_st%04d", branchID, i))
branches = append(branches, &graveler.BranchRecord{BranchID: branchID, Branch: &graveler.Branch{StagingToken: token}})
branch := &graveler.BranchRecord{BranchID: branchID, Branch: &graveler.Branch{StagingToken: token}}
compactBaseMetaRangeID := graveler.MetaRangeID(fmt.Sprintf("base%04d", i))
commitID := graveler.CommitID(fmt.Sprintf("commit%04d", i))
if !compact {
test.RefManager.EXPECT().GetBranch(gomock.Any(), gomock.Any(), branchID).MinTimes(1).Return(&graveler.Branch{StagingToken: token}, nil)
} else {
branch.CompactedBaseMetaRangeID = compactBaseMetaRangeID
branch.Branch.CommitID = commitID
test.RefManager.EXPECT().GetBranch(gomock.Any(), gomock.Any(), branchID).MinTimes(1).Return(&graveler.Branch{StagingToken: token, CommitID: commitID, CompactedBaseMetaRangeID: compactBaseMetaRangeID}, nil)
}
branches = append(branches, branch)

records[i] = make([]*graveler.ValueRecord, 0, numRecords)
diffs[i] = make([]graveler.Diff, 0, numRecords)
for j := 0; j < numRecords; j++ {
var (
addressType catalog.Entry_AddressType
Expand Down Expand Up @@ -661,6 +680,30 @@ func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, num
})
// we always keep the relative path - as the prepared uncommitted will trim the storage namespace
expectedRecords = append(expectedRecords, fmt.Sprintf("%s_record%04d", branchID, j))

if compact {
diffs[i] = append(diffs[i], graveler.Diff{
Type: graveler.DiffTypeAdded,
Key: []byte(e.Address),
Value: &graveler.Value{
Identity: []byte("dont care"),
Data: v,
},
}) // record in compaction and in staging so no need to add it again to expected records
e.Address = fmt.Sprintf("%s_%s", e.Address, "compacted")
v, err = proto.Marshal(&e)
require.NoError(t, err)
diffs[i] = append(diffs[i], graveler.Diff{
Type: graveler.DiffTypeAdded,
Key: []byte(e.Address),
Value: &graveler.Value{
Identity: []byte("dont care"),
Data: v,
},
})
// record in compaction but not in staging
expectedRecords = append(expectedRecords, fmt.Sprintf("%s_record%04d_%s", branchID, j, "compacted"))
}
}

// Add tombstone
Expand All @@ -669,6 +712,11 @@ func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, num
Value: nil,
})

diffs[i] = append(diffs[i], graveler.Diff{
Type: graveler.DiffTypeRemoved,
Key: []byte(fmt.Sprintf("%s_tombstone", branchID)),
})

// Add external address
e := catalog.Entry{
Address: fmt.Sprintf("external/address/object_%s", branchID),
Expand All @@ -688,6 +736,15 @@ func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, num
Data: v,
},
})

diffs[i] = append(diffs[i], graveler.Diff{
Type: graveler.DiffTypeAdded,
Key: []byte(e.Address),
Value: &graveler.Value{
Identity: []byte("dont care"),
Data: v,
},
})
}

test.GarbageCollectionManager.EXPECT().NewID().Return("TestRunID")
Expand All @@ -699,21 +756,27 @@ func createPrepareUncommittedTestScenario(t *testing.T, repositoryID string, num
DefaultBranchID: "main",
},
}
test.RefManager.EXPECT().GetRepository(gomock.Any(), graveler.RepositoryID(repositoryID)).Times(expectedCalls).Return(repository, nil)
test.RefManager.EXPECT().GetRepository(gomock.Any(), graveler.RepositoryID(repositoryID)).MinTimes(1).Return(repository, nil)

// expect tracked addresses does not list branches, so remove one and keep at least the first
test.RefManager.EXPECT().ListBranches(gomock.Any(), gomock.Any()).Times(expectedCalls).Return(gUtils.NewFakeBranchIterator(branches), nil)
test.RefManager.EXPECT().ListBranches(gomock.Any(), gomock.Any()).MinTimes(1).Return(gUtils.NewFakeBranchIterator(branches), nil)
for i := 0; i < len(branches); i++ {
sort.Slice(records[i], func(ii, jj int) bool {
return bytes.Compare(records[i][ii].Key, records[i][jj].Key) < 0
})
test.StagingManager.EXPECT().List(gomock.Any(), branches[i].StagingToken, gomock.Any()).AnyTimes().Return(cUtils.NewFakeValueIterator(records[i]))
test.StagingManager.EXPECT().List(gomock.Any(), branches[i].StagingToken, gomock.Any()).MinTimes(1).Return(cUtils.NewFakeValueIterator(records[i]))
if compact {
sort.Slice(diffs[i], func(ii, jj int) bool {
return bytes.Compare(diffs[i][ii].Key, diffs[i][jj].Key) < 0
})
test.CommittedManager.EXPECT().Diff(gomock.Any(), repository.StorageNamespace, gomock.Any(), branches[i].CompactedBaseMetaRangeID).MinTimes(1).Return(gUtils.NewDiffIter(diffs[i]), nil)
}
}

if numRecords > 0 {
test.GarbageCollectionManager.EXPECT().
GetUncommittedLocation(gomock.Any(), gomock.Any()).
Times(expectedCalls).
MinTimes(1).
DoAndReturn(func(runID string, sn graveler.StorageNamespace) (string, error) {
return fmt.Sprintf("%s/retention/gc/uncommitted/%s/uncommitted/", "_lakefs", runID), nil
})
Expand All @@ -736,7 +799,6 @@ func readPhysicalAddressesFromParquetObject(t *testing.T, repositoryID string, c
require.NoError(t, err)
bufferFile := buffer.NewBufferFileFromBytes(data)
defer func() { _ = bufferFile.Close() }()

pr, err := reader.NewParquetReader(bufferFile, new(catalog.UncommittedParquetObject), 4)
require.NoError(t, err)

Expand Down
8 changes: 0 additions & 8 deletions pkg/catalog/fake_graveler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ type FakeGraveler struct {
KeyValue map[string]*graveler.Value
Err error
ListIteratorFactory func() graveler.ValueIterator
ListStagingIteratorFactory func(token graveler.StagingToken) graveler.ValueIterator
DiffIteratorFactory func() graveler.DiffIterator
RepositoryIteratorFactory func() graveler.RepositoryIterator
BranchIteratorFactory func() graveler.BranchIterator
Expand Down Expand Up @@ -125,13 +124,6 @@ func (g *FakeGraveler) DeleteBatch(ctx context.Context, repository *graveler.Rep
return nil
}

func (g *FakeGraveler) ListStaging(_ context.Context, b *graveler.Branch, _ int) (graveler.ValueIterator, error) {
if g.Err != nil {
return nil, g.Err
}
return g.ListStagingIteratorFactory(b.StagingToken), nil
}

func (g *FakeGraveler) List(_ context.Context, _ *graveler.RepositoryRecord, _ graveler.Ref, _ int) (graveler.ValueIterator, error) {
if g.Err != nil {
return nil, g.Err
Expand Down
Loading

0 comments on commit 317c985

Please sign in to comment.