diff --git a/internal/iobufpool/iobufpool.go b/internal/iobufpool/iobufpool.go index 9e55c435..89e0c227 100644 --- a/internal/iobufpool/iobufpool.go +++ b/internal/iobufpool/iobufpool.go @@ -1,4 +1,7 @@ // Package iobufpool implements a global segregated-fit pool of buffers for IO. +// +// It uses *[]byte instead of []byte to avoid the sync.Pool allocation with Put. Unfortunately, using a pointer to avoid +// an allocation is purposely not documented. https://github.com/golang/go/issues/16323 package iobufpool import "sync" @@ -10,17 +13,27 @@ var pools [18]*sync.Pool func init() { for i := range pools { bufLen := 1 << (minPoolExpOf2 + i) - pools[i] = &sync.Pool{New: func() any { return make([]byte, bufLen) }} + pools[i] = &sync.Pool{ + New: func() any { + buf := make([]byte, bufLen) + return &buf + }, + } } } // Get gets a []byte of len size with cap <= size*2. -func Get(size int) []byte { +func Get(size int) *[]byte { i := getPoolIdx(size) if i >= len(pools) { - return make([]byte, size) + buf := make([]byte, size) + return &buf } - return pools[i].Get().([]byte)[:size] + + ptrBuf := (pools[i].Get().(*[]byte)) + *ptrBuf = (*ptrBuf)[:size] + + return ptrBuf } func getPoolIdx(size int) int { @@ -36,8 +49,8 @@ func getPoolIdx(size int) int { } // Put returns buf to the pool. -func Put(buf []byte) { - i := putPoolIdx(cap(buf)) +func Put(buf *[]byte) { + i := putPoolIdx(cap(*buf)) if i < 0 { return } diff --git a/internal/nbconn/bufferqueue.go b/internal/nbconn/bufferqueue.go index 138a9aa5..4bf25481 100644 --- a/internal/nbconn/bufferqueue.go +++ b/internal/nbconn/bufferqueue.go @@ -8,11 +8,11 @@ const minBufferQueueLen = 8 type bufferQueue struct { lock sync.Mutex - queue [][]byte + queue []*[]byte r, w int } -func (bq *bufferQueue) pushBack(buf []byte) { +func (bq *bufferQueue) pushBack(buf *[]byte) { bq.lock.Lock() defer bq.lock.Unlock() @@ -23,7 +23,7 @@ func (bq *bufferQueue) pushBack(buf []byte) { bq.w++ } -func (bq *bufferQueue) pushFront(buf []byte) { +func (bq *bufferQueue) pushFront(buf *[]byte) { bq.lock.Lock() defer bq.lock.Unlock() @@ -35,7 +35,7 @@ func (bq *bufferQueue) pushFront(buf []byte) { bq.w++ } -func (bq *bufferQueue) popFront() []byte { +func (bq *bufferQueue) popFront() *[]byte { bq.lock.Lock() defer bq.lock.Unlock() @@ -51,7 +51,7 @@ func (bq *bufferQueue) popFront() []byte { bq.r = 0 bq.w = 0 if len(bq.queue) > minBufferQueueLen { - bq.queue = make([][]byte, minBufferQueueLen) + bq.queue = make([]*[]byte, minBufferQueueLen) } } @@ -64,7 +64,7 @@ func (bq *bufferQueue) growQueue() { desiredLen = minBufferQueueLen } - newQueue := make([][]byte, desiredLen) + newQueue := make([]*[]byte, desiredLen) copy(newQueue, bq.queue) bq.queue = newQueue } diff --git a/internal/nbconn/nbconn.go b/internal/nbconn/nbconn.go index 44daebfb..22050dc7 100644 --- a/internal/nbconn/nbconn.go +++ b/internal/nbconn/nbconn.go @@ -129,9 +129,9 @@ func (c *NetConn) Read(b []byte) (n int, err error) { if buf == nil { break } - copiedN := copy(b[n:], buf) - if copiedN < len(buf) { - buf = buf[copiedN:] + copiedN := copy(b[n:], *buf) + if copiedN < len(*buf) { + *buf = (*buf)[copiedN:] c.readQueue.pushFront(buf) } else { iobufpool.Put(buf) @@ -168,7 +168,7 @@ func (c *NetConn) Write(b []byte) (n int, err error) { } buf := iobufpool.Get(len(b)) - copy(buf, b) + copy(*buf, b) c.writeQueue.pushBack(buf) return len(b), nil } @@ -286,14 +286,14 @@ func (c *NetConn) flush() error { }() for buf := c.writeQueue.popFront(); buf != nil; buf = c.writeQueue.popFront() { - remainingBuf := buf + remainingBuf := *buf for len(remainingBuf) > 0 { n, err := c.nonblockingWrite(remainingBuf) remainingBuf = remainingBuf[n:] if err != nil { if !errors.Is(err, ErrWouldBlock) { - buf = buf[:len(remainingBuf)] - copy(buf, remainingBuf) + *buf = (*buf)[:len(remainingBuf)] + copy(*buf, remainingBuf) c.writeQueue.pushFront(buf) return err } @@ -321,9 +321,9 @@ func (c *NetConn) flush() error { func (c *NetConn) BufferReadUntilBlock() error { for { buf := iobufpool.Get(8 * 1024) - n, err := c.nonblockingRead(buf) + n, err := c.nonblockingRead(*buf) if n > 0 { - buf = buf[:n] + *buf = (*buf)[:n] c.readQueue.pushBack(buf) } else if n == 0 { iobufpool.Put(buf) diff --git a/pgconn/pgconn.go b/pgconn/pgconn.go index 0a4863f1..8656ea51 100644 --- a/pgconn/pgconn.go +++ b/pgconn/pgconn.go @@ -1175,20 +1175,20 @@ func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (Co buf := iobufpool.Get(65536) defer iobufpool.Put(buf) - buf[0] = 'd' + (*buf)[0] = 'd' var readErr, pgErr error for pgErr == nil { // Read chunk from r. var n int - n, readErr = r.Read(buf[5:cap(buf)]) + n, readErr = r.Read((*buf)[5:cap(*buf)]) // Send chunk to PostgreSQL. if n > 0 { - buf = buf[0 : n+5] - pgio.SetInt32(buf[1:], int32(n+4)) + *buf = (*buf)[0 : n+5] + pgio.SetInt32((*buf)[1:], int32(n+4)) - writeErr := pgConn.frontend.SendUnbufferedEncodedCopyData(buf) + writeErr := pgConn.frontend.SendUnbufferedEncodedCopyData(*buf) if writeErr != nil { pgConn.asyncClose() return CommandTag{}, err diff --git a/pgproto3/chunkreader.go b/pgproto3/chunkreader.go index 3c35d0b1..fc0fa61e 100644 --- a/pgproto3/chunkreader.go +++ b/pgproto3/chunkreader.go @@ -14,7 +14,7 @@ import ( type chunkReader struct { r io.Reader - buf []byte + buf *[]byte rp, wp int // buf read position and write position minBufSize int @@ -45,7 +45,7 @@ func newChunkReader(r io.Reader, minBufSize int) *chunkReader { func (r *chunkReader) Next(n int) (buf []byte, err error) { // Reset the buffer if it is empty if r.rp == r.wp { - if len(r.buf) != r.minBufSize { + if len(*r.buf) != r.minBufSize { iobufpool.Put(r.buf) r.buf = iobufpool.Get(r.minBufSize) } @@ -55,15 +55,15 @@ func (r *chunkReader) Next(n int) (buf []byte, err error) { // n bytes already in buf if (r.wp - r.rp) >= n { - buf = r.buf[r.rp : r.rp+n : r.rp+n] + buf = (*r.buf)[r.rp : r.rp+n : r.rp+n] r.rp += n return buf, err } // buf is smaller than requested number of bytes - if len(r.buf) < n { + if len(*r.buf) < n { bigBuf := iobufpool.Get(n) - r.wp = copy(bigBuf, r.buf[r.rp:r.wp]) + r.wp = copy((*bigBuf), (*r.buf)[r.rp:r.wp]) r.rp = 0 iobufpool.Put(r.buf) r.buf = bigBuf @@ -71,20 +71,20 @@ func (r *chunkReader) Next(n int) (buf []byte, err error) { // buf is large enough, but need to shift filled area to start to make enough contiguous space minReadCount := n - (r.wp - r.rp) - if (len(r.buf) - r.wp) < minReadCount { - r.wp = copy(r.buf, r.buf[r.rp:r.wp]) + if (len(*r.buf) - r.wp) < minReadCount { + r.wp = copy((*r.buf), (*r.buf)[r.rp:r.wp]) r.rp = 0 } // Read at least the required number of bytes from the underlying io.Reader - readBytesCount, err := io.ReadAtLeast(r.r, r.buf[r.wp:], minReadCount) + readBytesCount, err := io.ReadAtLeast(r.r, (*r.buf)[r.wp:], minReadCount) r.wp += readBytesCount // fmt.Println("read", n) if err != nil { return nil, err } - buf = r.buf[r.rp : r.rp+n : r.rp+n] + buf = (*r.buf)[r.rp : r.rp+n : r.rp+n] r.rp += n return buf, nil }