diff --git a/TODO b/TODO
index ae1dc9f..40552c4 100644
--- a/TODO
+++ b/TODO
@@ -8,3 +8,7 @@ X Initialize transaction.
 - rebalance
 - adjust cursors
 - RWTransaction Commmit
+
+
+
+
diff --git a/bpage.go b/bpage.go
deleted file mode 100644
index 496cfa2..0000000
--- a/bpage.go
+++ /dev/null
@@ -1,5 +0,0 @@
-package bolt
-
-type bpage struct {
-	keys [][]byte
-}
diff --git a/leaf.go b/leaf.go
new file mode 100644
index 0000000..23b35eb
--- /dev/null
+++ b/leaf.go
@@ -0,0 +1,134 @@
+package bolt
+
+import (
+	"bytes"
+	"sort"
+	"unsafe"
+)
+
+// leaf represents a temporary in-memory leaf page.
+// It is deserialized from a memory-mapped page and is not restricted by page size.
+type leaf struct {
+	items leafItems
+}
+
+type leafItems []leafItem
+
+type leafItem struct {
+	key   []byte
+	value []byte
+}
+
+// put inserts or replaces a key on a leaf page.
+func (l *leaf) put(key []byte, value []byte) {
+	// Find insertion index.
+	index := sort.Search(len(l.items), func(i int) bool { return bytes.Compare(l.items[i].key, key) != -1 })
+
+	// If there is no existing key then add a new item.
+	if index == len(l.items) {
+		l.items = append(l.items, leafItem{})
+	} else if len(l.items) == 0 || !bytes.Equal(l.items[index].key, key) {
+		l.items = append(l.items, leafItem{})
+		copy(l.items[index+1:], l.items[index:])
+	}
+	l.items[index].key = key
+	l.items[index].value = value
+}
+
+// read initializes the item data from an on-disk page.
+func (l *leaf) read(page *page) {
+	ncount := int(page.count)
+	l.items = make(leafItems, ncount)
+	lnodes := (*[maxNodesPerPage]lnode)(unsafe.Pointer(&page.ptr))
+	for i := 0; i < ncount; i++ {
+		lnode := &lnodes[i]
+		item := &l.items[i]
+		item.key = lnode.key()
+		item.value = lnode.value()
+	}
+}
+
+// write writes the items onto one or more leaf pages.
+func (l *leaf) write(pageSize int, allocate func(size int) (*page, error)) ([]*page, error) {
+	var pages []*page
+
+	for _, items := range l.split(pageSize) {
+		// Determine the total page size.
+		var size int = pageHeaderSize
+		for _, item := range items {
+			size += lnodeSize + len(item.key) + len(item.value)
+		}
+
+		// Allocate pages.
+		page, err := allocate(size)
+		if err != nil {
+			return nil, err
+		}
+		page.flags |= p_leaf
+		page.count = uint16(len(items))
+
+		// Loop over each item and write it to the page.
+		lnodes := (*[maxNodesPerPage]lnode)(unsafe.Pointer(&page.ptr))
+		b := (*[maxAllocSize]byte)(unsafe.Pointer(&page.ptr))[lnodeSize*len(items):]
+		for index, item := range items {
+			// Write item.
+			lnode := &lnodes[index]
+			lnode.pos = uint32(uintptr(unsafe.Pointer(&b[0])) - uintptr(unsafe.Pointer(lnode)))
+			lnode.ksize = uint32(len(item.key))
+			lnode.vsize = uint32(len(item.value))
+
+			// Write data to the end of the page.
+			copy(b[0:], item.key)
+			b = b[len(item.key):]
+			copy(b[0:], item.value)
+			b = b[len(item.value):]
+		}
+
+		pages = append(pages, page)
+	}
+
+	return pages, nil
+}
+
+// split divides the items in the page into appropriately sized groups.
+func (l *leaf) split(pageSize int) []leafItems {
+	// If we don't have enough items for multiple pages then just return the items.
+	if len(l.items) <= (minKeysPerPage * 2) {
+		return []leafItems{l.items}
+	}
+
+	// If we're not larger than one page then just return the items.
+	var totalSize int = pageHeaderSize
+	for _, item := range l.items {
+		totalSize += lnodeSize + len(item.key) + len(item.value)
+	}
+	if totalSize < pageSize {
+		return []leafItems{l.items}
+	}
+
+	// Otherwise group into smaller pages and target a given fill size.
+	var size int
+	var group leafItems
+	var groups []leafItems
+
+	// Set the fill threshold.
+	threshold := pageSize >> 4
+
+	for index, item := range l.items {
+		nodeSize := lnodeSize + len(item.key) + len(item.value)
+
+		if group == nil || (len(group) >= minKeysPerPage && index < len(l.items)-minKeysPerPage && size+nodeSize > threshold) {
+			size = pageHeaderSize
+			if group != nil {
+				groups = append(groups, group)
+			}
+			group = make(leafItems, 0)
+		}
+
+		size += nodeSize
+		group = append(group, item)
+	}
+	groups = append(groups, group)
+
+	return groups
+}
diff --git a/leaf_test.go b/leaf_test.go
new file mode 100644
index 0000000..ad23074
--- /dev/null
+++ b/leaf_test.go
@@ -0,0 +1,143 @@
+package bolt
+
+import (
+	"testing"
+	"unsafe"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// Ensure that a temporary page can insert a key/value.
+func TestLeafPut(t *testing.T) {
+	l := &leaf{items: make(leafItems, 0)}
+	l.put([]byte("baz"), []byte("2"))
+	l.put([]byte("foo"), []byte("0"))
+	l.put([]byte("bar"), []byte("1"))
+	l.put([]byte("foo"), []byte("3"))
+	assert.Equal(t, len(l.items), 3)
+	assert.Equal(t, l.items[0], leafItem{[]byte("bar"), []byte("1")})
+	assert.Equal(t, l.items[1], leafItem{[]byte("baz"), []byte("2")})
+	assert.Equal(t, l.items[2], leafItem{[]byte("foo"), []byte("3")})
+}
+
+// Ensure that a temporary page can deserialize from a page.
+func TestLeafRead(t *testing.T) {
+	// Create a page.
+	var buf [4096]byte
+	page := (*page)(unsafe.Pointer(&buf[0]))
+	page.count = 2
+
+	// Insert 2 leaf items at the beginning. sizeof(lnode) == 16
+	nodes := (*[3]lnode)(unsafe.Pointer(&page.ptr))
+	nodes[0] = lnode{flags: 0, pos: 32, ksize: 3, vsize: 4}  // pos = sizeof(lnode) * 2
+	nodes[1] = lnode{flags: 0, pos: 23, ksize: 10, vsize: 3} // pos = sizeof(lnode) + 3 + 4
+
+	// Write data for the nodes at the end.
+	data := (*[4096]byte)(unsafe.Pointer(&nodes[2]))
+	copy(data[:], []byte("barfooz"))
+	copy(data[7:], []byte("helloworldbye"))
+
+	// Deserialize page into a temporary page.
+	l := &leaf{}
+	l.read(page)
+
+	// Check that there are two items with correct data.
+	assert.Equal(t, len(l.items), 2)
+	assert.Equal(t, l.items[0].key, []byte("bar"))
+	assert.Equal(t, l.items[0].value, []byte("fooz"))
+	assert.Equal(t, l.items[1].key, []byte("helloworld"))
+	assert.Equal(t, l.items[1].value, []byte("bye"))
+}
+
+// Ensure that a temporary page can serialize itself.
+func TestLeafWrite(t *testing.T) {
+	// Create a temp page.
+	l := &leaf{items: make(leafItems, 0)}
+	l.put([]byte("susy"), []byte("que"))
+	l.put([]byte("ricki"), []byte("lake"))
+	l.put([]byte("john"), []byte("johnson"))
+
+	// Write it to a page.
+	var buf [4096]byte
+	allocate := func(size int) (*page, error) {
+		return (*page)(unsafe.Pointer(&buf[0])), nil
+	}
+	pages, err := l.write(4096, allocate)
+	assert.NoError(t, err)
+
+	// Read the page back in.
+	l2 := &leaf{}
+	l2.read(pages[0])
+
+	// Check that the two pages are the same.
+	assert.Equal(t, len(l2.items), 3)
+	assert.Equal(t, l2.items[0].key, []byte("john"))
+	assert.Equal(t, l2.items[0].value, []byte("johnson"))
+	assert.Equal(t, l2.items[1].key, []byte("ricki"))
+	assert.Equal(t, l2.items[1].value, []byte("lake"))
+	assert.Equal(t, l2.items[2].key, []byte("susy"))
+	assert.Equal(t, l2.items[2].value, []byte("que"))
+}
+
+// Ensure that an allocation error during writing is returned.
+func TestLeafWriteError(t *testing.T) {
+	// Create a temp page.
+	l := &leaf{items: make(leafItems, 0)}
+	l.put([]byte("susy"), []byte("que"))
+
+	// Write it to a page.
+	exp := &Error{}
+	allocate := func(size int) (*page, error) {
+		return nil, exp
+	}
+	pages, err := l.write(4096, allocate)
+	assert.Nil(t, pages)
+	assert.Equal(t, err, exp)
+}
+
+// Ensure that a temporary page can split into appropriate subgroups.
+func TestLeafSplit(t *testing.T) {
+	// Create a temp page.
+	l := &leaf{items: make(leafItems, 0)}
+	l.put([]byte("00000001"), []byte("0123456701234567"))
+	l.put([]byte("00000002"), []byte("0123456701234567"))
+	l.put([]byte("00000003"), []byte("0123456701234567"))
+	l.put([]byte("00000004"), []byte("0123456701234567"))
+	l.put([]byte("00000005"), []byte("0123456701234567"))
+
+	// Split between 2 & 3.
+	groups := l.split(100)
+
+	assert.Equal(t, len(groups), 2)
+	assert.Equal(t, len(groups[0]), 2)
+	assert.Equal(t, len(groups[1]), 3)
+}
+
+// Ensure that a temporary page with the minimum number of items just returns a single split group.
+func TestLeafSplitWithMinKeys(t *testing.T) {
+	// Create a temp page.
+	l := &leaf{items: make(leafItems, 0)}
+	l.put([]byte("00000001"), []byte("0123456701234567"))
+	l.put([]byte("00000002"), []byte("0123456701234567"))
+
+	// Split.
+	groups := l.split(20)
+	assert.Equal(t, len(groups), 1)
+	assert.Equal(t, len(groups[0]), 2)
+}
+
+// Ensure that a temporary page that has keys that all fit on a page just returns one split group.
+func TestLeafSplitFitsInPage(t *testing.T) {
+	// Create a temp page.
+	l := &leaf{items: make(leafItems, 0)}
+	l.put([]byte("00000001"), []byte("0123456701234567"))
+	l.put([]byte("00000002"), []byte("0123456701234567"))
+	l.put([]byte("00000003"), []byte("0123456701234567"))
+	l.put([]byte("00000004"), []byte("0123456701234567"))
+	l.put([]byte("00000005"), []byte("0123456701234567"))
+
+	// Split.
+	groups := l.split(4096)
+	assert.Equal(t, len(groups), 1)
+	assert.Equal(t, len(groups[0]), 5)
+}
diff --git a/rwtransaction.go b/rwtransaction.go
index 14aceb9..4e576cd 100644
--- a/rwtransaction.go
+++ b/rwtransaction.go
@@ -8,40 +8,7 @@ import (
 // Only one read/write transaction can be active for a DB at a time.
 type RWTransaction struct {
 	Transaction
-	tpages map[pgid]*tpage
-}
-
-// TODO: Allocate scratch meta page.
-// TODO: Allocate scratch data pages.
-// TODO: Track dirty pages (?)
-
-func (t *RWTransaction) Commit() error {
-	// TODO: Update non-system bucket pointers.
-	// TODO: Save freelist.
-	// TODO: Flush data.
-
-	// TODO: Initialize new meta object, Update system bucket nodes, last pgno, txnid.
-	// meta.mm_dbs[0] = txn->mt_dbs[0];
-	// meta.mm_dbs[1] = txn->mt_dbs[1];
-	// meta.mm_last_pg = txn->mt_next_pgno - 1;
-	// meta.mm_txnid = txn->mt_txnid;
-
-	// TODO: Pick sync or async file descriptor.
-	// TODO: Write meta page to file.
-
-	// TODO(?): Write checksum at the end.
-
-	return nil
-}
-
-func (t *RWTransaction) Rollback() error {
-	return t.close()
-}
-
-func (t *RWTransaction) close() error {
-	// TODO: Free scratch pages.
-	// TODO: Release writer lock.
-	return nil
+	leafs map[pgid]*leaf
 }
 
 // CreateBucket creates a new bucket.
@@ -56,12 +23,10 @@ func (t *RWTransaction) CreateBucket(name string) error {
 	var raw = (*bucket)(unsafe.Pointer(&buf[0]))
 	raw.root = 0
 
-	// TODO: Delete node first.
-
 	// Insert new node.
 	c := t.sys.cursor()
 	c.Goto([]byte(name))
-	t.tpage(c.page().id).put([]byte(name), buf[:])
+	t.leaf(c.page().id).put([]byte(name), buf[:])
 
 	return nil
 }
 
@@ -75,17 +40,6 @@ func (t *RWTransaction) DeleteBucket(b *Bucket) error {
 	return nil
 }
 
-// Flush (some) dirty pages to the map, after clearing their dirty flag.
-// @param[in] txn the transaction that's being committed
-// @param[in] keep number of initial pages in dirty_list to keep dirty.
-// @return 0 on success, non-zero on failure.
-func (t *RWTransaction) flush(keep bool) error {
-	// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
-
-	// TODO: Loop over each dirty page and write it to disk.
-	return nil
-}
-
 func (t *RWTransaction) Put(name string, key []byte, value []byte) error {
 	b := t.Bucket(name)
 	if b == nil {
@@ -104,7 +58,7 @@ func (t *RWTransaction) Put(name string, key []byte, value []byte) error {
 	// Insert a new node.
 	c := b.cursor()
 	c.Goto(key)
-	t.tpage(c.page().id).put(key, value)
+	t.leaf(c.page().id).put(key, value)
 
 	return nil
 }
@@ -117,6 +71,31 @@ func (t *RWTransaction) Delete(key []byte) error {
 	return nil
 }
 
+func (t *RWTransaction) Commit() error {
+	// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
+
+
+	// TODO: Flush data.
+
+	// TODO: Update meta.
+	// TODO: Write meta.
+
+	return nil
+}
+
+func (t *RWTransaction) Rollback() error {
+	return t.close()
+}
+
+func (t *RWTransaction) close() error {
+	// Clear temporary pages.
+	t.leafs = nil
+
+	// TODO: Release writer lock.
+
+	return nil
+}
+
 // allocate returns a contiguous block of memory starting at a given page.
 func (t *RWTransaction) allocate(size int) (*page, error) {
 	// TODO: Find a continuous block of free pages.
@@ -124,18 +103,18 @@ func (t *RWTransaction) allocate(size int) (*page, error) {
 	return nil, nil
 }
 
-// tpage returns a deserialized leaf page.
-func (t *RWTransaction) tpage(id pgid) *tpage {
-	if t.tpages != nil {
-		if p := t.tpages[id]; p != nil {
-			return p
+// leaf returns a deserialized leaf page.
+func (t *RWTransaction) leaf(id pgid) *leaf {
+	if t.leafs != nil {
+		if l := t.leafs[id]; l != nil {
+			return l
 		}
 	}
 
 	// Read raw page and deserialize.
-	p := &tpage{}
-	p.read(t.page(id))
-	t.tpages[id] = p
+	l := &leaf{}
+	l.read(t.page(id))
+	t.leafs[id] = l
 
-	return p
+	return l
 }
diff --git a/tnode.go b/tnode.go
deleted file mode 100644
index fcb4782..0000000
--- a/tnode.go
+++ /dev/null
@@ -1,8 +0,0 @@
-package bolt
-
-type tnodes []tnode
-
-type tnode struct {
-	key   []byte
-	value []byte
-}
diff --git a/tpage.go b/tpage.go
deleted file mode 100644
index facc789..0000000
--- a/tpage.go
+++ /dev/null
@@ -1,131 +0,0 @@
-package bolt
-
-import (
-	"bytes"
-	"sort"
-	"unsafe"
-)
-
-// tpage represents a temporary in-memory leaf page.
-// It is deserialized from an memory-mapped page and is not restricted by page size.
-type tpage struct {
-	nodes tnodes
-}
-
-// allocator is a function that returns a set of contiguous pages.
-type allocator func(size int) (*page, error)
-
-// put inserts or replaces a key on a leaf page.
-func (p *tpage) put(key []byte, value []byte) {
-	// Find insertion index.
-	index := sort.Search(len(p.nodes), func(i int) bool { return bytes.Compare(p.nodes[i].key, key) != -1 })
-
-	// If there is no existing key then add a new node.
-	if index == len(p.nodes) {
-		p.nodes = append(p.nodes, tnode{})
-	} else if len(p.nodes) == 0 || !bytes.Equal(p.nodes[index].key, key) {
-		p.nodes = append(p.nodes, tnode{})
-		copy(p.nodes[index+1:], p.nodes[index:])
-	}
-	p.nodes[index].key = key
-	p.nodes[index].value = value
-}
-
-// read initializes the node data from an on-disk page.
-func (p *tpage) read(page *page) {
-	ncount := int(page.count)
-	p.nodes = make(tnodes, ncount)
-	lnodes := (*[maxNodesPerPage]lnode)(unsafe.Pointer(&page.ptr))
-	for i := 0; i < ncount; i++ {
-		lnode := &lnodes[i]
-		n := &p.nodes[i]
-		n.key = lnode.key()
-		n.value = lnode.value()
-	}
-}
-
-// write writes the nodes onto one or more leaf pages.
-func (p *tpage) write(pageSize int, allocate allocator) ([]*page, error) {
-	var pages []*page
-
-	for _, nodes := range p.split(pageSize) {
-		// Determine the total page size.
-		var size int = pageHeaderSize
-		for _, node := range p.nodes {
-			size += lnodeSize + len(node.key) + len(node.value)
-		}
-
-		// Allocate pages.
-		page, err := allocate(size)
-		if err != nil {
-			return nil, err
-		}
-		page.flags |= p_leaf
-		page.count = uint16(len(nodes))
-
-		// Loop over each node and write it to the page.
-		lnodes := (*[maxNodesPerPage]lnode)(unsafe.Pointer(&page.ptr))
-		b := (*[maxAllocSize]byte)(unsafe.Pointer(&page.ptr))[lnodeSize*len(nodes):]
-		for index, node := range nodes {
-			// Write node.
-			lnode := &lnodes[index]
-			lnode.pos = uint32(uintptr(unsafe.Pointer(&b[0])) - uintptr(unsafe.Pointer(lnode)))
-			lnode.ksize = uint32(len(node.key))
-			lnode.vsize = uint32(len(node.value))
-
-			// Write data to the end of the node.
-			copy(b[0:], node.key)
-			b = b[len(node.key):]
-			copy(b[0:], node.value)
-			b = b[len(node.value):]
-		}
-
-		pages = append(pages, page)
-	}
-
-	return pages, nil
-}
-
-// split divides up the noes in the page into appropriately sized groups.
-func (p *tpage) split(pageSize int) []tnodes {
-	// If we only have enough nodes for multiple pages then just return the nodes.
-	if len(p.nodes) <= (minKeysPerPage * 2) {
-		return []tnodes{p.nodes}
-	}
-
-	// If we're not larger than one page then just return the nodes.
-	var totalSize int = pageHeaderSize
-	for _, node := range p.nodes {
-		totalSize += lnodeSize + len(node.key) + len(node.value)
-	}
-	if totalSize < pageSize {
-		return []tnodes{p.nodes}
-	}
-
-	// Otherwise group into smaller pages and target a given fill size.
-	var size int
-	var group tnodes
-	var groups []tnodes
-
-	// Set fill threshold to 25%.
-	threshold := pageSize >> 4
-
-	for index, node := range p.nodes {
-		nodeSize := lnodeSize + len(node.key) + len(node.value)
-
-		// TODO(benbjohnson): Don't create a new group for just the last node.
-		if group == nil || (len(group) >= minKeysPerPage && index < len(p.nodes)-minKeysPerPage && size+nodeSize > threshold) {
-			size = pageHeaderSize
-			if group != nil {
-				groups = append(groups, group)
-			}
-			group = make(tnodes, 0)
-		}
-
-		size += nodeSize
-		group = append(group, node)
-	}
-	groups = append(groups, group)
-
-	return groups
-}
diff --git a/tpage_test.go b/tpage_test.go
deleted file mode 100644
index ace4c53..0000000
--- a/tpage_test.go
+++ /dev/null
@@ -1,143 +0,0 @@
-package bolt
-
-import (
-	"testing"
-	"unsafe"
-
-	"github.com/stretchr/testify/assert"
-)
-
-// Ensure that a temporary page can insert a key/value.
-func TestTpagePut(t *testing.T) {
-	p := &tpage{nodes: make(tnodes, 0)}
-	p.put([]byte("baz"), []byte("2"))
-	p.put([]byte("foo"), []byte("0"))
-	p.put([]byte("bar"), []byte("1"))
-	p.put([]byte("foo"), []byte("3"))
-	assert.Equal(t, len(p.nodes), 3)
-	assert.Equal(t, p.nodes[0], tnode{[]byte("bar"), []byte("1")})
-	assert.Equal(t, p.nodes[1], tnode{[]byte("baz"), []byte("2")})
-	assert.Equal(t, p.nodes[2], tnode{[]byte("foo"), []byte("3")})
-}
-
-// Ensure that a temporary page can deserialize from a page.
-func TestTpageRead(t *testing.T) {
-	// Create a page.
-	var buf [4096]byte
-	page := (*page)(unsafe.Pointer(&buf[0]))
-	page.count = 2
-
-	// Insert 2 leaf nodes at the beginning. sizeof(lnode) == 16
-	nodes := (*[3]lnode)(unsafe.Pointer(&page.ptr))
-	nodes[0] = lnode{flags: 0, pos: 32, ksize: 3, vsize: 4}  // pos = sizeof(lnode) * 2
-	nodes[1] = lnode{flags: 0, pos: 23, ksize: 10, vsize: 3} // pos = sizeof(lnode) + 3 + 4
-
-	// Write data for the nodes at the end.
-	data := (*[4096]byte)(unsafe.Pointer(&nodes[2]))
-	copy(data[:], []byte("barfooz"))
-	copy(data[7:], []byte("helloworldbye"))
-
-	// Deserialize page into a temporary page.
-	p := &tpage{}
-	p.read(page)
-
-	// Check that there are two nodes with correct data.
-	assert.Equal(t, len(p.nodes), 2)
-	assert.Equal(t, p.nodes[0].key, []byte("bar"))
-	assert.Equal(t, p.nodes[0].value, []byte("fooz"))
-	assert.Equal(t, p.nodes[1].key, []byte("helloworld"))
-	assert.Equal(t, p.nodes[1].value, []byte("bye"))
-}
-
-// Ensure that a temporary page can serialize itself.
-func TestTpageWrite(t *testing.T) {
-	// Create a temp page.
-	p := &tpage{nodes: make(tnodes, 0)}
-	p.put([]byte("susy"), []byte("que"))
-	p.put([]byte("ricki"), []byte("lake"))
-	p.put([]byte("john"), []byte("johnson"))
-
-	// Write it to a page.
-	var buf [4096]byte
-	allocate := func(size int) (*page, error) {
-		return (*page)(unsafe.Pointer(&buf[0])), nil
-	}
-	pages, err := p.write(4096, allocate)
-	assert.NoError(t, err)
-
-	// Read the page back in.
-	p2 := &tpage{}
-	p2.read(pages[0])
-
-	// Check that the two pages are the same.
-	assert.Equal(t, len(p2.nodes), 3)
-	assert.Equal(t, p2.nodes[0].key, []byte("john"))
-	assert.Equal(t, p2.nodes[0].value, []byte("johnson"))
-	assert.Equal(t, p2.nodes[1].key, []byte("ricki"))
-	assert.Equal(t, p2.nodes[1].value, []byte("lake"))
-	assert.Equal(t, p2.nodes[2].key, []byte("susy"))
-	assert.Equal(t, p2.nodes[2].value, []byte("que"))
-}
-
-// Ensure that an error that an allocation error during writing is returned.
-func TestTpageWriteError(t *testing.T) {
-	// Create a temp page.
-	p := &tpage{nodes: make(tnodes, 0)}
-	p.put([]byte("susy"), []byte("que"))
-
-	// Write it to a page.
-	exp := &Error{}
-	allocate := func(size int) (*page, error) {
-		return nil, exp
-	}
-	pages, err := p.write(4096, allocate)
-	assert.Nil(t, pages)
-	assert.Equal(t, err, exp)
-}
-
-// Ensure that a temporary page can split into appropriate subgroups.
-func TestTpageSplit(t *testing.T) {
-	// Create a temp page.
-	p := &tpage{nodes: make(tnodes, 0)}
-	p.put([]byte("00000001"), []byte("0123456701234567"))
-	p.put([]byte("00000002"), []byte("0123456701234567"))
-	p.put([]byte("00000003"), []byte("0123456701234567"))
-	p.put([]byte("00000004"), []byte("0123456701234567"))
-	p.put([]byte("00000005"), []byte("0123456701234567"))
-
-	// Split between 3 & 4.
-	pages := p.split(100)
-
-	assert.Equal(t, len(pages), 2)
-	assert.Equal(t, len(pages[0]), 2)
-	assert.Equal(t, len(pages[1]), 3)
-}
-
-// Ensure that a temporary page with the minimum number of nodes just returns a single split group.
-func TestTpageSplitWithMinKeys(t *testing.T) {
-	// Create a temp page.
-	p := &tpage{nodes: make(tnodes, 0)}
-	p.put([]byte("00000001"), []byte("0123456701234567"))
-	p.put([]byte("00000002"), []byte("0123456701234567"))
-
-	// Split.
-	pages := p.split(20)
-	assert.Equal(t, len(pages), 1)
-	assert.Equal(t, len(pages[0]), 2)
-}
-
-// Ensure that a temporary page that has keys that all fit on a page just returns one split group.
-func TestTpageSplitFitsInPage(t *testing.T) {
-	// Create a temp page.
-	p := &tpage{nodes: make(tnodes, 0)}
-	p.put([]byte("00000001"), []byte("0123456701234567"))
-	p.put([]byte("00000002"), []byte("0123456701234567"))
-	p.put([]byte("00000003"), []byte("0123456701234567"))
-	p.put([]byte("00000004"), []byte("0123456701234567"))
-	p.put([]byte("00000005"), []byte("0123456701234567"))
-
-	// Split.
-	pages := p.split(4096)
-	assert.Equal(t, len(pages), 1)
-	assert.Equal(t, len(pages[0]), 5)
-}
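
The sketch below is illustrative only and is not part of this diff: it shows how the remaining Commit TODOs are expected to use the new leaf type, by splitting each dirty leaf into page-sized groups and serializing them through the allocator. The flushLeafs name and the t.db.pageSize field are assumptions; only leaf.write, t.leafs, and t.allocate come from the change above.

	// flushLeafs is a hypothetical helper, not part of this change.
	// It serializes each dirty leaf into newly allocated pages.
	func (t *RWTransaction) flushLeafs() error {
		for _, l := range t.leafs {
			// t.db.pageSize is assumed to hold the database page size.
			// write() splits the leaf and copies its items into pages
			// returned by t.allocate.
			if _, err := l.write(t.db.pageSize, t.allocate); err != nil {
				return err
			}
		}
		return nil
	}

The pages returned here are where the vectorized-I/O TODO in Commit would apply: they still have to be written out to the data file and then referenced from the new meta page.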