Merge pull request #590 from benbjohnson/vincent-petithory-compact-db

Compaction Command
Ben Johnson 2016-09-06 13:57:39 -06:00 committed by GitHub
commit 1ff46c182d
4 changed files with 443 additions and 0 deletions


@@ -329,6 +329,28 @@ func (b *Bucket) Delete(key []byte) error {
	return nil
}

// Sequence returns the current integer for the bucket without incrementing it.
func (b *Bucket) Sequence() uint64 { return b.bucket.sequence }

// SetSequence updates the sequence number for the bucket.
func (b *Bucket) SetSequence(v uint64) error {
	if b.tx.db == nil {
		return ErrTxClosed
	} else if !b.Writable() {
		return ErrTxNotWritable
	}

	// Materialize the root node if it hasn't been already so that the
	// bucket will be saved during commit.
	if b.rootNode == nil {
		_ = b.node(b.root, nil)
	}

	// Update the sequence.
	b.bucket.sequence = v
	return nil
}

// NextSequence returns an autoincrementing integer for the bucket.
func (b *Bucket) NextSequence() (uint64, error) {
	if b.tx.db == nil {
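For orientation, a minimal sketch of how the new accessor pair can be used alongside NextSequence (the database path and the "events" bucket name are illustrative, not part of this change):

package main

import (
	"fmt"
	"log"

	"github.com/boltdb/bolt"
)

func main() {
	db, err := bolt.Open("app.db", 0600, nil) // illustrative path
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.Update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists([]byte("events"))
		if err != nil {
			return err
		}
		// Restore a counter, e.g. when importing a dump.
		if err := b.SetSequence(42); err != nil {
			return err
		}
		fmt.Println(b.Sequence()) // 42; reads without incrementing
		// NextSequence picks up from the restored value.
		n, err := b.NextSequence()
		if err != nil {
			return err
		}
		fmt.Println(n) // 43
		return nil
	}); err != nil {
		log.Fatal(err)
	}
}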


@@ -782,6 +782,48 @@ func TestBucket_DeleteBucket_IncompatibleValue(t *testing.T) {
	}
}

// Ensure bucket can set and update its sequence number.
func TestBucket_Sequence(t *testing.T) {
	db := MustOpenDB()
	defer db.MustClose()

	if err := db.Update(func(tx *bolt.Tx) error {
		bkt, err := tx.CreateBucket([]byte("0"))
		if err != nil {
			t.Fatal(err)
		}

		// Retrieve sequence.
		if v := bkt.Sequence(); v != 0 {
			t.Fatalf("unexpected sequence: %d", v)
		}

		// Update sequence.
		if err := bkt.SetSequence(1000); err != nil {
			t.Fatal(err)
		}

		// Read sequence again.
		if v := bkt.Sequence(); v != 1000 {
			t.Fatalf("unexpected sequence: %d", v)
		}

		return nil
	}); err != nil {
		t.Fatal(err)
	}

	// Verify sequence in separate transaction.
	if err := db.View(func(tx *bolt.Tx) error {
		if v := tx.Bucket([]byte("0")).Sequence(); v != 1000 {
			t.Fatalf("unexpected sequence: %d", v)
		}
		return nil
	}); err != nil {
		t.Fatal(err)
	}
}

// Ensure that a bucket can return an autoincrementing sequence.
func TestBucket_NextSequence(t *testing.T) {
	db := MustOpenDB()


@@ -102,6 +102,8 @@ func (m *Main) Run(args ...string) error {
		return newBenchCommand(m).Run(args[1:]...)
	case "check":
		return newCheckCommand(m).Run(args[1:]...)
	case "compact":
		return newCompactCommand(m).Run(args[1:]...)
	case "dump":
		return newDumpCommand(m).Run(args[1:]...)
	case "info":
@@ -130,6 +132,7 @@ The commands are:
	bench       run synthetic benchmark against bolt
	check       verifies integrity of bolt database
	compact     copies a bolt database, compacting it in the process
	info        print basic info
	help        print this screen
	pages       print list of pages with their types
@@ -1530,3 +1533,208 @@ func (n *leafPageElement) value() []byte {
	buf := (*[maxAllocSize]byte)(unsafe.Pointer(n))
	return buf[n.pos+n.ksize : n.pos+n.ksize+n.vsize]
}
// CompactCommand represents the "compact" command execution.
type CompactCommand struct {
	Stdin  io.Reader
	Stdout io.Writer
	Stderr io.Writer

	SrcPath   string
	DstPath   string
	TxMaxSize int64
}

// newCompactCommand returns a CompactCommand.
func newCompactCommand(m *Main) *CompactCommand {
	return &CompactCommand{
		Stdin:  m.Stdin,
		Stdout: m.Stdout,
		Stderr: m.Stderr,
	}
}

// Run executes the command.
func (cmd *CompactCommand) Run(args ...string) (err error) {
	// Parse flags.
	fs := flag.NewFlagSet("", flag.ContinueOnError)
	fs.SetOutput(ioutil.Discard)
	fs.StringVar(&cmd.DstPath, "o", "", "")
	fs.Int64Var(&cmd.TxMaxSize, "tx-max-size", 65536, "")
	if err := fs.Parse(args); err == flag.ErrHelp {
		fmt.Fprintln(cmd.Stderr, cmd.Usage())
		return ErrUsage
	} else if err != nil {
		return err
	} else if cmd.DstPath == "" {
		return fmt.Errorf("output file required")
	}

	// Require database paths.
	cmd.SrcPath = fs.Arg(0)
	if cmd.SrcPath == "" {
		return ErrPathRequired
	}

	// Ensure source file exists.
	fi, err := os.Stat(cmd.SrcPath)
	if os.IsNotExist(err) {
		return ErrFileNotFound
	} else if err != nil {
		return err
	}
	initialSize := fi.Size()

	// Open source database.
	src, err := bolt.Open(cmd.SrcPath, 0444, nil)
	if err != nil {
		return err
	}
	defer src.Close()

	// Open destination database.
	dst, err := bolt.Open(cmd.DstPath, fi.Mode(), nil)
	if err != nil {
		return err
	}
	defer dst.Close()

	// Run compaction.
	if err := cmd.compact(dst, src); err != nil {
		return err
	}

	// Report stats on new size.
	fi, err = os.Stat(cmd.DstPath)
	if err != nil {
		return err
	} else if fi.Size() == 0 {
		return fmt.Errorf("zero db size")
	}
	fmt.Fprintf(cmd.Stdout, "%d -> %d bytes (gain=%.2fx)\n", initialSize, fi.Size(), float64(initialSize)/float64(fi.Size()))

	return nil
}

func (cmd *CompactCommand) compact(dst, src *bolt.DB) error {
	// Commit regularly, or we'll run out of memory for large datasets if using one transaction.
	var size int64
	tx, err := dst.Begin(true)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	if err := cmd.walk(src, func(keys [][]byte, k, v []byte, seq uint64) error {
		// On each key/value, check if we have exceeded tx size.
		sz := int64(len(k) + len(v))
		if size+sz > cmd.TxMaxSize && cmd.TxMaxSize != 0 {
			// Commit previous transaction.
			if err := tx.Commit(); err != nil {
				return err
			}

			// Start new transaction.
			tx, err = dst.Begin(true)
			if err != nil {
				return err
			}
			size = 0
		}
		size += sz

		// Create bucket on the root transaction if this is the first level.
		nk := len(keys)
		if nk == 0 {
			bkt, err := tx.CreateBucket(k)
			if err != nil {
				return err
			}
			if err := bkt.SetSequence(seq); err != nil {
				return err
			}
			return nil
		}

		// Create buckets on subsequent levels, if necessary.
		b := tx.Bucket(keys[0])
		if nk > 1 {
			for _, k := range keys[1:] {
				b = b.Bucket(k)
			}
		}

		// If there is no value then this is a bucket call.
		if v == nil {
			bkt, err := b.CreateBucket(k)
			if err != nil {
				return err
			}
			if err := bkt.SetSequence(seq); err != nil {
				return err
			}
			return nil
		}

		// Otherwise treat it as a key/value pair.
		return b.Put(k, v)
	}); err != nil {
		return err
	}

	return tx.Commit()
}
// walkFunc is the type of the function called for keys (buckets and "normal"
// values) discovered by walk. keys is the list of keys to descend to the bucket
// owning the discovered key/value pair k/v.
type walkFunc func(keys [][]byte, k, v []byte, seq uint64) error

// walk recursively walks the bolt database db, calling walkFn for each key it finds.
func (cmd *CompactCommand) walk(db *bolt.DB, walkFn walkFunc) error {
	return db.View(func(tx *bolt.Tx) error {
		return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
			return cmd.walkBucket(b, nil, name, nil, b.Sequence(), walkFn)
		})
	})
}

func (cmd *CompactCommand) walkBucket(b *bolt.Bucket, keypath [][]byte, k, v []byte, seq uint64, fn walkFunc) error {
	// Execute callback.
	if err := fn(keypath, k, v, seq); err != nil {
		return err
	}

	// If this is not a bucket then stop.
	if v != nil {
		return nil
	}

	// Iterate over each child key/value.
	keypath = append(keypath, k)
	return b.ForEach(func(k, v []byte) error {
		if v == nil {
			bkt := b.Bucket(k)
			return cmd.walkBucket(bkt, keypath, k, nil, bkt.Sequence(), fn)
		}
		return cmd.walkBucket(b, keypath, k, v, b.Sequence(), fn)
	})
}
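To make the callback protocol concrete: for a hypothetical database with a top-level bucket "a" holding a nested bucket "b" (with key "y" inside) and a plain key "x", walk produces this call sequence (children visit in byte order, so "b" precedes "x"; seqA, seqB, vx, vy are placeholders):

	walkFn(nil, []byte("a"), nil, seqA)                               // top-level bucket "a"
	walkFn([][]byte{[]byte("a")}, []byte("b"), nil, seqB)             // nested bucket "b"
	walkFn([][]byte{[]byte("a"), []byte("b")}, []byte("y"), vy, seqB) // key/value inside "b"
	walkFn([][]byte{[]byte("a")}, []byte("x"), vx, seqA)              // key/value inside "a"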
// Usage returns the help message.
func (cmd *CompactCommand) Usage() string {
	return strings.TrimLeft(`
usage: bolt compact [options] -o DST SRC

Compact opens a database at SRC path and walks it recursively, copying keys
as they are found from all buckets to a newly created database at DST path.

The original database is left untouched.

Additional options include:

	-tx-max-size NUM
		Specifies the maximum size of individual transactions.
		Defaults to 64KB.
`, "\n")
}
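A hypothetical invocation, with illustrative file names and sizes formatted per the Fprintf call in Run:

	$ bolt compact -o compacted.db original.db
	2097152 -> 262144 bytes (gain=8.00x)

Setting -tx-max-size to 0 copies everything in a single transaction; larger values trade memory during the copy for fewer commits on the destination.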


@@ -2,7 +2,12 @@ package main_test

import (
	"bytes"
	crypto "crypto/rand"
	"encoding/binary"
	"fmt"
	"io"
	"io/ioutil"
	"math/rand"
	"os"
	"strconv"
	"testing"
@@ -183,3 +188,169 @@ func (db *DB) Close() error {
	defer os.Remove(db.Path)
	return db.DB.Close()
}
func TestCompactCommand_Run(t *testing.T) {
	var s int64
	if err := binary.Read(crypto.Reader, binary.BigEndian, &s); err != nil {
		t.Fatal(err)
	}
	rand.Seed(s)

	dstdb := MustOpen(0666, nil)
	dstdb.Close()

	// Fill the db.
	db := MustOpen(0666, nil)
	if err := db.Update(func(tx *bolt.Tx) error {
		n := 2 + rand.Intn(5)
		for i := 0; i < n; i++ {
			k := []byte(fmt.Sprintf("b%d", i))
			b, err := tx.CreateBucketIfNotExists(k)
			if err != nil {
				return err
			}
			if err := b.SetSequence(uint64(i)); err != nil {
				return err
			}
			if err := fillBucket(b, append(k, '.')); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		db.Close()
		t.Fatal(err)
	}

	// Make the db grow by adding large values, then delete them.
	if err := db.Update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists([]byte("large_vals"))
		if err != nil {
			return err
		}
		n := 5 + rand.Intn(5)
		for i := 0; i < n; i++ {
			v := make([]byte, 1000*1000*(1+rand.Intn(5)))
			_, err := crypto.Read(v)
			if err != nil {
				return err
			}
			if err := b.Put([]byte(fmt.Sprintf("l%d", i)), v); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		db.Close()
		t.Fatal(err)
	}
	if err := db.Update(func(tx *bolt.Tx) error {
		c := tx.Bucket([]byte("large_vals")).Cursor()
		for k, _ := c.First(); k != nil; k, _ = c.Next() {
			if err := c.Delete(); err != nil {
				return err
			}
		}
		return tx.DeleteBucket([]byte("large_vals"))
	}); err != nil {
		db.Close()
		t.Fatal(err)
	}
	db.DB.Close()
	defer db.Close()
	defer dstdb.Close()

	dbChk, err := chkdb(db.Path)
	if err != nil {
		t.Fatal(err)
	}

	m := NewMain()
	if err := m.Run("compact", "-o", dstdb.Path, db.Path); err != nil {
		t.Fatal(err)
	}

	dbChkAfterCompact, err := chkdb(db.Path)
	if err != nil {
		t.Fatal(err)
	}

	dstdbChk, err := chkdb(dstdb.Path)
	if err != nil {
		t.Fatal(err)
	}

	if !bytes.Equal(dbChk, dbChkAfterCompact) {
		t.Error("the original db has been touched")
	}
	if !bytes.Equal(dbChk, dstdbChk) {
		t.Error("the compacted db data isn't the same as the original db")
	}
}

func fillBucket(b *bolt.Bucket, prefix []byte) error {
	n := 10 + rand.Intn(50)
	for i := 0; i < n; i++ {
		v := make([]byte, 10*(1+rand.Intn(4)))
		_, err := crypto.Read(v)
		if err != nil {
			return err
		}
		k := append(prefix, []byte(fmt.Sprintf("k%d", i))...)
		if err := b.Put(k, v); err != nil {
			return err
		}
	}
	// Limit depth of subbuckets.
	s := 2 + rand.Intn(4)
	if len(prefix) > (2*s + 1) {
		return nil
	}
	n = 1 + rand.Intn(3)
	for i := 0; i < n; i++ {
		k := append(prefix, []byte(fmt.Sprintf("b%d", i))...)
		sb, err := b.CreateBucket(k)
		if err != nil {
			return err
		}
		if err := fillBucket(sb, append(k, '.')); err != nil {
			return err
		}
	}
	return nil
}

func chkdb(path string) ([]byte, error) {
	db, err := bolt.Open(path, 0666, nil)
	if err != nil {
		return nil, err
	}
	defer db.Close()
	var buf bytes.Buffer
	err = db.View(func(tx *bolt.Tx) error {
		return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
			return walkBucket(b, name, nil, &buf)
		})
	})
	if err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func walkBucket(parent *bolt.Bucket, k []byte, v []byte, w io.Writer) error {
	if _, err := fmt.Fprintf(w, "%d:%x=%x\n", parent.Sequence(), k, v); err != nil {
		return err
	}

	// Not a bucket, exit.
	if v != nil {
		return nil
	}
	return parent.ForEach(func(k, v []byte) error {
		if v == nil {
			return walkBucket(parent.Bucket(k), k, nil, w)
		}
		return walkBucket(parent, k, v, w)
	})
}