Recursive checker implementation.

Recursive checker confirms database consistency with respect to b-tree
key order constraints:
  - keys on pages must be sorted
  - keys on children pages are between 2 consecutive keys on the parent's
    branch page.

Signed-off-by: Piotr Tabor <ptab@google.com>
pull/225/head
Piotr Tabor 2022-12-20 15:19:54 +01:00
parent 0ccb16dc02
commit 0c8d75db1e
8 changed files with 150 additions and 14 deletions

View File

@ -207,7 +207,7 @@ func (cmd *CheckCommand) Run(args ...string) error {
// Perform consistency check.
return db.View(func(tx *bolt.Tx) error {
var count int
for err := range tx.Check() {
for err := range tx.Check(CmdKeyValueStringer()) {
fmt.Fprintln(cmd.Stdout, err)
count++
}
@ -1689,3 +1689,25 @@ Additional options include:
Defaults to 64KB.
`, "\n")
}
// cmdKeyValueStringer is the KeyValueStringer handed to tx.Check by
// CheckCommand.Run: data accepted by isPrintable is shown verbatim,
// everything else is rendered as hex.
type cmdKeyValueStringer struct{}

// KeyToString returns key unchanged when isPrintable accepts it and its
// hex encoding otherwise.
func (cmdKeyValueStringer) KeyToString(key []byte) string {
	// Convert once and reuse the string for both the check and the result.
	if s := string(key); isPrintable(s) {
		return s
	}
	return hex.EncodeToString(key)
}

// ValueToString returns value unchanged when isPrintable accepts it and its
// hex encoding otherwise.
func (cmdKeyValueStringer) ValueToString(value []byte) string {
	// Convert once and reuse the string for both the check and the result.
	if s := string(value); isPrintable(s) {
		return s
	}
	return hex.EncodeToString(value)
}

// CmdKeyValueStringer returns a bolt.KeyValueStringer that prints keys and
// values as plain text when they are printable and as hex otherwise.
func CmdKeyValueStringer() bolt.KeyValueStringer {
	return cmdKeyValueStringer{}
}

4
db.go
View File

@ -1148,9 +1148,11 @@ func (db *DB) freepages() []pgid {
panic(fmt.Sprintf("freepages: failed to get all reachable pages (%v)", e))
}
}()
tx.checkBucket(&tx.root, reachable, nofreed, ech)
tx.checkBucket(&tx.root, reachable, nofreed, HexKeyValueStringer(), ech)
close(ech)
// TODO: If check bucket reported any corruptions (ech) we shouldn't proceed to freeing the pages.
var fids []pgid
for i := pgid(2); i < db.meta().pgid; i++ {
if _, ok := reachable[i]; !ok {

View File

@ -396,7 +396,7 @@ func TestOpen_Check(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err = db.View(func(tx *bolt.Tx) error { return <-tx.Check() }); err != nil {
if err = db.View(func(tx *bolt.Tx) error { return <-tx.Check(bolt.HexKeyValueStringer()) }); err != nil {
t.Fatal(err)
}
if err = db.Close(); err != nil {
@ -407,7 +407,7 @@ func TestOpen_Check(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err := db.View(func(tx *bolt.Tx) error { return <-tx.Check() }); err != nil {
if err := db.View(func(tx *bolt.Tx) error { return <-tx.Check(bolt.HexKeyValueStringer()) }); err != nil {
t.Fatal(err)
}
if err := db.Close(); err != nil {

View File

@ -119,7 +119,7 @@ func (db *DB) MustCheck() {
err := db.Update(func(tx *bolt.Tx) error {
// Collect all the errors.
var errors []error
for err := range tx.Check() {
for err := range tx.Check(bolt.HexKeyValueStringer()) {
errors = append(errors, err)
if len(errors) > 10 {
break

View File

@ -585,6 +585,10 @@ func (n *node) dump() {
}
*/
// compareKeys orders two keys lexicographically by their bytes.
// The result is whatever bytes.Compare reports: negative when left sorts
// before right, zero when they are equal, positive when left sorts after.
func compareKeys(left, right []byte) int {
	order := bytes.Compare(left, right)
	return order
}
type nodes []*node
func (s nodes) Len() int { return len(s) }

2
tx.go
View File

@ -190,7 +190,7 @@ func (tx *Tx) Commit() error {
// If strict mode is enabled then perform a consistency check.
if tx.db.StrictMode {
ch := tx.Check()
ch := tx.Check(HexKeyValueStringer())
var errs []string
for {
err, ok := <-ch

View File

@ -1,6 +1,9 @@
package bbolt
import "fmt"
import (
"encoding/hex"
"fmt"
)
// Check performs several consistency checks on the database for this transaction.
// An error is returned if any inconsistency is found.
@ -10,13 +13,13 @@ import "fmt"
// because of caching. This overhead can be removed if running on a read-only
// transaction, however, it is not safe to execute other writer transactions at
// the same time.
func (tx *Tx) Check() <-chan error {
func (tx *Tx) Check(keyValueStringer KeyValueStringer) <-chan error {
ch := make(chan error)
go tx.check(ch)
go tx.check(keyValueStringer, ch)
return ch
}
func (tx *Tx) check(ch chan error) {
func (tx *Tx) check(keyValueStringer KeyValueStringer, ch chan error) {
// Force loading free list if opened in ReadOnly mode.
tx.db.loadFreelist()
@ -42,7 +45,7 @@ func (tx *Tx) check(ch chan error) {
}
// Recursively check buckets.
tx.checkBucket(&tx.root, reachable, freed, ch)
tx.checkBucket(&tx.root, reachable, freed, keyValueStringer, ch)
// Ensure all pages below high water mark are either reachable or freed.
for i := pgid(0); i < tx.meta.pgid; i++ {
@ -56,7 +59,8 @@ func (tx *Tx) check(ch chan error) {
close(ch)
}
func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bool, ch chan error) {
func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bool,
keyValueStringer KeyValueStringer, ch chan error) {
// Ignore inline buckets.
if b.root == 0 {
return
@ -85,11 +89,114 @@ func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bo
}
})
tx.recursivelyCheckPages(b.root, keyValueStringer.KeyToString, ch)
// Check each bucket within this bucket.
_ = b.ForEachBucket(func(k []byte) error {
if child := b.Bucket(k); child != nil {
tx.checkBucket(child, reachable, freed, ch)
tx.checkBucket(child, reachable, freed, keyValueStringer, ch)
}
return nil
})
}
// recursivelyCheckPages verifies the b-tree key order constraints of the
// subtree rooted at page `pgid`:
//   - keys on pages must be sorted,
//   - keys on children pages must lie between 2 consecutive keys of the
//     parent's branch page.
//
// Violations are sent to `ch`; `keyToString` renders keys in the messages.
func (tx *Tx) recursivelyCheckPages(pgid pgid, keyToString func([]byte) string, ch chan error) {
	// The walk starts with no enclosing key range and an empty pages stack.
	tx.recursivelyCheckPagesInternal(
		pgid,
		nil, // minKeyClosed: no lower bound
		nil, // maxKeyOpen: no upper bound
		nil, // pagesStack: nothing above the starting page
		keyToString,
		ch,
	)
}
// recursivelyCheckPagesInternal walks the subtree rooted at `pgid` and reports
// key-order violations on `ch`. Keys in the subtree are expected to be:
//   - >= `minKeyClosed` (not checked when nil),
//   - < `maxKeyOpen` (not checked when nil),
//   - on a leaf page, strictly greater than the preceding key of that page.
//
// The walk does not stop at the first violation; every finding is sent to
// `ch` as it is discovered.
//
// `pagesStack` is expected to hold the IDs of the pages from the tree root
// down to the parent of `pgid`; `pgid` is appended here and the stack is only
// used to build readable messages. `keyToString` renders keys in messages.
//
// The result is the last key of a leaf page (nil for an empty leaf), the
// value returned by the recursion into the last child for a branch page, and
// nil for any other page type.
func (tx *Tx) recursivelyCheckPagesInternal(
	pgid pgid, minKeyClosed, maxKeyOpen []byte, pagesStack []pgid,
	keyToString func([]byte) string, ch chan error) (maxKeyInSubtree []byte) {
	p := tx.page(pgid)
	pagesStack = append(pagesStack, pgid)

	if p.flags&branchPageFlag != 0 {
		// For a branch page, descend into every child with the key range
		// that the surrounding branch keys allow for it.
		elems := p.branchPageElements()
		lastIdx := len(elems) - 1
		// NOTE(review): only the i == 0 comparison below reads runningMin, so
		// the value stored at the end of each iteration is never compared
		// against later branch keys - confirm whether such a check is intended.
		runningMin := minKeyClosed
		for i := range elems {
			elem := p.branchPageElement(uint16(i))
			key := elem.key()

			if i == 0 && runningMin != nil && compareKeys(runningMin, key) > 0 {
				ch <- fmt.Errorf("key (%d, %s) on the branch page(%d) needs to be >="+
					" to the index in the ancestor. Pages stack: %v",
					i, keyToString(key), pgid, pagesStack)
			}
			if maxKeyOpen != nil && compareKeys(key, maxKeyOpen) >= 0 {
				ch <- fmt.Errorf("key (%d: %s) on the branch page(%d) needs to be <"+
					" than key of the next element reachable from the ancestor (%v). Pages stack: %v",
					i, keyToString(key), pgid, keyToString(maxKeyOpen), pagesStack)
			}

			// The child's open upper bound is the next branch key, or the
			// bound inherited from the ancestor for the last child.
			childMax := maxKeyOpen
			if i < lastIdx {
				childMax = p.branchPageElement(uint16(i + 1)).key()
			}

			maxKeyInSubtree = tx.recursivelyCheckPagesInternal(elem.pgid, key, childMax, pagesStack, keyToString, ch)
			runningMin = maxKeyInSubtree
		}
		return maxKeyInSubtree
	}

	if p.flags&leafPageFlag != 0 {
		// runningMin is the ancestor's lower bound for the first element and
		// the previous element's key for every following one.
		runningMin := minKeyClosed
		for i := range p.leafPageElements() {
			key := p.leafPageElement(uint16(i)).key()

			if i == 0 {
				if runningMin != nil && compareKeys(runningMin, key) > 0 {
					ch <- fmt.Errorf("key[%d]=(hex)%s on leaf page(%d) needs to be >= to the key in the ancestor. Stack: %v",
						i, keyToString(key), pgid, pagesStack)
				}
			} else {
				order := compareKeys(runningMin, key)
				if order > 0 {
					ch <- fmt.Errorf("key[%d]=(hex)%s on leaf page(%d) needs to be > (found <) than previous element (hex)%s. Stack: %v",
						i, keyToString(key), pgid, keyToString(runningMin), pagesStack)
				} else if order == 0 {
					ch <- fmt.Errorf("key[%d]=(hex)%s on leaf page(%d) needs to be > (found =) than previous element (hex)%s. Stack: %v",
						i, keyToString(key), pgid, keyToString(runningMin), pagesStack)
				}
			}
			if maxKeyOpen != nil && compareKeys(key, maxKeyOpen) >= 0 {
				ch <- fmt.Errorf("key[%d]=(hex)%s on leaf page(%d) needs to be < than key of the next element in ancestor (hex)%s. Pages stack: %v",
					i, keyToString(key), pgid, keyToString(maxKeyOpen), pagesStack)
			}
			runningMin = key
		}
		if p.count > 0 {
			return p.leafPageElement(p.count - 1).key()
		}
		return nil
	}

	// Neither a branch nor a leaf page.
	ch <- fmt.Errorf("unexpected page type for pgid:%d", pgid)
	return nil
}
// ===========================================================================================
// KeyValueStringer turns raw keys and values into human-readable text for
// diagnostic messages.
type KeyValueStringer interface {
	// KeyToString renders a key.
	KeyToString(key []byte) string
	// ValueToString renders a value.
	ValueToString(value []byte) string
}

// hexKeyValueStringer is the KeyValueStringer that hex-encodes its input.
type hexKeyValueStringer struct{}

// KeyToString returns the hex encoding of key.
func (hexKeyValueStringer) KeyToString(key []byte) string {
	return hex.EncodeToString(key)
}

// ValueToString returns the hex encoding of value.
func (hexKeyValueStringer) ValueToString(value []byte) string {
	return hex.EncodeToString(value)
}

// HexKeyValueStringer returns a KeyValueStringer that renders both keys and
// values in hex representation.
func HexKeyValueStringer() KeyValueStringer {
	var stringer hexKeyValueStringer
	return stringer
}

View File

@ -48,7 +48,8 @@ func TestTx_Check_ReadOnly(t *testing.T) {
numChecks := 2
errc := make(chan error, numChecks)
check := func() {
errc <- <-tx.Check()
err, _ := <-tx.Check(bolt.HexKeyValueStringer())
errc <- err
}
// Ensure the freelist is not reloaded and does not race.
for i := 0; i < numChecks; i++ {