diff --git a/bnodes.go b/bnodes.go deleted file mode 100644 index 7d977c8..0000000 --- a/bnodes.go +++ /dev/null @@ -1,4 +0,0 @@ -package bolt - -type bnodes []bnode - diff --git a/bpage.go b/bpage.go new file mode 100644 index 0000000..496cfa2 --- /dev/null +++ b/bpage.go @@ -0,0 +1,5 @@ +package bolt + +type bpage struct { + keys [][]byte +} diff --git a/cursor.go b/cursor.go index 2f75677..e6b2c31 100644 --- a/cursor.go +++ b/cursor.go @@ -45,12 +45,17 @@ func (c *Cursor) Goto(key []byte) bool { return false } -// current the page and leaf node that the cursor is currently pointing at. -func (c *Cursor) current() (*page, *lnode) { +// top returns the page and leaf node that the cursor is currently pointing at. +func (c *Cursor) top() (*page, *lnode) { elem := c.stack[len(c.stack)-1] return elem.page, elem.page.lnode(elem.index) } +// page returns the page that the cursor is currently pointing at. +func (c *Cursor) page() *page { + return c.stack[len(c.stack)-1].page +} + // node returns the leaf node that the cursor is currently positioned on. func (c *Cursor) node() *lnode { elem := c.stack[len(c.stack)-1] diff --git a/db.go b/db.go index 2d7538b..9d003ab 100644 --- a/db.go +++ b/db.go @@ -156,11 +156,8 @@ func (db *DB) mmap() error { // init creates a new database file and initializes its meta pages. func (db *DB) init() error { - // Set the page size to the OS page size unless that is larger than max page size. + // Set the page size to the OS page size. db.pageSize = db.os.Getpagesize() - if db.pageSize > maxPageSize { - db.pageSize = maxPageSize - } // Create two meta pages on a buffer. buf := make([]byte, db.pageSize*2) diff --git a/db_test.go b/db_test.go index b9c4afe..5a86df8 100644 --- a/db_test.go +++ b/db_test.go @@ -53,26 +53,6 @@ func TestDBOpenMetaFileError(t *testing.T) { }) } -// Ensure that the database limits the upper bound of the page size. 
-func TestDBLimitPageSize(t *testing.T) { - withMockDB(func(db *DB, mockos *mockos, mocksyscall *mocksyscall, path string) { - b := make([]byte, 0x10000) - p0, p1 := (*page)(unsafe.Pointer(&b[0x0000])), (*page)(unsafe.Pointer(&b[0x8000])) - p0.init(0x8000) - p1.init(0x8000) - file, metafile := &mockfile{}, &mockfile{} - mockos.On("OpenFile", path, os.O_RDWR|os.O_CREATE, os.FileMode(0666)).Return(file, nil) - mockos.On("OpenFile", path, os.O_RDWR|os.O_SYNC, os.FileMode(0666)).Return(metafile, nil) - mockos.On("Getpagesize").Return(0x10000) - file.On("ReadAt", mock.Anything, int64(0)).Return(0, nil) - file.On("Stat").Return(&mockfileinfo{"", 0x10000, 0666, time.Now(), false, nil}, nil) - metafile.On("WriteAt", mock.Anything, int64(0)).Return(0, nil) - mocksyscall.On("Mmap", 0, int64(0), 0x10000, syscall.PROT_READ, syscall.MAP_SHARED).Return(b, nil) - db.Open(path, 0666) - assert.Equal(t, db.pageSize, maxPageSize) - }) -} - // Ensure that write errors to the meta file handler during initialization are returned. func TestDBMetaInitWriteError(t *testing.T) { withMockDB(func(db *DB, mockos *mockos, mocksyscall *mocksyscall, path string) { @@ -187,7 +167,7 @@ func TestDBCorruptMeta1(t *testing.T) { // Ensure that a database cannot open a transaction when it's not open. func TestDBTransactionDatabaseNotOpenError(t *testing.T) { withDB(func(db *DB, path string) { - txn, err := db.Transaction(false) + txn, err := db.Transaction() assert.Nil(t, txn) assert.Equal(t, err, DatabaseNotOpenError) }) @@ -196,8 +176,8 @@ func TestDBTransactionDatabaseNotOpenError(t *testing.T) { // Ensure that a database cannot open a writable transaction while one is in progress. 
func TestDBTransactionInProgressError(t *testing.T) { withOpenDB(func(db *DB, path string) { - db.Transaction(true) - txn, err := db.Transaction(true) + db.RWTransaction() + txn, err := db.RWTransaction() assert.Nil(t, txn) assert.Equal(t, err, TransactionInProgressError) }) @@ -206,10 +186,9 @@ func TestDBTransactionInProgressError(t *testing.T) { // Ensure that a database can create a new writable transaction. func TestDBTransactionWriter(t *testing.T) { withOpenDB(func(db *DB, path string) { - txn, err := db.Transaction(true) + txn, err := db.RWTransaction() if assert.NotNil(t, txn) { assert.Equal(t, txn.db, db) - assert.Equal(t, txn.writable, true) } assert.NoError(t, err) }) diff --git a/lnodes.go b/lnodes.go deleted file mode 100644 index 8db6a85..0000000 --- a/lnodes.go +++ /dev/null @@ -1,17 +0,0 @@ -package bolt - -type lnodes []lnode - -// replace replaces the node at the given index with a new key/value size. -func (s lnodes) replace(key, value []byte, index int) lnodes { - n := &s[index] - n.pos = 0 - n.ksize = len(key) - n.vsize = len(value) - return s -} - -// insert places a new node at the given index with a key/value size. -func (s lnodes) insert(key, value []byte, index int) lnodes { - return append(s[0:index], lnode{ksize: len(key), vsize: len(value)}, s[index:len(s)]) -} diff --git a/lpage.go b/lpage.go new file mode 100644 index 0000000..56c5c56 --- /dev/null +++ b/lpage.go @@ -0,0 +1,128 @@ +package bolt + +import ( + "bytes" + "sort" + "unsafe" +) + +type lpage struct { + nodes []lpnode +} + +type lpnode struct { + key []byte + value []byte +} + +// allocator is a function that returns a set of contiguous pages. +type allocator func(count int) (*page, error) + +// put inserts or replaces a key on a leaf page. +func (p *lpage) put(key []byte, value []byte) { + // Find insertion index. 
+	index := sort.Search(len(p.nodes), func(i int) bool { return bytes.Compare(p.nodes[i].key, key) != -1 }) + + // If there is no existing key then add a new node. + if len(p.nodes) == 0 || !bytes.Equal(p.nodes[index].key, key) { + p.nodes = append(p.nodes, lpnode{}) + copy(p.nodes[index+1:], p.nodes[index:]) + } + p.nodes[index].key = key + p.nodes[index].value = value +} + +// read initializes the node data from an on-disk page. +func (p *lpage) read(page *page) { + p.nodes = make([]lpnode, page.count) + lnodes := (*[maxNodesPerPage]lnode)(unsafe.Pointer(&page.ptr)) + for i := 0; i < int(page.count); i++ { + lnode := lnodes[i] + n := &p.nodes[i] + n.key = lnode.key() + n.value = lnode.value() + } +} + +// write writes the nodes onto one or more leaf pages. +func (p *lpage) write(pageSize int, allocate allocator) ([]*page, error) { + var pages []*page + + for _, nodes := range p.split(pageSize) { + // Determine the total page size. + var size int = pageHeaderSize + for _, node := range p.nodes { + size += lnodeSize + len(node.key) + len(node.value) + } + + // Allocate pages. + page, err := allocate(size) + if err != nil { + return nil, err + } + page.flags |= p_leaf + page.count = uint16(len(nodes)) + + // Loop over each node and write it to the page. + lnodes := (*[maxNodesPerPage]lnode)(unsafe.Pointer(&page.ptr)) + b := (*[maxPageAllocSize]byte)(unsafe.Pointer(&page.ptr))[lnodeSize*len(nodes):] + for index, node := range nodes { + // Write node. + lnode := &lnodes[index] + lnode.pos = uint32(uintptr(unsafe.Pointer(&b[0])) - uintptr(unsafe.Pointer(&lnode))) + lnode.ksize = uint32(len(node.key)) + lnode.vsize = uint32(len(node.value)) + + // Write data to the end of the node. + copy(b[:], node.key) + b = b[len(node.key):] + copy(b[:], node.value) + b = b[len(node.value):] + } + + pages = append(pages, page) + } + + return pages, nil +} + +// split divides up the nodes in the page into appropriately sized groups. 
+func (p *lpage) split(pageSize int) [][]lpnode { + // If we only have enough nodes for one page then just return the nodes. + if len(p.nodes) <= minKeysPerPage { + return [][]lpnode{p.nodes} + } + + // If we're not larger than one page then just return the nodes. + var totalSize int = pageHeaderSize + for _, node := range p.nodes { + totalSize += lnodeSize + len(node.key) + len(node.value) + } + if totalSize < pageSize { + return [][]lpnode{p.nodes} + } + + // Otherwise group into smaller pages and target a given fill size. + var size int + var group []lpnode + var groups [][]lpnode + + // Set fill threshold to 25%. + threshold := pageSize >> 4 + + for _, node := range p.nodes { + nodeSize := lnodeSize + len(node.key) + len(node.value) + + // TODO(benbjohnson): Don't create a new group for just the last node. + if group == nil || (len(group) > minKeysPerPage && size+nodeSize > threshold) { + size = pageHeaderSize + group = make([]lpnode, 0) + groups = append(groups, group) + } + + size += nodeSize + group = append(group, node) + } + + return groups +} diff --git a/page.go b/page.go index 2910d54..09d4062 100644 --- a/page.go +++ b/page.go @@ -6,10 +6,9 @@ import ( const pageHeaderSize = int(unsafe.Offsetof(((*page)(nil)).ptr)) -const maxPageSize = 0x8000 +const maxPageAllocSize = 0xFFFFFFF const minKeysPerPage = 2 const maxNodesPerPage = 65535 -const fillThreshold = 250 // 25% const ( p_branch = 0x01 diff --git a/rwtransaction.go b/rwtransaction.go index a0fd5ff..f665dc2 100644 --- a/rwtransaction.go +++ b/rwtransaction.go @@ -8,6 +8,8 @@ import ( // Only one read/write transaction can be active for a DB at a time. type RWTransaction struct { Transaction + bpages map[pgid]*bpage + lpages map[pgid]*lpage } // TODO: Allocate scratch meta page. @@ -45,14 +47,8 @@ func (t *RWTransaction) close() error { // CreateBucket creates a new bucket. 
func (t *RWTransaction) CreateBucket(name string) error { - if t.db == nil { - return InvalidTransactionError - } - // Check if bucket already exists. - if b, err := t.Bucket(name); err != nil { - return err - } else if b != nil { + if b := t.Bucket(name); b != nil { return &Error{"bucket already exists", nil} } @@ -61,16 +57,12 @@ func (t *RWTransaction) CreateBucket(name string) error { var raw = (*bucket)(unsafe.Pointer(&buf[0])) raw.root = 0 - // Open cursor to system bucket. - c := t.sys.cursor() - if c.Goto([]byte(name)) { - // TODO: Delete node first. - } + // TODO: Delete node first. // Insert new node. - if err := t.insert([]byte(name), buf[:]); err != nil { - return err - } + c := t.sys.cursor() + c.Goto([]byte(name)) + t.lpage(c.page().id).put([]byte(name), buf[:]) return nil } @@ -98,7 +90,7 @@ func (t *RWTransaction) flush(keep bool) error { func (t *RWTransaction) Put(name string, key []byte, value []byte) error { b := t.Bucket(name) if b == nil { - return BucketNotFoundError + return &Error{"bucket not found", nil} } // Validate the key and data size. @@ -110,15 +102,10 @@ func (t *RWTransaction) Put(name string, key []byte, value []byte) error { return &Error{"data too large", nil} } - // Move cursor to insertion position. - c := b.cursor() - replace := c.Goto() - p, index := c.current() - // Insert a new node. - if err := t.insert(p, index, key, value, replace); err != nil { - return err - } + c := b.cursor() + c.Goto(key) + t.lpage(c.page().id).put(key, value) return nil } @@ -132,65 +119,24 @@ func (t *RWTransaction) Delete(key []byte) error { } // allocate returns a contiguous block of memory starting at a given page. -func (t *RWTransaction) allocate(count int) (*page, error) { +func (t *RWTransaction) allocate(size int) (*page, error) { // TODO: Find a continuous block of free pages. // TODO: If no free pages are available, resize the mmap to allocate more. 
return nil, nil } -func (t *RWTransaction) insert(p *page, index int, key []byte, data []byte, replace bool) error { - nodes := copy(p.lnodes()) - if replace { - nodes = nodes.replace(index, key, data) - } else { - nodes = nodes.insert(index, key, data) +// lpage returns a deserialized leaf page. +func (t *RWTransaction) lpage(id pgid) *lpage { + if t.lpages != nil { + if p := t.lpages[id]; p != nil { + return p + } } - // If our page fits in the same size page then just write it. - if pageHeaderSize + nodes.size() < p.size() { - // TODO: Write new page. - // TODO: Update parent branches. - } - - // Calculate total page size. - size := pageHeaderSize - for _, n := range nodes { - size += lnodeSize + n.ksize + n.vsize - } - - // If our new page fits in our current page size then just write it. - if size < t.db.pageSize { - - return t.writeLeafPage(p.id, nodes) - } - - var nodesets [][]lnodes - if size < t.db.pageSize { - nodesets = [][]lnodes{nodes} - } - - nodesets := t.split(nodes) - - // TODO: Move remaining data on page forward. - // TODO: Write leaf node to current location. - // TODO: Adjust available page size. - return nil -} - -// split takes a list of nodes and returns multiple sets of nodes if a -// page split is required. -func (t *RWTransaction) split(nodes []lnodes) [][]lnodes { - - // If the size is less than the page size then just return the current set. - if size < t.db.pageSize { - return [][]lnodes{nodes} - } - - // Otherwise loop over nodes and split up into multiple pages. - var nodeset []lnodes - var nodesets [][]lnodes - for _, n := range nodes { - - } + // Read raw page and deserialize. + p := &lpage{} + p.read(t.page(id)) + t.lpages[id] = p + return p } diff --git a/transaction.go b/transaction.go index 07665ee..171508f 100644 --- a/transaction.go +++ b/transaction.go @@ -47,45 +47,45 @@ func (t *Transaction) DB() *DB { } // Bucket retrieves a bucket by name. 
-func (t *Transaction) Bucket(name string) (*Bucket, error) { +func (t *Transaction) Bucket(name string) *Bucket { // Return cached reference if it's already been looked up. if b := t.buckets[name]; b != nil { - return b, nil + return b } // Retrieve bucket data from the system bucket. - value := t.get(&t.sys, []byte(name)) + value := t.sys.cursor().Get([]byte(name)) if value == nil { return nil } // Create a bucket that overlays the data. b := &Bucket{ - bucket: (*bucket)(unsafe.Pointer(&data[0])), + bucket: (*bucket)(unsafe.Pointer(&value[0])), name: name, transaction: t, } t.buckets[name] = b - return b, nil + return b } // Cursor creates a cursor associated with a given bucket. -func (t *Transaction) Cursor(name string) (*Cursor, error) { - b, err := t.Bucket(name) - if err != nil { - return nil, err +func (t *Transaction) Cursor(name string) *Cursor { + b := t.Bucket(name) + if b == nil { + return nil } return b.Cursor() } // Get retrieves the value for a key in a named bucket. -func (t *Transaction) Get(name string, key []byte) ([]byte, error) { - b, err := t.Bucket(name) - if err != nil { - return nil, err +func (t *Transaction) Get(name string, key []byte) []byte { + c := t.Cursor(name) + if c == nil { + return nil } - return b.Get(key) + return c.Get(key) } // Stat returns information about a bucket's internal structure. diff --git a/transaction_test.go b/transaction_test.go index 6365368..084ef6d 100644 --- a/transaction_test.go +++ b/transaction_test.go @@ -9,31 +9,20 @@ import ( // Ensure that a bucket can be created and retrieved. 
func TestTransactionCreateBucket(t *testing.T) { withOpenDB(func(db *DB, path string) { - txn, _ := db.Transaction(false) - b, err := txn.CreateBucket("foo", false) - if assert.NoError(t, err) && assert.NotNil(t, b) { - b2, err := txn.Bucket("foo") - assert.NoError(t, err) - assert.Equal(t, b, b2) + txn, _ := db.RWTransaction() + err := txn.CreateBucket("foo") + if assert.NoError(t, err) { + assert.NotNil(t, txn.Bucket("foo")) } }) } -// Ensure that a user-created transaction cannot be used to create a bucket. -func TestTransactionInvalidCreateBucket(t *testing.T) { - withOpenDB(func(db *DB, path string) { - txn := new(Transaction) - _, err := txn.CreateBucket("foo", false) - assert.Equal(t, err, InvalidTransactionError) - }) -} - // Ensure that an existing bucket cannot be created. func TestTransactionCreateExistingBucket(t *testing.T) { withOpenDB(func(db *DB, path string) { - txn, _ := db.Transaction(false) - txn.CreateBucket("foo", false) - _, err := txn.CreateBucket("foo", false) + txn, _ := db.RWTransaction() + txn.CreateBucket("foo") + err := txn.CreateBucket("foo") assert.Equal(t, err, BucketAlreadyExistsError) }) }