diff --git a/go.mod b/go.mod index 9b854033..8fec8975 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/jackc/pgio v1.0.0 github.com/jackc/pgproto3/v2 v2.0.1 github.com/jackc/pgtype v1.1.0 - github.com/jackc/puddle v1.0.1-0.20200126004755-807afe48a83d + github.com/jackc/puddle v1.0.1-0.20200203142542-cde4bcb7af80 github.com/mattn/go-colorable v0.1.2 // indirect github.com/mattn/go-isatty v0.0.9 // indirect github.com/rs/zerolog v1.15.0 diff --git a/go.sum b/go.sum index 20c51f28..107b2ad2 100644 --- a/go.sum +++ b/go.sum @@ -64,6 +64,8 @@ github.com/jackc/puddle v1.0.0 h1:rbjAshlgKscNa7j0jAM0uNQflis5o2XUogPMVAwtcsM= github.com/jackc/puddle v1.0.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.0.1-0.20200126004755-807afe48a83d h1:lYLhmugF2D1ysJgU4pyW/GcdH+X4O3T96duzNdxcHqY= github.com/jackc/puddle v1.0.1-0.20200126004755-807afe48a83d/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle v1.0.1-0.20200203142542-cde4bcb7af80 h1:69wtn0QFLOhbxAnvoPYtkMcMTsa6lBF/Wp9341J5Yqo= +github.com/jackc/puddle v1.0.1-0.20200203142542-cde4bcb7af80/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s= diff --git a/pgxpool/pool.go b/pgxpool/pool.go index 524373b9..02ff4e5a 100644 --- a/pgxpool/pool.go +++ b/pgxpool/pool.go @@ -13,6 +13,7 @@ import ( ) var defaultMaxConns = int32(4) +var defaultMinConns = int32(0) var defaultMaxConnLifetime = time.Hour var defaultMaxConnIdleTime = time.Minute * 30 var defaultHealthCheckPeriod = time.Minute @@ -71,6 +72,7 @@ type Pool struct { afterConnect func(context.Context, *pgx.Conn) error beforeAcquire func(context.Context, *pgx.Conn) bool afterRelease func(*pgx.Conn) bool + minConns int32 maxConnLifetime time.Duration maxConnIdleTime time.Duration healthCheckPeriod time.Duration @@ -103,6 +105,10 @@ type Config struct { // MaxConns is the maximum size of the pool. MaxConns int32 + // MinConns is the minimum size of the pool. The health check will increase the number of connections to this + // amount if it had dropped below. + MinConns int32 + // HealthCheckPeriod is the duration between checks of the health of idle connections. HealthCheckPeriod time.Duration @@ -133,6 +139,7 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) { afterConnect: config.AfterConnect, beforeAcquire: config.BeforeAcquire, afterRelease: config.AfterRelease, + minConns: config.MinConns, maxConnLifetime: config.MaxConnLifetime, maxConnIdleTime: config.MaxConnIdleTime, healthCheckPeriod: config.HealthCheckPeriod, @@ -190,6 +197,7 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) { // addition of the following variables: // // pool_max_conns: integer greater than 0 +// pool_min_conns: integer 0 or greater // pool_max_conn_lifetime: duration string // pool_max_conn_idle_time: duration string // pool_health_check_period: duration string @@ -229,6 +237,17 @@ func ParseConfig(connString string) (*Config, error) { } } + if s, ok := config.ConnConfig.Config.RuntimeParams["pool_min_conns"]; ok { + delete(connConfig.Config.RuntimeParams, "pool_min_conns") + n, err := strconv.ParseInt(s, 10, 32) + if err != nil { + return nil, errors.Errorf("cannot parse pool_min_conns: %w", err) + } + config.MinConns = int32(n) + } else { + config.MinConns = defaultMinConns + } + if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime"]; ok { delete(connConfig.Config.RuntimeParams, "pool_max_conn_lifetime") d, err := time.ParseDuration(s) @@ -282,6 +301,7 @@ func (p *Pool) backgroundHealthCheck() { return case <-ticker.C: p.checkIdleConnsHealth() + p.checkMinConns() } } } @@ -301,6 +321,12 @@ func (p *Pool) checkIdleConnsHealth() { } } +func (p *Pool) checkMinConns() { + for i := p.minConns - p.Stat().TotalConns(); i > 0; i-- { + go p.p.CreateResource(context.Background()) + } +} + func (p *Pool) Acquire(ctx context.Context) (*Conn, error) { for { res, err := p.p.Acquire(ctx) diff --git a/pgxpool/pool_test.go b/pgxpool/pool_test.go index d8ac4f98..2fdc87b4 100644 --- a/pgxpool/pool_test.go +++ b/pgxpool/pool_test.go @@ -23,10 +23,12 @@ func TestConnect(t *testing.T) { func TestParseConfigExtractsPoolArguments(t *testing.T) { t.Parallel() - config, err := pgxpool.ParseConfig("pool_max_conns=42") + config, err := pgxpool.ParseConfig("pool_max_conns=42 pool_min_conns=1") assert.NoError(t, err) assert.EqualValues(t, 42, config.MaxConns) + assert.EqualValues(t, 1, config.MinConns) assert.NotContains(t, config.ConnConfig.Config.RuntimeParams, "pool_max_conns") + assert.NotContains(t, config.ConnConfig.Config.RuntimeParams, "pool_min_conns") } func TestConnectCancel(t *testing.T) { @@ -276,6 +278,25 @@ func TestPoolBackgroundChecksMaxConnIdleTime(t *testing.T) { assert.EqualValues(t, 0, stats.TotalConns()) } +func TestPoolBackgroundChecksMinConns(t *testing.T) { + t.Parallel() + + config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE")) + require.NoError(t, err) + + config.HealthCheckPeriod = 100 * time.Millisecond + config.MinConns = 2 + + db, err := pgxpool.ConnectConfig(context.Background(), config) + require.NoError(t, err) + defer db.Close() + + time.Sleep(config.HealthCheckPeriod + 100*time.Millisecond) + + stats := db.Stat() + assert.EqualValues(t, 2, stats.TotalConns()) +} + func TestPoolExec(t *testing.T) { t.Parallel()