Skip to content

Commit

Permalink
Merge pull request #687 from ahrtr/fix_check_20240204
Browse files Browse the repository at this point in the history
Continue to enhance check functionality and add one more case to cover the nested bucket case
  • Loading branch information
ahrtr authored Feb 5, 2024
2 parents da5975b + 2b1ee6c commit 2a0e6e8
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 35 deletions.
6 changes: 3 additions & 3 deletions tx_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,16 @@ func (tx *Tx) recursivelyCheckBucketInPage(pageId common.Pgid, reachable map[com
case p.IsLeafPage():
for i := range p.LeafPageElements() {
elem := p.LeafPageElement(uint16(i))
if elem.Flags()&common.BucketLeafFlag != 0 {

if elem.IsBucketEntry() {
inBkt := common.NewInBucket(pageId, 0)
tmpBucket := Bucket{
InBucket: &inBkt,
rootNode: &node{isLeaf: p.IsLeafPage()},
FillPercent: DefaultFillPercent,
tx: tx,
}
if child := tmpBucket.Bucket(elem.Key()); child != nil {
tx.recursivelyCheckBucket(&tmpBucket, reachable, freed, kvStringer, ch)
tx.recursivelyCheckBucket(child, reachable, freed, kvStringer, ch)
}
}
}
Expand Down
121 changes: 89 additions & 32 deletions tx_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,21 @@ import (
)

func TestTx_Check_CorruptPage(t *testing.T) {
bucketName := []byte("data")

t.Log("Creating db file.")
db := btesting.MustCreateDBWithOption(t, &bbolt.Options{PageSize: 4096})

// Each page can hold roughly 20 key/values pair, so 100 such
// key/value pairs will consume about 5 leaf pages.
err := db.Fill([]byte("data"), 1, 100,
err := db.Fill(bucketName, 1, 100,
func(tx int, k int) []byte { return []byte(fmt.Sprintf("%04d", k)) },
func(tx int, k int) []byte { return make([]byte, 100) },
)
require.NoError(t, err)

t.Log("Corrupting random leaf page.")
victimPageId, validPageIds := corruptRandomLeafPage(t, db.DB)
t.Log("Corrupting a random leaf page.")
victimPageId, validPageIds := corruptRandomLeafPageInBucket(t, db.DB, bucketName)

t.Log("Running consistency check.")
vErr := db.View(func(tx *bbolt.Tx) error {
Expand Down Expand Up @@ -58,41 +60,69 @@ func TestTx_Check_CorruptPage(t *testing.T) {
db.MustClose()
}

// corruptRandomLeafPage corrupts one random leaf page.
func corruptRandomLeafPage(t testing.TB, db *bbolt.DB) (victimPageId common.Pgid, validPageIds []common.Pgid) {
victimPageId, validPageIds = pickupRandomLeafPage(t, db)
victimPage, victimBuf, err := guts_cli.ReadPage(db.Path(), uint64(victimPageId))
require.NoError(t, err)
require.True(t, victimPage.IsLeafPage())
require.True(t, victimPage.Count() > 1)
func TestTx_Check_WithNestBucket(t *testing.T) {
parentBucketName := []byte("parentBucket")

// intentionally make the second key < the first key.
element := victimPage.LeafPageElement(1)
key := element.Key()
key[0] = 0
t.Log("Creating db file.")
db := btesting.MustCreateDBWithOption(t, &bbolt.Options{PageSize: 4096})

// Write the corrupt page to db file.
err = guts_cli.WritePage(db.Path(), victimBuf)
require.NoError(t, err)
return victimPageId, validPageIds
}
err := db.Update(func(tx *bbolt.Tx) error {
pb, bErr := tx.CreateBucket(parentBucketName)
if bErr != nil {
return bErr
}

// pickupRandomLeafPage picks up a random leaf page.
func pickupRandomLeafPage(t testing.TB, db *bbolt.DB) (victimPageId common.Pgid, validPageIds []common.Pgid) {
// Read DB's RootPage, which should be a leaf page.
rootPageId, _, err := guts_cli.GetRootPage(db.Path())
require.NoError(t, err)
rootPage, _, err := guts_cli.ReadPage(db.Path(), uint64(rootPageId))
t.Log("put some key/values under the parent bucket directly")
for i := 0; i < 10; i++ {
k, v := fmt.Sprintf("%04d", i), fmt.Sprintf("value_%4d", i)
if pErr := pb.Put([]byte(k), []byte(v)); pErr != nil {
return pErr
}
}

t.Log("create a nested bucket and put some key/values under the nested bucket")
cb, bErr := pb.CreateBucket([]byte("nestedBucket"))
if bErr != nil {
return bErr
}

for i := 0; i < 2000; i++ {
k, v := fmt.Sprintf("%04d", i), fmt.Sprintf("value_%4d", i)
if pErr := cb.Put([]byte(k), []byte(v)); pErr != nil {
return pErr
}
}

return nil
})
require.NoError(t, err)
require.True(t, rootPage.IsLeafPage())

// The leaf page contains only one item, namely the bucket
require.Equal(t, uint16(1), rootPage.Count())
lpe := rootPage.LeafPageElement(uint16(0))
require.True(t, lpe.IsBucketEntry())
// Get the bucket's root page.
bucketRootPageId := mustGetBucketRootPage(t, db.DB, parentBucketName)

t.Logf("Running consistency check starting from pageId: %d", bucketRootPageId)
vErr := db.View(func(tx *bbolt.Tx) error {
var cErrs []error

errChan := tx.Check(bbolt.WithPageId(uint(bucketRootPageId)))
for cErr := range errChan {
cErrs = append(cErrs, cErr)
}
require.Equal(t, 0, len(cErrs))

// The bucket should be pointing to a branch page
bucketRootPageId := lpe.Bucket().RootPage()
return nil
})
require.NoError(t, vErr)
t.Log("All check passed")

// Manually close the db, otherwise the PostTestCleanup will
// check the db again and accordingly fail the test.
db.MustClose()
}

// corruptRandomLeafPage corrupts one random leaf page.
func corruptRandomLeafPageInBucket(t testing.TB, db *bbolt.DB, bucketName []byte) (victimPageId common.Pgid, validPageIds []common.Pgid) {
bucketRootPageId := mustGetBucketRootPage(t, db, bucketName)
bucketRootPage, _, err := guts_cli.ReadPage(db.Path(), uint64(bucketRootPageId))
require.NoError(t, err)
require.True(t, bucketRootPage.IsBranchPage())
Expand All @@ -105,5 +135,32 @@ func pickupRandomLeafPage(t testing.TB, db *bbolt.DB) (victimPageId common.Pgid,
randomIdx := rand.Intn(len(bucketPageIds))
victimPageId = bucketPageIds[randomIdx]
validPageIds = append(bucketPageIds[:randomIdx], bucketPageIds[randomIdx+1:]...)

victimPage, victimBuf, err := guts_cli.ReadPage(db.Path(), uint64(victimPageId))
require.NoError(t, err)
require.True(t, victimPage.IsLeafPage())
require.True(t, victimPage.Count() > 1)

// intentionally make the second key < the first key.
element := victimPage.LeafPageElement(1)
key := element.Key()
key[0] = 0

// Write the corrupt page to db file.
err = guts_cli.WritePage(db.Path(), victimBuf)
require.NoError(t, err)
return victimPageId, validPageIds
}

// mustGetBucketRootPage returns the root page for the provided bucket.
func mustGetBucketRootPage(t testing.TB, db *bbolt.DB, bucketName []byte) common.Pgid {
var rootPageId common.Pgid
_ = db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(bucketName)
require.NotNil(t, b)
rootPageId = b.RootPage()
return nil
})

return rootPageId
}

0 comments on commit 2a0e6e8

Please sign in to comment.