From 3b2fd8f2d3e376fa7a3f3b2ba665fcd4a5b5bb15 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sat, 22 Feb 2014 22:54:54 -0700 Subject: [PATCH] Revert "Refactor Transaction/Bucket API." This reverts commit 1ad2b99f281d587b767b36f886401e81d17915a9. --- TODO | 13 ++ bucket.go | 105 +------------ bucket_test.go | 13 +- cursor.go | 2 +- db.go | 114 +++++--------- db_test.go | 41 ++--- doc.go | 21 +-- error.go | 4 - example_test.go | 39 +++-- functional_test.go | 9 +- node.go | 2 +- rwtransaction.go | 355 ++++++++++++++++++++++++++++++++++++++++++ rwtransaction_test.go | 306 ++++++++++++++++++++++++++++++++++++ transaction.go | 351 ++++++----------------------------------- transaction_test.go | 336 +++++---------------------------------- 15 files changed, 857 insertions(+), 854 deletions(-) create mode 100644 TODO create mode 100644 rwtransaction.go create mode 100644 rwtransaction_test.go diff --git a/TODO b/TODO new file mode 100644 index 0000000..9c8a1a0 --- /dev/null +++ b/TODO @@ -0,0 +1,13 @@ +TODO +==== +X Open DB. +X Initialize transaction. +- Cursor First, Get(key), Next +- RWTransaction.insert() + - rebalance + - adjust cursors +- RWTransaction Commmit + + + + diff --git a/bucket.go b/bucket.go index 4114ab7..6653389 100644 --- a/bucket.go +++ b/bucket.go @@ -1,13 +1,9 @@ package bolt -import ( - "bytes" -) - // Bucket represents a collection of key/value pairs inside the database. -// All keys inside the bucket are unique. -// -// Accessing or changing data from a Bucket whose Transaction has closed will cause a panic. +// All keys inside the bucket are unique. The Bucket type is not typically used +// directly. Instead the bucket name is typically passed into the Get(), Put(), +// or Delete() functions. type Bucket struct { *bucket name string @@ -25,9 +21,8 @@ func (b *Bucket) Name() string { return b.name } -// Cursor creates a new cursor for this bucket. -func (b *Bucket) Cursor() *Cursor { - _assert(b.transaction.isOpen(), "transaction not open") +// cursor creates a new cursor for this bucket. +func (b *Bucket) cursor() *Cursor { return &Cursor{ transaction: b.transaction, root: b.root, @@ -35,98 +30,8 @@ func (b *Bucket) Cursor() *Cursor { } } -// Get retrieves the value for a key in a named bucket. -// Returns a nil value if the key does not exist. -func (b *Bucket) Get(key []byte) []byte { - _assert(b.transaction.isOpen(), "transaction not open") - c := b.Cursor() - k, v := c.Seek(key) - - // If our target node isn't the same key as what's passed in then return nil. - if !bytes.Equal(key, k) { - return nil - } - - return v -} - -// Put sets the value for a key inside of the bucket. -// If the key exist then its previous value will be overwritten. -// Returns an error if bucket was created from a read-only transaction, if the -// key is blank, if the key is too large, or if the value is too large. -func (b *Bucket) Put(key []byte, value []byte) error { - _assert(b.transaction.isOpen(), "transaction not open") - if !b.transaction.writable { - return ErrTransactionNotWritable - } else if len(key) == 0 { - return ErrKeyRequired - } else if len(key) > MaxKeySize { - return ErrKeyTooLarge - } else if len(value) > MaxValueSize { - return ErrValueTooLarge - } - - // Move cursor to correct position. - c := b.Cursor() - c.Seek(key) - - // Insert the key/value. - c.node(b.transaction).put(key, key, value, 0) - - return nil -} - -// Delete removes a key from the bucket. -// If the key does not exist then nothing is done and a nil error is returned. -// Returns an error if the bucket was created from a read-only transaction. -func (b *Bucket) Delete(key []byte) error { - _assert(b.transaction.isOpen(), "transaction not open") - if !b.transaction.writable { - return ErrTransactionNotWritable - } - - // Move cursor to correct position. - c := b.Cursor() - c.Seek(key) - - // Delete the node if we have a matching key. - c.node(c.transaction).del(key) - - return nil -} - -// NextSequence returns an autoincrementing integer for the bucket. -// Returns an error if the bucket was created from a read-only transaction or -// if the next sequence will overflow the int type. -func (b *Bucket) NextSequence() (int, error) { - _assert(b.transaction.isOpen(), "transaction not open") - if !b.transaction.writable { - return 0, ErrTransactionNotWritable - } else if b.bucket.sequence == uint64(maxInt) { - return 0, ErrSequenceOverflow - } - - // Increment and return the sequence. - b.bucket.sequence++ - - return int(b.bucket.sequence), nil -} - -// ForEach executes a function for each key/value pair in a bucket. -func (b *Bucket) ForEach(fn func(k, v []byte) error) error { - _assert(b.transaction.isOpen(), "transaction not open") - c := b.Cursor() - for k, v := c.First(); k != nil; k, v = c.Next() { - if err := fn(k, v); err != nil { - return err - } - } - return nil -} - // Stat returns stats on a bucket. func (b *Bucket) Stat() *BucketStat { - _assert(b.transaction.isOpen(), "transaction not open") s := &BucketStat{} b.transaction.forEachPage(b.root, 0, func(p *page, depth int) { if (p.flags & leafPageFlag) != 0 { diff --git a/bucket_test.go b/bucket_test.go index 2778432..5e33189 100644 --- a/bucket_test.go +++ b/bucket_test.go @@ -11,26 +11,23 @@ import ( // Ensure a bucket can calculate stats. func TestBucketStat(t *testing.T) { withOpenDB(func(db *DB, path string) { - db.Do(func(txn *Transaction) error { + db.Do(func(txn *RWTransaction) error { // Add bucket with lots of keys. txn.CreateBucket("widgets") - b := txn.Bucket("widgets") for i := 0; i < 100000; i++ { - b.Put([]byte(strconv.Itoa(i)), []byte(strconv.Itoa(i))) + txn.Put("widgets", []byte(strconv.Itoa(i)), []byte(strconv.Itoa(i))) } // Add bucket with fewer keys but one big value. txn.CreateBucket("woojits") - b = txn.Bucket("woojits") for i := 0; i < 500; i++ { - b.Put([]byte(strconv.Itoa(i)), []byte(strconv.Itoa(i))) + txn.Put("woojits", []byte(strconv.Itoa(i)), []byte(strconv.Itoa(i))) } - b.Put([]byte("really-big-value"), []byte(strings.Repeat("*", 10000))) + txn.Put("woojits", []byte("really-big-value"), []byte(strings.Repeat("*", 10000))) // Add a bucket that fits on a single root leaf. txn.CreateBucket("whozawhats") - b = txn.Bucket("whozawhats") - b.Put([]byte("foo"), []byte("bar")) + txn.Put("whozawhats", []byte("foo"), []byte("bar")) return nil }) diff --git a/cursor.go b/cursor.go index 0cb94b5..9a527af 100644 --- a/cursor.go +++ b/cursor.go @@ -201,7 +201,7 @@ func (c *Cursor) keyValue() ([]byte, []byte) { } // node returns the node that the cursor is currently positioned on. -func (c *Cursor) node(t *Transaction) *node { +func (c *Cursor) node(t *RWTransaction) *node { _assert(len(c.stack) > 0, "accessing a node with a zero-length cursor stack") // Start from root and traverse down the hierarchy. diff --git a/db.go b/db.go index 3d8c668..c9b0e92 100644 --- a/db.go +++ b/db.go @@ -16,12 +16,8 @@ const minMmapSize = 1 << 22 // 4MB const maxMmapStep = 1 << 30 // 1GB // DB represents a collection of buckets persisted to a file on disk. -// All data access is performed through transactions which can be obtained from -// the DB. There are a number of functions duplicated from the Transction type -// which provide ease-of-use, single transaction access to the data. -// -// All the functions on DB will return a ErrDatabaseNotOpen if accessed before -// Open() is called or after Close is called. +// All data access is performed through transactions which can be obtained through the DB. +// All the functions on DB will return a ErrDatabaseNotOpen if accessed before Open() is called. type DB struct { os _os syscall _syscall @@ -33,7 +29,7 @@ type DB struct { meta1 *meta pageSize int opened bool - rwtransaction *Transaction + rwtransaction *RWTransaction transactions []*Transaction freelist *freelist @@ -47,6 +43,16 @@ func (db *DB) Path() string { return db.path } +// GoString returns the Go string representation of the database. +func (db *DB) GoString() string { + return fmt.Sprintf("bolt.DB{path:%q}", db.path) +} + +// String returns the string representation of the database. +func (db *DB) String() string { + return fmt.Sprintf("DB<%q>", db.path) +} + // Open opens a data file at the given path and initializes the database. // If the file does not exist then it will be created automatically. func (db *DB) Open(path string, mode os.FileMode) error { @@ -256,8 +262,7 @@ func (db *DB) close() { // Transaction creates a read-only transaction. // Multiple read-only transactions can be used concurrently. // -// IMPORTANT: You must close the transaction after you are finished or else the -// database will not reclaim old pages. +// IMPORTANT: You must close the transaction after you are finished or else the database will not reclaim old pages. func (db *DB) Transaction() (*Transaction, error) { db.metalock.Lock() defer db.metalock.Unlock() @@ -284,12 +289,12 @@ func (db *DB) Transaction() (*Transaction, error) { // RWTransaction creates a read/write transaction. // Only one read/write transaction is allowed at a time. -// You must call Commit() or Close() on the transaction to close it. -func (db *DB) RWTransaction() (*Transaction, error) { +// You must call Commit() or Rollback() on the transaction to close it. +func (db *DB) RWTransaction() (*RWTransaction, error) { db.metalock.Lock() defer db.metalock.Unlock() - // Obtain writer lock. This is released by the writer transaction when it closes. + // Obtain writer lock. This is released by the RWTransaction when it closes. db.rwlock.Lock() // Exit if the database is not open yet. @@ -298,8 +303,8 @@ func (db *DB) RWTransaction() (*Transaction, error) { return nil, ErrDatabaseNotOpen } - // Create a writable transaction associated with the database. - t := &Transaction{writable: true, nodes: make(map[pgid]*node)} + // Create a transaction associated with the database. + t := &RWTransaction{nodes: make(map[pgid]*node)} t.init(db) db.rwtransaction = t @@ -334,12 +339,12 @@ func (db *DB) removeTransaction(t *Transaction) { } } -// Do executes a function within the context of a writable Transaction. +// Do executes a function within the context of a RWTransaction. // If no error is returned from the function then the transaction is committed. // If an error is returned then the entire transaction is rolled back. // Any error that is returned from the function or returned from the commit is // returned from the Do() method. -func (db *DB) Do(fn func(*Transaction) error) error { +func (db *DB) Do(fn func(*RWTransaction) error) error { t, err := db.RWTransaction() if err != nil { return err @@ -361,7 +366,7 @@ func (db *DB) With(fn func(*Transaction) error) error { if err != nil { return err } - defer t.Rollback() + defer t.Close() // If an error is returned from the function then pass it through. return fn(t) @@ -371,36 +376,28 @@ func (db *DB) With(fn func(*Transaction) error) error { // An error is returned if the bucket cannot be found. func (db *DB) ForEach(name string, fn func(k, v []byte) error) error { return db.With(func(t *Transaction) error { - b := t.Bucket(name) - if b == nil { - return ErrBucketNotFound - } - return b.ForEach(fn) + return t.ForEach(name, fn) }) } // Bucket retrieves a reference to a bucket. // This is typically useful for checking the existence of a bucket. -// -// Do not use the returned bucket for accessing or changing data. func (db *DB) Bucket(name string) (*Bucket, error) { t, err := db.Transaction() if err != nil { return nil, err } - defer t.Rollback() + defer t.Close() return t.Bucket(name), nil } // Buckets retrieves a list of all buckets in the database. -// -// Do not use any of the returned buckets for accessing or changing data. func (db *DB) Buckets() ([]*Bucket, error) { t, err := db.Transaction() if err != nil { return nil, err } - defer t.Rollback() + defer t.Close() return t.Buckets(), nil } @@ -408,7 +405,7 @@ func (db *DB) Buckets() ([]*Bucket, error) { // This function can return an error if the bucket already exists, if the name // is blank, or the bucket name is too long. func (db *DB) CreateBucket(name string) error { - return db.Do(func(t *Transaction) error { + return db.Do(func(t *RWTransaction) error { return t.CreateBucket(name) }) } @@ -416,7 +413,7 @@ func (db *DB) CreateBucket(name string) error { // CreateBucketIfNotExists creates a new bucket with the given name if it doesn't already exist. // This function can return an error if the name is blank, or the bucket name is too long. func (db *DB) CreateBucketIfNotExists(name string) error { - return db.Do(func(t *Transaction) error { + return db.Do(func(t *RWTransaction) error { return t.CreateBucketIfNotExists(name) }) } @@ -424,7 +421,7 @@ func (db *DB) CreateBucketIfNotExists(name string) error { // DeleteBucket removes a bucket from the database. // Returns an error if the bucket does not exist. func (db *DB) DeleteBucket(name string) error { - return db.Do(func(t *Transaction) error { + return db.Do(func(t *RWTransaction) error { return t.DeleteBucket(name) }) } @@ -433,17 +430,10 @@ func (db *DB) DeleteBucket(name string) error { // This function can return an error if the bucket does not exist. func (db *DB) NextSequence(name string) (int, error) { var seq int - err := db.Do(func(t *Transaction) error { - b := t.Bucket(name) - if b == nil { - return ErrBucketNotFound - } - + err := db.Do(func(t *RWTransaction) error { var err error - if seq, err = b.NextSequence(); err != nil { - return err - } - return nil + seq, err = t.NextSequence(name) + return err }) if err != nil { return 0, err @@ -452,43 +442,29 @@ func (db *DB) NextSequence(name string) (int, error) { } // Get retrieves the value for a key in a bucket. -// Returns an error if the bucket does not exist. +// Returns an error if the key does not exist. func (db *DB) Get(name string, key []byte) ([]byte, error) { t, err := db.Transaction() if err != nil { return nil, err } - defer t.Rollback() - - b := t.Bucket(name) - if b == nil { - return nil, ErrBucketNotFound - } - - return b.Get(key), nil + defer t.Close() + return t.Get(name, key) } // Put sets the value for a key in a bucket. // Returns an error if the bucket is not found, if key is blank, if the key is too large, or if the value is too large. func (db *DB) Put(name string, key []byte, value []byte) error { - return db.Do(func(t *Transaction) error { - b := t.Bucket(name) - if b == nil { - return ErrBucketNotFound - } - return b.Put(key, value) + return db.Do(func(t *RWTransaction) error { + return t.Put(name, key, value) }) } // Delete removes a key from a bucket. // Returns an error if the bucket cannot be found. func (db *DB) Delete(name string, key []byte) error { - return db.Do(func(t *Transaction) error { - b := t.Bucket(name) - if b == nil { - return ErrBucketNotFound - } - return b.Delete(key) + return db.Do(func(t *RWTransaction) error { + return t.Delete(name, key) }) } @@ -501,7 +477,7 @@ func (db *DB) Copy(w io.Writer) error { if err != nil { return err } - defer t.Commit() + defer t.Close() // Open reader on the database. f, err := os.Open(db.path) @@ -546,7 +522,7 @@ func (db *DB) Stat() (*Stat, error) { db.mmaplock.RUnlock() db.metalock.Unlock() - err := db.Do(func(t *Transaction) error { + err := db.Do(func(t *RWTransaction) error { s.PageCount = int(t.meta.pgid) s.FreePageCount = len(db.freelist.all()) s.PageSize = db.pageSize @@ -558,16 +534,6 @@ func (db *DB) Stat() (*Stat, error) { return s, nil } -// GoString returns the Go string representation of the database. -func (db *DB) GoString() string { - return fmt.Sprintf("bolt.DB{path:%q}", db.path) -} - -// String returns the string representation of the database. -func (db *DB) String() string { - return fmt.Sprintf("DB<%q>", db.path) -} - // page retrieves a page reference from the mmap based on the current page size. func (db *DB) page(id pgid) *page { return (*page)(unsafe.Pointer(&db.data[id*pgid(db.pageSize)])) diff --git a/db_test.go b/db_test.go index df15a8b..3d0b987 100644 --- a/db_test.go +++ b/db_test.go @@ -188,34 +188,14 @@ func TestDBDeleteFromMissingBucket(t *testing.T) { }) } -// Ensure that a Transaction can be retrieved. -func TestDBRWTransaction(t *testing.T) { - withOpenDB(func(db *DB, path string) { - txn, err := db.RWTransaction() - assert.NotNil(t, txn) - assert.NoError(t, err) - assert.Equal(t, txn.DB(), db) - }) -} - -// Ensure that opening a Transaction while the DB is closed returns an error. -func TestRWTransactionOpenWithClosedDB(t *testing.T) { - withDB(func(db *DB, path string) { - txn, err := db.RWTransaction() - assert.Equal(t, err, ErrDatabaseNotOpen) - assert.Nil(t, txn) - }) -} - // Ensure a database can provide a transactional block. func TestDBTransactionBlock(t *testing.T) { withOpenDB(func(db *DB, path string) { - err := db.Do(func(txn *Transaction) error { + err := db.Do(func(txn *RWTransaction) error { txn.CreateBucket("widgets") - b := txn.Bucket("widgets") - b.Put([]byte("foo"), []byte("bar")) - b.Put([]byte("baz"), []byte("bat")) - b.Delete([]byte("foo")) + txn.Put("widgets", []byte("foo"), []byte("bar")) + txn.Put("widgets", []byte("baz"), []byte("bat")) + txn.Delete("widgets", []byte("foo")) return nil }) assert.NoError(t, err) @@ -229,7 +209,7 @@ func TestDBTransactionBlock(t *testing.T) { // Ensure a closed database returns an error while running a transaction block func TestDBTransactionBlockWhileClosed(t *testing.T) { withDB(func(db *DB, path string) { - err := db.Do(func(txn *Transaction) error { + err := db.Do(func(txn *RWTransaction) error { txn.CreateBucket("widgets") return nil }) @@ -353,11 +333,10 @@ func TestDBCopyFile(t *testing.T) { // Ensure the database can return stats about itself. func TestDBStat(t *testing.T) { withOpenDB(func(db *DB, path string) { - db.Do(func(txn *Transaction) error { + db.Do(func(txn *RWTransaction) error { txn.CreateBucket("widgets") - b := txn.Bucket("widgets") for i := 0; i < 10000; i++ { - b.Put([]byte(strconv.Itoa(i)), []byte(strconv.Itoa(i))) + txn.Put("widgets", []byte(strconv.Itoa(i)), []byte(strconv.Itoa(i))) } return nil }) @@ -370,7 +349,7 @@ func TestDBStat(t *testing.T) { t0, _ := db.Transaction() t1, _ := db.Transaction() t2, _ := db.Transaction() - t2.Rollback() + t2.Close() // Obtain stats. stat, err := db.Stat() @@ -382,8 +361,8 @@ func TestDBStat(t *testing.T) { assert.Equal(t, stat.TransactionCount, 2) // Close readers. - t0.Rollback() - t1.Rollback() + t0.Close() + t1.Close() }) } diff --git a/doc.go b/doc.go index c948b05..d7f3ec1 100644 --- a/doc.go +++ b/doc.go @@ -10,18 +10,19 @@ optimized for fast read access and does not require recovery in the event of a system crash. Transactions which have not finished committing will simply be rolled back in the event of a crash. -The design of Bolt is based on Howard Chu's LMDB project. +The design of Bolt is based on Howard Chu's LMDB database project. Basics -There are only a few types in Bolt: DB, Bucket, Transaction, and Cursor. The DB -is a collection of buckets and is represented by a single file on disk. A -bucket is a collection of unique keys that are associated with values. +There are only a few types in Bolt: DB, Bucket, Transaction, RWTransaction, and +Cursor. The DB is a collection of buckets and is represented by a single file +on disk. A bucket is a collection of unique keys that are associated with values. -Transactions provide a consistent view of the database. They can be used for -retrieving, setting, and deleting properties. They can also be used to iterate -over all the values in a bucket. Only one writer Transaction can be in use at -a time. +Transactions provide read-only access to data inside the database. They can +retrieve key/value pairs and can use Cursors to iterate over the entire dataset. +RWTransactions provide read-write access to the database. They can create and +delete buckets and they can insert and remove keys. Only one RWTransaction is +allowed at a time. Caveats @@ -29,8 +30,8 @@ Caveats The database uses a read-only, memory-mapped data file to ensure that applications cannot corrupt the database, however, this means that keys and values returned from Bolt cannot be changed. Writing to a read-only byte slice -will cause Go to panic. If you need to alter data returned from a Transaction -you need to first copy it to a new byte slice. +will cause Go to panic. If you need to work with data returned from a Get() you +need to first copy it to a new byte slice. Bolt currently works on Mac OS and Linux. Windows support is coming soon. diff --git a/error.go b/error.go index 36f09a2..7238203 100644 --- a/error.go +++ b/error.go @@ -16,10 +16,6 @@ var ( // already open. ErrDatabaseOpen = &Error{"database already open", nil} - // ErrTransactionNotWritable is returned changing data using a read-only - // transaction. - ErrTransactionNotWritable = &Error{"transaction not writable", nil} - // ErrBucketNotFound is returned when trying to access a bucket that has // not been created yet. ErrBucketNotFound = &Error{"bucket not found", nil} diff --git a/example_test.go b/example_test.go index b3d5d20..8747f94 100644 --- a/example_test.go +++ b/example_test.go @@ -67,13 +67,11 @@ func ExampleDB_Do() { defer db.Close() // Execute several commands within a write transaction. - err := db.Do(func(t *Transaction) error { + err := db.Do(func(t *RWTransaction) error { if err := t.CreateBucket("widgets"); err != nil { return err } - - b := t.Bucket("widgets") - if err := b.Put([]byte("foo"), []byte("bar")); err != nil { + if err := t.Put("widgets", []byte("foo"), []byte("bar")); err != nil { return err } return nil @@ -102,7 +100,7 @@ func ExampleDB_With() { // Access data from within a read-only transactional block. db.With(func(t *Transaction) error { - v := t.Bucket("people").Get([]byte("john")) + v, _ := t.Get("people", []byte("john")) fmt.Printf("John's last name is %s.\n", string(v)) return nil }) @@ -135,30 +133,29 @@ func ExampleDB_ForEach() { // A liger is awesome. } -func ExampleTransaction_Commit() { +func ExampleRWTransaction() { // Open the database. var db DB - db.Open("/tmp/bolt/db_rwtransaction.db", 0666) + db.Open("/tmp/bolt/rwtransaction.db", 0666) defer db.Close() // Create a bucket. db.CreateBucket("widgets") // Create several keys in a transaction. - txn, _ := db.RWTransaction() - b := txn.Bucket("widgets") - b.Put([]byte("john"), []byte("blue")) - b.Put([]byte("abby"), []byte("red")) - b.Put([]byte("zephyr"), []byte("purple")) - txn.Commit() + rwtxn, _ := db.RWTransaction() + rwtxn.Put("widgets", []byte("john"), []byte("blue")) + rwtxn.Put("widgets", []byte("abby"), []byte("red")) + rwtxn.Put("widgets", []byte("zephyr"), []byte("purple")) + rwtxn.Commit() // Iterate over the values in sorted key order. - txn, _ = db.Transaction() - c := txn.Bucket("widgets").Cursor() + txn, _ := db.Transaction() + c, _ := txn.Cursor("widgets") for k, v := c.First(); k != nil; k, v = c.Next() { fmt.Printf("%s likes %s\n", string(k), string(v)) } - txn.Rollback() + txn.Close() // Output: // abby likes red @@ -166,10 +163,10 @@ func ExampleTransaction_Commit() { // zephyr likes purple } -func ExampleTransaction_Rollback() { +func ExampleRWTransaction_rollback() { // Open the database. var db DB - db.Open("/tmp/bolt/transaction_close.db", 0666) + db.Open("/tmp/bolt/rwtransaction_rollback.db", 0666) defer db.Close() // Create a bucket. @@ -179,9 +176,9 @@ func ExampleTransaction_Rollback() { db.Put("widgets", []byte("foo"), []byte("bar")) // Update the key but rollback the transaction so it never saves. - txn, _ := db.RWTransaction() - txn.Bucket("widgets").Put([]byte("foo"), []byte("baz")) - txn.Rollback() + rwtxn, _ := db.RWTransaction() + rwtxn.Put("widgets", []byte("foo"), []byte("baz")) + rwtxn.Rollback() // Ensure that our original value is still set. value, _ := db.Get("widgets", []byte("foo")) diff --git a/functional_test.go b/functional_test.go index acd2f4a..f7c3c3a 100644 --- a/functional_test.go +++ b/functional_test.go @@ -56,15 +56,15 @@ func TestParallelTransactions(t *testing.T) { // Verify all data is in for local data list. for _, item := range local { - value := txn.Bucket("widgets").Get(item.Key) + value, err := txn.Get("widgets", item.Key) if !assert.NoError(t, err) || !assert.Equal(t, value, item.Value) { - txn.Rollback() + txn.Close() wg.Done() t.FailNow() } } - txn.Rollback() + txn.Close() wg.Done() <-readers }() @@ -89,9 +89,8 @@ func TestParallelTransactions(t *testing.T) { } // Insert whole batch. - b := txn.Bucket("widgets") for _, item := range batchItems { - err := b.Put(item.Key, item.Value) + err := txn.Put("widgets", item.Key, item.Value) if !assert.NoError(t, err) { t.FailNow() } diff --git a/node.go b/node.go index 81d2667..68f651e 100644 --- a/node.go +++ b/node.go @@ -8,7 +8,7 @@ import ( // node represents an in-memory, deserialized page. type node struct { - transaction *Transaction + transaction *RWTransaction isLeaf bool unbalanced bool key []byte diff --git a/rwtransaction.go b/rwtransaction.go new file mode 100644 index 0000000..e22c766 --- /dev/null +++ b/rwtransaction.go @@ -0,0 +1,355 @@ +package bolt + +import ( + "sort" + "unsafe" +) + +// RWTransaction represents a transaction that can read and write data. +// Only one read/write transaction can be active for a database at a time. +// RWTransaction is composed of a read-only Transaction so it can also use +// functions provided by Transaction. +type RWTransaction struct { + Transaction + nodes map[pgid]*node + pending []*node +} + +// init initializes the transaction. +func (t *RWTransaction) init(db *DB) { + t.Transaction.init(db) + t.pages = make(map[pgid]*page) + + // Increment the transaction id. + t.meta.txnid += txnid(1) +} + +// CreateBucket creates a new bucket. +// Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long. +func (t *RWTransaction) CreateBucket(name string) error { + // Check if bucket already exists. + if b := t.Bucket(name); b != nil { + return ErrBucketExists + } else if len(name) == 0 { + return ErrBucketNameRequired + } else if len(name) > MaxBucketNameSize { + return ErrBucketNameTooLarge + } + + // Create a blank root leaf page. + p, err := t.allocate(1) + if err != nil { + return err + } + p.flags = leafPageFlag + + // Add bucket to buckets page. + t.buckets.put(name, &bucket{root: p.id}) + + return nil +} + +// CreateBucketIfNotExists creates a new bucket if it doesn't already exist. +// Returns an error if the bucket name is blank, or if the bucket name is too long. +func (t *RWTransaction) CreateBucketIfNotExists(name string) error { + err := t.CreateBucket(name) + if err != nil && err != ErrBucketExists { + return err + } + return nil +} + +// DeleteBucket deletes a bucket. +// Returns an error if the bucket cannot be found. +func (t *RWTransaction) DeleteBucket(name string) error { + if b := t.Bucket(name); b == nil { + return ErrBucketNotFound + } + + // Remove from buckets page. + t.buckets.del(name) + + // TODO(benbjohnson): Free all pages. + + return nil +} + +// NextSequence returns an autoincrementing integer for the bucket. +func (t *RWTransaction) NextSequence(name string) (int, error) { + // Check if bucket already exists. + b := t.Bucket(name) + if b == nil { + return 0, ErrBucketNotFound + } + + // Make sure next sequence number will not be larger than the maximum + // integer size of the system. + if b.bucket.sequence == uint64(maxInt) { + return 0, ErrSequenceOverflow + } + + // Increment and return the sequence. + b.bucket.sequence++ + + return int(b.bucket.sequence), nil +} + +// Put sets the value for a key inside of the named bucket. +// If the key exist then its previous value will be overwritten. +// Returns an error if the bucket is not found, if the key is blank, if the key is too large, or if the value is too large. +func (t *RWTransaction) Put(name string, key []byte, value []byte) error { + b := t.Bucket(name) + if b == nil { + return ErrBucketNotFound + } + + // Validate the key and data size. + if len(key) == 0 { + return ErrKeyRequired + } else if len(key) > MaxKeySize { + return ErrKeyTooLarge + } else if len(value) > MaxValueSize { + return ErrValueTooLarge + } + + // Move cursor to correct position. + c := b.cursor() + c.Seek(key) + + // Insert the key/value. + c.node(t).put(key, key, value, 0) + + return nil +} + +// Delete removes a key from the named bucket. +// If the key does not exist then nothing is done and a nil error is returned. +// Returns an error if the bucket cannot be found. +func (t *RWTransaction) Delete(name string, key []byte) error { + b := t.Bucket(name) + if b == nil { + return ErrBucketNotFound + } + + // Move cursor to correct position. + c := b.cursor() + c.Seek(key) + + // Delete the node if we have a matching key. + c.node(t).del(key) + + return nil +} + +// Commit writes all changes to disk and updates the meta page. +// Returns an error if a disk write error occurs. +func (t *RWTransaction) Commit() error { + defer t.close() + + // TODO(benbjohnson): Use vectorized I/O to write out dirty pages. + + // Rebalance and spill data onto dirty pages. + t.rebalance() + t.spill() + + // Spill buckets page. + p, err := t.allocate((t.buckets.size() / t.db.pageSize) + 1) + if err != nil { + return err + } + t.buckets.write(p) + + // Write dirty pages to disk. + if err := t.write(); err != nil { + return err + } + + // Update the meta. + t.meta.buckets = p.id + + // Write meta to disk. + if err := t.writeMeta(); err != nil { + return err + } + + return nil +} + +// Rollback closes the transaction and ignores all previous updates. +func (t *RWTransaction) Rollback() { + t.close() +} + +func (t *RWTransaction) close() { + t.db.rwlock.Unlock() +} + +// allocate returns a contiguous block of memory starting at a given page. +func (t *RWTransaction) allocate(count int) (*page, error) { + p, err := t.db.allocate(count) + if err != nil { + return nil, err + } + + // Save to our page cache. + t.pages[p.id] = p + + return p, nil +} + +// rebalance attempts to balance all nodes. +func (t *RWTransaction) rebalance() { + for _, n := range t.nodes { + n.rebalance() + } +} + +// spill writes all the nodes to dirty pages. +func (t *RWTransaction) spill() error { + // Keep track of the current root nodes. + // We will update this at the end once all nodes are created. + type root struct { + node *node + pgid pgid + } + var roots []root + + // Sort nodes by highest depth first. + nodes := make(nodesByDepth, 0, len(t.nodes)) + for _, n := range t.nodes { + nodes = append(nodes, n) + } + sort.Sort(nodes) + + // Spill nodes by deepest first. + for i := 0; i < len(nodes); i++ { + n := nodes[i] + + // Save existing root buckets for later. + if n.parent == nil && n.pgid != 0 { + roots = append(roots, root{n, n.pgid}) + } + + // Split nodes into appropriate sized nodes. + // The first node in this list will be a reference to n to preserve ancestry. + newNodes := n.split(t.db.pageSize) + t.pending = newNodes + + // If this is a root node that split then create a parent node. + if n.parent == nil && len(newNodes) > 1 { + n.parent = &node{transaction: t, isLeaf: false} + nodes = append(nodes, n.parent) + } + + // Add node's page to the freelist. + if n.pgid > 0 { + t.db.freelist.free(t.id(), t.page(n.pgid)) + } + + // Write nodes to dirty pages. + for i, newNode := range newNodes { + // Allocate contiguous space for the node. + p, err := t.allocate((newNode.size() / t.db.pageSize) + 1) + if err != nil { + return err + } + + // Write the node to the page. + newNode.write(p) + newNode.pgid = p.id + newNode.parent = n.parent + + // The first node should use the existing entry, other nodes are inserts. + var oldKey []byte + if i == 0 { + oldKey = n.key + } else { + oldKey = newNode.inodes[0].key + } + + // Update the parent entry. + if newNode.parent != nil { + newNode.parent.put(oldKey, newNode.inodes[0].key, nil, newNode.pgid) + } + } + + t.pending = nil + } + + // Update roots with new roots. + for _, root := range roots { + t.buckets.updateRoot(root.pgid, root.node.root().pgid) + } + + // Clear out nodes now that they are all spilled. + t.nodes = make(map[pgid]*node) + + return nil +} + +// write writes any dirty pages to disk. +func (t *RWTransaction) write() error { + // Sort pages by id. + pages := make(pages, 0, len(t.pages)) + for _, p := range t.pages { + pages = append(pages, p) + } + sort.Sort(pages) + + // Write pages to disk in order. + for _, p := range pages { + size := (int(p.overflow) + 1) * t.db.pageSize + buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:size] + offset := int64(p.id) * int64(t.db.pageSize) + if _, err := t.db.file.WriteAt(buf, offset); err != nil { + return err + } + } + + // Clear out page cache. + t.pages = make(map[pgid]*page) + + return nil +} + +// writeMeta writes the meta to the disk. +func (t *RWTransaction) writeMeta() error { + // Create a temporary buffer for the meta page. + buf := make([]byte, t.db.pageSize) + p := t.db.pageInBuffer(buf, 0) + t.meta.write(p) + + // Write the meta page to file. + t.db.metafile.WriteAt(buf, int64(p.id)*int64(t.db.pageSize)) + + return nil +} + +// node creates a node from a page and associates it with a given parent. +func (t *RWTransaction) node(pgid pgid, parent *node) *node { + // Retrieve node if it has already been fetched. + if n := t.nodes[pgid]; n != nil { + return n + } + + // Otherwise create a branch and cache it. + n := &node{transaction: t, parent: parent} + if n.parent != nil { + n.depth = n.parent.depth + 1 + } + n.read(t.page(pgid)) + t.nodes[pgid] = n + + return n +} + +// dereference removes all references to the old mmap. +func (t *RWTransaction) dereference() { + for _, n := range t.nodes { + n.dereference() + } + + for _, n := range t.pending { + n.dereference() + } +} diff --git a/rwtransaction_test.go b/rwtransaction_test.go new file mode 100644 index 0000000..18b6ae9 --- /dev/null +++ b/rwtransaction_test.go @@ -0,0 +1,306 @@ +package bolt + +import ( + "bytes" + "fmt" + "os" + "strings" + "testing" + "testing/quick" + + "github.com/stretchr/testify/assert" +) + +// Ensure that a RWTransaction can be retrieved. +func TestRWTransaction(t *testing.T) { + withOpenDB(func(db *DB, path string) { + txn, err := db.RWTransaction() + assert.NotNil(t, txn) + assert.NoError(t, err) + assert.Equal(t, txn.DB(), db) + }) +} + +// Ensure that opening a RWTransaction while the DB is closed returns an error. +func TestRWTransactionOpenWithClosedDB(t *testing.T) { + withDB(func(db *DB, path string) { + txn, err := db.RWTransaction() + assert.Equal(t, err, ErrDatabaseNotOpen) + assert.Nil(t, txn) + }) +} + +// Ensure that a bucket can be created and retrieved. +func TestRWTransactionCreateBucket(t *testing.T) { + withOpenDB(func(db *DB, path string) { + // Create a bucket. + err := db.CreateBucket("widgets") + assert.NoError(t, err) + + // Read the bucket through a separate transaction. + b, err := db.Bucket("widgets") + assert.NotNil(t, b) + assert.NoError(t, err) + }) +} + +// Ensure that a bucket can be created if it doesn't already exist. +func TestRWTransactionCreateBucketIfNotExists(t *testing.T) { + withOpenDB(func(db *DB, path string) { + assert.NoError(t, db.CreateBucketIfNotExists("widgets")) + assert.NoError(t, db.CreateBucketIfNotExists("widgets")) + + // Read the bucket through a separate transaction. + b, err := db.Bucket("widgets") + assert.NotNil(t, b) + assert.NoError(t, err) + }) +} + +// Ensure that a bucket cannot be created twice. +func TestRWTransactionRecreateBucket(t *testing.T) { + withOpenDB(func(db *DB, path string) { + // Create a bucket. + err := db.CreateBucket("widgets") + assert.NoError(t, err) + + // Create the same bucket again. + err = db.CreateBucket("widgets") + assert.Equal(t, err, ErrBucketExists) + }) +} + +// Ensure that a bucket is created with a non-blank name. +func TestRWTransactionCreateBucketWithoutName(t *testing.T) { + withOpenDB(func(db *DB, path string) { + err := db.CreateBucket("") + assert.Equal(t, err, ErrBucketNameRequired) + }) +} + +// Ensure that a bucket name is not too long. +func TestRWTransactionCreateBucketWithLongName(t *testing.T) { + withOpenDB(func(db *DB, path string) { + err := db.CreateBucket(strings.Repeat("X", 255)) + assert.NoError(t, err) + + err = db.CreateBucket(strings.Repeat("X", 256)) + assert.Equal(t, err, ErrBucketNameTooLarge) + }) +} + +// Ensure that a bucket can be deleted. +func TestRWTransactionDeleteBucket(t *testing.T) { + withOpenDB(func(db *DB, path string) { + // Create a bucket and add a value. + db.CreateBucket("widgets") + db.Put("widgets", []byte("foo"), []byte("bar")) + + // Delete the bucket and make sure we can't get the value. + assert.NoError(t, db.DeleteBucket("widgets")) + value, err := db.Get("widgets", []byte("foo")) + assert.Equal(t, err, ErrBucketNotFound) + assert.Nil(t, value) + + // Create the bucket again and make sure there's not a phantom value. + assert.NoError(t, db.CreateBucket("widgets")) + value, err = db.Get("widgets", []byte("foo")) + assert.NoError(t, err) + assert.Nil(t, value) + }) +} + +// Ensure that a bucket can return an autoincrementing sequence. +func TestRWTransactionNextSequence(t *testing.T) { + withOpenDB(func(db *DB, path string) { + db.CreateBucket("widgets") + db.CreateBucket("woojits") + + // Make sure sequence increments. + seq, err := db.NextSequence("widgets") + assert.NoError(t, err) + assert.Equal(t, seq, 1) + seq, err = db.NextSequence("widgets") + assert.NoError(t, err) + assert.Equal(t, seq, 2) + + // Buckets should be separate. + seq, err = db.NextSequence("woojits") + assert.NoError(t, err) + assert.Equal(t, seq, 1) + + // Missing buckets return an error. + seq, err = db.NextSequence("no_such_bucket") + assert.Equal(t, err, ErrBucketNotFound) + assert.Equal(t, seq, 0) + }) +} + +// Ensure that incrementing past the maximum sequence number will return an error. +func TestRWTransactionNextSequenceOverflow(t *testing.T) { + withOpenDB(func(db *DB, path string) { + db.CreateBucket("widgets") + db.Do(func(txn *RWTransaction) error { + b := txn.Bucket("widgets") + b.bucket.sequence = uint64(maxInt) + seq, err := txn.NextSequence("widgets") + assert.Equal(t, err, ErrSequenceOverflow) + assert.Equal(t, seq, 0) + return nil + }) + }) +} + +// Ensure that an error is returned when inserting into a bucket that doesn't exist. +func TestRWTransactionPutBucketNotFound(t *testing.T) { + withOpenDB(func(db *DB, path string) { + err := db.Put("widgets", []byte("foo"), []byte("bar")) + assert.Equal(t, err, ErrBucketNotFound) + }) +} + +// Ensure that an error is returned when inserting with an empty key. +func TestRWTransactionPutEmptyKey(t *testing.T) { + withOpenDB(func(db *DB, path string) { + db.CreateBucket("widgets") + err := db.Put("widgets", []byte(""), []byte("bar")) + assert.Equal(t, err, ErrKeyRequired) + err = db.Put("widgets", nil, []byte("bar")) + assert.Equal(t, err, ErrKeyRequired) + }) +} + +// Ensure that an error is returned when inserting with a key that's too large. +func TestRWTransactionPutKeyTooLarge(t *testing.T) { + withOpenDB(func(db *DB, path string) { + db.CreateBucket("widgets") + err := db.Put("widgets", make([]byte, 32769), []byte("bar")) + assert.Equal(t, err, ErrKeyTooLarge) + }) +} + +// Ensure that an error is returned when deleting from a bucket that doesn't exist. +func TestRWTransactionDeleteBucketNotFound(t *testing.T) { + withOpenDB(func(db *DB, path string) { + err := db.DeleteBucket("widgets") + assert.Equal(t, err, ErrBucketNotFound) + }) +} + +// Ensure that a bucket can write random keys and values across multiple txns. +func TestRWTransactionPutSingle(t *testing.T) { + index := 0 + f := func(items testdata) bool { + withOpenDB(func(db *DB, path string) { + m := make(map[string][]byte) + + db.CreateBucket("widgets") + for _, item := range items { + if err := db.Put("widgets", item.Key, item.Value); err != nil { + panic("put error: " + err.Error()) + } + m[string(item.Key)] = item.Value + + // Verify all key/values so far. + i := 0 + for k, v := range m { + value, err := db.Get("widgets", []byte(k)) + if err != nil { + panic("get error: " + err.Error()) + } + if !bytes.Equal(value, v) { + db.CopyFile("/tmp/bolt.put.single.db", 0666) + t.Fatalf("value mismatch [run %d] (%d of %d):\nkey: %x\ngot: %x\nexp: %x", index, i, len(m), []byte(k), value, v) + } + i++ + } + } + + fmt.Fprint(os.Stderr, ".") + }) + index++ + return true + } + if err := quick.Check(f, qconfig()); err != nil { + t.Error(err) + } + fmt.Fprint(os.Stderr, "\n") +} + +// Ensure that a transaction can insert multiple key/value pairs at once. +func TestRWTransactionPutMultiple(t *testing.T) { + f := func(items testdata) bool { + withOpenDB(func(db *DB, path string) { + // Bulk insert all values. + db.CreateBucket("widgets") + rwtxn, _ := db.RWTransaction() + for _, item := range items { + assert.NoError(t, rwtxn.Put("widgets", item.Key, item.Value)) + } + assert.NoError(t, rwtxn.Commit()) + + // Verify all items exist. + txn, _ := db.Transaction() + for _, item := range items { + value, err := txn.Get("widgets", item.Key) + assert.NoError(t, err) + if !assert.Equal(t, item.Value, value) { + db.CopyFile("/tmp/bolt.put.multiple.db", 0666) + t.FailNow() + } + } + txn.Close() + }) + fmt.Fprint(os.Stderr, ".") + return true + } + if err := quick.Check(f, qconfig()); err != nil { + t.Error(err) + } + fmt.Fprint(os.Stderr, "\n") +} + +// Ensure that a transaction can delete all key/value pairs and return to a single leaf page. +func TestRWTransactionDelete(t *testing.T) { + f := func(items testdata) bool { + withOpenDB(func(db *DB, path string) { + // Bulk insert all values. + db.CreateBucket("widgets") + rwtxn, _ := db.RWTransaction() + for _, item := range items { + assert.NoError(t, rwtxn.Put("widgets", item.Key, item.Value)) + } + assert.NoError(t, rwtxn.Commit()) + + // Remove items one at a time and check consistency. + for i, item := range items { + assert.NoError(t, db.Delete("widgets", item.Key)) + + // Anything before our deletion index should be nil. + txn, _ := db.Transaction() + for j, exp := range items { + if j > i { + value, err := txn.Get("widgets", exp.Key) + assert.NoError(t, err) + if !assert.Equal(t, exp.Value, value) { + t.FailNow() + } + } else { + value, err := txn.Get("widgets", exp.Key) + assert.NoError(t, err) + if !assert.Nil(t, value) { + t.FailNow() + } + } + } + txn.Close() + } + }) + fmt.Fprint(os.Stderr, ".") + return true + } + if err := quick.Check(f, qconfig()); err != nil { + t.Error(err) + } + fmt.Fprint(os.Stderr, "\n") +} diff --git a/transaction.go b/transaction.go index 1bb990b..6e9ca8f 100644 --- a/transaction.go +++ b/transaction.go @@ -1,28 +1,27 @@ package bolt import ( - "sort" - "unsafe" + "bytes" ) +// Transaction represents a read-only transaction on the database. +// It can be used for retrieving values for keys as well as creating cursors for +// iterating over the data. +// +// IMPORTANT: You must close transactions when you are done with them. Pages +// can not be reclaimed by the writer until no more transactions are using them. +// A long running read transaction can cause the database to quickly grow. +type Transaction struct { + db *DB + meta *meta + buckets *buckets + pages map[pgid]*page +} + // txnid represents the internal transaction identifier. type txnid uint64 -// Transaction represents a consistent view into the database. -// Read-only transactions can be created by calling DB.Transaction(). -// Read-write transactions can be created by calling DB.RWTransaction(). -// Only one read-write transaction is allowed at a time. -type Transaction struct { - db *DB - meta *meta - buckets *buckets - writable bool - pages map[pgid]*page - nodes map[pgid]*node - pending []*node -} - -// init initializes the transaction. +// init initializes the transaction and associates it with a database. func (t *Transaction) init(db *DB) { t.db = db t.pages = nil @@ -34,11 +33,6 @@ func (t *Transaction) init(db *DB) { // Read in the buckets page. t.buckets = &buckets{} t.buckets.read(t.page(t.meta.buckets)) - - t.pages = make(map[pgid]*page) - - // Increment the transaction id. - t.meta.txnid += txnid(1) } // id returns the transaction id. @@ -46,20 +40,19 @@ func (t *Transaction) id() txnid { return t.meta.txnid } +// Close closes the transaction and releases any pages it is using. +func (t *Transaction) Close() { + t.db.removeTransaction(t) +} + // DB returns a reference to the database that created the transaction. func (t *Transaction) DB() *DB { return t.db } -// Writable returns whether the transaction can change data. -func (t *Transaction) Writable() bool { - return t.writable -} - // Bucket retrieves a bucket by name. // Returns nil if the bucket does not exist. func (t *Transaction) Bucket(name string) *Bucket { - _assert(t.isOpen(), "transaction not open") b := t.buckets.get(name) if b == nil { return nil @@ -74,7 +67,6 @@ func (t *Transaction) Bucket(name string) *Bucket { // Buckets retrieves a list of all buckets. func (t *Transaction) Buckets() []*Bucket { - _assert(t.isOpen(), "transaction not open") buckets := make([]*Bucket, 0, len(t.buckets.items)) for name, b := range t.buckets.items { bucket := &Bucket{bucket: b, transaction: t, name: name} @@ -83,263 +75,49 @@ func (t *Transaction) Buckets() []*Bucket { return buckets } -// CreateBucket creates a new bucket. -// Returns an error if the transaction is read-only, if bucket already exists, -// if the bucket name is blank, or if the bucket name is too long. -func (t *Transaction) CreateBucket(name string) error { - _assert(t.isOpen(), "transaction not open") - if !t.writable { - return ErrTransactionNotWritable - } else if b := t.Bucket(name); b != nil { - return ErrBucketExists - } else if len(name) == 0 { - return ErrBucketNameRequired - } else if len(name) > MaxBucketNameSize { - return ErrBucketNameTooLarge +// Cursor creates a cursor associated with a given bucket. +// The cursor is only valid as long as the Transaction is open. +// Do not use a cursor after the transaction is closed. +func (t *Transaction) Cursor(name string) (*Cursor, error) { + b := t.Bucket(name) + if b == nil { + return nil, ErrBucketNotFound } - - // Create a blank root leaf page. - p, err := t.allocate(1) - if err != nil { - return err - } - p.flags = leafPageFlag - - // Add bucket to buckets page. - t.buckets.put(name, &bucket{root: p.id}) - - return nil + return b.cursor(), nil } -// CreateBucketIfNotExists creates a new bucket if it doesn't already exist. -// Returns an error if the transaction is read-only, if the bucket name is -// blank, or if the bucket name is too long. -func (t *Transaction) CreateBucketIfNotExists(name string) error { - _assert(t.isOpen(), "transaction not open") - err := t.CreateBucket(name) - if err != nil && err != ErrBucketExists { - return err - } - return nil -} - -// DeleteBucket deletes a bucket. -// Returns an error if the transaction is read-only or if the bucket cannot be found. -func (t *Transaction) DeleteBucket(name string) error { - _assert(t.isOpen(), "transaction not open") - if !t.writable { - return ErrTransactionNotWritable - } else if b := t.Bucket(name); b == nil { - return ErrBucketNotFound - } - - // Remove from buckets page. - t.buckets.del(name) - - // TODO(benbjohnson): Free all pages. - - return nil -} - -// Commit writes all changes to disk and updates the meta page. -// Read-only transactions will simply be closed. -// Returns an error if a disk write error occurs. -func (t *Transaction) Commit() error { - defer t.close() - - // Ignore commit for read-only transactions. - if !t.writable { - return nil - } - - // TODO(benbjohnson): Use vectorized I/O to write out dirty pages. - - // Rebalance and spill data onto dirty pages. - t.rebalance() - t.spill() - - // Spill buckets page. - p, err := t.allocate((t.buckets.size() / t.db.pageSize) + 1) - if err != nil { - return err - } - t.buckets.write(p) - - // Write dirty pages to disk. - if err := t.write(); err != nil { - return err - } - - // Update the meta. - t.meta.buckets = p.id - - // Write meta to disk. - if err := t.writeMeta(); err != nil { - return err - } - - return nil -} - -// Rollback closes the transaction and rolls back any pending changes. -func (t *Transaction) Rollback() { - t.close() -} - -func (t *Transaction) close() { - if t.writable { - t.db.rwlock.Unlock() - } else { - t.db.removeTransaction(t) - } - - // Detach from the database. - t.db = nil -} - -// isOpen returns whether the transaction is currently open. -func (t *Transaction) isOpen() bool { - return t.db != nil -} - -// allocate returns a contiguous block of memory starting at a given page. -func (t *Transaction) allocate(count int) (*page, error) { - p, err := t.db.allocate(count) +// Get retrieves the value for a key in a named bucket. +// Returns a nil value if the key does not exist. +// Returns an error if the bucket does not exist. +func (t *Transaction) Get(name string, key []byte) (value []byte, err error) { + c, err := t.Cursor(name) if err != nil { return nil, err } - - // Save to our page cache. - t.pages[p.id] = p - - return p, nil + k, v := c.Seek(key) + // If our target node isn't the same key as what's passed in then return nil. + if !bytes.Equal(key, k) { + return nil, nil + } + return v, nil } -// rebalance attempts to balance all nodes. -func (t *Transaction) rebalance() { - for _, n := range t.nodes { - n.rebalance() - } -} - -// spill writes all the nodes to dirty pages. -func (t *Transaction) spill() error { - // Keep track of the current root nodes. - // We will update this at the end once all nodes are created. - type root struct { - node *node - pgid pgid - } - var roots []root - - // Sort nodes by highest depth first. - nodes := make(nodesByDepth, 0, len(t.nodes)) - for _, n := range t.nodes { - nodes = append(nodes, n) - } - sort.Sort(nodes) - - // Spill nodes by deepest first. - for i := 0; i < len(nodes); i++ { - n := nodes[i] - - // Save existing root buckets for later. - if n.parent == nil && n.pgid != 0 { - roots = append(roots, root{n, n.pgid}) - } - - // Split nodes into appropriate sized nodes. - // The first node in this list will be a reference to n to preserve ancestry. - newNodes := n.split(t.db.pageSize) - t.pending = newNodes - - // If this is a root node that split then create a parent node. - if n.parent == nil && len(newNodes) > 1 { - n.parent = &node{transaction: t, isLeaf: false} - nodes = append(nodes, n.parent) - } - - // Add node's page to the freelist. - if n.pgid > 0 { - t.db.freelist.free(t.id(), t.page(n.pgid)) - } - - // Write nodes to dirty pages. - for i, newNode := range newNodes { - // Allocate contiguous space for the node. - p, err := t.allocate((newNode.size() / t.db.pageSize) + 1) - if err != nil { - return err - } - - // Write the node to the page. - newNode.write(p) - newNode.pgid = p.id - newNode.parent = n.parent - - // The first node should use the existing entry, other nodes are inserts. - var oldKey []byte - if i == 0 { - oldKey = n.key - } else { - oldKey = newNode.inodes[0].key - } - - // Update the parent entry. - if newNode.parent != nil { - newNode.parent.put(oldKey, newNode.inodes[0].key, nil, newNode.pgid) - } - } - - t.pending = nil +// ForEach executes a function for each key/value pair in a bucket. +// An error is returned if the bucket cannot be found. +func (t *Transaction) ForEach(name string, fn func(k, v []byte) error) error { + // Open a cursor on the bucket. + c, err := t.Cursor(name) + if err != nil { + return err } - // Update roots with new roots. - for _, root := range roots { - t.buckets.updateRoot(root.pgid, root.node.root().pgid) - } - - // Clear out nodes now that they are all spilled. - t.nodes = make(map[pgid]*node) - - return nil -} - -// write writes any dirty pages to disk. -func (t *Transaction) write() error { - // Sort pages by id. - pages := make(pages, 0, len(t.pages)) - for _, p := range t.pages { - pages = append(pages, p) - } - sort.Sort(pages) - - // Write pages to disk in order. - for _, p := range pages { - size := (int(p.overflow) + 1) * t.db.pageSize - buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:size] - offset := int64(p.id) * int64(t.db.pageSize) - if _, err := t.db.file.WriteAt(buf, offset); err != nil { + // Iterate over each key/value pair in the bucket. + for k, v := c.First(); k != nil; k, v = c.Next() { + if err := fn(k, v); err != nil { return err } } - // Clear out page cache. - t.pages = make(map[pgid]*page) - - return nil -} - -// writeMeta writes the meta to the disk. -func (t *Transaction) writeMeta() error { - // Create a temporary buffer for the meta page. - buf := make([]byte, t.db.pageSize) - p := t.db.pageInBuffer(buf, 0) - t.meta.write(p) - - // Write the meta page to file. - t.db.metafile.WriteAt(buf, int64(p.id)*int64(t.db.pageSize)) - return nil } @@ -357,35 +135,6 @@ func (t *Transaction) page(id pgid) *page { return t.db.page(id) } -// node creates a node from a page and associates it with a given parent. -func (t *Transaction) node(pgid pgid, parent *node) *node { - // Retrieve node if it has already been fetched. - if n := t.nodes[pgid]; n != nil { - return n - } - - // Otherwise create a branch and cache it. - n := &node{transaction: t, parent: parent} - if n.parent != nil { - n.depth = n.parent.depth + 1 - } - n.read(t.page(pgid)) - t.nodes[pgid] = n - - return n -} - -// dereference removes all references to the old mmap. -func (t *Transaction) dereference() { - for _, n := range t.nodes { - n.dereference() - } - - for _, n := range t.pending { - n.dereference() - } -} - // forEachPage iterates over every page within a given page and executes a function. func (t *Transaction) forEachPage(pgid pgid, depth int, fn func(*page, int)) { p := t.page(pgid) diff --git a/transaction_test.go b/transaction_test.go index 880b299..4a7170c 100644 --- a/transaction_test.go +++ b/transaction_test.go @@ -1,11 +1,9 @@ package bolt import ( - "bytes" "fmt" "os" "sort" - "strings" "testing" "testing/quick" @@ -55,11 +53,24 @@ func TestTransactionCursorEmptyBucket(t *testing.T) { withOpenDB(func(db *DB, path string) { db.CreateBucket("widgets") txn, _ := db.Transaction() - c := txn.Bucket("widgets").Cursor() + c, err := txn.Cursor("widgets") + assert.NoError(t, err) k, v := c.First() assert.Nil(t, k) assert.Nil(t, v) - txn.Rollback() + txn.Close() + }) +} + +// Ensure that a Transaction returns a nil when a bucket doesn't exist. +func TestTransactionCursorMissingBucket(t *testing.T) { + withOpenDB(func(db *DB, path string) { + db.CreateBucket("widgets") + txn, _ := db.Transaction() + c, err := txn.Cursor("woojits") + assert.Nil(t, c) + assert.Equal(t, err, ErrBucketNotFound) + txn.Close() }) } @@ -71,7 +82,8 @@ func TestTransactionCursorLeafRoot(t *testing.T) { db.Put("widgets", []byte("foo"), []byte{0}) db.Put("widgets", []byte("bar"), []byte{1}) txn, _ := db.Transaction() - c := txn.Bucket("widgets").Cursor() + c, err := txn.Cursor("widgets") + assert.NoError(t, err) k, v := c.First() assert.Equal(t, string(k), "bar") @@ -93,7 +105,7 @@ func TestTransactionCursorLeafRoot(t *testing.T) { assert.Nil(t, k) assert.Nil(t, v) - txn.Rollback() + txn.Close() }) } @@ -105,7 +117,8 @@ func TestTransactionCursorLeafRootReverse(t *testing.T) { db.Put("widgets", []byte("foo"), []byte{0}) db.Put("widgets", []byte("bar"), []byte{1}) txn, _ := db.Transaction() - c := txn.Bucket("widgets").Cursor() + c, err := txn.Cursor("widgets") + assert.NoError(t, err) k, v := c.Last() assert.Equal(t, string(k), "foo") @@ -127,7 +140,7 @@ func TestTransactionCursorLeafRootReverse(t *testing.T) { assert.Nil(t, k) assert.Nil(t, v) - txn.Rollback() + txn.Close() }) } @@ -139,7 +152,8 @@ func TestTransactionCursorRestart(t *testing.T) { db.Put("widgets", []byte("foo"), []byte{}) txn, _ := db.Transaction() - c := txn.Bucket("widgets").Cursor() + c, err := txn.Cursor("widgets") + assert.NoError(t, err) k, _ := c.First() assert.Equal(t, string(k), "bar") @@ -153,7 +167,7 @@ func TestTransactionCursorRestart(t *testing.T) { k, _ = c.Next() assert.Equal(t, string(k), "foo") - txn.Rollback() + txn.Close() }) } @@ -163,27 +177,27 @@ func TestTransactionCursorIterate(t *testing.T) { withOpenDB(func(db *DB, path string) { // Bulk insert all values. db.CreateBucket("widgets") - txn, _ := db.RWTransaction() - b := txn.Bucket("widgets") + rwtxn, _ := db.RWTransaction() for _, item := range items { - assert.NoError(t, b.Put(item.Key, item.Value)) + assert.NoError(t, rwtxn.Put("widgets", item.Key, item.Value)) } - assert.NoError(t, txn.Commit()) + assert.NoError(t, rwtxn.Commit()) // Sort test data. sort.Sort(items) // Iterate over all items and check consistency. var index = 0 - txn, _ = db.Transaction() - c := txn.Bucket("widgets").Cursor() + txn, _ := db.Transaction() + c, err := txn.Cursor("widgets") + assert.NoError(t, err) for k, v := c.First(); k != nil && index < len(items); k, v = c.Next() { assert.Equal(t, k, items[index].Key) assert.Equal(t, v, items[index].Value) index++ } assert.Equal(t, len(items), index) - txn.Rollback() + txn.Close() }) fmt.Fprint(os.Stderr, ".") return true @@ -200,301 +214,27 @@ func TestTransactionCursorIterateReverse(t *testing.T) { withOpenDB(func(db *DB, path string) { // Bulk insert all values. db.CreateBucket("widgets") - txn, _ := db.RWTransaction() - b := txn.Bucket("widgets") + rwtxn, _ := db.RWTransaction() for _, item := range items { - assert.NoError(t, b.Put(item.Key, item.Value)) + assert.NoError(t, rwtxn.Put("widgets", item.Key, item.Value)) } - assert.NoError(t, txn.Commit()) + assert.NoError(t, rwtxn.Commit()) // Sort test data. sort.Sort(revtestdata(items)) // Iterate over all items and check consistency. var index = 0 - txn, _ = db.Transaction() - c := txn.Bucket("widgets").Cursor() + txn, _ := db.Transaction() + c, err := txn.Cursor("widgets") + assert.NoError(t, err) for k, v := c.Last(); k != nil && index < len(items); k, v = c.Prev() { assert.Equal(t, k, items[index].Key) assert.Equal(t, v, items[index].Value) index++ } assert.Equal(t, len(items), index) - txn.Rollback() - }) - fmt.Fprint(os.Stderr, ".") - return true - } - if err := quick.Check(f, qconfig()); err != nil { - t.Error(err) - } - fmt.Fprint(os.Stderr, "\n") -} - -// Ensure that a bucket can be created and retrieved. -func TestTransactionCreateBucket(t *testing.T) { - withOpenDB(func(db *DB, path string) { - // Create a bucket. - err := db.CreateBucket("widgets") - assert.NoError(t, err) - - // Read the bucket through a separate transaction. - b, err := db.Bucket("widgets") - assert.NotNil(t, b) - assert.NoError(t, err) - }) -} - -// Ensure that a bucket can be created if it doesn't already exist. -func TestTransactionCreateBucketIfNotExists(t *testing.T) { - withOpenDB(func(db *DB, path string) { - assert.NoError(t, db.CreateBucketIfNotExists("widgets")) - assert.NoError(t, db.CreateBucketIfNotExists("widgets")) - - // Read the bucket through a separate transaction. - b, err := db.Bucket("widgets") - assert.NotNil(t, b) - assert.NoError(t, err) - }) -} - -// Ensure that a bucket cannot be created twice. -func TestTransactionRecreateBucket(t *testing.T) { - withOpenDB(func(db *DB, path string) { - // Create a bucket. - err := db.CreateBucket("widgets") - assert.NoError(t, err) - - // Create the same bucket again. - err = db.CreateBucket("widgets") - assert.Equal(t, err, ErrBucketExists) - }) -} - -// Ensure that a bucket is created with a non-blank name. -func TestTransactionCreateBucketWithoutName(t *testing.T) { - withOpenDB(func(db *DB, path string) { - err := db.CreateBucket("") - assert.Equal(t, err, ErrBucketNameRequired) - }) -} - -// Ensure that a bucket name is not too long. -func TestTransactionCreateBucketWithLongName(t *testing.T) { - withOpenDB(func(db *DB, path string) { - err := db.CreateBucket(strings.Repeat("X", 255)) - assert.NoError(t, err) - - err = db.CreateBucket(strings.Repeat("X", 256)) - assert.Equal(t, err, ErrBucketNameTooLarge) - }) -} - -// Ensure that a bucket can be deleted. -func TestTransactionDeleteBucket(t *testing.T) { - withOpenDB(func(db *DB, path string) { - // Create a bucket and add a value. - db.CreateBucket("widgets") - db.Put("widgets", []byte("foo"), []byte("bar")) - - // Delete the bucket and make sure we can't get the value. - assert.NoError(t, db.DeleteBucket("widgets")) - value, err := db.Get("widgets", []byte("foo")) - assert.Equal(t, err, ErrBucketNotFound) - assert.Nil(t, value) - - // Create the bucket again and make sure there's not a phantom value. - assert.NoError(t, db.CreateBucket("widgets")) - value, err = db.Get("widgets", []byte("foo")) - assert.NoError(t, err) - assert.Nil(t, value) - }) -} - -// Ensure that a bucket can return an autoincrementing sequence. -func TestTransactionNextSequence(t *testing.T) { - withOpenDB(func(db *DB, path string) { - db.CreateBucket("widgets") - db.CreateBucket("woojits") - - // Make sure sequence increments. - seq, err := db.NextSequence("widgets") - assert.NoError(t, err) - assert.Equal(t, seq, 1) - seq, err = db.NextSequence("widgets") - assert.NoError(t, err) - assert.Equal(t, seq, 2) - - // Buckets should be separate. - seq, err = db.NextSequence("woojits") - assert.NoError(t, err) - assert.Equal(t, seq, 1) - - // Missing buckets return an error. - seq, err = db.NextSequence("no_such_bucket") - assert.Equal(t, err, ErrBucketNotFound) - assert.Equal(t, seq, 0) - }) -} - -// Ensure that incrementing past the maximum sequence number will return an error. -func TestTransactionNextSequenceOverflow(t *testing.T) { - withOpenDB(func(db *DB, path string) { - db.CreateBucket("widgets") - db.Do(func(txn *Transaction) error { - b := txn.Bucket("widgets") - b.bucket.sequence = uint64(maxInt) - seq, err := b.NextSequence() - assert.Equal(t, err, ErrSequenceOverflow) - assert.Equal(t, seq, 0) - return nil - }) - }) -} - -// Ensure that an error is returned when inserting into a bucket that doesn't exist. -func TestTransactionPutBucketNotFound(t *testing.T) { - withOpenDB(func(db *DB, path string) { - err := db.Put("widgets", []byte("foo"), []byte("bar")) - assert.Equal(t, err, ErrBucketNotFound) - }) -} - -// Ensure that an error is returned when inserting with an empty key. -func TestTransactionPutEmptyKey(t *testing.T) { - withOpenDB(func(db *DB, path string) { - db.CreateBucket("widgets") - err := db.Put("widgets", []byte(""), []byte("bar")) - assert.Equal(t, err, ErrKeyRequired) - err = db.Put("widgets", nil, []byte("bar")) - assert.Equal(t, err, ErrKeyRequired) - }) -} - -// Ensure that an error is returned when inserting with a key that's too large. -func TestTransactionPutKeyTooLarge(t *testing.T) { - withOpenDB(func(db *DB, path string) { - db.CreateBucket("widgets") - err := db.Put("widgets", make([]byte, 32769), []byte("bar")) - assert.Equal(t, err, ErrKeyTooLarge) - }) -} - -// Ensure that an error is returned when deleting from a bucket that doesn't exist. -func TestTransactionDeleteBucketNotFound(t *testing.T) { - withOpenDB(func(db *DB, path string) { - err := db.DeleteBucket("widgets") - assert.Equal(t, err, ErrBucketNotFound) - }) -} - -// Ensure that a bucket can write random keys and values across multiple txns. -func TestTransactionPutSingle(t *testing.T) { - index := 0 - f := func(items testdata) bool { - withOpenDB(func(db *DB, path string) { - m := make(map[string][]byte) - - db.CreateBucket("widgets") - for _, item := range items { - if err := db.Put("widgets", item.Key, item.Value); err != nil { - panic("put error: " + err.Error()) - } - m[string(item.Key)] = item.Value - - // Verify all key/values so far. - i := 0 - for k, v := range m { - value, err := db.Get("widgets", []byte(k)) - if err != nil { - panic("get error: " + err.Error()) - } - if !bytes.Equal(value, v) { - db.CopyFile("/tmp/bolt.put.single.db", 0666) - t.Fatalf("value mismatch [run %d] (%d of %d):\nkey: %x\ngot: %x\nexp: %x", index, i, len(m), []byte(k), value, v) - } - i++ - } - } - - fmt.Fprint(os.Stderr, ".") - }) - index++ - return true - } - if err := quick.Check(f, qconfig()); err != nil { - t.Error(err) - } - fmt.Fprint(os.Stderr, "\n") -} - -// Ensure that a transaction can insert multiple key/value pairs at once. -func TestTransactionPutMultiple(t *testing.T) { - f := func(items testdata) bool { - withOpenDB(func(db *DB, path string) { - // Bulk insert all values. - db.CreateBucket("widgets") - txn, _ := db.RWTransaction() - b := txn.Bucket("widgets") - for _, item := range items { - assert.NoError(t, b.Put(item.Key, item.Value)) - } - assert.NoError(t, txn.Commit()) - - // Verify all items exist. - txn, _ = db.Transaction() - for _, item := range items { - value := txn.Bucket("widgets").Get(item.Key) - if !assert.Equal(t, item.Value, value) { - db.CopyFile("/tmp/bolt.put.multiple.db", 0666) - t.FailNow() - } - } - txn.Rollback() - }) - fmt.Fprint(os.Stderr, ".") - return true - } - if err := quick.Check(f, qconfig()); err != nil { - t.Error(err) - } - fmt.Fprint(os.Stderr, "\n") -} - -// Ensure that a transaction can delete all key/value pairs and return to a single leaf page. -func TestTransactionDelete(t *testing.T) { - f := func(items testdata) bool { - withOpenDB(func(db *DB, path string) { - // Bulk insert all values. - db.CreateBucket("widgets") - txn, _ := db.RWTransaction() - b := txn.Bucket("widgets") - for _, item := range items { - assert.NoError(t, b.Put(item.Key, item.Value)) - } - assert.NoError(t, txn.Commit()) - - // Remove items one at a time and check consistency. - for i, item := range items { - assert.NoError(t, db.Delete("widgets", item.Key)) - - // Anything before our deletion index should be nil. - txn, _ := db.Transaction() - for j, exp := range items { - if j > i { - value := txn.Bucket("widgets").Get(exp.Key) - if !assert.Equal(t, exp.Value, value) { - t.FailNow() - } - } else { - value := txn.Bucket("widgets").Get(exp.Key) - if !assert.Nil(t, value) { - t.FailNow() - } - } - } - txn.Rollback() - } + txn.Close() }) fmt.Fprint(os.Stderr, ".") return true