mirror of https://github.com/etcd-io/bbolt.git
Merge pull request #356 from ptabor/20221215-flags
Add internal iterator to Bucket that goes over buckets.pull/358/head
commit
696c85ce3d
27
bucket.go
27
bucket.go
|
@ -229,11 +229,9 @@ func (b *Bucket) DeleteBucket(key []byte) error {
|
|||
|
||||
// Recursively delete all child buckets.
|
||||
child := b.Bucket(key)
|
||||
err := child.ForEach(func(k, v []byte) error {
|
||||
if _, _, childFlags := child.Cursor().seek(k); (childFlags & bucketLeafFlag) != 0 {
|
||||
if err := child.DeleteBucket(k); err != nil {
|
||||
return fmt.Errorf("delete bucket: %s", err)
|
||||
}
|
||||
err := child.ForEachBucket(func(k []byte) error {
|
||||
if err := child.DeleteBucket(k); err != nil {
|
||||
return fmt.Errorf("delete bucket: %s", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
@ -394,7 +392,22 @@ func (b *Bucket) ForEach(fn func(k, v []byte) error) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Stat returns stats on a bucket.
|
||||
func (b *Bucket) ForEachBucket(fn func(k []byte) error) error {
|
||||
if b.tx.db == nil {
|
||||
return ErrTxClosed
|
||||
}
|
||||
c := b.Cursor()
|
||||
for k, _, flags := c.first(); k != nil; k, _, flags = c.next() {
|
||||
if flags&bucketLeafFlag != 0 {
|
||||
if err := fn(k); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stats returns stats on a bucket.
|
||||
func (b *Bucket) Stats() BucketStats {
|
||||
var s, subStats BucketStats
|
||||
pageSize := b.tx.db.pageSize
|
||||
|
@ -461,7 +474,7 @@ func (b *Bucket) Stats() BucketStats {
|
|||
|
||||
// Keep track of maximum page depth.
|
||||
if depth+1 > s.Depth {
|
||||
s.Depth = (depth + 1)
|
||||
s.Depth = depth + 1
|
||||
}
|
||||
})
|
||||
|
||||
|
|
169
bucket_test.go
169
bucket_test.go
|
@ -5,6 +5,8 @@ import (
|
|||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"log"
|
||||
"math/rand"
|
||||
"os"
|
||||
|
@ -13,7 +15,6 @@ import (
|
|||
"testing"
|
||||
"testing/quick"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
bolt "go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
|
@ -1005,57 +1006,133 @@ func TestBucket_ForEach(t *testing.T) {
|
|||
db := MustOpenDB()
|
||||
defer db.MustClose()
|
||||
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
b, err := tx.CreateBucket([]byte("widgets"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := b.Put([]byte("foo"), []byte("0000")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := b.Put([]byte("baz"), []byte("0001")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := b.Put([]byte("bar"), []byte("0002")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
type kv struct {
|
||||
k []byte
|
||||
v []byte
|
||||
}
|
||||
|
||||
var index int
|
||||
if err := b.ForEach(func(k, v []byte) error {
|
||||
switch index {
|
||||
case 0:
|
||||
if !bytes.Equal(k, []byte("bar")) {
|
||||
t.Fatalf("unexpected key: %v", k)
|
||||
} else if !bytes.Equal(v, []byte("0002")) {
|
||||
t.Fatalf("unexpected value: %v", v)
|
||||
}
|
||||
case 1:
|
||||
if !bytes.Equal(k, []byte("baz")) {
|
||||
t.Fatalf("unexpected key: %v", k)
|
||||
} else if !bytes.Equal(v, []byte("0001")) {
|
||||
t.Fatalf("unexpected value: %v", v)
|
||||
}
|
||||
case 2:
|
||||
if !bytes.Equal(k, []byte("foo")) {
|
||||
t.Fatalf("unexpected key: %v", k)
|
||||
} else if !bytes.Equal(v, []byte("0000")) {
|
||||
t.Fatalf("unexpected value: %v", v)
|
||||
}
|
||||
}
|
||||
index++
|
||||
expectedItems := []kv{
|
||||
{k: []byte("bar"), v: []byte("0002")},
|
||||
{k: []byte("baz"), v: []byte("0001")},
|
||||
{k: []byte("csubbucket"), v: nil},
|
||||
{k: []byte("foo"), v: []byte("0000")},
|
||||
}
|
||||
|
||||
verifyReads := func(b *bolt.Bucket) {
|
||||
var items []kv
|
||||
err := b.ForEach(func(k, v []byte) error {
|
||||
items = append(items, kv{k: k, v: v})
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
})
|
||||
assert.NoErrorf(t, err, "b.ForEach failed")
|
||||
assert.Equal(t, expectedItems, items, "what we iterated (ForEach) is not what we put")
|
||||
}
|
||||
|
||||
if index != 3 {
|
||||
t.Fatalf("unexpected index: %d", index)
|
||||
}
|
||||
err := db.Update(func(tx *bolt.Tx) error {
|
||||
b, err := tx.CreateBucket([]byte("widgets"))
|
||||
require.NoError(t, err, "bucket creation failed")
|
||||
|
||||
require.NoErrorf(t, b.Put([]byte("foo"), []byte("0000")), "put 'foo' failed")
|
||||
require.NoErrorf(t, b.Put([]byte("baz"), []byte("0001")), "put 'baz' failed")
|
||||
require.NoErrorf(t, b.Put([]byte("bar"), []byte("0002")), "put 'bar' failed")
|
||||
_, err = b.CreateBucket([]byte("csubbucket"))
|
||||
require.NoErrorf(t, err, "creation of subbucket failed")
|
||||
|
||||
verifyReads(b)
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
})
|
||||
require.NoErrorf(t, err, "db.Update failed")
|
||||
err = db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket([]byte("widgets"))
|
||||
require.NotNil(t, b, "bucket opening failed")
|
||||
verifyReads(b)
|
||||
return nil
|
||||
})
|
||||
assert.NoErrorf(t, err, "db.View failed")
|
||||
}
|
||||
|
||||
func TestBucket_ForEachBucket(t *testing.T) {
|
||||
db := MustOpenDB()
|
||||
defer db.MustClose()
|
||||
|
||||
expectedItems := [][]byte{
|
||||
[]byte("csubbucket"),
|
||||
[]byte("zsubbucket"),
|
||||
}
|
||||
|
||||
verifyReads := func(b *bolt.Bucket) {
|
||||
var items [][]byte
|
||||
err := b.ForEachBucket(func(k []byte) error {
|
||||
items = append(items, k)
|
||||
return nil
|
||||
})
|
||||
assert.NoErrorf(t, err, "b.ForEach failed")
|
||||
assert.Equal(t, expectedItems, items, "what we iterated (ForEach) is not what we put")
|
||||
}
|
||||
|
||||
err := db.Update(func(tx *bolt.Tx) error {
|
||||
b, err := tx.CreateBucket([]byte("widgets"))
|
||||
require.NoError(t, err, "bucket creation failed")
|
||||
|
||||
require.NoErrorf(t, b.Put([]byte("foo"), []byte("0000")), "put 'foo' failed")
|
||||
_, err = b.CreateBucket([]byte("zsubbucket"))
|
||||
require.NoErrorf(t, err, "creation of subbucket failed")
|
||||
require.NoErrorf(t, b.Put([]byte("baz"), []byte("0001")), "put 'baz' failed")
|
||||
require.NoErrorf(t, b.Put([]byte("bar"), []byte("0002")), "put 'bar' failed")
|
||||
_, err = b.CreateBucket([]byte("csubbucket"))
|
||||
require.NoErrorf(t, err, "creation of subbucket failed")
|
||||
|
||||
verifyReads(b)
|
||||
|
||||
return nil
|
||||
})
|
||||
assert.NoErrorf(t, err, "db.Update failed")
|
||||
err = db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket([]byte("widgets"))
|
||||
require.NotNil(t, b, "bucket opening failed")
|
||||
verifyReads(b)
|
||||
return nil
|
||||
})
|
||||
assert.NoErrorf(t, err, "db.View failed")
|
||||
}
|
||||
|
||||
func TestBucket_ForEachBucket_NoBuckets(t *testing.T) {
|
||||
db := MustOpenDB()
|
||||
defer db.MustClose()
|
||||
|
||||
verifyReads := func(b *bolt.Bucket) {
|
||||
var items [][]byte
|
||||
err := b.ForEachBucket(func(k []byte) error {
|
||||
items = append(items, k)
|
||||
return nil
|
||||
})
|
||||
assert.NoErrorf(t, err, "b.ForEach failed")
|
||||
assert.Emptyf(t, items, "what we iterated (ForEach) is not what we put")
|
||||
}
|
||||
|
||||
err := db.Update(func(tx *bolt.Tx) error {
|
||||
b, err := tx.CreateBucket([]byte("widgets"))
|
||||
require.NoError(t, err, "bucket creation failed")
|
||||
|
||||
require.NoErrorf(t, b.Put([]byte("foo"), []byte("0000")), "put 'foo' failed")
|
||||
require.NoErrorf(t, err, "creation of subbucket failed")
|
||||
require.NoErrorf(t, b.Put([]byte("baz"), []byte("0001")), "put 'baz' failed")
|
||||
require.NoErrorf(t, err, "creation of subbucket failed")
|
||||
|
||||
verifyReads(b)
|
||||
|
||||
return nil
|
||||
})
|
||||
require.NoErrorf(t, err, "db.Update failed")
|
||||
|
||||
err = db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket([]byte("widgets"))
|
||||
require.NotNil(t, b, "bucket opening failed")
|
||||
verifyReads(b)
|
||||
return nil
|
||||
})
|
||||
assert.NoErrorf(t, err, "db.View failed")
|
||||
}
|
||||
|
||||
// Ensure a database can stop iteration early.
|
||||
|
|
19
cursor.go
19
cursor.go
|
@ -30,10 +30,18 @@ func (c *Cursor) Bucket() *Bucket {
|
|||
// The returned key and value are only valid for the life of the transaction.
|
||||
func (c *Cursor) First() (key []byte, value []byte) {
|
||||
_assert(c.bucket.tx.db != nil, "tx closed")
|
||||
k, v, flags := c.first()
|
||||
if (flags & uint32(bucketLeafFlag)) != 0 {
|
||||
return k, nil
|
||||
}
|
||||
return k, v
|
||||
}
|
||||
|
||||
func (c *Cursor) first() (key []byte, value []byte, flags uint32) {
|
||||
c.stack = c.stack[:0]
|
||||
p, n := c.bucket.pageNode(c.bucket.root)
|
||||
c.stack = append(c.stack, elemRef{page: p, node: n, index: 0})
|
||||
c.first()
|
||||
c.goToFirstElementOnTheStack()
|
||||
|
||||
// If we land on an empty page then move to the next value.
|
||||
// https://github.com/boltdb/bolt/issues/450
|
||||
|
@ -43,10 +51,9 @@ func (c *Cursor) First() (key []byte, value []byte) {
|
|||
|
||||
k, v, flags := c.keyValue()
|
||||
if (flags & uint32(bucketLeafFlag)) != 0 {
|
||||
return k, nil
|
||||
return k, nil, flags
|
||||
}
|
||||
return k, v
|
||||
|
||||
return k, v, flags
|
||||
}
|
||||
|
||||
// Last moves the cursor to the last item in the bucket and returns its key and value.
|
||||
|
@ -155,7 +162,7 @@ func (c *Cursor) seek(seek []byte) (key []byte, value []byte, flags uint32) {
|
|||
}
|
||||
|
||||
// first moves the cursor to the first leaf element under the last page in the stack.
|
||||
func (c *Cursor) first() {
|
||||
func (c *Cursor) goToFirstElementOnTheStack() {
|
||||
for {
|
||||
// Exit when we hit a leaf page.
|
||||
var ref = &c.stack[len(c.stack)-1]
|
||||
|
@ -223,7 +230,7 @@ func (c *Cursor) next() (key []byte, value []byte, flags uint32) {
|
|||
// Otherwise start from where we left off in the stack and find the
|
||||
// first element of the first leaf page.
|
||||
c.stack = c.stack[:i+1]
|
||||
c.first()
|
||||
c.goToFirstElementOnTheStack()
|
||||
|
||||
// If this is an empty page then restart and move back up the stack.
|
||||
// https://github.com/boltdb/bolt/issues/450
|
||||
|
|
2
go.mod
2
go.mod
|
@ -4,7 +4,7 @@ go 1.17
|
|||
|
||||
require (
|
||||
github.com/stretchr/testify v1.8.1
|
||||
golang.org/x/sys v0.2.0
|
||||
golang.org/x/sys v0.3.0
|
||||
)
|
||||
|
||||
require (
|
||||
|
|
4
go.sum
4
go.sum
|
@ -10,8 +10,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
|
|||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||
golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A=
|
||||
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
|
||||
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
|
2
tx.go
2
tx.go
|
@ -484,7 +484,7 @@ func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bo
|
|||
})
|
||||
|
||||
// Check each bucket within this bucket.
|
||||
_ = b.ForEach(func(k, v []byte) error {
|
||||
_ = b.ForEachBucket(func(k []byte) error {
|
||||
if child := b.Bucket(k); child != nil {
|
||||
tx.checkBucket(child, reachable, freed, ch)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue