mirror of https://github.com/jackc/pgx.git
Per pool.Conn preallocated resources
This removes the pool wide mutexes for preallocated resources.pull/483/head
parent
00a1b62e91
commit
3661a005fa
14
pool/conn.go
14
pool/conn.go
|
@ -66,5 +66,17 @@ func (c *Conn) Begin(ctx context.Context, txOptions *pgx.TxOptions) (*pgx.Tx, er
|
|||
}
|
||||
|
||||
func (c *Conn) Conn() *pgx.Conn {
|
||||
return c.res.Value().(*pgx.Conn)
|
||||
return c.res.Value().(*connResource).conn
|
||||
}
|
||||
|
||||
func (c *Conn) connResource() *connResource {
|
||||
return c.res.Value().(*connResource)
|
||||
}
|
||||
|
||||
func (c *Conn) getPoolRow(r pgx.Row) *poolRow {
|
||||
return c.connResource().getPoolRow(c, r)
|
||||
}
|
||||
|
||||
func (c *Conn) getPoolRows(r pgx.Rows) *poolRows {
|
||||
return c.connResource().getPoolRows(c, r)
|
||||
}
|
||||
|
|
138
pool/pool.go
138
pool/pool.go
|
@ -4,7 +4,6 @@ import (
|
|||
"context"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgconn"
|
||||
|
@ -17,6 +16,55 @@ var defaultMaxConns = int32(4)
|
|||
var defaultMaxConnLifetime = time.Hour
|
||||
var defaultHealthCheckPeriod = time.Minute
|
||||
|
||||
type connResource struct {
|
||||
conn *pgx.Conn
|
||||
conns []Conn
|
||||
poolRows []poolRow
|
||||
poolRowss []poolRows
|
||||
}
|
||||
|
||||
func (cr *connResource) getConn(p *Pool, res *puddle.Resource) *Conn {
|
||||
if len(cr.conns) == 0 {
|
||||
cr.conns = make([]Conn, 128)
|
||||
}
|
||||
|
||||
c := &cr.conns[len(cr.conns)-1]
|
||||
cr.conns = cr.conns[0 : len(cr.conns)-1]
|
||||
|
||||
c.res = res
|
||||
c.p = p
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (cr *connResource) getPoolRow(c *Conn, r pgx.Row) *poolRow {
|
||||
if len(cr.poolRows) == 0 {
|
||||
cr.poolRows = make([]poolRow, 128)
|
||||
}
|
||||
|
||||
pr := &cr.poolRows[len(cr.poolRows)-1]
|
||||
cr.poolRows = cr.poolRows[0 : len(cr.poolRows)-1]
|
||||
|
||||
pr.c = c
|
||||
pr.r = r
|
||||
|
||||
return pr
|
||||
}
|
||||
|
||||
func (cr *connResource) getPoolRows(c *Conn, r pgx.Rows) *poolRows {
|
||||
if len(cr.poolRowss) == 0 {
|
||||
cr.poolRowss = make([]poolRows, 128)
|
||||
}
|
||||
|
||||
pr := &cr.poolRowss[len(cr.poolRowss)-1]
|
||||
cr.poolRowss = cr.poolRowss[0 : len(cr.poolRowss)-1]
|
||||
|
||||
pr.c = c
|
||||
pr.r = r
|
||||
|
||||
return pr
|
||||
}
|
||||
|
||||
type Pool struct {
|
||||
p *puddle.Pool
|
||||
afterConnect func(context.Context, *pgx.Conn) error
|
||||
|
@ -25,15 +73,6 @@ type Pool struct {
|
|||
maxConnLifetime time.Duration
|
||||
healthCheckPeriod time.Duration
|
||||
closeChan chan struct{}
|
||||
|
||||
preallocatedConnsMux sync.Mutex
|
||||
preallocatedConns []Conn
|
||||
|
||||
preallocatedPoolRowsMux sync.Mutex
|
||||
preallocatedPoolRows []poolRow
|
||||
|
||||
preallocatedPoolRowssMux sync.Mutex
|
||||
preallocatedPoolRowss []poolRows
|
||||
}
|
||||
|
||||
// Config is the configuration struct for creating a pool. It is highly recommended to modify a Config returned by
|
||||
|
@ -101,11 +140,18 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) {
|
|||
}
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
cr := &connResource{
|
||||
conn: conn,
|
||||
conns: make([]Conn, 64),
|
||||
poolRows: make([]poolRow, 64),
|
||||
poolRowss: make([]poolRows, 64),
|
||||
}
|
||||
|
||||
return cr, nil
|
||||
},
|
||||
func(value interface{}) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
value.(*pgx.Conn).Close(ctx)
|
||||
value.(*connResource).conn.Close(ctx)
|
||||
cancel()
|
||||
},
|
||||
config.MaxConns,
|
||||
|
@ -222,60 +268,6 @@ func (p *Pool) checkIdleConnsHealth() {
|
|||
}
|
||||
}
|
||||
|
||||
func (p *Pool) getConn(res *puddle.Resource) *Conn {
|
||||
p.preallocatedConnsMux.Lock()
|
||||
|
||||
if len(p.preallocatedConns) == 0 {
|
||||
p.preallocatedConns = make([]Conn, 128)
|
||||
}
|
||||
|
||||
c := &p.preallocatedConns[len(p.preallocatedConns)-1]
|
||||
p.preallocatedConns = p.preallocatedConns[0 : len(p.preallocatedConns)-1]
|
||||
|
||||
p.preallocatedConnsMux.Unlock()
|
||||
|
||||
c.res = res
|
||||
c.p = p
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (p *Pool) getPoolRow(c *Conn, r pgx.Row) *poolRow {
|
||||
p.preallocatedPoolRowsMux.Lock()
|
||||
|
||||
if len(p.preallocatedPoolRows) == 0 {
|
||||
p.preallocatedPoolRows = make([]poolRow, 128)
|
||||
}
|
||||
|
||||
pr := &p.preallocatedPoolRows[len(p.preallocatedPoolRows)-1]
|
||||
p.preallocatedPoolRows = p.preallocatedPoolRows[0 : len(p.preallocatedPoolRows)-1]
|
||||
|
||||
p.preallocatedPoolRowsMux.Unlock()
|
||||
|
||||
pr.c = c
|
||||
pr.r = r
|
||||
|
||||
return pr
|
||||
}
|
||||
|
||||
func (p *Pool) getPoolRows(c *Conn, r pgx.Rows) *poolRows {
|
||||
p.preallocatedPoolRowssMux.Lock()
|
||||
|
||||
if len(p.preallocatedPoolRowss) == 0 {
|
||||
p.preallocatedPoolRowss = make([]poolRows, 128)
|
||||
}
|
||||
|
||||
pr := &p.preallocatedPoolRowss[len(p.preallocatedPoolRowss)-1]
|
||||
p.preallocatedPoolRowss = p.preallocatedPoolRowss[0 : len(p.preallocatedPoolRowss)-1]
|
||||
|
||||
p.preallocatedPoolRowssMux.Unlock()
|
||||
|
||||
pr.c = c
|
||||
pr.r = r
|
||||
|
||||
return pr
|
||||
}
|
||||
|
||||
func (p *Pool) Acquire(ctx context.Context) (*Conn, error) {
|
||||
for {
|
||||
res, err := p.p.Acquire(ctx)
|
||||
|
@ -283,8 +275,9 @@ func (p *Pool) Acquire(ctx context.Context) (*Conn, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if p.beforeAcquire == nil || p.beforeAcquire(res.Value().(*pgx.Conn)) {
|
||||
return p.getConn(res), nil
|
||||
cr := res.Value().(*connResource)
|
||||
if p.beforeAcquire == nil || p.beforeAcquire(cr.conn) {
|
||||
return cr.getConn(p, res), nil
|
||||
}
|
||||
|
||||
res.Destroy()
|
||||
|
@ -297,8 +290,9 @@ func (p *Pool) AcquireAllIdle() []*Conn {
|
|||
resources := p.p.AcquireAllIdle()
|
||||
conns := make([]*Conn, 0, len(resources))
|
||||
for _, res := range resources {
|
||||
if p.beforeAcquire == nil || p.beforeAcquire(res.Value().(*pgx.Conn)) {
|
||||
conns = append(conns, p.getConn(res))
|
||||
cr := res.Value().(*connResource)
|
||||
if p.beforeAcquire == nil || p.beforeAcquire(cr.conn) {
|
||||
conns = append(conns, cr.getConn(p, res))
|
||||
} else {
|
||||
res.Destroy()
|
||||
}
|
||||
|
@ -333,7 +327,7 @@ func (p *Pool) Query(ctx context.Context, sql string, args ...interface{}) (pgx.
|
|||
return errRows{err: err}, err
|
||||
}
|
||||
|
||||
return p.getPoolRows(c, rows), nil
|
||||
return c.getPoolRows(rows), nil
|
||||
}
|
||||
|
||||
func (p *Pool) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row {
|
||||
|
@ -343,7 +337,7 @@ func (p *Pool) QueryRow(ctx context.Context, sql string, args ...interface{}) pg
|
|||
}
|
||||
|
||||
row := c.QueryRow(ctx, sql, args...)
|
||||
return p.getPoolRow(c, row)
|
||||
return c.getPoolRow(row)
|
||||
}
|
||||
|
||||
func (p *Pool) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults {
|
||||
|
|
Loading…
Reference in New Issue