Explicitly close checked-in connections on ConnPool.Reset

v3-experimental-chan
Jack Christensen 2017-01-23 22:48:17 -06:00
parent 6c5b470317
commit d398d95764
3 changed files with 87 additions and 6 deletions

View File

@ -3,6 +3,7 @@
## Fixes
* Oid underlying type changed to uint32, previously it was incorrectly int32 (Manni Wood)
* Explicitly close checked-in connections on ConnPool.Reset, previously they were closed by GC
## Features

View File

@ -255,8 +255,13 @@ func (p *ConnPool) Reset() {
defer p.cond.L.Unlock()
p.resetCount++
p.allConnections = make([]*Conn, 0, p.maxConnections)
p.availableConnections = make([]*Conn, 0, p.maxConnections)
p.allConnections = p.allConnections[0:0]
for _, conn := range p.availableConnections {
conn.Close()
}
p.availableConnections = p.availableConnections[0:0]
}
// invalidateAcquired causes all acquired connections to be closed when released.

View File

@ -465,32 +465,38 @@ func TestPoolReleaseDiscardsDeadConnections(t *testing.T) {
}
}
func TestConnPoolReset(t *testing.T) {
func TestConnPoolResetClosesCheckedOutConnectionsOnRelease(t *testing.T) {
t.Parallel()
pool := createConnPool(t, 5)
defer pool.Close()
inProgressRows := []*pgx.Rows{}
var inProgressPIDs []int32
// Start some queries and reset pool while they are in progress
for i := 0; i < 10; i++ {
rows, err := pool.Query("select generate_series(1,5)::bigint")
rows, err := pool.Query("select pg_backend_pid() union all select 1 union all select 2")
if err != nil {
t.Fatal(err)
}
rows.Next()
var pid int32
rows.Scan(&pid)
inProgressPIDs = append(inProgressPIDs, pid)
inProgressRows = append(inProgressRows, rows)
pool.Reset()
}
// Check that the queries are completed
for _, rows := range inProgressRows {
var expectedN int64
var expectedN int32
for rows.Next() {
expectedN++
var n int64
var n int32
err := rows.Scan(&n)
if err != nil {
t.Fatal(err)
@ -510,6 +516,75 @@ func TestConnPoolReset(t *testing.T) {
if stats.CurrentConnections != 0 || stats.AvailableConnections != 0 {
t.Fatalf("Unexpected connection pool stats: %v", stats)
}
var connCount int
err := pool.QueryRow("select count(*) from pg_stat_activity where pid = any($1::int4[])", inProgressPIDs).Scan(&connCount)
if err != nil {
t.Fatal(err)
}
if connCount != 0 {
t.Fatalf("%d connections not closed", connCount)
}
}
func TestConnPoolResetClosesCheckedInConnections(t *testing.T) {
t.Parallel()
pool := createConnPool(t, 5)
defer pool.Close()
inProgressRows := []*pgx.Rows{}
var inProgressPIDs []int32
// Start some queries and reset pool while they are in progress
for i := 0; i < 5; i++ {
rows, err := pool.Query("select pg_backend_pid()")
if err != nil {
t.Fatal(err)
}
inProgressRows = append(inProgressRows, rows)
}
// Check that the queries are completed
for _, rows := range inProgressRows {
for rows.Next() {
var pid int32
err := rows.Scan(&pid)
if err != nil {
t.Fatal(err)
}
inProgressPIDs = append(inProgressPIDs, pid)
}
if err := rows.Err(); err != nil {
t.Fatal(err)
}
}
// Ensure pool is fully connected and available
stats := pool.Stat()
if stats.CurrentConnections != 5 || stats.AvailableConnections != 5 {
t.Fatalf("Unexpected connection pool stats: %v", stats)
}
pool.Reset()
// Pool should be empty after reset
stats = pool.Stat()
if stats.CurrentConnections != 0 || stats.AvailableConnections != 0 {
t.Fatalf("Unexpected connection pool stats: %v", stats)
}
var connCount int
err := pool.QueryRow("select count(*) from pg_stat_activity where pid = any($1::int4[])", inProgressPIDs).Scan(&connCount)
if err != nil {
t.Fatal(err)
}
if connCount != 0 {
t.Fatalf("%d connections not closed", connCount)
}
}
func TestConnPoolTransaction(t *testing.T) {