diff --git a/simulation_test.go b/simulation_test.go index 70f2396..a96a241 100644 --- a/simulation_test.go +++ b/simulation_test.go @@ -5,6 +5,7 @@ import ( "fmt" "math/rand" "sync" + "sync/atomic" "testing" bolt "go.etcd.io/bbolt" @@ -47,15 +48,28 @@ func testSimulate(t *testing.T, openOption *bolt.Options, round, threadCount, pa var mutex sync.Mutex - // Run n threads in parallel, each with their own operation. - var wg sync.WaitGroup - for n := 0; n < round; n++ { - + // Run n threads in parallel, each with their own operation. var threads = make(chan bool, parallelism) + var wg sync.WaitGroup + + // counter for how many goroutines were fired + var opCount int64 + + // counter for ignored operations + var igCount int64 + + var errCh = make(chan error, threadCount) + var i int for { + // this buffered channel will keep accepting booleans + // until it hits the limit defined by the parallelism + // argument to testSimulate() threads <- true + + // this wait group can only be marked "done" from inside + // the subsequent goroutine wg.Add(1) writable := ((rand.Int() % 100) < 20) // 20% writers @@ -70,11 +84,12 @@ func testSimulate(t *testing.T, openOption *bolt.Options, round, threadCount, pa // Execute a thread for the given operation. go func(writable bool, handler simulateHandler) { defer wg.Done() - + atomic.AddInt64(&opCount, 1) // Start transaction. tx, err := db.Begin(writable) if err != nil { - t.Fatal("tx begin: ", err) + errCh <- fmt.Errorf("error tx begin: %v", err) + return } // Obtain current state of the dataset. @@ -93,7 +108,8 @@ func testSimulate(t *testing.T, openOption *bolt.Options, round, threadCount, pa mutex.Unlock() if err := tx.Commit(); err != nil { - t.Fatal(err) + errCh <- err + return } }() } else { @@ -102,6 +118,7 @@ func testSimulate(t *testing.T, openOption *bolt.Options, round, threadCount, pa // Ignore operation if we don't have data yet. if qdb == nil { + atomic.AddInt64(&igCount, 1) return } @@ -113,17 +130,25 @@ func testSimulate(t *testing.T, openOption *bolt.Options, round, threadCount, pa }(writable, handler) i++ - if i > threadCount { + if i >= threadCount { break } } // Wait until all threads are done. wg.Wait() + t.Logf("transactions:%d ignored:%d", opCount, igCount) + close(errCh) + for err := range errCh { + if err != nil { + t.Fatalf("error from inside goroutine: %v", err) + } + } db.MustClose() db.MustReopen() } + } type simulateHandler func(tx *bolt.Tx, qdb *QuickDB)