Enhance TestDB_Concurrent_WriteTo to check consistent read

Signed-off-by: Benjamin Wang <benjamin.ahrtr@gmail.com>
pull/779/head
Benjamin Wang 2024-06-27 16:48:18 +01:00
parent 53977ba4a8
commit 848f5fb7e4
1 changed files with 68 additions and 41 deletions

View File

@ -668,68 +668,95 @@ func TestDB_BeginRW(t *testing.T) {
}
// TestDB_Concurrent_WriteTo checks that issuing WriteTo operations concurrently
// with commits does not produce corrupted db files.
func TestDB_Concurrent_WriteTo(t *testing.T) {
o := &bolt.Options{NoFreelistSync: false}
// with commits does not produce corrupted db files. It also verifies that all
// readonly transactions, which are created based on the same data view, should
// always read the same data.
func TestDB_Concurrent_WriteTo_and_ConsistentRead(t *testing.T) {
o := &bolt.Options{
NoFreelistSync: false,
PageSize: 4096,
}
db := btesting.MustCreateDBWithOption(t, o)
wtxs, rtxs := 50, 5
bucketName := []byte("data")
var dataLock sync.Mutex
dataCache := make(map[int][]map[string]string)
var wg sync.WaitGroup
wtxs, rtxs := 5, 5
wg.Add(wtxs * rtxs)
f := func(tx *bolt.Tx) {
f := func(round int, tx *bolt.Tx) {
defer wg.Done()
f, err := os.CreateTemp("", "bolt-")
if err != nil {
panic(err)
}
time.Sleep(time.Duration(rand.Intn(20)+1) * time.Millisecond)
_, err = tx.WriteTo(f)
if err != nil {
panic(err)
}
time.Sleep(time.Duration(rand.Intn(200)+10) * time.Millisecond)
f := filepath.Join(t.TempDir(), fmt.Sprintf("%d-bolt-", round))
err := tx.CopyFile(f, 0600)
require.NoError(t, err)
// read all the data
b := tx.Bucket(bucketName)
data := make(map[string]string)
err = b.ForEach(func(k, v []byte) error {
data[string(k)] = string(v)
return nil
})
require.NoError(t, err)
// cache the data
dataLock.Lock()
dataSlice := dataCache[round]
dataSlice = append(dataSlice, data)
dataCache[round] = dataSlice
dataLock.Unlock()
err = tx.Rollback()
if err != nil {
panic(err)
}
f.Close()
require.NoError(t, err)
copyOpt := *o
snap := btesting.MustOpenDBWithOption(t, f.Name(), &copyOpt)
snap := btesting.MustOpenDBWithOption(t, f, &copyOpt)
defer snap.MustClose()
snap.MustCheck()
}
tx1, err := db.Begin(true)
if err != nil {
t.Fatal(err)
}
if _, err := tx1.CreateBucket([]byte("abc")); err != nil {
t.Fatal(err)
}
if err := tx1.Commit(); err != nil {
t.Fatal(err)
}
err := db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucket(bucketName)
return err
})
require.NoError(t, err)
for i := 0; i < wtxs; i++ {
tx, err := db.Begin(true)
if err != nil {
t.Fatal(err)
}
if err := tx.Bucket([]byte("abc")).Put([]byte{0}, []byte{0}); err != nil {
t.Fatal(err)
}
require.NoError(t, err)
b := tx.Bucket(bucketName)
for j := 0; j < rtxs; j++ {
rtx, rerr := db.Begin(false)
if rerr != nil {
t.Fatal(rerr)
require.NoError(t, rerr)
go f(i, rtx)
for k := 0; k < 10; k++ {
key, value := fmt.Sprintf("key_%d", rand.Intn(10)), fmt.Sprintf("value_%d", rand.Intn(100))
perr := b.Put([]byte(key), []byte(value))
require.NoError(t, perr)
}
go f(rtx)
}
if err := tx.Commit(); err != nil {
t.Fatal(err)
}
err = tx.Commit()
require.NoError(t, err)
}
wg.Wait()
// compare the data. The data generated in the same round
// should be exactly the same.
for round, dataSlice := range dataCache {
data0 := dataSlice[0]
for i := 1; i < len(dataSlice); i++ {
datai := dataSlice[i]
same := reflect.DeepEqual(data0, datai)
require.True(t, same, fmt.Sprintf("found inconsistent data in round %d, data[0]: %v, data[%d] : %v", round, data0, i, datai))
}
}
}
// Ensure that opening a transaction while the DB is closed returns an error.