diff --git a/db.go b/db.go index 2c5c694..39fce45 100644 --- a/db.go +++ b/db.go @@ -1206,7 +1206,7 @@ func (db *DB) freepages() []common.Pgid { panic(fmt.Sprintf("freepages: failed to get all reachable pages (%v)", e)) } }() - tx.checkBucket(&tx.root, reachable, nofreed, HexKVStringer(), ech) + tx.recursivelyCheckBucket(&tx.root, reachable, nofreed, HexKVStringer(), ech) close(ech) // TODO: If check bucket reported any corruptions (ech) we shouldn't proceed to freeing the pages. diff --git a/tx_check.go b/tx_check.go index cc08013..ed8840e 100644 --- a/tx_check.go +++ b/tx_check.go @@ -57,7 +57,7 @@ func (tx *Tx) check(kvStringer KVStringer, ch chan error) { } // Recursively check buckets. - tx.checkBucket(&tx.root, reachable, freed, kvStringer, ch) + tx.recursivelyCheckBucket(&tx.root, reachable, freed, kvStringer, ch) // Ensure all pages below high water mark are either reachable or freed. for i := common.Pgid(0); i < tx.meta.Pgid(); i++ { @@ -71,7 +71,7 @@ func (tx *Tx) check(kvStringer KVStringer, ch chan error) { close(ch) } -func (tx *Tx) checkBucket(b *Bucket, reachable map[common.Pgid]*common.Page, freed map[common.Pgid]bool, +func (tx *Tx) recursivelyCheckBucket(b *Bucket, reachable map[common.Pgid]*common.Page, freed map[common.Pgid]bool, kvStringer KVStringer, ch chan error) { // Ignore inline buckets. if b.RootPage() == 0 { @@ -80,52 +80,56 @@ func (tx *Tx) checkBucket(b *Bucket, reachable map[common.Pgid]*common.Page, fre // Check every page used by this bucket. b.tx.forEachPage(b.RootPage(), func(p *common.Page, _ int, stack []common.Pgid) { - if p.Id() > tx.meta.Pgid() { - ch <- fmt.Errorf("page %d: out of bounds: %d (stack: %v)", int(p.Id()), int(b.tx.meta.Pgid()), stack) - } - - // Ensure each page is only referenced once. - for i := common.Pgid(0); i <= common.Pgid(p.Overflow()); i++ { - var id = p.Id() + i - if _, ok := reachable[id]; ok { - ch <- fmt.Errorf("page %d: multiple references (stack: %v)", int(id), stack) - } - reachable[id] = p - } - - // We should only encounter un-freed leaf and branch pages. - if freed[p.Id()] { - ch <- fmt.Errorf("page %d: reachable freed", int(p.Id())) - } else if !p.IsBranchPage() && !p.IsLeafPage() { - ch <- fmt.Errorf("page %d: invalid type: %s (stack: %v)", int(p.Id()), p.Typ(), stack) - } + verifyPageReachable(p, tx.meta.Pgid(), stack, reachable, freed, ch) }) - tx.recursivelyCheckPages(b.RootPage(), kvStringer.KeyToString, ch) + tx.recursivelyCheckPageKeyOrder(b.RootPage(), kvStringer.KeyToString, ch) // Check each bucket within this bucket. _ = b.ForEachBucket(func(k []byte) error { if child := b.Bucket(k); child != nil { - tx.checkBucket(child, reachable, freed, kvStringer, ch) + tx.recursivelyCheckBucket(child, reachable, freed, kvStringer, ch) } return nil }) } -// recursivelyCheckPages confirms database consistency with respect to b-tree +func verifyPageReachable(p *common.Page, hwm common.Pgid, stack []common.Pgid, reachable map[common.Pgid]*common.Page, freed map[common.Pgid]bool, ch chan error) { + if p.Id() > hwm { + ch <- fmt.Errorf("page %d: out of bounds: %d (stack: %v)", int(p.Id()), int(hwm), stack) + } + + // Ensure each page is only referenced once. + for i := common.Pgid(0); i <= common.Pgid(p.Overflow()); i++ { + var id = p.Id() + i + if _, ok := reachable[id]; ok { + ch <- fmt.Errorf("page %d: multiple references (stack: %v)", int(id), stack) + } + reachable[id] = p + } + + // We should only encounter un-freed leaf and branch pages. + if freed[p.Id()] { + ch <- fmt.Errorf("page %d: reachable freed", int(p.Id())) + } else if !p.IsBranchPage() && !p.IsLeafPage() { + ch <- fmt.Errorf("page %d: invalid type: %s (stack: %v)", int(p.Id()), p.Typ(), stack) + } +} + +// recursivelyCheckPageKeyOrder verifies database consistency with respect to b-tree // key order constraints: // - keys on pages must be sorted // - keys on children pages are between 2 consecutive keys on the parent's branch page). -func (tx *Tx) recursivelyCheckPages(pgId common.Pgid, keyToString func([]byte) string, ch chan error) { - tx.recursivelyCheckPagesInternal(pgId, nil, nil, nil, keyToString, ch) +func (tx *Tx) recursivelyCheckPageKeyOrder(pgId common.Pgid, keyToString func([]byte) string, ch chan error) { + tx.recursivelyCheckPageKeyOrderInternal(pgId, nil, nil, nil, keyToString, ch) } -// recursivelyCheckPagesInternal verifies that all keys in the subtree rooted at `pgid` are: +// recursivelyCheckPageKeyOrderInternal verifies that all keys in the subtree rooted at `pgid` are: // - >=`minKeyClosed` (can be nil) // - <`maxKeyOpen` (can be nil) // - Are in right ordering relationship to their parents. // `pagesStack` is expected to contain IDs of pages from the tree root to `pgid` for the clean debugging message. -func (tx *Tx) recursivelyCheckPagesInternal( +func (tx *Tx) recursivelyCheckPageKeyOrderInternal( pgId common.Pgid, minKeyClosed, maxKeyOpen []byte, pagesStack []common.Pgid, keyToString func([]byte) string, ch chan error) (maxKeyInSubtree []byte) { @@ -143,7 +147,7 @@ func (tx *Tx) recursivelyCheckPagesInternal( if i < len(p.BranchPageElements())-1 { maxKey = p.BranchPageElement(uint16(i + 1)).Key() } - maxKeyInSubtree = tx.recursivelyCheckPagesInternal(elem.Pgid(), elem.Key(), maxKey, pagesStack, keyToString, ch) + maxKeyInSubtree = tx.recursivelyCheckPageKeyOrderInternal(elem.Pgid(), elem.Key(), maxKey, pagesStack, keyToString, ch) runningMin = maxKeyInSubtree } return maxKeyInSubtree