Make Chunkreader an internal implementation detail

query-exec-mode
Jack Christensen 2022-02-26 08:50:46 -06:00
parent d13f651810
commit 2e0ec225de
14 changed files with 124 additions and 203 deletions

View File

@ -1,104 +0,0 @@
// Package chunkreader provides an io.Reader wrapper that minimizes IO reads and memory allocations.
package chunkreader
import (
"io"
)
// ChunkReader is a io.Reader wrapper that minimizes IO reads and memory allocations. It allocates memory in chunks and
// will read as much as will fit in the current buffer in a single call regardless of how large a read is actually
// requested. The memory returned via Next is owned by the caller. This avoids the need for an additional copy.
//
// The downside of this approach is that a large buffer can be pinned in memory even if only a small slice is
// referenced. For example, an entire 4096 byte block could be pinned in memory by even a 1 byte slice. In these rare
// cases it would be advantageous to copy the bytes to another slice.
type ChunkReader struct {
r io.Reader
buf []byte
rp, wp int // buf read position and write position
config Config
}
// Config contains configuration parameters for ChunkReader.
type Config struct {
MinBufLen int // Minimum buffer length
}
// New creates and returns a new ChunkReader for r with default configuration.
func New(r io.Reader) *ChunkReader {
cr, err := NewConfig(r, Config{})
if err != nil {
panic("default config can't be bad")
}
return cr
}
// NewConfig creates and a new ChunkReader for r configured by config.
func NewConfig(r io.Reader, config Config) (*ChunkReader, error) {
if config.MinBufLen == 0 {
// By historical reasons Postgres currently has 8KB send buffer inside,
// so here we want to have at least the same size buffer.
// @see https://github.com/postgres/postgres/blob/249d64999615802752940e017ee5166e726bc7cd/src/backend/libpq/pqcomm.c#L134
// @see https://www.postgresql.org/message-id/0cdc5485-cb3c-5e16-4a46-e3b2f7a41322%40ya.ru
config.MinBufLen = 8192
}
return &ChunkReader{
r: r,
buf: make([]byte, config.MinBufLen),
config: config,
}, nil
}
// Next returns buf filled with the next n bytes. The caller gains ownership of buf. It is not necessary to make a copy
// of buf. If an error occurs, buf will be nil.
func (r *ChunkReader) Next(n int) (buf []byte, err error) {
// n bytes already in buf
if (r.wp - r.rp) >= n {
buf = r.buf[r.rp : r.rp+n]
r.rp += n
return buf, err
}
// available space in buf is less than n
if len(r.buf) < n {
r.copyBufContents(r.newBuf(n))
}
// buf is large enough, but need to shift filled area to start to make enough contiguous space
minReadCount := n - (r.wp - r.rp)
if (len(r.buf) - r.wp) < minReadCount {
newBuf := r.newBuf(n)
r.copyBufContents(newBuf)
}
if err := r.appendAtLeast(minReadCount); err != nil {
return nil, err
}
buf = r.buf[r.rp : r.rp+n]
r.rp += n
return buf, nil
}
func (r *ChunkReader) appendAtLeast(fillLen int) error {
n, err := io.ReadAtLeast(r.r, r.buf[r.wp:], fillLen)
r.wp += n
return err
}
func (r *ChunkReader) newBuf(size int) []byte {
if size < r.config.MinBufLen {
size = r.config.MinBufLen
}
return make([]byte, size)
}
func (r *ChunkReader) copyBufContents(dest []byte) {
r.wp = copy(dest, r.buf[r.rp:r.wp])
r.rp = 0
r.buf = dest
}

View File

@ -62,7 +62,7 @@ func TestScript(t *testing.T) {
return
}
err = script.Run(pgproto3.NewBackend(pgproto3.NewChunkReader(conn), conn))
err = script.Run(pgproto3.NewBackend(conn, conn))
if err != nil {
serverErrChan <- err
return

View File

@ -19,7 +19,6 @@ import (
"github.com/jackc/pgpassfile"
"github.com/jackc/pgservicefile"
"github.com/jackc/pgx/v5/chunkreader"
"github.com/jackc/pgx/v5/pgproto3"
)
@ -183,8 +182,6 @@ func NetworkAddress(host string, port uint16) (network, address string) {
//
// In addition, ParseConfig accepts the following options:
//
// min_read_buffer_size
// The minimum size of the internal read buffer. Default 8192.
// servicefile
// libpq only reads servicefile from the PGSERVICEFILE environment variable. ParseConfig accepts servicefile as a
// part of the connection string.
@ -219,18 +216,15 @@ func ParseConfig(connString string) (*Config, error) {
settings = mergeSettings(defaultSettings, envSettings, serviceSettings, connStringSettings)
}
minReadBufferSize, err := strconv.ParseInt(settings["min_read_buffer_size"], 10, 32)
if err != nil {
return nil, &parseConfigError{connString: connString, msg: "cannot parse min_read_buffer_size", err: err}
}
config := &Config{
createdByParseConfig: true,
Database: settings["database"],
User: settings["user"],
Password: settings["password"],
RuntimeParams: make(map[string]string),
BuildFrontend: makeDefaultBuildFrontendFunc(int(minReadBufferSize)),
BuildFrontend: func(r io.Reader, w io.Writer) Frontend {
return pgproto3.NewFrontend(r, w)
},
}
if connectTimeoutSetting, present := settings["connect_timeout"]; present {
@ -260,7 +254,6 @@ func ParseConfig(connString string) (*Config, error) {
"sslcert": {},
"sslrootcert": {},
"target_session_attrs": {},
"min_read_buffer_size": {},
"service": {},
"servicefile": {},
}
@ -693,18 +686,6 @@ func makeDefaultResolver() *net.Resolver {
return net.DefaultResolver
}
func makeDefaultBuildFrontendFunc(minBufferLen int) BuildFrontendFunc {
return func(r io.Reader, w io.Writer) Frontend {
cr, err := chunkreader.NewConfig(r, chunkreader.Config{MinBufLen: minBufferLen})
if err != nil {
panic(fmt.Sprintf("BUG: chunkreader.NewConfig failed: %v", err))
}
frontend := pgproto3.NewFrontend(cr, w)
return frontend
}
}
func parseConnectTimeoutSetting(s string) (time.Duration, error) {
timeout, err := strconv.ParseInt(s, 10, 64)
if err != nil {

View File

@ -572,13 +572,13 @@ func TestParseConfig(t *testing.T) {
name: "target_session_attrs primary",
connString: "postgres://jack:secret@localhost:5432/mydb?sslmode=disable&target_session_attrs=primary",
config: &pgconn.Config{
User: "jack",
Password: "secret",
Host: "localhost",
Port: 5432,
Database: "mydb",
TLSConfig: nil,
RuntimeParams: map[string]string{},
User: "jack",
Password: "secret",
Host: "localhost",
Port: 5432,
Database: "mydb",
TLSConfig: nil,
RuntimeParams: map[string]string{},
ValidateConnect: pgconn.ValidateConnectTargetSessionAttrsPrimary,
},
},
@ -586,13 +586,13 @@ func TestParseConfig(t *testing.T) {
name: "target_session_attrs standby",
connString: "postgres://jack:secret@localhost:5432/mydb?sslmode=disable&target_session_attrs=standby",
config: &pgconn.Config{
User: "jack",
Password: "secret",
Host: "localhost",
Port: 5432,
Database: "mydb",
TLSConfig: nil,
RuntimeParams: map[string]string{},
User: "jack",
Password: "secret",
Host: "localhost",
Port: 5432,
Database: "mydb",
TLSConfig: nil,
RuntimeParams: map[string]string{},
ValidateConnect: pgconn.ValidateConnectTargetSessionAttrsStandby,
},
},
@ -967,15 +967,3 @@ application_name = spaced string
assertConfigsEqual(t, tt.config, config, fmt.Sprintf("Test %d (%s)", i, tt.name))
}
}
func TestParseConfigExtractsMinReadBufferSize(t *testing.T) {
t.Parallel()
config, err := pgconn.ParseConfig("min_read_buffer_size=0")
require.NoError(t, err)
_, present := config.RuntimeParams["min_read_buffer_size"]
require.False(t, present)
// The buffer size is internal so there isn't much that can be done to test it other than see that the runtime param
// was removed.
}

View File

@ -1,3 +1,4 @@
//go:build !windows
// +build !windows
package pgconn
@ -39,8 +40,6 @@ func defaultSettings() map[string]string {
settings["target_session_attrs"] = "any"
settings["min_read_buffer_size"] = "8192"
return settings
}

View File

@ -46,8 +46,6 @@ func defaultSettings() map[string]string {
settings["target_session_attrs"] = "any"
settings["min_read_buffer_size"] = "8192"
return settings
}

View File

@ -141,7 +141,7 @@ func TestConnectTimeout(t *testing.T) {
return
}
err = script.Run(pgproto3.NewBackend(pgproto3.NewChunkReader(conn), conn))
err = script.Run(pgproto3.NewBackend(conn, conn))
if err != nil {
serverErrChan <- err
return
@ -2044,7 +2044,7 @@ func TestFatalErrorReceivedAfterCommandComplete(t *testing.T) {
return
}
err = script.Run(pgproto3.NewBackend(pgproto3.NewChunkReader(conn), conn))
err = script.Run(pgproto3.NewBackend(conn, conn))
if err != nil {
serverErrChan <- err
return

View File

@ -8,7 +8,7 @@ import (
// Backend acts as a server for the PostgreSQL wire protocol version 3.
type Backend struct {
cr ChunkReader
cr *chunkReader
w io.Writer
// Frontend message flyweights
@ -30,11 +30,10 @@ type Backend struct {
sync Sync
terminate Terminate
bodyLen int
msgType byte
partialMsg bool
authType uint32
bodyLen int
msgType byte
partialMsg bool
authType uint32
}
const (
@ -43,7 +42,8 @@ const (
)
// NewBackend creates a new Backend.
func NewBackend(cr ChunkReader, w io.Writer) *Backend {
func NewBackend(r io.Reader, w io.Writer) *Backend {
cr := newChunkReader(r, 0)
return &Backend{cr: cr, w: w}
}

View File

@ -16,7 +16,7 @@ func TestBackendReceiveInterrupted(t *testing.T) {
server := &interruptReader{}
server.push([]byte{'Q', 0, 0, 0, 6})
backend := pgproto3.NewBackend(pgproto3.NewChunkReader(server), nil)
backend := pgproto3.NewBackend(server, nil)
msg, err := backend.Receive()
if err == nil {
@ -43,7 +43,7 @@ func TestBackendReceiveUnexpectedEOF(t *testing.T) {
server := &interruptReader{}
server.push([]byte{'Q', 0, 0, 0, 6})
backend := pgproto3.NewBackend(pgproto3.NewChunkReader(server), nil)
backend := pgproto3.NewBackend(server, nil)
// Receive regular msg
msg, err := backend.Receive()
@ -77,7 +77,7 @@ func TestStartupMessage(t *testing.T) {
server := &interruptReader{}
server.push(dst)
backend := pgproto3.NewBackend(pgproto3.NewChunkReader(server), nil)
backend := pgproto3.NewBackend(server, nil)
msg, err := backend.ReceiveStartupMessage()
require.NoError(t, err)
@ -110,7 +110,7 @@ func TestStartupMessage(t *testing.T) {
dst = pgio.AppendUint32(dst, pgproto3.ProtocolVersionNumber)
server.push(dst)
backend := pgproto3.NewBackend(pgproto3.NewChunkReader(server), nil)
backend := pgproto3.NewBackend(server, nil)
msg, err := backend.ReceiveStartupMessage()
require.Error(t, err)

View File

@ -2,18 +2,88 @@ package pgproto3
import (
"io"
"github.com/jackc/pgx/v5/chunkreader"
)
// ChunkReader is an interface to decouple github.com/jackc/chunkreader from this package.
type ChunkReader interface {
// Next returns buf filled with the next n bytes. If an error (including a partial read) occurs,
// buf must be nil. Next must preserve any partially read data. Next must not reuse buf.
Next(n int) (buf []byte, err error)
// chunkReader is a io.Reader wrapper that minimizes IO reads and memory allocations. It allocates memory in chunks and
// will read as much as will fit in the current buffer in a single call regardless of how large a read is actually
// requested. The memory returned via Next is owned by the caller. This avoids the need for an additional copy.
//
// The downside of this approach is that a large buffer can be pinned in memory even if only a small slice is
// referenced. For example, an entire 4096 byte block could be pinned in memory by even a 1 byte slice. In these rare
// cases it would be advantageous to copy the bytes to another slice.
type chunkReader struct {
r io.Reader
buf []byte
rp, wp int // buf read position and write position
minBufLen int
}
// NewChunkReader creates and returns a new default ChunkReader.
func NewChunkReader(r io.Reader) ChunkReader {
return chunkreader.New(r)
// newChunkReader creates and returns a new chunkReader for r with default configuration with minBufSize internal buffer.
// If bufSize is <= 0 it uses a default value.
func newChunkReader(r io.Reader, minBufSize int) *chunkReader {
if minBufSize <= 0 {
// By historical reasons Postgres currently has 8KB send buffer inside,
// so here we want to have at least the same size buffer.
// @see https://github.com/postgres/postgres/blob/249d64999615802752940e017ee5166e726bc7cd/src/backend/libpq/pqcomm.c#L134
// @see https://www.postgresql.org/message-id/0cdc5485-cb3c-5e16-4a46-e3b2f7a41322%40ya.ru
minBufSize = 8192
}
return &chunkReader{
r: r,
buf: make([]byte, minBufSize),
minBufLen: minBufSize,
}
}
// Next returns buf filled with the next n bytes. The caller gains ownership of buf. It is not necessary to make a copy
// of buf. If an error occurs, buf will be nil.
func (r *chunkReader) Next(n int) (buf []byte, err error) {
// n bytes already in buf
if (r.wp - r.rp) >= n {
buf = r.buf[r.rp : r.rp+n]
r.rp += n
return buf, err
}
// available space in buf is less than n
if len(r.buf) < n {
r.copyBufContents(r.newBuf(n))
}
// buf is large enough, but need to shift filled area to start to make enough contiguous space
minReadCount := n - (r.wp - r.rp)
if (len(r.buf) - r.wp) < minReadCount {
newBuf := r.newBuf(n)
r.copyBufContents(newBuf)
}
if err := r.appendAtLeast(minReadCount); err != nil {
return nil, err
}
buf = r.buf[r.rp : r.rp+n]
r.rp += n
return buf, nil
}
func (r *chunkReader) appendAtLeast(fillLen int) error {
n, err := io.ReadAtLeast(r.r, r.buf[r.wp:], fillLen)
r.wp += n
return err
}
func (r *chunkReader) newBuf(size int) []byte {
if size < r.minBufLen {
size = r.minBufLen
}
return make([]byte, size)
}
func (r *chunkReader) copyBufContents(dest []byte) {
r.wp = copy(dest, r.buf[r.rp:r.wp])
r.rp = 0
r.buf = dest
}

View File

@ -1,4 +1,4 @@
package chunkreader
package pgproto3
import (
"bytes"
@ -8,10 +8,7 @@ import (
func TestChunkReaderNextDoesNotReadIfAlreadyBuffered(t *testing.T) {
server := &bytes.Buffer{}
r, err := NewConfig(server, Config{MinBufLen: 4})
if err != nil {
t.Fatal(err)
}
r := newChunkReader(server, 4)
src := []byte{1, 2, 3, 4}
server.Write(src)
@ -45,10 +42,7 @@ func TestChunkReaderNextDoesNotReadIfAlreadyBuffered(t *testing.T) {
func TestChunkReaderNextExpandsBufAsNeeded(t *testing.T) {
server := &bytes.Buffer{}
r, err := NewConfig(server, Config{MinBufLen: 4})
if err != nil {
t.Fatal(err)
}
r := newChunkReader(server, 4)
src := []byte{1, 2, 3, 4, 5, 6, 7, 8}
server.Write(src)
@ -67,10 +61,7 @@ func TestChunkReaderNextExpandsBufAsNeeded(t *testing.T) {
func TestChunkReaderDoesNotReuseBuf(t *testing.T) {
server := &bytes.Buffer{}
r, err := NewConfig(server, Config{MinBufLen: 4})
if err != nil {
t.Fatal(err)
}
r := newChunkReader(server, 4)
src := []byte{1, 2, 3, 4, 5, 6, 7, 8}
server.Write(src)
@ -108,10 +99,7 @@ func (r *randomReader) Read(p []byte) (n int, err error) {
func TestChunkReaderNextFuzz(t *testing.T) {
rr := &randomReader{rnd: rand.New(rand.NewSource(1))}
r, err := NewConfig(rr, Config{MinBufLen: 8192})
if err != nil {
t.Fatal(err)
}
r := newChunkReader(rr, 8192)
randomSizes := rand.New(rand.NewSource(0))

View File

@ -14,7 +14,7 @@ type PgFortuneBackend struct {
}
func NewPgFortuneBackend(conn net.Conn, responder func() ([]byte, error)) *PgFortuneBackend {
backend := pgproto3.NewBackend(pgproto3.NewChunkReader(conn), conn)
backend := pgproto3.NewBackend(conn, conn)
connHandler := &PgFortuneBackend{
backend: backend,

View File

@ -9,7 +9,7 @@ import (
// Frontend acts as a client for the PostgreSQL wire protocol version 3.
type Frontend struct {
cr ChunkReader
cr *chunkReader
w io.Writer
// Backend message flyweights
@ -49,7 +49,8 @@ type Frontend struct {
}
// NewFrontend creates a new Frontend.
func NewFrontend(cr ChunkReader, w io.Writer) *Frontend {
func NewFrontend(r io.Reader, w io.Writer) *Frontend {
cr := newChunkReader(r, 0)
return &Frontend{cr: cr, w: w}
}

View File

@ -38,7 +38,7 @@ func TestFrontendReceiveInterrupted(t *testing.T) {
server := &interruptReader{}
server.push([]byte{'Z', 0, 0, 0, 5})
frontend := pgproto3.NewFrontend(pgproto3.NewChunkReader(server), nil)
frontend := pgproto3.NewFrontend(server, nil)
msg, err := frontend.Receive()
if err == nil {
@ -65,7 +65,7 @@ func TestFrontendReceiveUnexpectedEOF(t *testing.T) {
server := &interruptReader{}
server.push([]byte{'Z', 0, 0, 0, 5})
frontend := pgproto3.NewFrontend(pgproto3.NewChunkReader(server), nil)
frontend := pgproto3.NewFrontend(server, nil)
msg, err := frontend.Receive()
if err == nil {
@ -109,7 +109,7 @@ func TestErrorResponse(t *testing.T) {
server := &interruptReader{}
server.push(raw)
frontend := pgproto3.NewFrontend(pgproto3.NewChunkReader(server), nil)
frontend := pgproto3.NewFrontend(server, nil)
got, err := frontend.Receive()
require.NoError(t, err)