test: verify that txids are incremental

Signed-off-by: Benjamin Wang <wachao@vmware.com>
pull/467/head
Benjamin Wang 2023-04-16 13:28:13 +08:00
parent 89ed8dcb40
commit e431258c0d
1 changed files with 48 additions and 5 deletions

View File

@ -208,7 +208,8 @@ func runWorkers(t *testing.T,
// start write transaction
g := new(errgroup.Group)
writer := writeWorker{
writer := &writeWorker{
id: 0,
db: db,
bucket: bucket,
keys: keys,
@ -221,7 +222,7 @@ func runWorkers(t *testing.T,
t: t,
}
g.Go(func() error {
wrs, err := writer.run()
wrs, err := runWorker(t, writer, errCh)
mu.Lock()
rs = append(rs, wrs...)
mu.Unlock()
@ -231,6 +232,7 @@ func runWorkers(t *testing.T,
// start readonly transactions
for i := 0; i < readerCount; i++ {
reader := &readWorker{
id: i,
db: db,
bucket: bucket,
keys: keys,
@ -242,7 +244,7 @@ func runWorkers(t *testing.T,
t: t,
}
g.Go(func() error {
rrs, err := reader.run()
rrs, err := runWorker(t, reader, errCh)
mu.Lock()
rs = append(rs, rrs...)
mu.Unlock()
@ -265,7 +267,26 @@ func runWorkers(t *testing.T,
return rs
}
func runWorker(t *testing.T, w worker, errCh chan error) (historyRecords, error) {
rs, err := w.run()
if len(rs) > 0 && err == nil {
if terr := validateIncrementalTxid(rs); terr != nil {
txidErr := fmt.Errorf("[%s]: %w", w.name(), terr)
t.Error(txidErr)
errCh <- txidErr
return rs, txidErr
}
}
return rs, err
}
type worker interface {
name() string
run() (historyRecords, error)
}
type readWorker struct {
id int
db *btesting.DB
bucket []byte
@ -279,6 +300,10 @@ type readWorker struct {
t *testing.T
}
func (w *readWorker) name() string {
return fmt.Sprintf("readWorker-%d", w.id)
}
func (r *readWorker) run() (historyRecords, error) {
var rs historyRecords
for {
@ -317,7 +342,7 @@ func (r *readWorker) run() (historyRecords, error) {
if err != nil {
readErr := fmt.Errorf("[reader error]: %w", err)
r.t.Log(readErr)
r.t.Error(readErr)
r.errCh <- readErr
return rs, readErr
}
@ -325,6 +350,7 @@ func (r *readWorker) run() (historyRecords, error) {
}
type writeWorker struct {
id int
db *btesting.DB
bucket []byte
@ -339,6 +365,10 @@ type writeWorker struct {
t *testing.T
}
func (w *writeWorker) name() string {
return fmt.Sprintf("writeWorker-%d", w.id)
}
func (w *writeWorker) run() (historyRecords, error) {
var rs historyRecords
for {
@ -375,7 +405,7 @@ func (w *writeWorker) run() (historyRecords, error) {
if err != nil {
writeErr := fmt.Errorf("[writer error]: %w", err)
w.t.Log(writeErr)
w.t.Error(writeErr)
w.errCh <- writeErr
return rs, writeErr
}
@ -511,6 +541,19 @@ func (rs historyRecords) Swap(i, j int) {
rs[i], rs[j] = rs[j], rs[i]
}
func validateIncrementalTxid(rs historyRecords) error {
lastTxid := rs[0].Txid
for i := 1; i < len(rs); i++ {
if (rs[i].OperationType == Write && rs[i].Txid <= lastTxid) || (rs[i].OperationType == Read && rs[i].Txid < lastTxid) {
return fmt.Errorf("detected non-incremental txid(%d, %d) in %s mode", lastTxid, rs[i].Txid, rs[i].OperationType)
}
lastTxid = rs[i].Txid
}
return nil
}
func validateSerializable(rs historyRecords) error {
sort.Sort(rs)