// bbolt/concurrent_test.go
package bbolt_test
import (
"bytes"
crand "crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"io"
mrand "math/rand"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"testing"
"time"
"unicode/utf8"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
bolt "go.etcd.io/bbolt"
)
const (
bucketPrefix = "bucket"
keyPrefix = "key"
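// noopTxKey is a magic key used to represent a no-op write transaction
// in the history records; it is never actually stored in the database.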
noopTxKey = "%magic-no-op-key%"
// testConcurrentCaseDuration is the name of the env variable that
// specifies the concurrent test case duration.
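// It accepts any value parsable by time.ParseDuration; an illustrative
// invocation:
//
//	TEST_CONCURRENT_CASE_DURATION=1m go test -run TestConcurrentGenericReadAndWrite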
testConcurrentCaseDuration = "TEST_CONCURRENT_CASE_DURATION"
defaultConcurrentTestDuration = 30 * time.Second
)
type duration struct {
min time.Duration
max time.Duration
}
type bytesRange struct {
min int
max int
}
type operationChance struct {
operation OperationType
chance int
}
type concurrentConfig struct {
bucketCount int
keyCount int
workInterval duration
operationRatio []operationChance
readInterval duration // only used by readOperation
noopWriteRatio int // only used by writeOperation
writeBytes bytesRange // only used by writeOperation
}
/*
TestConcurrentGenericReadAndWrite verifies:
1. Repeatable read: a read transaction should always see the same data
view during its lifecycle.
2. Any data written by a writing transaction should be visible to any
subsequent reading transaction (i.e. one with txid >= the writing txid).
3. The txid should never decrease.
*/
func TestConcurrentGenericReadAndWrite(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
testDuration := concurrentTestDuration(t)
conf := concurrentConfig{
bucketCount: 5,
keyCount: 10000,
workInterval: duration{},
operationRatio: []operationChance{
{operation: Read, chance: 60},
{operation: Write, chance: 20},
{operation: Delete, chance: 20},
},
readInterval: duration{
min: 50 * time.Millisecond,
max: 100 * time.Millisecond,
},
noopWriteRatio: 20,
writeBytes: bytesRange{
min: 200,
max: 16000,
},
}
testCases := []struct {
name string
workerCount int
conf concurrentConfig
testDuration time.Duration
}{
{
name: "1 worker",
workerCount: 1,
conf: conf,
testDuration: testDuration,
},
{
name: "10 workers",
workerCount: 10,
conf: conf,
testDuration: testDuration,
},
{
name: "50 workers",
workerCount: 50,
conf: conf,
testDuration: testDuration,
},
{
name: "100 workers",
workerCount: 100,
conf: conf,
testDuration: testDuration,
},
{
name: "200 workers",
workerCount: 200,
conf: conf,
testDuration: testDuration,
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
concurrentReadAndWrite(t,
tc.workerCount,
tc.conf,
tc.testDuration)
})
}
}
func concurrentTestDuration(t *testing.T) time.Duration {
durationInEnv := strings.ToLower(os.Getenv(testConcurrentCaseDuration))
if durationInEnv == "" {
t.Logf("%q not set, defaults to %s", testConcurrentCaseDuration, defaultConcurrentTestDuration)
return defaultConcurrentTestDuration
}
d, err := time.ParseDuration(durationInEnv)
if err != nil {
t.Logf("Failed to parse %s=%s, error: %v, defaults to %s", testConcurrentCaseDuration, durationInEnv, err, defaultConcurrentTestDuration)
return defaultConcurrentTestDuration
}
t.Logf("Concurrent test duration set by %s=%s", testConcurrentCaseDuration, d)
return d
}
func concurrentReadAndWrite(t *testing.T,
workerCount int,
conf concurrentConfig,
testDuration time.Duration) {
t.Log("Preparing db.")
db := mustCreateDB(t, &bolt.Options{
PageSize: 4096,
})
defer db.Close()
err := db.Update(func(tx *bolt.Tx) error {
for i := 0; i < conf.bucketCount; i++ {
if _, err := tx.CreateBucketIfNotExists(bucketName(i)); err != nil {
return err
}
}
return nil
})
require.NoError(t, err)
var records historyRecords
// t.Failed() returns false while panicking, so we need to forcibly
// save the data when a panic occurs.
// Refer to: https://github.com/golang/go/issues/49929
panicked := true
defer func() {
t.Log("Save data if failed.")
saveDataIfFailed(t, db, records, panicked)
}()
t.Log("Starting workers.")
records = runWorkers(t,
db,
workerCount,
conf,
testDuration)
t.Log("Analyzing the history records.")
if err := validateSequential(records); err != nil {
t.Errorf("The history records are not sequential:\n %v", err)
}
t.Log("Checking database consistency.")
if err := checkConsistency(t, db); err != nil {
t.Errorf("The data isn't consistent: %v", err)
}
panicked = false
// TODO (ahrtr):
// 1. intentionally inject a random failpoint.
}
// mustCreateDB is created in place of `btesting.MustCreateDB`, and it's
// only supposed to be used by the concurrent test case. The purpose is
// to ensure the test case can be executed on old branches or versions,
// e.g. `release-1.3` or `1.3.[5-7]`.
func mustCreateDB(t *testing.T, o *bolt.Options) *bolt.DB {
f := filepath.Join(t.TempDir(), "db")
return mustOpenDB(t, f, o)
}
func mustReOpenDB(t *testing.T, db *bolt.DB, o *bolt.Options) *bolt.DB {
f := db.Path()
t.Logf("Closing bbolt DB at: %s", f)
err := db.Close()
require.NoError(t, err)
return mustOpenDB(t, f, o)
}
func mustOpenDB(t *testing.T, dbPath string, o *bolt.Options) *bolt.DB {
t.Logf("Opening bbolt DB at: %s", dbPath)
if o == nil {
o = bolt.DefaultOptions
}
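// The freelist type defaults to array and can be overridden via the
// TEST_FREELIST_TYPE env variable; its value must match
// string(bolt.FreelistMapType), e.g. (assuming that value is "hashmap",
// as in current bbolt releases):
//
//	TEST_FREELIST_TYPE=hashmap go test ./...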
freelistType := bolt.FreelistArrayType
if env := os.Getenv("TEST_FREELIST_TYPE"); env == string(bolt.FreelistMapType) {
freelistType = bolt.FreelistMapType
}
o.FreelistType = freelistType
db, err := bolt.Open(dbPath, 0600, o)
require.NoError(t, err)
return db
}
func checkConsistency(t *testing.T, db *bolt.DB) error {
return db.View(func(tx *bolt.Tx) error {
cnt := 0
for err := range tx.Check() {
t.Errorf("Consistency error: %v", err)
cnt++
}
if cnt > 0 {
return fmt.Errorf("%d consistency errors found", cnt)
}
return nil
})
}
/*
*********************************************************
Data structures and functions/methods for running concurrent
workers, which execute different operations, including `Read`,
`Write` and `Delete`.
*********************************************************
*/
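// runWorkers starts `workerCount` workers, lets them run for the given test
// duration (or until the first error), then closes stopCh and aggregates the
// history records produced by all workers.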
func runWorkers(t *testing.T,
db *bolt.DB,
workerCount int,
conf concurrentConfig,
testDuration time.Duration) historyRecords {
stopCh := make(chan struct{}, 1)
errCh := make(chan error, workerCount)
var mu sync.Mutex
var rs historyRecords
g := new(errgroup.Group)
for i := 0; i < workerCount; i++ {
w := &worker{
id: i,
db: db,
conf: conf,
errCh: errCh,
stopCh: stopCh,
t: t,
}
g.Go(func() error {
wrs, err := runWorker(t, w, errCh)
mu.Lock()
rs = append(rs, wrs...)
mu.Unlock()
return err
})
}
t.Logf("Keep all workers running for about %s.", testDuration)
select {
case <-time.After(testDuration):
case <-errCh:
}
close(stopCh)
t.Log("Waiting for all workers to finish.")
if err := g.Wait(); err != nil {
t.Errorf("Received error: %v", err)
}
return rs
}
func runWorker(t *testing.T, w *worker, errCh chan error) (historyRecords, error) {
rs, err := w.run()
if len(rs) > 0 && err == nil {
if terr := validateIncrementalTxid(rs); terr != nil {
txidErr := fmt.Errorf("[%s]: %w", w.name(), terr)
t.Error(txidErr)
errCh <- txidErr
return rs, txidErr
}
}
return rs, err
}
type worker struct {
id int
db *bolt.DB
conf concurrentConfig
errCh chan error
stopCh chan struct{}
t *testing.T
}
func (w *worker) name() string {
return fmt.Sprintf("worker-%d", w.id)
}
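// run executes randomly picked operations in write transactions until stopCh
// is closed. Each db.Update batches up to one second of operations (until the
// ticker fires), so consecutive records from one worker may share a txid.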
func (w *worker) run() (historyRecords, error) {
var rs historyRecords
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-w.stopCh:
return rs, nil
default:
}
err := w.db.Update(func(tx *bolt.Tx) error {
for {
op := w.pickOperation()
bucket, key := w.pickBucket(), w.pickKey()
rec, eerr := executeOperation(op, tx, bucket, key, w.conf)
if eerr != nil {
opErr := fmt.Errorf("[%s: %s]: %w", w.name(), op, eerr)
w.t.Error(opErr)
w.errCh <- opErr
return opErr
}
rs = append(rs, rec)
if w.conf.workInterval != (duration{}) {
time.Sleep(randomDurationInRange(w.conf.workInterval.min, w.conf.workInterval.max))
}
select {
case <-ticker.C:
return nil
case <-w.stopCh:
return nil
default:
}
}
})
if err != nil {
return rs, err
}
}
}
func (w *worker) pickBucket() []byte {
return bucketName(mrand.Intn(w.conf.bucketCount))
}
func bucketName(index int) []byte {
bucket := fmt.Sprintf("%s_%d", bucketPrefix, index)
return []byte(bucket)
}
func (w *worker) pickKey() []byte {
key := fmt.Sprintf("%s_%d", keyPrefix, mrand.Intn(w.conf.keyCount))
return []byte(key)
}
func (w *worker) pickOperation() OperationType {
sum := 0
for _, op := range w.conf.operationRatio {
sum += op.chance
}
roll := mrand.Int() % sum
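// Weighted pick: with the default ratio {Read: 60, Write: 20, Delete: 20},
// sum is 100, so a roll in [0, 60) picks Read, [60, 80) Write and
// [80, 100) Delete.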
for _, op := range w.conf.operationRatio {
if roll < op.chance {
return op.operation
}
roll -= op.chance
}
panic("unexpected")
}
func executeOperation(op OperationType, tx *bolt.Tx, bucket []byte, key []byte, conf concurrentConfig) (historyRecord, error) {
switch op {
case Read:
return executeRead(tx, bucket, key, conf.readInterval)
case Write:
return executeWrite(tx, bucket, key, conf.writeBytes, conf.noopWriteRatio)
case Delete:
return executeDelete(tx, bucket, key)
default:
panic(fmt.Sprintf("unexpected operation type: %s", op))
}
}
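// executeRead reads the same key twice in one transaction, sleeping in
// between, to detect any repeatable-read violation within a single tx.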
func executeRead(tx *bolt.Tx, bucket []byte, key []byte, readInterval duration) (historyRecord, error) {
var rec historyRecord
b := tx.Bucket(bucket)
initialVal := b.Get(key)
time.Sleep(randomDurationInRange(readInterval.min, readInterval.max))
val := b.Get(key)
if !bytes.Equal(initialVal, val) {
return rec, fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q",
string(key), formatBytes(initialVal), formatBytes(val))
}
clonedVal := make([]byte, len(val))
copy(clonedVal, val)
rec = historyRecord{
OperationType: Read,
Bucket: string(bucket),
Key: string(key),
Value: clonedVal,
Txid: tx.ID(),
}
return rec, nil
}
func executeWrite(tx *bolt.Tx, bucket []byte, key []byte, writeBytes bytesRange, noopWriteRatio int) (historyRecord, error) {
var rec historyRecord
if mrand.Intn(100) < noopWriteRatio {
// A no-op write transaction has two consequences:
// 1. The txid increases by 1;
// 2. Two meta pages point to the same root page.
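// Nothing is written here; committing the otherwise-empty transaction is
// what produces both effects. The magic noopTxKey lets the validators
// account for the txid bump without expecting any value change.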
rec = historyRecord{
OperationType: Write,
Bucket: string(bucket),
Key: noopTxKey,
Value: nil,
Txid: tx.ID(),
}
return rec, nil
}
b := tx.Bucket(bucket)
valueBytes := randomIntInRange(writeBytes.min, writeBytes.max)
v := make([]byte, valueBytes)
if _, cErr := crand.Read(v); cErr != nil {
return rec, cErr
}
putErr := b.Put(key, v)
if putErr == nil {
rec = historyRecord{
OperationType: Write,
Bucket: string(bucket),
Key: string(key),
Value: v,
Txid: tx.ID(),
}
}
return rec, putErr
}
func executeDelete(tx *bolt.Tx, bucket []byte, key []byte) (historyRecord, error) {
var rec historyRecord
b := tx.Bucket(bucket)
err := b.Delete(key)
if err == nil {
rec = historyRecord{
OperationType: Delete,
Bucket: string(bucket),
Key: string(key),
Txid: tx.ID(),
}
}
return rec, err
}
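// randomDurationInRange returns a random duration in [min, max). Callers
// must ensure max > min, since mrand.Intn panics on a non-positive argument.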
func randomDurationInRange(min, max time.Duration) time.Duration {
d := int64(max) - int64(min)
d = int64(mrand.Intn(int(d))) + int64(min)
return time.Duration(d)
}
func randomIntInRange(min, max int) int {
return mrand.Intn(max-min) + min
}
func formatBytes(val []byte) string {
if utf8.ValidString(string(val)) {
return string(val)
}
return hex.EncodeToString(val)
}
/*
*********************************************************
Functions for persisting test data, including db file
and operation history
*********************************************************
*/
func saveDataIfFailed(t *testing.T, db *bolt.DB, rs historyRecords, force bool) {
if t.Failed() || force {
t.Log("Saving data...")
dbPath := db.Path()
if err := db.Close(); err != nil {
t.Errorf("Failed to close db: %v", err)
}
backupPath := testResultsDirectory(t)
backupDB(t, dbPath, backupPath)
persistHistoryRecords(t, rs, backupPath)
}
}
func backupDB(t *testing.T, srcPath string, dstPath string) {
targetFile := filepath.Join(dstPath, "db.bak")
t.Logf("Saving the DB file to %s", targetFile)
err := copyFile(srcPath, targetFile)
require.NoError(t, err)
t.Logf("DB file saved to %s", targetFile)
}
func copyFile(srcPath, dstPath string) error {
// Ensure source file exists.
_, err := os.Stat(srcPath)
if os.IsNotExist(err) {
return fmt.Errorf("source file %q not found", srcPath)
} else if err != nil {
return err
}
// Ensure the output file does not exist.
_, err = os.Stat(dstPath)
if err == nil {
return fmt.Errorf("output file %q already exists", dstPath)
} else if !os.IsNotExist(err) {
return err
}
srcDB, err := os.Open(srcPath)
if err != nil {
return fmt.Errorf("failed to open source file %q: %w", srcPath, err)
}
defer srcDB.Close()
dstDB, err := os.Create(dstPath)
if err != nil {
return fmt.Errorf("failed to create output file %q: %w", dstPath, err)
}
defer dstDB.Close()
written, err := io.Copy(dstDB, srcDB)
if err != nil {
return fmt.Errorf("failed to copy database file from %q to %q: %w", srcPath, dstPath, err)
}
srcFi, err := srcDB.Stat()
if err != nil {
return fmt.Errorf("failed to get source file info %q: %w", srcPath, err)
}
initialSize := srcFi.Size()
if initialSize != written {
return fmt.Errorf("the number of bytes copied (%q: %d) isn't equal to the initial db size (%q: %d)", dstPath, written, srcPath, initialSize)
}
return nil
}
func persistHistoryRecords(t *testing.T, rs historyRecords, path string) {
recordFilePath := filepath.Join(path, "history_records.json")
t.Logf("Saving history records to %s", recordFilePath)
recordFile, err := os.OpenFile(recordFilePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
require.NoError(t, err)
defer recordFile.Close()
encoder := json.NewEncoder(recordFile)
for _, rec := range rs {
err := encoder.Encode(rec)
require.NoError(t, err)
}
}
func testResultsDirectory(t *testing.T) string {
resultsDirectory, ok := os.LookupEnv("RESULTS_DIR")
var err error
if !ok {
resultsDirectory, err = os.MkdirTemp("", "*.db")
require.NoError(t, err)
}
resultsDirectory, err = filepath.Abs(resultsDirectory)
require.NoError(t, err)
path, err := filepath.Abs(filepath.Join(resultsDirectory, strings.ReplaceAll(t.Name(), "/", "_")))
require.NoError(t, err)
err = os.RemoveAll(path)
require.NoError(t, err)
err = os.MkdirAll(path, 0700)
require.NoError(t, err)
return path
}
/*
*********************************************************
Data structures and functions for analyzing history records
*********************************************************
*/
type OperationType string
const (
Read OperationType = "read"
Write OperationType = "write"
Delete OperationType = "delete"
)
type historyRecord struct {
OperationType OperationType `json:"operationType,omitempty"`
Txid int `json:"txid,omitempty"`
Bucket string `json:"bucket,omitempty"`
Key string `json:"key,omitempty"`
Value []byte `json:"value,omitempty"`
}
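// historyRecords are persisted as newline-delimited JSON (see
// persistHistoryRecords); encoding/json renders the []byte Value as base64.
// An illustrative line with made-up values:
//
//	{"operationType":"write","txid":7,"bucket":"bucket_3","key":"key_42","value":"3q2+7w=="}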
type historyRecords []historyRecord
func (rs historyRecords) Len() int {
return len(rs)
}
func (rs historyRecords) Less(i, j int) bool {
// Sort by (bucket, key) first, so that all records for the same
// (bucket, key) are grouped together.
bucketCmp := strings.Compare(rs[i].Bucket, rs[j].Bucket)
if bucketCmp != 0 {
return bucketCmp < 0
}
keyCmp := strings.Compare(rs[i].Key, rs[j].Key)
if keyCmp != 0 {
return keyCmp < 0
}
// Sorted by txid
return rs[i].Txid < rs[j].Txid
}
func (rs historyRecords) Swap(i, j int) {
rs[i], rs[j] = rs[j], rs[i]
}
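// validateIncrementalTxid verifies that the txids observed by a single
// worker never decrease. runWorker only calls it with a non-empty slice.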
func validateIncrementalTxid(rs historyRecords) error {
lastTxid := rs[0].Txid
for i := 1; i < len(rs); i++ {
if rs[i].Txid < lastTxid {
return fmt.Errorf("detected non-incremental txid (%d -> %d) in a %s operation", lastTxid, rs[i].Txid, rs[i].OperationType)
}
lastTxid = rs[i].Txid
}
return nil
}
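// validateSequential groups the records by (bucket, key), replays them in
// txid order, and verifies that every read observes the value of the most
// recent preceding write (or nil if nothing was written yet, or the key was
// deleted). An illustrative passing sequence for one key (hypothetical
// txids/values):
//
//	write(txid=5, v1) -> read(txid=6, v1) -> delete(txid=8) -> read(txid=9, nil)
//
// A final read(txid=9, v1) instead would be reported as an error.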
func validateSequential(rs historyRecords) error {
sort.Stable(rs)
type bucketAndKey struct {
bucket string
key string
}
lastWriteKeyValueMap := make(map[bucketAndKey]*historyRecord)
for _, rec := range rs {
bk := bucketAndKey{
bucket: rec.Bucket,
key: rec.Key,
}
if v, ok := lastWriteKeyValueMap[bk]; ok {
if rec.OperationType == Write {
v.Txid = rec.Txid
if rec.Key != noopTxKey {
v.Value = rec.Value
}
} else if rec.OperationType == Delete {
delete(lastWriteKeyValueMap, bk)
} else {
if !bytes.Equal(v.Value, rec.Value) {
return fmt.Errorf("readOperation[txid: %d, bucket: %s, key: %s] read %x, \nbut writer[txid: %d] wrote %x",
rec.Txid, rec.Bucket, rec.Key, rec.Value, v.Txid, v.Value)
}
}
} else {
if rec.OperationType == Write && rec.Key != noopTxKey {
lastWriteKeyValueMap[bk] = &historyRecord{
OperationType: Write,
Bucket: rec.Bucket,
Key: rec.Key,
Value: rec.Value,
Txid: rec.Txid,
}
} else if rec.OperationType == Read {
if len(rec.Value) != 0 {
return fmt.Errorf("expected the first readOperation[txid: %d, bucket: %s, key: %s] read nil, \nbut got %x",
rec.Txid, rec.Bucket, rec.Key, rec.Value)
}
}
}
}
return nil
}
/*
TestConcurrentRepeatableRead verifies repeatable read. The case
intentionally creates a scenario in which read and write transactions
are interleaved. It performs several write operations after starting
each long-running read transaction to ensure it has a larger txid
than the previous read transaction. It verifies that bbolt correctly
releases free pages, and does not pollute (e.g. prematurely release)
any pages which are still being used by any read transaction.
*/
func TestConcurrentRepeatableRead(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
testCases := []struct {
name string
noFreelistSync bool
freelistType bolt.FreelistType
}{
// [array] freelist
{
name: "sync array freelist",
noFreelistSync: false,
freelistType: bolt.FreelistArrayType,
},
{
name: "not sync array freelist",
noFreelistSync: true,
freelistType: bolt.FreelistArrayType,
},
// [map] freelist
{
name: "sync map freelist",
noFreelistSync: false,
freelistType: bolt.FreelistMapType,
},
{
name: "not sync map freelist",
noFreelistSync: true,
freelistType: bolt.FreelistMapType,
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Log("Preparing db.")
var (
bucket = []byte("data")
key = []byte("mykey")
option = &bolt.Options{
PageSize: 4096,
NoFreelistSync: tc.noFreelistSync,
FreelistType: tc.freelistType,
}
)
db := mustCreateDB(t, option)
defer func() {
// The db will be reopened later, so put `db.Close()` in a function
// to avoid premature evaluation of `db`. Note that the execution
// of a deferred function is deferred to the moment the surrounding
// function returns, but the function value and parameters to the
// call are evaluated as usual and saved anew.
db.Close()
}()
// Create lots of K/V to allocate some pages
err := db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucketIfNotExists(bucket)
if err != nil {
return err
}
for i := 0; i < 1000; i++ {
k := fmt.Sprintf("key_%d", i)
if err := b.Put([]byte(k), make([]byte, 1024)); err != nil {
return err
}
}
return nil
})
require.NoError(t, err)
// Remove all K/V to create some free pages
err = db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
for i := 0; i < 1000; i++ {
k := fmt.Sprintf("key_%d", i)
if err := b.Delete([]byte(k)); err != nil {
return err
}
}
return b.Put(key, []byte("randomValue"))
})
require.NoError(t, err)
// bbolt does not release free pages directly after committing
// a write transaction; instead, all freed pages are put into
// a pending list. Accordingly, the free pages might not be
// reusable by the following write transactions. So we reopen
// the db to completely release all free pages.
db = mustReOpenDB(t, db, option)
var (
wg sync.WaitGroup
longRunningReaderCount = 10
stopCh = make(chan struct{})
errCh = make(chan error, longRunningReaderCount)
readInterval = duration{5 * time.Millisecond, 10 * time.Millisecond}
writeOperationCountInBetween = 5
writeBytes = bytesRange{10, 20}
testDuration = 10 * time.Second
)
for i := 0; i < longRunningReaderCount; i++ {
readWorkerName := fmt.Sprintf("reader_%d", i)
t.Logf("Starting long-running read operation: %s", readWorkerName)
wg.Add(1)
go func() {
defer wg.Done()
rErr := executeLongRunningRead(t, readWorkerName, db, bucket, key, readInterval, stopCh)
if rErr != nil {
errCh <- rErr
}
}()
time.Sleep(500 * time.Millisecond)
t.Logf("Perform %d write operations after starting a long-running read operation", writeOperationCountInBetween)
for j := 0; j < writeOperationCountInBetween; j++ {
err := db.Update(func(tx *bolt.Tx) error {
_, eerr := executeWrite(tx, bucket, key, writeBytes, 0)
return eerr
})
require.NoError(t, err)
}
}
t.Log("Perform lots of write operations to check whether the long-running read operations will read dirty data")
wg.Add(1)
go func() {
defer wg.Done()
cnt := longRunningReaderCount * writeOperationCountInBetween
for i := 0; i < cnt; i++ {
select {
case <-stopCh:
return
default:
}
err := db.Update(func(tx *bolt.Tx) error {
_, eerr := executeWrite(tx, bucket, key, writeBytes, 0)
return eerr
})
require.NoError(t, err)
}
}()
t.Log("Waiting for result")
select {
case err := <-errCh:
close(stopCh)
t.Errorf("Detected dirty read: %v", err)
case <-time.After(testDuration):
close(stopCh)
}
wg.Wait()
})
}
}
func executeLongRunningRead(t *testing.T, name string, db *bolt.DB, bucket []byte, key []byte, readInterval duration, stopCh chan struct{}) error {
err := db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(bucket)
initialVal := b.Get(key)
for {
select {
case <-stopCh:
t.Logf("%q finished.", name)
return nil
default:
}
time.Sleep(randomDurationInRange(readInterval.min, readInterval.max))
val := b.Get(key)
if !bytes.Equal(initialVal, val) {
dirtyReadErr := fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q",
string(key), formatBytes(initialVal), formatBytes(val))
return dirtyReadErr
}
}
})
return err
}