diff --git a/concurrent_test.go b/concurrent_test.go index cfedcd7..8e93aca 100644 --- a/concurrent_test.go +++ b/concurrent_test.go @@ -1,6 +1,7 @@ package bbolt_test import ( + "bytes" crand "crypto/rand" "encoding/hex" "encoding/json" @@ -9,7 +10,6 @@ import ( mrand "math/rand" "os" "path/filepath" - "reflect" "sort" "strings" "sync" @@ -60,14 +60,14 @@ type concurrentConfig struct { } /* -TestConcurrentReadAndWrite verifies: +TestConcurrentGenericReadAndWrite verifies: 1. Repeatable read: a read transaction should always see the same data view during its lifecycle. 2. Any data written by a writing transaction should be visible to any following reading transactions (with txid >= previous writing txid). 3. The txid should never decrease. */ -func TestConcurrentReadAndWrite(t *testing.T) { +func TestConcurrentGenericReadAndWrite(t *testing.T) { if testing.Short() { t.Skip("skipping test in short mode.") } @@ -216,7 +216,21 @@ func concurrentReadAndWrite(t *testing.T, func mustCreateDB(t *testing.T, o *bolt.Options) *bolt.DB { f := filepath.Join(t.TempDir(), "db") - t.Logf("Opening bbolt DB at: %s", f) + return mustOpenDB(t, f, o) +} + +func mustReOpenDB(t *testing.T, db *bolt.DB, o *bolt.Options) *bolt.DB { + f := db.Path() + + t.Logf("Closing bbolt DB at: %s", f) + err := db.Close() + require.NoError(t, err) + + return mustOpenDB(t, f, o) +} + +func mustOpenDB(t *testing.T, dbPath string, o *bolt.Options) *bolt.DB { + t.Logf("Opening bbolt DB at: %s", dbPath) if o == nil { o = bolt.DefaultOptions } @@ -228,7 +242,7 @@ func mustCreateDB(t *testing.T, o *bolt.Options) *bolt.DB { o.FreelistType = freelistType - db, err := bolt.Open(f, 0666, o) + db, err := bolt.Open(dbPath, 0666, o) require.NoError(t, err) return db @@ -409,7 +423,7 @@ func executeRead(db *bolt.DB, bucket []byte, key []byte, readInterval duration) time.Sleep(randomDurationInRange(readInterval.min, readInterval.max)) val := b.Get(key) - if !reflect.DeepEqual(initialVal, val) { + if !bytes.Equal(initialVal, val) { return fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q", string(key), formatBytes(initialVal), formatBytes(val)) } @@ -713,7 +727,7 @@ func validateSequential(rs historyRecords) error { } else if rec.OperationType == Delete { delete(lastWriteKeyValueMap, bk) } else { - if !reflect.DeepEqual(v.Value, rec.Value) { + if !bytes.Equal(v.Value, rec.Value) { return fmt.Errorf("readOperation[txid: %d, bucket: %s, key: %s] read %x, \nbut writer[txid: %d] wrote %x", rec.Txid, rec.Bucket, rec.Key, rec.Value, v.Txid, v.Value) } @@ -738,3 +752,199 @@ func validateSequential(rs historyRecords) error { return nil } + +/* +TestConcurrentRepeatableRead verifies repeatable read. The case +intentionally creates a scenario that read and write transactions +are interleaved. It performs several writing operations after starting +each long-running read transaction to ensure it has a larger txid +than previous read transaction. It verifies that bbolt correctly +releases free pages, and will not pollute (e.g. prematurely release) +any pages which are still being used by any read transaction. +*/ +func TestConcurrentRepeatableRead(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + testCases := []struct { + name string + noFreelistSync bool + freelistType bolt.FreelistType + }{ + // [array] freelist + { + name: "sync array freelist", + noFreelistSync: false, + freelistType: bolt.FreelistArrayType, + }, + { + name: "not sync array freelist", + noFreelistSync: true, + freelistType: bolt.FreelistArrayType, + }, + // [map] freelist + { + name: "sync map freelist", + noFreelistSync: false, + freelistType: bolt.FreelistMapType, + }, + { + name: "not sync map freelist", + noFreelistSync: true, + freelistType: bolt.FreelistMapType, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + + t.Log("Preparing db.") + var ( + bucket = []byte("data") + key = []byte("mykey") + + option = &bolt.Options{ + PageSize: 4096, + NoFreelistSync: tc.noFreelistSync, + FreelistType: tc.freelistType, + } + ) + + db := mustCreateDB(t, option) + defer func() { + // The db will be reopened later, so put `db.Close()` in a function + // to avoid premature evaluation of `db`. Note that the execution + // of a deferred function is deferred to the moment the surrounding + // function returns, but the function value and parameters to the + // call are evaluated as usual and saved anew. + db.Close() + }() + + // Create lots of K/V to allocate some pages + err := db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucketIfNotExists(bucket) + if err != nil { + return err + } + for i := 0; i < 1000; i++ { + k := fmt.Sprintf("key_%d", i) + if err := b.Put([]byte(k), make([]byte, 1024)); err != nil { + return err + } + } + return nil + }) + require.NoError(t, err) + + // Remove all K/V to create some free pages + err = db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) + for i := 0; i < 1000; i++ { + k := fmt.Sprintf("key_%d", i) + if err := b.Delete([]byte(k)); err != nil { + return err + } + } + return b.Put(key, []byte("randomValue")) + }) + require.NoError(t, err) + + // bbolt will not release free pages directly after committing + // a writing transaction; instead all pages freed are putting + // into a pending list. Accordingly, the free pages might not + // be able to be reused by following writing transactions. So + // we reopen the db to completely release all free pages. + db = mustReOpenDB(t, db, option) + + var ( + wg sync.WaitGroup + longRunningReaderCount = 10 + stopCh = make(chan struct{}) + errCh = make(chan error, longRunningReaderCount) + readInterval = duration{5 * time.Millisecond, 10 * time.Millisecond} + + writeOperationCountInBetween = 5 + writeBytes = bytesRange{10, 20} + + testDuration = 10 * time.Second + ) + + for i := 0; i < longRunningReaderCount; i++ { + readWorkerName := fmt.Sprintf("reader_%d", i) + t.Logf("Starting long running read operation: %s", readWorkerName) + wg.Add(1) + go func() { + defer wg.Done() + rErr := executeLongRunningRead(t, readWorkerName, db, bucket, key, readInterval, stopCh) + if rErr != nil { + errCh <- rErr + } + }() + time.Sleep(500 * time.Millisecond) + + t.Logf("Perform %d write operations after starting a long running read operation", writeOperationCountInBetween) + for j := 0; j < writeOperationCountInBetween; j++ { + _, err := executeWrite(db, bucket, key, writeBytes, 0) + require.NoError(t, err) + } + } + + t.Log("Perform lots of write operations to check whether the long running read operations will read dirty data") + wg.Add(1) + go func() { + defer wg.Done() + cnt := longRunningReaderCount * writeOperationCountInBetween + for i := 0; i < cnt; i++ { + select { + case <-stopCh: + return + default: + } + _, err := executeWrite(db, bucket, key, writeBytes, 0) + require.NoError(t, err) + } + }() + + t.Log("Waiting for result") + select { + case err := <-errCh: + close(stopCh) + t.Errorf("Detected dirty read: %v", err) + case <-time.After(testDuration): + close(stopCh) + } + + wg.Wait() + }) + } +} + +func executeLongRunningRead(t *testing.T, name string, db *bolt.DB, bucket []byte, key []byte, readInterval duration, stopCh chan struct{}) error { + err := db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(bucket) + + initialVal := b.Get(key) + + for { + select { + case <-stopCh: + t.Logf("%q finished.", name) + return nil + default: + } + + time.Sleep(randomDurationInRange(readInterval.min, readInterval.max)) + val := b.Get(key) + + if !bytes.Equal(initialVal, val) { + dirtyReadErr := fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q", + string(key), formatBytes(initialVal), formatBytes(val)) + return dirtyReadErr + } + } + }) + + return err +}