mirror of https://github.com/jackc/pgx.git
Track Listen/Unlisten and clear subscriptions on conn release
parent
516c88fde3
commit
eb5cadccf9
32
conn.go
32
conn.go
|
@ -53,6 +53,7 @@ type Conn struct {
|
|||
config ConnConfig // config used when establishing this connection
|
||||
TxStatus byte
|
||||
preparedStatements map[string]*PreparedStatement
|
||||
channels map[string]struct{}
|
||||
notifications []*Notification
|
||||
alive bool
|
||||
causeOfDeath error
|
||||
|
@ -197,6 +198,7 @@ func (c *Conn) connect(config ConnConfig, network, address string, tlsConfig *tl
|
|||
|
||||
c.RuntimeParams = make(map[string]string)
|
||||
c.preparedStatements = make(map[string]*PreparedStatement)
|
||||
c.channels = make(map[string]struct{})
|
||||
c.alive = true
|
||||
c.lastActivityTime = time.Now()
|
||||
|
||||
|
@ -602,6 +604,36 @@ func (c *Conn) Deallocate(name string) (err error) {
|
|||
// Listen establishes a PostgreSQL listen/notify to channel
|
||||
func (c *Conn) Listen(channel string) (err error) {
|
||||
_, err = c.Exec("listen " + channel)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
var s struct{}
|
||||
c.channels[channel] = s
|
||||
return
|
||||
}
|
||||
|
||||
// Unlisten unsubscribes from a listen channel
|
||||
// If channel is empty then unsubscribe from all channels
|
||||
func (c *Conn) Unlisten(channel string) (err error) {
|
||||
if channel != "" {
|
||||
err = c.unlisten(channel)
|
||||
} else {
|
||||
for s, _ := range c.channels {
|
||||
err = c.unlisten(s)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Conn) unlisten(channel string) (err error) {
|
||||
_, err = c.Exec("unlisten " + channel)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
delete(c.channels, channel)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -118,6 +118,7 @@ func (p *ConnPool) Release(conn *Conn) {
|
|||
if conn.TxStatus != 'I' {
|
||||
conn.Exec("rollback")
|
||||
}
|
||||
conn.Unlisten("")
|
||||
|
||||
p.cond.L.Lock()
|
||||
if conn.IsAlive() {
|
||||
|
|
Loading…
Reference in New Issue