ConnectionPool now only creates the connections it needs

This commit is contained in:
Jack Christensen 2013-07-26 07:27:14 -05:00
parent 622ff142ca
commit a81a5f08b8
2 changed files with 125 additions and 20 deletions

View File

@ -1,42 +1,83 @@
package pgx package pgx
import (
"sync"
)
type ConnectionPoolOptions struct { type ConnectionPoolOptions struct {
MaxConnections int // max simultaneous connections to use (currently all are immediately connected) MaxConnections int // max simultaneous connections to use
AfterConnect func(*Connection) error AfterConnect func(*Connection) error
} }
type ConnectionPool struct { type ConnectionPool struct {
connectionChannel chan *Connection allConnections []*Connection
availableConnections []*Connection
cond *sync.Cond
parameters ConnectionParameters // parameters used when establishing connection parameters ConnectionParameters // parameters used when establishing connection
maxConnections int maxConnections int
afterConnect func(*Connection) error afterConnect func(*Connection) error
} }
type ConnectionPoolStat struct {
MaxConnections int // max simultaneous connections to use
CurrentConnections int // current live connections
AvailableConnections int // unused live connections
}
// NewConnectionPool creates a new ConnectionPool. parameters are passed through to // NewConnectionPool creates a new ConnectionPool. parameters are passed through to
// Connect directly. // Connect directly.
func NewConnectionPool(parameters ConnectionParameters, options ConnectionPoolOptions) (p *ConnectionPool, err error) { func NewConnectionPool(parameters ConnectionParameters, options ConnectionPoolOptions) (p *ConnectionPool, err error) {
p = new(ConnectionPool) p = new(ConnectionPool)
p.connectionChannel = make(chan *Connection, options.MaxConnections)
p.parameters = parameters p.parameters = parameters
p.maxConnections = options.MaxConnections p.maxConnections = options.MaxConnections
p.afterConnect = options.AfterConnect p.afterConnect = options.AfterConnect
for i := 0; i < p.maxConnections; i++ { p.allConnections = make([]*Connection, 0, p.maxConnections)
p.availableConnections = make([]*Connection, 0, p.maxConnections)
p.cond = sync.NewCond(new(sync.Mutex))
// Initially establish one connection
var c *Connection var c *Connection
c, err = p.createConnection() c, err = p.createConnection()
if err != nil { if err != nil {
return return
} }
p.connectionChannel <- c p.allConnections = append(p.allConnections, c)
} p.availableConnections = append(p.availableConnections, c)
return return
} }
// Acquire takes exclusive use of a connection until it is released. // Acquire takes exclusive use of a connection until it is released.
func (p *ConnectionPool) Acquire() (c *Connection, err error) { func (p *ConnectionPool) Acquire() (c *Connection, err error) {
c = <-p.connectionChannel p.cond.L.Lock()
defer p.cond.L.Unlock()
// A connection is available
if len(p.availableConnections) > 0 {
c = p.availableConnections[len(p.availableConnections)-1]
p.availableConnections = p.availableConnections[:len(p.availableConnections)-1]
return
}
// No connections are available, but we can create more
if len(p.allConnections) < p.maxConnections {
c, err = p.createConnection()
if err != nil {
return
}
p.allConnections = append(p.allConnections, c)
return
}
// All connections are in use and we cannot create more
for len(p.availableConnections) == 0 {
p.cond.Wait()
}
c = p.availableConnections[len(p.availableConnections)-1]
p.availableConnections = p.availableConnections[:len(p.availableConnections)-1]
return return
} }
@ -45,15 +86,37 @@ func (p *ConnectionPool) Release(c *Connection) {
if c.TxStatus != 'I' { if c.TxStatus != 'I' {
c.Execute("rollback") c.Execute("rollback")
} }
p.connectionChannel <- c p.cond.L.Lock()
p.availableConnections = append(p.availableConnections, c)
p.cond.L.Unlock()
p.cond.Signal()
} }
// Close ends the use of a connection by closing all underlying connections. // Close ends the use of a connection by closing all underlying connections.
func (p *ConnectionPool) Close() { func (p *ConnectionPool) Close() {
for i := 0; i < p.maxConnections; i++ { for i := 0; i < p.maxConnections; i++ {
c := <-p.connectionChannel if c, err := p.Acquire(); err != nil {
_ = c.Close() _ = c.Close()
} }
}
}
func (p *ConnectionPool) Stat() (s ConnectionPoolStat) {
p.cond.L.Lock()
defer p.cond.L.Unlock()
s.MaxConnections = p.maxConnections
s.CurrentConnections = len(p.allConnections)
s.AvailableConnections = len(p.availableConnections)
return
}
func (p *ConnectionPool) MaxConnectionCount() int {
return p.maxConnections
}
func (p *ConnectionPool) CurrentConnectionCount() int {
return p.maxConnections
} }
func (p *ConnectionPool) createConnection() (c *Connection, err error) { func (p *ConnectionPool) createConnection() (c *Connection, err error) {

View File

@ -4,6 +4,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/JackC/pgx" "github.com/JackC/pgx"
"sync"
"testing" "testing"
) )
@ -30,8 +31,10 @@ func TestNewConnectionPool(t *testing.T) {
} }
defer pool.Close() defer pool.Close()
if numCallbacks != 2 { // It initially connects once
t.Errorf("Expected AfterConnect callback to fire %v times but only fired %v times", numCallbacks, numCallbacks) stat := pool.Stat()
if stat.CurrentConnections != 1 {
t.Errorf("Expected 1 connection to be established immediately, but %v were", numCallbacks)
} }
// Pool creation returns an error if any AfterConnect callback does // Pool creation returns an error if any AfterConnect callback does
@ -158,3 +161,42 @@ func TestPoolReleaseWithTransactions(t *testing.T) {
t.Fatalf("Expected release to rollback uncommitted transaction, but it did not: '%c'", conn.TxStatus) t.Fatalf("Expected release to rollback uncommitted transaction, but it did not: '%c'", conn.TxStatus)
} }
} }
func TestPoolAcquireAndReleaseCycleAutoConnect(t *testing.T) {
maxConnections := 3
pool := createConnectionPool(t, maxConnections)
defer pool.Close()
doSomething := func() {
c, err := pool.Acquire()
if err != nil {
t.Fatalf("Unable to Acquire: %v", err)
}
c.SelectValue("select 1")
pool.Release(c)
}
for i := 0; i < 1000; i++ {
doSomething()
}
stat := pool.Stat()
if stat.CurrentConnections != 1 {
t.Fatalf("Pool shouldn't have established more connections when no contention: %v", stat.CurrentConnections)
}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
doSomething()
}()
}
wg.Wait()
stat = pool.Stat()
if stat.CurrentConnections != stat.MaxConnections {
t.Fatalf("Pool should have used all possible connections: %v", stat.CurrentConnections)
}
}