Update concurrent test to support multiple operations in each transaction

Signed-off-by: Benjamin Wang <benjamin.ahrtr@gmail.com>
pull/764/head
Benjamin Wang 2024-06-05 17:14:23 +01:00
parent b342624aaf
commit 487b5dd3df
1 changed files with 112 additions and 108 deletions

View File

@ -347,27 +347,45 @@ func (w *worker) name() string {
func (w *worker) run() (historyRecords, error) {
var rs historyRecords
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-w.stopCh:
w.t.Logf("%q finished.", w.name())
return rs, nil
default:
}
op := w.pickOperation()
bucket, key := w.pickBucket(), w.pickKey()
rec, err := executeOperation(op, w.db, bucket, key, w.conf)
if err != nil {
readErr := fmt.Errorf("[%s: %s]: %w", w.name(), op, err)
w.t.Error(readErr)
w.errCh <- readErr
return rs, readErr
}
err := w.db.Update(func(tx *bolt.Tx) error {
for {
op := w.pickOperation()
bucket, key := w.pickBucket(), w.pickKey()
rec, eerr := executeOperation(op, tx, bucket, key, w.conf)
if eerr != nil {
opErr := fmt.Errorf("[%s: %s]: %w", w.name(), op, eerr)
w.t.Error(opErr)
w.errCh <- opErr
return opErr
}
rs = append(rs, rec)
if w.conf.workInterval != (duration{}) {
time.Sleep(randomDurationInRange(w.conf.workInterval.min, w.conf.workInterval.max))
rs = append(rs, rec)
if w.conf.workInterval != (duration{}) {
time.Sleep(randomDurationInRange(w.conf.workInterval.min, w.conf.workInterval.max))
}
select {
case <-ticker.C:
return nil
case <-w.stopCh:
return nil
default:
}
}
})
if err != nil {
return rs, err
}
}
}
@ -401,111 +419,100 @@ func (w *worker) pickOperation() OperationType {
panic("unexpected")
}
func executeOperation(op OperationType, db *bolt.DB, bucket []byte, key []byte, conf concurrentConfig) (historyRecord, error) {
func executeOperation(op OperationType, tx *bolt.Tx, bucket []byte, key []byte, conf concurrentConfig) (historyRecord, error) {
switch op {
case Read:
return executeRead(db, bucket, key, conf.readInterval)
return executeRead(tx, bucket, key, conf.readInterval)
case Write:
return executeWrite(db, bucket, key, conf.writeBytes, conf.noopWriteRatio)
return executeWrite(tx, bucket, key, conf.writeBytes, conf.noopWriteRatio)
case Delete:
return executeDelete(db, bucket, key)
return executeDelete(tx, bucket, key)
default:
panic(fmt.Sprintf("unexpected operation type: %s", op))
}
}
func executeRead(db *bolt.DB, bucket []byte, key []byte, readInterval duration) (historyRecord, error) {
func executeRead(tx *bolt.Tx, bucket []byte, key []byte, readInterval duration) (historyRecord, error) {
var rec historyRecord
err := db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
initialVal := b.Get(key)
time.Sleep(randomDurationInRange(readInterval.min, readInterval.max))
val := b.Get(key)
b := tx.Bucket(bucket)
if !bytes.Equal(initialVal, val) {
return fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q",
string(key), formatBytes(initialVal), formatBytes(val))
}
initialVal := b.Get(key)
time.Sleep(randomDurationInRange(readInterval.min, readInterval.max))
val := b.Get(key)
clonedVal := make([]byte, len(val))
copy(clonedVal, val)
if !bytes.Equal(initialVal, val) {
return rec, fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q",
string(key), formatBytes(initialVal), formatBytes(val))
}
clonedVal := make([]byte, len(val))
copy(clonedVal, val)
rec = historyRecord{
OperationType: Read,
Bucket: string(bucket),
Key: string(key),
Value: clonedVal,
Txid: tx.ID(),
}
return rec, nil
}
func executeWrite(tx *bolt.Tx, bucket []byte, key []byte, writeBytes bytesRange, noopWriteRatio int) (historyRecord, error) {
var rec historyRecord
if mrand.Intn(100) < noopWriteRatio {
// A no-op write transaction has two consequences:
// 1. The txid increases by 1;
// 2. Two meta pages point to the same root page.
rec = historyRecord{
OperationType: Read,
OperationType: Write,
Bucket: string(bucket),
Key: string(key),
Value: clonedVal,
Key: noopTxKey,
Value: nil,
Txid: tx.ID(),
}
return rec, nil
}
return nil
})
b := tx.Bucket(bucket)
return rec, err
valueBytes := randomIntInRange(writeBytes.min, writeBytes.max)
v := make([]byte, valueBytes)
if _, cErr := crand.Read(v); cErr != nil {
return rec, cErr
}
putErr := b.Put(key, v)
if putErr == nil {
rec = historyRecord{
OperationType: Write,
Bucket: string(bucket),
Key: string(key),
Value: v,
Txid: tx.ID(),
}
}
return rec, putErr
}
func executeWrite(db *bolt.DB, bucket []byte, key []byte, writeBytes bytesRange, noopWriteRatio int) (historyRecord, error) {
func executeDelete(tx *bolt.Tx, bucket []byte, key []byte) (historyRecord, error) {
var rec historyRecord
err := db.Update(func(tx *bolt.Tx) error {
if mrand.Intn(100) < noopWriteRatio {
// A no-op write transaction has two consequences:
// 1. The txid increases by 1;
// 2. Two meta pages point to the same root page.
rec = historyRecord{
OperationType: Write,
Bucket: string(bucket),
Key: noopTxKey,
Value: nil,
Txid: tx.ID(),
}
return nil
b := tx.Bucket(bucket)
err := b.Delete(key)
if err == nil {
rec = historyRecord{
OperationType: Delete,
Bucket: string(bucket),
Key: string(key),
Txid: tx.ID(),
}
b := tx.Bucket(bucket)
valueBytes := randomIntInRange(writeBytes.min, writeBytes.max)
v := make([]byte, valueBytes)
if _, cErr := crand.Read(v); cErr != nil {
return cErr
}
putErr := b.Put(key, v)
if putErr == nil {
rec = historyRecord{
OperationType: Write,
Bucket: string(bucket),
Key: string(key),
Value: v,
Txid: tx.ID(),
}
}
return putErr
})
return rec, err
}
func executeDelete(db *bolt.DB, bucket []byte, key []byte) (historyRecord, error) {
var rec historyRecord
err := db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
deleteErr := b.Delete(key)
if deleteErr == nil {
rec = historyRecord{
OperationType: Delete,
Bucket: string(bucket),
Key: string(key),
Txid: tx.ID(),
}
}
return deleteErr
})
}
return rec, err
}
@ -674,17 +681,7 @@ func (rs historyRecords) Less(i, j int) bool {
}
// Sorted by txid
if rs[i].Txid != rs[j].Txid {
return rs[i].Txid < rs[j].Txid
}
// Sorted by operation type: put `Read` after other operation types
// if they operate on the same (bucket, key) and have the same txid.
if rs[i].OperationType == Read {
return false
}
return true
return rs[i].Txid < rs[j].Txid
}
func (rs historyRecords) Swap(i, j int) {
@ -695,7 +692,7 @@ func validateIncrementalTxid(rs historyRecords) error {
lastTxid := rs[0].Txid
for i := 1; i < len(rs); i++ {
if (rs[i].OperationType == Read && rs[i].Txid < lastTxid) || (rs[i].OperationType != Read && rs[i].Txid <= lastTxid) {
if rs[i].Txid < lastTxid {
return fmt.Errorf("detected non-incremental txid(%d, %d) in %s mode", lastTxid, rs[i].Txid, rs[i].OperationType)
}
lastTxid = rs[i].Txid
@ -705,7 +702,7 @@ func validateIncrementalTxid(rs historyRecords) error {
}
func validateSequential(rs historyRecords) error {
sort.Sort(rs)
sort.Stable(rs)
type bucketAndKey struct {
bucket string
@ -886,7 +883,11 @@ func TestConcurrentRepeatableRead(t *testing.T) {
t.Logf("Perform %d write operations after starting a long running read operation", writeOperationCountInBetween)
for j := 0; j < writeOperationCountInBetween; j++ {
_, err := executeWrite(db, bucket, key, writeBytes, 0)
err := db.Update(func(tx *bolt.Tx) error {
_, eerr := executeWrite(tx, bucket, key, writeBytes, 0)
return eerr
})
require.NoError(t, err)
}
}
@ -902,7 +903,10 @@ func TestConcurrentRepeatableRead(t *testing.T) {
return
default:
}
_, err := executeWrite(db, bucket, key, writeBytes, 0)
err := db.Update(func(tx *bolt.Tx) error {
_, eerr := executeWrite(tx, bucket, key, writeBytes, 0)
return eerr
})
require.NoError(t, err)
}
}()