diff --git a/cmd/bolt/main.go b/cmd/bolt/main.go
index b96e6f7..1f78e03 100644
--- a/cmd/bolt/main.go
+++ b/cmd/bolt/main.go
@@ -102,6 +102,8 @@ func (m *Main) Run(args ...string) error {
 		return newBenchCommand(m).Run(args[1:]...)
 	case "check":
 		return newCheckCommand(m).Run(args[1:]...)
+	case "compact":
+		return newCompactCommand(m).Run(args[1:]...)
 	case "dump":
 		return newDumpCommand(m).Run(args[1:]...)
 	case "info":
@@ -130,6 +132,7 @@ The commands are:
 
     bench       run synthetic benchmark against bolt
     check       verifies integrity of bolt database
+    compact     copies a bolt database, compacting it in the process
     info        print basic info
     help        print this screen
     pages       print list of pages with their types
@@ -1530,3 +1533,180 @@ func (n *leafPageElement) value() []byte {
 	buf := (*[maxAllocSize]byte)(unsafe.Pointer(n))
 	return buf[n.pos+n.ksize : n.pos+n.ksize+n.vsize]
 }
+
+// CompactCommand represents the "compact" command execution.
+type CompactCommand struct {
+	Stdin  io.Reader
+	Stdout io.Writer
+	Stderr io.Writer
+}
+
+// newCompactCommand returns a CompactCommand.
+func newCompactCommand(m *Main) *CompactCommand {
+	return &CompactCommand{
+		Stdin:  m.Stdin,
+		Stdout: m.Stdout,
+		Stderr: m.Stderr,
+	}
+}
+
+// BucketWalkFunc is the type of the function called by Walk for each key it
+// discovers, both bucket names and "normal" key/value pairs.
+// keys is the list of bucket names to descend to reach the bucket owning the
+// discovered key/value pair k/v; v is nil when k names a bucket.
+type BucketWalkFunc func(keys [][]byte, k []byte, v []byte) error
+
+// Walk recursively walks the bolt database db, calling walkFn for each key it finds.
+func (cmd *CompactCommand) Walk(db *bolt.DB, walkFn BucketWalkFunc) error {
+	return db.View(func(tx *bolt.Tx) error {
+		return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
+			return cmd.walkBucket(b, nil, name, nil, walkFn)
+		})
+	})
+}
+
+func (cmd *CompactCommand) walkBucket(b *bolt.Bucket, keys [][]byte, k []byte, v []byte, walkFn BucketWalkFunc) error {
+	if err := walkFn(keys, k, v); err != nil {
+		return err
+	}
+	// Not a bucket, nothing to descend into.
+	if v != nil {
+		return nil
+	}
+
+	// Appending to keys is safe here even when the backing array is shared:
+	// walkFn is called synchronously, so each recursion is done with keys2
+	// before a sibling bucket overwrites the shared element.
+	keys2 := append(keys, k)
+	return b.ForEach(func(k, v []byte) error {
+		if v == nil {
+			return cmd.walkBucket(b.Bucket(k), keys2, k, nil, walkFn)
+		}
+		return cmd.walkBucket(b, keys2, k, v, walkFn)
+	})
+}
+
+// Run executes the command.
+func (cmd *CompactCommand) Run(args ...string) (err error) {
+	// Parse flags.
+	fs := flag.NewFlagSet("", flag.ContinueOnError)
+	fs.SetOutput(cmd.Stderr)
+	var txMaxSize int64
+	fs.Int64Var(&txMaxSize, "tx-max-size", 0, "commit the current transaction when the accumulated key/value size exceeds this value. If 0, a single transaction is used. If you are compacting a large database, set this to a value appropriate for the available memory.")
+	help := fs.Bool("h", false, "print this help")
+	if err := fs.Parse(args); err != nil {
+		return err
+	} else if *help {
+		fmt.Fprintln(cmd.Stderr, cmd.Usage())
+		fs.PrintDefaults()
+		return ErrUsage
+	}
+
+	// Require database path.
+	path := fs.Arg(0)
+	if path == "" {
+		return ErrPathRequired
+	}
+	fi, err := os.Stat(path)
+	if os.IsNotExist(err) {
+		return ErrFileNotFound
+	} else if err != nil {
+		return err
+	}
+	initialSize := fi.Size()
+
+	// Open database.
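+	// The source is opened read-only (shared lock): the walk below only ever
+	// reads, and this guarantees the original file is never written to, even
+	// if its on-disk permissions would allow it.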
+	db, err := bolt.Open(path, 0444, &bolt.Options{ReadOnly: true})
+	if err != nil {
+		return err
+	}
+	defer db.Close()
+
+	// Destination: the second argument if given, otherwise a temporary file.
+	var dstPath string
+	if fs.NArg() < 2 {
+		f, err := ioutil.TempFile("", "bolt-compact-")
+		if err != nil {
+			return fmt.Errorf("temp file: %v", err)
+		}
+		_ = f.Close()
+		_ = os.Remove(f.Name())
+		dstPath = f.Name()
+		fmt.Fprintf(cmd.Stdout, "compacting db to %s\n", dstPath)
+	} else {
+		dstPath = fs.Arg(1)
+	}
+
+	// Report the size difference once the destination has been closed.
+	defer func() {
+		fi, err := os.Stat(dstPath)
+		if err != nil {
+			fmt.Fprintln(cmd.Stderr, err)
+			return
+		}
+		newSize := fi.Size()
+		if newSize == 0 {
+			fmt.Fprintln(cmd.Stderr, "db size is 0")
+			return
+		}
+		fmt.Fprintf(cmd.Stdout, "%d -> %d bytes (gain=%.2fx)\n", initialSize, newSize, float64(initialSize)/float64(newSize))
+	}()
+
+	dstdb, err := bolt.Open(dstPath, 0666, nil)
+	if err != nil {
+		return err
+	}
+	defer dstdb.Close()
+
+	// Commit regularly, or we'll run out of memory for large datasets if using one transaction.
+	var size int64
+	tx, err := dstdb.Begin(true)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		if err != nil {
+			_ = tx.Rollback()
+		} else {
+			err = tx.Commit()
+		}
+	}()
+	return cmd.Walk(db, func(keys [][]byte, k []byte, v []byte) error {
+		s := int64(len(k) + len(v))
+		if txMaxSize != 0 && size+s > txMaxSize {
+			if err := tx.Commit(); err != nil {
+				return err
+			}
+			tx, err = dstdb.Begin(true)
+			if err != nil {
+				return err
+			}
+			size = 0
+		}
+		size += s
+
+		// k is a top-level bucket name.
+		nk := len(keys)
+		if nk == 0 {
+			_, err := tx.CreateBucket(k)
+			return err
+		}
+
+		// Descend to the bucket owning k; every bucket on the path has
+		// already been created by an earlier call.
+		b := tx.Bucket(keys[0])
+		if nk > 1 {
+			for _, k := range keys[1:] {
+				b = b.Bucket(k)
+			}
+		}
+		// k is a nested bucket name.
+		if v == nil {
+			_, err := b.CreateBucket(k)
+			return err
+		}
+		return b.Put(k, v)
+	})
+}
+
+// Usage returns the help message.
+func (cmd *CompactCommand) Usage() string {
+	return strings.TrimLeft(`
+usage: bolt compact PATH [DST_PATH]
+
+Compact opens the database at PATH, recursively walks every bucket, and
+copies all keys as they are found into a newly created database.
+
+If DST_PATH is given, the new database is created at DST_PATH; otherwise it
+is created in a temporary location.
+
+The original database is left untouched.
+`, "\n")
+}
diff --git a/cmd/bolt/main_test.go b/cmd/bolt/main_test.go
index c378b79..e8942a0 100644
--- a/cmd/bolt/main_test.go
+++ b/cmd/bolt/main_test.go
@@ -2,7 +2,12 @@ package main_test
 
 import (
 	"bytes"
+	crypto "crypto/rand"
+	"encoding/binary"
+	"fmt"
+	"io"
 	"io/ioutil"
+	"math/rand"
 	"os"
 	"strconv"
 	"testing"
@@ -183,3 +188,178 @@ func (db *DB) Close() error {
 	defer os.Remove(db.Path)
 	return db.DB.Close()
 }
+
+func TestCompactCommand_Run(t *testing.T) {
+	// Seed math/rand from crypto/rand so every run exercises different data.
+	var s int64
+	if err := binary.Read(crypto.Reader, binary.BigEndian, &s); err != nil {
+		t.Fatal(err)
+	}
+	rand.Seed(s)
+
+	// Reserve a destination path; Close removes the file so that the
+	// compact command creates it from scratch.
+	dstdb := MustOpen(0666, nil)
+	dstdb.Close()
+
+	// Fill the db with a random number of buckets.
+	db := MustOpen(0666, nil)
+	if err := db.Update(func(tx *bolt.Tx) error {
+		n := 2 + rand.Intn(5)
+		for i := 0; i < n; i++ {
+			k := []byte(fmt.Sprintf("b%d", i))
+			b, err := tx.CreateBucketIfNotExists(k)
+			if err != nil {
+				return err
+			}
+			if err := fillBucket(b, append(k, '.')); err != nil {
+				return err
+			}
+		}
+		return nil
+	}); err != nil {
+		db.Close()
+		t.Fatal(err)
+	}
+
+	// Make the db grow by adding large values, and then delete them.
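+	// Deleting only frees pages for reuse within the file; a bolt file never
+	// shrinks on its own, so this leaves the compact command a measurable gain.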
+	if err := db.Update(func(tx *bolt.Tx) error {
+		b, err := tx.CreateBucketIfNotExists([]byte("large_vals"))
+		if err != nil {
+			return err
+		}
+		n := 5 + rand.Intn(5)
+		for i := 0; i < n; i++ {
+			v := make([]byte, 1000*1000*(1+rand.Intn(5)))
+			if _, err := crypto.Read(v); err != nil {
+				return err
+			}
+			if err := b.Put([]byte(fmt.Sprintf("l%d", i)), v); err != nil {
+				return err
+			}
+		}
+		return nil
+	}); err != nil {
+		db.Close()
+		t.Fatal(err)
+	}
+	if err := db.Update(func(tx *bolt.Tx) error {
+		c := tx.Bucket([]byte("large_vals")).Cursor()
+		for k, _ := c.First(); k != nil; k, _ = c.Next() {
+			if err := c.Delete(); err != nil {
+				return err
+			}
+		}
+		return tx.DeleteBucket([]byte("large_vals"))
+	}); err != nil {
+		db.Close()
+		t.Fatal(err)
+	}
+	// Close the underlying handle so the compact command can open the file;
+	// the deferred harness Closes still remove the temp files afterwards.
+	db.DB.Close()
+	defer db.Close()
+	defer dstdb.Close()
+
+	dbChk, err := chkdb(db.Path)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	m := NewMain()
+	if err := m.Run("compact", db.Path, dstdb.Path); err != nil {
+		t.Fatal(err)
+	}
+
+	dbChkAfterCompact, err := chkdb(db.Path)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	dstdbChk, err := chkdb(dstdb.Path)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if !bytes.Equal(dbChk, dbChkAfterCompact) {
+		t.Error("the original db has been touched")
+	}
+	if !bytes.Equal(dbChk, dstdbChk) {
+		t.Error("the compacted db data isn't the same as the original db's")
+	}
+}
+
+func fillBucket(b *bolt.Bucket, prefix []byte) error {
+	n := 10 + rand.Intn(50)
+	for i := 0; i < n; i++ {
+		v := make([]byte, 10*(1+rand.Intn(4)))
+		if _, err := crypto.Read(v); err != nil {
+			return err
+		}
+		// Copy prefix into a fresh slice: Put keeps a reference to k for the
+		// life of the transaction, so k must not share a backing array that a
+		// later append could overwrite.
+		k := append(append([]byte(nil), prefix...), fmt.Sprintf("k%d", i)...)
+		if err := b.Put(k, v); err != nil {
+			return err
+		}
+	}
+	// Limit the depth of sub-buckets.
+	s := 2 + rand.Intn(4)
+	if len(prefix) > (2*s + 1) {
+		return nil
+	}
+	n = 1 + rand.Intn(3)
+	for i := 0; i < n; i++ {
+		k := append(append([]byte(nil), prefix...), fmt.Sprintf("b%d", i)...)
+		sb, err := b.CreateBucket(k)
+		if err != nil {
+			return err
+		}
+		if err := fillBucket(sb, append(k, '.')); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// chkdb walks an entire db and returns a flat, order-stable dump of its
+// contents, suitable for byte-wise comparison of two databases.
+func chkdb(path string) ([]byte, error) {
+	db, err := bolt.Open(path, 0666, nil)
+	if err != nil {
+		return nil, err
+	}
+	defer db.Close()
+	var buf bytes.Buffer
+	err = db.View(func(tx *bolt.Tx) error {
+		return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
+			return walkBucket(b, name, nil, &buf)
+		})
+	})
+	if err != nil {
+		return nil, err
+	}
+	return buf.Bytes(), nil
+}
+
+func walkBucket(parent *bolt.Bucket, k []byte, v []byte, w io.Writer) error {
+	if _, err := w.Write(k); err != nil {
+		return err
+	}
+	if _, err := io.WriteString(w, ":"); err != nil {
+		return err
+	}
+	if _, err := w.Write(v); err != nil {
+		return err
+	}
+	if _, err := fmt.Fprintln(w); err != nil {
+		return err
+	}
+	// Not a bucket, nothing to descend into.
+	if v != nil {
+		return nil
+	}
+	return parent.ForEach(func(k, v []byte) error {
+		if v == nil {
+			return walkBucket(parent.Bucket(k), k, nil, w)
+		}
+		return walkBucket(parent, k, v, w)
+	})
+}
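Reviewer note: the recursive walk added above can be exercised independently of
compaction. Below is a minimal standalone sketch of the same pattern (the path
my.db, the program itself, and the "/" separator are illustrative assumptions,
not part of this change); it prints every key together with the bucket path
that leads to it:

	package main

	import (
		"fmt"
		"log"

		"github.com/boltdb/bolt"
	)

	// walkFunc mirrors the BucketWalkFunc signature introduced in this change.
	type walkFunc func(keys [][]byte, k, v []byte) error

	// walkBucket is the same synchronous recursion used by the compact command.
	func walkBucket(b *bolt.Bucket, keys [][]byte, k, v []byte, fn walkFunc) error {
		if err := fn(keys, k, v); err != nil {
			return err
		}
		if v != nil { // plain key/value pair, nothing to descend into
			return nil
		}
		keys = append(keys, k)
		return b.ForEach(func(k, v []byte) error {
			if v == nil {
				return walkBucket(b.Bucket(k), keys, k, nil, fn)
			}
			return walkBucket(b, keys, k, v, fn)
		})
	}

	func main() {
		// Read-only open, exactly as the compact command opens its source db.
		db, err := bolt.Open("my.db", 0444, &bolt.Options{ReadOnly: true})
		if err != nil {
			log.Fatal(err)
		}
		defer db.Close()

		// Print every key prefixed by the bucket path leading to it.
		err = db.View(func(tx *bolt.Tx) error {
			return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
				return walkBucket(b, nil, name, nil, func(keys [][]byte, k, v []byte) error {
					for _, p := range keys {
						fmt.Printf("%s/", p)
					}
					fmt.Printf("%s\n", k)
					return nil
				})
			})
		})
		if err != nil {
			log.Fatal(err)
		}
	}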