Merge pull request #181 from benbjohnson/split-merge

Allow split nodes to be merged with the next node.
Ben Johnson 2014-06-03 16:44:58 -06:00
commit 510143d852
4 changed files with 185 additions and 72 deletions
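In outline: when a node grows past the page fill threshold, split() now first checks whether the keys past the split point fit into the next sibling; if they do, they are merged into that sibling instead of allocating a brand-new node. A minimal, self-contained sketch of that decision (the inode layout and size constants here are invented for illustration and only approximate bolt's pageHeaderSize/pageElementSize accounting; the real logic is in node.split in the node.go diff below):

package main

import "fmt"

// inode stands in for bolt's internal key/value entry.
type inode struct{ key, value string }

// size approximates a page's serialized size using made-up header and
// element overheads in place of bolt's pageHeaderSize and pageElementSize.
func size(inodes []inode) int {
    const headerSize, elemSize = 16, 16
    sz := headerSize
    for _, in := range inodes {
        sz += elemSize + len(in.key) + len(in.value)
    }
    return sz
}

func main() {
    const threshold = 96 // plays the role of pageSize * FillPercent

    node := []inode{{"a", "0123456789"}, {"b", "0123456789"}, {"c", "01"}, {"d", "01"}}
    next := []inode{{"e", "01"}}
    splitIndex := 2 // where splitIndex() decided to cut

    // The new behavior: if the tail fits on the next page under the
    // threshold, merge it there instead of creating a new node.
    if tail := node[splitIndex:]; size(next)+size(tail) < threshold {
        next = append(append([]inode{}, tail...), next...)
        node = node[:splitIndex]
    }
    fmt.Printf("node=%d keys, next=%d keys\n", len(node), len(next)) // node=2 keys, next=3 keys
}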


@@ -613,6 +613,7 @@ func (b *Bucket) rebalance() {
 // node creates a node from a page and associates it with a given parent.
 func (b *Bucket) node(pgid pgid, parent *node) *node {
     _assert(b.nodes != nil, "nodes map expected")

     // Retrieve node if it's already been created.
     if n := b.nodes[pgid]; n != nil {
         return n


@@ -4,6 +4,7 @@ import (
     "bytes"
     "errors"
     "fmt"
+    "math/rand"
     "os"
     "strconv"
     "strings"
@@ -560,35 +561,36 @@ func TestBucket_Put_KeyTooLarge(t *testing.T) {
 // Ensure a bucket can calculate stats.
 func TestBucket_Stats(t *testing.T) {
     withOpenDB(func(db *DB, path string) {
+        // Add bucket with fewer keys but one big value.
         big_key := []byte("really-big-value")
+        for i := 0; i < 500; i++ {
+            db.Update(func(tx *Tx) error {
+                b, _ := tx.CreateBucketIfNotExists([]byte("woojits"))
+                return b.Put([]byte(fmt.Sprintf("%03d", i)), []byte(strconv.Itoa(i)))
+            })
+        }
         db.Update(func(tx *Tx) error {
-            // Add bucket with fewer keys but one big value.
-            b, err := tx.CreateBucket([]byte("woojits"))
-            assert.NoError(t, err)
-            for i := 0; i < 500; i++ {
-                b.Put([]byte(fmt.Sprintf("%03d", i)), []byte(strconv.Itoa(i)))
-            }
-            b.Put(big_key, []byte(strings.Repeat("*", 10000)))
-            return nil
+            b, _ := tx.CreateBucketIfNotExists([]byte("woojits"))
+            return b.Put(big_key, []byte(strings.Repeat("*", 10000)))
         })
         mustCheck(db)
         db.View(func(tx *Tx) error {
             b := tx.Bucket([]byte("woojits"))
             stats := b.Stats()
             assert.Equal(t, 1, stats.BranchPageN, "BranchPageN")
             assert.Equal(t, 0, stats.BranchOverflowN, "BranchOverflowN")
-            assert.Equal(t, 6, stats.LeafPageN, "LeafPageN")
+            assert.Equal(t, 7, stats.LeafPageN, "LeafPageN")
             assert.Equal(t, 2, stats.LeafOverflowN, "LeafOverflowN")
             assert.Equal(t, 501, stats.KeyN, "KeyN")
             assert.Equal(t, 2, stats.Depth, "Depth")

             branchInuse := pageHeaderSize            // branch page header
-            branchInuse += 6 * branchPageElementSize // branch elements
-            branchInuse += 6 * 3                     // branch keys (6 3-byte keys)
+            branchInuse += 7 * branchPageElementSize // branch elements
+            branchInuse += 7 * 3                     // branch keys (7 3-byte keys)
             assert.Equal(t, branchInuse, stats.BranchInuse, "BranchInuse")

-            leafInuse := 6 * pageHeaderSize          // leaf page header
+            leafInuse := 7 * pageHeaderSize          // leaf page header
             leafInuse += 501 * leafPageElementSize   // leaf elements
             leafInuse += 500*3 + len(big_key)        // leaf keys
             leafInuse += 1*10 + 2*90 + 3*400 + 10000 // leaf values
@@ -597,7 +599,7 @@ func TestBucket_Stats(t *testing.T) {
         if os.Getpagesize() == 4096 {
             // Incompatible page size
             assert.Equal(t, 4096, stats.BranchAlloc, "BranchAlloc")
-            assert.Equal(t, 32768, stats.LeafAlloc, "LeafAlloc")
+            assert.Equal(t, 36864, stats.LeafAlloc, "LeafAlloc")
         }
         assert.Equal(t, 1, stats.BucketN, "BucketN")
@@ -608,6 +610,53 @@ func TestBucket_Stats(t *testing.T) {
     })
 }

+// Ensure a bucket with random insertion utilizes fill percentage correctly.
+func TestBucket_Stats_RandomFill(t *testing.T) {
+    if testing.Short() {
+        t.Skip("skipping test in short mode.")
+    }
+    if os.Getpagesize() != 4096 {
+        t.Skip("invalid page size for test")
+    }
+
+    withOpenDB(func(db *DB, path string) {
+        db.FillPercent = 0.9
+
+        // Add a set of values in random order. It will be the same random
+        // order so we can maintain consistency between test runs.
+        var count int
+        r := rand.New(rand.NewSource(42))
+        for _, i := range r.Perm(1000) {
+            db.Update(func(tx *Tx) error {
+                b, _ := tx.CreateBucketIfNotExists([]byte("woojits"))
+                for _, j := range r.Perm(100) {
+                    index := (j * 10000) + i
+                    b.Put([]byte(fmt.Sprintf("%d000000000000000", index)), []byte("0000000000"))
+                    count++
+                }
+                return nil
+            })
+        }
+        mustCheck(db)
+
+        db.View(func(tx *Tx) error {
+            s := tx.Bucket([]byte("woojits")).Stats()
+            assert.Equal(t, 100000, s.KeyN, "KeyN")
+
+            assert.Equal(t, 22, s.BranchPageN, "BranchPageN")
+            assert.Equal(t, 0, s.BranchOverflowN, "BranchOverflowN")
+            assert.Equal(t, 61708, s.BranchInuse, "BranchInuse")
+            assert.Equal(t, 90112, s.BranchAlloc, "BranchAlloc")
+
+            assert.Equal(t, 1643, s.LeafPageN, "LeafPageN")
+            assert.Equal(t, 0, s.LeafOverflowN, "LeafOverflowN")
+            assert.Equal(t, 4714178, s.LeafInuse, "LeafInuse")
+            assert.Equal(t, 6729728, s.LeafAlloc, "LeafAlloc")
+            return nil
+        })
+    })
+}
+
 // Ensure a bucket can calculate stats.
 func TestBucket_Stats_Small(t *testing.T) {
@@ -750,11 +799,11 @@ func TestBucket_Stats_Large(t *testing.T) {
     withOpenDB(func(db *DB, path string) {
         var index int
-        for i := 0; i < 1000; i++ {
+        for i := 0; i < 10000; i++ {
             db.Update(func(tx *Tx) error {
                 // Add bucket with lots of keys.
                 b, _ := tx.CreateBucketIfNotExists([]byte("widgets"))
-                for i := 0; i < 100; i++ {
+                for i := 0; i < 10; i++ {
                     b.Put([]byte(strconv.Itoa(index)), []byte(strconv.Itoa(index)))
                     index++
                 }
@@ -766,18 +815,18 @@ func TestBucket_Stats_Large(t *testing.T) {
         db.View(func(tx *Tx) error {
             b := tx.Bucket([]byte("widgets"))
             stats := b.Stats()
-            assert.Equal(t, 19, stats.BranchPageN, "BranchPageN")
+            assert.Equal(t, 13, stats.BranchPageN, "BranchPageN")
             assert.Equal(t, 0, stats.BranchOverflowN, "BranchOverflowN")
-            assert.Equal(t, 1291, stats.LeafPageN, "LeafPageN")
+            assert.Equal(t, 1195, stats.LeafPageN, "LeafPageN")
             assert.Equal(t, 0, stats.LeafOverflowN, "LeafOverflowN")
             assert.Equal(t, 100000, stats.KeyN, "KeyN")
             assert.Equal(t, 3, stats.Depth, "Depth")
-            assert.Equal(t, 27007, stats.BranchInuse, "BranchInuse")
-            assert.Equal(t, 2598436, stats.LeafInuse, "LeafInuse")
+            assert.Equal(t, 25208, stats.BranchInuse, "BranchInuse")
+            assert.Equal(t, 2596900, stats.LeafInuse, "LeafInuse")
             if os.Getpagesize() == 4096 {
                 // Incompatible page size
-                assert.Equal(t, 77824, stats.BranchAlloc, "BranchAlloc")
-                assert.Equal(t, 5287936, stats.LeafAlloc, "LeafAlloc")
+                assert.Equal(t, 53248, stats.BranchAlloc, "BranchAlloc")
+                assert.Equal(t, 4894720, stats.LeafAlloc, "LeafAlloc")
             }
             assert.Equal(t, 1, stats.BucketN, "BucketN")
             assert.Equal(t, 0, stats.InlineBucketN, "InlineBucketN")
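Aside: the fields asserted above all come from bolt's public Bucket.Stats() API, so the same numbers can be inspected from application code. Roughly like this, in the unqualified style of these in-package tests (printStats is a hypothetical helper, not part of the diff; it assumes fmt is imported):

// printStats dumps the page-level statistics the tests above assert on.
func printStats(db *DB, name []byte) error {
    return db.View(func(tx *Tx) error {
        s := tx.Bucket(name).Stats()
        fmt.Printf("pages: branch=%d leaf=%d overflow=%d/%d\n",
            s.BranchPageN, s.LeafPageN, s.BranchOverflowN, s.LeafOverflowN)
        fmt.Printf("keys=%d depth=%d inuse: branch=%dB leaf=%dB\n",
            s.KeyN, s.Depth, s.BranchInuse, s.LeafInuse)
        return nil
    })
}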


@@ -7,6 +7,8 @@ import (
     "io/ioutil"
     "os"
     "regexp"
+    "sort"
+    "strings"
     "testing"
     "time"
     "unsafe"
@@ -520,6 +522,38 @@ func mustCheck(db *DB) {
     }
 }

+// mustContainKeys checks that a bucket contains a given set of keys.
+func mustContainKeys(b *Bucket, m map[string]string) {
+    found := make(map[string]string)
+    b.ForEach(func(k, _ []byte) error {
+        found[string(k)] = ""
+        return nil
+    })
+
+    // Check for keys found in bucket that shouldn't be there.
+    var keys []string
+    for k, _ := range found {
+        if _, ok := m[string(k)]; !ok {
+            keys = append(keys, k)
+        }
+    }
+    if len(keys) > 0 {
+        sort.Strings(keys)
+        panic(fmt.Sprintf("keys found(%d): %s", len(keys), strings.Join(keys, ",")))
+    }
+
+    // Check for keys not found in bucket that should be there.
+    for k, _ := range m {
+        if _, ok := found[string(k)]; !ok {
+            keys = append(keys, k)
+        }
+    }
+    if len(keys) > 0 {
+        sort.Strings(keys)
+        panic(fmt.Sprintf("keys not found(%d): %s", len(keys), strings.Join(keys, ",")))
+    }
+}
+
 func trunc(b []byte, length int) []byte {
     if length < len(b) {
         return b[:length]
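A hypothetical call site, to show the helper's contract: map values are ignored, and mustContainKeys panics unless the bucket's key set matches the map's key set exactly (not part of the diff, just an illustration):

db.Update(func(tx *Tx) error {
    b, _ := tx.CreateBucketIfNotExists([]byte("widgets"))
    b.Put([]byte("bar"), []byte("1"))
    b.Put([]byte("foo"), []byte("0"))
    mustContainKeys(b, map[string]string{"bar": "", "foo": ""}) // ok
    // mustContainKeys(b, map[string]string{"foo": ""}) // would panic: keys found(1): bar
    return nil
})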

node.go

@@ -14,7 +14,7 @@ type node struct {
     key      []byte
     pgid     pgid
     parent   *node
-    children []*node
+    children nodes
     inodes   inodes
 }
@@ -205,15 +205,14 @@ func (n *node) write(p *page) {
     // DEBUG ONLY: n.dump()
 }

-// split breaks up a node into smaller nodes, if appropriate.
+// split breaks up a node into two smaller nodes, if appropriate.
 // This should only be called from the spill() function.
 func (n *node) split(pageSize int) []*node {
-    var nodes = []*node{n}
-
     // Ignore the split if the page doesn't have at least enough nodes for
-    // multiple pages or if the data can fit on a single page.
-    if len(n.inodes) <= (minKeysPerPage*2) || n.size() < pageSize {
-        return nodes
+    // two pages or if the data can fit on a single page.
+    sz := n.size()
+    if len(n.inodes) <= (minKeysPerPage*2) || sz < pageSize {
+        return []*node{n}
     }

     // Determine the threshold before starting a new node.
@@ -225,43 +224,60 @@ func (n *node) split(pageSize int) []*node {
     }
     threshold := int(float64(pageSize) * fillPercent)

-    // Group into smaller pages and target a given fill size.
-    size := pageHeaderSize
-    internalNodes := n.inodes
-    current := n
-    current.inodes = nil
+    // Determine split position and sizes of the two pages.
+    splitIndex, sz0 := n.splitIndex(threshold)
+    sz1 := pageHeaderSize + (sz - sz0)

-    // Loop over every inode and split once we reach our threshold.
-    for i, inode := range internalNodes {
-        elemSize := n.pageElementSize() + len(inode.key) + len(inode.value)
-
-        // Split once we reach our threshold split size. However, this should
-        // only be done if we have enough keys for this node and we will have
-        // enough keys for the next node.
-        if len(current.inodes) >= minKeysPerPage && i < len(internalNodes)-minKeysPerPage && size+elemSize > threshold {
-            // If there's no parent then we need to create one.
-            if n.parent == nil {
-                n.parent = &node{bucket: n.bucket, children: []*node{n}}
-            }
-
-            // Create a new node and add it to the parent.
-            current = &node{bucket: n.bucket, isLeaf: n.isLeaf, parent: n.parent}
-            n.parent.children = append(n.parent.children, current)
-            nodes = append(nodes, current)
-
-            // Reset our running total back to zero (plus header size).
-            size = pageHeaderSize
-
-            // Update the statistics.
-            n.bucket.tx.stats.Split++
-        }
-
-        // Increase our running total of the size and append the inode.
-        size += elemSize
-        current.inodes = append(current.inodes, inode)
+    // If we can fit our extra keys on the next page then merge into it.
+    if next := n.nextSibling(); next != nil && next.size()+sz1 < threshold {
+        next.inodes = append(n.inodes[splitIndex:], next.inodes...)
+        n.inodes = n.inodes[:splitIndex]
+        return []*node{n}
     }

-    return nodes
+    // Otherwise split node into two separate nodes. If there's no parent then
+    // we'll need to create one.
+    if n.parent == nil {
+        n.parent = &node{bucket: n.bucket, children: []*node{n}}
+    }
+
+    // Create a new node and add it to the parent.
+    next := &node{bucket: n.bucket, isLeaf: n.isLeaf, parent: n.parent}
+    n.parent.children = append(n.parent.children, next)
+
+    // Split inodes across two nodes.
+    next.inodes = n.inodes[splitIndex:]
+    n.inodes = n.inodes[:splitIndex]
+
+    // Update the statistics.
+    n.bucket.tx.stats.Split++
+
+    return []*node{n, next}
+}
+
+// splitIndex finds the position where a page will fill a given threshold.
+// It returns the index as well as the size of the first page.
+// This is only called from split().
+func (n *node) splitIndex(threshold int) (index, sz int) {
+    sz = pageHeaderSize
+
+    // Loop until we only have the minimum number of keys required for the second page.
+    for i := 0; i < len(n.inodes)-minKeysPerPage; i++ {
+        index = i
+        inode := n.inodes[i]
+        elsize := n.pageElementSize() + len(inode.key) + len(inode.value)
+
+        // If we have at least the minimum number of keys and adding another
+        // node would put us over the threshold then exit and return.
+        if i >= minKeysPerPage && sz+elsize > threshold {
+            break
+        }
+
+        // Add the element size to the total size.
+        sz += elsize
+    }

+    return
 }
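As a worked example of the threshold math (assuming 4 KB pages and bolt's default FillPercent of 0.5; both values are assumptions, not fixed by this diff):

package main

import "fmt"

func main() {
    // Mirrors split() above: threshold := int(float64(pageSize) * fillPercent).
    // splitIndex() accumulates pageElementSize + len(key) + len(value) per
    // inode and cuts just before the element that would cross this threshold,
    // while keeping at least minKeysPerPage inodes for the second page.
    pageSize, fillPercent := 4096, 0.5
    fmt.Println(int(float64(pageSize) * fillPercent)) // 2048
}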
@@ -269,22 +285,29 @@ func (n *node) split(pageSize int) []*node {
 // spill writes the nodes to dirty pages and splits nodes as it goes.
 func (n *node) spill() error {
     var tx = n.bucket.tx

-    // Spill child nodes first.
-    for _, child := range n.children {
-        if err := child.spill(); err != nil {
+    // Spill child nodes first. Child nodes can materialize sibling nodes in
+    // the case of split-merge so we cannot use a range loop. We have to check
+    // the children size on every loop iteration.
+    sort.Sort(n.children)
+    for i := 0; i < len(n.children); i++ {
+        if err := n.children[i].spill(); err != nil {
             return err
         }
     }

-    // Add node's page to the freelist if it's not new.
-    if n.pgid > 0 {
-        tx.db.freelist.free(tx.id(), tx.page(n.pgid))
-        n.pgid = 0
-    }
+    // We no longer need the child list because it's only used for spill tracking.
+    n.children = nil

-    // Spill nodes by deepest first.
+    // Spill nodes by deepest first. The first node returned from split() will
+    // always be "n".
     var nodes = n.split(tx.db.pageSize)
     for _, node := range nodes {
+        // Add node's page to the freelist if it's not new.
+        if node.pgid > 0 {
+            tx.db.freelist.free(tx.id(), tx.page(node.pgid))
+            node.pgid = 0
+        }
+
         // Allocate contiguous space for the node.
         p, err := tx.allocate((node.size() / tx.db.pageSize) + 1)
         if err != nil {
@@ -550,6 +573,12 @@ func (n *node) dump() {
 }
 */

+type nodes []*node
+
+func (s nodes) Len() int           { return len(s) }
+func (s nodes) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
+func (s nodes) Less(i, j int) bool { return bytes.Compare(s[i].inodes[0].key, s[j].inodes[0].key) == -1 }
+
 // inode represents an internal node inside of a node.
 // It can be used to point to elements in a page or point
 // to an element which hasn't been added to a page yet.
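The nodes type exists so the spill loop above can order children by their first key with the standard library's sort package. The same pattern in a self-contained form (item here is a stand-in for node, since node's fields are unexported internals):

package main

import (
    "bytes"
    "fmt"
    "sort"
)

// items mirrors the nodes type: a slice ordered by each element's first key.
type item struct{ key []byte }
type items []*item

func (s items) Len() int           { return len(s) }
func (s items) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
func (s items) Less(i, j int) bool { return bytes.Compare(s[i].key, s[j].key) == -1 }

func main() {
    list := items{{[]byte("m")}, {[]byte("a")}, {[]byte("z")}}
    sort.Sort(list) // same call shape as sort.Sort(n.children) in spill()
    for _, it := range list {
        fmt.Println(string(it.key)) // a, m, z
    }
}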