Merge branch 'compact-db' of https://github.com/vincent-petithory/bolt into vincent-petithory-compact-db

pull/24/head
Ben Johnson 2016-09-01 14:47:06 -06:00
commit 52d0f5e6a9
No known key found for this signature in database
GPG Key ID: 81741CD251883081
2 changed files with 360 additions and 0 deletions

View File

@ -102,6 +102,8 @@ func (m *Main) Run(args ...string) error {
return newBenchCommand(m).Run(args[1:]...) return newBenchCommand(m).Run(args[1:]...)
case "check": case "check":
return newCheckCommand(m).Run(args[1:]...) return newCheckCommand(m).Run(args[1:]...)
case "compact":
return newCompactCommand(m).Run(args[1:]...)
case "dump": case "dump":
return newDumpCommand(m).Run(args[1:]...) return newDumpCommand(m).Run(args[1:]...)
case "info": case "info":
@ -130,6 +132,7 @@ The commands are:
bench run synthetic benchmark against bolt bench run synthetic benchmark against bolt
check verifies integrity of bolt database check verifies integrity of bolt database
compact copies a bolt database, compacting it in the process
info print basic info info print basic info
help print this screen help print this screen
pages print list of pages with their types pages print list of pages with their types
@ -1530,3 +1533,180 @@ func (n *leafPageElement) value() []byte {
buf := (*[maxAllocSize]byte)(unsafe.Pointer(n)) buf := (*[maxAllocSize]byte)(unsafe.Pointer(n))
return buf[n.pos+n.ksize : n.pos+n.ksize+n.vsize] return buf[n.pos+n.ksize : n.pos+n.ksize+n.vsize]
} }
// CompactCommand represents the "compact" command execution.
// It copies an existing bolt database into a freshly created one,
// compacting it in the process.
type CompactCommand struct {
	// Stdin is unused by this command but kept for parity with the other
	// command types wired from Main.
	Stdin io.Reader
	// Stdout receives progress/status output (destination path, size gain).
	Stdout io.Writer
	// Stderr receives errors, usage text, and flag-parse output.
	Stderr io.Writer
}
// newCompactCommand returns a CompactCommand wired to the Main's
// standard streams.
func newCompactCommand(m *Main) *CompactCommand {
	cmd := &CompactCommand{}
	cmd.Stdin = m.Stdin
	cmd.Stdout = m.Stdout
	cmd.Stderr = m.Stderr
	return cmd
}
// BucketWalkFunc is the type of the function called for each key
// (both sub-bucket names and "normal" key/value pairs) discovered by Walk.
// keys is the path of bucket names leading down to the bucket that owns
// the discovered pair k/v; v is nil when k names a (sub-)bucket.
type BucketWalkFunc func(keys [][]byte, k []byte, v []byte) error
// Walk walks the bolt database db recursively inside a read transaction,
// invoking walkFn for every bucket and key/value pair, starting from each
// top-level bucket.
func (cmd *CompactCommand) Walk(db *bolt.DB, walkFn BucketWalkFunc) error {
	walkTop := func(name []byte, b *bolt.Bucket) error {
		return cmd.walkBucket(b, nil, name, nil, walkFn)
	}
	return db.View(func(tx *bolt.Tx) error {
		return tx.ForEach(walkTop)
	})
}
// walkBucket calls walkFn for the entry (keys, k, v) and then, when the
// entry is a bucket (v == nil), recurses into its contents.
//
// keys is the bucket-name path down to (but not including) k. The child
// path is allocated with its own backing array: the previous
// append(keys, k) could reuse the caller's backing array across sibling
// recursions, so a later sibling would overwrite the path slot that an
// earlier walkFn invocation had received.
func (cmd *CompactCommand) walkBucket(b *bolt.Bucket, keys [][]byte, k []byte, v []byte, walkFn BucketWalkFunc) error {
	if err := walkFn(keys, k, v); err != nil {
		return err
	}

	// Plain key/value pair: nothing to descend into.
	if v != nil {
		return nil
	}

	// Fresh path slice for the children (see comment above).
	keys2 := make([][]byte, len(keys), len(keys)+1)
	copy(keys2, keys)
	keys2 = append(keys2, k)

	return b.ForEach(func(k, v []byte) error {
		if v == nil {
			return cmd.walkBucket(b.Bucket(k), keys2, k, nil, walkFn)
		}
		return cmd.walkBucket(b, keys2, k, v, walkFn)
	})
}
// Run executes the "compact" command: it opens the database at the path
// given as first argument and copies every bucket and key into a newly
// created database, which is therefore compacted.
//
// Flags:
//
//	-tx-max-size N  commit the destination transaction whenever the summed
//	                key+value size written since the last commit exceeds N.
//	                0 (the default) uses a single transaction for the copy.
//	-h              print usage.
//
// The destination path is the optional second argument; when omitted, a
// temporary file path is used. The source database is never modified.
func (cmd *CompactCommand) Run(args ...string) (err error) {
	// Parse flags.
	fs := flag.NewFlagSet("", flag.ContinueOnError)
	fs.SetOutput(cmd.Stderr)
	var txMaxSize int64
	fs.Int64Var(&txMaxSize, "tx-max-size", 0, "commit tx when key/value size sum exceed this value. If 0, only one transaction is used. If you are compacting a large database, set this to a value appropriate for the available memory.")
	help := fs.Bool("h", false, "print this help")
	if err := fs.Parse(args); err != nil {
		return err
	} else if *help {
		fmt.Fprintln(cmd.Stderr, cmd.Usage())
		fs.PrintDefaults()
		return ErrUsage
	}

	// Require database path, and record its size for the final report.
	// (A single Stat replaces the previous existence check + re-Stat.)
	path := fs.Arg(0)
	if path == "" {
		return ErrPathRequired
	}
	fi, err := os.Stat(path)
	if os.IsNotExist(err) {
		return ErrFileNotFound
	} else if err != nil {
		return err
	}
	initialSize := fi.Size()

	// Open source database.
	db, err := bolt.Open(path, 0444, nil)
	if err != nil {
		return err
	}
	defer db.Close()

	// Destination path: second argument, or a fresh temporary file
	// (created then removed so bolt.Open below re-creates it).
	var dstPath string
	if fs.NArg() < 2 {
		f, err := ioutil.TempFile("", "bolt-compact-")
		if err != nil {
			return fmt.Errorf("temp file: %v", err)
		}
		_ = f.Close()
		_ = os.Remove(f.Name())
		dstPath = f.Name()
		fmt.Fprintf(cmd.Stdout, "compacting db to %s\n", dstPath)
	} else {
		dstPath = fs.Arg(1)
	}

	// Report the size gain on exit. Return early on a failed stat (fi is
	// nil then, so fi.Size() would panic) and on a zero-size destination
	// (the gain computation would divide by zero).
	defer func() {
		fi, err := os.Stat(dstPath)
		if err != nil {
			fmt.Fprintln(cmd.Stderr, err)
			return
		}
		newSize := fi.Size()
		if newSize == 0 {
			fmt.Fprintln(cmd.Stderr, "db size is 0")
			return
		}
		fmt.Fprintf(cmd.Stdout, "%d -> %d bytes (gain=%.2fx)\n", initialSize, newSize, float64(initialSize)/float64(newSize))
	}()

	dstdb, err := bolt.Open(dstPath, 0666, nil)
	if err != nil {
		return err
	}
	defer dstdb.Close()

	// commit regularly, or we'll run out of memory for large datasets if
	// using one transaction.
	var size int64
	tx, err := dstdb.Begin(true)
	if err != nil {
		return err
	}
	defer func() {
		// tx is nil when a mid-walk Begin failed right after a commit;
		// there is nothing left to roll back or commit in that case.
		if tx == nil {
			return
		}
		if err != nil {
			_ = tx.Rollback()
		} else {
			err = tx.Commit()
		}
	}()

	return cmd.Walk(db, func(keys [][]byte, k []byte, v []byte) error {
		// Rotate the destination transaction once the configured budget
		// is exceeded. On Begin failure tx becomes nil (guarded above).
		s := int64(len(k) + len(v))
		if txMaxSize != 0 && size+s > txMaxSize {
			if err := tx.Commit(); err != nil {
				return err
			}
			tx, err = dstdb.Begin(true)
			if err != nil {
				return err
			}
			size = 0
		}
		size += s

		// Top-level entries are always buckets (Walk starts at tx.ForEach).
		nk := len(keys)
		if nk == 0 {
			_, err := tx.CreateBucket(k)
			return err
		}

		// Descend to the destination bucket matching the source path.
		b := tx.Bucket(keys[0])
		for _, k := range keys[1:] {
			b = b.Bucket(k)
		}

		// v == nil marks a sub-bucket name; otherwise a key/value pair.
		if v == nil {
			_, err := b.CreateBucket(k)
			return err
		}
		return b.Put(k, v)
	})
}
// Usage returns the help message.
func (cmd *CompactCommand) Usage() string {
	return `usage: bolt compact PATH [DST_PATH]
Compact opens a database at PATH and walks it recursively entirely,
copying keys as they are found from all buckets, to a newly created db.
If DST_PATH is non-empty, the new db is created at DST_PATH, else it will be
in a temporary location.
The original db is left untouched.
`
}

View File

@ -2,7 +2,12 @@ package main_test
import ( import (
"bytes" "bytes"
crypto "crypto/rand"
"encoding/binary"
"fmt"
"io"
"io/ioutil" "io/ioutil"
"math/rand"
"os" "os"
"strconv" "strconv"
"testing" "testing"
@ -183,3 +188,178 @@ func (db *DB) Close() error {
defer os.Remove(db.Path) defer os.Remove(db.Path)
return db.DB.Close() return db.DB.Close()
} }
// TestCompactCommand_Run fills a database with random data — including
// large values that are subsequently deleted, so the file holds free pages —
// then runs "bolt compact" on it and verifies that (a) the source db's
// contents are untouched and (b) the compacted db holds the same data.
func TestCompactCommand_Run(t *testing.T) {
	// Seed math/rand from crypto/rand so each run exercises different data.
	var s int64
	if err := binary.Read(crypto.Reader, binary.BigEndian, &s); err != nil {
		t.Fatal(err)
	}
	rand.Seed(s)

	// Create the destination db file, then close it so the compact command
	// can open it itself.
	dstdb := MustOpen(0666, nil)
	dstdb.Close()

	// fill the db
	db := MustOpen(0666, nil)
	if err := db.Update(func(tx *bolt.Tx) error {
		// A random number of top-level buckets, each filled recursively.
		n := 2 + rand.Intn(5)
		for i := 0; i < n; i++ {
			k := []byte(fmt.Sprintf("b%d", i))
			b, err := tx.CreateBucketIfNotExists(k)
			if err != nil {
				return err
			}
			if err := fillBucket(b, append(k, '.')); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		db.Close()
		t.Fatal(err)
	}

	// make the db grow by adding large values, and delete them.
	if err := db.Update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists([]byte("large_vals"))
		if err != nil {
			return err
		}
		// 5-9 random values of 1-5 MB each.
		n := 5 + rand.Intn(5)
		for i := 0; i < n; i++ {
			v := make([]byte, 1000*1000*(1+rand.Intn(5)))
			_, err := crypto.Read(v)
			if err != nil {
				return err
			}
			if err := b.Put([]byte(fmt.Sprintf("l%d", i)), v); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		db.Close()
		t.Fatal(err)
	}
	// Delete the large values and their bucket: the pages they occupied
	// become free pages, which compaction should reclaim.
	if err := db.Update(func(tx *bolt.Tx) error {
		c := tx.Bucket([]byte("large_vals")).Cursor()
		for k, _ := c.First(); k != nil; k, _ = c.Next() {
			if err := c.Delete(); err != nil {
				return err
			}
		}
		return tx.DeleteBucket([]byte("large_vals"))
	}); err != nil {
		db.Close()
		t.Fatal(err)
	}

	// Close the underlying bolt handle now; the deferred wrapper Close
	// calls also remove the temp files at test exit.
	db.DB.Close()
	defer db.Close()
	defer dstdb.Close()

	// Checksum of the source db before compaction.
	dbChk, err := chkdb(db.Path)
	if err != nil {
		t.Fatal(err)
	}

	// Run the compact command against the two paths.
	m := NewMain()
	if err := m.Run("compact", db.Path, dstdb.Path); err != nil {
		t.Fatal(err)
	}

	dbChkAfterCompact, err := chkdb(db.Path)
	if err != nil {
		t.Fatal(err)
	}
	dstdbChk, err := chkdb(dstdb.Path)
	if err != nil {
		t.Fatal(err)
	}
	if !bytes.Equal(dbChk, dbChkAfterCompact) {
		t.Error("the original db has been touched")
	}
	if !bytes.Equal(dbChk, dstdbChk) {
		t.Error("the compacted db data isn't the same than the original db")
	}
}
// fillBucket populates b with a random number of key/value pairs whose
// keys are derived from prefix, then recursively creates a few sub-buckets
// (depth-limited via the prefix length).
//
// Keys are built as fresh byte slices instead of with append(prefix, ...):
// append may reuse prefix's backing array across iterations, and bolt
// retains references to key bytes until the transaction commits, so
// aliased keys could be silently mutated after Put/CreateBucket.
func fillBucket(b *bolt.Bucket, prefix []byte) error {
	n := 10 + rand.Intn(50)
	for i := 0; i < n; i++ {
		v := make([]byte, 10*(1+rand.Intn(4)))
		if _, err := crypto.Read(v); err != nil {
			return err
		}
		// e.g. "b0.k3" — fresh allocation, no aliasing with prefix.
		k := []byte(fmt.Sprintf("%sk%d", prefix, i))
		if err := b.Put(k, v); err != nil {
			return err
		}
	}
	// limit depth of subbuckets
	s := 2 + rand.Intn(4)
	if len(prefix) > (2*s + 1) {
		return nil
	}
	n = 1 + rand.Intn(3)
	for i := 0; i < n; i++ {
		k := []byte(fmt.Sprintf("%sb%d", prefix, i))
		sb, err := b.CreateBucket(k)
		if err != nil {
			return err
		}
		// Recurse with the child's prefix ("<k>.") as a fresh slice.
		if err := fillBucket(sb, []byte(fmt.Sprintf("%s.", k))); err != nil {
			return err
		}
	}
	return nil
}
// chkdb opens the database at path and returns a deterministic byte dump
// of its entire contents (bucket names, keys, and values), suitable for
// comparing two databases for content equality.
func chkdb(path string) ([]byte, error) {
	db, err := bolt.Open(path, 0666, nil)
	if err != nil {
		return nil, err
	}
	defer db.Close()

	var buf bytes.Buffer
	if err := db.View(func(tx *bolt.Tx) error {
		return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
			return walkBucket(b, name, nil, &buf)
		})
	}); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}
// walkBucket writes "k:v\n" for the given entry to w and, when the entry
// names a bucket (v == nil), recurses into each of its children.
func walkBucket(parent *bolt.Bucket, k []byte, v []byte, w io.Writer) error {
	if _, err := w.Write(k); err != nil {
		return err
	}
	if _, err := io.WriteString(w, ":"); err != nil {
		return err
	}
	if _, err := w.Write(v); err != nil {
		return err
	}
	if _, err := fmt.Fprintln(w); err != nil {
		return err
	}

	// A non-nil value means a plain key/value pair: nothing to descend into.
	if v != nil {
		return nil
	}
	return parent.ForEach(func(ck, cv []byte) error {
		if cv == nil {
			return walkBucket(parent.Bucket(ck), ck, nil, w)
		}
		return walkBucket(parent, ck, cv, w)
	})
}