From 644bd73dcc5fb05743c4dc7fad297c64e76aa0d9 Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Sat, 7 May 2022 07:13:23 -0500 Subject: [PATCH 1/8] Upgrade to pgconn v1.12.1 --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 28565f04..6106e94a 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/cockroachdb/apd v1.1.0 github.com/go-kit/log v0.1.0 github.com/gofrs/uuid v4.0.0+incompatible - github.com/jackc/pgconn v1.12.0 + github.com/jackc/pgconn v1.12.1 github.com/jackc/pgio v1.0.0 github.com/jackc/pgproto3/v2 v2.3.0 github.com/jackc/pgtype v1.11.0 diff --git a/go.sum b/go.sum index 770acf85..7a5e7218 100644 --- a/go.sum +++ b/go.sum @@ -38,6 +38,8 @@ github.com/jackc/pgconn v1.11.0 h1:HiHArx4yFbwl91X3qqIHtUFoiIfLNJXCQRsnzkiwwaQ= github.com/jackc/pgconn v1.11.0/go.mod h1:4z2w8XhRbP1hYxkpTuBjTS3ne3J48K83+u0zoyvg2pI= github.com/jackc/pgconn v1.12.0 h1:/RvQ24k3TnNdfBSW0ou9EOi5jx2cX7zfE8n2nLKuiP0= github.com/jackc/pgconn v1.12.0/go.mod h1:ZkhRC59Llhrq3oSfrikvwQ5NaxYExr6twkdkMLaKono= +github.com/jackc/pgconn v1.12.1 h1:rsDFzIpRk7xT4B8FufgpCCeyjdNpKyghZeSefViE5W8= +github.com/jackc/pgconn v1.12.1/go.mod h1:ZkhRC59Llhrq3oSfrikvwQ5NaxYExr6twkdkMLaKono= github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE= From 8b9b4055f3b679999adde4a0ab44eb6e1ea536c7 Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Sat, 7 May 2022 07:14:53 -0500 Subject: [PATCH 2/8] Release v4.16.1 --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 68035450..7bb69fe2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +# 4.16.1 (May 7, 2022) + +* Upgrade pgconn to v1.12.1 +* Fix explicitly prepared statements with describe statement cache mode + # 4.16.0 (April 21, 2022) * Upgrade pgconn to v1.12.0 From bfb19cd4f6229647f26bb8dcaff37e4f676bf276 Mon Sep 17 00:00:00 2001 From: Stepan Rabotkin Date: Sun, 8 May 2022 01:32:59 +0300 Subject: [PATCH 3/8] feat: add time duration to error query and copy --- copy_from.go | 4 ++-- rows.go | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/copy_from.go b/copy_from.go index 3494e28f..49139d05 100644 --- a/copy_from.go +++ b/copy_from.go @@ -153,13 +153,13 @@ func (ct *copyFrom) run(ctx context.Context) (int64, error) { <-doneChan rowsAffected := commandTag.RowsAffected() + endTime := time.Now() if err == nil { if ct.conn.shouldLog(LogLevelInfo) { - endTime := time.Now() ct.conn.log(ctx, LogLevelInfo, "CopyFrom", map[string]interface{}{"tableName": ct.tableName, "columnNames": ct.columnNames, "time": endTime.Sub(startTime), "rowCount": rowsAffected}) } } else if ct.conn.shouldLog(LogLevelError) { - ct.conn.log(ctx, LogLevelError, "CopyFrom", map[string]interface{}{"err": err, "tableName": ct.tableName, "columnNames": ct.columnNames}) + ct.conn.log(ctx, LogLevelError, "CopyFrom", map[string]interface{}{"err": err, "tableName": ct.tableName, "columnNames": ct.columnNames, "time": endTime.Sub(startTime)}) } return rowsAffected, err diff --git a/rows.go b/rows.go index 271c6e52..4749ead9 100644 --- a/rows.go +++ b/rows.go @@ -143,14 +143,15 @@ func (rows *connRows) Close() { } if rows.logger != nil { + endTime := time.Now() + if rows.err == nil { if rows.logger.shouldLog(LogLevelInfo) { - endTime := time.Now() 
 				rows.logger.log(rows.ctx, LogLevelInfo, "Query", map[string]interface{}{"sql": rows.sql, "args": logQueryArgs(rows.args), "time": endTime.Sub(rows.startTime), "rowCount": rows.rowCount})
 			}
 		} else {
 			if rows.logger.shouldLog(LogLevelError) {
-				rows.logger.log(rows.ctx, LogLevelError, "Query", map[string]interface{}{"err": rows.err, "sql": rows.sql, "args": logQueryArgs(rows.args)})
+				rows.logger.log(rows.ctx, LogLevelError, "Query", map[string]interface{}{"err": rows.err, "sql": rows.sql, "time": endTime.Sub(rows.startTime), "args": logQueryArgs(rows.args)})
 			}
 			if rows.err != nil && rows.conn.stmtcache != nil {
 				rows.conn.stmtcache.StatementErrored(rows.sql, rows.err)

From 4099b447b907ab06c255e4fefe54c7d9de764a15 Mon Sep 17 00:00:00 2001
From: Stepan Rabotkin
Date: Mon, 9 May 2022 14:01:19 +0300
Subject: [PATCH 4/8] feat: add batch logging

---
 conn.go | 30 ++++++++++++++++++++++++------
 1 file changed, 25 insertions(+), 5 deletions(-)

diff --git a/conn.go b/conn.go
index 9b620d27..f46b6dcd 100644
--- a/conn.go
+++ b/conn.go
@@ -710,6 +710,8 @@ func (c *Conn) QueryFunc(ctx context.Context, sql string, args []interface{}, sc
 // explicit transaction control statements are executed. The returned BatchResults must be closed before the connection
 // is used again.
 func (c *Conn) SendBatch(ctx context.Context, b *Batch) BatchResults {
+	startTime := time.Now()
+
 	simpleProtocol := c.config.PreferSimpleProtocol
 	var sb strings.Builder
 	if simpleProtocol {
@@ -768,23 +770,23 @@ func (c *Conn) SendBatch(ctx context.Context, b *Batch) BatchResults {
 			var err error
 			sd, err = stmtCache.Get(ctx, bi.query)
 			if err != nil {
-				return &batchResults{ctx: ctx, conn: c, err: err}
+				return c.logBatchResults(ctx, startTime, &batchResults{ctx: ctx, conn: c, err: err})
 			}
 		}
 
 		if len(sd.ParamOIDs) != len(bi.arguments) {
-			return &batchResults{ctx: ctx, conn: c, err: fmt.Errorf("mismatched param and argument count")}
+			return c.logBatchResults(ctx, startTime, &batchResults{ctx: ctx, conn: c, err: fmt.Errorf("mismatched param and argument count")})
 		}
 
 		args, err := convertDriverValuers(bi.arguments)
 		if err != nil {
-			return &batchResults{ctx: ctx, conn: c, err: err}
+			return c.logBatchResults(ctx, startTime, &batchResults{ctx: ctx, conn: c, err: err})
 		}
 
 		for i := range args {
 			err = c.eqb.AppendParam(c.connInfo, sd.ParamOIDs[i], args[i])
 			if err != nil {
-				return &batchResults{ctx: ctx, conn: c, err: err}
+				return c.logBatchResults(ctx, startTime, &batchResults{ctx: ctx, conn: c, err: err})
 			}
 		}
 
@@ -803,13 +805,31 @@ func (c *Conn) SendBatch(ctx context.Context, b *Batch) BatchResults {
 
 	mrr := c.pgConn.ExecBatch(ctx, batch)
 
-	return &batchResults{
+	return c.logBatchResults(ctx, startTime, &batchResults{
 		ctx:  ctx,
 		conn: c,
 		mrr:  mrr,
 		b:    b,
 		ix:   0,
+	})
+}
+
+func (c *Conn) logBatchResults(ctx context.Context, startTime time.Time, results *batchResults) BatchResults {
+	if results.err != nil {
+		if c.shouldLog(LogLevelError) {
+			endTime := time.Now()
+			c.log(ctx, LogLevelError, "SendBatch", map[string]interface{}{"err": results.err, "time": endTime.Sub(startTime)})
+		}
+
+		return results
 	}
+
+	if c.shouldLog(LogLevelInfo) {
+		endTime := time.Now()
+		c.log(ctx, LogLevelInfo, "SendBatch", map[string]interface{}{"batchLen": results.b.Len(), "time": endTime.Sub(startTime)})
+	}
+
+	return results
 }
 
 func (c *Conn) sanitizeForSimpleQuery(sql string, args ...interface{}) (string, error) {

From dc0ad04ff58f72f4819289f54745a36124cdbec3 Mon Sep 17 00:00:00 2001
From: Jack Christensen
Date: Thu, 12 May 2022 19:10:02 -0500
Subject: [PATCH 5/8] Fix batch logging tests
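
SendBatch now emits its own "SendBatch" log entry (with batchLen and time)
before the per-query "BatchResult.Exec" / "BatchResult.Close" entries, so
these tests must expect three log entries instead of two. For a batch of
two queries the logged sequence looks roughly like:

    SendBatch         {batchLen: 2, time: ...}
    BatchResult.Exec  {sql: "create table foo (id bigint)", ...}
    BatchResult.Exec  {sql: "drop table foo", ...}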
---
 batch_test.go | 40 ++++++++++++++++++++++++----------------
 1 file changed, 24 insertions(+), 16 deletions(-)

diff --git a/batch_test.go b/batch_test.go
index 309ae11d..f95e335e 100644
--- a/batch_test.go
+++ b/batch_test.go
@@ -737,23 +737,27 @@ func TestLogBatchStatementsOnExec(t *testing.T) {
 		t.Fatalf("Unexpected error dropping table: %v", err)
 	}
 
-	if len(l1.logs) != 2 {
+	if len(l1.logs) != 3 {
 		t.Fatalf("Expected three log entries but got %d", len(l1.logs))
 	}
 
-	if l1.logs[0].msg != "BatchResult.Exec" {
-		t.Errorf("Expected first log message to be 'BatchResult.Exec' but was '%s", l1.logs[0].msg)
-	}
-
-	if l1.logs[0].data["sql"] != "create table foo (id bigint)" {
-		t.Errorf("Expected the first query to be 'create table foo (id bigint)' but was '%s'", l1.logs[0].data["sql"])
+	if l1.logs[0].msg != "SendBatch" {
+		t.Errorf("Expected first log message to be 'SendBatch' but was '%s'", l1.logs[0].msg)
 	}
 
 	if l1.logs[1].msg != "BatchResult.Exec" {
+		t.Errorf("Expected second log message to be 'BatchResult.Exec' but was '%s'", l1.logs[1].msg)
+	}
+
+	if l1.logs[1].data["sql"] != "create table foo (id bigint)" {
+		t.Errorf("Expected the first query to be 'create table foo (id bigint)' but was '%s'", l1.logs[1].data["sql"])
+	}
+
+	if l1.logs[2].msg != "BatchResult.Exec" {
 		t.Errorf("Expected third log message to be 'BatchResult.Exec' but was '%s'", l1.logs[2].msg)
 	}
 
-	if l1.logs[1].data["sql"] != "drop table foo" {
+	if l1.logs[2].data["sql"] != "drop table foo" {
 		t.Errorf("Expected the second query to be 'drop table foo' but was '%s'", l1.logs[2].data["sql"])
 	}
 }
@@ -778,23 +782,27 @@ func TestLogBatchStatementsOnBatchResultClose(t *testing.T) {
 		t.Fatalf("Unexpected batch error: %v", err)
 	}
 
-	if len(l1.logs) != 2 {
+	if len(l1.logs) != 3 {
 		t.Fatalf("Expected 3 log statements but found %d", len(l1.logs))
 	}
 
-	if l1.logs[0].msg != "BatchResult.Close" {
-		t.Errorf("Expected first log statement to be 'BatchResult.Close' but was %s", l1.logs[0].msg)
-	}
-
-	if l1.logs[0].data["sql"] != "select generate_series(1,$1)" {
-		t.Errorf("Expected first query to be 'select generate_series(1,$1)' but was '%s'", l1.logs[0].data["sql"])
+	if l1.logs[0].msg != "SendBatch" {
+		t.Errorf("Expected first log message to be 'SendBatch' but was '%s'", l1.logs[0].msg)
 	}
 
 	if l1.logs[1].msg != "BatchResult.Close" {
+		t.Errorf("Expected second log statement to be 'BatchResult.Close' but was '%s'", l1.logs[1].msg)
+	}
+
+	if l1.logs[1].data["sql"] != "select generate_series(1,$1)" {
+		t.Errorf("Expected first query to be 'select generate_series(1,$1)' but was '%s'", l1.logs[1].data["sql"])
+	}
+
+	if l1.logs[2].msg != "BatchResult.Close" {
 		t.Errorf("Expected third log statement to be 'BatchResult.Close' but was '%s'", l1.logs[2].msg)
 	}
 
-	if l1.logs[1].data["sql"] != "select 1 = 1;" {
+	if l1.logs[2].data["sql"] != "select 1 = 1;" {
 		t.Errorf("Expected second query to be 'select 1 = 1;' but was '%s'", l1.logs[2].data["sql"])
 	}
 }

From 37c3f157bcbdbd30b17885deee87efd1d1baaa53 Mon Sep 17 00:00:00 2001
From: Jack Christensen
Date: Thu, 14 Apr 2022 11:50:12 -0500
Subject: [PATCH 6/8] Add Hijack from v5

---
 pgxpool/conn.go      | 16 ++++++++++++++++
 pgxpool/pool_test.go | 26 ++++++++++++++++++++++++++
 2 files changed, 42 insertions(+)

diff --git a/pgxpool/conn.go b/pgxpool/conn.go
index 0b59d741..036c728a 100644
--- a/pgxpool/conn.go
+++ b/pgxpool/conn.go
@@ -46,6 +46,22 @@ func (c *Conn) Release() {
 	}()
 }
 
+// Hijack assumes ownership of the connection from the pool. Caller is responsible for closing the connection. Hijack
+// will panic if called on an already released or hijacked connection.
+func (c *Conn) Hijack() *pgx.Conn {
+	if c.res == nil {
+		panic("cannot hijack already released or hijacked connection")
+	}
+
+	conn := c.Conn()
+	res := c.res
+	c.res = nil
+
+	res.Hijack()
+
+	return conn
+}
+
 func (c *Conn) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) {
 	return c.Conn().Exec(ctx, sql, arguments...)
 }
diff --git a/pgxpool/pool_test.go b/pgxpool/pool_test.go
index 072ca053..42e029e1 100644
--- a/pgxpool/pool_test.go
+++ b/pgxpool/pool_test.go
@@ -115,6 +115,32 @@ func TestPoolAcquireAndConnRelease(t *testing.T) {
 	c.Release()
 }
 
+func TestPoolAcquireAndConnHijack(t *testing.T) {
+	t.Parallel()
+
+	ctx := context.Background()
+
+	pool, err := pgxpool.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
+	require.NoError(t, err)
+	defer pool.Close()
+
+	c, err := pool.Acquire(ctx)
+	require.NoError(t, err)
+
+	connsBeforeHijack := pool.Stat().TotalConns()
+
+	conn := c.Hijack()
+	defer conn.Close(ctx)
+
+	connsAfterHijack := pool.Stat().TotalConns()
+	require.Equal(t, connsBeforeHijack-1, connsAfterHijack)
+
+	var n int32
+	err = conn.QueryRow(ctx, `select 1`).Scan(&n)
+	require.NoError(t, err)
+	require.Equal(t, int32(1), n)
+}
+
 func TestPoolAcquireFunc(t *testing.T) {
 	t.Parallel()
 

From a814153aebbd5fc8b037f535ff8d1e54209247a8 Mon Sep 17 00:00:00 2001
From: James Hartig
Date: Thu, 26 May 2022 10:49:57 -0400
Subject: [PATCH 7/8] pgxpool: health check should avoid going below minConns

---
 pgxpool/conn.go      |  23 +++++-
 pgxpool/pool.go      | 174 +++++++++++++++++++++++++++++++++----------
 pgxpool/pool_test.go |  38 +++++++++-
 pgxpool/stat.go      |  22 +++++-
 4 files changed, 212 insertions(+), 45 deletions(-)

diff --git a/pgxpool/conn.go b/pgxpool/conn.go
index 036c728a..6482c821 100644
--- a/pgxpool/conn.go
+++ b/pgxpool/conn.go
@@ -2,7 +2,7 @@ package pgxpool
 
 import (
 	"context"
-	"time"
+	"sync/atomic"
 
 	"github.com/jackc/pgconn"
 	"github.com/jackc/pgx/v4"
@@ -26,9 +26,23 @@ func (c *Conn) Release() {
 	res := c.res
 	c.res = nil
 
-	now := time.Now()
-	if conn.IsClosed() || conn.PgConn().IsBusy() || conn.PgConn().TxStatus() != 'I' || (now.Sub(res.CreationTime()) > c.p.maxConnLifetime) {
+	if conn.IsClosed() || conn.PgConn().IsBusy() || conn.PgConn().TxStatus() != 'I' {
 		res.Destroy()
+		// Signal to the health check to run since we just destroyed a connection
+		// and we might be below minConns now
+		c.p.triggerHealthCheck()
+		return
+	}
+
+	// If the pool is consistently being used, we might never get to check the
+	// lifetime of a connection since we only check idle connections in checkConnsHealth,
+	// so we also check the lifetime here and force a health check
+	if c.p.isExpired(res) {
+		atomic.AddInt64(&c.p.lifetimeDestroyCount, 1)
+		res.Destroy()
+		// Signal to the health check to run since we just destroyed a connection
+		// and we might be below minConns now
+		c.p.triggerHealthCheck()
 		return
 	}
 
@@ -42,6 +56,9 @@ func (c *Conn) Release() {
 		res.Release()
 	} else {
 		res.Destroy()
+		// Signal to the health check to run since we just destroyed a connection
+		// and we might be below minConns now
+		c.p.triggerHealthCheck()
 	}
 	}()
 }
diff --git a/pgxpool/pool.go b/pgxpool/pool.go
index d7586168..3234c162 100644
--- a/pgxpool/pool.go
+++ b/pgxpool/pool.go
@@ -3,9 +3,11 @@ package pgxpool
 
 import (
 	"context"
 	"fmt"
+	"math/rand"
 	"runtime"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/jackc/pgconn"
@@ -70,16 +72,23 @@ func (cr *connResource) getPoolRows(c *Conn, r pgx.Rows) *poolRows {
 
 // Pool allows for connection reuse.
 type Pool struct {
-	p                 *puddle.Pool
-	config            *Config
-	beforeConnect     func(context.Context, *pgx.ConnConfig) error
-	afterConnect      func(context.Context, *pgx.Conn) error
-	beforeAcquire     func(context.Context, *pgx.Conn) bool
-	afterRelease      func(*pgx.Conn) bool
-	minConns          int32
-	maxConnLifetime   time.Duration
-	maxConnIdleTime   time.Duration
-	healthCheckPeriod time.Duration
+	p                     *puddle.Pool
+	config                *Config
+	beforeConnect         func(context.Context, *pgx.ConnConfig) error
+	afterConnect          func(context.Context, *pgx.Conn) error
+	beforeAcquire         func(context.Context, *pgx.Conn) bool
+	afterRelease          func(*pgx.Conn) bool
+	minConns              int32
+	maxConns              int32
+	maxConnLifetime       time.Duration
+	maxConnLifetimeJitter time.Duration
+	maxConnIdleTime       time.Duration
+	healthCheckPeriod     time.Duration
+	healthCheckChan       chan struct{}
+
+	newConnsCount        int64
+	lifetimeDestroyCount int64
+	idleDestroyCount     int64
 
 	closeOnce sync.Once
 	closeChan chan struct{}
@@ -109,14 +118,19 @@ type Config struct {
 	// MaxConnLifetime is the duration since creation after which a connection will be automatically closed.
 	MaxConnLifetime time.Duration
 
+	// MaxConnLifetimeJitter is the upper bound of a random duration added to MaxConnLifetime when deciding to close
+	// a connection. This helps prevent all connections from being closed at the exact same time, starving the pool.
+	MaxConnLifetimeJitter time.Duration
+
 	// MaxConnIdleTime is the duration after which an idle connection will be automatically closed by the health check.
 	MaxConnIdleTime time.Duration
 
 	// MaxConns is the maximum size of the pool. The default is the greater of 4 or runtime.NumCPU().
 	MaxConns int32
 
-	// MinConns is the minimum size of the pool. The health check will increase the number of connections to this
-	// amount if it had dropped below.
+	// MinConns is the minimum size of the pool. After a connection closes, the pool might dip below MinConns. A low
+	// number of MinConns might mean the pool is empty after MaxConnLifetime until the health check has a chance
+	// to create new connections.
 	MinConns int32
 
 	// HealthCheckPeriod is the duration between checks of the health of idle connections.
@@ -164,16 +178,19 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) {
 	}
 
 	p := &Pool{
-		config:            config,
-		beforeConnect:     config.BeforeConnect,
-		afterConnect:      config.AfterConnect,
-		beforeAcquire:     config.BeforeAcquire,
-		afterRelease:      config.AfterRelease,
-		minConns:          config.MinConns,
-		maxConnLifetime:   config.MaxConnLifetime,
-		maxConnIdleTime:   config.MaxConnIdleTime,
-		healthCheckPeriod: config.HealthCheckPeriod,
-		closeChan:         make(chan struct{}),
+		config:                config,
+		beforeConnect:         config.BeforeConnect,
+		afterConnect:          config.AfterConnect,
+		beforeAcquire:         config.BeforeAcquire,
+		afterRelease:          config.AfterRelease,
+		minConns:              config.MinConns,
+		maxConns:              config.MaxConns,
+		maxConnLifetime:       config.MaxConnLifetime,
+		maxConnLifetimeJitter: config.MaxConnLifetimeJitter,
+		maxConnIdleTime:       config.MaxConnIdleTime,
+		healthCheckPeriod:     config.HealthCheckPeriod,
+		healthCheckChan:       make(chan struct{}, 1),
+		closeChan:             make(chan struct{}),
 	}
 
 	p.p = puddle.NewPool(
@@ -223,7 +240,7 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) {
 	)
 
 	if !config.LazyConnect {
-		if err := p.createIdleResources(ctx, int(p.minConns)); err != nil {
+		if err := p.checkMinConns(); err != nil {
 			// Couldn't create resources for min pool size. Close unhealthy pool.
 			p.Close()
 			return nil, err
 		}
 	}
@@ -251,6 +268,7 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) {
 // pool_max_conn_lifetime: duration string
 // pool_max_conn_idle_time: duration string
 // pool_health_check_period: duration string
+// pool_max_conn_lifetime_jitter: duration string
 //
 // See Config for definitions of these arguments.
 //
@@ -331,6 +349,15 @@ func ParseConfig(connString string) (*Config, error) {
 		config.HealthCheckPeriod = defaultHealthCheckPeriod
 	}
 
+	if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime_jitter"]; ok {
+		delete(connConfig.Config.RuntimeParams, "pool_max_conn_lifetime_jitter")
+		d, err := time.ParseDuration(s)
+		if err != nil {
+			return nil, fmt.Errorf("invalid pool_max_conn_lifetime_jitter: %w", err)
+		}
+		config.MaxConnLifetimeJitter = d
+	}
+
 	return config, nil
 }
 
@@ -343,44 +370,105 @@ func (p *Pool) Close() {
 	})
 }
 
+func (p *Pool) isExpired(res *puddle.Resource) bool {
+	now := time.Now()
+	// Small optimization to avoid calling rand: if the age is over lifetime AND jitter, immediately
+	// return true.
+	if now.Sub(res.CreationTime()) > p.maxConnLifetime+p.maxConnLifetimeJitter {
+		return true
+	}
+	if p.maxConnLifetimeJitter == 0 {
+		return false
+	}
+	jitterSecs := rand.Float64() * p.maxConnLifetimeJitter.Seconds()
+	return now.Sub(res.CreationTime()) > p.maxConnLifetime+(time.Duration(jitterSecs)*time.Second)
+}
+
+func (p *Pool) triggerHealthCheck() {
+	go func() {
+		// Destroy is asynchronous, so we give it time to actually remove itself from
+		// the pool; otherwise we might try to check the pool size too soon
+		time.Sleep(500 * time.Millisecond)
+		select {
+		case p.healthCheckChan <- struct{}{}:
+		default:
+		}
+	}()
+}
+
 func (p *Pool) backgroundHealthCheck() {
 	ticker := time.NewTicker(p.healthCheckPeriod)
-
+	defer ticker.Stop()
 	for {
 		select {
 		case <-p.closeChan:
-			ticker.Stop()
 			return
+		case <-p.healthCheckChan:
+			p.checkHealth()
 		case <-ticker.C:
-			p.checkIdleConnsHealth()
-			p.checkMinConns()
+			p.checkHealth()
 		}
 	}
 }
 
-func (p *Pool) checkIdleConnsHealth() {
-	resources := p.p.AcquireAllIdle()
+func (p *Pool) checkHealth() {
+	for {
+		// If checkMinConns failed, we don't destroy any connections since we couldn't
+		// even get to minConns
+		if err := p.checkMinConns(); err != nil {
+			// Should we log this error somewhere?
+			break
+		}
+		if !p.checkConnsHealth() {
+			// Since we didn't destroy any connections we can stop looping
+			break
+		}
+		// Technically Destroy is asynchronous, but 500ms should be enough for the
+		// resource to be removed from the underlying pool
+		select {
+		case <-p.closeChan:
+			return
+		case <-time.After(500 * time.Millisecond):
+		}
+	}
+}
 
-	now := time.Now()
+// checkConnsHealth checks all idle connections, destroys any that have been idle
+// too long or are too old, and returns true if any were destroyed
+func (p *Pool) checkConnsHealth() bool {
+	var destroyed bool
+	totalConns := p.Stat().TotalConns()
+	resources := p.p.AcquireAllIdle()
 	for _, res := range resources {
-		if now.Sub(res.CreationTime()) > p.maxConnLifetime {
+		// We're okay going under minConns if the lifetime is up
+		if p.isExpired(res) && totalConns >= p.minConns {
+			atomic.AddInt64(&p.lifetimeDestroyCount, 1)
 			res.Destroy()
-		} else if res.IdleDuration() > p.maxConnIdleTime {
+			destroyed = true
+			// Since Destroy is async we manually decrement totalConns.
+ totalConns-- + } else if res.IdleDuration() > p.maxConnIdleTime && totalConns > p.minConns { + atomic.AddInt64(&p.idleDestroyCount, 1) res.Destroy() + destroyed = true + // Since Destroy is async we manually decrement totalConns. + totalConns-- } else { res.ReleaseUnused() } } + return destroyed } -func (p *Pool) checkMinConns() { - for i := p.minConns - p.Stat().TotalConns(); i > 0; i-- { - go func() { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - p.p.CreateResource(ctx) - }() +func (p *Pool) checkMinConns() error { + // TotalConns can include ones that are being destroyed but we should have + // sleep(500ms) around all of the destroys to help prevent that from throwing + // off this check + toCreate := p.minConns - p.Stat().TotalConns() + if toCreate > 0 { + return p.createIdleResources(context.Background(), int(toCreate)) } + return nil } func (p *Pool) createIdleResources(parentCtx context.Context, targetResources int) error { @@ -391,6 +479,7 @@ func (p *Pool) createIdleResources(parentCtx context.Context, targetResources in for i := 0; i < targetResources; i++ { go func() { + atomic.AddInt64(&p.newConnsCount, 1) err := p.p.CreateResource(ctx) errs <- err }() @@ -460,7 +549,12 @@ func (p *Pool) Config() *Config { return p.config.Copy() } // Stat returns a pgxpool.Stat struct with a snapshot of Pool statistics. func (p *Pool) Stat() *Stat { - return &Stat{s: p.p.Stat()} + return &Stat{ + s: p.p.Stat(), + newConnsCount: atomic.LoadInt64(&p.newConnsCount), + lifetimeDestroyCount: atomic.LoadInt64(&p.lifetimeDestroyCount), + idleDestroyCount: atomic.LoadInt64(&p.idleDestroyCount), + } } // Exec acquires a connection from the Pool and executes the given SQL. diff --git a/pgxpool/pool_test.go b/pgxpool/pool_test.go index 42e029e1..1742f55d 100644 --- a/pgxpool/pool_test.go +++ b/pgxpool/pool_test.go @@ -372,6 +372,14 @@ func TestConnReleaseClosesBusyConn(t *testing.T) { c.Release() waitForReleaseToComplete() + // wait for the connection to actually be destroyed + for i := 0; i < 1000; i++ { + if db.Stat().TotalConns() == 0 { + break + } + time.Sleep(time.Millisecond) + } + stats := db.Stat() assert.EqualValues(t, 0, stats.TotalConns()) } @@ -396,6 +404,8 @@ func TestPoolBackgroundChecksMaxConnLifetime(t *testing.T) { stats := db.Stat() assert.EqualValues(t, 0, stats.TotalConns()) + assert.EqualValues(t, 0, stats.MaxIdleDestroyCount()) + assert.EqualValues(t, 1, stats.MaxLifetimeDestroyCount()) } func TestPoolBackgroundChecksMaxConnIdleTime(t *testing.T) { @@ -426,6 +436,8 @@ func TestPoolBackgroundChecksMaxConnIdleTime(t *testing.T) { stats := db.Stat() assert.EqualValues(t, 0, stats.TotalConns()) + assert.EqualValues(t, 1, stats.MaxIdleDestroyCount()) + assert.EqualValues(t, 0, stats.MaxLifetimeDestroyCount()) } func TestPoolBackgroundChecksMinConns(t *testing.T) { @@ -443,6 +455,21 @@ func TestPoolBackgroundChecksMinConns(t *testing.T) { stats := db.Stat() assert.EqualValues(t, 2, stats.TotalConns()) + assert.EqualValues(t, 0, stats.MaxLifetimeDestroyCount()) + assert.EqualValues(t, 2, stats.NewConnsCount()) + + c, err := db.Acquire(context.Background()) + require.NoError(t, err) + err = c.Conn().Close(context.Background()) + require.NoError(t, err) + c.Release() + + time.Sleep(config.HealthCheckPeriod + 500*time.Millisecond) + + stats = db.Stat() + assert.EqualValues(t, 2, stats.TotalConns()) + assert.EqualValues(t, 0, stats.MaxIdleDestroyCount()) + assert.EqualValues(t, 3, stats.NewConnsCount()) } func TestPoolExec(t *testing.T) { @@ -679,6 
+706,14 @@ func TestConnReleaseDestroysClosedConn(t *testing.T) { c.Release() waitForReleaseToComplete() + // wait for the connection to actually be destroyed + for i := 0; i < 1000; i++ { + if pool.Stat().TotalConns() == 0 { + break + } + time.Sleep(time.Millisecond) + } + assert.EqualValues(t, 0, pool.Stat().TotalConns()) } @@ -767,7 +802,7 @@ func TestTxBeginFuncNestedTransactionCommit(t *testing.T) { require.NoError(t, err) return nil }) - + require.NoError(t, err) return nil }) require.NoError(t, err) @@ -817,6 +852,7 @@ func TestTxBeginFuncNestedTransactionRollback(t *testing.T) { return nil }) + require.NoError(t, err) var n int64 err = db.QueryRow(context.Background(), "select count(*) from pgxpooltx").Scan(&n) diff --git a/pgxpool/stat.go b/pgxpool/stat.go index 336be42d..47342be4 100644 --- a/pgxpool/stat.go +++ b/pgxpool/stat.go @@ -8,7 +8,10 @@ import ( // Stat is a snapshot of Pool statistics. type Stat struct { - s *puddle.Stat + s *puddle.Stat + newConnsCount int64 + lifetimeDestroyCount int64 + idleDestroyCount int64 } // AcquireCount returns the cumulative count of successful acquires from the pool. @@ -62,3 +65,20 @@ func (s *Stat) MaxConns() int32 { func (s *Stat) TotalConns() int32 { return s.s.TotalResources() } + +// NewConnsCount returns the cumulative count of new connections opened. +func (s *Stat) NewConnsCount() int64 { + return s.newConnsCount +} + +// MaxLifetimeDestroyCount returns the cumulative count of connections destroyed +// because they exceeded MaxConnLifetime. +func (s *Stat) MaxLifetimeDestroyCount() int64 { + return s.lifetimeDestroyCount +} + +// MaxIdleDestroyCount returns the cumulative count of connections destroyed because +// they exceeded MaxConnIdleTime. +func (s *Stat) MaxIdleDestroyCount() int64 { + return s.idleDestroyCount +} From 396195466c6a0cd445d3690a3e33551d7808cf85 Mon Sep 17 00:00:00 2001 From: Gabor Szabad Date: Tue, 21 Jun 2022 10:20:21 +0100 Subject: [PATCH 8/8] Add logger func wrapper --- conn_test.go | 33 +++++++++++++++++++++++++++++++++ logger.go | 8 ++++++++ 2 files changed, 41 insertions(+) diff --git a/conn_test.go b/conn_test.go index e34662ae..467f6ecc 100644 --- a/conn_test.go +++ b/conn_test.go @@ -1,7 +1,9 @@ package pgx_test import ( + "bytes" "context" + "log" "os" "strings" "sync" @@ -837,6 +839,37 @@ func TestLogPassesContext(t *testing.T) { } } +func TestLoggerFunc(t *testing.T) { + t.Parallel() + + const testMsg = "foo" + + buf := bytes.Buffer{} + logger := log.New(&buf, "", 0) + + createAdapterFn := func(logger *log.Logger) pgx.LoggerFunc { + return func(ctx context.Context, level pgx.LogLevel, msg string, data map[string]interface{}) { + logger.Printf("%s", testMsg) + } + } + + config := mustParseConfig(t, os.Getenv("PGX_TEST_DATABASE")) + config.Logger = createAdapterFn(logger) + + conn := mustConnect(t, config) + defer closeConn(t, conn) + + buf.Reset() // Clear logs written when establishing connection + + if _, err := conn.Exec(context.TODO(), ";"); err != nil { + t.Fatal(err) + } + + if strings.TrimSpace(buf.String()) != testMsg { + t.Errorf("Expected logger function to return '%s', but it was '%s'", testMsg, buf.String()) + } +} + func TestIdentifierSanitize(t *testing.T) { t.Parallel() diff --git a/logger.go b/logger.go index 89fd5af5..19a74123 100644 --- a/logger.go +++ b/logger.go @@ -47,6 +47,14 @@ type Logger interface { Log(ctx context.Context, level LogLevel, msg string, data map[string]interface{}) } +// LoggerFunc is a wrapper around a function to satisfy the pgx.Logger interface +type 
LoggerFunc func(ctx context.Context, level LogLevel, msg string, data map[string]interface{}) + +// Log delegates the logging request to the wrapped function +func (f LoggerFunc) Log(ctx context.Context, level LogLevel, msg string, data map[string]interface{}) { + f(ctx, level, msg, data) +} + // LogLevelFromString converts log level string to constant // // Valid levels: