mirror of https://github.com/etcd-io/bbolt.git
Merge pull request #514 from ahrtr/dirty_read_20230523
test: add test case to verify repeatable readpull/518/head
commit
debd537008
|
@ -1,6 +1,7 @@
|
|||
package bbolt_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
crand "crypto/rand"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
|
@ -9,7 +10,6 @@ import (
|
|||
mrand "math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -60,14 +60,14 @@ type concurrentConfig struct {
|
|||
}
|
||||
|
||||
/*
|
||||
TestConcurrentReadAndWrite verifies:
|
||||
TestConcurrentGenericReadAndWrite verifies:
|
||||
1. Repeatable read: a read transaction should always see the same data
|
||||
view during its lifecycle.
|
||||
2. Any data written by a writing transaction should be visible to any
|
||||
following reading transactions (with txid >= previous writing txid).
|
||||
3. The txid should never decrease.
|
||||
*/
|
||||
func TestConcurrentReadAndWrite(t *testing.T) {
|
||||
func TestConcurrentGenericReadAndWrite(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping test in short mode.")
|
||||
}
|
||||
|
@ -216,7 +216,21 @@ func concurrentReadAndWrite(t *testing.T,
|
|||
func mustCreateDB(t *testing.T, o *bolt.Options) *bolt.DB {
|
||||
f := filepath.Join(t.TempDir(), "db")
|
||||
|
||||
t.Logf("Opening bbolt DB at: %s", f)
|
||||
return mustOpenDB(t, f, o)
|
||||
}
|
||||
|
||||
func mustReOpenDB(t *testing.T, db *bolt.DB, o *bolt.Options) *bolt.DB {
|
||||
f := db.Path()
|
||||
|
||||
t.Logf("Closing bbolt DB at: %s", f)
|
||||
err := db.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
return mustOpenDB(t, f, o)
|
||||
}
|
||||
|
||||
func mustOpenDB(t *testing.T, dbPath string, o *bolt.Options) *bolt.DB {
|
||||
t.Logf("Opening bbolt DB at: %s", dbPath)
|
||||
if o == nil {
|
||||
o = bolt.DefaultOptions
|
||||
}
|
||||
|
@ -228,7 +242,7 @@ func mustCreateDB(t *testing.T, o *bolt.Options) *bolt.DB {
|
|||
|
||||
o.FreelistType = freelistType
|
||||
|
||||
db, err := bolt.Open(f, 0666, o)
|
||||
db, err := bolt.Open(dbPath, 0666, o)
|
||||
require.NoError(t, err)
|
||||
|
||||
return db
|
||||
|
@ -409,7 +423,7 @@ func executeRead(db *bolt.DB, bucket []byte, key []byte, readInterval duration)
|
|||
time.Sleep(randomDurationInRange(readInterval.min, readInterval.max))
|
||||
val := b.Get(key)
|
||||
|
||||
if !reflect.DeepEqual(initialVal, val) {
|
||||
if !bytes.Equal(initialVal, val) {
|
||||
return fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q",
|
||||
string(key), formatBytes(initialVal), formatBytes(val))
|
||||
}
|
||||
|
@ -713,7 +727,7 @@ func validateSequential(rs historyRecords) error {
|
|||
} else if rec.OperationType == Delete {
|
||||
delete(lastWriteKeyValueMap, bk)
|
||||
} else {
|
||||
if !reflect.DeepEqual(v.Value, rec.Value) {
|
||||
if !bytes.Equal(v.Value, rec.Value) {
|
||||
return fmt.Errorf("readOperation[txid: %d, bucket: %s, key: %s] read %x, \nbut writer[txid: %d] wrote %x",
|
||||
rec.Txid, rec.Bucket, rec.Key, rec.Value, v.Txid, v.Value)
|
||||
}
|
||||
|
@ -738,3 +752,199 @@ func validateSequential(rs historyRecords) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
/*
|
||||
TestConcurrentRepeatableRead verifies repeatable read. The case
|
||||
intentionally creates a scenario that read and write transactions
|
||||
are interleaved. It performs several writing operations after starting
|
||||
each long-running read transaction to ensure it has a larger txid
|
||||
than previous read transaction. It verifies that bbolt correctly
|
||||
releases free pages, and will not pollute (e.g. prematurely release)
|
||||
any pages which are still being used by any read transaction.
|
||||
*/
|
||||
func TestConcurrentRepeatableRead(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping test in short mode.")
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
noFreelistSync bool
|
||||
freelistType bolt.FreelistType
|
||||
}{
|
||||
// [array] freelist
|
||||
{
|
||||
name: "sync array freelist",
|
||||
noFreelistSync: false,
|
||||
freelistType: bolt.FreelistArrayType,
|
||||
},
|
||||
{
|
||||
name: "not sync array freelist",
|
||||
noFreelistSync: true,
|
||||
freelistType: bolt.FreelistArrayType,
|
||||
},
|
||||
// [map] freelist
|
||||
{
|
||||
name: "sync map freelist",
|
||||
noFreelistSync: false,
|
||||
freelistType: bolt.FreelistMapType,
|
||||
},
|
||||
{
|
||||
name: "not sync map freelist",
|
||||
noFreelistSync: true,
|
||||
freelistType: bolt.FreelistMapType,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
tc := tc
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
|
||||
t.Log("Preparing db.")
|
||||
var (
|
||||
bucket = []byte("data")
|
||||
key = []byte("mykey")
|
||||
|
||||
option = &bolt.Options{
|
||||
PageSize: 4096,
|
||||
NoFreelistSync: tc.noFreelistSync,
|
||||
FreelistType: tc.freelistType,
|
||||
}
|
||||
)
|
||||
|
||||
db := mustCreateDB(t, option)
|
||||
defer func() {
|
||||
// The db will be reopened later, so put `db.Close()` in a function
|
||||
// to avoid premature evaluation of `db`. Note that the execution
|
||||
// of a deferred function is deferred to the moment the surrounding
|
||||
// function returns, but the function value and parameters to the
|
||||
// call are evaluated as usual and saved anew.
|
||||
db.Close()
|
||||
}()
|
||||
|
||||
// Create lots of K/V to allocate some pages
|
||||
err := db.Update(func(tx *bolt.Tx) error {
|
||||
b, err := tx.CreateBucketIfNotExists(bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for i := 0; i < 1000; i++ {
|
||||
k := fmt.Sprintf("key_%d", i)
|
||||
if err := b.Put([]byte(k), make([]byte, 1024)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Remove all K/V to create some free pages
|
||||
err = db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucket)
|
||||
for i := 0; i < 1000; i++ {
|
||||
k := fmt.Sprintf("key_%d", i)
|
||||
if err := b.Delete([]byte(k)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return b.Put(key, []byte("randomValue"))
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// bbolt will not release free pages directly after committing
|
||||
// a writing transaction; instead all pages freed are putting
|
||||
// into a pending list. Accordingly, the free pages might not
|
||||
// be able to be reused by following writing transactions. So
|
||||
// we reopen the db to completely release all free pages.
|
||||
db = mustReOpenDB(t, db, option)
|
||||
|
||||
var (
|
||||
wg sync.WaitGroup
|
||||
longRunningReaderCount = 10
|
||||
stopCh = make(chan struct{})
|
||||
errCh = make(chan error, longRunningReaderCount)
|
||||
readInterval = duration{5 * time.Millisecond, 10 * time.Millisecond}
|
||||
|
||||
writeOperationCountInBetween = 5
|
||||
writeBytes = bytesRange{10, 20}
|
||||
|
||||
testDuration = 10 * time.Second
|
||||
)
|
||||
|
||||
for i := 0; i < longRunningReaderCount; i++ {
|
||||
readWorkerName := fmt.Sprintf("reader_%d", i)
|
||||
t.Logf("Starting long running read operation: %s", readWorkerName)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
rErr := executeLongRunningRead(t, readWorkerName, db, bucket, key, readInterval, stopCh)
|
||||
if rErr != nil {
|
||||
errCh <- rErr
|
||||
}
|
||||
}()
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
t.Logf("Perform %d write operations after starting a long running read operation", writeOperationCountInBetween)
|
||||
for j := 0; j < writeOperationCountInBetween; j++ {
|
||||
_, err := executeWrite(db, bucket, key, writeBytes, 0)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
t.Log("Perform lots of write operations to check whether the long running read operations will read dirty data")
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
cnt := longRunningReaderCount * writeOperationCountInBetween
|
||||
for i := 0; i < cnt; i++ {
|
||||
select {
|
||||
case <-stopCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
_, err := executeWrite(db, bucket, key, writeBytes, 0)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}()
|
||||
|
||||
t.Log("Waiting for result")
|
||||
select {
|
||||
case err := <-errCh:
|
||||
close(stopCh)
|
||||
t.Errorf("Detected dirty read: %v", err)
|
||||
case <-time.After(testDuration):
|
||||
close(stopCh)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func executeLongRunningRead(t *testing.T, name string, db *bolt.DB, bucket []byte, key []byte, readInterval duration, stopCh chan struct{}) error {
|
||||
err := db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(bucket)
|
||||
|
||||
initialVal := b.Get(key)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-stopCh:
|
||||
t.Logf("%q finished.", name)
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
time.Sleep(randomDurationInRange(readInterval.min, readInterval.max))
|
||||
val := b.Get(key)
|
||||
|
||||
if !bytes.Equal(initialVal, val) {
|
||||
dirtyReadErr := fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q",
|
||||
string(key), formatBytes(initialVal), formatBytes(val))
|
||||
return dirtyReadErr
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue