test: refactor worker/operation and support Delete operation

Signed-off-by: Benjamin Wang <wachao@vmware.com>
pull/469/head
Benjamin Wang 2023-04-23 10:17:27 +08:00
parent c62e1b7dfd
commit 0ddf0fb8ff
1 changed file with 206 additions and 237 deletions


@@ -34,10 +34,16 @@ type bytesRange struct {
max int
}
type operationChance struct {
operation OperationType
chance int
}
type concurrentConfig struct {
readTime duration
writeTime duration
writeBytes bytesRange
workInterval duration
operationRatio []operationChance
readInterval duration // only used by readOperation
writeBytes bytesRange // only used by writeOperation
}
/*
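
For reference, the new operationRatio field drives a standard weighted random choice. Below is a minimal, self-contained sketch of the same selection logic; the type names mirror the diff, while main, the sample ratio, and the use of mrand.Intn (instead of the diff's equivalent mrand.Int() % sum) are illustrative assumptions:

package main

import (
	"fmt"
	mrand "math/rand"
)

type OperationType string

const (
	Read   OperationType = "read"
	Write  OperationType = "write"
	Delete OperationType = "delete"
)

type operationChance struct {
	operation OperationType
	chance    int
}

// pickOperation rolls in [0, sum of chances) and walks the slice,
// so each operation is selected with probability chance/sum.
func pickOperation(ratio []operationChance) OperationType {
	sum := 0
	for _, op := range ratio {
		sum += op.chance
	}
	roll := mrand.Intn(sum)
	for _, op := range ratio {
		if roll < op.chance {
			return op.operation
		}
		roll -= op.chance
	}
	panic("unexpected")
}

func main() {
	ratio := []operationChance{
		{operation: Read, chance: 60},
		{operation: Write, chance: 20},
		{operation: Delete, chance: 20},
	}
	counts := map[OperationType]int{}
	for i := 0; i < 10000; i++ {
		counts[pickOperation(ratio)]++
	}
	fmt.Println(counts) // roughly map[delete:2000 read:6000 write:2000]
}
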
@@ -54,94 +60,60 @@ func TestConcurrentReadAndWrite(t *testing.T) {
}
bucket := []byte("data")
keys := []string{"key0", "key1", "key2", "key3", "key4", "key5", "key6", "key7", "key8", "key9"}
conf := concurrentConfig{
workInterval: duration{
min: 5 * time.Millisecond,
max: 100 * time.Millisecond,
},
operationRatio: []operationChance{
{operation: Read, chance: 60},
{operation: Write, chance: 20},
{operation: Delete, chance: 20},
},
readInterval: duration{
min: 50 * time.Millisecond,
max: 100 * time.Millisecond,
},
writeBytes: bytesRange{
min: 200,
max: 16000,
},
}
testCases := []struct {
name string
readerCount int
writerCount int
workerCount int
conf concurrentConfig
testDuration time.Duration
}{
{
name: "1 reader",
readerCount: 1,
writerCount: 1,
conf: concurrentConfig{
readTime: duration{
min: 50 * time.Millisecond,
max: 100 * time.Millisecond,
},
writeTime: duration{
min: 10 * time.Millisecond,
max: 20 * time.Millisecond,
},
writeBytes: bytesRange{
min: 200,
max: 8000,
},
},
name: "1 worker",
workerCount: 1,
conf: conf,
testDuration: 30 * time.Second,
},
{
name: "10 readers",
readerCount: 10,
writerCount: 2,
conf: concurrentConfig{
readTime: duration{
min: 50 * time.Millisecond,
max: 100 * time.Millisecond,
},
writeTime: duration{
min: 10 * time.Millisecond,
max: 20 * time.Millisecond,
},
writeBytes: bytesRange{
min: 200,
max: 8000,
},
},
name: "10 workers",
workerCount: 10,
conf: conf,
testDuration: 30 * time.Second,
},
{
name: "50 readers",
readerCount: 50,
writerCount: 10,
conf: concurrentConfig{
readTime: duration{
min: 50 * time.Millisecond,
max: 100 * time.Millisecond,
},
writeTime: duration{
min: 10 * time.Millisecond,
max: 20 * time.Millisecond,
},
writeBytes: bytesRange{
min: 500,
max: 8000,
},
},
name: "50 workers",
workerCount: 50,
conf: conf,
testDuration: 30 * time.Second,
},
{
name: "100 readers",
readerCount: 100,
writerCount: 20,
conf: concurrentConfig{
readTime: duration{
min: 50 * time.Millisecond,
max: 100 * time.Millisecond,
},
writeTime: duration{
min: 10 * time.Millisecond,
max: 20 * time.Millisecond,
},
writeBytes: bytesRange{
min: 500,
max: 8000,
},
},
name: "100 workers",
workerCount: 100,
conf: conf,
testDuration: 30 * time.Second,
},
{
name: "200 workers",
workerCount: 200,
conf: conf,
testDuration: 30 * time.Second,
},
}
@@ -152,8 +124,7 @@ func TestConcurrentReadAndWrite(t *testing.T) {
concurrentReadAndWrite(t,
bucket,
keys,
tc.readerCount,
tc.writerCount,
tc.workerCount,
tc.conf,
tc.testDuration)
})
@@ -163,8 +134,7 @@ func TestConcurrentReadAndWrite(t *testing.T) {
func concurrentReadAndWrite(t *testing.T,
bucket []byte,
keys []string,
readerCount int,
writerCount int,
workerCount int,
conf concurrentConfig,
testDuration time.Duration) {
@@ -179,8 +149,7 @@ func concurrentReadAndWrite(t *testing.T,
t.Log("Starting workers.")
records := runWorkers(t,
db, bucket, keys,
readerCount,
writerCount,
workerCount,
conf,
testDuration)
@@ -198,80 +167,55 @@ func concurrentReadAndWrite(t *testing.T,
/*
*********************************************************
Data structures and functions/methods for running
concurrent workers, including reading and writing workers
Data structures and functions/methods for running concurrent
workers, which execute different operations, including `Read`,
`Write` and `Delete`.
*********************************************************
*/
func runWorkers(t *testing.T,
db *btesting.DB,
bucket []byte,
keys []string,
readerCount int,
writerCount int,
workerCount int,
conf concurrentConfig,
testDuration time.Duration) historyRecords {
stopCh := make(chan struct{}, 1)
errCh := make(chan error, readerCount+1)
errCh := make(chan error, workerCount)
var mu sync.Mutex
var rs historyRecords
runFunc := func(w worker) error {
wrs, err := runWorker(t, w, errCh)
mu.Lock()
rs = append(rs, wrs...)
mu.Unlock()
return err
}
// start write transactions
g := new(errgroup.Group)
for i := 0; i < writerCount; i++ {
writer := &writeWorker{
for i := 0; i < workerCount; i++ {
w := &worker{
id: i,
db: db,
bucket: bucket,
keys: keys,
writeBytes: conf.writeBytes,
writeTime: conf.writeTime,
conf: conf,
errCh: errCh,
stopCh: stopCh,
t: t,
}
g.Go(func() error {
return runFunc(writer)
wrs, err := runWorker(t, w, errCh)
mu.Lock()
rs = append(rs, wrs...)
mu.Unlock()
return err
})
}
// start readonly transactions
for i := 0; i < readerCount; i++ {
reader := &readWorker{
id: i,
db: db,
bucket: bucket,
keys: keys,
readTime: conf.readTime,
errCh: errCh,
stopCh: stopCh,
t: t,
}
g.Go(func() error {
return runFunc(reader)
})
}
t.Logf("Keep reading and writing transactions running for about %s.", testDuration)
t.Logf("Keep all workers running for about %s.", testDuration)
select {
case <-time.After(testDuration):
case <-errCh:
}
close(stopCh)
t.Log("Waiting for all transactions to finish.")
t.Log("Waiting for all workers to finish.")
if err := g.Wait(); err != nil {
t.Errorf("Received error: %v", err)
}
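
The coordination pattern in runWorkers (an errgroup of workers, a stop channel closed on timeout or first error, and an error channel buffered to workerCount so a failing worker never blocks on send) can be seen in isolation below. A hedged sketch: the worker body, the simulated failure, and all timings are illustrative assumptions, not code from the diff.

package main

import (
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	const workerCount = 4
	stopCh := make(chan struct{})
	// One buffered slot per worker, so a failing worker can report
	// its error and exit even if nobody is receiving yet.
	errCh := make(chan error, workerCount)

	g := new(errgroup.Group)
	for i := 0; i < workerCount; i++ {
		i := i // capture loop variable (needed before Go 1.22)
		g.Go(func() error {
			for {
				select {
				case <-stopCh:
					return nil
				default:
				}
				// Simulated unit of work; a real worker would run
				// a bbolt transaction here.
				time.Sleep(10 * time.Millisecond)
				if i == 3 { // pretend worker 3 hits an error
					err := errors.New("simulated failure")
					errCh <- err
					return err
				}
			}
		})
	}

	// Stop on timeout or on the first reported error, whichever
	// comes first, then wait for every worker to drain.
	select {
	case <-time.After(100 * time.Millisecond):
	case <-errCh:
	}
	close(stopCh)
	if err := g.Wait(); err != nil {
		fmt.Println("received error:", err)
	}
}
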
@@ -279,7 +223,7 @@ func runWorkers(t *testing.T,
return rs
}
func runWorker(t *testing.T, w worker, errCh chan error) (historyRecords, error) {
func runWorker(t *testing.T, w *worker, errCh chan error) (historyRecords, error) {
rs, err := w.run()
if len(rs) > 0 && err == nil {
if terr := validateIncrementalTxid(rs); terr != nil {
@@ -292,19 +236,14 @@ func runWorker(t *testing.T, w worker, errCh chan error) (historyRecords, error)
return rs, err
}
type worker interface {
name() string
run() (historyRecords, error)
}
type readWorker struct {
type worker struct {
id int
db *btesting.DB
bucket []byte
keys []string
readTime duration
conf concurrentConfig
errCh chan error
stopCh chan struct{}
@@ -312,120 +251,146 @@ type readWorker struct {
t *testing.T
}
func (r *readWorker) name() string {
return fmt.Sprintf("readWorker-%d", r.id)
func (w *worker) name() string {
return fmt.Sprintf("worker-%d", w.id)
}
func (r *readWorker) run() (historyRecords, error) {
var rs historyRecords
for {
select {
case <-r.stopCh:
r.t.Log("Reading transaction finished.")
return rs, nil
default:
}
err := r.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(r.bucket)
selectedKey := r.keys[mrand.Intn(len(r.keys))]
initialVal := b.Get([]byte(selectedKey))
time.Sleep(randomDurationInRange(r.readTime.min, r.readTime.max))
val := b.Get([]byte(selectedKey))
if !reflect.DeepEqual(initialVal, val) {
return fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q",
selectedKey, formatBytes(initialVal), formatBytes(val))
}
clonedVal := make([]byte, len(val))
copy(clonedVal, val)
rs = append(rs, historyRecord{
OperationType: Read,
Key: selectedKey,
Value: clonedVal,
Txid: tx.ID(),
})
return nil
})
if err != nil {
readErr := fmt.Errorf("[reader error]: %w", err)
r.t.Error(readErr)
r.errCh <- readErr
return rs, readErr
}
}
}
type writeWorker struct {
id int
db *btesting.DB
bucket []byte
keys []string
writeBytes bytesRange
writeTime duration
errCh chan error
stopCh chan struct{}
t *testing.T
}
func (w *writeWorker) name() string {
return fmt.Sprintf("writeWorker-%d", w.id)
}
func (w *writeWorker) run() (historyRecords, error) {
func (w *worker) run() (historyRecords, error) {
var rs historyRecords
for {
select {
case <-w.stopCh:
w.t.Log("Writing transaction finished.")
w.t.Logf("%q finished.", w.name())
return rs, nil
default:
}
err := w.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(w.bucket)
selectedKey := w.keys[mrand.Intn(len(w.keys))]
valueBytes := randomIntInRange(w.writeBytes.min, w.writeBytes.max)
v := make([]byte, valueBytes)
if _, cErr := crand.Read(v); cErr != nil {
return cErr
}
putErr := b.Put([]byte(selectedKey), v)
if putErr == nil {
rs = append(rs, historyRecord{
OperationType: Write,
Key: selectedKey,
Value: v,
Txid: tx.ID(),
})
}
return putErr
})
op := w.pickOperation()
rec, err := runOperation(op, w.db, w.bucket, w.keys, w.conf)
if err != nil {
writeErr := fmt.Errorf("[writer error]: %w", err)
w.t.Error(writeErr)
w.errCh <- writeErr
return rs, writeErr
opErr := fmt.Errorf("[%s: %s]: %w", w.name(), op, err)
w.t.Error(opErr)
w.errCh <- opErr
return rs, opErr
}
time.Sleep(randomDurationInRange(w.writeTime.min, w.writeTime.max))
rs = append(rs, rec)
time.Sleep(randomDurationInRange(w.conf.workInterval.min, w.conf.workInterval.max))
}
}
func (w *worker) pickOperation() OperationType {
sum := 0
for _, op := range w.conf.operationRatio {
sum += op.chance
}
roll := mrand.Int() % sum
for _, op := range w.conf.operationRatio {
if roll < op.chance {
return op.operation
}
roll -= op.chance
}
panic("unexpected")
}
func runOperation(op OperationType, db *btesting.DB, bucket []byte, keys []string, conf concurrentConfig) (historyRecord, error) {
switch op {
case Read:
return executeRead(db, bucket, keys, conf.readInterval)
case Write:
return executeWrite(db, bucket, keys, conf.writeBytes)
case Delete:
return executeDelete(db, bucket, keys)
default:
panic(fmt.Sprintf("unexpected operation type: %s", op))
}
}
func executeRead(db *btesting.DB, bucket []byte, keys []string, readInterval duration) (historyRecord, error) {
var rec historyRecord
err := db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
selectedKey := keys[mrand.Intn(len(keys))]
initialVal := b.Get([]byte(selectedKey))
time.Sleep(randomDurationInRange(readInterval.min, readInterval.max))
val := b.Get([]byte(selectedKey))
if !reflect.DeepEqual(initialVal, val) {
return fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q",
selectedKey, formatBytes(initialVal), formatBytes(val))
}
clonedVal := make([]byte, len(val))
copy(clonedVal, val)
rec = historyRecord{
OperationType: Read,
Key: selectedKey,
Value: clonedVal,
Txid: tx.ID(),
}
return nil
})
return rec, err
}
func executeWrite(db *btesting.DB, bucket []byte, keys []string, writeBytes bytesRange) (historyRecord, error) {
var rec historyRecord
err := db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
selectedKey := keys[mrand.Intn(len(keys))]
valueBytes := randomIntInRange(writeBytes.min, writeBytes.max)
v := make([]byte, valueBytes)
if _, cErr := crand.Read(v); cErr != nil {
return cErr
}
putErr := b.Put([]byte(selectedKey), v)
if putErr == nil {
rec = historyRecord{
OperationType: Write,
Key: selectedKey,
Value: v,
Txid: tx.ID(),
}
}
return putErr
})
return rec, err
}
func executeDelete(db *btesting.DB, bucket []byte, keys []string) (historyRecord, error) {
var rec historyRecord
err := db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
selectedKey := keys[mrand.Intn(len(keys))]
deleteErr := b.Delete([]byte(selectedKey))
if deleteErr == nil {
rec = historyRecord{
OperationType: Delete,
Key: selectedKey,
Txid: tx.ID(),
}
}
return deleteErr
})
return rec, err
}
func randomDurationInRange(min, max time.Duration) time.Duration {
d := int64(max) - int64(min)
d = int64(mrand.Intn(int(d))) + int64(min)
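
One caveat in the context lines above: math/rand's Intn panics when its argument is non-positive, so randomDurationInRange assumes min < max (which holds for every config in this test). A hedged defensive variant, if the range could ever be degenerate:

package main

import (
	"fmt"
	mrand "math/rand"
	"time"
)

// randomDurationInRange returns a duration in [min, max). Unlike the
// helper in the diff, it guards max <= min, where mrand.Intn would
// panic on a non-positive argument.
func randomDurationInRange(min, max time.Duration) time.Duration {
	if max <= min {
		return min
	}
	return min + time.Duration(mrand.Int63n(int64(max-min)))
}

func main() {
	fmt.Println(randomDurationInRange(5*time.Millisecond, 100*time.Millisecond))
	fmt.Println(randomDurationInRange(5*time.Millisecond, 5*time.Millisecond)) // 5ms, no panic
}
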
@@ -512,8 +477,9 @@ Data structures and functions for analyzing history records
type OperationType string
const (
Read OperationType = "read"
Write OperationType = "write"
Read OperationType = "read"
Write OperationType = "write"
Delete OperationType = "delete"
)
type historyRecord struct {
@@ -541,12 +507,13 @@ func (rs historyRecords) Less(i, j int) bool {
return rs[i].Txid < rs[j].Txid
}
// Sorted by workerType: put writer before reader if they have the same txid.
if rs[i].OperationType == Write {
return true
// Sorted by operation type: put `Read` after other operation types
// if they operate on the same key and have the same txid.
if rs[i].OperationType == Read {
return false
}
return false
return true
}
func (rs historyRecords) Swap(i, j int) {
@@ -557,7 +524,7 @@ func validateIncrementalTxid(rs historyRecords) error {
lastTxid := rs[0].Txid
for i := 1; i < len(rs); i++ {
if (rs[i].OperationType == Write && rs[i].Txid <= lastTxid) || (rs[i].OperationType == Read && rs[i].Txid < lastTxid) {
if (rs[i].OperationType == Read && rs[i].Txid < lastTxid) || (rs[i].OperationType != Read && rs[i].Txid <= lastTxid) {
return fmt.Errorf("detected non-incremental txid(%d, %d) in %s mode", lastTxid, rs[i].Txid, rs[i].OperationType)
}
lastTxid = rs[i].Txid
@@ -576,9 +543,11 @@ func validateSerializable(rs historyRecords) error {
if rec.OperationType == Write {
v.Value = rec.Value
v.Txid = rec.Txid
} else if rec.OperationType == Delete {
delete(lastWriteKeyValueMap, rec.Key)
} else {
if !reflect.DeepEqual(v.Value, rec.Value) {
return fmt.Errorf("reader[txid: %d, key: %s] read %x, \nbut writer[txid: %d, key: %s] wrote %x",
return fmt.Errorf("readOperation[txid: %d, key: %s] read %x, \nbut writer[txid: %d, key: %s] wrote %x",
rec.Txid, rec.Key, rec.Value,
v.Txid, v.Key, v.Value)
}
@@ -591,9 +560,9 @@ func validateSerializable(rs historyRecords) error {
Value: rec.Value,
Txid: rec.Txid,
}
} else {
} else if rec.OperationType == Read {
if len(rec.Value) != 0 {
return fmt.Errorf("expected the first reader[txid: %d, key: %s] read nil, \nbut got %x",
return fmt.Errorf("expected the first readOperation[txid: %d, key: %s] read nil, \nbut got %x",
rec.Txid, rec.Key, rec.Value)
}
}
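
For the Delete handling added to validateSerializable: once a key is deleted, the validator drops its last-written value, so a later Read must observe nil until the next Write. A simplified, self-contained replay of that rule (the types and the sample history are illustrative assumptions, not the diff's exact code):

package main

import (
	"fmt"
	"reflect"
)

type record struct {
	op    string // "read", "write", or "delete"
	key   string
	value []byte
	txid  int
}

// replay applies the same rule the validator uses: a read must see the
// value of the last write, or nil if the key was deleted (or never written).
func replay(recs []record) error {
	last := map[string][]byte{}
	for _, r := range recs {
		switch r.op {
		case "write":
			last[r.key] = r.value
		case "delete":
			delete(last, r.key)
		case "read":
			if !reflect.DeepEqual(last[r.key], r.value) {
				return fmt.Errorf("txid %d read %q, want %q", r.txid, r.value, last[r.key])
			}
		}
	}
	return nil
}

func main() {
	recs := []record{
		{op: "write", key: "key0", value: []byte("v1"), txid: 2},
		{op: "read", key: "key0", value: []byte("v1"), txid: 3},
		{op: "delete", key: "key0", txid: 4},
		{op: "read", key: "key0", value: nil, txid: 5}, // must read nil after delete
	}
	fmt.Println(replay(recs)) // <nil>
}
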