Refactoring to RWCursor, RWTxn, and branch/leaf nodes and pages.

pull/34/head
Ben Johnson 2014-01-17 15:23:39 -07:00
parent cec7b942e7
commit 153372abd4
10 changed files with 1048 additions and 1203 deletions


@@ -1,25 +1,29 @@
package bolt
import (
"unsafe"
)
const (
bigNode = 0x01
subNode = 0x02
dupNode = 0x04
)
type node struct {
lo int
hi int
flags int
keySize int
data []byte
// branchNode represents a node on a branch page.
type branchNode struct {
pgno uint32
flags uint16
keySize uint16
data uintptr // Pointer to the beginning of the data.
}
func (n *node) setFlags(f int) {
// Valid flags: (F_DUPDATA|F_SUBDATA|MDB_RESERVE|MDB_APPEND)
// TODO
// key returns a byte slice of the key data.
func (n *branchNode) key() []byte {
return (*[MaxKeySize]byte)(unsafe.Pointer(&n.data))[:n.keySize]
}
func (n *node) size() int {
func (n *branchNode) size() int {
return 0 // TODO: offsetof(MDB_node, mn_data)
}
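
Not part of this commit — a standalone sketch of the unsafe cast pattern that key() relies on: reinterpret the memory at a given address as a large fixed-size byte array, then slice it down to the real key length.

package main

import (
    "fmt"
    "unsafe"
)

func main() {
    raw := []byte("hello, bolt")
    // Treat the memory starting at raw[0] as a fixed-size array and slice it
    // to the wanted length — the same trick branchNode.key() uses with
    // n.data and n.keySize.
    key := (*[8]byte)(unsafe.Pointer(&raw[0]))[:5]
    fmt.Printf("%s\n", key) // prints "hello"
}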


@@ -29,3 +29,4 @@ type bucket struct {
entries uint64
root pgno
}

cursor.go (1377 changed lines; diff suppressed because it is too large)

db.go (5 changed lines)

@@ -45,9 +45,8 @@ type DB struct {
mmapSize int /**< size of the data memory map */
size int /**< current file size */
pbuf []byte
transaction *Transaction /**< current write transaction */
transaction *RWTransaction /**< current write transaction */
maxPageNumber int /**< me_mapsize / me_psize */
pagestate pagestate /**< state of old pages from freeDB */
dpages []*page /**< list of malloc'd blocks for re-use */
freePages []int /** IDL of pages that became unused in a write txn */
dirtyPages []int /** ID2L of pages written during a write txn. Length MDB_IDL_UM_SIZE. */
@@ -223,6 +222,8 @@ func (db *DB) Transaction(writable bool) (*Transaction, error) {
db: db,
meta: db.meta(),
writable: writable,
buckets: make(map[string]*Bucket),
cursors: make(map[uint32]*Cursor),
}
// Save references to the sys•free and sys•buckets buckets.

leaf_node.go (new file, 18 lines)

@@ -0,0 +1,18 @@
package bolt
import (
"unsafe"
)
// leafNode represents a node on a leaf page.
type leafNode struct {
flags uint16
keySize uint16
dataSize uint32
data uintptr // Pointer to the beginning of the data.
}
// key returns a byte slice of the key data.
func (n *leafNode) key() []byte {
return (*[MaxKeySize]byte)(unsafe.Pointer(&n.data))[:n.keySize]
}
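
The new leafNode mirrors branchNode but also carries dataSize; this diff does not add a value accessor. A hedged sketch of what one could look like, assuming (as in LMDB) that the value bytes sit immediately after the key bytes:

// value returns the node's data bytes. Hypothetical helper, not part of this
// commit; it assumes the value immediately follows the key in memory.
func (n *leafNode) value() []byte {
    buf := (*[maxPageSize]byte)(unsafe.Pointer(&n.data))
    return buf[n.keySize : uint32(n.keySize)+n.dataSize]
}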

page.go (72 changed lines)

@@ -1,6 +1,7 @@
package bolt
import (
"bytes"
"unsafe"
)
@@ -53,16 +54,11 @@ type indx uint16
type page struct {
id pgno
flags int
flags uint16
lower indx
upper indx
overflow int
ptr int
}
type pagestate struct {
head int /**< Reclaimed freeDB pages, or NULL before use */
last int /**< ID of last used record, or 0 if !mf_pghead */
overflow uint32
ptr uintptr
}
// meta returns a pointer to the metadata section of the page.
@@ -92,12 +88,66 @@ func (p *page) init(pageSize int) {
m.buckets.root = p_invalid
}
// nodeCount returns the number of nodes on the page.
func (p *page) nodeCount() int {
return 0 // (p.header.lower - unsafe.Sizeof(p.header) >> 1
// branchNode retrieves the branch node at the given index within the page.
func (p *page) branchNode(index indx) *branchNode {
b := (*[maxPageSize]byte)(unsafe.Pointer(&p.ptr))
return (*branchNode)(unsafe.Pointer(&b[index * indx(unsafe.Sizeof(index))]))
}
// leafNode retrieves the leaf node at the given index within the page.
func (p *page) leafNode(index indx) *leafNode {
b := (*[maxPageSize]byte)(unsafe.Pointer(&p.ptr))
return (*leafNode)(unsafe.Pointer(&b[index * indx(unsafe.Sizeof(index))]))
}
// numkeys returns the number of nodes in the page.
func (p *page) numkeys() int {
return int((p.lower - indx(pageHeaderSize)) >> 1)
}
// remainingSize returns the number of bytes left in the page.
func (p *page) remainingSize() int {
return int(p.upper - p.lower)
}
// find returns the node with the smallest key greater than or equal to the given key.
// This function also returns a boolean stating if an exact match was made.
func (p *page) find(key []byte, pageSize int) (*node, int, bool) {
// TODO: MDB_page *mp = mc->mc_pg[mc->mc_top];
var node *node
nkeys := p.numkeys()
low, high := 1, nkeys - 1
if (p.flags & p_leaf) != 0 {
low = 0
}
// Perform a binary search to find the correct node.
var i, rc int
for low <= high {
i = (low + high) / 2
node = p.node(indx(i))
rc = bytes.Compare(key, node.key())
if rc == 0 {
break
} else if rc > 0 {
low = i + 1
} else {
high = i - 1
}
}
// Found entry is less than key so grab the next one.
if rc > 0 {
i++
}
// If index is beyond key range then return nil.
if i >= nkeys {
node = nil
}
exact := (rc == 0 && nkeys > 0)
return node, i, exact
}
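
For reference, the contract that find implements — locate the smallest key greater than or equal to the target and report whether the match was exact — reduces to the following over a plain sorted [][]byte (illustrative only, not the page-level code above):

package main

import (
    "bytes"
    "fmt"
)

// seek returns the index of the smallest key >= target and whether the match
// was exact; the same contract page.find provides over its sorted nodes.
func seek(keys [][]byte, target []byte) (int, bool) {
    low, high := 0, len(keys)-1
    for low <= high {
        mid := (low + high) / 2
        switch bytes.Compare(target, keys[mid]) {
        case 0:
            return mid, true
        case 1:
            low = mid + 1
        default:
            high = mid - 1
        }
    }
    return low, false
}

func main() {
    keys := [][]byte{[]byte("bar"), []byte("baz"), []byte("foo")}
    fmt.Println(seek(keys, []byte("baz"))) // 1 true
    fmt.Println(seek(keys, []byte("cat"))) // 2 false
}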

rwcursor.go (new file, 601 lines)

@@ -0,0 +1,601 @@
package bolt
// RWCursor represents a cursor that can read and write data for a bucket.
type RWCursor struct {
Cursor
transaction *RWTransaction
reclaimed []pgno /**< Reclaimed freeDB pages, or NULL before use (was me_pghead) */
last txnid /**< ID of last used record, or 0 if len(reclaimed) == 0 */
}
func (c *RWCursor) Put(key []byte, value []byte) error {
// Make sure this cursor was created by a transaction.
if c.transaction == nil {
return &Error{"invalid cursor", nil}
}
db := c.transaction.db
// Validate the key we're using.
if key == nil {
return &Error{"key required", nil}
} else if len(key) > db.maxKeySize {
return &Error{"key too large", nil}
}
// TODO: Validate data size based on MaxKeySize if DUPSORT.
// Validate the size of our data.
if len(value) > MaxDataSize {
return &Error{"data too large", nil}
}
// If we don't have a root page then add one.
if c.bucket.root == p_invalid {
p, err := c.newLeafPage()
if err != nil {
return err
}
c.push(p)
c.bucket.root = p.id
// TODO: bump the bucket's tree depth here (LMDB increments md_depth at this point).
// TODO: *mc->mc_dbflag |= DB_DIRTY;
// TODO? mc->mc_flags |= C_INITIALIZED;
}
// TODO: Move to key.
exists, err := c.moveTo(key)
if err != nil {
return err
}
// TODO: spill?
if err := c.spill(key, value); err != nil {
return err
}
// Make sure all cursor pages are writable
if err := c.touch(); err != nil {
return err
}
// If the key already exists then grab its current node.
if exists {
node := c.currentNode()
_ = node // TODO: overwrite the existing value (see the ported logic below).
}
/*
insert = rc;
if (insert) {
// The key does not exist
DPRINTF(("inserting key at index %i", mc->mc_ki[mc->mc_top]));
if ((mc->mc_db->md_flags & MDB_DUPSORT) &&
LEAFSIZE(key, data) > env->me_nodemax)
{
// Too big for a node, insert in sub-DB
fp_flags = P_LEAF|P_DIRTY;
fp = env->me_pbuf;
fp->mp_pad = data->mv_size; // used if MDB_DUPFIXED
fp->mp_lower = fp->mp_upper = olddata.mv_size = PAGEHDRSZ;
goto prep_subDB;
}
} else {
more:
leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
olddata.mv_size = NODEDSZ(leaf);
olddata.mv_data = NODEDATA(leaf);
// DB has dups?
if (F_ISSET(mc->mc_db->md_flags, MDB_DUPSORT)) {
// Prepare (sub-)page/sub-DB to accept the new item,
// if needed. fp: old sub-page or a header faking
// it. mp: new (sub-)page. offset: growth in page
// size. xdata: node data with new page or DB.
ssize_t i, offset = 0;
mp = fp = xdata.mv_data = env->me_pbuf;
mp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno;
// Was a single item before, must convert now
if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
// Just overwrite the current item
if (flags == MDB_CURRENT)
goto current;
#if UINT_MAX < SIZE_MAX
if (mc->mc_dbx->md_dcmp == mdb_cmp_int && olddata.mv_size == sizeof(size_t))
#ifdef MISALIGNED_OK
mc->mc_dbx->md_dcmp = mdb_cmp_long;
#else
mc->mc_dbx->md_dcmp = mdb_cmp_cint;
#endif
#endif
// if data matches, skip it
if (!mc->mc_dbx->md_dcmp(data, &olddata)) {
if (flags & MDB_NODUPDATA)
rc = MDB_KEYEXIST;
else if (flags & MDB_MULTIPLE)
goto next_mult;
else
rc = MDB_SUCCESS;
return rc;
}
// Back up original data item
dkey.mv_size = olddata.mv_size;
dkey.mv_data = memcpy(fp+1, olddata.mv_data, olddata.mv_size);
// Make sub-page header for the dup items, with dummy body
fp->mp_flags = P_LEAF|P_DIRTY|P_SUBP;
fp->mp_lower = PAGEHDRSZ;
xdata.mv_size = PAGEHDRSZ + dkey.mv_size + data->mv_size;
if (mc->mc_db->md_flags & MDB_DUPFIXED) {
fp->mp_flags |= P_LEAF2;
fp->mp_pad = data->mv_size;
xdata.mv_size += 2 * data->mv_size; // leave space for 2 more
} else {
xdata.mv_size += 2 * (sizeof(indx_t) + NODESIZE) +
(dkey.mv_size & 1) + (data->mv_size & 1);
}
fp->mp_upper = xdata.mv_size;
olddata.mv_size = fp->mp_upper; // pretend olddata is fp
} else if (leaf->mn_flags & F_SUBDATA) {
// Data is on sub-DB, just store it
flags |= F_DUPDATA|F_SUBDATA;
goto put_sub;
} else {
// Data is on sub-page
fp = olddata.mv_data;
switch (flags) {
default:
i = -(ssize_t)SIZELEFT(fp);
if (!(mc->mc_db->md_flags & MDB_DUPFIXED)) {
offset = i += (ssize_t) EVEN(
sizeof(indx_t) + NODESIZE + data->mv_size);
} else {
i += offset = fp->mp_pad;
offset *= 4; // space for 4 more
}
if (i > 0)
break;
// FALLTHRU: Sub-page is big enough
case MDB_CURRENT:
fp->mp_flags |= P_DIRTY;
COPY_PGNO(fp->mp_pgno, mp->mp_pgno);
mc->mc_xcursor->mx_cursor.mc_pg[0] = fp;
flags |= F_DUPDATA;
goto put_sub;
}
xdata.mv_size = olddata.mv_size + offset;
}
fp_flags = fp->mp_flags;
if (NODESIZE + NODEKSZ(leaf) + xdata.mv_size > env->me_nodemax) {
// Too big for a sub-page, convert to sub-DB
fp_flags &= ~P_SUBP;
prep_subDB:
dummy.md_pad = 0;
dummy.md_flags = 0;
dummy.md_depth = 1;
dummy.md_branch_pages = 0;
dummy.md_leaf_pages = 1;
dummy.md_overflow_pages = 0;
dummy.md_entries = NUMKEYS(fp);
xdata.mv_size = sizeof(MDB_db);
xdata.mv_data = &dummy;
if ((rc = mdb_page_alloc(mc, 1, &mp)))
return rc;
offset = env->me_psize - olddata.mv_size;
flags |= F_DUPDATA|F_SUBDATA;
dummy.md_root = mp->mp_pgno;
}
if (mp != fp) {
mp->mp_flags = fp_flags | P_DIRTY;
mp->mp_pad = fp->mp_pad;
mp->mp_lower = fp->mp_lower;
mp->mp_upper = fp->mp_upper + offset;
if (fp_flags & P_LEAF2) {
memcpy(METADATA(mp), METADATA(fp), NUMKEYS(fp) * fp->mp_pad);
} else {
memcpy((char *)mp + mp->mp_upper, (char *)fp + fp->mp_upper,
olddata.mv_size - fp->mp_upper);
for (i = NUMKEYS(fp); --i >= 0; )
mp->mp_ptrs[i] = fp->mp_ptrs[i] + offset;
}
}
rdata = &xdata;
flags |= F_DUPDATA;
do_sub = 1;
if (!insert)
mdb_node_del(mc, 0);
goto new_sub;
}
current:
// overflow page overwrites need special handling
if (F_ISSET(leaf->mn_flags, F_BIGDATA)) {
MDB_page *omp;
pgno_t pg;
int level, ovpages, dpages = OVPAGES(data->mv_size, env->me_psize);
memcpy(&pg, olddata.mv_data, sizeof(pg));
if ((rc2 = mdb_page_get(mc->mc_txn, pg, &omp, &level)) != 0)
return rc2;
ovpages = omp->mp_pages;
// Is the ov page large enough?
if (ovpages >= dpages) {
if (!(omp->mp_flags & P_DIRTY) &&
(level || (env->me_flags & MDB_WRITEMAP)))
{
rc = mdb_page_unspill(mc->mc_txn, omp, &omp);
if (rc)
return rc;
level = 0; // dirty in this txn or clean
}
// Is it dirty?
if (omp->mp_flags & P_DIRTY) {
// yes, overwrite it. Note in this case we don't
// bother to try shrinking the page if the new data
// is smaller than the overflow threshold.
if (level > 1) {
// It is writable only in a parent txn
size_t sz = (size_t) env->me_psize * ovpages, off;
MDB_page *np = mdb_page_malloc(mc->mc_txn, ovpages);
MDB_ID2 id2;
if (!np)
return ENOMEM;
id2.mid = pg;
id2.mptr = np;
rc = mdb_mid2l_insert(mc->mc_txn->mt_u.dirty_list, &id2);
mdb_cassert(mc, rc == 0);
if (!(flags & MDB_RESERVE)) {
// Copy end of page, adjusting alignment so
// compiler may copy words instead of bytes.
off = (PAGEHDRSZ + data->mv_size) & -sizeof(size_t);
memcpy((size_t *)((char *)np + off),
(size_t *)((char *)omp + off), sz - off);
sz = PAGEHDRSZ;
}
memcpy(np, omp, sz); // Copy beginning of page
omp = np;
}
SETDSZ(leaf, data->mv_size);
if (F_ISSET(flags, MDB_RESERVE))
data->mv_data = METADATA(omp);
else
memcpy(METADATA(omp), data->mv_data, data->mv_size);
goto done;
}
}
if ((rc2 = mdb_ovpage_free(mc, omp)) != MDB_SUCCESS)
return rc2;
} else if (data->mv_size == olddata.mv_size) {
// same size, just replace it. Note that we could
// also reuse this node if the new data is smaller,
// but instead we opt to shrink the node in that case.
if (F_ISSET(flags, MDB_RESERVE))
data->mv_data = olddata.mv_data;
else if (data->mv_size)
memcpy(olddata.mv_data, data->mv_data, data->mv_size);
else
memcpy(NODEKEY(leaf), key->mv_data, key->mv_size);
goto done;
}
mdb_node_del(mc, 0);
mc->mc_db->md_entries--;
}
rdata = data;
new_sub:
nflags = flags & NODE_ADD_FLAGS;
nsize = IS_LEAF2(mc->mc_pg[mc->mc_top]) ? key->mv_size : mdb_leaf_size(env, key, rdata);
if (SIZELEFT(mc->mc_pg[mc->mc_top]) < nsize) {
if (( flags & (F_DUPDATA|F_SUBDATA)) == F_DUPDATA )
nflags &= ~MDB_APPEND;
if (!insert)
nflags |= MDB_SPLIT_REPLACE;
rc = mdb_page_split(mc, key, rdata, P_INVALID, nflags);
} else {
// There is room already in this leaf page.
rc = mdb_node_add(mc, mc->mc_ki[mc->mc_top], key, rdata, 0, nflags);
if (rc == 0 && !do_sub && insert) {
// Adjust other cursors pointing to mp
MDB_cursor *m2, *m3;
MDB_dbi dbi = mc->mc_dbi;
unsigned i = mc->mc_top;
MDB_page *mp = mc->mc_pg[i];
for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) {
if (mc->mc_flags & C_SUB)
m3 = &m2->mc_xcursor->mx_cursor;
else
m3 = m2;
if (m3 == mc || m3->mc_snum < mc->mc_snum) continue;
if (m3->mc_pg[i] == mp && m3->mc_ki[i] >= mc->mc_ki[i]) {
m3->mc_ki[i]++;
}
}
}
}
if (rc != MDB_SUCCESS)
mc->mc_txn->mt_flags |= MDB_TXN_ERROR;
else {
// Now store the actual data in the child DB. Note that we're
// storing the user data in the keys field, so there are strict
// size limits on dupdata. The actual data fields of the child
// DB are all zero size.
if (do_sub) {
int xflags;
put_sub:
xdata.mv_size = 0;
xdata.mv_data = "";
leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
if (flags & MDB_CURRENT) {
xflags = MDB_CURRENT|MDB_NOSPILL;
} else {
mdb_xcursor_init1(mc, leaf);
xflags = (flags & MDB_NODUPDATA) ?
MDB_NOOVERWRITE|MDB_NOSPILL : MDB_NOSPILL;
}
// converted, write the original data first
if (dkey.mv_size) {
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, xflags);
if (rc)
return rc;
{
// Adjust other cursors pointing to mp
MDB_cursor *m2;
unsigned i = mc->mc_top;
MDB_page *mp = mc->mc_pg[i];
for (m2 = mc->mc_txn->mt_cursors[mc->mc_dbi]; m2; m2=m2->mc_next) {
if (m2 == mc || m2->mc_snum < mc->mc_snum) continue;
if (!(m2->mc_flags & C_INITIALIZED)) continue;
if (m2->mc_pg[i] == mp && m2->mc_ki[i] == mc->mc_ki[i]) {
mdb_xcursor_init1(m2, leaf);
}
}
}
// we've done our job
dkey.mv_size = 0;
}
if (flags & MDB_APPENDDUP)
xflags |= MDB_APPEND;
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags);
if (flags & F_SUBDATA) {
void *db = NODEDATA(leaf);
memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db));
}
}
// sub-writes might have failed so check rc again.
// Don't increment count if we just replaced an existing item.
if (!rc && !(flags & MDB_CURRENT))
mc->mc_db->md_entries++;
if (flags & MDB_MULTIPLE) {
if (!rc) {
next_mult:
mcount++;
// let caller know how many succeeded, if any
data[1].mv_size = mcount;
if (mcount < dcount) {
data[0].mv_data = (char *)data[0].mv_data + data[0].mv_size;
goto more;
}
}
}
}
done:
// If we succeeded and the key didn't exist before, make sure
// the cursor is marked valid.
if (!rc && insert)
mc->mc_flags |= C_INITIALIZED;
return rc;
*/
return nil
}
// newLeafPage allocates and initializes a new leaf page.
func (c *RWCursor) newLeafPage() (*page, error) {
// Allocate page.
p, err := c.allocatePage(1)
if err != nil {
return nil, err
}
// Set flags and bounds.
p.flags = p_leaf | p_dirty
p.lower = indx(pageHeaderSize)
p.upper = indx(c.transaction.db.pageSize)
c.bucket.leafs += 1
return p, nil
}
// newBranchPage allocates and initializes a new branch page.
func (c *RWCursor) newBranchPage() (*page, error) {
// Allocate page.
p, err := c.allocatePage(1)
if err != nil {
return nil, err
}
// Set flags and bounds.
p.flags = p_branch | p_dirty
p.lower = indx(pageHeaderSize)
p.upper = indx(c.transaction.db.pageSize)
c.bucket.branches += 1
return p, nil
}
// newOverflowPage allocates and initializes new overflow pages.
func (c *RWCursor) newOverflowPage(count int) (*page, error) {
// Allocate page.
p, err := c.allocatePage(count)
if err != nil {
return nil, err
}
// Set flags and bounds.
p.flags = p_overflow | p_dirty
p.lower = indx(pageHeaderSize)
p.upper = indx(c.transaction.db.pageSize)
c.bucket.overflows += count
return p, nil
}
// Allocate page numbers and memory for writing. Maintain me_pglast,
// me_pghead and mt_next_pgno.
//
// If there are free pages available from older transactions, they
// are re-used first. Otherwise allocate a new page at mt_next_pgno.
// Do not modify the freeDB, just merge freeDB records into me_pghead[]
// and move me_pglast to say which records were consumed. Only this
// function can create me_pghead and move me_pglast/mt_next_pgno.
// @param[in] mc cursor A cursor handle identifying the transaction and
// database for which we are allocating.
// @param[in] num the number of pages to allocate.
// @param[out] mp Address of the allocated page(s). Requests for multiple pages
// will always be satisfied by a single contiguous chunk of memory.
// @return 0 on success, non-zero on failure.
// allocatePage allocates a new page.
func (c *RWCursor) allocatePage(count int) (*page, error) {
// TODO: head := c.transaction.pagestate.head
// TODO?
// If our dirty list is already full, we can't do anything
// if (txn->mt_dirty_room == 0) {
// rc = MDB_TXN_FULL;
// goto fail;
// }
/*
int rc, retry = INT_MAX;
MDB_txn *txn = mc->mc_txn;
MDB_env *env = txn->mt_env;
pgno_t pgno, *mop = env->me_pghead;
unsigned i, j, k, mop_len = mop ? mop[0] : 0, n2 = num-1;
MDB_page *np;
txnid_t oldest = 0, last;
MDB_cursor_op op;
MDB_cursor m2;
*mp = NULL;
for (op = MDB_FIRST;; op = MDB_NEXT) {
MDB_val key, data;
MDB_node *leaf;
pgno_t *idl, old_id, new_id;
// Seek a big enough contiguous page range. Prefer
// pages at the tail, just truncating the list.
if (mop_len > n2) {
i = mop_len;
do {
pgno = mop[i];
if (mop[i-n2] == pgno+n2)
goto search_done;
} while (--i > n2);
if (Max_retries < INT_MAX && --retry < 0)
break;
}
if (op == MDB_FIRST) { // 1st iteration
// Prepare to fetch more and coalesce
oldest = mdb_find_oldest(txn);
last = env->me_pglast;
mdb_cursor_init(&m2, txn, FREE_DBI, NULL);
if (last) {
op = MDB_SET_RANGE;
key.mv_data = &last; // will look up last+1
key.mv_size = sizeof(last);
}
}
last++;
// Do not fetch more if the record will be too recent
if (oldest <= last)
break;
rc = mdb_cursor_get(&m2, &key, NULL, op);
if (rc) {
if (rc == MDB_NOTFOUND)
break;
goto fail;
}
last = *(txnid_t*)key.mv_data;
if (oldest <= last)
break;
np = m2.mc_pg[m2.mc_top];
leaf = NODEPTR(np, m2.mc_ki[m2.mc_top]);
if ((rc = mdb_node_read(txn, leaf, &data)) != MDB_SUCCESS)
return rc;
idl = (MDB_ID *) data.mv_data;
i = idl[0];
if (!mop) {
if (!(env->me_pghead = mop = mdb_midl_alloc(i))) {
rc = ENOMEM;
goto fail;
}
} else {
if ((rc = mdb_midl_need(&env->me_pghead, i)) != 0)
goto fail;
mop = env->me_pghead;
}
env->me_pglast = last;
// Merge in descending sorted order
j = mop_len;
k = mop_len += i;
mop[0] = (pgno_t)-1;
old_id = mop[j];
while (i) {
new_id = idl[i--];
for (; old_id < new_id; old_id = mop[--j])
mop[k--] = old_id;
mop[k--] = new_id;
}
mop[0] = mop_len;
}
// Use new pages from the map when nothing suitable in the freeDB
i = 0;
pgno = txn->mt_next_pgno;
if (pgno + num >= env->me_maxpg) {
DPUTS("DB size maxed out");
rc = MDB_MAP_FULL;
goto fail;
}
search_done:
if (!(np = mdb_page_malloc(txn, num))) {
rc = ENOMEM;
goto fail;
}
if (i) {
mop[0] = mop_len -= num;
// Move any stragglers down
for (j = i-num; j < mop_len; )
mop[++j] = mop[++i];
} else {
txn->mt_next_pgno = pgno + num;
}
np->mp_pgno = pgno;
mdb_page_dirty(txn, np);
*mp = np;
return MDB_SUCCESS;
fail:
txn->mt_flags |= MDB_TXN_ERROR;
return rc;
*/
return nil, nil
}
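
The allocation strategy described in the comment above allocatePage — reuse a large enough contiguous run from the free list, otherwise extend the file — reduces to something like this sketch (names and types are illustrative, not the commit's API):

// allocate returns count contiguous page ids, preferring ids from the sorted
// free list and falling back to fresh pages at the end of the file.
func allocate(free []int, nextPgno *int, count int) []int {
    run := 0
    for i := range free {
        if i == 0 || free[i] != free[i-1]+1 {
            run = 1
        } else {
            run++
        }
        if run == count {
            start := i - count + 1
            // A real implementation would also remove these ids from the free list.
            return append([]int(nil), free[start:i+1]...)
        }
    }
    // Nothing suitable in the free list: grow the file instead.
    ids := make([]int, count)
    for i := range ids {
        ids[i] = *nextPgno + i
    }
    *nextPgno += count
    return ids
}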

rwtransaction.go (new file, 9 lines)

@@ -0,0 +1,9 @@
package bolt
// RWTransaction represents a transaction that can read and write data.
// Only one read/write transaction can be active for a DB at a time.
type RWTransaction struct {
Transaction
pagestate pagestate
}
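
RWTransaction embeds Transaction, so all of the read-only methods are promoted onto the read/write type and only the mutating ones need to be defined on RWTransaction itself. A minimal illustration of that Go mechanism (generic names, not the package's API):

type readOnly struct{}

func (r *readOnly) Get(key string) string { return "" } // read path

type readWrite struct {
    readOnly // embedding: readWrite gets Get for free
}

func (rw *readWrite) Put(key, value string) {} // write path defined here only

func demo() {
    var rw readWrite
    rw.Put("foo", "bar")
    _ = rw.Get("foo") // promoted from the embedded readOnly
}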


@@ -2,10 +2,11 @@ package bolt
import (
"strings"
"unsafe"
)
var TransactionExistingChildError = &Error{"txn already has a child", nil}
var TransactionReadOnlyChildError = &Error{"read-only txn cannot create a child", nil}
var InvalidTransactionError = &Error{"txn is invalid", nil}
var BucketAlreadyExistsError = &Error{"bucket already exists", nil}
const (
txnb_dirty = 0x01 /**< DB was modified or is DUPSORT data */
@@ -24,7 +25,6 @@ const (
type Transaction struct {
id int
db *DB
writable bool
dirty bool
spilled bool
err error
@@ -41,25 +41,43 @@ type Transaction struct {
reader *reader
// Implicit from slices? TODO: MDB_dbi mt_numdbs;
dirty_room int
pagestate pagestate
}
// CreateBucket creates a new bucket.
func (t *Transaction) CreateBucket(name string, dupsort bool) (*Bucket, error) {
// TODO: Check if bucket already exists.
// TODO: Put new entry into system bucket.
if t.db == nil {
return nil, InvalidTransactionError
}
/*
MDB_db dummy;
data.mv_size = sizeof(MDB_db);
data.mv_data = &dummy;
memset(&dummy, 0, sizeof(dummy));
dummy.md_root = P_INVALID;
dummy.md_flags = flags & PERSISTENT_FLAGS;
rc = mdb_cursor_put(&mc, &key, &data, F_SUBDATA);
dbflag |= DB_DIRTY;
*/
return nil, nil
// Check if bucket already exists.
if b := t.buckets[name]; b != nil {
return nil, &Error{"bucket already exists", nil}
}
// Create a new bucket entry.
var buf [unsafe.Sizeof(bucket{})]byte
var raw = (*bucket)(unsafe.Pointer(&buf[0]))
raw.root = p_invalid
// TODO: Set dupsort flag.
// Open cursor to system bucket.
c, err := t.Cursor(&t.sysbuckets)
if err != nil {
return nil, err
}
// Put new entry into system bucket.
if err := c.Put([]byte(name), buf[:]); err != nil {
return nil, err
}
// Save reference to bucket.
b := &Bucket{name: name, bucket: raw, isNew: true}
t.buckets[name] = b
// TODO: dbflag |= DB_DIRTY;
return b, nil
}
// DropBucket deletes a bucket.
@@ -113,30 +131,33 @@ func (t *Transaction) bucket(name string) (*Bucket, error) {
func (t *Transaction) Cursor(b *Bucket) (*Cursor, error) {
if b == nil {
return nil, &Error{"bucket required", nil}
} else if t.db == nil {
return nil, InvalidTransactionError
}
// Allow read access to the freelist
// TODO: if (!dbi && !F_ISSET(txn->mt_flags, MDB_TXN_RDONLY))
return t.cursor(b)
}
func (t *Transaction) cursor(b *Bucket) (*Cursor, error) {
// TODO: if !(txn->mt_dbflags[dbi] & DB_VALID) return InvalidBucketError
// TODO: if (txn->mt_flags & MDB_TXN_ERROR) return BadTransactionError
// Return existing cursor for the bucket if one exists.
if b != nil {
if c := t.cursors[b.id]; c != nil {
return c, nil
}
if c := t.cursors[b.id]; c != nil {
return c, nil
}
// Create a new cursor and associate it with the transaction and bucket.
c := &Cursor{
transaction: t,
bucket: b,
top: -1,
pages: []*page{},
}
// Set the first page if available.
if b.root != p_invalid {
p := t.db.page(t.db.data, int(b.root))
c.top = 0
c.pages = append(c.pages, p)
}
if (b.flags & MDB_DUPSORT) != 0 {
c.subcursor = &Cursor{
transaction: t,
@@ -144,34 +165,6 @@ func (t *Transaction) cursor(b *Bucket) (*Cursor, error) {
}
}
// Find the root page if the bucket is stale.
if (c.bucket.flags & txnb_stale) != 0 {
c.findPage(nil, ps_rootonly)
}
/*
MDB_cursor *mc;
size_t size = sizeof(MDB_cursor);
// Allow read access to the freelist
if (!dbi && !F_ISSET(txn->mt_flags, MDB_TXN_RDONLY))
return EINVAL;
if ((mc = malloc(size)) != NULL) {
mdb_cursor_init(mc, txn, dbi, (MDB_xcursor *)(mc + 1));
if (txn->mt_cursors) {
mc->mc_next = txn->mt_cursors[dbi];
txn->mt_cursors[dbi] = mc;
mc->mc_flags |= C_UNTRACK;
}
} else {
return ENOMEM;
}
*ret = mc;
return MDB_SUCCESS;
*/
return nil, nil
}
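
CreateBucket serializes the new bucket by backing the struct with a byte array, so the same memory can be handed to the cursor as the value. A standalone sketch of that trick (the field layout here is illustrative):

package main

import (
    "fmt"
    "unsafe"
)

type bucketHeader struct {
    entries uint64
    root    uint32
}

func main() {
    // Back the struct with a byte array; writes through raw land in buf.
    var buf [unsafe.Sizeof(bucketHeader{})]byte
    raw := (*bucketHeader)(unsafe.Pointer(&buf[0]))
    raw.root = 42
    fmt.Println(len(buf), buf[:]) // buf now holds the encoded header bytes
}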


@@ -3,20 +3,37 @@ package bolt
import (
"testing"
// "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/assert"
)
//--------------------------------------
// Cursor()
//--------------------------------------
// Ensure that a read transaction can get a cursor.
func TestTransactionCursor(t *testing.T) {
// Ensure that a bucket can be created and retrieved.
func TestTransactionCreateBucket(t *testing.T) {
withOpenDB(func(db *DB, path string) {
/*
txn, _ := db.Transaction(false)
c := txn.Cursor()
assert.NotNil(t, c)
*/
txn, _ := db.Transaction(false)
b, err := txn.CreateBucket("foo", false)
if assert.NoError(t, err) && assert.NotNil(t, b) {
b2, err := txn.Bucket("foo")
assert.NoError(t, err)
assert.Equal(t, b, b2)
}
})
}
// Ensure that a user-created transaction cannot be used to create a bucket.
func TestTransactionInvalidCreateBucket(t *testing.T) {
withOpenDB(func(db *DB, path string) {
txn := new(Transaction)
_, err := txn.CreateBucket("foo", false)
assert.Equal(t, err, InvalidTransactionError)
})
}
// Ensure that an existing bucket cannot be created.
func TestTransactionCreateExistingBucket(t *testing.T) {
withOpenDB(func(db *DB, path string) {
txn, _ := db.Transaction(false)
txn.CreateBucket("foo", false)
_, err := txn.CreateBucket("foo", false)
assert.Equal(t, err, BucketAlreadyExistsError)
})
}