From f5d275b53730adc5a0c988cf79d4fac84c0a3210 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Thu, 1 Sep 2016 15:34:35 -0600 Subject: [PATCH] Minor bolt compact revisions --- bucket.go | 22 ++++ bucket_test.go | 42 ++++++++ cmd/bolt/main.go | 230 +++++++++++++++++++++++------------- cmd/bolt/main_test.go | 21 ++-- 4 files changed, 199 insertions(+), 116 deletions(-) diff --git a/bucket.go b/bucket.go index d2f8c52..8e00380 100644 --- a/bucket.go +++ b/bucket.go @@ -329,6 +329,28 @@ func (b *Bucket) Delete(key []byte) error { return nil } +// Sequence returns the current integer for the bucket without incrementing it. +func (b *Bucket) Sequence() uint64 { return b.bucket.sequence } + +// SetSequence updates the sequence number for the bucket. +func (b *Bucket) SetSequence(v uint64) error { + if b.tx.db == nil { + return ErrTxClosed + } else if !b.Writable() { + return ErrTxNotWritable + } + + // Materialize the root node if it hasn't been already so that the + // bucket will be saved during commit. + if b.rootNode == nil { + _ = b.node(b.root, nil) + } + + // Set the sequence. + b.bucket.sequence = v + return nil +} + // NextSequence returns an autoincrementing integer for the bucket. func (b *Bucket) NextSequence() (uint64, error) { if b.tx.db == nil { diff --git a/bucket_test.go b/bucket_test.go index 528fec2..cddbe27 100644 --- a/bucket_test.go +++ b/bucket_test.go @@ -782,6 +782,48 @@ func TestBucket_DeleteBucket_IncompatibleValue(t *testing.T) { } } +// Ensure bucket can set and update its sequence number. +func TestBucket_Sequence(t *testing.T) { + db := MustOpenDB() + defer db.MustClose() + + if err := db.Update(func(tx *bolt.Tx) error { + bkt, err := tx.CreateBucket([]byte("0")) + if err != nil { + t.Fatal(err) + } + + // Retrieve sequence. + if v := bkt.Sequence(); v != 0 { + t.Fatalf("unexpected sequence: %d", v) + } + + // Update sequence. 
+ if err := bkt.SetSequence(1000); err != nil { + t.Fatal(err) + } + + // Read sequence again. + if v := bkt.Sequence(); v != 1000 { + t.Fatalf("unexpected sequence: %d", v) + } + + return nil + }); err != nil { + t.Fatal(err) + } + + // Verify sequence in separate transaction. + if err := db.View(func(tx *bolt.Tx) error { + if v := tx.Bucket([]byte("0")).Sequence(); v != 1000 { + t.Fatalf("unexpected sequence: %d", v) + } + return nil + }); err != nil { + t.Fatal(err) + } +} + // Ensure that a bucket can return an autoincrementing sequence. func TestBucket_NextSequence(t *testing.T) { db := MustOpenDB() diff --git a/cmd/bolt/main.go b/cmd/bolt/main.go index 1f78e03..a132ec0 100644 --- a/cmd/bolt/main.go +++ b/cmd/bolt/main.go @@ -1539,6 +1539,10 @@ type CompactCommand struct { Stdin io.Reader Stdout io.Writer Stderr io.Writer + + SrcPath string + DstPath string + TxMaxSize int64 } // newCompactCommand returns a CompactCommand. @@ -1550,163 +1554,187 @@ func newCompactCommand(m *Main) *CompactCommand { } } -// BucketWalkFunc is the type of the function called for keys (buckets and "normal" values) -// discovered by Walk. -// keys is the list of keys to descend to the bucket owning the discovered key/value pair k/v. -type BucketWalkFunc func(keys [][]byte, k []byte, v []byte) error - -// Walk walks recursively the bolt database db, calling walkFn for each key it finds. -func (cmd *CompactCommand) Walk(db *bolt.DB, walkFn BucketWalkFunc) error { - return db.View(func(tx *bolt.Tx) error { - return tx.ForEach(func(name []byte, b *bolt.Bucket) error { - return cmd.walkBucket(b, nil, name, nil, walkFn) - }) - }) -} - -func (cmd *CompactCommand) walkBucket(b *bolt.Bucket, keys [][]byte, k []byte, v []byte, walkFn BucketWalkFunc) error { - if err := walkFn(keys, k, v); err != nil { - return err - } - // not a bucket, exit. 
- if v != nil { - return nil - } - - keys2 := append(keys, k) - return b.ForEach(func(k, v []byte) error { - if v == nil { - return cmd.walkBucket(b.Bucket(k), keys2, k, nil, walkFn) - } - return cmd.walkBucket(b, keys2, k, v, walkFn) - }) -} - // Run executes the command. func (cmd *CompactCommand) Run(args ...string) (err error) { // Parse flags. fs := flag.NewFlagSet("", flag.ContinueOnError) - fs.SetOutput(cmd.Stderr) - var txMaxSize int64 - fs.Int64Var(&txMaxSize, "tx-max-size", 0, "commit tx when key/value size sum exceed this value. If 0, only one transaction is used. If you are compacting a large database, set this to a value appropriate for the available memory.") - help := fs.Bool("h", false, "print this help") - if err := fs.Parse(args); err != nil { - return err - } else if *help { + fs.SetOutput(ioutil.Discard) + fs.StringVar(&cmd.DstPath, "o", "", "") + fs.Int64Var(&cmd.TxMaxSize, "tx-max-size", 65536, "") + if err := fs.Parse(args); err == flag.ErrHelp { fmt.Fprintln(cmd.Stderr, cmd.Usage()) - fs.PrintDefaults() return ErrUsage + } else if err != nil { + return err + } else if cmd.DstPath == "" { + return fmt.Errorf("output file required") } - // Require database path. - path := fs.Arg(0) - if path == "" { + // Require database paths. + cmd.SrcPath = fs.Arg(0) + if cmd.SrcPath == "" { return ErrPathRequired - } else if _, err := os.Stat(path); os.IsNotExist(err) { - return ErrFileNotFound } - fi, err := os.Stat(path) - if err != nil { + + // Ensure source file exists. + fi, err := os.Stat(cmd.SrcPath) + if os.IsNotExist(err) { + return ErrFileNotFound + } else if err != nil { return err } initialSize := fi.Size() - // Open database. - db, err := bolt.Open(path, 0444, nil) + // Open source database. 
+ src, err := bolt.Open(cmd.SrcPath, 0444, nil) if err != nil { return err } - defer db.Close() + defer src.Close() - var dstPath string - if fs.NArg() < 2 { - f, err := ioutil.TempFile("", "bolt-compact-") - if err != nil { - return fmt.Errorf("temp file: %v", err) - } - _ = f.Close() - _ = os.Remove(f.Name()) - dstPath = f.Name() - fmt.Fprintf(cmd.Stdout, "compacting db to %s\n", dstPath) - } else { - dstPath = fs.Arg(1) - } - - defer func() { - fi, err := os.Stat(dstPath) - if err != nil { - fmt.Fprintln(cmd.Stderr, err) - } - newSize := fi.Size() - if newSize == 0 { - fmt.Fprintln(cmd.Stderr, "db size is 0") - } - fmt.Fprintf(cmd.Stdout, "%d -> %d bytes (gain=%.2fx)\n", initialSize, newSize, float64(initialSize)/float64(newSize)) - }() - - dstdb, err := bolt.Open(dstPath, 0666, nil) + // Open destination database. + dst, err := bolt.Open(cmd.DstPath, fi.Mode(), nil) if err != nil { return err } - defer dstdb.Close() + defer dst.Close() + // Run compaction. + if err := cmd.compact(dst, src); err != nil { + return err + } + + // Report stats on new size. + fi, err = os.Stat(cmd.DstPath) + if err != nil { + return err + } else if fi.Size() == 0 { + return fmt.Errorf("zero db size") + } + fmt.Fprintf(cmd.Stdout, "%d -> %d bytes (gain=%.2fx)\n", initialSize, fi.Size(), float64(initialSize)/float64(fi.Size())) + + return nil +} + +func (cmd *CompactCommand) compact(dst, src *bolt.DB) error { // commit regularly, or we'll run out of memory for large datasets if using one transaction. 
var size int64 - tx, err := dstdb.Begin(true) + tx, err := dst.Begin(true) if err != nil { return err } - defer func() { - if err != nil { - _ = tx.Rollback() - } else { - err = tx.Commit() - } - }() - return cmd.Walk(db, func(keys [][]byte, k []byte, v []byte) error { - s := int64(len(k) + len(v)) - if size+s > txMaxSize && txMaxSize != 0 { + defer tx.Rollback() + + if err := cmd.walk(src, func(keys [][]byte, k, v []byte, seq uint64) error { + // On each key/value, check if we have exceeded tx size. + sz := int64(len(k) + len(v)) + if size+sz > cmd.TxMaxSize && cmd.TxMaxSize != 0 { + // Commit previous transaction. if err := tx.Commit(); err != nil { return err } - tx, err = dstdb.Begin(true) + + // Start new transaction. + tx, err = dst.Begin(true) if err != nil { return err } size = 0 } - size += s + size += sz + + // Create bucket on the root transaction if this is the first level. nk := len(keys) if nk == 0 { - _, err := tx.CreateBucket(k) - return err + bkt, err := tx.CreateBucket(k) + if err != nil { + return err + } + if err := bkt.SetSequence(seq); err != nil { + return err + } + return nil } + // Create buckets on subsequent levels, if necessary. b := tx.Bucket(keys[0]) if nk > 1 { for _, k := range keys[1:] { b = b.Bucket(k) } } + + // If there is no value then this is a bucket call. if v == nil { - _, err := b.CreateBucket(k) - return err + bkt, err := b.CreateBucket(k) + if err != nil { + return err + } + if err := bkt.SetSequence(seq); err != nil { + return err + } + return nil } + + // Otherwise treat it as a key/value pair. return b.Put(k, v) + }); err != nil { + return err + } + + return tx.Commit() +} + +// walkFunc is the type of the function called for keys (buckets and "normal" +// values) discovered by Walk. keys is the list of keys to descend to the bucket +// owning the discovered key/value pair k/v. 
+type walkFunc func(keys [][]byte, k, v []byte, seq uint64) error + +// walk walks recursively the bolt database db, calling walkFn for each key it finds. +func (cmd *CompactCommand) walk(db *bolt.DB, walkFn walkFunc) error { + return db.View(func(tx *bolt.Tx) error { + return tx.ForEach(func(name []byte, b *bolt.Bucket) error { + return cmd.walkBucket(b, nil, name, nil, b.Sequence(), walkFn) + }) + }) +} + +func (cmd *CompactCommand) walkBucket(b *bolt.Bucket, keypath [][]byte, k, v []byte, seq uint64, fn walkFunc) error { + // Execute callback. + if err := fn(keypath, k, v, seq); err != nil { + return err + } + + // If this is not a bucket then stop. + if v != nil { + return nil + } + + // Iterate over each child key/value. + keypath = append(keypath, k) + return b.ForEach(func(k, v []byte) error { + if v == nil { + bkt := b.Bucket(k) + return cmd.walkBucket(bkt, keypath, k, nil, bkt.Sequence(), fn) + } + return cmd.walkBucket(b, keypath, k, v, b.Sequence(), fn) }) } // Usage returns the help message. func (cmd *CompactCommand) Usage() string { return strings.TrimLeft(` -usage: bolt compact PATH [DST_PATH] +usage: bolt compact [options] -o DST SRC -Compact opens a database at PATH and walks it recursively entirely, -copying keys as they are found from all buckets, to a newly created db. +Compact opens a database at SRC path and walks it recursively, copying keys +as they are found from all buckets, to a newly created database at DST path. -If DST_PATH is non-empty, the new db is created at DST_PATH, else it will be -in a temporary location. +The original database is left untouched. -The original db is left untouched. +Additional options include: + + -tx-max-size NUM + Specifies the maximum size of individual transactions. + Defaults to 64KB. 
`, "\n") } diff --git a/cmd/bolt/main_test.go b/cmd/bolt/main_test.go index e8942a0..0a11ff3 100644 --- a/cmd/bolt/main_test.go +++ b/cmd/bolt/main_test.go @@ -209,6 +209,9 @@ func TestCompactCommand_Run(t *testing.T) { if err != nil { return err } + if err := b.SetSequence(uint64(i)); err != nil { + return err + } if err := fillBucket(b, append(k, '.')); err != nil { return err } @@ -263,7 +266,7 @@ func TestCompactCommand_Run(t *testing.T) { } m := NewMain() - if err := m.Run("compact", db.Path, dstdb.Path); err != nil { + if err := m.Run("compact", "-o", dstdb.Path, db.Path); err != nil { t.Fatal(err) } @@ -336,22 +339,10 @@ func chkdb(path string) ([]byte, error) { } func walkBucket(parent *bolt.Bucket, k []byte, v []byte, w io.Writer) error { - _, err := w.Write(k) - if err != nil { - return err - } - _, err = io.WriteString(w, ":") - if err != nil { - return err - } - _, err = w.Write(v) - if err != nil { - return err - } - _, err = fmt.Fprintln(w) - if err != nil { + if _, err := fmt.Fprintf(w, "%d:%x=%x\n", parent.Sequence(), k, v); err != nil { return err } + // not a bucket, exit. if v != nil { return nil