Added option to customize query cancellation and wait for ready

CockroachDB doesn't support 80877102 and doesn't plan to, so instead allow
the user to customize the cancellation with their own function. In our
function, we call CANCEL QUERY with the query_id based on the LocalAddr().

The WaitForReady method can be used by a pool to not put a connection back
in the pool until it is finished cancelled and ready for a new query.
pull/477/head
James Hartig 2018-10-31 16:34:35 -04:00
parent c6cec81e2c
commit 527f404bbc
1 changed files with 68 additions and 45 deletions

113
conn.go
View File

@ -20,7 +20,6 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -78,6 +77,7 @@ type ConnConfig struct {
RuntimeParams map[string]string // Run-time parameters to set on connection as session default values (e.g. search_path or application_name) RuntimeParams map[string]string // Run-time parameters to set on connection as session default values (e.g. search_path or application_name)
OnNotice NoticeHandler // Callback function called when a notice response is received. OnNotice NoticeHandler // Callback function called when a notice response is received.
CustomConnInfo func(*Conn) (*pgtype.ConnInfo, error) // Callback function to implement connection strategies for different backends. crate, pgbouncer, pgpool, etc. CustomConnInfo func(*Conn) (*pgtype.ConnInfo, error) // Callback function to implement connection strategies for different backends. crate, pgbouncer, pgpool, etc.
CustomCancel func(*Conn) error // Callback function used to override cancellation behavior
// PreferSimpleProtocol disables implicit prepared statement usage. By default // PreferSimpleProtocol disables implicit prepared statement usage. By default
// pgx automatically uses the unnamed prepared statement for Query and // pgx automatically uses the unnamed prepared statement for Query and
@ -133,8 +133,7 @@ type Conn struct {
status byte // One of connStatus* constants status byte // One of connStatus* constants
causeOfDeath error causeOfDeath error
pendingReadyForQueryCount int // numer of ReadyForQuery messages expected pendingReadyForQueryCount int // number of ReadyForQuery messages expected
cancelQueryInProgress int32
cancelQueryCompleted chan struct{} cancelQueryCompleted chan struct{}
// context support // context support
@ -309,7 +308,8 @@ func (c *Conn) connect(config ConnConfig, network, address string, tlsConfig *tl
c.preparedStatements = make(map[string]*PreparedStatement) c.preparedStatements = make(map[string]*PreparedStatement)
c.channels = make(map[string]struct{}) c.channels = make(map[string]struct{})
c.lastActivityTime = time.Now() c.lastActivityTime = time.Now()
c.cancelQueryCompleted = make(chan struct{}, 1) c.cancelQueryCompleted = make(chan struct{})
close(c.cancelQueryCompleted)
c.doneChan = make(chan struct{}) c.doneChan = make(chan struct{})
c.closedChan = make(chan error) c.closedChan = make(chan error)
c.wbuf = make([]byte, 0, 1024) c.wbuf = make([]byte, 0, 1024)
@ -620,6 +620,14 @@ func (c *Conn) PID() uint32 {
return c.pid return c.pid
} }
// LocalAddr returns the underlying connection's local address
func (c *Conn) LocalAddr() (net.Addr, error) {
if !c.IsAlive() {
return nil, errors.New("connection not ready")
}
return c.conn.LocalAddr(), nil
}
// Close closes a connection. It is safe to call Close on a already closed // Close closes a connection. It is safe to call Close on a already closed
// connection. // connection.
func (c *Conn) Close() (err error) { func (c *Conn) Close() (err error) {
@ -1656,59 +1664,67 @@ func quoteIdentifier(s string) string {
return `"` + strings.Replace(s, `"`, `""`, -1) + `"` return `"` + strings.Replace(s, `"`, `""`, -1) + `"`
} }
func doCancel(c *Conn) error {
network, address := c.config.networkAddress()
cancelConn, err := c.config.Dial(network, address)
if err != nil {
return err
}
defer cancelConn.Close()
// If server doesn't process cancellation request in bounded time then abort.
now := time.Now()
err = cancelConn.SetDeadline(now.Add(15 * time.Second))
if err != nil {
return err
}
buf := make([]byte, 16)
binary.BigEndian.PutUint32(buf[0:4], 16)
binary.BigEndian.PutUint32(buf[4:8], 80877102)
binary.BigEndian.PutUint32(buf[8:12], uint32(c.pid))
binary.BigEndian.PutUint32(buf[12:16], uint32(c.secretKey))
_, err = cancelConn.Write(buf)
if err != nil {
return err
}
_, err = cancelConn.Read(buf)
if err != io.EOF {
return errors.Errorf("Server failed to close connection after cancel query request: %v %v", err, buf)
}
return nil
}
// cancelQuery sends a cancel request to the PostgreSQL server. It returns an // cancelQuery sends a cancel request to the PostgreSQL server. It returns an
// error if unable to deliver the cancel request, but lack of an error does not // error if unable to deliver the cancel request, but lack of an error does not
// ensure that the query was canceled. As specified in the documentation, there // ensure that the query was canceled. As specified in the documentation, there
// is no way to be sure a query was canceled. See // is no way to be sure a query was canceled. See
// https://www.postgresql.org/docs/current/static/protocol-flow.html#AEN112861 // https://www.postgresql.org/docs/current/static/protocol-flow.html#AEN112861
func (c *Conn) cancelQuery() { func (c *Conn) cancelQuery() {
if !atomic.CompareAndSwapInt32(&c.cancelQueryInProgress, 0, 1) {
panic("cancelQuery when cancelQueryInProgress")
}
if err := c.conn.SetDeadline(time.Now()); err != nil { if err := c.conn.SetDeadline(time.Now()); err != nil {
c.Close() // Close connection if unable to set deadline c.Close() // Close connection if unable to set deadline
return return
} }
doCancel := func() error { var cancelFn func(*Conn) error
network, address := c.config.networkAddress() completeCh := make(chan struct{})
cancelConn, err := c.config.Dial(network, address) c.mux.Lock()
if err != nil { c.cancelQueryCompleted = completeCh
return err c.mux.Unlock()
} if c.config.CustomCancel != nil {
defer cancelConn.Close() cancelFn = c.config.CustomCancel
} else {
// If server doesn't process cancellation request in bounded time then abort. cancelFn = doCancel
err = cancelConn.SetDeadline(time.Now().Add(15 * time.Second))
if err != nil {
return err
}
buf := make([]byte, 16)
binary.BigEndian.PutUint32(buf[0:4], 16)
binary.BigEndian.PutUint32(buf[4:8], 80877102)
binary.BigEndian.PutUint32(buf[8:12], uint32(c.pid))
binary.BigEndian.PutUint32(buf[12:16], uint32(c.secretKey))
_, err = cancelConn.Write(buf)
if err != nil {
return err
}
_, err = cancelConn.Read(buf)
if err != io.EOF {
return errors.Errorf("Server failed to close connection after cancel query request: %v %v", err, buf)
}
return nil
} }
go func() { go func() {
err := doCancel() defer close(completeCh)
err := cancelFn(c)
if err != nil { if err != nil {
c.Close() // Something is very wrong. Terminate the connection. c.Close() // Something is very wrong. Terminate the connection.
} }
c.cancelQueryCompleted <- struct{}{}
}() }()
} }
@ -1893,14 +1909,21 @@ func (c *Conn) contextHandler(ctx context.Context) {
} }
} }
func (c *Conn) waitForPreviousCancelQuery(ctx context.Context) error { // WaitUntilReady will return when the connection is ready for another query
if atomic.LoadInt32(&c.cancelQueryInProgress) == 0 { func (c *Conn) WaitUntilReady(ctx context.Context) error {
return nil err := c.waitForPreviousCancelQuery(ctx)
if err != nil {
return err
} }
return c.ensureConnectionReadyForQuery()
}
func (c *Conn) waitForPreviousCancelQuery(ctx context.Context) error {
c.mux.Lock()
completeCh := c.cancelQueryCompleted
c.mux.Unlock()
select { select {
case <-c.cancelQueryCompleted: case <-completeCh:
atomic.StoreInt32(&c.cancelQueryInProgress, 0)
if err := c.conn.SetDeadline(time.Time{}); err != nil { if err := c.conn.SetDeadline(time.Time{}); err != nil {
c.Close() // Close connection if unable to disable deadline c.Close() // Close connection if unable to disable deadline
return err return err