diff --git a/CHANGELOG.md b/CHANGELOG.md index bedf106b..baef2bdd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Fixes * Oid underlying type changed to uint32, previously it was incorrectly int32 (Manni Wood) +* Explicitly close checked-in connections on ConnPool.Reset, previously they were closed by GC ## Features diff --git a/conn_pool.go b/conn_pool.go index eac731dc..6d04565d 100644 --- a/conn_pool.go +++ b/conn_pool.go @@ -255,8 +255,13 @@ func (p *ConnPool) Reset() { defer p.cond.L.Unlock() p.resetCount++ - p.allConnections = make([]*Conn, 0, p.maxConnections) - p.availableConnections = make([]*Conn, 0, p.maxConnections) + p.allConnections = p.allConnections[0:0] + + for _, conn := range p.availableConnections { + conn.Close() + } + + p.availableConnections = p.availableConnections[0:0] } // invalidateAcquired causes all acquired connections to be closed when released. diff --git a/conn_pool_test.go b/conn_pool_test.go index 71a361a6..ab76bfb7 100644 --- a/conn_pool_test.go +++ b/conn_pool_test.go @@ -465,32 +465,38 @@ func TestPoolReleaseDiscardsDeadConnections(t *testing.T) { } } -func TestConnPoolReset(t *testing.T) { +func TestConnPoolResetClosesCheckedOutConnectionsOnRelease(t *testing.T) { t.Parallel() pool := createConnPool(t, 5) defer pool.Close() inProgressRows := []*pgx.Rows{} + var inProgressPIDs []int32 // Start some queries and reset pool while they are in progress for i := 0; i < 10; i++ { - rows, err := pool.Query("select generate_series(1,5)::bigint") + rows, err := pool.Query("select pg_backend_pid() union all select 1 union all select 2") if err != nil { t.Fatal(err) } + rows.Next() + var pid int32 + rows.Scan(&pid) + inProgressPIDs = append(inProgressPIDs, pid) + inProgressRows = append(inProgressRows, rows) pool.Reset() } // Check that the queries are completed for _, rows := range inProgressRows { - var expectedN int64 + var expectedN int32 for rows.Next() { expectedN++ - var n int64 + var n int32 err := rows.Scan(&n) if err != nil { t.Fatal(err) @@ -510,6 +516,75 @@ func TestConnPoolReset(t *testing.T) { if stats.CurrentConnections != 0 || stats.AvailableConnections != 0 { t.Fatalf("Unexpected connection pool stats: %v", stats) } + + var connCount int + err := pool.QueryRow("select count(*) from pg_stat_activity where pid = any($1::int4[])", inProgressPIDs).Scan(&connCount) + if err != nil { + t.Fatal(err) + } + if connCount != 0 { + t.Fatalf("%d connections not closed", connCount) + } +} + +func TestConnPoolResetClosesCheckedInConnections(t *testing.T) { + t.Parallel() + + pool := createConnPool(t, 5) + defer pool.Close() + + inProgressRows := []*pgx.Rows{} + var inProgressPIDs []int32 + + // Start some queries and reset pool while they are in progress + for i := 0; i < 5; i++ { + rows, err := pool.Query("select pg_backend_pid()") + if err != nil { + t.Fatal(err) + } + + inProgressRows = append(inProgressRows, rows) + } + + // Check that the queries are completed + for _, rows := range inProgressRows { + for rows.Next() { + var pid int32 + err := rows.Scan(&pid) + if err != nil { + t.Fatal(err) + } + inProgressPIDs = append(inProgressPIDs, pid) + + } + + if err := rows.Err(); err != nil { + t.Fatal(err) + } + } + + // Ensure pool is fully connected and available + stats := pool.Stat() + if stats.CurrentConnections != 5 || stats.AvailableConnections != 5 { + t.Fatalf("Unexpected connection pool stats: %v", stats) + } + + pool.Reset() + + // Pool should be empty after reset + stats = pool.Stat() + if stats.CurrentConnections != 0 || stats.AvailableConnections != 0 { + t.Fatalf("Unexpected connection pool stats: %v", stats) + } + + var connCount int + err := pool.QueryRow("select count(*) from pg_stat_activity where pid = any($1::int4[])", inProgressPIDs).Scan(&connCount) + if err != nil { + t.Fatal(err) + } + if connCount != 0 { + t.Fatalf("%d connections not closed", connCount) + } } func TestConnPoolTransaction(t *testing.T) {