mirror of https://github.com/jackc/pgx.git
update TestConnContextCanceledCancelsRunningQueryOnServer
Check cancellation of the request for pgbouncerpull/1789/head
parent
6ca3d8ed4e
commit
5d0f904831
|
@ -17,16 +17,19 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/jackc/pgx/v5"
|
"github.com/jackc/pgx/v5"
|
||||||
"github.com/jackc/pgx/v5/internal/pgio"
|
"github.com/jackc/pgx/v5/internal/pgio"
|
||||||
"github.com/jackc/pgx/v5/internal/pgmock"
|
"github.com/jackc/pgx/v5/internal/pgmock"
|
||||||
"github.com/jackc/pgx/v5/pgconn"
|
"github.com/jackc/pgx/v5/pgconn"
|
||||||
"github.com/jackc/pgx/v5/pgproto3"
|
"github.com/jackc/pgx/v5/pgproto3"
|
||||||
"github.com/jackc/pgx/v5/pgtype"
|
"github.com/jackc/pgx/v5/pgtype"
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const pgbouncerConnStringEnvVar = "PGX_TEST_PGBOUNCER_CONN_STRING"
|
||||||
|
|
||||||
func TestConnect(t *testing.T) {
|
func TestConnect(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
|
@ -2256,18 +2259,44 @@ func TestConnCancelRequest(t *testing.T) {
|
||||||
func TestConnContextCanceledCancelsRunningQueryOnServer(t *testing.T) {
|
func TestConnContextCanceledCancelsRunningQueryOnServer(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
t.Run("postgres", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
testConnContextCanceledCancelsRunningQueryOnServer(t, os.Getenv("PGX_TEST_DATABASE"), "postgres")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("pgbouncer", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
connString := os.Getenv(pgbouncerConnStringEnvVar)
|
||||||
|
if connString == "" {
|
||||||
|
t.Skipf("Skipping due to missing environment variable %v", pgbouncerConnStringEnvVar)
|
||||||
|
}
|
||||||
|
|
||||||
|
testConnContextCanceledCancelsRunningQueryOnServer(t, connString, "pgbouncer")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func testConnContextCanceledCancelsRunningQueryOnServer(t *testing.T, connString, dbType string) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
pgConn, err := pgconn.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
|
pgConn, err := pgconn.Connect(ctx, connString)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer closeConn(t, pgConn)
|
defer closeConn(t, pgConn)
|
||||||
|
|
||||||
pid := pgConn.PID()
|
|
||||||
|
|
||||||
ctx, cancel = context.WithTimeout(ctx, 100*time.Millisecond)
|
ctx, cancel = context.WithTimeout(ctx, 100*time.Millisecond)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
multiResult := pgConn.Exec(ctx, "select 'Hello, world', pg_sleep(30)")
|
|
||||||
|
// Getting the actual PostgreSQL server process ID (PID) from a query executed through pgbouncer is not straightforward
|
||||||
|
// because pgbouncer abstracts the underlying database connections, and it doesn't expose the PID of the PostgreSQL
|
||||||
|
// server process to clients. However, we can check if the query is running by checking the generated query ID.
|
||||||
|
queryID := fmt.Sprintf("%s testConnContextCanceled %d", dbType, time.Now().UnixNano())
|
||||||
|
|
||||||
|
multiResult := pgConn.Exec(ctx, fmt.Sprintf(`
|
||||||
|
-- %v
|
||||||
|
select 'Hello, world', pg_sleep(30)
|
||||||
|
`, queryID))
|
||||||
|
|
||||||
for multiResult.NextResult() {
|
for multiResult.NextResult() {
|
||||||
}
|
}
|
||||||
|
@ -2283,7 +2312,7 @@ func TestConnContextCanceledCancelsRunningQueryOnServer(t *testing.T) {
|
||||||
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
|
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
otherConn, err := pgconn.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
|
otherConn, err := pgconn.Connect(ctx, connString)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer closeConn(t, otherConn)
|
defer closeConn(t, otherConn)
|
||||||
|
|
||||||
|
@ -2292,8 +2321,8 @@ func TestConnContextCanceledCancelsRunningQueryOnServer(t *testing.T) {
|
||||||
|
|
||||||
for {
|
for {
|
||||||
result := otherConn.ExecParams(ctx,
|
result := otherConn.ExecParams(ctx,
|
||||||
`select 1 from pg_stat_activity where pid=$1`,
|
`select 1 from pg_stat_activity where query like $1`,
|
||||||
[][]byte{[]byte(strconv.FormatInt(int64(pid), 10))},
|
[][]byte{[]byte("%" + queryID + "%")},
|
||||||
nil,
|
nil,
|
||||||
nil,
|
nil,
|
||||||
nil,
|
nil,
|
||||||
|
|
Loading…
Reference in New Issue