diff --git a/cursor.go b/cursor.go index 55bf568..2887791 100644 --- a/cursor.go +++ b/cursor.go @@ -259,7 +259,7 @@ func (c *Cursor) keyValue() ([]byte, []byte) { } // node returns the node that the cursor is currently positioned on. -func (c *Cursor) node(t *Tx) *node { +func (c *Cursor) node(tx *Tx) *node { _assert(len(c.stack) > 0, "accessing a node with a zero-length cursor stack") // If the top of the stack is a leaf node then just return it. @@ -270,7 +270,7 @@ func (c *Cursor) node(t *Tx) *node { // Start from root and traverse down the hierarchy. var n = c.stack[0].node if n == nil { - n = t.node(c.stack[0].page.id, nil) + n = tx.node(c.stack[0].page.id, nil) } for _, ref := range c.stack[:len(c.stack)-1] { _assert(!n.isLeaf, "expected branch node") diff --git a/db.go b/db.go index ee3a24b..a76639d 100644 --- a/db.go +++ b/db.go @@ -361,7 +361,7 @@ func (db *DB) beginRWTx() (*Tx, error) { } // removeTx removes a transaction from the database. -func (db *DB) removeTx(t *Tx) { +func (db *DB) removeTx(tx *Tx) { db.metalock.Lock() defer db.metalock.Unlock() @@ -369,15 +369,15 @@ func (db *DB) removeTx(t *Tx) { db.mmaplock.RUnlock() // Remove the transaction. - for i, tx := range db.txs { - if tx == t { + for i, t := range db.txs { + if t == tx { db.txs = append(db.txs[:i], db.txs[i+1:]...) break } } // Merge statistics. - db.stats.TxStats.add(&t.stats) + db.stats.TxStats.add(&tx.stats) } // Update executes a function within the context of a read-write managed transaction. diff --git a/example_test.go b/example_test.go index eeaecf5..3e3758e 100644 --- a/example_test.go +++ b/example_test.go @@ -54,8 +54,8 @@ func ExampleDB_View() { }) // Access data from within a read-only transactional block. - db.View(func(t *Tx) error { - v := t.Bucket("people").Get([]byte("john")) + db.View(func(tx *Tx) error { + v := tx.Bucket("people").Get([]byte("john")) fmt.Printf("John's last name is %s.\n", string(v)) return nil }) diff --git a/tx.go b/tx.go index ed693be..bfdcce1 100644 --- a/tx.go +++ b/tx.go @@ -42,51 +42,51 @@ type Tx struct { } // init initializes the transaction. -func (t *Tx) init(db *DB) { - t.db = db - t.pages = nil +func (tx *Tx) init(db *DB) { + tx.db = db + tx.pages = nil // Copy the meta page since it can be changed by the writer. - t.meta = &meta{} - db.meta().copy(t.meta) + tx.meta = &meta{} + db.meta().copy(tx.meta) // Read in the buckets page. - t.buckets = &buckets{} - t.buckets.read(t.page(t.meta.buckets)) + tx.buckets = &buckets{} + tx.buckets.read(tx.page(tx.meta.buckets)) - if t.writable { - t.pages = make(map[pgid]*page) - t.nodes = make(map[pgid]*node) + if tx.writable { + tx.pages = make(map[pgid]*page) + tx.nodes = make(map[pgid]*node) // Increment the transaction id. - t.meta.txid += txid(1) + tx.meta.txid += txid(1) } } // id returns the transaction id. -func (t *Tx) id() txid { - return t.meta.txid +func (tx *Tx) id() txid { + return tx.meta.txid } // DB returns a reference to the database that created the transaction. -func (t *Tx) DB() *DB { - return t.db +func (tx *Tx) DB() *DB { + return tx.db } // Writable returns whether the transaction can perform write operations. -func (t *Tx) Writable() bool { - return t.writable +func (tx *Tx) Writable() bool { + return tx.writable } // Stats retrieves a copy of the current transaction statistics. -func (t *Tx) Stats() TxStats { - return t.stats +func (tx *Tx) Stats() TxStats { + return tx.stats } // Bucket retrieves a bucket by name. // Returns nil if the bucket does not exist. -func (t *Tx) Bucket(name string) *Bucket { - b := t.buckets.get(name) +func (tx *Tx) Bucket(name string) *Bucket { + b := tx.buckets.get(name) if b == nil { return nil } @@ -94,18 +94,18 @@ func (t *Tx) Bucket(name string) *Bucket { return &Bucket{ bucket: b, name: name, - tx: t, + tx: tx, } } // Buckets retrieves a list of all buckets. -func (t *Tx) Buckets() []*Bucket { - buckets := make([]*Bucket, 0, len(t.buckets.items)) - for name, b := range t.buckets.items { +func (tx *Tx) Buckets() []*Bucket { + buckets := make([]*Bucket, 0, len(tx.buckets.items)) + for name, b := range tx.buckets.items { bucket := &Bucket{ bucket: b, name: name, - tx: t, + tx: tx, } buckets = append(buckets, bucket) } @@ -115,12 +115,12 @@ func (t *Tx) Buckets() []*Bucket { // CreateBucket creates a new bucket. // Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long. -func (t *Tx) CreateBucket(name string) error { - if t.db == nil { +func (tx *Tx) CreateBucket(name string) error { + if tx.db == nil { return ErrTxClosed - } else if !t.writable { + } else if !tx.writable { return ErrTxNotWritable - } else if b := t.Bucket(name); b != nil { + } else if b := tx.Bucket(name); b != nil { return ErrBucketExists } else if len(name) == 0 { return ErrBucketNameRequired @@ -129,22 +129,22 @@ func (t *Tx) CreateBucket(name string) error { } // Create a blank root leaf page. - p, err := t.allocate(1) + p, err := tx.allocate(1) if err != nil { return err } p.flags = leafPageFlag // Add bucket to buckets page. - t.buckets.put(name, &bucket{root: p.id}) + tx.buckets.put(name, &bucket{root: p.id}) return nil } // CreateBucketIfNotExists creates a new bucket if it doesn't already exist. // Returns an error if the bucket name is blank, or if the bucket name is too long. -func (t *Tx) CreateBucketIfNotExists(name string) error { - err := t.CreateBucket(name) +func (tx *Tx) CreateBucketIfNotExists(name string) error { + err := tx.CreateBucket(name) if err != nil && err != ErrBucketExists { return err } @@ -153,42 +153,42 @@ func (t *Tx) CreateBucketIfNotExists(name string) error { // DeleteBucket deletes a bucket. // Returns an error if the bucket cannot be found. -func (t *Tx) DeleteBucket(name string) error { - if t.db == nil { +func (tx *Tx) DeleteBucket(name string) error { + if tx.db == nil { return ErrTxClosed - } else if !t.writable { + } else if !tx.writable { return ErrTxNotWritable } - b := t.Bucket(name) + b := tx.Bucket(name) if b == nil { return ErrBucketNotFound } // Remove from buckets page. - t.buckets.del(name) + tx.buckets.del(name) // Free all pages. - t.forEachPage(b.root, 0, func(p *page, depth int) { - t.db.freelist.free(t.id(), p) + tx.forEachPage(b.root, 0, func(p *page, depth int) { + tx.db.freelist.free(tx.id(), p) }) return nil } // OnCommit adds a handler function to be executed after the transaction successfully commits. -func (t *Tx) OnCommit(fn func()) { - t.commitHandlers = append(t.commitHandlers, fn) +func (tx *Tx) OnCommit(fn func()) { + tx.commitHandlers = append(tx.commitHandlers, fn) } // Commit writes all changes to disk and updates the meta page. // Returns an error if a disk write error occurs. -func (t *Tx) Commit() error { - if t.managed { +func (tx *Tx) Commit() error { + if tx.managed { panic("managed tx commit not allowed") - } else if t.db == nil { + } else if tx.db == nil { return ErrTxClosed - } else if !t.writable { + } else if !tx.writable { return ErrTxNotWritable } @@ -196,59 +196,59 @@ func (t *Tx) Commit() error { // Rebalance nodes which have had deletions. var startTime = time.Now() - t.rebalance() - t.stats.RebalanceTime += time.Since(startTime) + tx.rebalance() + tx.stats.RebalanceTime += time.Since(startTime) // spill data onto dirty pages. startTime = time.Now() - if err := t.spill(); err != nil { - t.close() + if err := tx.spill(); err != nil { + tx.close() return err } - t.stats.SpillTime += time.Since(startTime) + tx.stats.SpillTime += time.Since(startTime) // Spill buckets page. - p, err := t.allocate((t.buckets.size() / t.db.pageSize) + 1) + p, err := tx.allocate((tx.buckets.size() / tx.db.pageSize) + 1) if err != nil { - t.close() + tx.close() return err } - t.buckets.write(p) + tx.buckets.write(p) // Free previous bucket page and update meta. - t.db.freelist.free(t.id(), t.page(t.meta.buckets)) - t.meta.buckets = p.id + tx.db.freelist.free(tx.id(), tx.page(tx.meta.buckets)) + tx.meta.buckets = p.id // Free the freelist and allocate new pages for it. This will overestimate // the size of the freelist but not underestimate the size (which would be bad). - t.db.freelist.free(t.id(), t.page(t.meta.freelist)) - p, err = t.allocate((t.db.freelist.size() / t.db.pageSize) + 1) + tx.db.freelist.free(tx.id(), tx.page(tx.meta.freelist)) + p, err = tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1) if err != nil { - t.close() + tx.close() return err } - t.db.freelist.write(p) - t.meta.freelist = p.id + tx.db.freelist.write(p) + tx.meta.freelist = p.id // Write dirty pages to disk. startTime = time.Now() - if err := t.write(); err != nil { - t.close() + if err := tx.write(); err != nil { + tx.close() return err } // Write meta to disk. - if err := t.writeMeta(); err != nil { - t.close() + if err := tx.writeMeta(); err != nil { + tx.close() return err } - t.stats.WriteTime += time.Since(startTime) + tx.stats.WriteTime += time.Since(startTime) // Finalize the transaction. - t.close() + tx.close() // Execute commit handlers now that the locks have been removed. - for _, fn := range t.commitHandlers { + for _, fn := range tx.commitHandlers { fn() } @@ -256,57 +256,57 @@ func (t *Tx) Commit() error { } // Rollback closes the transaction and ignores all previous updates. -func (t *Tx) Rollback() error { - if t.managed { +func (tx *Tx) Rollback() error { + if tx.managed { panic("managed tx rollback not allowed") - } else if t.db == nil { + } else if tx.db == nil { return ErrTxClosed } - t.close() + tx.close() return nil } -func (t *Tx) close() { - if t.writable { +func (tx *Tx) close() { + if tx.writable { // Merge statistics. - t.db.metalock.Lock() - t.db.stats.TxStats.add(&t.stats) - t.db.metalock.Unlock() + tx.db.metalock.Lock() + tx.db.stats.TxStats.add(&tx.stats) + tx.db.metalock.Unlock() // Remove writer lock. - t.db.rwlock.Unlock() + tx.db.rwlock.Unlock() } else { - t.db.removeTx(t) + tx.db.removeTx(tx) } - t.db = nil + tx.db = nil } // allocate returns a contiguous block of memory starting at a given page. -func (t *Tx) allocate(count int) (*page, error) { - p, err := t.db.allocate(count) +func (tx *Tx) allocate(count int) (*page, error) { + p, err := tx.db.allocate(count) if err != nil { return nil, err } // Save to our page cache. - t.pages[p.id] = p + tx.pages[p.id] = p // Update statistics. - t.stats.PageCount++ - t.stats.PageAlloc += count * t.db.pageSize + tx.stats.PageCount++ + tx.stats.PageAlloc += count * tx.db.pageSize return p, nil } // rebalance attempts to balance all nodes. -func (t *Tx) rebalance() { - for _, n := range t.nodes { +func (tx *Tx) rebalance() { + for _, n := range tx.nodes { n.rebalance() } } // spill writes all the nodes to dirty pages. -func (t *Tx) spill() error { +func (tx *Tx) spill() error { // Keep track of the current root nodes. // We will update this at the end once all nodes are created. type root struct { @@ -316,8 +316,8 @@ func (t *Tx) spill() error { var roots []root // Sort nodes by highest depth first. - nodes := make(nodesByDepth, 0, len(t.nodes)) - for _, n := range t.nodes { + nodes := make(nodesByDepth, 0, len(tx.nodes)) + for _, n := range tx.nodes { nodes = append(nodes, n) } sort.Sort(nodes) @@ -333,24 +333,24 @@ func (t *Tx) spill() error { // Split nodes into appropriate sized nodes. // The first node in this list will be a reference to n to preserve ancestry. - newNodes := n.split(t.db.pageSize) - t.pending = newNodes + newNodes := n.split(tx.db.pageSize) + tx.pending = newNodes // If this is a root node that split then create a parent node. if n.parent == nil && len(newNodes) > 1 { - n.parent = &node{tx: t, isLeaf: false} + n.parent = &node{tx: tx, isLeaf: false} nodes = append(nodes, n.parent) } // Add node's page to the freelist. if n.pgid > 0 { - t.db.freelist.free(t.id(), t.page(n.pgid)) + tx.db.freelist.free(tx.id(), tx.page(n.pgid)) } // Write nodes to dirty pages. for i, newNode := range newNodes { // Allocate contiguous space for the node. - p, err := t.allocate((newNode.size() / t.db.pageSize) + 1) + p, err := tx.allocate((newNode.size() / tx.db.pageSize) + 1) if err != nil { return err } @@ -374,140 +374,140 @@ func (t *Tx) spill() error { } // Update the statistics. - t.stats.Spill++ + tx.stats.Spill++ } - t.pending = nil + tx.pending = nil } // Update roots with new roots. for _, root := range roots { - t.buckets.updateRoot(root.pgid, root.node.root().pgid) + tx.buckets.updateRoot(root.pgid, root.node.root().pgid) } // Clear out nodes now that they are all spilled. - t.nodes = make(map[pgid]*node) + tx.nodes = make(map[pgid]*node) return nil } // write writes any dirty pages to disk. -func (t *Tx) write() error { +func (tx *Tx) write() error { // Sort pages by id. - pages := make(pages, 0, len(t.pages)) - for _, p := range t.pages { + pages := make(pages, 0, len(tx.pages)) + for _, p := range tx.pages { pages = append(pages, p) } sort.Sort(pages) // Write pages to disk in order. for _, p := range pages { - size := (int(p.overflow) + 1) * t.db.pageSize + size := (int(p.overflow) + 1) * tx.db.pageSize buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:size] - offset := int64(p.id) * int64(t.db.pageSize) - if _, err := t.db.ops.writeAt(buf, offset); err != nil { + offset := int64(p.id) * int64(tx.db.pageSize) + if _, err := tx.db.ops.writeAt(buf, offset); err != nil { return err } // Update statistics. - t.stats.Write++ + tx.stats.Write++ } - if err := fdatasync(t.db.file); err != nil { + if err := fdatasync(tx.db.file); err != nil { return err } // Clear out page cache. - t.pages = make(map[pgid]*page) + tx.pages = make(map[pgid]*page) return nil } // writeMeta writes the meta to the disk. -func (t *Tx) writeMeta() error { +func (tx *Tx) writeMeta() error { // Create a temporary buffer for the meta page. - buf := make([]byte, t.db.pageSize) - p := t.db.pageInBuffer(buf, 0) - t.meta.write(p) + buf := make([]byte, tx.db.pageSize) + p := tx.db.pageInBuffer(buf, 0) + tx.meta.write(p) // Write the meta page to file. - if _, err := t.db.ops.writeAt(buf, int64(p.id)*int64(t.db.pageSize)); err != nil { + if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil { return err } - if err := fdatasync(t.db.file); err != nil { + if err := fdatasync(tx.db.file); err != nil { return err } // Update statistics. - t.stats.Write++ + tx.stats.Write++ return nil } // node creates a node from a page and associates it with a given parent. -func (t *Tx) node(pgid pgid, parent *node) *node { +func (tx *Tx) node(pgid pgid, parent *node) *node { // Retrieve node if it's already been created. - if t.nodes == nil { + if tx.nodes == nil { return nil - } else if n := t.nodes[pgid]; n != nil { + } else if n := tx.nodes[pgid]; n != nil { return n } // Otherwise create a branch and cache it. - n := &node{tx: t, parent: parent} + n := &node{tx: tx, parent: parent} if n.parent != nil { n.depth = n.parent.depth + 1 } - n.read(t.page(pgid)) - t.nodes[pgid] = n + n.read(tx.page(pgid)) + tx.nodes[pgid] = n // Update statistics. - t.stats.NodeCount++ + tx.stats.NodeCount++ return n } // dereference removes all references to the old mmap. -func (t *Tx) dereference() { - for _, n := range t.nodes { +func (tx *Tx) dereference() { + for _, n := range tx.nodes { n.dereference() } - for _, n := range t.pending { + for _, n := range tx.pending { n.dereference() } // Update statistics - t.stats.NodeDeref += len(t.nodes) + len(t.pending) + tx.stats.NodeDeref += len(tx.nodes) + len(tx.pending) } // page returns a reference to the page with a given id. // If page has been written to then a temporary bufferred page is returned. -func (t *Tx) page(id pgid) *page { +func (tx *Tx) page(id pgid) *page { // Check the dirty pages first. - if t.pages != nil { - if p, ok := t.pages[id]; ok { + if tx.pages != nil { + if p, ok := tx.pages[id]; ok { return p } } // Otherwise return directly from the mmap. - return t.db.page(id) + return tx.db.page(id) } // pageNode returns the in-memory node, if it exists. // Otherwise returns the underlying page. -func (t *Tx) pageNode(id pgid) (*page, *node) { - if t.nodes != nil { - if n := t.nodes[id]; n != nil { +func (tx *Tx) pageNode(id pgid) (*page, *node) { + if tx.nodes != nil { + if n := tx.nodes[id]; n != nil { return nil, n } } - return t.page(id), nil + return tx.page(id), nil } // forEachPage iterates over every page within a given page and executes a function. -func (t *Tx) forEachPage(pgid pgid, depth int, fn func(*page, int)) { - p := t.page(pgid) +func (tx *Tx) forEachPage(pgid pgid, depth int, fn func(*page, int)) { + p := tx.page(pgid) // Execute function. fn(p, depth) @@ -516,24 +516,24 @@ func (t *Tx) forEachPage(pgid pgid, depth int, fn func(*page, int)) { if (p.flags & branchPageFlag) != 0 { for i := 0; i < int(p.count); i++ { elem := p.branchPageElement(uint16(i)) - t.forEachPage(elem.pgid, depth+1, fn) + tx.forEachPage(elem.pgid, depth+1, fn) } } } // Page returns page information for a given page number. // This is only available from writable transactions. -func (t *Tx) Page(id int) (*PageInfo, error) { - if t.db == nil { +func (tx *Tx) Page(id int) (*PageInfo, error) { + if tx.db == nil { return nil, ErrTxClosed - } else if !t.writable { + } else if !tx.writable { return nil, ErrTxNotWritable - } else if pgid(id) >= t.meta.pgid { + } else if pgid(id) >= tx.meta.pgid { return nil, nil } // Build the page info. - p := t.page(pgid(id)) + p := tx.page(pgid(id)) info := &PageInfo{ ID: id, Count: int(p.count), @@ -541,7 +541,7 @@ func (t *Tx) Page(id int) (*PageInfo, error) { } // Determine the type (or if it's free). - if t.db.freelist.isFree(pgid(id)) { + if tx.db.freelist.isFree(pgid(id)) { info.Type = "free" } else { info.Type = p.typ()