From 2e0ec225def2ba825e185e8813fe8d9ade63cfde Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Sat, 26 Feb 2022 08:50:46 -0600 Subject: [PATCH] Make Chunkreader an internal implementation detail --- chunkreader/chunkreader.go | 104 ------------------ internal/pgmock/pgmock_test.go | 2 +- pgconn/config.go | 25 +---- pgconn/config_test.go | 40 +++---- pgconn/defaults.go | 3 +- pgconn/defaults_windows.go | 2 - pgconn/pgconn_test.go | 4 +- pgproto3/backend.go | 14 +-- pgproto3/backend_test.go | 8 +- pgproto3/chunkreader.go | 90 +++++++++++++-- {chunkreader => pgproto3}/chunkreader_test.go | 22 +--- pgproto3/example/pgfortune/server.go | 2 +- pgproto3/frontend.go | 5 +- pgproto3/frontend_test.go | 6 +- 14 files changed, 124 insertions(+), 203 deletions(-) delete mode 100644 chunkreader/chunkreader.go rename {chunkreader => pgproto3}/chunkreader_test.go (87%) diff --git a/chunkreader/chunkreader.go b/chunkreader/chunkreader.go deleted file mode 100644 index afea1c52..00000000 --- a/chunkreader/chunkreader.go +++ /dev/null @@ -1,104 +0,0 @@ -// Package chunkreader provides an io.Reader wrapper that minimizes IO reads and memory allocations. -package chunkreader - -import ( - "io" -) - -// ChunkReader is a io.Reader wrapper that minimizes IO reads and memory allocations. It allocates memory in chunks and -// will read as much as will fit in the current buffer in a single call regardless of how large a read is actually -// requested. The memory returned via Next is owned by the caller. This avoids the need for an additional copy. -// -// The downside of this approach is that a large buffer can be pinned in memory even if only a small slice is -// referenced. For example, an entire 4096 byte block could be pinned in memory by even a 1 byte slice. In these rare -// cases it would be advantageous to copy the bytes to another slice. -type ChunkReader struct { - r io.Reader - - buf []byte - rp, wp int // buf read position and write position - - config Config -} - -// Config contains configuration parameters for ChunkReader. -type Config struct { - MinBufLen int // Minimum buffer length -} - -// New creates and returns a new ChunkReader for r with default configuration. -func New(r io.Reader) *ChunkReader { - cr, err := NewConfig(r, Config{}) - if err != nil { - panic("default config can't be bad") - } - - return cr -} - -// NewConfig creates and a new ChunkReader for r configured by config. -func NewConfig(r io.Reader, config Config) (*ChunkReader, error) { - if config.MinBufLen == 0 { - // By historical reasons Postgres currently has 8KB send buffer inside, - // so here we want to have at least the same size buffer. - // @see https://github.com/postgres/postgres/blob/249d64999615802752940e017ee5166e726bc7cd/src/backend/libpq/pqcomm.c#L134 - // @see https://www.postgresql.org/message-id/0cdc5485-cb3c-5e16-4a46-e3b2f7a41322%40ya.ru - config.MinBufLen = 8192 - } - - return &ChunkReader{ - r: r, - buf: make([]byte, config.MinBufLen), - config: config, - }, nil -} - -// Next returns buf filled with the next n bytes. The caller gains ownership of buf. It is not necessary to make a copy -// of buf. If an error occurs, buf will be nil. -func (r *ChunkReader) Next(n int) (buf []byte, err error) { - // n bytes already in buf - if (r.wp - r.rp) >= n { - buf = r.buf[r.rp : r.rp+n] - r.rp += n - return buf, err - } - - // available space in buf is less than n - if len(r.buf) < n { - r.copyBufContents(r.newBuf(n)) - } - - // buf is large enough, but need to shift filled area to start to make enough contiguous space - minReadCount := n - (r.wp - r.rp) - if (len(r.buf) - r.wp) < minReadCount { - newBuf := r.newBuf(n) - r.copyBufContents(newBuf) - } - - if err := r.appendAtLeast(minReadCount); err != nil { - return nil, err - } - - buf = r.buf[r.rp : r.rp+n] - r.rp += n - return buf, nil -} - -func (r *ChunkReader) appendAtLeast(fillLen int) error { - n, err := io.ReadAtLeast(r.r, r.buf[r.wp:], fillLen) - r.wp += n - return err -} - -func (r *ChunkReader) newBuf(size int) []byte { - if size < r.config.MinBufLen { - size = r.config.MinBufLen - } - return make([]byte, size) -} - -func (r *ChunkReader) copyBufContents(dest []byte) { - r.wp = copy(dest, r.buf[r.rp:r.wp]) - r.rp = 0 - r.buf = dest -} diff --git a/internal/pgmock/pgmock_test.go b/internal/pgmock/pgmock_test.go index 1e22cbcb..bc787398 100644 --- a/internal/pgmock/pgmock_test.go +++ b/internal/pgmock/pgmock_test.go @@ -62,7 +62,7 @@ func TestScript(t *testing.T) { return } - err = script.Run(pgproto3.NewBackend(pgproto3.NewChunkReader(conn), conn)) + err = script.Run(pgproto3.NewBackend(conn, conn)) if err != nil { serverErrChan <- err return diff --git a/pgconn/config.go b/pgconn/config.go index 6f7f3ca5..e99dabfb 100644 --- a/pgconn/config.go +++ b/pgconn/config.go @@ -19,7 +19,6 @@ import ( "github.com/jackc/pgpassfile" "github.com/jackc/pgservicefile" - "github.com/jackc/pgx/v5/chunkreader" "github.com/jackc/pgx/v5/pgproto3" ) @@ -183,8 +182,6 @@ func NetworkAddress(host string, port uint16) (network, address string) { // // In addition, ParseConfig accepts the following options: // -// min_read_buffer_size -// The minimum size of the internal read buffer. Default 8192. // servicefile // libpq only reads servicefile from the PGSERVICEFILE environment variable. ParseConfig accepts servicefile as a // part of the connection string. @@ -219,18 +216,15 @@ func ParseConfig(connString string) (*Config, error) { settings = mergeSettings(defaultSettings, envSettings, serviceSettings, connStringSettings) } - minReadBufferSize, err := strconv.ParseInt(settings["min_read_buffer_size"], 10, 32) - if err != nil { - return nil, &parseConfigError{connString: connString, msg: "cannot parse min_read_buffer_size", err: err} - } - config := &Config{ createdByParseConfig: true, Database: settings["database"], User: settings["user"], Password: settings["password"], RuntimeParams: make(map[string]string), - BuildFrontend: makeDefaultBuildFrontendFunc(int(minReadBufferSize)), + BuildFrontend: func(r io.Reader, w io.Writer) Frontend { + return pgproto3.NewFrontend(r, w) + }, } if connectTimeoutSetting, present := settings["connect_timeout"]; present { @@ -260,7 +254,6 @@ func ParseConfig(connString string) (*Config, error) { "sslcert": {}, "sslrootcert": {}, "target_session_attrs": {}, - "min_read_buffer_size": {}, "service": {}, "servicefile": {}, } @@ -693,18 +686,6 @@ func makeDefaultResolver() *net.Resolver { return net.DefaultResolver } -func makeDefaultBuildFrontendFunc(minBufferLen int) BuildFrontendFunc { - return func(r io.Reader, w io.Writer) Frontend { - cr, err := chunkreader.NewConfig(r, chunkreader.Config{MinBufLen: minBufferLen}) - if err != nil { - panic(fmt.Sprintf("BUG: chunkreader.NewConfig failed: %v", err)) - } - frontend := pgproto3.NewFrontend(cr, w) - - return frontend - } -} - func parseConnectTimeoutSetting(s string) (time.Duration, error) { timeout, err := strconv.ParseInt(s, 10, 64) if err != nil { diff --git a/pgconn/config_test.go b/pgconn/config_test.go index 335a25ca..40db7bc2 100644 --- a/pgconn/config_test.go +++ b/pgconn/config_test.go @@ -572,13 +572,13 @@ func TestParseConfig(t *testing.T) { name: "target_session_attrs primary", connString: "postgres://jack:secret@localhost:5432/mydb?sslmode=disable&target_session_attrs=primary", config: &pgconn.Config{ - User: "jack", - Password: "secret", - Host: "localhost", - Port: 5432, - Database: "mydb", - TLSConfig: nil, - RuntimeParams: map[string]string{}, + User: "jack", + Password: "secret", + Host: "localhost", + Port: 5432, + Database: "mydb", + TLSConfig: nil, + RuntimeParams: map[string]string{}, ValidateConnect: pgconn.ValidateConnectTargetSessionAttrsPrimary, }, }, @@ -586,13 +586,13 @@ func TestParseConfig(t *testing.T) { name: "target_session_attrs standby", connString: "postgres://jack:secret@localhost:5432/mydb?sslmode=disable&target_session_attrs=standby", config: &pgconn.Config{ - User: "jack", - Password: "secret", - Host: "localhost", - Port: 5432, - Database: "mydb", - TLSConfig: nil, - RuntimeParams: map[string]string{}, + User: "jack", + Password: "secret", + Host: "localhost", + Port: 5432, + Database: "mydb", + TLSConfig: nil, + RuntimeParams: map[string]string{}, ValidateConnect: pgconn.ValidateConnectTargetSessionAttrsStandby, }, }, @@ -967,15 +967,3 @@ application_name = spaced string assertConfigsEqual(t, tt.config, config, fmt.Sprintf("Test %d (%s)", i, tt.name)) } } - -func TestParseConfigExtractsMinReadBufferSize(t *testing.T) { - t.Parallel() - - config, err := pgconn.ParseConfig("min_read_buffer_size=0") - require.NoError(t, err) - _, present := config.RuntimeParams["min_read_buffer_size"] - require.False(t, present) - - // The buffer size is internal so there isn't much that can be done to test it other than see that the runtime param - // was removed. -} diff --git a/pgconn/defaults.go b/pgconn/defaults.go index f69cad31..1dd514ff 100644 --- a/pgconn/defaults.go +++ b/pgconn/defaults.go @@ -1,3 +1,4 @@ +//go:build !windows // +build !windows package pgconn @@ -39,8 +40,6 @@ func defaultSettings() map[string]string { settings["target_session_attrs"] = "any" - settings["min_read_buffer_size"] = "8192" - return settings } diff --git a/pgconn/defaults_windows.go b/pgconn/defaults_windows.go index 71eb77db..33b4a1ff 100644 --- a/pgconn/defaults_windows.go +++ b/pgconn/defaults_windows.go @@ -46,8 +46,6 @@ func defaultSettings() map[string]string { settings["target_session_attrs"] = "any" - settings["min_read_buffer_size"] = "8192" - return settings } diff --git a/pgconn/pgconn_test.go b/pgconn/pgconn_test.go index 42214d2c..3ae0d1d4 100644 --- a/pgconn/pgconn_test.go +++ b/pgconn/pgconn_test.go @@ -141,7 +141,7 @@ func TestConnectTimeout(t *testing.T) { return } - err = script.Run(pgproto3.NewBackend(pgproto3.NewChunkReader(conn), conn)) + err = script.Run(pgproto3.NewBackend(conn, conn)) if err != nil { serverErrChan <- err return @@ -2044,7 +2044,7 @@ func TestFatalErrorReceivedAfterCommandComplete(t *testing.T) { return } - err = script.Run(pgproto3.NewBackend(pgproto3.NewChunkReader(conn), conn)) + err = script.Run(pgproto3.NewBackend(conn, conn)) if err != nil { serverErrChan <- err return diff --git a/pgproto3/backend.go b/pgproto3/backend.go index 9c42ad02..c8d2f331 100644 --- a/pgproto3/backend.go +++ b/pgproto3/backend.go @@ -8,7 +8,7 @@ import ( // Backend acts as a server for the PostgreSQL wire protocol version 3. type Backend struct { - cr ChunkReader + cr *chunkReader w io.Writer // Frontend message flyweights @@ -30,11 +30,10 @@ type Backend struct { sync Sync terminate Terminate - bodyLen int - msgType byte - partialMsg bool - authType uint32 - + bodyLen int + msgType byte + partialMsg bool + authType uint32 } const ( @@ -43,7 +42,8 @@ const ( ) // NewBackend creates a new Backend. -func NewBackend(cr ChunkReader, w io.Writer) *Backend { +func NewBackend(r io.Reader, w io.Writer) *Backend { + cr := newChunkReader(r, 0) return &Backend{cr: cr, w: w} } diff --git a/pgproto3/backend_test.go b/pgproto3/backend_test.go index 75755f22..596245dd 100644 --- a/pgproto3/backend_test.go +++ b/pgproto3/backend_test.go @@ -16,7 +16,7 @@ func TestBackendReceiveInterrupted(t *testing.T) { server := &interruptReader{} server.push([]byte{'Q', 0, 0, 0, 6}) - backend := pgproto3.NewBackend(pgproto3.NewChunkReader(server), nil) + backend := pgproto3.NewBackend(server, nil) msg, err := backend.Receive() if err == nil { @@ -43,7 +43,7 @@ func TestBackendReceiveUnexpectedEOF(t *testing.T) { server := &interruptReader{} server.push([]byte{'Q', 0, 0, 0, 6}) - backend := pgproto3.NewBackend(pgproto3.NewChunkReader(server), nil) + backend := pgproto3.NewBackend(server, nil) // Receive regular msg msg, err := backend.Receive() @@ -77,7 +77,7 @@ func TestStartupMessage(t *testing.T) { server := &interruptReader{} server.push(dst) - backend := pgproto3.NewBackend(pgproto3.NewChunkReader(server), nil) + backend := pgproto3.NewBackend(server, nil) msg, err := backend.ReceiveStartupMessage() require.NoError(t, err) @@ -110,7 +110,7 @@ func TestStartupMessage(t *testing.T) { dst = pgio.AppendUint32(dst, pgproto3.ProtocolVersionNumber) server.push(dst) - backend := pgproto3.NewBackend(pgproto3.NewChunkReader(server), nil) + backend := pgproto3.NewBackend(server, nil) msg, err := backend.ReceiveStartupMessage() require.Error(t, err) diff --git a/pgproto3/chunkreader.go b/pgproto3/chunkreader.go index 3f878183..1781d6cd 100644 --- a/pgproto3/chunkreader.go +++ b/pgproto3/chunkreader.go @@ -2,18 +2,88 @@ package pgproto3 import ( "io" - - "github.com/jackc/pgx/v5/chunkreader" ) -// ChunkReader is an interface to decouple github.com/jackc/chunkreader from this package. -type ChunkReader interface { - // Next returns buf filled with the next n bytes. If an error (including a partial read) occurs, - // buf must be nil. Next must preserve any partially read data. Next must not reuse buf. - Next(n int) (buf []byte, err error) +// chunkReader is a io.Reader wrapper that minimizes IO reads and memory allocations. It allocates memory in chunks and +// will read as much as will fit in the current buffer in a single call regardless of how large a read is actually +// requested. The memory returned via Next is owned by the caller. This avoids the need for an additional copy. +// +// The downside of this approach is that a large buffer can be pinned in memory even if only a small slice is +// referenced. For example, an entire 4096 byte block could be pinned in memory by even a 1 byte slice. In these rare +// cases it would be advantageous to copy the bytes to another slice. +type chunkReader struct { + r io.Reader + + buf []byte + rp, wp int // buf read position and write position + + minBufLen int } -// NewChunkReader creates and returns a new default ChunkReader. -func NewChunkReader(r io.Reader) ChunkReader { - return chunkreader.New(r) +// newChunkReader creates and returns a new chunkReader for r with default configuration with minBufSize internal buffer. +// If bufSize is <= 0 it uses a default value. +func newChunkReader(r io.Reader, minBufSize int) *chunkReader { + if minBufSize <= 0 { + // By historical reasons Postgres currently has 8KB send buffer inside, + // so here we want to have at least the same size buffer. + // @see https://github.com/postgres/postgres/blob/249d64999615802752940e017ee5166e726bc7cd/src/backend/libpq/pqcomm.c#L134 + // @see https://www.postgresql.org/message-id/0cdc5485-cb3c-5e16-4a46-e3b2f7a41322%40ya.ru + minBufSize = 8192 + } + + return &chunkReader{ + r: r, + buf: make([]byte, minBufSize), + minBufLen: minBufSize, + } +} + +// Next returns buf filled with the next n bytes. The caller gains ownership of buf. It is not necessary to make a copy +// of buf. If an error occurs, buf will be nil. +func (r *chunkReader) Next(n int) (buf []byte, err error) { + // n bytes already in buf + if (r.wp - r.rp) >= n { + buf = r.buf[r.rp : r.rp+n] + r.rp += n + return buf, err + } + + // available space in buf is less than n + if len(r.buf) < n { + r.copyBufContents(r.newBuf(n)) + } + + // buf is large enough, but need to shift filled area to start to make enough contiguous space + minReadCount := n - (r.wp - r.rp) + if (len(r.buf) - r.wp) < minReadCount { + newBuf := r.newBuf(n) + r.copyBufContents(newBuf) + } + + if err := r.appendAtLeast(minReadCount); err != nil { + return nil, err + } + + buf = r.buf[r.rp : r.rp+n] + r.rp += n + return buf, nil +} + +func (r *chunkReader) appendAtLeast(fillLen int) error { + n, err := io.ReadAtLeast(r.r, r.buf[r.wp:], fillLen) + r.wp += n + return err +} + +func (r *chunkReader) newBuf(size int) []byte { + if size < r.minBufLen { + size = r.minBufLen + } + return make([]byte, size) +} + +func (r *chunkReader) copyBufContents(dest []byte) { + r.wp = copy(dest, r.buf[r.rp:r.wp]) + r.rp = 0 + r.buf = dest } diff --git a/chunkreader/chunkreader_test.go b/pgproto3/chunkreader_test.go similarity index 87% rename from chunkreader/chunkreader_test.go rename to pgproto3/chunkreader_test.go index ddc2fbf6..86fbd8b2 100644 --- a/chunkreader/chunkreader_test.go +++ b/pgproto3/chunkreader_test.go @@ -1,4 +1,4 @@ -package chunkreader +package pgproto3 import ( "bytes" @@ -8,10 +8,7 @@ import ( func TestChunkReaderNextDoesNotReadIfAlreadyBuffered(t *testing.T) { server := &bytes.Buffer{} - r, err := NewConfig(server, Config{MinBufLen: 4}) - if err != nil { - t.Fatal(err) - } + r := newChunkReader(server, 4) src := []byte{1, 2, 3, 4} server.Write(src) @@ -45,10 +42,7 @@ func TestChunkReaderNextDoesNotReadIfAlreadyBuffered(t *testing.T) { func TestChunkReaderNextExpandsBufAsNeeded(t *testing.T) { server := &bytes.Buffer{} - r, err := NewConfig(server, Config{MinBufLen: 4}) - if err != nil { - t.Fatal(err) - } + r := newChunkReader(server, 4) src := []byte{1, 2, 3, 4, 5, 6, 7, 8} server.Write(src) @@ -67,10 +61,7 @@ func TestChunkReaderNextExpandsBufAsNeeded(t *testing.T) { func TestChunkReaderDoesNotReuseBuf(t *testing.T) { server := &bytes.Buffer{} - r, err := NewConfig(server, Config{MinBufLen: 4}) - if err != nil { - t.Fatal(err) - } + r := newChunkReader(server, 4) src := []byte{1, 2, 3, 4, 5, 6, 7, 8} server.Write(src) @@ -108,10 +99,7 @@ func (r *randomReader) Read(p []byte) (n int, err error) { func TestChunkReaderNextFuzz(t *testing.T) { rr := &randomReader{rnd: rand.New(rand.NewSource(1))} - r, err := NewConfig(rr, Config{MinBufLen: 8192}) - if err != nil { - t.Fatal(err) - } + r := newChunkReader(rr, 8192) randomSizes := rand.New(rand.NewSource(0)) diff --git a/pgproto3/example/pgfortune/server.go b/pgproto3/example/pgfortune/server.go index fe406452..14ae71f8 100644 --- a/pgproto3/example/pgfortune/server.go +++ b/pgproto3/example/pgfortune/server.go @@ -14,7 +14,7 @@ type PgFortuneBackend struct { } func NewPgFortuneBackend(conn net.Conn, responder func() ([]byte, error)) *PgFortuneBackend { - backend := pgproto3.NewBackend(pgproto3.NewChunkReader(conn), conn) + backend := pgproto3.NewBackend(conn, conn) connHandler := &PgFortuneBackend{ backend: backend, diff --git a/pgproto3/frontend.go b/pgproto3/frontend.go index c33dfb08..ea6757ad 100644 --- a/pgproto3/frontend.go +++ b/pgproto3/frontend.go @@ -9,7 +9,7 @@ import ( // Frontend acts as a client for the PostgreSQL wire protocol version 3. type Frontend struct { - cr ChunkReader + cr *chunkReader w io.Writer // Backend message flyweights @@ -49,7 +49,8 @@ type Frontend struct { } // NewFrontend creates a new Frontend. -func NewFrontend(cr ChunkReader, w io.Writer) *Frontend { +func NewFrontend(r io.Reader, w io.Writer) *Frontend { + cr := newChunkReader(r, 0) return &Frontend{cr: cr, w: w} } diff --git a/pgproto3/frontend_test.go b/pgproto3/frontend_test.go index 595877bd..e02457d6 100644 --- a/pgproto3/frontend_test.go +++ b/pgproto3/frontend_test.go @@ -38,7 +38,7 @@ func TestFrontendReceiveInterrupted(t *testing.T) { server := &interruptReader{} server.push([]byte{'Z', 0, 0, 0, 5}) - frontend := pgproto3.NewFrontend(pgproto3.NewChunkReader(server), nil) + frontend := pgproto3.NewFrontend(server, nil) msg, err := frontend.Receive() if err == nil { @@ -65,7 +65,7 @@ func TestFrontendReceiveUnexpectedEOF(t *testing.T) { server := &interruptReader{} server.push([]byte{'Z', 0, 0, 0, 5}) - frontend := pgproto3.NewFrontend(pgproto3.NewChunkReader(server), nil) + frontend := pgproto3.NewFrontend(server, nil) msg, err := frontend.Receive() if err == nil { @@ -109,7 +109,7 @@ func TestErrorResponse(t *testing.T) { server := &interruptReader{} server.push(raw) - frontend := pgproto3.NewFrontend(pgproto3.NewChunkReader(server), nil) + frontend := pgproto3.NewFrontend(server, nil) got, err := frontend.Receive() require.NoError(t, err)