Merge pull request from alrs/fix-test-goroutines

Fix Swallowed goroutine Error Statements
pull/191/head
Gyuho Lee 2019-11-20 13:18:12 -08:00 committed by GitHub
commit 96e95490d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 33 additions and 8 deletions

View File

@ -5,6 +5,7 @@ import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"testing"
bolt "go.etcd.io/bbolt"
@ -47,15 +48,28 @@ func testSimulate(t *testing.T, openOption *bolt.Options, round, threadCount, pa
var mutex sync.Mutex
// Run n threads in parallel, each with their own operation.
var wg sync.WaitGroup
for n := 0; n < round; n++ {
// Run n threads in parallel, each with their own operation.
var threads = make(chan bool, parallelism)
var wg sync.WaitGroup
// counter for how many goroutines were fired
var opCount int64
// counter for ignored operations
var igCount int64
var errCh = make(chan error, threadCount)
var i int
for {
// this buffered channel will keep accepting booleans
// until it hits the limit defined by the parallelism
// argument to testSimulate()
threads <- true
// this wait group can only be marked "done" from inside
// the subsequent goroutine
wg.Add(1)
writable := ((rand.Int() % 100) < 20) // 20% writers
@ -70,11 +84,12 @@ func testSimulate(t *testing.T, openOption *bolt.Options, round, threadCount, pa
// Execute a thread for the given operation.
go func(writable bool, handler simulateHandler) {
defer wg.Done()
atomic.AddInt64(&opCount, 1)
// Start transaction.
tx, err := db.Begin(writable)
if err != nil {
t.Fatal("tx begin: ", err)
errCh <- fmt.Errorf("error tx begin: %v", err)
return
}
// Obtain current state of the dataset.
@ -93,7 +108,8 @@ func testSimulate(t *testing.T, openOption *bolt.Options, round, threadCount, pa
mutex.Unlock()
if err := tx.Commit(); err != nil {
t.Fatal(err)
errCh <- err
return
}
}()
} else {
@ -102,6 +118,7 @@ func testSimulate(t *testing.T, openOption *bolt.Options, round, threadCount, pa
// Ignore operation if we don't have data yet.
if qdb == nil {
atomic.AddInt64(&igCount, 1)
return
}
@ -113,17 +130,25 @@ func testSimulate(t *testing.T, openOption *bolt.Options, round, threadCount, pa
}(writable, handler)
i++
if i > threadCount {
if i >= threadCount {
break
}
}
// Wait until all threads are done.
wg.Wait()
t.Logf("transactions:%d ignored:%d", opCount, igCount)
close(errCh)
for err := range errCh {
if err != nil {
t.Fatalf("error from inside goroutine: %v", err)
}
}
db.MustClose()
db.MustReopen()
}
}
type simulateHandler func(tx *bolt.Tx, qdb *QuickDB)