mirror of https://github.com/jackc/pgx.git
Add AfterConnect callback for ConnectionPool
parent
7f3c5ab815
commit
e55a5ebccf
|
@ -1,27 +1,37 @@
|
||||||
package pgx
|
package pgx
|
||||||
|
|
||||||
|
type ConnectionPoolOptions struct {
|
||||||
|
MaxConnections int // max simultaneous connections to use (currently all are immediately connected)
|
||||||
|
AfterConnect func(*Connection) error
|
||||||
|
}
|
||||||
|
|
||||||
type ConnectionPool struct {
|
type ConnectionPool struct {
|
||||||
connectionChannel chan *Connection
|
connectionChannel chan *Connection
|
||||||
parameters ConnectionParameters // options used when establishing connection
|
parameters ConnectionParameters // parameters used when establishing connection
|
||||||
MaxConnections int
|
options ConnectionPoolOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConnectionPool creates a new ConnectionPool. options are passed through to
|
// NewConnectionPool creates a new ConnectionPool. parameters are passed through to
|
||||||
// Connect directly. MaxConnections is max simultaneous connections to use
|
// Connect directly.
|
||||||
// (currently all are immediately connected).
|
func NewConnectionPool(parameters ConnectionParameters, options ConnectionPoolOptions) (p *ConnectionPool, err error) {
|
||||||
func NewConnectionPool(parameters ConnectionParameters, MaxConnections int) (p *ConnectionPool, err error) {
|
|
||||||
p = new(ConnectionPool)
|
p = new(ConnectionPool)
|
||||||
p.connectionChannel = make(chan *Connection, MaxConnections)
|
p.connectionChannel = make(chan *Connection, options.MaxConnections)
|
||||||
p.MaxConnections = MaxConnections
|
|
||||||
|
|
||||||
p.parameters = parameters
|
p.parameters = parameters
|
||||||
|
p.options = options
|
||||||
|
|
||||||
for i := 0; i < p.MaxConnections; i++ {
|
for i := 0; i < p.options.MaxConnections; i++ {
|
||||||
var c *Connection
|
var c *Connection
|
||||||
c, err = Connect(p.parameters)
|
c, err = Connect(p.parameters)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if p.options.AfterConnect != nil {
|
||||||
|
err = p.options.AfterConnect(c)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
p.connectionChannel <- c
|
p.connectionChannel <- c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -44,7 +54,7 @@ func (p *ConnectionPool) Release(c *Connection) {
|
||||||
|
|
||||||
// Close ends the use of a connection by closing all underlying connections.
|
// Close ends the use of a connection by closing all underlying connections.
|
||||||
func (p *ConnectionPool) Close() {
|
func (p *ConnectionPool) Close() {
|
||||||
for i := 0; i < p.MaxConnections; i++ {
|
for i := 0; i < p.options.MaxConnections; i++ {
|
||||||
c := <-p.connectionChannel
|
c := <-p.connectionChannel
|
||||||
_ = c.Close()
|
_ = c.Close()
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,13 +1,15 @@
|
||||||
package pgx_test
|
package pgx_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/JackC/pgx"
|
"github.com/JackC/pgx"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func createConnectionPool(t *testing.T, maxConnections int) *pgx.ConnectionPool {
|
func createConnectionPool(t *testing.T, maxConnections int) *pgx.ConnectionPool {
|
||||||
pool, err := pgx.NewConnectionPool(*defaultConnectionParameters, maxConnections)
|
options := pgx.ConnectionPoolOptions{MaxConnections: maxConnections}
|
||||||
|
pool, err := pgx.NewConnectionPool(*defaultConnectionParameters, options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unable to create connection pool: %v", err)
|
t.Fatalf("Unable to create connection pool: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -15,14 +17,33 @@ func createConnectionPool(t *testing.T, maxConnections int) *pgx.ConnectionPool
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNewConnectionPool(t *testing.T) {
|
func TestNewConnectionPool(t *testing.T) {
|
||||||
pool, err := pgx.NewConnectionPool(*defaultConnectionParameters, 5)
|
var numCallbacks int
|
||||||
|
afterConnect := func(c *pgx.Connection) error {
|
||||||
|
numCallbacks++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
options := pgx.ConnectionPoolOptions{MaxConnections: 2, AfterConnect: afterConnect}
|
||||||
|
pool, err := pgx.NewConnectionPool(*defaultConnectionParameters, options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Unable to establish connection pool")
|
t.Fatal("Unable to establish connection pool")
|
||||||
}
|
}
|
||||||
defer pool.Close()
|
defer pool.Close()
|
||||||
|
|
||||||
if pool.MaxConnections != 5 {
|
if numCallbacks != 2 {
|
||||||
t.Error("Wrong maxConnections")
|
t.Errorf("Expected AfterConnect callback to fire %v times but only fired %v times", numCallbacks, numCallbacks)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pool creation returns an error if any AfterConnect callback does
|
||||||
|
errAfterConnect := errors.New("Some error")
|
||||||
|
afterConnect = func(c *pgx.Connection) error {
|
||||||
|
return errAfterConnect
|
||||||
|
}
|
||||||
|
|
||||||
|
options = pgx.ConnectionPoolOptions{MaxConnections: 2, AfterConnect: afterConnect}
|
||||||
|
pool, err = pgx.NewConnectionPool(*defaultConnectionParameters, options)
|
||||||
|
if err != errAfterConnect {
|
||||||
|
t.Errorf("Expected errAfterConnect but received unexpected: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue