TxStats: provides Getter/Inc functions

Signed-off-by: Wei Fu <fuweid89@gmail.com>
pull/373/head
Wei Fu 2023-01-04 20:06:51 +08:00
parent b7f2da4843
commit dd4458ce3a
8 changed files with 340 additions and 55 deletions

View File

@ -18,14 +18,16 @@ func TestTx_allocatePageStats(t *testing.T) {
pages: make(map[pgid]*page),
}
prePageCnt := tx.Stats().PageCount
txStats := tx.Stats()
prePageCnt := txStats.GetPageCount()
allocateCnt := f.free_count()
if _, err := tx.allocate(allocateCnt); err != nil {
t.Fatal(err)
}
if tx.Stats().PageCount != prePageCnt+int64(allocateCnt) {
t.Errorf("Allocated %d but got %d page in stats", allocateCnt, tx.Stats().PageCount)
txStats = tx.Stats()
if txStats.GetPageCount() != prePageCnt+int64(allocateCnt) {
t.Errorf("Allocated %d but got %d page in stats", allocateCnt, txStats.GetPageCount())
}
}

View File

@ -3,7 +3,6 @@ package bbolt
import (
"bytes"
"fmt"
"sync/atomic"
"unsafe"
)
@ -82,7 +81,7 @@ func (b *Bucket) Writable() bool {
// Do not use a cursor after the transaction is closed.
func (b *Bucket) Cursor() *Cursor {
// Update transaction statistics.
atomic.AddInt64(&b.tx.stats.CursorCount, 1)
b.tx.stats.IncCursorCount(1)
// Allocate and return a cursor.
return &Cursor{
@ -682,7 +681,7 @@ func (b *Bucket) node(pgid pgid, parent *node) *node {
b.nodes[pgid] = n
// Update statistics.
atomic.AddInt64(&b.tx.stats.NodeCount, 1)
b.tx.stats.IncNodeCount(1)
return n
}

View File

@ -1038,8 +1038,8 @@ func TestDB_Stats(t *testing.T) {
}
stats := db.Stats()
if stats.TxStats.PageCount != 2 {
t.Fatalf("unexpected TxStats.PageCount: %d", stats.TxStats.PageCount)
if stats.TxStats.GetPageCount() != 2 {
t.Fatalf("unexpected TxStats.PageCount: %d", stats.TxStats.GetPageCount())
} else if stats.FreePageN != 0 {
t.Fatalf("unexpected FreePageN != 0: %d", stats.FreePageN)
} else if stats.PendingPageN != 2 {
@ -1122,8 +1122,8 @@ func TestDBStats_Sub(t *testing.T) {
b.TxStats.PageCount = 10
b.FreePageN = 14
diff := b.Sub(&a)
if diff.TxStats.PageCount != 7 {
t.Fatalf("unexpected TxStats.PageCount: %d", diff.TxStats.PageCount)
if diff.TxStats.GetPageCount() != 7 {
t.Fatalf("unexpected TxStats.PageCount: %d", diff.TxStats.GetPageCount())
}
// free page stats are copied from the receiver and not subtracted

View File

@ -189,14 +189,14 @@ func (db *DB) CopyTempFile() {
func (db *DB) PrintStats() {
var stats = db.Stats()
fmt.Printf("[db] %-20s %-20s %-20s\n",
fmt.Sprintf("pg(%d/%d)", stats.TxStats.PageCount, stats.TxStats.PageAlloc),
fmt.Sprintf("cur(%d)", stats.TxStats.CursorCount),
fmt.Sprintf("node(%d/%d)", stats.TxStats.NodeCount, stats.TxStats.NodeDeref),
fmt.Sprintf("pg(%d/%d)", stats.TxStats.GetPageCount(), stats.TxStats.GetPageAlloc()),
fmt.Sprintf("cur(%d)", stats.TxStats.GetCursorCount()),
fmt.Sprintf("node(%d/%d)", stats.TxStats.GetNodeCount(), stats.TxStats.GetNodeDeref()),
)
fmt.Printf(" %-20s %-20s %-20s\n",
fmt.Sprintf("rebal(%d/%v)", stats.TxStats.Rebalance, truncDuration(stats.TxStats.RebalanceTime)),
fmt.Sprintf("spill(%d/%v)", stats.TxStats.Spill, truncDuration(stats.TxStats.SpillTime)),
fmt.Sprintf("w(%d/%v)", stats.TxStats.Write, truncDuration(stats.TxStats.WriteTime)),
fmt.Sprintf("rebal(%d/%v)", stats.TxStats.GetRebalance(), truncDuration(stats.TxStats.GetRebalanceTime())),
fmt.Sprintf("spill(%d/%v)", stats.TxStats.GetSpill(), truncDuration(stats.TxStats.GetSpillTime())),
fmt.Sprintf("w(%d/%v)", stats.TxStats.GetWrite(), truncDuration(stats.TxStats.GetWriteTime())),
)
}

View File

@ -4,7 +4,6 @@ import (
"bytes"
"fmt"
"sort"
"sync/atomic"
"unsafe"
)
@ -305,7 +304,7 @@ func (n *node) splitTwo(pageSize uintptr) (*node, *node) {
n.inodes = n.inodes[:splitIndex]
// Update the statistics.
atomic.AddInt64(&n.bucket.tx.stats.Split, 1)
n.bucket.tx.stats.IncSplit(1)
return n, next
}
@ -392,7 +391,7 @@ func (n *node) spill() error {
}
// Update the statistics.
atomic.AddInt64(&tx.stats.Spill, 1)
tx.stats.IncSpill(1)
}
// If the root node split and created a new root then we need to spill that
@ -414,7 +413,7 @@ func (n *node) rebalance() {
n.unbalanced = false
// Update statistics.
n.bucket.tx.stats.Rebalance++
n.bucket.tx.stats.IncRebalance(1)
// Ignore if node is above threshold (25%) and has enough keys.
var threshold = n.bucket.tx.db.pageSize / 4
@ -548,7 +547,7 @@ func (n *node) dereference() {
}
// Update statistics.
atomic.AddInt64(&n.bucket.tx.stats.NodeDeref, 1)
n.bucket.tx.stats.IncNodeDeref(1)
}
// free adds the node's underlying page to the freelist.

192
tx.go
View File

@ -152,8 +152,8 @@ func (tx *Tx) Commit() error {
// Rebalance nodes which have had deletions.
var startTime = time.Now()
tx.root.rebalance()
if atomic.LoadInt64(&tx.stats.Rebalance) > 0 {
atomicAddDuration(&tx.stats.RebalanceTime, time.Since(startTime))
if tx.stats.GetRebalance() > 0 {
tx.stats.IncRebalanceTime(time.Since(startTime))
}
// spill data onto dirty pages.
@ -162,7 +162,7 @@ func (tx *Tx) Commit() error {
tx.rollback()
return err
}
atomicAddDuration(&tx.stats.SpillTime, time.Since(startTime))
tx.stats.IncSpillTime(time.Since(startTime))
// Free the old root bucket.
tx.meta.root.root = tx.root.root
@ -209,7 +209,7 @@ func (tx *Tx) Commit() error {
tx.rollback()
return err
}
atomicAddDuration(&tx.stats.WriteTime, time.Since(startTime))
tx.stats.IncWriteTime(time.Since(startTime))
// Finalize the transaction.
tx.close()
@ -504,8 +504,8 @@ func (tx *Tx) allocate(count int) (*page, error) {
tx.pages[p.id] = p
// Update statistics.
atomic.AddInt64(&tx.stats.PageCount, int64(count))
atomic.AddInt64(&tx.stats.PageAlloc, int64(count*tx.db.pageSize))
tx.stats.IncPageCount(int64(count))
tx.stats.IncPageAlloc(int64(count * tx.db.pageSize))
return p, nil
}
@ -540,7 +540,7 @@ func (tx *Tx) write() error {
}
// Update statistics.
atomic.AddInt64(&tx.stats.Write, 1)
tx.stats.IncWrite(1)
// Exit inner for loop if we've written all the chunks.
rem -= sz
@ -599,7 +599,7 @@ func (tx *Tx) writeMeta() error {
}
// Update statistics.
atomic.AddInt64(&tx.stats.Write, 1)
tx.stats.IncWrite(1)
return nil
}
@ -698,18 +698,18 @@ type TxStats struct {
}
func (s *TxStats) add(other *TxStats) {
s.PageCount += other.PageCount
s.PageAlloc += other.PageAlloc
s.CursorCount += other.CursorCount
s.NodeCount += other.NodeCount
s.NodeDeref += other.NodeDeref
s.Rebalance += other.Rebalance
s.RebalanceTime += other.RebalanceTime
s.Split += other.Split
s.Spill += other.Spill
s.SpillTime += other.SpillTime
s.Write += other.Write
s.WriteTime += other.WriteTime
s.IncPageCount(other.GetPageCount())
s.IncPageAlloc(other.GetPageAlloc())
s.IncCursorCount(other.GetCursorCount())
s.IncNodeCount(other.GetNodeCount())
s.IncNodeDeref(other.GetNodeDeref())
s.IncRebalance(other.GetRebalance())
s.IncRebalanceTime(other.GetRebalanceTime())
s.IncSplit(other.GetSplit())
s.IncSpill(other.GetSpill())
s.IncSpillTime(other.GetSpillTime())
s.IncWrite(other.GetWrite())
s.IncWriteTime(other.GetWriteTime())
}
// Sub calculates and returns the difference between two sets of transaction stats.
@ -717,21 +717,145 @@ func (s *TxStats) add(other *TxStats) {
// you need the performance counters that occurred within that time span.
func (s *TxStats) Sub(other *TxStats) TxStats {
var diff TxStats
diff.PageCount = s.PageCount - other.PageCount
diff.PageAlloc = s.PageAlloc - other.PageAlloc
diff.CursorCount = s.CursorCount - other.CursorCount
diff.NodeCount = s.NodeCount - other.NodeCount
diff.NodeDeref = s.NodeDeref - other.NodeDeref
diff.Rebalance = s.Rebalance - other.Rebalance
diff.RebalanceTime = s.RebalanceTime - other.RebalanceTime
diff.Split = s.Split - other.Split
diff.Spill = s.Spill - other.Spill
diff.SpillTime = s.SpillTime - other.SpillTime
diff.Write = s.Write - other.Write
diff.WriteTime = s.WriteTime - other.WriteTime
diff.PageCount = s.GetPageCount() - other.GetPageCount()
diff.PageAlloc = s.GetPageAlloc() - other.GetPageAlloc()
diff.CursorCount = s.GetCursorCount() - other.GetCursorCount()
diff.NodeCount = s.GetNodeCount() - other.GetNodeCount()
diff.NodeDeref = s.GetNodeDeref() - other.GetNodeDeref()
diff.Rebalance = s.GetRebalance() - other.GetRebalance()
diff.RebalanceTime = s.GetRebalanceTime() - other.GetRebalanceTime()
diff.Split = s.GetSplit() - other.GetSplit()
diff.Spill = s.GetSpill() - other.GetSpill()
diff.SpillTime = s.GetSpillTime() - other.GetSpillTime()
diff.Write = s.GetWrite() - other.GetWrite()
diff.WriteTime = s.GetWriteTime() - other.GetWriteTime()
return diff
}
func atomicAddDuration(ptr *time.Duration, du time.Duration) {
atomic.AddInt64((*int64)(unsafe.Pointer(ptr)), int64(du))
// GetPageCount returns PageCount atomically.
func (s *TxStats) GetPageCount() int64 {
return atomic.LoadInt64(&s.PageCount)
}
// IncPageCount increases PageCount atomically and returns the new value.
func (s *TxStats) IncPageCount(delta int64) int64 {
return atomic.AddInt64(&s.PageCount, delta)
}
// GetPageAlloc returns PageAlloc atomically.
func (s *TxStats) GetPageAlloc() int64 {
return atomic.LoadInt64(&s.PageAlloc)
}
// IncPageAlloc increases PageAlloc atomically and returns the new value.
func (s *TxStats) IncPageAlloc(delta int64) int64 {
return atomic.AddInt64(&s.PageAlloc, delta)
}
// GetCursorCount returns CursorCount atomically.
func (s *TxStats) GetCursorCount() int64 {
return atomic.LoadInt64(&s.CursorCount)
}
// IncCursorCount increases CursorCount atomically and return the new value.
func (s *TxStats) IncCursorCount(delta int64) int64 {
return atomic.AddInt64(&s.CursorCount, delta)
}
// GetNodeCount returns NodeCount atomically.
func (s *TxStats) GetNodeCount() int64 {
return atomic.LoadInt64(&s.NodeCount)
}
// IncNodeCount increases NodeCount atomically and returns the new value.
func (s *TxStats) IncNodeCount(delta int64) int64 {
return atomic.AddInt64(&s.NodeCount, delta)
}
// GetNodeDeref returns NodeDeref atomically.
func (s *TxStats) GetNodeDeref() int64 {
return atomic.LoadInt64(&s.NodeDeref)
}
// IncNodeDeref increases NodeDeref atomically and returns the new value.
func (s *TxStats) IncNodeDeref(delta int64) int64 {
return atomic.AddInt64(&s.NodeDeref, delta)
}
// GetRebalance returns Rebalance atomically.
func (s *TxStats) GetRebalance() int64 {
return atomic.LoadInt64(&s.Rebalance)
}
// IncRebalance increases Rebalance atomically and returns the new value.
func (s *TxStats) IncRebalance(delta int64) int64 {
return atomic.AddInt64(&s.Rebalance, delta)
}
// GetRebalanceTime returns RebalanceTime atomically.
func (s *TxStats) GetRebalanceTime() time.Duration {
return atomicLoadDuration(&s.RebalanceTime)
}
// IncRebalanceTime increases RebalanceTime atomically and returns the new value.
func (s *TxStats) IncRebalanceTime(delta time.Duration) time.Duration {
return atomicAddDuration(&s.RebalanceTime, delta)
}
// GetSplit returns Split atomically.
func (s *TxStats) GetSplit() int64 {
return atomic.LoadInt64(&s.Split)
}
// IncSplit increases Split atomically and returns the new value.
func (s *TxStats) IncSplit(delta int64) int64 {
return atomic.AddInt64(&s.Split, delta)
}
// GetSpill returns Spill atomically.
func (s *TxStats) GetSpill() int64 {
return atomic.LoadInt64(&s.Spill)
}
// IncSpill increases Spill atomically and returns the new value.
func (s *TxStats) IncSpill(delta int64) int64 {
return atomic.AddInt64(&s.Spill, delta)
}
// GetSpillTime returns SpillTime atomically.
func (s *TxStats) GetSpillTime() time.Duration {
return atomicLoadDuration(&s.SpillTime)
}
// IncSpillTime increases SpillTime atomically and returns the new value.
func (s *TxStats) IncSpillTime(delta time.Duration) time.Duration {
return atomicAddDuration(&s.SpillTime, delta)
}
// GetWrite returns Write atomically.
func (s *TxStats) GetWrite() int64 {
return atomic.LoadInt64(&s.Write)
}
// IncWrite increases Write atomically and returns the new value.
func (s *TxStats) IncWrite(delta int64) int64 {
return atomic.AddInt64(&s.Write, delta)
}
// GetWriteTime returns WriteTime atomically.
func (s *TxStats) GetWriteTime() time.Duration {
return atomicLoadDuration(&s.WriteTime)
}
// IncWriteTime increases WriteTime atomically and returns the new value.
func (s *TxStats) IncWriteTime(delta time.Duration) time.Duration {
return atomicAddDuration(&s.WriteTime, delta)
}
func atomicAddDuration(ptr *time.Duration, du time.Duration) time.Duration {
return time.Duration(atomic.AddInt64((*int64)(unsafe.Pointer(ptr)), int64(du)))
}
func atomicLoadDuration(ptr *time.Duration) time.Duration {
return time.Duration(atomic.LoadInt64((*int64)(unsafe.Pointer(ptr))))
}

54
tx_stats_test.go Normal file
View File

@ -0,0 +1,54 @@
package bbolt
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestTxStats_add(t *testing.T) {
statsA := TxStats{
PageCount: 1,
PageAlloc: 2,
CursorCount: 3,
NodeCount: 100,
NodeDeref: 101,
Rebalance: 1000,
RebalanceTime: 1001 * time.Second,
Split: 10000,
Spill: 10001,
SpillTime: 10001 * time.Second,
Write: 100000,
WriteTime: 100001 * time.Second,
}
statsB := TxStats{
PageCount: 2,
PageAlloc: 3,
CursorCount: 4,
NodeCount: 101,
NodeDeref: 102,
Rebalance: 1001,
RebalanceTime: 1002 * time.Second,
Split: 11001,
Spill: 11002,
SpillTime: 11002 * time.Second,
Write: 110001,
WriteTime: 110010 * time.Second,
}
statsB.add(&statsA)
assert.Equal(t, int64(3), statsB.GetPageCount())
assert.Equal(t, int64(5), statsB.GetPageAlloc())
assert.Equal(t, int64(7), statsB.GetCursorCount())
assert.Equal(t, int64(201), statsB.GetNodeCount())
assert.Equal(t, int64(203), statsB.GetNodeDeref())
assert.Equal(t, int64(2001), statsB.GetRebalance())
assert.Equal(t, 2003*time.Second, statsB.GetRebalanceTime())
assert.Equal(t, int64(21001), statsB.GetSplit())
assert.Equal(t, int64(21003), statsB.GetSpill())
assert.Equal(t, 21003*time.Second, statsB.GetSpillTime())
assert.Equal(t, int64(210001), statsB.GetWrite())
assert.Equal(t, 210011*time.Second, statsB.GetWriteTime())
}

View File

@ -7,6 +7,9 @@ import (
"log"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
bolt "go.etcd.io/bbolt"
"go.etcd.io/bbolt/internal/btesting"
@ -902,3 +905,107 @@ func ExampleTx_CopyFile() {
// Output:
// The value for 'foo' in the clone is: bar
}
func TestTxStats_GetAndIncAtomically(t *testing.T) {
var stats bolt.TxStats
stats.IncPageCount(1)
assert.Equal(t, int64(1), stats.GetPageCount())
stats.IncPageAlloc(2)
assert.Equal(t, int64(2), stats.GetPageAlloc())
stats.IncCursorCount(3)
assert.Equal(t, int64(3), stats.GetCursorCount())
stats.IncNodeCount(100)
assert.Equal(t, int64(100), stats.GetNodeCount())
stats.IncNodeDeref(101)
assert.Equal(t, int64(101), stats.GetNodeDeref())
stats.IncRebalance(1000)
assert.Equal(t, int64(1000), stats.GetRebalance())
stats.IncRebalanceTime(1001 * time.Second)
assert.Equal(t, 1001*time.Second, stats.GetRebalanceTime())
stats.IncSplit(10000)
assert.Equal(t, int64(10000), stats.GetSplit())
stats.IncSpill(10001)
assert.Equal(t, int64(10001), stats.GetSpill())
stats.IncSpillTime(10001 * time.Second)
assert.Equal(t, 10001*time.Second, stats.GetSpillTime())
stats.IncWrite(100000)
assert.Equal(t, int64(100000), stats.GetWrite())
stats.IncWriteTime(100001 * time.Second)
assert.Equal(t, 100001*time.Second, stats.GetWriteTime())
assert.Equal(t,
bolt.TxStats{
PageCount: 1,
PageAlloc: 2,
CursorCount: 3,
NodeCount: 100,
NodeDeref: 101,
Rebalance: 1000,
RebalanceTime: 1001 * time.Second,
Split: 10000,
Spill: 10001,
SpillTime: 10001 * time.Second,
Write: 100000,
WriteTime: 100001 * time.Second,
},
stats,
)
}
func TestTxStats_Sub(t *testing.T) {
statsA := bolt.TxStats{
PageCount: 1,
PageAlloc: 2,
CursorCount: 3,
NodeCount: 100,
NodeDeref: 101,
Rebalance: 1000,
RebalanceTime: 1001 * time.Second,
Split: 10000,
Spill: 10001,
SpillTime: 10001 * time.Second,
Write: 100000,
WriteTime: 100001 * time.Second,
}
statsB := bolt.TxStats{
PageCount: 2,
PageAlloc: 3,
CursorCount: 4,
NodeCount: 101,
NodeDeref: 102,
Rebalance: 1001,
RebalanceTime: 1002 * time.Second,
Split: 11001,
Spill: 11002,
SpillTime: 11002 * time.Second,
Write: 110001,
WriteTime: 110010 * time.Second,
}
diff := statsB.Sub(&statsA)
assert.Equal(t, int64(1), diff.GetPageCount())
assert.Equal(t, int64(1), diff.GetPageAlloc())
assert.Equal(t, int64(1), diff.GetCursorCount())
assert.Equal(t, int64(1), diff.GetNodeCount())
assert.Equal(t, int64(1), diff.GetNodeDeref())
assert.Equal(t, int64(1), diff.GetRebalance())
assert.Equal(t, time.Second, diff.GetRebalanceTime())
assert.Equal(t, int64(1001), diff.GetSplit())
assert.Equal(t, int64(1001), diff.GetSpill())
assert.Equal(t, 1001*time.Second, diff.GetSpillTime())
assert.Equal(t, int64(10001), diff.GetWrite())
assert.Equal(t, 10009*time.Second, diff.GetWriteTime())
}