mirror of https://github.com/etcd-io/bbolt.git
Merge pull request #248 from andrewpmartinez/expose.compact
fix etcd-io/bbolt#247 move compact logicpull/264/head
commit
4e3eed0f9e
|
@ -3,3 +3,4 @@
|
||||||
*.swp
|
*.swp
|
||||||
/bin/
|
/bin/
|
||||||
cover.out
|
cover.out
|
||||||
|
/.idea
|
||||||
|
|
|
@ -4,7 +4,7 @@ go_import_path: go.etcd.io/bbolt
|
||||||
sudo: false
|
sudo: false
|
||||||
|
|
||||||
go:
|
go:
|
||||||
- 1.12
|
- 1.15
|
||||||
|
|
||||||
before_install:
|
before_install:
|
||||||
- go get -v honnef.co/go/tools/...
|
- go get -v honnef.co/go/tools/...
|
||||||
|
|
|
@ -1993,7 +1993,7 @@ func (cmd *CompactCommand) Run(args ...string) (err error) {
|
||||||
defer dst.Close()
|
defer dst.Close()
|
||||||
|
|
||||||
// Run compaction.
|
// Run compaction.
|
||||||
if err := cmd.compact(dst, src); err != nil {
|
if err := bolt.Compact(dst, src, cmd.TxMaxSize); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2009,114 +2009,6 @@ func (cmd *CompactCommand) Run(args ...string) (err error) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *CompactCommand) compact(dst, src *bolt.DB) error {
|
|
||||||
// commit regularly, or we'll run out of memory for large datasets if using one transaction.
|
|
||||||
var size int64
|
|
||||||
tx, err := dst.Begin(true)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer tx.Rollback()
|
|
||||||
|
|
||||||
if err := cmd.walk(src, func(keys [][]byte, k, v []byte, seq uint64) error {
|
|
||||||
// On each key/value, check if we have exceeded tx size.
|
|
||||||
sz := int64(len(k) + len(v))
|
|
||||||
if size+sz > cmd.TxMaxSize && cmd.TxMaxSize != 0 {
|
|
||||||
// Commit previous transaction.
|
|
||||||
if err := tx.Commit(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start new transaction.
|
|
||||||
tx, err = dst.Begin(true)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
size = 0
|
|
||||||
}
|
|
||||||
size += sz
|
|
||||||
|
|
||||||
// Create bucket on the root transaction if this is the first level.
|
|
||||||
nk := len(keys)
|
|
||||||
if nk == 0 {
|
|
||||||
bkt, err := tx.CreateBucket(k)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := bkt.SetSequence(seq); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create buckets on subsequent levels, if necessary.
|
|
||||||
b := tx.Bucket(keys[0])
|
|
||||||
if nk > 1 {
|
|
||||||
for _, k := range keys[1:] {
|
|
||||||
b = b.Bucket(k)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fill the entire page for best compaction.
|
|
||||||
b.FillPercent = 1.0
|
|
||||||
|
|
||||||
// If there is no value then this is a bucket call.
|
|
||||||
if v == nil {
|
|
||||||
bkt, err := b.CreateBucket(k)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := bkt.SetSequence(seq); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Otherwise treat it as a key/value pair.
|
|
||||||
return b.Put(k, v)
|
|
||||||
}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return tx.Commit()
|
|
||||||
}
|
|
||||||
|
|
||||||
// walkFunc is the type of the function called for keys (buckets and "normal"
|
|
||||||
// values) discovered by Walk. keys is the list of keys to descend to the bucket
|
|
||||||
// owning the discovered key/value pair k/v.
|
|
||||||
type walkFunc func(keys [][]byte, k, v []byte, seq uint64) error
|
|
||||||
|
|
||||||
// walk walks recursively the bolt database db, calling walkFn for each key it finds.
|
|
||||||
func (cmd *CompactCommand) walk(db *bolt.DB, walkFn walkFunc) error {
|
|
||||||
return db.View(func(tx *bolt.Tx) error {
|
|
||||||
return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
|
|
||||||
return cmd.walkBucket(b, nil, name, nil, b.Sequence(), walkFn)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cmd *CompactCommand) walkBucket(b *bolt.Bucket, keypath [][]byte, k, v []byte, seq uint64, fn walkFunc) error {
|
|
||||||
// Execute callback.
|
|
||||||
if err := fn(keypath, k, v, seq); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// If this is not a bucket then stop.
|
|
||||||
if v != nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Iterate over each child key/value.
|
|
||||||
keypath = append(keypath, k)
|
|
||||||
return b.ForEach(func(k, v []byte) error {
|
|
||||||
if v == nil {
|
|
||||||
bkt := b.Bucket(k)
|
|
||||||
return cmd.walkBucket(bkt, keypath, k, nil, bkt.Sequence(), fn)
|
|
||||||
}
|
|
||||||
return cmd.walkBucket(b, keypath, k, v, b.Sequence(), fn)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Usage returns the help message.
|
// Usage returns the help message.
|
||||||
func (cmd *CompactCommand) Usage() string {
|
func (cmd *CompactCommand) Usage() string {
|
||||||
return strings.TrimLeft(`
|
return strings.TrimLeft(`
|
||||||
|
|
|
@ -0,0 +1,114 @@
|
||||||
|
package bbolt
|
||||||
|
|
||||||
|
// Compact will create a copy of the source DB and in the destination DB. This may
|
||||||
|
// reclaim space that the source database no longer has use for. txMaxSize can be
|
||||||
|
// used to limit the transactions size of this process and may trigger intermittent
|
||||||
|
// commits. A value of zero will ignore transaction sizes.
|
||||||
|
// TODO: merge with: https://github.com/etcd-io/etcd/blob/b7f0f52a16dbf83f18ca1d803f7892d750366a94/mvcc/backend/backend.go#L349
|
||||||
|
func Compact(dst, src *DB, txMaxSize int64) error {
|
||||||
|
// commit regularly, or we'll run out of memory for large datasets if using one transaction.
|
||||||
|
var size int64
|
||||||
|
tx, err := dst.Begin(true)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer tx.Rollback()
|
||||||
|
|
||||||
|
if err := walk(src, func(keys [][]byte, k, v []byte, seq uint64) error {
|
||||||
|
// On each key/value, check if we have exceeded tx size.
|
||||||
|
sz := int64(len(k) + len(v))
|
||||||
|
if size+sz > txMaxSize && txMaxSize != 0 {
|
||||||
|
// Commit previous transaction.
|
||||||
|
if err := tx.Commit(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start new transaction.
|
||||||
|
tx, err = dst.Begin(true)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
size = 0
|
||||||
|
}
|
||||||
|
size += sz
|
||||||
|
|
||||||
|
// Create bucket on the root transaction if this is the first level.
|
||||||
|
nk := len(keys)
|
||||||
|
if nk == 0 {
|
||||||
|
bkt, err := tx.CreateBucket(k)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := bkt.SetSequence(seq); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create buckets on subsequent levels, if necessary.
|
||||||
|
b := tx.Bucket(keys[0])
|
||||||
|
if nk > 1 {
|
||||||
|
for _, k := range keys[1:] {
|
||||||
|
b = b.Bucket(k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fill the entire page for best compaction.
|
||||||
|
b.FillPercent = 1.0
|
||||||
|
|
||||||
|
// If there is no value then this is a bucket call.
|
||||||
|
if v == nil {
|
||||||
|
bkt, err := b.CreateBucket(k)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := bkt.SetSequence(seq); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Otherwise treat it as a key/value pair.
|
||||||
|
return b.Put(k, v)
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return tx.Commit()
|
||||||
|
}
|
||||||
|
|
||||||
|
// walkFunc is the type of the function called for keys (buckets and "normal"
|
||||||
|
// values) discovered by Walk. keys is the list of keys to descend to the bucket
|
||||||
|
// owning the discovered key/value pair k/v.
|
||||||
|
type walkFunc func(keys [][]byte, k, v []byte, seq uint64) error
|
||||||
|
|
||||||
|
// walk walks recursively the bolt database db, calling walkFn for each key it finds.
|
||||||
|
func walk(db *DB, walkFn walkFunc) error {
|
||||||
|
return db.View(func(tx *Tx) error {
|
||||||
|
return tx.ForEach(func(name []byte, b *Bucket) error {
|
||||||
|
return walkBucket(b, nil, name, nil, b.Sequence(), walkFn)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func walkBucket(b *Bucket, keypath [][]byte, k, v []byte, seq uint64, fn walkFunc) error {
|
||||||
|
// Execute callback.
|
||||||
|
if err := fn(keypath, k, v, seq); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// If this is not a bucket then stop.
|
||||||
|
if v != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Iterate over each child key/value.
|
||||||
|
keypath = append(keypath, k)
|
||||||
|
return b.ForEach(func(k, v []byte) error {
|
||||||
|
if v == nil {
|
||||||
|
bkt := b.Bucket(k)
|
||||||
|
return walkBucket(bkt, keypath, k, nil, bkt.Sequence(), fn)
|
||||||
|
}
|
||||||
|
return walkBucket(b, keypath, k, v, b.Sequence(), fn)
|
||||||
|
})
|
||||||
|
}
|
Loading…
Reference in New Issue