From ae65a8007b3078346959e4b80d1f37d3730b92df Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Sat, 20 Aug 2022 09:54:30 -0500 Subject: [PATCH] Use higher pgconn.FieldDescription with string Name Instead of using pgproto3.FieldDescription through pgconn and pgx. This lets the lowest level pgproto3 still be as memory efficient as possible. https://github.com/jackc/pgx/pull/1281 --- pgconn/pgconn.go | 61 ++++++++++++++++++++++++++++++++----------- pgconn/pgconn_test.go | 10 +++---- pgxpool/rows.go | 21 +++++++-------- query_test.go | 2 +- rows.go | 9 +++---- 5 files changed, 66 insertions(+), 37 deletions(-) diff --git a/pgconn/pgconn.go b/pgconn/pgconn.go index 546a4bd0..e29283f4 100644 --- a/pgconn/pgconn.go +++ b/pgconn/pgconn.go @@ -83,6 +83,7 @@ type PgConn struct { multiResultReader MultiResultReader pipeline Pipeline contextWatcher *ctxwatch.ContextWatcher + fieldDescriptions [16]FieldDescription cleanupDone chan struct{} } @@ -526,9 +527,10 @@ func (pgConn *PgConn) PID() uint32 { // TxStatus returns the current TxStatus as reported by the server in the ReadyForQuery message. // // Possible return values: -// 'I' - idle / not in transaction -// 'T' - in a transaction -// 'E' - in a failed transaction +// +// 'I' - idle / not in transaction +// 'T' - in a transaction +// 'E' - in a failed transaction // // See https://www.postgresql.org/docs/current/protocol-message-formats.html. func (pgConn *PgConn) TxStatus() byte { @@ -714,11 +716,41 @@ func (ct CommandTag) Select() bool { return strings.HasPrefix(ct.s, "SELECT") } +type FieldDescription struct { + Name string + TableOID uint32 + TableAttributeNumber uint16 + DataTypeOID uint32 + DataTypeSize int16 + TypeModifier int32 + Format int16 +} + +func (pgConn *PgConn) convertRowDescription(dst []FieldDescription, rd *pgproto3.RowDescription) []FieldDescription { + if cap(dst) >= len(rd.Fields) { + dst = dst[:len(rd.Fields):len(rd.Fields)] + } else { + dst = make([]FieldDescription, len(rd.Fields)) + } + + for i := range rd.Fields { + dst[i].Name = string(rd.Fields[i].Name) + dst[i].TableOID = rd.Fields[i].TableOID + dst[i].TableAttributeNumber = rd.Fields[i].TableAttributeNumber + dst[i].DataTypeOID = rd.Fields[i].DataTypeOID + dst[i].DataTypeSize = rd.Fields[i].DataTypeSize + dst[i].TypeModifier = rd.Fields[i].TypeModifier + dst[i].Format = rd.Fields[i].Format + } + + return dst +} + type StatementDescription struct { Name string SQL string ParamOIDs []uint32 - Fields []pgproto3.FieldDescription + Fields []FieldDescription } // Prepare creates a prepared statement. If the name is empty, the anonymous prepared statement will be used. This @@ -765,8 +797,7 @@ readloop: psd.ParamOIDs = make([]uint32, len(msg.ParameterOIDs)) copy(psd.ParamOIDs, msg.ParameterOIDs) case *pgproto3.RowDescription: - psd.Fields = make([]pgproto3.FieldDescription, len(msg.Fields)) - copy(psd.Fields, msg.Fields) + psd.Fields = pgConn.convertRowDescription(nil, msg) case *pgproto3.ErrorResponse: parseErr = ErrorResponseToPgError(msg) case *pgproto3.ReadyForQuery: @@ -1281,8 +1312,9 @@ func (mrr *MultiResultReader) NextResult() bool { pgConn: mrr.pgConn, multiResultReader: mrr, ctx: mrr.ctx, - fieldDescriptions: msg.Fields, + fieldDescriptions: mrr.pgConn.convertRowDescription(mrr.pgConn.fieldDescriptions[:], msg), } + mrr.rr = &mrr.pgConn.resultReader return true case *pgproto3.CommandComplete: @@ -1325,7 +1357,7 @@ type ResultReader struct { pipeline *Pipeline ctx context.Context - fieldDescriptions []pgproto3.FieldDescription + fieldDescriptions []FieldDescription rowValues [][]byte commandTag CommandTag commandConcluded bool @@ -1335,7 +1367,7 @@ type ResultReader struct { // Result is the saved query response that is returned by calling Read on a ResultReader. type Result struct { - FieldDescriptions []pgproto3.FieldDescription + FieldDescriptions []FieldDescription Rows [][][]byte CommandTag CommandTag Err error @@ -1347,7 +1379,7 @@ func (rr *ResultReader) Read() *Result { for rr.NextRow() { if br.FieldDescriptions == nil { - br.FieldDescriptions = make([]pgproto3.FieldDescription, len(rr.FieldDescriptions())) + br.FieldDescriptions = make([]FieldDescription, len(rr.FieldDescriptions())) copy(br.FieldDescriptions, rr.FieldDescriptions()) } @@ -1385,7 +1417,7 @@ func (rr *ResultReader) NextRow() bool { // FieldDescriptions returns the field descriptions for the current result set. The returned slice is only valid until // the ResultReader is closed. -func (rr *ResultReader) FieldDescriptions() []pgproto3.FieldDescription { +func (rr *ResultReader) FieldDescriptions() []FieldDescription { return rr.fieldDescriptions } @@ -1473,7 +1505,7 @@ func (rr *ResultReader) receiveMessage() (msg pgproto3.BackendMessage, err error switch msg := msg.(type) { case *pgproto3.RowDescription: - rr.fieldDescriptions = msg.Fields + rr.fieldDescriptions = rr.pgConn.convertRowDescription(rr.pgConn.fieldDescriptions[:], msg) case *pgproto3.CommandComplete: rr.concludeCommand(rr.pgConn.makeCommandTag(msg.CommandTag), nil) case *pgproto3.EmptyQueryResponse: @@ -1825,7 +1857,7 @@ func (p *Pipeline) GetResults() (results any, err error) { pgConn: p.conn, pipeline: p, ctx: p.ctx, - fieldDescriptions: msg.Fields, + fieldDescriptions: p.conn.convertRowDescription(p.conn.fieldDescriptions[:], msg), } return &p.conn.resultReader, nil case *pgproto3.CommandComplete: @@ -1872,8 +1904,7 @@ func (p *Pipeline) getResultsPrepare() (*StatementDescription, error) { psd.ParamOIDs = make([]uint32, len(msg.ParameterOIDs)) copy(psd.ParamOIDs, msg.ParameterOIDs) case *pgproto3.RowDescription: - psd.Fields = make([]pgproto3.FieldDescription, len(msg.Fields)) - copy(psd.Fields, msg.Fields) + psd.Fields = p.conn.convertRowDescription(nil, msg) return psd, nil // NoData is returned instead of RowDescription when there is no expected result. e.g. An INSERT without a RETURNING diff --git a/pgconn/pgconn_test.go b/pgconn/pgconn_test.go index 153072c2..601cbc8e 100644 --- a/pgconn/pgconn_test.go +++ b/pgconn/pgconn_test.go @@ -642,13 +642,13 @@ func TestConnExecMultipleQueriesEagerFieldDescriptions(t *testing.T) { require.True(t, mrr.NextResult()) require.Len(t, mrr.ResultReader().FieldDescriptions(), 1) - assert.Equal(t, []byte("msg"), mrr.ResultReader().FieldDescriptions()[0].Name) + assert.Equal(t, "msg", mrr.ResultReader().FieldDescriptions()[0].Name) _, err = mrr.ResultReader().Close() require.NoError(t, err) require.True(t, mrr.NextResult()) require.Len(t, mrr.ResultReader().FieldDescriptions(), 1) - assert.Equal(t, []byte("num"), mrr.ResultReader().FieldDescriptions()[0].Name) + assert.Equal(t, "num", mrr.ResultReader().FieldDescriptions()[0].Name) _, err = mrr.ResultReader().Close() require.NoError(t, err) @@ -772,7 +772,7 @@ func TestConnExecParams(t *testing.T) { result := pgConn.ExecParams(context.Background(), "select $1::text as msg", [][]byte{[]byte("Hello, world")}, nil, nil, nil) require.Len(t, result.FieldDescriptions(), 1) - assert.Equal(t, []byte("msg"), result.FieldDescriptions()[0].Name) + assert.Equal(t, "msg", result.FieldDescriptions()[0].Name) rowCount := 0 for result.NextRow() { @@ -937,7 +937,7 @@ func TestResultReaderValuesHaveSameCapacityAsLength(t *testing.T) { result := pgConn.ExecParams(context.Background(), "select $1::text as msg", [][]byte{[]byte("Hello, world")}, nil, nil, nil) require.Len(t, result.FieldDescriptions(), 1) - assert.Equal(t, []byte("msg"), result.FieldDescriptions()[0].Name) + assert.Equal(t, "msg", result.FieldDescriptions()[0].Name) rowCount := 0 for result.NextRow() { @@ -968,7 +968,7 @@ func TestConnExecPrepared(t *testing.T) { result := pgConn.ExecPrepared(context.Background(), "ps1", [][]byte{[]byte("Hello, world")}, nil, nil) require.Len(t, result.FieldDescriptions(), 1) - assert.Equal(t, []byte("msg"), result.FieldDescriptions()[0].Name) + assert.Equal(t, "msg", result.FieldDescriptions()[0].Name) rowCount := 0 for result.NextRow() { diff --git a/pgxpool/rows.go b/pgxpool/rows.go index 0c0a7382..2b11ecd3 100644 --- a/pgxpool/rows.go +++ b/pgxpool/rows.go @@ -3,22 +3,21 @@ package pgxpool import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" - "github.com/jackc/pgx/v5/pgproto3" ) type errRows struct { err error } -func (errRows) Close() {} -func (e errRows) Err() error { return e.err } -func (errRows) CommandTag() pgconn.CommandTag { return pgconn.CommandTag{} } -func (errRows) FieldDescriptions() []pgproto3.FieldDescription { return nil } -func (errRows) Next() bool { return false } -func (e errRows) Scan(dest ...any) error { return e.err } -func (e errRows) Values() ([]any, error) { return nil, e.err } -func (e errRows) RawValues() [][]byte { return nil } -func (e errRows) Conn() *pgx.Conn { return nil } +func (errRows) Close() {} +func (e errRows) Err() error { return e.err } +func (errRows) CommandTag() pgconn.CommandTag { return pgconn.CommandTag{} } +func (errRows) FieldDescriptions() []pgconn.FieldDescription { return nil } +func (errRows) Next() bool { return false } +func (e errRows) Scan(dest ...any) error { return e.err } +func (e errRows) Values() ([]any, error) { return nil, e.err } +func (e errRows) RawValues() [][]byte { return nil } +func (e errRows) Conn() *pgx.Conn { return nil } type errRow struct { err error @@ -51,7 +50,7 @@ func (rows *poolRows) CommandTag() pgconn.CommandTag { return rows.r.CommandTag() } -func (rows *poolRows) FieldDescriptions() []pgproto3.FieldDescription { +func (rows *poolRows) FieldDescriptions() []pgconn.FieldDescription { return rows.r.FieldDescriptions() } diff --git a/query_test.go b/query_test.go index 317e4f60..720a1911 100644 --- a/query_test.go +++ b/query_test.go @@ -66,7 +66,7 @@ func TestConnQueryRowsFieldDescriptionsBeforeNext(t *testing.T) { defer rows.Close() require.Len(t, rows.FieldDescriptions(), 1) - assert.Equal(t, []byte("msg"), rows.FieldDescriptions()[0].Name) + assert.Equal(t, "msg", rows.FieldDescriptions()[0].Name) } func TestConnQueryWithoutResultSetCommandTag(t *testing.T) { diff --git a/rows.go b/rows.go index c4fd283c..80df4bb2 100644 --- a/rows.go +++ b/rows.go @@ -9,7 +9,6 @@ import ( "github.com/jackc/pgx/v5/internal/stmtcache" "github.com/jackc/pgx/v5/pgconn" - "github.com/jackc/pgx/v5/pgproto3" "github.com/jackc/pgx/v5/pgtype" ) @@ -34,7 +33,7 @@ type Rows interface { // CommandTag returns the command tag from this query. It is only available after Rows is closed. CommandTag() pgconn.CommandTag - FieldDescriptions() []pgproto3.FieldDescription + FieldDescriptions() []pgconn.FieldDescription // Next prepares the next row for reading. It returns true if there is another // row and false if no more rows are available. It automatically closes rows @@ -135,7 +134,7 @@ type baseRows struct { rowCount int } -func (rows *baseRows) FieldDescriptions() []pgproto3.FieldDescription { +func (rows *baseRows) FieldDescriptions() []pgconn.FieldDescription { return rows.resultReader.FieldDescriptions() } @@ -337,7 +336,7 @@ func (e ScanArgError) Unwrap() error { // fieldDescriptions - OID and format of values // values - the raw data as returned from the PostgreSQL server // dest - the destination that values will be decoded into -func ScanRow(typeMap *pgtype.Map, fieldDescriptions []pgproto3.FieldDescription, values [][]byte, dest ...any) error { +func ScanRow(typeMap *pgtype.Map, fieldDescriptions []pgconn.FieldDescription, values [][]byte, dest ...any) error { if len(fieldDescriptions) != len(values) { return fmt.Errorf("number of field descriptions must equal number of values, got %d and %d", len(fieldDescriptions), len(values)) } @@ -395,7 +394,7 @@ func ForEachRow(rows Rows, scans []any, fn func() error) (pgconn.CommandTag, err // CollectableRow is the subset of Rows methods that a RowToFunc is allowed to call. type CollectableRow interface { - FieldDescriptions() []pgproto3.FieldDescription + FieldDescriptions() []pgconn.FieldDescription Scan(dest ...any) error Values() ([]any, error) RawValues() [][]byte