Add streaming check.

This commit changes Tx.Check() to return a channel through which check errors are streamed. This allows
errors to be reported as they are found, before the entire data file has been checked.
pull/34/head
Ben Johnson 2014-05-28 10:28:15 -06:00
parent 4508a00891
commit b789691976
5 changed files with 46 additions and 62 deletions

View File

@ -21,19 +21,27 @@ func Check(path string) {
defer db.Close() defer db.Close()
// Perform consistency check. // Perform consistency check.
err = db.View(func(tx *bolt.Tx) error { _ = db.View(func(tx *bolt.Tx) error {
return tx.Check() var count int
}) ch := tx.Check()
loop:
// Print out any errors that occur. for {
if err != nil { select {
if errors, ok := err.(bolt.ErrorList); ok { case err, ok := <-ch:
for _, err := range errors { if !ok {
break loop
}
println(err) println(err)
count++
} }
} }
fatalln(err)
return // Print summary of errors.
} if count > 0 {
fatalf("%d errors found")
} else {
println("OK") println("OK")
}
return nil
})
} }

18
db.go
View File

@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"hash/fnv" "hash/fnv"
"os" "os"
"strings"
"sync" "sync"
"syscall" "syscall"
"unsafe" "unsafe"
@ -632,23 +631,6 @@ func (m *meta) sum64() uint64 {
return h.Sum64() return h.Sum64()
} }
// ErrorList represents a slice of errors.
type ErrorList []error
// Error returns a readable count of the errors in the list.
func (l ErrorList) Error() string {
return fmt.Sprintf("%d errors occurred", len(l))
}
// join returns the error messages joined by a separator string.
func (l ErrorList) join(sep string) string {
var a []string
for _, e := range l {
a = append(a, e.Error())
}
return strings.Join(a, sep)
}
// _assert will panic with a given formatted message if the given condition is false. // _assert will panic with a given formatted message if the given condition is false.
func _assert(condition bool, msg string, v ...interface{}) { func _assert(condition bool, msg string, v ...interface{}) {
if !condition { if !condition {

View File

@ -53,12 +53,12 @@ func TestOpen_Check(t *testing.T) {
withTempPath(func(path string) { withTempPath(func(path string) {
db, err := Open(path, 0666) db, err := Open(path, 0666)
assert.NoError(t, err) assert.NoError(t, err)
assert.NoError(t, db.View(func(tx *Tx) error { return tx.Check() })) assert.NoError(t, db.View(func(tx *Tx) error { return <-tx.Check() }))
db.Close() db.Close()
db, err = Open(path, 0666) db, err = Open(path, 0666)
assert.NoError(t, err) assert.NoError(t, err)
assert.NoError(t, db.View(func(tx *Tx) error { return tx.Check() })) assert.NoError(t, db.View(func(tx *Tx) error { return <-tx.Check() }))
db.Close() db.Close()
}) })
} }
@ -464,20 +464,13 @@ func withOpenDB(fn func(*DB, string)) {
// mustCheck runs a consistency check on the database and panics if any errors are found. // mustCheck runs a consistency check on the database and panics if any errors are found.
func mustCheck(db *DB) { func mustCheck(db *DB) {
err := db.Update(func(tx *Tx) error { err := db.Update(func(tx *Tx) error {
return tx.Check() return <-tx.Check()
}) })
if err != nil { if err != nil {
// Copy db off first. // Copy db off first.
var path = tempfile() var path = tempfile()
db.View(func(tx *Tx) error { return tx.CopyFile(path, 0600) }) db.View(func(tx *Tx) error { return tx.CopyFile(path, 0600) })
panic("check failure: " + err.Error() + ": " + path)
if errors, ok := err.(ErrorList); ok {
for _, err := range errors {
warn(err)
}
}
warn(err)
panic("check failure: " + path)
} }
} }

41
tx.go
View File

@ -184,10 +184,10 @@ func (tx *Tx) Commit() error {
} }
// If strict mode is enabled then perform a consistency check. // If strict mode is enabled then perform a consistency check.
// Only the first consistency error is reported in the panic.
if tx.db.StrictMode { if tx.db.StrictMode {
if err := tx.Check(); err != nil { if err, ok := <-tx.Check(); ok {
err := err.(ErrorList) panic("check fail: " + err.Error())
panic("check fail: " + err.Error() + ": " + err.join("; "))
} }
} }
@ -291,14 +291,18 @@ func (tx *Tx) CopyFile(path string, mode os.FileMode) error {
// because of caching. This overhead can be removed if running on a read-only // because of caching. This overhead can be removed if running on a read-only
// transaction, however, it is not safe to execute other writer transactions at // transaction, however, it is not safe to execute other writer transactions at
// the same time. // the same time.
func (tx *Tx) Check() error { func (tx *Tx) Check() <-chan error {
var errors ErrorList ch := make(chan error)
go tx.check(ch)
return ch
}
func (tx *Tx) check(ch chan error) {
// Check if any pages are double freed. // Check if any pages are double freed.
freed := make(map[pgid]bool) freed := make(map[pgid]bool)
for _, id := range tx.db.freelist.all() { for _, id := range tx.db.freelist.all() {
if freed[id] { if freed[id] {
errors = append(errors, fmt.Errorf("page %d: already freed", id)) ch <- fmt.Errorf("page %d: already freed", id)
} }
freed[id] = true freed[id] = true
} }
@ -312,26 +316,23 @@ func (tx *Tx) Check() error {
} }
// Recursively check buckets. // Recursively check buckets.
tx.checkBucket(&tx.root, reachable, &errors) tx.checkBucket(&tx.root, reachable, ch)
// Ensure all pages below high water mark are either reachable or freed. // Ensure all pages below high water mark are either reachable or freed.
for i := pgid(0); i < tx.meta.pgid; i++ { for i := pgid(0); i < tx.meta.pgid; i++ {
_, isReachable := reachable[i] _, isReachable := reachable[i]
if !isReachable && !freed[i] { if !isReachable && !freed[i] {
errors = append(errors, fmt.Errorf("page %d: unreachable unfreed", int(i))) ch <- fmt.Errorf("page %d: unreachable unfreed", int(i))
} else if isReachable && freed[i] { } else if isReachable && freed[i] {
errors = append(errors, fmt.Errorf("page %d: reachable freed", int(i))) ch <- fmt.Errorf("page %d: reachable freed", int(i))
} }
} }
if len(errors) > 0 { // Close the channel to signal completion.
return errors close(ch)
}
return nil
} }
func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, errors *ErrorList) { func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, ch chan error) {
// Ignore inline buckets. // Ignore inline buckets.
if b.root == 0 { if b.root == 0 {
return return
@ -343,7 +344,7 @@ func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, errors *ErrorList
for i := pgid(0); i <= pgid(p.overflow); i++ { for i := pgid(0); i <= pgid(p.overflow); i++ {
var id = p.id + i var id = p.id + i
if _, ok := reachable[id]; ok { if _, ok := reachable[id]; ok {
*errors = append(*errors, fmt.Errorf("page %d: multiple references", int(id))) ch <- fmt.Errorf("page %d: multiple references", int(id))
} }
reachable[id] = p reachable[id] = p
} }
@ -351,18 +352,18 @@ func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, errors *ErrorList
// Retrieve page info. // Retrieve page info.
info, err := b.tx.Page(int(p.id)) info, err := b.tx.Page(int(p.id))
if err != nil { if err != nil {
*errors = append(*errors, err) ch <- err
} else if info == nil { } else if info == nil {
*errors = append(*errors, fmt.Errorf("page %d: out of bounds: %d", int(p.id), int(b.tx.meta.pgid))) ch <- fmt.Errorf("page %d: out of bounds: %d", int(p.id), int(b.tx.meta.pgid))
} else if info.Type != "branch" && info.Type != "leaf" { } else if info.Type != "branch" && info.Type != "leaf" {
*errors = append(*errors, fmt.Errorf("page %d: invalid type: %s", int(p.id), info.Type)) ch <- fmt.Errorf("page %d: invalid type: %s", int(p.id), info.Type)
} }
}) })
// Check each bucket within this bucket. // Check each bucket within this bucket.
_ = b.ForEach(func(k, v []byte) error { _ = b.ForEach(func(k, v []byte) error {
if child := b.Bucket(k); child != nil { if child := b.Bucket(k); child != nil {
tx.checkBucket(child, reachable, errors) tx.checkBucket(child, reachable, ch)
} }
return nil return nil
}) })

View File

@ -310,7 +310,7 @@ func TestTx_Check_Corrupt(t *testing.T) {
}) })
}() }()
assert.Equal(t, "check fail: 1 errors occurred: page 3: already freed", msg) assert.Equal(t, "check fail: page 3: already freed", msg)
} }
// Ensure that the database can be copied to a file path. // Ensure that the database can be copied to a file path.