Merge pull request #429 from tjungblu/fix_428

fixing small bench errors
This commit is contained in:
Benjamin Wang 2023-03-18 12:53:52 +08:00 committed by GitHub
commit 0c2c0a257a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 122 additions and 25 deletions

View File

@ -15,6 +15,7 @@ import (
"runtime/pprof" "runtime/pprof"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"unicode" "unicode"
"unicode/utf8" "unicode/utf8"
@ -1091,13 +1092,13 @@ func (cmd *benchCommand) Run(args ...string) error {
defer db.Close() defer db.Close()
// Write to the database. // Write to the database.
writeResults := BenchResults{int64(0), 0} var writeResults BenchResults
fmt.Fprintf(cmd.Stderr, "starting write benchmark.\n") fmt.Fprintf(cmd.Stderr, "starting write benchmark.\n")
if err := cmd.runWrites(db, options, &writeResults); err != nil { if err := cmd.runWrites(db, options, &writeResults); err != nil {
return fmt.Errorf("write: %v", err) return fmt.Errorf("write: %v", err)
} }
readResults := BenchResults{int64(0), 0} var readResults BenchResults
fmt.Fprintf(cmd.Stderr, "starting read benchmark.\n") fmt.Fprintf(cmd.Stderr, "starting read benchmark.\n")
// Read from the database. // Read from the database.
if err := cmd.runReads(db, options, &readResults); err != nil { if err := cmd.runReads(db, options, &readResults); err != nil {
@ -1105,9 +1106,9 @@ func (cmd *benchCommand) Run(args ...string) error {
} }
// Print results. // Print results.
fmt.Fprintf(os.Stderr, "# Write\t%v(ops)\t%v\t(%v/op)\t(%v op/sec)\n", writeResults.CompletedOps, writeResults.Duration, writeResults.OpDuration(), writeResults.OpsPerSecond()) fmt.Fprintf(cmd.Stderr, "# Write\t%v(ops)\t%v\t(%v/op)\t(%v op/sec)\n", writeResults.CompletedOps(), writeResults.Duration(), writeResults.OpDuration(), writeResults.OpsPerSecond())
fmt.Fprintf(os.Stderr, "# Read\t%v(ops)\t%v\t(%v/op)\t(%v op/sec)\n", readResults.CompletedOps, readResults.Duration, readResults.OpDuration(), readResults.OpsPerSecond()) fmt.Fprintf(cmd.Stderr, "# Read\t%v(ops)\t%v\t(%v/op)\t(%v op/sec)\n", readResults.CompletedOps(), readResults.Duration(), readResults.OpDuration(), readResults.OpsPerSecond())
fmt.Fprintln(os.Stderr, "") fmt.Fprintln(cmd.Stderr, "")
return nil return nil
} }
@ -1186,7 +1187,7 @@ func (cmd *benchCommand) runWrites(db *bolt.DB, options *BenchOptions, results *
} }
// Save time to write. // Save time to write.
results.Duration = time.Since(t) results.SetDuration(time.Since(t))
// Stop profiling for writes only. // Stop profiling for writes only.
if options.ProfileMode == "w" { if options.ProfileMode == "w" {
@ -1239,7 +1240,7 @@ func (cmd *benchCommand) runWritesWithSource(db *bolt.DB, options *BenchOptions,
}); err != nil { }); err != nil {
return err return err
} else { } else {
results.CompletedOps += options.BatchSize results.AddCompletedOps(options.BatchSize)
} }
} }
return nil return nil
@ -1282,7 +1283,7 @@ func (cmd *benchCommand) runWritesNestedWithSource(db *bolt.DB, options *BenchOp
}); err != nil { }); err != nil {
return err return err
} else { } else {
results.CompletedOps += options.BatchSize results.AddCompletedOps(options.BatchSize)
} }
} }
return nil return nil
@ -1315,7 +1316,7 @@ func (cmd *benchCommand) runReads(db *bolt.DB, options *BenchOptions, results *B
} }
// Save read time. // Save read time.
results.Duration = time.Since(t) results.SetDuration(time.Since(t))
// Stop profiling for reads. // Stop profiling for reads.
if options.ProfileMode == "rw" || options.ProfileMode == "r" { if options.ProfileMode == "rw" || options.ProfileMode == "r" {
@ -1330,19 +1331,21 @@ func (cmd *benchCommand) runReadsSequential(db *bolt.DB, options *BenchOptions,
t := time.Now() t := time.Now()
for { for {
numReads := int64(0)
c := tx.Bucket(benchBucketName).Cursor() c := tx.Bucket(benchBucketName).Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() { for k, v := c.First(); k != nil; k, v = c.Next() {
numReads++
if v == nil { if v == nil {
return errors.New("invalid value") return errors.New("invalid value")
} }
results.CompletedOps++
} }
if options.WriteMode == "seq" && results.CompletedOps != options.Iterations { if options.WriteMode == "seq" && numReads != options.Iterations {
return fmt.Errorf("read seq: iter mismatch: expected %d, got %d", options.Iterations, results.CompletedOps) return fmt.Errorf("read seq: iter mismatch: expected %d, got %d", options.Iterations, numReads)
} }
results.AddCompletedOps(numReads)
// Make sure we do this for at least a second. // Make sure we do this for at least a second.
if time.Since(t) >= time.Second { if time.Since(t) >= time.Second {
break break
@ -1358,15 +1361,16 @@ func (cmd *benchCommand) runReadsSequentialNested(db *bolt.DB, options *BenchOpt
t := time.Now() t := time.Now()
for { for {
numReads := int64(0)
var top = tx.Bucket(benchBucketName) var top = tx.Bucket(benchBucketName)
if err := top.ForEach(func(name, _ []byte) error { if err := top.ForEach(func(name, _ []byte) error {
if b := top.Bucket(name); b != nil { if b := top.Bucket(name); b != nil {
c := b.Cursor() c := b.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() { for k, v := c.First(); k != nil; k, v = c.Next() {
numReads++
if v == nil { if v == nil {
return ErrInvalidValue return ErrInvalidValue
} }
results.CompletedOps++
} }
} }
return nil return nil
@ -1374,10 +1378,12 @@ func (cmd *benchCommand) runReadsSequentialNested(db *bolt.DB, options *BenchOpt
return err return err
} }
if options.WriteMode == "seq-nest" && results.CompletedOps != options.Iterations { if options.WriteMode == "seq-nest" && numReads != options.Iterations {
return fmt.Errorf("read seq-nest: iter mismatch: expected %d, got %d", options.Iterations, results.CompletedOps) return fmt.Errorf("read seq-nest: iter mismatch: expected %d, got %d", options.Iterations, numReads)
} }
results.AddCompletedOps(numReads)
// Make sure we do this for at least a second. // Make sure we do this for at least a second.
if time.Since(t) >= time.Second { if time.Since(t) >= time.Second {
break break
@ -1396,7 +1402,7 @@ func checkProgress(results *BenchResults, finishChan chan interface{}, stderr io
case <-finishChan: case <-finishChan:
return return
case t := <-ticker: case t := <-ticker:
completed, taken := results.CompletedOps, t.Sub(lastTime) completed, taken := results.CompletedOps(), t.Sub(lastTime)
fmt.Fprintf(stderr, "Completed %d requests, %d/s \n", fmt.Fprintf(stderr, "Completed %d requests, %d/s \n",
completed, ((completed-lastCompleted)*int64(time.Second))/int64(taken), completed, ((completed-lastCompleted)*int64(time.Second))/int64(taken),
) )
@ -1494,18 +1500,47 @@ type BenchOptions struct {
Path string Path string
} }
// BenchResults represents the performance results of the benchmark. // BenchResults represents the performance results of the benchmark and is thread-safe.
type BenchResults struct { type BenchResults struct {
CompletedOps int64 m sync.Mutex
Duration time.Duration completedOps int64
duration time.Duration
}
func (r *BenchResults) AddCompletedOps(amount int64) {
r.m.Lock()
defer r.m.Unlock()
r.completedOps += amount
}
func (r *BenchResults) CompletedOps() int64 {
r.m.Lock()
defer r.m.Unlock()
return r.completedOps
}
func (r *BenchResults) SetDuration(dur time.Duration) {
r.m.Lock()
defer r.m.Unlock()
r.duration = dur
}
func (r *BenchResults) Duration() time.Duration {
r.m.Lock()
defer r.m.Unlock()
return r.duration
} }
// Returns the duration for a single read/write operation. // Returns the duration for a single read/write operation.
func (r *BenchResults) OpDuration() time.Duration { func (r *BenchResults) OpDuration() time.Duration {
if r.CompletedOps == 0 { if r.CompletedOps() == 0 {
return 0 return 0
} }
return r.Duration / time.Duration(r.CompletedOps) return r.Duration() / time.Duration(r.CompletedOps())
} }
// Returns average number of read/write operations that can be performed per second. // Returns average number of read/write operations that can be performed per second.

View File

@ -9,6 +9,8 @@ import (
"math/rand" "math/rand"
"os" "os"
"strconv" "strconv"
"strings"
"sync"
"testing" "testing"
"go.etcd.io/bbolt/internal/btesting" "go.etcd.io/bbolt/internal/btesting"
@ -286,12 +288,72 @@ func TestPagesCommand_Run(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
// Ensure the "bench" command runs and exits without errors
func TestBenchCommand_Run(t *testing.T) {
tests := map[string]struct {
args []string
}{
"no-args": {},
"100k count": {[]string{"-count", "100000"}},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
// Run the command.
m := NewMain()
args := append([]string{"bench"}, test.args...)
if err := m.Run(args...); err != nil {
t.Fatal(err)
}
stderr := m.Stderr.String()
if !strings.Contains(stderr, "starting write benchmark.") || !strings.Contains(stderr, "starting read benchmark.") {
t.Fatal(fmt.Errorf("benchmark result does not contain read/write start output:\n%s", stderr))
}
if strings.Contains(stderr, "iter mismatch") {
t.Fatal(fmt.Errorf("found iter mismatch in stdout:\n%s", stderr))
}
if !strings.Contains(stderr, "# Write") || !strings.Contains(stderr, "# Read") {
t.Fatal(fmt.Errorf("benchmark result does not contain read/write output:\n%s", stderr))
}
})
}
}
type ConcurrentBuffer struct {
m sync.Mutex
buf bytes.Buffer
}
func (b *ConcurrentBuffer) Read(p []byte) (n int, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.buf.Read(p)
}
func (b *ConcurrentBuffer) Write(p []byte) (n int, err error) {
b.m.Lock()
defer b.m.Unlock()
return b.buf.Write(p)
}
func (b *ConcurrentBuffer) String() string {
b.m.Lock()
defer b.m.Unlock()
return b.buf.String()
}
// Main represents a test wrapper for main.Main that records output. // Main represents a test wrapper for main.Main that records output.
type Main struct { type Main struct {
*main.Main *main.Main
Stdin bytes.Buffer Stdin ConcurrentBuffer
Stdout bytes.Buffer Stdout ConcurrentBuffer
Stderr bytes.Buffer Stderr ConcurrentBuffer
} }
// NewMain returns a new instance of Main. // NewMain returns a new instance of Main.