Fix merge-split regression.

This commit reverts merge-split and fixes the node.split() to do a multi-page split. This issue
caused problems with bulk loading because it would split into a small page and a very large page.
The very large page, in turn, would be of arbitrary size, so when it was freed later it would be
difficult to reuse and would cause serious fragmentation issues.
pull/34/head
Ben Johnson 2014-06-18 16:16:58 -06:00
parent 56de9902a8
commit b1dbd35da1
4 changed files with 52 additions and 30 deletions

View File

@ -691,15 +691,15 @@ func TestBucket_Stats_RandomFill(t *testing.T) {
s := tx.Bucket([]byte("woojits")).Stats()
assert.Equal(t, 100000, s.KeyN, "KeyN")
assert.Equal(t, 22, s.BranchPageN, "BranchPageN")
assert.Equal(t, 98, s.BranchPageN, "BranchPageN")
assert.Equal(t, 0, s.BranchOverflowN, "BranchOverflowN")
assert.Equal(t, 61708, s.BranchInuse, "BranchInuse")
assert.Equal(t, 90112, s.BranchAlloc, "BranchAlloc")
assert.Equal(t, 130984, s.BranchInuse, "BranchInuse")
assert.Equal(t, 401408, s.BranchAlloc, "BranchAlloc")
assert.Equal(t, 1643, s.LeafPageN, "LeafPageN")
assert.Equal(t, 3412, s.LeafPageN, "LeafPageN")
assert.Equal(t, 0, s.LeafOverflowN, "LeafOverflowN")
assert.Equal(t, 4714178, s.LeafInuse, "LeafInuse")
assert.Equal(t, 6729728, s.LeafAlloc, "LeafAlloc")
assert.Equal(t, 4742482, s.LeafInuse, "LeafInuse")
assert.Equal(t, 13975552, s.LeafAlloc, "LeafAlloc")
return nil
})
})
@ -847,11 +847,11 @@ func TestBucket_Stats_Large(t *testing.T) {
withOpenDB(func(db *DB, path string) {
var index int
for i := 0; i < 10000; i++ {
for i := 0; i < 100; i++ {
db.Update(func(tx *Tx) error {
// Add bucket with lots of keys.
b, _ := tx.CreateBucketIfNotExists([]byte("widgets"))
for i := 0; i < 10; i++ {
for i := 0; i < 1000; i++ {
b.Put([]byte(strconv.Itoa(index)), []byte(strconv.Itoa(index)))
index++
}
@ -865,16 +865,16 @@ func TestBucket_Stats_Large(t *testing.T) {
stats := b.Stats()
assert.Equal(t, 13, stats.BranchPageN, "BranchPageN")
assert.Equal(t, 0, stats.BranchOverflowN, "BranchOverflowN")
assert.Equal(t, 1195, stats.LeafPageN, "LeafPageN")
assert.Equal(t, 1196, stats.LeafPageN, "LeafPageN")
assert.Equal(t, 0, stats.LeafOverflowN, "LeafOverflowN")
assert.Equal(t, 100000, stats.KeyN, "KeyN")
assert.Equal(t, 3, stats.Depth, "Depth")
assert.Equal(t, 25208, stats.BranchInuse, "BranchInuse")
assert.Equal(t, 2596900, stats.LeafInuse, "LeafInuse")
assert.Equal(t, 25257, stats.BranchInuse, "BranchInuse")
assert.Equal(t, 2596916, stats.LeafInuse, "LeafInuse")
if os.Getpagesize() == 4096 {
// Incompatible page size
assert.Equal(t, 53248, stats.BranchAlloc, "BranchAlloc")
assert.Equal(t, 4894720, stats.LeafAlloc, "LeafAlloc")
assert.Equal(t, 4898816, stats.LeafAlloc, "LeafAlloc")
}
assert.Equal(t, 1, stats.BucketN, "BucketN")
assert.Equal(t, 0, stats.InlineBucketN, "InlineBucketN")

7
db.go
View File

@ -5,6 +5,8 @@ import (
"fmt"
"hash/fnv"
"os"
"runtime/debug"
"strings"
"sync"
"unsafe"
)
@ -652,3 +654,8 @@ func warn(v ...interface{}) {
func warnf(msg string, v ...interface{}) {
fmt.Fprintf(os.Stderr, msg+"\n", v...)
}
// printstack dumps the calling goroutine's stack trace to stderr.
// The first two lines of debug.Stack()'s output (the goroutine header
// and the debug.Stack frame itself) are dropped for readability.
func printstack() {
	lines := strings.Split(string(debug.Stack()), "\n")
	trimmed := strings.Join(lines[2:], "\n")
	fmt.Fprintln(os.Stderr, trimmed)
}

View File

@ -512,7 +512,7 @@ func withOpenDB(fn func(*DB, string)) {
// mustCheck runs a consistency check on the database and panics if any errors are found.
func mustCheck(db *DB) {
err := db.Update(func(tx *Tx) error {
err := db.View(func(tx *Tx) error {
return <-tx.Check()
})
if err != nil {

49
node.go
View File

@ -9,6 +9,7 @@ import (
// node represents an in-memory, deserialized page.
type node struct {
bucket *Bucket
dirty bool
isLeaf bool
unbalanced bool
key []byte
@ -205,14 +206,37 @@ func (n *node) write(p *page) {
// DEBUG ONLY: n.dump()
}
// split breaks up a node into two smaller nodes, if appropriate.
// split breaks a node up into multiple page-sized nodes, if appropriate.
// It repeatedly carves a page-sized node off the front via splitTwo until
// the remainder fits on a single page. This should only be called from
// the spill() function.
func (n *node) split(pageSize int) []*node {
	var result []*node

	// splitTwo returns a nil second node once no further split is needed,
	// which terminates the loop.
	for rest := n; rest != nil; {
		var head *node
		head, rest = rest.splitTwo(pageSize)
		result = append(result, head)
	}

	return result
}
// splitTwo breaks up a node into two smaller nodes, if appropriate.
// This should only be called from the split() function.
func (n *node) splitTwo(pageSize int) (*node, *node) {
// Ignore the split if the page doesn't have at least enough nodes for
// two pages or if the data can fit on a single page.
sz := n.size()
if len(n.inodes) <= (minKeysPerPage*2) || sz < pageSize {
return []*node{n}
return n, nil
}
// Determine the threshold before starting a new node.
@ -225,18 +249,10 @@ func (n *node) split(pageSize int) []*node {
threshold := int(float64(pageSize) * fillPercent)
// Determine split position and sizes of the two pages.
splitIndex, sz0 := n.splitIndex(threshold)
sz1 := pageHeaderSize + (sz - sz0)
splitIndex, _ := n.splitIndex(threshold)
// If we can fit our extra keys on the next page then merge into it.
if next := n.nextSibling(); next != nil && next.size()+sz1 < threshold {
next.inodes = append(n.inodes[splitIndex:], next.inodes...)
n.inodes = n.inodes[:splitIndex]
return []*node{n}
}
// Otherwise split node into two separate nodes. If there's no parent then
// we'll need to create one.
// Split node into two separate nodes.
// If there's no parent then we'll need to create one.
if n.parent == nil {
n.parent = &node{bucket: n.bucket, children: []*node{n}}
}
@ -252,7 +268,7 @@ func (n *node) split(pageSize int) []*node {
// Update the statistics.
n.bucket.tx.stats.Split++
return []*node{n, next}
return n, next
}
// splitIndex finds the position where a page will fill a given threshold.
@ -298,8 +314,7 @@ func (n *node) spill() error {
// We no longer need the child list because it's only used for spill tracking.
n.children = nil
// Spill nodes by deepest first. The first node returned from split() will
// always be "n".
// Split nodes into appropriate sizes. The first node will always be n.
var nodes = n.split(tx.db.pageSize)
for _, node := range nodes {
// Add node's page to the freelist if it's not new.
@ -328,7 +343,7 @@ func (n *node) spill() error {
node.parent.put(key, node.inodes[0].key, nil, node.pgid, 0)
node.key = node.inodes[0].key
_assert(len(n.key) > 0, "spill: zero-length node key")
_assert(len(node.key) > 0, "spill: zero-length node key")
}
// Update the statistics.