Add DB.Check().

pull/34/head
Ben Johnson 2014-03-28 00:07:05 -06:00
parent 7dafeaa896
commit 7f2de9f17a
7 changed files with 140 additions and 1 deletions

View File

@ -15,6 +15,14 @@ func Open(path string, mode os.FileMode) (*DB, error) {
return db, nil
}
// ErrorList represents a slice of errors.
type ErrorList []error
// Error returns a readable count of the errors in the list.
func (l ErrorList) Error() string {
return fmt.Sprintf("%d errors occurred", len(l))
}
// _assert will panic with a given formatted message if the given condition is false.
func _assert(condition bool, msg string, v ...interface{}) {
if !condition {

View File

@ -271,6 +271,7 @@ func TestBucketStat(t *testing.T) {
return nil
})
mustCheck(db)
db.View(func(tx *Tx) error {
b := tx.Bucket("widgets")
stat := b.Stat()

34
cmd/bolt/check.go Normal file
View File

@ -0,0 +1,34 @@
package main
import (
"os"
"github.com/boltdb/bolt"
)
// Check performs a consistency check on the database and prints any errors found.
func Check(path string) {
if _, err := os.Stat(path); os.IsNotExist(err) {
fatal(err)
return
}
db, err := bolt.Open(path, 0600)
if err != nil {
fatal(err)
return
}
defer db.Close()
// Perform consistency check.
if err := db.Check(); err != nil {
if errors, ok := err.(bolt.ErrorList); ok {
for _, err := range errors {
println(err)
}
}
fatalln(err)
return
}
println("OK")
}

View File

@ -61,6 +61,14 @@ func NewApp() *cli.App {
Pages(path)
},
},
{
Name: "check",
Usage: "Performs a consistency check on the database",
Action: func(c *cli.Context) {
path := c.Args().Get(0)
Check(path)
},
},
}
return app
}

57
db.go
View File

@ -523,6 +523,63 @@ func (db *DB) Stat() (*Stat, error) {
return s, nil
}
// Check performs several consistency checks on the database.
// An error is returned if any inconsistency is found.
func (db *DB) Check() error {
return db.Update(func(tx *Tx) error {
var errors ErrorList
// Track every reachable page.
reachable := make(map[pgid]*page)
reachable[0] = tx.page(0) // meta0
reachable[1] = tx.page(1) // meta1
reachable[tx.meta.buckets] = tx.page(tx.meta.buckets)
reachable[tx.meta.freelist] = tx.page(tx.meta.freelist)
// Check each reachable page within each bucket.
for _, bucket := range tx.Buckets() {
// warnf("[bucket] %s", bucket.name)
tx.forEachPage(bucket.root, 0, func(p *page, _ int) {
// Ensure each page is only referenced once.
for i := pgid(0); i <= pgid(p.overflow); i++ {
var id = p.id + i
if _, ok := reachable[id]; ok {
errors = append(errors, fmt.Errorf("page %d: multiple references", int(id)))
}
reachable[id] = p
}
// Retrieve page info.
info, err := tx.Page(int(p.id))
// warnf("[page] %d + %d (%s)", p.id, p.overflow, info.Type)
if err != nil {
errors = append(errors, err)
} else if info == nil {
errors = append(errors, fmt.Errorf("page %d: out of bounds: %d", int(p.id), int(tx.meta.pgid)))
} else if info.Type != "branch" && info.Type != "leaf" {
errors = append(errors, fmt.Errorf("page %d: invalid type: %s", int(p.id), info.Type))
}
})
}
// Ensure all pages below high water mark are either reachable or freed.
for i := pgid(0); i < tx.meta.pgid; i++ {
_, isReachable := reachable[i]
if !isReachable && !db.freelist.isFree(i) {
errors = append(errors, fmt.Errorf("page %d: unreachable unfreed", int(i)))
}
}
// TODO(benbjohnson): Ensure that only one buckets page exists.
if len(errors) > 0 {
return errors
}
return nil
})
}
// page retrieves a page reference from the mmap based on the current page size.
func (db *DB) page(id pgid) *page {
pos := id * pgid(db.pageSize)

View File

@ -78,7 +78,8 @@ func TestDBMetaInitWriteError(t *testing.T) {
// Ensure that a database that is too small returns an error.
func TestDBFileTooSmall(t *testing.T) {
withOpenDB(func(db *DB, path string) {
withDB(func(db *DB, path string) {
assert.NoError(t, db.Open(path, 0666))
db.Close()
// corrupt the database
@ -130,6 +131,7 @@ func TestDBBeginRW(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, tx.DB(), db)
assert.Equal(t, tx.Writable(), true)
assert.NoError(t, tx.Commit())
})
}
@ -382,9 +384,28 @@ func withOpenDB(fn func(*DB, string)) {
}
defer db.Close()
fn(db, path)
// Check database consistency after every test.
mustCheck(db)
})
}
// mustCheck runs a consistency check on the database and panics if any errors are found.
func mustCheck(db *DB) {
if err := db.Check(); err != nil {
// Copy db off first.
db.CopyFile("/tmp/check.db", 0600)
if errors, ok := err.(ErrorList); ok {
for _, err := range errors {
warn(err)
}
}
warn(err)
panic("check failure: see /tmp/check.db")
}
}
func trunc(b []byte, length int) []byte {
if length < len(b) {
return b[:length]

10
node.go
View File

@ -258,6 +258,7 @@ func (n *node) rebalance() {
// Remove old child.
child.parent = nil
delete(n.tx.nodes, child.pgid)
child.free()
}
return
@ -318,6 +319,7 @@ func (n *node) rebalance() {
n.inodes = append(n.inodes, target.inodes...)
n.parent.del(target.key)
delete(n.tx.nodes, target.pgid)
target.free()
} else {
// Reparent all child nodes being moved.
for _, inode := range n.inodes {
@ -331,6 +333,7 @@ func (n *node) rebalance() {
n.parent.del(n.key)
n.parent.put(target.key, target.inodes[0].key, nil, target.pgid)
delete(n.tx.nodes, n.pgid)
n.free()
}
// Either this node or the target node was deleted from the parent so rebalance it.
@ -357,6 +360,13 @@ func (n *node) dereference() {
}
}
// free adds the node's underlying page to the freelist.
func (n *node) free() {
if n.pgid != 0 {
n.tx.db.freelist.free(n.tx.id(), n.tx.page(n.pgid))
}
}
// nodesByDepth sorts a list of branches by deepest first.
type nodesByDepth []*node