mirror of https://github.com/jackc/pgx.git
Introduce testVariants in prep for TLS
parent
ecebd7b103
commit
5b64432afb
internal/nbconn
|
@ -9,7 +9,7 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestWriteIsBuffered(t *testing.T) {
|
||||
func testVariants(t *testing.T, f func(t *testing.T, local *nbconn.Conn, remote net.Conn)) {
|
||||
local, remote := net.Pipe()
|
||||
defer func() {
|
||||
local.Close()
|
||||
|
@ -17,302 +17,268 @@ func TestWriteIsBuffered(t *testing.T) {
|
|||
}()
|
||||
|
||||
conn := nbconn.New(local)
|
||||
f(t, conn, remote)
|
||||
}
|
||||
|
||||
// net.Pipe is synchronous so the Write would block if not buffered.
|
||||
writeBuf := []byte("test")
|
||||
n, err := conn.Write(writeBuf)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 4, n)
|
||||
func TestWriteIsBuffered(t *testing.T) {
|
||||
testVariants(t, func(t *testing.T, conn *nbconn.Conn, remote net.Conn) {
|
||||
// net.Pipe is synchronous so the Write would block if not buffered.
|
||||
writeBuf := []byte("test")
|
||||
n, err := conn.Write(writeBuf)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 4, n)
|
||||
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
err := conn.Flush()
|
||||
errChan <- err
|
||||
}()
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
err := conn.Flush()
|
||||
errChan <- err
|
||||
}()
|
||||
|
||||
readBuf := make([]byte, len(writeBuf))
|
||||
_, err = remote.Read(readBuf)
|
||||
require.NoError(t, err)
|
||||
readBuf := make([]byte, len(writeBuf))
|
||||
_, err = remote.Read(readBuf)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, <-errChan)
|
||||
require.NoError(t, <-errChan)
|
||||
})
|
||||
}
|
||||
|
||||
func TestReadFlushesWriteBuffer(t *testing.T) {
|
||||
local, remote := net.Pipe()
|
||||
defer func() {
|
||||
local.Close()
|
||||
remote.Close()
|
||||
}()
|
||||
testVariants(t, func(t *testing.T, conn *nbconn.Conn, remote net.Conn) {
|
||||
writeBuf := []byte("test")
|
||||
n, err := conn.Write(writeBuf)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 4, n)
|
||||
|
||||
conn := nbconn.New(local)
|
||||
errChan := make(chan error, 2)
|
||||
go func() {
|
||||
readBuf := make([]byte, len(writeBuf))
|
||||
_, err := remote.Read(readBuf)
|
||||
errChan <- err
|
||||
|
||||
writeBuf := []byte("test")
|
||||
n, err := conn.Write(writeBuf)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 4, n)
|
||||
_, err = remote.Write([]byte("okay"))
|
||||
errChan <- err
|
||||
}()
|
||||
|
||||
errChan := make(chan error, 2)
|
||||
go func() {
|
||||
readBuf := make([]byte, len(writeBuf))
|
||||
_, err := remote.Read(readBuf)
|
||||
errChan <- err
|
||||
readBuf := make([]byte, 4)
|
||||
_, err = conn.Read(readBuf)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []byte("okay"), readBuf)
|
||||
|
||||
_, err = remote.Write([]byte("okay"))
|
||||
errChan <- err
|
||||
}()
|
||||
|
||||
readBuf := make([]byte, 4)
|
||||
_, err = conn.Read(readBuf)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []byte("okay"), readBuf)
|
||||
|
||||
require.NoError(t, <-errChan)
|
||||
require.NoError(t, <-errChan)
|
||||
require.NoError(t, <-errChan)
|
||||
require.NoError(t, <-errChan)
|
||||
})
|
||||
}
|
||||
|
||||
func TestCloseFlushesWriteBuffer(t *testing.T) {
|
||||
local, remote := net.Pipe()
|
||||
defer func() {
|
||||
local.Close()
|
||||
remote.Close()
|
||||
}()
|
||||
testVariants(t, func(t *testing.T, conn *nbconn.Conn, remote net.Conn) {
|
||||
writeBuf := []byte("test")
|
||||
n, err := conn.Write(writeBuf)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 4, n)
|
||||
|
||||
conn := nbconn.New(local)
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
readBuf := make([]byte, len(writeBuf))
|
||||
_, err := remote.Read(readBuf)
|
||||
errChan <- err
|
||||
}()
|
||||
|
||||
writeBuf := []byte("test")
|
||||
n, err := conn.Write(writeBuf)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 4, n)
|
||||
err = conn.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
readBuf := make([]byte, len(writeBuf))
|
||||
_, err := remote.Read(readBuf)
|
||||
errChan <- err
|
||||
}()
|
||||
|
||||
err = conn.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, <-errChan)
|
||||
require.NoError(t, <-errChan)
|
||||
})
|
||||
}
|
||||
|
||||
func TestNonBlockingRead(t *testing.T) {
|
||||
local, remote := net.Pipe()
|
||||
defer func() {
|
||||
local.Close()
|
||||
remote.Close()
|
||||
}()
|
||||
testVariants(t, func(t *testing.T, conn *nbconn.Conn, remote net.Conn) {
|
||||
err := conn.SetReadDeadline(nbconn.NonBlockingDeadline)
|
||||
require.NoError(t, err)
|
||||
|
||||
conn := nbconn.New(local)
|
||||
buf := make([]byte, 4)
|
||||
n, err := conn.Read(buf)
|
||||
require.ErrorIs(t, err, nbconn.ErrWouldBlock)
|
||||
require.EqualValues(t, 0, n)
|
||||
|
||||
err := conn.SetReadDeadline(nbconn.NonBlockingDeadline)
|
||||
require.NoError(t, err)
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
_, err := remote.Write([]byte("okay"))
|
||||
errChan <- err
|
||||
}()
|
||||
|
||||
buf := make([]byte, 4)
|
||||
n, err := conn.Read(buf)
|
||||
require.ErrorIs(t, err, nbconn.ErrWouldBlock)
|
||||
require.EqualValues(t, 0, n)
|
||||
err = conn.SetReadDeadline(time.Time{})
|
||||
require.NoError(t, err)
|
||||
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
_, err := remote.Write([]byte("okay"))
|
||||
errChan <- err
|
||||
}()
|
||||
|
||||
err = conn.SetReadDeadline(time.Time{})
|
||||
require.NoError(t, err)
|
||||
|
||||
n, err = conn.Read(buf)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 4, n)
|
||||
n, err = conn.Read(buf)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 4, n)
|
||||
})
|
||||
}
|
||||
|
||||
func TestReadPreviouslyBuffered(t *testing.T) {
|
||||
local, remote := net.Pipe()
|
||||
defer func() {
|
||||
local.Close()
|
||||
remote.Close()
|
||||
}()
|
||||
testVariants(t, func(t *testing.T, conn *nbconn.Conn, remote net.Conn) {
|
||||
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
err := func() error {
|
||||
_, err := remote.Write([]byte("alpha"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
err := func() error {
|
||||
_, err := remote.Write([]byte("alpha"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
readBuf := make([]byte, 4)
|
||||
_, err = remote.Read(readBuf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
readBuf := make([]byte, 4)
|
||||
_, err = remote.Read(readBuf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
return nil
|
||||
}()
|
||||
errChan <- err
|
||||
}()
|
||||
errChan <- err
|
||||
}()
|
||||
|
||||
conn := nbconn.New(local)
|
||||
_, err := conn.Write([]byte("test"))
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err := conn.Write([]byte("test"))
|
||||
require.NoError(t, err)
|
||||
// Because net.Pipe() is synchronous conn.Flust must buffer a read.
|
||||
err = conn.Flush()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Because net.Pipe() is synchronous conn.Flust must buffer a read.
|
||||
err = conn.Flush()
|
||||
require.NoError(t, err)
|
||||
|
||||
readBuf := make([]byte, 5)
|
||||
n, err := conn.Read(readBuf)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 5, n)
|
||||
require.Equal(t, []byte("alpha"), readBuf)
|
||||
readBuf := make([]byte, 5)
|
||||
n, err := conn.Read(readBuf)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 5, n)
|
||||
require.Equal(t, []byte("alpha"), readBuf)
|
||||
})
|
||||
}
|
||||
|
||||
func TestReadPreviouslyBufferedPartialRead(t *testing.T) {
|
||||
local, remote := net.Pipe()
|
||||
defer func() {
|
||||
local.Close()
|
||||
remote.Close()
|
||||
}()
|
||||
testVariants(t, func(t *testing.T, conn *nbconn.Conn, remote net.Conn) {
|
||||
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
err := func() error {
|
||||
_, err := remote.Write([]byte("alpha"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
err := func() error {
|
||||
_, err := remote.Write([]byte("alpha"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
readBuf := make([]byte, 4)
|
||||
_, err = remote.Read(readBuf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
readBuf := make([]byte, 4)
|
||||
_, err = remote.Read(readBuf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
return nil
|
||||
}()
|
||||
errChan <- err
|
||||
}()
|
||||
errChan <- err
|
||||
}()
|
||||
|
||||
conn := nbconn.New(local)
|
||||
_, err := conn.Write([]byte("test"))
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err := conn.Write([]byte("test"))
|
||||
require.NoError(t, err)
|
||||
// Because net.Pipe() is synchronous conn.Flust must buffer a read.
|
||||
err = conn.Flush()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Because net.Pipe() is synchronous conn.Flust must buffer a read.
|
||||
err = conn.Flush()
|
||||
require.NoError(t, err)
|
||||
readBuf := make([]byte, 2)
|
||||
n, err := conn.Read(readBuf)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 2, n)
|
||||
require.Equal(t, []byte("al"), readBuf)
|
||||
|
||||
readBuf := make([]byte, 2)
|
||||
n, err := conn.Read(readBuf)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 2, n)
|
||||
require.Equal(t, []byte("al"), readBuf)
|
||||
|
||||
readBuf = make([]byte, 3)
|
||||
n, err = conn.Read(readBuf)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 3, n)
|
||||
require.Equal(t, []byte("pha"), readBuf)
|
||||
readBuf = make([]byte, 3)
|
||||
n, err = conn.Read(readBuf)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 3, n)
|
||||
require.Equal(t, []byte("pha"), readBuf)
|
||||
})
|
||||
}
|
||||
|
||||
func TestReadMultiplePreviouslyBuffered(t *testing.T) {
|
||||
local, remote := net.Pipe()
|
||||
defer func() {
|
||||
local.Close()
|
||||
remote.Close()
|
||||
}()
|
||||
testVariants(t, func(t *testing.T, conn *nbconn.Conn, remote net.Conn) {
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
err := func() error {
|
||||
_, err := remote.Write([]byte("alpha"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
err := func() error {
|
||||
_, err := remote.Write([]byte("alpha"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = remote.Write([]byte("beta"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = remote.Write([]byte("beta"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
readBuf := make([]byte, 4)
|
||||
_, err = remote.Read(readBuf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
readBuf := make([]byte, 4)
|
||||
_, err = remote.Read(readBuf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
return nil
|
||||
}()
|
||||
errChan <- err
|
||||
}()
|
||||
errChan <- err
|
||||
}()
|
||||
|
||||
conn := nbconn.New(local)
|
||||
_, err := conn.Write([]byte("test"))
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err := conn.Write([]byte("test"))
|
||||
require.NoError(t, err)
|
||||
// Because net.Pipe() is synchronous conn.Flust must buffer a read.
|
||||
err = conn.Flush()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Because net.Pipe() is synchronous conn.Flust must buffer a read.
|
||||
err = conn.Flush()
|
||||
require.NoError(t, err)
|
||||
|
||||
readBuf := make([]byte, 9)
|
||||
n, err := conn.Read(readBuf)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 9, n)
|
||||
require.Equal(t, []byte("alphabeta"), readBuf)
|
||||
readBuf := make([]byte, 9)
|
||||
n, err := conn.Read(readBuf)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 9, n)
|
||||
require.Equal(t, []byte("alphabeta"), readBuf)
|
||||
})
|
||||
}
|
||||
|
||||
func TestReadPreviouslyBufferedAndReadMore(t *testing.T) {
|
||||
local, remote := net.Pipe()
|
||||
defer func() {
|
||||
local.Close()
|
||||
remote.Close()
|
||||
}()
|
||||
testVariants(t, func(t *testing.T, conn *nbconn.Conn, remote net.Conn) {
|
||||
|
||||
flushCompleteChan := make(chan struct{})
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
err := func() error {
|
||||
_, err := remote.Write([]byte("alpha"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
flushCompleteChan := make(chan struct{})
|
||||
errChan := make(chan error, 1)
|
||||
go func() {
|
||||
err := func() error {
|
||||
_, err := remote.Write([]byte("alpha"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
readBuf := make([]byte, 4)
|
||||
_, err = remote.Read(readBuf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
readBuf := make([]byte, 4)
|
||||
_, err = remote.Read(readBuf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
<-flushCompleteChan
|
||||
<-flushCompleteChan
|
||||
|
||||
_, err = remote.Write([]byte("beta"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = remote.Write([]byte("beta"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
return nil
|
||||
}()
|
||||
errChan <- err
|
||||
}()
|
||||
errChan <- err
|
||||
}()
|
||||
|
||||
conn := nbconn.New(local)
|
||||
_, err := conn.Write([]byte("test"))
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err := conn.Write([]byte("test"))
|
||||
require.NoError(t, err)
|
||||
// Because net.Pipe() is synchronous conn.Flust must buffer a read.
|
||||
err = conn.Flush()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Because net.Pipe() is synchronous conn.Flust must buffer a read.
|
||||
err = conn.Flush()
|
||||
require.NoError(t, err)
|
||||
close(flushCompleteChan)
|
||||
|
||||
close(flushCompleteChan)
|
||||
|
||||
readBuf := make([]byte, 9)
|
||||
n, err := conn.Read(readBuf)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 9, n)
|
||||
require.Equal(t, []byte("alphabeta"), readBuf)
|
||||
readBuf := make([]byte, 9)
|
||||
n, err := conn.Read(readBuf)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 9, n)
|
||||
require.Equal(t, []byte("alphabeta"), readBuf)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue