From a86f4f3db988a5b4c389a4f5b2869d774057c18f Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Thu, 7 Jul 2022 19:32:01 -0500 Subject: [PATCH] Add deallocate to pipeline mode --- pgconn/pgconn.go | 15 +++++++++++++++ pgconn/pgconn_test.go | 41 +++++++++++++++++++++++++++++++++++++++++ pgproto3/frontend.go | 10 ++++++++++ 3 files changed, 66 insertions(+) diff --git a/pgconn/pgconn.go b/pgconn/pgconn.go index a49e569a..7426add9 100644 --- a/pgconn/pgconn.go +++ b/pgconn/pgconn.go @@ -1677,6 +1677,9 @@ type Pipeline struct { // PipelineSync is returned by GetResults when a ReadyForQuery message is received. type PipelineSync struct{} +// CloseComplete is returned by GetResults when a CloseComplete message is received. +type CloseComplete struct{} + // StartPipeline switches the connection to pipeline mode and returns a *Pipeline. In pipeline mode requests can be sent // to the server without waiting for a response. Close must be called on the returned *Pipeline to return the connection // to normal mode. While in pipeline mode, no methods that communicate with the server may be called except @@ -1723,6 +1726,16 @@ func (p *Pipeline) SendPrepare(name, sql string, paramOIDs []uint32) { p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'S', Name: name}) } +// SendDeallocate deallocates a prepared statement. +func (p *Pipeline) SendDeallocate(name string) { + if p.closed { + return + } + p.pendingSync = true + + p.conn.frontend.SendClose(&pgproto3.Close{ObjectType: 'S', Name: name}) +} + // SendQueryParams is the pipeline version of *PgConn.QueryParams. func (p *Pipeline) SendQueryParams(sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats []int16, resultFormats []int16) { if p.closed { @@ -1825,6 +1838,8 @@ func (p *Pipeline) GetResults() (results any, err error) { if _, ok := peekedMsg.(*pgproto3.ParameterDescription); ok { return p.getResultsPrepare() } + case *pgproto3.CloseComplete: + return &CloseComplete{}, nil case *pgproto3.ReadyForQuery: p.expectedReadyForQueryCount-- return &PipelineSync{}, nil diff --git a/pgconn/pgconn_test.go b/pgconn/pgconn_test.go index 87837f8b..0fba9417 100644 --- a/pgconn/pgconn_test.go +++ b/pgconn/pgconn_test.go @@ -2190,6 +2190,47 @@ func TestPipelinePrepareError(t *testing.T) { ensureConnValid(t, pgConn) } +func TestPipelinePrepareAndDeallocate(t *testing.T) { + t.Parallel() + + pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_CONN_STRING")) + require.NoError(t, err) + defer closeConn(t, pgConn) + + pipeline := pgConn.StartPipeline(context.Background()) + pipeline.SendPrepare("selectInt", "select $1::bigint as a", nil) + pipeline.SendDeallocate("selectInt") + err = pipeline.Sync() + require.NoError(t, err) + + results, err := pipeline.GetResults() + require.NoError(t, err) + sd, ok := results.(*pgconn.StatementDescription) + require.Truef(t, ok, "expected StatementDescription, got: %#v", results) + require.Len(t, sd.Fields, 1) + require.Equal(t, string(sd.Fields[0].Name), "a") + require.Equal(t, []uint32{pgtype.Int8OID}, sd.ParamOIDs) + + results, err = pipeline.GetResults() + require.NoError(t, err) + _, ok = results.(*pgconn.CloseComplete) + require.Truef(t, ok, "expected CloseComplete, got: %#v", results) + + results, err = pipeline.GetResults() + require.NoError(t, err) + _, ok = results.(*pgconn.PipelineSync) + require.Truef(t, ok, "expected PipelineSync, got: %#v", results) + + results, err = pipeline.GetResults() + require.NoError(t, err) + require.Nil(t, results) + + err = pipeline.Close() + require.NoError(t, err) + + ensureConnValid(t, pgConn) +} + func TestPipelineQuery(t *testing.T) { t.Parallel() diff --git a/pgproto3/frontend.go b/pgproto3/frontend.go index 321d0bf9..eed8dc4f 100644 --- a/pgproto3/frontend.go +++ b/pgproto3/frontend.go @@ -136,6 +136,16 @@ func (f *Frontend) SendParse(msg *Parse) { } } +// SendClose sends a Close message to the backend (i.e. the server). The message is not guaranteed to be written until +// Flush is called. +func (f *Frontend) SendClose(msg *Close) { + prevLen := len(f.wbuf) + f.wbuf = msg.Encode(f.wbuf) + if f.tracer != nil { + f.tracer.traceClose('F', int32(len(f.wbuf)-prevLen), msg) + } +} + // SendDescribe sends a Describe message to the backend (i.e. the server). The message is not guaranteed to be written until // Flush is called. func (f *Frontend) SendDescribe(msg *Describe) {