From dd4458ce3a82c1e6f7134d746adb5b64d36495e5 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Wed, 4 Jan 2023 20:06:51 +0800 Subject: [PATCH] TxStats: provides Getter/Inc functions Signed-off-by: Wei Fu --- allocate_test.go | 8 +- bucket.go | 5 +- db_test.go | 8 +- internal/btesting/btesting.go | 12 +-- node.go | 9 +- tx.go | 192 ++++++++++++++++++++++++++++------ tx_stats_test.go | 54 ++++++++++ tx_test.go | 107 +++++++++++++++++++ 8 files changed, 340 insertions(+), 55 deletions(-) create mode 100644 tx_stats_test.go diff --git a/allocate_test.go b/allocate_test.go index cc9380d..94e9116 100644 --- a/allocate_test.go +++ b/allocate_test.go @@ -18,14 +18,16 @@ func TestTx_allocatePageStats(t *testing.T) { pages: make(map[pgid]*page), } - prePageCnt := tx.Stats().PageCount + txStats := tx.Stats() + prePageCnt := txStats.GetPageCount() allocateCnt := f.free_count() if _, err := tx.allocate(allocateCnt); err != nil { t.Fatal(err) } - if tx.Stats().PageCount != prePageCnt+int64(allocateCnt) { - t.Errorf("Allocated %d but got %d page in stats", allocateCnt, tx.Stats().PageCount) + txStats = tx.Stats() + if txStats.GetPageCount() != prePageCnt+int64(allocateCnt) { + t.Errorf("Allocated %d but got %d page in stats", allocateCnt, txStats.GetPageCount()) } } diff --git a/bucket.go b/bucket.go index 20e812e..5ec9da7 100644 --- a/bucket.go +++ b/bucket.go @@ -3,7 +3,6 @@ package bbolt import ( "bytes" "fmt" - "sync/atomic" "unsafe" ) @@ -82,7 +81,7 @@ func (b *Bucket) Writable() bool { // Do not use a cursor after the transaction is closed. func (b *Bucket) Cursor() *Cursor { // Update transaction statistics. - atomic.AddInt64(&b.tx.stats.CursorCount, 1) + b.tx.stats.IncCursorCount(1) // Allocate and return a cursor. return &Cursor{ @@ -682,7 +681,7 @@ func (b *Bucket) node(pgid pgid, parent *node) *node { b.nodes[pgid] = n // Update statistics. - atomic.AddInt64(&b.tx.stats.NodeCount, 1) + b.tx.stats.IncNodeCount(1) return n } diff --git a/db_test.go b/db_test.go index fa8fd69..3d9f229 100644 --- a/db_test.go +++ b/db_test.go @@ -1038,8 +1038,8 @@ func TestDB_Stats(t *testing.T) { } stats := db.Stats() - if stats.TxStats.PageCount != 2 { - t.Fatalf("unexpected TxStats.PageCount: %d", stats.TxStats.PageCount) + if stats.TxStats.GetPageCount() != 2 { + t.Fatalf("unexpected TxStats.PageCount: %d", stats.TxStats.GetPageCount()) } else if stats.FreePageN != 0 { t.Fatalf("unexpected FreePageN != 0: %d", stats.FreePageN) } else if stats.PendingPageN != 2 { @@ -1122,8 +1122,8 @@ func TestDBStats_Sub(t *testing.T) { b.TxStats.PageCount = 10 b.FreePageN = 14 diff := b.Sub(&a) - if diff.TxStats.PageCount != 7 { - t.Fatalf("unexpected TxStats.PageCount: %d", diff.TxStats.PageCount) + if diff.TxStats.GetPageCount() != 7 { + t.Fatalf("unexpected TxStats.PageCount: %d", diff.TxStats.GetPageCount()) } // free page stats are copied from the receiver and not subtracted diff --git a/internal/btesting/btesting.go b/internal/btesting/btesting.go index e070ec7..b305072 100644 --- a/internal/btesting/btesting.go +++ b/internal/btesting/btesting.go @@ -189,14 +189,14 @@ func (db *DB) CopyTempFile() { func (db *DB) PrintStats() { var stats = db.Stats() fmt.Printf("[db] %-20s %-20s %-20s\n", - fmt.Sprintf("pg(%d/%d)", stats.TxStats.PageCount, stats.TxStats.PageAlloc), - fmt.Sprintf("cur(%d)", stats.TxStats.CursorCount), - fmt.Sprintf("node(%d/%d)", stats.TxStats.NodeCount, stats.TxStats.NodeDeref), + fmt.Sprintf("pg(%d/%d)", stats.TxStats.GetPageCount(), stats.TxStats.GetPageAlloc()), + fmt.Sprintf("cur(%d)", stats.TxStats.GetCursorCount()), + fmt.Sprintf("node(%d/%d)", stats.TxStats.GetNodeCount(), stats.TxStats.GetNodeDeref()), ) fmt.Printf(" %-20s %-20s %-20s\n", - fmt.Sprintf("rebal(%d/%v)", stats.TxStats.Rebalance, truncDuration(stats.TxStats.RebalanceTime)), - fmt.Sprintf("spill(%d/%v)", stats.TxStats.Spill, truncDuration(stats.TxStats.SpillTime)), - fmt.Sprintf("w(%d/%v)", stats.TxStats.Write, truncDuration(stats.TxStats.WriteTime)), + fmt.Sprintf("rebal(%d/%v)", stats.TxStats.GetRebalance(), truncDuration(stats.TxStats.GetRebalanceTime())), + fmt.Sprintf("spill(%d/%v)", stats.TxStats.GetSpill(), truncDuration(stats.TxStats.GetSpillTime())), + fmt.Sprintf("w(%d/%v)", stats.TxStats.GetWrite(), truncDuration(stats.TxStats.GetWriteTime())), ) } diff --git a/node.go b/node.go index 581283e..b5ddce6 100644 --- a/node.go +++ b/node.go @@ -4,7 +4,6 @@ import ( "bytes" "fmt" "sort" - "sync/atomic" "unsafe" ) @@ -305,7 +304,7 @@ func (n *node) splitTwo(pageSize uintptr) (*node, *node) { n.inodes = n.inodes[:splitIndex] // Update the statistics. - atomic.AddInt64(&n.bucket.tx.stats.Split, 1) + n.bucket.tx.stats.IncSplit(1) return n, next } @@ -392,7 +391,7 @@ func (n *node) spill() error { } // Update the statistics. - atomic.AddInt64(&tx.stats.Spill, 1) + tx.stats.IncSpill(1) } // If the root node split and created a new root then we need to spill that @@ -414,7 +413,7 @@ func (n *node) rebalance() { n.unbalanced = false // Update statistics. - n.bucket.tx.stats.Rebalance++ + n.bucket.tx.stats.IncRebalance(1) // Ignore if node is above threshold (25%) and has enough keys. var threshold = n.bucket.tx.db.pageSize / 4 @@ -548,7 +547,7 @@ func (n *node) dereference() { } // Update statistics. - atomic.AddInt64(&n.bucket.tx.stats.NodeDeref, 1) + n.bucket.tx.stats.IncNodeDeref(1) } // free adds the node's underlying page to the freelist. diff --git a/tx.go b/tx.go index a2919fe..4d571b1 100644 --- a/tx.go +++ b/tx.go @@ -152,8 +152,8 @@ func (tx *Tx) Commit() error { // Rebalance nodes which have had deletions. var startTime = time.Now() tx.root.rebalance() - if atomic.LoadInt64(&tx.stats.Rebalance) > 0 { - atomicAddDuration(&tx.stats.RebalanceTime, time.Since(startTime)) + if tx.stats.GetRebalance() > 0 { + tx.stats.IncRebalanceTime(time.Since(startTime)) } // spill data onto dirty pages. @@ -162,7 +162,7 @@ func (tx *Tx) Commit() error { tx.rollback() return err } - atomicAddDuration(&tx.stats.SpillTime, time.Since(startTime)) + tx.stats.IncSpillTime(time.Since(startTime)) // Free the old root bucket. tx.meta.root.root = tx.root.root @@ -209,7 +209,7 @@ func (tx *Tx) Commit() error { tx.rollback() return err } - atomicAddDuration(&tx.stats.WriteTime, time.Since(startTime)) + tx.stats.IncWriteTime(time.Since(startTime)) // Finalize the transaction. tx.close() @@ -504,8 +504,8 @@ func (tx *Tx) allocate(count int) (*page, error) { tx.pages[p.id] = p // Update statistics. - atomic.AddInt64(&tx.stats.PageCount, int64(count)) - atomic.AddInt64(&tx.stats.PageAlloc, int64(count*tx.db.pageSize)) + tx.stats.IncPageCount(int64(count)) + tx.stats.IncPageAlloc(int64(count * tx.db.pageSize)) return p, nil } @@ -540,7 +540,7 @@ func (tx *Tx) write() error { } // Update statistics. - atomic.AddInt64(&tx.stats.Write, 1) + tx.stats.IncWrite(1) // Exit inner for loop if we've written all the chunks. rem -= sz @@ -599,7 +599,7 @@ func (tx *Tx) writeMeta() error { } // Update statistics. - atomic.AddInt64(&tx.stats.Write, 1) + tx.stats.IncWrite(1) return nil } @@ -698,18 +698,18 @@ type TxStats struct { } func (s *TxStats) add(other *TxStats) { - s.PageCount += other.PageCount - s.PageAlloc += other.PageAlloc - s.CursorCount += other.CursorCount - s.NodeCount += other.NodeCount - s.NodeDeref += other.NodeDeref - s.Rebalance += other.Rebalance - s.RebalanceTime += other.RebalanceTime - s.Split += other.Split - s.Spill += other.Spill - s.SpillTime += other.SpillTime - s.Write += other.Write - s.WriteTime += other.WriteTime + s.IncPageCount(other.GetPageCount()) + s.IncPageAlloc(other.GetPageAlloc()) + s.IncCursorCount(other.GetCursorCount()) + s.IncNodeCount(other.GetNodeCount()) + s.IncNodeDeref(other.GetNodeDeref()) + s.IncRebalance(other.GetRebalance()) + s.IncRebalanceTime(other.GetRebalanceTime()) + s.IncSplit(other.GetSplit()) + s.IncSpill(other.GetSpill()) + s.IncSpillTime(other.GetSpillTime()) + s.IncWrite(other.GetWrite()) + s.IncWriteTime(other.GetWriteTime()) } // Sub calculates and returns the difference between two sets of transaction stats. @@ -717,21 +717,145 @@ func (s *TxStats) add(other *TxStats) { // you need the performance counters that occurred within that time span. func (s *TxStats) Sub(other *TxStats) TxStats { var diff TxStats - diff.PageCount = s.PageCount - other.PageCount - diff.PageAlloc = s.PageAlloc - other.PageAlloc - diff.CursorCount = s.CursorCount - other.CursorCount - diff.NodeCount = s.NodeCount - other.NodeCount - diff.NodeDeref = s.NodeDeref - other.NodeDeref - diff.Rebalance = s.Rebalance - other.Rebalance - diff.RebalanceTime = s.RebalanceTime - other.RebalanceTime - diff.Split = s.Split - other.Split - diff.Spill = s.Spill - other.Spill - diff.SpillTime = s.SpillTime - other.SpillTime - diff.Write = s.Write - other.Write - diff.WriteTime = s.WriteTime - other.WriteTime + diff.PageCount = s.GetPageCount() - other.GetPageCount() + diff.PageAlloc = s.GetPageAlloc() - other.GetPageAlloc() + diff.CursorCount = s.GetCursorCount() - other.GetCursorCount() + diff.NodeCount = s.GetNodeCount() - other.GetNodeCount() + diff.NodeDeref = s.GetNodeDeref() - other.GetNodeDeref() + diff.Rebalance = s.GetRebalance() - other.GetRebalance() + diff.RebalanceTime = s.GetRebalanceTime() - other.GetRebalanceTime() + diff.Split = s.GetSplit() - other.GetSplit() + diff.Spill = s.GetSpill() - other.GetSpill() + diff.SpillTime = s.GetSpillTime() - other.GetSpillTime() + diff.Write = s.GetWrite() - other.GetWrite() + diff.WriteTime = s.GetWriteTime() - other.GetWriteTime() return diff } -func atomicAddDuration(ptr *time.Duration, du time.Duration) { - atomic.AddInt64((*int64)(unsafe.Pointer(ptr)), int64(du)) +// GetPageCount returns PageCount atomically. +func (s *TxStats) GetPageCount() int64 { + return atomic.LoadInt64(&s.PageCount) +} + +// IncPageCount increases PageCount atomically and returns the new value. +func (s *TxStats) IncPageCount(delta int64) int64 { + return atomic.AddInt64(&s.PageCount, delta) +} + +// GetPageAlloc returns PageAlloc atomically. +func (s *TxStats) GetPageAlloc() int64 { + return atomic.LoadInt64(&s.PageAlloc) +} + +// IncPageAlloc increases PageAlloc atomically and returns the new value. +func (s *TxStats) IncPageAlloc(delta int64) int64 { + return atomic.AddInt64(&s.PageAlloc, delta) +} + +// GetCursorCount returns CursorCount atomically. +func (s *TxStats) GetCursorCount() int64 { + return atomic.LoadInt64(&s.CursorCount) +} + +// IncCursorCount increases CursorCount atomically and return the new value. +func (s *TxStats) IncCursorCount(delta int64) int64 { + return atomic.AddInt64(&s.CursorCount, delta) +} + +// GetNodeCount returns NodeCount atomically. +func (s *TxStats) GetNodeCount() int64 { + return atomic.LoadInt64(&s.NodeCount) +} + +// IncNodeCount increases NodeCount atomically and returns the new value. +func (s *TxStats) IncNodeCount(delta int64) int64 { + return atomic.AddInt64(&s.NodeCount, delta) +} + +// GetNodeDeref returns NodeDeref atomically. +func (s *TxStats) GetNodeDeref() int64 { + return atomic.LoadInt64(&s.NodeDeref) +} + +// IncNodeDeref increases NodeDeref atomically and returns the new value. +func (s *TxStats) IncNodeDeref(delta int64) int64 { + return atomic.AddInt64(&s.NodeDeref, delta) +} + +// GetRebalance returns Rebalance atomically. +func (s *TxStats) GetRebalance() int64 { + return atomic.LoadInt64(&s.Rebalance) +} + +// IncRebalance increases Rebalance atomically and returns the new value. +func (s *TxStats) IncRebalance(delta int64) int64 { + return atomic.AddInt64(&s.Rebalance, delta) +} + +// GetRebalanceTime returns RebalanceTime atomically. +func (s *TxStats) GetRebalanceTime() time.Duration { + return atomicLoadDuration(&s.RebalanceTime) +} + +// IncRebalanceTime increases RebalanceTime atomically and returns the new value. +func (s *TxStats) IncRebalanceTime(delta time.Duration) time.Duration { + return atomicAddDuration(&s.RebalanceTime, delta) +} + +// GetSplit returns Split atomically. +func (s *TxStats) GetSplit() int64 { + return atomic.LoadInt64(&s.Split) +} + +// IncSplit increases Split atomically and returns the new value. +func (s *TxStats) IncSplit(delta int64) int64 { + return atomic.AddInt64(&s.Split, delta) +} + +// GetSpill returns Spill atomically. +func (s *TxStats) GetSpill() int64 { + return atomic.LoadInt64(&s.Spill) +} + +// IncSpill increases Spill atomically and returns the new value. +func (s *TxStats) IncSpill(delta int64) int64 { + return atomic.AddInt64(&s.Spill, delta) +} + +// GetSpillTime returns SpillTime atomically. +func (s *TxStats) GetSpillTime() time.Duration { + return atomicLoadDuration(&s.SpillTime) +} + +// IncSpillTime increases SpillTime atomically and returns the new value. +func (s *TxStats) IncSpillTime(delta time.Duration) time.Duration { + return atomicAddDuration(&s.SpillTime, delta) +} + +// GetWrite returns Write atomically. +func (s *TxStats) GetWrite() int64 { + return atomic.LoadInt64(&s.Write) +} + +// IncWrite increases Write atomically and returns the new value. +func (s *TxStats) IncWrite(delta int64) int64 { + return atomic.AddInt64(&s.Write, delta) +} + +// GetWriteTime returns WriteTime atomically. +func (s *TxStats) GetWriteTime() time.Duration { + return atomicLoadDuration(&s.WriteTime) +} + +// IncWriteTime increases WriteTime atomically and returns the new value. +func (s *TxStats) IncWriteTime(delta time.Duration) time.Duration { + return atomicAddDuration(&s.WriteTime, delta) +} + +func atomicAddDuration(ptr *time.Duration, du time.Duration) time.Duration { + return time.Duration(atomic.AddInt64((*int64)(unsafe.Pointer(ptr)), int64(du))) +} + +func atomicLoadDuration(ptr *time.Duration) time.Duration { + return time.Duration(atomic.LoadInt64((*int64)(unsafe.Pointer(ptr)))) } diff --git a/tx_stats_test.go b/tx_stats_test.go new file mode 100644 index 0000000..e0cbbd4 --- /dev/null +++ b/tx_stats_test.go @@ -0,0 +1,54 @@ +package bbolt + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestTxStats_add(t *testing.T) { + statsA := TxStats{ + PageCount: 1, + PageAlloc: 2, + CursorCount: 3, + NodeCount: 100, + NodeDeref: 101, + Rebalance: 1000, + RebalanceTime: 1001 * time.Second, + Split: 10000, + Spill: 10001, + SpillTime: 10001 * time.Second, + Write: 100000, + WriteTime: 100001 * time.Second, + } + + statsB := TxStats{ + PageCount: 2, + PageAlloc: 3, + CursorCount: 4, + NodeCount: 101, + NodeDeref: 102, + Rebalance: 1001, + RebalanceTime: 1002 * time.Second, + Split: 11001, + Spill: 11002, + SpillTime: 11002 * time.Second, + Write: 110001, + WriteTime: 110010 * time.Second, + } + + statsB.add(&statsA) + assert.Equal(t, int64(3), statsB.GetPageCount()) + assert.Equal(t, int64(5), statsB.GetPageAlloc()) + assert.Equal(t, int64(7), statsB.GetCursorCount()) + assert.Equal(t, int64(201), statsB.GetNodeCount()) + assert.Equal(t, int64(203), statsB.GetNodeDeref()) + assert.Equal(t, int64(2001), statsB.GetRebalance()) + assert.Equal(t, 2003*time.Second, statsB.GetRebalanceTime()) + assert.Equal(t, int64(21001), statsB.GetSplit()) + assert.Equal(t, int64(21003), statsB.GetSpill()) + assert.Equal(t, 21003*time.Second, statsB.GetSpillTime()) + assert.Equal(t, int64(210001), statsB.GetWrite()) + assert.Equal(t, 210011*time.Second, statsB.GetWriteTime()) +} diff --git a/tx_test.go b/tx_test.go index 7cfcef4..8b1b46c 100644 --- a/tx_test.go +++ b/tx_test.go @@ -7,6 +7,9 @@ import ( "log" "os" "testing" + "time" + + "github.com/stretchr/testify/assert" bolt "go.etcd.io/bbolt" "go.etcd.io/bbolt/internal/btesting" @@ -902,3 +905,107 @@ func ExampleTx_CopyFile() { // Output: // The value for 'foo' in the clone is: bar } + +func TestTxStats_GetAndIncAtomically(t *testing.T) { + var stats bolt.TxStats + + stats.IncPageCount(1) + assert.Equal(t, int64(1), stats.GetPageCount()) + + stats.IncPageAlloc(2) + assert.Equal(t, int64(2), stats.GetPageAlloc()) + + stats.IncCursorCount(3) + assert.Equal(t, int64(3), stats.GetCursorCount()) + + stats.IncNodeCount(100) + assert.Equal(t, int64(100), stats.GetNodeCount()) + + stats.IncNodeDeref(101) + assert.Equal(t, int64(101), stats.GetNodeDeref()) + + stats.IncRebalance(1000) + assert.Equal(t, int64(1000), stats.GetRebalance()) + + stats.IncRebalanceTime(1001 * time.Second) + assert.Equal(t, 1001*time.Second, stats.GetRebalanceTime()) + + stats.IncSplit(10000) + assert.Equal(t, int64(10000), stats.GetSplit()) + + stats.IncSpill(10001) + assert.Equal(t, int64(10001), stats.GetSpill()) + + stats.IncSpillTime(10001 * time.Second) + assert.Equal(t, 10001*time.Second, stats.GetSpillTime()) + + stats.IncWrite(100000) + assert.Equal(t, int64(100000), stats.GetWrite()) + + stats.IncWriteTime(100001 * time.Second) + assert.Equal(t, 100001*time.Second, stats.GetWriteTime()) + + assert.Equal(t, + bolt.TxStats{ + PageCount: 1, + PageAlloc: 2, + CursorCount: 3, + NodeCount: 100, + NodeDeref: 101, + Rebalance: 1000, + RebalanceTime: 1001 * time.Second, + Split: 10000, + Spill: 10001, + SpillTime: 10001 * time.Second, + Write: 100000, + WriteTime: 100001 * time.Second, + }, + stats, + ) +} + +func TestTxStats_Sub(t *testing.T) { + statsA := bolt.TxStats{ + PageCount: 1, + PageAlloc: 2, + CursorCount: 3, + NodeCount: 100, + NodeDeref: 101, + Rebalance: 1000, + RebalanceTime: 1001 * time.Second, + Split: 10000, + Spill: 10001, + SpillTime: 10001 * time.Second, + Write: 100000, + WriteTime: 100001 * time.Second, + } + + statsB := bolt.TxStats{ + PageCount: 2, + PageAlloc: 3, + CursorCount: 4, + NodeCount: 101, + NodeDeref: 102, + Rebalance: 1001, + RebalanceTime: 1002 * time.Second, + Split: 11001, + Spill: 11002, + SpillTime: 11002 * time.Second, + Write: 110001, + WriteTime: 110010 * time.Second, + } + + diff := statsB.Sub(&statsA) + assert.Equal(t, int64(1), diff.GetPageCount()) + assert.Equal(t, int64(1), diff.GetPageAlloc()) + assert.Equal(t, int64(1), diff.GetCursorCount()) + assert.Equal(t, int64(1), diff.GetNodeCount()) + assert.Equal(t, int64(1), diff.GetNodeDeref()) + assert.Equal(t, int64(1), diff.GetRebalance()) + assert.Equal(t, time.Second, diff.GetRebalanceTime()) + assert.Equal(t, int64(1001), diff.GetSplit()) + assert.Equal(t, int64(1001), diff.GetSpill()) + assert.Equal(t, 1001*time.Second, diff.GetSpillTime()) + assert.Equal(t, int64(10001), diff.GetWrite()) + assert.Equal(t, 10009*time.Second, diff.GetWriteTime()) +}