From b1dbd35da1d7abea5e52b7aacdb7bed2812dd9db Mon Sep 17 00:00:00 2001
From: Ben Johnson <benbjohnson@yahoo.com>
Date: Wed, 18 Jun 2014 16:16:58 -0600
Subject: [PATCH] Fix merge-split regression.

This commit reverts the merge-split behavior and fixes node.split() to perform a multi-page split. The
previous behavior caused problems with bulk loading because it would split a node into a small page and
a very large page. The very large page, in turn, would be an arbitrary size, so when it was freed later
it would be difficult to reuse and would cause serious fragmentation.
---
 bucket_test.go | 24 ++++++++++++------------
 db.go          |  7 +++++++
 db_test.go     |  2 +-
 node.go        | 49 ++++++++++++++++++++++++++++++++-----------------
 4 files changed, 52 insertions(+), 30 deletions(-)

diff --git a/bucket_test.go b/bucket_test.go
index 7e959d0..5a1b81c 100644
--- a/bucket_test.go
+++ b/bucket_test.go
@@ -691,15 +691,15 @@ func TestBucket_Stats_RandomFill(t *testing.T) {
 			s := tx.Bucket([]byte("woojits")).Stats()
 			assert.Equal(t, 100000, s.KeyN, "KeyN")
 
-			assert.Equal(t, 22, s.BranchPageN, "BranchPageN")
+			assert.Equal(t, 98, s.BranchPageN, "BranchPageN")
 			assert.Equal(t, 0, s.BranchOverflowN, "BranchOverflowN")
-			assert.Equal(t, 61708, s.BranchInuse, "BranchInuse")
-			assert.Equal(t, 90112, s.BranchAlloc, "BranchAlloc")
+			assert.Equal(t, 130984, s.BranchInuse, "BranchInuse")
+			assert.Equal(t, 401408, s.BranchAlloc, "BranchAlloc")
 
-			assert.Equal(t, 1643, s.LeafPageN, "LeafPageN")
+			assert.Equal(t, 3412, s.LeafPageN, "LeafPageN")
 			assert.Equal(t, 0, s.LeafOverflowN, "LeafOverflowN")
-			assert.Equal(t, 4714178, s.LeafInuse, "LeafInuse")
-			assert.Equal(t, 6729728, s.LeafAlloc, "LeafAlloc")
+			assert.Equal(t, 4742482, s.LeafInuse, "LeafInuse")
+			assert.Equal(t, 13975552, s.LeafAlloc, "LeafAlloc")
 			return nil
 		})
 	})
@@ -847,11 +847,11 @@ func TestBucket_Stats_Large(t *testing.T) {
 
 	withOpenDB(func(db *DB, path string) {
 		var index int
-		for i := 0; i < 10000; i++ {
+		for i := 0; i < 100; i++ {
 			db.Update(func(tx *Tx) error {
 				// Add bucket with lots of keys.
 				b, _ := tx.CreateBucketIfNotExists([]byte("widgets"))
-				for i := 0; i < 10; i++ {
+				for i := 0; i < 1000; i++ {
 					b.Put([]byte(strconv.Itoa(index)), []byte(strconv.Itoa(index)))
 					index++
 				}
@@ -865,16 +865,16 @@ func TestBucket_Stats_Large(t *testing.T) {
 			stats := b.Stats()
 			assert.Equal(t, 13, stats.BranchPageN, "BranchPageN")
 			assert.Equal(t, 0, stats.BranchOverflowN, "BranchOverflowN")
-			assert.Equal(t, 1195, stats.LeafPageN, "LeafPageN")
+			assert.Equal(t, 1196, stats.LeafPageN, "LeafPageN")
 			assert.Equal(t, 0, stats.LeafOverflowN, "LeafOverflowN")
 			assert.Equal(t, 100000, stats.KeyN, "KeyN")
 			assert.Equal(t, 3, stats.Depth, "Depth")
-			assert.Equal(t, 25208, stats.BranchInuse, "BranchInuse")
-			assert.Equal(t, 2596900, stats.LeafInuse, "LeafInuse")
+			assert.Equal(t, 25257, stats.BranchInuse, "BranchInuse")
+			assert.Equal(t, 2596916, stats.LeafInuse, "LeafInuse")
 			if os.Getpagesize() == 4096 {
 				// Incompatible page size
 				assert.Equal(t, 53248, stats.BranchAlloc, "BranchAlloc")
-				assert.Equal(t, 4894720, stats.LeafAlloc, "LeafAlloc")
+				assert.Equal(t, 4898816, stats.LeafAlloc, "LeafAlloc")
 			}
 			assert.Equal(t, 1, stats.BucketN, "BucketN")
 			assert.Equal(t, 0, stats.InlineBucketN, "InlineBucketN")
diff --git a/db.go b/db.go
index e88e499..9a125ee 100644
--- a/db.go
+++ b/db.go
@@ -5,6 +5,8 @@ import (
 	"fmt"
 	"hash/fnv"
 	"os"
+	"runtime/debug"
+	"strings"
 	"sync"
 	"unsafe"
 )
@@ -652,3 +654,8 @@ func warn(v ...interface{}) {
 func warnf(msg string, v ...interface{}) {
 	fmt.Fprintf(os.Stderr, msg+"\n", v...)
 }
+
+func printstack() {
+	stack := strings.Join(strings.Split(string(debug.Stack()), "\n")[2:], "\n")
+	fmt.Fprintln(os.Stderr, stack)
+}
diff --git a/db_test.go b/db_test.go
index 55535a0..b5c943e 100644
--- a/db_test.go
+++ b/db_test.go
@@ -512,7 +512,7 @@ func withOpenDB(fn func(*DB, string)) {
 
 // mustCheck runs a consistency check on the database and panics if any errors are found.
 func mustCheck(db *DB) {
-	err := db.Update(func(tx *Tx) error {
+	err := db.View(func(tx *Tx) error {
 		return <-tx.Check()
 	})
 	if err != nil {
diff --git a/node.go b/node.go
index 1865ffe..a2c31fb 100644
--- a/node.go
+++ b/node.go
@@ -9,6 +9,7 @@ import (
 // node represents an in-memory, deserialized page.
 type node struct {
 	bucket     *Bucket
+	dirty      bool
 	isLeaf     bool
 	unbalanced bool
 	key        []byte
@@ -205,14 +206,37 @@ func (n *node) write(p *page) {
 	// DEBUG ONLY: n.dump()
 }
 
-// split breaks up a node into two smaller nodes, if appropriate.
+// split breaks up a node into multiple smaller nodes, if appropriate.
 // This should only be called from the spill() function.
 func (n *node) split(pageSize int) []*node {
+	var nodes []*node
+
+	node := n
+	for {
+		// Split node into two.
+		a, b := node.splitTwo(pageSize)
+		nodes = append(nodes, a)
+
+		// If we can't split then exit the loop.
+		if b == nil {
+			break
+		}
+
+		// Set node to b so it gets split on the next iteration.
+		node = b
+	}
+
+	return nodes
+}
+
+// splitTwo breaks up a node into two smaller nodes, if appropriate.
+// This should only be called from the split() function.
+func (n *node) splitTwo(pageSize int) (*node, *node) {
 	// Ignore the split if the page doesn't have at least enough nodes for
 	// two pages or if the data can fit on a single page.
 	sz := n.size()
 	if len(n.inodes) <= (minKeysPerPage*2) || sz < pageSize {
-		return []*node{n}
+		return n, nil
 	}
 
 	// Determine the threshold before starting a new node.
@@ -225,18 +249,10 @@ func (n *node) split(pageSize int) []*node {
 	threshold := int(float64(pageSize) * fillPercent)
 
 	// Determine split position and sizes of the two pages.
-	splitIndex, sz0 := n.splitIndex(threshold)
-	sz1 := pageHeaderSize + (sz - sz0)
+	splitIndex, _ := n.splitIndex(threshold)
 
-	// If we can fit our extra keys on the next page then merge into it.
-	if next := n.nextSibling(); next != nil && next.size()+sz1 < threshold {
-		next.inodes = append(n.inodes[splitIndex:], next.inodes...)
-		n.inodes = n.inodes[:splitIndex]
-		return []*node{n}
-	}
-
-	// Otherwise split node into two separate nodes. If there's no parent then
-	// we'll need to create one.
+	// Split node into two separate nodes.
+	// If there's no parent then we'll need to create one.
 	if n.parent == nil {
 		n.parent = &node{bucket: n.bucket, children: []*node{n}}
 	}
@@ -252,7 +268,7 @@ func (n *node) split(pageSize int) []*node {
 	// Update the statistics.
 	n.bucket.tx.stats.Split++
 
-	return []*node{n, next}
+	return n, next
 }
 
 // splitIndex finds the position where a page will fill a given threshold.
@@ -298,8 +314,7 @@ func (n *node) spill() error {
 	// We no longer need the child list because it's only used for spill tracking.
 	n.children = nil
 
-	// Spill nodes by deepest first. The first node returned from split() will
-	// always be "n".
+	// Split nodes into appropriate sizes. The first node will always be n.
 	var nodes = n.split(tx.db.pageSize)
 	for _, node := range nodes {
 		// Add node's page to the freelist if it's not new.
@@ -328,7 +343,7 @@ func (n *node) spill() error {
 
 			node.parent.put(key, node.inodes[0].key, nil, node.pgid, 0)
 			node.key = node.inodes[0].key
-			_assert(len(n.key) > 0, "spill: zero-length node key")
+			_assert(len(node.key) > 0, "spill: zero-length node key")
 		}
 
 		// Update the statistics.