iobufpool uses *[]byte instead of []byte to reduce allocations

pull/1490/head
Jack Christensen 2023-01-28 08:02:49 -06:00
parent bc754291c1
commit eee854fb06
5 changed files with 48 additions and 35 deletions

View File

@ -1,4 +1,7 @@
// Package iobufpool implements a global segregated-fit pool of buffers for IO. // Package iobufpool implements a global segregated-fit pool of buffers for IO.
//
// It uses *[]byte instead of []byte to avoid the sync.Pool allocation with Put. Unfortunately, using a pointer to avoid
// an allocation is purposely not documented. https://github.com/golang/go/issues/16323
package iobufpool package iobufpool
import "sync" import "sync"
@ -10,17 +13,27 @@ var pools [18]*sync.Pool
func init() { func init() {
for i := range pools { for i := range pools {
bufLen := 1 << (minPoolExpOf2 + i) bufLen := 1 << (minPoolExpOf2 + i)
pools[i] = &sync.Pool{New: func() any { return make([]byte, bufLen) }} pools[i] = &sync.Pool{
New: func() any {
buf := make([]byte, bufLen)
return &buf
},
}
} }
} }
// Get gets a []byte of len size with cap <= size*2. // Get gets a []byte of len size with cap <= size*2.
func Get(size int) []byte { func Get(size int) *[]byte {
i := getPoolIdx(size) i := getPoolIdx(size)
if i >= len(pools) { if i >= len(pools) {
return make([]byte, size) buf := make([]byte, size)
return &buf
} }
return pools[i].Get().([]byte)[:size]
ptrBuf := (pools[i].Get().(*[]byte))
*ptrBuf = (*ptrBuf)[:size]
return ptrBuf
} }
func getPoolIdx(size int) int { func getPoolIdx(size int) int {
@ -36,8 +49,8 @@ func getPoolIdx(size int) int {
} }
// Put returns buf to the pool. // Put returns buf to the pool.
func Put(buf []byte) { func Put(buf *[]byte) {
i := putPoolIdx(cap(buf)) i := putPoolIdx(cap(*buf))
if i < 0 { if i < 0 {
return return
} }

View File

@ -8,11 +8,11 @@ const minBufferQueueLen = 8
type bufferQueue struct { type bufferQueue struct {
lock sync.Mutex lock sync.Mutex
queue [][]byte queue []*[]byte
r, w int r, w int
} }
func (bq *bufferQueue) pushBack(buf []byte) { func (bq *bufferQueue) pushBack(buf *[]byte) {
bq.lock.Lock() bq.lock.Lock()
defer bq.lock.Unlock() defer bq.lock.Unlock()
@ -23,7 +23,7 @@ func (bq *bufferQueue) pushBack(buf []byte) {
bq.w++ bq.w++
} }
func (bq *bufferQueue) pushFront(buf []byte) { func (bq *bufferQueue) pushFront(buf *[]byte) {
bq.lock.Lock() bq.lock.Lock()
defer bq.lock.Unlock() defer bq.lock.Unlock()
@ -35,7 +35,7 @@ func (bq *bufferQueue) pushFront(buf []byte) {
bq.w++ bq.w++
} }
func (bq *bufferQueue) popFront() []byte { func (bq *bufferQueue) popFront() *[]byte {
bq.lock.Lock() bq.lock.Lock()
defer bq.lock.Unlock() defer bq.lock.Unlock()
@ -51,7 +51,7 @@ func (bq *bufferQueue) popFront() []byte {
bq.r = 0 bq.r = 0
bq.w = 0 bq.w = 0
if len(bq.queue) > minBufferQueueLen { if len(bq.queue) > minBufferQueueLen {
bq.queue = make([][]byte, minBufferQueueLen) bq.queue = make([]*[]byte, minBufferQueueLen)
} }
} }
@ -64,7 +64,7 @@ func (bq *bufferQueue) growQueue() {
desiredLen = minBufferQueueLen desiredLen = minBufferQueueLen
} }
newQueue := make([][]byte, desiredLen) newQueue := make([]*[]byte, desiredLen)
copy(newQueue, bq.queue) copy(newQueue, bq.queue)
bq.queue = newQueue bq.queue = newQueue
} }

View File

@ -129,9 +129,9 @@ func (c *NetConn) Read(b []byte) (n int, err error) {
if buf == nil { if buf == nil {
break break
} }
copiedN := copy(b[n:], buf) copiedN := copy(b[n:], *buf)
if copiedN < len(buf) { if copiedN < len(*buf) {
buf = buf[copiedN:] *buf = (*buf)[copiedN:]
c.readQueue.pushFront(buf) c.readQueue.pushFront(buf)
} else { } else {
iobufpool.Put(buf) iobufpool.Put(buf)
@ -168,7 +168,7 @@ func (c *NetConn) Write(b []byte) (n int, err error) {
} }
buf := iobufpool.Get(len(b)) buf := iobufpool.Get(len(b))
copy(buf, b) copy(*buf, b)
c.writeQueue.pushBack(buf) c.writeQueue.pushBack(buf)
return len(b), nil return len(b), nil
} }
@ -286,14 +286,14 @@ func (c *NetConn) flush() error {
}() }()
for buf := c.writeQueue.popFront(); buf != nil; buf = c.writeQueue.popFront() { for buf := c.writeQueue.popFront(); buf != nil; buf = c.writeQueue.popFront() {
remainingBuf := buf remainingBuf := *buf
for len(remainingBuf) > 0 { for len(remainingBuf) > 0 {
n, err := c.nonblockingWrite(remainingBuf) n, err := c.nonblockingWrite(remainingBuf)
remainingBuf = remainingBuf[n:] remainingBuf = remainingBuf[n:]
if err != nil { if err != nil {
if !errors.Is(err, ErrWouldBlock) { if !errors.Is(err, ErrWouldBlock) {
buf = buf[:len(remainingBuf)] *buf = (*buf)[:len(remainingBuf)]
copy(buf, remainingBuf) copy(*buf, remainingBuf)
c.writeQueue.pushFront(buf) c.writeQueue.pushFront(buf)
return err return err
} }
@ -321,9 +321,9 @@ func (c *NetConn) flush() error {
func (c *NetConn) BufferReadUntilBlock() error { func (c *NetConn) BufferReadUntilBlock() error {
for { for {
buf := iobufpool.Get(8 * 1024) buf := iobufpool.Get(8 * 1024)
n, err := c.nonblockingRead(buf) n, err := c.nonblockingRead(*buf)
if n > 0 { if n > 0 {
buf = buf[:n] *buf = (*buf)[:n]
c.readQueue.pushBack(buf) c.readQueue.pushBack(buf)
} else if n == 0 { } else if n == 0 {
iobufpool.Put(buf) iobufpool.Put(buf)

View File

@ -1175,20 +1175,20 @@ func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (Co
buf := iobufpool.Get(65536) buf := iobufpool.Get(65536)
defer iobufpool.Put(buf) defer iobufpool.Put(buf)
buf[0] = 'd' (*buf)[0] = 'd'
var readErr, pgErr error var readErr, pgErr error
for pgErr == nil { for pgErr == nil {
// Read chunk from r. // Read chunk from r.
var n int var n int
n, readErr = r.Read(buf[5:cap(buf)]) n, readErr = r.Read((*buf)[5:cap(*buf)])
// Send chunk to PostgreSQL. // Send chunk to PostgreSQL.
if n > 0 { if n > 0 {
buf = buf[0 : n+5] *buf = (*buf)[0 : n+5]
pgio.SetInt32(buf[1:], int32(n+4)) pgio.SetInt32((*buf)[1:], int32(n+4))
writeErr := pgConn.frontend.SendUnbufferedEncodedCopyData(buf) writeErr := pgConn.frontend.SendUnbufferedEncodedCopyData(*buf)
if writeErr != nil { if writeErr != nil {
pgConn.asyncClose() pgConn.asyncClose()
return CommandTag{}, err return CommandTag{}, err

View File

@ -14,7 +14,7 @@ import (
type chunkReader struct { type chunkReader struct {
r io.Reader r io.Reader
buf []byte buf *[]byte
rp, wp int // buf read position and write position rp, wp int // buf read position and write position
minBufSize int minBufSize int
@ -45,7 +45,7 @@ func newChunkReader(r io.Reader, minBufSize int) *chunkReader {
func (r *chunkReader) Next(n int) (buf []byte, err error) { func (r *chunkReader) Next(n int) (buf []byte, err error) {
// Reset the buffer if it is empty // Reset the buffer if it is empty
if r.rp == r.wp { if r.rp == r.wp {
if len(r.buf) != r.minBufSize { if len(*r.buf) != r.minBufSize {
iobufpool.Put(r.buf) iobufpool.Put(r.buf)
r.buf = iobufpool.Get(r.minBufSize) r.buf = iobufpool.Get(r.minBufSize)
} }
@ -55,15 +55,15 @@ func (r *chunkReader) Next(n int) (buf []byte, err error) {
// n bytes already in buf // n bytes already in buf
if (r.wp - r.rp) >= n { if (r.wp - r.rp) >= n {
buf = r.buf[r.rp : r.rp+n : r.rp+n] buf = (*r.buf)[r.rp : r.rp+n : r.rp+n]
r.rp += n r.rp += n
return buf, err return buf, err
} }
// buf is smaller than requested number of bytes // buf is smaller than requested number of bytes
if len(r.buf) < n { if len(*r.buf) < n {
bigBuf := iobufpool.Get(n) bigBuf := iobufpool.Get(n)
r.wp = copy(bigBuf, r.buf[r.rp:r.wp]) r.wp = copy((*bigBuf), (*r.buf)[r.rp:r.wp])
r.rp = 0 r.rp = 0
iobufpool.Put(r.buf) iobufpool.Put(r.buf)
r.buf = bigBuf r.buf = bigBuf
@ -71,20 +71,20 @@ func (r *chunkReader) Next(n int) (buf []byte, err error) {
// buf is large enough, but need to shift filled area to start to make enough contiguous space // buf is large enough, but need to shift filled area to start to make enough contiguous space
minReadCount := n - (r.wp - r.rp) minReadCount := n - (r.wp - r.rp)
if (len(r.buf) - r.wp) < minReadCount { if (len(*r.buf) - r.wp) < minReadCount {
r.wp = copy(r.buf, r.buf[r.rp:r.wp]) r.wp = copy((*r.buf), (*r.buf)[r.rp:r.wp])
r.rp = 0 r.rp = 0
} }
// Read at least the required number of bytes from the underlying io.Reader // Read at least the required number of bytes from the underlying io.Reader
readBytesCount, err := io.ReadAtLeast(r.r, r.buf[r.wp:], minReadCount) readBytesCount, err := io.ReadAtLeast(r.r, (*r.buf)[r.wp:], minReadCount)
r.wp += readBytesCount r.wp += readBytesCount
// fmt.Println("read", n) // fmt.Println("read", n)
if err != nil { if err != nil {
return nil, err return nil, err
} }
buf = r.buf[r.rp : r.rp+n : r.rp+n] buf = (*r.buf)[r.rp : r.rp+n : r.rp+n]
r.rp += n r.rp += n
return buf, nil return buf, nil
} }