Rename internal local Tx variables.

This commit changes the local Tx variables from "t" to "tx". This is partly
for consistency with external documentation but also because it just
annoys me for some reason.
pull/34/head
Ben Johnson 2014-04-04 12:03:04 -06:00
parent af1551e8dd
commit 12204df0b5
4 changed files with 152 additions and 152 deletions

View File

@ -259,7 +259,7 @@ func (c *Cursor) keyValue() ([]byte, []byte) {
}
// node returns the node that the cursor is currently positioned on.
func (c *Cursor) node(t *Tx) *node {
func (c *Cursor) node(tx *Tx) *node {
_assert(len(c.stack) > 0, "accessing a node with a zero-length cursor stack")
// If the top of the stack is a leaf node then just return it.
@ -270,7 +270,7 @@ func (c *Cursor) node(t *Tx) *node {
// Start from root and traverse down the hierarchy.
var n = c.stack[0].node
if n == nil {
n = t.node(c.stack[0].page.id, nil)
n = tx.node(c.stack[0].page.id, nil)
}
for _, ref := range c.stack[:len(c.stack)-1] {
_assert(!n.isLeaf, "expected branch node")

8
db.go
View File

@ -361,7 +361,7 @@ func (db *DB) beginRWTx() (*Tx, error) {
}
// removeTx removes a transaction from the database.
func (db *DB) removeTx(t *Tx) {
func (db *DB) removeTx(tx *Tx) {
db.metalock.Lock()
defer db.metalock.Unlock()
@ -369,15 +369,15 @@ func (db *DB) removeTx(t *Tx) {
db.mmaplock.RUnlock()
// Remove the transaction.
for i, tx := range db.txs {
if tx == t {
for i, t := range db.txs {
if t == tx {
db.txs = append(db.txs[:i], db.txs[i+1:]...)
break
}
}
// Merge statistics.
db.stats.TxStats.add(&t.stats)
db.stats.TxStats.add(&tx.stats)
}
// Update executes a function within the context of a read-write managed transaction.

View File

@ -54,8 +54,8 @@ func ExampleDB_View() {
})
// Access data from within a read-only transactional block.
db.View(func(t *Tx) error {
v := t.Bucket("people").Get([]byte("john"))
db.View(func(tx *Tx) error {
v := tx.Bucket("people").Get([]byte("john"))
fmt.Printf("John's last name is %s.\n", string(v))
return nil
})

288
tx.go
View File

@ -42,51 +42,51 @@ type Tx struct {
}
// init initializes the transaction.
func (t *Tx) init(db *DB) {
t.db = db
t.pages = nil
func (tx *Tx) init(db *DB) {
tx.db = db
tx.pages = nil
// Copy the meta page since it can be changed by the writer.
t.meta = &meta{}
db.meta().copy(t.meta)
tx.meta = &meta{}
db.meta().copy(tx.meta)
// Read in the buckets page.
t.buckets = &buckets{}
t.buckets.read(t.page(t.meta.buckets))
tx.buckets = &buckets{}
tx.buckets.read(tx.page(tx.meta.buckets))
if t.writable {
t.pages = make(map[pgid]*page)
t.nodes = make(map[pgid]*node)
if tx.writable {
tx.pages = make(map[pgid]*page)
tx.nodes = make(map[pgid]*node)
// Increment the transaction id.
t.meta.txid += txid(1)
tx.meta.txid += txid(1)
}
}
// id returns the transaction id.
func (t *Tx) id() txid {
return t.meta.txid
func (tx *Tx) id() txid {
return tx.meta.txid
}
// DB returns a reference to the database that created the transaction.
func (t *Tx) DB() *DB {
return t.db
func (tx *Tx) DB() *DB {
return tx.db
}
// Writable returns whether the transaction can perform write operations.
func (t *Tx) Writable() bool {
return t.writable
func (tx *Tx) Writable() bool {
return tx.writable
}
// Stats retrieves a copy of the current transaction statistics.
func (t *Tx) Stats() TxStats {
return t.stats
func (tx *Tx) Stats() TxStats {
return tx.stats
}
// Bucket retrieves a bucket by name.
// Returns nil if the bucket does not exist.
func (t *Tx) Bucket(name string) *Bucket {
b := t.buckets.get(name)
func (tx *Tx) Bucket(name string) *Bucket {
b := tx.buckets.get(name)
if b == nil {
return nil
}
@ -94,18 +94,18 @@ func (t *Tx) Bucket(name string) *Bucket {
return &Bucket{
bucket: b,
name: name,
tx: t,
tx: tx,
}
}
// Buckets retrieves a list of all buckets.
func (t *Tx) Buckets() []*Bucket {
buckets := make([]*Bucket, 0, len(t.buckets.items))
for name, b := range t.buckets.items {
func (tx *Tx) Buckets() []*Bucket {
buckets := make([]*Bucket, 0, len(tx.buckets.items))
for name, b := range tx.buckets.items {
bucket := &Bucket{
bucket: b,
name: name,
tx: t,
tx: tx,
}
buckets = append(buckets, bucket)
}
@ -115,12 +115,12 @@ func (t *Tx) Buckets() []*Bucket {
// CreateBucket creates a new bucket.
// Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long.
func (t *Tx) CreateBucket(name string) error {
if t.db == nil {
func (tx *Tx) CreateBucket(name string) error {
if tx.db == nil {
return ErrTxClosed
} else if !t.writable {
} else if !tx.writable {
return ErrTxNotWritable
} else if b := t.Bucket(name); b != nil {
} else if b := tx.Bucket(name); b != nil {
return ErrBucketExists
} else if len(name) == 0 {
return ErrBucketNameRequired
@ -129,22 +129,22 @@ func (t *Tx) CreateBucket(name string) error {
}
// Create a blank root leaf page.
p, err := t.allocate(1)
p, err := tx.allocate(1)
if err != nil {
return err
}
p.flags = leafPageFlag
// Add bucket to buckets page.
t.buckets.put(name, &bucket{root: p.id})
tx.buckets.put(name, &bucket{root: p.id})
return nil
}
// CreateBucketIfNotExists creates a new bucket if it doesn't already exist.
// Returns an error if the bucket name is blank, or if the bucket name is too long.
func (t *Tx) CreateBucketIfNotExists(name string) error {
err := t.CreateBucket(name)
func (tx *Tx) CreateBucketIfNotExists(name string) error {
err := tx.CreateBucket(name)
if err != nil && err != ErrBucketExists {
return err
}
@ -153,42 +153,42 @@ func (t *Tx) CreateBucketIfNotExists(name string) error {
// DeleteBucket deletes a bucket.
// Returns an error if the bucket cannot be found.
func (t *Tx) DeleteBucket(name string) error {
if t.db == nil {
func (tx *Tx) DeleteBucket(name string) error {
if tx.db == nil {
return ErrTxClosed
} else if !t.writable {
} else if !tx.writable {
return ErrTxNotWritable
}
b := t.Bucket(name)
b := tx.Bucket(name)
if b == nil {
return ErrBucketNotFound
}
// Remove from buckets page.
t.buckets.del(name)
tx.buckets.del(name)
// Free all pages.
t.forEachPage(b.root, 0, func(p *page, depth int) {
t.db.freelist.free(t.id(), p)
tx.forEachPage(b.root, 0, func(p *page, depth int) {
tx.db.freelist.free(tx.id(), p)
})
return nil
}
// OnCommit adds a handler function to be executed after the transaction successfully commits.
func (t *Tx) OnCommit(fn func()) {
t.commitHandlers = append(t.commitHandlers, fn)
func (tx *Tx) OnCommit(fn func()) {
tx.commitHandlers = append(tx.commitHandlers, fn)
}
// Commit writes all changes to disk and updates the meta page.
// Returns an error if a disk write error occurs.
func (t *Tx) Commit() error {
if t.managed {
func (tx *Tx) Commit() error {
if tx.managed {
panic("managed tx commit not allowed")
} else if t.db == nil {
} else if tx.db == nil {
return ErrTxClosed
} else if !t.writable {
} else if !tx.writable {
return ErrTxNotWritable
}
@ -196,59 +196,59 @@ func (t *Tx) Commit() error {
// Rebalance nodes which have had deletions.
var startTime = time.Now()
t.rebalance()
t.stats.RebalanceTime += time.Since(startTime)
tx.rebalance()
tx.stats.RebalanceTime += time.Since(startTime)
// spill data onto dirty pages.
startTime = time.Now()
if err := t.spill(); err != nil {
t.close()
if err := tx.spill(); err != nil {
tx.close()
return err
}
t.stats.SpillTime += time.Since(startTime)
tx.stats.SpillTime += time.Since(startTime)
// Spill buckets page.
p, err := t.allocate((t.buckets.size() / t.db.pageSize) + 1)
p, err := tx.allocate((tx.buckets.size() / tx.db.pageSize) + 1)
if err != nil {
t.close()
tx.close()
return err
}
t.buckets.write(p)
tx.buckets.write(p)
// Free previous bucket page and update meta.
t.db.freelist.free(t.id(), t.page(t.meta.buckets))
t.meta.buckets = p.id
tx.db.freelist.free(tx.id(), tx.page(tx.meta.buckets))
tx.meta.buckets = p.id
// Free the freelist and allocate new pages for it. This will overestimate
// the size of the freelist but not underestimate the size (which would be bad).
t.db.freelist.free(t.id(), t.page(t.meta.freelist))
p, err = t.allocate((t.db.freelist.size() / t.db.pageSize) + 1)
tx.db.freelist.free(tx.id(), tx.page(tx.meta.freelist))
p, err = tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
if err != nil {
t.close()
tx.close()
return err
}
t.db.freelist.write(p)
t.meta.freelist = p.id
tx.db.freelist.write(p)
tx.meta.freelist = p.id
// Write dirty pages to disk.
startTime = time.Now()
if err := t.write(); err != nil {
t.close()
if err := tx.write(); err != nil {
tx.close()
return err
}
// Write meta to disk.
if err := t.writeMeta(); err != nil {
t.close()
if err := tx.writeMeta(); err != nil {
tx.close()
return err
}
t.stats.WriteTime += time.Since(startTime)
tx.stats.WriteTime += time.Since(startTime)
// Finalize the transaction.
t.close()
tx.close()
// Execute commit handlers now that the locks have been removed.
for _, fn := range t.commitHandlers {
for _, fn := range tx.commitHandlers {
fn()
}
@ -256,57 +256,57 @@ func (t *Tx) Commit() error {
}
// Rollback closes the transaction and ignores all previous updates.
func (t *Tx) Rollback() error {
if t.managed {
func (tx *Tx) Rollback() error {
if tx.managed {
panic("managed tx rollback not allowed")
} else if t.db == nil {
} else if tx.db == nil {
return ErrTxClosed
}
t.close()
tx.close()
return nil
}
func (t *Tx) close() {
if t.writable {
func (tx *Tx) close() {
if tx.writable {
// Merge statistics.
t.db.metalock.Lock()
t.db.stats.TxStats.add(&t.stats)
t.db.metalock.Unlock()
tx.db.metalock.Lock()
tx.db.stats.TxStats.add(&tx.stats)
tx.db.metalock.Unlock()
// Remove writer lock.
t.db.rwlock.Unlock()
tx.db.rwlock.Unlock()
} else {
t.db.removeTx(t)
tx.db.removeTx(tx)
}
t.db = nil
tx.db = nil
}
// allocate returns a contiguous block of memory starting at a given page.
func (t *Tx) allocate(count int) (*page, error) {
p, err := t.db.allocate(count)
func (tx *Tx) allocate(count int) (*page, error) {
p, err := tx.db.allocate(count)
if err != nil {
return nil, err
}
// Save to our page cache.
t.pages[p.id] = p
tx.pages[p.id] = p
// Update statistics.
t.stats.PageCount++
t.stats.PageAlloc += count * t.db.pageSize
tx.stats.PageCount++
tx.stats.PageAlloc += count * tx.db.pageSize
return p, nil
}
// rebalance attempts to balance all nodes.
func (t *Tx) rebalance() {
for _, n := range t.nodes {
func (tx *Tx) rebalance() {
for _, n := range tx.nodes {
n.rebalance()
}
}
// spill writes all the nodes to dirty pages.
func (t *Tx) spill() error {
func (tx *Tx) spill() error {
// Keep track of the current root nodes.
// We will update this at the end once all nodes are created.
type root struct {
@ -316,8 +316,8 @@ func (t *Tx) spill() error {
var roots []root
// Sort nodes by highest depth first.
nodes := make(nodesByDepth, 0, len(t.nodes))
for _, n := range t.nodes {
nodes := make(nodesByDepth, 0, len(tx.nodes))
for _, n := range tx.nodes {
nodes = append(nodes, n)
}
sort.Sort(nodes)
@ -333,24 +333,24 @@ func (t *Tx) spill() error {
// Split nodes into appropriate sized nodes.
// The first node in this list will be a reference to n to preserve ancestry.
newNodes := n.split(t.db.pageSize)
t.pending = newNodes
newNodes := n.split(tx.db.pageSize)
tx.pending = newNodes
// If this is a root node that split then create a parent node.
if n.parent == nil && len(newNodes) > 1 {
n.parent = &node{tx: t, isLeaf: false}
n.parent = &node{tx: tx, isLeaf: false}
nodes = append(nodes, n.parent)
}
// Add node's page to the freelist.
if n.pgid > 0 {
t.db.freelist.free(t.id(), t.page(n.pgid))
tx.db.freelist.free(tx.id(), tx.page(n.pgid))
}
// Write nodes to dirty pages.
for i, newNode := range newNodes {
// Allocate contiguous space for the node.
p, err := t.allocate((newNode.size() / t.db.pageSize) + 1)
p, err := tx.allocate((newNode.size() / tx.db.pageSize) + 1)
if err != nil {
return err
}
@ -374,140 +374,140 @@ func (t *Tx) spill() error {
}
// Update the statistics.
t.stats.Spill++
tx.stats.Spill++
}
t.pending = nil
tx.pending = nil
}
// Update roots with new roots.
for _, root := range roots {
t.buckets.updateRoot(root.pgid, root.node.root().pgid)
tx.buckets.updateRoot(root.pgid, root.node.root().pgid)
}
// Clear out nodes now that they are all spilled.
t.nodes = make(map[pgid]*node)
tx.nodes = make(map[pgid]*node)
return nil
}
// write writes any dirty pages to disk.
func (t *Tx) write() error {
func (tx *Tx) write() error {
// Sort pages by id.
pages := make(pages, 0, len(t.pages))
for _, p := range t.pages {
pages := make(pages, 0, len(tx.pages))
for _, p := range tx.pages {
pages = append(pages, p)
}
sort.Sort(pages)
// Write pages to disk in order.
for _, p := range pages {
size := (int(p.overflow) + 1) * t.db.pageSize
size := (int(p.overflow) + 1) * tx.db.pageSize
buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:size]
offset := int64(p.id) * int64(t.db.pageSize)
if _, err := t.db.ops.writeAt(buf, offset); err != nil {
offset := int64(p.id) * int64(tx.db.pageSize)
if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
return err
}
// Update statistics.
t.stats.Write++
tx.stats.Write++
}
if err := fdatasync(t.db.file); err != nil {
if err := fdatasync(tx.db.file); err != nil {
return err
}
// Clear out page cache.
t.pages = make(map[pgid]*page)
tx.pages = make(map[pgid]*page)
return nil
}
// writeMeta writes the meta to the disk.
func (t *Tx) writeMeta() error {
func (tx *Tx) writeMeta() error {
// Create a temporary buffer for the meta page.
buf := make([]byte, t.db.pageSize)
p := t.db.pageInBuffer(buf, 0)
t.meta.write(p)
buf := make([]byte, tx.db.pageSize)
p := tx.db.pageInBuffer(buf, 0)
tx.meta.write(p)
// Write the meta page to file.
if _, err := t.db.ops.writeAt(buf, int64(p.id)*int64(t.db.pageSize)); err != nil {
if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil {
return err
}
if err := fdatasync(t.db.file); err != nil {
if err := fdatasync(tx.db.file); err != nil {
return err
}
// Update statistics.
t.stats.Write++
tx.stats.Write++
return nil
}
// node creates a node from a page and associates it with a given parent.
func (t *Tx) node(pgid pgid, parent *node) *node {
func (tx *Tx) node(pgid pgid, parent *node) *node {
// Retrieve node if it's already been created.
if t.nodes == nil {
if tx.nodes == nil {
return nil
} else if n := t.nodes[pgid]; n != nil {
} else if n := tx.nodes[pgid]; n != nil {
return n
}
// Otherwise create a branch and cache it.
n := &node{tx: t, parent: parent}
n := &node{tx: tx, parent: parent}
if n.parent != nil {
n.depth = n.parent.depth + 1
}
n.read(t.page(pgid))
t.nodes[pgid] = n
n.read(tx.page(pgid))
tx.nodes[pgid] = n
// Update statistics.
t.stats.NodeCount++
tx.stats.NodeCount++
return n
}
// dereference removes all references to the old mmap.
func (t *Tx) dereference() {
for _, n := range t.nodes {
func (tx *Tx) dereference() {
for _, n := range tx.nodes {
n.dereference()
}
for _, n := range t.pending {
for _, n := range tx.pending {
n.dereference()
}
// Update statistics
t.stats.NodeDeref += len(t.nodes) + len(t.pending)
tx.stats.NodeDeref += len(tx.nodes) + len(tx.pending)
}
// page returns a reference to the page with a given id.
// If page has been written to then a temporary bufferred page is returned.
func (t *Tx) page(id pgid) *page {
func (tx *Tx) page(id pgid) *page {
// Check the dirty pages first.
if t.pages != nil {
if p, ok := t.pages[id]; ok {
if tx.pages != nil {
if p, ok := tx.pages[id]; ok {
return p
}
}
// Otherwise return directly from the mmap.
return t.db.page(id)
return tx.db.page(id)
}
// pageNode returns the in-memory node, if it exists.
// Otherwise returns the underlying page.
func (t *Tx) pageNode(id pgid) (*page, *node) {
if t.nodes != nil {
if n := t.nodes[id]; n != nil {
func (tx *Tx) pageNode(id pgid) (*page, *node) {
if tx.nodes != nil {
if n := tx.nodes[id]; n != nil {
return nil, n
}
}
return t.page(id), nil
return tx.page(id), nil
}
// forEachPage iterates over every page within a given page and executes a function.
func (t *Tx) forEachPage(pgid pgid, depth int, fn func(*page, int)) {
p := t.page(pgid)
func (tx *Tx) forEachPage(pgid pgid, depth int, fn func(*page, int)) {
p := tx.page(pgid)
// Execute function.
fn(p, depth)
@ -516,24 +516,24 @@ func (t *Tx) forEachPage(pgid pgid, depth int, fn func(*page, int)) {
if (p.flags & branchPageFlag) != 0 {
for i := 0; i < int(p.count); i++ {
elem := p.branchPageElement(uint16(i))
t.forEachPage(elem.pgid, depth+1, fn)
tx.forEachPage(elem.pgid, depth+1, fn)
}
}
}
// Page returns page information for a given page number.
// This is only available from writable transactions.
func (t *Tx) Page(id int) (*PageInfo, error) {
if t.db == nil {
func (tx *Tx) Page(id int) (*PageInfo, error) {
if tx.db == nil {
return nil, ErrTxClosed
} else if !t.writable {
} else if !tx.writable {
return nil, ErrTxNotWritable
} else if pgid(id) >= t.meta.pgid {
} else if pgid(id) >= tx.meta.pgid {
return nil, nil
}
// Build the page info.
p := t.page(pgid(id))
p := tx.page(pgid(id))
info := &PageInfo{
ID: id,
Count: int(p.count),
@ -541,7 +541,7 @@ func (t *Tx) Page(id int) (*PageInfo, error) {
}
// Determine the type (or if it's free).
if t.db.freelist.isFree(pgid(id)) {
if tx.db.freelist.isFree(pgid(id)) {
info.Type = "free"
} else {
info.Type = p.typ()