pgxpool pools always connect lazily

Rename constructor functions now that they don't actually connect.
pull/1281/head
Jack Christensen 2022-07-10 14:58:30 -05:00
parent ca41a6a222
commit a059d1099f
10 changed files with 79 additions and 268 deletions

View File

@ -20,6 +20,10 @@ pgconn now supports pipeline mode.
`*PgConn.ReceiveResults` removed. Use pipeline mode instead. `*PgConn.ReceiveResults` removed. Use pipeline mode instead.
## pgxpool
`Connect` and `ConnectConfig` have been renamed to `New` and `NewConfig` respectively. The `LazyConnect` option has been removed. Pools always lazily connect.
## pgtype ## pgtype
The `pgtype` package has been significantly changed. The `pgtype` package has been significantly changed.

View File

@ -13,7 +13,7 @@ var pool *pgxpool.Pool
func main() { func main() {
var err error var err error
pool, err = pgxpool.Connect(context.Background(), os.Getenv("DATABASE_URL")) pool, err = pgxpool.New(context.Background(), os.Getenv("DATABASE_URL"))
if err != nil { if err != nil {
fmt.Fprintln(os.Stderr, "Unable to connect to database:", err) fmt.Fprintln(os.Stderr, "Unable to connect to database:", err)
os.Exit(1) os.Exit(1)

View File

@ -75,7 +75,7 @@ func main() {
log.Fatalln("Unable to parse DATABASE_URL:", err) log.Fatalln("Unable to parse DATABASE_URL:", err)
} }
db, err = pgxpool.ConnectConfig(context.Background(), poolConfig) db, err = pgxpool.NewConfig(context.Background(), poolConfig)
if err != nil { if err != nil {
log.Fatalln("Unable to create connection pool:", err) log.Fatalln("Unable to create connection pool:", err)
} }

View File

@ -11,7 +11,7 @@ import (
) )
func BenchmarkAcquireAndRelease(b *testing.B) { func BenchmarkAcquireAndRelease(b *testing.B) {
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(b, err) require.NoError(b, err)
defer pool.Close() defer pool.Close()
@ -34,7 +34,7 @@ func BenchmarkMinimalPreparedSelectBaseline(b *testing.B) {
return err return err
} }
db, err := pgxpool.ConnectConfig(context.Background(), config) db, err := pgxpool.NewConfig(context.Background(), config)
require.NoError(b, err) require.NoError(b, err)
conn, err := db.Acquire(context.Background()) conn, err := db.Acquire(context.Background())
@ -65,7 +65,7 @@ func BenchmarkMinimalPreparedSelect(b *testing.B) {
return err return err
} }
db, err := pgxpool.ConnectConfig(context.Background(), config) db, err := pgxpool.NewConfig(context.Background(), config)
require.NoError(b, err) require.NoError(b, err)
var n int64 var n int64

View File

@ -148,7 +148,6 @@ func assertConfigsEqual(t *testing.T, expected, actual *pgxpool.Config, testName
assert.Equalf(t, expected.MaxConns, actual.MaxConns, "%s - MaxConns", testName) assert.Equalf(t, expected.MaxConns, actual.MaxConns, "%s - MaxConns", testName)
assert.Equalf(t, expected.MinConns, actual.MinConns, "%s - MinConns", testName) assert.Equalf(t, expected.MinConns, actual.MinConns, "%s - MinConns", testName)
assert.Equalf(t, expected.HealthCheckPeriod, actual.HealthCheckPeriod, "%s - HealthCheckPeriod", testName) assert.Equalf(t, expected.HealthCheckPeriod, actual.HealthCheckPeriod, "%s - HealthCheckPeriod", testName)
assert.Equalf(t, expected.LazyConnect, actual.LazyConnect, "%s - LazyConnect", testName)
assertConnConfigsEqual(t, expected.ConnConfig, actual.ConnConfig, testName) assertConnConfigsEqual(t, expected.ConnConfig, actual.ConnConfig, testName)
} }

View File

@ -12,7 +12,7 @@ import (
func TestConnExec(t *testing.T) { func TestConnExec(t *testing.T) {
t.Parallel() t.Parallel()
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
@ -26,7 +26,7 @@ func TestConnExec(t *testing.T) {
func TestConnQuery(t *testing.T) { func TestConnQuery(t *testing.T) {
t.Parallel() t.Parallel()
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
@ -40,7 +40,7 @@ func TestConnQuery(t *testing.T) {
func TestConnQueryRow(t *testing.T) { func TestConnQueryRow(t *testing.T) {
t.Parallel() t.Parallel()
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
@ -54,7 +54,7 @@ func TestConnQueryRow(t *testing.T) {
func TestConnSendBatch(t *testing.T) { func TestConnSendBatch(t *testing.T) {
t.Parallel() t.Parallel()
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
@ -68,7 +68,7 @@ func TestConnSendBatch(t *testing.T) {
func TestConnCopyFrom(t *testing.T) { func TestConnCopyFrom(t *testing.T) {
t.Parallel() t.Parallel()
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()

View File

@ -2,11 +2,11 @@
/* /*
pgxpool implements a nearly identical interface to pgx connections. pgxpool implements a nearly identical interface to pgx connections.
Establishing a Connection Creating a Pool
The primary way of establishing a connection is with `pgxpool.Connect`. The primary way of creating a pool is with `pgxpool.New`.
pool, err := pgxpool.Connect(context.Background(), os.Getenv("DATABASE_URL")) pool, err := pgxpool.New(context.Background(), os.Getenv("DATABASE_URL"))
The database connection string can be in URL or DSN format. PostgreSQL settings, pgx settings, and pool settings can be The database connection string can be in URL or DSN format. PostgreSQL settings, pgx settings, and pool settings can be
specified here. In addition, a config struct can be created by `ParseConfig` and modified before establishing the specified here. In addition, a config struct can be created by `ParseConfig` and modified before establishing the
@ -20,6 +20,9 @@ connection with `ConnectConfig`.
// do something with every new connection // do something with every new connection
} }
pool, err := pgxpool.ConnectConfig(context.Background(), config) pool, err := pgxpool.NewConfig(context.Background(), config)
A pool returns without waiting for any connections to be established. Acquire a connection immediately after creating
the pool to check if a connection can successfully be established.
*/ */
package pgxpool package pgxpool

View File

@ -122,11 +122,6 @@ type Config struct {
// HealthCheckPeriod is the duration between checks of the health of idle connections. // HealthCheckPeriod is the duration between checks of the health of idle connections.
HealthCheckPeriod time.Duration HealthCheckPeriod time.Duration
// If set to true, pool doesn't do any I/O operation on initialization.
// And connects to the server only when the pool starts to be used.
// The default is false.
LazyConnect bool
createdByParseConfig bool // Used to enforce created by ParseConfig rule. createdByParseConfig bool // Used to enforce created by ParseConfig rule.
} }
@ -143,20 +138,18 @@ func (c *Config) Copy() *Config {
// ConnString returns the connection string as parsed by pgxpool.ParseConfig into pgxpool.Config. // ConnString returns the connection string as parsed by pgxpool.ParseConfig into pgxpool.Config.
func (c *Config) ConnString() string { return c.ConnConfig.ConnString() } func (c *Config) ConnString() string { return c.ConnConfig.ConnString() }
// Connect creates a new Pool and immediately establishes one connection. ctx can be used to cancel this initial // New creates a new Pool. See ParseConfig for information on connString format.
// connection. See ParseConfig for information on connString format. func New(ctx context.Context, connString string) (*Pool, error) {
func Connect(ctx context.Context, connString string) (*Pool, error) {
config, err := ParseConfig(connString) config, err := ParseConfig(connString)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return ConnectConfig(ctx, config) return NewConfig(ctx, config)
} }
// ConnectConfig creates a new Pool and immediately establishes one connection. ctx can be used to cancel this initial // NewConfig creates a new Pool. config must have been created by ParseConfig.
// connection. config must have been created by ParseConfig. func NewConfig(ctx context.Context, config *Config) (*Pool, error) {
func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) {
// Default values are set in ParseConfig. Enforce initial creation by ParseConfig rather than setting defaults from // Default values are set in ParseConfig. Enforce initial creation by ParseConfig rather than setting defaults from
// zero values. // zero values.
if !config.createdByParseConfig { if !config.createdByParseConfig {
@ -222,23 +215,10 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) {
config.MaxConns, config.MaxConns,
) )
if !config.LazyConnect { go func() {
if err := p.createIdleResources(ctx, int(p.minConns)); err != nil { p.checkMinConns() // reach min conns as soon as possible
// Couldn't create resources for minpool size. Close unhealthy pool. p.backgroundHealthCheck()
p.Close() }()
return nil, err
}
// Initially establish one connection
res, err := p.p.Acquire(ctx)
if err != nil {
p.Close()
return nil, err
}
res.Release()
}
go p.backgroundHealthCheck()
return p, nil return p, nil
} }

View File

@ -19,7 +19,7 @@ import (
func TestConnect(t *testing.T) { func TestConnect(t *testing.T) {
t.Parallel() t.Parallel()
connString := os.Getenv("PGX_TEST_DATABASE") connString := os.Getenv("PGX_TEST_DATABASE")
pool, err := pgxpool.Connect(context.Background(), connString) pool, err := pgxpool.New(context.Background(), connString)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, connString, pool.Config().ConnString()) assert.Equal(t, connString, pool.Config().ConnString())
pool.Close() pool.Close()
@ -30,7 +30,7 @@ func TestConnectConfig(t *testing.T) {
connString := os.Getenv("PGX_TEST_DATABASE") connString := os.Getenv("PGX_TEST_DATABASE")
config, err := pgxpool.ParseConfig(connString) config, err := pgxpool.ParseConfig(connString)
require.NoError(t, err) require.NoError(t, err)
pool, err := pgxpool.ConnectConfig(context.Background(), config) pool, err := pgxpool.NewConfig(context.Background(), config)
require.NoError(t, err) require.NoError(t, err)
assertConfigsEqual(t, config, pool.Config(), "Pool.Config() returns original config") assertConfigsEqual(t, config, pool.Config(), "Pool.Config() returns original config")
pool.Close() pool.Close()
@ -47,39 +47,12 @@ func TestParseConfigExtractsPoolArguments(t *testing.T) {
assert.NotContains(t, config.ConnConfig.Config.RuntimeParams, "pool_min_conns") assert.NotContains(t, config.ConnConfig.Config.RuntimeParams, "pool_min_conns")
} }
func TestConnectCancel(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
cancel()
pool, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
assert.Nil(t, pool)
assert.Equal(t, context.Canceled, err)
}
func TestLazyConnect(t *testing.T) {
t.Parallel()
config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
assert.NoError(t, err)
config.LazyConnect = true
ctx, cancel := context.WithCancel(context.Background())
cancel()
pool, err := pgxpool.ConnectConfig(ctx, config)
assert.NoError(t, err)
_, err = pool.Exec(ctx, "SELECT 1")
assert.Equal(t, context.Canceled, err)
}
func TestConnectConfigRequiresConnConfigFromParseConfig(t *testing.T) { func TestConnectConfigRequiresConnConfigFromParseConfig(t *testing.T) {
t.Parallel() t.Parallel()
config := &pgxpool.Config{} config := &pgxpool.Config{}
require.PanicsWithValue(t, "config must be created by ParseConfig", func() { pgxpool.ConnectConfig(context.Background(), config) }) require.PanicsWithValue(t, "config must be created by ParseConfig", func() { pgxpool.NewConfig(context.Background(), config) })
} }
func TestConfigCopyReturnsEqualConfig(t *testing.T) { func TestConfigCopyReturnsEqualConfig(t *testing.T) {
@ -99,7 +72,7 @@ func TestConfigCopyCanBeUsedToConnect(t *testing.T) {
copied := original.Copy() copied := original.Copy()
assert.NotPanics(t, func() { assert.NotPanics(t, func() {
_, err = pgxpool.ConnectConfig(context.Background(), copied) _, err = pgxpool.NewConfig(context.Background(), copied)
}) })
assert.NoError(t, err) assert.NoError(t, err)
} }
@ -107,7 +80,7 @@ func TestConfigCopyCanBeUsedToConnect(t *testing.T) {
func TestPoolAcquireAndConnRelease(t *testing.T) { func TestPoolAcquireAndConnRelease(t *testing.T) {
t.Parallel() t.Parallel()
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
@ -121,7 +94,7 @@ func TestPoolAcquireAndConnHijack(t *testing.T) {
ctx := context.Background() ctx := context.Background()
pool, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
@ -150,7 +123,7 @@ func TestPoolAcquireChecksIdleConns(t *testing.T) {
defer controllerConn.Close(context.Background()) defer controllerConn.Close(context.Background())
pgxtest.SkipCockroachDB(t, controllerConn, "Server does not support pg_terminate_backend() (https://github.com/cockroachdb/cockroach/issues/35897)") pgxtest.SkipCockroachDB(t, controllerConn, "Server does not support pg_terminate_backend() (https://github.com/cockroachdb/cockroach/issues/35897)")
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
@ -196,7 +169,7 @@ func TestPoolAcquireChecksIdleConns(t *testing.T) {
func TestPoolAcquireFunc(t *testing.T) { func TestPoolAcquireFunc(t *testing.T) {
t.Parallel() t.Parallel()
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
@ -211,7 +184,7 @@ func TestPoolAcquireFunc(t *testing.T) {
func TestPoolAcquireFuncReturnsFnError(t *testing.T) { func TestPoolAcquireFuncReturnsFnError(t *testing.T) {
t.Parallel() t.Parallel()
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
@ -232,7 +205,7 @@ func TestPoolBeforeConnect(t *testing.T) {
return nil return nil
} }
db, err := pgxpool.ConnectConfig(context.Background(), config) db, err := pgxpool.NewConfig(context.Background(), config)
require.NoError(t, err) require.NoError(t, err)
defer db.Close() defer db.Close()
@ -253,7 +226,7 @@ func TestPoolAfterConnect(t *testing.T) {
return err return err
} }
db, err := pgxpool.ConnectConfig(context.Background(), config) db, err := pgxpool.NewConfig(context.Background(), config)
require.NoError(t, err) require.NoError(t, err)
defer db.Close() defer db.Close()
@ -276,7 +249,7 @@ func TestPoolBeforeAcquire(t *testing.T) {
return acquireAttempts%2 == 0 return acquireAttempts%2 == 0
} }
db, err := pgxpool.ConnectConfig(context.Background(), config) db, err := pgxpool.NewConfig(context.Background(), config)
require.NoError(t, err) require.NoError(t, err)
defer db.Close() defer db.Close()
@ -308,7 +281,7 @@ func TestPoolAfterRelease(t *testing.T) {
t.Parallel() t.Parallel()
func() { func() {
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
@ -331,7 +304,7 @@ func TestPoolAfterRelease(t *testing.T) {
return afterReleaseCount%2 == 1 return afterReleaseCount%2 == 1
} }
db, err := pgxpool.ConnectConfig(context.Background(), config) db, err := pgxpool.NewConfig(context.Background(), config)
require.NoError(t, err) require.NoError(t, err)
defer db.Close() defer db.Close()
@ -351,19 +324,11 @@ func TestPoolAfterRelease(t *testing.T) {
func TestPoolAcquireAllIdle(t *testing.T) { func TestPoolAcquireAllIdle(t *testing.T) {
t.Parallel() t.Parallel()
db, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) db, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer db.Close() defer db.Close()
conns := db.AcquireAllIdle(context.Background()) conns := make([]*pgxpool.Conn, 3)
assert.Len(t, conns, 1)
for _, c := range conns {
c.Release()
}
waitForReleaseToComplete()
conns = make([]*pgxpool.Conn, 3)
for i := range conns { for i := range conns {
conns[i], err = db.Acquire(context.Background()) conns[i], err = db.Acquire(context.Background())
assert.NoError(t, err) assert.NoError(t, err)
@ -392,7 +357,7 @@ func TestConnReleaseChecksMaxConnLifetime(t *testing.T) {
config.MaxConnLifetime = 250 * time.Millisecond config.MaxConnLifetime = 250 * time.Millisecond
db, err := pgxpool.ConnectConfig(context.Background(), config) db, err := pgxpool.NewConfig(context.Background(), config)
require.NoError(t, err) require.NoError(t, err)
defer db.Close() defer db.Close()
@ -411,7 +376,7 @@ func TestConnReleaseChecksMaxConnLifetime(t *testing.T) {
func TestConnReleaseClosesBusyConn(t *testing.T) { func TestConnReleaseClosesBusyConn(t *testing.T) {
t.Parallel() t.Parallel()
db, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) db, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer db.Close() defer db.Close()
@ -437,7 +402,7 @@ func TestPoolBackgroundChecksMaxConnLifetime(t *testing.T) {
config.MaxConnLifetime = 100 * time.Millisecond config.MaxConnLifetime = 100 * time.Millisecond
config.HealthCheckPeriod = 100 * time.Millisecond config.HealthCheckPeriod = 100 * time.Millisecond
db, err := pgxpool.ConnectConfig(context.Background(), config) db, err := pgxpool.NewConfig(context.Background(), config)
require.NoError(t, err) require.NoError(t, err)
defer db.Close() defer db.Close()
@ -460,7 +425,7 @@ func TestPoolBackgroundChecksMaxConnIdleTime(t *testing.T) {
config.MaxConnIdleTime = 100 * time.Millisecond config.MaxConnIdleTime = 100 * time.Millisecond
config.HealthCheckPeriod = 150 * time.Millisecond config.HealthCheckPeriod = 150 * time.Millisecond
db, err := pgxpool.ConnectConfig(context.Background(), config) db, err := pgxpool.NewConfig(context.Background(), config)
require.NoError(t, err) require.NoError(t, err)
defer db.Close() defer db.Close()
@ -487,7 +452,7 @@ func TestPoolBackgroundChecksMinConns(t *testing.T) {
config.HealthCheckPeriod = 100 * time.Millisecond config.HealthCheckPeriod = 100 * time.Millisecond
config.MinConns = 2 config.MinConns = 2
db, err := pgxpool.ConnectConfig(context.Background(), config) db, err := pgxpool.NewConfig(context.Background(), config)
require.NoError(t, err) require.NoError(t, err)
defer db.Close() defer db.Close()
@ -500,7 +465,7 @@ func TestPoolBackgroundChecksMinConns(t *testing.T) {
func TestPoolExec(t *testing.T) { func TestPoolExec(t *testing.T) {
t.Parallel() t.Parallel()
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
@ -510,7 +475,7 @@ func TestPoolExec(t *testing.T) {
func TestPoolQuery(t *testing.T) { func TestPoolQuery(t *testing.T) {
t.Parallel() t.Parallel()
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
@ -539,7 +504,7 @@ func TestPoolQuery(t *testing.T) {
func TestPoolQueryRow(t *testing.T) { func TestPoolQueryRow(t *testing.T) {
t.Parallel() t.Parallel()
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
@ -555,7 +520,7 @@ func TestPoolQueryRow(t *testing.T) {
func TestPoolQueryRowErrNoRows(t *testing.T) { func TestPoolQueryRowErrNoRows(t *testing.T) {
t.Parallel() t.Parallel()
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
@ -566,7 +531,7 @@ func TestPoolQueryRowErrNoRows(t *testing.T) {
func TestPoolSendBatch(t *testing.T) { func TestPoolSendBatch(t *testing.T) {
t.Parallel() t.Parallel()
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
@ -586,7 +551,7 @@ func TestPoolCopyFrom(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
pool, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
@ -629,7 +594,7 @@ func TestConnReleaseClosesConnInFailedTransaction(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
pool, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
@ -675,7 +640,7 @@ func TestConnReleaseClosesConnInTransaction(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
pool, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
@ -716,7 +681,7 @@ func TestConnReleaseDestroysClosedConn(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
pool, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
@ -737,7 +702,7 @@ func TestConnReleaseDestroysClosedConn(t *testing.T) {
func TestConnPoolQueryConcurrentLoad(t *testing.T) { func TestConnPoolQueryConcurrentLoad(t *testing.T) {
t.Parallel() t.Parallel()
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
@ -763,7 +728,7 @@ func TestConnReleaseWhenBeginFail(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
db, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE")) db, err := pgxpool.New(ctx, os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer db.Close() defer db.Close()
@ -787,7 +752,7 @@ func TestConnReleaseWhenBeginFail(t *testing.T) {
} }
func TestTxBeginFuncNestedTransactionCommit(t *testing.T) { func TestTxBeginFuncNestedTransactionCommit(t *testing.T) {
db, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) db, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer db.Close() defer db.Close()
@ -834,7 +799,7 @@ func TestTxBeginFuncNestedTransactionCommit(t *testing.T) {
} }
func TestTxBeginFuncNestedTransactionRollback(t *testing.T) { func TestTxBeginFuncNestedTransactionRollback(t *testing.T) {
db, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) db, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer db.Close() defer db.Close()
@ -877,7 +842,7 @@ func TestTxBeginFuncNestedTransactionRollback(t *testing.T) {
} }
func TestIdempotentPoolClose(t *testing.T) { func TestIdempotentPoolClose(t *testing.T) {
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
// Close the open pool. // Close the open pool.
@ -887,7 +852,7 @@ func TestIdempotentPoolClose(t *testing.T) {
require.NotPanics(t, func() { pool.Close() }) require.NotPanics(t, func() { pool.Close() })
} }
func TestConnectCreatesMinPool(t *testing.T) { func TestConnectEagerlyReachesMinPoolSize(t *testing.T) {
t.Parallel() t.Parallel()
config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE")) config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
@ -895,7 +860,6 @@ func TestConnectCreatesMinPool(t *testing.T) {
config.MinConns = int32(12) config.MinConns = int32(12)
config.MaxConns = int32(15) config.MaxConns = int32(15)
config.LazyConnect = false
acquireAttempts := int64(0) acquireAttempts := int64(0)
connectAttempts := int64(0) connectAttempts := int64(0)
@ -909,166 +873,27 @@ func TestConnectCreatesMinPool(t *testing.T) {
return nil return nil
} }
pool, err := pgxpool.ConnectConfig(context.Background(), config) pool, err := pgxpool.NewConfig(context.Background(), config)
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
stat := pool.Stat() for i := 0; i < 500; i++ {
require.Equal(t, int32(12), stat.IdleConns()) time.Sleep(10 * time.Millisecond)
require.Equal(t, int64(1), stat.AcquireCount())
require.Equal(t, int32(12), stat.TotalConns())
require.Equal(t, int64(0), acquireAttempts)
require.Equal(t, int64(12), connectAttempts)
}
func TestConnectSkipMinPoolWithLazy(t *testing.T) {
t.Parallel()
config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE")) stat := pool.Stat()
require.NoError(t, err) if stat.IdleConns() == 12 && stat.AcquireCount() == 0 && stat.TotalConns() == 12 && atomic.LoadInt64(&acquireAttempts) == 0 && atomic.LoadInt64(&connectAttempts) == 12 {
return
config.MinConns = int32(12)
config.MaxConns = int32(15)
config.LazyConnect = true
acquireAttempts := int64(0)
connectAttempts := int64(0)
config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
atomic.AddInt64(&acquireAttempts, 1)
return true
}
config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error {
atomic.AddInt64(&connectAttempts, 1)
return nil
}
pool, err := pgxpool.ConnectConfig(context.Background(), config)
require.NoError(t, err)
defer pool.Close()
stat := pool.Stat()
require.Equal(t, int32(0), stat.IdleConns())
require.Equal(t, int64(0), stat.AcquireCount())
require.Equal(t, int32(0), stat.TotalConns())
require.Equal(t, int64(0), acquireAttempts)
require.Equal(t, int64(0), connectAttempts)
}
func TestConnectMinPoolZero(t *testing.T) {
t.Parallel()
config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err)
config.MinConns = int32(0)
config.MaxConns = int32(15)
config.LazyConnect = false
acquireAttempts := int64(0)
connectAttempts := int64(0)
config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
atomic.AddInt64(&acquireAttempts, 1)
return true
}
config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error {
atomic.AddInt64(&connectAttempts, 1)
return nil
}
pool, err := pgxpool.ConnectConfig(context.Background(), config)
require.NoError(t, err)
defer pool.Close()
stat := pool.Stat()
require.Equal(t, int32(1), stat.IdleConns())
require.Equal(t, int64(1), stat.AcquireCount())
require.Equal(t, int32(1), stat.TotalConns())
require.Equal(t, int64(0), acquireAttempts)
require.Equal(t, int64(1), connectAttempts)
}
func TestCreateMinPoolClosesConnectionsOnError(t *testing.T) {
t.Parallel()
config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err)
config.MinConns = int32(12)
config.MaxConns = int32(15)
config.LazyConnect = false
acquireAttempts := int64(0)
madeConnections := int64(0)
conns := make(chan *pgx.Conn, 15)
config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
atomic.AddInt64(&acquireAttempts, 1)
return true
}
config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
conns <- conn
atomic.AddInt64(&madeConnections, 1)
mc := atomic.LoadInt64(&madeConnections)
if mc == 10 {
return errors.New("mock error")
} }
return nil
}
pool, err := pgxpool.ConnectConfig(context.Background(), config)
require.Error(t, err)
require.Nil(t, pool)
close(conns)
for conn := range conns {
require.True(t, conn.IsClosed())
} }
require.Equal(t, int64(0), acquireAttempts) t.Fatal("did not reach min pool size")
require.True(t, madeConnections >= 10, "Expected %d got %d", 10, madeConnections)
}
func TestCreateMinPoolReturnsFirstError(t *testing.T) {
t.Parallel()
config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err)
config.MinConns = int32(12)
config.MaxConns = int32(15)
config.LazyConnect = false
acquireAttempts := int64(0)
connectAttempts := int64(0)
mockErr := errors.New("mock connect error")
config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
atomic.AddInt64(&acquireAttempts, 1)
return true
}
config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error {
atomic.AddInt64(&connectAttempts, 1)
ca := atomic.LoadInt64(&connectAttempts)
if ca >= 5 {
return mockErr
}
return nil
}
pool, err := pgxpool.ConnectConfig(context.Background(), config)
require.Nil(t, pool)
require.Error(t, err)
require.True(t, connectAttempts >= 5, "Expected %d got %d", 5, connectAttempts)
require.ErrorIs(t, err, mockErr)
} }
func TestPoolSendBatchBatchCloseTwice(t *testing.T) { func TestPoolSendBatchBatchCloseTwice(t *testing.T) {
t.Parallel() t.Parallel()
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()

View File

@ -12,7 +12,7 @@ import (
func TestTxExec(t *testing.T) { func TestTxExec(t *testing.T) {
t.Parallel() t.Parallel()
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
@ -26,7 +26,7 @@ func TestTxExec(t *testing.T) {
func TestTxQuery(t *testing.T) { func TestTxQuery(t *testing.T) {
t.Parallel() t.Parallel()
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
@ -40,7 +40,7 @@ func TestTxQuery(t *testing.T) {
func TestTxQueryRow(t *testing.T) { func TestTxQueryRow(t *testing.T) {
t.Parallel() t.Parallel()
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
@ -54,7 +54,7 @@ func TestTxQueryRow(t *testing.T) {
func TestTxSendBatch(t *testing.T) { func TestTxSendBatch(t *testing.T) {
t.Parallel() t.Parallel()
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()
@ -68,7 +68,7 @@ func TestTxSendBatch(t *testing.T) {
func TestTxCopyFrom(t *testing.T) { func TestTxCopyFrom(t *testing.T) {
t.Parallel() t.Parallel()
pool, err := pgxpool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pool, err := pgxpool.New(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err) require.NoError(t, err)
defer pool.Close() defer pool.Close()