From e641d0a5add7bb612bdff3234f5f0432c3e41194 Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Sat, 26 Feb 2022 09:31:45 -0600 Subject: [PATCH] Reuse connection read buffer To avoid extra copies and small allocations previously large read buffers were allocated and never reused. However, the down side of this was greater total memory allocation and the possibility that a reference to a single byte could pin an entire buffer. Now the buffer is reused. --- pgconn/pgconn.go | 8 ++++++-- pgproto3/chunkreader.go | 22 ++++++++++++++++------ pgproto3/chunkreader_test.go | 18 +++++++++--------- 3 files changed, 31 insertions(+), 17 deletions(-) diff --git a/pgconn/pgconn.go b/pgconn/pgconn.go index a9a6de8c..9c5e8e79 100644 --- a/pgconn/pgconn.go +++ b/pgconn/pgconn.go @@ -1433,8 +1433,12 @@ func (rr *ResultReader) Read() *Result { copy(br.FieldDescriptions, rr.FieldDescriptions()) } - row := make([][]byte, len(rr.Values())) - copy(row, rr.Values()) + values := rr.Values() + row := make([][]byte, len(values)) + for i := range row { + row[i] = make([]byte, len(values[i])) + copy(row[i], values[i]) + } br.Rows = append(br.Rows, row) } diff --git a/pgproto3/chunkreader.go b/pgproto3/chunkreader.go index 1781d6cd..598aaa85 100644 --- a/pgproto3/chunkreader.go +++ b/pgproto3/chunkreader.go @@ -6,11 +6,9 @@ import ( // chunkReader is a io.Reader wrapper that minimizes IO reads and memory allocations. It allocates memory in chunks and // will read as much as will fit in the current buffer in a single call regardless of how large a read is actually -// requested. The memory returned via Next is owned by the caller. This avoids the need for an additional copy. +// requested. The memory returned via Next is only valid until the next call to Next. // -// The downside of this approach is that a large buffer can be pinned in memory even if only a small slice is -// referenced. For example, an entire 4096 byte block could be pinned in memory by even a 1 byte slice. In these rare -// cases it would be advantageous to copy the bytes to another slice. +// This is roughly equivalent to a bufio.Reader that only uses Peek and Discard to never copy bytes. type chunkReader struct { r io.Reader @@ -38,13 +36,14 @@ func newChunkReader(r io.Reader, minBufSize int) *chunkReader { } } -// Next returns buf filled with the next n bytes. The caller gains ownership of buf. It is not necessary to make a copy -// of buf. If an error occurs, buf will be nil. +// Next returns buf filled with the next n bytes. buf is only valid until next call of Next. If an error occurs, buf +// will be nil. func (r *chunkReader) Next(n int) (buf []byte, err error) { // n bytes already in buf if (r.wp - r.rp) >= n { buf = r.buf[r.rp : r.rp+n] r.rp += n + r.resetBufIfEmpty() return buf, err } @@ -66,6 +65,7 @@ func (r *chunkReader) Next(n int) (buf []byte, err error) { buf = r.buf[r.rp : r.rp+n] r.rp += n + r.resetBufIfEmpty() return buf, nil } @@ -87,3 +87,13 @@ func (r *chunkReader) copyBufContents(dest []byte) { r.rp = 0 r.buf = dest } + +func (r *chunkReader) resetBufIfEmpty() { + if r.rp == r.wp { + if len(r.buf) > r.minBufLen { + r.buf = make([]byte, r.minBufLen) + } + r.rp = 0 + r.wp = 0 + } +} diff --git a/pgproto3/chunkreader_test.go b/pgproto3/chunkreader_test.go index 86fbd8b2..1c0c63d8 100644 --- a/pgproto3/chunkreader_test.go +++ b/pgproto3/chunkreader_test.go @@ -32,11 +32,11 @@ func TestChunkReaderNextDoesNotReadIfAlreadyBuffered(t *testing.T) { if bytes.Compare(r.buf, src) != 0 { t.Fatalf("Expected r.buf to be %v, but it was %v", src, r.buf) } - if r.rp != 4 { - t.Fatalf("Expected r.rp to be %v, but it was %v", 4, r.rp) + if r.rp != 0 { + t.Fatalf("Expected r.rp to be %v, but it was %v", 0, r.rp) } - if r.wp != 4 { - t.Fatalf("Expected r.wp to be %v, but it was %v", 4, r.wp) + if r.wp != 0 { + t.Fatalf("Expected r.wp to be %v, but it was %v", 0, r.wp) } } @@ -54,12 +54,12 @@ func TestChunkReaderNextExpandsBufAsNeeded(t *testing.T) { if bytes.Compare(n1, src[0:5]) != 0 { t.Fatalf("Expected read bytes to be %v, but they were %v", src[0:5], n1) } - if len(r.buf) != 5 { - t.Fatalf("Expected len(r.buf) to be %v, but it was %v", 5, len(r.buf)) + if len(r.buf) != 4 { + t.Fatalf("Expected len(r.buf) to be %v, but it was %v", 4, len(r.buf)) } } -func TestChunkReaderDoesNotReuseBuf(t *testing.T) { +func TestChunkReaderReusesBuf(t *testing.T) { server := &bytes.Buffer{} r := newChunkReader(server, 4) @@ -82,8 +82,8 @@ func TestChunkReaderDoesNotReuseBuf(t *testing.T) { t.Fatalf("Expected read bytes to be %v, but they were %v", src[4:8], n2) } - if bytes.Compare(n1, src[0:4]) != 0 { - t.Fatalf("Expected KeepLast to prevent Next from overwriting buf, expected %v but it was %v", src[0:4], n1) + if bytes.Compare(n1, src[4:8]) != 0 { + t.Fatalf("Expected slice to be reused, expected %v but it was %v", src[4:8], n1) } }