mirror of https://github.com/etcd-io/bbolt.git
*: add option to skip freelist sync
When the database has a lot of free pages, the cost of syncing the entire freelist to disk is high. If the total database size is small (<10GB) and the application can tolerate ~10 seconds of recovery time, then it is reasonable to simply not sync the freelist and instead rescan the db to rebuild the freelist on recovery.
pull/1/head
parent
bffefe5dd6
commit
7149270521
53
db.go
53
db.go
|
@ -61,6 +61,11 @@ type DB struct {
|
|||
// THIS IS UNSAFE. PLEASE USE WITH CAUTION.
|
||||
NoSync bool
|
||||
|
||||
// When true, skips syncing freelist to disk. This improves the database
|
||||
// write performance under normal operation, but requires a full database
|
||||
// rescan to rebuild the freelist during recovery.
|
||||
NoFreelistSync bool
|
||||
|
||||
// When true, skips the truncate call when growing the database.
|
||||
// Setting this to true is only safe on non-ext3/ext4 systems.
|
||||
// Skipping truncation avoids preallocation of hard drive space and
|
||||
|
@ -156,6 +161,7 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
|
|||
}
|
||||
db.NoGrowSync = options.NoGrowSync
|
||||
db.MmapFlags = options.MmapFlags
|
||||
db.NoFreelistSync = options.NoFreelistSync
|
||||
|
||||
// Set default values for later DB operations.
|
||||
db.MaxBatchSize = DefaultMaxBatchSize
|
||||
|
@ -232,9 +238,14 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// Read in the freelist.
|
||||
db.freelist = newFreelist()
|
||||
db.freelist.read(db.page(db.meta().freelist))
|
||||
if db.NoFreelistSync {
|
||||
db.freelist = newFreelist()
|
||||
db.freelist.readIDs(db.freepages())
|
||||
} else {
|
||||
// Read in the freelist.
|
||||
db.freelist = newFreelist()
|
||||
db.freelist.read(db.page(db.meta().freelist))
|
||||
}
|
||||
|
||||
// Mark the database as opened and return.
|
||||
return db, nil
|
||||
|
@ -893,6 +904,38 @@ func (db *DB) IsReadOnly() bool {
|
|||
return db.readOnly
|
||||
}
|
||||
|
||||
func (db *DB) freepages() []pgid {
|
||||
tx, err := db.beginTx()
|
||||
defer func() {
|
||||
err = tx.Rollback()
|
||||
if err != nil {
|
||||
panic("freepages: failed to rollback tx")
|
||||
}
|
||||
}()
|
||||
if err != nil {
|
||||
panic("freepages: failed to open read only tx")
|
||||
}
|
||||
|
||||
reachable := make(map[pgid]*page)
|
||||
nofreed := make(map[pgid]bool)
|
||||
ech := make(chan error)
|
||||
go func() {
|
||||
for e := range ech {
|
||||
panic(fmt.Sprintf("freepages: failed to get all reachable pages (%v)", e))
|
||||
}
|
||||
}()
|
||||
tx.checkBucket(&tx.root, reachable, nofreed, ech)
|
||||
close(ech)
|
||||
|
||||
var fids []pgid
|
||||
for i := pgid(2); i < db.meta().pgid; i++ {
|
||||
if _, ok := reachable[i]; !ok {
|
||||
fids = append(fids, i)
|
||||
}
|
||||
}
|
||||
return fids
|
||||
}
|
||||
|
||||
// Options represents the options that can be set when opening a database.
|
||||
type Options struct {
|
||||
// Timeout is the amount of time to wait to obtain a file lock.
|
||||
|
@ -903,6 +946,10 @@ type Options struct {
|
|||
// Sets the DB.NoGrowSync flag before memory mapping the file.
|
||||
NoGrowSync bool
|
||||
|
||||
// Do not sync freelist to disk. This improves the database write performance
|
||||
// under normal operation, but requires a full database rescan to rebuild the freelist during recovery.
|
||||
NoFreelistSync bool
|
||||
|
||||
// Open database in read-only mode. Uses flock(..., LOCK_SH |LOCK_NB) to
|
||||
// grab a shared lock (UNIX).
|
||||
ReadOnly bool
|
||||
|
|
33
db_test.go
33
db_test.go
|
@ -1366,15 +1366,35 @@ func validateBatchBench(b *testing.B, db *DB) {
|
|||
// DB is a test wrapper for bolt.DB.
|
||||
type DB struct {
|
||||
*bolt.DB
|
||||
f string
|
||||
o *bolt.Options
|
||||
}
|
||||
|
||||
// MustOpenDB returns a new, open DB at a temporary location.
|
||||
func MustOpenDB() *DB {
|
||||
db, err := bolt.Open(tempfile(), 0666, nil)
|
||||
f := tempfile()
|
||||
db, err := bolt.Open(f, 0666, nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return &DB{db}
|
||||
return &DB{
|
||||
DB: db,
|
||||
f: f,
|
||||
}
|
||||
}
|
||||
|
||||
// MustOpenDBWithOption returns a new, open DB at a temporary location with given options.
|
||||
func MustOpenWithOption(o *bolt.Options) *DB {
|
||||
f := tempfile()
|
||||
db, err := bolt.Open(f, 0666, o)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return &DB{
|
||||
DB: db,
|
||||
f: f,
|
||||
o: o,
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes the database and deletes the underlying file.
|
||||
|
@ -1399,6 +1419,15 @@ func (db *DB) MustClose() {
|
|||
}
|
||||
}
|
||||
|
||||
// MustReopen reopen the database. Panic on error.
|
||||
func (db *DB) MustReopen() {
|
||||
indb, err := bolt.Open(db.f, 0666, db.o)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
db.DB = indb
|
||||
}
|
||||
|
||||
// PrintStats prints the database stats
|
||||
func (db *DB) PrintStats() {
|
||||
var stats = db.Stats()
|
||||
|
|
|
@ -185,6 +185,12 @@ func (f *freelist) read(p *page) {
|
|||
f.reindex()
|
||||
}
|
||||
|
||||
// read initializes the freelist from a given list of ids.
|
||||
func (f *freelist) readIDs(ids []pgid) {
|
||||
f.ids = ids
|
||||
f.reindex()
|
||||
}
|
||||
|
||||
// write writes the page ids onto a freelist page. All free and pending ids are
|
||||
// saved to disk since in the event of a program crash, all pending ids will
|
||||
// become free.
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
package bolt_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/coreos/bbolt"
|
||||
)
|
||||
|
||||
func TestSimulateNoFreeListSync_1op_1p(t *testing.T) {
|
||||
testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 1, 1)
|
||||
}
|
||||
func TestSimulateNoFreeListSync_10op_1p(t *testing.T) {
|
||||
testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 10, 1)
|
||||
}
|
||||
func TestSimulateNoFreeListSync_100op_1p(t *testing.T) {
|
||||
testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 100, 1)
|
||||
}
|
||||
func TestSimulateNoFreeListSync_1000op_1p(t *testing.T) {
|
||||
testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 1000, 1)
|
||||
}
|
||||
func TestSimulateNoFreeListSync_10000op_1p(t *testing.T) {
|
||||
testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 10000, 1)
|
||||
}
|
||||
func TestSimulateNoFreeListSync_10op_10p(t *testing.T) {
|
||||
testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 10, 10)
|
||||
}
|
||||
func TestSimulateNoFreeListSync_100op_10p(t *testing.T) {
|
||||
testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 100, 10)
|
||||
}
|
||||
func TestSimulateNoFreeListSync_1000op_10p(t *testing.T) {
|
||||
testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 1000, 10)
|
||||
}
|
||||
func TestSimulateNoFreeListSync_10000op_10p(t *testing.T) {
|
||||
testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 10000, 10)
|
||||
}
|
||||
func TestSimulateNoFreeListSync_100op_100p(t *testing.T) {
|
||||
testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 100, 100)
|
||||
}
|
||||
func TestSimulateNoFreeListSync_1000op_100p(t *testing.T) {
|
||||
testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 1000, 100)
|
||||
}
|
||||
func TestSimulateNoFreeListSync_10000op_100p(t *testing.T) {
|
||||
testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 10000, 100)
|
||||
}
|
||||
func TestSimulateNoFreeListSync_10000op_1000p(t *testing.T) {
|
||||
testSimulate(t, &bolt.Options{NoFreelistSync: true}, 8, 10000, 1000)
|
||||
}
|
|
@ -10,25 +10,25 @@ import (
|
|||
"github.com/coreos/bbolt"
|
||||
)
|
||||
|
||||
func TestSimulate_1op_1p(t *testing.T) { testSimulate(t, 1, 1) }
|
||||
func TestSimulate_10op_1p(t *testing.T) { testSimulate(t, 10, 1) }
|
||||
func TestSimulate_100op_1p(t *testing.T) { testSimulate(t, 100, 1) }
|
||||
func TestSimulate_1000op_1p(t *testing.T) { testSimulate(t, 1000, 1) }
|
||||
func TestSimulate_10000op_1p(t *testing.T) { testSimulate(t, 10000, 1) }
|
||||
func TestSimulate_1op_1p(t *testing.T) { testSimulate(t, nil, 1, 1, 1) }
|
||||
func TestSimulate_10op_1p(t *testing.T) { testSimulate(t, nil, 1, 10, 1) }
|
||||
func TestSimulate_100op_1p(t *testing.T) { testSimulate(t, nil, 1, 100, 1) }
|
||||
func TestSimulate_1000op_1p(t *testing.T) { testSimulate(t, nil, 1, 1000, 1) }
|
||||
func TestSimulate_10000op_1p(t *testing.T) { testSimulate(t, nil, 1, 10000, 1) }
|
||||
|
||||
func TestSimulate_10op_10p(t *testing.T) { testSimulate(t, 10, 10) }
|
||||
func TestSimulate_100op_10p(t *testing.T) { testSimulate(t, 100, 10) }
|
||||
func TestSimulate_1000op_10p(t *testing.T) { testSimulate(t, 1000, 10) }
|
||||
func TestSimulate_10000op_10p(t *testing.T) { testSimulate(t, 10000, 10) }
|
||||
func TestSimulate_10op_10p(t *testing.T) { testSimulate(t, nil, 1, 10, 10) }
|
||||
func TestSimulate_100op_10p(t *testing.T) { testSimulate(t, nil, 1, 100, 10) }
|
||||
func TestSimulate_1000op_10p(t *testing.T) { testSimulate(t, nil, 1, 1000, 10) }
|
||||
func TestSimulate_10000op_10p(t *testing.T) { testSimulate(t, nil, 1, 10000, 10) }
|
||||
|
||||
func TestSimulate_100op_100p(t *testing.T) { testSimulate(t, 100, 100) }
|
||||
func TestSimulate_1000op_100p(t *testing.T) { testSimulate(t, 1000, 100) }
|
||||
func TestSimulate_10000op_100p(t *testing.T) { testSimulate(t, 10000, 100) }
|
||||
func TestSimulate_100op_100p(t *testing.T) { testSimulate(t, nil, 1, 100, 100) }
|
||||
func TestSimulate_1000op_100p(t *testing.T) { testSimulate(t, nil, 1, 1000, 100) }
|
||||
func TestSimulate_10000op_100p(t *testing.T) { testSimulate(t, nil, 1, 10000, 100) }
|
||||
|
||||
func TestSimulate_10000op_1000p(t *testing.T) { testSimulate(t, 10000, 1000) }
|
||||
func TestSimulate_10000op_1000p(t *testing.T) { testSimulate(t, nil, 1, 10000, 1000) }
|
||||
|
||||
// Randomly generate operations on a given database with multiple clients to ensure consistency and thread safety.
|
||||
func testSimulate(t *testing.T, threadCount, parallelism int) {
|
||||
func testSimulate(t *testing.T, openOption *bolt.Options, round, threadCount, parallelism int) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping test in short mode.")
|
||||
}
|
||||
|
@ -42,81 +42,88 @@ func testSimulate(t *testing.T, threadCount, parallelism int) {
|
|||
var versions = make(map[int]*QuickDB)
|
||||
versions[1] = NewQuickDB()
|
||||
|
||||
db := MustOpenDB()
|
||||
db := MustOpenWithOption(openOption)
|
||||
defer db.MustClose()
|
||||
|
||||
var mutex sync.Mutex
|
||||
|
||||
// Run n threads in parallel, each with their own operation.
|
||||
var wg sync.WaitGroup
|
||||
var threads = make(chan bool, parallelism)
|
||||
var i int
|
||||
for {
|
||||
threads <- true
|
||||
wg.Add(1)
|
||||
writable := ((rand.Int() % 100) < 20) // 20% writers
|
||||
|
||||
// Choose an operation to execute.
|
||||
var handler simulateHandler
|
||||
if writable {
|
||||
handler = writerHandlers[rand.Intn(len(writerHandlers))]
|
||||
} else {
|
||||
handler = readerHandlers[rand.Intn(len(readerHandlers))]
|
||||
}
|
||||
for n := 0; n < round; n++ {
|
||||
|
||||
// Execute a thread for the given operation.
|
||||
go func(writable bool, handler simulateHandler) {
|
||||
defer wg.Done()
|
||||
var threads = make(chan bool, parallelism)
|
||||
var i int
|
||||
for {
|
||||
threads <- true
|
||||
wg.Add(1)
|
||||
writable := ((rand.Int() % 100) < 20) // 20% writers
|
||||
|
||||
// Start transaction.
|
||||
tx, err := db.Begin(writable)
|
||||
if err != nil {
|
||||
t.Fatal("tx begin: ", err)
|
||||
}
|
||||
|
||||
// Obtain current state of the dataset.
|
||||
mutex.Lock()
|
||||
var qdb = versions[tx.ID()]
|
||||
// Choose an operation to execute.
|
||||
var handler simulateHandler
|
||||
if writable {
|
||||
qdb = versions[tx.ID()-1].Copy()
|
||||
}
|
||||
mutex.Unlock()
|
||||
|
||||
// Make sure we commit/rollback the tx at the end and update the state.
|
||||
if writable {
|
||||
defer func() {
|
||||
mutex.Lock()
|
||||
versions[tx.ID()] = qdb
|
||||
mutex.Unlock()
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
handler = writerHandlers[rand.Intn(len(writerHandlers))]
|
||||
} else {
|
||||
defer func() { _ = tx.Rollback() }()
|
||||
handler = readerHandlers[rand.Intn(len(readerHandlers))]
|
||||
}
|
||||
|
||||
// Ignore operation if we don't have data yet.
|
||||
if qdb == nil {
|
||||
return
|
||||
// Execute a thread for the given operation.
|
||||
go func(writable bool, handler simulateHandler) {
|
||||
defer wg.Done()
|
||||
|
||||
// Start transaction.
|
||||
tx, err := db.Begin(writable)
|
||||
if err != nil {
|
||||
t.Fatal("tx begin: ", err)
|
||||
}
|
||||
|
||||
// Obtain current state of the dataset.
|
||||
mutex.Lock()
|
||||
var qdb = versions[tx.ID()]
|
||||
if writable {
|
||||
qdb = versions[tx.ID()-1].Copy()
|
||||
}
|
||||
mutex.Unlock()
|
||||
|
||||
// Make sure we commit/rollback the tx at the end and update the state.
|
||||
if writable {
|
||||
defer func() {
|
||||
mutex.Lock()
|
||||
versions[tx.ID()] = qdb
|
||||
mutex.Unlock()
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
defer func() { _ = tx.Rollback() }()
|
||||
}
|
||||
|
||||
// Ignore operation if we don't have data yet.
|
||||
if qdb == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Execute handler.
|
||||
handler(tx, qdb)
|
||||
|
||||
// Release a thread back to the scheduling loop.
|
||||
<-threads
|
||||
}(writable, handler)
|
||||
|
||||
i++
|
||||
if i > threadCount {
|
||||
break
|
||||
}
|
||||
|
||||
// Execute handler.
|
||||
handler(tx, qdb)
|
||||
|
||||
// Release a thread back to the scheduling loop.
|
||||
<-threads
|
||||
}(writable, handler)
|
||||
|
||||
i++
|
||||
if i > threadCount {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Wait until all threads are done.
|
||||
wg.Wait()
|
||||
// Wait until all threads are done.
|
||||
wg.Wait()
|
||||
|
||||
db.MustClose()
|
||||
db.MustReopen()
|
||||
}
|
||||
}
|
||||
|
||||
type simulateHandler func(tx *bolt.Tx, qdb *QuickDB)
|
||||
|
|
56
tx.go
56
tx.go
|
@ -169,26 +169,9 @@ func (tx *Tx) Commit() error {
|
|||
// Free the old root bucket.
|
||||
tx.meta.root.root = tx.root.root
|
||||
|
||||
opgid := tx.meta.pgid
|
||||
|
||||
// Free the freelist and allocate new pages for it. This will overestimate
|
||||
// the size of the freelist but not underestimate the size (which would be bad).
|
||||
tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
|
||||
p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
|
||||
if err != nil {
|
||||
tx.rollback()
|
||||
return err
|
||||
}
|
||||
if err := tx.db.freelist.write(p); err != nil {
|
||||
tx.rollback()
|
||||
return err
|
||||
}
|
||||
tx.meta.freelist = p.id
|
||||
|
||||
// If the high water mark has moved up then attempt to grow the database.
|
||||
if tx.meta.pgid > opgid {
|
||||
if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
|
||||
tx.rollback()
|
||||
if !tx.db.NoFreelistSync {
|
||||
err := tx.commitFreelist()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -235,6 +218,33 @@ func (tx *Tx) Commit() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (tx *Tx) commitFreelist() error {
|
||||
opgid := tx.meta.pgid
|
||||
|
||||
// Free the freelist and allocate new pages for it. This will overestimate
|
||||
// the size of the freelist but not underestimate the size (which would be bad).
|
||||
tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
|
||||
p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
|
||||
if err != nil {
|
||||
tx.rollback()
|
||||
return err
|
||||
}
|
||||
if err := tx.db.freelist.write(p); err != nil {
|
||||
tx.rollback()
|
||||
return err
|
||||
}
|
||||
tx.meta.freelist = p.id
|
||||
// If the high water mark has moved up then attempt to grow the database.
|
||||
if tx.meta.pgid > opgid {
|
||||
if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
|
||||
tx.rollback()
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Rollback closes the transaction and ignores all previous updates. Read-only
|
||||
// transactions must be rolled back and not committed.
|
||||
func (tx *Tx) Rollback() error {
|
||||
|
@ -394,8 +404,10 @@ func (tx *Tx) check(ch chan error) {
|
|||
reachable := make(map[pgid]*page)
|
||||
reachable[0] = tx.page(0) // meta0
|
||||
reachable[1] = tx.page(1) // meta1
|
||||
for i := uint32(0); i <= tx.page(tx.meta.freelist).overflow; i++ {
|
||||
reachable[tx.meta.freelist+pgid(i)] = tx.page(tx.meta.freelist)
|
||||
if !tx.DB().NoFreelistSync {
|
||||
for i := uint32(0); i <= tx.page(tx.meta.freelist).overflow; i++ {
|
||||
reachable[tx.meta.freelist+pgid(i)] = tx.page(tx.meta.freelist)
|
||||
}
|
||||
}
|
||||
|
||||
// Recursively check buckets.
|
||||
|
|
Loading…
Reference in New Issue