mirror of
https://github.com/etcd-io/bbolt.git
synced 2025-05-31 11:42:30 +00:00
Add transaction batching
DB.Batch makes it easy to make lots of small transactions with significantly better performance. Batch combines multiple concurrent Update calls into a single disk transaction, managing errors smartly.
This commit is contained in:
parent
8be3a28087
commit
adbb1a19c1
42
README.md
42
README.md
@ -125,6 +125,48 @@ no mutating operations are allowed within a read-only transaction. You can only
|
|||||||
retrieve buckets, retrieve values, and copy the database within a read-only
|
retrieve buckets, retrieve values, and copy the database within a read-only
|
||||||
transaction.
|
transaction.
|
||||||
|
|
||||||
|
|
||||||
|
#### Batch read-write transactions
|
||||||
|
|
||||||
|
Each `DB.Update()` waits for disk to commit the writes. This overhead
|
||||||
|
can be minimized by combining multiple updates with the `DB.Batch()`
|
||||||
|
function:
|
||||||
|
|
||||||
|
```go
|
||||||
|
err := db.Batch(func(tx *bolt.Tx) error {
|
||||||
|
...
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
```
|
||||||
|
|
||||||
|
Concurrent Batch calls are opportunistically combined into larger
|
||||||
|
transactions. Batch is only useful when there are multiple goroutines
|
||||||
|
calling it.
|
||||||
|
|
||||||
|
The trade-off is that `Batch` can call the given
|
||||||
|
function multiple times, if parts of the transaction fail. The
|
||||||
|
function must be idempotent and side effects must take effect only
|
||||||
|
after a successful return from `DB.Batch()`.
|
||||||
|
|
||||||
|
For example: don't display messages from inside the function, instead
|
||||||
|
set variables in the enclosing scope:
|
||||||
|
|
||||||
|
```go
|
||||||
|
var id uint64
|
||||||
|
err := db.Batch(func(tx *bolt.Tx) error {
|
||||||
|
// Find last key in bucket, decode as bigendian uint64, increment
|
||||||
|
// by one, encode back to []byte, and add new key.
|
||||||
|
...
|
||||||
|
id = newValue
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return ...
|
||||||
|
}
|
||||||
|
fmt.Println("Allocated ID %d", id)
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
#### Managing transactions manually
|
#### Managing transactions manually
|
||||||
|
|
||||||
The `DB.View()` and `DB.Update()` functions are wrappers around the `DB.Begin()`
|
The `DB.View()` and `DB.Update()` functions are wrappers around the `DB.Begin()`
|
||||||
|
135
batch.go
Normal file
135
batch.go
Normal file
@ -0,0 +1,135 @@
|
|||||||
|
package bolt
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Batch calls fn as part of a batch. It behaves similar to Update,
|
||||||
|
// except:
|
||||||
|
//
|
||||||
|
// 1. concurrent Batch calls can be combined into a single Bolt
|
||||||
|
// transaction.
|
||||||
|
//
|
||||||
|
// 2. the function passed to Batch may be called multiple times,
|
||||||
|
// regardless of whether it returns error or not.
|
||||||
|
//
|
||||||
|
// This means that Batch function side effects must be idempotent and
|
||||||
|
// take permanent effect only after a successful return is seen in
|
||||||
|
// caller.
|
||||||
|
//
|
||||||
|
// Batch is only useful when there are multiple goroutines calling it.
|
||||||
|
func (db *DB) Batch(fn func(*Tx) error) error {
|
||||||
|
errCh := make(chan error, 1)
|
||||||
|
|
||||||
|
db.batchMu.Lock()
|
||||||
|
if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.MaxBatchSize) {
|
||||||
|
// There is no existing batch, or the existing batch is full; start a new one.
|
||||||
|
db.batch = &batch{
|
||||||
|
db: db,
|
||||||
|
}
|
||||||
|
db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger)
|
||||||
|
}
|
||||||
|
db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh})
|
||||||
|
if len(db.batch.calls) >= db.MaxBatchSize {
|
||||||
|
// wake up batch, it's ready to run
|
||||||
|
go db.batch.trigger()
|
||||||
|
}
|
||||||
|
db.batchMu.Unlock()
|
||||||
|
|
||||||
|
err := <-errCh
|
||||||
|
if err == trySolo {
|
||||||
|
err = db.Update(fn)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
type call struct {
|
||||||
|
fn func(*Tx) error
|
||||||
|
err chan<- error
|
||||||
|
}
|
||||||
|
|
||||||
|
type batch struct {
|
||||||
|
db *DB
|
||||||
|
timer *time.Timer
|
||||||
|
start sync.Once
|
||||||
|
calls []call
|
||||||
|
}
|
||||||
|
|
||||||
|
// trigger runs the batch if it hasn't already been run.
|
||||||
|
func (b *batch) trigger() {
|
||||||
|
b.start.Do(b.run)
|
||||||
|
}
|
||||||
|
|
||||||
|
// run performs the transactions in the batch and communicates results
|
||||||
|
// back to DB.Batch.
|
||||||
|
func (b *batch) run() {
|
||||||
|
b.db.batchMu.Lock()
|
||||||
|
b.timer.Stop()
|
||||||
|
// Make sure no new work is added to this batch, but don't break
|
||||||
|
// other batches.
|
||||||
|
if b.db.batch == b {
|
||||||
|
b.db.batch = nil
|
||||||
|
}
|
||||||
|
b.db.batchMu.Unlock()
|
||||||
|
|
||||||
|
retry:
|
||||||
|
for len(b.calls) > 0 {
|
||||||
|
var failIdx = -1
|
||||||
|
err := b.db.Update(func(tx *Tx) error {
|
||||||
|
for i, c := range b.calls {
|
||||||
|
if err := safelyCall(c.fn, tx); err != nil {
|
||||||
|
failIdx = i
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if failIdx >= 0 {
|
||||||
|
// take the failing transaction out of the batch. it's
|
||||||
|
// safe to shorten b.calls here because db.batch no longer
|
||||||
|
// points to us, and we hold the mutex anyway.
|
||||||
|
c := b.calls[failIdx]
|
||||||
|
b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1]
|
||||||
|
// tell the submitter re-run it solo, continue with the rest of the batch
|
||||||
|
c.err <- trySolo
|
||||||
|
continue retry
|
||||||
|
}
|
||||||
|
|
||||||
|
// pass success, or bolt internal errors, to all callers
|
||||||
|
for _, c := range b.calls {
|
||||||
|
if c.err != nil {
|
||||||
|
c.err <- err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break retry
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// trySolo is a special sentinel error value used for signaling that a
|
||||||
|
// transaction function should be re-run. It should never be seen by
|
||||||
|
// callers.
|
||||||
|
var trySolo = errors.New("batch function returned an error and should be re-run solo")
|
||||||
|
|
||||||
|
type panicked struct {
|
||||||
|
reason interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p panicked) Error() string {
|
||||||
|
if err, ok := p.reason.(error); ok {
|
||||||
|
return err.Error()
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("panic: %v", p.reason)
|
||||||
|
}
|
||||||
|
|
||||||
|
func safelyCall(fn func(*Tx) error, tx *Tx) (err error) {
|
||||||
|
defer func() {
|
||||||
|
if p := recover(); p != nil {
|
||||||
|
err = panicked{p}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return fn(tx)
|
||||||
|
}
|
170
batch_benchmark_test.go
Normal file
170
batch_benchmark_test.go
Normal file
@ -0,0 +1,170 @@
|
|||||||
|
package bolt_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/binary"
|
||||||
|
"errors"
|
||||||
|
"hash/fnv"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/boltdb/bolt"
|
||||||
|
)
|
||||||
|
|
||||||
|
func validateBatchBench(b *testing.B, db *TestDB) {
|
||||||
|
var rollback = errors.New("sentinel error to cause rollback")
|
||||||
|
validate := func(tx *bolt.Tx) error {
|
||||||
|
bucket := tx.Bucket([]byte("bench"))
|
||||||
|
h := fnv.New32a()
|
||||||
|
buf := make([]byte, 4)
|
||||||
|
for id := uint32(0); id < 1000; id++ {
|
||||||
|
binary.LittleEndian.PutUint32(buf, id)
|
||||||
|
h.Reset()
|
||||||
|
h.Write(buf[:])
|
||||||
|
k := h.Sum(nil)
|
||||||
|
v := bucket.Get(k)
|
||||||
|
if v == nil {
|
||||||
|
b.Errorf("not found id=%d key=%x", id, k)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if g, e := v, []byte("filler"); !bytes.Equal(g, e) {
|
||||||
|
b.Errorf("bad value for id=%d key=%x: %s != %q", id, k, g, e)
|
||||||
|
}
|
||||||
|
if err := bucket.Delete(k); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// should be empty now
|
||||||
|
c := bucket.Cursor()
|
||||||
|
for k, v := c.First(); k != nil; k, v = c.Next() {
|
||||||
|
b.Errorf("unexpected key: %x = %q", k, v)
|
||||||
|
}
|
||||||
|
return rollback
|
||||||
|
}
|
||||||
|
if err := db.Update(validate); err != nil && err != rollback {
|
||||||
|
b.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkDBBatchAutomatic(b *testing.B) {
|
||||||
|
db := NewTestDB()
|
||||||
|
defer db.Close()
|
||||||
|
db.MustCreateBucket([]byte("bench"))
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
start := make(chan struct{})
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
for round := 0; round < 1000; round++ {
|
||||||
|
wg.Add(1)
|
||||||
|
|
||||||
|
go func(id uint32) {
|
||||||
|
defer wg.Done()
|
||||||
|
<-start
|
||||||
|
|
||||||
|
h := fnv.New32a()
|
||||||
|
buf := make([]byte, 4)
|
||||||
|
binary.LittleEndian.PutUint32(buf, id)
|
||||||
|
h.Write(buf[:])
|
||||||
|
k := h.Sum(nil)
|
||||||
|
insert := func(tx *bolt.Tx) error {
|
||||||
|
b := tx.Bucket([]byte("bench"))
|
||||||
|
return b.Put(k, []byte("filler"))
|
||||||
|
}
|
||||||
|
if err := db.Batch(insert); err != nil {
|
||||||
|
b.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}(uint32(round))
|
||||||
|
}
|
||||||
|
close(start)
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
b.StopTimer()
|
||||||
|
validateBatchBench(b, db)
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkDBBatchSingle(b *testing.B) {
|
||||||
|
db := NewTestDB()
|
||||||
|
defer db.Close()
|
||||||
|
db.MustCreateBucket([]byte("bench"))
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
start := make(chan struct{})
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
for round := 0; round < 1000; round++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(id uint32) {
|
||||||
|
defer wg.Done()
|
||||||
|
<-start
|
||||||
|
|
||||||
|
h := fnv.New32a()
|
||||||
|
buf := make([]byte, 4)
|
||||||
|
binary.LittleEndian.PutUint32(buf, id)
|
||||||
|
h.Write(buf[:])
|
||||||
|
k := h.Sum(nil)
|
||||||
|
insert := func(tx *bolt.Tx) error {
|
||||||
|
b := tx.Bucket([]byte("bench"))
|
||||||
|
return b.Put(k, []byte("filler"))
|
||||||
|
}
|
||||||
|
if err := db.Update(insert); err != nil {
|
||||||
|
b.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}(uint32(round))
|
||||||
|
}
|
||||||
|
close(start)
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
b.StopTimer()
|
||||||
|
validateBatchBench(b, db)
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkDBBatchManual10x100(b *testing.B) {
|
||||||
|
db := NewTestDB()
|
||||||
|
defer db.Close()
|
||||||
|
db.MustCreateBucket([]byte("bench"))
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
start := make(chan struct{})
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
for major := 0; major < 10; major++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(id uint32) {
|
||||||
|
defer wg.Done()
|
||||||
|
<-start
|
||||||
|
|
||||||
|
insert100 := func(tx *bolt.Tx) error {
|
||||||
|
h := fnv.New32a()
|
||||||
|
buf := make([]byte, 4)
|
||||||
|
for minor := uint32(0); minor < 100; minor++ {
|
||||||
|
binary.LittleEndian.PutUint32(buf, uint32(id*100+minor))
|
||||||
|
h.Reset()
|
||||||
|
h.Write(buf[:])
|
||||||
|
k := h.Sum(nil)
|
||||||
|
b := tx.Bucket([]byte("bench"))
|
||||||
|
if err := b.Put(k, []byte("filler")); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := db.Update(insert100); err != nil {
|
||||||
|
b.Fatal(err)
|
||||||
|
}
|
||||||
|
}(uint32(major))
|
||||||
|
}
|
||||||
|
close(start)
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
b.StopTimer()
|
||||||
|
validateBatchBench(b, db)
|
||||||
|
}
|
148
batch_example_test.go
Normal file
148
batch_example_test.go
Normal file
@ -0,0 +1,148 @@
|
|||||||
|
package bolt_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
|
"math/rand"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/boltdb/bolt"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Set this to see how the counts are actually updated.
|
||||||
|
const verbose = false
|
||||||
|
|
||||||
|
// Counter updates a counter in Bolt for every URL path requested.
|
||||||
|
type counter struct {
|
||||||
|
db *bolt.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c counter) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
||||||
|
// Communicates the new count from a successful database
|
||||||
|
// transaction.
|
||||||
|
var result uint64
|
||||||
|
|
||||||
|
increment := func(tx *bolt.Tx) error {
|
||||||
|
b, err := tx.CreateBucketIfNotExists([]byte("hits"))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
key := []byte(req.URL.String())
|
||||||
|
// Decode handles key not found for us.
|
||||||
|
count := decode(b.Get(key)) + 1
|
||||||
|
b.Put(key, encode(count))
|
||||||
|
// All good, communicate new count.
|
||||||
|
result = count
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := c.db.Batch(increment); err != nil {
|
||||||
|
http.Error(rw, err.Error(), 500)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if verbose {
|
||||||
|
log.Printf("server: %s: %d", req.URL.String(), result)
|
||||||
|
}
|
||||||
|
|
||||||
|
rw.Header().Set("Content-Type", "application/octet-stream")
|
||||||
|
fmt.Fprintf(rw, "%d\n", result)
|
||||||
|
}
|
||||||
|
|
||||||
|
func client(id int, base string, paths []string) error {
|
||||||
|
// Process paths in random order.
|
||||||
|
rng := rand.New(rand.NewSource(int64(id)))
|
||||||
|
permutation := rng.Perm(len(paths))
|
||||||
|
|
||||||
|
for i := range paths {
|
||||||
|
path := paths[permutation[i]]
|
||||||
|
resp, err := http.Get(base + path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
buf, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if verbose {
|
||||||
|
log.Printf("client: %s: %s", path, buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func ExampleDB_Batch() {
|
||||||
|
// Open the database.
|
||||||
|
db, _ := bolt.Open(tempfile(), 0666, nil)
|
||||||
|
defer os.Remove(db.Path())
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
// Start our web server
|
||||||
|
count := counter{db}
|
||||||
|
srv := httptest.NewServer(count)
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
// Decrease the batch size to make things more interesting.
|
||||||
|
db.MaxBatchSize = 3
|
||||||
|
|
||||||
|
// Get every path multiple times concurrently.
|
||||||
|
const clients = 10
|
||||||
|
paths := []string{
|
||||||
|
"/foo",
|
||||||
|
"/bar",
|
||||||
|
"/baz",
|
||||||
|
"/quux",
|
||||||
|
"/thud",
|
||||||
|
"/xyzzy",
|
||||||
|
}
|
||||||
|
errors := make(chan error, clients)
|
||||||
|
for i := 0; i < clients; i++ {
|
||||||
|
go func(id int) {
|
||||||
|
errors <- client(id, srv.URL, paths)
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
// Check all responses to make sure there's no error.
|
||||||
|
for i := 0; i < clients; i++ {
|
||||||
|
if err := <-errors; err != nil {
|
||||||
|
fmt.Printf("client error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the final result
|
||||||
|
db.View(func(tx *bolt.Tx) error {
|
||||||
|
b := tx.Bucket([]byte("hits"))
|
||||||
|
c := b.Cursor()
|
||||||
|
for k, v := c.First(); k != nil; k, v = c.Next() {
|
||||||
|
fmt.Printf("hits to %s: %d\n", k, decode(v))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
// Output:
|
||||||
|
// hits to /bar: 10
|
||||||
|
// hits to /baz: 10
|
||||||
|
// hits to /foo: 10
|
||||||
|
// hits to /quux: 10
|
||||||
|
// hits to /thud: 10
|
||||||
|
// hits to /xyzzy: 10
|
||||||
|
}
|
||||||
|
|
||||||
|
// encode marshals a counter.
|
||||||
|
func encode(n uint64) []byte {
|
||||||
|
buf := make([]byte, 8)
|
||||||
|
binary.BigEndian.PutUint64(buf, n)
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
|
||||||
|
// decode unmarshals a counter. Nil buffers are decoded as 0.
|
||||||
|
func decode(buf []byte) uint64 {
|
||||||
|
if buf == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return binary.BigEndian.Uint64(buf)
|
||||||
|
}
|
167
batch_test.go
Normal file
167
batch_test.go
Normal file
@ -0,0 +1,167 @@
|
|||||||
|
package bolt_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/boltdb/bolt"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Ensure two functions can perform updates in a single batch.
|
||||||
|
func TestDB_Batch(t *testing.T) {
|
||||||
|
db := NewTestDB()
|
||||||
|
defer db.Close()
|
||||||
|
db.MustCreateBucket([]byte("widgets"))
|
||||||
|
|
||||||
|
// Iterate over multiple updates in separate goroutines.
|
||||||
|
n := 2
|
||||||
|
ch := make(chan error)
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
go func(i int) {
|
||||||
|
ch <- db.Batch(func(tx *bolt.Tx) error {
|
||||||
|
return tx.Bucket([]byte("widgets")).Put(u64tob(uint64(i)), []byte{})
|
||||||
|
})
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check all responses to make sure there's no error.
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
if err := <-ch; err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure data is correct.
|
||||||
|
db.MustView(func(tx *bolt.Tx) error {
|
||||||
|
b := tx.Bucket([]byte("widgets"))
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
if v := b.Get(u64tob(uint64(i))); v == nil {
|
||||||
|
t.Errorf("key not found: %d", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDB_Batch_Panic(t *testing.T) {
|
||||||
|
db := NewTestDB()
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
var sentinel int
|
||||||
|
var bork = &sentinel
|
||||||
|
var problem interface{}
|
||||||
|
var err error
|
||||||
|
|
||||||
|
// Execute a function inside a batch that panics.
|
||||||
|
func() {
|
||||||
|
defer func() {
|
||||||
|
if p := recover(); p != nil {
|
||||||
|
problem = p
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
err = db.Batch(func(tx *bolt.Tx) error {
|
||||||
|
panic(bork)
|
||||||
|
})
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Verify there is no error.
|
||||||
|
if g, e := err, error(nil); g != e {
|
||||||
|
t.Fatalf("wrong error: %v != %v", g, e)
|
||||||
|
}
|
||||||
|
// Verify the panic was captured.
|
||||||
|
if g, e := problem, bork; g != e {
|
||||||
|
t.Fatalf("wrong error: %v != %v", g, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDB_BatchFull(t *testing.T) {
|
||||||
|
db := NewTestDB()
|
||||||
|
defer db.Close()
|
||||||
|
db.MustCreateBucket([]byte("widgets"))
|
||||||
|
|
||||||
|
const size = 3
|
||||||
|
// buffered so we never leak goroutines
|
||||||
|
ch := make(chan error, size)
|
||||||
|
put := func(i int) {
|
||||||
|
ch <- db.Batch(func(tx *bolt.Tx) error {
|
||||||
|
return tx.Bucket([]byte("widgets")).Put(u64tob(uint64(i)), []byte{})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
db.MaxBatchSize = size
|
||||||
|
// high enough to never trigger here
|
||||||
|
db.MaxBatchDelay = 1 * time.Hour
|
||||||
|
|
||||||
|
go put(1)
|
||||||
|
go put(2)
|
||||||
|
|
||||||
|
// Give the batch a chance to exhibit bugs.
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
|
||||||
|
// not triggered yet
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
t.Fatalf("batch triggered too early")
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
go put(3)
|
||||||
|
|
||||||
|
// Check all responses to make sure there's no error.
|
||||||
|
for i := 0; i < size; i++ {
|
||||||
|
if err := <-ch; err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure data is correct.
|
||||||
|
db.MustView(func(tx *bolt.Tx) error {
|
||||||
|
b := tx.Bucket([]byte("widgets"))
|
||||||
|
for i := 1; i <= size; i++ {
|
||||||
|
if v := b.Get(u64tob(uint64(i))); v == nil {
|
||||||
|
t.Errorf("key not found: %d", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDB_BatchTime(t *testing.T) {
|
||||||
|
db := NewTestDB()
|
||||||
|
defer db.Close()
|
||||||
|
db.MustCreateBucket([]byte("widgets"))
|
||||||
|
|
||||||
|
const size = 1
|
||||||
|
// buffered so we never leak goroutines
|
||||||
|
ch := make(chan error, size)
|
||||||
|
put := func(i int) {
|
||||||
|
ch <- db.Batch(func(tx *bolt.Tx) error {
|
||||||
|
return tx.Bucket([]byte("widgets")).Put(u64tob(uint64(i)), []byte{})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
db.MaxBatchSize = 1000
|
||||||
|
db.MaxBatchDelay = 0
|
||||||
|
|
||||||
|
go put(1)
|
||||||
|
|
||||||
|
// Batch must trigger by time alone.
|
||||||
|
|
||||||
|
// Check all responses to make sure there's no error.
|
||||||
|
for i := 0; i < size; i++ {
|
||||||
|
if err := <-ch; err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure data is correct.
|
||||||
|
db.MustView(func(tx *bolt.Tx) error {
|
||||||
|
b := tx.Bucket([]byte("widgets"))
|
||||||
|
for i := 1; i <= size; i++ {
|
||||||
|
if v := b.Get(u64tob(uint64(i))); v == nil {
|
||||||
|
t.Errorf("key not found: %d", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
29
db.go
29
db.go
@ -27,6 +27,12 @@ const magic uint32 = 0xED0CDAED
|
|||||||
// must be synchronzied using the msync(2) syscall.
|
// must be synchronzied using the msync(2) syscall.
|
||||||
const IgnoreNoSync = runtime.GOOS == "openbsd"
|
const IgnoreNoSync = runtime.GOOS == "openbsd"
|
||||||
|
|
||||||
|
// Default values if not set in a DB instance.
|
||||||
|
const (
|
||||||
|
DefaultMaxBatchSize int = 1000
|
||||||
|
DefaultMaxBatchDelay = 10 * time.Millisecond
|
||||||
|
)
|
||||||
|
|
||||||
// DB represents a collection of buckets persisted to a file on disk.
|
// DB represents a collection of buckets persisted to a file on disk.
|
||||||
// All data access is performed through transactions which can be obtained through the DB.
|
// All data access is performed through transactions which can be obtained through the DB.
|
||||||
// All the functions on DB will return a ErrDatabaseNotOpen if accessed before Open() is called.
|
// All the functions on DB will return a ErrDatabaseNotOpen if accessed before Open() is called.
|
||||||
@ -49,6 +55,22 @@ type DB struct {
|
|||||||
// THIS IS UNSAFE. PLEASE USE WITH CAUTION.
|
// THIS IS UNSAFE. PLEASE USE WITH CAUTION.
|
||||||
NoSync bool
|
NoSync bool
|
||||||
|
|
||||||
|
// MaxBatchSize is the maximum size of a batch. Default value is
|
||||||
|
// copied from DefaultMaxBatchSize in Open.
|
||||||
|
//
|
||||||
|
// If <=0, disables batching.
|
||||||
|
//
|
||||||
|
// Do not change concurrently with calls to Batch.
|
||||||
|
MaxBatchSize int
|
||||||
|
|
||||||
|
// MaxBatchDelay is the maximum delay before a batch starts.
|
||||||
|
// Default value is copied from DefaultMaxBatchDelay in Open.
|
||||||
|
//
|
||||||
|
// If <=0, effectively disables batching.
|
||||||
|
//
|
||||||
|
// Do not change concurrently with calls to Batch.
|
||||||
|
MaxBatchDelay time.Duration
|
||||||
|
|
||||||
path string
|
path string
|
||||||
file *os.File
|
file *os.File
|
||||||
dataref []byte
|
dataref []byte
|
||||||
@ -63,6 +85,9 @@ type DB struct {
|
|||||||
freelist *freelist
|
freelist *freelist
|
||||||
stats Stats
|
stats Stats
|
||||||
|
|
||||||
|
batchMu sync.Mutex
|
||||||
|
batch *batch
|
||||||
|
|
||||||
rwlock sync.Mutex // Allows only one writer at a time.
|
rwlock sync.Mutex // Allows only one writer at a time.
|
||||||
metalock sync.Mutex // Protects meta page access.
|
metalock sync.Mutex // Protects meta page access.
|
||||||
mmaplock sync.RWMutex // Protects mmap access during remapping.
|
mmaplock sync.RWMutex // Protects mmap access during remapping.
|
||||||
@ -99,6 +124,10 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
|
|||||||
options = DefaultOptions
|
options = DefaultOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set default values for later DB operations.
|
||||||
|
db.MaxBatchSize = DefaultMaxBatchSize
|
||||||
|
db.MaxBatchDelay = DefaultMaxBatchDelay
|
||||||
|
|
||||||
// Open data file and separate sync handler for metadata writes.
|
// Open data file and separate sync handler for metadata writes.
|
||||||
db.path = path
|
db.path = path
|
||||||
|
|
||||||
|
28
db_test.go
28
db_test.go
@ -618,6 +618,34 @@ func NewTestDB() *TestDB {
|
|||||||
return &TestDB{db}
|
return &TestDB{db}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MustView executes a read-only function. Panic on error.
|
||||||
|
func (db *TestDB) MustView(fn func(tx *bolt.Tx) error) {
|
||||||
|
if err := db.DB.View(func(tx *bolt.Tx) error {
|
||||||
|
return fn(tx)
|
||||||
|
}); err != nil {
|
||||||
|
panic(err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MustUpdate executes a read-write function. Panic on error.
|
||||||
|
func (db *TestDB) MustUpdate(fn func(tx *bolt.Tx) error) {
|
||||||
|
if err := db.DB.View(func(tx *bolt.Tx) error {
|
||||||
|
return fn(tx)
|
||||||
|
}); err != nil {
|
||||||
|
panic(err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MustCreateBucket creates a new bucket. Panic on error.
|
||||||
|
func (db *TestDB) MustCreateBucket(name []byte) {
|
||||||
|
if err := db.Update(func(tx *bolt.Tx) error {
|
||||||
|
_, err := tx.CreateBucket([]byte(name))
|
||||||
|
return err
|
||||||
|
}); err != nil {
|
||||||
|
panic(err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Close closes the database and deletes the underlying file.
|
// Close closes the database and deletes the underlying file.
|
||||||
func (db *TestDB) Close() {
|
func (db *TestDB) Close() {
|
||||||
// Log statistics.
|
// Log statistics.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user