Add deallocate to pipeline mode

pull/1281/head
Jack Christensen 2022-07-07 19:32:01 -05:00
parent 1168b375e4
commit a86f4f3db9
3 changed files with 66 additions and 0 deletions

View File

@ -1677,6 +1677,9 @@ type Pipeline struct {
// PipelineSync is returned by GetResults when a ReadyForQuery message is received.
type PipelineSync struct{}
// CloseComplete is returned by GetResults when a CloseComplete message is received.
type CloseComplete struct{}
// StartPipeline switches the connection to pipeline mode and returns a *Pipeline. In pipeline mode requests can be sent
// to the server without waiting for a response. Close must be called on the returned *Pipeline to return the connection
// to normal mode. While in pipeline mode, no methods that communicate with the server may be called except
@ -1723,6 +1726,16 @@ func (p *Pipeline) SendPrepare(name, sql string, paramOIDs []uint32) {
p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'S', Name: name})
}
// SendDeallocate deallocates a prepared statement.
func (p *Pipeline) SendDeallocate(name string) {
if p.closed {
return
}
p.pendingSync = true
p.conn.frontend.SendClose(&pgproto3.Close{ObjectType: 'S', Name: name})
}
// SendQueryParams is the pipeline version of *PgConn.QueryParams.
func (p *Pipeline) SendQueryParams(sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats []int16, resultFormats []int16) {
if p.closed {
@ -1825,6 +1838,8 @@ func (p *Pipeline) GetResults() (results any, err error) {
if _, ok := peekedMsg.(*pgproto3.ParameterDescription); ok {
return p.getResultsPrepare()
}
case *pgproto3.CloseComplete:
return &CloseComplete{}, nil
case *pgproto3.ReadyForQuery:
p.expectedReadyForQueryCount--
return &PipelineSync{}, nil

View File

@ -2190,6 +2190,47 @@ func TestPipelinePrepareError(t *testing.T) {
ensureConnValid(t, pgConn)
}
func TestPipelinePrepareAndDeallocate(t *testing.T) {
t.Parallel()
pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING"))
require.NoError(t, err)
defer closeConn(t, pgConn)
pipeline := pgConn.StartPipeline(context.Background())
pipeline.SendPrepare("selectInt", "select $1::bigint as a", nil)
pipeline.SendDeallocate("selectInt")
err = pipeline.Sync()
require.NoError(t, err)
results, err := pipeline.GetResults()
require.NoError(t, err)
sd, ok := results.(*pgconn.StatementDescription)
require.Truef(t, ok, "expected StatementDescription, got: %#v", results)
require.Len(t, sd.Fields, 1)
require.Equal(t, string(sd.Fields[0].Name), "a")
require.Equal(t, []uint32{pgtype.Int8OID}, sd.ParamOIDs)
results, err = pipeline.GetResults()
require.NoError(t, err)
_, ok = results.(*pgconn.CloseComplete)
require.Truef(t, ok, "expected CloseComplete, got: %#v", results)
results, err = pipeline.GetResults()
require.NoError(t, err)
_, ok = results.(*pgconn.PipelineSync)
require.Truef(t, ok, "expected PipelineSync, got: %#v", results)
results, err = pipeline.GetResults()
require.NoError(t, err)
require.Nil(t, results)
err = pipeline.Close()
require.NoError(t, err)
ensureConnValid(t, pgConn)
}
func TestPipelineQuery(t *testing.T) {
t.Parallel()

View File

@ -136,6 +136,16 @@ func (f *Frontend) SendParse(msg *Parse) {
}
}
// SendClose sends a Close message to the backend (i.e. the server). The message is not guaranteed to be written until
// Flush is called.
func (f *Frontend) SendClose(msg *Close) {
prevLen := len(f.wbuf)
f.wbuf = msg.Encode(f.wbuf)
if f.tracer != nil {
f.tracer.traceClose('F', int32(len(f.wbuf)-prevLen), msg)
}
}
// SendDescribe sends a Describe message to the backend (i.e. the server). The message is not guaranteed to be written until
// Flush is called.
func (f *Frontend) SendDescribe(msg *Describe) {