mirror of https://github.com/jackc/pgx.git
Use higher pgconn.FieldDescription with string Name
Instead of using pgproto3.FieldDescription through pgconn and pgx. This lets the lowest level pgproto3 still be as memory efficient as possible. https://github.com/jackc/pgx/pull/1281v5-dev
parent
dbee461dc9
commit
ae65a8007b
|
@ -83,6 +83,7 @@ type PgConn struct {
|
||||||
multiResultReader MultiResultReader
|
multiResultReader MultiResultReader
|
||||||
pipeline Pipeline
|
pipeline Pipeline
|
||||||
contextWatcher *ctxwatch.ContextWatcher
|
contextWatcher *ctxwatch.ContextWatcher
|
||||||
|
fieldDescriptions [16]FieldDescription
|
||||||
|
|
||||||
cleanupDone chan struct{}
|
cleanupDone chan struct{}
|
||||||
}
|
}
|
||||||
|
@ -526,9 +527,10 @@ func (pgConn *PgConn) PID() uint32 {
|
||||||
// TxStatus returns the current TxStatus as reported by the server in the ReadyForQuery message.
|
// TxStatus returns the current TxStatus as reported by the server in the ReadyForQuery message.
|
||||||
//
|
//
|
||||||
// Possible return values:
|
// Possible return values:
|
||||||
// 'I' - idle / not in transaction
|
//
|
||||||
// 'T' - in a transaction
|
// 'I' - idle / not in transaction
|
||||||
// 'E' - in a failed transaction
|
// 'T' - in a transaction
|
||||||
|
// 'E' - in a failed transaction
|
||||||
//
|
//
|
||||||
// See https://www.postgresql.org/docs/current/protocol-message-formats.html.
|
// See https://www.postgresql.org/docs/current/protocol-message-formats.html.
|
||||||
func (pgConn *PgConn) TxStatus() byte {
|
func (pgConn *PgConn) TxStatus() byte {
|
||||||
|
@ -714,11 +716,41 @@ func (ct CommandTag) Select() bool {
|
||||||
return strings.HasPrefix(ct.s, "SELECT")
|
return strings.HasPrefix(ct.s, "SELECT")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type FieldDescription struct {
|
||||||
|
Name string
|
||||||
|
TableOID uint32
|
||||||
|
TableAttributeNumber uint16
|
||||||
|
DataTypeOID uint32
|
||||||
|
DataTypeSize int16
|
||||||
|
TypeModifier int32
|
||||||
|
Format int16
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pgConn *PgConn) convertRowDescription(dst []FieldDescription, rd *pgproto3.RowDescription) []FieldDescription {
|
||||||
|
if cap(dst) >= len(rd.Fields) {
|
||||||
|
dst = dst[:len(rd.Fields):len(rd.Fields)]
|
||||||
|
} else {
|
||||||
|
dst = make([]FieldDescription, len(rd.Fields))
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range rd.Fields {
|
||||||
|
dst[i].Name = string(rd.Fields[i].Name)
|
||||||
|
dst[i].TableOID = rd.Fields[i].TableOID
|
||||||
|
dst[i].TableAttributeNumber = rd.Fields[i].TableAttributeNumber
|
||||||
|
dst[i].DataTypeOID = rd.Fields[i].DataTypeOID
|
||||||
|
dst[i].DataTypeSize = rd.Fields[i].DataTypeSize
|
||||||
|
dst[i].TypeModifier = rd.Fields[i].TypeModifier
|
||||||
|
dst[i].Format = rd.Fields[i].Format
|
||||||
|
}
|
||||||
|
|
||||||
|
return dst
|
||||||
|
}
|
||||||
|
|
||||||
type StatementDescription struct {
|
type StatementDescription struct {
|
||||||
Name string
|
Name string
|
||||||
SQL string
|
SQL string
|
||||||
ParamOIDs []uint32
|
ParamOIDs []uint32
|
||||||
Fields []pgproto3.FieldDescription
|
Fields []FieldDescription
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prepare creates a prepared statement. If the name is empty, the anonymous prepared statement will be used. This
|
// Prepare creates a prepared statement. If the name is empty, the anonymous prepared statement will be used. This
|
||||||
|
@ -765,8 +797,7 @@ readloop:
|
||||||
psd.ParamOIDs = make([]uint32, len(msg.ParameterOIDs))
|
psd.ParamOIDs = make([]uint32, len(msg.ParameterOIDs))
|
||||||
copy(psd.ParamOIDs, msg.ParameterOIDs)
|
copy(psd.ParamOIDs, msg.ParameterOIDs)
|
||||||
case *pgproto3.RowDescription:
|
case *pgproto3.RowDescription:
|
||||||
psd.Fields = make([]pgproto3.FieldDescription, len(msg.Fields))
|
psd.Fields = pgConn.convertRowDescription(nil, msg)
|
||||||
copy(psd.Fields, msg.Fields)
|
|
||||||
case *pgproto3.ErrorResponse:
|
case *pgproto3.ErrorResponse:
|
||||||
parseErr = ErrorResponseToPgError(msg)
|
parseErr = ErrorResponseToPgError(msg)
|
||||||
case *pgproto3.ReadyForQuery:
|
case *pgproto3.ReadyForQuery:
|
||||||
|
@ -1281,8 +1312,9 @@ func (mrr *MultiResultReader) NextResult() bool {
|
||||||
pgConn: mrr.pgConn,
|
pgConn: mrr.pgConn,
|
||||||
multiResultReader: mrr,
|
multiResultReader: mrr,
|
||||||
ctx: mrr.ctx,
|
ctx: mrr.ctx,
|
||||||
fieldDescriptions: msg.Fields,
|
fieldDescriptions: mrr.pgConn.convertRowDescription(mrr.pgConn.fieldDescriptions[:], msg),
|
||||||
}
|
}
|
||||||
|
|
||||||
mrr.rr = &mrr.pgConn.resultReader
|
mrr.rr = &mrr.pgConn.resultReader
|
||||||
return true
|
return true
|
||||||
case *pgproto3.CommandComplete:
|
case *pgproto3.CommandComplete:
|
||||||
|
@ -1325,7 +1357,7 @@ type ResultReader struct {
|
||||||
pipeline *Pipeline
|
pipeline *Pipeline
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
|
||||||
fieldDescriptions []pgproto3.FieldDescription
|
fieldDescriptions []FieldDescription
|
||||||
rowValues [][]byte
|
rowValues [][]byte
|
||||||
commandTag CommandTag
|
commandTag CommandTag
|
||||||
commandConcluded bool
|
commandConcluded bool
|
||||||
|
@ -1335,7 +1367,7 @@ type ResultReader struct {
|
||||||
|
|
||||||
// Result is the saved query response that is returned by calling Read on a ResultReader.
|
// Result is the saved query response that is returned by calling Read on a ResultReader.
|
||||||
type Result struct {
|
type Result struct {
|
||||||
FieldDescriptions []pgproto3.FieldDescription
|
FieldDescriptions []FieldDescription
|
||||||
Rows [][][]byte
|
Rows [][][]byte
|
||||||
CommandTag CommandTag
|
CommandTag CommandTag
|
||||||
Err error
|
Err error
|
||||||
|
@ -1347,7 +1379,7 @@ func (rr *ResultReader) Read() *Result {
|
||||||
|
|
||||||
for rr.NextRow() {
|
for rr.NextRow() {
|
||||||
if br.FieldDescriptions == nil {
|
if br.FieldDescriptions == nil {
|
||||||
br.FieldDescriptions = make([]pgproto3.FieldDescription, len(rr.FieldDescriptions()))
|
br.FieldDescriptions = make([]FieldDescription, len(rr.FieldDescriptions()))
|
||||||
copy(br.FieldDescriptions, rr.FieldDescriptions())
|
copy(br.FieldDescriptions, rr.FieldDescriptions())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1385,7 +1417,7 @@ func (rr *ResultReader) NextRow() bool {
|
||||||
|
|
||||||
// FieldDescriptions returns the field descriptions for the current result set. The returned slice is only valid until
|
// FieldDescriptions returns the field descriptions for the current result set. The returned slice is only valid until
|
||||||
// the ResultReader is closed.
|
// the ResultReader is closed.
|
||||||
func (rr *ResultReader) FieldDescriptions() []pgproto3.FieldDescription {
|
func (rr *ResultReader) FieldDescriptions() []FieldDescription {
|
||||||
return rr.fieldDescriptions
|
return rr.fieldDescriptions
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1473,7 +1505,7 @@ func (rr *ResultReader) receiveMessage() (msg pgproto3.BackendMessage, err error
|
||||||
|
|
||||||
switch msg := msg.(type) {
|
switch msg := msg.(type) {
|
||||||
case *pgproto3.RowDescription:
|
case *pgproto3.RowDescription:
|
||||||
rr.fieldDescriptions = msg.Fields
|
rr.fieldDescriptions = rr.pgConn.convertRowDescription(rr.pgConn.fieldDescriptions[:], msg)
|
||||||
case *pgproto3.CommandComplete:
|
case *pgproto3.CommandComplete:
|
||||||
rr.concludeCommand(rr.pgConn.makeCommandTag(msg.CommandTag), nil)
|
rr.concludeCommand(rr.pgConn.makeCommandTag(msg.CommandTag), nil)
|
||||||
case *pgproto3.EmptyQueryResponse:
|
case *pgproto3.EmptyQueryResponse:
|
||||||
|
@ -1825,7 +1857,7 @@ func (p *Pipeline) GetResults() (results any, err error) {
|
||||||
pgConn: p.conn,
|
pgConn: p.conn,
|
||||||
pipeline: p,
|
pipeline: p,
|
||||||
ctx: p.ctx,
|
ctx: p.ctx,
|
||||||
fieldDescriptions: msg.Fields,
|
fieldDescriptions: p.conn.convertRowDescription(p.conn.fieldDescriptions[:], msg),
|
||||||
}
|
}
|
||||||
return &p.conn.resultReader, nil
|
return &p.conn.resultReader, nil
|
||||||
case *pgproto3.CommandComplete:
|
case *pgproto3.CommandComplete:
|
||||||
|
@ -1872,8 +1904,7 @@ func (p *Pipeline) getResultsPrepare() (*StatementDescription, error) {
|
||||||
psd.ParamOIDs = make([]uint32, len(msg.ParameterOIDs))
|
psd.ParamOIDs = make([]uint32, len(msg.ParameterOIDs))
|
||||||
copy(psd.ParamOIDs, msg.ParameterOIDs)
|
copy(psd.ParamOIDs, msg.ParameterOIDs)
|
||||||
case *pgproto3.RowDescription:
|
case *pgproto3.RowDescription:
|
||||||
psd.Fields = make([]pgproto3.FieldDescription, len(msg.Fields))
|
psd.Fields = p.conn.convertRowDescription(nil, msg)
|
||||||
copy(psd.Fields, msg.Fields)
|
|
||||||
return psd, nil
|
return psd, nil
|
||||||
|
|
||||||
// NoData is returned instead of RowDescription when there is no expected result. e.g. An INSERT without a RETURNING
|
// NoData is returned instead of RowDescription when there is no expected result. e.g. An INSERT without a RETURNING
|
||||||
|
|
|
@ -642,13 +642,13 @@ func TestConnExecMultipleQueriesEagerFieldDescriptions(t *testing.T) {
|
||||||
|
|
||||||
require.True(t, mrr.NextResult())
|
require.True(t, mrr.NextResult())
|
||||||
require.Len(t, mrr.ResultReader().FieldDescriptions(), 1)
|
require.Len(t, mrr.ResultReader().FieldDescriptions(), 1)
|
||||||
assert.Equal(t, []byte("msg"), mrr.ResultReader().FieldDescriptions()[0].Name)
|
assert.Equal(t, "msg", mrr.ResultReader().FieldDescriptions()[0].Name)
|
||||||
_, err = mrr.ResultReader().Close()
|
_, err = mrr.ResultReader().Close()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.True(t, mrr.NextResult())
|
require.True(t, mrr.NextResult())
|
||||||
require.Len(t, mrr.ResultReader().FieldDescriptions(), 1)
|
require.Len(t, mrr.ResultReader().FieldDescriptions(), 1)
|
||||||
assert.Equal(t, []byte("num"), mrr.ResultReader().FieldDescriptions()[0].Name)
|
assert.Equal(t, "num", mrr.ResultReader().FieldDescriptions()[0].Name)
|
||||||
_, err = mrr.ResultReader().Close()
|
_, err = mrr.ResultReader().Close()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -772,7 +772,7 @@ func TestConnExecParams(t *testing.T) {
|
||||||
|
|
||||||
result := pgConn.ExecParams(context.Background(), "select $1::text as msg", [][]byte{[]byte("Hello, world")}, nil, nil, nil)
|
result := pgConn.ExecParams(context.Background(), "select $1::text as msg", [][]byte{[]byte("Hello, world")}, nil, nil, nil)
|
||||||
require.Len(t, result.FieldDescriptions(), 1)
|
require.Len(t, result.FieldDescriptions(), 1)
|
||||||
assert.Equal(t, []byte("msg"), result.FieldDescriptions()[0].Name)
|
assert.Equal(t, "msg", result.FieldDescriptions()[0].Name)
|
||||||
|
|
||||||
rowCount := 0
|
rowCount := 0
|
||||||
for result.NextRow() {
|
for result.NextRow() {
|
||||||
|
@ -937,7 +937,7 @@ func TestResultReaderValuesHaveSameCapacityAsLength(t *testing.T) {
|
||||||
|
|
||||||
result := pgConn.ExecParams(context.Background(), "select $1::text as msg", [][]byte{[]byte("Hello, world")}, nil, nil, nil)
|
result := pgConn.ExecParams(context.Background(), "select $1::text as msg", [][]byte{[]byte("Hello, world")}, nil, nil, nil)
|
||||||
require.Len(t, result.FieldDescriptions(), 1)
|
require.Len(t, result.FieldDescriptions(), 1)
|
||||||
assert.Equal(t, []byte("msg"), result.FieldDescriptions()[0].Name)
|
assert.Equal(t, "msg", result.FieldDescriptions()[0].Name)
|
||||||
|
|
||||||
rowCount := 0
|
rowCount := 0
|
||||||
for result.NextRow() {
|
for result.NextRow() {
|
||||||
|
@ -968,7 +968,7 @@ func TestConnExecPrepared(t *testing.T) {
|
||||||
|
|
||||||
result := pgConn.ExecPrepared(context.Background(), "ps1", [][]byte{[]byte("Hello, world")}, nil, nil)
|
result := pgConn.ExecPrepared(context.Background(), "ps1", [][]byte{[]byte("Hello, world")}, nil, nil)
|
||||||
require.Len(t, result.FieldDescriptions(), 1)
|
require.Len(t, result.FieldDescriptions(), 1)
|
||||||
assert.Equal(t, []byte("msg"), result.FieldDescriptions()[0].Name)
|
assert.Equal(t, "msg", result.FieldDescriptions()[0].Name)
|
||||||
|
|
||||||
rowCount := 0
|
rowCount := 0
|
||||||
for result.NextRow() {
|
for result.NextRow() {
|
||||||
|
|
|
@ -3,22 +3,21 @@ package pgxpool
|
||||||
import (
|
import (
|
||||||
"github.com/jackc/pgx/v5"
|
"github.com/jackc/pgx/v5"
|
||||||
"github.com/jackc/pgx/v5/pgconn"
|
"github.com/jackc/pgx/v5/pgconn"
|
||||||
"github.com/jackc/pgx/v5/pgproto3"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type errRows struct {
|
type errRows struct {
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (errRows) Close() {}
|
func (errRows) Close() {}
|
||||||
func (e errRows) Err() error { return e.err }
|
func (e errRows) Err() error { return e.err }
|
||||||
func (errRows) CommandTag() pgconn.CommandTag { return pgconn.CommandTag{} }
|
func (errRows) CommandTag() pgconn.CommandTag { return pgconn.CommandTag{} }
|
||||||
func (errRows) FieldDescriptions() []pgproto3.FieldDescription { return nil }
|
func (errRows) FieldDescriptions() []pgconn.FieldDescription { return nil }
|
||||||
func (errRows) Next() bool { return false }
|
func (errRows) Next() bool { return false }
|
||||||
func (e errRows) Scan(dest ...any) error { return e.err }
|
func (e errRows) Scan(dest ...any) error { return e.err }
|
||||||
func (e errRows) Values() ([]any, error) { return nil, e.err }
|
func (e errRows) Values() ([]any, error) { return nil, e.err }
|
||||||
func (e errRows) RawValues() [][]byte { return nil }
|
func (e errRows) RawValues() [][]byte { return nil }
|
||||||
func (e errRows) Conn() *pgx.Conn { return nil }
|
func (e errRows) Conn() *pgx.Conn { return nil }
|
||||||
|
|
||||||
type errRow struct {
|
type errRow struct {
|
||||||
err error
|
err error
|
||||||
|
@ -51,7 +50,7 @@ func (rows *poolRows) CommandTag() pgconn.CommandTag {
|
||||||
return rows.r.CommandTag()
|
return rows.r.CommandTag()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rows *poolRows) FieldDescriptions() []pgproto3.FieldDescription {
|
func (rows *poolRows) FieldDescriptions() []pgconn.FieldDescription {
|
||||||
return rows.r.FieldDescriptions()
|
return rows.r.FieldDescriptions()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -66,7 +66,7 @@ func TestConnQueryRowsFieldDescriptionsBeforeNext(t *testing.T) {
|
||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
|
|
||||||
require.Len(t, rows.FieldDescriptions(), 1)
|
require.Len(t, rows.FieldDescriptions(), 1)
|
||||||
assert.Equal(t, []byte("msg"), rows.FieldDescriptions()[0].Name)
|
assert.Equal(t, "msg", rows.FieldDescriptions()[0].Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConnQueryWithoutResultSetCommandTag(t *testing.T) {
|
func TestConnQueryWithoutResultSetCommandTag(t *testing.T) {
|
||||||
|
|
9
rows.go
9
rows.go
|
@ -9,7 +9,6 @@ import (
|
||||||
|
|
||||||
"github.com/jackc/pgx/v5/internal/stmtcache"
|
"github.com/jackc/pgx/v5/internal/stmtcache"
|
||||||
"github.com/jackc/pgx/v5/pgconn"
|
"github.com/jackc/pgx/v5/pgconn"
|
||||||
"github.com/jackc/pgx/v5/pgproto3"
|
|
||||||
"github.com/jackc/pgx/v5/pgtype"
|
"github.com/jackc/pgx/v5/pgtype"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -34,7 +33,7 @@ type Rows interface {
|
||||||
// CommandTag returns the command tag from this query. It is only available after Rows is closed.
|
// CommandTag returns the command tag from this query. It is only available after Rows is closed.
|
||||||
CommandTag() pgconn.CommandTag
|
CommandTag() pgconn.CommandTag
|
||||||
|
|
||||||
FieldDescriptions() []pgproto3.FieldDescription
|
FieldDescriptions() []pgconn.FieldDescription
|
||||||
|
|
||||||
// Next prepares the next row for reading. It returns true if there is another
|
// Next prepares the next row for reading. It returns true if there is another
|
||||||
// row and false if no more rows are available. It automatically closes rows
|
// row and false if no more rows are available. It automatically closes rows
|
||||||
|
@ -135,7 +134,7 @@ type baseRows struct {
|
||||||
rowCount int
|
rowCount int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rows *baseRows) FieldDescriptions() []pgproto3.FieldDescription {
|
func (rows *baseRows) FieldDescriptions() []pgconn.FieldDescription {
|
||||||
return rows.resultReader.FieldDescriptions()
|
return rows.resultReader.FieldDescriptions()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -337,7 +336,7 @@ func (e ScanArgError) Unwrap() error {
|
||||||
// fieldDescriptions - OID and format of values
|
// fieldDescriptions - OID and format of values
|
||||||
// values - the raw data as returned from the PostgreSQL server
|
// values - the raw data as returned from the PostgreSQL server
|
||||||
// dest - the destination that values will be decoded into
|
// dest - the destination that values will be decoded into
|
||||||
func ScanRow(typeMap *pgtype.Map, fieldDescriptions []pgproto3.FieldDescription, values [][]byte, dest ...any) error {
|
func ScanRow(typeMap *pgtype.Map, fieldDescriptions []pgconn.FieldDescription, values [][]byte, dest ...any) error {
|
||||||
if len(fieldDescriptions) != len(values) {
|
if len(fieldDescriptions) != len(values) {
|
||||||
return fmt.Errorf("number of field descriptions must equal number of values, got %d and %d", len(fieldDescriptions), len(values))
|
return fmt.Errorf("number of field descriptions must equal number of values, got %d and %d", len(fieldDescriptions), len(values))
|
||||||
}
|
}
|
||||||
|
@ -395,7 +394,7 @@ func ForEachRow(rows Rows, scans []any, fn func() error) (pgconn.CommandTag, err
|
||||||
|
|
||||||
// CollectableRow is the subset of Rows methods that a RowToFunc is allowed to call.
|
// CollectableRow is the subset of Rows methods that a RowToFunc is allowed to call.
|
||||||
type CollectableRow interface {
|
type CollectableRow interface {
|
||||||
FieldDescriptions() []pgproto3.FieldDescription
|
FieldDescriptions() []pgconn.FieldDescription
|
||||||
Scan(dest ...any) error
|
Scan(dest ...any) error
|
||||||
Values() ([]any, error)
|
Values() ([]any, error)
|
||||||
RawValues() [][]byte
|
RawValues() [][]byte
|
||||||
|
|
Loading…
Reference in New Issue