add log messages

Signed-off-by: Benjamin Wang <benjamin.ahrtr@gmail.com>
pull/646/head
Benjamin Wang 2023-12-07 16:03:05 +00:00
parent de6c42bcf2
commit 4c7075efe6
3 changed files with 180 additions and 42 deletions

View File

@ -145,7 +145,16 @@ func (b *Bucket) openBucket(value []byte) *Bucket {
// CreateBucket creates a new bucket at the given key and returns the new bucket.
// Returns an error if the key already exists, if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) {
func (b *Bucket) CreateBucket(key []byte) (rb *Bucket, err error) {
lg := b.tx.db.Logger()
lg.Debugf("Creating bucket %q", string(key))
defer func() {
if err != nil {
lg.Errorf("Creating bucket %q failed: %v", string(key), err)
} else {
lg.Debugf("Creating bucket %q successfully", string(key))
}
}()
if b.tx.db == nil {
return nil, errors.ErrTxClosed
} else if !b.tx.writable {
@ -192,7 +201,17 @@ func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) {
// CreateBucketIfNotExists creates a new bucket if it doesn't already exist and returns a reference to it.
// Returns an error if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) CreateBucketIfNotExists(key []byte) (*Bucket, error) {
func (b *Bucket) CreateBucketIfNotExists(key []byte) (rb *Bucket, err error) {
lg := b.tx.db.Logger()
lg.Debugf("Creating bucket if not exist %q", string(key))
defer func() {
if err != nil {
lg.Errorf("Creating bucket if not exist %q failed: %v", string(key), err)
} else {
lg.Debugf("Creating bucket if not exist %q successfully", string(key))
}
}()
if b.tx.db == nil {
return nil, errors.ErrTxClosed
} else if !b.tx.writable {
@ -249,7 +268,17 @@ func (b *Bucket) CreateBucketIfNotExists(key []byte) (*Bucket, error) {
// DeleteBucket deletes a bucket at the given key.
// Returns an error if the bucket does not exist, or if the key represents a non-bucket value.
func (b *Bucket) DeleteBucket(key []byte) error {
func (b *Bucket) DeleteBucket(key []byte) (err error) {
lg := b.tx.db.Logger()
lg.Debugf("Deleting bucket %q", string(key))
defer func() {
if err != nil {
lg.Errorf("Deleting bucket %q failed: %v", string(key), err)
} else {
lg.Debugf("Deleting bucket %q successfully", string(key))
}
}()
if b.tx.db == nil {
return errors.ErrTxClosed
} else if !b.Writable() {
@ -269,7 +298,7 @@ func (b *Bucket) DeleteBucket(key []byte) error {
// Recursively delete all child buckets.
child := b.Bucket(key)
err := child.ForEachBucket(func(k []byte) error {
err = child.ForEachBucket(func(k []byte) error {
if err := child.DeleteBucket(k); err != nil {
return fmt.Errorf("delete bucket: %s", err)
}
@ -316,7 +345,16 @@ func (b *Bucket) Get(key []byte) []byte {
// If the key exist then its previous value will be overwritten.
// Supplied value must remain valid for the life of the transaction.
// Returns an error if the bucket was created from a read-only transaction, if the key is blank, if the key is too large, or if the value is too large.
func (b *Bucket) Put(key []byte, value []byte) error {
func (b *Bucket) Put(key []byte, value []byte) (err error) {
lg := b.tx.db.Logger()
lg.Debugf("Putting key %q", string(key))
defer func() {
if err != nil {
lg.Errorf("Putting key %q failed: %v", string(key), err)
} else {
lg.Debugf("Putting key %q successfully", string(key))
}
}()
if b.tx.db == nil {
return errors.ErrTxClosed
} else if !b.Writable() {
@ -353,7 +391,17 @@ func (b *Bucket) Put(key []byte, value []byte) error {
// Delete removes a key from the bucket.
// If the key does not exist then nothing is done and a nil error is returned.
// Returns an error if the bucket was created from a read-only transaction.
func (b *Bucket) Delete(key []byte) error {
func (b *Bucket) Delete(key []byte) (err error) {
lg := b.tx.db.Logger()
lg.Debugf("Deleting key %q", string(key))
defer func() {
if err != nil {
lg.Errorf("Deleting key %q failed: %v", string(key), err)
} else {
lg.Debugf("Deleting key %q successfully", string(key))
}
}()
if b.tx.db == nil {
return errors.ErrTxClosed
} else if !b.Writable() {

114
db.go
View File

@ -116,8 +116,7 @@ type DB struct {
// Supported only on Unix via mlock/munlock syscalls.
Mlock bool
// Logger is the logger used for bbolt.
Logger Logger
logger Logger
path string
openFile func(string, int, os.FileMode) (*os.File, error)
@ -176,10 +175,11 @@ func (db *DB) String() string {
// If the file does not exist then it will be created automatically with a given file mode.
// Passing in nil options will cause Bolt to open the database with the default options.
// Note: For read/write transactions, ensure the owner has write permission on the created/opened database file, e.g. 0600
func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
db := &DB{
func Open(path string, mode os.FileMode, options *Options) (db *DB, err error) {
db = &DB{
opened: true,
}
// Set default options if no options are provided.
if options == nil {
options = DefaultOptions
@ -198,11 +198,21 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
db.AllocSize = common.DefaultAllocSize
if options.Logger == nil {
db.Logger = getDiscardLogger()
db.logger = getDiscardLogger()
} else {
db.Logger = options.Logger
db.logger = options.Logger
}
lg := db.Logger()
lg.Infof("Opening db file (%s) with mode %x and with options: %s", path, mode, options)
defer func() {
if err != nil {
lg.Errorf("Opening bbolt db (%s) failed: %v", path, err)
} else {
lg.Infof("Opening bbolt db (%s) successfully", path)
}
}()
flag := os.O_RDWR
if options.ReadOnly {
flag = os.O_RDONLY
@ -219,9 +229,9 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
}
// Open data file and separate sync handler for metadata writes.
var err error
if db.file, err = db.openFile(path, flag, mode); err != nil {
_ = db.close()
lg.Errorf("failed to open db file (%s): %v", path, err)
return nil, err
}
db.path = db.file.Name()
@ -233,8 +243,9 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
// if !options.ReadOnly.
// The database file is locked using the shared lock (more than one process may
// hold a lock at the same time) otherwise (options.ReadOnly is set).
if err := flock(db, !db.readOnly, options.Timeout); err != nil {
if err = flock(db, !db.readOnly, options.Timeout); err != nil {
_ = db.close()
lg.Errorf("failed to lock db file (%s), readonly: %t, error: %v", path, db.readOnly, err)
return nil, err
}
@ -247,23 +258,24 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
}
// Initialize the database if it doesn't exist.
if info, err := db.file.Stat(); err != nil {
if info, statErr := db.file.Stat(); statErr != nil {
_ = db.close()
return nil, err
lg.Errorf("failed to get db file's stats (%s): %v", path, err)
return nil, statErr
} else if info.Size() == 0 {
// Initialize new files with meta pages.
if err := db.init(); err != nil {
if err = db.init(); err != nil {
// clean up file descriptor on initialization fail
_ = db.close()
lg.Errorf("failed to initialize db file (%s): %v", path, err)
return nil, err
}
} else {
// try to get the page size from the metadata pages
if pgSize, err := db.getPageSize(); err == nil {
db.pageSize = pgSize
} else {
if db.pageSize, err = db.getPageSize(); err != nil {
_ = db.close()
return nil, berrors.ErrInvalid
lg.Errorf("failed to get page size from db file (%s): %v", path, err)
return nil, err
}
}
@ -275,8 +287,9 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
}
// Memory map the data file.
if err := db.mmap(options.InitialMmapSize); err != nil {
if err = db.mmap(options.InitialMmapSize); err != nil {
_ = db.close()
lg.Errorf("failed to map db file (%s): %v", path, err)
return nil, err
}
@ -291,18 +304,18 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
// Flush freelist when transitioning from no sync to sync so
// NoFreelistSync unaware boltdb can open the db later.
if !db.NoFreelistSync && !db.hasSyncedFreelist() {
tx, err := db.Begin(true)
tx, txErr := db.Begin(true)
if tx != nil {
err = tx.Commit()
txErr = tx.Commit()
}
if err != nil {
if txErr != nil {
lg.Errorf("starting readwrite transaction failed: %v", txErr)
_ = db.close()
return nil, err
return nil, txErr
}
}
// Mark the database as opened and return.
db.Logger.Debug("bbolt opened successfully")
return db, nil
}
@ -435,9 +448,13 @@ func (db *DB) mmap(minsz int) (err error) {
db.mmaplock.Lock()
defer db.mmaplock.Unlock()
lg := db.Logger()
// Ensure the size is at least the minimum size.
fileSize, err := db.fileSize()
var fileSize int
fileSize, err = db.fileSize()
if err != nil {
lg.Errorf("getting file size failed: %w", err)
return err
}
var size = fileSize
@ -446,6 +463,7 @@ func (db *DB) mmap(minsz int) (err error) {
}
size, err = db.mmapSize(size)
if err != nil {
lg.Errorf("getting map size failed: %w", err)
return err
}
@ -470,6 +488,7 @@ func (db *DB) mmap(minsz int) (err error) {
// gofail: var mapError string
// return errors.New(mapError)
if err = mmap(db, size); err != nil {
lg.Errorf("[GOOS: %s, GOARCH: %s] mmap failed, size: %d, error: %v", runtime.GOOS, runtime.GOARCH, size, err)
return err
}
@ -500,6 +519,7 @@ func (db *DB) mmap(minsz int) (err error) {
err0 := db.meta0.Validate()
err1 := db.meta1.Validate()
if err0 != nil && err1 != nil {
lg.Errorf("both meta pages are invalid, meta0: %v, meta1: %v", err0, err1)
return err0
}
@ -522,6 +542,7 @@ func (db *DB) munmap() error {
// gofail: var unmapError string
// return errors.New(unmapError)
if err := munmap(db); err != nil {
db.Logger().Errorf("[GOOS: %s, GOARCH: %s] munmap failed, db.datasz: %d, error: %v", runtime.GOOS, runtime.GOARCH, db.datasz, err)
return fmt.Errorf("unmap error: " + err.Error())
}
@ -569,6 +590,7 @@ func (db *DB) munlock(fileSize int) error {
// gofail: var munlockError string
// return errors.New(munlockError)
if err := munlock(db, fileSize); err != nil {
db.Logger().Errorf("[GOOS: %s, GOARCH: %s] munlock failed, fileSize: %d, db.datasz: %d, error: %v", runtime.GOOS, runtime.GOARCH, fileSize, db.datasz, err)
return fmt.Errorf("munlock error: " + err.Error())
}
return nil
@ -578,6 +600,7 @@ func (db *DB) mlock(fileSize int) error {
// gofail: var mlockError string
// return errors.New(mlockError)
if err := mlock(db, fileSize); err != nil {
db.Logger().Errorf("[GOOS: %s, GOARCH: %s] mlock failed, fileSize: %d, db.datasz: %d, error: %v", runtime.GOOS, runtime.GOARCH, fileSize, db.datasz, err)
return fmt.Errorf("mlock error: " + err.Error())
}
return nil
@ -628,9 +651,11 @@ func (db *DB) init() error {
// Write the buffer to our data file.
if _, err := db.ops.writeAt(buf, 0); err != nil {
db.Logger().Errorf("writeAt failed: %w", err)
return err
}
if err := fdatasync(db); err != nil {
db.Logger().Errorf("[GOOS: %s, GOARCH: %s] fdatasync failed: %w", runtime.GOOS, runtime.GOARCH, err)
return err
}
@ -713,13 +738,29 @@ func (db *DB) close() error {
//
// IMPORTANT: You must close read-only transactions after you are finished or
// else the database will not reclaim old pages.
func (db *DB) Begin(writable bool) (*Tx, error) {
func (db *DB) Begin(writable bool) (t *Tx, err error) {
db.Logger().Debugf("Starting a new transaction [writable: %t]", writable)
defer func() {
if err != nil {
db.Logger().Errorf("Starting a new transaction [writable: %t] failed: %v", writable, err)
} else {
db.Logger().Debugf("Starting a new transaction [writable: %t] successfully", writable)
}
}()
if writable {
return db.beginRWTx()
}
return db.beginTx()
}
func (db *DB) Logger() Logger {
if db == nil || db.logger == nil {
return getDiscardLogger()
}
return db.logger
}
func (db *DB) beginTx() (*Tx, error) {
// Lock the meta pages while we initialize the transaction. We obtain
// the meta lock before the mmap lock because that's the order that the
@ -1053,7 +1094,18 @@ func safelyCall(fn func(*Tx) error, tx *Tx) (err error) {
//
// This is not necessary under normal operation, however, if you use NoSync
// then it allows you to force the database file to sync against the disk.
func (db *DB) Sync() error { return fdatasync(db) }
func (db *DB) Sync() (err error) {
db.Logger().Debug("Syncing bbolt db (%s)", db.path)
defer func() {
if err != nil {
db.Logger().Errorf("[GOOS: %s, GOARCH: %s] syncing bbolt db (%s) failed: %v", runtime.GOOS, runtime.GOARCH, db.path, err)
} else {
db.Logger().Debugf("Syncing bbolt db (%s) successfully", db.path)
}
}()
return fdatasync(db)
}
// Stats retrieves ongoing performance stats for the database.
// This is only updated when a transaction closes.
@ -1142,8 +1194,10 @@ func (db *DB) allocate(txid common.Txid, count int) (*common.Page, error) {
// grow grows the size of the database to the given sz.
func (db *DB) grow(sz int) error {
// Ignore if the new size is less than available file size.
lg := db.Logger()
fileSize, err := db.fileSize()
if err != nil {
lg.Errorf("getting file size failed: %w", err)
return err
}
if sz <= fileSize {
@ -1165,10 +1219,12 @@ func (db *DB) grow(sz int) error {
// gofail: var resizeFileError string
// return errors.New(resizeFileError)
if err := db.file.Truncate(int64(sz)); err != nil {
lg.Errorf("[GOOS: %s, GOARCH: %s] truncating file failed, size: %d, db.datasz: %d, error: %v", runtime.GOOS, runtime.GOARCH, sz, db.datasz, err)
return fmt.Errorf("file resize error: %s", err)
}
}
if err := db.file.Sync(); err != nil {
lg.Errorf("[GOOS: %s, GOARCH: %s] syncing file failed, db.datasz: %d, error: %v", runtime.GOOS, runtime.GOARCH, db.datasz, err)
return fmt.Errorf("file sync error: %s", err)
}
if db.Mlock {
@ -1283,6 +1339,16 @@ type Options struct {
Logger Logger
}
func (o *Options) String() string {
if o == nil {
return "{}"
}
return fmt.Sprintf("{Timeout: %s, NoGrowSync: %t, NoFreelistSync: %t, PreLoadFreelist: %t, FreelistType: %s, ReadOnly: %t, MmapFlags: %x, InitialMmapSize: %d, PageSize: %d, NoSync: %t, OpenFile: %p, Mlock: %t, Logger: %p}",
o.Timeout, o.NoGrowSync, o.NoFreelistSync, o.PreLoadFreelist, o.FreelistType, o.ReadOnly, o.MmapFlags, o.InitialMmapSize, o.PageSize, o.NoSync, o.OpenFile, o.Mlock, o.Logger)
}
// DefaultOptions represent the options used if nil options are passed into Open().
// No timeout is used which will cause Bolt to wait indefinitely for a lock.
var DefaultOptions = &Options{

48
tx.go
View File

@ -5,6 +5,7 @@ import (
"fmt"
"io"
"os"
"runtime"
"sort"
"strings"
"sync/atomic"
@ -32,7 +33,6 @@ type Tx struct {
pages map[common.Pgid]*common.Page
stats TxStats
commitHandlers []func()
Logger Logger
// WriteFlag specifies the flag for write-related methods like WriteTo().
// Tx opens the database file with the specified flag to copy the data.
@ -57,8 +57,6 @@ func (tx *Tx) init(db *DB) {
tx.root.InBucket = &common.InBucket{}
*tx.root.InBucket = *(tx.meta.RootBucket())
tx.Logger = db.Logger
// Increment the transaction id and add a page cache for writable transactions.
if tx.writable {
tx.pages = make(map[common.Pgid]*common.Page)
@ -68,6 +66,9 @@ func (tx *Tx) init(db *DB) {
// ID returns the transaction id.
func (tx *Tx) ID() int {
if tx == nil || tx.meta == nil {
return -1
}
return int(tx.meta.Txid())
}
@ -143,7 +144,18 @@ func (tx *Tx) OnCommit(fn func()) {
// Commit writes all changes to disk, updates the meta page and closes the transaction.
// Returns an error if a disk write error occurs, or if Commit is
// called on a read-only transaction.
func (tx *Tx) Commit() error {
func (tx *Tx) Commit() (err error) {
txId := tx.ID()
lg := tx.db.Logger()
lg.Debugf("Committing transaction %d", txId)
defer func() {
if err != nil {
lg.Errorf("Committing transaction failed: %v", err)
} else {
lg.Debugf("Committing transaction %d successfully", txId)
}
}()
common.Assert(!tx.managed, "managed tx commit not allowed")
if tx.db == nil {
return berrors.ErrTxClosed
@ -151,7 +163,6 @@ func (tx *Tx) Commit() error {
return berrors.ErrTxNotWritable
}
tx.Logger.Infof("Committing transaction %d", tx.ID())
// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
// Rebalance nodes which have had deletions.
@ -165,7 +176,8 @@ func (tx *Tx) Commit() error {
// spill data onto dirty pages.
startTime = time.Now()
if err := tx.root.spill(); err != nil {
if err = tx.root.spill(); err != nil {
lg.Errorf("spilling data onto dirty pages failed: %v", err)
tx.rollback()
return err
}
@ -180,8 +192,9 @@ func (tx *Tx) Commit() error {
}
if !tx.db.NoFreelistSync {
err := tx.commitFreelist()
err = tx.commitFreelist()
if err != nil {
lg.Errorf("committing freelist failed: %v", err)
return err
}
} else {
@ -194,7 +207,8 @@ func (tx *Tx) Commit() error {
// gofail: var lackOfDiskSpace string
// tx.rollback()
// return errors.New(lackOfDiskSpace)
if err := tx.db.grow(int(tx.meta.Pgid()+1) * tx.db.pageSize); err != nil {
if err = tx.db.grow(int(tx.meta.Pgid()+1) * tx.db.pageSize); err != nil {
lg.Errorf("growing db size failed, pgid: %d, pagesize: %d, error: %v", tx.meta.Pgid(), tx.db.pageSize, err)
tx.rollback()
return err
}
@ -202,7 +216,8 @@ func (tx *Tx) Commit() error {
// Write dirty pages to disk.
startTime = time.Now()
if err := tx.write(); err != nil {
if err = tx.write(); err != nil {
lg.Errorf("writing data failed: %v", err)
tx.rollback()
return err
}
@ -212,11 +227,11 @@ func (tx *Tx) Commit() error {
ch := tx.Check()
var errs []string
for {
err, ok := <-ch
chkErr, ok := <-ch
if !ok {
break
}
errs = append(errs, err.Error())
errs = append(errs, chkErr.Error())
}
if len(errs) > 0 {
panic("check fail: " + strings.Join(errs, "\n"))
@ -224,7 +239,8 @@ func (tx *Tx) Commit() error {
}
// Write meta to disk.
if err := tx.writeMeta(); err != nil {
if err = tx.writeMeta(); err != nil {
lg.Errorf("writeMeta failed: %v", err)
tx.rollback()
return err
}
@ -418,8 +434,10 @@ func (tx *Tx) CopyFile(path string, mode os.FileMode) error {
// allocate returns a contiguous block of memory starting at a given page.
func (tx *Tx) allocate(count int) (*common.Page, error) {
lg := tx.db.Logger()
p, err := tx.db.allocate(tx.meta.Txid(), count)
if err != nil {
lg.Errorf("allocating failed, txid: %d, count: %d, error: %v", tx.meta.Txid(), count, err)
return nil, err
}
@ -436,6 +454,7 @@ func (tx *Tx) allocate(count int) (*common.Page, error) {
// write writes any dirty pages to disk.
func (tx *Tx) write() error {
// Sort pages by id.
lg := tx.db.Logger()
pages := make(common.Pages, 0, len(tx.pages))
for _, p := range tx.pages {
pages = append(pages, p)
@ -459,6 +478,7 @@ func (tx *Tx) write() error {
buf := common.UnsafeByteSlice(unsafe.Pointer(p), written, 0, int(sz))
if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
lg.Errorf("writeAt failed, offset: %d: %w", offset, err)
return err
}
@ -481,6 +501,7 @@ func (tx *Tx) write() error {
if !tx.db.NoSync || common.IgnoreNoSync {
// gofail: var beforeSyncDataPages struct{}
if err := fdatasync(tx.db); err != nil {
lg.Errorf("[GOOS: %s, GOARCH: %s] fdatasync failed: %w", runtime.GOOS, runtime.GOARCH, err)
return err
}
}
@ -508,17 +529,20 @@ func (tx *Tx) write() error {
// writeMeta writes the meta to the disk.
func (tx *Tx) writeMeta() error {
// Create a temporary buffer for the meta page.
lg := tx.db.Logger()
buf := make([]byte, tx.db.pageSize)
p := tx.db.pageInBuffer(buf, 0)
tx.meta.Write(p)
// Write the meta page to file.
if _, err := tx.db.ops.writeAt(buf, int64(p.Id())*int64(tx.db.pageSize)); err != nil {
lg.Errorf("writeAt failed, pgid: %d, pageSize: %d, error: %v", p.Id(), tx.db.pageSize, err)
return err
}
if !tx.db.NoSync || common.IgnoreNoSync {
// gofail: var beforeSyncMetaPage struct{}
if err := fdatasync(tx.db); err != nil {
lg.Errorf("[GOOS: %s, GOARCH: %s] fdatasync failed: %w", runtime.GOOS, runtime.GOARCH, err)
return err
}
}