mirror of https://github.com/jackc/pgx.git
Remove some obsolete context handling
parent
54c6ddc2f0
commit
acd15cf589
5
batch.go
5
batch.go
|
@ -84,11 +84,6 @@ func (b *Batch) Send(ctx context.Context) error {
|
||||||
|
|
||||||
b.ctx = ctx
|
b.ctx = ctx
|
||||||
|
|
||||||
err := b.conn.waitForPreviousCancelQuery(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
batch := &pgconn.Batch{}
|
batch := &pgconn.Batch{}
|
||||||
|
|
||||||
for _, bi := range b.items {
|
for _, bi := range b.items {
|
||||||
|
|
103
conn.go
103
conn.go
|
@ -76,8 +76,7 @@ type Conn struct {
|
||||||
status byte // One of connStatus* constants
|
status byte // One of connStatus* constants
|
||||||
causeOfDeath error
|
causeOfDeath error
|
||||||
|
|
||||||
cancelQueryCompleted chan struct{}
|
lastStmtSent bool
|
||||||
lastStmtSent bool
|
|
||||||
|
|
||||||
// context support
|
// context support
|
||||||
ctxInProgress bool
|
ctxInProgress bool
|
||||||
|
@ -190,8 +189,6 @@ func connect(ctx context.Context, config *ConnConfig, connInfo *pgtype.ConnInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.preparedStatements = make(map[string]*PreparedStatement)
|
c.preparedStatements = make(map[string]*PreparedStatement)
|
||||||
c.cancelQueryCompleted = make(chan struct{})
|
|
||||||
close(c.cancelQueryCompleted)
|
|
||||||
c.doneChan = make(chan struct{})
|
c.doneChan = make(chan struct{})
|
||||||
c.closedChan = make(chan error)
|
c.closedChan = make(chan error)
|
||||||
c.wbuf = make([]byte, 0, 1024)
|
c.wbuf = make([]byte, 0, 1024)
|
||||||
|
@ -487,11 +484,6 @@ func (c *Conn) Prepare(name, sql string) (ps *PreparedStatement, err error) {
|
||||||
// name and sql arguments. This allows a code path to PrepareEx and Query/Exec without
|
// name and sql arguments. This allows a code path to PrepareEx and Query/Exec without
|
||||||
// concern for if the statement has already been prepared.
|
// concern for if the statement has already been prepared.
|
||||||
func (c *Conn) PrepareEx(ctx context.Context, name, sql string, opts *PrepareExOptions) (ps *PreparedStatement, err error) {
|
func (c *Conn) PrepareEx(ctx context.Context, name, sql string, opts *PrepareExOptions) (ps *PreparedStatement, err error) {
|
||||||
err = c.waitForPreviousCancelQuery(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if name != "" {
|
if name != "" {
|
||||||
if ps, ok := c.preparedStatements[name]; ok && ps.SQL == sql {
|
if ps, ok := c.preparedStatements[name]; ok && ps.SQL == sql {
|
||||||
return ps, nil
|
return ps, nil
|
||||||
|
@ -552,21 +544,7 @@ func (c *Conn) Deallocate(name string) error {
|
||||||
|
|
||||||
// TODO - consider making this public
|
// TODO - consider making this public
|
||||||
func (c *Conn) deallocateContext(ctx context.Context, name string) (err error) {
|
func (c *Conn) deallocateContext(ctx context.Context, name string) (err error) {
|
||||||
err = c.waitForPreviousCancelQuery(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = c.initContext(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
err = c.termContext(err)
|
|
||||||
}()
|
|
||||||
|
|
||||||
delete(c.preparedStatements, name)
|
delete(c.preparedStatements, name)
|
||||||
|
|
||||||
_, err = c.pgConn.Exec(ctx, "deallocate "+quoteIdentifier(name)).ReadAll()
|
_, err = c.pgConn.Exec(ctx, "deallocate "+quoteIdentifier(name)).ReadAll()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -740,81 +718,6 @@ func (c *Conn) Ping(ctx context.Context) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) initContext(ctx context.Context) error {
|
|
||||||
if c.ctxInProgress {
|
|
||||||
return errors.New("ctx already in progress")
|
|
||||||
}
|
|
||||||
|
|
||||||
if ctx.Done() == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
c.ctxInProgress = true
|
|
||||||
|
|
||||||
go c.contextHandler(ctx)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Conn) termContext(opErr error) error {
|
|
||||||
if !c.ctxInProgress {
|
|
||||||
return opErr
|
|
||||||
}
|
|
||||||
|
|
||||||
var err error
|
|
||||||
|
|
||||||
select {
|
|
||||||
case err = <-c.closedChan:
|
|
||||||
if opErr == nil {
|
|
||||||
err = nil
|
|
||||||
}
|
|
||||||
case c.doneChan <- struct{}{}:
|
|
||||||
err = opErr
|
|
||||||
}
|
|
||||||
|
|
||||||
c.ctxInProgress = false
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Conn) contextHandler(ctx context.Context) {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
c.closedChan <- ctx.Err()
|
|
||||||
case <-c.doneChan:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WaitUntilReady will return when the connection is ready for another query
|
|
||||||
func (c *Conn) WaitUntilReady(ctx context.Context) error {
|
|
||||||
err := c.waitForPreviousCancelQuery(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Conn) waitForPreviousCancelQuery(ctx context.Context) error {
|
|
||||||
c.mux.Lock()
|
|
||||||
completeCh := c.cancelQueryCompleted
|
|
||||||
c.mux.Unlock()
|
|
||||||
select {
|
|
||||||
case <-completeCh:
|
|
||||||
if err := c.pgConn.Conn().SetDeadline(time.Time{}); err != nil {
|
|
||||||
c.Close() // Close connection if unable to disable deadline
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func connInfoFromRows(rows *Rows, err error) (map[string]pgtype.OID, error) {
|
func connInfoFromRows(rows *Rows, err error) (map[string]pgtype.OID, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -854,10 +757,6 @@ func (c *Conn) LastStmtSent() bool {
|
||||||
// positionally from the sql string as $1, $2, etc.
|
// positionally from the sql string as $1, $2, etc.
|
||||||
func (c *Conn) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) {
|
func (c *Conn) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) {
|
||||||
c.lastStmtSent = false
|
c.lastStmtSent = false
|
||||||
err := c.waitForPreviousCancelQuery(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := c.lock(); err != nil {
|
if err := c.lock(); err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
|
|
6
query.go
6
query.go
|
@ -356,12 +356,6 @@ func (c *Conn) Query(ctx context.Context, sql string, optionsAndArgs ...interfac
|
||||||
args: args,
|
args: args,
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.waitForPreviousCancelQuery(ctx)
|
|
||||||
if err != nil {
|
|
||||||
rows.fatal(err)
|
|
||||||
return rows, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := c.lock(); err != nil {
|
if err := c.lock(); err != nil {
|
||||||
rows.fatal(err)
|
rows.fatal(err)
|
||||||
return rows, err
|
return rows, err
|
||||||
|
|
Loading…
Reference in New Issue