mirror of https://github.com/jackc/pgx.git
Use pgconn for more conn status
parent
ae73ca2da1
commit
2978a7440a
48
conn.go
48
conn.go
|
@ -46,7 +46,6 @@ type Conn struct {
|
|||
preallocatedRows []connRows
|
||||
|
||||
mux sync.Mutex
|
||||
status byte // One of connStatus* constants
|
||||
causeOfDeath error
|
||||
|
||||
lastStmtSent bool
|
||||
|
@ -95,10 +94,6 @@ var ErrDeadConn = errors.New("conn is dead")
|
|||
// PostgreSQL server refuses to use TLS
|
||||
var ErrTLSRefused = pgconn.ErrTLSRefused
|
||||
|
||||
// ErrConnBusy occurs when the connection is busy (for example, in the middle of
|
||||
// reading query results) and another action is attempted.
|
||||
var ErrConnBusy = errors.New("conn is busy")
|
||||
|
||||
// ErrInvalidLogLevel occurs on attempt to set an invalid log level.
|
||||
var ErrInvalidLogLevel = errors.New("invalid log level")
|
||||
|
||||
|
@ -169,7 +164,6 @@ func connect(ctx context.Context, config *ConnConfig) (c *Conn, err error) {
|
|||
c.doneChan = make(chan struct{})
|
||||
c.closedChan = make(chan error)
|
||||
c.wbuf = make([]byte, 0, 1024)
|
||||
c.status = connStatusIdle
|
||||
|
||||
// Replication connections can't execute the queries to
|
||||
// populate the c.PgTypes and c.pgsqlAfInet
|
||||
|
@ -199,10 +193,9 @@ func (c *Conn) Close(ctx context.Context) error {
|
|||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
|
||||
if c.status < connStatusIdle {
|
||||
if !c.IsAlive() {
|
||||
return nil
|
||||
}
|
||||
c.status = connStatusClosed
|
||||
|
||||
err := c.pgConn.Close(ctx)
|
||||
c.causeOfDeath = errors.New("Closed")
|
||||
|
@ -306,9 +299,7 @@ func (c *Conn) deallocateContext(ctx context.Context, name string) (err error) {
|
|||
}
|
||||
|
||||
func (c *Conn) IsAlive() bool {
|
||||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
return c.pgConn.IsAlive() && c.status >= connStatusIdle
|
||||
return c.pgConn.IsAlive()
|
||||
}
|
||||
|
||||
func (c *Conn) CauseOfDeath() error {
|
||||
|
@ -364,37 +355,15 @@ func (c *Conn) die(err error) {
|
|||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
|
||||
if c.status == connStatusClosed {
|
||||
if !c.IsAlive() {
|
||||
return
|
||||
}
|
||||
|
||||
c.status = connStatusClosed
|
||||
c.causeOfDeath = err
|
||||
c.pgConn.Conn().Close()
|
||||
}
|
||||
|
||||
func (c *Conn) lock() error {
|
||||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
|
||||
if c.status != connStatusIdle {
|
||||
return ErrConnBusy
|
||||
}
|
||||
|
||||
c.status = connStatusBusy
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Conn) unlock() error {
|
||||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
|
||||
if c.status != connStatusBusy {
|
||||
return errors.New("unlock conn that is not busy")
|
||||
}
|
||||
|
||||
c.status = connStatusIdle
|
||||
return nil
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel() // force immediate hard cancel
|
||||
c.pgConn.Close(ctx)
|
||||
}
|
||||
|
||||
func (c *Conn) shouldLog(lvl LogLevel) bool {
|
||||
|
@ -488,11 +457,6 @@ func (c *Conn) PgConn() *pgconn.PgConn { return c.pgConn }
|
|||
func (c *Conn) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) {
|
||||
c.lastStmtSent = false
|
||||
|
||||
if err := c.lock(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer c.unlock()
|
||||
|
||||
startTime := time.Now()
|
||||
|
||||
commandTag, err := c.exec(ctx, sql, arguments...)
|
||||
|
|
|
@ -215,6 +215,7 @@ func TestExecContextFailureWithoutCancelationWithArguments(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestExecFailureCloseBefore(t *testing.T) {
|
||||
t.Skip("TODO: LastStmtSent needs to be ported / rewritten for pgconn")
|
||||
t.Parallel()
|
||||
|
||||
conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
|
||||
|
@ -505,8 +506,8 @@ func TestCatchSimultaneousConnectionQueries(t *testing.T) {
|
|||
defer rows1.Close()
|
||||
|
||||
_, err = conn.Query(context.Background(), "select generate_series(1,$1)", 10)
|
||||
if err != pgx.ErrConnBusy {
|
||||
t.Fatalf("conn.Query should have failed with pgx.ErrConnBusy, but it was %v", err)
|
||||
if err != pgconn.ErrConnBusy {
|
||||
t.Fatalf("conn.Query should have failed with pgconn.ErrConnBusy, but it was %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -523,8 +524,8 @@ func TestCatchSimultaneousConnectionQueryAndExec(t *testing.T) {
|
|||
defer rows.Close()
|
||||
|
||||
_, err = conn.Exec(context.Background(), "create temporary table foo(spice timestamp[])")
|
||||
if err != pgx.ErrConnBusy {
|
||||
t.Fatalf("conn.Exec should have failed with pgx.ErrConnBusy, but it was %v", err)
|
||||
if err != pgconn.ErrConnBusy {
|
||||
t.Fatalf("conn.Exec should have failed with pgconn.ErrConnBusy, but it was %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
2
go.mod
2
go.mod
|
@ -4,7 +4,7 @@ go 1.12
|
|||
|
||||
require (
|
||||
github.com/cockroachdb/apd v1.1.0
|
||||
github.com/jackc/pgconn v0.0.0-20190419205212-7bb6c2f3e982
|
||||
github.com/jackc/pgconn v0.0.0-20190419211655-3710e52a9a12
|
||||
github.com/jackc/pgio v1.0.0
|
||||
github.com/jackc/pgproto3 v1.1.0
|
||||
github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190419041544-9b6a681f50bf
|
||||
|
|
2
go.sum
2
go.sum
|
@ -12,6 +12,8 @@ github.com/jackc/pgconn v0.0.0-20190419194309-16412e56e22d h1:hcjvON7F1UqRaTfXyg
|
|||
github.com/jackc/pgconn v0.0.0-20190419194309-16412e56e22d/go.mod h1:UsnoyBN75lNxOeZXUT70J9xAvZffv2fxrxCrIPIH/Rk=
|
||||
github.com/jackc/pgconn v0.0.0-20190419205212-7bb6c2f3e982 h1:U0G6KWiQ9xrqbXt96OV+BF5dKZdLbvedURzqGPqH5jg=
|
||||
github.com/jackc/pgconn v0.0.0-20190419205212-7bb6c2f3e982/go.mod h1:UsnoyBN75lNxOeZXUT70J9xAvZffv2fxrxCrIPIH/Rk=
|
||||
github.com/jackc/pgconn v0.0.0-20190419211655-3710e52a9a12 h1:PzGjcOqGl6npHTDt8yDK5lnI9/ZQ+5ZpywzazR+yd8Q=
|
||||
github.com/jackc/pgconn v0.0.0-20190419211655-3710e52a9a12/go.mod h1:UsnoyBN75lNxOeZXUT70J9xAvZffv2fxrxCrIPIH/Rk=
|
||||
github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE=
|
||||
github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8=
|
||||
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
||||
|
|
34
query.go
34
query.go
|
@ -70,18 +70,17 @@ func (r *connRow) Scan(dest ...interface{}) (err error) {
|
|||
|
||||
// connRows implements the Rows interface for Conn.Query.
|
||||
type connRows struct {
|
||||
conn *Conn
|
||||
batch *Batch
|
||||
values [][]byte
|
||||
fields []FieldDescription
|
||||
rowCount int
|
||||
columnIdx int
|
||||
err error
|
||||
startTime time.Time
|
||||
sql string
|
||||
args []interface{}
|
||||
unlockConn bool
|
||||
closed bool
|
||||
conn *Conn
|
||||
batch *Batch
|
||||
values [][]byte
|
||||
fields []FieldDescription
|
||||
rowCount int
|
||||
columnIdx int
|
||||
err error
|
||||
startTime time.Time
|
||||
sql string
|
||||
args []interface{}
|
||||
closed bool
|
||||
|
||||
resultReader *pgconn.ResultReader
|
||||
multiResultReader *pgconn.MultiResultReader
|
||||
|
@ -96,11 +95,6 @@ func (rows *connRows) Close() {
|
|||
return
|
||||
}
|
||||
|
||||
if rows.unlockConn {
|
||||
rows.conn.unlock()
|
||||
rows.unlockConn = false
|
||||
}
|
||||
|
||||
rows.closed = true
|
||||
|
||||
if rows.resultReader != nil {
|
||||
|
@ -315,12 +309,6 @@ optionLoop:
|
|||
args: args,
|
||||
}
|
||||
|
||||
if err := c.lock(); err != nil {
|
||||
rows.fatal(err)
|
||||
return rows, err
|
||||
}
|
||||
rows.unlockConn = true
|
||||
|
||||
// err = c.initContext(ctx)
|
||||
// if err != nil {
|
||||
// rows.fatal(err)
|
||||
|
|
Loading…
Reference in New Issue