test: improve TestConcurrentReadAndWrite to verify read/write linerizablity

Signed-off-by: Benjamin Wang <wachao@vmware.com>
pull/455/head
Benjamin Wang 2023-04-07 16:07:55 +08:00
parent bc572b8783
commit d142709a73
1 changed files with 214 additions and 50 deletions

View File

@ -3,12 +3,15 @@ package bbolt_test
import (
crand "crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
mrand "math/rand"
"os"
"path/filepath"
"reflect"
"sort"
"strings"
"sync"
"testing"
"time"
"unicode/utf8"
@ -21,6 +24,13 @@ import (
"go.etcd.io/bbolt/internal/common"
)
/*
TestConcurrentReadAndWrite verifies:
1. Repeatable read: a read transaction should always see the same data
view during its lifecycle;
2. Any data written by a writing transaction should be visible to any
following reading transactions (with txid >= previous writing txid).
*/
func TestConcurrentReadAndWrite(t *testing.T) {
bucket := []byte("data")
keys := []string{"key0", "key1", "key2", "key3", "key4", "key5", "key6", "key7", "key8", "key9"}
@ -116,24 +126,19 @@ func concurrentReadAndWrite(t *testing.T,
stopCh := make(chan struct{}, 1)
errCh := make(chan error, readerCount+1)
recordingCh := make(chan historyRecord, readerCount+1)
// start readonly transactions
g := new(errgroup.Group)
for i := 0; i < readerCount; i++ {
reader := &readWorker{
db: db,
bucket: bucket,
keys: keys,
minReadInterval: minReadInterval,
maxReadInterval: maxReadInterval,
errCh: errCh,
stopCh: stopCh,
t: t,
}
g.Go(reader.run)
}
// collect history records
var records historyRecords
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
records = collectHistoryRecords(recordingCh)
}()
// start write transaction
g := new(errgroup.Group)
writer := writeWorker{
db: db,
bucket: bucket,
@ -143,12 +148,30 @@ func concurrentReadAndWrite(t *testing.T,
minWriteInterval: minWriteInterval,
maxWriteInterval: maxWriteInterval,
errCh: errCh,
stopCh: stopCh,
t: t,
recordingCh: recordingCh,
errCh: errCh,
stopCh: stopCh,
t: t,
}
g.Go(writer.run)
// start readonly transactions
for i := 0; i < readerCount; i++ {
reader := &readWorker{
db: db,
bucket: bucket,
keys: keys,
minReadInterval: minReadInterval,
maxReadInterval: maxReadInterval,
recordingCh: recordingCh,
errCh: errCh,
stopCh: stopCh,
t: t,
}
g.Go(reader.run)
}
t.Logf("Keep reading and writing transactions running for about %s.", testDuration)
select {
case <-time.After(testDuration):
@ -161,12 +184,19 @@ func concurrentReadAndWrite(t *testing.T,
t.Errorf("Received error: %v", err)
}
saveDataIfFailed(t, db)
t.Log("Waiting for the history collector to finish.")
close(recordingCh)
wg.Wait()
t.Log("Analyzing the history records.")
if err := analyzeHistoryRecords(records); err != nil {
t.Errorf("The history records are not linearizable:\n %v", err)
}
saveDataIfFailed(t, db, records)
// TODO (ahrtr):
// 1. intentionally inject a random failpoint.
// 2. validate the linearizablity: each reading transaction
// should read the value written by previous writing transaction.
// 2. check db consistency at the end.
}
type readWorker struct {
@ -177,27 +207,29 @@ type readWorker struct {
minReadInterval time.Duration
maxReadInterval time.Duration
errCh chan error
stopCh chan struct{}
recordingCh chan historyRecord
errCh chan error
stopCh chan struct{}
t *testing.T
}
func (reader *readWorker) run() error {
func (r *readWorker) run() error {
for {
select {
case <-reader.stopCh:
reader.t.Log("Reading transaction finished.")
case <-r.stopCh:
r.t.Log("Reading transaction finished.")
return nil
default:
}
err := reader.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(reader.bucket)
err := r.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(r.bucket)
selectedKey := reader.keys[mrand.Intn(len(reader.keys))]
selectedKey := r.keys[mrand.Intn(len(r.keys))]
initialVal := b.Get([]byte(selectedKey))
time.Sleep(randomDurationInRange(reader.minReadInterval, reader.maxReadInterval))
time.Sleep(randomDurationInRange(r.minReadInterval, r.maxReadInterval))
val := b.Get([]byte(selectedKey))
if !reflect.DeepEqual(initialVal, val) {
@ -205,13 +237,23 @@ func (reader *readWorker) run() error {
selectedKey, formatBytes(initialVal), formatBytes(val))
}
clonedVal := make([]byte, len(val))
copy(clonedVal, val)
r.recordingCh <- historyRecord{
OperationType: Read,
Key: selectedKey,
Value: clonedVal,
Txid: tx.ID(),
}
return nil
})
if err != nil {
readErr := fmt.Errorf("[reader error]: %w", err)
reader.t.Log(readErr)
reader.errCh <- readErr
r.t.Log(readErr)
r.errCh <- readErr
return readErr
}
}
@ -227,43 +269,55 @@ type writeWorker struct {
maxWriteBytes int
minWriteInterval time.Duration
maxWriteInterval time.Duration
errCh chan error
stopCh chan struct{}
recordingCh chan historyRecord
errCh chan error
stopCh chan struct{}
t *testing.T
}
func (writer *writeWorker) run() error {
func (w *writeWorker) run() error {
for {
select {
case <-writer.stopCh:
writer.t.Log("Writing transaction finished.")
case <-w.stopCh:
w.t.Log("Writing transaction finished.")
return nil
default:
}
err := writer.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(writer.bucket)
err := w.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(w.bucket)
selectedKey := writer.keys[mrand.Intn(len(writer.keys))]
selectedKey := w.keys[mrand.Intn(len(w.keys))]
valueBytes := randomIntInRange(writer.minWriteBytes, writer.maxWriteBytes)
valueBytes := randomIntInRange(w.minWriteBytes, w.maxWriteBytes)
v := make([]byte, valueBytes)
if _, cErr := crand.Read(v); cErr != nil {
return cErr
}
return b.Put([]byte(selectedKey), v)
putErr := b.Put([]byte(selectedKey), v)
if putErr == nil {
w.recordingCh <- historyRecord{
OperationType: Write,
Key: selectedKey,
Value: v,
Txid: tx.ID(),
}
}
return putErr
})
if err != nil {
writeErr := fmt.Errorf("[writer error]: %w", err)
writer.t.Log(writeErr)
writer.errCh <- writeErr
w.t.Log(writeErr)
w.errCh <- writeErr
return writeErr
}
time.Sleep(randomDurationInRange(writer.minWriteInterval, writer.maxWriteInterval))
time.Sleep(randomDurationInRange(w.minWriteInterval, w.maxWriteInterval))
}
}
@ -285,18 +339,35 @@ func formatBytes(val []byte) string {
return hex.EncodeToString(val)
}
func saveDataIfFailed(t *testing.T, db *btesting.DB) {
func saveDataIfFailed(t *testing.T, db *btesting.DB, rs historyRecords) {
if t.Failed() {
if err := db.Close(); err != nil {
t.Errorf("Failed to close db: %v", err)
}
backupPath := testResultsDirectory(t)
targetFile := filepath.Join(backupPath, "db.bak")
backupDB(t, db, backupPath)
persistHistoryRecords(t, rs, backupPath)
}
}
t.Logf("Saving the DB file to %s", targetFile)
err := common.CopyFile(db.Path(), targetFile)
func backupDB(t *testing.T, db *btesting.DB, path string) {
targetFile := filepath.Join(path, "db.bak")
t.Logf("Saving the DB file to %s", targetFile)
err := common.CopyFile(db.Path(), targetFile)
require.NoError(t, err)
t.Logf("DB file saved to %s", targetFile)
}
func persistHistoryRecords(t *testing.T, rs historyRecords, path string) {
recordFilePath := filepath.Join(path, "history_records.json")
t.Logf("Saving history records to %s", recordFilePath)
recordFile, err := os.OpenFile(recordFilePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
require.NoError(t, err)
defer recordFile.Close()
encoder := json.NewEncoder(recordFile)
for _, rec := range rs {
err := encoder.Encode(rec)
require.NoError(t, err)
t.Logf("DB file saved to %s", targetFile)
}
}
@ -321,3 +392,96 @@ func testResultsDirectory(t *testing.T) string {
return path
}
/*
*********************************************************
Data structure and functions for analyzing history records
*********************************************************
*/
type OperationType string
const (
Read OperationType = "read"
Write OperationType = "write"
)
type historyRecord struct {
OperationType OperationType `json:"operationType,omitempty"`
Txid int `json:"txid,omitempty"`
Key string `json:"key,omitempty"`
Value []byte `json:"value,omitempty"`
}
type historyRecords []historyRecord
func (rs historyRecords) Len() int {
return len(rs)
}
func (rs historyRecords) Less(i, j int) bool {
// Sorted by key firstly: all records with the same key are grouped together.
keyCmp := strings.Compare(rs[i].Key, rs[j].Key)
if keyCmp != 0 {
return keyCmp < 0
}
// Sorted by txid
if rs[i].Txid != rs[j].Txid {
return rs[i].Txid < rs[j].Txid
}
// Sorted by workerType: put writer before reader if they have the same txid.
if rs[i].OperationType == Write {
return true
}
return false
}
func (rs historyRecords) Swap(i, j int) {
rs[i], rs[j] = rs[j], rs[i]
}
func collectHistoryRecords(recordingCh chan historyRecord) historyRecords {
var rs historyRecords
for record := range recordingCh {
rs = append(rs, record)
}
sort.Sort(rs)
return rs
}
func analyzeHistoryRecords(rs historyRecords) error {
lastWriteKeyValueMap := make(map[string]*historyRecord)
for _, rec := range rs {
if v, ok := lastWriteKeyValueMap[rec.Key]; ok {
if rec.OperationType == Write {
v.Value = rec.Value
v.Txid = rec.Txid
} else {
if !reflect.DeepEqual(v.Value, rec.Value) {
return fmt.Errorf("reader[txid: %d, key: %s] read %x, \nbut writer[txid: %d, key: %s] wrote %x",
rec.Txid, rec.Key, rec.Value,
v.Txid, v.Key, v.Value)
}
}
} else {
if rec.OperationType == Write {
lastWriteKeyValueMap[rec.Key] = &historyRecord{
OperationType: Write,
Key: rec.Key,
Value: rec.Value,
Txid: rec.Txid,
}
} else {
if len(rec.Value) != 0 {
return fmt.Errorf("expected the first reader[txid: %d, key: %s] read nil, \nbut got %x",
rec.Txid, rec.Key, rec.Value)
}
}
}
}
return nil
}