test: add test case TestConcurrentRepeatableRead to verify repeatable read

Signed-off-by: Benjamin Wang <wachao@vmware.com>
This commit is contained in:
Benjamin Wang 2023-05-24 13:32:11 +08:00
parent 504e7be72c
commit 1eaf75a9d9

View File

@ -1,6 +1,7 @@
package bbolt_test
import (
"bytes"
crand "crypto/rand"
"encoding/hex"
"encoding/json"
@ -9,7 +10,6 @@ import (
mrand "math/rand"
"os"
"path/filepath"
"reflect"
"sort"
"strings"
"sync"
@ -67,7 +67,7 @@ TestConcurrentReadAndWrite verifies:
following reading transactions (with txid >= previous writing txid).
3. The txid should never decrease.
*/
func TestConcurrentReadAndWrite(t *testing.T) {
func TestConcurrentGenericReadAndWrite(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
@ -216,7 +216,21 @@ func concurrentReadAndWrite(t *testing.T,
func mustCreateDB(t *testing.T, o *bolt.Options) *bolt.DB {
f := filepath.Join(t.TempDir(), "db")
t.Logf("Opening bbolt DB at: %s", f)
return mustOpenDB(t, f, o)
}
func mustReOpenDB(t *testing.T, db *bolt.DB, o *bolt.Options) *bolt.DB {
f := db.Path()
t.Logf("CLosing bbolt DB at: %s", f)
err := db.Close()
require.NoError(t, err)
return mustOpenDB(t, f, o)
}
func mustOpenDB(t *testing.T, dbPath string, o *bolt.Options) *bolt.DB {
t.Logf("Opening bbolt DB at: %s", dbPath)
if o == nil {
o = bolt.DefaultOptions
}
@ -228,7 +242,7 @@ func mustCreateDB(t *testing.T, o *bolt.Options) *bolt.DB {
o.FreelistType = freelistType
db, err := bolt.Open(f, 0666, o)
db, err := bolt.Open(dbPath, 0666, o)
require.NoError(t, err)
return db
@ -409,7 +423,7 @@ func executeRead(db *bolt.DB, bucket []byte, key []byte, readInterval duration)
time.Sleep(randomDurationInRange(readInterval.min, readInterval.max))
val := b.Get(key)
if !reflect.DeepEqual(initialVal, val) {
if !bytes.Equal(initialVal, val) {
return fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q",
string(key), formatBytes(initialVal), formatBytes(val))
}
@ -713,7 +727,7 @@ func validateSequential(rs historyRecords) error {
} else if rec.OperationType == Delete {
delete(lastWriteKeyValueMap, bk)
} else {
if !reflect.DeepEqual(v.Value, rec.Value) {
if !bytes.Equal(v.Value, rec.Value) {
return fmt.Errorf("readOperation[txid: %d, bucket: %s, key: %s] read %x, \nbut writer[txid: %d] wrote %x",
rec.Txid, rec.Bucket, rec.Key, rec.Value, v.Txid, v.Value)
}
@ -738,3 +752,180 @@ func validateSequential(rs historyRecords) error {
return nil
}
func TestConcurrentRepeatableRead(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
testCases := []struct {
name string
noFreelistSync bool
freelistType bolt.FreelistType
}{
// [array] freelist
{
name: "sync array freelist",
noFreelistSync: false,
freelistType: bolt.FreelistArrayType,
},
{
name: "not sync array freelist",
noFreelistSync: true,
freelistType: bolt.FreelistArrayType,
},
// [map] freelist
{
name: "sync map freelist",
noFreelistSync: false,
freelistType: bolt.FreelistMapType,
},
{
name: "not sync map freelist",
noFreelistSync: true,
freelistType: bolt.FreelistMapType,
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Log("Preparing db.")
var (
bucket = []byte("data")
key = []byte("mykey")
option = &bolt.Options{
PageSize: 4096,
NoFreelistSync: tc.noFreelistSync,
FreelistType: tc.freelistType,
}
)
db := mustCreateDB(t, option)
defer func() {
db.Close()
}()
// Create lots of K/V to allocate some pages
err := db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucketIfNotExists(bucket)
if err != nil {
return err
}
for i := 0; i < 1000; i++ {
k := fmt.Sprintf("key_%d", i)
if err := b.Put([]byte(k), make([]byte, 1024)); err != nil {
return err
}
}
return nil
})
require.NoError(t, err)
// Remove all K/V to create some free pages
err = db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
for i := 0; i < 1000; i++ {
k := fmt.Sprintf("key_%d", i)
if err := b.Delete([]byte(k)); err != nil {
return err
}
}
return b.Put(key, []byte("randomValue"))
})
require.NoError(t, err)
db = mustReOpenDB(t, db, option)
var (
wg sync.WaitGroup
longRunningReaderCount = 10
stopCh = make(chan struct{})
errCh = make(chan error, longRunningReaderCount)
readInterval = duration{5 * time.Millisecond, 10 * time.Millisecond}
writeOperationCountInBetween = 5
writeBytes = bytesRange{10, 20}
testDuration = 10 * time.Second
)
for i := 0; i < longRunningReaderCount; i++ {
readWorkerName := fmt.Sprintf("reader_%d", i)
t.Logf("Starting long running read operation: %s", readWorkerName)
wg.Add(1)
go func() {
defer wg.Done()
rErr := executeLongRunningRead(t, readWorkerName, db, bucket, key, readInterval, stopCh)
if rErr != nil {
errCh <- rErr
}
}()
time.Sleep(500 * time.Millisecond)
t.Logf("Perform %d write operations after starting a long running read operation", writeOperationCountInBetween)
for j := 0; j < writeOperationCountInBetween; j++ {
_, err := executeWrite(db, bucket, key, writeBytes, 0)
require.NoError(t, err)
}
}
t.Log("Perform lots of write operations to check whether the long running read operations will read dirty data")
wg.Add(1)
go func() {
defer wg.Done()
cnt := longRunningReaderCount * writeOperationCountInBetween
for i := 0; i < cnt; i++ {
select {
case <-stopCh:
return
default:
}
_, err := executeWrite(db, bucket, key, writeBytes, 0)
require.NoError(t, err)
}
}()
t.Log("Waiting for result")
select {
case err := <-errCh:
close(stopCh)
t.Errorf("Detected dirty read: %v", err)
case <-time.After(testDuration):
close(stopCh)
}
wg.Wait()
})
}
}
func executeLongRunningRead(t *testing.T, name string, db *bolt.DB, bucket []byte, key []byte, readInterval duration, stopCh chan struct{}) error {
err := db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
initialVal := b.Get(key)
for {
select {
case <-stopCh:
t.Logf("%q finished.", name)
return nil
default:
}
time.Sleep(randomDurationInRange(readInterval.min, readInterval.max))
val := b.Get(key)
if !bytes.Equal(initialVal, val) {
dirtyReadErr := fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q",
string(key), formatBytes(initialVal), formatBytes(val))
return dirtyReadErr
}
}
})
return err
}