diff --git a/pool/conn.go b/pool/conn.go index 8194945c..8d2ea262 100644 --- a/pool/conn.go +++ b/pool/conn.go @@ -66,5 +66,17 @@ func (c *Conn) Begin(ctx context.Context, txOptions *pgx.TxOptions) (*pgx.Tx, er } func (c *Conn) Conn() *pgx.Conn { - return c.res.Value().(*pgx.Conn) + return c.res.Value().(*connResource).conn +} + +func (c *Conn) connResource() *connResource { + return c.res.Value().(*connResource) +} + +func (c *Conn) getPoolRow(r pgx.Row) *poolRow { + return c.connResource().getPoolRow(c, r) +} + +func (c *Conn) getPoolRows(r pgx.Rows) *poolRows { + return c.connResource().getPoolRows(c, r) } diff --git a/pool/pool.go b/pool/pool.go index 4af8ac33..5da61e11 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -4,7 +4,6 @@ import ( "context" "runtime" "strconv" - "sync" "time" "github.com/jackc/pgconn" @@ -17,6 +16,55 @@ var defaultMaxConns = int32(4) var defaultMaxConnLifetime = time.Hour var defaultHealthCheckPeriod = time.Minute +type connResource struct { + conn *pgx.Conn + conns []Conn + poolRows []poolRow + poolRowss []poolRows +} + +func (cr *connResource) getConn(p *Pool, res *puddle.Resource) *Conn { + if len(cr.conns) == 0 { + cr.conns = make([]Conn, 128) + } + + c := &cr.conns[len(cr.conns)-1] + cr.conns = cr.conns[0 : len(cr.conns)-1] + + c.res = res + c.p = p + + return c +} + +func (cr *connResource) getPoolRow(c *Conn, r pgx.Row) *poolRow { + if len(cr.poolRows) == 0 { + cr.poolRows = make([]poolRow, 128) + } + + pr := &cr.poolRows[len(cr.poolRows)-1] + cr.poolRows = cr.poolRows[0 : len(cr.poolRows)-1] + + pr.c = c + pr.r = r + + return pr +} + +func (cr *connResource) getPoolRows(c *Conn, r pgx.Rows) *poolRows { + if len(cr.poolRowss) == 0 { + cr.poolRowss = make([]poolRows, 128) + } + + pr := &cr.poolRowss[len(cr.poolRowss)-1] + cr.poolRowss = cr.poolRowss[0 : len(cr.poolRowss)-1] + + pr.c = c + pr.r = r + + return pr +} + type Pool struct { p *puddle.Pool afterConnect func(context.Context, *pgx.Conn) error @@ -25,15 +73,6 @@ type Pool struct { maxConnLifetime time.Duration healthCheckPeriod time.Duration closeChan chan struct{} - - preallocatedConnsMux sync.Mutex - preallocatedConns []Conn - - preallocatedPoolRowsMux sync.Mutex - preallocatedPoolRows []poolRow - - preallocatedPoolRowssMux sync.Mutex - preallocatedPoolRowss []poolRows } // Config is the configuration struct for creating a pool. It is highly recommended to modify a Config returned by @@ -101,11 +140,18 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) { } } - return conn, nil + cr := &connResource{ + conn: conn, + conns: make([]Conn, 64), + poolRows: make([]poolRow, 64), + poolRowss: make([]poolRows, 64), + } + + return cr, nil }, func(value interface{}) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - value.(*pgx.Conn).Close(ctx) + value.(*connResource).conn.Close(ctx) cancel() }, config.MaxConns, @@ -222,60 +268,6 @@ func (p *Pool) checkIdleConnsHealth() { } } -func (p *Pool) getConn(res *puddle.Resource) *Conn { - p.preallocatedConnsMux.Lock() - - if len(p.preallocatedConns) == 0 { - p.preallocatedConns = make([]Conn, 128) - } - - c := &p.preallocatedConns[len(p.preallocatedConns)-1] - p.preallocatedConns = p.preallocatedConns[0 : len(p.preallocatedConns)-1] - - p.preallocatedConnsMux.Unlock() - - c.res = res - c.p = p - - return c -} - -func (p *Pool) getPoolRow(c *Conn, r pgx.Row) *poolRow { - p.preallocatedPoolRowsMux.Lock() - - if len(p.preallocatedPoolRows) == 0 { - p.preallocatedPoolRows = make([]poolRow, 128) - } - - pr := &p.preallocatedPoolRows[len(p.preallocatedPoolRows)-1] - p.preallocatedPoolRows = p.preallocatedPoolRows[0 : len(p.preallocatedPoolRows)-1] - - p.preallocatedPoolRowsMux.Unlock() - - pr.c = c - pr.r = r - - return pr -} - -func (p *Pool) getPoolRows(c *Conn, r pgx.Rows) *poolRows { - p.preallocatedPoolRowssMux.Lock() - - if len(p.preallocatedPoolRowss) == 0 { - p.preallocatedPoolRowss = make([]poolRows, 128) - } - - pr := &p.preallocatedPoolRowss[len(p.preallocatedPoolRowss)-1] - p.preallocatedPoolRowss = p.preallocatedPoolRowss[0 : len(p.preallocatedPoolRowss)-1] - - p.preallocatedPoolRowssMux.Unlock() - - pr.c = c - pr.r = r - - return pr -} - func (p *Pool) Acquire(ctx context.Context) (*Conn, error) { for { res, err := p.p.Acquire(ctx) @@ -283,8 +275,9 @@ func (p *Pool) Acquire(ctx context.Context) (*Conn, error) { return nil, err } - if p.beforeAcquire == nil || p.beforeAcquire(res.Value().(*pgx.Conn)) { - return p.getConn(res), nil + cr := res.Value().(*connResource) + if p.beforeAcquire == nil || p.beforeAcquire(cr.conn) { + return cr.getConn(p, res), nil } res.Destroy() @@ -297,8 +290,9 @@ func (p *Pool) AcquireAllIdle() []*Conn { resources := p.p.AcquireAllIdle() conns := make([]*Conn, 0, len(resources)) for _, res := range resources { - if p.beforeAcquire == nil || p.beforeAcquire(res.Value().(*pgx.Conn)) { - conns = append(conns, p.getConn(res)) + cr := res.Value().(*connResource) + if p.beforeAcquire == nil || p.beforeAcquire(cr.conn) { + conns = append(conns, cr.getConn(p, res)) } else { res.Destroy() } @@ -333,7 +327,7 @@ func (p *Pool) Query(ctx context.Context, sql string, args ...interface{}) (pgx. return errRows{err: err}, err } - return p.getPoolRows(c, rows), nil + return c.getPoolRows(rows), nil } func (p *Pool) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row { @@ -343,7 +337,7 @@ func (p *Pool) QueryRow(ctx context.Context, sql string, args ...interface{}) pg } row := c.QueryRow(ctx, sql, args...) - return p.getPoolRow(c, row) + return c.getPoolRow(row) } func (p *Pool) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults {