Allow split nodes to be merged with the next node.

This commit changes node.split() to check whether the next sibling node has
available space and, if so, to merge the newly split keys into that node.

Previously, keys could be continually inserted into the left-hand node of a split,
causing that first half to repeatedly split off small right-hand nodes. This was
especially problematic for databases with a high fill percent.
Ben Johnson 2014-06-02 15:26:58 -06:00
parent 10074ee8f4
commit a96185e8b6
3 changed files with 121 additions and 57 deletions
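
In outline, the new strategy is: find the index at which the node first crosses its fill threshold, then prefer handing the tail beyond that index to the right sibling over allocating a new node. Below is a minimal, self-contained sketch of that idea, not the bolt code; the item type, the byte sizes, and the splitOrMerge name are invented for illustration (the real implementation is in node.go further down).

package main

import "fmt"

// item stands in for bolt's inode: one key/value pair inside a node.
type item struct{ key, value string }

// node is a toy B+tree node with a pointer to its right sibling.
type node struct {
	items []item
	next  *node
}

// size approximates node.size(): a header plus per-element overhead and data.
func (n *node) size() int {
	sz := 16
	for _, it := range n.items {
		sz += 8 + len(it.key) + len(it.value)
	}
	return sz
}

// splitOrMerge mirrors the commit's split() strategy: find where the node
// first exceeds the threshold, then prefer merging the tail into the next
// sibling over creating a brand-new right-hand node.
func (n *node) splitOrMerge(threshold int) *node {
	splitIndex, sz := -1, 16
	for i, it := range n.items {
		elsize := 8 + len(it.key) + len(it.value)
		if i >= 2 && sz+elsize > threshold { // 2 plays the role of minKeysPerPage
			splitIndex = i
			break
		}
		sz += elsize
	}
	if splitIndex < 0 {
		return nil // everything fits; no split needed
	}
	tail := n.items[splitIndex:]
	tailSize := 16 + (n.size() - sz)

	// New behavior: if the tail fits in the next sibling, merge it there
	// and trim it from this node instead of splitting off a new node.
	if n.next != nil && n.next.size()+tailSize < threshold {
		n.next.items = append(append([]item(nil), tail...), n.next.items...)
		n.items = n.items[:splitIndex]
		return nil
	}

	// Old behavior, still the fallback: split into two nodes.
	right := &node{items: tail, next: n.next}
	n.items, n.next = n.items[:splitIndex], right
	return right
}

func main() {
	n := &node{next: &node{}} // right sibling with plenty of room
	for i := 0; i < 30; i++ {
		n.items = append(n.items, item{fmt.Sprintf("key%02d", i), "0123456789"})
	}
	fmt.Println(n.splitOrMerge(512) == nil)      // true: tail merged into the sibling
	fmt.Println(len(n.items), len(n.next.items)) // 21 9
}

With a full sibling, the same call falls through to the two-way split. Note that the new code splits into at most two nodes per call, where the old loop could produce several.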

bucket_test.go

@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"errors"
 	"fmt"
+	"math/rand"
 	"os"
 	"strconv"
 	"strings"
@@ -560,18 +561,19 @@ func TestBucket_Put_KeyTooLarge(t *testing.T) {
 // Ensure a bucket can calculate stats.
 func TestBucket_Stats(t *testing.T) {
 	withOpenDB(func(db *DB, path string) {
+		// Add bucket with fewer keys but one big value.
 		big_key := []byte("really-big-value")
+		for i := 0; i < 500; i++ {
+			db.Update(func(tx *Tx) error {
+				b, _ := tx.CreateBucketIfNotExists([]byte("woojits"))
+				return b.Put([]byte(fmt.Sprintf("%03d", i)), []byte(strconv.Itoa(i)))
+			})
+		}
 		db.Update(func(tx *Tx) error {
-			// Add bucket with fewer keys but one big value.
-			b, err := tx.CreateBucket([]byte("woojits"))
-			assert.NoError(t, err)
-			for i := 0; i < 500; i++ {
-				b.Put([]byte(fmt.Sprintf("%03d", i)), []byte(strconv.Itoa(i)))
-			}
-			b.Put(big_key, []byte(strings.Repeat("*", 10000)))
-			return nil
+			b, _ := tx.CreateBucketIfNotExists([]byte("woojits"))
+			return b.Put(big_key, []byte(strings.Repeat("*", 10000)))
 		})
+		mustCheck(db)
 		db.View(func(tx *Tx) error {
 			b := tx.Bucket([]byte("woojits"))
@@ -608,6 +610,48 @@ func TestBucket_Stats(t *testing.T) {
 	})
 }
 
+// Ensure a bucket with random insertion utilizes fill percentage correctly.
+func TestBucket_Stats_RandomFill(t *testing.T) {
+	if os.Getpagesize() != 4096 {
+		t.Skip("invalid page size for test")
+	}
+
+	withOpenDB(func(db *DB, path string) {
+		db.FillPercent = 0.9
+
+		// Add a set of values in random order. It will be the same random
+		// order so we can maintain consistency between test runs.
+		r := rand.New(rand.NewSource(42))
+		for _, i := range r.Perm(1000) {
+			db.Update(func(tx *Tx) error {
+				b, _ := tx.CreateBucketIfNotExists([]byte("woojits"))
+				for _, j := range r.Perm(100) {
+					index := (j * 10000) + i
+					b.Put([]byte(fmt.Sprintf("%d000000000000000", index)), []byte("0000000000"))
+				}
+				return nil
+			})
+			mustCheck(db)
+		}
+
+		db.View(func(tx *Tx) error {
+			s := tx.Bucket([]byte("woojits")).Stats()
+			assert.Equal(t, 100000, s.KeyN, "KeyN")
+
+			assert.Equal(t, 22, s.BranchPageN, "BranchPageN")
+			assert.Equal(t, 0, s.BranchOverflowN, "BranchOverflowN")
+			assert.Equal(t, 62963, s.BranchInuse, "BranchInuse")
+			assert.Equal(t, 90112, s.BranchAlloc, "BranchAlloc")
+
+			assert.Equal(t, 1677, s.LeafPageN, "LeafPageN")
+			assert.Equal(t, 0, s.LeafOverflowN, "LeafOverflowN")
+			assert.Equal(t, 4714722, s.LeafInuse, "LeafInuse")
+			assert.Equal(t, 6868992, s.LeafAlloc, "LeafAlloc")
+			return nil
+		})
+	})
+}
+
 // Ensure a bucket can calculate stats.
 func TestBucket_Stats_Small(t *testing.T) {
@@ -750,11 +794,11 @@ func TestBucket_Stats_Large(t *testing.T) {
 	withOpenDB(func(db *DB, path string) {
 		var index int
-		for i := 0; i < 1000; i++ {
+		for i := 0; i < 10000; i++ {
 			db.Update(func(tx *Tx) error {
 				// Add bucket with lots of keys.
 				b, _ := tx.CreateBucketIfNotExists([]byte("widgets"))
-				for i := 0; i < 100; i++ {
+				for i := 0; i < 10; i++ {
 					b.Put([]byte(strconv.Itoa(index)), []byte(strconv.Itoa(index)))
 					index++
 				}
@@ -766,18 +810,18 @@ func TestBucket_Stats_Large(t *testing.T) {
 		db.View(func(tx *Tx) error {
 			b := tx.Bucket([]byte("widgets"))
 			stats := b.Stats()
-			assert.Equal(t, 19, stats.BranchPageN, "BranchPageN")
+			assert.Equal(t, 13, stats.BranchPageN, "BranchPageN")
 			assert.Equal(t, 0, stats.BranchOverflowN, "BranchOverflowN")
-			assert.Equal(t, 1291, stats.LeafPageN, "LeafPageN")
+			assert.Equal(t, 1195, stats.LeafPageN, "LeafPageN")
 			assert.Equal(t, 0, stats.LeafOverflowN, "LeafOverflowN")
 			assert.Equal(t, 100000, stats.KeyN, "KeyN")
 			assert.Equal(t, 3, stats.Depth, "Depth")
-			assert.Equal(t, 27007, stats.BranchInuse, "BranchInuse")
-			assert.Equal(t, 2598436, stats.LeafInuse, "LeafInuse")
+			assert.Equal(t, 25208, stats.BranchInuse, "BranchInuse")
+			assert.Equal(t, 2596900, stats.LeafInuse, "LeafInuse")
 			if os.Getpagesize() == 4096 {
 				// Incompatible page size
-				assert.Equal(t, 77824, stats.BranchAlloc, "BranchAlloc")
-				assert.Equal(t, 5287936, stats.LeafAlloc, "LeafAlloc")
+				assert.Equal(t, 53248, stats.BranchAlloc, "BranchAlloc")
+				assert.Equal(t, 4894720, stats.LeafAlloc, "LeafAlloc")
 			}
 			assert.Equal(t, 1, stats.BucketN, "BucketN")
 			assert.Equal(t, 0, stats.InlineBucketN, "InlineBucketN")
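
As a rough cross-check of what the updated expectations encode, the in-use/allocated ratios can be computed straight from the numbers above (an illustrative calculation, not part of the commit):

package main

import "fmt"

func main() {
	// Values copied from the TestBucket_Stats_Large assertions above.
	fmt.Printf("leaf fill:   %.3f -> %.3f\n", 2598436.0/5287936.0, 2596900.0/4894720.0) // 0.491 -> 0.531
	fmt.Printf("branch fill: %.3f -> %.3f\n", 27007.0/77824.0, 25208.0/53248.0)         // 0.347 -> 0.473
}

The same 100,000 keys now sit in 13 branch and 1195 leaf pages instead of 19 and 1291, so page utilization improves noticeably.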

freelist.go

@@ -70,12 +70,14 @@ func (f *freelist) allocate(n int) pgid {
 // free releases a page and its overflow for a given transaction id.
 // If the page is already free then a panic will occur.
 func (f *freelist) free(txid txid, p *page) {
+	warn("free:", txid, p.id, p.overflow)
 	_assert(p.id > 1, "cannot free page 0 or 1: %d", p.id)
 
 	// Verify that page is not already free.
 	minid, maxid := p.id, p.id+pgid(p.overflow)
 	for _, id := range f.ids {
 		if id >= minid && id <= maxid {
+			warn("  ‡", id, "|", minid, maxid)
 			panic(fmt.Sprintf("page %d already freed in tx", id))
 		}
 	}
@@ -90,6 +92,9 @@ func (f *freelist) free(txid txid, p *page) {
 	// Free page and all its overflow pages.
 	var ids = f.pending[txid]
 	for i := 0; i < int(p.overflow+1); i++ {
+		if p.id+pgid(i) == 55 {
+			warn("  •", txid, p.id+pgid(i))
+		}
 		ids = append(ids, p.id+pgid(i))
 	}
 	f.pending[txid] = ids
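
The warn() calls above appear to be temporary debug tracing, but the free() logic they instrument is simple to model: pages freed inside a transaction accumulate on a per-txid pending list, and freeing an already-free page panics. A stripped-down model follows; the field names mirror freelist.go, while everything else is simplified for illustration:

package main

import "fmt"

type pgid uint64
type txid uint64

type freelist struct {
	ids     []pgid          // free and available for reuse
	pending map[txid][]pgid // freed by a tx, not yet reusable
}

func (f *freelist) free(tx txid, id pgid, overflow uint32) {
	// Verify that the page (and its overflow pages) is not already free.
	for _, fid := range f.ids {
		if fid >= id && fid <= id+pgid(overflow) {
			panic(fmt.Sprintf("page %d already freed in tx", fid))
		}
	}
	// Free the page and all its overflow pages.
	for i := pgid(0); i <= pgid(overflow); i++ {
		f.pending[tx] = append(f.pending[tx], id+i)
	}
}

func main() {
	f := &freelist{ids: []pgid{3, 4}, pending: map[txid][]pgid{}}
	f.free(7, 5, 1)           // page 5 plus one overflow page
	fmt.Println(f.pending[7]) // [5 6]
}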

node.go

@@ -205,15 +205,14 @@ func (n *node) write(p *page) {
 	// DEBUG ONLY: n.dump()
 }
 
-// split breaks up a node into smaller nodes, if appropriate.
+// split breaks up a node into two smaller nodes, if appropriate.
 // This should only be called from the spill() function.
 func (n *node) split(pageSize int) []*node {
-	var nodes = []*node{n}
-
 	// Ignore the split if the page doesn't have at least enough nodes for
-	// multiple pages or if the data can fit on a single page.
-	if len(n.inodes) <= (minKeysPerPage*2) || n.size() < pageSize {
-		return nodes
+	// two pages or if the data can fit on a single page.
+	sz := n.size()
+	if len(n.inodes) <= (minKeysPerPage*2) || sz < pageSize {
+		return []*node{n}
 	}
 
 	// Determine the threshold before starting a new node.
@@ -225,43 +224,59 @@ func (n *node) split(pageSize int) []*node {
 	}
 	threshold := int(float64(pageSize) * fillPercent)
 
-	// Group into smaller pages and target a given fill size.
-	size := pageHeaderSize
-	internalNodes := n.inodes
-	current := n
-	current.inodes = nil
+	// Determine split position and sizes of the two pages.
+	splitIndex, sz0 := n.splitIndex(threshold)
+	sz1 := pageHeaderSize + (sz - sz0)
 
-	// Loop over every inode and split once we reach our threshold.
-	for i, inode := range internalNodes {
-		elemSize := n.pageElementSize() + len(inode.key) + len(inode.value)
-
-		// Split once we reach our threshold split size. However, this should
-		// only be done if we have enough keys for this node and we will have
-		// enough keys for the next node.
-		if len(current.inodes) >= minKeysPerPage && i < len(internalNodes)-minKeysPerPage && size+elemSize > threshold {
-			// If there's no parent then we need to create one.
-			if n.parent == nil {
-				n.parent = &node{bucket: n.bucket, children: []*node{n}}
-			}
-
-			// Create a new node and add it to the parent.
-			current = &node{bucket: n.bucket, isLeaf: n.isLeaf, parent: n.parent}
-			n.parent.children = append(n.parent.children, current)
-			nodes = append(nodes, current)
-
-			// Reset our running total back to zero (plus header size).
-			size = pageHeaderSize
-
-			// Update the statistics.
-			n.bucket.tx.stats.Split++
-		}
-
-		// Increase our running total of the size and append the inode.
-		size += elemSize
-		current.inodes = append(current.inodes, inode)
+	// If we can fit our extra keys on the next page then merge into it.
+	if next := n.nextSibling(); next != nil && next.size()+sz1 < threshold {
+		next.inodes = append(n.inodes[splitIndex:], next.inodes...)
+		n.inodes = n.inodes[:splitIndex]
+		return []*node{n}
 	}
 
-	return nodes
+	// Otherwise split node into two separate nodes. If there's no parent then
+	// we'll need to create one.
+	if n.parent == nil {
+		n.parent = &node{bucket: n.bucket, children: []*node{n}}
+	}
+
+	// Create a new node and add it to the parent.
+	next := &node{bucket: n.bucket, isLeaf: n.isLeaf, parent: n.parent}
+	n.parent.children = append(n.parent.children, next)
+
+	// Split inodes across two nodes.
+	next.inodes = n.inodes[splitIndex:]
+	n.inodes = n.inodes[:splitIndex]
+
+	// Update the statistics.
+	n.bucket.tx.stats.Split++
+
+	return []*node{n, next}
 }
 
+// splitIndex finds the position where a page will fill a given threshold.
+// It returns the index as well as the size of the first page.
+// This should only be called from split().
+func (n *node) splitIndex(threshold int) (index, sz int) {
+	sz = pageHeaderSize
+
+	// Loop until we only have the minimum number of keys required for the second page.
+	for i := 0; i < len(n.inodes)-minKeysPerPage; i++ {
+		index = i
+		inode := n.inodes[i]
+		elsize := n.pageElementSize() + len(inode.key) + len(inode.value)
+
+		// If we have at least the minimum number of keys and adding another
+		// node would put us over the threshold then exit and return.
+		if i >= minKeysPerPage && sz+elsize > threshold {
+			break
+		}
+
+		// Add the element size to the total size.
+		sz += elsize
+	}
+
+	return
+}
+
 // spill writes the nodes to dirty pages and splits nodes as it goes.
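
To make splitIndex concrete, here is a standalone toy version over plain element sizes; the constants (pageHeaderSize = 16, minKeysPerPage = 2) and the 0.5 fill percent behind the 2048-byte threshold are assumptions for the example, not values taken from this diff:

package main

import "fmt"

// A toy version of node.splitIndex over plain element sizes.
// Assumes pageHeaderSize = 16 and minKeysPerPage = 2.
func splitIndex(elsizes []int, threshold int) (index, sz int) {
	sz = 16
	for i := 0; i < len(elsizes)-2; i++ {
		index = i
		// Stop once we have at least two keys and the next element
		// would push the first page past the threshold.
		if i >= 2 && sz+elsizes[i] > threshold {
			break
		}
		sz += elsizes[i]
	}
	return
}

func main() {
	// Ten 300-byte elements against half of a 4KB page.
	elsizes := make([]int, 10)
	for i := range elsizes {
		elsizes[i] = 300
	}
	idx, sz := splitIndex(elsizes, 2048)
	fmt.Println(idx, sz) // 6 1816: elements [0,6) stay; the rest split or merge right
}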