mirror of
https://github.com/etcd-io/bbolt.git
synced 2025-05-31 11:42:30 +00:00
Add log into MoveBucket and clone the key
Signed-off-by: Benjamin Wang <benjamin.ahrtr@gmail.com>
This commit is contained in:
parent
0bd26bc48c
commit
886eccbdf5
48
bucket.go
48
bucket.go
@ -285,19 +285,21 @@ func (b *Bucket) DeleteBucket(key []byte) (err error) {
|
||||
return errors.ErrTxNotWritable
|
||||
}
|
||||
|
||||
newKey := cloneBytes(key)
|
||||
|
||||
// Move cursor to correct position.
|
||||
c := b.Cursor()
|
||||
k, _, flags := c.seek(key)
|
||||
k, _, flags := c.seek(newKey)
|
||||
|
||||
// Return an error if bucket doesn't exist or is not a bucket.
|
||||
if !bytes.Equal(key, k) {
|
||||
if !bytes.Equal(newKey, k) {
|
||||
return errors.ErrBucketNotFound
|
||||
} else if (flags & common.BucketLeafFlag) == 0 {
|
||||
return errors.ErrIncompatibleValue
|
||||
}
|
||||
|
||||
// Recursively delete all child buckets.
|
||||
child := b.Bucket(key)
|
||||
child := b.Bucket(newKey)
|
||||
err = child.ForEachBucket(func(k []byte) error {
|
||||
if err := child.DeleteBucket(k); err != nil {
|
||||
return fmt.Errorf("delete bucket: %s", err)
|
||||
@ -309,7 +311,7 @@ func (b *Bucket) DeleteBucket(key []byte) (err error) {
|
||||
}
|
||||
|
||||
// Remove cached copy.
|
||||
delete(b.buckets, string(key))
|
||||
delete(b.buckets, string(newKey))
|
||||
|
||||
// Release all bucket pages to freelist.
|
||||
child.nodes = nil
|
||||
@ -317,7 +319,7 @@ func (b *Bucket) DeleteBucket(key []byte) (err error) {
|
||||
child.free()
|
||||
|
||||
// Delete the node if we have a matching key.
|
||||
c.node().del(key)
|
||||
c.node().del(newKey)
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -328,48 +330,62 @@ func (b *Bucket) DeleteBucket(key []byte) (err error) {
|
||||
// 2. or the key already exists in the destination bucket;
|
||||
// 3. or the key represents a non-bucket value;
|
||||
// 4. the source and destination buckets are the same.
|
||||
func (b *Bucket) MoveBucket(key []byte, dstBucket *Bucket) error {
|
||||
func (b *Bucket) MoveBucket(key []byte, dstBucket *Bucket) (err error) {
|
||||
lg := b.tx.db.Logger()
|
||||
lg.Debugf("Moving bucket %q", string(key))
|
||||
defer func() {
|
||||
if err != nil {
|
||||
lg.Errorf("Moving bucket %q failed: %v", string(key), err)
|
||||
} else {
|
||||
lg.Debugf("Moving bucket %q successfully", string(key))
|
||||
}
|
||||
}()
|
||||
|
||||
if b.tx.db == nil || dstBucket.tx.db == nil {
|
||||
return errors.ErrTxClosed
|
||||
} else if !dstBucket.Writable() {
|
||||
return errors.ErrTxNotWritable
|
||||
}
|
||||
|
||||
newKey := cloneBytes(key)
|
||||
|
||||
// Move cursor to correct position.
|
||||
c := b.Cursor()
|
||||
k, v, flags := c.seek(key)
|
||||
k, v, flags := c.seek(newKey)
|
||||
|
||||
// Return an error if bucket doesn't exist or is not a bucket.
|
||||
if !bytes.Equal(key, k) {
|
||||
if !bytes.Equal(newKey, k) {
|
||||
return errors.ErrBucketNotFound
|
||||
} else if (flags & common.BucketLeafFlag) == 0 {
|
||||
return fmt.Errorf("key %q isn't a bucket in the source bucket: %w", key, errors.ErrIncompatibleValue)
|
||||
lg.Errorf("An incompatible key %s exists in the source bucket", string(newKey))
|
||||
return errors.ErrIncompatibleValue
|
||||
}
|
||||
|
||||
// Do nothing (return true directly) if the source bucket and the
|
||||
// destination bucket are actually the same bucket.
|
||||
if b == dstBucket || (b.RootPage() == dstBucket.RootPage() && b.RootPage() != 0) {
|
||||
return fmt.Errorf("source bucket %s and target bucket %s are the same: %w", b.String(), dstBucket.String(), errors.ErrSameBuckets)
|
||||
lg.Errorf("The source bucket (%s) and the target bucket (%s) are the same bucket", b.String(), dstBucket.String())
|
||||
return errors.ErrSameBuckets
|
||||
}
|
||||
|
||||
// check whether the key already exists in the destination bucket
|
||||
curDst := dstBucket.Cursor()
|
||||
k, _, flags = curDst.seek(key)
|
||||
k, _, flags = curDst.seek(newKey)
|
||||
|
||||
// Return an error if there is an existing key in the destination bucket.
|
||||
if bytes.Equal(key, k) {
|
||||
if bytes.Equal(newKey, k) {
|
||||
if (flags & common.BucketLeafFlag) != 0 {
|
||||
return errors.ErrBucketExists
|
||||
}
|
||||
return fmt.Errorf("key %q already exists in the target bucket: %w", key, errors.ErrIncompatibleValue)
|
||||
lg.Errorf("An incompatible key %s exists in the target bucket", string(newKey))
|
||||
return errors.ErrIncompatibleValue
|
||||
}
|
||||
|
||||
// remove the sub-bucket from the source bucket
|
||||
delete(b.buckets, string(key))
|
||||
c.node().del(key)
|
||||
delete(b.buckets, string(newKey))
|
||||
c.node().del(newKey)
|
||||
|
||||
// add te sub-bucket to the destination bucket
|
||||
newKey := cloneBytes(key)
|
||||
newValue := cloneBytes(v)
|
||||
curDst.node().put(newKey, newKey, newValue, 0, common.BucketLeafFlag)
|
||||
|
||||
|
@ -248,7 +248,7 @@ func createBucketAndPopulateData(t testing.TB, tx *bbolt.Tx, bk *bbolt.Bucket, b
|
||||
|
||||
newBucket, err := bk.CreateBucket([]byte(bucketName))
|
||||
require.NoError(t, err, "failed to create bucket %s", bucketName)
|
||||
populateSampleDataInBucket(t, bk, rand.Intn(4096))
|
||||
populateSampleDataInBucket(t, newBucket, rand.Intn(4096))
|
||||
return newBucket
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user