mirror of https://github.com/jackc/pgx.git
add BeforeClose to pgxpool.Pool
parent
d8b38b28be
commit
7f2bb9595f
|
@ -85,6 +85,7 @@ type Pool struct {
|
||||||
afterConnect func(context.Context, *pgx.Conn) error
|
afterConnect func(context.Context, *pgx.Conn) error
|
||||||
beforeAcquire func(context.Context, *pgx.Conn) bool
|
beforeAcquire func(context.Context, *pgx.Conn) bool
|
||||||
afterRelease func(*pgx.Conn) bool
|
afterRelease func(*pgx.Conn) bool
|
||||||
|
beforeClose func(*pgx.Conn)
|
||||||
minConns int32
|
minConns int32
|
||||||
maxConns int32
|
maxConns int32
|
||||||
maxConnLifetime time.Duration
|
maxConnLifetime time.Duration
|
||||||
|
@ -111,7 +112,7 @@ type Config struct {
|
||||||
AfterConnect func(context.Context, *pgx.Conn) error
|
AfterConnect func(context.Context, *pgx.Conn) error
|
||||||
|
|
||||||
// BeforeAcquire is called before a connection is acquired from the pool. It must return true to allow the
|
// BeforeAcquire is called before a connection is acquired from the pool. It must return true to allow the
|
||||||
// acquision or false to indicate that the connection should be destroyed and a different connection should be
|
// acquisition or false to indicate that the connection should be destroyed and a different connection should be
|
||||||
// acquired.
|
// acquired.
|
||||||
BeforeAcquire func(context.Context, *pgx.Conn) bool
|
BeforeAcquire func(context.Context, *pgx.Conn) bool
|
||||||
|
|
||||||
|
@ -119,6 +120,9 @@ type Config struct {
|
||||||
// return the connection to the pool or false to destroy the connection.
|
// return the connection to the pool or false to destroy the connection.
|
||||||
AfterRelease func(*pgx.Conn) bool
|
AfterRelease func(*pgx.Conn) bool
|
||||||
|
|
||||||
|
// BeforeClose is called right before a connection is closed and removed from the pool.
|
||||||
|
BeforeClose func(*pgx.Conn)
|
||||||
|
|
||||||
// MaxConnLifetime is the duration since creation after which a connection will be automatically closed.
|
// MaxConnLifetime is the duration since creation after which a connection will be automatically closed.
|
||||||
MaxConnLifetime time.Duration
|
MaxConnLifetime time.Duration
|
||||||
|
|
||||||
|
@ -180,6 +184,7 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
|
||||||
afterConnect: config.AfterConnect,
|
afterConnect: config.AfterConnect,
|
||||||
beforeAcquire: config.BeforeAcquire,
|
beforeAcquire: config.BeforeAcquire,
|
||||||
afterRelease: config.AfterRelease,
|
afterRelease: config.AfterRelease,
|
||||||
|
beforeClose: config.BeforeClose,
|
||||||
minConns: config.MinConns,
|
minConns: config.MinConns,
|
||||||
maxConns: config.MaxConns,
|
maxConns: config.MaxConns,
|
||||||
maxConnLifetime: config.MaxConnLifetime,
|
maxConnLifetime: config.MaxConnLifetime,
|
||||||
|
@ -236,6 +241,9 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
|
||||||
Destructor: func(value *connResource) {
|
Destructor: func(value *connResource) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||||
conn := value.conn
|
conn := value.conn
|
||||||
|
if p.beforeClose != nil {
|
||||||
|
p.beforeClose(conn)
|
||||||
|
}
|
||||||
conn.Close(ctx)
|
conn.Close(ctx)
|
||||||
select {
|
select {
|
||||||
case <-conn.PgConn().CleanupDone():
|
case <-conn.PgConn().CleanupDone():
|
||||||
|
|
|
@ -347,6 +347,49 @@ func TestPoolAfterRelease(t *testing.T) {
|
||||||
assert.EqualValues(t, 5, len(connPIDs))
|
assert.EqualValues(t, 5, len(connPIDs))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPoolBeforeClose(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
func() {
|
||||||
|
pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer pool.Close()
|
||||||
|
|
||||||
|
err = pool.AcquireFunc(context.Background(), func(conn *pgxpool.Conn) error {
|
||||||
|
if conn.Conn().PgConn().ParameterStatus("crdb_version") != "" {
|
||||||
|
t.Skip("Server does not support backend PID")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
}()
|
||||||
|
|
||||||
|
config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
connPIDs := make(chan uint32, 5)
|
||||||
|
config.BeforeClose = func(c *pgx.Conn) {
|
||||||
|
connPIDs <- c.PgConn().PID()
|
||||||
|
}
|
||||||
|
|
||||||
|
db, err := pgxpool.NewWithConfig(context.Background(), config)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
acquiredPIDs := make([]uint32, 0, 5)
|
||||||
|
closedPIDs := make([]uint32, 0, 5)
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
conn, err := db.Acquire(context.Background())
|
||||||
|
assert.NoError(t, err)
|
||||||
|
acquiredPIDs = append(acquiredPIDs, conn.Conn().PgConn().PID())
|
||||||
|
conn.Release()
|
||||||
|
db.Reset()
|
||||||
|
closedPIDs = append(closedPIDs, <-connPIDs)
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.ElementsMatch(t, acquiredPIDs, closedPIDs)
|
||||||
|
}
|
||||||
|
|
||||||
func TestPoolAcquireAllIdle(t *testing.T) {
|
func TestPoolAcquireAllIdle(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue