Fix root split on very large append.

This commit fixes a bug where a root split on a very large insert would
cause an overflow of the root node. The problem was that the new root
was not split when it was created so a new root with more than 64K child
nodes would overflow the page.count (uint16).
pull/34/head
Ben Johnson 2014-07-23 15:08:59 -06:00
parent fd3c1d44b0
commit aa66291b9b
3 changed files with 61 additions and 22 deletions

View File

@ -107,6 +107,31 @@ func TestBucket_Put_Large(t *testing.T) {
})
}
// Ensure that a database can perform multiple large appends safely.
func TestDB_Put_VeryLarge(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
n, batchN := 400000, 200000
ksize, vsize := 8, 500
withOpenDB(func(db *DB, path string) {
for i := 0; i < n; i += batchN {
err := db.Update(func(tx *Tx) error {
b, _ := tx.CreateBucketIfNotExists([]byte("widgets"))
for j := 0; j < batchN; j++ {
k, v := make([]byte, ksize), make([]byte, vsize)
binary.BigEndian.PutUint32(k, uint32(i+j))
assert.NoError(t, b.Put(k, v))
}
return nil
})
assert.NoError(t, err)
}
})
}
// Ensure that a setting a value on a key with a bucket value returns an error.
func TestBucket_Put_IncompatibleValue(t *testing.T) {
withOpenDB(func(db *DB, path string) {

View File

@ -608,15 +608,36 @@ func withOpenDB(fn func(*DB, string)) {
// mustCheck runs a consistency check on the database and panics if any errors are found.
func mustCheck(db *DB) {
err := db.View(func(tx *Tx) error {
return <-tx.Check()
db.View(func(tx *Tx) error {
// Collect all the errors.
var errors []error
for err := range tx.Check() {
errors = append(errors, err)
if len(errors) > 10 {
break
}
}
// If errors occurred, copy the DB and print the errors.
if len(errors) > 0 {
var path = tempfile()
tx.CopyFile(path, 0600)
// Print errors.
fmt.Print("\n\n")
fmt.Printf("consistency check failed (%d errors)\n", len(errors))
for _, err := range errors {
fmt.Println(err)
}
fmt.Println("")
fmt.Println("db saved to:")
fmt.Println(path)
fmt.Print("\n\n")
os.Exit(-1)
}
return nil
})
if err != nil {
// Copy db off first.
var path = tempfile()
db.View(func(tx *Tx) error { return tx.CopyFile(path, 0600) })
panic("check failure: " + err.Error() + ": " + path)
}
}
// mustContainKeys checks that a bucket contains a given set of keys.

21
node.go
View File

@ -188,6 +188,8 @@ func (n *node) write(p *page) {
} else {
p.flags |= branchPageFlag
}
_assert(len(n.inodes) < 0xFFFF, "inode overflow: %d (pgid=%d)", len(n.inodes), p.id)
p.count = uint16(len(n.inodes))
// Loop over each item and write it to the page.
@ -367,20 +369,11 @@ func (n *node) spill() error {
tx.stats.Spill++
}
// This is a special case where we need to write the parent if it is new
// and caused by a split in the root.
var parent = n.parent
if parent != nil && parent.pgid == 0 {
// Allocate contiguous space for the node.
p, err := tx.allocate((parent.size() / tx.db.pageSize) + 1)
if err != nil {
return err
}
// Write the new root.
_assert(p.id < tx.meta.pgid, "pgid (%d) above high water mark (%d)", p.id, tx.meta.pgid)
parent.pgid = p.id
parent.write(p)
// If the root node split and created a new root then we need to spill that
// as well. We'll clear out the children to make sure it doesn't try to respill.
if n.parent != nil && n.parent.pgid == 0 {
n.children = nil
return n.parent.spill()
}
return nil