mirror of https://github.com/jackc/pgx.git
parent
782133158f
commit
f015ced1bf
2
go.mod
2
go.mod
|
@ -5,7 +5,7 @@ go 1.18
|
|||
require (
|
||||
github.com/jackc/pgpassfile v1.0.0
|
||||
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b
|
||||
github.com/jackc/puddle/v2 v2.0.0-beta.1
|
||||
github.com/jackc/puddle/v2 v2.0.0-beta.2
|
||||
github.com/stretchr/testify v1.8.0
|
||||
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa
|
||||
golang.org/x/text v0.3.7
|
||||
|
|
2
go.sum
2
go.sum
|
@ -7,6 +7,8 @@ github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHF
|
|||
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E=
|
||||
github.com/jackc/puddle/v2 v2.0.0-beta.1 h1:Y4Ao+kFWANtDhWUkdw1JcbH+x84/aq6WUfhVQ1wdib8=
|
||||
github.com/jackc/puddle/v2 v2.0.0-beta.1/go.mod h1:itE7ZJY8xnoo0JqJEpSMprN0f+NQkMCuEV/N9j8h0oc=
|
||||
github.com/jackc/puddle/v2 v2.0.0-beta.2 h1:xhhtVfiDyh29TTvZPIvY5zld5YYMmA9ErRr+fjMkmE0=
|
||||
github.com/jackc/puddle/v2 v2.0.0-beta.2/go.mod h1:itE7ZJY8xnoo0JqJEpSMprN0f+NQkMCuEV/N9j8h0oc=
|
||||
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
|
|
101
pgxpool/pool.go
101
pgxpool/pool.go
|
@ -70,16 +70,6 @@ func (cr *connResource) getPoolRows(c *Conn, r pgx.Rows) *poolRows {
|
|||
return pr
|
||||
}
|
||||
|
||||
// detachedCtx wraps a context and will never be canceled, regardless of if
|
||||
// the wrapped one is cancelled. The Err() method will never return any errors.
|
||||
type detachedCtx struct {
|
||||
context.Context
|
||||
}
|
||||
|
||||
func (detachedCtx) Done() <-chan struct{} { return nil }
|
||||
func (detachedCtx) Deadline() (time.Time, bool) { return time.Time{}, false }
|
||||
func (detachedCtx) Err() error { return nil }
|
||||
|
||||
// Pool allows for connection reuse.
|
||||
type Pool struct {
|
||||
p *puddle.Pool[*connResource]
|
||||
|
@ -197,64 +187,61 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
|
|||
closeChan: make(chan struct{}),
|
||||
}
|
||||
|
||||
p.p = puddle.NewPool(
|
||||
func(ctx context.Context) (*connResource, error) {
|
||||
// we ignore cancellation on the original context because its either from
|
||||
// the health check or its from a query and we don't want to cancel creating
|
||||
// a connection just because the original query was cancelled since that
|
||||
// could end up stampeding the server
|
||||
// this will keep any Values in the original context and will just ignore
|
||||
// cancellation
|
||||
// see https://github.com/jackc/pgx/issues/1259
|
||||
ctx = detachedCtx{ctx}
|
||||
var err error
|
||||
p.p, err = puddle.NewPool(
|
||||
&puddle.Config[*connResource]{
|
||||
Constructor: func(ctx context.Context) (*connResource, error) {
|
||||
connConfig := p.config.ConnConfig.Copy()
|
||||
|
||||
connConfig := p.config.ConnConfig.Copy()
|
||||
|
||||
// But we do want to ensure that a connect won't hang forever.
|
||||
if connConfig.ConnectTimeout <= 0 {
|
||||
connConfig.ConnectTimeout = 2 * time.Minute
|
||||
}
|
||||
|
||||
if p.beforeConnect != nil {
|
||||
if err := p.beforeConnect(ctx, connConfig); err != nil {
|
||||
return nil, err
|
||||
// Connection will continue in background even if Acquire is canceled. Ensure that a connect won't hang forever.
|
||||
if connConfig.ConnectTimeout <= 0 {
|
||||
connConfig.ConnectTimeout = 2 * time.Minute
|
||||
}
|
||||
}
|
||||
|
||||
conn, err := pgx.ConnectConfig(ctx, connConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if p.beforeConnect != nil {
|
||||
if err := p.beforeConnect(ctx, connConfig); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if p.afterConnect != nil {
|
||||
err = p.afterConnect(ctx, conn)
|
||||
conn, err := pgx.ConnectConfig(ctx, connConfig)
|
||||
if err != nil {
|
||||
conn.Close(ctx)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
cr := &connResource{
|
||||
conn: conn,
|
||||
conns: make([]Conn, 64),
|
||||
poolRows: make([]poolRow, 64),
|
||||
poolRowss: make([]poolRows, 64),
|
||||
}
|
||||
if p.afterConnect != nil {
|
||||
err = p.afterConnect(ctx, conn)
|
||||
if err != nil {
|
||||
conn.Close(ctx)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return cr, nil
|
||||
cr := &connResource{
|
||||
conn: conn,
|
||||
conns: make([]Conn, 64),
|
||||
poolRows: make([]poolRow, 64),
|
||||
poolRowss: make([]poolRows, 64),
|
||||
}
|
||||
|
||||
return cr, nil
|
||||
},
|
||||
Destructor: func(value *connResource) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||
conn := value.conn
|
||||
conn.Close(ctx)
|
||||
select {
|
||||
case <-conn.PgConn().CleanupDone():
|
||||
case <-ctx.Done():
|
||||
}
|
||||
cancel()
|
||||
},
|
||||
MaxSize: config.MaxConns,
|
||||
},
|
||||
func(value *connResource) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||
conn := value.conn
|
||||
conn.Close(ctx)
|
||||
select {
|
||||
case <-conn.PgConn().CleanupDone():
|
||||
case <-ctx.Done():
|
||||
}
|
||||
cancel()
|
||||
},
|
||||
config.MaxConns,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go func() {
|
||||
p.createIdleResources(ctx, int(p.minConns))
|
||||
|
|
Loading…
Reference in New Issue