mirror of https://github.com/jackc/pgx.git
fix: Do not use infinite timers
parent
dc94db6b3d
commit
2bf5a61401
|
@ -263,7 +263,8 @@ func expandWithIPs(ctx context.Context, lookupFn LookupFunc, fallbacks []*Fallba
|
||||||
}
|
}
|
||||||
|
|
||||||
func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig,
|
func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig,
|
||||||
ignoreNotPreferredErr bool) (*PgConn, error) {
|
ignoreNotPreferredErr bool,
|
||||||
|
) (*PgConn, error) {
|
||||||
pgConn := new(PgConn)
|
pgConn := new(PgConn)
|
||||||
pgConn.config = config
|
pgConn.config = config
|
||||||
pgConn.cleanupDone = make(chan struct{})
|
pgConn.cleanupDone = make(chan struct{})
|
||||||
|
@ -298,6 +299,7 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig
|
||||||
pgConn.status = connStatusConnecting
|
pgConn.status = connStatusConnecting
|
||||||
pgConn.bgReader = bgreader.New(pgConn.conn)
|
pgConn.bgReader = bgreader.New(pgConn.conn)
|
||||||
pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64), pgConn.bgReader.Start)
|
pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64), pgConn.bgReader.Start)
|
||||||
|
pgConn.slowWriteTimer.Stop()
|
||||||
pgConn.frontend = config.BuildFrontend(pgConn.bgReader, pgConn.conn)
|
pgConn.frontend = config.BuildFrontend(pgConn.bgReader, pgConn.conn)
|
||||||
|
|
||||||
startupMsg := pgproto3.StartupMessage{
|
startupMsg := pgproto3.StartupMessage{
|
||||||
|
@ -476,7 +478,8 @@ func (pgConn *PgConn) ReceiveMessage(ctx context.Context) (pgproto3.BackendMessa
|
||||||
err = &pgconnError{
|
err = &pgconnError{
|
||||||
msg: "receive message failed",
|
msg: "receive message failed",
|
||||||
err: normalizeTimeoutError(ctx, err),
|
err: normalizeTimeoutError(ctx, err),
|
||||||
safeToRetry: true}
|
safeToRetry: true,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return msg, err
|
return msg, err
|
||||||
}
|
}
|
||||||
|
@ -1336,7 +1339,6 @@ func (mrr *MultiResultReader) ReadAll() ([]*Result, error) {
|
||||||
|
|
||||||
func (mrr *MultiResultReader) receiveMessage() (pgproto3.BackendMessage, error) {
|
func (mrr *MultiResultReader) receiveMessage() (pgproto3.BackendMessage, error) {
|
||||||
msg, err := mrr.pgConn.receiveMessage()
|
msg, err := mrr.pgConn.receiveMessage()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
mrr.pgConn.contextWatcher.Unwatch()
|
mrr.pgConn.contextWatcher.Unwatch()
|
||||||
mrr.err = normalizeTimeoutError(mrr.ctx, err)
|
mrr.err = normalizeTimeoutError(mrr.ctx, err)
|
||||||
|
@ -1647,8 +1649,8 @@ func (pgConn *PgConn) ExecBatch(ctx context.Context, batch *Batch) *MultiResultR
|
||||||
batch.buf = (&pgproto3.Sync{}).Encode(batch.buf)
|
batch.buf = (&pgproto3.Sync{}).Encode(batch.buf)
|
||||||
|
|
||||||
pgConn.enterPotentialWriteReadDeadlock()
|
pgConn.enterPotentialWriteReadDeadlock()
|
||||||
|
defer pgConn.exitPotentialWriteReadDeadlock()
|
||||||
_, err := pgConn.conn.Write(batch.buf)
|
_, err := pgConn.conn.Write(batch.buf)
|
||||||
pgConn.exitPotentialWriteReadDeadlock()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
multiResult.closed = true
|
multiResult.closed = true
|
||||||
multiResult.err = err
|
multiResult.err = err
|
||||||
|
@ -1719,20 +1721,22 @@ func (pgConn *PgConn) enterPotentialWriteReadDeadlock() {
|
||||||
//
|
//
|
||||||
// In addition, on Windows the default timer resolution is 15.6ms. So setting the timer to less than that is
|
// In addition, on Windows the default timer resolution is 15.6ms. So setting the timer to less than that is
|
||||||
// ineffective.
|
// ineffective.
|
||||||
pgConn.slowWriteTimer.Reset(15 * time.Millisecond)
|
if pgConn.slowWriteTimer.Reset(15 * time.Millisecond) {
|
||||||
|
panic("BUG: slow write timer already active")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// exitPotentialWriteReadDeadlock must be called after a call to enterPotentialWriteReadDeadlock.
|
// exitPotentialWriteReadDeadlock must be called after a call to enterPotentialWriteReadDeadlock.
|
||||||
func (pgConn *PgConn) exitPotentialWriteReadDeadlock() {
|
func (pgConn *PgConn) exitPotentialWriteReadDeadlock() {
|
||||||
if !pgConn.slowWriteTimer.Reset(time.Duration(math.MaxInt64)) {
|
// The state of the timer is not relevant upon exiting the potential slow write. It may both
|
||||||
pgConn.slowWriteTimer.Stop()
|
// fire (due to a slow write), or not fire (due to a fast write).
|
||||||
}
|
_ = pgConn.slowWriteTimer.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pgConn *PgConn) flushWithPotentialWriteReadDeadlock() error {
|
func (pgConn *PgConn) flushWithPotentialWriteReadDeadlock() error {
|
||||||
pgConn.enterPotentialWriteReadDeadlock()
|
pgConn.enterPotentialWriteReadDeadlock()
|
||||||
|
defer pgConn.exitPotentialWriteReadDeadlock()
|
||||||
err := pgConn.frontend.Flush()
|
err := pgConn.frontend.Flush()
|
||||||
pgConn.exitPotentialWriteReadDeadlock()
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1796,6 +1800,7 @@ func Construct(hc *HijackedConn) (*PgConn, error) {
|
||||||
pgConn.contextWatcher = newContextWatcher(pgConn.conn)
|
pgConn.contextWatcher = newContextWatcher(pgConn.conn)
|
||||||
pgConn.bgReader = bgreader.New(pgConn.conn)
|
pgConn.bgReader = bgreader.New(pgConn.conn)
|
||||||
pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64), pgConn.bgReader.Start)
|
pgConn.slowWriteTimer = time.AfterFunc(time.Duration(math.MaxInt64), pgConn.bgReader.Start)
|
||||||
|
pgConn.slowWriteTimer.Stop()
|
||||||
|
|
||||||
return pgConn, nil
|
return pgConn, nil
|
||||||
}
|
}
|
||||||
|
@ -1997,7 +2002,6 @@ func (p *Pipeline) GetResults() (results any, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pipeline) getResultsPrepare() (*StatementDescription, error) {
|
func (p *Pipeline) getResultsPrepare() (*StatementDescription, error) {
|
||||||
|
|
Loading…
Reference in New Issue