diff --git a/tx_check.go b/tx_check.go index a7fa99c7e..1a6d8e2dc 100644 --- a/tx_check.go +++ b/tx_check.go @@ -102,16 +102,16 @@ func (tx *Tx) recursivelyCheckBucketInPage(pageId common.Pgid, reachable map[com case p.IsLeafPage(): for i := range p.LeafPageElements() { elem := p.LeafPageElement(uint16(i)) - if elem.Flags()&common.BucketLeafFlag != 0 { - + if elem.IsBucketEntry() { inBkt := common.NewInBucket(pageId, 0) tmpBucket := Bucket{ InBucket: &inBkt, rootNode: &node{isLeaf: p.IsLeafPage()}, FillPercent: DefaultFillPercent, + tx: tx, } if child := tmpBucket.Bucket(elem.Key()); child != nil { - tx.recursivelyCheckBucket(&tmpBucket, reachable, freed, kvStringer, ch) + tx.recursivelyCheckBucket(child, reachable, freed, kvStringer, ch) } } } diff --git a/tx_check_test.go b/tx_check_test.go index 194cec32b..af3610897 100644 --- a/tx_check_test.go +++ b/tx_check_test.go @@ -14,19 +14,21 @@ import ( ) func TestTx_Check_CorruptPage(t *testing.T) { + bucketName := []byte("data") + t.Log("Creating db file.") db := btesting.MustCreateDBWithOption(t, &bbolt.Options{PageSize: 4096}) // Each page can hold roughly 20 key/values pair, so 100 such // key/value pairs will consume about 5 leaf pages. - err := db.Fill([]byte("data"), 1, 100, + err := db.Fill(bucketName, 1, 100, func(tx int, k int) []byte { return []byte(fmt.Sprintf("%04d", k)) }, func(tx int, k int) []byte { return make([]byte, 100) }, ) require.NoError(t, err) - t.Log("Corrupting random leaf page.") - victimPageId, validPageIds := corruptRandomLeafPage(t, db.DB) + t.Log("Corrupting a random leaf page.") + victimPageId, validPageIds := corruptRandomLeafPageInBucket(t, db.DB, bucketName) t.Log("Running consistency check.") vErr := db.View(func(tx *bbolt.Tx) error { @@ -58,41 +60,69 @@ func TestTx_Check_CorruptPage(t *testing.T) { db.MustClose() } -// corruptRandomLeafPage corrupts one random leaf page. -func corruptRandomLeafPage(t testing.TB, db *bbolt.DB) (victimPageId common.Pgid, validPageIds []common.Pgid) { - victimPageId, validPageIds = pickupRandomLeafPage(t, db) - victimPage, victimBuf, err := guts_cli.ReadPage(db.Path(), uint64(victimPageId)) - require.NoError(t, err) - require.True(t, victimPage.IsLeafPage()) - require.True(t, victimPage.Count() > 1) +func TestTx_Check_WithNestBucket(t *testing.T) { + parentBucketName := []byte("parentBucket") - // intentionally make the second key < the first key. - element := victimPage.LeafPageElement(1) - key := element.Key() - key[0] = 0 + t.Log("Creating db file.") + db := btesting.MustCreateDBWithOption(t, &bbolt.Options{PageSize: 4096}) - // Write the corrupt page to db file. - err = guts_cli.WritePage(db.Path(), victimBuf) - require.NoError(t, err) - return victimPageId, validPageIds -} + err := db.Update(func(tx *bbolt.Tx) error { + pb, bErr := tx.CreateBucket(parentBucketName) + if bErr != nil { + return bErr + } -// pickupRandomLeafPage picks up a random leaf page. -func pickupRandomLeafPage(t testing.TB, db *bbolt.DB) (victimPageId common.Pgid, validPageIds []common.Pgid) { - // Read DB's RootPage, which should be a leaf page. - rootPageId, _, err := guts_cli.GetRootPage(db.Path()) - require.NoError(t, err) - rootPage, _, err := guts_cli.ReadPage(db.Path(), uint64(rootPageId)) + t.Log("put some key/values under the parent bucket directly") + for i := 0; i < 10; i++ { + k, v := fmt.Sprintf("%04d", i), fmt.Sprintf("value_%4d", i) + if pErr := pb.Put([]byte(k), []byte(v)); pErr != nil { + return pErr + } + } + + t.Log("create a nested bucket and put some key/values under the nested bucket") + cb, bErr := pb.CreateBucket([]byte("nestedBucket")) + if bErr != nil { + return bErr + } + + for i := 0; i < 2000; i++ { + k, v := fmt.Sprintf("%04d", i), fmt.Sprintf("value_%4d", i) + if pErr := cb.Put([]byte(k), []byte(v)); pErr != nil { + return pErr + } + } + + return nil + }) require.NoError(t, err) - require.True(t, rootPage.IsLeafPage()) - // The leaf page contains only one item, namely the bucket - require.Equal(t, uint16(1), rootPage.Count()) - lpe := rootPage.LeafPageElement(uint16(0)) - require.True(t, lpe.IsBucketEntry()) + // Get the bucket's root page. + bucketRootPageId := mustGetBucketRootPage(t, db.DB, parentBucketName) + + t.Logf("Running consistency check starting from pageId: %d", bucketRootPageId) + vErr := db.View(func(tx *bbolt.Tx) error { + var cErrs []error + + errChan := tx.Check(bbolt.WithPageId(uint(bucketRootPageId))) + for cErr := range errChan { + cErrs = append(cErrs, cErr) + } + require.Equal(t, 0, len(cErrs)) - // The bucket should be pointing to a branch page - bucketRootPageId := lpe.Bucket().RootPage() + return nil + }) + require.NoError(t, vErr) + t.Log("All check passed") + + // Manually close the db, otherwise the PostTestCleanup will + // check the db again and accordingly fail the test. + db.MustClose() +} + +// corruptRandomLeafPage corrupts one random leaf page. +func corruptRandomLeafPageInBucket(t testing.TB, db *bbolt.DB, bucketName []byte) (victimPageId common.Pgid, validPageIds []common.Pgid) { + bucketRootPageId := mustGetBucketRootPage(t, db, bucketName) bucketRootPage, _, err := guts_cli.ReadPage(db.Path(), uint64(bucketRootPageId)) require.NoError(t, err) require.True(t, bucketRootPage.IsBranchPage()) @@ -105,5 +135,32 @@ func pickupRandomLeafPage(t testing.TB, db *bbolt.DB) (victimPageId common.Pgid, randomIdx := rand.Intn(len(bucketPageIds)) victimPageId = bucketPageIds[randomIdx] validPageIds = append(bucketPageIds[:randomIdx], bucketPageIds[randomIdx+1:]...) + + victimPage, victimBuf, err := guts_cli.ReadPage(db.Path(), uint64(victimPageId)) + require.NoError(t, err) + require.True(t, victimPage.IsLeafPage()) + require.True(t, victimPage.Count() > 1) + + // intentionally make the second key < the first key. + element := victimPage.LeafPageElement(1) + key := element.Key() + key[0] = 0 + + // Write the corrupt page to db file. + err = guts_cli.WritePage(db.Path(), victimBuf) + require.NoError(t, err) return victimPageId, validPageIds } + +// mustGetBucketRootPage returns the root page for the provided bucket. +func mustGetBucketRootPage(t testing.TB, db *bbolt.DB, bucketName []byte) common.Pgid { + var rootPageId common.Pgid + _ = db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(bucketName) + require.NotNil(t, b) + rootPageId = b.RootPage() + return nil + }) + + return rootPageId +}