diff --git a/bucket.go b/bucket.go index 9737128..0b9c17d 100644 --- a/bucket.go +++ b/bucket.go @@ -4,7 +4,6 @@ import ( "bytes" "errors" "fmt" - "sort" "unsafe" ) @@ -41,10 +40,10 @@ var ( // Bucket represents a collection of key/value pairs inside the database. type Bucket struct { *bucket - tx *Tx - buckets map[string]*Bucket - nodes map[pgid]*node - pending []*node + tx *Tx + buckets map[string]*Bucket + rootNode *node + nodes map[pgid]*node } // bucket represents the on-file representation of a bucket. @@ -382,76 +381,19 @@ func (b *Bucket) spill() error { c.node().put([]byte(name), []byte(name), value, 0, bucketLeafFlag) } - // Ignore if there are no nodes to spill. - if len(b.nodes) == 0 { + // Ignore if there's not a materialized root node. + if b.rootNode == nil { return nil } - // Sort nodes by highest depth first. - nodes := make(nodesByDepth, 0, len(b.nodes)) - for _, n := range b.nodes { - nodes = append(nodes, n) + // Spill nodes. + if err := b.rootNode.spill(); err != nil { + return err } - sort.Sort(nodes) - - // Spill nodes by deepest first. - for i := 0; i < len(nodes); i++ { - n := nodes[i] - - // Split nodes into appropriate sized nodes. - // The first node in this list will be a reference to n to preserve ancestry. - newNodes := n.split(b.tx.db.pageSize) - b.pending = newNodes - - // If this is a root node that split then create a parent node. - if n.parent == nil && len(newNodes) > 1 { - n.parent = &node{bucket: b, isLeaf: false} - nodes = append(nodes, n.parent) - } - - // Add node's page to the freelist. - if n.pgid > 0 { - b.tx.db.freelist.free(b.tx.id(), b.tx.page(n.pgid)) - } - - // Write nodes to dirty pages. - for i, newNode := range newNodes { - // Allocate contiguous space for the node. - p, err := b.tx.allocate((newNode.size() / b.tx.db.pageSize) + 1) - if err != nil { - return err - } - - // Write the node to the page. - newNode.write(p) - newNode.pgid = p.id - newNode.parent = n.parent - - // The first node should use the existing entry, other nodes are inserts. - var oldKey []byte - if i == 0 { - oldKey = n.key - } else { - oldKey = newNode.inodes[0].key - } - - // Update the parent entry. - if newNode.parent != nil { - newNode.parent.put(oldKey, newNode.inodes[0].key, nil, newNode.pgid, 0) - } - - // Update the statistics. - b.tx.stats.Spill++ - } - - b.pending = nil - } - - // Clear out nodes now that they are all spilled. - b.nodes = make(map[pgid]*node) + b.rootNode = b.rootNode.root() // Update the root node for this bucket. - b.root = nodes[len(nodes)-1].pgid + b.root = b.rootNode.pgid return nil } @@ -474,10 +416,12 @@ func (b *Bucket) node(pgid pgid, parent *node) *node { return n } - // Otherwise create a branch and cache it. + // Otherwise create a branch node and cache it. n := &node{bucket: b, parent: parent} - if n.parent != nil { - n.depth = n.parent.depth + 1 + if parent == nil { + b.rootNode = n + } else { + parent.children = append(parent.children, n) } n.read(b.tx.page(pgid)) b.nodes[pgid] = n @@ -494,16 +438,12 @@ func (b *Bucket) dereference() { n.dereference() } - for _, n := range b.pending { - n.dereference() - } - for _, child := range b.buckets { child.dereference() } // Update statistics - b.tx.stats.NodeDeref += len(b.nodes) + len(b.pending) + b.tx.stats.NodeDeref += len(b.nodes) } // pageNode returns the in-memory node, if it exists. diff --git a/bucket_test.go b/bucket_test.go index c506b72..dc89bcc 100644 --- a/bucket_test.go +++ b/bucket_test.go @@ -161,6 +161,64 @@ func TestBucket_Delete(t *testing.T) { }) } +// Ensure that accessing and updating nested buckets is ok across transactions. +func TestBucket_Nested(t *testing.T) { + withOpenDB(func(db *DB, path string) { + db.Update(func(tx *Tx) error { + // Create a widgets bucket. + b, err := tx.CreateBucket([]byte("widgets")) + assert.NoError(t, err) + + // Create a widgets/foo bucket. + _, err = b.CreateBucket([]byte("foo")) + assert.NoError(t, err) + + // Create a widgets/bar key. + assert.NoError(t, b.Put([]byte("bar"), []byte("0000"))) + + return nil + }) + mustCheck(db) + + // Update widgets/bar. + db.Update(func(tx *Tx) error { + var b = tx.Bucket([]byte("widgets")) + assert.NoError(t, b.Put([]byte("bar"), []byte("xxxx"))) + return nil + }) + mustCheck(db) + + // Cause a split. + db.Update(func(tx *Tx) error { + var b = tx.Bucket([]byte("widgets")) + for i := 0; i < 10000; i++ { + assert.NoError(t, b.Put([]byte(strconv.Itoa(i)), []byte(strconv.Itoa(i)))) + } + return nil + }) + mustCheck(db) + + // Insert into widgets/foo/baz. + db.Update(func(tx *Tx) error { + var b = tx.Bucket([]byte("widgets")) + assert.NoError(t, b.Bucket([]byte("foo")).Put([]byte("baz"), []byte("yyyy"))) + return nil + }) + mustCheck(db) + + // Verify. + db.View(func(tx *Tx) error { + var b = tx.Bucket([]byte("widgets")) + assert.Equal(t, []byte("yyyy"), b.Bucket([]byte("foo")).Get([]byte("baz"))) + assert.Equal(t, []byte("xxxx"), b.Get([]byte("bar"))) + for i := 0; i < 10000; i++ { + assert.Equal(t, []byte(strconv.Itoa(i)), b.Get([]byte(strconv.Itoa(i)))) + } + return nil + }) + }) +} + // Ensure that deleting a bucket using Delete() returns an error. func TestBucket_Delete_Bucket(t *testing.T) { withOpenDB(func(db *DB, path string) { @@ -550,31 +608,35 @@ func TestBucket_Stats_Large(t *testing.T) { } withOpenDB(func(db *DB, path string) { - db.Update(func(tx *Tx) error { - // Add bucket with lots of keys. - tx.CreateBucket([]byte("widgets")) - b := tx.Bucket([]byte("widgets")) - for i := 0; i < 100000; i++ { - b.Put([]byte(strconv.Itoa(i)), []byte(strconv.Itoa(i))) - } - return nil - }) + var index int + for i := 0; i < 1000; i++ { + db.Update(func(tx *Tx) error { + // Add bucket with lots of keys. + b, _ := tx.CreateBucketIfNotExists([]byte("widgets")) + for i := 0; i < 100; i++ { + b.Put([]byte(strconv.Itoa(index)), []byte(strconv.Itoa(index))) + index++ + } + return nil + }) + } mustCheck(db) + db.View(func(tx *Tx) error { b := tx.Bucket([]byte("widgets")) stats := b.Stats() - assert.Equal(t, stats.BranchPageN, 15) - assert.Equal(t, stats.BranchOverflowN, 0) - assert.Equal(t, stats.LeafPageN, 1281) - assert.Equal(t, stats.LeafOverflowN, 0) - assert.Equal(t, stats.KeyN, 100000) - assert.Equal(t, stats.Depth, 3) + assert.Equal(t, 19, stats.BranchPageN) + assert.Equal(t, 0, stats.BranchOverflowN) + assert.Equal(t, 1291, stats.LeafPageN) + assert.Equal(t, 0, stats.LeafOverflowN) + assert.Equal(t, 100000, stats.KeyN) + assert.Equal(t, 3, stats.Depth) if os.Getpagesize() != 4096 { // Incompatible page size - assert.Equal(t, stats.BranchInuse, 27289) - assert.Equal(t, stats.BranchAlloc, 61440) - assert.Equal(t, stats.LeafInuse, 2598276) - assert.Equal(t, stats.LeafAlloc, 5246976) + assert.Equal(t, 27289, stats.BranchInuse) + assert.Equal(t, 61440, stats.BranchAlloc) + assert.Equal(t, 2598276, stats.LeafInuse) + assert.Equal(t, 5246976, stats.LeafAlloc) } return nil }) @@ -703,13 +765,12 @@ func TestBucket_Delete_Quick(t *testing.T) { db.View(func(tx *Tx) error { b := tx.Bucket([]byte("widgets")) for j, exp := range items { + value := b.Get(exp.Key) if j > i { - value := b.Get(exp.Key) if !assert.Equal(t, exp.Value, value) { t.FailNow() } } else { - value := b.Get(exp.Key) if !assert.Nil(t, value) { t.FailNow() } diff --git a/cursor.go b/cursor.go index 13f2a06..520edd4 100644 --- a/cursor.go +++ b/cursor.go @@ -192,7 +192,7 @@ func (c *Cursor) last() { func (c *Cursor) search(key []byte, pgid pgid) { p, n := c.bucket.pageNode(pgid) if p != nil { - _assert((p.flags&(branchPageFlag|leafPageFlag)) != 0, "invalid page type: "+p.typ()) + _assert((p.flags&(branchPageFlag|leafPageFlag)) != 0, "invalid page type: %d: %s", p.id, p.typ()) } e := elemRef{page: p, node: n} c.stack = append(c.stack, e) diff --git a/node.go b/node.go index 49de144..fd737ea 100644 --- a/node.go +++ b/node.go @@ -12,12 +12,20 @@ type node struct { isLeaf bool unbalanced bool key []byte - depth int pgid pgid parent *node + children []*node inodes inodes } +// root returns the top-level node this node is attached to. +func (n *node) root() *node { + if n.parent == nil { + return n + } + return n.parent.root() +} + // minKeys returns the minimum number of inodes this node should have. func (n *node) minKeys() int { if n.isLeaf { @@ -185,12 +193,15 @@ func (n *node) write(p *page) { } } -// split divides up the node into appropriately sized nodes. +// split breaks up a node into smaller nodes, if appropriate. +// This should only be called from the spill() function. func (n *node) split(pageSize int) []*node { + var nodes = []*node{n} + // Ignore the split if the page doesn't have at least enough nodes for // multiple pages or if the data can fit on a single page. if len(n.inodes) <= (minKeysPerPage*2) || n.size() < pageSize { - return []*node{n} + return nodes } // Set fill threshold to 50%. @@ -198,28 +209,106 @@ func (n *node) split(pageSize int) []*node { // Group into smaller pages and target a given fill size. size := pageHeaderSize - inodes := n.inodes + internalNodes := n.inodes current := n current.inodes = nil - var nodes []*node - for i, inode := range inodes { + // Loop over every inode and split once we reach our threshold. + for i, inode := range internalNodes { elemSize := n.pageElementSize() + len(inode.key) + len(inode.value) - if len(current.inodes) >= minKeysPerPage && i < len(inodes)-minKeysPerPage && size+elemSize > threshold { - size = pageHeaderSize + // Split once we reach our threshold split size. However, this should + // only be done if we have enough keys for this node and we will have + // enough keys for the next node. + if len(current.inodes) >= minKeysPerPage && i < len(internalNodes)-minKeysPerPage && size+elemSize > threshold { + // If there's no parent then we need to create one. + if n.parent == nil { + n.parent = &node{bucket: n.bucket, children: []*node{n}} + } + + // Create a new node and add it to the parent. + current = &node{bucket: n.bucket, isLeaf: n.isLeaf, parent: n.parent} + n.parent.children = append(n.parent.children, current) nodes = append(nodes, current) - current = &node{bucket: n.bucket, isLeaf: n.isLeaf} + + // Reset our running total back to zero (plus header size). + size = pageHeaderSize + + // Update the statistics. + n.bucket.tx.stats.Split++ } + // Increase our running total of the size and append the inode. size += elemSize current.inodes = append(current.inodes, inode) } - nodes = append(nodes, current) return nodes } +// spill writes the nodes to dirty pages and splits nodes as it goes. +// Returns an error if dirty pages cannot be allocated. +func (n *node) spill() error { + var tx = n.bucket.tx + + // Spill child nodes first. + for _, child := range n.children { + if err := child.spill(); err != nil { + return err + } + } + + // Add node's page to the freelist if it's not new. + if n.pgid > 0 { + tx.db.freelist.free(tx.id(), tx.page(n.pgid)) + } + + // Spill nodes by deepest first. + var nodes = n.split(tx.db.pageSize) + for _, node := range nodes { + // Allocate contiguous space for the node. + p, err := tx.allocate((node.size() / tx.db.pageSize) + 1) + if err != nil { + return err + } + + // Write the node. + node.write(p) + node.pgid = p.id + + // Insert into parent inodes. + if node.parent != nil { + var key = node.key + if key == nil { + key = node.inodes[0].key + } + + node.parent.put(key, node.inodes[0].key, nil, node.pgid, 0) + node.key = node.inodes[0].key + } + + // Update the statistics. + tx.stats.Spill++ + } + + // This is a special case where we need to write the parent if it is new + // and caused by a split in the root. + var parent = n.parent + if parent != nil && parent.pgid == 0 { + // Allocate contiguous space for the node. + p, err := tx.allocate((parent.size() / tx.db.pageSize) + 1) + if err != nil { + return err + } + + // Write the new root. + parent.write(p) + parent.pgid = p.id + } + + return nil +} + // rebalance attempts to combine the node with sibling nodes if the node fill // size is below a threshold or if there are not enough keys. func (n *node) rebalance() { @@ -241,10 +330,11 @@ func (n *node) rebalance() { if n.parent == nil { // If root node is a branch and only has one node then collapse it. if !n.isLeaf && len(n.inodes) == 1 { - // Move child's children up. + // Move root's child up. child := n.bucket.nodes[n.inodes[0].pgid] n.isLeaf = child.isLeaf n.inodes = child.inodes[:] + n.children = child.children // Reparent all child nodes being moved. for _, inode := range n.inodes { @@ -278,7 +368,9 @@ func (n *node) rebalance() { if useNextSibling { // Reparent and move node. if child, ok := n.bucket.nodes[target.inodes[0].pgid]; ok { + child.parent.removeChild(child) child.parent = n + child.parent.children = append(child.parent.children, child) } n.inodes = append(n.inodes, target.inodes[0]) target.inodes = target.inodes[1:] @@ -289,7 +381,9 @@ func (n *node) rebalance() { } else { // Reparent and move node. if child, ok := n.bucket.nodes[target.inodes[len(target.inodes)-1].pgid]; ok { + child.parent.removeChild(child) child.parent = n + child.parent.children = append(child.parent.children, child) } n.inodes = append(n.inodes, inode{}) copy(n.inodes[1:], n.inodes) @@ -309,26 +403,32 @@ func (n *node) rebalance() { // Reparent all child nodes being moved. for _, inode := range target.inodes { if child, ok := n.bucket.nodes[inode.pgid]; ok { + child.parent.removeChild(child) child.parent = n + child.parent.children = append(child.parent.children, child) } } // Copy over inodes from target and remove target. n.inodes = append(n.inodes, target.inodes...) n.parent.del(target.key) + n.parent.removeChild(target) delete(n.bucket.nodes, target.pgid) target.free() } else { // Reparent all child nodes being moved. for _, inode := range n.inodes { if child, ok := n.bucket.nodes[inode.pgid]; ok { + child.parent.removeChild(child) child.parent = target + child.parent.children = append(child.parent.children, child) } } // Copy over inodes to target and remove node. target.inodes = append(target.inodes, n.inodes...) n.parent.del(n.key) + n.parent.removeChild(n) n.parent.put(target.key, target.inodes[0].key, nil, target.pgid, 0) delete(n.bucket.nodes, n.pgid) n.free() @@ -338,6 +438,17 @@ func (n *node) rebalance() { n.parent.rebalance() } +// removes a node from the list of in-memory children. +// This does not affect the inodes. +func (n *node) removeChild(target *node) { + for i, child := range n.children { + if child == target { + n.children = append(n.children[:i], n.children[i+1:]...) + return + } + } +} + // dereference causes the node to copy all its inode key/value references to heap memory. // This is required when the mmap is reallocated so inodes are not pointing to stale data. func (n *node) dereference() { @@ -362,16 +473,10 @@ func (n *node) dereference() { func (n *node) free() { if n.pgid != 0 { n.bucket.tx.db.freelist.free(n.bucket.tx.id(), n.bucket.tx.page(n.pgid)) + n.pgid = 0 } } -// nodesByDepth sorts a list of branches by deepest first. -type nodesByDepth []*node - -func (s nodesByDepth) Len() int { return len(s) } -func (s nodesByDepth) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -func (s nodesByDepth) Less(i, j int) bool { return s[i].depth > s[j].depth } - // inode represents an internal node inside of a node. // It can be used to point to elements in a page or point // to an element which hasn't been added to a page yet. diff --git a/node_test.go b/node_test.go index e58a544..f639376 100644 --- a/node_test.go +++ b/node_test.go @@ -85,7 +85,7 @@ func TestNode_write_LeafPage(t *testing.T) { // Ensure that a node can split into appropriate subgroups. func TestNode_split(t *testing.T) { // Create a node. - n := &node{inodes: make(inodes, 0)} + n := &node{inodes: make(inodes, 0), bucket: &Bucket{tx: &Tx{}}} n.put([]byte("00000001"), []byte("00000001"), []byte("0123456701234567"), 0, 0) n.put([]byte("00000002"), []byte("00000002"), []byte("0123456701234567"), 0, 0) n.put([]byte("00000003"), []byte("00000003"), []byte("0123456701234567"), 0, 0) @@ -93,30 +93,30 @@ func TestNode_split(t *testing.T) { n.put([]byte("00000005"), []byte("00000005"), []byte("0123456701234567"), 0, 0) // Split between 2 & 3. - nodes := n.split(100) + n.split(100) - assert.Equal(t, len(nodes), 2) - assert.Equal(t, len(nodes[0].inodes), 2) - assert.Equal(t, len(nodes[1].inodes), 3) + var parent = n.parent + assert.Equal(t, len(parent.children), 2) + assert.Equal(t, len(parent.children[0].inodes), 2) + assert.Equal(t, len(parent.children[1].inodes), 3) } // Ensure that a page with the minimum number of inodes just returns a single node. func TestNode_split_MinKeys(t *testing.T) { // Create a node. - n := &node{inodes: make(inodes, 0)} + n := &node{inodes: make(inodes, 0), bucket: &Bucket{tx: &Tx{}}} n.put([]byte("00000001"), []byte("00000001"), []byte("0123456701234567"), 0, 0) n.put([]byte("00000002"), []byte("00000002"), []byte("0123456701234567"), 0, 0) // Split. - nodes := n.split(20) - assert.Equal(t, len(nodes), 1) - assert.Equal(t, len(nodes[0].inodes), 2) + n.split(20) + assert.Nil(t, n.parent) } // Ensure that a node that has keys that all fit on a page just returns one leaf. func TestNode_split_SinglePage(t *testing.T) { // Create a node. - n := &node{inodes: make(inodes, 0)} + n := &node{inodes: make(inodes, 0), bucket: &Bucket{tx: &Tx{}}} n.put([]byte("00000001"), []byte("00000001"), []byte("0123456701234567"), 0, 0) n.put([]byte("00000002"), []byte("00000002"), []byte("0123456701234567"), 0, 0) n.put([]byte("00000003"), []byte("00000003"), []byte("0123456701234567"), 0, 0) @@ -124,7 +124,6 @@ func TestNode_split_SinglePage(t *testing.T) { n.put([]byte("00000005"), []byte("00000005"), []byte("0123456701234567"), 0, 0) // Split. - nodes := n.split(4096) - assert.Equal(t, len(nodes), 1) - assert.Equal(t, len(nodes[0].inodes), 5) + n.split(4096) + assert.Nil(t, n.parent) } diff --git a/tx.go b/tx.go index 612f493..7cdadae 100644 --- a/tx.go +++ b/tx.go @@ -363,8 +363,9 @@ type TxStats struct { Rebalance int // number of node rebalances RebalanceTime time.Duration // total time spent rebalancing - // Spill statistics. - Spill int // number of node spilled + // Split/Spill statistics. + Split int // number of nodes split + Spill int // number of nodes spilled SpillTime time.Duration // total time spent spilling // Write statistics. @@ -380,6 +381,7 @@ func (s *TxStats) add(other *TxStats) { s.NodeDeref += other.NodeDeref s.Rebalance += other.Rebalance s.RebalanceTime += other.RebalanceTime + s.Split += other.Split s.Spill += other.Spill s.SpillTime += other.SpillTime s.Write += other.Write @@ -398,6 +400,7 @@ func (s *TxStats) Sub(other *TxStats) TxStats { diff.NodeDeref = s.NodeDeref - other.NodeDeref diff.Rebalance = s.Rebalance - other.Rebalance diff.RebalanceTime = s.RebalanceTime - other.RebalanceTime + diff.Split = s.Split - other.Split diff.Spill = s.Spill - other.Spill diff.SpillTime = s.SpillTime - other.SpillTime diff.Write = s.Write - other.Write