mirror of
https://github.com/etcd-io/bbolt.git
synced 2025-05-01 13:13:32 +00:00
OpenBSD does not include a UBC kernel and writes must be synchronized with the msync(2) syscall. In addition, the NoSync field of the DB struct should be ignored on OpenBSD, since unlike other platforms, missing msyncs will result in data corruption. Depends on PR #258. Fixes #257.
581 lines
16 KiB
Go
581 lines
16 KiB
Go
package bolt
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"sort"
|
|
"time"
|
|
"unsafe"
|
|
)
|
|
|
|
// txid is the internal identifier of a transaction. A writable transaction
// advances the id stored in its meta copy by one when it is initialized.
type txid uint64
|
|
|
|
// Tx represents a read-only or read/write transaction on the database.
|
|
// Read-only transactions can be used for retrieving values for keys and creating cursors.
|
|
// Read/write transactions can create and remove buckets and create and remove keys.
|
|
//
|
|
// IMPORTANT: You must commit or rollback transactions when you are done with
|
|
// them. Pages can not be reclaimed by the writer until no more transactions
|
|
// are using them. A long running read transaction can cause the database to
|
|
// quickly grow.
|
|
type Tx struct {
|
|
writable bool
|
|
managed bool
|
|
db *DB
|
|
meta *meta
|
|
root Bucket
|
|
pages map[pgid]*page
|
|
stats TxStats
|
|
commitHandlers []func()
|
|
}
|
|
|
|
// init initializes the transaction.
|
|
func (tx *Tx) init(db *DB) {
|
|
tx.db = db
|
|
tx.pages = nil
|
|
|
|
// Copy the meta page since it can be changed by the writer.
|
|
tx.meta = &meta{}
|
|
db.meta().copy(tx.meta)
|
|
|
|
// Copy over the root bucket.
|
|
tx.root = newBucket(tx)
|
|
tx.root.bucket = &bucket{}
|
|
*tx.root.bucket = tx.meta.root
|
|
|
|
// Increment the transaction id and add a page cache for writable transactions.
|
|
if tx.writable {
|
|
tx.pages = make(map[pgid]*page)
|
|
tx.meta.txid += txid(1)
|
|
}
|
|
}
|
|
|
|
// ID returns the transaction id.
|
|
func (tx *Tx) ID() int {
|
|
return int(tx.meta.txid)
|
|
}
|
|
|
|
// DB returns a reference to the database that created the transaction.
|
|
func (tx *Tx) DB() *DB {
|
|
return tx.db
|
|
}
|
|
|
|
// Size returns current database size in bytes as seen by this transaction.
|
|
func (tx *Tx) Size() int64 {
|
|
return int64(tx.meta.pgid) * int64(tx.db.pageSize)
|
|
}
|
|
|
|
// Writable returns whether the transaction can perform write operations.
|
|
func (tx *Tx) Writable() bool {
|
|
return tx.writable
|
|
}
|
|
|
|
// Cursor creates a cursor associated with the root bucket.
|
|
// All items in the cursor will return a nil value because all root bucket keys point to buckets.
|
|
// The cursor is only valid as long as the transaction is open.
|
|
// Do not use a cursor after the transaction is closed.
|
|
func (tx *Tx) Cursor() *Cursor {
|
|
return tx.root.Cursor()
|
|
}
|
|
|
|
// Stats retrieves a copy of the current transaction statistics.
|
|
func (tx *Tx) Stats() TxStats {
|
|
return tx.stats
|
|
}
|
|
|
|
// Bucket retrieves a bucket by name.
|
|
// Returns nil if the bucket does not exist.
|
|
func (tx *Tx) Bucket(name []byte) *Bucket {
|
|
return tx.root.Bucket(name)
|
|
}
|
|
|
|
// CreateBucket creates a new bucket.
|
|
// Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long.
|
|
func (tx *Tx) CreateBucket(name []byte) (*Bucket, error) {
|
|
return tx.root.CreateBucket(name)
|
|
}
|
|
|
|
// CreateBucketIfNotExists creates a new bucket if it doesn't already exist.
|
|
// Returns an error if the bucket name is blank, or if the bucket name is too long.
|
|
func (tx *Tx) CreateBucketIfNotExists(name []byte) (*Bucket, error) {
|
|
return tx.root.CreateBucketIfNotExists(name)
|
|
}
|
|
|
|
// DeleteBucket deletes a bucket.
|
|
// Returns an error if the bucket cannot be found or if the key represents a non-bucket value.
|
|
func (tx *Tx) DeleteBucket(name []byte) error {
|
|
return tx.root.DeleteBucket(name)
|
|
}
|
|
|
|
// ForEach executes a function for each bucket in the root.
|
|
// If the provided function returns an error then the iteration is stopped and
|
|
// the error is returned to the caller.
|
|
func (tx *Tx) ForEach(fn func(name []byte, b *Bucket) error) error {
|
|
return tx.root.ForEach(func(k, v []byte) error {
|
|
if err := fn(k, tx.root.Bucket(k)); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// OnCommit adds a handler function to be executed after the transaction successfully commits.
|
|
func (tx *Tx) OnCommit(fn func()) {
|
|
tx.commitHandlers = append(tx.commitHandlers, fn)
|
|
}
|
|
|
|
// Commit writes all changes to disk and updates the meta page.
|
|
// Returns an error if a disk write error occurs.
|
|
func (tx *Tx) Commit() error {
|
|
_assert(!tx.managed, "managed tx commit not allowed")
|
|
if tx.db == nil {
|
|
return ErrTxClosed
|
|
} else if !tx.writable {
|
|
return ErrTxNotWritable
|
|
}
|
|
|
|
// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
|
|
|
|
// Rebalance nodes which have had deletions.
|
|
var startTime = time.Now()
|
|
tx.root.rebalance()
|
|
if tx.stats.Rebalance > 0 {
|
|
tx.stats.RebalanceTime += time.Since(startTime)
|
|
}
|
|
|
|
// spill data onto dirty pages.
|
|
startTime = time.Now()
|
|
if err := tx.root.spill(); err != nil {
|
|
tx.rollback()
|
|
return err
|
|
}
|
|
tx.stats.SpillTime += time.Since(startTime)
|
|
|
|
// Free the old root bucket.
|
|
tx.meta.root.root = tx.root.root
|
|
|
|
// Free the freelist and allocate new pages for it. This will overestimate
|
|
// the size of the freelist but not underestimate the size (which would be bad).
|
|
tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
|
|
p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
|
|
if err != nil {
|
|
tx.rollback()
|
|
return err
|
|
}
|
|
if err := tx.db.freelist.write(p); err != nil {
|
|
tx.rollback()
|
|
return err
|
|
}
|
|
tx.meta.freelist = p.id
|
|
|
|
// Write dirty pages to disk.
|
|
startTime = time.Now()
|
|
if err := tx.write(); err != nil {
|
|
tx.rollback()
|
|
return err
|
|
}
|
|
|
|
// If strict mode is enabled then perform a consistency check.
|
|
// Only the first consistency error is reported in the panic.
|
|
if tx.db.StrictMode {
|
|
if err, ok := <-tx.Check(); ok {
|
|
panic("check fail: " + err.Error())
|
|
}
|
|
}
|
|
|
|
// Write meta to disk.
|
|
if err := tx.writeMeta(); err != nil {
|
|
tx.rollback()
|
|
return err
|
|
}
|
|
tx.stats.WriteTime += time.Since(startTime)
|
|
|
|
// Finalize the transaction.
|
|
tx.close()
|
|
|
|
// Execute commit handlers now that the locks have been removed.
|
|
for _, fn := range tx.commitHandlers {
|
|
fn()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Rollback closes the transaction and ignores all previous updates.
|
|
func (tx *Tx) Rollback() error {
|
|
_assert(!tx.managed, "managed tx rollback not allowed")
|
|
if tx.db == nil {
|
|
return ErrTxClosed
|
|
}
|
|
tx.rollback()
|
|
return nil
|
|
}
|
|
|
|
func (tx *Tx) rollback() {
|
|
if tx.db == nil {
|
|
return
|
|
}
|
|
if tx.writable {
|
|
tx.db.freelist.rollback(tx.meta.txid)
|
|
tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist))
|
|
}
|
|
tx.close()
|
|
}
|
|
|
|
func (tx *Tx) close() {
|
|
if tx.db == nil {
|
|
return
|
|
}
|
|
if tx.writable {
|
|
// Grab freelist stats.
|
|
var freelistFreeN = tx.db.freelist.free_count()
|
|
var freelistPendingN = tx.db.freelist.pending_count()
|
|
var freelistAlloc = tx.db.freelist.size()
|
|
|
|
// Remove writer lock.
|
|
tx.db.rwlock.Unlock()
|
|
|
|
// Merge statistics.
|
|
tx.db.statlock.Lock()
|
|
tx.db.stats.FreePageN = freelistFreeN
|
|
tx.db.stats.PendingPageN = freelistPendingN
|
|
tx.db.stats.FreeAlloc = (freelistFreeN + freelistPendingN) * tx.db.pageSize
|
|
tx.db.stats.FreelistInuse = freelistAlloc
|
|
tx.db.stats.TxStats.add(&tx.stats)
|
|
tx.db.statlock.Unlock()
|
|
} else {
|
|
tx.db.removeTx(tx)
|
|
}
|
|
tx.db = nil
|
|
}
|
|
|
|
// Copy writes the entire database to a writer.
|
|
// A reader transaction is maintained during the copy so it is safe to continue
|
|
// using the database while a copy is in progress.
|
|
// Copy will write exactly tx.Size() bytes into the writer.
|
|
func (tx *Tx) Copy(w io.Writer) error {
|
|
var f *os.File
|
|
var err error
|
|
|
|
// Attempt to open reader directly.
|
|
if f, err = os.OpenFile(tx.db.path, os.O_RDONLY|odirect, 0); err != nil {
|
|
// Fallback to a regular open if that doesn't work.
|
|
if f, err = os.OpenFile(tx.db.path, os.O_RDONLY, 0); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Copy the meta pages.
|
|
tx.db.metalock.Lock()
|
|
_, err = io.CopyN(w, f, int64(tx.db.pageSize*2))
|
|
tx.db.metalock.Unlock()
|
|
if err != nil {
|
|
_ = f.Close()
|
|
return fmt.Errorf("meta copy: %s", err)
|
|
}
|
|
|
|
// Copy data pages.
|
|
if _, err := io.CopyN(w, f, tx.Size()-int64(tx.db.pageSize*2)); err != nil {
|
|
_ = f.Close()
|
|
return err
|
|
}
|
|
|
|
return f.Close()
|
|
}
|
|
|
|
// CopyFile copies the entire database to file at the given path.
|
|
// A reader transaction is maintained during the copy so it is safe to continue
|
|
// using the database while a copy is in progress.
|
|
func (tx *Tx) CopyFile(path string, mode os.FileMode) error {
|
|
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, mode)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = tx.Copy(f)
|
|
if err != nil {
|
|
_ = f.Close()
|
|
return err
|
|
}
|
|
return f.Close()
|
|
}
|
|
|
|
// Check performs several consistency checks on the database for this transaction.
|
|
// An error is returned if any inconsistency is found.
|
|
//
|
|
// It can be safely run concurrently on a writable transaction. However, this
|
|
// incurs a high cost for large databases and databases with a lot of subbuckets
|
|
// because of caching. This overhead can be removed if running on a read-only
|
|
// transaction, however, it is not safe to execute other writer transactions at
|
|
// the same time.
|
|
func (tx *Tx) Check() <-chan error {
|
|
ch := make(chan error)
|
|
go tx.check(ch)
|
|
return ch
|
|
}
|
|
|
|
func (tx *Tx) check(ch chan error) {
|
|
// Check if any pages are double freed.
|
|
freed := make(map[pgid]bool)
|
|
for _, id := range tx.db.freelist.all() {
|
|
if freed[id] {
|
|
ch <- fmt.Errorf("page %d: already freed", id)
|
|
}
|
|
freed[id] = true
|
|
}
|
|
|
|
// Track every reachable page.
|
|
reachable := make(map[pgid]*page)
|
|
reachable[0] = tx.page(0) // meta0
|
|
reachable[1] = tx.page(1) // meta1
|
|
for i := uint32(0); i <= tx.page(tx.meta.freelist).overflow; i++ {
|
|
reachable[tx.meta.freelist+pgid(i)] = tx.page(tx.meta.freelist)
|
|
}
|
|
|
|
// Recursively check buckets.
|
|
tx.checkBucket(&tx.root, reachable, freed, ch)
|
|
|
|
// Ensure all pages below high water mark are either reachable or freed.
|
|
for i := pgid(0); i < tx.meta.pgid; i++ {
|
|
_, isReachable := reachable[i]
|
|
if !isReachable && !freed[i] {
|
|
ch <- fmt.Errorf("page %d: unreachable unfreed", int(i))
|
|
}
|
|
}
|
|
|
|
// Close the channel to signal completion.
|
|
close(ch)
|
|
}
|
|
|
|
func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bool, ch chan error) {
|
|
// Ignore inline buckets.
|
|
if b.root == 0 {
|
|
return
|
|
}
|
|
|
|
// Check every page used by this bucket.
|
|
b.tx.forEachPage(b.root, 0, func(p *page, _ int) {
|
|
if p.id > tx.meta.pgid {
|
|
ch <- fmt.Errorf("page %d: out of bounds: %d", int(p.id), int(b.tx.meta.pgid))
|
|
}
|
|
|
|
// Ensure each page is only referenced once.
|
|
for i := pgid(0); i <= pgid(p.overflow); i++ {
|
|
var id = p.id + i
|
|
if _, ok := reachable[id]; ok {
|
|
ch <- fmt.Errorf("page %d: multiple references", int(id))
|
|
}
|
|
reachable[id] = p
|
|
}
|
|
|
|
// We should only encounter un-freed leaf and branch pages.
|
|
if freed[p.id] {
|
|
ch <- fmt.Errorf("page %d: reachable freed", int(p.id))
|
|
} else if (p.flags&branchPageFlag) == 0 && (p.flags&leafPageFlag) == 0 {
|
|
ch <- fmt.Errorf("page %d: invalid type: %s", int(p.id), p.typ())
|
|
}
|
|
})
|
|
|
|
// Check each bucket within this bucket.
|
|
_ = b.ForEach(func(k, v []byte) error {
|
|
if child := b.Bucket(k); child != nil {
|
|
tx.checkBucket(child, reachable, freed, ch)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// allocate returns a contiguous block of memory starting at a given page.
|
|
func (tx *Tx) allocate(count int) (*page, error) {
|
|
p, err := tx.db.allocate(count)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Save to our page cache.
|
|
tx.pages[p.id] = p
|
|
|
|
// Update statistics.
|
|
tx.stats.PageCount++
|
|
tx.stats.PageAlloc += count * tx.db.pageSize
|
|
|
|
return p, nil
|
|
}
|
|
|
|
// write writes any dirty pages to disk.
|
|
func (tx *Tx) write() error {
|
|
// Sort pages by id.
|
|
pages := make(pages, 0, len(tx.pages))
|
|
for _, p := range tx.pages {
|
|
pages = append(pages, p)
|
|
}
|
|
sort.Sort(pages)
|
|
|
|
// Write pages to disk in order.
|
|
for _, p := range pages {
|
|
size := (int(p.overflow) + 1) * tx.db.pageSize
|
|
buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:size]
|
|
offset := int64(p.id) * int64(tx.db.pageSize)
|
|
if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Update statistics.
|
|
tx.stats.Write++
|
|
}
|
|
if !tx.db.NoSync || IgnoreNoSync {
|
|
if err := fdatasync(tx.db); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Clear out page cache.
|
|
tx.pages = make(map[pgid]*page)
|
|
|
|
return nil
|
|
}
|
|
|
|
// writeMeta writes the meta to the disk.
|
|
func (tx *Tx) writeMeta() error {
|
|
// Create a temporary buffer for the meta page.
|
|
buf := make([]byte, tx.db.pageSize)
|
|
p := tx.db.pageInBuffer(buf, 0)
|
|
tx.meta.write(p)
|
|
|
|
// Write the meta page to file.
|
|
if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil {
|
|
return err
|
|
}
|
|
if !tx.db.NoSync || IgnoreNoSync {
|
|
if err := fdatasync(tx.db); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Update statistics.
|
|
tx.stats.Write++
|
|
|
|
return nil
|
|
}
|
|
|
|
// page returns a reference to the page with a given id.
|
|
// If page has been written to then a temporary bufferred page is returned.
|
|
func (tx *Tx) page(id pgid) *page {
|
|
// Check the dirty pages first.
|
|
if tx.pages != nil {
|
|
if p, ok := tx.pages[id]; ok {
|
|
return p
|
|
}
|
|
}
|
|
|
|
// Otherwise return directly from the mmap.
|
|
return tx.db.page(id)
|
|
}
|
|
|
|
// forEachPage iterates over every page within a given page and executes a function.
|
|
func (tx *Tx) forEachPage(pgid pgid, depth int, fn func(*page, int)) {
|
|
p := tx.page(pgid)
|
|
|
|
// Execute function.
|
|
fn(p, depth)
|
|
|
|
// Recursively loop over children.
|
|
if (p.flags & branchPageFlag) != 0 {
|
|
for i := 0; i < int(p.count); i++ {
|
|
elem := p.branchPageElement(uint16(i))
|
|
tx.forEachPage(elem.pgid, depth+1, fn)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Page returns page information for a given page number.
|
|
// This is only safe for concurrent use when used by a writable transaction.
|
|
func (tx *Tx) Page(id int) (*PageInfo, error) {
|
|
if tx.db == nil {
|
|
return nil, ErrTxClosed
|
|
} else if pgid(id) >= tx.meta.pgid {
|
|
return nil, nil
|
|
}
|
|
|
|
// Build the page info.
|
|
p := tx.db.page(pgid(id))
|
|
info := &PageInfo{
|
|
ID: id,
|
|
Count: int(p.count),
|
|
OverflowCount: int(p.overflow),
|
|
}
|
|
|
|
// Determine the type (or if it's free).
|
|
if tx.db.freelist.freed(pgid(id)) {
|
|
info.Type = "free"
|
|
} else {
|
|
info.Type = p.typ()
|
|
}
|
|
|
|
return info, nil
|
|
}
|
|
|
|
// TxStats holds counters and timings describing the work performed by a
// transaction.
type TxStats struct {
	// Page statistics.
	PageCount int // number of page allocations
	PageAlloc int // total bytes allocated

	// Cursor statistics.
	CursorCount int // number of cursors created

	// Node statistics.
	NodeCount int // number of node allocations
	NodeDeref int // number of node dereferences

	// Rebalance statistics.
	Rebalance     int           // number of node rebalances
	RebalanceTime time.Duration // total time spent rebalancing

	// Split/Spill statistics.
	Split     int           // number of nodes split
	Spill     int           // number of nodes spilled
	SpillTime time.Duration // total time spent spilling

	// Write statistics.
	Write     int           // number of writes performed
	WriteTime time.Duration // total time spent writing to disk
}

// add accumulates every counter and timing of other into s, field by field.
func (s *TxStats) add(other *TxStats) {
	// Pages, cursors and nodes.
	s.PageCount += other.PageCount
	s.PageAlloc += other.PageAlloc
	s.CursorCount += other.CursorCount
	s.NodeCount += other.NodeCount
	s.NodeDeref += other.NodeDeref

	// Rebalancing.
	s.Rebalance += other.Rebalance
	s.RebalanceTime += other.RebalanceTime

	// Splitting and spilling.
	s.Split += other.Split
	s.Spill += other.Spill
	s.SpillTime += other.SpillTime

	// Disk writes.
	s.Write += other.Write
	s.WriteTime += other.WriteTime
}

// Sub returns the field-by-field difference s - other without modifying
// either operand.
// This is useful when stats were obtained at two different points in time
// and you need the performance counters that accrued within that time span.
func (s *TxStats) Sub(other *TxStats) TxStats {
	return TxStats{
		PageCount:     s.PageCount - other.PageCount,
		PageAlloc:     s.PageAlloc - other.PageAlloc,
		CursorCount:   s.CursorCount - other.CursorCount,
		NodeCount:     s.NodeCount - other.NodeCount,
		NodeDeref:     s.NodeDeref - other.NodeDeref,
		Rebalance:     s.Rebalance - other.Rebalance,
		RebalanceTime: s.RebalanceTime - other.RebalanceTime,
		Split:         s.Split - other.Split,
		Spill:         s.Spill - other.Spill,
		SpillTime:     s.SpillTime - other.SpillTime,
		Write:         s.Write - other.Write,
		WriteTime:     s.WriteTime - other.WriteTime,
	}
}
|