Rename nbbconn to nbconn

non-blocking
Jack Christensen 2022-06-04 09:04:48 -05:00
parent 73398bc67a
commit 09c64d8cf3
4 changed files with 18 additions and 18 deletions

View File

@ -1,4 +1,4 @@
package nbbconn package nbconn
import ( import (
"sync" "sync"

View File

@ -1,5 +1,5 @@
// Package nbbconn implements a non-blocking, buffered net.Conn wrapper. // Package nbconn implements a non-blocking net.Conn wrapper.
package nbbconn package nbconn
import ( import (
"errors" "errors"
@ -19,7 +19,7 @@ const fakeNonblockingWaitDuration = 100 * time.Millisecond
var NonBlockingDeadline = time.Date(1900, 1, 1, 0, 0, 0, 0, time.UTC) var NonBlockingDeadline = time.Date(1900, 1, 1, 0, 0, 0, 0, time.UTC)
// Conn is a non-blocking, buffered net.Conn wrapper. It implements net.Conn. // Conn is a non-blocking net.Conn wrapper. It implements net.Conn.
// //
// It is designed to solve three problems. // It is designed to solve three problems.
// //

View File

@ -1,11 +1,11 @@
package nbbconn_test package nbconn_test
import ( import (
"net" "net"
"testing" "testing"
"time" "time"
"github.com/jackc/pgx/v5/internal/nbbconn" "github.com/jackc/pgx/v5/internal/nbconn"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -16,7 +16,7 @@ func TestWriteIsBuffered(t *testing.T) {
remote.Close() remote.Close()
}() }()
conn := nbbconn.New(local) conn := nbconn.New(local)
// net.Pipe is synchronous so the Write would block if not buffered. // net.Pipe is synchronous so the Write would block if not buffered.
writeBuf := []byte("test") writeBuf := []byte("test")
@ -44,7 +44,7 @@ func TestReadFlushesWriteBuffer(t *testing.T) {
remote.Close() remote.Close()
}() }()
conn := nbbconn.New(local) conn := nbconn.New(local)
writeBuf := []byte("test") writeBuf := []byte("test")
n, err := conn.Write(writeBuf) n, err := conn.Write(writeBuf)
@ -77,7 +77,7 @@ func TestCloseFlushesWriteBuffer(t *testing.T) {
remote.Close() remote.Close()
}() }()
conn := nbbconn.New(local) conn := nbconn.New(local)
writeBuf := []byte("test") writeBuf := []byte("test")
n, err := conn.Write(writeBuf) n, err := conn.Write(writeBuf)
@ -104,14 +104,14 @@ func TestNonBlockingRead(t *testing.T) {
remote.Close() remote.Close()
}() }()
conn := nbbconn.New(local) conn := nbconn.New(local)
err := conn.SetReadDeadline(nbbconn.NonBlockingDeadline) err := conn.SetReadDeadline(nbconn.NonBlockingDeadline)
require.NoError(t, err) require.NoError(t, err)
buf := make([]byte, 4) buf := make([]byte, 4)
n, err := conn.Read(buf) n, err := conn.Read(buf)
require.ErrorIs(t, err, nbbconn.ErrWouldBlock) require.ErrorIs(t, err, nbconn.ErrWouldBlock)
require.EqualValues(t, 0, n) require.EqualValues(t, 0, n)
errChan := make(chan error, 1) errChan := make(chan error, 1)

View File

@ -16,7 +16,7 @@ import (
"time" "time"
"github.com/jackc/pgx/v5/internal/iobufpool" "github.com/jackc/pgx/v5/internal/iobufpool"
"github.com/jackc/pgx/v5/internal/nbbconn" "github.com/jackc/pgx/v5/internal/nbconn"
"github.com/jackc/pgx/v5/internal/pgio" "github.com/jackc/pgx/v5/internal/pgio"
"github.com/jackc/pgx/v5/pgconn/internal/ctxwatch" "github.com/jackc/pgx/v5/pgconn/internal/ctxwatch"
"github.com/jackc/pgx/v5/pgproto3" "github.com/jackc/pgx/v5/pgproto3"
@ -248,8 +248,8 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
pgConn.contextWatcher.Watch(ctx) pgConn.contextWatcher.Watch(ctx)
} }
pgConn.conn = nbbconn.New(pgConn.conn) pgConn.conn = nbconn.New(pgConn.conn)
pgConn.contextWatcher.Unwatch() // context watcher should watch nbbconn pgConn.contextWatcher.Unwatch() // context watcher should watch nbconn
pgConn.contextWatcher = newContextWatcher(pgConn.conn) pgConn.contextWatcher = newContextWatcher(pgConn.conn)
pgConn.contextWatcher.Watch(ctx) pgConn.contextWatcher.Watch(ctx)
@ -428,7 +428,7 @@ func (pgConn *PgConn) peekMessage() (pgproto3.BackendMessage, error) {
msg, err := pgConn.frontend.Receive() msg, err := pgConn.frontend.Receive()
if err != nil { if err != nil {
if errors.Is(err, nbbconn.ErrWouldBlock) { if errors.Is(err, nbconn.ErrWouldBlock) {
return nil, err return nil, err
} }
@ -1143,7 +1143,7 @@ func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (Co
return CommandTag{}, err return CommandTag{}, err
} }
err = pgConn.conn.SetReadDeadline(nbbconn.NonBlockingDeadline) err = pgConn.conn.SetReadDeadline(nbconn.NonBlockingDeadline)
if err != nil { if err != nil {
pgConn.asyncClose() pgConn.asyncClose()
return CommandTag{}, err return CommandTag{}, err
@ -1185,7 +1185,7 @@ func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (Co
for pgErr == nil { for pgErr == nil {
msg, err := pgConn.receiveMessage() msg, err := pgConn.receiveMessage()
if err != nil { if err != nil {
if errors.Is(err, nbbconn.ErrWouldBlock) { if errors.Is(err, nbconn.ErrWouldBlock) {
break break
} }
pgConn.asyncClose() pgConn.asyncClose()