diff --git a/bench.go b/bench.go
index a93a27c..e41c260 100644
--- a/bench.go
+++ b/bench.go
@@ -3,7 +3,6 @@ package bolt
 import (
 	"errors"
 	"fmt"
-	"os"
 	"sync"
 	"testing"
 )
@@ -16,29 +15,31 @@ const (
 )
 
 type Benchmark struct {
-	InputPath        string
+	db               *DB
 	ReadWriteMode    string
 	TraversalPattern string
 	Parallelism      int
 }
 
-func NewBenchmark(inputPath, readWriteMode, traversalPattern string, parallelism int) *Benchmark {
-	return &Benchmark{inputPath, readWriteMode, traversalPattern, parallelism}
+func NewBenchmark(db *DB, readWriteMode, traversalPattern string, parallelism int) *Benchmark {
+	return &Benchmark{db, readWriteMode, traversalPattern, parallelism}
 }
 
 func (bm *Benchmark) Run(b *testing.B) {
-	// Open the database.
-	db, err := Open(bm.InputPath, 0600)
+	// Read buckets and keys before the benchmark begins so we don't skew
+	// the results.
+	buckets, err := buckets(bm.db)
 	if err != nil {
 		b.Fatalf("error: %+v", err)
-		return
 	}
-	defer db.Close()
-
-	buckets, err := buckets(db, bm.InputPath)
-	if err != nil {
-		b.Fatalf("error: %+v", err)
+	bucketsWithKeys := make(map[string][]string)
+	for _, bucket := range buckets {
+		keys, err := keys(bm.db, bucket)
+		if err != nil {
+			b.Fatalf("error: %+v", err)
+		}
+		bucketsWithKeys[bucket] = keys
 	}
 
 	b.ResetTimer()
 
@@ -50,7 +51,7 @@ func (bm *Benchmark) Run(b *testing.B) {
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			if err := bm.runBuckets(b, db, buckets); err != nil {
+			if err := bm.runBuckets(b, bm.db, bucketsWithKeys); err != nil {
 				b.Fatalf("error: %+v", err)
 			}
 		}()
@@ -60,30 +61,39 @@ func (bm *Benchmark) Run(b *testing.B) {
 }
 
 // Run benchmark(s) for each of the given buckets.
-func (bm *Benchmark) runBuckets(b *testing.B, db *DB, buckets []string) error {
+func (bm *Benchmark) runBuckets(b *testing.B, db *DB, bucketsWithKeys map[string][]string) error {
 	return db.View(func(tx *Tx) error {
-		bucketsCount := len(buckets)
-		for _, bucket := range buckets {
-			c := tx.Bucket([]byte(bucket)).Cursor()
-			count := 0
-			for k, _ := c.First(); k != nil; k, _ = c.Next() {
-				count++
-			}
-			if count != bucketsCount {
-				return errors.New(fmt.Sprintf("wrong count: %d; expected: %d", count, bucketsCount))
+		bucketsCount := len(bucketsWithKeys)
+		count := 0
+		for bucket, keys := range bucketsWithKeys {
+			bucket := tx.Bucket([]byte(bucket))
+			if err := bm.runKeys(b, bucket, keys); err != nil {
+				return err
 			}
+			count++
+		}
+		if count != bucketsCount {
+			return errors.New(fmt.Sprintf("wrong count: %d; expected: %d", count, bucketsCount))
 		}
 		return nil
 	})
 }
 
-func buckets(db *DB, path string) ([]string, error) {
-	if _, err := os.Stat(path); os.IsNotExist(err) {
-		return nil, err
+func (bm *Benchmark) runKeys(b *testing.B, bucket *Bucket, keys []string) error {
+	c := bucket.Cursor()
+	keysCount := len(keys)
+	count := 0
+	for k, _ := c.First(); k != nil; k, _ = c.Next() {
+		count++
 	}
+	if count != keysCount {
+		return errors.New(fmt.Sprintf("wrong count: %d; expected: %d", count, keysCount))
+	}
+	return nil
+}
 
+func buckets(db *DB) ([]string, error) {
 	buckets := []string{}
-
 	err := db.View(func(tx *Tx) error {
 		// Iterate over each bucket.
 		return tx.ForEach(func(name []byte, _ *Bucket) error {
@@ -91,6 +101,23 @@ func buckets(db *DB, path string) ([]string, error) {
 			return nil
 		})
 	})
-
 	return buckets, err
 }
+
+func keys(db *DB, bucket string) ([]string, error) {
+	keys := []string{}
+	err := db.View(func(tx *Tx) error {
+		// Find bucket.
+		b := tx.Bucket([]byte(bucket))
+		if b == nil {
+			return errors.New(fmt.Sprintf("bucket %s not found", bucket))
+		}
+
+		// Iterate over each key.
+		return b.ForEach(func(key, _ []byte) error {
+			keys = append(keys, string(key))
+			return nil
+		})
+	})
+	return keys, err
+}
diff --git a/bench/bench.go b/bench/bench.go
deleted file mode 100644
index 3bdd43a..0000000
--- a/bench/bench.go
+++ /dev/null
@@ -1,42 +0,0 @@
-package bench
-
-import (
-	"encoding/json"
-	"fmt"
-	"io/ioutil"
-	"os"
-)
-
-type bucketItems map[string]string
-type buckets map[string]bucketItems
-
-type Benchmark struct {
-	buckets buckets
-}
-
-func New(filePath string) (*Benchmark, error) {
-	data := readFromFile(filePath)
-}
-
-func readFromFile(filePath string) (*Benchmark, error) {
-	if _, err := os.Stat(filePath); os.IsNotExist(err) {
-		return nil, err
-	}
-
-	file, err := ioutil.ReadFile(filePath)
-	if err != nil {
-		return nil, err
-	}
-
-	b := new(Benchmark)
-	if err := json.Unmarshal(file, &b.buckets); err != nil {
-		return nil, err
-	}
-
-	return b, nil
-}
-
-func (b *Benchmark) Run() error {
-	fmt.Println("Do things, run benchmarks, tell people...")
-	return nil
-}
diff --git a/cmd/bolt/bench.go b/cmd/bolt/bench.go
index 28cbbc3..f894a70 100644
--- a/cmd/bolt/bench.go
+++ b/cmd/bolt/bench.go
@@ -35,7 +35,15 @@ func Bench(inputPath string, readWriteMode string, traversalPattern string, para
 
 	// benchmarks for getting all keys
 
-	b := bolt.NewBenchmark(inputPath, readWriteMode, traversalPattern, parallelism)
+	// Open the database.
+	db, err := bolt.Open(inputPath, 0600)
+	if err != nil {
+		fatalf("error: %+v", err)
+		return
+	}
+	defer db.Close()
+
+	b := bolt.NewBenchmark(db, readWriteMode, traversalPattern, parallelism)
 
 	result := testing.Benchmark(b.Run)
 
diff --git a/cmd/bolt/generate.go b/cmd/bolt/generate.go
index ecd391e..15edb27 100644
--- a/cmd/bolt/generate.go
+++ b/cmd/bolt/generate.go
@@ -1,32 +1,55 @@
 package main
 
 import (
-	"bufio"
 	"fmt"
-	"os"
-	"strings"
+
+	"github.com/boltdb/bolt"
 )
 
 // Generate data for benchmarks.
-func Generate(numEvents int, destPath string) {
-	f, err := os.Create(destPath)
+func Generate(destPath string, numBuckets, numItems int) {
+
+	// Open the database.
+	db, err := bolt.Open(destPath, 0600)
+	if err != nil {
+		fatalf("open db: %s", err)
+		return
+	}
+	defer db.Close()
+
+	for bucketIndex := 0; bucketIndex < numBuckets; bucketIndex++ {
+		bucketName := fmt.Sprintf("bucket%03d", bucketIndex)
+
+		err = db.Update(func(tx *bolt.Tx) error {
+
+			// Create the bucket if it doesn't exist.
+			if err := tx.CreateBucketIfNotExists([]byte(bucketName)); err != nil {
+				fatalf("create bucket: %s", err)
+				return nil
+			}
+
+			// Find bucket.
+			b := tx.Bucket([]byte(bucketName))
+			if b == nil {
+				fatalf("bucket not found: %s", bucketName)
+				return nil
+			}
+
+			for i := 0; i < numItems; i++ {
+				key := fmt.Sprintf("key%03d", i)
+				value := fmt.Sprintf("value%03d", i)
+
+				// Set value for a given key.
+				if err := b.Put([]byte(key), []byte(value)); err != nil {
+					return err
+				}
+			}
+
+			return nil
+		})
+	}
 	if err != nil {
 		fatal(err)
-	}
-	defer func() {
-		if err := f.Close(); err != nil {
-			fatal(err)
-		}
-	}()
-	w := bufio.NewWriter(f)
-
-	for i := 0; i < numEvents; i++ {
-		if _, err := w.Write([]byte(fmt.Sprintf("key%d:%s\n", i, strings.Repeat("0", 64)))); err != nil {
-			fatal(err)
-		}
-	}
-
-	if err = w.Flush(); err != nil {
-		fatal(err)
+		return
 	}
 }
diff --git a/cmd/bolt/generate_test.go b/cmd/bolt/generate_test.go
deleted file mode 100644
index 06ab7d0..0000000
--- a/cmd/bolt/generate_test.go
+++ /dev/null
@@ -1 +0,0 @@
-package main
diff --git a/cmd/bolt/keys.go b/cmd/bolt/keys.go
index 65b717f..6affefe 100644
--- a/cmd/bolt/keys.go
+++ b/cmd/bolt/keys.go
@@ -1,8 +1,6 @@
 package main
 
 import (
-	"errors"
-	"fmt"
 	"os"
 
 	"github.com/boltdb/bolt"
@@ -10,44 +8,34 @@ import (
 
 // Keys retrieves a list of keys for a given bucket.
 func Keys(path, name string) {
-	keys, err := keys(path, name)
-
-	if err != nil {
+	if _, err := os.Stat(path); os.IsNotExist(err) {
 		fatal(err)
 		return
 	}
-	for _, key := range keys {
-		println(key)
-	}
-}
-
-func keys(path, name string) ([]string, error) {
-	if _, err := os.Stat(path); os.IsNotExist(err) {
-		return nil, err
-	}
 
 	db, err := bolt.Open(path, 0600)
 	if err != nil {
-		return nil, err
+		fatal(err)
+		return
 	}
 	defer db.Close()
 
-	keys := []string{}
-
 	err = db.View(func(tx *bolt.Tx) error {
 		// Find bucket.
 		b := tx.Bucket([]byte(name))
 		if b == nil {
-			return errors.New(fmt.Sprintf("bucket %+v not found", b))
+			fatalf("bucket not found: %s", name)
+			return nil
 		}
 
 		// Iterate over each key.
 		return b.ForEach(func(key, _ []byte) error {
-			keys = append(keys, string(key))
+			println(string(key))
 			return nil
 		})
 	})
-
-	return keys, err
+	if err != nil {
+		fatal(err)
+		return
+	}
 }
diff --git a/cmd/bolt/main.go b/cmd/bolt/main.go
index 19df59f..ac71631 100644
--- a/cmd/bolt/main.go
+++ b/cmd/bolt/main.go
@@ -95,12 +95,16 @@ func NewApp() *cli.App {
 			Name:  "generate",
 			Usage: "Generate data for benchmarks",
 			Action: func(c *cli.Context) {
-				numEvents, err := strconv.Atoi(c.Args().Get(0))
+				destPath := c.Args().Get(0)
+				numBuckets, err := strconv.Atoi(c.Args().Get(1))
 				if err != nil {
 					fatal(err)
 				}
-				destPath := c.Args().Get(1)
-				Generate(numEvents, destPath)
+				numItems, err := strconv.Atoi(c.Args().Get(2))
+				if err != nil {
+					fatal(err)
+				}
+				Generate(destPath, numBuckets, numItems)
 			},
 		}, {
diff --git a/cmd/bolt/set.go b/cmd/bolt/set.go
index c757d27..f4a4696 100644
--- a/cmd/bolt/set.go
+++ b/cmd/bolt/set.go
@@ -21,11 +21,6 @@ func Set(path, name, key, value string) {
 	defer db.Close()
 
 	err = db.Update(func(tx *bolt.Tx) error {
-		// Create the bucket if it doesn't exist.
-		if err := tx.CreateBucketIfNotExists([]byte(name)); err != nil {
-			fatalf("create bucket: %s", err)
-			return nil
-		}
-
 		// Find bucket.
 		b := tx.Bucket([]byte(name))
diff --git a/tx_test.go b/tx_test.go
index ff0e1e4..17498ed 100644
--- a/tx_test.go
+++ b/tx_test.go
@@ -266,6 +266,123 @@ func TestTx_OnCommit_Rollback(t *testing.T) {
 	assert.Equal(t, 0, x)
 }
 
+func BenchmarkReadSequential_1Buckets_1Items_1Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 1, 1)
+}
+func BenchmarkReadSequential_1Buckets_10Items_1Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 10, 1)
+}
+func BenchmarkReadSequential_1Buckets_100Items_1Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 100, 1)
+}
+func BenchmarkReadSequential_1Buckets_1000Items_1Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 1000, 1)
+}
+func BenchmarkReadSequential_1Buckets_10000Items_1Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 10000, 1)
+}
+
+func BenchmarkReadSequential_1Buckets_1Items_10Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 1, 10)
+}
+func BenchmarkReadSequential_1Buckets_10Items_10Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 10, 10)
+}
+func BenchmarkReadSequential_1Buckets_100Items_10Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 100, 10)
+}
+func BenchmarkReadSequential_1Buckets_1000Items_10Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 1000, 10)
+}
+func BenchmarkReadSequential_1Buckets_10000Items_10Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 10000, 10)
+}
+
+func BenchmarkReadSequential_1Buckets_1Items_100Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 1, 100)
+}
+func BenchmarkReadSequential_1Buckets_10Items_100Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 10, 100)
+}
+func BenchmarkReadSequential_1Buckets_100Items_100Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 100, 100)
+}
+func BenchmarkReadSequential_1Buckets_1000Items_100Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 1000, 100)
+}
+func BenchmarkReadSequential_1Buckets_10000Items_100Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 10000, 100)
+}
+
+func BenchmarkReadSequential_1Buckets_1Items_1000Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 1, 1000)
+}
+func BenchmarkReadSequential_1Buckets_10Items_1000Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 10, 1000)
+}
+func BenchmarkReadSequential_1Buckets_100Items_1000Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 100, 1000)
+}
+func BenchmarkReadSequential_1Buckets_1000Items_1000Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 1000, 1000)
+}
+func BenchmarkReadSequential_1Buckets_10000Items_1000Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 10000, 1000)
+}
+
+func BenchmarkReadSequential_1Buckets_1Items_10000Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 1, 10000)
+}
+func BenchmarkReadSequential_1Buckets_10Items_10000Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 10, 10000)
+}
+func BenchmarkReadSequential_1Buckets_100Items_10000Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 100, 10000)
+}
+func BenchmarkReadSequential_1Buckets_1000Items_10000Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 1000, 10000)
+}
+func BenchmarkReadSequential_1Buckets_10000Items_10000Concurrency(b *testing.B) {
+	benchmarkReadSequential(b, 1, 10000, 10000)
+}
+
+func benchmark(b *testing.B, readWriteMode, traversalPattern string, numBuckets, numItemsPerBucket, parallelism int) {
+	withOpenDB(func(db *DB, path string) {
+		if err := generateDB(db, numBuckets, numItemsPerBucket); err != nil {
+			b.Fatal(err)
+		}
+		NewBenchmark(db, readWriteMode, traversalPattern, parallelism).Run(b)
+	})
+}
+
+func benchmarkRead(b *testing.B, traversalPattern string, numBuckets, numItemsPerBucket, parallelism int) {
+	benchmark(b, BenchReadMode, traversalPattern, numBuckets, numItemsPerBucket, parallelism)
+}
+
+func benchmarkReadSequential(b *testing.B, numBuckets, numItemsPerBucket, parallelism int) {
+	benchmark(b, BenchReadMode, BenchSequentialTraversal, numBuckets, numItemsPerBucket, parallelism)
+}
+
+func benchmarkReadRandom(b *testing.B, numBuckets, numItemsPerBucket, parallelism int) {
+	benchmark(b, BenchReadMode, BenchRandomTraversal, numBuckets, numItemsPerBucket, parallelism)
+}
+
+// Generate and write data to specified number of buckets/items.
+func generateDB(db *DB, numBuckets, numItemsPerBucket int) error {
+	return db.Update(func(tx *Tx) error {
+		for bucketIndex := 0; bucketIndex < numBuckets; bucketIndex++ {
+			bucketName := fmt.Sprintf("bucket%08d", bucketIndex)
+			tx.CreateBucket([]byte(bucketName))
+			bucket := tx.Bucket([]byte(bucketName))
+			for i := 0; i < numItemsPerBucket; i++ {
+				value := []byte(strings.Repeat("0", 100))
+				bucket.Put([]byte(fmt.Sprintf("key%08d", i)), value)
+			}
+		}
+		return nil
+	})
+}
+
 // Benchmark the performance iterating over a cursor.
 func BenchmarkTxCursor1(b *testing.B)     { benchmarkTxCursor(b, 1) }
 func BenchmarkTxCursor10(b *testing.B)    { benchmarkTxCursor(b, 10) }
@@ -365,47 +482,6 @@ func benchmarkTxPutSequential(b *testing.B, total int) {
 	})
 }
 
-// func BenchmarkParallel_1items_1threads(b *testing.B)    { benchmarkParallel(1, 1) }
-// func BenchmarkParallel_1items_10threads(b *testing.B)   { benchmarkParallel(1, 10) }
-// func BenchmarkParallel_1items_100threads(b *testing.B)  { benchmarkParallel(1, 100) }
-// func BenchmarkParallel_1items_1000threads(b *testing.B) { benchmarkParallel(1, 1000) }
-
-// func BenchmarkParallel_10items_1threads(b *testing.B)    { benchmarkParallel(10, 1) }
-// func BenchmarkParallel_10items_10threads(b *testing.B)   { benchmarkParallel(10, 10) }
-// func BenchmarkParallel_10items_100threads(b *testing.B)  { benchmarkParallel(10, 100) }
-// func BenchmarkParallel_10items_1000threads(b *testing.B) { benchmarkParallel(10, 1000) }
-
-// func BenchmarkParallel_100items_1threads(b *testing.B)    { benchmarkParallel(100, 1) }
-// func BenchmarkParallel_100items_10threads(b *testing.B)   { benchmarkParallel(100, 10) }
-// func BenchmarkParallel_100items_100threads(b *testing.B)  { benchmarkParallel(100, 100) }
-// func BenchmarkParallel_100items_1000threads(b *testing.B) { benchmarkParallel(100, 1000) }
-
-// func BenchmarkParallel_1000items_1threads(b *testing.B)    { benchmarkParallel(1000, 1) }
-// func BenchmarkParallel_1000items_10threads(b *testing.B)   { benchmarkParallel(1000, 10) }
-// func BenchmarkParallel_1000items_100threads(b *testing.B)  { benchmarkParallel(1000, 100) }
-// func BenchmarkParallel_1000items_1000threads(b *testing.B) { benchmarkParallel(1000, 1000) }
-
-// func benchmarkParallel(b *testing.B, itemCount, parallelism int) {
-// 	// Setup database.
-// 	for i := 0; i < itemCount; i++ {
-// 		// ... insert key/values here ...
-// 	}
-// 	b.ResetTimer()
-
-// 	// Keep running a fixed number of parallel reads until we run out of time.
-// 	for i := 0; i < b.N; i++ {
-// 		var wg sync.WaitGroup
-// 		for j := 0; j < parallelism; j++ {
-// 			wg.Add(1)
-// 			go func() {
-// 				// ... execute read here ...
-// 				wg.Done()
-// 			}()
-// 		}
-// 		wg.Wait()
-// 	}
-// }
-
 func ExampleTx_Rollback() {
 	// Open the database.
 	db, _ := Open(tempfile(), 0666)