From 03da9fcec609b82d7ebc950069e5963422ef0059 Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Sat, 25 Jun 2022 17:58:53 -0500 Subject: [PATCH] Check conn liveness before using when idle for more than 1 second Implemented in pgxpool.Pool and database/sql. https://github.com/jackc/pgx/issues/672 --- pgxpool/pool.go | 9 +++++++ pgxpool/pool_test.go | 52 +++++++++++++++++++++++++++++++++++ stdlib/sql.go | 19 +++++++++---- stdlib/sql_test.go | 64 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 139 insertions(+), 5 deletions(-) diff --git a/pgxpool/pool.go b/pgxpool/pool.go index 0446a245..de4e1066 100644 --- a/pgxpool/pool.go +++ b/pgxpool/pool.go @@ -417,6 +417,15 @@ func (p *Pool) Acquire(ctx context.Context) (*Conn, error) { } cr := res.Value() + + if res.IdleDuration() > time.Second { + err := cr.conn.PgConn().CheckConn() + if err != nil { + res.Destroy() + continue + } + } + if p.beforeAcquire == nil || p.beforeAcquire(ctx, cr.conn) { return cr.getConn(p, res), nil } diff --git a/pgxpool/pool_test.go b/pgxpool/pool_test.go index a6d0a083..3e3058d2 100644 --- a/pgxpool/pool_test.go +++ b/pgxpool/pool_test.go @@ -11,6 +11,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" + "github.com/jackc/pgx/v5/pgxtest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -141,6 +142,57 @@ func TestPoolAcquireAndConnHijack(t *testing.T) { require.Equal(t, int32(1), n) } +func TestPoolAcquireChecksIdleConns(t *testing.T) { + t.Parallel() + + controllerConn, err := pgx.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) + require.NoError(t, err) + defer controllerConn.Close(context.Background()) + pgxtest.SkipCockroachDB(t, controllerConn, "Server does not support pg_terminate_backend() (https://github.com/cockroachdb/cockroach/issues/35897)") + + pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) + require.NoError(t, err) + defer pool.Close() + + var conns []*pgxpool.Conn + for i := 0; i < 3; i++ { + c, err := pool.Acquire(context.Background()) + require.NoError(t, err) + conns = append(conns, c) + } + + require.EqualValues(t, 3, pool.Stat().TotalConns()) + + var pids []uint32 + for _, c := range conns { + pids = append(pids, c.Conn().PgConn().PID()) + c.Release() + } + + _, err = controllerConn.Exec(context.Background(), `select pg_terminate_backend(n) from unnest($1::int[]) n`, pids) + require.NoError(t, err) + + // All conns are dead they don't know it and neither does the pool. + require.EqualValues(t, 3, pool.Stat().TotalConns()) + + // Wait long enough so the pool will realize it needs to check the connections. + time.Sleep(time.Second) + + // Pool should try all existing connections and find them dead, then create a new connection which should successfully ping. + err = pool.Ping(context.Background()) + require.NoError(t, err) + + // The original 3 conns should have been terminated and the a new conn established for the ping. + require.EqualValues(t, 1, pool.Stat().TotalConns()) + c, err := pool.Acquire(context.Background()) + require.NoError(t, err) + + cPID := c.Conn().PgConn().PID() + c.Release() + + require.NotContains(t, pids, cPID) +} + func TestPoolAcquireFunc(t *testing.T) { t.Parallel() diff --git a/stdlib/sql.go b/stdlib/sql.go index e4565227..8a24c4c5 100644 --- a/stdlib/sql.go +++ b/stdlib/sql.go @@ -308,11 +308,12 @@ func UnregisterConnConfig(connStr string) { } type Conn struct { - conn *pgx.Conn - psCount int64 // Counter used for creating unique prepared statement names - driver *Driver - connConfig pgx.ConnConfig - resetSessionFunc func(context.Context, *pgx.Conn) error // Function is called before a connection is reused + conn *pgx.Conn + psCount int64 // Counter used for creating unique prepared statement names + driver *Driver + connConfig pgx.ConnConfig + resetSessionFunc func(context.Context, *pgx.Conn) error // Function is called before a connection is reused + lastResetSessionTime time.Time } // Conn returns the underlying *pgx.Conn @@ -450,6 +451,14 @@ func (c *Conn) ResetSession(ctx context.Context) error { return driver.ErrBadConn } + now := time.Now() + if now.Sub(c.lastResetSessionTime) > time.Second { + if err := c.conn.PgConn().CheckConn(); err != nil { + return driver.ErrBadConn + } + } + c.lastResetSessionTime = now + return c.resetSessionFunc(ctx, c.conn) } diff --git a/stdlib/sql_test.go b/stdlib/sql_test.go index 9106df62..ee038add 100644 --- a/stdlib/sql_test.go +++ b/stdlib/sql_test.go @@ -1154,3 +1154,67 @@ func TestResetSessionHookCalled(t *testing.T) { require.True(t, mockCalled) } + +func TestCheckIdleConn(t *testing.T) { + controllerConn, err := sql.Open("pgx", os.Getenv("PGX_TEST_DATABASE")) + require.NoError(t, err) + defer closeDB(t, controllerConn) + + skipCockroachDB(t, controllerConn, "Server does not support pg_terminate_backend() (https://github.com/cockroachdb/cockroach/issues/35897)") + + db, err := sql.Open("pgx", os.Getenv("PGX_TEST_DATABASE")) + require.NoError(t, err) + defer closeDB(t, db) + + var conns []*sql.Conn + for i := 0; i < 3; i++ { + c, err := db.Conn(context.Background()) + require.NoError(t, err) + conns = append(conns, c) + } + + require.EqualValues(t, 3, db.Stats().OpenConnections) + + var pids []uint32 + for _, c := range conns { + err := c.Raw(func(driverConn any) error { + pids = append(pids, driverConn.(*stdlib.Conn).Conn().PgConn().PID()) + return nil + }) + require.NoError(t, err) + err = c.Close() + require.NoError(t, err) + } + + // The database/sql connection pool seems to automatically close idle connections to only keep 2 alive. + // require.EqualValues(t, 3, db.Stats().OpenConnections) + + _, err = controllerConn.ExecContext(context.Background(), `select pg_terminate_backend(n) from unnest($1::int[]) n`, pids) + require.NoError(t, err) + + // All conns are dead they don't know it and neither does the pool. But because of database/sql automatically closing + // idle connections we can't be sure how many we should have. require.EqualValues(t, 3, db.Stats().OpenConnections) + + // Wait long enough so the pool will realize it needs to check the connections. + time.Sleep(time.Second) + + // Pool should try all existing connections and find them dead, then create a new connection which should successfully ping. + err = db.PingContext(context.Background()) + require.NoError(t, err) + + // The original 3 conns should have been terminated and the a new conn established for the ping. + require.EqualValues(t, 1, db.Stats().OpenConnections) + c, err := db.Conn(context.Background()) + require.NoError(t, err) + + var cPID uint32 + err = c.Raw(func(driverConn any) error { + cPID = driverConn.(*stdlib.Conn).Conn().PgConn().PID() + return nil + }) + require.NoError(t, err) + err = c.Close() + require.NoError(t, err) + + require.NotContains(t, pids, cPID) +}