mirror of https://github.com/jackc/pgx.git
Merge alive and busy states into atomic status
parent
09d37880ba
commit
f0dfe4fe89
56
conn.go
56
conn.go
|
@ -18,10 +18,17 @@ import (
|
||||||
"regexp"
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
connStatusUninitialized = iota
|
||||||
|
connStatusClosed
|
||||||
|
connStatusIdle
|
||||||
|
connStatusBusy
|
||||||
|
)
|
||||||
|
|
||||||
// DialFunc is a function that can be used to connect to a PostgreSQL server
|
// DialFunc is a function that can be used to connect to a PostgreSQL server
|
||||||
type DialFunc func(network, addr string) (net.Conn, error)
|
type DialFunc func(network, addr string) (net.Conn, error)
|
||||||
|
|
||||||
|
@ -80,12 +87,10 @@ type Conn struct {
|
||||||
fp *fastpath
|
fp *fastpath
|
||||||
pgsqlAfInet *byte
|
pgsqlAfInet *byte
|
||||||
pgsqlAfInet6 *byte
|
pgsqlAfInet6 *byte
|
||||||
busy bool
|
|
||||||
poolResetCount int
|
poolResetCount int
|
||||||
preallocatedRows []Rows
|
preallocatedRows []Rows
|
||||||
|
|
||||||
closingLock sync.Mutex
|
status int32 // One of connStatus* constants
|
||||||
alive bool
|
|
||||||
causeOfDeath error
|
causeOfDeath error
|
||||||
|
|
||||||
// context support
|
// context support
|
||||||
|
@ -252,14 +257,14 @@ func (c *Conn) connect(config ConnConfig, network, address string, tlsConfig *tl
|
||||||
defer func() {
|
defer func() {
|
||||||
if c != nil && err != nil {
|
if c != nil && err != nil {
|
||||||
c.conn.Close()
|
c.conn.Close()
|
||||||
c.alive = false
|
atomic.StoreInt32(&c.status, connStatusClosed)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
c.RuntimeParams = make(map[string]string)
|
c.RuntimeParams = make(map[string]string)
|
||||||
c.preparedStatements = make(map[string]*PreparedStatement)
|
c.preparedStatements = make(map[string]*PreparedStatement)
|
||||||
c.channels = make(map[string]struct{})
|
c.channels = make(map[string]struct{})
|
||||||
c.alive = true
|
atomic.StoreInt32(&c.status, connStatusIdle)
|
||||||
c.lastActivityTime = time.Now()
|
c.lastActivityTime = time.Now()
|
||||||
c.doneChan = make(chan struct{})
|
c.doneChan = make(chan struct{})
|
||||||
c.closedChan = make(chan error)
|
c.closedChan = make(chan error)
|
||||||
|
@ -399,11 +404,14 @@ func (c *Conn) loadInetConstants() error {
|
||||||
// Close closes a connection. It is safe to call Close on a already closed
|
// Close closes a connection. It is safe to call Close on a already closed
|
||||||
// connection.
|
// connection.
|
||||||
func (c *Conn) Close() (err error) {
|
func (c *Conn) Close() (err error) {
|
||||||
c.closingLock.Lock()
|
for {
|
||||||
defer c.closingLock.Unlock()
|
status := atomic.LoadInt32(&c.status)
|
||||||
|
if status < connStatusIdle {
|
||||||
if !c.alive {
|
return nil
|
||||||
return nil
|
}
|
||||||
|
if atomic.CompareAndSwapInt32(&c.status, status, connStatusClosed) {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = c.conn.Write([]byte{'X', 0, 0, 0, 4})
|
_, err = c.conn.Write([]byte{'X', 0, 0, 0, 4})
|
||||||
|
@ -893,10 +901,7 @@ func (c *Conn) waitForNotification(deadline time.Time) (*Notification, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) IsAlive() bool {
|
func (c *Conn) IsAlive() bool {
|
||||||
c.closingLock.Lock()
|
return atomic.LoadInt32(&c.status) >= connStatusIdle
|
||||||
alive := c.alive
|
|
||||||
c.closingLock.Unlock()
|
|
||||||
return alive
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) CauseOfDeath() error {
|
func (c *Conn) CauseOfDeath() error {
|
||||||
|
@ -1071,12 +1076,9 @@ func (c *Conn) processContextFreeMsg(t byte, r *msgReader) (err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) rxMsg() (t byte, r *msgReader, err error) {
|
func (c *Conn) rxMsg() (t byte, r *msgReader, err error) {
|
||||||
c.closingLock.Lock()
|
if atomic.LoadInt32(&c.status) < connStatusIdle {
|
||||||
if !c.alive {
|
|
||||||
c.closingLock.Unlock()
|
|
||||||
return 0, nil, ErrDeadConn
|
return 0, nil, ErrDeadConn
|
||||||
}
|
}
|
||||||
c.closingLock.Unlock()
|
|
||||||
|
|
||||||
t, err = c.mr.rxMsg()
|
t, err = c.mr.rxMsg()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1261,25 +1263,23 @@ func (c *Conn) txPasswordMessage(password string) (err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) die(err error) {
|
func (c *Conn) die(err error) {
|
||||||
c.alive = false
|
atomic.StoreInt32(&c.status, connStatusClosed)
|
||||||
c.causeOfDeath = err
|
c.causeOfDeath = err
|
||||||
c.conn.Close()
|
c.conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) lock() error {
|
func (c *Conn) lock() error {
|
||||||
if c.busy {
|
if atomic.CompareAndSwapInt32(&c.status, connStatusIdle, connStatusBusy) {
|
||||||
return ErrConnBusy
|
return nil
|
||||||
}
|
}
|
||||||
c.busy = true
|
return ErrConnBusy
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) unlock() error {
|
func (c *Conn) unlock() error {
|
||||||
if !c.busy {
|
if atomic.CompareAndSwapInt32(&c.status, connStatusBusy, connStatusIdle) {
|
||||||
return errors.New("unlock conn that is not busy")
|
return nil
|
||||||
}
|
}
|
||||||
c.busy = false
|
return errors.New("unlock conn that is not busy")
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) shouldLog(lvl int) bool {
|
func (c *Conn) shouldLog(lvl int) bool {
|
||||||
|
|
Loading…
Reference in New Issue