fix(cursor): Last needs skip empty pages

Signed-off-by: dchaofei <dchaofei@163.com>
pull/341/head
dchaofei 2022-10-13 10:40:00 +08:00
parent eedea6cb26
commit f1918806b2
3 changed files with 84 additions and 20 deletions

View File

@ -6,6 +6,7 @@ Note that we start to track changes starting from v1.3.7.
### BoltDB ### BoltDB
- [Add support for loong64 arch](https://github.com/etcd-io/bbolt/pull/303). - [Add support for loong64 arch](https://github.com/etcd-io/bbolt/pull/303).
- Fix [the "Last" method might return no data due to not skipping the empty pages](https://github.com/etcd-io/bbolt/pull/341)
### CMD ### CMD
- [Open db file readonly in compact CLI command](https://github.com/etcd-io/bbolt/pull/292). - [Open db file readonly in compact CLI command](https://github.com/etcd-io/bbolt/pull/292).

View File

@ -60,6 +60,17 @@ func (c *Cursor) Last() (key []byte, value []byte) {
ref.index = ref.count() - 1 ref.index = ref.count() - 1
c.stack = append(c.stack, ref) c.stack = append(c.stack, ref)
c.last() c.last()
// If this is an empty page (calling Delete may result in empty pages)
// we call prev to find the last page that is not empty
for len(c.stack) > 0 && c.stack[len(c.stack)-1].count() == 0 {
c.prev()
}
if len(c.stack) == 0 {
return nil, nil
}
k, v, flags := c.keyValue() k, v, flags := c.keyValue()
if (flags & uint32(bucketLeafFlag)) != 0 { if (flags & uint32(bucketLeafFlag)) != 0 {
return k, nil return k, nil
@ -84,26 +95,7 @@ func (c *Cursor) Next() (key []byte, value []byte) {
// The returned key and value are only valid for the life of the transaction. // The returned key and value are only valid for the life of the transaction.
func (c *Cursor) Prev() (key []byte, value []byte) { func (c *Cursor) Prev() (key []byte, value []byte) {
_assert(c.bucket.tx.db != nil, "tx closed") _assert(c.bucket.tx.db != nil, "tx closed")
k, v, flags := c.prev()
// Attempt to move back one element until we're successful.
// Move up the stack as we hit the beginning of each page in our stack.
for i := len(c.stack) - 1; i >= 0; i-- {
elem := &c.stack[i]
if elem.index > 0 {
elem.index--
break
}
c.stack = c.stack[:i]
}
// If we've hit the end then return nil.
if len(c.stack) == 0 {
return nil, nil
}
// Move down the stack to find the last element of the last leaf under this branch.
c.last()
k, v, flags := c.keyValue()
if (flags & uint32(bucketLeafFlag)) != 0 { if (flags & uint32(bucketLeafFlag)) != 0 {
return k, nil return k, nil
} }
@ -243,6 +235,30 @@ func (c *Cursor) next() (key []byte, value []byte, flags uint32) {
} }
} }
// prev moves the cursor to the previous item in the bucket and returns its key and value.
// If the cursor is at the beginning of the bucket then a nil key and value are returned.
func (c *Cursor) prev() (key []byte, value []byte, flags uint32) {
// Attempt to move back one element until we're successful.
// Move up the stack as we hit the beginning of each page in our stack.
for i := len(c.stack) - 1; i >= 0; i-- {
elem := &c.stack[i]
if elem.index > 0 {
elem.index--
break
}
c.stack = c.stack[:i]
}
// If we've hit the end then return nil.
if len(c.stack) == 0 {
return nil, nil, 0
}
// Move down the stack to find the last element of the last leaf under this branch.
c.last()
return c.keyValue()
}
// search recursively performs a binary search against a given page/node until it finds a given key. // search recursively performs a binary search against a given page/node until it finds a given key.
func (c *Cursor) search(key []byte, pgid pgid) { func (c *Cursor) search(key []byte, pgid pgid) {
p, n := c.bucket.pageNode(pgid) p, n := c.bucket.pageNode(pgid)

View File

@ -507,6 +507,53 @@ func TestCursor_First_EmptyPages(t *testing.T) {
} }
} }
// Ensure that a cursor can skip over empty pages that have been deleted.
func TestCursor_Last_EmptyPages(t *testing.T) {
db := MustOpenDB()
defer db.MustClose()
// Create 1000 keys in the "widgets" bucket.
if err := db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucket([]byte("widgets"))
if err != nil {
t.Fatal(err)
}
for i := 0; i < 1000; i++ {
if err := b.Put(u64tob(uint64(i)), []byte{}); err != nil {
t.Fatal(err)
}
}
return nil
}); err != nil {
t.Fatal(err)
}
// Delete last 800 elements to ensure last page is empty
if err := db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("widgets"))
for i := 200; i < 1000; i++ {
if err := b.Delete(u64tob(uint64(i))); err != nil {
t.Fatal(err)
}
}
c := b.Cursor()
var n int
for k, _ := c.Last(); k != nil; k, _ = c.Prev() {
n++
}
if n != 200 {
t.Fatalf("unexpected key count: %d", n)
}
return nil
}); err != nil {
t.Fatal(err)
}
}
// Ensure that a Tx can iterate over all elements in a bucket. // Ensure that a Tx can iterate over all elements in a bucket.
func TestCursor_QuickCheck(t *testing.T) { func TestCursor_QuickCheck(t *testing.T) {
f := func(items testdata) bool { f := func(items testdata) bool {