diff --git a/pgconn/benchmark_test.go b/pgconn/benchmark_test.go index ffb1455c..d2576324 100644 --- a/pgconn/benchmark_test.go +++ b/pgconn/benchmark_test.go @@ -76,7 +76,7 @@ func BenchmarkExecPrepared(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - result := conn.ExecPrepared(context.Background(), "ps1", nil, nil, nil).ReadAll() + result := conn.ExecPrepared(context.Background(), "ps1", nil, nil, nil).Read() require.Nil(b, result.Err) } } @@ -95,7 +95,7 @@ func BenchmarkExecPreparedPossibleToCancel(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - result := conn.ExecPrepared(ctx, "ps1", nil, nil, nil).ReadAll() + result := conn.ExecPrepared(ctx, "ps1", nil, nil, nil).Read() require.Nil(b, result.Err) } } diff --git a/pgconn/config.go b/pgconn/config.go index fb0719cd..b85bcaec 100644 --- a/pgconn/config.go +++ b/pgconn/config.go @@ -470,7 +470,7 @@ func makeConnectTimeoutDialFunc(s string) (DialFunc, error) { // AfterConnectTargetSessionAttrsReadWrite is an AfterConnectFunc that implements libpq compatible // target_session_attrs=read-write. func AfterConnectTargetSessionAttrsReadWrite(ctx context.Context, pgConn *PgConn) error { - result := pgConn.ExecParams(ctx, "show transaction_read_only", nil, nil, nil, nil).ReadAll() + result := pgConn.ExecParams(ctx, "show transaction_read_only", nil, nil, nil, nil).Read() if result.Err != nil { return result.Err } diff --git a/pgconn/helper_test.go b/pgconn/helper_test.go index a50f7cb1..c5ac6e01 100644 --- a/pgconn/helper_test.go +++ b/pgconn/helper_test.go @@ -20,7 +20,7 @@ func closeConn(t testing.TB, conn *pgconn.PgConn) { // Do a simple query to ensure the connection is still usable func ensureConnValid(t *testing.T, pgConn *pgconn.PgConn) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) - result := pgConn.ExecParams(ctx, "select generate_series(1,$1)", [][]byte{[]byte("3")}, nil, nil, nil).ReadAll() + result := pgConn.ExecParams(ctx, "select generate_series(1,$1)", [][]byte{[]byte("3")}, nil, nil, nil).Read() cancel() require.Nil(t, result.Err) diff --git a/pgconn/pgconn.go b/pgconn/pgconn.go index 09d87b31..be7d37ae 100644 --- a/pgconn/pgconn.go +++ b/pgconn/pgconn.go @@ -536,8 +536,8 @@ func (pgConn *PgConn) cancelRequest(ctx context.Context) error { // statements. // // Prefer ExecParams unless executing arbitrary SQL that may contain multiple queries. -func (pgConn *PgConn) Exec(ctx context.Context, sql string) *MultiResult { - multiResult := &MultiResult{ +func (pgConn *PgConn) Exec(ctx context.Context, sql string) *MultiResultReader { + multiResult := &MultiResultReader{ pgConn: pgConn, ctx: ctx, cleanupContextDeadline: func() {}, @@ -592,9 +592,9 @@ func (pgConn *PgConn) Exec(ctx context.Context, sql string) *MultiResult { // resultFormats is a slice of format codes determining for each result column whether it is encoded in text or // binary format. If resultFormats is nil all results will be in text protocol. // -// Result must be closed before PgConn can be used again. -func (pgConn *PgConn) ExecParams(ctx context.Context, sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats []int16, resultFormats []int16) *Result { - result := &Result{ +// ResultReader must be closed before PgConn can be used again. +func (pgConn *PgConn) ExecParams(ctx context.Context, sql string, paramValues [][]byte, paramOIDs []uint32, paramFormats []int16, resultFormats []int16) *ResultReader { + result := &ResultReader{ pgConn: pgConn, ctx: ctx, cleanupContextDeadline: func() {}, @@ -648,9 +648,9 @@ func (pgConn *PgConn) ExecParams(ctx context.Context, sql string, paramValues [] // resultFormats is a slice of format codes determining for each result column whether it is encoded in text or // binary format. If resultFormats is nil all results will be in text protocol. // -// Result must be closed before PgConn can be used again. -func (pgConn *PgConn) ExecPrepared(ctx context.Context, stmtName string, paramValues [][]byte, paramFormats []int16, resultFormats []int16) *Result { - result := &Result{ +// ResultReader must be closed before PgConn can be used again. +func (pgConn *PgConn) ExecPrepared(ctx context.Context, stmtName string, paramValues [][]byte, paramFormats []int16, resultFormats []int16) *ResultReader { + result := &ResultReader{ pgConn: pgConn, ctx: ctx, cleanupContextDeadline: func() {}, @@ -689,77 +689,77 @@ func (pgConn *PgConn) ExecPrepared(ctx context.Context, stmtName string, paramVa return result } -type MultiResult struct { +type MultiResultReader struct { pgConn *PgConn ctx context.Context cleanupContextDeadline func() - pgResult *Result + rr *ResultReader closed bool err error } -func (mr *MultiResult) ReadAll() ([]*BufferedResult, error) { - var results []*BufferedResult +func (mrr *MultiResultReader) ReadAll() ([]*Result, error) { + var results []*Result - for mr.NextResult() { - results = append(results, mr.Result().ReadAll()) + for mrr.NextResult() { + results = append(results, mrr.ResultReader().Read()) } - err := mr.Close() + err := mrr.Close() return results, err } -func (mr *MultiResult) receiveMessage() (pgproto3.BackendMessage, error) { - msg, err := mr.pgConn.ReceiveMessage() +func (mrr *MultiResultReader) receiveMessage() (pgproto3.BackendMessage, error) { + msg, err := mrr.pgConn.ReceiveMessage() if err != nil { - mr.cleanupContextDeadline() - mr.err = preferContextOverNetTimeoutError(mr.ctx, err) - mr.closed = true + mrr.cleanupContextDeadline() + mrr.err = preferContextOverNetTimeoutError(mrr.ctx, err) + mrr.closed = true if err, ok := err.(net.Error); ok && err.Timeout() { - go mr.pgConn.recoverFromTimeout() + go mrr.pgConn.recoverFromTimeout() } else { - <-mr.pgConn.controller + <-mrr.pgConn.controller } - return nil, mr.err + return nil, mrr.err } switch msg := msg.(type) { case *pgproto3.ReadyForQuery: - mr.cleanupContextDeadline() - mr.closed = true - <-mr.pgConn.controller + mrr.cleanupContextDeadline() + mrr.closed = true + <-mrr.pgConn.controller case *pgproto3.ErrorResponse: - mr.err = errorResponseToPgError(msg) + mrr.err = errorResponseToPgError(msg) } return msg, nil } -// NextResult returns advances the MultiResult to the next result and returns true if a result is available. -func (mr *MultiResult) NextResult() bool { - for !mr.closed && mr.err == nil { - msg, err := mr.receiveMessage() +// NextResult returns advances the MultiResultReader to the next result and returns true if a result is available. +func (mrr *MultiResultReader) NextResult() bool { + for !mrr.closed && mrr.err == nil { + msg, err := mrr.receiveMessage() if err != nil { return false } switch msg := msg.(type) { case *pgproto3.RowDescription: - mr.pgResult = &Result{ - pgConn: mr.pgConn, - pgMultiResult: mr, - ctx: mr.ctx, + mrr.rr = &ResultReader{ + pgConn: mrr.pgConn, + multiResultReader: mrr, + ctx: mrr.ctx, cleanupContextDeadline: func() {}, fieldDescriptions: msg.Fields, } return true case *pgproto3.CommandComplete: - mr.pgResult = &Result{ + mrr.rr = &ResultReader{ commandTag: CommandTag(msg.CommandTag), commandConcluded: true, closed: true, @@ -773,24 +773,24 @@ func (mr *MultiResult) NextResult() bool { return false } -func (mr *MultiResult) Result() *Result { - return mr.pgResult +func (mrr *MultiResultReader) ResultReader() *ResultReader { + return mrr.rr } -func (mr *MultiResult) Close() error { - for !mr.closed { - _, err := mr.receiveMessage() +func (mrr *MultiResultReader) Close() error { + for !mrr.closed { + _, err := mrr.receiveMessage() if err != nil { - return mr.err + return mrr.err } } - return mr.err + return mrr.err } -type Result struct { +type ResultReader struct { pgConn *PgConn - pgMultiResult *MultiResult + multiResultReader *MultiResultReader ctx context.Context cleanupContextDeadline func() @@ -802,15 +802,15 @@ type Result struct { err error } -type BufferedResult struct { +type Result struct { FieldDescriptions []pgproto3.FieldDescription Rows [][][]byte CommandTag CommandTag Err error } -func (rr *Result) ReadAll() *BufferedResult { - br := &BufferedResult{} +func (rr *ResultReader) Read() *Result { + br := &Result{} for rr.NextRow() { if br.FieldDescriptions == nil { @@ -828,8 +828,8 @@ func (rr *Result) ReadAll() *BufferedResult { return br } -// NextRow advances the Result to the next row and returns true if a row is available. -func (rr *Result) NextRow() bool { +// NextRow advances the ResultReader to the next row and returns true if a row is available. +func (rr *ResultReader) NextRow() bool { for !rr.commandConcluded { msg, err := rr.receiveMessage() if err != nil { @@ -847,21 +847,21 @@ func (rr *Result) NextRow() bool { } // FieldDescriptions returns the field descriptions for the current result set. The returned slice is only valid until -// the Result is closed. -func (rr *Result) FieldDescriptions() []pgproto3.FieldDescription { +// the ResultReader is closed. +func (rr *ResultReader) FieldDescriptions() []pgproto3.FieldDescription { return rr.fieldDescriptions } // Values returns the current row data. NextRow must have been previously been called. The returned [][]byte is only -// valid until the next NextRow call or the Result is closed. However, the underlying byte data is safe to +// valid until the next NextRow call or the ResultReader is closed. However, the underlying byte data is safe to // retain a reference to and mutate. -func (rr *Result) Values() [][]byte { +func (rr *ResultReader) Values() [][]byte { return rr.rowValues } // Close consumes any remaining result data and returns the command tag or // error. -func (rr *Result) Close() (CommandTag, error) { +func (rr *ResultReader) Close() (CommandTag, error) { if rr.closed { return rr.commandTag, rr.err } @@ -874,7 +874,7 @@ func (rr *Result) Close() (CommandTag, error) { } } - if rr.pgMultiResult == nil { + if rr.multiResultReader == nil { for { msg, err := rr.receiveMessage() if err != nil { @@ -893,18 +893,18 @@ func (rr *Result) Close() (CommandTag, error) { return rr.commandTag, rr.err } -func (rr *Result) receiveMessage() (msg pgproto3.BackendMessage, err error) { - if rr.pgMultiResult == nil { +func (rr *ResultReader) receiveMessage() (msg pgproto3.BackendMessage, err error) { + if rr.multiResultReader == nil { msg, err = rr.pgConn.ReceiveMessage() } else { - msg, err = rr.pgMultiResult.receiveMessage() + msg, err = rr.multiResultReader.receiveMessage() } if err != nil { rr.concludeCommand(nil, err) rr.cleanupContextDeadline() rr.closed = true - if rr.pgMultiResult == nil { + if rr.multiResultReader == nil { if err, ok := err.(net.Error); ok && err.Timeout() { go rr.pgConn.recoverFromTimeout() } else { @@ -927,7 +927,7 @@ func (rr *Result) receiveMessage() (msg pgproto3.BackendMessage, err error) { return msg, nil } -func (rr *Result) concludeCommand(commandTag CommandTag, err error) { +func (rr *ResultReader) concludeCommand(commandTag CommandTag, err error) { if rr.commandConcluded { return } @@ -1008,8 +1008,8 @@ func (batch *Batch) ExecPrepared(stmtName string, paramValues [][]byte, paramFor } // ExecBatch executes all the queries in batch in a single round-trip. -func (pgConn *PgConn) ExecBatch(ctx context.Context, batch *Batch) *MultiResult { - multiResult := &MultiResult{ +func (pgConn *PgConn) ExecBatch(ctx context.Context, batch *Batch) *MultiResultReader { + multiResult := &MultiResultReader{ pgConn: pgConn, ctx: ctx, cleanupContextDeadline: func() {}, diff --git a/pgconn/pgconn_stress_test.go b/pgconn/pgconn_stress_test.go index 17d344b7..6b5efd9f 100644 --- a/pgconn/pgconn_stress_test.go +++ b/pgconn/pgconn_stress_test.go @@ -70,7 +70,7 @@ func stressExecSelect(pgConn *pgconn.PgConn) error { } func stressExecParamsSelect(pgConn *pgconn.PgConn) error { - result := pgConn.ExecParams(context.Background(), "select * from widgets where id < $1", [][]byte{[]byte("10")}, nil, nil, nil).ReadAll() + result := pgConn.ExecParams(context.Background(), "select * from widgets where id < $1", [][]byte{[]byte("10")}, nil, nil, nil).Read() return result.Err } @@ -96,7 +96,7 @@ func stressExecSelectCanceled(pgConn *pgconn.PgConn) error { func stressExecParamsSelectCanceled(pgConn *pgconn.PgConn) error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond) - result := pgConn.ExecParams(ctx, "select *, pg_sleep(1) from widgets where id < $1", [][]byte{[]byte("10")}, nil, nil, nil).ReadAll() + result := pgConn.ExecParams(ctx, "select *, pg_sleep(1) from widgets where id < $1", [][]byte{[]byte("10")}, nil, nil, nil).Read() cancel() if result.Err != context.DeadlineExceeded { return result.Err diff --git a/pgconn/pgconn_test.go b/pgconn/pgconn_test.go index a2eb7838..a524d18f 100644 --- a/pgconn/pgconn_test.go +++ b/pgconn/pgconn_test.go @@ -134,12 +134,12 @@ func TestConnectWithRuntimeParams(t *testing.T) { require.Nil(t, err) defer closeConn(t, conn) - result := conn.ExecParams(context.Background(), "show application_name", nil, nil, nil, nil).ReadAll() + result := conn.ExecParams(context.Background(), "show application_name", nil, nil, nil, nil).Read() require.Nil(t, result.Err) assert.Equal(t, 1, len(result.Rows)) assert.Equal(t, "pgxtest", string(result.Rows[0][0])) - result = conn.ExecParams(context.Background(), "show search_path", nil, nil, nil, nil).ReadAll() + result = conn.ExecParams(context.Background(), "show search_path", nil, nil, nil, nil).Read() require.Nil(t, result.Err) assert.Equal(t, 1, len(result.Rows)) assert.Equal(t, "myschema", string(result.Rows[0][0])) @@ -263,7 +263,7 @@ func TestConnExecEmpty(t *testing.T) { resultCount := 0 for multiResult.NextResult() { resultCount += 1 - multiResult.Result().Close() + multiResult.ResultReader().Close() } assert.Equal(t, 0, resultCount) err = multiResult.Close()