Rename some types and methods

pull/483/head
Jack Christensen 2019-01-05 18:01:57 -06:00
parent caf72c627a
commit 2f0db78865
6 changed files with 72 additions and 72 deletions

View File

@ -76,7 +76,7 @@ func BenchmarkExecPrepared(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
result := conn.ExecPrepared(context.Background(), "ps1", nil, nil, nil).ReadAll() result := conn.ExecPrepared(context.Background(), "ps1", nil, nil, nil).Read()
require.Nil(b, result.Err) require.Nil(b, result.Err)
} }
} }
@ -95,7 +95,7 @@ func BenchmarkExecPreparedPossibleToCancel(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
result := conn.ExecPrepared(ctx, "ps1", nil, nil, nil).ReadAll() result := conn.ExecPrepared(ctx, "ps1", nil, nil, nil).Read()
require.Nil(b, result.Err) require.Nil(b, result.Err)
} }
} }

View File

@ -470,7 +470,7 @@ func makeConnectTimeoutDialFunc(s string) (DialFunc, error) {
// AfterConnectTargetSessionAttrsReadWrite is an AfterConnectFunc that implements libpq compatible // AfterConnectTargetSessionAttrsReadWrite is an AfterConnectFunc that implements libpq compatible
// target_session_attrs=read-write. // target_session_attrs=read-write.
func AfterConnectTargetSessionAttrsReadWrite(ctx context.Context, pgConn *PgConn) error { func AfterConnectTargetSessionAttrsReadWrite(ctx context.Context, pgConn *PgConn) error {
result := pgConn.ExecParams(ctx, "show transaction_read_only", nil, nil, nil, nil).ReadAll() result := pgConn.ExecParams(ctx, "show transaction_read_only", nil, nil, nil, nil).Read()
if result.Err != nil { if result.Err != nil {
return result.Err return result.Err
} }

View File

@ -20,7 +20,7 @@ func closeConn(t testing.TB, conn *pgconn.PgConn) {
// Do a simple query to ensure the connection is still usable // Do a simple query to ensure the connection is still usable
func ensureConnValid(t *testing.T, pgConn *pgconn.PgConn) { func ensureConnValid(t *testing.T, pgConn *pgconn.PgConn) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Second)
result := pgConn.ExecParams(ctx, "select generate_series(1,$1)", [][]byte{[]byte("3")}, nil, nil, nil).ReadAll() result := pgConn.ExecParams(ctx, "select generate_series(1,$1)", [][]byte{[]byte("3")}, nil, nil, nil).Read()
cancel() cancel()
require.Nil(t, result.Err) require.Nil(t, result.Err)

View File

@ -536,8 +536,8 @@ func (pgConn *PgConn) cancelRequest(ctx context.Context) error {
// statements. // statements.
// //
// Prefer ExecParams unless executing arbitrary SQL that may contain multiple queries. // Prefer ExecParams unless executing arbitrary SQL that may contain multiple queries.
func (pgConn *PgConn) Exec(ctx context.Context, sql string) *MultiResult { func (pgConn *PgConn) Exec(ctx context.Context, sql string) *MultiResultReader {
multiResult := &MultiResult{ multiResult := &MultiResultReader{
pgConn: pgConn, pgConn: pgConn,
ctx: ctx, ctx: ctx,
cleanupContextDeadline: func() {}, cleanupContextDeadline: func() {},
@ -592,9 +592,9 @@ func (pgConn *PgConn) Exec(ctx context.Context, sql string) *MultiResult {
// resultFormats is a slice of format codes determining for each result column whether it is encoded in text or // resultFormats is a slice of format codes determining for each result column whether it is encoded in text or
// binary format. If resultFormats is nil all results will be in text protocol. // binary format. If resultFormats is nil all results will be in text protocol.
// //
// Result must be closed before PgConn can be used again. // ResultReader must be closed before PgConn can be used again.
func (pgConn *PgConn) ExecParams(ctx context.Context, sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats []int16, resultFormats []int16) *Result { func (pgConn *PgConn) ExecParams(ctx context.Context, sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats []int16, resultFormats []int16) *ResultReader {
result := &Result{ result := &ResultReader{
pgConn: pgConn, pgConn: pgConn,
ctx: ctx, ctx: ctx,
cleanupContextDeadline: func() {}, cleanupContextDeadline: func() {},
@ -648,9 +648,9 @@ func (pgConn *PgConn) ExecParams(ctx context.Context, sql string, paramValues []
// resultFormats is a slice of format codes determining for each result column whether it is encoded in text or // resultFormats is a slice of format codes determining for each result column whether it is encoded in text or
// binary format. If resultFormats is nil all results will be in text protocol. // binary format. If resultFormats is nil all results will be in text protocol.
// //
// Result must be closed before PgConn can be used again. // ResultReader must be closed before PgConn can be used again.
func (pgConn *PgConn) ExecPrepared(ctx context.Context, stmtName string, paramValues [][]byte, paramFormats []int16, resultFormats []int16) *Result { func (pgConn *PgConn) ExecPrepared(ctx context.Context, stmtName string, paramValues [][]byte, paramFormats []int16, resultFormats []int16) *ResultReader {
result := &Result{ result := &ResultReader{
pgConn: pgConn, pgConn: pgConn,
ctx: ctx, ctx: ctx,
cleanupContextDeadline: func() {}, cleanupContextDeadline: func() {},
@ -689,77 +689,77 @@ func (pgConn *PgConn) ExecPrepared(ctx context.Context, stmtName string, paramVa
return result return result
} }
type MultiResult struct { type MultiResultReader struct {
pgConn *PgConn pgConn *PgConn
ctx context.Context ctx context.Context
cleanupContextDeadline func() cleanupContextDeadline func()
pgResult *Result rr *ResultReader
closed bool closed bool
err error err error
} }
func (mr *MultiResult) ReadAll() ([]*BufferedResult, error) { func (mrr *MultiResultReader) ReadAll() ([]*Result, error) {
var results []*BufferedResult var results []*Result
for mr.NextResult() { for mrr.NextResult() {
results = append(results, mr.Result().ReadAll()) results = append(results, mrr.ResultReader().Read())
} }
err := mr.Close() err := mrr.Close()
return results, err return results, err
} }
func (mr *MultiResult) receiveMessage() (pgproto3.BackendMessage, error) { func (mrr *MultiResultReader) receiveMessage() (pgproto3.BackendMessage, error) {
msg, err := mr.pgConn.ReceiveMessage() msg, err := mrr.pgConn.ReceiveMessage()
if err != nil { if err != nil {
mr.cleanupContextDeadline() mrr.cleanupContextDeadline()
mr.err = preferContextOverNetTimeoutError(mr.ctx, err) mrr.err = preferContextOverNetTimeoutError(mrr.ctx, err)
mr.closed = true mrr.closed = true
if err, ok := err.(net.Error); ok && err.Timeout() { if err, ok := err.(net.Error); ok && err.Timeout() {
go mr.pgConn.recoverFromTimeout() go mrr.pgConn.recoverFromTimeout()
} else { } else {
<-mr.pgConn.controller <-mrr.pgConn.controller
} }
return nil, mr.err return nil, mrr.err
} }
switch msg := msg.(type) { switch msg := msg.(type) {
case *pgproto3.ReadyForQuery: case *pgproto3.ReadyForQuery:
mr.cleanupContextDeadline() mrr.cleanupContextDeadline()
mr.closed = true mrr.closed = true
<-mr.pgConn.controller <-mrr.pgConn.controller
case *pgproto3.ErrorResponse: case *pgproto3.ErrorResponse:
mr.err = errorResponseToPgError(msg) mrr.err = errorResponseToPgError(msg)
} }
return msg, nil return msg, nil
} }
// NextResult returns advances the MultiResult to the next result and returns true if a result is available. // NextResult returns advances the MultiResultReader to the next result and returns true if a result is available.
func (mr *MultiResult) NextResult() bool { func (mrr *MultiResultReader) NextResult() bool {
for !mr.closed && mr.err == nil { for !mrr.closed && mrr.err == nil {
msg, err := mr.receiveMessage() msg, err := mrr.receiveMessage()
if err != nil { if err != nil {
return false return false
} }
switch msg := msg.(type) { switch msg := msg.(type) {
case *pgproto3.RowDescription: case *pgproto3.RowDescription:
mr.pgResult = &Result{ mrr.rr = &ResultReader{
pgConn: mr.pgConn, pgConn: mrr.pgConn,
pgMultiResult: mr, multiResultReader: mrr,
ctx: mr.ctx, ctx: mrr.ctx,
cleanupContextDeadline: func() {}, cleanupContextDeadline: func() {},
fieldDescriptions: msg.Fields, fieldDescriptions: msg.Fields,
} }
return true return true
case *pgproto3.CommandComplete: case *pgproto3.CommandComplete:
mr.pgResult = &Result{ mrr.rr = &ResultReader{
commandTag: CommandTag(msg.CommandTag), commandTag: CommandTag(msg.CommandTag),
commandConcluded: true, commandConcluded: true,
closed: true, closed: true,
@ -773,24 +773,24 @@ func (mr *MultiResult) NextResult() bool {
return false return false
} }
func (mr *MultiResult) Result() *Result { func (mrr *MultiResultReader) ResultReader() *ResultReader {
return mr.pgResult return mrr.rr
} }
func (mr *MultiResult) Close() error { func (mrr *MultiResultReader) Close() error {
for !mr.closed { for !mrr.closed {
_, err := mr.receiveMessage() _, err := mrr.receiveMessage()
if err != nil { if err != nil {
return mr.err return mrr.err
} }
} }
return mr.err return mrr.err
} }
type Result struct { type ResultReader struct {
pgConn *PgConn pgConn *PgConn
pgMultiResult *MultiResult multiResultReader *MultiResultReader
ctx context.Context ctx context.Context
cleanupContextDeadline func() cleanupContextDeadline func()
@ -802,15 +802,15 @@ type Result struct {
err error err error
} }
type BufferedResult struct { type Result struct {
FieldDescriptions []pgproto3.FieldDescription FieldDescriptions []pgproto3.FieldDescription
Rows [][][]byte Rows [][][]byte
CommandTag CommandTag CommandTag CommandTag
Err error Err error
} }
func (rr *Result) ReadAll() *BufferedResult { func (rr *ResultReader) Read() *Result {
br := &BufferedResult{} br := &Result{}
for rr.NextRow() { for rr.NextRow() {
if br.FieldDescriptions == nil { if br.FieldDescriptions == nil {
@ -828,8 +828,8 @@ func (rr *Result) ReadAll() *BufferedResult {
return br return br
} }
// NextRow advances the Result to the next row and returns true if a row is available. // NextRow advances the ResultReader to the next row and returns true if a row is available.
func (rr *Result) NextRow() bool { func (rr *ResultReader) NextRow() bool {
for !rr.commandConcluded { for !rr.commandConcluded {
msg, err := rr.receiveMessage() msg, err := rr.receiveMessage()
if err != nil { if err != nil {
@ -847,21 +847,21 @@ func (rr *Result) NextRow() bool {
} }
// FieldDescriptions returns the field descriptions for the current result set. The returned slice is only valid until // FieldDescriptions returns the field descriptions for the current result set. The returned slice is only valid until
// the Result is closed. // the ResultReader is closed.
func (rr *Result) FieldDescriptions() []pgproto3.FieldDescription { func (rr *ResultReader) FieldDescriptions() []pgproto3.FieldDescription {
return rr.fieldDescriptions return rr.fieldDescriptions
} }
// Values returns the current row data. NextRow must have been previously been called. The returned [][]byte is only // Values returns the current row data. NextRow must have been previously been called. The returned [][]byte is only
// valid until the next NextRow call or the Result is closed. However, the underlying byte data is safe to // valid until the next NextRow call or the ResultReader is closed. However, the underlying byte data is safe to
// retain a reference to and mutate. // retain a reference to and mutate.
func (rr *Result) Values() [][]byte { func (rr *ResultReader) Values() [][]byte {
return rr.rowValues return rr.rowValues
} }
// Close consumes any remaining result data and returns the command tag or // Close consumes any remaining result data and returns the command tag or
// error. // error.
func (rr *Result) Close() (CommandTag, error) { func (rr *ResultReader) Close() (CommandTag, error) {
if rr.closed { if rr.closed {
return rr.commandTag, rr.err return rr.commandTag, rr.err
} }
@ -874,7 +874,7 @@ func (rr *Result) Close() (CommandTag, error) {
} }
} }
if rr.pgMultiResult == nil { if rr.multiResultReader == nil {
for { for {
msg, err := rr.receiveMessage() msg, err := rr.receiveMessage()
if err != nil { if err != nil {
@ -893,18 +893,18 @@ func (rr *Result) Close() (CommandTag, error) {
return rr.commandTag, rr.err return rr.commandTag, rr.err
} }
func (rr *Result) receiveMessage() (msg pgproto3.BackendMessage, err error) { func (rr *ResultReader) receiveMessage() (msg pgproto3.BackendMessage, err error) {
if rr.pgMultiResult == nil { if rr.multiResultReader == nil {
msg, err = rr.pgConn.ReceiveMessage() msg, err = rr.pgConn.ReceiveMessage()
} else { } else {
msg, err = rr.pgMultiResult.receiveMessage() msg, err = rr.multiResultReader.receiveMessage()
} }
if err != nil { if err != nil {
rr.concludeCommand(nil, err) rr.concludeCommand(nil, err)
rr.cleanupContextDeadline() rr.cleanupContextDeadline()
rr.closed = true rr.closed = true
if rr.pgMultiResult == nil { if rr.multiResultReader == nil {
if err, ok := err.(net.Error); ok && err.Timeout() { if err, ok := err.(net.Error); ok && err.Timeout() {
go rr.pgConn.recoverFromTimeout() go rr.pgConn.recoverFromTimeout()
} else { } else {
@ -927,7 +927,7 @@ func (rr *Result) receiveMessage() (msg pgproto3.BackendMessage, err error) {
return msg, nil return msg, nil
} }
func (rr *Result) concludeCommand(commandTag CommandTag, err error) { func (rr *ResultReader) concludeCommand(commandTag CommandTag, err error) {
if rr.commandConcluded { if rr.commandConcluded {
return return
} }
@ -1008,8 +1008,8 @@ func (batch *Batch) ExecPrepared(stmtName string, paramValues [][]byte, paramFor
} }
// ExecBatch executes all the queries in batch in a single round-trip. // ExecBatch executes all the queries in batch in a single round-trip.
func (pgConn *PgConn) ExecBatch(ctx context.Context, batch *Batch) *MultiResult { func (pgConn *PgConn) ExecBatch(ctx context.Context, batch *Batch) *MultiResultReader {
multiResult := &MultiResult{ multiResult := &MultiResultReader{
pgConn: pgConn, pgConn: pgConn,
ctx: ctx, ctx: ctx,
cleanupContextDeadline: func() {}, cleanupContextDeadline: func() {},

View File

@ -70,7 +70,7 @@ func stressExecSelect(pgConn *pgconn.PgConn) error {
} }
func stressExecParamsSelect(pgConn *pgconn.PgConn) error { func stressExecParamsSelect(pgConn *pgconn.PgConn) error {
result := pgConn.ExecParams(context.Background(), "select * from widgets where id < $1", [][]byte{[]byte("10")}, nil, nil, nil).ReadAll() result := pgConn.ExecParams(context.Background(), "select * from widgets where id < $1", [][]byte{[]byte("10")}, nil, nil, nil).Read()
return result.Err return result.Err
} }
@ -96,7 +96,7 @@ func stressExecSelectCanceled(pgConn *pgconn.PgConn) error {
func stressExecParamsSelectCanceled(pgConn *pgconn.PgConn) error { func stressExecParamsSelectCanceled(pgConn *pgconn.PgConn) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
result := pgConn.ExecParams(ctx, "select *, pg_sleep(1) from widgets where id < $1", [][]byte{[]byte("10")}, nil, nil, nil).ReadAll() result := pgConn.ExecParams(ctx, "select *, pg_sleep(1) from widgets where id < $1", [][]byte{[]byte("10")}, nil, nil, nil).Read()
cancel() cancel()
if result.Err != context.DeadlineExceeded { if result.Err != context.DeadlineExceeded {
return result.Err return result.Err

View File

@ -134,12 +134,12 @@ func TestConnectWithRuntimeParams(t *testing.T) {
require.Nil(t, err) require.Nil(t, err)
defer closeConn(t, conn) defer closeConn(t, conn)
result := conn.ExecParams(context.Background(), "show application_name", nil, nil, nil, nil).ReadAll() result := conn.ExecParams(context.Background(), "show application_name", nil, nil, nil, nil).Read()
require.Nil(t, result.Err) require.Nil(t, result.Err)
assert.Equal(t, 1, len(result.Rows)) assert.Equal(t, 1, len(result.Rows))
assert.Equal(t, "pgxtest", string(result.Rows[0][0])) assert.Equal(t, "pgxtest", string(result.Rows[0][0]))
result = conn.ExecParams(context.Background(), "show search_path", nil, nil, nil, nil).ReadAll() result = conn.ExecParams(context.Background(), "show search_path", nil, nil, nil, nil).Read()
require.Nil(t, result.Err) require.Nil(t, result.Err)
assert.Equal(t, 1, len(result.Rows)) assert.Equal(t, 1, len(result.Rows))
assert.Equal(t, "myschema", string(result.Rows[0][0])) assert.Equal(t, "myschema", string(result.Rows[0][0]))
@ -263,7 +263,7 @@ func TestConnExecEmpty(t *testing.T) {
resultCount := 0 resultCount := 0
for multiResult.NextResult() { for multiResult.NextResult() {
resultCount += 1 resultCount += 1
multiResult.Result().Close() multiResult.ResultReader().Close()
} }
assert.Equal(t, 0, resultCount) assert.Equal(t, 0, resultCount)
err = multiResult.Close() err = multiResult.Close()