Add internal iterator to Bucket that goes over buckets.

So far the code was frequently traversing all the keys (ignoring flag whether key is a bucket)
and trying to open each of the keys as bucket (seeking the same entry from the scratch).

In this proposal, we iterate only through bucket keys.

Signed-off-by: Piotr Tabor <ptab@google.com>
pull/356/head
Piotr Tabor 2022-12-16 15:57:09 +01:00
parent fa80ceeb19
commit ebca452da7
6 changed files with 160 additions and 63 deletions

View File

@ -229,11 +229,9 @@ func (b *Bucket) DeleteBucket(key []byte) error {
// Recursively delete all child buckets. // Recursively delete all child buckets.
child := b.Bucket(key) child := b.Bucket(key)
err := child.ForEach(func(k, v []byte) error { err := child.ForEachBucket(func(k []byte) error {
if _, _, childFlags := child.Cursor().seek(k); (childFlags & bucketLeafFlag) != 0 { if err := child.DeleteBucket(k); err != nil {
if err := child.DeleteBucket(k); err != nil { return fmt.Errorf("delete bucket: %s", err)
return fmt.Errorf("delete bucket: %s", err)
}
} }
return nil return nil
}) })
@ -394,7 +392,22 @@ func (b *Bucket) ForEach(fn func(k, v []byte) error) error {
return nil return nil
} }
// Stat returns stats on a bucket. func (b *Bucket) ForEachBucket(fn func(k []byte) error) error {
if b.tx.db == nil {
return ErrTxClosed
}
c := b.Cursor()
for k, _, flags := c.first(); k != nil; k, _, flags = c.next() {
if flags&bucketLeafFlag != 0 {
if err := fn(k); err != nil {
return err
}
}
}
return nil
}
// Stats returns stats on a bucket.
func (b *Bucket) Stats() BucketStats { func (b *Bucket) Stats() BucketStats {
var s, subStats BucketStats var s, subStats BucketStats
pageSize := b.tx.db.pageSize pageSize := b.tx.db.pageSize
@ -461,7 +474,7 @@ func (b *Bucket) Stats() BucketStats {
// Keep track of maximum page depth. // Keep track of maximum page depth.
if depth+1 > s.Depth { if depth+1 > s.Depth {
s.Depth = (depth + 1) s.Depth = depth + 1
} }
}) })

View File

@ -5,6 +5,8 @@ import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt" "fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"log" "log"
"math/rand" "math/rand"
"os" "os"
@ -13,7 +15,6 @@ import (
"testing" "testing"
"testing/quick" "testing/quick"
"github.com/stretchr/testify/assert"
bolt "go.etcd.io/bbolt" bolt "go.etcd.io/bbolt"
) )
@ -1005,57 +1006,133 @@ func TestBucket_ForEach(t *testing.T) {
db := MustOpenDB() db := MustOpenDB()
defer db.MustClose() defer db.MustClose()
if err := db.Update(func(tx *bolt.Tx) error { type kv struct {
b, err := tx.CreateBucket([]byte("widgets")) k []byte
if err != nil { v []byte
t.Fatal(err) }
}
if err := b.Put([]byte("foo"), []byte("0000")); err != nil {
t.Fatal(err)
}
if err := b.Put([]byte("baz"), []byte("0001")); err != nil {
t.Fatal(err)
}
if err := b.Put([]byte("bar"), []byte("0002")); err != nil {
t.Fatal(err)
}
var index int expectedItems := []kv{
if err := b.ForEach(func(k, v []byte) error { {k: []byte("bar"), v: []byte("0002")},
switch index { {k: []byte("baz"), v: []byte("0001")},
case 0: {k: []byte("csubbucket"), v: nil},
if !bytes.Equal(k, []byte("bar")) { {k: []byte("foo"), v: []byte("0000")},
t.Fatalf("unexpected key: %v", k) }
} else if !bytes.Equal(v, []byte("0002")) {
t.Fatalf("unexpected value: %v", v) verifyReads := func(b *bolt.Bucket) {
} var items []kv
case 1: err := b.ForEach(func(k, v []byte) error {
if !bytes.Equal(k, []byte("baz")) { items = append(items, kv{k: k, v: v})
t.Fatalf("unexpected key: %v", k)
} else if !bytes.Equal(v, []byte("0001")) {
t.Fatalf("unexpected value: %v", v)
}
case 2:
if !bytes.Equal(k, []byte("foo")) {
t.Fatalf("unexpected key: %v", k)
} else if !bytes.Equal(v, []byte("0000")) {
t.Fatalf("unexpected value: %v", v)
}
}
index++
return nil return nil
}); err != nil { })
t.Fatal(err) assert.NoErrorf(t, err, "b.ForEach failed")
} assert.Equal(t, expectedItems, items, "what we iterated (ForEach) is not what we put")
}
if index != 3 { err := db.Update(func(tx *bolt.Tx) error {
t.Fatalf("unexpected index: %d", index) b, err := tx.CreateBucket([]byte("widgets"))
} require.NoError(t, err, "bucket creation failed")
require.NoErrorf(t, b.Put([]byte("foo"), []byte("0000")), "put 'foo' failed")
require.NoErrorf(t, b.Put([]byte("baz"), []byte("0001")), "put 'baz' failed")
require.NoErrorf(t, b.Put([]byte("bar"), []byte("0002")), "put 'bar' failed")
_, err = b.CreateBucket([]byte("csubbucket"))
require.NoErrorf(t, err, "creation of subbucket failed")
verifyReads(b)
return nil return nil
}); err != nil { })
t.Fatal(err) require.NoErrorf(t, err, "db.Update failed")
err = db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("widgets"))
require.NotNil(t, b, "bucket opening failed")
verifyReads(b)
return nil
})
assert.NoErrorf(t, err, "db.View failed")
}
func TestBucket_ForEachBucket(t *testing.T) {
db := MustOpenDB()
defer db.MustClose()
expectedItems := [][]byte{
[]byte("csubbucket"),
[]byte("zsubbucket"),
} }
verifyReads := func(b *bolt.Bucket) {
var items [][]byte
err := b.ForEachBucket(func(k []byte) error {
items = append(items, k)
return nil
})
assert.NoErrorf(t, err, "b.ForEach failed")
assert.Equal(t, expectedItems, items, "what we iterated (ForEach) is not what we put")
}
err := db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucket([]byte("widgets"))
require.NoError(t, err, "bucket creation failed")
require.NoErrorf(t, b.Put([]byte("foo"), []byte("0000")), "put 'foo' failed")
_, err = b.CreateBucket([]byte("zsubbucket"))
require.NoErrorf(t, err, "creation of subbucket failed")
require.NoErrorf(t, b.Put([]byte("baz"), []byte("0001")), "put 'baz' failed")
require.NoErrorf(t, b.Put([]byte("bar"), []byte("0002")), "put 'bar' failed")
_, err = b.CreateBucket([]byte("csubbucket"))
require.NoErrorf(t, err, "creation of subbucket failed")
verifyReads(b)
return nil
})
assert.NoErrorf(t, err, "db.Update failed")
err = db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("widgets"))
require.NotNil(t, b, "bucket opening failed")
verifyReads(b)
return nil
})
assert.NoErrorf(t, err, "db.View failed")
}
func TestBucket_ForEachBucket_NoBuckets(t *testing.T) {
db := MustOpenDB()
defer db.MustClose()
verifyReads := func(b *bolt.Bucket) {
var items [][]byte
err := b.ForEachBucket(func(k []byte) error {
items = append(items, k)
return nil
})
assert.NoErrorf(t, err, "b.ForEach failed")
assert.Emptyf(t, items, "what we iterated (ForEach) is not what we put")
}
err := db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucket([]byte("widgets"))
require.NoError(t, err, "bucket creation failed")
require.NoErrorf(t, b.Put([]byte("foo"), []byte("0000")), "put 'foo' failed")
require.NoErrorf(t, err, "creation of subbucket failed")
require.NoErrorf(t, b.Put([]byte("baz"), []byte("0001")), "put 'baz' failed")
require.NoErrorf(t, err, "creation of subbucket failed")
verifyReads(b)
return nil
})
require.NoErrorf(t, err, "db.Update failed")
err = db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("widgets"))
require.NotNil(t, b, "bucket opening failed")
verifyReads(b)
return nil
})
assert.NoErrorf(t, err, "db.View failed")
} }
// Ensure a database can stop iteration early. // Ensure a database can stop iteration early.

View File

@ -30,10 +30,18 @@ func (c *Cursor) Bucket() *Bucket {
// The returned key and value are only valid for the life of the transaction. // The returned key and value are only valid for the life of the transaction.
func (c *Cursor) First() (key []byte, value []byte) { func (c *Cursor) First() (key []byte, value []byte) {
_assert(c.bucket.tx.db != nil, "tx closed") _assert(c.bucket.tx.db != nil, "tx closed")
k, v, flags := c.first()
if (flags & uint32(bucketLeafFlag)) != 0 {
return k, nil
}
return k, v
}
func (c *Cursor) first() (key []byte, value []byte, flags uint32) {
c.stack = c.stack[:0] c.stack = c.stack[:0]
p, n := c.bucket.pageNode(c.bucket.root) p, n := c.bucket.pageNode(c.bucket.root)
c.stack = append(c.stack, elemRef{page: p, node: n, index: 0}) c.stack = append(c.stack, elemRef{page: p, node: n, index: 0})
c.first() c.goToFirstElementOnTheStack()
// If we land on an empty page then move to the next value. // If we land on an empty page then move to the next value.
// https://github.com/boltdb/bolt/issues/450 // https://github.com/boltdb/bolt/issues/450
@ -43,10 +51,9 @@ func (c *Cursor) First() (key []byte, value []byte) {
k, v, flags := c.keyValue() k, v, flags := c.keyValue()
if (flags & uint32(bucketLeafFlag)) != 0 { if (flags & uint32(bucketLeafFlag)) != 0 {
return k, nil return k, nil, flags
} }
return k, v return k, v, flags
} }
// Last moves the cursor to the last item in the bucket and returns its key and value. // Last moves the cursor to the last item in the bucket and returns its key and value.
@ -155,7 +162,7 @@ func (c *Cursor) seek(seek []byte) (key []byte, value []byte, flags uint32) {
} }
// first moves the cursor to the first leaf element under the last page in the stack. // first moves the cursor to the first leaf element under the last page in the stack.
func (c *Cursor) first() { func (c *Cursor) goToFirstElementOnTheStack() {
for { for {
// Exit when we hit a leaf page. // Exit when we hit a leaf page.
var ref = &c.stack[len(c.stack)-1] var ref = &c.stack[len(c.stack)-1]
@ -223,7 +230,7 @@ func (c *Cursor) next() (key []byte, value []byte, flags uint32) {
// Otherwise start from where we left off in the stack and find the // Otherwise start from where we left off in the stack and find the
// first element of the first leaf page. // first element of the first leaf page.
c.stack = c.stack[:i+1] c.stack = c.stack[:i+1]
c.first() c.goToFirstElementOnTheStack()
// If this is an empty page then restart and move back up the stack. // If this is an empty page then restart and move back up the stack.
// https://github.com/boltdb/bolt/issues/450 // https://github.com/boltdb/bolt/issues/450

2
go.mod
View File

@ -4,7 +4,7 @@ go 1.17
require ( require (
github.com/stretchr/testify v1.8.1 github.com/stretchr/testify v1.8.1
golang.org/x/sys v0.2.0 golang.org/x/sys v0.3.0
) )
require ( require (

4
go.sum
View File

@ -10,8 +10,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A= golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

2
tx.go
View File

@ -484,7 +484,7 @@ func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bo
}) })
// Check each bucket within this bucket. // Check each bucket within this bucket.
_ = b.ForEach(func(k, v []byte) error { _ = b.ForEachBucket(func(k []byte) error {
if child := b.Bucket(k); child != nil { if child := b.Bucket(k); child != nil {
tx.checkBucket(child, reachable, freed, ch) tx.checkBucket(child, reachable, freed, ch)
} }