Big restructure to better handle context cancel

pull/483/head
Jack Christensen 2019-01-05 17:37:28 -06:00
parent 67725478c5
commit 31cb2b4e72
6 changed files with 686 additions and 757 deletions

View File

@ -44,7 +44,7 @@ func BenchmarkExec(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
_, err := conn.Exec(context.Background(), "select 'hello'::text as a, 42::int4 as b, '2019-01-01'::date") _, err := conn.Exec(context.Background(), "select 'hello'::text as a, 42::int4 as b, '2019-01-01'::date").ReadAll()
require.Nil(b, err) require.Nil(b, err)
} }
} }
@ -60,7 +60,7 @@ func BenchmarkExecPossibleToCancel(b *testing.B) {
defer cancel() defer cancel()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
_, err := conn.Exec(ctx, "select 'hello'::text as a, 42::int4 as b, '2019-01-01'::date") _, err := conn.Exec(ctx, "select 'hello'::text as a, 42::int4 as b, '2019-01-01'::date").ReadAll()
require.Nil(b, err) require.Nil(b, err)
} }
} }
@ -71,12 +71,13 @@ func BenchmarkExecPrepared(b *testing.B) {
defer closeConn(b, conn) defer closeConn(b, conn)
_, err = conn.Prepare(context.Background(), "ps1", "select 'hello'::text as a, 42::int4 as b, '2019-01-01'::date", nil) _, err = conn.Prepare(context.Background(), "ps1", "select 'hello'::text as a, 42::int4 as b, '2019-01-01'::date", nil)
require.Nil(b, err)
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
_, err := conn.ExecPrepared(context.Background(), "ps1", nil, nil, nil) result := conn.ExecPrepared(context.Background(), "ps1", nil, nil, nil).ReadAll()
require.Nil(b, err) require.Nil(b, result.Err)
} }
} }
@ -89,32 +90,12 @@ func BenchmarkExecPreparedPossibleToCancel(b *testing.B) {
defer cancel() defer cancel()
_, err = conn.Prepare(ctx, "ps1", "select 'hello'::text as a, 42::int4 as b, '2019-01-01'::date", nil) _, err = conn.Prepare(ctx, "ps1", "select 'hello'::text as a, 42::int4 as b, '2019-01-01'::date", nil)
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := conn.ExecPrepared(ctx, "ps1", nil, nil, nil)
require.Nil(b, err)
}
}
func BenchmarkSendExecPrepared(b *testing.B) {
conn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.Nil(b, err) require.Nil(b, err)
defer closeConn(b, conn)
_, err = conn.Prepare(context.Background(), "ps1", "select 'hello'::text as a, 42::int4 as b, '2019-01-01'::date", nil)
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
conn.SendExecPrepared("ps1", nil, nil, nil) result := conn.ExecPrepared(ctx, "ps1", nil, nil, nil).ReadAll()
err := conn.Flush(context.Background()) require.Nil(b, result.Err)
require.Nil(b, err)
for conn.NextResult(context.Background()) {
_, err := conn.ResultReader().Close()
require.Nil(b, err)
}
} }
} }

View File

@ -470,9 +470,9 @@ func makeConnectTimeoutDialFunc(s string) (DialFunc, error) {
// AfterConnectTargetSessionAttrsReadWrite is an AfterConnectFunc that implements libpq compatible // AfterConnectTargetSessionAttrsReadWrite is an AfterConnectFunc that implements libpq compatible
// target_session_attrs=read-write. // target_session_attrs=read-write.
func AfterConnectTargetSessionAttrsReadWrite(ctx context.Context, pgConn *PgConn) error { func AfterConnectTargetSessionAttrsReadWrite(ctx context.Context, pgConn *PgConn) error {
result, err := pgConn.Exec(ctx, "show transaction_read_only") result := pgConn.ExecParams(ctx, "show transaction_read_only", nil, nil, nil, nil).ReadAll()
if err != nil { if result.Err != nil {
return err return result.Err
} }
if string(result.Rows[0][0]) == "on" { if string(result.Rows[0][0]) == "on" {

View File

@ -20,10 +20,10 @@ func closeConn(t testing.TB, conn *pgconn.PgConn) {
// Do a simple query to ensure the connection is still usable // Do a simple query to ensure the connection is still usable
func ensureConnValid(t *testing.T, pgConn *pgconn.PgConn) { func ensureConnValid(t *testing.T, pgConn *pgconn.PgConn) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Second)
result, err := pgConn.ExecParams(ctx, "select generate_series(1,$1)", [][]byte{[]byte("3")}, nil, nil, nil) result := pgConn.ExecParams(ctx, "select generate_series(1,$1)", [][]byte{[]byte("3")}, nil, nil, nil).ReadAll()
cancel() cancel()
require.Nil(t, err) require.Nil(t, result.Err)
assert.Equal(t, 3, len(result.Rows)) assert.Equal(t, 3, len(result.Rows))
assert.Equal(t, "1", string(result.Rows[0][0])) assert.Equal(t, "1", string(result.Rows[0][0]))
assert.Equal(t, "2", string(result.Rows[1][0])) assert.Equal(t, "2", string(result.Rows[1][0]))

File diff suppressed because it is too large Load Diff

View File

@ -9,7 +9,6 @@ import (
"time" "time"
"github.com/jackc/pgx/pgconn" "github.com/jackc/pgx/pgconn"
"github.com/pkg/errors"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -22,9 +21,9 @@ func TestConnStress(t *testing.T) {
defer closeConn(t, pgConn) defer closeConn(t, pgConn)
actionCount := 100 actionCount := 100
if s := os.Getenv("PTX_TEST_STRESS_FACTOR"); s != "" { if s := os.Getenv("PGX_TEST_STRESS_FACTOR"); s != "" {
stressFactor, err := strconv.ParseInt(s, 10, 64) stressFactor, err := strconv.ParseInt(s, 10, 64)
require.Nil(t, err, "Failed to parse PTX_TEST_STRESS_FACTOR") require.Nil(t, err, "Failed to parse PGX_TEST_STRESS_FACTOR")
actionCount *= int(stressFactor) actionCount *= int(stressFactor)
} }
@ -61,138 +60,61 @@ func setupStressDB(t *testing.T, pgConn *pgconn.PgConn) {
insert into widgets(name, description) values insert into widgets(name, description) values
('Foo', 'bar'), ('Foo', 'bar'),
('baz', 'Something really long Something really long Something really long Something really long Something really long'), ('baz', 'Something really long Something really long Something really long Something really long Something really long'),
('a', 'b')`) ('a', 'b')`).ReadAll()
require.Nil(t, err) require.Nil(t, err)
} }
func stressExecSelect(pgConn *pgconn.PgConn) error { func stressExecSelect(pgConn *pgconn.PgConn) error {
_, err := pgConn.Exec(context.Background(), "select * from widgets") _, err := pgConn.Exec(context.Background(), "select * from widgets").ReadAll()
return err return err
} }
func stressExecParamsSelect(pgConn *pgconn.PgConn) error { func stressExecParamsSelect(pgConn *pgconn.PgConn) error {
_, err := pgConn.ExecParams(context.Background(), "select * from widgets where id < $1", [][]byte{[]byte("10")}, nil, nil, nil) result := pgConn.ExecParams(context.Background(), "select * from widgets where id < $1", [][]byte{[]byte("10")}, nil, nil, nil).ReadAll()
return err return result.Err
} }
func stressBatch(pgConn *pgconn.PgConn) error { func stressBatch(pgConn *pgconn.PgConn) error {
pgConn.SendExec("select * from widgets") batch := &pgconn.Batch{}
pgConn.SendExecParams("select * from widgets where id < $1", [][]byte{[]byte("10")}, nil, nil, nil)
err := pgConn.Flush(context.Background())
if err != nil {
return err
}
// Query 1 batch.ExecParams("select * from widgets", nil, nil, nil, nil)
if !pgConn.NextResult(context.Background()) { batch.ExecParams("select * from widgets where id < $1", [][]byte{[]byte("10")}, nil, nil, nil)
return errors.New("missing result") _, err := pgConn.ExecBatch(context.Background(), batch).ReadAll()
} return err
resultReader := pgConn.ResultReader()
for resultReader.NextRow() {
}
_, err = resultReader.Close()
if err != nil {
return err
}
// Query 2
if !pgConn.NextResult(context.Background()) {
return errors.New("missing result")
}
resultReader = pgConn.ResultReader()
for resultReader.NextRow() {
}
_, err = resultReader.Close()
if err != nil {
return err
}
// No more
if pgConn.NextResult(context.Background()) {
return errors.New("unexpected result reader")
}
return nil
} }
func stressExecSelectCanceled(pgConn *pgconn.PgConn) error { func stressExecSelectCanceled(pgConn *pgconn.PgConn) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
_, err := pgConn.Exec(ctx, "select *, pg_sleep(1) from widgets") _, err := pgConn.Exec(ctx, "select *, pg_sleep(1) from widgets").ReadAll()
cancel() cancel()
if err != context.DeadlineExceeded { if err != context.DeadlineExceeded {
return err return err
} }
ctx, cancel = context.WithTimeout(context.Background(), 500*time.Millisecond)
recovered := pgConn.RecoverFromTimeout(ctx)
cancel()
if !recovered {
return errors.New("unable to recover from timeout")
}
return nil return nil
} }
func stressExecParamsSelectCanceled(pgConn *pgconn.PgConn) error { func stressExecParamsSelectCanceled(pgConn *pgconn.PgConn) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
_, err := pgConn.ExecParams(ctx, "select *, pg_sleep(1) from widgets where id < $1", [][]byte{[]byte("10")}, nil, nil, nil) result := pgConn.ExecParams(ctx, "select *, pg_sleep(1) from widgets where id < $1", [][]byte{[]byte("10")}, nil, nil, nil).ReadAll()
cancel() cancel()
if err != context.DeadlineExceeded { if result.Err != context.DeadlineExceeded {
return err return result.Err
} }
ctx, cancel = context.WithTimeout(context.Background(), 500*time.Millisecond)
recovered := pgConn.RecoverFromTimeout(ctx)
cancel()
if !recovered {
return errors.New("unable to recover from timeout")
}
return nil return nil
} }
func stressBatchCanceled(pgConn *pgconn.PgConn) error { func stressBatchCanceled(pgConn *pgconn.PgConn) error {
batch := &pgconn.Batch{}
pgConn.SendExec("select * from widgets") batch.ExecParams("select * from widgets", nil, nil, nil, nil)
pgConn.SendExecParams("select *, pg_sleep(1) from widgets where id < $1", [][]byte{[]byte("10")}, nil, nil, nil) batch.ExecParams("select *, pg_sleep(1) from widgets where id < $1", [][]byte{[]byte("10")}, nil, nil, nil)
err := pgConn.Flush(context.Background())
if err != nil {
return err
}
// Query 1
if !pgConn.NextResult(context.Background()) {
return errors.New("missing result")
}
resultReader := pgConn.ResultReader()
for resultReader.NextRow() {
}
_, err = resultReader.Close()
if err != nil {
return err
}
// Query 2
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
if !pgConn.NextResult(ctx) { _, err := pgConn.ExecBatch(ctx, batch).ReadAll()
return errors.New("missing result")
}
cancel() cancel()
resultReader = pgConn.ResultReader()
for resultReader.NextRow() {
}
_, err = resultReader.Close()
if err != context.DeadlineExceeded { if err != context.DeadlineExceeded {
return err return err
} }
ctx, cancel = context.WithTimeout(context.Background(), 500*time.Millisecond)
recovered := pgConn.RecoverFromTimeout(ctx)
cancel()
if !recovered {
return errors.New("unable to recover from timeout")
}
return nil return nil
} }

View File

@ -134,13 +134,13 @@ func TestConnectWithRuntimeParams(t *testing.T) {
require.Nil(t, err) require.Nil(t, err)
defer closeConn(t, conn) defer closeConn(t, conn)
result, err := conn.Exec(context.Background(), "show application_name") result := conn.ExecParams(context.Background(), "show application_name", nil, nil, nil, nil).ReadAll()
require.Nil(t, err) require.Nil(t, result.Err)
assert.Equal(t, 1, len(result.Rows)) assert.Equal(t, 1, len(result.Rows))
assert.Equal(t, "pgxtest", string(result.Rows[0][0])) assert.Equal(t, "pgxtest", string(result.Rows[0][0]))
result, err = conn.Exec(context.Background(), "show search_path") result = conn.ExecParams(context.Background(), "show search_path", nil, nil, nil, nil).ReadAll()
require.Nil(t, err) require.Nil(t, result.Err)
assert.Equal(t, 1, len(result.Rows)) assert.Equal(t, 1, len(result.Rows))
assert.Equal(t, "myschema", string(result.Rows[0][0])) assert.Equal(t, "myschema", string(result.Rows[0][0]))
} }
@ -239,10 +239,14 @@ func TestConnExec(t *testing.T) {
require.Nil(t, err) require.Nil(t, err)
defer closeConn(t, pgConn) defer closeConn(t, pgConn)
result, err := pgConn.Exec(context.Background(), "select current_database()") results, err := pgConn.Exec(context.Background(), "select 'Hello, world'").ReadAll()
require.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, 1, len(result.Rows))
assert.Equal(t, pgConn.Config.Database, string(result.Rows[0][0])) assert.Len(t, results, 1)
assert.Nil(t, results[0].Err)
assert.Equal(t, "SELECT 1", string(results[0].CommandTag))
assert.Len(t, results[0].Rows, 1)
assert.Equal(t, "Hello, world", string(results[0].Rows[0][0]))
ensureConnValid(t, pgConn) ensureConnValid(t, pgConn)
} }
@ -254,10 +258,16 @@ func TestConnExecEmpty(t *testing.T) {
require.Nil(t, err) require.Nil(t, err)
defer closeConn(t, pgConn) defer closeConn(t, pgConn)
result, err := pgConn.Exec(context.Background(), ";") multiResult := pgConn.Exec(context.Background(), ";")
require.Nil(t, err)
assert.Nil(t, result.CommandTag) resultCount := 0
assert.Equal(t, 0, len(result.Rows)) for multiResult.NextResult() {
resultCount += 1
multiResult.Result().Close()
}
assert.Equal(t, 0, resultCount)
err = multiResult.Close()
assert.Nil(t, err)
ensureConnValid(t, pgConn) ensureConnValid(t, pgConn)
} }
@ -269,10 +279,20 @@ func TestConnExecMultipleQueries(t *testing.T) {
require.Nil(t, err) require.Nil(t, err)
defer closeConn(t, pgConn) defer closeConn(t, pgConn)
result, err := pgConn.Exec(context.Background(), "select current_database(); select 1") results, err := pgConn.Exec(context.Background(), "select 'Hello, world'; select 1").ReadAll()
require.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, 1, len(result.Rows))
assert.Equal(t, "1", string(result.Rows[0][0])) assert.Len(t, results, 2)
assert.Nil(t, results[0].Err)
assert.Equal(t, "SELECT 1", string(results[0].CommandTag))
assert.Len(t, results[0].Rows, 1)
assert.Equal(t, "Hello, world", string(results[0].Rows[0][0]))
assert.Nil(t, results[1].Err)
assert.Equal(t, "SELECT 1", string(results[1].CommandTag))
assert.Len(t, results[1].Rows, 1)
assert.Equal(t, "1", string(results[1].Rows[0][0]))
ensureConnValid(t, pgConn) ensureConnValid(t, pgConn)
} }
@ -284,15 +304,18 @@ func TestConnExecMultipleQueriesError(t *testing.T) {
require.Nil(t, err) require.Nil(t, err)
defer closeConn(t, pgConn) defer closeConn(t, pgConn)
result, err := pgConn.Exec(context.Background(), "select 1; select 1/0; select 1") results, err := pgConn.Exec(context.Background(), "select 1; select 1/0; select 1").ReadAll()
require.NotNil(t, err) require.NotNil(t, err)
require.Nil(t, result)
if pgErr, ok := err.(*pgconn.PgError); ok { if pgErr, ok := err.(*pgconn.PgError); ok {
assert.Equal(t, "22012", pgErr.Code) assert.Equal(t, "22012", pgErr.Code)
} else { } else {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
assert.Len(t, results, 1)
assert.Len(t, results[0].Rows, 1)
assert.Equal(t, "1", string(results[0].Rows[0][0]))
ensureConnValid(t, pgConn) ensureConnValid(t, pgConn)
} }
@ -305,11 +328,12 @@ func TestConnExecContextCanceled(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel() defer cancel()
result, err := pgConn.Exec(ctx, "select current_database(), pg_sleep(1)") multiResult := pgConn.Exec(ctx, "select 'Hello, world', pg_sleep(1)")
assert.Nil(t, result)
assert.Equal(t, context.DeadlineExceeded, err)
assert.True(t, pgConn.RecoverFromTimeout(context.Background())) for multiResult.NextResult() {
}
err = multiResult.Close()
assert.Equal(t, context.DeadlineExceeded, err)
ensureConnValid(t, pgConn) ensureConnValid(t, pgConn)
} }
@ -321,10 +345,16 @@ func TestConnExecParams(t *testing.T) {
require.Nil(t, err) require.Nil(t, err)
defer closeConn(t, pgConn) defer closeConn(t, pgConn)
result, err := pgConn.ExecParams(context.Background(), "select $1::text", [][]byte{[]byte("Hello, world")}, nil, nil, nil) result := pgConn.ExecParams(context.Background(), "select $1::text", [][]byte{[]byte("Hello, world")}, nil, nil, nil)
require.Nil(t, err) rowCount := 0
assert.Equal(t, 1, len(result.Rows)) for result.NextRow() {
assert.Equal(t, "Hello, world", string(result.Rows[0][0])) rowCount += 1
assert.Equal(t, "Hello, world", string(result.Values()[0]))
}
assert.Equal(t, 1, rowCount)
commandTag, err := result.Close()
assert.Equal(t, "SELECT 1", string(commandTag))
assert.Nil(t, err)
ensureConnValid(t, pgConn) ensureConnValid(t, pgConn)
} }
@ -338,12 +368,16 @@ func TestConnExecParamsCanceled(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel() defer cancel()
result, err := pgConn.ExecParams(ctx, "select current_database(), pg_sleep(1)", nil, nil, nil, nil) result := pgConn.ExecParams(ctx, "select current_database(), pg_sleep(1)", nil, nil, nil, nil)
assert.Nil(t, result) rowCount := 0
for result.NextRow() {
rowCount += 1
}
assert.Equal(t, 0, rowCount)
commandTag, err := result.Close()
assert.Nil(t, commandTag)
assert.Equal(t, context.DeadlineExceeded, err) assert.Equal(t, context.DeadlineExceeded, err)
assert.True(t, pgConn.RecoverFromTimeout(context.Background()))
ensureConnValid(t, pgConn) ensureConnValid(t, pgConn)
} }
@ -360,10 +394,16 @@ func TestConnExecPrepared(t *testing.T) {
assert.Len(t, psd.ParamOIDs, 1) assert.Len(t, psd.ParamOIDs, 1)
assert.Len(t, psd.Fields, 1) assert.Len(t, psd.Fields, 1)
result, err := pgConn.ExecPrepared(context.Background(), "ps1", [][]byte{[]byte("Hello, world")}, nil, nil) result := pgConn.ExecPrepared(context.Background(), "ps1", [][]byte{[]byte("Hello, world")}, nil, nil)
require.Nil(t, err) rowCount := 0
assert.Equal(t, 1, len(result.Rows)) for result.NextRow() {
assert.Equal(t, "Hello, world", string(result.Rows[0][0])) rowCount += 1
assert.Equal(t, "Hello, world", string(result.Values()[0]))
}
assert.Equal(t, 1, rowCount)
commandTag, err := result.Close()
assert.Equal(t, "SELECT 1", string(commandTag))
assert.Nil(t, err)
ensureConnValid(t, pgConn) ensureConnValid(t, pgConn)
} }
@ -380,16 +420,20 @@ func TestConnExecPreparedCanceled(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel() defer cancel()
result, err := pgConn.ExecPrepared(ctx, "ps1", nil, nil, nil) result := pgConn.ExecPrepared(ctx, "ps1", nil, nil, nil)
assert.Nil(t, result) rowCount := 0
for result.NextRow() {
rowCount += 1
}
assert.Equal(t, 0, rowCount)
commandTag, err := result.Close()
assert.Nil(t, commandTag)
assert.Equal(t, context.DeadlineExceeded, err) assert.Equal(t, context.DeadlineExceeded, err)
assert.True(t, pgConn.RecoverFromTimeout(context.Background()))
ensureConnValid(t, pgConn) ensureConnValid(t, pgConn)
} }
func TestConnBatchedQueries(t *testing.T) { func TestConnExecBatch(t *testing.T) {
t.Parallel() t.Parallel()
pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
@ -399,160 +443,26 @@ func TestConnBatchedQueries(t *testing.T) {
_, err = pgConn.Prepare(context.Background(), "ps1", "select $1::text", nil) _, err = pgConn.Prepare(context.Background(), "ps1", "select $1::text", nil)
require.Nil(t, err) require.Nil(t, err)
pgConn.SendExec("select 'SendExec 1'") batch := &pgconn.Batch{}
pgConn.SendExecParams("select $1::text", [][]byte{[]byte("SendExecParams 1")}, nil, nil, nil)
pgConn.SendExecPrepared("ps1", [][]byte{[]byte("SendExecPrepared 1")}, nil, nil)
pgConn.SendExec("select 'SendExec 2'")
pgConn.SendExecParams("select $1::text", [][]byte{[]byte("SendExecParams 2")}, nil, nil, nil)
err = pgConn.Flush(context.Background())
// "select 'SendExec 1'" batch.ExecParams("select $1::text", [][]byte{[]byte("ExecParams 1")}, nil, nil, nil)
require.True(t, pgConn.NextResult(context.Background())) batch.ExecPrepared("ps1", [][]byte{[]byte("ExecPrepared 1")}, nil, nil)
resultReader := pgConn.ResultReader() batch.ExecParams("select $1::text", [][]byte{[]byte("ExecParams 2")}, nil, nil, nil)
results, err := pgConn.ExecBatch(context.Background(), batch).ReadAll()
rows := [][][]byte{}
for resultReader.NextRow() {
row := make([][]byte, len(resultReader.Values()))
copy(row, resultReader.Values())
rows = append(rows, row)
}
require.Len(t, rows, 1)
require.Len(t, rows[0], 1)
assert.Equal(t, "SendExec 1", string(rows[0][0]))
commandTag, err := resultReader.Close()
assert.Equal(t, "SELECT 1", string(commandTag))
assert.Nil(t, err)
// "SendExecParams 1"
require.True(t, pgConn.NextResult(context.Background()))
resultReader = pgConn.ResultReader()
rows = [][][]byte{}
for resultReader.NextRow() {
row := make([][]byte, len(resultReader.Values()))
copy(row, resultReader.Values())
rows = append(rows, row)
}
require.Len(t, rows, 1)
require.Len(t, rows[0], 1)
assert.Equal(t, "SendExecParams 1", string(rows[0][0]))
commandTag, err = resultReader.Close()
assert.Equal(t, "SELECT 1", string(commandTag))
assert.Nil(t, err)
// "SendExecPrepared 1"
require.True(t, pgConn.NextResult(context.Background()))
resultReader = pgConn.ResultReader()
rows = [][][]byte{}
for resultReader.NextRow() {
row := make([][]byte, len(resultReader.Values()))
copy(row, resultReader.Values())
rows = append(rows, row)
}
require.Len(t, rows, 1)
require.Len(t, rows[0], 1)
assert.Equal(t, "SendExecPrepared 1", string(rows[0][0]))
commandTag, err = resultReader.Close()
assert.Equal(t, "SELECT 1", string(commandTag))
assert.Nil(t, err)
// "SendExec 2"
require.True(t, pgConn.NextResult(context.Background()))
resultReader = pgConn.ResultReader()
rows = [][][]byte{}
for resultReader.NextRow() {
row := make([][]byte, len(resultReader.Values()))
copy(row, resultReader.Values())
rows = append(rows, row)
}
require.Len(t, rows, 1)
require.Len(t, rows[0], 1)
assert.Equal(t, "SendExec 2", string(rows[0][0]))
commandTag, err = resultReader.Close()
assert.Equal(t, "SELECT 1", string(commandTag))
assert.Nil(t, err)
// "SendExecParams 2"
require.True(t, pgConn.NextResult(context.Background()))
resultReader = pgConn.ResultReader()
rows = [][][]byte{}
for resultReader.NextRow() {
row := make([][]byte, len(resultReader.Values()))
copy(row, resultReader.Values())
rows = append(rows, row)
}
require.Len(t, rows, 1)
require.Len(t, rows[0], 1)
assert.Equal(t, "SendExecParams 2", string(rows[0][0]))
commandTag, err = resultReader.Close()
assert.Equal(t, "SELECT 1", string(commandTag))
assert.Nil(t, err)
// Done
require.False(t, pgConn.NextResult(context.Background()))
ensureConnValid(t, pgConn)
}
func TestConnRecoverFromTimeout(t *testing.T) {
t.Parallel()
pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.Nil(t, err) require.Nil(t, err)
defer closeConn(t, pgConn) require.Len(t, results, 3)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) require.Len(t, results[0].Rows, 1)
result, err := pgConn.Exec(ctx, "select current_database(), pg_sleep(1)") require.Equal(t, "ExecParams 1", string(results[0].Rows[0][0]))
cancel() assert.Equal(t, "SELECT 1", string(results[0].CommandTag))
require.Nil(t, result)
assert.Equal(t, context.DeadlineExceeded, err)
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) require.Len(t, results[1].Rows, 1)
if assert.True(t, pgConn.RecoverFromTimeout(ctx)) { require.Equal(t, "ExecPrepared 1", string(results[1].Rows[0][0]))
result, err := pgConn.Exec(ctx, "select 1") assert.Equal(t, "SELECT 1", string(results[1].CommandTag))
require.Nil(t, err)
assert.Len(t, result.Rows, 1)
assert.Len(t, result.Rows[0], 1)
assert.Equal(t, "1", string(result.Rows[0][0]))
}
cancel()
ensureConnValid(t, pgConn) require.Len(t, results[2].Rows, 1)
} require.Equal(t, "ExecParams 2", string(results[2].Rows[0][0]))
assert.Equal(t, "SELECT 1", string(results[2].CommandTag))
func TestConnCancelQuery(t *testing.T) {
t.Parallel()
pgConn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
require.Nil(t, err)
defer closeConn(t, pgConn)
pgConn.SendExec("select current_database(), pg_sleep(5)")
err = pgConn.Flush(context.Background())
require.Nil(t, err)
err = pgConn.CancelRequest(context.Background())
require.Nil(t, err)
require.True(t, pgConn.NextResult(context.Background()))
_, err = pgConn.ResultReader().Close()
if err, ok := err.(*pgconn.PgError); ok {
assert.Equal(t, "57014", err.Code)
} else {
t.Errorf("expected pgconn.PgError got %v", err)
}
require.False(t, pgConn.NextResult(context.Background()))
ensureConnValid(t, pgConn)
} }
func TestCommandTag(t *testing.T) { func TestCommandTag(t *testing.T) {
@ -593,10 +503,11 @@ func TestConnOnNotice(t *testing.T) {
require.Nil(t, err) require.Nil(t, err)
defer closeConn(t, pgConn) defer closeConn(t, pgConn)
_, err = pgConn.Exec(context.Background(), `do $$ multiResult := pgConn.Exec(context.Background(), `do $$
begin begin
raise notice 'hello, world'; raise notice 'hello, world';
end$$;`) end$$;`)
err = multiResult.Close()
require.Nil(t, err) require.Nil(t, err)
assert.Equal(t, "hello, world", msg) assert.Equal(t, "hello, world", msg)