mirror of
https://github.com/jackc/pgx.git
synced 2025-04-27 13:14:32 +00:00
This commit adds a background reader that can optionally buffer reads. It is used whenever a potentially blocking write is made to the server. The background reader is started on a slight delay so there should be no meaningful performance impact as it doesn't run for quick queries and its overhead is minimal relative to slower queries.
141 lines
3.3 KiB
Go
141 lines
3.3 KiB
Go
package bgreader_test
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"io"
|
|
"math/rand"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5/pgconn/internal/bgreader"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestBGReaderReadWhenStopped(t *testing.T) {
|
|
r := bytes.NewReader([]byte("foo bar baz"))
|
|
bgr := bgreader.New(r)
|
|
buf, err := io.ReadAll(bgr)
|
|
require.NoError(t, err)
|
|
require.Equal(t, []byte("foo bar baz"), buf)
|
|
}
|
|
|
|
func TestBGReaderReadWhenStarted(t *testing.T) {
|
|
r := bytes.NewReader([]byte("foo bar baz"))
|
|
bgr := bgreader.New(r)
|
|
bgr.Start()
|
|
buf, err := io.ReadAll(bgr)
|
|
require.NoError(t, err)
|
|
require.Equal(t, []byte("foo bar baz"), buf)
|
|
}
|
|
|
|
type mockReadFunc func(p []byte) (int, error)
|
|
|
|
type mockReader struct {
|
|
readFuncs []mockReadFunc
|
|
}
|
|
|
|
func (r *mockReader) Read(p []byte) (int, error) {
|
|
if len(r.readFuncs) == 0 {
|
|
return 0, io.EOF
|
|
}
|
|
|
|
fn := r.readFuncs[0]
|
|
r.readFuncs = r.readFuncs[1:]
|
|
|
|
return fn(p)
|
|
}
|
|
|
|
func TestBGReaderReadWaitsForBackgroundRead(t *testing.T) {
|
|
rr := &mockReader{
|
|
readFuncs: []mockReadFunc{
|
|
func(p []byte) (int, error) { time.Sleep(1 * time.Second); return copy(p, []byte("foo")), nil },
|
|
func(p []byte) (int, error) { return copy(p, []byte("bar")), nil },
|
|
func(p []byte) (int, error) { return copy(p, []byte("baz")), nil },
|
|
},
|
|
}
|
|
bgr := bgreader.New(rr)
|
|
bgr.Start()
|
|
buf := make([]byte, 3)
|
|
n, err := bgr.Read(buf)
|
|
require.NoError(t, err)
|
|
require.EqualValues(t, 3, n)
|
|
require.Equal(t, []byte("foo"), buf)
|
|
}
|
|
|
|
func TestBGReaderErrorWhenStarted(t *testing.T) {
|
|
rr := &mockReader{
|
|
readFuncs: []mockReadFunc{
|
|
func(p []byte) (int, error) { return copy(p, []byte("foo")), nil },
|
|
func(p []byte) (int, error) { return copy(p, []byte("bar")), nil },
|
|
func(p []byte) (int, error) { return copy(p, []byte("baz")), errors.New("oops") },
|
|
},
|
|
}
|
|
|
|
bgr := bgreader.New(rr)
|
|
bgr.Start()
|
|
buf, err := io.ReadAll(bgr)
|
|
require.Equal(t, []byte("foobarbaz"), buf)
|
|
require.EqualError(t, err, "oops")
|
|
}
|
|
|
|
func TestBGReaderErrorWhenStopped(t *testing.T) {
|
|
rr := &mockReader{
|
|
readFuncs: []mockReadFunc{
|
|
func(p []byte) (int, error) { return copy(p, []byte("foo")), nil },
|
|
func(p []byte) (int, error) { return copy(p, []byte("bar")), nil },
|
|
func(p []byte) (int, error) { return copy(p, []byte("baz")), errors.New("oops") },
|
|
},
|
|
}
|
|
|
|
bgr := bgreader.New(rr)
|
|
buf, err := io.ReadAll(bgr)
|
|
require.Equal(t, []byte("foobarbaz"), buf)
|
|
require.EqualError(t, err, "oops")
|
|
}
|
|
|
|
type numberReader struct {
|
|
v uint8
|
|
rng *rand.Rand
|
|
}
|
|
|
|
func (nr *numberReader) Read(p []byte) (int, error) {
|
|
n := nr.rng.Intn(len(p))
|
|
for i := 0; i < n; i++ {
|
|
p[i] = nr.v
|
|
nr.v++
|
|
}
|
|
|
|
return n, nil
|
|
}
|
|
|
|
// TestBGReaderStress stress tests BGReader by reading a lot of bytes in random sizes while randomly starting and
|
|
// stopping the background worker from other goroutines.
|
|
func TestBGReaderStress(t *testing.T) {
|
|
nr := &numberReader{rng: rand.New(rand.NewSource(0))}
|
|
bgr := bgreader.New(nr)
|
|
|
|
bytesRead := 0
|
|
var expected uint8
|
|
buf := make([]byte, 10_000)
|
|
rng := rand.New(rand.NewSource(0))
|
|
|
|
for bytesRead < 1_000_000 {
|
|
randomNumber := rng.Intn(100)
|
|
switch {
|
|
case randomNumber < 10:
|
|
go bgr.Start()
|
|
case randomNumber < 20:
|
|
go bgr.Stop()
|
|
default:
|
|
n, err := bgr.Read(buf)
|
|
require.NoError(t, err)
|
|
for i := 0; i < n; i++ {
|
|
require.Equal(t, expected, buf[i])
|
|
expected++
|
|
}
|
|
bytesRead += n
|
|
}
|
|
}
|
|
}
|