test: update concurrent test case to remove the dependency on internal package

Signed-off-by: Benjamin Wang <wachao@vmware.com>
pull/480/head
Benjamin Wang 2023-05-02 15:14:55 +08:00
parent 96ca46ea06
commit aaada7b154
1 changed files with 82 additions and 13 deletions

View File

@ -5,6 +5,7 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"io"
mrand "math/rand"
"os"
"path/filepath"
@ -20,8 +21,6 @@ import (
"golang.org/x/sync/errgroup"
bolt "go.etcd.io/bbolt"
"go.etcd.io/bbolt/internal/btesting"
"go.etcd.io/bbolt/internal/common"
)
type duration struct {
@ -139,7 +138,8 @@ func concurrentReadAndWrite(t *testing.T,
testDuration time.Duration) {
t.Log("Preparing db.")
db := btesting.MustCreateDB(t)
db := mustCreateDB(t, nil)
defer db.Close()
err := db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucket(bucket)
return err
@ -177,7 +177,32 @@ func concurrentReadAndWrite(t *testing.T,
// 1. intentionally inject a random failpoint.
}
func checkConsistency(t *testing.T, db *btesting.DB) error {
// mustCreateDB is created in place of `btesting.MustCreateDB`, and it's
// only supposed to be used by the concurrent test case. The purpose is
// to ensure the test case can be executed on old branches or versions,
// e.g. `release-1.3` or `1.3.[5-7]`.
func mustCreateDB(t *testing.T, o *bolt.Options) *bolt.DB {
f := filepath.Join(t.TempDir(), "db")
t.Logf("Opening bbolt DB at: %s", f)
if o == nil {
o = bolt.DefaultOptions
}
freelistType := bolt.FreelistArrayType
if env := os.Getenv("TEST_FREELIST_TYPE"); env == string(bolt.FreelistMapType) {
freelistType = bolt.FreelistMapType
}
o.FreelistType = freelistType
db, err := bolt.Open(f, 0666, o)
require.NoError(t, err)
return db
}
func checkConsistency(t *testing.T, db *bolt.DB) error {
return db.View(func(tx *bolt.Tx) error {
cnt := 0
for err := range tx.Check() {
@ -199,7 +224,7 @@ workers, which execute different operations, including `Read`,
*********************************************************
*/
func runWorkers(t *testing.T,
db *btesting.DB,
db *bolt.DB,
bucket []byte,
keys []string,
workerCount int,
@ -264,7 +289,7 @@ func runWorker(t *testing.T, w *worker, errCh chan error) (historyRecords, error
type worker struct {
id int
db *btesting.DB
db *bolt.DB
bucket []byte
keys []string
@ -320,7 +345,7 @@ func (w *worker) pickOperation() OperationType {
panic("unexpected")
}
func executeOperation(op OperationType, db *btesting.DB, bucket []byte, keys []string, conf concurrentConfig) (historyRecord, error) {
func executeOperation(op OperationType, db *bolt.DB, bucket []byte, keys []string, conf concurrentConfig) (historyRecord, error) {
switch op {
case Read:
return executeRead(db, bucket, keys, conf.readInterval)
@ -333,7 +358,7 @@ func executeOperation(op OperationType, db *btesting.DB, bucket []byte, keys []s
}
}
func executeRead(db *btesting.DB, bucket []byte, keys []string, readInterval duration) (historyRecord, error) {
func executeRead(db *bolt.DB, bucket []byte, keys []string, readInterval duration) (historyRecord, error) {
var rec historyRecord
err := db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
@ -364,7 +389,7 @@ func executeRead(db *btesting.DB, bucket []byte, keys []string, readInterval dur
return rec, err
}
func executeWrite(db *btesting.DB, bucket []byte, keys []string, writeBytes bytesRange) (historyRecord, error) {
func executeWrite(db *bolt.DB, bucket []byte, keys []string, writeBytes bytesRange) (historyRecord, error) {
var rec historyRecord
err := db.Update(func(tx *bolt.Tx) error {
@ -394,7 +419,7 @@ func executeWrite(db *btesting.DB, bucket []byte, keys []string, writeBytes byte
return rec, err
}
func executeDelete(db *btesting.DB, bucket []byte, keys []string) (historyRecord, error) {
func executeDelete(db *bolt.DB, bucket []byte, keys []string) (historyRecord, error) {
var rec historyRecord
err := db.Update(func(tx *bolt.Tx) error {
@ -441,7 +466,7 @@ Functions for persisting test data, including db file
and operation history
*********************************************************
*/
func saveDataIfFailed(t *testing.T, db *btesting.DB, rs historyRecords, force bool) {
func saveDataIfFailed(t *testing.T, db *bolt.DB, rs historyRecords, force bool) {
if t.Failed() || force {
if err := db.Close(); err != nil {
t.Errorf("Failed to close db: %v", err)
@ -452,14 +477,58 @@ func saveDataIfFailed(t *testing.T, db *btesting.DB, rs historyRecords, force bo
}
}
func backupDB(t *testing.T, db *btesting.DB, path string) {
func backupDB(t *testing.T, db *bolt.DB, path string) {
targetFile := filepath.Join(path, "db.bak")
t.Logf("Saving the DB file to %s", targetFile)
err := common.CopyFile(db.Path(), targetFile)
err := copyFile(db.Path(), targetFile)
require.NoError(t, err)
t.Logf("DB file saved to %s", targetFile)
}
func copyFile(srcPath, dstPath string) error {
// Ensure source file exists.
_, err := os.Stat(srcPath)
if os.IsNotExist(err) {
return fmt.Errorf("source file %q not found", srcPath)
} else if err != nil {
return err
}
// Ensure output file not exist.
_, err = os.Stat(dstPath)
if err == nil {
return fmt.Errorf("output file %q already exists", dstPath)
} else if !os.IsNotExist(err) {
return err
}
srcDB, err := os.Open(srcPath)
if err != nil {
return fmt.Errorf("failed to open source file %q: %w", srcPath, err)
}
defer srcDB.Close()
dstDB, err := os.Create(dstPath)
if err != nil {
return fmt.Errorf("failed to create output file %q: %w", dstPath, err)
}
defer dstDB.Close()
written, err := io.Copy(dstDB, srcDB)
if err != nil {
return fmt.Errorf("failed to copy database file from %q to %q: %w", srcPath, dstPath, err)
}
srcFi, err := srcDB.Stat()
if err != nil {
return fmt.Errorf("failed to get source file info %q: %w", srcPath, err)
}
initialSize := srcFi.Size()
if initialSize != written {
return fmt.Errorf("the byte copied (%q: %d) isn't equal to the initial db size (%q: %d)", dstPath, written, srcPath, initialSize)
}
return nil
}
func persistHistoryRecords(t *testing.T, rs historyRecords, path string) {
recordFilePath := filepath.Join(path, "history_records.json")
t.Logf("Saving history records to %s", recordFilePath)