mirror of https://github.com/etcd-io/bbolt.git
parent
2eb7227ade
commit
a0458a2b35
22
freelist.go
22
freelist.go
|
@ -349,6 +349,28 @@ func (f *freelist) reload(p *page) {
|
||||||
f.readIDs(a)
|
f.readIDs(a)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// noSyncReload reads the freelist from pgids and filters out pending items.
|
||||||
|
func (f *freelist) noSyncReload(pgids []pgid) {
|
||||||
|
// Build a cache of only pending pages.
|
||||||
|
pcache := make(map[pgid]bool)
|
||||||
|
for _, txp := range f.pending {
|
||||||
|
for _, pendingID := range txp.ids {
|
||||||
|
pcache[pendingID] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check each page in the freelist and build a new available freelist
|
||||||
|
// with any pages not in the pending lists.
|
||||||
|
var a []pgid
|
||||||
|
for _, id := range pgids {
|
||||||
|
if !pcache[id] {
|
||||||
|
a = append(a, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
f.readIDs(a)
|
||||||
|
}
|
||||||
|
|
||||||
// reindex rebuilds the free cache based on available and pending free lists.
|
// reindex rebuilds the free cache based on available and pending free lists.
|
||||||
func (f *freelist) reindex() {
|
func (f *freelist) reindex() {
|
||||||
ids := f.getFreePageIDs()
|
ids := f.getFreePageIDs()
|
||||||
|
|
23
tx.go
23
tx.go
|
@ -254,17 +254,36 @@ func (tx *Tx) Rollback() error {
|
||||||
if tx.db == nil {
|
if tx.db == nil {
|
||||||
return ErrTxClosed
|
return ErrTxClosed
|
||||||
}
|
}
|
||||||
tx.rollback()
|
tx.nonPhysicalRollback()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// nonPhysicalRollback is called when user calls Rollback directly, in this case we do not need to reload the free pages from disk.
|
||||||
|
func (tx *Tx) nonPhysicalRollback() {
|
||||||
|
if tx.db == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if tx.writable {
|
||||||
|
tx.db.freelist.rollback(tx.meta.txid)
|
||||||
|
}
|
||||||
|
tx.close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// rollback needs to reload the free pages from disk in case some system error happens like fsync error.
|
||||||
func (tx *Tx) rollback() {
|
func (tx *Tx) rollback() {
|
||||||
if tx.db == nil {
|
if tx.db == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if tx.writable {
|
if tx.writable {
|
||||||
tx.db.freelist.rollback(tx.meta.txid)
|
tx.db.freelist.rollback(tx.meta.txid)
|
||||||
tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist))
|
if !tx.db.hasSyncedFreelist() {
|
||||||
|
// Reconstruct free page list by scanning the DB to get the whole free page list.
|
||||||
|
// Note: scaning the whole db is heavy if your db size is large in NoSyncFreeList mode.
|
||||||
|
tx.db.freelist.noSyncReload(tx.db.freepages())
|
||||||
|
} else {
|
||||||
|
// Read free page list from freelist page.
|
||||||
|
tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
tx.close()
|
tx.close()
|
||||||
}
|
}
|
||||||
|
|
51
tx_test.go
51
tx_test.go
|
@ -654,6 +654,57 @@ func TestTx_CopyFile_Error_Normal(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestTx_Rollback ensures there is no error when tx rollback whether we sync freelist or not.
|
||||||
|
func TestTx_Rollback(t *testing.T) {
|
||||||
|
for _, isSyncFreelist := range []bool{false, true} {
|
||||||
|
// Open the database.
|
||||||
|
db, err := bolt.Open(tempfile(), 0666, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
defer os.Remove(db.Path())
|
||||||
|
db.NoFreelistSync = isSyncFreelist
|
||||||
|
|
||||||
|
tx, err := db.Begin(true)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error starting tx: %v", err)
|
||||||
|
}
|
||||||
|
bucket := []byte("mybucket")
|
||||||
|
if _, err := tx.CreateBucket(bucket); err != nil {
|
||||||
|
t.Fatalf("Error creating bucket: %v", err)
|
||||||
|
}
|
||||||
|
if err := tx.Commit(); err != nil {
|
||||||
|
t.Fatalf("Error on commit: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tx, err = db.Begin(true)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error starting tx: %v", err)
|
||||||
|
}
|
||||||
|
b := tx.Bucket(bucket)
|
||||||
|
if err := b.Put([]byte("k"), []byte("v")); err != nil {
|
||||||
|
t.Fatalf("Error on put: %v", err)
|
||||||
|
}
|
||||||
|
// Imagine there is an error and tx needs to be rolled-back
|
||||||
|
if err := tx.Rollback(); err != nil {
|
||||||
|
t.Fatalf("Error on rollback: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tx, err = db.Begin(false)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error starting tx: %v", err)
|
||||||
|
}
|
||||||
|
b = tx.Bucket(bucket)
|
||||||
|
if v := b.Get([]byte("k")); v != nil {
|
||||||
|
t.Fatalf("Value for k should not have been stored")
|
||||||
|
}
|
||||||
|
if err := tx.Rollback(); err != nil {
|
||||||
|
t.Fatalf("Error on rollback: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TestTx_releaseRange ensures db.freePages handles page releases
|
// TestTx_releaseRange ensures db.freePages handles page releases
|
||||||
// correctly when there are transaction that are no longer reachable
|
// correctly when there are transaction that are no longer reachable
|
||||||
// via any read/write transactions and are "between" ongoing read
|
// via any read/write transactions and are "between" ongoing read
|
||||||
|
|
Loading…
Reference in New Issue