fix data race on tx.Stats

Fixes: #213

Signed-off-by: Wei Fu <fuweid89@gmail.com>
pull/373/head
Wei Fu 2023-01-02 22:28:55 +08:00
parent b654ce9221
commit 27ac0b8958
4 changed files with 30 additions and 23 deletions

View File

@ -25,7 +25,7 @@ func TestTx_allocatePageStats(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if tx.Stats().PageCount != prePageCnt+allocateCnt { if tx.Stats().PageCount != prePageCnt+int64(allocateCnt) {
t.Errorf("Allocated %d but got %d page in stats", allocateCnt, tx.Stats().PageCount) t.Errorf("Allocated %d but got %d page in stats", allocateCnt, tx.Stats().PageCount)
} }
} }

View File

@ -3,6 +3,7 @@ package bbolt
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"sync/atomic"
"unsafe" "unsafe"
) )
@ -81,7 +82,7 @@ func (b *Bucket) Writable() bool {
// Do not use a cursor after the transaction is closed. // Do not use a cursor after the transaction is closed.
func (b *Bucket) Cursor() *Cursor { func (b *Bucket) Cursor() *Cursor {
// Update transaction statistics. // Update transaction statistics.
b.tx.stats.CursorCount++ atomic.AddInt64(&b.tx.stats.CursorCount, 1)
// Allocate and return a cursor. // Allocate and return a cursor.
return &Cursor{ return &Cursor{
@ -681,7 +682,7 @@ func (b *Bucket) node(pgid pgid, parent *node) *node {
b.nodes[pgid] = n b.nodes[pgid] = n
// Update statistics. // Update statistics.
b.tx.stats.NodeCount++ atomic.AddInt64(&b.tx.stats.NodeCount, 1)
return n return n
} }

View File

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"sort" "sort"
"sync/atomic"
"unsafe" "unsafe"
) )
@ -304,7 +305,7 @@ func (n *node) splitTwo(pageSize uintptr) (*node, *node) {
n.inodes = n.inodes[:splitIndex] n.inodes = n.inodes[:splitIndex]
// Update the statistics. // Update the statistics.
n.bucket.tx.stats.Split++ atomic.AddInt64(&n.bucket.tx.stats.Split, 1)
return n, next return n, next
} }
@ -391,7 +392,7 @@ func (n *node) spill() error {
} }
// Update the statistics. // Update the statistics.
tx.stats.Spill++ atomic.AddInt64(&tx.stats.Spill, 1)
} }
// If the root node split and created a new root then we need to spill that // If the root node split and created a new root then we need to spill that
@ -547,7 +548,7 @@ func (n *node) dereference() {
} }
// Update statistics. // Update statistics.
n.bucket.tx.stats.NodeDeref++ atomic.AddInt64(&n.bucket.tx.stats.NodeDeref, 1)
} }
// free adds the node's underlying page to the freelist. // free adds the node's underlying page to the freelist.

39
tx.go
View File

@ -6,6 +6,7 @@ import (
"os" "os"
"sort" "sort"
"strings" "strings"
"sync/atomic"
"time" "time"
"unsafe" "unsafe"
) )
@ -151,8 +152,8 @@ func (tx *Tx) Commit() error {
// Rebalance nodes which have had deletions. // Rebalance nodes which have had deletions.
var startTime = time.Now() var startTime = time.Now()
tx.root.rebalance() tx.root.rebalance()
if tx.stats.Rebalance > 0 { if atomic.LoadInt64(&tx.stats.Rebalance) > 0 {
tx.stats.RebalanceTime += time.Since(startTime) atomicAddDuration(&tx.stats.RebalanceTime, time.Since(startTime))
} }
// spill data onto dirty pages. // spill data onto dirty pages.
@ -161,7 +162,7 @@ func (tx *Tx) Commit() error {
tx.rollback() tx.rollback()
return err return err
} }
tx.stats.SpillTime += time.Since(startTime) atomicAddDuration(&tx.stats.SpillTime, time.Since(startTime))
// Free the old root bucket. // Free the old root bucket.
tx.meta.root.root = tx.root.root tx.meta.root.root = tx.root.root
@ -208,7 +209,7 @@ func (tx *Tx) Commit() error {
tx.rollback() tx.rollback()
return err return err
} }
tx.stats.WriteTime += time.Since(startTime) atomicAddDuration(&tx.stats.WriteTime, time.Since(startTime))
// Finalize the transaction. // Finalize the transaction.
tx.close() tx.close()
@ -503,8 +504,8 @@ func (tx *Tx) allocate(count int) (*page, error) {
tx.pages[p.id] = p tx.pages[p.id] = p
// Update statistics. // Update statistics.
tx.stats.PageCount += count atomic.AddInt64(&tx.stats.PageCount, int64(count))
tx.stats.PageAlloc += count * tx.db.pageSize atomic.AddInt64(&tx.stats.PageAlloc, int64(count*tx.db.pageSize))
return p, nil return p, nil
} }
@ -539,7 +540,7 @@ func (tx *Tx) write() error {
} }
// Update statistics. // Update statistics.
tx.stats.Write++ atomic.AddInt64(&tx.stats.Write, 1)
// Exit inner for loop if we've written all the chunks. // Exit inner for loop if we've written all the chunks.
rem -= sz rem -= sz
@ -598,7 +599,7 @@ func (tx *Tx) writeMeta() error {
} }
// Update statistics. // Update statistics.
tx.stats.Write++ atomic.AddInt64(&tx.stats.Write, 1)
return nil return nil
} }
@ -672,27 +673,27 @@ func (tx *Tx) Page(id int) (*PageInfo, error) {
// TxStats represents statistics about the actions performed by the transaction. // TxStats represents statistics about the actions performed by the transaction.
type TxStats struct { type TxStats struct {
// Page statistics. // Page statistics.
PageCount int // number of page allocations PageCount int64 // number of page allocations
PageAlloc int // total bytes allocated PageAlloc int64 // total bytes allocated
// Cursor statistics. // Cursor statistics.
CursorCount int // number of cursors created CursorCount int64 // number of cursors created
// Node statistics // Node statistics
NodeCount int // number of node allocations NodeCount int64 // number of node allocations
NodeDeref int // number of node dereferences NodeDeref int64 // number of node dereferences
// Rebalance statistics. // Rebalance statistics.
Rebalance int // number of node rebalances Rebalance int64 // number of node rebalances
RebalanceTime time.Duration // total time spent rebalancing RebalanceTime time.Duration // total time spent rebalancing
// Split/Spill statistics. // Split/Spill statistics.
Split int // number of nodes split Split int64 // number of nodes split
Spill int // number of nodes spilled Spill int64 // number of nodes spilled
SpillTime time.Duration // total time spent spilling SpillTime time.Duration // total time spent spilling
// Write statistics. // Write statistics.
Write int // number of writes performed Write int64 // number of writes performed
WriteTime time.Duration // total time spent writing to disk WriteTime time.Duration // total time spent writing to disk
} }
@ -730,3 +731,7 @@ func (s *TxStats) Sub(other *TxStats) TxStats {
diff.WriteTime = s.WriteTime - other.WriteTime diff.WriteTime = s.WriteTime - other.WriteTime
return diff return diff
} }
func atomicAddDuration(ptr *time.Duration, du time.Duration) {
atomic.AddInt64((*int64)(unsafe.Pointer(ptr)), int64(du))
}