diff --git a/TODO b/TODO deleted file mode 100644 index 9c8a1a0..0000000 --- a/TODO +++ /dev/null @@ -1,13 +0,0 @@ -TODO -==== -X Open DB. -X Initialize transaction. -- Cursor First, Get(key), Next -- RWTransaction.insert() - - rebalance - - adjust cursors -- RWTransaction Commmit - - - - diff --git a/bucket.go b/bucket.go index 6653389..4114ab7 100644 --- a/bucket.go +++ b/bucket.go @@ -1,9 +1,13 @@ package bolt +import ( + "bytes" +) + // Bucket represents a collection of key/value pairs inside the database. -// All keys inside the bucket are unique. The Bucket type is not typically used -// directly. Instead the bucket name is typically passed into the Get(), Put(), -// or Delete() functions. +// All keys inside the bucket are unique. +// +// Accessing or changing data from a Bucket whose Transaction has closed will cause a panic. type Bucket struct { *bucket name string @@ -21,8 +25,9 @@ func (b *Bucket) Name() string { return b.name } -// cursor creates a new cursor for this bucket. -func (b *Bucket) cursor() *Cursor { +// Cursor creates a new cursor for this bucket. +func (b *Bucket) Cursor() *Cursor { + _assert(b.transaction.isOpen(), "transaction not open") return &Cursor{ transaction: b.transaction, root: b.root, @@ -30,8 +35,98 @@ func (b *Bucket) cursor() *Cursor { } } +// Get retrieves the value for a key in the bucket. +// Returns a nil value if the key does not exist. +func (b *Bucket) Get(key []byte) []byte { + _assert(b.transaction.isOpen(), "transaction not open") + c := b.Cursor() + k, v := c.Seek(key) + + // If our target node isn't the same key as what's passed in then return nil. + if !bytes.Equal(key, k) { + return nil + } + + return v +} + +// Put sets the value for a key inside of the bucket. +// If the key exists then its previous value will be overwritten. +// Returns an error if the bucket was created from a read-only transaction, if the +// key is blank, if the key is too large, or if the value is too large. +func (b *Bucket) Put(key []byte, value []byte) error { + _assert(b.transaction.isOpen(), "transaction not open") + if !b.transaction.writable { + return ErrTransactionNotWritable + } else if len(key) == 0 { + return ErrKeyRequired + } else if len(key) > MaxKeySize { + return ErrKeyTooLarge + } else if len(value) > MaxValueSize { + return ErrValueTooLarge + } + + // Move cursor to correct position. + c := b.Cursor() + c.Seek(key) + + // Insert the key/value. + c.node(b.transaction).put(key, key, value, 0) + + return nil +} + +// Delete removes a key from the bucket. +// If the key does not exist then nothing is done and a nil error is returned. +// Returns an error if the bucket was created from a read-only transaction. +func (b *Bucket) Delete(key []byte) error { + _assert(b.transaction.isOpen(), "transaction not open") + if !b.transaction.writable { + return ErrTransactionNotWritable + } + + // Move cursor to correct position. + c := b.Cursor() + c.Seek(key) + + // Delete the node if we have a matching key. + c.node(c.transaction).del(key) + + return nil +} + +// NextSequence returns an autoincrementing integer for the bucket. +// Returns an error if the bucket was created from a read-only transaction or +// if the next sequence will overflow the int type. +func (b *Bucket) NextSequence() (int, error) { + _assert(b.transaction.isOpen(), "transaction not open") + if !b.transaction.writable { + return 0, ErrTransactionNotWritable + } else if b.bucket.sequence == uint64(maxInt) { + return 0, ErrSequenceOverflow + } + + // Increment and return the sequence.
+ b.bucket.sequence++ + + return int(b.bucket.sequence), nil +} + +// ForEach executes a function for each key/value pair in a bucket. +func (b *Bucket) ForEach(fn func(k, v []byte) error) error { + _assert(b.transaction.isOpen(), "transaction not open") + c := b.Cursor() + for k, v := c.First(); k != nil; k, v = c.Next() { + if err := fn(k, v); err != nil { + return err + } + } + return nil +} + // Stat returns stats on a bucket. func (b *Bucket) Stat() *BucketStat { + _assert(b.transaction.isOpen(), "transaction not open") s := &BucketStat{} b.transaction.forEachPage(b.root, 0, func(p *page, depth int) { if (p.flags & leafPageFlag) != 0 { diff --git a/bucket_test.go b/bucket_test.go index 5e33189..2778432 100644 --- a/bucket_test.go +++ b/bucket_test.go @@ -11,23 +11,26 @@ import ( // Ensure a bucket can calculate stats. func TestBucketStat(t *testing.T) { withOpenDB(func(db *DB, path string) { - db.Do(func(txn *RWTransaction) error { + db.Do(func(txn *Transaction) error { // Add bucket with lots of keys. txn.CreateBucket("widgets") + b := txn.Bucket("widgets") for i := 0; i < 100000; i++ { - txn.Put("widgets", []byte(strconv.Itoa(i)), []byte(strconv.Itoa(i))) + b.Put([]byte(strconv.Itoa(i)), []byte(strconv.Itoa(i))) } // Add bucket with fewer keys but one big value. txn.CreateBucket("woojits") + b = txn.Bucket("woojits") for i := 0; i < 500; i++ { - txn.Put("woojits", []byte(strconv.Itoa(i)), []byte(strconv.Itoa(i))) + b.Put([]byte(strconv.Itoa(i)), []byte(strconv.Itoa(i))) } - txn.Put("woojits", []byte("really-big-value"), []byte(strings.Repeat("*", 10000))) + b.Put([]byte("really-big-value"), []byte(strings.Repeat("*", 10000))) // Add a bucket that fits on a single root leaf. txn.CreateBucket("whozawhats") - txn.Put("whozawhats", []byte("foo"), []byte("bar")) + b = txn.Bucket("whozawhats") + b.Put([]byte("foo"), []byte("bar")) return nil }) diff --git a/cursor.go b/cursor.go index 9a527af..0cb94b5 100644 --- a/cursor.go +++ b/cursor.go @@ -201,7 +201,7 @@ func (c *Cursor) keyValue() ([]byte, []byte) { } // node returns the node that the cursor is currently positioned on. -func (c *Cursor) node(t *RWTransaction) *node { +func (c *Cursor) node(t *Transaction) *node { _assert(len(c.stack) > 0, "accessing a node with a zero-length cursor stack") // Start from root and traverse down the hierarchy. diff --git a/db.go b/db.go index c9b0e92..3d8c668 100644 --- a/db.go +++ b/db.go @@ -16,8 +16,12 @@ const minMmapSize = 1 << 22 // 4MB const maxMmapStep = 1 << 30 // 1GB // DB represents a collection of buckets persisted to a file on disk. -// All data access is performed through transactions which can be obtained through the DB. -// All the functions on DB will return a ErrDatabaseNotOpen if accessed before Open() is called. +// All data access is performed through transactions which can be obtained from +// the DB. There are a number of functions duplicated from the Transaction type +// which provide ease-of-use, single transaction access to the data. +// +// All the functions on DB will return an ErrDatabaseNotOpen if accessed before +// Open() is called or after Close() is called. type DB struct { os _os syscall _syscall @@ -29,7 +33,7 @@ type DB struct { meta1 *meta pageSize int opened bool - rwtransaction *RWTransaction + rwtransaction *Transaction transactions []*Transaction freelist *freelist @@ -43,16 +47,6 @@ func (db *DB) Path() string { return db.path } -// GoString returns the Go string representation of the database.
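For orientation while reading the rest of this patch, a minimal usage sketch (not part of the diff itself): the bucket name "widgets" and the key/value literals are placeholders, and the calls shown, DB.Do, Transaction.CreateBucketIfNotExists, Transaction.Bucket, Bucket.Put, and Bucket.Get, are the ones added or kept by the changes above.

err := db.Do(func(t *Transaction) error {
	// Create the bucket on first use; ErrBucketExists is absorbed by this helper.
	if err := t.CreateBucketIfNotExists("widgets"); err != nil {
		return err // returning an error rolls the whole transaction back
	}

	// Key/value access now goes through the Bucket looked up on the transaction.
	b := t.Bucket("widgets")
	if err := b.Put([]byte("foo"), []byte("bar")); err != nil {
		return err
	}
	v := b.Get([]byte("foo")) // nil if the key does not exist
	_ = v
	return nil // a nil return commits the transaction
})
if err != nil {
	// handle the error returned from the block or from the commit
}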
-func (db *DB) GoString() string { - return fmt.Sprintf("bolt.DB{path:%q}", db.path) -} - -// String returns the string representation of the database. -func (db *DB) String() string { - return fmt.Sprintf("DB<%q>", db.path) -} - // Open opens a data file at the given path and initializes the database. // If the file does not exist then it will be created automatically. func (db *DB) Open(path string, mode os.FileMode) error { @@ -262,7 +256,8 @@ func (db *DB) close() { // Transaction creates a read-only transaction. // Multiple read-only transactions can be used concurrently. // -// IMPORTANT: You must close the transaction after you are finished or else the database will not reclaim old pages. +// IMPORTANT: You must close the transaction after you are finished or else the +// database will not reclaim old pages. func (db *DB) Transaction() (*Transaction, error) { db.metalock.Lock() defer db.metalock.Unlock() @@ -289,12 +284,12 @@ func (db *DB) Transaction() (*Transaction, error) { // RWTransaction creates a read/write transaction. // Only one read/write transaction is allowed at a time. -// You must call Commit() or Rollback() on the transaction to close it. -func (db *DB) RWTransaction() (*RWTransaction, error) { +// You must call Commit() or Rollback() on the transaction to close it. +func (db *DB) RWTransaction() (*Transaction, error) { db.metalock.Lock() defer db.metalock.Unlock() - // Obtain writer lock. This is released by the RWTransaction when it closes. + // Obtain writer lock. This is released by the writer transaction when it closes. db.rwlock.Lock() // Exit if the database is not open yet. @@ -303,8 +298,8 @@ func (db *DB) RWTransaction() (*RWTransaction, error) { return nil, ErrDatabaseNotOpen } - // Create a transaction associated with the database. - t := &RWTransaction{nodes: make(map[pgid]*node)} + // Create a writable transaction associated with the database. + t := &Transaction{writable: true, nodes: make(map[pgid]*node)} t.init(db) db.rwtransaction = t @@ -339,12 +334,12 @@ func (db *DB) removeTransaction(t *Transaction) { } } -// Do executes a function within the context of a RWTransaction. +// Do executes a function within the context of a writable Transaction. // If no error is returned from the function then the transaction is committed. // If an error is returned then the entire transaction is rolled back. // Any error that is returned from the function or returned from the commit is // returned from the Do() method. -func (db *DB) Do(fn func(*RWTransaction) error) error { +func (db *DB) Do(fn func(*Transaction) error) error { t, err := db.RWTransaction() if err != nil { return err @@ -366,7 +361,7 @@ func (db *DB) With(fn func(*Transaction) error) error { if err != nil { return err } - defer t.Close() + defer t.Rollback() // If an error is returned from the function then pass it through. return fn(t) @@ -376,28 +371,36 @@ func (db *DB) With(fn func(*Transaction) error) error { // An error is returned if the bucket cannot be found. func (db *DB) ForEach(name string, fn func(k, v []byte) error) error { return db.With(func(t *Transaction) error { - return t.ForEach(name, fn) + b := t.Bucket(name) + if b == nil { + return ErrBucketNotFound + } + return b.ForEach(fn) }) } // Bucket retrieves a reference to a bucket. // This is typically useful for checking the existence of a bucket. +// +// Do not use the returned bucket for accessing or changing data.
func (db *DB) Bucket(name string) (*Bucket, error) { t, err := db.Transaction() if err != nil { return nil, err } - defer t.Close() + defer t.Rollback() return t.Bucket(name), nil } // Buckets retrieves a list of all buckets in the database. +// +// Do not use any of the returned buckets for accessing or changing data. func (db *DB) Buckets() ([]*Bucket, error) { t, err := db.Transaction() if err != nil { return nil, err } - defer t.Close() + defer t.Rollback() return t.Buckets(), nil } @@ -405,7 +408,7 @@ func (db *DB) Buckets() ([]*Bucket, error) { // This function can return an error if the bucket already exists, if the name // is blank, or the bucket name is too long. func (db *DB) CreateBucket(name string) error { - return db.Do(func(t *RWTransaction) error { + return db.Do(func(t *Transaction) error { return t.CreateBucket(name) }) } @@ -413,7 +416,7 @@ func (db *DB) CreateBucket(name string) error { // CreateBucketIfNotExists creates a new bucket with the given name if it doesn't already exist. // This function can return an error if the name is blank, or the bucket name is too long. func (db *DB) CreateBucketIfNotExists(name string) error { - return db.Do(func(t *RWTransaction) error { + return db.Do(func(t *Transaction) error { return t.CreateBucketIfNotExists(name) }) } @@ -421,7 +424,7 @@ func (db *DB) CreateBucketIfNotExists(name string) error { // DeleteBucket removes a bucket from the database. // Returns an error if the bucket does not exist. func (db *DB) DeleteBucket(name string) error { - return db.Do(func(t *RWTransaction) error { + return db.Do(func(t *Transaction) error { return t.DeleteBucket(name) }) } @@ -430,10 +433,17 @@ func (db *DB) DeleteBucket(name string) error { // This function can return an error if the bucket does not exist. func (db *DB) NextSequence(name string) (int, error) { var seq int - err := db.Do(func(t *RWTransaction) error { + err := db.Do(func(t *Transaction) error { + b := t.Bucket(name) + if b == nil { + return ErrBucketNotFound + } + var err error - seq, err = t.NextSequence(name) - return err + if seq, err = b.NextSequence(); err != nil { + return err + } + return nil }) if err != nil { return 0, err @@ -442,29 +452,43 @@ func (db *DB) NextSequence(name string) (int, error) { } // Get retrieves the value for a key in a bucket. -// Returns an error if the key does not exist. +// Returns an error if the bucket does not exist. func (db *DB) Get(name string, key []byte) ([]byte, error) { t, err := db.Transaction() if err != nil { return nil, err } - defer t.Close() - return t.Get(name, key) + defer t.Rollback() + + b := t.Bucket(name) + if b == nil { + return nil, ErrBucketNotFound + } + + return b.Get(key), nil } // Put sets the value for a key in a bucket. // Returns an error if the bucket is not found, if key is blank, if the key is too large, or if the value is too large. func (db *DB) Put(name string, key []byte, value []byte) error { - return db.Do(func(t *RWTransaction) error { - return t.Put(name, key, value) + return db.Do(func(t *Transaction) error { + b := t.Bucket(name) + if b == nil { + return ErrBucketNotFound + } + return b.Put(key, value) }) } // Delete removes a key from a bucket. // Returns an error if the bucket cannot be found. 
func (db *DB) Delete(name string, key []byte) error { - return db.Do(func(t *RWTransaction) error { - return t.Delete(name, key) + return db.Do(func(t *Transaction) error { + b := t.Bucket(name) + if b == nil { + return ErrBucketNotFound + } + return b.Delete(key) }) } @@ -477,7 +501,7 @@ func (db *DB) Copy(w io.Writer) error { if err != nil { return err } - defer t.Close() + defer t.Commit() // Open reader on the database. f, err := os.Open(db.path) @@ -522,7 +546,7 @@ func (db *DB) Stat() (*Stat, error) { db.mmaplock.RUnlock() db.metalock.Unlock() - err := db.Do(func(t *RWTransaction) error { + err := db.Do(func(t *Transaction) error { s.PageCount = int(t.meta.pgid) s.FreePageCount = len(db.freelist.all()) s.PageSize = db.pageSize @@ -534,6 +558,16 @@ func (db *DB) Stat() (*Stat, error) { return s, nil } +// GoString returns the Go string representation of the database. +func (db *DB) GoString() string { + return fmt.Sprintf("bolt.DB{path:%q}", db.path) +} + +// String returns the string representation of the database. +func (db *DB) String() string { + return fmt.Sprintf("DB<%q>", db.path) +} + // page retrieves a page reference from the mmap based on the current page size. func (db *DB) page(id pgid) *page { return (*page)(unsafe.Pointer(&db.data[id*pgid(db.pageSize)])) diff --git a/db_test.go b/db_test.go index 3d0b987..df15a8b 100644 --- a/db_test.go +++ b/db_test.go @@ -188,14 +188,34 @@ func TestDBDeleteFromMissingBucket(t *testing.T) { }) } +// Ensure that a Transaction can be retrieved. +func TestDBRWTransaction(t *testing.T) { + withOpenDB(func(db *DB, path string) { + txn, err := db.RWTransaction() + assert.NotNil(t, txn) + assert.NoError(t, err) + assert.Equal(t, txn.DB(), db) + }) +} + +// Ensure that opening a Transaction while the DB is closed returns an error. +func TestRWTransactionOpenWithClosedDB(t *testing.T) { + withDB(func(db *DB, path string) { + txn, err := db.RWTransaction() + assert.Equal(t, err, ErrDatabaseNotOpen) + assert.Nil(t, txn) + }) +} + // Ensure a database can provide a transactional block. func TestDBTransactionBlock(t *testing.T) { withOpenDB(func(db *DB, path string) { - err := db.Do(func(txn *RWTransaction) error { + err := db.Do(func(txn *Transaction) error { txn.CreateBucket("widgets") - txn.Put("widgets", []byte("foo"), []byte("bar")) - txn.Put("widgets", []byte("baz"), []byte("bat")) - txn.Delete("widgets", []byte("foo")) + b := txn.Bucket("widgets") + b.Put([]byte("foo"), []byte("bar")) + b.Put([]byte("baz"), []byte("bat")) + b.Delete([]byte("foo")) return nil }) assert.NoError(t, err) @@ -209,7 +229,7 @@ func TestDBTransactionBlock(t *testing.T) { // Ensure a closed database returns an error while running a transaction block func TestDBTransactionBlockWhileClosed(t *testing.T) { withDB(func(db *DB, path string) { - err := db.Do(func(txn *RWTransaction) error { + err := db.Do(func(txn *Transaction) error { txn.CreateBucket("widgets") return nil }) @@ -333,10 +353,11 @@ func TestDBCopyFile(t *testing.T) { // Ensure the database can return stats about itself. 
func TestDBStat(t *testing.T) { withOpenDB(func(db *DB, path string) { - db.Do(func(txn *RWTransaction) error { + db.Do(func(txn *Transaction) error { txn.CreateBucket("widgets") + b := txn.Bucket("widgets") for i := 0; i < 10000; i++ { - txn.Put("widgets", []byte(strconv.Itoa(i)), []byte(strconv.Itoa(i))) + b.Put([]byte(strconv.Itoa(i)), []byte(strconv.Itoa(i))) } return nil }) @@ -349,7 +370,7 @@ func TestDBStat(t *testing.T) { t0, _ := db.Transaction() t1, _ := db.Transaction() t2, _ := db.Transaction() - t2.Close() + t2.Rollback() // Obtain stats. stat, err := db.Stat() @@ -361,8 +382,8 @@ func TestDBStat(t *testing.T) { assert.Equal(t, stat.TransactionCount, 2) // Close readers. - t0.Close() - t1.Close() + t0.Rollback() + t1.Rollback() }) } diff --git a/doc.go b/doc.go index d7f3ec1..c948b05 100644 --- a/doc.go +++ b/doc.go @@ -10,19 +10,18 @@ optimized for fast read access and does not require recovery in the event of a system crash. Transactions which have not finished committing will simply be rolled back in the event of a crash. -The design of Bolt is based on Howard Chu's LMDB database project. +The design of Bolt is based on Howard Chu's LMDB project. Basics -There are only a few types in Bolt: DB, Bucket, Transaction, RWTransaction, and -Cursor. The DB is a collection of buckets and is represented by a single file -on disk. A bucket is a collection of unique keys that are associated with values. +There are only a few types in Bolt: DB, Bucket, Transaction, and Cursor. The DB +is a collection of buckets and is represented by a single file on disk. A +bucket is a collection of unique keys that are associated with values. -Transactions provide read-only access to data inside the database. They can -retrieve key/value pairs and can use Cursors to iterate over the entire dataset. -RWTransactions provide read-write access to the database. They can create and -delete buckets and they can insert and remove keys. Only one RWTransaction is -allowed at a time. +Transactions provide a consistent view of the database. They can be used for +retrieving, setting, and deleting key/value pairs. They can also be used to iterate +over all the values in a bucket. Only one writer Transaction can be in use at +a time. Caveats @@ -30,8 +29,8 @@ Caveats The database uses a read-only, memory-mapped data file to ensure that applications cannot corrupt the database, however, this means that keys and values returned from Bolt cannot be changed. Writing to a read-only byte slice -will cause Go to panic. If you need to work with data returned from a Get() you -need to first copy it to a new byte slice. +will cause Go to panic. If you need to alter data returned from a Transaction +you need to first copy it to a new byte slice. Bolt currently works on Mac OS and Linux. Windows support is coming soon. diff --git a/error.go b/error.go index 7238203..36f09a2 100644 --- a/error.go +++ b/error.go @@ -16,6 +16,10 @@ var ( // already open. ErrDatabaseOpen = &Error{"database already open", nil} + // ErrTransactionNotWritable is returned when changing data using a read-only + // transaction. + ErrTransactionNotWritable = &Error{"transaction not writable", nil} + // ErrBucketNotFound is returned when trying to access a bucket that has // not been created yet.
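A hedged illustration of the new error, also not part of the diff: with the writable check added to Bucket.Put above, a write attempted through a read-only transaction now fails with ErrTransactionNotWritable. The function name below is hypothetical and the "widgets" bucket is assumed to exist.

func putViaReadOnlyTransaction(db *DB) error {
	txn, err := db.Transaction() // read-only view of the database
	if err != nil {
		return err
	}
	defer txn.Rollback()

	// Bucket returns nil for unknown names; "widgets" is assumed to exist here.
	err = txn.Bucket("widgets").Put([]byte("k"), []byte("v"))
	// err is ErrTransactionNotWritable; use DB.RWTransaction() or DB.Do() to write.
	return err
}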
ErrBucketNotFound = &Error{"bucket not found", nil} diff --git a/example_test.go b/example_test.go index 8747f94..b3d5d20 100644 --- a/example_test.go +++ b/example_test.go @@ -67,11 +67,13 @@ func ExampleDB_Do() { defer db.Close() // Execute several commands within a write transaction. - err := db.Do(func(t *RWTransaction) error { + err := db.Do(func(t *Transaction) error { if err := t.CreateBucket("widgets"); err != nil { return err } - if err := t.Put("widgets", []byte("foo"), []byte("bar")); err != nil { + + b := t.Bucket("widgets") + if err := b.Put([]byte("foo"), []byte("bar")); err != nil { return err } return nil @@ -100,7 +102,7 @@ func ExampleDB_With() { // Access data from within a read-only transactional block. db.With(func(t *Transaction) error { - v, _ := t.Get("people", []byte("john")) + v := t.Bucket("people").Get([]byte("john")) fmt.Printf("John's last name is %s.\n", string(v)) return nil }) @@ -133,29 +135,30 @@ func ExampleDB_ForEach() { // A liger is awesome. } -func ExampleRWTransaction() { +func ExampleTransaction_Commit() { // Open the database. var db DB - db.Open("/tmp/bolt/rwtransaction.db", 0666) + db.Open("/tmp/bolt/db_rwtransaction.db", 0666) defer db.Close() // Create a bucket. db.CreateBucket("widgets") // Create several keys in a transaction. - rwtxn, _ := db.RWTransaction() - rwtxn.Put("widgets", []byte("john"), []byte("blue")) - rwtxn.Put("widgets", []byte("abby"), []byte("red")) - rwtxn.Put("widgets", []byte("zephyr"), []byte("purple")) - rwtxn.Commit() + txn, _ := db.RWTransaction() + b := txn.Bucket("widgets") + b.Put([]byte("john"), []byte("blue")) + b.Put([]byte("abby"), []byte("red")) + b.Put([]byte("zephyr"), []byte("purple")) + txn.Commit() // Iterate over the values in sorted key order. - txn, _ := db.Transaction() - c, _ := txn.Cursor("widgets") + txn, _ = db.Transaction() + c := txn.Bucket("widgets").Cursor() for k, v := c.First(); k != nil; k, v = c.Next() { fmt.Printf("%s likes %s\n", string(k), string(v)) } - txn.Close() + txn.Rollback() // Output: // abby likes red @@ -163,10 +166,10 @@ func ExampleRWTransaction() { // zephyr likes purple } -func ExampleRWTransaction_rollback() { +func ExampleTransaction_Rollback() { // Open the database. var db DB - db.Open("/tmp/bolt/rwtransaction_rollback.db", 0666) + db.Open("/tmp/bolt/transaction_close.db", 0666) defer db.Close() // Create a bucket. @@ -176,9 +179,9 @@ func ExampleRWTransaction_rollback() { db.Put("widgets", []byte("foo"), []byte("bar")) // Update the key but rollback the transaction so it never saves. - rwtxn, _ := db.RWTransaction() - rwtxn.Put("widgets", []byte("foo"), []byte("baz")) - rwtxn.Rollback() + txn, _ := db.RWTransaction() + txn.Bucket("widgets").Put([]byte("foo"), []byte("baz")) + txn.Rollback() // Ensure that our original value is still set. value, _ := db.Get("widgets", []byte("foo")) diff --git a/functional_test.go b/functional_test.go index f7c3c3a..acd2f4a 100644 --- a/functional_test.go +++ b/functional_test.go @@ -56,15 +56,15 @@ func TestParallelTransactions(t *testing.T) { // Verify all data is in for local data list. for _, item := range local { - value, err := txn.Get("widgets", item.Key) + value := txn.Bucket("widgets").Get(item.Key) if !assert.NoError(t, err) || !assert.Equal(t, value, item.Value) { - txn.Close() + txn.Rollback() wg.Done() t.FailNow() } } - txn.Close() + txn.Rollback() wg.Done() <-readers }() @@ -89,8 +89,9 @@ func TestParallelTransactions(t *testing.T) { } // Insert whole batch. 
+ b := txn.Bucket("widgets") for _, item := range batchItems { - err := txn.Put("widgets", item.Key, item.Value) + err := b.Put(item.Key, item.Value) if !assert.NoError(t, err) { t.FailNow() } diff --git a/node.go b/node.go index 68f651e..81d2667 100644 --- a/node.go +++ b/node.go @@ -8,7 +8,7 @@ import ( // node represents an in-memory, deserialized page. type node struct { - transaction *RWTransaction + transaction *Transaction isLeaf bool unbalanced bool key []byte diff --git a/rwtransaction.go b/rwtransaction.go deleted file mode 100644 index e22c766..0000000 --- a/rwtransaction.go +++ /dev/null @@ -1,355 +0,0 @@ -package bolt - -import ( - "sort" - "unsafe" -) - -// RWTransaction represents a transaction that can read and write data. -// Only one read/write transaction can be active for a database at a time. -// RWTransaction is composed of a read-only Transaction so it can also use -// functions provided by Transaction. -type RWTransaction struct { - Transaction - nodes map[pgid]*node - pending []*node -} - -// init initializes the transaction. -func (t *RWTransaction) init(db *DB) { - t.Transaction.init(db) - t.pages = make(map[pgid]*page) - - // Increment the transaction id. - t.meta.txnid += txnid(1) -} - -// CreateBucket creates a new bucket. -// Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long. -func (t *RWTransaction) CreateBucket(name string) error { - // Check if bucket already exists. - if b := t.Bucket(name); b != nil { - return ErrBucketExists - } else if len(name) == 0 { - return ErrBucketNameRequired - } else if len(name) > MaxBucketNameSize { - return ErrBucketNameTooLarge - } - - // Create a blank root leaf page. - p, err := t.allocate(1) - if err != nil { - return err - } - p.flags = leafPageFlag - - // Add bucket to buckets page. - t.buckets.put(name, &bucket{root: p.id}) - - return nil -} - -// CreateBucketIfNotExists creates a new bucket if it doesn't already exist. -// Returns an error if the bucket name is blank, or if the bucket name is too long. -func (t *RWTransaction) CreateBucketIfNotExists(name string) error { - err := t.CreateBucket(name) - if err != nil && err != ErrBucketExists { - return err - } - return nil -} - -// DeleteBucket deletes a bucket. -// Returns an error if the bucket cannot be found. -func (t *RWTransaction) DeleteBucket(name string) error { - if b := t.Bucket(name); b == nil { - return ErrBucketNotFound - } - - // Remove from buckets page. - t.buckets.del(name) - - // TODO(benbjohnson): Free all pages. - - return nil -} - -// NextSequence returns an autoincrementing integer for the bucket. -func (t *RWTransaction) NextSequence(name string) (int, error) { - // Check if bucket already exists. - b := t.Bucket(name) - if b == nil { - return 0, ErrBucketNotFound - } - - // Make sure next sequence number will not be larger than the maximum - // integer size of the system. - if b.bucket.sequence == uint64(maxInt) { - return 0, ErrSequenceOverflow - } - - // Increment and return the sequence. - b.bucket.sequence++ - - return int(b.bucket.sequence), nil -} - -// Put sets the value for a key inside of the named bucket. -// If the key exist then its previous value will be overwritten. -// Returns an error if the bucket is not found, if the key is blank, if the key is too large, or if the value is too large. -func (t *RWTransaction) Put(name string, key []byte, value []byte) error { - b := t.Bucket(name) - if b == nil { - return ErrBucketNotFound - } - - // Validate the key and data size. 
- if len(key) == 0 { - return ErrKeyRequired - } else if len(key) > MaxKeySize { - return ErrKeyTooLarge - } else if len(value) > MaxValueSize { - return ErrValueTooLarge - } - - // Move cursor to correct position. - c := b.cursor() - c.Seek(key) - - // Insert the key/value. - c.node(t).put(key, key, value, 0) - - return nil -} - -// Delete removes a key from the named bucket. -// If the key does not exist then nothing is done and a nil error is returned. -// Returns an error if the bucket cannot be found. -func (t *RWTransaction) Delete(name string, key []byte) error { - b := t.Bucket(name) - if b == nil { - return ErrBucketNotFound - } - - // Move cursor to correct position. - c := b.cursor() - c.Seek(key) - - // Delete the node if we have a matching key. - c.node(t).del(key) - - return nil -} - -// Commit writes all changes to disk and updates the meta page. -// Returns an error if a disk write error occurs. -func (t *RWTransaction) Commit() error { - defer t.close() - - // TODO(benbjohnson): Use vectorized I/O to write out dirty pages. - - // Rebalance and spill data onto dirty pages. - t.rebalance() - t.spill() - - // Spill buckets page. - p, err := t.allocate((t.buckets.size() / t.db.pageSize) + 1) - if err != nil { - return err - } - t.buckets.write(p) - - // Write dirty pages to disk. - if err := t.write(); err != nil { - return err - } - - // Update the meta. - t.meta.buckets = p.id - - // Write meta to disk. - if err := t.writeMeta(); err != nil { - return err - } - - return nil -} - -// Rollback closes the transaction and ignores all previous updates. -func (t *RWTransaction) Rollback() { - t.close() -} - -func (t *RWTransaction) close() { - t.db.rwlock.Unlock() -} - -// allocate returns a contiguous block of memory starting at a given page. -func (t *RWTransaction) allocate(count int) (*page, error) { - p, err := t.db.allocate(count) - if err != nil { - return nil, err - } - - // Save to our page cache. - t.pages[p.id] = p - - return p, nil -} - -// rebalance attempts to balance all nodes. -func (t *RWTransaction) rebalance() { - for _, n := range t.nodes { - n.rebalance() - } -} - -// spill writes all the nodes to dirty pages. -func (t *RWTransaction) spill() error { - // Keep track of the current root nodes. - // We will update this at the end once all nodes are created. - type root struct { - node *node - pgid pgid - } - var roots []root - - // Sort nodes by highest depth first. - nodes := make(nodesByDepth, 0, len(t.nodes)) - for _, n := range t.nodes { - nodes = append(nodes, n) - } - sort.Sort(nodes) - - // Spill nodes by deepest first. - for i := 0; i < len(nodes); i++ { - n := nodes[i] - - // Save existing root buckets for later. - if n.parent == nil && n.pgid != 0 { - roots = append(roots, root{n, n.pgid}) - } - - // Split nodes into appropriate sized nodes. - // The first node in this list will be a reference to n to preserve ancestry. - newNodes := n.split(t.db.pageSize) - t.pending = newNodes - - // If this is a root node that split then create a parent node. - if n.parent == nil && len(newNodes) > 1 { - n.parent = &node{transaction: t, isLeaf: false} - nodes = append(nodes, n.parent) - } - - // Add node's page to the freelist. - if n.pgid > 0 { - t.db.freelist.free(t.id(), t.page(n.pgid)) - } - - // Write nodes to dirty pages. - for i, newNode := range newNodes { - // Allocate contiguous space for the node. - p, err := t.allocate((newNode.size() / t.db.pageSize) + 1) - if err != nil { - return err - } - - // Write the node to the page. 
- newNode.write(p) - newNode.pgid = p.id - newNode.parent = n.parent - - // The first node should use the existing entry, other nodes are inserts. - var oldKey []byte - if i == 0 { - oldKey = n.key - } else { - oldKey = newNode.inodes[0].key - } - - // Update the parent entry. - if newNode.parent != nil { - newNode.parent.put(oldKey, newNode.inodes[0].key, nil, newNode.pgid) - } - } - - t.pending = nil - } - - // Update roots with new roots. - for _, root := range roots { - t.buckets.updateRoot(root.pgid, root.node.root().pgid) - } - - // Clear out nodes now that they are all spilled. - t.nodes = make(map[pgid]*node) - - return nil -} - -// write writes any dirty pages to disk. -func (t *RWTransaction) write() error { - // Sort pages by id. - pages := make(pages, 0, len(t.pages)) - for _, p := range t.pages { - pages = append(pages, p) - } - sort.Sort(pages) - - // Write pages to disk in order. - for _, p := range pages { - size := (int(p.overflow) + 1) * t.db.pageSize - buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:size] - offset := int64(p.id) * int64(t.db.pageSize) - if _, err := t.db.file.WriteAt(buf, offset); err != nil { - return err - } - } - - // Clear out page cache. - t.pages = make(map[pgid]*page) - - return nil -} - -// writeMeta writes the meta to the disk. -func (t *RWTransaction) writeMeta() error { - // Create a temporary buffer for the meta page. - buf := make([]byte, t.db.pageSize) - p := t.db.pageInBuffer(buf, 0) - t.meta.write(p) - - // Write the meta page to file. - t.db.metafile.WriteAt(buf, int64(p.id)*int64(t.db.pageSize)) - - return nil -} - -// node creates a node from a page and associates it with a given parent. -func (t *RWTransaction) node(pgid pgid, parent *node) *node { - // Retrieve node if it has already been fetched. - if n := t.nodes[pgid]; n != nil { - return n - } - - // Otherwise create a branch and cache it. - n := &node{transaction: t, parent: parent} - if n.parent != nil { - n.depth = n.parent.depth + 1 - } - n.read(t.page(pgid)) - t.nodes[pgid] = n - - return n -} - -// dereference removes all references to the old mmap. -func (t *RWTransaction) dereference() { - for _, n := range t.nodes { - n.dereference() - } - - for _, n := range t.pending { - n.dereference() - } -} diff --git a/rwtransaction_test.go b/rwtransaction_test.go deleted file mode 100644 index 18b6ae9..0000000 --- a/rwtransaction_test.go +++ /dev/null @@ -1,306 +0,0 @@ -package bolt - -import ( - "bytes" - "fmt" - "os" - "strings" - "testing" - "testing/quick" - - "github.com/stretchr/testify/assert" -) - -// Ensure that a RWTransaction can be retrieved. -func TestRWTransaction(t *testing.T) { - withOpenDB(func(db *DB, path string) { - txn, err := db.RWTransaction() - assert.NotNil(t, txn) - assert.NoError(t, err) - assert.Equal(t, txn.DB(), db) - }) -} - -// Ensure that opening a RWTransaction while the DB is closed returns an error. -func TestRWTransactionOpenWithClosedDB(t *testing.T) { - withDB(func(db *DB, path string) { - txn, err := db.RWTransaction() - assert.Equal(t, err, ErrDatabaseNotOpen) - assert.Nil(t, txn) - }) -} - -// Ensure that a bucket can be created and retrieved. -func TestRWTransactionCreateBucket(t *testing.T) { - withOpenDB(func(db *DB, path string) { - // Create a bucket. - err := db.CreateBucket("widgets") - assert.NoError(t, err) - - // Read the bucket through a separate transaction. - b, err := db.Bucket("widgets") - assert.NotNil(t, b) - assert.NoError(t, err) - }) -} - -// Ensure that a bucket can be created if it doesn't already exist. 
-func TestRWTransactionCreateBucketIfNotExists(t *testing.T) { - withOpenDB(func(db *DB, path string) { - assert.NoError(t, db.CreateBucketIfNotExists("widgets")) - assert.NoError(t, db.CreateBucketIfNotExists("widgets")) - - // Read the bucket through a separate transaction. - b, err := db.Bucket("widgets") - assert.NotNil(t, b) - assert.NoError(t, err) - }) -} - -// Ensure that a bucket cannot be created twice. -func TestRWTransactionRecreateBucket(t *testing.T) { - withOpenDB(func(db *DB, path string) { - // Create a bucket. - err := db.CreateBucket("widgets") - assert.NoError(t, err) - - // Create the same bucket again. - err = db.CreateBucket("widgets") - assert.Equal(t, err, ErrBucketExists) - }) -} - -// Ensure that a bucket is created with a non-blank name. -func TestRWTransactionCreateBucketWithoutName(t *testing.T) { - withOpenDB(func(db *DB, path string) { - err := db.CreateBucket("") - assert.Equal(t, err, ErrBucketNameRequired) - }) -} - -// Ensure that a bucket name is not too long. -func TestRWTransactionCreateBucketWithLongName(t *testing.T) { - withOpenDB(func(db *DB, path string) { - err := db.CreateBucket(strings.Repeat("X", 255)) - assert.NoError(t, err) - - err = db.CreateBucket(strings.Repeat("X", 256)) - assert.Equal(t, err, ErrBucketNameTooLarge) - }) -} - -// Ensure that a bucket can be deleted. -func TestRWTransactionDeleteBucket(t *testing.T) { - withOpenDB(func(db *DB, path string) { - // Create a bucket and add a value. - db.CreateBucket("widgets") - db.Put("widgets", []byte("foo"), []byte("bar")) - - // Delete the bucket and make sure we can't get the value. - assert.NoError(t, db.DeleteBucket("widgets")) - value, err := db.Get("widgets", []byte("foo")) - assert.Equal(t, err, ErrBucketNotFound) - assert.Nil(t, value) - - // Create the bucket again and make sure there's not a phantom value. - assert.NoError(t, db.CreateBucket("widgets")) - value, err = db.Get("widgets", []byte("foo")) - assert.NoError(t, err) - assert.Nil(t, value) - }) -} - -// Ensure that a bucket can return an autoincrementing sequence. -func TestRWTransactionNextSequence(t *testing.T) { - withOpenDB(func(db *DB, path string) { - db.CreateBucket("widgets") - db.CreateBucket("woojits") - - // Make sure sequence increments. - seq, err := db.NextSequence("widgets") - assert.NoError(t, err) - assert.Equal(t, seq, 1) - seq, err = db.NextSequence("widgets") - assert.NoError(t, err) - assert.Equal(t, seq, 2) - - // Buckets should be separate. - seq, err = db.NextSequence("woojits") - assert.NoError(t, err) - assert.Equal(t, seq, 1) - - // Missing buckets return an error. - seq, err = db.NextSequence("no_such_bucket") - assert.Equal(t, err, ErrBucketNotFound) - assert.Equal(t, seq, 0) - }) -} - -// Ensure that incrementing past the maximum sequence number will return an error. -func TestRWTransactionNextSequenceOverflow(t *testing.T) { - withOpenDB(func(db *DB, path string) { - db.CreateBucket("widgets") - db.Do(func(txn *RWTransaction) error { - b := txn.Bucket("widgets") - b.bucket.sequence = uint64(maxInt) - seq, err := txn.NextSequence("widgets") - assert.Equal(t, err, ErrSequenceOverflow) - assert.Equal(t, seq, 0) - return nil - }) - }) -} - -// Ensure that an error is returned when inserting into a bucket that doesn't exist. 
-func TestRWTransactionPutBucketNotFound(t *testing.T) { - withOpenDB(func(db *DB, path string) { - err := db.Put("widgets", []byte("foo"), []byte("bar")) - assert.Equal(t, err, ErrBucketNotFound) - }) -} - -// Ensure that an error is returned when inserting with an empty key. -func TestRWTransactionPutEmptyKey(t *testing.T) { - withOpenDB(func(db *DB, path string) { - db.CreateBucket("widgets") - err := db.Put("widgets", []byte(""), []byte("bar")) - assert.Equal(t, err, ErrKeyRequired) - err = db.Put("widgets", nil, []byte("bar")) - assert.Equal(t, err, ErrKeyRequired) - }) -} - -// Ensure that an error is returned when inserting with a key that's too large. -func TestRWTransactionPutKeyTooLarge(t *testing.T) { - withOpenDB(func(db *DB, path string) { - db.CreateBucket("widgets") - err := db.Put("widgets", make([]byte, 32769), []byte("bar")) - assert.Equal(t, err, ErrKeyTooLarge) - }) -} - -// Ensure that an error is returned when deleting from a bucket that doesn't exist. -func TestRWTransactionDeleteBucketNotFound(t *testing.T) { - withOpenDB(func(db *DB, path string) { - err := db.DeleteBucket("widgets") - assert.Equal(t, err, ErrBucketNotFound) - }) -} - -// Ensure that a bucket can write random keys and values across multiple txns. -func TestRWTransactionPutSingle(t *testing.T) { - index := 0 - f := func(items testdata) bool { - withOpenDB(func(db *DB, path string) { - m := make(map[string][]byte) - - db.CreateBucket("widgets") - for _, item := range items { - if err := db.Put("widgets", item.Key, item.Value); err != nil { - panic("put error: " + err.Error()) - } - m[string(item.Key)] = item.Value - - // Verify all key/values so far. - i := 0 - for k, v := range m { - value, err := db.Get("widgets", []byte(k)) - if err != nil { - panic("get error: " + err.Error()) - } - if !bytes.Equal(value, v) { - db.CopyFile("/tmp/bolt.put.single.db", 0666) - t.Fatalf("value mismatch [run %d] (%d of %d):\nkey: %x\ngot: %x\nexp: %x", index, i, len(m), []byte(k), value, v) - } - i++ - } - } - - fmt.Fprint(os.Stderr, ".") - }) - index++ - return true - } - if err := quick.Check(f, qconfig()); err != nil { - t.Error(err) - } - fmt.Fprint(os.Stderr, "\n") -} - -// Ensure that a transaction can insert multiple key/value pairs at once. -func TestRWTransactionPutMultiple(t *testing.T) { - f := func(items testdata) bool { - withOpenDB(func(db *DB, path string) { - // Bulk insert all values. - db.CreateBucket("widgets") - rwtxn, _ := db.RWTransaction() - for _, item := range items { - assert.NoError(t, rwtxn.Put("widgets", item.Key, item.Value)) - } - assert.NoError(t, rwtxn.Commit()) - - // Verify all items exist. - txn, _ := db.Transaction() - for _, item := range items { - value, err := txn.Get("widgets", item.Key) - assert.NoError(t, err) - if !assert.Equal(t, item.Value, value) { - db.CopyFile("/tmp/bolt.put.multiple.db", 0666) - t.FailNow() - } - } - txn.Close() - }) - fmt.Fprint(os.Stderr, ".") - return true - } - if err := quick.Check(f, qconfig()); err != nil { - t.Error(err) - } - fmt.Fprint(os.Stderr, "\n") -} - -// Ensure that a transaction can delete all key/value pairs and return to a single leaf page. -func TestRWTransactionDelete(t *testing.T) { - f := func(items testdata) bool { - withOpenDB(func(db *DB, path string) { - // Bulk insert all values. 
- db.CreateBucket("widgets") - rwtxn, _ := db.RWTransaction() - for _, item := range items { - assert.NoError(t, rwtxn.Put("widgets", item.Key, item.Value)) - } - assert.NoError(t, rwtxn.Commit()) - - // Remove items one at a time and check consistency. - for i, item := range items { - assert.NoError(t, db.Delete("widgets", item.Key)) - - // Anything before our deletion index should be nil. - txn, _ := db.Transaction() - for j, exp := range items { - if j > i { - value, err := txn.Get("widgets", exp.Key) - assert.NoError(t, err) - if !assert.Equal(t, exp.Value, value) { - t.FailNow() - } - } else { - value, err := txn.Get("widgets", exp.Key) - assert.NoError(t, err) - if !assert.Nil(t, value) { - t.FailNow() - } - } - } - txn.Close() - } - }) - fmt.Fprint(os.Stderr, ".") - return true - } - if err := quick.Check(f, qconfig()); err != nil { - t.Error(err) - } - fmt.Fprint(os.Stderr, "\n") -} diff --git a/transaction.go b/transaction.go index 6e9ca8f..1bb990b 100644 --- a/transaction.go +++ b/transaction.go @@ -1,27 +1,28 @@ package bolt import ( - "bytes" + "sort" + "unsafe" ) -// Transaction represents a read-only transaction on the database. -// It can be used for retrieving values for keys as well as creating cursors for -// iterating over the data. -// -// IMPORTANT: You must close transactions when you are done with them. Pages -// can not be reclaimed by the writer until no more transactions are using them. -// A long running read transaction can cause the database to quickly grow. -type Transaction struct { - db *DB - meta *meta - buckets *buckets - pages map[pgid]*page -} - // txnid represents the internal transaction identifier. type txnid uint64 -// init initializes the transaction and associates it with a database. +// Transaction represents a consistent view into the database. +// Read-only transactions can be created by calling DB.Transaction(). +// Read-write transactions can be created by calling DB.RWTransaction(). +// Only one read-write transaction is allowed at a time. +type Transaction struct { + db *DB + meta *meta + buckets *buckets + writable bool + pages map[pgid]*page + nodes map[pgid]*node + pending []*node +} + +// init initializes the transaction. func (t *Transaction) init(db *DB) { t.db = db t.pages = nil @@ -33,6 +34,11 @@ func (t *Transaction) init(db *DB) { // Read in the buckets page. t.buckets = &buckets{} t.buckets.read(t.page(t.meta.buckets)) + + t.pages = make(map[pgid]*page) + + // Increment the transaction id. + t.meta.txnid += txnid(1) } // id returns the transaction id. @@ -40,19 +46,20 @@ func (t *Transaction) id() txnid { return t.meta.txnid } -// Close closes the transaction and releases any pages it is using. -func (t *Transaction) Close() { - t.db.removeTransaction(t) -} - // DB returns a reference to the database that created the transaction. func (t *Transaction) DB() *DB { return t.db } +// Writable returns whether the transaction can change data. +func (t *Transaction) Writable() bool { + return t.writable +} + // Bucket retrieves a bucket by name. // Returns nil if the bucket does not exist. func (t *Transaction) Bucket(name string) *Bucket { + _assert(t.isOpen(), "transaction not open") b := t.buckets.get(name) if b == nil { return nil @@ -67,6 +74,7 @@ func (t *Transaction) Bucket(name string) *Bucket { // Buckets retrieves a list of all buckets. 
func (t *Transaction) Buckets() []*Bucket { + _assert(t.isOpen(), "transaction not open") buckets := make([]*Bucket, 0, len(t.buckets.items)) for name, b := range t.buckets.items { bucket := &Bucket{bucket: b, transaction: t, name: name} @@ -75,49 +83,263 @@ func (t *Transaction) Buckets() []*Bucket { return buckets } -// Cursor creates a cursor associated with a given bucket. -// The cursor is only valid as long as the Transaction is open. -// Do not use a cursor after the transaction is closed. -func (t *Transaction) Cursor(name string) (*Cursor, error) { - b := t.Bucket(name) - if b == nil { - return nil, ErrBucketNotFound +// CreateBucket creates a new bucket. +// Returns an error if the transaction is read-only, if bucket already exists, +// if the bucket name is blank, or if the bucket name is too long. +func (t *Transaction) CreateBucket(name string) error { + _assert(t.isOpen(), "transaction not open") + if !t.writable { + return ErrTransactionNotWritable + } else if b := t.Bucket(name); b != nil { + return ErrBucketExists + } else if len(name) == 0 { + return ErrBucketNameRequired + } else if len(name) > MaxBucketNameSize { + return ErrBucketNameTooLarge } - return b.cursor(), nil -} -// Get retrieves the value for a key in a named bucket. -// Returns a nil value if the key does not exist. -// Returns an error if the bucket does not exist. -func (t *Transaction) Get(name string, key []byte) (value []byte, err error) { - c, err := t.Cursor(name) - if err != nil { - return nil, err - } - k, v := c.Seek(key) - // If our target node isn't the same key as what's passed in then return nil. - if !bytes.Equal(key, k) { - return nil, nil - } - return v, nil -} - -// ForEach executes a function for each key/value pair in a bucket. -// An error is returned if the bucket cannot be found. -func (t *Transaction) ForEach(name string, fn func(k, v []byte) error) error { - // Open a cursor on the bucket. - c, err := t.Cursor(name) + // Create a blank root leaf page. + p, err := t.allocate(1) if err != nil { return err } + p.flags = leafPageFlag - // Iterate over each key/value pair in the bucket. - for k, v := c.First(); k != nil; k, v = c.Next() { - if err := fn(k, v); err != nil { + // Add bucket to buckets page. + t.buckets.put(name, &bucket{root: p.id}) + + return nil +} + +// CreateBucketIfNotExists creates a new bucket if it doesn't already exist. +// Returns an error if the transaction is read-only, if the bucket name is +// blank, or if the bucket name is too long. +func (t *Transaction) CreateBucketIfNotExists(name string) error { + _assert(t.isOpen(), "transaction not open") + err := t.CreateBucket(name) + if err != nil && err != ErrBucketExists { + return err + } + return nil +} + +// DeleteBucket deletes a bucket. +// Returns an error if the transaction is read-only or if the bucket cannot be found. +func (t *Transaction) DeleteBucket(name string) error { + _assert(t.isOpen(), "transaction not open") + if !t.writable { + return ErrTransactionNotWritable + } else if b := t.Bucket(name); b == nil { + return ErrBucketNotFound + } + + // Remove from buckets page. + t.buckets.del(name) + + // TODO(benbjohnson): Free all pages. + + return nil +} + +// Commit writes all changes to disk and updates the meta page. +// Read-only transactions will simply be closed. +// Returns an error if a disk write error occurs. +func (t *Transaction) Commit() error { + defer t.close() + + // Ignore commit for read-only transactions. 
+ if !t.writable { + return nil + } + + // TODO(benbjohnson): Use vectorized I/O to write out dirty pages. + + // Rebalance and spill data onto dirty pages. + t.rebalance() + t.spill() + + // Spill buckets page. + p, err := t.allocate((t.buckets.size() / t.db.pageSize) + 1) + if err != nil { + return err + } + t.buckets.write(p) + + // Write dirty pages to disk. + if err := t.write(); err != nil { + return err + } + + // Update the meta. + t.meta.buckets = p.id + + // Write meta to disk. + if err := t.writeMeta(); err != nil { + return err + } + + return nil +} + +// Rollback closes the transaction and rolls back any pending changes. +func (t *Transaction) Rollback() { + t.close() +} + +func (t *Transaction) close() { + if t.writable { + t.db.rwlock.Unlock() + } else { + t.db.removeTransaction(t) + } + + // Detach from the database. + t.db = nil +} + +// isOpen returns whether the transaction is currently open. +func (t *Transaction) isOpen() bool { + return t.db != nil +} + +// allocate returns a contiguous block of memory starting at a given page. +func (t *Transaction) allocate(count int) (*page, error) { + p, err := t.db.allocate(count) + if err != nil { + return nil, err + } + + // Save to our page cache. + t.pages[p.id] = p + + return p, nil +} + +// rebalance attempts to balance all nodes. +func (t *Transaction) rebalance() { + for _, n := range t.nodes { + n.rebalance() + } +} + +// spill writes all the nodes to dirty pages. +func (t *Transaction) spill() error { + // Keep track of the current root nodes. + // We will update this at the end once all nodes are created. + type root struct { + node *node + pgid pgid + } + var roots []root + + // Sort nodes by highest depth first. + nodes := make(nodesByDepth, 0, len(t.nodes)) + for _, n := range t.nodes { + nodes = append(nodes, n) + } + sort.Sort(nodes) + + // Spill nodes by deepest first. + for i := 0; i < len(nodes); i++ { + n := nodes[i] + + // Save existing root buckets for later. + if n.parent == nil && n.pgid != 0 { + roots = append(roots, root{n, n.pgid}) + } + + // Split nodes into appropriate sized nodes. + // The first node in this list will be a reference to n to preserve ancestry. + newNodes := n.split(t.db.pageSize) + t.pending = newNodes + + // If this is a root node that split then create a parent node. + if n.parent == nil && len(newNodes) > 1 { + n.parent = &node{transaction: t, isLeaf: false} + nodes = append(nodes, n.parent) + } + + // Add node's page to the freelist. + if n.pgid > 0 { + t.db.freelist.free(t.id(), t.page(n.pgid)) + } + + // Write nodes to dirty pages. + for i, newNode := range newNodes { + // Allocate contiguous space for the node. + p, err := t.allocate((newNode.size() / t.db.pageSize) + 1) + if err != nil { + return err + } + + // Write the node to the page. + newNode.write(p) + newNode.pgid = p.id + newNode.parent = n.parent + + // The first node should use the existing entry, other nodes are inserts. + var oldKey []byte + if i == 0 { + oldKey = n.key + } else { + oldKey = newNode.inodes[0].key + } + + // Update the parent entry. + if newNode.parent != nil { + newNode.parent.put(oldKey, newNode.inodes[0].key, nil, newNode.pgid) + } + } + + t.pending = nil + } + + // Update roots with new roots. + for _, root := range roots { + t.buckets.updateRoot(root.pgid, root.node.root().pgid) + } + + // Clear out nodes now that they are all spilled. + t.nodes = make(map[pgid]*node) + + return nil +} + +// write writes any dirty pages to disk. +func (t *Transaction) write() error { + // Sort pages by id. 
+ pages := make(pages, 0, len(t.pages)) + for _, p := range t.pages { + pages = append(pages, p) + } + sort.Sort(pages) + + // Write pages to disk in order. + for _, p := range pages { + size := (int(p.overflow) + 1) * t.db.pageSize + buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:size] + offset := int64(p.id) * int64(t.db.pageSize) + if _, err := t.db.file.WriteAt(buf, offset); err != nil { return err } } + // Clear out page cache. + t.pages = make(map[pgid]*page) + + return nil +} + +// writeMeta writes the meta to the disk. +func (t *Transaction) writeMeta() error { + // Create a temporary buffer for the meta page. + buf := make([]byte, t.db.pageSize) + p := t.db.pageInBuffer(buf, 0) + t.meta.write(p) + + // Write the meta page to file. + t.db.metafile.WriteAt(buf, int64(p.id)*int64(t.db.pageSize)) + return nil } @@ -135,6 +357,35 @@ func (t *Transaction) page(id pgid) *page { return t.db.page(id) } +// node creates a node from a page and associates it with a given parent. +func (t *Transaction) node(pgid pgid, parent *node) *node { + // Retrieve node if it has already been fetched. + if n := t.nodes[pgid]; n != nil { + return n + } + + // Otherwise create a branch and cache it. + n := &node{transaction: t, parent: parent} + if n.parent != nil { + n.depth = n.parent.depth + 1 + } + n.read(t.page(pgid)) + t.nodes[pgid] = n + + return n +} + +// dereference removes all references to the old mmap. +func (t *Transaction) dereference() { + for _, n := range t.nodes { + n.dereference() + } + + for _, n := range t.pending { + n.dereference() + } +} + // forEachPage iterates over every page within a given page and executes a function. func (t *Transaction) forEachPage(pgid pgid, depth int, fn func(*page, int)) { p := t.page(pgid) diff --git a/transaction_test.go b/transaction_test.go index 4a7170c..880b299 100644 --- a/transaction_test.go +++ b/transaction_test.go @@ -1,9 +1,11 @@ package bolt import ( + "bytes" "fmt" "os" "sort" + "strings" "testing" "testing/quick" @@ -53,24 +55,11 @@ func TestTransactionCursorEmptyBucket(t *testing.T) { withOpenDB(func(db *DB, path string) { db.CreateBucket("widgets") txn, _ := db.Transaction() - c, err := txn.Cursor("widgets") - assert.NoError(t, err) + c := txn.Bucket("widgets").Cursor() k, v := c.First() assert.Nil(t, k) assert.Nil(t, v) - txn.Close() - }) -} - -// Ensure that a Transaction returns a nil when a bucket doesn't exist. 
-func TestTransactionCursorMissingBucket(t *testing.T) { - withOpenDB(func(db *DB, path string) { - db.CreateBucket("widgets") - txn, _ := db.Transaction() - c, err := txn.Cursor("woojits") - assert.Nil(t, c) - assert.Equal(t, err, ErrBucketNotFound) - txn.Close() + txn.Rollback() }) } @@ -82,8 +71,7 @@ func TestTransactionCursorLeafRoot(t *testing.T) { db.Put("widgets", []byte("foo"), []byte{0}) db.Put("widgets", []byte("bar"), []byte{1}) txn, _ := db.Transaction() - c, err := txn.Cursor("widgets") - assert.NoError(t, err) + c := txn.Bucket("widgets").Cursor() k, v := c.First() assert.Equal(t, string(k), "bar") @@ -105,7 +93,7 @@ func TestTransactionCursorLeafRoot(t *testing.T) { assert.Nil(t, k) assert.Nil(t, v) - txn.Close() + txn.Rollback() }) } @@ -117,8 +105,7 @@ func TestTransactionCursorLeafRootReverse(t *testing.T) { db.Put("widgets", []byte("foo"), []byte{0}) db.Put("widgets", []byte("bar"), []byte{1}) txn, _ := db.Transaction() - c, err := txn.Cursor("widgets") - assert.NoError(t, err) + c := txn.Bucket("widgets").Cursor() k, v := c.Last() assert.Equal(t, string(k), "foo") @@ -140,7 +127,7 @@ func TestTransactionCursorLeafRootReverse(t *testing.T) { assert.Nil(t, k) assert.Nil(t, v) - txn.Close() + txn.Rollback() }) } @@ -152,8 +139,7 @@ func TestTransactionCursorRestart(t *testing.T) { db.Put("widgets", []byte("foo"), []byte{}) txn, _ := db.Transaction() - c, err := txn.Cursor("widgets") - assert.NoError(t, err) + c := txn.Bucket("widgets").Cursor() k, _ := c.First() assert.Equal(t, string(k), "bar") @@ -167,7 +153,7 @@ func TestTransactionCursorRestart(t *testing.T) { k, _ = c.Next() assert.Equal(t, string(k), "foo") - txn.Close() + txn.Rollback() }) } @@ -177,27 +163,27 @@ func TestTransactionCursorIterate(t *testing.T) { withOpenDB(func(db *DB, path string) { // Bulk insert all values. db.CreateBucket("widgets") - rwtxn, _ := db.RWTransaction() + txn, _ := db.RWTransaction() + b := txn.Bucket("widgets") for _, item := range items { - assert.NoError(t, rwtxn.Put("widgets", item.Key, item.Value)) + assert.NoError(t, b.Put(item.Key, item.Value)) } - assert.NoError(t, rwtxn.Commit()) + assert.NoError(t, txn.Commit()) // Sort test data. sort.Sort(items) // Iterate over all items and check consistency. var index = 0 - txn, _ := db.Transaction() - c, err := txn.Cursor("widgets") - assert.NoError(t, err) + txn, _ = db.Transaction() + c := txn.Bucket("widgets").Cursor() for k, v := c.First(); k != nil && index < len(items); k, v = c.Next() { assert.Equal(t, k, items[index].Key) assert.Equal(t, v, items[index].Value) index++ } assert.Equal(t, len(items), index) - txn.Close() + txn.Rollback() }) fmt.Fprint(os.Stderr, ".") return true @@ -214,27 +200,301 @@ func TestTransactionCursorIterateReverse(t *testing.T) { withOpenDB(func(db *DB, path string) { // Bulk insert all values. db.CreateBucket("widgets") - rwtxn, _ := db.RWTransaction() + txn, _ := db.RWTransaction() + b := txn.Bucket("widgets") for _, item := range items { - assert.NoError(t, rwtxn.Put("widgets", item.Key, item.Value)) + assert.NoError(t, b.Put(item.Key, item.Value)) } - assert.NoError(t, rwtxn.Commit()) + assert.NoError(t, txn.Commit()) // Sort test data. sort.Sort(revtestdata(items)) // Iterate over all items and check consistency. 
var index = 0 - txn, _ := db.Transaction() - c, err := txn.Cursor("widgets") - assert.NoError(t, err) + txn, _ = db.Transaction() + c := txn.Bucket("widgets").Cursor() for k, v := c.Last(); k != nil && index < len(items); k, v = c.Prev() { assert.Equal(t, k, items[index].Key) assert.Equal(t, v, items[index].Value) index++ } assert.Equal(t, len(items), index) - txn.Close() + txn.Rollback() + }) + fmt.Fprint(os.Stderr, ".") + return true + } + if err := quick.Check(f, qconfig()); err != nil { + t.Error(err) + } + fmt.Fprint(os.Stderr, "\n") +} + +// Ensure that a bucket can be created and retrieved. +func TestTransactionCreateBucket(t *testing.T) { + withOpenDB(func(db *DB, path string) { + // Create a bucket. + err := db.CreateBucket("widgets") + assert.NoError(t, err) + + // Read the bucket through a separate transaction. + b, err := db.Bucket("widgets") + assert.NotNil(t, b) + assert.NoError(t, err) + }) +} + +// Ensure that a bucket can be created if it doesn't already exist. +func TestTransactionCreateBucketIfNotExists(t *testing.T) { + withOpenDB(func(db *DB, path string) { + assert.NoError(t, db.CreateBucketIfNotExists("widgets")) + assert.NoError(t, db.CreateBucketIfNotExists("widgets")) + + // Read the bucket through a separate transaction. + b, err := db.Bucket("widgets") + assert.NotNil(t, b) + assert.NoError(t, err) + }) +} + +// Ensure that a bucket cannot be created twice. +func TestTransactionRecreateBucket(t *testing.T) { + withOpenDB(func(db *DB, path string) { + // Create a bucket. + err := db.CreateBucket("widgets") + assert.NoError(t, err) + + // Create the same bucket again. + err = db.CreateBucket("widgets") + assert.Equal(t, err, ErrBucketExists) + }) +} + +// Ensure that a bucket is created with a non-blank name. +func TestTransactionCreateBucketWithoutName(t *testing.T) { + withOpenDB(func(db *DB, path string) { + err := db.CreateBucket("") + assert.Equal(t, err, ErrBucketNameRequired) + }) +} + +// Ensure that a bucket name is not too long. +func TestTransactionCreateBucketWithLongName(t *testing.T) { + withOpenDB(func(db *DB, path string) { + err := db.CreateBucket(strings.Repeat("X", 255)) + assert.NoError(t, err) + + err = db.CreateBucket(strings.Repeat("X", 256)) + assert.Equal(t, err, ErrBucketNameTooLarge) + }) +} + +// Ensure that a bucket can be deleted. +func TestTransactionDeleteBucket(t *testing.T) { + withOpenDB(func(db *DB, path string) { + // Create a bucket and add a value. + db.CreateBucket("widgets") + db.Put("widgets", []byte("foo"), []byte("bar")) + + // Delete the bucket and make sure we can't get the value. + assert.NoError(t, db.DeleteBucket("widgets")) + value, err := db.Get("widgets", []byte("foo")) + assert.Equal(t, err, ErrBucketNotFound) + assert.Nil(t, value) + + // Create the bucket again and make sure there's not a phantom value. + assert.NoError(t, db.CreateBucket("widgets")) + value, err = db.Get("widgets", []byte("foo")) + assert.NoError(t, err) + assert.Nil(t, value) + }) +} + +// Ensure that a bucket can return an autoincrementing sequence. +func TestTransactionNextSequence(t *testing.T) { + withOpenDB(func(db *DB, path string) { + db.CreateBucket("widgets") + db.CreateBucket("woojits") + + // Make sure sequence increments. + seq, err := db.NextSequence("widgets") + assert.NoError(t, err) + assert.Equal(t, seq, 1) + seq, err = db.NextSequence("widgets") + assert.NoError(t, err) + assert.Equal(t, seq, 2) + + // Buckets should be separate. 
+		seq, err = db.NextSequence("woojits")
+		assert.NoError(t, err)
+		assert.Equal(t, seq, 1)
+
+		// Missing buckets return an error.
+		seq, err = db.NextSequence("no_such_bucket")
+		assert.Equal(t, err, ErrBucketNotFound)
+		assert.Equal(t, seq, 0)
+	})
+}
+
+// Ensure that incrementing past the maximum sequence number will return an error.
+func TestTransactionNextSequenceOverflow(t *testing.T) {
+	withOpenDB(func(db *DB, path string) {
+		db.CreateBucket("widgets")
+		db.Do(func(txn *Transaction) error {
+			b := txn.Bucket("widgets")
+			b.bucket.sequence = uint64(maxInt)
+			seq, err := b.NextSequence()
+			assert.Equal(t, err, ErrSequenceOverflow)
+			assert.Equal(t, seq, 0)
+			return nil
+		})
+	})
+}
+
+// Ensure that an error is returned when inserting into a bucket that doesn't exist.
+func TestTransactionPutBucketNotFound(t *testing.T) {
+	withOpenDB(func(db *DB, path string) {
+		err := db.Put("widgets", []byte("foo"), []byte("bar"))
+		assert.Equal(t, err, ErrBucketNotFound)
+	})
+}
+
+// Ensure that an error is returned when inserting with an empty key.
+func TestTransactionPutEmptyKey(t *testing.T) {
+	withOpenDB(func(db *DB, path string) {
+		db.CreateBucket("widgets")
+		err := db.Put("widgets", []byte(""), []byte("bar"))
+		assert.Equal(t, err, ErrKeyRequired)
+		err = db.Put("widgets", nil, []byte("bar"))
+		assert.Equal(t, err, ErrKeyRequired)
+	})
+}
+
+// Ensure that an error is returned when inserting with a key that's too large.
+func TestTransactionPutKeyTooLarge(t *testing.T) {
+	withOpenDB(func(db *DB, path string) {
+		db.CreateBucket("widgets")
+		err := db.Put("widgets", make([]byte, 32769), []byte("bar"))
+		assert.Equal(t, err, ErrKeyTooLarge)
+	})
+}
+
+// Ensure that an error is returned when deleting a bucket that doesn't exist.
+func TestTransactionDeleteBucketNotFound(t *testing.T) {
+	withOpenDB(func(db *DB, path string) {
+		err := db.DeleteBucket("widgets")
+		assert.Equal(t, err, ErrBucketNotFound)
+	})
+}
+
+// Ensure that a bucket can write random keys and values across multiple txns.
+func TestTransactionPutSingle(t *testing.T) {
+	index := 0
+	f := func(items testdata) bool {
+		withOpenDB(func(db *DB, path string) {
+			m := make(map[string][]byte)
+
+			db.CreateBucket("widgets")
+			for _, item := range items {
+				if err := db.Put("widgets", item.Key, item.Value); err != nil {
+					panic("put error: " + err.Error())
+				}
+				m[string(item.Key)] = item.Value
+
+				// Verify all key/values so far.
+				i := 0
+				for k, v := range m {
+					value, err := db.Get("widgets", []byte(k))
+					if err != nil {
+						panic("get error: " + err.Error())
+					}
+					if !bytes.Equal(value, v) {
+						db.CopyFile("/tmp/bolt.put.single.db", 0666)
+						t.Fatalf("value mismatch [run %d] (%d of %d):\nkey: %x\ngot: %x\nexp: %x", index, i, len(m), []byte(k), value, v)
+					}
+					i++
+				}
+			}
+
+			fmt.Fprint(os.Stderr, ".")
+		})
+		index++
+		return true
+	}
+	if err := quick.Check(f, qconfig()); err != nil {
+		t.Error(err)
+	}
+	fmt.Fprint(os.Stderr, "\n")
+}
+
+// Ensure that a transaction can insert multiple key/value pairs at once.
+func TestTransactionPutMultiple(t *testing.T) {
+	f := func(items testdata) bool {
+		withOpenDB(func(db *DB, path string) {
+			// Bulk insert all values.
+			db.CreateBucket("widgets")
+			txn, _ := db.RWTransaction()
+			b := txn.Bucket("widgets")
+			for _, item := range items {
+				assert.NoError(t, b.Put(item.Key, item.Value))
+			}
+			assert.NoError(t, txn.Commit())
+
+			// Verify all items exist.
+			txn, _ = db.Transaction()
+			for _, item := range items {
+				value := txn.Bucket("widgets").Get(item.Key)
+				if !assert.Equal(t, item.Value, value) {
+					db.CopyFile("/tmp/bolt.put.multiple.db", 0666)
+					t.FailNow()
+				}
+			}
+			txn.Rollback()
+		})
+		fmt.Fprint(os.Stderr, ".")
+		return true
+	}
+	if err := quick.Check(f, qconfig()); err != nil {
+		t.Error(err)
+	}
+	fmt.Fprint(os.Stderr, "\n")
+}
+
+// Ensure that a transaction can delete all key/value pairs and return to a single leaf page.
+func TestTransactionDelete(t *testing.T) {
+	f := func(items testdata) bool {
+		withOpenDB(func(db *DB, path string) {
+			// Bulk insert all values.
+			db.CreateBucket("widgets")
+			txn, _ := db.RWTransaction()
+			b := txn.Bucket("widgets")
+			for _, item := range items {
+				assert.NoError(t, b.Put(item.Key, item.Value))
+			}
+			assert.NoError(t, txn.Commit())
+
+			// Remove items one at a time and check consistency.
+			for i, item := range items {
+				assert.NoError(t, db.Delete("widgets", item.Key))
+
+				// Anything at or before our deletion index should be nil; later items should still exist.
+				txn, _ := db.Transaction()
+				for j, exp := range items {
+					if j > i {
+						value := txn.Bucket("widgets").Get(exp.Key)
+						if !assert.Equal(t, exp.Value, value) {
+							t.FailNow()
+						}
+					} else {
+						value := txn.Bucket("widgets").Get(exp.Key)
+						if !assert.Nil(t, value) {
+							t.FailNow()
+						}
+					}
+				}
+				txn.Rollback()
+			}
 		})
 		fmt.Fprint(os.Stderr, ".")
 		return true