Refactor test case TestTx_MoveBucket and add log for MoveBucket

Signed-off-by: Benjamin Wang <benjamin.ahrtr@gmail.com>
pull/635/head
Benjamin Wang 2024-01-02 13:35:38 +00:00 committed by Benjamin Wang
parent ac355dec24
commit 0bd26bc48c
2 changed files with 182 additions and 197 deletions
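Before the diff itself, a quick illustration of the API this test exercises. Tx.MoveBucket moves a child bucket from a source parent bucket to a destination parent bucket within one read-write transaction; in the refactored test a nil parent stands for the root bucket. The sketch below only illustrates that call shape, assuming a bbolt version that already ships Tx.MoveBucket; the database path and bucket names are made up:

package main

import (
	"log"
	"os"
	"path/filepath"

	bolt "go.etcd.io/bbolt"
)

func main() {
	// Hypothetical throwaway database file for the demo.
	path := filepath.Join(os.TempDir(), "move-bucket-demo.db")
	defer os.Remove(path)

	db, err := bolt.Open(path, 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.Update(func(tx *bolt.Tx) error {
		// Create a source parent with a child bucket, and a destination parent.
		src, err := tx.CreateBucketIfNotExists([]byte("sb1"))
		if err != nil {
			return err
		}
		if _, err := src.CreateBucketIfNotExists([]byte("child")); err != nil {
			return err
		}
		dst, err := tx.CreateBucketIfNotExists([]byte("db1"))
		if err != nil {
			return err
		}
		// Move "child" from src to dst inside the same transaction.
		return tx.MoveBucket([]byte("child"), src, dst)
	}); err != nil {
		log.Fatal(err)
	}
}

The refactored test below drives the same call through table-driven cases and, for the successful ones, additionally dumps the moved bucket before and after the call and asserts the two dumps are byte-for-byte identical.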


@@ -1,10 +1,10 @@
 package bbolt_test
 
 import (
-	"bytes"
 	crand "crypto/rand"
 	"math/rand"
 	"os"
+	"path/filepath"
 	"testing"
 
 	"go.etcd.io/bbolt"
@@ -16,259 +16,243 @@ import (
 func TestTx_MoveBucket(t *testing.T) {
 	testCases := []struct {
-		name                 string
-		srcBucketPath        []string
-		dstBucketPath        []string
-		bucketToMove         string
-		incompatibleKeyInSrc bool
-		incompatibleKeyInDst bool
-		parentSrc            bool
-		parentDst            bool
-		expActErr            error
+		name                    string
+		srcBucketPath           []string
+		dstBucketPath           []string
+		bucketToMove            string
+		bucketExistInSrc        bool
+		bucketExistInDst        bool
+		hasIncompatibleKeyInSrc bool
+		hasIncompatibleKeyInDst bool
+		expectedErr             error
 	}{
+		// normal cases
 		{
-			name:                 "happy path",
-			srcBucketPath:        []string{"sb1", "sb2", "sb3ToMove"},
-			dstBucketPath:        []string{"db1", "db2"},
-			bucketToMove:         "sb3ToMove",
-			incompatibleKeyInSrc: false,
-			incompatibleKeyInDst: false,
-			parentSrc:            true,
-			parentDst:            false,
-			expActErr:            nil,
+			name:                    "normal case",
+			srcBucketPath:           []string{"sb1", "sb2"},
+			dstBucketPath:           []string{"db1", "db2"},
+			bucketToMove:            "bucketToMove",
+			bucketExistInSrc:        true,
+			bucketExistInDst:        false,
+			hasIncompatibleKeyInSrc: false,
+			hasIncompatibleKeyInDst: false,
+			expectedErr:             nil,
 		},
 		{
-			name:                 "bucketToMove not exist in srcBucket",
-			srcBucketPath:        []string{"sb1", "sb2"},
-			dstBucketPath:        []string{"db1", "db2"},
-			bucketToMove:         "sb3ToMove",
-			incompatibleKeyInSrc: false,
-			incompatibleKeyInDst: false,
-			parentSrc:            false,
-			parentDst:            false,
-			expActErr:            errors.ErrBucketNotFound,
+			name:                    "the source and target bucket share the same grandparent",
+			srcBucketPath:           []string{"grandparent", "sb2"},
+			dstBucketPath:           []string{"grandparent", "db2"},
+			bucketToMove:            "bucketToMove",
+			bucketExistInSrc:        true,
+			bucketExistInDst:        false,
+			hasIncompatibleKeyInSrc: false,
+			hasIncompatibleKeyInDst: false,
+			expectedErr:             nil,
 		},
 		{
-			name:                 "bucketToMove exist in dstBucket",
-			srcBucketPath:        []string{"sb1", "sb2", "sb3ToMove"},
-			dstBucketPath:        []string{"db1", "db2", "sb3ToMove"},
-			bucketToMove:         "sb3ToMove",
-			incompatibleKeyInSrc: false,
-			incompatibleKeyInDst: false,
-			parentSrc:            true,
-			parentDst:            true,
-			expActErr:            errors.ErrBucketExists,
+			name:                    "bucketToMove is a top level bucket",
+			srcBucketPath:           []string{},
+			dstBucketPath:           []string{"db1", "db2"},
+			bucketToMove:            "bucketToMove",
+			bucketExistInSrc:        true,
+			bucketExistInDst:        false,
+			hasIncompatibleKeyInSrc: false,
+			hasIncompatibleKeyInDst: false,
+			expectedErr:             nil,
 		},
 		{
-			name:                 "bucketToMove key exist in srcBucket but no subBucket value",
-			srcBucketPath:        []string{"sb1", "sb2"},
-			dstBucketPath:        []string{"db1", "db2"},
-			bucketToMove:         "sb3ToMove",
-			incompatibleKeyInSrc: true,
-			incompatibleKeyInDst: false,
-			parentSrc:            true,
-			parentDst:            false,
-			expActErr:            errors.ErrIncompatibleValue,
+			name:                    "convert bucketToMove to a top level bucket",
+			srcBucketPath:           []string{"sb1", "sb2"},
+			dstBucketPath:           []string{},
+			bucketToMove:            "bucketToMove",
+			bucketExistInSrc:        true,
+			bucketExistInDst:        false,
+			hasIncompatibleKeyInSrc: false,
+			hasIncompatibleKeyInDst: false,
+			expectedErr:             nil,
 		},
+		// negative cases
+		{
+			name:                    "bucketToMove not exist in source bucket",
+			srcBucketPath:           []string{"sb1", "sb2"},
+			dstBucketPath:           []string{"db1", "db2"},
+			bucketToMove:            "bucketToMove",
+			bucketExistInSrc:        false,
+			bucketExistInDst:        false,
+			hasIncompatibleKeyInSrc: false,
+			hasIncompatibleKeyInDst: false,
+			expectedErr:             errors.ErrBucketNotFound,
+		},
 		{
-			name:                 "bucketToMove key exist in dstBucket but no subBucket value",
-			srcBucketPath:        []string{"sb1", "sb2", "sb3ToMove"},
-			dstBucketPath:        []string{"db1", "db2"},
-			bucketToMove:         "sb3ToMove",
-			incompatibleKeyInSrc: false,
-			incompatibleKeyInDst: true,
-			parentSrc:            true,
-			parentDst:            true,
-			expActErr:            errors.ErrIncompatibleValue,
+			name:                    "bucketToMove exist in target bucket",
+			srcBucketPath:           []string{"sb1", "sb2"},
+			dstBucketPath:           []string{"db1", "db2"},
+			bucketToMove:            "bucketToMove",
+			bucketExistInSrc:        true,
+			bucketExistInDst:        true,
+			hasIncompatibleKeyInSrc: false,
+			hasIncompatibleKeyInDst: false,
+			expectedErr:             errors.ErrBucketExists,
 		},
 		{
-			name:                 "srcBucket is rootBucket",
-			srcBucketPath:        []string{"", "sb3ToMove"},
-			dstBucketPath:        []string{"db1", "db2"},
-			bucketToMove:         "sb3ToMove",
-			incompatibleKeyInSrc: false,
-			incompatibleKeyInDst: false,
-			parentSrc:            true,
-			parentDst:            false,
-			expActErr:            nil,
+			name:                    "incompatible key exist in source bucket",
+			srcBucketPath:           []string{"sb1", "sb2"},
+			dstBucketPath:           []string{"db1", "db2"},
+			bucketToMove:            "bucketToMove",
+			bucketExistInSrc:        false,
+			bucketExistInDst:        false,
+			hasIncompatibleKeyInSrc: true,
+			hasIncompatibleKeyInDst: false,
+			expectedErr:             errors.ErrIncompatibleValue,
 		},
 		{
-			name:                 "dstBucket is rootBucket",
-			srcBucketPath:        []string{"sb1", "sb2", "sb3ToMove"},
-			dstBucketPath:        []string{""},
-			bucketToMove:         "sb3ToMove",
-			incompatibleKeyInSrc: false,
-			incompatibleKeyInDst: false,
-			parentSrc:            true,
-			parentDst:            false,
-			expActErr:            nil,
+			name:                    "incompatible key exist in target bucket",
+			srcBucketPath:           []string{"sb1", "sb2"},
+			dstBucketPath:           []string{"db1", "db2"},
+			bucketToMove:            "bucketToMove",
+			bucketExistInSrc:        true,
+			bucketExistInDst:        false,
+			hasIncompatibleKeyInSrc: false,
+			hasIncompatibleKeyInDst: true,
+			expectedErr:             errors.ErrIncompatibleValue,
 		},
 		{
-			name:                 "srcBucket is rootBucket and dstBucket is rootBucket",
-			srcBucketPath:        []string{"", "sb3ToMove"},
-			dstBucketPath:        []string{""},
-			bucketToMove:         "sb3ToMove",
-			incompatibleKeyInSrc: false,
-			incompatibleKeyInDst: false,
-			parentSrc:            false,
-			parentDst:            false,
-			expActErr:            errors.ErrSameBuckets,
+			name:                    "the source and target are the same bucket",
+			srcBucketPath:           []string{"sb1", "sb2"},
+			dstBucketPath:           []string{"sb1", "sb2"},
+			bucketToMove:            "bucketToMove",
+			bucketExistInSrc:        true,
+			bucketExistInDst:        false,
+			hasIncompatibleKeyInSrc: false,
+			hasIncompatibleKeyInDst: false,
+			expectedErr:             errors.ErrSameBuckets,
 		},
+		{
+			name:                    "both the source and target are the root bucket",
+			srcBucketPath:           []string{},
+			dstBucketPath:           []string{},
+			bucketToMove:            "bucketToMove",
+			bucketExistInSrc:        true,
+			bucketExistInDst:        false,
+			hasIncompatibleKeyInSrc: false,
+			hasIncompatibleKeyInDst: false,
+			expectedErr:             errors.ErrSameBuckets,
+		},
 	}
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(*testing.T) {
-			db := btesting.MustCreateDBWithOption(t, &bbolt.Options{PageSize: pageSize})
+			db := btesting.MustCreateDBWithOption(t, &bbolt.Options{PageSize: 4096})
 
-			dumpBucketBeforeMoving := tempfile()
-			dumpBucketAfterMoving := tempfile()
+			dumpBucketBeforeMoving := filepath.Join(t.TempDir(), "dbBeforeMove")
+			dumpBucketAfterMoving := filepath.Join(t.TempDir(), "dbAfterMove")
 
-			// arrange
-			if err := db.Update(func(tx *bbolt.Tx) error {
-				srcBucket := openBuckets(t, tx, tc.incompatibleKeyInSrc, true, false, tc.srcBucketPath...)
-				dstBucket := openBuckets(t, tx, tc.incompatibleKeyInDst, true, false, tc.dstBucketPath...)
+			t.Log("Creating sample db and populate some data")
+			err := db.Update(func(tx *bbolt.Tx) error {
+				srcBucket := prepareBuckets(t, tx, tc.srcBucketPath...)
+				dstBucket := prepareBuckets(t, tx, tc.dstBucketPath...)
 
-				if tc.incompatibleKeyInSrc {
-					if pErr := srcBucket.Put([]byte(tc.bucketToMove), []byte("0")); pErr != nil {
-						t.Fatalf("error inserting key %v, and value %v in bucket %v: %v", tc.bucketToMove, "0", srcBucket, pErr)
-					}
+				if tc.bucketExistInSrc {
+					_ = createBucketAndPopulateData(t, tx, srcBucket, tc.bucketToMove)
 				}
 
-				if tc.incompatibleKeyInDst {
-					if pErr := dstBucket.Put([]byte(tc.bucketToMove), []byte("0")); pErr != nil {
-						t.Fatalf("error inserting key %v, and value %v in bucket %v: %v", tc.bucketToMove, "0", dstBucket, pErr)
-					}
+				if tc.bucketExistInDst {
+					_ = createBucketAndPopulateData(t, tx, dstBucket, tc.bucketToMove)
+				}
+
+				if tc.hasIncompatibleKeyInSrc {
+					putErr := srcBucket.Put([]byte(tc.bucketToMove), []byte("bar"))
+					require.NoError(t, putErr)
+				}
+
+				if tc.hasIncompatibleKeyInDst {
+					putErr := dstBucket.Put([]byte(tc.bucketToMove), []byte("bar"))
+					require.NoError(t, putErr)
 				}
 
 				return nil
-			}); err != nil {
-				t.Fatal(err)
-			}
-			db.MustCheck()
+			})
+			require.NoError(t, err)
 
-			// act
-			if err := db.Update(func(tx *bbolt.Tx) error {
-				srcBucket := openBuckets(t, tx, false, false, tc.parentSrc, tc.srcBucketPath...)
-				dstBucket := openBuckets(t, tx, false, false, tc.parentDst, tc.dstBucketPath...)
+			t.Log("Moving bucket")
+			err = db.Update(func(tx *bbolt.Tx) error {
+				srcBucket := prepareBuckets(t, tx, tc.srcBucketPath...)
+				dstBucket := prepareBuckets(t, tx, tc.dstBucketPath...)
 
-				var bucketToMove *bbolt.Bucket
-				if srcBucket != nil {
-					bucketToMove = srcBucket.Bucket([]byte(tc.bucketToMove))
-				} else {
-					bucketToMove = tx.Bucket([]byte(tc.bucketToMove))
-				}
-
-				if tc.expActErr == nil && bucketToMove != nil {
-					if wErr := dumpBucket([]byte(tc.bucketToMove), bucketToMove, dumpBucketBeforeMoving); wErr != nil {
-						t.Fatalf("error dumping bucket %v to file %v: %v", bucketToMove.String(), dumpBucketBeforeMoving, wErr)
-					}
-				}
+				if tc.expectedErr == nil {
+					t.Logf("Dump the bucket to %s before moving it", dumpBucketBeforeMoving)
+					bk := openBucket(tx, srcBucket, tc.bucketToMove)
+					dumpErr := dumpBucket([]byte(tc.bucketToMove), bk, dumpBucketBeforeMoving)
+					require.NoError(t, dumpErr)
+				}
 
 				mErr := tx.MoveBucket([]byte(tc.bucketToMove), srcBucket, dstBucket)
-				require.ErrorIs(t, mErr, tc.expActErr)
+				require.Equal(t, tc.expectedErr, mErr)
+
+				if tc.expectedErr == nil {
+					t.Logf("Dump the bucket to %s after moving it", dumpBucketAfterMoving)
+					bk := openBucket(tx, dstBucket, tc.bucketToMove)
+					dumpErr := dumpBucket([]byte(tc.bucketToMove), bk, dumpBucketAfterMoving)
+					require.NoError(t, dumpErr)
+				}
 
 				return nil
-			}); err != nil {
-				t.Fatal(err)
-			}
-			db.MustCheck()
+			})
+			require.NoError(t, err)
 
 			// skip assertion if failure expected
-			if tc.expActErr != nil {
+			if tc.expectedErr != nil {
 				return
 			}
 
-			// assert
-			if err := db.Update(func(tx *bbolt.Tx) error {
-				var movedBucket *bbolt.Bucket
-				srcBucket := openBuckets(t, tx, false, false, tc.parentSrc, tc.srcBucketPath...)
-
-				if srcBucket != nil {
-					if movedBucket = srcBucket.Bucket([]byte(tc.bucketToMove)); movedBucket != nil {
-						t.Fatalf("expected childBucket %v to be moved from srcBucket %v", tc.bucketToMove, srcBucket)
-					}
-				} else {
-					if movedBucket = tx.Bucket([]byte(tc.bucketToMove)); movedBucket != nil {
-						t.Fatalf("expected childBucket %v to be moved from root bucket %v", tc.bucketToMove, "root bucket")
-					}
-				}
-
-				dstBucket := openBuckets(t, tx, false, false, tc.parentDst, tc.dstBucketPath...)
-				if dstBucket != nil {
-					if movedBucket = dstBucket.Bucket([]byte(tc.bucketToMove)); movedBucket == nil {
-						t.Fatalf("expected childBucket %v to be child of dstBucket %v", tc.bucketToMove, dstBucket)
-					}
-				} else {
-					if movedBucket = tx.Bucket([]byte(tc.bucketToMove)); movedBucket == nil {
-						t.Fatalf("expected childBucket %v to be child of dstBucket %v", tc.bucketToMove, "root bucket")
-					}
-				}
-
-				wErr := dumpBucket([]byte(tc.bucketToMove), movedBucket, dumpBucketAfterMoving)
-				if wErr != nil {
-					t.Fatalf("error dumping bucket %v to file %v", movedBucket.String(), dumpBucketAfterMoving)
-				}
-
-				beforeBucket := readBucketFromFile(t, dumpBucketBeforeMoving)
-				afterBucket := readBucketFromFile(t, dumpBucketAfterMoving)
-
-				if !bytes.Equal(beforeBucket, afterBucket) {
-					t.Fatalf("bucket's content before moving is different than after moving")
-				}
-
-				return nil
-			}); err != nil {
-				t.Fatal(err)
-			}
-			db.MustCheck()
+			t.Log("Verifying the bucket should be identical before and after being moved")
+			dataBeforeMove, err := os.ReadFile(dumpBucketBeforeMoving)
+			require.NoError(t, err)
+			dataAfterMove, err := os.ReadFile(dumpBucketAfterMoving)
+			require.NoError(t, err)
+			require.Equal(t, dataBeforeMove, dataAfterMove)
 		})
 	}
 }
 
-func openBuckets(t testing.TB, tx *bbolt.Tx, incompatibleKey bool, init bool, parent bool, paths ...string) *bbolt.Bucket {
-	t.Helper()
-
+// prepareBuckets opens the bucket chain. For each bucket in the chain,
+// open it if existed, otherwise create it and populate sample data.
+func prepareBuckets(t testing.TB, tx *bbolt.Tx, buckets ...string) *bbolt.Bucket {
 	var bk *bbolt.Bucket
-	var err error
 
-	idx := len(paths) - 1
-	for i, key := range paths {
-		if len(key) == 0 {
-			if !init {
-				break
-			}
-			continue
-		}
-		if (incompatibleKey && i == idx) || (parent && i == idx) {
-			continue
-		}
-		if bk == nil {
-			bk, err = tx.CreateBucketIfNotExists([]byte(key))
+	for _, key := range buckets {
+		if childBucket := openBucket(tx, bk, key); childBucket == nil {
+			bk = createBucketAndPopulateData(t, tx, bk, key)
 		} else {
-			bk, err = bk.CreateBucketIfNotExists([]byte(key))
-		}
-		if err != nil {
-			t.Fatalf("error creating bucket %v: %v", key, err)
-		}
-		if init {
-			insertRandKeysValuesBucket(t, bk, rand.Intn(4096))
+			bk = childBucket
 		}
 	}
 	return bk
 }
 
-func readBucketFromFile(t testing.TB, tmpFile string) []byte {
-	data, err := os.ReadFile(tmpFile)
-	if err != nil {
-		t.Fatalf("error reading temp file %v", tmpFile)
+func openBucket(tx *bbolt.Tx, bk *bbolt.Bucket, bucketToOpen string) *bbolt.Bucket {
+	if bk == nil {
+		return tx.Bucket([]byte(bucketToOpen))
 	}
-
-	return data
+	return bk.Bucket([]byte(bucketToOpen))
 }
 
-func insertRandKeysValuesBucket(t testing.TB, bk *bbolt.Bucket, n int) {
+func createBucketAndPopulateData(t testing.TB, tx *bbolt.Tx, bk *bbolt.Bucket, bucketName string) *bbolt.Bucket {
+	if bk == nil {
+		newBucket, err := tx.CreateBucket([]byte(bucketName))
+		require.NoError(t, err, "failed to create bucket %s", bucketName)
+		populateSampleDataInBucket(t, newBucket, rand.Intn(4096))
+		return newBucket
+	}
+
+	newBucket, err := bk.CreateBucket([]byte(bucketName))
+	require.NoError(t, err, "failed to create bucket %s", bucketName)
+	populateSampleDataInBucket(t, bk, rand.Intn(4096))
+	return newBucket
+}
+
+func populateSampleDataInBucket(t testing.TB, bk *bbolt.Bucket, n int) {
 	var min, max = 1, 1024
 
 	for i := 0; i < n; i++ {
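One behaviour worth spelling out from the helpers above: prepareBuckets returns nil for an empty bucket path, and the top-level test cases expect tx.MoveBucket to accept that nil and treat it as the root bucket. A minimal sketch of that call shape follows; the helper name and bucket names are made up for illustration:

import (
	"fmt"

	bolt "go.etcd.io/bbolt"
)

// moveTopLevelBucket moves the top-level bucket named child under the existing
// bucket dst, in a single write transaction. Passing nil as the source bucket
// tells MoveBucket that child currently lives at the root.
func moveTopLevelBucket(db *bolt.DB, child, dst string) error {
	return db.Update(func(tx *bolt.Tx) error {
		dstBucket := tx.Bucket([]byte(dst))
		if dstBucket == nil {
			return fmt.Errorf("destination bucket %q not found", dst)
		}
		return tx.MoveBucket([]byte(child), nil, dstBucket)
	})
}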


@@ -16,6 +16,7 @@ func dumpBucket(srcBucketName []byte, srcBucket *bolt.Bucket, dstFilename string
 	if err != nil {
 		return err
 	}
+	defer dstDB.Close()
 
 	return dstDB.Update(func(tx *bolt.Tx) error {
 		dstBucket, err := tx.CreateBucket(srcBucketName)
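For context on this second hunk: dumpBucket copies the bucket being moved into a standalone database file, which is what lets the test compare the raw dumps taken before and after the move, and the added defer makes sure that temporary database is closed once the dump finishes. The copy itself boils down to a recursive walk along the lines of the sketch below; this is an illustration of the idea, not the repository's actual helper, and it relies on the bbolt convention that ForEach reports a nil value for nested sub-buckets:

import bolt "go.etcd.io/bbolt"

// copyBucket recursively copies every key/value pair and nested sub-bucket
// from src into dst (illustrative sketch only).
func copyBucket(src, dst *bolt.Bucket) error {
	return src.ForEach(func(k, v []byte) error {
		if v == nil { // a nil value means k names a nested sub-bucket
			child, err := dst.CreateBucket(k)
			if err != nil {
				return err
			}
			return copyBucket(src.Bucket(k), child)
		}
		return dst.Put(k, v)
	})
}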