From 20b26eac781372336ebfaddadc559c5115295c82 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 24 Jan 2014 12:51:56 -0700 Subject: [PATCH] TODO --- README.md | 8 + cursor.go | 11 - db.go | 737 +++------------------------ node.go | 10 + page.go | 3 +- rwcursor.go | 634 +++++++++++------------ rwtransaction.go | 122 +++++ transaction.go | 1257 ++-------------------------------------------- 8 files changed, 561 insertions(+), 2221 deletions(-) diff --git a/README.md b/README.md index 4c61813..4c96076 100644 --- a/README.md +++ b/README.md @@ -185,3 +185,11 @@ There are several different types of pages: Within each page there are one or more elements called nodes. In branch pages, these nodes store references to other child pages in the tree. In leaf pages, these nodes store the actual key/value data. + + +## TODO + +The following is a list of items to do on the Bolt project: + +1. Resize map. (Make sure there are no reader txns before resizing) +2. DB.Copy() diff --git a/cursor.go b/cursor.go index 13a56cf..d61ce80 100644 --- a/cursor.go +++ b/cursor.go @@ -10,17 +10,6 @@ const ( c_untrack = 0x40 /**< Un-track cursor when closing */ ) -/* -type Cursor interface { - First() error - Last() error - Next() ([]byte, []byte, error) - Prev() ([]byte, []byte, error) - Current() ([]byte, []byte, error) - Get([]byte) ([]byte, error) -} -*/ - type Cursor struct { flags int next *Cursor diff --git a/db.go b/db.go index d7510c2..4b2859d 100644 --- a/db.go +++ b/db.go @@ -8,21 +8,15 @@ import ( ) const ( - NoSync = iota - NoMetaSync - DupSort - IntegerKey - IntegerDupKey + db_nosync = iota + db_nometasync ) -var DatabaseNotOpenError = &Error{"db is not open", nil} -var DatabaseAlreadyOpenedError = &Error{"db already open", nil} -var TransactionInProgressError = &Error{"writable transaction is already in progress", nil} - -// TODO: #define MDB_FATAL_ERROR 0x80000000U /** Failed to update the meta page. Probably an I/O error. */ -// TODO: #define MDB_ENV_ACTIVE 0x20000000U /** Some fields are initialized. */ -// TODO: #define MDB_ENV_TXKEY 0x10000000U /** me_txkey is set */ -// TODO: #define MDB_LIVE_READER 0x08000000U /** Have liveness lock in reader table */ +var ( + DatabaseNotOpenError = &Error{"db is not open", nil} + DatabaseAlreadyOpenedError = &Error{"db already open", nil} + TransactionInProgressError = &Error{"writable transaction is already in progress", nil} +) type DB struct { sync.Mutex @@ -30,45 +24,48 @@ type DB struct { os _os syscall _syscall + path string file file metafile file data []byte buf []byte meta0 *meta meta1 *meta - pageSize int - readers []*reader - buckets []*Bucket - bucketFlags []int /**< array of flags from MDB_db.md_flags */ - path string - mmapSize int /**< size of the data memory map */ - size int /**< current file size */ - pbuf []byte - transaction *RWTransaction /**< current write transaction */ + rwtransaction *RWTransaction + transactions []*Transaction maxPageNumber int /**< me_mapsize / me_psize */ - dpages []*page /**< list of malloc'd blocks for re-use */ freePages []int /** IDL of pages that became unused in a write txn */ dirtyPages []int /** ID2L of pages written during a write txn. Length MDB_IDL_UM_SIZE. */ + + // TODO: scratch []*page // list of temp pages for writing. + + readers []*reader maxFreeOnePage int /** Max number of freelist items that can fit in a single overflow page */ maxPageDataSize int maxNodeSize int /** Max size of a node on a page */ maxKeySize int /**< max size of a key */ } +// NewDB creates a new DB instance. func NewDB() *DB { return &DB{} } +// Path returns the path to currently open database file. func (db *DB) Path() string { return db.path } +// Open opens a data file at the given path and initializes the database. +// If the file does not exist then it will be created automatically. func (db *DB) Open(path string, mode os.FileMode) error { var err error db.Lock() defer db.Unlock() + // Initialize OS/Syscall references. + // These are overridden by mocks during some tests. if db.os == nil { db.os = &sysos{} } @@ -96,12 +93,12 @@ func (db *DB) Open(path string, mode os.FileMode) error { var m, m0, m1 *meta var buf [pageHeaderSize + int(unsafe.Sizeof(meta{}))]byte if _, err := db.file.ReadAt(buf[:], 0); err == nil { - if m0, _ = db.page(buf[:], 0).meta(); m0 != nil { + if m0, _ = db.pageInBuffer(buf[:], 0).meta(); m0 != nil { db.pageSize = int(m0.free.pad) } } if _, err := db.file.ReadAt(buf[:], int64(db.pageSize)); err == nil { - m1, _ = db.page(buf[:], 0).meta() + m1, _ = db.pageInBuffer(buf[:], 0).meta() } if m0 != nil && m1 != nil { if m0.txnid > m1.txnid { @@ -143,7 +140,7 @@ func (db *DB) Open(path string, mode os.FileMode) error { return nil } -// int mdb_env_map(MDB_env *env, void *addr, int newsize) +// mmap opens the underlying memory-mapped file and initializes the meta references. func (db *DB) mmap() error { var err error @@ -162,13 +159,11 @@ func (db *DB) mmap() error { return err } - // TODO?: If nordahead, then: madvise(env->me_map, env->me_mapsize, MADV_RANDOM); - // Save references to the meta pages. - if db.meta0, err = db.page(db.data, 0).meta(); err != nil { + if db.meta0, err = db.page(0).meta(); err != nil { return &Error{"meta0 error", err} } - if db.meta1, err = db.page(db.data, 1).meta(); err != nil { + if db.meta1, err = db.page(1).meta(); err != nil { return &Error{"meta1 error", err} } @@ -186,7 +181,7 @@ func (db *DB) init() error { // Create two meta pages on a buffer. buf := make([]byte, db.pageSize*2) for i := 0; i < 2; i++ { - p := db.page(buf[:], i) + p := db.pageInBuffer(buf[:], i) p.id = pgno(i) p.init(db.pageSize) } @@ -199,12 +194,20 @@ func (db *DB) init() error { return nil } -func (db *DB) close() { - // TODO +// Close releases all resources related to the database. +func (db *DB) Close() { + db.Lock() + defer db.Unlock() + s.close() } -// Transaction creates a transaction that's associated with this database. -func (db *DB) Transaction(writable bool) (*Transaction, error) { +func (db *DB) close() { + // TODO: Undo everything in Open(). +} + +// Transaction creates a read-only transaction. +// Multiple read-only transactions can be used concurrently. +func (db *DB) Transaction() (*Transaction, error) { db.Lock() defer db.Unlock() @@ -212,18 +215,13 @@ func (db *DB) Transaction(writable bool) (*Transaction, error) { if !db.opened { return nil, DatabaseNotOpenError } - // Exit if a writable transaction is currently in progress. - if writable && db.transaction != nil { - return nil, TransactionInProgressError - } // Create a transaction associated with the database. t := &Transaction{ - db: db, - meta: db.meta(), - writable: writable, - buckets: make(map[string]*Bucket), - cursors: make(map[uint32]*Cursor), + db: db, + meta: db.meta(), + buckets: make(map[string]*Bucket), + cursors: make(map[uint32]*Cursor), } // Save references to the sys•free and sys•buckets buckets. @@ -232,16 +230,32 @@ func (db *DB) Transaction(writable bool) (*Transaction, error) { t.sysbuckets.transaction = t t.sysbuckets.bucket = &t.meta.buckets - // We only allow one writable transaction at a time so save the reference. - if writable { - db.transaction = t + return t, nil +} + +// RWTransaction creates a read/write transaction. +// Only one read/write transaction is allowed at a time. +func (db *DB) RWTransaction() (*RWTransaction, error) { + // TODO: db.writerMutex.Lock() + // TODO: Add unlock to RWTransaction.Commit() / Abort() + + t := &RWTransaction{} + + // Exit if a read-write transaction is currently in progress. + if db.transaction != nil { + return nil, TransactionInProgressError } return t, nil } -// page retrieves a page reference from a given byte array based on the current page size. -func (db *DB) page(b []byte, id int) *page { +// page retrieves a page reference from the mmap based on the current page size. +func (db *DB) page(id int) *page { + return (*page)(unsafe.Pointer(&db.data[id*db.pageSize])) +} + +// pageInBuffer retrieves a page reference from a given byte array based on the current page size. +func (db *DB) pageInBuffer(b []byte, id int) *page { return (*page)(unsafe.Pointer(&b[id*db.pageSize])) } @@ -253,629 +267,22 @@ func (db *DB) meta() *meta { return db.meta1 } -// // -// // -// // -// // -// // -// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ CONVERTED ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ // -// // -// // -// // -// // -// // - -func (db *DB) freePage(p *page) { - /* - mp->mp_next = env->me_dpages; - VGMEMP_FREE(env, mp); - env->me_dpages = mp; - */ -} - -func (db *DB) freeDirtyPage(p *page) { - /* - if (!IS_OVERFLOW(dp) || dp->mp_pages == 1) { - mdb_page_free(env, dp); - } else { - // large pages just get freed directly - VGMEMP_FREE(env, dp); - free(dp); - } - */ -} - -func (db *DB) freeAllDirtyPages(p *page) { - /* - MDB_env *env = txn->mt_env; - MDB_ID2L dl = txn->mt_u.dirty_list; - unsigned i, n = dl[0].mid; - - for (i = 1; i <= n; i++) { - mdb_dpage_free(env, dl[i].mptr); - } - dl[0].mid = 0; - */ -} - +// sync flushes the file descriptor to disk unless "no sync" is enabled. func (db *DB) sync(force bool) error { - /* - int rc = 0; - if (force || !F_ISSET(env->me_flags, MDB_NOSYNC)) { - if (env->me_flags & MDB_WRITEMAP) { - int flags = ((env->me_flags & MDB_MAPASYNC) && !force) - ? MS_ASYNC : MS_SYNC; - if (MDB_MSYNC(env->me_map, env->me_mapsize, flags)) - rc = ErrCode(); - #ifdef _WIN32 - else if (flags == MS_SYNC && MDB_FDATASYNC(env->me_fd)) - rc = ErrCode(); - #endif - } else { - if (MDB_FDATASYNC(env->me_fd)) - rc = ErrCode(); - } - } - return rc; - */ - return nil -} - -// Check both meta pages to see which one is newer. -// @param[in] env the environment handle -// @return meta toggle (0 or 1). -func (db *DB) pickMeta() int { - /* - return (env->me_metas[0]->mm_txnid < env->me_metas[1]->mm_txnid); - */ - return 0 -} - -func (db *DB) Create() error { - /* - MDB_env *e; - - e = calloc(1, sizeof(MDB_env)); - if (!e) - return ENOMEM; - - e->me_maxreaders = DEFAULT_READERS; - e->me_maxdbs = e->me_numdbs = 2; - e->me_fd = INVALID_HANDLE_VALUE; - e->me_lfd = INVALID_HANDLE_VALUE; - e->me_mfd = INVALID_HANDLE_VALUE; - #ifdef MDB_USE_POSIX_SEM - e->me_rmutex = SEM_FAILED; - e->me_wmutex = SEM_FAILED; - #endif - e->me_pid = getpid(); - GET_PAGESIZE(e->me_os_psize); - VGMEMP_CREATE(e,0,0); - *env = e; - return MDB_SUCCESS; - */ - return nil -} - -func (db *DB) setMapSize(size int) error { - /* - // If env is already open, caller is responsible for making - // sure there are no active txns. - if (env->me_map) { - int rc; - void *old; - if (env->me_txn) - return EINVAL; - if (!size) - size = env->me_metas[mdb_env_pick_meta(env)]->mm_mapsize; - else if (size < env->me_mapsize) { - // If the configured size is smaller, make sure it's - // still big enough. Silently round up to minimum if not. - size_t minsize = (env->me_metas[mdb_env_pick_meta(env)]->mm_last_pg + 1) * env->me_psize; - if (size < minsize) - size = minsize; - } - munmap(env->me_map, env->me_mapsize); - env->me_mapsize = size; - old = (env->me_flags & MDB_FIXEDMAP) ? env->me_map : NULL; - rc = mdb_env_map(env, old, 1); - if (rc) - return rc; + if !db.noSync { + if err := syscall.Fsync(int(db.file.Fd())); err != nil { + return err } - env->me_mapsize = size; - if (env->me_psize) - env->me_maxpg = env->me_mapsize / env->me_psize; - return MDB_SUCCESS; - */ - return nil -} - -func (db *DB) setMaxBucketCount(count int) error { - /* - if (env->me_map) - return EINVAL; - env->me_maxdbs = dbs + 2; // Named databases + main and free DB - return MDB_SUCCESS; - */ - return nil -} - -func (db *DB) setMaxReaderCount(count int) error { - /* - if (env->me_map || readers < 1) - return EINVAL; - env->me_maxreaders = readers; - return MDB_SUCCESS; - */ - return nil -} - -func (db *DB) getMaxReaderCount(count int) (int, error) { - /* - if (!env || !readers) - return EINVAL; - *readers = env->me_maxreaders; - return MDB_SUCCESS; - */ - return 0, nil -} - -// Destroy resources from mdb_env_open(), clear our readers & DBIs -func (db *DB) close0(excl int) { - /* - int i; - - if (!(env->me_flags & MDB_ENV_ACTIVE)) - return; - - // Doing this here since me_dbxs may not exist during mdb_env_close - for (i = env->me_maxdbs; --i > MAIN_DBI; ) - free(env->me_dbxs[i].md_name.mv_data); - - free(env->me_pbuf); - free(env->me_dbflags); - free(env->me_dbxs); - free(env->me_path); - free(env->me_dirty_list); - mdb_midl_free(env->me_free_pgs); - - if (env->me_flags & MDB_ENV_TXKEY) { - pthread_key_delete(env->me_txkey); - #ifdef _WIN32 - // Delete our key from the global list - for (i=0; ime_txkey) { - mdb_tls_keys[i] = mdb_tls_keys[mdb_tls_nkeys-1]; - mdb_tls_nkeys--; - break; - } - #endif - } - - if (env->me_map) { - munmap(env->me_map, env->me_mapsize); - } - if (env->me_mfd != env->me_fd && env->me_mfd != INVALID_HANDLE_VALUE) - (void) close(env->me_mfd); - if (env->me_fd != INVALID_HANDLE_VALUE) - (void) close(env->me_fd); - if (env->me_txns) { - MDB_PID_T pid = env->me_pid; - // Clearing readers is done in this function because - // me_txkey with its destructor must be disabled first. - for (i = env->me_numreaders; --i >= 0; ) - if (env->me_txns->mti_readers[i].mr_pid == pid) - env->me_txns->mti_readers[i].mr_pid = 0; - #ifdef _WIN32 - if (env->me_rmutex) { - CloseHandle(env->me_rmutex); - if (env->me_wmutex) CloseHandle(env->me_wmutex); - } - // Windows automatically destroys the mutexes when - // the last handle closes. - #elif defined(MDB_USE_POSIX_SEM) - if (env->me_rmutex != SEM_FAILED) { - sem_close(env->me_rmutex); - if (env->me_wmutex != SEM_FAILED) - sem_close(env->me_wmutex); - // If we have the filelock: If we are the - // only remaining user, clean up semaphores. - if (excl == 0) - mdb_env_excl_lock(env, &excl); - if (excl > 0) { - sem_unlink(env->me_txns->mti_rmname); - sem_unlink(env->me_txns->mti_wmname); - } - } - #endif - munmap((void *)env->me_txns, (env->me_maxreaders-1)*sizeof(MDB_reader)+sizeof(MDB_txninfo)); - } - if (env->me_lfd != INVALID_HANDLE_VALUE) { - #ifdef _WIN32 - if (excl >= 0) { - // Unlock the lockfile. Windows would have unlocked it - // after closing anyway, but not necessarily at once. - UnlockFile(env->me_lfd, 0, 0, 1, 0); - } - #endif - (void) close(env->me_lfd); - } - - env->me_flags &= ~(MDB_ENV_ACTIVE|MDB_ENV_TXKEY); - */ -} - -func (db *DB) copyfd(handle int) error { - /* - MDB_txn *txn = NULL; - int rc; - size_t wsize; - char *ptr; - #ifdef _WIN32 - DWORD len, w2; - #define DO_WRITE(rc, fd, ptr, w2, len) rc = WriteFile(fd, ptr, w2, &len, NULL) - #else - ssize_t len; - size_t w2; - #define DO_WRITE(rc, fd, ptr, w2, len) len = write(fd, ptr, w2); rc = (len >= 0) - #endif - - // Do the lock/unlock of the reader mutex before starting the - // write txn. Otherwise other read txns could block writers. - rc = mdb_txn_begin(env, NULL, MDB_RDONLY, &txn); - if (rc) - return rc; - - if (env->me_txns) { - // We must start the actual read txn after blocking writers - mdb_txn_reset0(txn, "reset-stage1"); - - // Temporarily block writers until we snapshot the meta pages - LOCK_MUTEX_W(env); - - rc = mdb_txn_renew0(txn); - if (rc) { - UNLOCK_MUTEX_W(env); - goto leave; - } - } - - wsize = env->me_psize * 2; - ptr = env->me_map; - w2 = wsize; - while (w2 > 0) { - DO_WRITE(rc, fd, ptr, w2, len); - if (!rc) { - rc = ErrCode(); - break; - } else if (len > 0) { - rc = MDB_SUCCESS; - ptr += len; - w2 -= len; - continue; - } else { - // Non-blocking or async handles are not supported - rc = EIO; - break; - } - } - if (env->me_txns) - UNLOCK_MUTEX_W(env); - - if (rc) - goto leave; - - wsize = txn->mt_next_pgno * env->me_psize - wsize; - while (wsize > 0) { - if (wsize > MAX_WRITE) - w2 = MAX_WRITE; - else - w2 = wsize; - DO_WRITE(rc, fd, ptr, w2, len); - if (!rc) { - rc = ErrCode(); - break; - } else if (len > 0) { - rc = MDB_SUCCESS; - ptr += len; - wsize -= len; - continue; - } else { - rc = EIO; - break; - } - } - - leave: - mdb_txn_abort(txn); - return rc; - } - - int - mdb_env_copy(MDB_env *env, const char *path) - { - int rc, len; - char *lpath; - HANDLE newfd = INVALID_HANDLE_VALUE; - - if (env->me_flags & MDB_NOSUBDIR) { - lpath = (char *)path; - } else { - len = strlen(path); - len += sizeof(DATANAME); - lpath = malloc(len); - if (!lpath) - return ENOMEM; - sprintf(lpath, "%s" DATANAME, path); - } - - // The destination path must exist, but the destination file must not. - // We don't want the OS to cache the writes, since the source data is - // already in the OS cache. - #ifdef _WIN32 - newfd = CreateFile(lpath, GENERIC_WRITE, 0, NULL, CREATE_NEW, - FILE_FLAG_NO_BUFFERING|FILE_FLAG_WRITE_THROUGH, NULL); - #else - newfd = open(lpath, O_WRONLY|O_CREAT|O_EXCL, 0666); - #endif - if (newfd == INVALID_HANDLE_VALUE) { - rc = ErrCode(); - goto leave; - } - - #ifdef O_DIRECT - // Set O_DIRECT if the file system supports it - if ((rc = fcntl(newfd, F_GETFL)) != -1) - (void) fcntl(newfd, F_SETFL, rc | O_DIRECT); - #endif - #ifdef F_NOCACHE // __APPLE__ - rc = fcntl(newfd, F_NOCACHE, 1); - if (rc) { - rc = ErrCode(); - goto leave; - } - #endif - - rc = mdb_env_copyfd(env, newfd); - - leave: - if (!(env->me_flags & MDB_NOSUBDIR)) - free(lpath); - if (newfd != INVALID_HANDLE_VALUE) - if (close(newfd) < 0 && rc == MDB_SUCCESS) - rc = ErrCode(); - - return rc; - */ - return nil -} - -func (db *DB) Close() { - /* - MDB_page *dp; - - if (env == NULL) - return; - - VGMEMP_DESTROY(env); - while ((dp = env->me_dpages) != NULL) { - VGMEMP_DEFINED(&dp->mp_next, sizeof(dp->mp_next)); - env->me_dpages = dp->mp_next; - free(dp); - } - - mdb_env_close0(env, 0); - free(env); - */ -} - -// Calculate the size of a leaf node. -// The size depends on the environment's page size; if a data item -// is too large it will be put onto an overflow page and the node -// size will only include the key and not the data. Sizes are always -// rounded up to an even number of bytes, to guarantee 2-byte alignment -// of the #MDB_node headers. -// @param[in] env The environment handle. -// @param[in] key The key for the node. -// @param[in] data The data for the node. -// @return The number of bytes needed to store the node. -func (db *DB) LeafSize(key []byte, data []byte) int { - /* - size_t sz; - - sz = LEAFSIZE(key, data); - if (sz > env->me_nodemax) { - // put on overflow page - sz -= data->mv_size - sizeof(pgno_t); - } - - return EVEN(sz + sizeof(indx_t)); - */ - return 0 -} - -// Calculate the size of a branch node. -// The size should depend on the environment's page size but since -// we currently don't support spilling large keys onto overflow -// pages, it's simply the size of the #MDB_node header plus the -// size of the key. Sizes are always rounded up to an even number -// of bytes, to guarantee 2-byte alignment of the #MDB_node headers. -// @param[in] env The environment handle. -// @param[in] key The key for the node. -// @return The number of bytes needed to store the node. -func (db *DB) BranchSize(key []byte) int { - /* - size_t sz; - - sz = INDXSIZE(key); - if (sz > env->me_nodemax) { - // put on overflow page - // not implemented - // sz -= key->size - sizeof(pgno_t); - } - - return sz + sizeof(indx_t); - */ - return 0 -} - -func (db *DB) SetFlags(flag int, onoff bool) error { - /* - if ((flag & CHANGEABLE) != flag) - return EINVAL; - if (onoff) - env->me_flags |= flag; - else - env->me_flags &= ~flag; - return MDB_SUCCESS; - */ + } return nil } func (db *DB) Stat() *stat { - /* - int toggle; - - if (env == NULL || arg == NULL) - return EINVAL; - - toggle = mdb_env_pick_meta(env); - stat := &Stat{} - stat->ms_psize = env->me_psize; - stat->ms_depth = db->md_depth; - stat->ms_branch_pages = db->md_branch_pages; - stat->ms_leaf_pages = db->md_leaf_pages; - stat->ms_overflow_pages = db->md_overflow_pages; - stat->ms_entries = db->md_entries; - - //return mdb_stat0(env, &env->me_metas[toggle]->mm_dbs[MAIN_DBI], stat); - return stat - */ + // TODO: Calculate size, depth, page count (by type), entry count, readers, etc. return nil } -func (db *DB) Info() *Info { - /* - int toggle; - - if (env == NULL || arg == NULL) - return EINVAL; - - toggle = mdb_env_pick_meta(env); - arg->me_mapaddr = (env->me_flags & MDB_FIXEDMAP) ? env->me_map : 0; - arg->me_mapsize = env->me_mapsize; - arg->me_maxreaders = env->me_maxreaders; - - // me_numreaders may be zero if this process never used any readers. Use - // the shared numreader count if it exists. - arg->me_numreaders = env->me_txns ? env->me_txns->mti_numreaders : env->me_numreaders; - - arg->me_last_pgno = env->me_metas[toggle]->mm_last_pg; - arg->me_last_txnid = env->me_metas[toggle]->mm_txnid; - return MDB_SUCCESS; - */ - return nil -} - -// TODO: Move to bucket.go -func (db *DB) CloseBucket(b Bucket) { - /* - char *ptr; - if (dbi <= MAIN_DBI || dbi >= env->me_maxdbs) - return; - ptr = env->me_dbxs[dbi].md_name.mv_data; - env->me_dbxs[dbi].md_name.mv_data = NULL; - env->me_dbxs[dbi].md_name.mv_size = 0; - env->me_dbflags[dbi] = 0; - free(ptr); - */ -} - -//int mdb_reader_list(MDB_env *env, MDB_msg_func *func, void *ctx) -func (db *DB) getReaderList() error { - /* - unsigned int i, rdrs; - MDB_reader *mr; - char buf[64]; - int rc = 0, first = 1; - - if (!env || !func) - return -1; - if (!env->me_txns) { - return func("(no reader locks)\n", ctx); - } - rdrs = env->me_txns->mti_numreaders; - mr = env->me_txns->mti_readers; - for (i=0; ime_txns) - return MDB_SUCCESS; - rdrs = env->me_txns->mti_numreaders; - pids = malloc((rdrs+1) * sizeof(MDB_PID_T)); - if (!pids) - return ENOMEM; - pids[0] = 0; - mr = env->me_txns->mti_readers; - for (i=0; ime_pid) { - pid = mr[i].mr_pid; - if (mdb_pid_insert(pids, pid) == 0) { - if (!mdb_reader_pid(env, Pidcheck, pid)) { - LOCK_MUTEX_R(env); - // Recheck, a new process may have reused pid - if (!mdb_reader_pid(env, Pidcheck, pid)) { - for (j=i; jmc_ki[mc->mc_top])); - if ((mc->mc_db->md_flags & MDB_DUPSORT) && - LEAFSIZE(key, data) > env->me_nodemax) - { - // Too big for a node, insert in sub-DB - fp_flags = P_LEAF|P_DIRTY; - fp = env->me_pbuf; - fp->mp_pad = data->mv_size; // used if MDB_DUPFIXED - fp->mp_lower = fp->mp_upper = olddata.mv_size = PAGEHDRSZ; - goto prep_subDB; - } - } else { - - more: - leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); - olddata.mv_size = NODEDSZ(leaf); - olddata.mv_data = NODEDATA(leaf); - - // DB has dups? - if (F_ISSET(mc->mc_db->md_flags, MDB_DUPSORT)) { - // Prepare (sub-)page/sub-DB to accept the new item, - // if needed. fp: old sub-page or a header faking - // it. mp: new (sub-)page. offset: growth in page - // size. xdata: node data with new page or DB. - ssize_t i, offset = 0; - mp = fp = xdata.mv_data = env->me_pbuf; - mp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno; - - // Was a single item before, must convert now - if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) { - // Just overwrite the current item - if (flags == MDB_CURRENT) - goto current; - - #if UINT_MAX < SIZE_MAX - if (mc->mc_dbx->md_dcmp == mdb_cmp_int && olddata.mv_size == sizeof(size_t)) - #ifdef MISALIGNED_OK - mc->mc_dbx->md_dcmp = mdb_cmp_long; - #else - mc->mc_dbx->md_dcmp = mdb_cmp_cint; - #endif - #endif - // if data matches, skip it - if (!mc->mc_dbx->md_dcmp(data, &olddata)) { - if (flags & MDB_NODUPDATA) - rc = MDB_KEYEXIST; - else if (flags & MDB_MULTIPLE) - goto next_mult; - else - rc = MDB_SUCCESS; - return rc; - } - - // Back up original data item - dkey.mv_size = olddata.mv_size; - dkey.mv_data = memcpy(fp+1, olddata.mv_data, olddata.mv_size); - - // Make sub-page header for the dup items, with dummy body - fp->mp_flags = P_LEAF|P_DIRTY|P_SUBP; - fp->mp_lower = PAGEHDRSZ; - xdata.mv_size = PAGEHDRSZ + dkey.mv_size + data->mv_size; - if (mc->mc_db->md_flags & MDB_DUPFIXED) { - fp->mp_flags |= P_LEAF2; - fp->mp_pad = data->mv_size; - xdata.mv_size += 2 * data->mv_size; // leave space for 2 more - } else { - xdata.mv_size += 2 * (sizeof(indx_t) + NODESIZE) + - (dkey.mv_size & 1) + (data->mv_size & 1); - } - fp->mp_upper = xdata.mv_size; - olddata.mv_size = fp->mp_upper; // pretend olddata is fp - } else if (leaf->mn_flags & F_SUBDATA) { - // Data is on sub-DB, just store it - flags |= F_DUPDATA|F_SUBDATA; - goto put_sub; - } else { - // Data is on sub-page - fp = olddata.mv_data; - switch (flags) { - default: - i = -(ssize_t)SIZELEFT(fp); - if (!(mc->mc_db->md_flags & MDB_DUPFIXED)) { - offset = i += (ssize_t) EVEN( - sizeof(indx_t) + NODESIZE + data->mv_size); - } else { - i += offset = fp->mp_pad; - offset *= 4; // space for 4 more - } - if (i > 0) - break; - // FALLTHRU: Sub-page is big enough - case MDB_CURRENT: - fp->mp_flags |= P_DIRTY; - COPY_PGNO(fp->mp_pgno, mp->mp_pgno); - mc->mc_xcursor->mx_cursor.mc_pg[0] = fp; - flags |= F_DUPDATA; - goto put_sub; - } - xdata.mv_size = olddata.mv_size + offset; + insert = rc; + if (insert) { + // The key does not exist + DPRINTF(("inserting key at index %i", mc->mc_ki[mc->mc_top])); + if ((mc->mc_db->md_flags & MDB_DUPSORT) && + LEAFSIZE(key, data) > env->me_nodemax) + { + // Too big for a node, insert in sub-DB + fp_flags = P_LEAF|P_DIRTY; + fp = env->me_pbuf; + fp->mp_pad = data->mv_size; // used if MDB_DUPFIXED + fp->mp_lower = fp->mp_upper = olddata.mv_size = PAGEHDRSZ; + goto prep_subDB; } + } else { - fp_flags = fp->mp_flags; - if (NODESIZE + NODEKSZ(leaf) + xdata.mv_size > env->me_nodemax) { - // Too big for a sub-page, convert to sub-DB - fp_flags &= ~P_SUBP; - prep_subDB: - dummy.md_pad = 0; - dummy.md_flags = 0; - dummy.md_depth = 1; - dummy.md_branch_pages = 0; - dummy.md_leaf_pages = 1; - dummy.md_overflow_pages = 0; - dummy.md_entries = NUMKEYS(fp); - xdata.mv_size = sizeof(MDB_db); - xdata.mv_data = &dummy; - if ((rc = mdb_page_alloc(mc, 1, &mp))) - return rc; - offset = env->me_psize - olddata.mv_size; - flags |= F_DUPDATA|F_SUBDATA; - dummy.md_root = mp->mp_pgno; - } - if (mp != fp) { - mp->mp_flags = fp_flags | P_DIRTY; - mp->mp_pad = fp->mp_pad; - mp->mp_lower = fp->mp_lower; - mp->mp_upper = fp->mp_upper + offset; - if (fp_flags & P_LEAF2) { - memcpy(METADATA(mp), METADATA(fp), NUMKEYS(fp) * fp->mp_pad); - } else { - memcpy((char *)mp + mp->mp_upper, (char *)fp + fp->mp_upper, - olddata.mv_size - fp->mp_upper); - for (i = NUMKEYS(fp); --i >= 0; ) - mp->mp_ptrs[i] = fp->mp_ptrs[i] + offset; - } - } - - rdata = &xdata; - flags |= F_DUPDATA; - do_sub = 1; - if (!insert) - mdb_node_del(mc, 0); - goto new_sub; - } - current: - // overflow page overwrites need special handling - if (F_ISSET(leaf->mn_flags, F_BIGDATA)) { - MDB_page *omp; - pgno_t pg; - int level, ovpages, dpages = OVPAGES(data->mv_size, env->me_psize); - - memcpy(&pg, olddata.mv_data, sizeof(pg)); - if ((rc2 = mdb_page_get(mc->mc_txn, pg, &omp, &level)) != 0) - return rc2; - ovpages = omp->mp_pages; - - // Is the ov page large enough? - if (ovpages >= dpages) { - if (!(omp->mp_flags & P_DIRTY) && - (level || (env->me_flags & MDB_WRITEMAP))) - { - rc = mdb_page_unspill(mc->mc_txn, omp, &omp); - if (rc) - return rc; - level = 0; // dirty in this txn or clean - } - // Is it dirty? - if (omp->mp_flags & P_DIRTY) { - // yes, overwrite it. Note in this case we don't - // bother to try shrinking the page if the new data - // is smaller than the overflow threshold. - if (level > 1) { - // It is writable only in a parent txn - size_t sz = (size_t) env->me_psize * ovpages, off; - MDB_page *np = mdb_page_malloc(mc->mc_txn, ovpages); - MDB_ID2 id2; - if (!np) - return ENOMEM; - id2.mid = pg; - id2.mptr = np; - rc = mdb_mid2l_insert(mc->mc_txn->mt_u.dirty_list, &id2); - mdb_cassert(mc, rc == 0); - if (!(flags & MDB_RESERVE)) { - // Copy end of page, adjusting alignment so - // compiler may copy words instead of bytes. - off = (PAGEHDRSZ + data->mv_size) & -sizeof(size_t); - memcpy((size_t *)((char *)np + off), - (size_t *)((char *)omp + off), sz - off); - sz = PAGEHDRSZ; - } - memcpy(np, omp, sz); // Copy beginning of page - omp = np; - } - SETDSZ(leaf, data->mv_size); - if (F_ISSET(flags, MDB_RESERVE)) - data->mv_data = METADATA(omp); - else - memcpy(METADATA(omp), data->mv_data, data->mv_size); - goto done; - } - } - if ((rc2 = mdb_ovpage_free(mc, omp)) != MDB_SUCCESS) - return rc2; - } else if (data->mv_size == olddata.mv_size) { - // same size, just replace it. Note that we could - // also reuse this node if the new data is smaller, - // but instead we opt to shrink the node in that case. - if (F_ISSET(flags, MDB_RESERVE)) - data->mv_data = olddata.mv_data; - else if (data->mv_size) - memcpy(olddata.mv_data, data->mv_data, data->mv_size); - else - memcpy(NODEKEY(leaf), key->mv_data, key->mv_size); - goto done; - } - mdb_node_del(mc, 0); - mc->mc_db->md_entries--; - } - - rdata = data; - - new_sub: - nflags = flags & NODE_ADD_FLAGS; - nsize = IS_LEAF2(mc->mc_pg[mc->mc_top]) ? key->mv_size : mdb_leaf_size(env, key, rdata); - if (SIZELEFT(mc->mc_pg[mc->mc_top]) < nsize) { - if (( flags & (F_DUPDATA|F_SUBDATA)) == F_DUPDATA ) - nflags &= ~MDB_APPEND; - if (!insert) - nflags |= MDB_SPLIT_REPLACE; - rc = mdb_page_split(mc, key, rdata, P_INVALID, nflags); - } else { - // There is room already in this leaf page. - rc = mdb_node_add(mc, mc->mc_ki[mc->mc_top], key, rdata, 0, nflags); - if (rc == 0 && !do_sub && insert) { - // Adjust other cursors pointing to mp - MDB_cursor *m2, *m3; - MDB_dbi dbi = mc->mc_dbi; - unsigned i = mc->mc_top; - MDB_page *mp = mc->mc_pg[i]; - - for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { - if (mc->mc_flags & C_SUB) - m3 = &m2->mc_xcursor->mx_cursor; - else - m3 = m2; - if (m3 == mc || m3->mc_snum < mc->mc_snum) continue; - if (m3->mc_pg[i] == mp && m3->mc_ki[i] >= mc->mc_ki[i]) { - m3->mc_ki[i]++; - } - } - } - } - - if (rc != MDB_SUCCESS) - mc->mc_txn->mt_flags |= MDB_TXN_ERROR; - else { - // Now store the actual data in the child DB. Note that we're - // storing the user data in the keys field, so there are strict - // size limits on dupdata. The actual data fields of the child - // DB are all zero size. - if (do_sub) { - int xflags; - put_sub: - xdata.mv_size = 0; - xdata.mv_data = ""; + more: leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); - if (flags & MDB_CURRENT) { - xflags = MDB_CURRENT|MDB_NOSPILL; - } else { - mdb_xcursor_init1(mc, leaf); - xflags = (flags & MDB_NODUPDATA) ? - MDB_NOOVERWRITE|MDB_NOSPILL : MDB_NOSPILL; - } - // converted, write the original data first - if (dkey.mv_size) { - rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, xflags); - if (rc) - return rc; - { - // Adjust other cursors pointing to mp - MDB_cursor *m2; - unsigned i = mc->mc_top; - MDB_page *mp = mc->mc_pg[i]; + olddata.mv_size = NODEDSZ(leaf); + olddata.mv_data = NODEDATA(leaf); - for (m2 = mc->mc_txn->mt_cursors[mc->mc_dbi]; m2; m2=m2->mc_next) { - if (m2 == mc || m2->mc_snum < mc->mc_snum) continue; - if (!(m2->mc_flags & C_INITIALIZED)) continue; - if (m2->mc_pg[i] == mp && m2->mc_ki[i] == mc->mc_ki[i]) { - mdb_xcursor_init1(m2, leaf); + // DB has dups? + if (F_ISSET(mc->mc_db->md_flags, MDB_DUPSORT)) { + // Prepare (sub-)page/sub-DB to accept the new item, + // if needed. fp: old sub-page or a header faking + // it. mp: new (sub-)page. offset: growth in page + // size. xdata: node data with new page or DB. + ssize_t i, offset = 0; + mp = fp = xdata.mv_data = env->me_pbuf; + mp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno; + + // Was a single item before, must convert now + if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) { + // Just overwrite the current item + if (flags == MDB_CURRENT) + goto current; + + #if UINT_MAX < SIZE_MAX + if (mc->mc_dbx->md_dcmp == mdb_cmp_int && olddata.mv_size == sizeof(size_t)) + #ifdef MISALIGNED_OK + mc->mc_dbx->md_dcmp = mdb_cmp_long; + #else + mc->mc_dbx->md_dcmp = mdb_cmp_cint; + #endif + #endif + // if data matches, skip it + if (!mc->mc_dbx->md_dcmp(data, &olddata)) { + if (flags & MDB_NODUPDATA) + rc = MDB_KEYEXIST; + else if (flags & MDB_MULTIPLE) + goto next_mult; + else + rc = MDB_SUCCESS; + return rc; + } + + // Back up original data item + dkey.mv_size = olddata.mv_size; + dkey.mv_data = memcpy(fp+1, olddata.mv_data, olddata.mv_size); + + // Make sub-page header for the dup items, with dummy body + fp->mp_flags = P_LEAF|P_DIRTY|P_SUBP; + fp->mp_lower = PAGEHDRSZ; + xdata.mv_size = PAGEHDRSZ + dkey.mv_size + data->mv_size; + if (mc->mc_db->md_flags & MDB_DUPFIXED) { + fp->mp_flags |= P_LEAF2; + fp->mp_pad = data->mv_size; + xdata.mv_size += 2 * data->mv_size; // leave space for 2 more + } else { + xdata.mv_size += 2 * (sizeof(indx_t) + NODESIZE) + + (dkey.mv_size & 1) + (data->mv_size & 1); + } + fp->mp_upper = xdata.mv_size; + olddata.mv_size = fp->mp_upper; // pretend olddata is fp + } else if (leaf->mn_flags & F_SUBDATA) { + // Data is on sub-DB, just store it + flags |= F_DUPDATA|F_SUBDATA; + goto put_sub; + } else { + // Data is on sub-page + fp = olddata.mv_data; + switch (flags) { + default: + i = -(ssize_t)SIZELEFT(fp); + if (!(mc->mc_db->md_flags & MDB_DUPFIXED)) { + offset = i += (ssize_t) EVEN( + sizeof(indx_t) + NODESIZE + data->mv_size); + } else { + i += offset = fp->mp_pad; + offset *= 4; // space for 4 more + } + if (i > 0) + break; + // FALLTHRU: Sub-page is big enough + case MDB_CURRENT: + fp->mp_flags |= P_DIRTY; + COPY_PGNO(fp->mp_pgno, mp->mp_pgno); + mc->mc_xcursor->mx_cursor.mc_pg[0] = fp; + flags |= F_DUPDATA; + goto put_sub; + } + xdata.mv_size = olddata.mv_size + offset; + } + + fp_flags = fp->mp_flags; + if (NODESIZE + NODEKSZ(leaf) + xdata.mv_size > env->me_nodemax) { + // Too big for a sub-page, convert to sub-DB + fp_flags &= ~P_SUBP; + prep_subDB: + dummy.md_pad = 0; + dummy.md_flags = 0; + dummy.md_depth = 1; + dummy.md_branch_pages = 0; + dummy.md_leaf_pages = 1; + dummy.md_overflow_pages = 0; + dummy.md_entries = NUMKEYS(fp); + xdata.mv_size = sizeof(MDB_db); + xdata.mv_data = &dummy; + if ((rc = mdb_page_alloc(mc, 1, &mp))) + return rc; + offset = env->me_psize - olddata.mv_size; + flags |= F_DUPDATA|F_SUBDATA; + dummy.md_root = mp->mp_pgno; + } + if (mp != fp) { + mp->mp_flags = fp_flags | P_DIRTY; + mp->mp_pad = fp->mp_pad; + mp->mp_lower = fp->mp_lower; + mp->mp_upper = fp->mp_upper + offset; + if (fp_flags & P_LEAF2) { + memcpy(METADATA(mp), METADATA(fp), NUMKEYS(fp) * fp->mp_pad); + } else { + memcpy((char *)mp + mp->mp_upper, (char *)fp + fp->mp_upper, + olddata.mv_size - fp->mp_upper); + for (i = NUMKEYS(fp); --i >= 0; ) + mp->mp_ptrs[i] = fp->mp_ptrs[i] + offset; + } + } + + rdata = &xdata; + flags |= F_DUPDATA; + do_sub = 1; + if (!insert) + mdb_node_del(mc, 0); + goto new_sub; + } + current: + // overflow page overwrites need special handling + if (F_ISSET(leaf->mn_flags, F_BIGDATA)) { + MDB_page *omp; + pgno_t pg; + int level, ovpages, dpages = OVPAGES(data->mv_size, env->me_psize); + + memcpy(&pg, olddata.mv_data, sizeof(pg)); + if ((rc2 = mdb_page_get(mc->mc_txn, pg, &omp, &level)) != 0) + return rc2; + ovpages = omp->mp_pages; + + // Is the ov page large enough? + if (ovpages >= dpages) { + if (!(omp->mp_flags & P_DIRTY) && + (level || (env->me_flags & MDB_WRITEMAP))) + { + rc = mdb_page_unspill(mc->mc_txn, omp, &omp); + if (rc) + return rc; + level = 0; // dirty in this txn or clean + } + // Is it dirty? + if (omp->mp_flags & P_DIRTY) { + // yes, overwrite it. Note in this case we don't + // bother to try shrinking the page if the new data + // is smaller than the overflow threshold. + if (level > 1) { + // It is writable only in a parent txn + size_t sz = (size_t) env->me_psize * ovpages, off; + MDB_page *np = mdb_page_malloc(mc->mc_txn, ovpages); + MDB_ID2 id2; + if (!np) + return ENOMEM; + id2.mid = pg; + id2.mptr = np; + rc = mdb_mid2l_insert(mc->mc_txn->mt_u.dirty_list, &id2); + mdb_cassert(mc, rc == 0); + if (!(flags & MDB_RESERVE)) { + // Copy end of page, adjusting alignment so + // compiler may copy words instead of bytes. + off = (PAGEHDRSZ + data->mv_size) & -sizeof(size_t); + memcpy((size_t *)((char *)np + off), + (size_t *)((char *)omp + off), sz - off); + sz = PAGEHDRSZ; + } + memcpy(np, omp, sz); // Copy beginning of page + omp = np; + } + SETDSZ(leaf, data->mv_size); + if (F_ISSET(flags, MDB_RESERVE)) + data->mv_data = METADATA(omp); + else + memcpy(METADATA(omp), data->mv_data, data->mv_size); + goto done; + } + } + if ((rc2 = mdb_ovpage_free(mc, omp)) != MDB_SUCCESS) + return rc2; + } else if (data->mv_size == olddata.mv_size) { + // same size, just replace it. Note that we could + // also reuse this node if the new data is smaller, + // but instead we opt to shrink the node in that case. + if (F_ISSET(flags, MDB_RESERVE)) + data->mv_data = olddata.mv_data; + else if (data->mv_size) + memcpy(olddata.mv_data, data->mv_data, data->mv_size); + else + memcpy(NODEKEY(leaf), key->mv_data, key->mv_size); + goto done; + } + mdb_node_del(mc, 0); + mc->mc_db->md_entries--; + } + + rdata = data; + + new_sub: + nflags = flags & NODE_ADD_FLAGS; + nsize = IS_LEAF2(mc->mc_pg[mc->mc_top]) ? key->mv_size : mdb_leaf_size(env, key, rdata); + if (SIZELEFT(mc->mc_pg[mc->mc_top]) < nsize) { + if (( flags & (F_DUPDATA|F_SUBDATA)) == F_DUPDATA ) + nflags &= ~MDB_APPEND; + if (!insert) + nflags |= MDB_SPLIT_REPLACE; + rc = mdb_page_split(mc, key, rdata, P_INVALID, nflags); + } else { + // There is room already in this leaf page. + rc = mdb_node_add(mc, mc->mc_ki[mc->mc_top], key, rdata, 0, nflags); + if (rc == 0 && !do_sub && insert) { + // Adjust other cursors pointing to mp + MDB_cursor *m2, *m3; + MDB_dbi dbi = mc->mc_dbi; + unsigned i = mc->mc_top; + MDB_page *mp = mc->mc_pg[i]; + + for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { + if (mc->mc_flags & C_SUB) + m3 = &m2->mc_xcursor->mx_cursor; + else + m3 = m2; + if (m3 == mc || m3->mc_snum < mc->mc_snum) continue; + if (m3->mc_pg[i] == mp && m3->mc_ki[i] >= mc->mc_ki[i]) { + m3->mc_ki[i]++; + } + } + } + } + + if (rc != MDB_SUCCESS) + mc->mc_txn->mt_flags |= MDB_TXN_ERROR; + else { + // Now store the actual data in the child DB. Note that we're + // storing the user data in the keys field, so there are strict + // size limits on dupdata. The actual data fields of the child + // DB are all zero size. + if (do_sub) { + int xflags; + put_sub: + xdata.mv_size = 0; + xdata.mv_data = ""; + leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); + if (flags & MDB_CURRENT) { + xflags = MDB_CURRENT|MDB_NOSPILL; + } else { + mdb_xcursor_init1(mc, leaf); + xflags = (flags & MDB_NODUPDATA) ? + MDB_NOOVERWRITE|MDB_NOSPILL : MDB_NOSPILL; + } + // converted, write the original data first + if (dkey.mv_size) { + rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, xflags); + if (rc) + return rc; + { + // Adjust other cursors pointing to mp + MDB_cursor *m2; + unsigned i = mc->mc_top; + MDB_page *mp = mc->mc_pg[i]; + + for (m2 = mc->mc_txn->mt_cursors[mc->mc_dbi]; m2; m2=m2->mc_next) { + if (m2 == mc || m2->mc_snum < mc->mc_snum) continue; + if (!(m2->mc_flags & C_INITIALIZED)) continue; + if (m2->mc_pg[i] == mp && m2->mc_ki[i] == mc->mc_ki[i]) { + mdb_xcursor_init1(m2, leaf); + } } } + // we've done our job + dkey.mv_size = 0; + } + if (flags & MDB_APPENDDUP) + xflags |= MDB_APPEND; + rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags); + if (flags & F_SUBDATA) { + void *db = NODEDATA(leaf); + memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db)); } - // we've done our job - dkey.mv_size = 0; } - if (flags & MDB_APPENDDUP) - xflags |= MDB_APPEND; - rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags); - if (flags & F_SUBDATA) { - void *db = NODEDATA(leaf); - memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db)); - } - } - // sub-writes might have failed so check rc again. - // Don't increment count if we just replaced an existing item. - if (!rc && !(flags & MDB_CURRENT)) - mc->mc_db->md_entries++; - if (flags & MDB_MULTIPLE) { - if (!rc) { - next_mult: - mcount++; - // let caller know how many succeeded, if any - data[1].mv_size = mcount; - if (mcount < dcount) { - data[0].mv_data = (char *)data[0].mv_data + data[0].mv_size; - goto more; + // sub-writes might have failed so check rc again. + // Don't increment count if we just replaced an existing item. + if (!rc && !(flags & MDB_CURRENT)) + mc->mc_db->md_entries++; + if (flags & MDB_MULTIPLE) { + if (!rc) { + next_mult: + mcount++; + // let caller know how many succeeded, if any + data[1].mv_size = mcount; + if (mcount < dcount) { + data[0].mv_data = (char *)data[0].mv_data + data[0].mv_size; + goto more; + } } } } - } - done: - // If we succeeded and the key didn't exist before, make sure - // the cursor is marked valid. - if (!rc && insert) - mc->mc_flags |= C_INITIALIZED; - return rc; + done: + // If we succeeded and the key didn't exist before, make sure + // the cursor is marked valid. + if (!rc && insert) + mc->mc_flags |= C_INITIALIZED; + return rc; */ return nil } diff --git a/rwtransaction.go b/rwtransaction.go index d0fe440..cb0cc0f 100644 --- a/rwtransaction.go +++ b/rwtransaction.go @@ -4,4 +4,126 @@ package bolt // Only one read/write transaction can be active for a DB at a time. type RWTransaction struct { Transaction + + dirtyPages map[int]*page + freePages map[int]*page +} + +// TODO: Allocate scratch meta page. +// TODO: Allocate scratch data pages. +// TODO: Track dirty pages (?) + +func (t *RWTransaction) Commit() error { + // TODO: Update non-system bucket pointers. + // TODO: Save freelist. + // TODO: Flush data. + + // TODO: Initialize new meta object, Update system bucket nodes, last pgno, txnid. + // meta.mm_dbs[0] = txn->mt_dbs[0]; + // meta.mm_dbs[1] = txn->mt_dbs[1]; + // meta.mm_last_pg = txn->mt_next_pgno - 1; + // meta.mm_txnid = txn->mt_txnid; + + // TODO: Pick sync or async file descriptor. + // TODO: Write meta page to file. + + // TODO(?): Write checksum at the end. + + return nil +} + +func (t *RWTransaction) Rollback() error { + return t.close() +} + +func (t *RWTransaction) close() error { + // TODO: Free scratch pages. + // TODO: Release writer lock. + return nil +} + +// CreateBucket creates a new bucket. +func (t *RWTransaction) CreateBucket(name string, dupsort bool) (*Bucket, error) { + if t.db == nil { + return nil, InvalidTransactionError + } + + // Check if bucket already exists. + if b := t.buckets[name]; b != nil { + return nil, &Error{"bucket already exists", nil} + } + + // Create a new bucket entry. + var buf [unsafe.Sizeof(bucket{})]byte + var raw = (*bucket)(unsafe.Pointer(&buf[0])) + raw.root = p_invalid + // TODO: Set dupsort flag. + + // Open cursor to system bucket. + c, err := t.Cursor(&t.sysbuckets) + if err != nil { + return nil, err + } + + // Put new entry into system bucket. + if err := c.Put([]byte(name), buf[:]); err != nil { + return nil, err + } + + // Save reference to bucket. + b := &Bucket{name: name, bucket: raw, isNew: true} + t.buckets[name] = b + + // TODO: dbflag |= DB_DIRTY; + + return b, nil +} + +// DropBucket deletes a bucket. +func (t *RWTransaction) DeleteBucket(b *Bucket) error { + // TODO: Find bucket. + // TODO: Remove entry from system bucket. + return nil +} + +// Put sets the value for a key in a given bucket. +func (t *Transaction) Put(name string, key []byte, value []byte) error { + c, err := t.Cursor(name) + if err != nil { + return nil, err + } + return c.Put(key, value) +} + + +// page returns a reference to the page with a given id. +// If page has been written to then a temporary bufferred page is returned. +func (t *Transaction) page(id int) *page { + // Check the dirty pages first. + if p, ok := t.pages[id]; ok { + return p + } + + // Otherwise return directly from the mmap. + return t.Transaction.page(id) +} + +// Flush (some) dirty pages to the map, after clearing their dirty flag. +// @param[in] txn the transaction that's being committed +// @param[in] keep number of initial pages in dirty_list to keep dirty. +// @return 0 on success, non-zero on failure. +func (t *Transaction) flush(keep bool) error { + // TODO(benbjohnson): Use vectorized I/O to write out dirty pages. + + // TODO: Loop over each dirty page and write it to disk. + return nil +} + +func (t *RWTransaction) DeleteBucket(name string) error { + // TODO: Remove from main DB. + // TODO: Delete entry from system bucket. + // TODO: Free all pages. + // TODO: Remove cursor. + + return nil } diff --git a/transaction.go b/transaction.go index e95bf7c..6319b9e 100644 --- a/transaction.go +++ b/transaction.go @@ -5,14 +5,9 @@ import ( "unsafe" ) -var InvalidTransactionError = &Error{"txn is invalid", nil} -var BucketAlreadyExistsError = &Error{"bucket already exists", nil} - -const ( - txnb_dirty = 0x01 /**< DB was modified or is DUPSORT data */ - txnb_stale = 0x02 /**< Named-DB record is older than txnID */ - txnb_new = 0x04 /**< Named-DB handle opened in this txn */ - txnb_valid = 0x08 /**< DB handle is valid, see also #MDB_VALID */ +var ( + InvalidTransactionError = &Error{"txn is invalid", nil} + BucketAlreadyExistsError = &Error{"bucket already exists", nil} ) const ( @@ -43,50 +38,21 @@ type Transaction struct { dirty_room int } -// CreateBucket creates a new bucket. -func (t *Transaction) CreateBucket(name string, dupsort bool) (*Bucket, error) { - if t.db == nil { - return nil, InvalidTransactionError - } +// init initializes the transaction and associates it with a database. +func (t *Transaction) init(db *DB, meta *meta) error { - // Check if bucket already exists. - if b := t.buckets[name]; b != nil { - return nil, &Error{"bucket already exists", nil} - } - - // Create a new bucket entry. - var buf [unsafe.Sizeof(bucket{})]byte - var raw = (*bucket)(unsafe.Pointer(&buf[0])) - raw.root = p_invalid - // TODO: Set dupsort flag. - - // Open cursor to system bucket. - c, err := t.Cursor(&t.sysbuckets) - if err != nil { - return nil, err - } - - // Put new entry into system bucket. - if err := c.Put([]byte(name), buf[:]); err != nil { - return nil, err - } - - // Save reference to bucket. - b := &Bucket{name: name, bucket: raw, isNew: true} - t.buckets[name] = b - - // TODO: dbflag |= DB_DIRTY; - - return b, nil } -// DropBucket deletes a bucket. -func (t *Transaction) DeleteBucket(b *Bucket) error { - // TODO: Find bucket. - // TODO: Remove entry from system bucket. +func (t *Transaction) Close() error { + // TODO: Close cursors. + // TODO: Close buckets. return nil } +func (t *Transaction) DB() *DB { + return t.db +} + // Bucket retrieves a bucket by name. func (t *Transaction) Bucket(name string) (*Bucket, error) { if strings.HasPrefix(name, "sys*") { @@ -158,16 +124,28 @@ func (t *Transaction) Cursor(b *Bucket) (*Cursor, error) { c.pages = append(c.pages, p) } - if (b.flags & MDB_DUPSORT) != 0 { - c.subcursor = &Cursor{ - transaction: t, - bucket: b, - } - } - return nil, nil } +// Get retrieves the value for a key in a given bucket. +func (t *Transaction) Get(name string, key []byte) ([]byte, error) { + c, err := t.Cursor(name) + if err != nil { + return nil, err + } + return c.Get(key) +} + +func (t *Transaction) page(id int) (*page, error) { + return t.db.page(id) +} + +// Stat returns information about a bucket's internal structure. +func (t *Transaction) Stat(name string) *stat { + // TODO + return nil +} + // // // // // // @@ -180,469 +158,6 @@ func (t *Transaction) Cursor(b *Bucket) (*Cursor, error) { // // // // -func (t *Transaction) allocPage(num int) *page { - /* - MDB_env *env = txn->mt_env; - MDB_page *ret = env->me_dpages; - size_t psize = env->me_psize, sz = psize, off; - // For ! #MDB_NOMEMINIT, psize counts how much to init. - // For a single page alloc, we init everything after the page header. - // For multi-page, we init the final page; if the caller needed that - // many pages they will be filling in at least up to the last page. - if (num == 1) { - if (ret) { - VGMEMP_ALLOC(env, ret, sz); - VGMEMP_DEFINED(ret, sizeof(ret->mp_next)); - env->me_dpages = ret->mp_next; - return ret; - } - psize -= off = PAGEHDRSZ; - } else { - sz *= num; - off = sz - psize; - } - if ((ret = malloc(sz)) != NULL) { - VGMEMP_ALLOC(env, ret, sz); - if (!(env->me_flags & MDB_NOMEMINIT)) { - memset((char *)ret + off, 0, psize); - ret->mp_pad = 0; - } - } else { - txn->mt_flags |= MDB_TXN_ERROR; - } - return ret; - */ - return nil -} - -// Find oldest txnid still referenced. Expects txn->mt_txnid > 0. -func (t *Transaction) oldest() int { - /* - int i; - txnid_t mr, oldest = txn->mt_txnid - 1; - if (txn->mt_env->me_txns) { - MDB_reader *r = txn->mt_env->me_txns->mti_readers; - for (i = txn->mt_env->me_txns->mti_numreaders; --i >= 0; ) { - if (r[i].mr_pid) { - mr = r[i].mr_txnid; - if (oldest > mr) - oldest = mr; - } - } - } - return oldest; - */ - return 0 -} - -// Add a page to the txn's dirty list -func (t *Transaction) addDirtyPage(p *page) { - /* - MDB_ID2 mid; - int rc, (*insert)(MDB_ID2L, MDB_ID2 *); - - if (txn->mt_env->me_flags & MDB_WRITEMAP) { - insert = mdb_mid2l_append; - } else { - insert = mdb_mid2l_insert; - } - mid.mid = mp->mp_pgno; - mid.mptr = mp; - rc = insert(txn->mt_u.dirty_list, &mid); - mdb_tassert(txn, rc == 0); - txn->mt_dirty_room--; - */ -} - -// Pull a page off the txn's spill list, if present. -// If a page being referenced was spilled to disk in this txn, bring -// it back and make it dirty/writable again. -// @param[in] txn the transaction handle. -// @param[in] mp the page being referenced. It must not be dirty. -// @param[out] ret the writable page, if any. ret is unchanged if -// mp wasn't spilled. -func (t *Transaction) unspill(p *page) *page { - /* - MDB_env *env = txn->mt_env; - const MDB_txn *tx2; - unsigned x; - pgno_t pgno = mp->mp_pgno, pn = pgno << 1; - - for (tx2 = txn; tx2; tx2=tx2->mt_parent) { - if (!tx2->mt_spill_pgs) - continue; - x = mdb_midl_search(tx2->mt_spill_pgs, pn); - if (x <= tx2->mt_spill_pgs[0] && tx2->mt_spill_pgs[x] == pn) { - MDB_page *np; - int num; - if (txn->mt_dirty_room == 0) - return MDB_TXN_FULL; - if (IS_OVERFLOW(mp)) - num = mp->mp_pages; - else - num = 1; - if (env->me_flags & MDB_WRITEMAP) { - np = mp; - } else { - np = mdb_page_malloc(txn, num); - if (!np) - return ENOMEM; - if (num > 1) - memcpy(np, mp, num * env->me_psize); - else - mdb_page_copy(np, mp, env->me_psize); - } - if (tx2 == txn) { - // If in current txn, this page is no longer spilled. - // If it happens to be the last page, truncate the spill list. - // Otherwise mark it as deleted by setting the LSB. - if (x == txn->mt_spill_pgs[0]) - txn->mt_spill_pgs[0]--; - else - txn->mt_spill_pgs[x] |= 1; - } // otherwise, if belonging to a parent txn, the - // page remains spilled until child commits - - mdb_page_dirty(txn, np); - np->mp_flags |= P_DIRTY; - *ret = np; - break; - } - } - return MDB_SUCCESS; - */ - return nil -} - -// Back up parent txn's cursors, then grab the originals for tracking -func (t *Transaction) shadow(dst *Transaction) error { - /* - MDB_cursor *mc, *bk; - MDB_xcursor *mx; - size_t size; - int i; - - for (i = src->mt_numdbs; --i >= 0; ) { - if ((mc = src->mt_cursors[i]) != NULL) { - size = sizeof(MDB_cursor); - if (mc->mc_xcursor) - size += sizeof(MDB_xcursor); - for (; mc; mc = bk->mc_next) { - bk = malloc(size); - if (!bk) - return ENOMEM; - *bk = *mc; - mc->mc_backup = bk; - mc->mc_db = &dst->mt_dbs[i]; - // Kill pointers into src - and dst to reduce abuse: The - // user may not use mc until dst ends. Otherwise we'd... - mc->mc_txn = NULL; // ...set this to dst - mc->mc_dbflag = NULL; // ...and &dst->mt_dbflags[i] - if ((mx = mc->mc_xcursor) != NULL) { - *(MDB_xcursor *)(bk+1) = *mx; - mx->mx_cursor.mc_txn = NULL; // ...and dst. - } - mc->mc_next = dst->mt_cursors[i]; - dst->mt_cursors[i] = mc; - } - } - } - return MDB_SUCCESS; - */ - return nil -} - -// Close this write txn's cursors, give parent txn's cursors back to parent. -// @param[in] txn the transaction handle. -// @param[in] merge true to keep changes to parent cursors, false to revert. -// @return 0 on success, non-zero on failure. -func (t *Transaction) closeCursors(merge bool) { - /* - MDB_cursor **cursors = txn->mt_cursors, *mc, *next, *bk; - MDB_xcursor *mx; - int i; - - for (i = txn->mt_numdbs; --i >= 0; ) { - for (mc = cursors[i]; mc; mc = next) { - next = mc->mc_next; - if ((bk = mc->mc_backup) != NULL) { - if (merge) { - // Commit changes to parent txn - mc->mc_next = bk->mc_next; - mc->mc_backup = bk->mc_backup; - mc->mc_txn = bk->mc_txn; - mc->mc_db = bk->mc_db; - mc->mc_dbflag = bk->mc_dbflag; - if ((mx = mc->mc_xcursor) != NULL) - mx->mx_cursor.mc_txn = bk->mc_txn; - } else { - // Abort nested txn - *mc = *bk; - if ((mx = mc->mc_xcursor) != NULL) - *mx = *(MDB_xcursor *)(bk+1); - } - mc = bk; - } - // Only malloced cursors are permanently tracked. - free(mc); - } - cursors[i] = NULL; - } - */ -} - -// Common code for #mdb_txn_begin() and #mdb_txn_renew(). -// @param[in] txn the transaction handle to initialize -// @return 0 on success, non-zero on failure. -func (t *Transaction) renew() error { - /* - MDB_env *env = txn->mt_env; - MDB_txninfo *ti = env->me_txns; - MDB_meta *meta; - unsigned int i, nr; - uint16_t x; - int rc, new_notls = 0; - - // Setup db info - txn->mt_numdbs = env->me_numdbs; - txn->mt_dbxs = env->me_dbxs; // mostly static anyway - - if (txn->mt_flags & MDB_TXN_RDONLY) { - if (!ti) { - meta = env->me_metas[ mdb_env_pick_meta(env) ]; - txn->mt_txnid = meta->mm_txnid; - txn->mt_u.reader = NULL; - } else { - MDB_reader *r = (env->me_flags & MDB_NOTLS) ? txn->mt_u.reader : - pthread_getspecific(env->me_txkey); - if (r) { - if (r->mr_pid != env->me_pid || r->mr_txnid != (txnid_t)-1) - return MDB_BAD_RSLOT; - } else { - MDB_PID_T pid = env->me_pid; - pthread_t tid = pthread_self(); - - if (!(env->me_flags & MDB_LIVE_READER)) { - rc = mdb_reader_pid(env, Pidset, pid); - if (rc) - return rc; - env->me_flags |= MDB_LIVE_READER; - } - - LOCK_MUTEX_R(env); - nr = ti->mti_numreaders; - for (i=0; imti_readers[i].mr_pid == 0) - break; - if (i == env->me_maxreaders) { - UNLOCK_MUTEX_R(env); - return MDB_READERS_FULL; - } - ti->mti_readers[i].mr_pid = pid; - ti->mti_readers[i].mr_tid = tid; - if (i == nr) - ti->mti_numreaders = ++nr; - // Save numreaders for un-mutexed mdb_env_close() - env->me_numreaders = nr; - UNLOCK_MUTEX_R(env); - - r = &ti->mti_readers[i]; - new_notls = (env->me_flags & MDB_NOTLS); - if (!new_notls && (rc=pthread_setspecific(env->me_txkey, r))) { - r->mr_pid = 0; - return rc; - } - } - txn->mt_txnid = r->mr_txnid = ti->mti_txnid; - txn->mt_u.reader = r; - meta = env->me_metas[txn->mt_txnid & 1]; - } - } else { - if (ti) { - LOCK_MUTEX_W(env); - - txn->mt_txnid = ti->mti_txnid; - meta = env->me_metas[txn->mt_txnid & 1]; - } else { - meta = env->me_metas[ mdb_env_pick_meta(env) ]; - txn->mt_txnid = meta->mm_txnid; - } - txn->mt_txnid++; - #if MDB_DEBUG - if (txn->mt_txnid == mdb_debug_start) - mdb_debug = 1; - #endif - txn->mt_dirty_room = MDB_IDL_UM_MAX; - txn->mt_u.dirty_list = env->me_dirty_list; - txn->mt_u.dirty_list[0].mid = 0; - txn->mt_free_pgs = env->me_free_pgs; - txn->mt_free_pgs[0] = 0; - txn->mt_spill_pgs = NULL; - env->me_txn = txn; - } - - // Copy the DB info and flags - memcpy(txn->mt_dbs, meta->mm_dbs, 2 * sizeof(MDB_db)); - - // Moved to here to avoid a data race in read TXNs - txn->mt_next_pgno = meta->mm_last_pg+1; - - for (i=2; imt_numdbs; i++) { - x = env->me_dbflags[i]; - txn->mt_dbs[i].md_flags = x & PERSISTENT_FLAGS; - txn->mt_dbflags[i] = (x & MDB_VALID) ? DB_VALID|DB_STALE : 0; - } - txn->mt_dbflags[0] = txn->mt_dbflags[1] = DB_VALID; - - if (env->me_maxpg < txn->mt_next_pgno) { - mdb_txn_reset0(txn, "renew0-mapfail"); - if (new_notls) { - txn->mt_u.reader->mr_pid = 0; - txn->mt_u.reader = NULL; - } - return MDB_MAP_RESIZED; - } - - return MDB_SUCCESS; - */ - return nil -} - -func (t *Transaction) Renew() error { - /* - int rc; - - if (!txn || txn->mt_dbxs) // A reset txn has mt_dbxs==NULL - return EINVAL; - - if (txn->mt_env->me_flags & MDB_FATAL_ERROR) { - DPUTS("environment had fatal error, must shutdown!"); - return MDB_PANIC; - } - - rc = mdb_txn_renew0(txn); - if (rc == MDB_SUCCESS) { - DPRINTF(("renew txn %"Z"u%c %p on mdbenv %p, root page %"Z"u", - txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', - (void *)txn, (void *)txn->mt_env, txn->mt_dbs[MAIN_DBI].md_root)); - } - return rc; - */ - return nil -} - -func (t *Transaction) DB() *DB { - return t.db -} - -// Export or close DBI handles opened in this txn. -func (t *Transaction) updateBuckets(keep bool) { - /* - int i; - MDB_dbi n = txn->mt_numdbs; - MDB_env *env = txn->mt_env; - unsigned char *tdbflags = txn->mt_dbflags; - - for (i = n; --i >= 2;) { - if (tdbflags[i] & DB_NEW) { - if (keep) { - env->me_dbflags[i] = txn->mt_dbs[i].md_flags | MDB_VALID; - } else { - char *ptr = env->me_dbxs[i].md_name.mv_data; - env->me_dbxs[i].md_name.mv_data = NULL; - env->me_dbxs[i].md_name.mv_size = 0; - env->me_dbflags[i] = 0; - free(ptr); - } - } - } - if (keep && env->me_numdbs < n) - env->me_numdbs = n; - */ -} - -// Common code for #mdb_txn_reset() and #mdb_txn_abort(). -// May be called twice for readonly txns: First reset it, then abort. -// @param[in] txn the transaction handle to reset -// @param[in] act why the transaction is being reset -func (t *Transaction) reset(act string) { - /* - MDB_env *env = txn->mt_env; - - // Close any DBI handles opened in this txn - mdb_dbis_update(txn, 0); - - DPRINTF(("%s txn %"Z"u%c %p on mdbenv %p, root page %"Z"u", - act, txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', - (void *) txn, (void *)env, txn->mt_dbs[MAIN_DBI].md_root)); - - if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) { - if (txn->mt_u.reader) { - txn->mt_u.reader->mr_txnid = (txnid_t)-1; - if (!(env->me_flags & MDB_NOTLS)) - txn->mt_u.reader = NULL; // txn does not own reader - } - txn->mt_numdbs = 0; // close nothing if called again - txn->mt_dbxs = NULL; // mark txn as reset - } else { - mdb_cursors_close(txn, 0); - - if (!(env->me_flags & MDB_WRITEMAP)) { - mdb_dlist_free(txn); - } - mdb_midl_free(env->me_pghead); - - if (txn->mt_parent) { - txn->mt_parent->mt_child = NULL; - env->me_pgstate = ((MDB_ntxn *)txn)->mnt_pgstate; - mdb_midl_free(txn->mt_free_pgs); - mdb_midl_free(txn->mt_spill_pgs); - free(txn->mt_u.dirty_list); - return; - } - - if (mdb_midl_shrink(&txn->mt_free_pgs)) - env->me_free_pgs = txn->mt_free_pgs; - env->me_pghead = NULL; - env->me_pglast = 0; - - env->me_txn = NULL; - // The writer mutex was locked in mdb_txn_begin. - if (env->me_txns) - UNLOCK_MUTEX_W(env); - } - */ -} - -func (t *Transaction) Reset() { - /* - if (txn == NULL) - return; - - // This call is only valid for read-only txns - if (!(txn->mt_flags & MDB_TXN_RDONLY)) - return; - - mdb_txn_reset0(txn, "reset"); - */ -} - -func (t *Transaction) Abort() { - /* - if (txn == NULL) - return; - - if (txn->mt_child) - mdb_txn_abort(txn->mt_child); - - mdb_txn_reset0(txn, "abort"); - // Free reader slot tied to this txn (if MDB_NOTLS && writable FS) - if ((txn->mt_flags & MDB_TXN_RDONLY) && txn->mt_u.reader) - txn->mt_u.reader->mr_pid = 0; - - free(txn); - */ -} // Save the freelist as of this transaction to the freeDB. // This changes the freelist. Keep trying until it stabilizes. @@ -800,530 +315,8 @@ func (t *Transaction) saveFreelist() error { return nil } -// Flush (some) dirty pages to the map, after clearing their dirty flag. -// @param[in] txn the transaction that's being committed -// @param[in] keep number of initial pages in dirty_list to keep dirty. -// @return 0 on success, non-zero on failure. -func (t *Transaction) flush(keep bool) error { - /* - MDB_env *env = txn->mt_env; - MDB_ID2L dl = txn->mt_u.dirty_list; - unsigned psize = env->me_psize, j; - int i, pagecount = dl[0].mid, rc; - size_t size = 0, pos = 0; - pgno_t pgno = 0; - MDB_page *dp = NULL; - #ifdef _WIN32 - OVERLAPPED ov; - #else - struct iovec iov[MDB_COMMIT_PAGES]; - ssize_t wpos = 0, wsize = 0, wres; - size_t next_pos = 1; // impossible pos, so pos != next_pos - int n = 0; - #endif - j = i = keep; - if (env->me_flags & MDB_WRITEMAP) { - // Clear dirty flags - while (++i <= pagecount) { - dp = dl[i].mptr; - // Don't flush this page yet - if (dp->mp_flags & P_KEEP) { - dp->mp_flags ^= P_KEEP; - dl[++j] = dl[i]; - continue; - } - dp->mp_flags &= ~P_DIRTY; - } - goto done; - } - - // Write the pages - for (;;) { - if (++i <= pagecount) { - dp = dl[i].mptr; - // Don't flush this page yet - if (dp->mp_flags & P_KEEP) { - dp->mp_flags ^= P_KEEP; - dl[i].mid = 0; - continue; - } - pgno = dl[i].mid; - // clear dirty flag - dp->mp_flags &= ~P_DIRTY; - pos = pgno * psize; - size = psize; - if (IS_OVERFLOW(dp)) size *= dp->mp_pages; - } - #ifdef _WIN32 - else break; - - // Windows actually supports scatter/gather I/O, but only on - // unbuffered file handles. Since we're relying on the OS page - // cache for all our data, that's self-defeating. So we just - // write pages one at a time. We use the ov structure to set - // the write offset, to at least save the overhead of a Seek - // system call. - DPRINTF(("committing page %"Z"u", pgno)); - memset(&ov, 0, sizeof(ov)); - ov.Offset = pos & 0xffffffff; - ov.OffsetHigh = pos >> 16 >> 16; - if (!WriteFile(env->me_fd, dp, size, NULL, &ov)) { - rc = ErrCode(); - DPRINTF(("WriteFile: %d", rc)); - return rc; - } - #else - // Write up to MDB_COMMIT_PAGES dirty pages at a time. - if (pos!=next_pos || n==MDB_COMMIT_PAGES || wsize+size>MAX_WRITE) { - if (n) { - // Write previous page(s) - #ifdef MDB_USE_PWRITEV - wres = pwritev(env->me_fd, iov, n, wpos); - #else - if (n == 1) { - wres = pwrite(env->me_fd, iov[0].iov_base, wsize, wpos); - } else { - if (lseek(env->me_fd, wpos, SEEK_SET) == -1) { - rc = ErrCode(); - DPRINTF(("lseek: %s", strerror(rc))); - return rc; - } - wres = writev(env->me_fd, iov, n); - } - #endif - if (wres != wsize) { - if (wres < 0) { - rc = ErrCode(); - DPRINTF(("Write error: %s", strerror(rc))); - } else { - rc = EIO; // TODO: Use which error code? - DPUTS("short write, filesystem full?"); - } - return rc; - } - n = 0; - } - if (i > pagecount) - break; - wpos = pos; - wsize = 0; - } - DPRINTF(("committing page %"Z"u", pgno)); - next_pos = pos + size; - iov[n].iov_len = size; - iov[n].iov_base = (char *)dp; - wsize += size; - n++; - #endif // _WIN32 - } - - for (i = keep; ++i <= pagecount; ) { - dp = dl[i].mptr; - // This is a page we skipped above - if (!dl[i].mid) { - dl[++j] = dl[i]; - dl[j].mid = dp->mp_pgno; - continue; - } - mdb_dpage_free(env, dp); - } - - done: - i--; - txn->mt_dirty_room += i - j; - dl[0].mid = j; - return MDB_SUCCESS; - } - - int - mdb_txn_commit(MDB_txn *txn) - { - int rc; - unsigned int i; - MDB_env *env; - - if (txn == NULL || txn->mt_env == NULL) - return EINVAL; - - if (txn->mt_child) { - rc = mdb_txn_commit(txn->mt_child); - txn->mt_child = NULL; - if (rc) - goto fail; - } - - env = txn->mt_env; - - if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) { - mdb_dbis_update(txn, 1); - txn->mt_numdbs = 2; // so txn_abort() doesn't close any new handles - mdb_txn_abort(txn); - return MDB_SUCCESS; - } - - if (F_ISSET(txn->mt_flags, MDB_TXN_ERROR)) { - DPUTS("error flag is set, can't commit"); - if (txn->mt_parent) - txn->mt_parent->mt_flags |= MDB_TXN_ERROR; - rc = MDB_BAD_TXN; - goto fail; - } - - if (txn->mt_parent) { - MDB_txn *parent = txn->mt_parent; - MDB_ID2L dst, src; - MDB_IDL pspill; - unsigned x, y, len, ps_len; - - // Append our free list to parent's - rc = mdb_midl_append_list(&parent->mt_free_pgs, txn->mt_free_pgs); - if (rc) - goto fail; - mdb_midl_free(txn->mt_free_pgs); - // Failures after this must either undo the changes - // to the parent or set MDB_TXN_ERROR in the parent. - - parent->mt_next_pgno = txn->mt_next_pgno; - parent->mt_flags = txn->mt_flags; - - // Merge our cursors into parent's and close them - mdb_cursors_close(txn, 1); - - // Update parent's DB table. - memcpy(parent->mt_dbs, txn->mt_dbs, txn->mt_numdbs * sizeof(MDB_db)); - parent->mt_numdbs = txn->mt_numdbs; - parent->mt_dbflags[0] = txn->mt_dbflags[0]; - parent->mt_dbflags[1] = txn->mt_dbflags[1]; - for (i=2; imt_numdbs; i++) { - // preserve parent's DB_NEW status - x = parent->mt_dbflags[i] & DB_NEW; - parent->mt_dbflags[i] = txn->mt_dbflags[i] | x; - } - - dst = parent->mt_u.dirty_list; - src = txn->mt_u.dirty_list; - // Remove anything in our dirty list from parent's spill list - if ((pspill = parent->mt_spill_pgs) && (ps_len = pspill[0])) { - x = y = ps_len; - pspill[0] = (pgno_t)-1; - // Mark our dirty pages as deleted in parent spill list - for (i=0, len=src[0].mid; ++i <= len; ) { - MDB_ID pn = src[i].mid << 1; - while (pn > pspill[x]) - x--; - if (pn == pspill[x]) { - pspill[x] = 1; - y = --x; - } - } - // Squash deleted pagenums if we deleted any - for (x=y; ++x <= ps_len; ) - if (!(pspill[x] & 1)) - pspill[++y] = pspill[x]; - pspill[0] = y; - } - - // Find len = length of merging our dirty list with parent's - x = dst[0].mid; - dst[0].mid = 0; // simplify loops - if (parent->mt_parent) { - len = x + src[0].mid; - y = mdb_mid2l_search(src, dst[x].mid + 1) - 1; - for (i = x; y && i; y--) { - pgno_t yp = src[y].mid; - while (yp < dst[i].mid) - i--; - if (yp == dst[i].mid) { - i--; - len--; - } - } - } else { // Simplify the above for single-ancestor case - len = MDB_IDL_UM_MAX - txn->mt_dirty_room; - } - // Merge our dirty list with parent's - y = src[0].mid; - for (i = len; y; dst[i--] = src[y--]) { - pgno_t yp = src[y].mid; - while (yp < dst[x].mid) - dst[i--] = dst[x--]; - if (yp == dst[x].mid) - free(dst[x--].mptr); - } - mdb_tassert(txn, i == x); - dst[0].mid = len; - free(txn->mt_u.dirty_list); - parent->mt_dirty_room = txn->mt_dirty_room; - if (txn->mt_spill_pgs) { - if (parent->mt_spill_pgs) { - // TODO: Prevent failure here, so parent does not fail - rc = mdb_midl_append_list(&parent->mt_spill_pgs, txn->mt_spill_pgs); - if (rc) - parent->mt_flags |= MDB_TXN_ERROR; - mdb_midl_free(txn->mt_spill_pgs); - mdb_midl_sort(parent->mt_spill_pgs); - } else { - parent->mt_spill_pgs = txn->mt_spill_pgs; - } - } - - parent->mt_child = NULL; - mdb_midl_free(((MDB_ntxn *)txn)->mnt_pgstate.mf_pghead); - free(txn); - return rc; - } - - if (txn != env->me_txn) { - DPUTS("attempt to commit unknown transaction"); - rc = EINVAL; - goto fail; - } - - mdb_cursors_close(txn, 0); - - if (!txn->mt_u.dirty_list[0].mid && - !(txn->mt_flags & (MDB_TXN_DIRTY|MDB_TXN_SPILLS))) - goto done; - - DPRINTF(("committing txn %"Z"u %p on mdbenv %p, root page %"Z"u", - txn->mt_txnid, (void*)txn, (void*)env, txn->mt_dbs[MAIN_DBI].md_root)); - - // Update DB root pointers - if (txn->mt_numdbs > 2) { - MDB_cursor mc; - MDB_dbi i; - MDB_val data; - data.mv_size = sizeof(MDB_db); - - mdb_cursor_init(&mc, txn, MAIN_DBI, NULL); - for (i = 2; i < txn->mt_numdbs; i++) { - if (txn->mt_dbflags[i] & DB_DIRTY) { - data.mv_data = &txn->mt_dbs[i]; - rc = mdb_cursor_put(&mc, &txn->mt_dbxs[i].md_name, &data, 0); - if (rc) - goto fail; - } - } - } - - rc = mdb_freelist_save(txn); - if (rc) - goto fail; - - mdb_midl_free(env->me_pghead); - env->me_pghead = NULL; - if (mdb_midl_shrink(&txn->mt_free_pgs)) - env->me_free_pgs = txn->mt_free_pgs; - - #if (MDB_DEBUG) > 2 - mdb_audit(txn); - #endif - - if ((rc = mdb_page_flush(txn, 0)) || - (rc = mdb_env_sync(env, 0)) || - (rc = mdb_env_write_meta(txn))) - goto fail; - - done: - env->me_pglast = 0; - env->me_txn = NULL; - mdb_dbis_update(txn, 1); - - if (env->me_txns) - UNLOCK_MUTEX_W(env); - free(txn); - - return MDB_SUCCESS; - - fail: - mdb_txn_abort(txn); - return rc; - */ - return nil -} - -// Update the environment info to commit a transaction. -// @param[in] txn the transaction that's being committed -// @return 0 on success, non-zero on failure. -func (t *Transaction) writeMeta() error { - /* - MDB_env *env; - MDB_meta meta, metab, *mp; - off_t off; - int rc, len, toggle; - char *ptr; - HANDLE mfd; - #ifdef _WIN32 - OVERLAPPED ov; - #else - int r2; - #endif - - toggle = txn->mt_txnid & 1; - DPRINTF(("writing meta page %d for root page %"Z"u", - toggle, txn->mt_dbs[MAIN_DBI].md_root)); - - env = txn->mt_env; - mp = env->me_metas[toggle]; - - if (env->me_flags & MDB_WRITEMAP) { - // Persist any increases of mapsize config - if (env->me_mapsize > mp->mm_mapsize) - mp->mm_mapsize = env->me_mapsize; - mp->mm_dbs[0] = txn->mt_dbs[0]; - mp->mm_dbs[1] = txn->mt_dbs[1]; - mp->mm_last_pg = txn->mt_next_pgno - 1; - mp->mm_txnid = txn->mt_txnid; - if (!(env->me_flags & (MDB_NOMETASYNC|MDB_NOSYNC))) { - unsigned meta_size = env->me_psize; - rc = (env->me_flags & MDB_MAPASYNC) ? MS_ASYNC : MS_SYNC; - ptr = env->me_map; - if (toggle) { - #ifndef _WIN32 // POSIX msync() requires ptr = start of OS page - if (meta_size < env->me_os_psize) - meta_size += meta_size; - else - #endif - ptr += meta_size; - } - if (MDB_MSYNC(ptr, meta_size, rc)) { - rc = ErrCode(); - goto fail; - } - } - goto done; - } - metab.mm_txnid = env->me_metas[toggle]->mm_txnid; - metab.mm_last_pg = env->me_metas[toggle]->mm_last_pg; - - ptr = (char *)&meta; - if (env->me_mapsize > mp->mm_mapsize) { - // Persist any increases of mapsize config - meta.mm_mapsize = env->me_mapsize; - off = offsetof(MDB_meta, mm_mapsize); - } else { - off = offsetof(MDB_meta, mm_dbs[0].md_depth); - } - len = sizeof(MDB_meta) - off; - - ptr += off; - meta.mm_dbs[0] = txn->mt_dbs[0]; - meta.mm_dbs[1] = txn->mt_dbs[1]; - meta.mm_last_pg = txn->mt_next_pgno - 1; - meta.mm_txnid = txn->mt_txnid; - - if (toggle) - off += env->me_psize; - off += PAGEHDRSZ; - - // Write to the SYNC fd - mfd = env->me_flags & (MDB_NOSYNC|MDB_NOMETASYNC) ? - env->me_fd : env->me_mfd; - #ifdef _WIN32 - { - memset(&ov, 0, sizeof(ov)); - ov.Offset = off; - if (!WriteFile(mfd, ptr, len, (DWORD *)&rc, &ov)) - rc = -1; - } - #else - rc = pwrite(mfd, ptr, len, off); - #endif - if (rc != len) { - rc = rc < 0 ? ErrCode() : EIO; - DPUTS("write failed, disk error?"); - // On a failure, the pagecache still contains the new data. - // Write some old data back, to prevent it from being used. - // Use the non-SYNC fd; we know it will fail anyway. - meta.mm_last_pg = metab.mm_last_pg; - meta.mm_txnid = metab.mm_txnid; - #ifdef _WIN32 - memset(&ov, 0, sizeof(ov)); - ov.Offset = off; - WriteFile(env->me_fd, ptr, len, NULL, &ov); - #else - r2 = pwrite(env->me_fd, ptr, len, off); - (void)r2; // Silence warnings. We don't care about pwrite's return value - #endif - fail: - env->me_flags |= MDB_FATAL_ERROR; - return rc; - } - done: - // Memory ordering issues are irrelevant; since the entire writer - // is wrapped by wmutex, all of these changes will become visible - // after the wmutex is unlocked. Since the DB is multi-version, - // readers will get consistent data regardless of how fresh or - // how stale their view of these values is. - if (env->me_txns) - env->me_txns->mti_txnid = txn->mt_txnid; - - return MDB_SUCCESS; - */ - return nil -} - -// Find the address of the page corresponding to a given page number. -// @param[in] txn the transaction for this access. -// @param[in] pgno the page number for the page to retrieve. -// @param[out] ret address of a pointer where the page's address will be stored. -// @param[out] lvl dirty_list inheritance level of found page. 1=current txn, 0=mapped page. -// @return 0 on success, non-zero on failure. -func (t *Transaction) getPage(id int) (*page, int, error) { - /* - MDB_env *env = txn->mt_env; - MDB_page *p = NULL; - int level; - - if (!((txn->mt_flags & MDB_TXN_RDONLY) | (env->me_flags & MDB_WRITEMAP))) { - MDB_txn *tx2 = txn; - level = 1; - do { - MDB_ID2L dl = tx2->mt_u.dirty_list; - unsigned x; - // Spilled pages were dirtied in this txn and flushed - // because the dirty list got full. Bring this page - // back in from the map (but don't unspill it here, - // leave that unless page_touch happens again). - if (tx2->mt_spill_pgs) { - MDB_ID pn = pgno << 1; - x = mdb_midl_search(tx2->mt_spill_pgs, pn); - if (x <= tx2->mt_spill_pgs[0] && tx2->mt_spill_pgs[x] == pn) { - p = (MDB_page *)(env->me_map + env->me_psize * pgno); - goto done; - } - } - if (dl[0].mid) { - unsigned x = mdb_mid2l_search(dl, pgno); - if (x <= dl[0].mid && dl[x].mid == pgno) { - p = dl[x].mptr; - goto done; - } - } - level++; - } while ((tx2 = tx2->mt_parent) != NULL); - } - - if (pgno < txn->mt_next_pgno) { - level = 0; - p = (MDB_page *)(env->me_map + env->me_psize * pgno); - } else { - DPRINTF(("page %"Z"u not found", pgno)); - txn->mt_flags |= MDB_TXN_ERROR; - return MDB_PAGE_NOTFOUND; - } - - done: - *ret = p; - if (lvl) - *lvl = level; - return MDB_SUCCESS; - */ - - return nil, 0, nil -} // Return the data associated with a given node. // @param[in] txn The transaction for this operation. @@ -1355,191 +348,3 @@ func (t *Transaction) readNode(leaf *node, data []byte) error { */ return nil } - -func (t *Transaction) Get(bucket Bucket, key []byte) ([]byte, error) { - /* - MDB_cursor mc; - MDB_xcursor mx; - int exact = 0; - DKBUF; - - if (key == NULL || data == NULL) - return EINVAL; - - DPRINTF(("===> get db %u key [%s]", dbi, DKEY(key))); - - if (txn == NULL || !dbi || dbi >= txn->mt_numdbs || !(txn->mt_dbflags[dbi] & DB_VALID)) - return EINVAL; - - if (txn->mt_flags & MDB_TXN_ERROR) - return MDB_BAD_TXN; - - mdb_cursor_init(&mc, txn, dbi, &mx); - return mdb_cursor_set(&mc, key, data, MDB_SET, &exact); - */ - return nil, nil -} - -func (t *Transaction) Renew1(c Cursor) error { - /* - if (txn == NULL || mc == NULL || mc->mc_dbi >= txn->mt_numdbs) - return EINVAL; - - if ((mc->mc_flags & C_UNTRACK) || txn->mt_cursors) - return EINVAL; - - mdb_cursor_init(mc, txn, mc->mc_dbi, mc->mc_xcursor); - return MDB_SUCCESS; - */ - return nil -} - -func (t *Transaction) Delete(b *Bucket, key []byte, data []byte) error { - /* - MDB_cursor mc; - MDB_xcursor mx; - MDB_cursor_op op; - MDB_val rdata, *xdata; - int rc, exact; - DKBUF; - - if (key == NULL) - return EINVAL; - - DPRINTF(("====> delete db %u key [%s]", dbi, DKEY(key))); - - if (txn == NULL || !dbi || dbi >= txn->mt_numdbs || !(txn->mt_dbflags[dbi] & DB_VALID)) - return EINVAL; - - if (txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_ERROR)) - return (txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN; - - mdb_cursor_init(&mc, txn, dbi, &mx); - - exact = 0; - if (!F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { - // must ignore any data - data = NULL; - } - if (data) { - op = MDB_GET_BOTH; - rdata = *data; - xdata = &rdata; - } else { - op = MDB_SET; - xdata = NULL; - } - rc = mdb_cursor_set(&mc, key, xdata, op, &exact); - if (rc == 0) { - // let mdb_page_split know about this cursor if needed: - // delete will trigger a rebalance; if it needs to move - // a node from one page to another, it will have to - // update the parent's separator key(s). If the new sepkey - // is larger than the current one, the parent page may - // run out of space, triggering a split. We need this - // cursor to be consistent until the end of the rebalance. - mc.mc_flags |= C_UNTRACK; - mc.mc_next = txn->mt_cursors[dbi]; - txn->mt_cursors[dbi] = &mc; - rc = mdb_cursor_del(&mc, data ? 0 : MDB_NODUPDATA); - txn->mt_cursors[dbi] = mc.mc_next; - } - return rc; - */ - return nil -} - -func (t *Transaction) Put(b Bucket, key []byte, data []byte, flags int) error { - /* - MDB_cursor mc; - MDB_xcursor mx; - - if (key == NULL || data == NULL) - return EINVAL; - - if (txn == NULL || !dbi || dbi >= txn->mt_numdbs || !(txn->mt_dbflags[dbi] & DB_VALID)) - return EINVAL; - - if ((flags & (MDB_NOOVERWRITE|MDB_NODUPDATA|MDB_RESERVE|MDB_APPEND|MDB_APPENDDUP)) != flags) - return EINVAL; - - mdb_cursor_init(&mc, txn, dbi, &mx); - return mdb_cursor_put(&mc, key, data, flags); - */ - return nil -} - -func (t *Transaction) Stat(b Bucket) *stat { - /* - if (txn == NULL || arg == NULL || dbi >= txn->mt_numdbs) - return EINVAL; - - if (txn->mt_dbflags[dbi] & DB_STALE) { - MDB_cursor mc; - MDB_xcursor mx; - // Stale, must read the DB's root. cursor_init does it for us. - mdb_cursor_init(&mc, txn, dbi, &mx); - } - return mdb_stat0(txn->mt_env, &txn->mt_dbs[dbi], arg); - */ - return nil -} - -func (t *Transaction) BucketFlags(b Bucket) (int, error) { - /* - // We could return the flags for the FREE_DBI too but what's the point? - if (txn == NULL || dbi < MAIN_DBI || dbi >= txn->mt_numdbs) - return EINVAL; - *flags = txn->mt_dbs[dbi].md_flags & PERSISTENT_FLAGS; - return MDB_SUCCESS; - */ - return 0, nil -} - -func (t *Transaction) Drop(b *Bucket, del int) error { - /* - MDB_cursor *mc, *m2; - int rc; - - if (!txn || !dbi || dbi >= txn->mt_numdbs || (unsigned)del > 1 || !(txn->mt_dbflags[dbi] & DB_VALID)) - return EINVAL; - - if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) - return EACCES; - - rc = mdb_cursor_open(txn, dbi, &mc); - if (rc) - return rc; - - rc = mdb_drop0(mc, mc->mc_db->md_flags & MDB_DUPSORT); - // Invalidate the dropped DB's cursors - for (m2 = txn->mt_cursors[dbi]; m2; m2 = m2->mc_next) - m2->mc_flags &= ~(C_INITIALIZED|C_EOF); - if (rc) - goto leave; - - // Can't delete the main DB - if (del && dbi > MAIN_DBI) { - rc = mdb_del(txn, MAIN_DBI, &mc->mc_dbx->md_name, NULL); - if (!rc) { - txn->mt_dbflags[dbi] = DB_STALE; - mdb_dbi_close(txn->mt_env, dbi); - } - } else { - // reset the DB record, mark it dirty - txn->mt_dbflags[dbi] |= DB_DIRTY; - txn->mt_dbs[dbi].md_depth = 0; - txn->mt_dbs[dbi].md_branch_pages = 0; - txn->mt_dbs[dbi].md_leaf_pages = 0; - txn->mt_dbs[dbi].md_overflow_pages = 0; - txn->mt_dbs[dbi].md_entries = 0; - txn->mt_dbs[dbi].md_root = P_INVALID; - - txn->mt_flags |= MDB_TXN_DIRTY; - } - leave: - mdb_cursor_close(mc); - return rc; - */ - return nil -}