Add PgConn.CleanupChan

query-exec-mode
Jack Christensen 2020-08-20 22:00:21 -05:00
parent c894ca8b7d
commit 3eb5432c47
4 changed files with 65 additions and 1 deletions

View File

@ -1,3 +1,7 @@
# Unreleased
* Add PgConn.CleanupChan so connection pools can determine when async close is complete
# 1.6.4 (July 29, 2020)
* Fix deadlock on error after CommandComplete but before ReadyForQuery

View File

@ -15,6 +15,11 @@ func closeConn(t testing.TB, conn *pgconn.PgConn) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
require.NoError(t, conn.Close(ctx))
select {
case <-conn.CleanupChan():
case <-time.After(5 * time.Second):
t.Fatal("Connection cleanup exceeded maximum time")
}
}
// Do a simple query to ensure the connection is still usable

View File

@ -89,6 +89,8 @@ type PgConn struct {
resultReader ResultReader
multiResultReader MultiResultReader
contextWatcher *ctxwatch.ContextWatcher
cleanupChan chan struct{}
}
// Connect establishes a connection to a PostgreSQL server using the environment and connString (in URL or DSN format)
@ -201,6 +203,7 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
pgConn := new(PgConn)
pgConn.config = config
pgConn.wbuf = make([]byte, 0, wbufLen)
pgConn.cleanupChan = make(chan struct{})
var err error
network, address := NetworkAddress(fallbackConfig.Host, fallbackConfig.Port)
@ -504,6 +507,7 @@ func (pgConn *PgConn) Close(ctx context.Context) error {
}
pgConn.status = connStatusClosed
defer close(pgConn.cleanupChan)
defer pgConn.conn.Close()
if ctx != context.Background() {
@ -538,6 +542,7 @@ func (pgConn *PgConn) asyncClose() {
pgConn.status = connStatusClosed
go func() {
defer close(pgConn.cleanupChan)
defer pgConn.conn.Close()
deadline := time.Now().Add(time.Second * 15)
@ -554,7 +559,21 @@ func (pgConn *PgConn) asyncClose() {
}()
}
// CleanupChan returns a channel that will be closed after all underlying resources have been cleaned up. A closed
// connection is no longer usable, but underlying resources, in particular the net.Conn, may not have finished closing
// yet. This is because certain errors such as a context cancellation require that the interrupted function call return
// immediately, but the error may also cause the connection to be closed. In these cases the underlying resources are
// closed asynchronously.
//
// This is only likely to be useful to connection pools. It gives them a way avoid establishing a new connection while
// an old connection is still being cleaned up and thereby exceeding the maximum pool size.
func (pgConn *PgConn) CleanupChan() chan (struct{}) {
return pgConn.cleanupChan
}
// IsClosed reports if the connection has been closed.
//
// CleanupChan() can be used to determine if all cleanup has been completed.
func (pgConn *PgConn) IsClosed() bool {
return pgConn.status < connStatusIdle
}
@ -1585,7 +1604,8 @@ func Construct(hc *HijackedConn) (*PgConn, error) {
status: connStatusIdle,
wbuf: make([]byte, 0, wbufLen),
wbuf: make([]byte, 0, wbufLen),
cleanupChan: make(chan struct{}),
}
pgConn.contextWatcher = ctxwatch.NewContextWatcher(

View File

@ -547,6 +547,11 @@ func TestConnExecContextCanceled(t *testing.T) {
err = multiResult.Close()
assert.True(t, pgconn.Timeout(err))
assert.True(t, pgConn.IsClosed())
select {
case <-pgConn.CleanupChan():
case <-time.After(5 * time.Second):
t.Fatal("Connection cleanup exceeded maximum time")
}
}
func TestConnExecContextPrecanceled(t *testing.T) {
@ -680,6 +685,11 @@ func TestConnExecParamsCanceled(t *testing.T) {
assert.True(t, pgconn.Timeout(err))
assert.True(t, pgConn.IsClosed())
select {
case <-pgConn.CleanupChan():
case <-time.After(5 * time.Second):
t.Fatal("Connection cleanup exceeded maximum time")
}
}
func TestConnExecParamsPrecanceled(t *testing.T) {
@ -824,6 +834,11 @@ func TestConnExecPreparedCanceled(t *testing.T) {
assert.Equal(t, pgconn.CommandTag(nil), commandTag)
assert.True(t, pgconn.Timeout(err))
assert.True(t, pgConn.IsClosed())
select {
case <-pgConn.CleanupChan():
case <-time.After(5 * time.Second):
t.Fatal("Connection cleanup exceeded maximum time")
}
}
func TestConnExecPreparedPrecanceled(t *testing.T) {
@ -1306,6 +1321,11 @@ func TestConnCopyToCanceled(t *testing.T) {
assert.Equal(t, pgconn.CommandTag(nil), res)
assert.True(t, pgConn.IsClosed())
select {
case <-pgConn.CleanupChan():
case <-time.After(5 * time.Second):
t.Fatal("Connection cleanup exceeded maximum time")
}
}
func TestConnCopyToPrecanceled(t *testing.T) {
@ -1397,6 +1417,11 @@ func TestConnCopyFromCanceled(t *testing.T) {
assert.Error(t, err)
assert.True(t, pgConn.IsClosed())
select {
case <-pgConn.CleanupChan():
case <-time.After(5 * time.Second):
t.Fatal("Connection cleanup exceeded maximum time")
}
}
func TestConnCopyFromPrecanceled(t *testing.T) {
@ -1647,6 +1672,11 @@ func TestConnContextCanceledCancelsRunningQueryOnServer(t *testing.T) {
err = multiResult.Close()
assert.True(t, pgconn.Timeout(err))
assert.True(t, pgConn.IsClosed())
select {
case <-pgConn.CleanupChan():
case <-time.After(5 * time.Second):
t.Fatal("Connection cleanup exceeded maximum time")
}
otherConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
require.NoError(t, err)
@ -1750,6 +1780,11 @@ func TestConnCloseWhileCancellableQueryInProgress(t *testing.T) {
closeCtx, _ := context.WithCancel(context.Background())
pgConn.Close(closeCtx)
select {
case <-pgConn.CleanupChan():
case <-time.After(5 * time.Second):
t.Fatal("Connection cleanup exceeded maximum time")
}
}
// https://github.com/jackc/pgx/issues/800