Merge pull request #154 from benbjohnson/inline-buckets

(wip) Add inline bucket support.
Ben Johnson 2014-05-07 12:09:36 -06:00
commit 3d2e092a5d
7 changed files with 273 additions and 54 deletions

bucket.go

@@ -52,19 +52,25 @@ const (
 	minInt = -maxInt - 1
 )
 
+const bucketHeaderSize = int(unsafe.Sizeof(bucket{}))
+
 // Bucket represents a collection of key/value pairs inside the database.
 type Bucket struct {
 	*bucket
-	tx       *Tx
-	buckets  map[string]*Bucket
-	rootNode *node
-	nodes    map[pgid]*node
+	tx       *Tx                // the associated transaction
+	buckets  map[string]*Bucket // subbucket cache
+	page     *page              // inline page reference
+	rootNode *node              // materialized node for the root page.
+	nodes    map[pgid]*node     // node cache
 }
 
 // bucket represents the on-file representation of a bucket.
+// This is stored as the "value" of a bucket key. If the bucket is small enough,
+// then its root page can be stored inline in the "value", after the bucket
+// header. In the case of inline buckets, the "root" will be 0.
 type bucket struct {
-	root     pgid
-	sequence uint64
+	root     pgid   // page id of the bucket's root-level page
+	sequence uint64 // monotonically incrementing, used by NextSequence()
 }
 
 // newBucket returns a new bucket associated with a transaction.
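For orientation (not part of the diff): a bucket's on-disk value now starts with this header, and for inline buckets the serialized root page follows immediately after it. A minimal standalone sketch of the layout, with the header mirrored as an illustrative type since bolt's bucket struct is unexported (pgid is a uint64 in bolt, so the header is two 8-byte fields):

    package main

    import (
    	"fmt"
    	"unsafe"
    )

    // bucketHeader mirrors bolt's unexported bucket struct for illustration only.
    type bucketHeader struct {
    	root     uint64 // page id of the root page; 0 means the bucket is inline
    	sequence uint64 // monotonically incrementing counter
    }

    func main() {
    	// This is bucketHeaderSize in the diff above: the offset at which an
    	// inline bucket's embedded "fake" page begins inside the value slice.
    	fmt.Println(unsafe.Sizeof(bucketHeader{})) // 16
    }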
@@ -128,6 +134,11 @@ func (b *Bucket) Bucket(name []byte) *Bucket {
 	*child.bucket = *(*bucket)(unsafe.Pointer(&v[0]))
 	b.buckets[string(name)] = &child
 
+	// Save a reference to the inline page if the bucket is inline.
+	if child.root == 0 {
+		child.page = (*page)(unsafe.Pointer(&v[bucketHeaderSize]))
+	}
+
 	return &child
 }
@@ -155,22 +166,19 @@ func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) {
 		}
 	}
 
-	// Create a blank root leaf page.
-	p, err := b.tx.allocate(1)
-	if err != nil {
-		return nil, err
-	}
-	p.flags = leafPageFlag
-
-	// Insert key/value.
-	value := make([]byte, unsafe.Sizeof(bucket{}))
-	bucket := (*bucket)(unsafe.Pointer(&value[0]))
-	bucket.root = p.id
+	// Create empty, inline bucket.
+	var bucket = Bucket{bucket: &bucket{}, rootNode: &node{isLeaf: true}}
+	var value = bucket.write()
 
 	// Insert into node.
 	key = cloneBytes(key)
 	c.node().put(key, key, value, 0, bucketLeafFlag)
 
+	// Since subbuckets are not allowed on inline buckets, we need to
+	// dereference the inline page, if it exists. This will cause the bucket
+	// to be treated as a regular, non-inline bucket for the rest of the tx.
+	b.page = nil
+
 	return b.Bucket(key), nil
 }
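In other words, CreateBucket no longer allocates a page up front: the empty bucket is serialized by write() and stored in the parent's leaf value. A hedged usage sketch against the public API as it stood around this commit (Open still took just a path and file mode, and the github.com/boltdb/bolt import path is assumed):

    package main

    import (
    	"log"

    	"github.com/boltdb/bolt"
    )

    func main() {
    	db, err := bolt.Open("/tmp/inline.db", 0600)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer db.Close()

    	if err := db.Update(func(tx *bolt.Tx) error {
    		// After this change the new bucket lives inline in its parent's
    		// leaf value; it gets no page of its own at commit unless it
    		// outgrows the inline threshold.
    		b, err := tx.CreateBucket([]byte("widgets"))
    		if err != nil {
    			return err
    		}
    		return b.Put([]byte("foo"), []byte("bar"))
    	}); err != nil {
    		log.Fatal(err)
    	}
    }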
@@ -224,9 +232,9 @@ func (b *Bucket) DeleteBucket(key []byte) error {
 	delete(b.buckets, string(key))
 
 	// Release all bucket pages to freelist.
-	b.tx.forEachPage(child.root, 0, func(p *page, _ int) {
-		b.tx.db.freelist.free(b.tx.id(), p)
-	})
+	child.nodes = nil
+	child.rootNode = nil
+	child.free()
 
 	// Delete the node if we have a matching key.
 	c.node().del(key)
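DeleteBucket now releases pages through Bucket.free() (added further down), which simply returns for inline children since they own no pages; only the parent's leaf entry has to go. A sketch continuing the hypothetical db from the previous example:

    err = db.Update(func(tx *bolt.Tx) error {
    	root, err := tx.CreateBucket([]byte("root"))
    	if err != nil {
    		return err
    	}
    	if _, err := root.CreateBucket([]byte("child")); err != nil {
    		return err
    	}
    	// "child" is still inline (root == 0), so child.free() is a no-op
    	// and DeleteBucket only removes the leaf entry from the parent.
    	return root.DeleteBucket([]byte("child"))
    })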
@@ -348,7 +356,7 @@ func (b *Bucket) ForEach(fn func(k, v []byte) error) error {
 func (b *Bucket) Stats() BucketStats {
 	var s BucketStats
 	pageSize := b.tx.db.pageSize
-	b.tx.forEachPage(b.root, 0, func(p *page, depth int) {
+	b.forEachPage(func(p *page, depth int) {
 		if (p.flags & leafPageFlag) != 0 {
 			s.LeafPageN++
 			s.KeyN += int(p.count)
@@ -375,21 +383,81 @@ func (b *Bucket) Stats() BucketStats {
 	return s
 }
 
+// forEachPage iterates over every page in a bucket, including inline pages.
+func (b *Bucket) forEachPage(fn func(*page, int)) {
+	// If we have an inline page then just use that.
+	if b.page != nil {
+		fn(b.page, 0)
+		return
+	}
+
+	// Otherwise traverse the page hierarchy.
+	b.tx.forEachPage(b.root, 0, fn)
+}
+
+// forEachPageNode iterates over every page (or node) in a bucket.
+// This also includes inline pages.
+func (b *Bucket) forEachPageNode(fn func(*page, *node, int)) {
+	// If we have an inline page or root node then just use that.
+	if b.page != nil {
+		fn(b.page, nil, 0)
+		return
+	}
+	b._forEachPageNode(b.root, 0, fn)
+}
+
+func (b *Bucket) _forEachPageNode(pgid pgid, depth int, fn func(*page, *node, int)) {
+	var p, n = b.pageNode(pgid)
+
+	// Execute function.
+	fn(p, n, depth)
+
+	// Recursively loop over children.
+	if p != nil {
+		if (p.flags & branchPageFlag) != 0 {
+			for i := 0; i < int(p.count); i++ {
+				elem := p.branchPageElement(uint16(i))
+				b._forEachPageNode(elem.pgid, depth+1, fn)
+			}
+		}
+	} else {
+		if !n.isLeaf {
+			for _, inode := range n.inodes {
+				b._forEachPageNode(inode.pgid, depth+1, fn)
+			}
+		}
+	}
+}
+
 // spill writes all the nodes for this bucket to dirty pages.
 func (b *Bucket) spill() error {
 	// Spill all child buckets first.
 	for name, child := range b.buckets {
-		if err := child.spill(); err != nil {
-			return err
+		// If the child bucket is small enough and it has no child buckets then
+		// write it inline into the parent bucket's page. Otherwise spill it
+		// like a normal bucket and make the parent value a pointer to the page.
+		var value []byte
+		if child.inlineable() {
+			child.free()
+			value = child.write()
+		} else {
+			if err := child.spill(); err != nil {
+				return err
+			}
+
+			// Update the child bucket header in this bucket.
+			value = make([]byte, unsafe.Sizeof(bucket{}))
+			var bucket = (*bucket)(unsafe.Pointer(&value[0]))
+			*bucket = *child.bucket
 		}
 
-		// Update the child bucket header in this bucket.
-		value := make([]byte, unsafe.Sizeof(bucket{}))
-		bucket := (*bucket)(unsafe.Pointer(&value[0]))
-		*bucket = *child.bucket
+		// Skip writing the bucket if there are no materialized nodes.
+		if child.rootNode == nil {
+			continue
+		}
 
 		// Update parent node.
-		c := b.Cursor()
+		var c = b.Cursor()
 		k, _, flags := c.seek([]byte(name))
 		_assert(bytes.Equal([]byte(name), k), "misplaced bucket header: %x -> %x", []byte(name), k)
 		_assert(flags&bucketLeafFlag != 0, "unexpected bucket header flag: %x", flags)
@@ -413,6 +481,54 @@ func (b *Bucket) spill() error {
 	return nil
 }
 
+// inlineable returns true if a bucket is small enough to be written inline
+// and if it contains no subbuckets. Otherwise returns false.
+func (b *Bucket) inlineable() bool {
+	var n = b.rootNode
+
+	// Bucket must only contain a single leaf node.
+	if n == nil || !n.isLeaf {
+		return false
+	}
+
+	// Bucket is not inlineable if it contains subbuckets or if it goes beyond
+	// our threshold for inline bucket size.
+	var size = pageHeaderSize
+	for _, inode := range n.inodes {
+		size += leafPageElementSize + len(inode.key) + len(inode.value)
+
+		if inode.flags&bucketLeafFlag != 0 {
+			return false
+		} else if size > b.maxInlineBucketSize() {
+			return false
+		}
+	}
+
+	return true
+}
+
+// Returns the maximum total size of a bucket to make it a candidate for inlining.
+func (b *Bucket) maxInlineBucketSize() int {
+	return b.tx.db.pageSize / 4
+}
+
+// write allocates and writes a bucket to a byte slice.
+func (b *Bucket) write() []byte {
+	// Allocate the appropriate size.
+	var n = b.rootNode
+	var value = make([]byte, bucketHeaderSize+pageHeaderSize+n.size())
+
+	// Write a bucket header.
+	var bucket = (*bucket)(unsafe.Pointer(&value[0]))
+	*bucket = *b.bucket
+
+	// Convert byte slice to a fake page and write the root node.
+	var p = (*page)(unsafe.Pointer(&value[bucketHeaderSize]))
+	n.write(p)
+
+	return value
+}
+
 // rebalance attempts to balance all nodes.
 func (b *Bucket) rebalance() {
 	for _, n := range b.nodes {
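To make the threshold concrete: with the page header and leaf element sizes bolt uses (16 bytes each; stated here as assumptions, since the real code derives them with unsafe), a single "foo" → "bar" pair serializes to 38 bytes, which lines up with the LeafInuse value asserted in TestBucket_Stats_Small below and is comfortably under the 4096/4 = 1024-byte inline limit:

    package main

    import "fmt"

    const (
    	pageHeaderSize      = 16   // assumed: offset of page.ptr in bolt
    	leafPageElementSize = 16   // assumed: size of leafPageElement in bolt
    	pageSize            = 4096 // typical OS page size
    )

    func main() {
    	// Mirrors the accounting loop in Bucket.inlineable above.
    	size := pageHeaderSize + leafPageElementSize + len("foo") + len("bar")
    	fmt.Println(size, size <= pageSize/4) // 38 true
    }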
@@ -431,14 +547,22 @@ func (b *Bucket) node(pgid pgid, parent *node) *node {
 		return n
 	}
 
-	// Otherwise create a branch node and cache it.
+	// Otherwise create a node and cache it.
 	n := &node{bucket: b, parent: parent}
 	if parent == nil {
 		b.rootNode = n
 	} else {
 		parent.children = append(parent.children, n)
 	}
-	n.read(b.tx.page(pgid))
+
+	// Use the inline page if this is an inline bucket.
+	var p = b.page
+	if p == nil {
+		p = b.tx.page(pgid)
+	}
+
+	// Read the page into the node and cache it.
+	n.read(p)
 	b.nodes[pgid] = n
 
 	// Update statistics.
@@ -447,6 +571,23 @@ func (b *Bucket) node(pgid pgid, parent *node) *node {
 	return n
 }
 
+// free recursively frees all pages in the bucket.
+func (b *Bucket) free() {
+	if b.root == 0 {
+		return
+	}
+
+	var tx = b.tx
+	b.forEachPageNode(func(p *page, n *node, _ int) {
+		if p != nil {
+			tx.db.freelist.free(tx.id(), p)
+		} else {
+			n.free()
+		}
+	})
+	b.root = 0
+}
+
 // dereference removes all references to the old mmap.
 func (b *Bucket) dereference() {
 	for _, n := range b.nodes {
@@ -464,11 +605,24 @@ func (b *Bucket) dereference() {
 // pageNode returns the in-memory node, if it exists.
 // Otherwise returns the underlying page.
 func (b *Bucket) pageNode(id pgid) (*page, *node) {
+	// Inline buckets have a fake page embedded in their value so treat them
+	// differently. We'll return the rootNode (if available) or the fake page.
+	if b.root == 0 {
+		_assert(id == 0, "inline bucket non-zero page access(2): %d != 0", id)
+		if b.rootNode != nil {
+			return nil, b.rootNode
+		}
+		return b.page, nil
+	}
+
+	// Check the node cache for non-inline buckets.
 	if b.nodes != nil {
 		if n := b.nodes[id]; n != nil {
 			return nil, n
 		}
 	}
+
+	// Finally lookup the page from the transaction if no node is materialized.
 	return b.tx.page(id), nil
 }
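Putting the pieces together: a bucket is promoted out of inline storage by spill() once inlineable() reports false, either because a value pushes it past pageSize/4 or because it gains a subbucket. Continuing the earlier hypothetical sketch:

    err = db.Update(func(tx *bolt.Tx) error {
    	b, err := tx.CreateBucket([]byte("big"))
    	if err != nil {
    		return err
    	}
    	// On a 4KB page the inline limit is 1024 bytes, so this single 2KB
    	// value forces spill() to write the bucket to its own page at commit.
    	return b.Put([]byte("k"), make([]byte, 2048))
    })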

bucket_test.go

@@ -161,6 +161,33 @@ func TestBucket_Delete(t *testing.T) {
 	})
 }
 
+// Ensure that deleting a large set of keys will work correctly.
+func TestBucket_Delete_Large(t *testing.T) {
+	withOpenDB(func(db *DB, path string) {
+		db.Update(func(tx *Tx) error {
+			var b, _ = tx.CreateBucket([]byte("widgets"))
+			for i := 0; i < 100; i++ {
+				assert.NoError(t, b.Put([]byte(strconv.Itoa(i)), []byte(strings.Repeat("*", 1024))))
+			}
+			return nil
+		})
+		db.Update(func(tx *Tx) error {
+			var b = tx.Bucket([]byte("widgets"))
+			for i := 0; i < 100; i++ {
+				assert.NoError(t, b.Delete([]byte(strconv.Itoa(i))))
+			}
+			return nil
+		})
+		db.View(func(tx *Tx) error {
+			var b = tx.Bucket([]byte("widgets"))
+			for i := 0; i < 100; i++ {
+				assert.Nil(t, b.Get([]byte(strconv.Itoa(i))))
+			}
+			return nil
+		})
+	})
+}
+
 // Ensure that accessing and updating nested buckets is ok across transactions.
 func TestBucket_Nested(t *testing.T) {
 	withOpenDB(func(db *DB, path string) {
@@ -583,18 +610,18 @@ func TestBucket_Stats_Small(t *testing.T) {
 		db.View(func(tx *Tx) error {
 			b := tx.Bucket([]byte("whozawhats"))
 			stats := b.Stats()
-			assert.Equal(t, stats.BranchPageN, 0)
-			assert.Equal(t, stats.BranchOverflowN, 0)
-			assert.Equal(t, stats.LeafPageN, 1)
-			assert.Equal(t, stats.LeafOverflowN, 0)
-			assert.Equal(t, stats.KeyN, 1)
-			assert.Equal(t, stats.Depth, 1)
+			assert.Equal(t, 0, stats.BranchPageN)
+			assert.Equal(t, 0, stats.BranchOverflowN)
+			assert.Equal(t, 1, stats.LeafPageN)
+			assert.Equal(t, 0, stats.LeafOverflowN)
+			assert.Equal(t, 1, stats.KeyN)
+			assert.Equal(t, 1, stats.Depth)
 			if os.Getpagesize() != 4096 {
 				// Incompatible page size
-				assert.Equal(t, stats.BranchInuse, 0)
-				assert.Equal(t, stats.BranchAlloc, 0)
-				assert.Equal(t, stats.LeafInuse, 38)
-				assert.Equal(t, stats.LeafAlloc, 4096)
+				assert.Equal(t, 0, stats.BranchInuse)
+				assert.Equal(t, 0, stats.BranchAlloc)
+				assert.Equal(t, 38, stats.LeafInuse)
+				assert.Equal(t, 4096, stats.LeafAlloc)
 			}
 			return nil
 		})
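The assertion edits in this hunk only swap argument order: testify's assert.Equal signature is (t, expected, actual), so the literal belongs in the middle position for failure messages to label the values correctly, e.g.:

    // assert.Equal(t, expected, actual)
    assert.Equal(t, 1, stats.LeafPageN) // on failure, reports expected=1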

cursor.go

@@ -148,7 +148,7 @@ func (c *Cursor) seek(seek []byte) (key []byte, value []byte, flags uint32) {
 func (c *Cursor) first() {
 	for {
 		// Exit when we hit a leaf page.
-		ref := &c.stack[len(c.stack)-1]
+		var ref = &c.stack[len(c.stack)-1]
 		if ref.isLeaf() {
 			break
 		}

db.go

@@ -563,6 +563,11 @@ func (db *DB) Check() error {
 }
 
 func (db *DB) checkBucket(b *Bucket, reachable map[pgid]*page, errors *ErrorList) {
+	// Ignore inline buckets.
+	if b.root == 0 {
+		return
+	}
+
 	// Check every page used by this bucket.
 	b.tx.forEachPage(b.root, 0, func(p *page, _ int) {
 		// Ensure each page is only referenced once.
@@ -576,7 +581,6 @@ func (db *DB) checkBucket(b *Bucket, reachable map[pgid]*page, errors *ErrorList) {
 
 		// Retrieve page info.
 		info, err := b.tx.Page(int(p.id))
-		// warnf("[page] %d + %d (%s)", p.id, p.overflow, info.Type)
 		if err != nil {
 			*errors = append(*errors, err)
 		} else if info == nil {

db_test.go

@@ -275,7 +275,7 @@ func TestDB_Stats(t *testing.T) {
 			return err
 		})
 		stats := db.Stats()
-		assert.Equal(t, 3, stats.TxStats.PageCount)
+		assert.Equal(t, 2, stats.TxStats.PageCount)
 	})
 }
@@ -324,13 +324,7 @@ func TestDB_Consistency(t *testing.T) {
 		if p, _ := tx.Page(5); assert.NotNil(t, p) {
 			assert.Equal(t, "leaf", p.Type) // root leaf
 		}
-		if p, _ := tx.Page(6); assert.NotNil(t, p) {
-			assert.Equal(t, "leaf", p.Type)
-		}
-		if p, _ := tx.Page(7); assert.NotNil(t, p) {
-			assert.Equal(t, "free", p.Type)
-		}
-		p, _ := tx.Page(8)
+		p, _ := tx.Page(6)
 		assert.Nil(t, p)
 		return nil
 	})

node.go

@@ -2,6 +2,8 @@ package bolt
 
 import (
 	"bytes"
+	"fmt"
+	"io"
 	"sort"
 	"unsafe"
 )
@@ -191,6 +193,8 @@ func (n *node) write(p *page) {
 			copy(b[0:], item.value)
 			b = b[len(item.value):]
 		}
 	}
+
+	// DEBUG ONLY: n.dump(os.Stderr)
 }
 
 // split breaks up a node into smaller nodes, if appropriate.
@@ -261,6 +265,7 @@ func (n *node) spill() error {
 	// Add node's page to the freelist if it's not new.
 	if n.pgid > 0 {
 		tx.db.freelist.free(tx.id(), tx.page(n.pgid))
+		n.pgid = 0
 	}
 
 	// Spill nodes by deepest first.
@ -273,8 +278,8 @@ func (n *node) spill() error {
} }
// Write the node. // Write the node.
node.write(p)
node.pgid = p.id node.pgid = p.id
node.write(p)
// Insert into parent inodes. // Insert into parent inodes.
if node.parent != nil { if node.parent != nil {
@@ -302,8 +307,8 @@ func (n *node) spill() error {
 		}
 
 		// Write the new root.
-		parent.write(p)
 		parent.pgid = p.id
+		parent.write(p)
 	}
 
 	return nil
@@ -331,7 +336,7 @@ func (n *node) rebalance() {
 	// If root node is a branch and only has one node then collapse it.
 	if !n.isLeaf && len(n.inodes) == 1 {
 		// Move root's child up.
-		child := n.bucket.nodes[n.inodes[0].pgid]
+		child := n.bucket.node(n.inodes[0].pgid, n)
 		n.isLeaf = child.isLeaf
 		n.inodes = child.inodes[:]
 		n.children = child.children
@@ -352,6 +357,16 @@ func (n *node) rebalance() {
 		return
 	}
 
+	// If node has no keys then just remove it.
+	if n.numChildren() == 0 {
+		n.parent.del(n.key)
+		n.parent.removeChild(n)
+		delete(n.bucket.nodes, n.pgid)
+		n.free()
+		n.parent.rebalance()
+		return
+	}
+
 	_assert(n.parent.numChildren() > 1, "parent must have at least 2 children")
 
 	// Destination node is right sibling if idx == 0, otherwise left sibling.
@@ -477,6 +492,31 @@ func (n *node) free() {
 	}
 }
 
+// dump writes the contents of the node for debugging purposes.
+func (n *node) dump(w io.Writer) {
+	// Write node header.
+	var typ = "branch"
+	if n.isLeaf {
+		typ = "leaf"
+	}
+	fmt.Fprintf(w, "[NODE %d {type=%s count=%d}]\n", n.pgid, typ, len(n.inodes))
+
+	// Write out abbreviated version of each item.
+	for _, item := range n.inodes {
+		if n.isLeaf {
+			if item.flags&bucketLeafFlag != 0 {
+				bucket := (*bucket)(unsafe.Pointer(&item.value[0]))
+				fmt.Fprintf(w, "[L \"%s\" -> (bucket root=%d)]\n", item.key, bucket.root)
+			} else {
+				fmt.Fprintf(w, "[L \"%s\" -> \"%s\"]\n", item.key, item.value)
+			}
+		} else {
+			fmt.Fprintf(w, "[B \"%s\" -> pgid=%d]\n", item.key, item.pgid)
+		}
+	}
+	fmt.Fprint(w, "\n")
+}
+
 // inode represents an internal node inside of a node.
 // It can be used to point to elements in a page or point
 // to an element which hasn't been added to a page yet.
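Following the format strings above, dumping a leaf node that holds one ordinary pair and one subbucket reference would print something like this (illustrative values; the subbucket shows root=0 because it is inline):

    [NODE 12 {type=leaf count=2}]
    [L "foo" -> "bar"]
    [L "widgets" -> (bucket root=0)]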

tx_test.go

@@ -219,7 +219,7 @@ func TestTx_DeleteBucket(t *testing.T) {
 		db.Update(func(tx *Tx) error {
 			// Verify that the bucket's page is free.
-			assert.Equal(t, []pgid{7, 6, root, 2}, db.freelist.all())
+			assert.Equal(t, []pgid{5, 4}, db.freelist.all())
 
 			// Create the bucket again and make sure there's not a phantom value.
 			b, err := tx.CreateBucket([]byte("widgets"))