mirror of https://github.com/jackc/pgx.git
Check conn liveness before using when idle for more than 1 second
Implemented in pgxpool.Pool and database/sql. https://github.com/jackc/pgx/issues/672pull/1281/head
parent
26eda0f86d
commit
03da9fcec6
|
@ -417,6 +417,15 @@ func (p *Pool) Acquire(ctx context.Context) (*Conn, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
cr := res.Value()
|
cr := res.Value()
|
||||||
|
|
||||||
|
if res.IdleDuration() > time.Second {
|
||||||
|
err := cr.conn.PgConn().CheckConn()
|
||||||
|
if err != nil {
|
||||||
|
res.Destroy()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if p.beforeAcquire == nil || p.beforeAcquire(ctx, cr.conn) {
|
if p.beforeAcquire == nil || p.beforeAcquire(ctx, cr.conn) {
|
||||||
return cr.getConn(p, res), nil
|
return cr.getConn(p, res), nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
|
|
||||||
"github.com/jackc/pgx/v5"
|
"github.com/jackc/pgx/v5"
|
||||||
"github.com/jackc/pgx/v5/pgxpool"
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
|
"github.com/jackc/pgx/v5/pgxtest"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
@ -141,6 +142,57 @@ func TestPoolAcquireAndConnHijack(t *testing.T) {
|
||||||
require.Equal(t, int32(1), n)
|
require.Equal(t, int32(1), n)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPoolAcquireChecksIdleConns(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
controllerConn, err := pgx.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer controllerConn.Close(context.Background())
|
||||||
|
pgxtest.SkipCockroachDB(t, controllerConn, "Server does not support pg_terminate_backend() (https://github.com/cockroachdb/cockroach/issues/35897)")
|
||||||
|
|
||||||
|
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer pool.Close()
|
||||||
|
|
||||||
|
var conns []*pgxpool.Conn
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
c, err := pool.Acquire(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
conns = append(conns, c)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.EqualValues(t, 3, pool.Stat().TotalConns())
|
||||||
|
|
||||||
|
var pids []uint32
|
||||||
|
for _, c := range conns {
|
||||||
|
pids = append(pids, c.Conn().PgConn().PID())
|
||||||
|
c.Release()
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = controllerConn.Exec(context.Background(), `select pg_terminate_backend(n) from unnest($1::int[]) n`, pids)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// All conns are dead they don't know it and neither does the pool.
|
||||||
|
require.EqualValues(t, 3, pool.Stat().TotalConns())
|
||||||
|
|
||||||
|
// Wait long enough so the pool will realize it needs to check the connections.
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
|
||||||
|
// Pool should try all existing connections and find them dead, then create a new connection which should successfully ping.
|
||||||
|
err = pool.Ping(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// The original 3 conns should have been terminated and the a new conn established for the ping.
|
||||||
|
require.EqualValues(t, 1, pool.Stat().TotalConns())
|
||||||
|
c, err := pool.Acquire(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
cPID := c.Conn().PgConn().PID()
|
||||||
|
c.Release()
|
||||||
|
|
||||||
|
require.NotContains(t, pids, cPID)
|
||||||
|
}
|
||||||
|
|
||||||
func TestPoolAcquireFunc(t *testing.T) {
|
func TestPoolAcquireFunc(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
|
|
@ -313,6 +313,7 @@ type Conn struct {
|
||||||
driver *Driver
|
driver *Driver
|
||||||
connConfig pgx.ConnConfig
|
connConfig pgx.ConnConfig
|
||||||
resetSessionFunc func(context.Context, *pgx.Conn) error // Function is called before a connection is reused
|
resetSessionFunc func(context.Context, *pgx.Conn) error // Function is called before a connection is reused
|
||||||
|
lastResetSessionTime time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// Conn returns the underlying *pgx.Conn
|
// Conn returns the underlying *pgx.Conn
|
||||||
|
@ -450,6 +451,14 @@ func (c *Conn) ResetSession(ctx context.Context) error {
|
||||||
return driver.ErrBadConn
|
return driver.ErrBadConn
|
||||||
}
|
}
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
if now.Sub(c.lastResetSessionTime) > time.Second {
|
||||||
|
if err := c.conn.PgConn().CheckConn(); err != nil {
|
||||||
|
return driver.ErrBadConn
|
||||||
|
}
|
||||||
|
}
|
||||||
|
c.lastResetSessionTime = now
|
||||||
|
|
||||||
return c.resetSessionFunc(ctx, c.conn)
|
return c.resetSessionFunc(ctx, c.conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1154,3 +1154,67 @@ func TestResetSessionHookCalled(t *testing.T) {
|
||||||
|
|
||||||
require.True(t, mockCalled)
|
require.True(t, mockCalled)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCheckIdleConn(t *testing.T) {
|
||||||
|
controllerConn, err := sql.Open("pgx", os.Getenv("PGX_TEST_DATABASE"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer closeDB(t, controllerConn)
|
||||||
|
|
||||||
|
skipCockroachDB(t, controllerConn, "Server does not support pg_terminate_backend() (https://github.com/cockroachdb/cockroach/issues/35897)")
|
||||||
|
|
||||||
|
db, err := sql.Open("pgx", os.Getenv("PGX_TEST_DATABASE"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer closeDB(t, db)
|
||||||
|
|
||||||
|
var conns []*sql.Conn
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
c, err := db.Conn(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
conns = append(conns, c)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.EqualValues(t, 3, db.Stats().OpenConnections)
|
||||||
|
|
||||||
|
var pids []uint32
|
||||||
|
for _, c := range conns {
|
||||||
|
err := c.Raw(func(driverConn any) error {
|
||||||
|
pids = append(pids, driverConn.(*stdlib.Conn).Conn().PgConn().PID())
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = c.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The database/sql connection pool seems to automatically close idle connections to only keep 2 alive.
|
||||||
|
// require.EqualValues(t, 3, db.Stats().OpenConnections)
|
||||||
|
|
||||||
|
_, err = controllerConn.ExecContext(context.Background(), `select pg_terminate_backend(n) from unnest($1::int[]) n`, pids)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// All conns are dead they don't know it and neither does the pool. But because of database/sql automatically closing
|
||||||
|
// idle connections we can't be sure how many we should have. require.EqualValues(t, 3, db.Stats().OpenConnections)
|
||||||
|
|
||||||
|
// Wait long enough so the pool will realize it needs to check the connections.
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
|
||||||
|
// Pool should try all existing connections and find them dead, then create a new connection which should successfully ping.
|
||||||
|
err = db.PingContext(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// The original 3 conns should have been terminated and the a new conn established for the ping.
|
||||||
|
require.EqualValues(t, 1, db.Stats().OpenConnections)
|
||||||
|
c, err := db.Conn(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var cPID uint32
|
||||||
|
err = c.Raw(func(driverConn any) error {
|
||||||
|
cPID = driverConn.(*stdlib.Conn).Conn().PgConn().PID()
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = c.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.NotContains(t, pids, cPID)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue