From 2b1ee6c191e44a21c29768b9574f35f448738288 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Sun, 4 Feb 2024 09:53:00 +0000 Subject: [PATCH] Continue to enhance check functionality and add one more case to cover the nested bucket case Signed-off-by: Benjamin Wang --- tx_check.go | 6 +-- tx_check_test.go | 123 ++++++++++++++++++++++++++++++++++------------- 2 files changed, 93 insertions(+), 36 deletions(-) diff --git a/tx_check.go b/tx_check.go index a7fa99c..1a6d8e2 100644 --- a/tx_check.go +++ b/tx_check.go @@ -102,16 +102,16 @@ func (tx *Tx) recursivelyCheckBucketInPage(pageId common.Pgid, reachable map[com case p.IsLeafPage(): for i := range p.LeafPageElements() { elem := p.LeafPageElement(uint16(i)) - if elem.Flags()&common.BucketLeafFlag != 0 { - + if elem.IsBucketEntry() { inBkt := common.NewInBucket(pageId, 0) tmpBucket := Bucket{ InBucket: &inBkt, rootNode: &node{isLeaf: p.IsLeafPage()}, FillPercent: DefaultFillPercent, + tx: tx, } if child := tmpBucket.Bucket(elem.Key()); child != nil { - tx.recursivelyCheckBucket(&tmpBucket, reachable, freed, kvStringer, ch) + tx.recursivelyCheckBucket(child, reachable, freed, kvStringer, ch) } } } diff --git a/tx_check_test.go b/tx_check_test.go index 194cec3..af36108 100644 --- a/tx_check_test.go +++ b/tx_check_test.go @@ -14,19 +14,21 @@ import ( ) func TestTx_Check_CorruptPage(t *testing.T) { + bucketName := []byte("data") + t.Log("Creating db file.") db := btesting.MustCreateDBWithOption(t, &bbolt.Options{PageSize: 4096}) // Each page can hold roughly 20 key/values pair, so 100 such // key/value pairs will consume about 5 leaf pages. - err := db.Fill([]byte("data"), 1, 100, + err := db.Fill(bucketName, 1, 100, func(tx int, k int) []byte { return []byte(fmt.Sprintf("%04d", k)) }, func(tx int, k int) []byte { return make([]byte, 100) }, ) require.NoError(t, err) - t.Log("Corrupting random leaf page.") - victimPageId, validPageIds := corruptRandomLeafPage(t, db.DB) + t.Log("Corrupting a random leaf page.") + victimPageId, validPageIds := corruptRandomLeafPageInBucket(t, db.DB, bucketName) t.Log("Running consistency check.") vErr := db.View(func(tx *bbolt.Tx) error { @@ -58,9 +60,82 @@ func TestTx_Check_CorruptPage(t *testing.T) { db.MustClose() } +func TestTx_Check_WithNestBucket(t *testing.T) { + parentBucketName := []byte("parentBucket") + + t.Log("Creating db file.") + db := btesting.MustCreateDBWithOption(t, &bbolt.Options{PageSize: 4096}) + + err := db.Update(func(tx *bbolt.Tx) error { + pb, bErr := tx.CreateBucket(parentBucketName) + if bErr != nil { + return bErr + } + + t.Log("put some key/values under the parent bucket directly") + for i := 0; i < 10; i++ { + k, v := fmt.Sprintf("%04d", i), fmt.Sprintf("value_%4d", i) + if pErr := pb.Put([]byte(k), []byte(v)); pErr != nil { + return pErr + } + } + + t.Log("create a nested bucket and put some key/values under the nested bucket") + cb, bErr := pb.CreateBucket([]byte("nestedBucket")) + if bErr != nil { + return bErr + } + + for i := 0; i < 2000; i++ { + k, v := fmt.Sprintf("%04d", i), fmt.Sprintf("value_%4d", i) + if pErr := cb.Put([]byte(k), []byte(v)); pErr != nil { + return pErr + } + } + + return nil + }) + require.NoError(t, err) + + // Get the bucket's root page. + bucketRootPageId := mustGetBucketRootPage(t, db.DB, parentBucketName) + + t.Logf("Running consistency check starting from pageId: %d", bucketRootPageId) + vErr := db.View(func(tx *bbolt.Tx) error { + var cErrs []error + + errChan := tx.Check(bbolt.WithPageId(uint(bucketRootPageId))) + for cErr := range errChan { + cErrs = append(cErrs, cErr) + } + require.Equal(t, 0, len(cErrs)) + + return nil + }) + require.NoError(t, vErr) + t.Log("All check passed") + + // Manually close the db, otherwise the PostTestCleanup will + // check the db again and accordingly fail the test. + db.MustClose() +} + // corruptRandomLeafPage corrupts one random leaf page. -func corruptRandomLeafPage(t testing.TB, db *bbolt.DB) (victimPageId common.Pgid, validPageIds []common.Pgid) { - victimPageId, validPageIds = pickupRandomLeafPage(t, db) +func corruptRandomLeafPageInBucket(t testing.TB, db *bbolt.DB, bucketName []byte) (victimPageId common.Pgid, validPageIds []common.Pgid) { + bucketRootPageId := mustGetBucketRootPage(t, db, bucketName) + bucketRootPage, _, err := guts_cli.ReadPage(db.Path(), uint64(bucketRootPageId)) + require.NoError(t, err) + require.True(t, bucketRootPage.IsBranchPage()) + + // Retrieve all the leaf pages included in the branch page, and pick up random one from them. + var bucketPageIds []common.Pgid + for _, bpe := range bucketRootPage.BranchPageElements() { + bucketPageIds = append(bucketPageIds, bpe.Pgid()) + } + randomIdx := rand.Intn(len(bucketPageIds)) + victimPageId = bucketPageIds[randomIdx] + validPageIds = append(bucketPageIds[:randomIdx], bucketPageIds[randomIdx+1:]...) + victimPage, victimBuf, err := guts_cli.ReadPage(db.Path(), uint64(victimPageId)) require.NoError(t, err) require.True(t, victimPage.IsLeafPage()) @@ -77,33 +152,15 @@ func corruptRandomLeafPage(t testing.TB, db *bbolt.DB) (victimPageId common.Pgid return victimPageId, validPageIds } -// pickupRandomLeafPage picks up a random leaf page. -func pickupRandomLeafPage(t testing.TB, db *bbolt.DB) (victimPageId common.Pgid, validPageIds []common.Pgid) { - // Read DB's RootPage, which should be a leaf page. - rootPageId, _, err := guts_cli.GetRootPage(db.Path()) - require.NoError(t, err) - rootPage, _, err := guts_cli.ReadPage(db.Path(), uint64(rootPageId)) - require.NoError(t, err) - require.True(t, rootPage.IsLeafPage()) +// mustGetBucketRootPage returns the root page for the provided bucket. +func mustGetBucketRootPage(t testing.TB, db *bbolt.DB, bucketName []byte) common.Pgid { + var rootPageId common.Pgid + _ = db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(bucketName) + require.NotNil(t, b) + rootPageId = b.RootPage() + return nil + }) - // The leaf page contains only one item, namely the bucket - require.Equal(t, uint16(1), rootPage.Count()) - lpe := rootPage.LeafPageElement(uint16(0)) - require.True(t, lpe.IsBucketEntry()) - - // The bucket should be pointing to a branch page - bucketRootPageId := lpe.Bucket().RootPage() - bucketRootPage, _, err := guts_cli.ReadPage(db.Path(), uint64(bucketRootPageId)) - require.NoError(t, err) - require.True(t, bucketRootPage.IsBranchPage()) - - // Retrieve all the leaf pages included in the branch page, and pick up random one from them. - var bucketPageIds []common.Pgid - for _, bpe := range bucketRootPage.BranchPageElements() { - bucketPageIds = append(bucketPageIds, bpe.Pgid()) - } - randomIdx := rand.Intn(len(bucketPageIds)) - victimPageId = bucketPageIds[randomIdx] - validPageIds = append(bucketPageIds[:randomIdx], bucketPageIds[randomIdx+1:]...) - return victimPageId, validPageIds + return rootPageId }