package pgx_test

import (
	"bytes"
	"context"
	"database/sql"
	"os"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgconn"
	"github.com/jackc/pgx/v5/pgtype"
	"github.com/jackc/pgx/v5/pgxtest"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// TestCrateDBConnect connects to the server named by the
// PGX_TEST_CRATEDB_CONN_STRING environment variable, checks that the connection
// config reports the same connection string, and runs a trivial query. The
// test is skipped when the variable is unset or empty.
func TestCrateDBConnect(t *testing.T) {
	t.Parallel()

	const envVar = "PGX_TEST_CRATEDB_CONN_STRING"
	connString := os.Getenv(envVar)
	if connString == "" {
		t.Skipf("Skipping due to missing environment variable %v", envVar)
	}

	ctx := context.Background()
	conn, err := pgx.Connect(ctx, connString)
	// NoError (rather than Nil) includes the error text in the failure output.
	require.NoError(t, err)
	defer closeConn(t, conn)

	assert.Equal(t, connString, conn.Config().ConnString())

	var result int
	err = conn.QueryRow(ctx, "select 1 +1").Scan(&result)
	if err != nil {
		t.Fatalf("QueryRow Scan unexpectedly failed: %v", err)
	}
	if result != 2 {
		t.Errorf("bad result: %d", result)
	}
}

// TestConnect connects with the config parsed from PGX_TEST_DATABASE and checks
// that the connection exposes that config and that the server reports the
// database and user named in it.
func TestConnect(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	connString := os.Getenv("PGX_TEST_DATABASE")
	config := mustParseConfig(t, connString)

	conn, err := pgx.ConnectConfig(ctx, config)
	if err != nil {
		t.Fatalf("Unable to establish connection: %v", err)
	}
	// Close on every exit path (including the t.Fatalf calls below) so a failed
	// assertion does not leak the connection, and report why closing failed.
	defer func() {
		if err := conn.Close(ctx); err != nil {
			t.Errorf("Unable to close connection: %v", err)
		}
	}()

	assertConfigsEqual(t, config, conn.Config(), "Conn.Config() returns original config")

	var currentDB string
	err = conn.QueryRow(ctx, "select current_database()").Scan(&currentDB)
	if err != nil {
		t.Fatalf("QueryRow Scan unexpectedly failed: %v", err)
	}
	if currentDB != config.Config.Database {
		t.Errorf("Did not connect to specified database (%v)", config.Config.Database)
	}

	var user string
	err = conn.QueryRow(ctx, "select current_user").Scan(&user)
	if err != nil {
		t.Fatalf("QueryRow Scan unexpectedly failed: %v", err)
	}
	if user != config.Config.User {
		t.Errorf("Did not connect as specified user (%v)", config.Config.User)
	}
}

// TestConnectWithPreferSimpleProtocol connects with DefaultQueryExecMode set to
// QueryExecModeSimpleProtocol and scans an int4 result into a pgtype.Text.
func TestConnectWithPreferSimpleProtocol(t *testing.T) {
	t.Parallel()

	cfg := mustParseConfig(t, os.Getenv("PGX_TEST_DATABASE"))
	cfg.DefaultQueryExecMode = pgx.QueryExecModeSimpleProtocol

	conn := mustConnect(t, cfg)
	defer closeConn(t, conn)

	// With the simple protocol the integer is expected to come back text
	// encoded, so scanning it into a pgtype.Text should succeed.
	var got pgtype.Text
	row := conn.QueryRow(context.Background(), "select $1::int4", 42)
	scanErr := row.Scan(&got)
	require.NoError(t, scanErr)

	want := pgtype.Text{String: "42", Valid: true}
	require.Equal(t, want, got)

	ensureConnValid(t, conn)
}

// TestConnectConfigRequiresConnConfigFromParseConfig checks that ConnectConfig
// panics with a fixed message when handed a zero-value ConnConfig instead of
// one returned by ParseConfig.
func TestConnectConfigRequiresConnConfigFromParseConfig(t *testing.T) {
	zeroConfig := &pgx.ConnConfig{}
	connect := func() {
		// The call is expected to panic, so its results are irrelevant.
		_, _ = pgx.ConnectConfig(context.Background(), zeroConfig)
	}
	require.PanicsWithValue(t, "config must be created by ParseConfig", connect)
}

// TestConfigContainsConnStr checks that a parsed config returns the exact
// connection string it was parsed from.
func TestConfigContainsConnStr(t *testing.T) {
	want := os.Getenv("PGX_TEST_DATABASE")

	cfg, parseErr := pgx.ParseConfig(want)
	require.NoError(t, parseErr)

	got := cfg.ConnString()
	assert.Equal(t, want, got)
}

// TestConfigCopyReturnsEqualConfig parses a fixed connection string and checks,
// via assertConfigsEqual, that Copy yields a config equal to the original.
func TestConfigCopyReturnsEqualConfig(t *testing.T) {
	const dsn = "postgres://jack:secret@localhost:5432/mydb?application_name=pgxtest&search_path=myschema&connect_timeout=5"

	src, parseErr := pgx.ParseConfig(dsn)
	require.NoError(t, parseErr)

	dup := src.Copy()
	assertConfigsEqual(t, src, dup, t.Name())
}

// TestConfigCopyCanBeUsedToConnect checks that a config produced by Copy is
// accepted by ConnectConfig without panicking and without returning an error.
func TestConfigCopyCanBeUsedToConnect(t *testing.T) {
	connString := os.Getenv("PGX_TEST_DATABASE")
	original, err := pgx.ParseConfig(connString)
	require.NoError(t, err)

	copied := original.Copy()

	var conn *pgx.Conn
	assert.NotPanics(t, func() {
		conn, err = pgx.ConnectConfig(context.Background(), copied)
	})
	assert.NoError(t, err)

	// Close the connection the test opened instead of discarding it. conn is
	// still nil if ConnectConfig panicked before the assignment, so guard the call.
	if conn != nil {
		closeConn(t, conn)
	}
}

// TestParseConfigExtractsStatementCacheOptions checks that ParseConfig maps the
// statement_cache_capacity, description_cache_capacity and
// default_query_exec_mode connection string settings onto the matching
// ConnConfig fields.
func TestParseConfigExtractsStatementCacheOptions(t *testing.T) {
	t.Parallel()

	type capacityCase struct {
		connString string
		capacity   int
	}

	// statement_cache_capacity -> ConnConfig.StatementCacheCapacity
	for _, tc := range []capacityCase{
		{"statement_cache_capacity=0", 0},
		{"statement_cache_capacity=42", 42},
	} {
		cfg, err := pgx.ParseConfig(tc.connString)
		require.NoError(t, err)
		require.EqualValues(t, tc.capacity, cfg.StatementCacheCapacity)
	}

	// description_cache_capacity -> ConnConfig.DescriptionCacheCapacity
	for _, tc := range []capacityCase{
		{"description_cache_capacity=0", 0},
		{"description_cache_capacity=42", 42},
	} {
		cfg, err := pgx.ParseConfig(tc.connString)
		require.NoError(t, err)
		require.EqualValues(t, tc.capacity, cfg.DescriptionCacheCapacity)
	}

	// default_query_exec_mode -> ConnConfig.DefaultQueryExecMode. The values
	// exercised here are "cache_statement", "cache_describe", "describe_exec",
	// "exec" and "simple_protocol".
	for _, tc := range []struct {
		connString string
		mode       pgx.QueryExecMode
	}{
		{"default_query_exec_mode=cache_statement", pgx.QueryExecModeCacheStatement},
		{"default_query_exec_mode=cache_describe", pgx.QueryExecModeCacheDescribe},
		{"default_query_exec_mode=describe_exec", pgx.QueryExecModeDescribeExec},
		{"default_query_exec_mode=exec", pgx.QueryExecModeExec},
		{"default_query_exec_mode=simple_protocol", pgx.QueryExecModeSimpleProtocol},
	} {
		cfg, err := pgx.ParseConfig(tc.connString)
		require.NoError(t, err)
		require.Equal(t, tc.mode, cfg.DefaultQueryExecMode)
	}
}

// TestParseConfigExtractsDefaultQueryExecMode checks, for an empty connection
// string and for each default_query_exec_mode value in the table, that
// ParseConfig sets DefaultQueryExecMode as expected and leaves no
// default_query_exec_mode entry in RuntimeParams.
func TestParseConfigExtractsDefaultQueryExecMode(t *testing.T) {
	t.Parallel()

	type modeCase struct {
		connString string
		want       pgx.QueryExecMode
	}

	cases := []modeCase{
		{connString: "", want: pgx.QueryExecModeCacheStatement},
		{connString: "default_query_exec_mode=cache_statement", want: pgx.QueryExecModeCacheStatement},
		{connString: "default_query_exec_mode=cache_describe", want: pgx.QueryExecModeCacheDescribe},
		{connString: "default_query_exec_mode=describe_exec", want: pgx.QueryExecModeDescribeExec},
		{connString: "default_query_exec_mode=exec", want: pgx.QueryExecModeExec},
		{connString: "default_query_exec_mode=simple_protocol", want: pgx.QueryExecModeSimpleProtocol},
	}

	for i := range cases {
		tc := cases[i]

		cfg, parseErr := pgx.ParseConfig(tc.connString)
		require.NoError(t, parseErr)
		require.Equalf(t, tc.want, cfg.DefaultQueryExecMode, "connString: `%s`", tc.connString)

		// The setting must not also be forwarded as a runtime parameter.
		require.Empty(t, cfg.RuntimeParams["default_query_exec_mode"])
	}
}

// TestParseConfigErrors checks that ParseConfig returns a nil config and an
// error containing the expected substring for each invalid connection string.
func TestParseConfigErrors(t *testing.T) {
	t.Parallel()

	type errCase struct {
		connString string
		wantSubstr string
	}

	cases := []errCase{
		{connString: "default_query_exec_mode=does_not_exist", wantSubstr: "does_not_exist"},
	}

	for _, tc := range cases {
		cfg, parseErr := pgx.ParseConfig(tc.connString)
		require.Nil(t, cfg)
		require.ErrorContains(t, parseErr, tc.wantSubstr)
	}
}

// TestExec runs a series of statements through mustExec under
// pgxtest.RunWithQueryExecModes and checks the command tag string returned for
// each one.
func TestExec(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
		tag := mustExec(t, conn, "create temporary table foo(id integer primary key);")
		if tag.String() != "CREATE TABLE" {
			t.Error("Unexpected results from Exec")
		}

		// A statement with a parameter is accepted.
		tag = mustExec(t, conn, "insert into foo(id) values($1)", 1)
		if tag.String() != "INSERT 0 1" {
			t.Errorf("Unexpected results from Exec: %v", tag)
		}

		tag = mustExec(t, conn, "drop table foo;")
		if tag.String() != "DROP TABLE" {
			t.Error("Unexpected results from Exec")
		}

		// Several statements in one call: the tag of the last one is returned.
		tag = mustExec(t, conn, "create temporary table foo(id serial primary key); drop table foo;")
		if tag.String() != "DROP TABLE" {
			t.Error("Unexpected results from Exec")
		}

		// SQL text longer than sharedBufferSize can be executed.
		longSQL := strings.Repeat("select 42; ", 1000)
		tag = mustExec(t, conn, longSQL)
		if tag.String() != "SELECT 1" {
			t.Errorf("Unexpected results from Exec: %v", tag)
		}

		// A comment-only no-op yields an empty command tag.
		tag = mustExec(t, conn, "--;")
		if tag.String() != "" {
			t.Errorf("Unexpected results from Exec: %v", tag)
		}
	})
}

// testQueryRewriter holds a fixed SQL string and argument list that its
// RewriteQuery method returns in place of whatever query it is given.
type testQueryRewriter struct {
	sql  string // SQL text returned by RewriteQuery
	args []any  // arguments returned by RewriteQuery
}

// RewriteQuery ignores ctx, conn, sql and args and returns the SQL and
// arguments stored on qr together with a nil error.
func (qr *testQueryRewriter) RewriteQuery(ctx context.Context, conn *pgx.Conn, sql string, args []any) (newSQL string, newArgs []any, err error) {
	newSQL = qr.sql
	newArgs = qr.args
	return newSQL, newArgs, nil
}

// TestExecWithQueryRewriter passes a *testQueryRewriter as the only Exec
// argument alongside a placeholder SQL string and requires Exec to succeed.
func TestExecWithQueryRewriter(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
		rewriter := &testQueryRewriter{
			sql:  "select $1::int",
			args: []any{42},
		}

		// "should be replaced" is not valid SQL, so Exec can only succeed if
		// the rewriter's SQL is what actually runs.
		_, execErr := conn.Exec(ctx, "should be replaced", rewriter)
		require.NoError(t, execErr)
	})
}

// TestExecFailure checks that Exec of a misspelled statement returns an error
// and that a subsequent query on the same connection still works.
func TestExecFailure(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
		_, execErr := conn.Exec(context.Background(), "selct;")
		if execErr == nil {
			t.Fatal("Expected SQL syntax error")
		}

		// The Query error return is ignored; any failure is read from rows.Err()
		// after Close.
		rows, _ := conn.Query(context.Background(), "select 1")
		rows.Close()
		if queryErr := rows.Err(); queryErr != nil {
			t.Fatalf("Exec failure appears to have broken connection: %v", queryErr)
		}
	})
}

// TestExecFailureWithArguments checks that Exec returns a non-retryable error
// for a misspelled statement with an argument, and an error when two arguments
// are supplied for a statement with a single placeholder.
func TestExecFailureWithArguments(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
		bg := context.Background()

		_, syntaxErr := conn.Exec(bg, "selct $1;", 1)
		if syntaxErr == nil {
			t.Fatal("Expected SQL syntax error")
		}
		assert.False(t, pgconn.SafeToRetry(syntaxErr))

		// Two arguments for one placeholder.
		_, argErr := conn.Exec(bg, "select $1::varchar(1);", "1", "2")
		require.Error(t, argErr)
	})
}

// TestExecContextWithoutCancelation runs Exec with a cancellable context that
// is never cancelled during the call and checks the returned command tag.
func TestExecContextWithoutCancelation(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
		execCtx, stop := context.WithCancel(ctx)
		defer stop()

		tag, err := conn.Exec(execCtx, "create temporary table foo(id integer primary key);")
		switch {
		case err != nil:
			t.Fatal(err)
		case tag.String() != "CREATE TABLE":
			t.Fatalf("Unexpected results from Exec: %v", tag)
		}

		// err is nil when execution reaches this point.
		assert.False(t, pgconn.SafeToRetry(err))
	})
}

// TestExecContextFailureWithoutCancelation runs a misspelled statement with a
// cancellable, never-cancelled context, checks the error is not marked safe to
// retry, and checks that the connection can still run a query afterwards.
func TestExecContextFailureWithoutCancelation(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
		execCtx, stop := context.WithCancel(ctx)
		defer stop()

		_, execErr := conn.Exec(execCtx, "selct;")
		if execErr == nil {
			t.Fatal("Expected SQL syntax error")
		}
		assert.False(t, pgconn.SafeToRetry(execErr))

		// Run a follow-up query; its outcome is read from rows.Err() after Close.
		rows, _ := conn.Query(context.Background(), "select 1")
		rows.Close()
		if queryErr := rows.Err(); queryErr != nil {
			t.Fatalf("ExecEx failure appears to have broken connection: %v", queryErr)
		}
		assert.False(t, pgconn.SafeToRetry(execErr))
	})
}

// TestExecContextFailureWithoutCancelationWithArguments runs a misspelled
// statement with one argument under a cancellable, never-cancelled context and
// checks that the resulting error is not marked safe to retry.
func TestExecContextFailureWithoutCancelationWithArguments(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
		execCtx, stop := context.WithCancel(ctx)
		defer stop()

		_, execErr := conn.Exec(execCtx, "selct $1;", 1)
		if execErr == nil {
			t.Fatal("Expected SQL syntax error")
		}

		retryable := pgconn.SafeToRetry(execErr)
		assert.False(t, retryable)
	})
}

// TestExecFailureCloseBefore calls Exec on a connection that closeConn has
// already been called on and checks that the error is marked safe to retry.
func TestExecFailureCloseBefore(t *testing.T) {
	t.Parallel()

	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	// Close up front, on purpose, before issuing the query.
	closeConn(t, conn)

	_, execErr := conn.Exec(context.Background(), "select 1")
	require.Error(t, execErr)

	retryable := pgconn.SafeToRetry(execErr)
	assert.True(t, retryable)
}

// TestExecPerQuerySimpleProtocol passes pgx.QueryExecModeSimpleProtocol as a
// per-call option to Exec and inserts a value containing a quote followed by
// SQL text, expecting a single-row insert.
func TestExecPerQuerySimpleProtocol(t *testing.T) {
	t.Parallel()

	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, conn)

	ctx, cancelFunc := context.WithCancel(context.Background())
	defer cancelFunc()

	tag, err := conn.Exec(ctx, "create temporary table foo(name varchar primary key);")
	switch {
	case err != nil:
		t.Fatal(err)
	case tag.String() != "CREATE TABLE":
		t.Fatalf("Unexpected results from Exec: %v", tag)
	}

	// The value embeds a quote and a drop statement; the insert is still
	// expected to report exactly one inserted row.
	const hostileName = "bar'; drop table foo;--"
	tag, err = conn.Exec(ctx, "insert into foo(name) values($1);", pgx.QueryExecModeSimpleProtocol, hostileName)
	switch {
	case err != nil:
		t.Fatal(err)
	case tag.String() != "INSERT 0 1":
		t.Fatalf("Unexpected results from Exec: %v", tag)
	}
}

// TestPrepare prepares a statement named "test", executes it by name,
// deallocates it, then reuses the same name for a different statement and
// finally calls DeallocateAll.
func TestPrepare(t *testing.T) {
	t.Parallel()

	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, conn)

	_, err := conn.Prepare(context.Background(), "test", "select $1::varchar")
	if err != nil {
		t.Errorf("Unable to prepare statement: %v", err)
		return
	}

	var s string
	err = conn.QueryRow(context.Background(), "test", "hello").Scan(&s)
	if err != nil {
		t.Errorf("Executing prepared statement failed: %v", err)
	}

	if s != "hello" {
		t.Errorf("Prepared statement did not return expected value: %v", s)
	}

	err = conn.Deallocate(context.Background(), "test")
	if err != nil {
		t.Errorf("conn.Deallocate failed: %v", err)
	}

	// Create another prepared statement to ensure Deallocate left the connection
	// in a working state and that we can reuse the prepared statement name.

	_, err = conn.Prepare(context.Background(), "test", "select $1::integer")
	if err != nil {
		t.Errorf("Unable to prepare statement: %v", err)
		return
	}

	var n int32
	err = conn.QueryRow(context.Background(), "test", int32(1)).Scan(&n)
	if err != nil {
		t.Errorf("Executing prepared statement failed: %v", err)
	}

	if n != 1 {
		// Report the integer that was actually scanned, not the earlier string.
		t.Errorf("Prepared statement did not return expected value: %v", n)
	}

	err = conn.DeallocateAll(context.Background())
	if err != nil {
		t.Errorf("conn.DeallocateAll failed: %v", err)
	}
}

// TestPrepareBadSQLFailure checks that Prepare returns an error for the
// statement "select foo" and that the connection passes ensureConnValid
// afterwards.
func TestPrepareBadSQLFailure(t *testing.T) {
	t.Parallel()

	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, conn)

	_, prepErr := conn.Prepare(context.Background(), "badSQL", "select foo")
	if prepErr == nil {
		t.Fatal("Prepare should have failed with syntax error")
	}

	ensureConnValid(t, conn)
}

// TestPrepareIdempotency checks that preparing the same name with the same SQL
// twice succeeds and the statement stays usable, and that preparing the same
// name with different SQL returns an error.
func TestPrepareIdempotency(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
		for i := 0; i < 2; i++ {
			_, err := conn.Prepare(context.Background(), "test", "select 42::integer")
			if err != nil {
				t.Fatalf("%d. Unable to prepare statement: %v", i, err)
			}

			var n int32
			err = conn.QueryRow(context.Background(), "test").Scan(&n)
			if err != nil {
				t.Errorf("%d. Executing prepared statement failed: %v", i, err)
			}

			if n != int32(42) {
				t.Errorf("%d. Prepared statement did not return expected value: %v", i, n)
			}
		}

		// Same name, different SQL: this must be rejected. t.Fatalf stops the
		// test function, so no explicit return is needed after it.
		_, err := conn.Prepare(context.Background(), "test", "select 'fail'::varchar")
		if err == nil {
			t.Fatalf("Prepare statement with same name but different SQL should have failed but it didn't")
		}
	})
}

// TestPrepareStatementCacheModes prepares a named statement and executes it by
// name under pgxtest.RunWithQueryExecModes, expecting the argument to be
// echoed back.
func TestPrepareStatementCacheModes(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
		bg := context.Background()

		_, prepErr := conn.Prepare(bg, "test", "select $1::text")
		require.NoError(t, prepErr)

		var got string
		scanErr := conn.QueryRow(bg, "test", "hello").Scan(&got)
		require.NoError(t, scanErr)
		require.Equal(t, "hello", got)
	})
}

// TestPrepareWithDigestedName prepares a statement using its SQL text as the
// name, checks that the returned description carries a fixed "stmt_"-prefixed
// name, executes the statement by its SQL text, and deallocates it.
func TestPrepareWithDigestedName(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
		const (
			query    = "select $1::text"
			wantName = "stmt_2510cc7db17de3f42758a2a29c8b9ef8305d007b997ebdd6"
		)

		desc, err := conn.Prepare(ctx, query, query)
		require.NoError(t, err)
		require.Equal(t, wantName, desc.Name)

		var got string
		err = conn.QueryRow(ctx, query, "hello").Scan(&got)
		require.NoError(t, err)
		require.Equal(t, "hello", got)

		require.NoError(t, conn.Deallocate(ctx, query))
	})
}

// TestDeallocateInAbortedTransaction prepares a statement inside a
// transaction, aborts the transaction with a divide-by-zero error, and checks
// that Deallocate still succeeds and that the statement can be prepared again
// after rollback.
//
// See https://github.com/jackc/pgx/pull/1795
func TestDeallocateInAbortedTransaction(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
		const (
			query    = "select $1::text"
			wantName = "stmt_2510cc7db17de3f42758a2a29c8b9ef8305d007b997ebdd6"
		)

		tx, err := conn.Begin(ctx)
		require.NoError(t, err)

		desc, err := tx.Prepare(ctx, query, query)
		require.NoError(t, err)
		require.Equal(t, wantName, desc.Name)

		var got string
		err = tx.QueryRow(ctx, query, "hello").Scan(&got)
		require.NoError(t, err)
		require.Equal(t, "hello", got)

		// Abort the transaction with a divide by zero error.
		_, err = tx.Exec(ctx, "select 1/0")
		require.Error(t, err)

		require.NoError(t, conn.Deallocate(ctx, query))
		require.NoError(t, tx.Rollback(ctx))

		// The same statement can be prepared again after the rollback.
		desc, err = conn.Prepare(ctx, query, query)
		require.NoError(t, err)
		require.Equal(t, wantName, desc.Name)
	})
}

// TestDeallocateMissingPreparedStatementStillClearsFromPreparedStatementMap
// deallocates a prepared statement with raw SQL behind the connection's back,
// then checks that Conn.Deallocate still succeeds and that the name can be
// re-prepared with different SQL and used.
func TestDeallocateMissingPreparedStatementStillClearsFromPreparedStatementMap(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
		const name = "ps"

		_, err := conn.Prepare(ctx, name, "select $1::text")
		require.NoError(t, err)

		// Drop the statement with plain SQL rather than through Conn.Deallocate.
		_, err = conn.Exec(ctx, "deallocate ps")
		require.NoError(t, err)

		require.NoError(t, conn.Deallocate(ctx, name))

		// Re-prepare the same name with a two-parameter statement.
		_, err = conn.Prepare(ctx, name, "select $1::text, $2::text")
		require.NoError(t, err)

		var first, second string
		err = conn.QueryRow(ctx, name, "hello", "world").Scan(&first, &second)
		require.NoError(t, err)
		require.Equal(t, "hello", first)
		require.Equal(t, "world", second)
	})
}

// TestListenNotify exercises WaitForNotification on a listening connection in
// four situations: a notification waiting to be read, a notification that
// arrived while another query ran, a wait that times out, and a wait issued
// after that timeout.
func TestListenNotify(t *testing.T) {
	t.Parallel()

	listener := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, listener)

	// Use the same skip helper as the other LISTEN / NOTIFY tests in this file.
	pgxtest.SkipCockroachDB(t, listener, "Server does not support LISTEN / NOTIFY (https://github.com/cockroachdb/cockroach/issues/41522)")

	mustExec(t, listener, "listen chat")

	notifier := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, notifier)

	mustExec(t, notifier, "notify chat")

	// when notification is waiting on the socket to be read
	notification, err := listener.WaitForNotification(context.Background())
	require.NoError(t, err)
	assert.Equal(t, "chat", notification.Channel)

	// when notification has already been read during previous query
	mustExec(t, notifier, "notify chat")
	rows, _ := listener.Query(context.Background(), "select 1")
	rows.Close()
	require.NoError(t, rows.Err())

	// The context is cancelled before the call; the wait is still expected to
	// return the notification without an error.
	ctx, cancelFn := context.WithCancel(context.Background())
	cancelFn()
	notification, err = listener.WaitForNotification(ctx)
	require.NoError(t, err)
	assert.Equal(t, "chat", notification.Channel)

	// when timeout occurs
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	notification, err = listener.WaitForNotification(ctx)
	assert.True(t, pgconn.Timeout(err))
	assert.Nil(t, notification)

	// listener can listen again after a timeout
	mustExec(t, notifier, "notify chat")
	notification, err = listener.WaitForNotification(context.Background())
	require.NoError(t, err)
	assert.Equal(t, "chat", notification.Channel)
}

// TestListenNotifyWhileBusyIsSafe runs 5000 queries on a connection that is
// listening on a channel while a second connection concurrently sends 100000
// notifications on that channel, and checks that every query returns the
// expected rows.
func TestListenNotifyWhileBusyIsSafe(t *testing.T) {
	t.Parallel()

	func() {
		conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
		defer closeConn(t, conn)
		pgxtest.SkipCockroachDB(t, conn, "Server does not support LISTEN / NOTIFY (https://github.com/cockroachdb/cockroach/issues/41522)")
	}()

	// All setup and teardown that goes through the must*/closeConn helpers is
	// done on the test goroutine: the testing package requires FailNow (and
	// therefore Fatal/Fatalf) to be called from the goroutine running the test.
	// The worker goroutines below only report through t.Errorf.
	listenerConn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, listenerConn)

	notifierConn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, notifierConn)

	// Start listening before either worker runs so that no notification is sent
	// ahead of the listen.
	mustExec(t, listenerConn, "listen busysafe")

	var wg sync.WaitGroup
	wg.Add(2)

	// Listener: keeps the listening connection busy with queries.
	go func() {
		defer wg.Done()

		for i := 0; i < 5000; i++ {
			var sum int32
			var rowCount int32

			rows, err := listenerConn.Query(context.Background(), "select generate_series(1,$1)", 100)
			if err != nil {
				t.Errorf("conn.Query failed: %v", err)
				return
			}

			for rows.Next() {
				var n int32
				if err := rows.Scan(&n); err != nil {
					t.Errorf("Row scan failed: %v", err)
					return
				}
				sum += n
				rowCount++
			}

			if rows.Err() != nil {
				t.Errorf("conn.Query failed: %v", rows.Err())
				return
			}

			if sum != 5050 {
				t.Errorf("Wrong rows sum: %v", sum)
				return
			}

			if rowCount != 100 {
				t.Errorf("Wrong number of rows: %v", rowCount)
				return
			}
		}
	}()

	// Notifier: floods the channel with notifications.
	go func() {
		defer wg.Done()

		for i := 0; i < 100000; i++ {
			if _, err := notifierConn.Exec(context.Background(), "notify busysafe, 'hello'"); err != nil {
				t.Errorf("notify failed: %v", err)
				return
			}
		}
	}()

	wg.Wait()
}

// TestListenNotifySelfNotification checks that a connection receives
// notifications it sends to itself, both when WaitForNotification is called
// right after the notify and when another query runs in between.
func TestListenNotifySelfNotification(t *testing.T) {
	t.Parallel()

	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, conn)

	pgxtest.SkipCockroachDB(t, conn, "Server does not support LISTEN / NOTIFY (https://github.com/cockroachdb/cockroach/issues/41522)")

	mustExec(t, conn, "listen self")

	// Case 1: notify self, then wait immediately.
	mustExec(t, conn, "notify self")

	firstCtx, firstCancel := context.WithTimeout(context.Background(), time.Second)
	defer firstCancel()
	got, err := conn.WaitForNotification(firstCtx)
	require.NoError(t, err)
	assert.Equal(t, "self", got.Channel)

	// Case 2: notify self, run an unrelated query, then wait.
	mustExec(t, conn, "notify self")

	rows, _ := conn.Query(context.Background(), "select 1")
	rows.Close()
	if queryErr := rows.Err(); queryErr != nil {
		t.Fatalf("Unexpected error on Query: %v", queryErr)
	}

	secondCtx, secondCancel := context.WithTimeout(context.Background(), time.Second)
	defer secondCancel()
	got, err = conn.WaitForNotification(secondCtx)
	require.NoError(t, err)
	assert.Equal(t, "self", got.Channel)
}

// TestFatalRxError starts a long-running query on one connection, terminates
// that connection's backend from a second connection with
// pg_terminate_backend, and checks that the query fails with a FATAL PgError
// and that the first connection reports itself closed.
func TestFatalRxError(t *testing.T) {
	t.Parallel()

	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, conn)

	pgxtest.SkipCockroachDB(t, conn, "Server does not support pg_terminate_backend() (https://github.com/cockroachdb/cockroach/issues/35897)")

	var queryDone sync.WaitGroup
	queryDone.Add(1)
	go func() {
		defer queryDone.Done()

		var (
			n int32
			s string
		)
		err := conn.QueryRow(context.Background(), "select 1::int4, pg_sleep(10)::varchar").Scan(&n, &s)

		// Anything other than a PgError with severity FATAL is a failure.
		pgErr, ok := err.(*pgconn.PgError)
		if !ok || pgErr.Severity != "FATAL" {
			t.Errorf("Expected QueryRow Scan to return fatal PgError, but instead received %v", err)
		}
	}()

	otherConn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer otherConn.Close(context.Background())

	_, killErr := otherConn.Exec(context.Background(), "select pg_terminate_backend($1)", conn.PgConn().PID())
	if killErr != nil {
		t.Fatalf("Unable to kill backend PostgreSQL process: %v", killErr)
	}

	queryDone.Wait()

	if !conn.IsClosed() {
		t.Fatal("Connection should be closed")
	}
}

// TestFatalTxError terminates a connection's backend from a second connection
// and then checks that the next query on the terminated connection fails and
// leaves it closed. The scenario is repeated 50 times.
func TestFatalTxError(t *testing.T) {
	t.Parallel()

	// One iteration of the scenario; written as a closure so its deferred
	// closes run at the end of each iteration.
	killThenQuery := func() {
		conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
		defer closeConn(t, conn)

		pgxtest.SkipCockroachDB(t, conn, "Server does not support pg_terminate_backend() (https://github.com/cockroachdb/cockroach/issues/35897)")

		otherConn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
		defer otherConn.Close(context.Background())

		_, killErr := otherConn.Exec(context.Background(), "select pg_terminate_backend($1)", conn.PgConn().PID())
		if killErr != nil {
			t.Fatalf("Unable to kill backend PostgreSQL process: %v", killErr)
		}

		queryErr := conn.QueryRow(context.Background(), "select 1").Scan(nil)
		if queryErr == nil {
			t.Fatal("Expected error but none occurred")
		}

		if closed := conn.IsClosed(); !closed {
			t.Fatalf("Connection should be closed but isn't. Previous Query err: %v", queryErr)
		}
	}

	// Run timing sensitive test many times
	for attempt := 0; attempt < 50; attempt++ {
		killThenQuery()
	}
}

// TestInsertBoolArray creates a temporary table with a bool[] column and
// inserts a Go []bool as a single parameter, checking both command tags.
func TestInsertBoolArray(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
		tag := mustExec(t, conn, "create temporary table foo(spice bool[]);")
		if tag.String() != "CREATE TABLE" {
			t.Error("Unexpected results from Exec")
		}

		// The slice is passed as one argument for the single placeholder.
		flags := []bool{true, false, true}
		tag = mustExec(t, conn, "insert into foo(spice) values($1)", flags)
		if tag.String() != "INSERT 0 1" {
			t.Errorf("Unexpected results from Exec: %v", tag)
		}
	})
}

// TestInsertTimestampArray creates a temporary table with a timestamp[] column
// and inserts a Go []time.Time as a single parameter, checking both command
// tags.
func TestInsertTimestampArray(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
		tag := mustExec(t, conn, "create temporary table foo(spice timestamp[]);")
		if tag.String() != "CREATE TABLE" {
			t.Error("Unexpected results from Exec")
		}

		// The slice is passed as one argument for the single placeholder.
		stamps := []time.Time{
			time.Unix(1419143667, 0),
			time.Unix(1419143672, 0),
		}
		tag = mustExec(t, conn, "insert into foo(spice) values($1)", stamps)
		if tag.String() != "INSERT 0 1" {
			t.Errorf("Unexpected results from Exec: %v", tag)
		}
	})
}

// TestIdentifierSanitize checks the output of Identifier.Sanitize for plain,
// keyword, multi-part, quote-containing and NUL-containing identifiers.
func TestIdentifierSanitize(t *testing.T) {
	t.Parallel()

	type sanitizeCase struct {
		ident    pgx.Identifier
		expected string
	}

	cases := []sanitizeCase{
		{pgx.Identifier{`foo`}, `"foo"`},
		{pgx.Identifier{`select`}, `"select"`},
		{pgx.Identifier{`foo`, `bar`}, `"foo"."bar"`},
		// Embedded double quotes are doubled.
		{pgx.Identifier{`you should " not do this`}, `"you should "" not do this"`},
		{pgx.Identifier{`you should " not do this`, `please don't`}, `"you should "" not do this"."please don't"`},
		// The input holds a zero byte that is absent from the expected output.
		{pgx.Identifier{"you should \x00not do this"}, `"you should not do this"`},
	}

	for idx := range cases {
		tc := cases[idx]
		got := tc.ident.Sanitize()
		if got == tc.expected {
			continue
		}
		t.Errorf("%d. Expected Sanitize %v to return %v but it was %v", idx, tc.ident, tc.expected, got)
	}
}

// TestConnInitTypeMap spot checks that a few standard type names resolve in
// the connection's type map to the same type as their well-known OIDs.
func TestConnInitTypeMap(t *testing.T) {
	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, conn)

	// The names are looked up unqualified, i.e. without a schema prefix.
	expectedOIDs := map[string]uint32{
		"_int8": pgtype.Int8ArrayOID,
		"int8":  pgtype.Int8OID,
		"json":  pgtype.JSONOID,
		"text":  pgtype.TextOID,
	}

	typeMap := conn.TypeMap()
	for typeName, typeOID := range expectedOIDs {
		byName, found := typeMap.TypeForName(typeName)
		if !found {
			t.Fatalf("Expected type named %v to be present", typeName)
		}

		byOID, found := typeMap.TypeForOID(typeOID)
		if !found {
			t.Fatalf("Expected type OID %v to be present", typeOID)
		}

		if byName != byOID {
			t.Fatalf("Expected type named %v to be the same as type OID %v", typeName, typeOID)
		}
	}

	ensureConnValid(t, conn)
}

// TestUnregisteredTypeUsableAsStringArgumentAndBaseResult passes a string
// argument for a parameter cast to the uint64 type, which this test never
// registers with the type map, and scans the result into a Go uint64.
func TestUnregisteredTypeUsableAsStringArgumentAndBaseResult(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
		// The skip applies because the server does NOT support domain types.
		pgxtest.SkipCockroachDB(t, conn, "Server does not support domain types (https://github.com/cockroachdb/cockroach/issues/27796)")

		var n uint64
		// Use the runner-provided ctx so the query honours the test timeout.
		err := conn.QueryRow(ctx, "select $1::uint64", "42").Scan(&n)
		if err != nil {
			t.Fatal(err)
		}

		if n != 42 {
			t.Fatalf("Expected n to be 42, but was %v", n)
		}
	})
}

// TestDomainType queries the uint64 domain type before and after loading and
// registering it on the connection, using both string and uint64 arguments.
func TestDomainType(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
		// The skip applies because the server does NOT support domain types.
		pgxtest.SkipCockroachDB(t, conn, "Server does not support domain types (https://github.com/cockroachdb/cockroach/issues/27796)")

		// Domain type uint64 is a PostgreSQL domain of underlying type numeric.

		// In the extended protocol preparing "select $1::uint64" appears to create a statement that expects a param OID of
		// uint64 but a result OID of the underlying numeric.

		var s string
		err := conn.QueryRow(ctx, "select $1::uint64", "24").Scan(&s)
		require.NoError(t, err)
		require.Equal(t, "24", s)

		// Register type
		uint64Type, err := conn.LoadType(ctx, "uint64")
		require.NoError(t, err)
		conn.TypeMap().RegisterType(uint64Type)

		var n uint64
		err = conn.QueryRow(ctx, "select $1::uint64", uint64(24)).Scan(&n)
		require.NoError(t, err)
		// Check the value, not only the absence of an error.
		require.EqualValues(t, 24, n)

		// String is still an acceptable argument after registration
		err = conn.QueryRow(ctx, "select $1::uint64", "7").Scan(&n)
		if err != nil {
			t.Fatal(err)
		}
		if n != 7 {
			t.Fatalf("Expected n to be 7, but was %v", n)
		}
	})
}

// TestLoadTypeSameNameInDifferentSchemas creates two composite types named
// "point" in different schemas, loads and registers both by schema-qualified
// name, and scans one value of each into a distinct Go struct.
func TestLoadTypeSameNameInDifferentSchemas(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
		// The skip applies because the server does NOT support composite types.
		pgxtest.SkipCockroachDB(t, conn, "Server does not support composite types (https://github.com/cockroachdb/cockroach/issues/27792)")

		// Everything is created inside a transaction that is always rolled back.
		tx, err := conn.Begin(ctx)
		require.NoError(t, err)
		defer tx.Rollback(ctx)

		_, err = tx.Exec(ctx, `create schema pgx_a;
create type pgx_a.point as (a text, b text);
create schema pgx_b;
create type pgx_b.point as (c text);
`)
		require.NoError(t, err)

		// Register types
		for _, typename := range []string{"pgx_a.point", "pgx_b.point"} {
			// Obviously using conn while a tx is in use and registering a type after the connection has been established are
			// really bad practices, but for the sake of convenience we do it in the test here.
			dt, err := conn.LoadType(ctx, typename)
			require.NoError(t, err)
			conn.TypeMap().RegisterType(dt)
		}

		type aPoint struct {
			A string
			B string
		}

		type bPoint struct {
			C string
		}

		var a aPoint
		var b bPoint
		err = tx.QueryRow(ctx, `select '(foo,bar)'::pgx_a.point, '(baz)'::pgx_b.point`).Scan(&a, &b)
		require.NoError(t, err)
		require.Equal(t, aPoint{"foo", "bar"}, a)
		require.Equal(t, bPoint{"baz"}, b)
	})
}

// TestLoadCompositeType creates a composite type, drops one of its attributes,
// and checks that LoadType still succeeds for it.
func TestLoadCompositeType(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
		// The skip applies because the server does NOT support composite types.
		pgxtest.SkipCockroachDB(t, conn, "Server does not support composite types (https://github.com/cockroachdb/cockroach/issues/27792)")

		// Everything is created inside a transaction that is always rolled back.
		tx, err := conn.Begin(ctx)
		require.NoError(t, err)
		defer tx.Rollback(ctx)

		_, err = tx.Exec(ctx, "create type compositetype as (attr1 int, attr2 int)")
		require.NoError(t, err)

		_, err = tx.Exec(ctx, "alter type compositetype drop attribute attr1")
		require.NoError(t, err)

		_, err = conn.LoadType(ctx, "compositetype")
		require.NoError(t, err)
	})
}

// TestLoadRangeType verifies that a user-defined range type can be loaded with Conn.LoadType, registered on the
// connection's type map, and then round-tripped through a query as a pgtype.Range[float64]. It runs once per query
// exec mode.
func TestLoadRangeType(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
		pgxtest.SkipCockroachDB(t, conn, "Server does not support range types")

		// The type is created inside a transaction that is always rolled back, so it does not outlive the test.
		tx, err := conn.Begin(ctx)
		require.NoError(t, err)
		defer tx.Rollback(ctx)

		_, err = tx.Exec(ctx, "create type examplefloatrange as range (subtype=float8, subtype_diff=float8mi)")
		require.NoError(t, err)

		// Register the new type and make it the default PostgreSQL type for pgtype.Range[float64] values.
		newRangeType, err := conn.LoadType(ctx, "examplefloatrange")
		require.NoError(t, err)
		conn.TypeMap().RegisterType(newRangeType)
		conn.TypeMap().RegisterDefaultPgType(pgtype.Range[float64]{}, "examplefloatrange")

		inputRangeType := pgtype.Range[float64]{
			Lower:     1.0,
			Upper:     2.0,
			LowerType: pgtype.Inclusive,
			UpperType: pgtype.Inclusive,
			Valid:     true,
		}

		// Send the value as a parameter and read it straight back; it must come back unchanged.
		var outputRangeType pgtype.Range[float64]
		err = tx.QueryRow(ctx, "SELECT $1::examplefloatrange", inputRangeType).Scan(&outputRangeType)
		require.NoError(t, err)
		require.Equal(t, inputRangeType, outputRangeType)
	})
}

// TestLoadMultiRangeType verifies that a user-defined range type and its multirange type can both be loaded with
// Conn.LoadType, registered on the connection's type map, and that a multirange value then round-trips through a
// query. It runs once per query exec mode.
func TestLoadMultiRangeType(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
		pgxtest.SkipCockroachDB(t, conn, "Server does not support range types")
		pgxtest.SkipPostgreSQLVersionLessThan(t, conn, 14) // multirange data type was added in PostgreSQL 14

		// The types are created inside a transaction that is always rolled back, so they do not outlive the test.
		tx, err := conn.Begin(ctx)
		require.NoError(t, err)
		defer tx.Rollback(ctx)

		_, err = tx.Exec(ctx, "create type examplefloatrange as range (subtype=float8, subtype_diff=float8mi, multirange_type_name=examplefloatmultirange)")
		require.NoError(t, err)

		// Register the range type first, then the multirange type named in the create statement above.
		newRangeType, err := conn.LoadType(ctx, "examplefloatrange")
		require.NoError(t, err)
		conn.TypeMap().RegisterType(newRangeType)
		conn.TypeMap().RegisterDefaultPgType(pgtype.Range[float64]{}, "examplefloatrange")

		newMultiRangeType, err := conn.LoadType(ctx, "examplefloatmultirange")
		require.NoError(t, err)
		conn.TypeMap().RegisterType(newMultiRangeType)
		conn.TypeMap().RegisterDefaultPgType(pgtype.Multirange[pgtype.Range[float64]]{}, "examplefloatmultirange")

		// Two disjoint ranges, one with inclusive and one with exclusive bounds.
		inputMultiRangeType := pgtype.Multirange[pgtype.Range[float64]]{
			{
				Lower:     1.0,
				Upper:     2.0,
				LowerType: pgtype.Inclusive,
				UpperType: pgtype.Inclusive,
				Valid:     true,
			},
			{
				Lower:     3.0,
				Upper:     4.0,
				LowerType: pgtype.Exclusive,
				UpperType: pgtype.Exclusive,
				Valid:     true,
			},
		}

		// Send the value as a parameter and read it straight back; it must come back unchanged.
		var outputMultiRangeType pgtype.Multirange[pgtype.Range[float64]]
		err = tx.QueryRow(ctx, "SELECT $1::examplefloatmultirange", inputMultiRangeType).Scan(&outputMultiRangeType)
		require.NoError(t, err)
		require.Equal(t, inputMultiRangeType, outputMultiRangeType)
	})
}

// TestStmtCacheInvalidationConn exercises a cached statement whose table schema changes after the statement has been
// cached. The first re-execution must fail with "cached plan must not change result type"; the execution after that
// must succeed and leave the connection usable.
func TestStmtCacheInvalidationConn(t *testing.T) {
	const selectByID = "SELECT * FROM drop_cols WHERE id = $1"

	ctx := context.Background()

	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, conn)

	// requireStalePlanErr fails the test unless err reports the stale cached plan.
	requireStalePlanErr := func(err error) {
		if err == nil {
			t.Fatal(`expected "cached plan must not change result type": no error`)
		}
		if !strings.Contains(err.Error(), "cached plan must not change result type") {
			t.Fatalf(`expected "cached plan must not change result type", got: "%s"`, err.Error())
		}
	}

	// Start from a fresh three-column table holding a single row.
	_, err := conn.Exec(ctx, `
        DROP TABLE IF EXISTS drop_cols;
        CREATE TABLE drop_cols (
            id SERIAL PRIMARY KEY NOT NULL,
            f1 int NOT NULL,
            f2 int NOT NULL
        );
    `)
	require.NoError(t, err)
	_, err = conn.Exec(ctx, "INSERT INTO drop_cols (f1, f2) VALUES (1, 2)")
	require.NoError(t, err)

	// Run the query once purely to get it into the statement cache; the result itself is irrelevant.
	rows, err := conn.Query(ctx, selectByID, 1)
	require.NoError(t, err)
	rows.Close()
	require.NoError(t, rows.Err())

	// Dropping a column changes the result type of the cached "SELECT *" statement.
	_, err = conn.Exec(ctx, "ALTER TABLE drop_cols DROP COLUMN f1")
	require.NoError(t, err)

	// The first re-execution of the stale statement has to surface an error. Whether to retry is left to the
	// application: failed transactions and batch operations have no clear recovery path, and automatic retry is too
	// easy to get wrong at such an important layer of the stack.
	rows, err = conn.Query(ctx, selectByID, 1)
	require.NoError(t, err)
	rows.Next()
	errAfterNext := rows.Err()
	rows.Close()
	errAfterClose := rows.Err()
	requireStalePlanErr(errAfterNext)
	requireStalePlanErr(errAfterClose)

	// By now the stale statement should be gone from the cache, so a retry works.
	rows, err = conn.Query(ctx, selectByID, 1)
	require.NoError(t, err)
	rows.Next()
	require.NoError(t, rows.Err())
	rows.Close()
	require.NoError(t, rows.Err())

	ensureConnValid(t, conn)
}

// TestStmtCacheInvalidationTx exercises a cached statement whose table schema changes inside a transaction after the
// statement has been cached. The first re-execution must fail with "cached plan must not change result type", further
// attempts inside the same transaction must also fail, and after a rollback the query must work again.
func TestStmtCacheInvalidationTx(t *testing.T) {
	ctx := context.Background()

	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, conn)

	if conn.PgConn().ParameterStatus("crdb_version") != "" {
		t.Skip("Server has non-standard prepare in errored transaction behavior (https://github.com/cockroachdb/cockroach/issues/84140)")
	}

	// create a table and fill it with some data
	_, err := conn.Exec(ctx, `
        DROP TABLE IF EXISTS drop_cols;
        CREATE TABLE drop_cols (
            id SERIAL PRIMARY KEY NOT NULL,
            f1 int NOT NULL,
            f2 int NOT NULL
        );
    `)
	require.NoError(t, err)
	_, err = conn.Exec(ctx, "INSERT INTO drop_cols (f1, f2) VALUES (1, 2)")
	require.NoError(t, err)

	tx, err := conn.Begin(ctx)
	require.NoError(t, err)

	getSQL := "SELECT * FROM drop_cols WHERE id = $1"

	// This query will populate the statement cache. We don't care about the result.
	rows, err := tx.Query(ctx, getSQL, 1)
	require.NoError(t, err)
	rows.Close()
	require.NoError(t, rows.Err())

	// Now, change the schema of the table out from under the statement, making it invalid.
	_, err = tx.Exec(ctx, "ALTER TABLE drop_cols DROP COLUMN f1")
	require.NoError(t, err)

	// We must get an error the first time we try to re-execute a bad statement.
	// It is up to the application to determine if it wants to try again. We punt to
	// the application because there is no clear recovery path in the case of failed transactions
	// or batch operations and because automatic retry is tricky and we don't want to get
	// it wrong at such an important layer of the stack.
	rows, err = tx.Query(ctx, getSQL, 1)
	require.NoError(t, err)
	rows.Next()
	nextErr := rows.Err()
	rows.Close()
	// The error must be visible both right after Next and after Close.
	for _, err := range []error{nextErr, rows.Err()} {
		if err == nil {
			t.Fatal(`expected "cached plan must not change result type": no error`)
		}
		if !strings.Contains(err.Error(), "cached plan must not change result type") {
			t.Fatalf(`expected "cached plan must not change result type", got: "%s"`, err.Error())
		}
	}

	// The error returned by Query is deliberately ignored: pgx documents that it is also available from rows.Err()
	// once the rows are closed, and that is what is asserted here.
	rows, _ = tx.Query(ctx, getSQL, 1)
	rows.Close()
	// Retries within the same transaction are errors (really anything except a rollback
	// will be an error in this transaction).
	require.Error(t, rows.Err())

	err = tx.Rollback(ctx)
	require.NoError(t, err)

	// once we've rolled back, retries will work
	rows, err = conn.Query(ctx, getSQL, 1)
	require.NoError(t, err)
	rows.Next()
	err = rows.Err()
	require.NoError(t, err)
	rows.Close()
	// Check the error after Close as well, matching TestStmtCacheInvalidationConn.
	require.NoError(t, rows.Err())

	ensureConnValid(t, conn)
}

// TestInsertDurationInterval verifies that a time.Duration can be passed as the parameter for an INTERVAL column and
// that the insert reports exactly one affected row. It runs once per query exec mode.
func TestInsertDurationInterval(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
		// Use the ctx handed to the callback rather than context.Background() so the statements are bounded by the
		// test's timeout.
		_, err := conn.Exec(ctx, "create temporary table t(duration INTERVAL(0) NOT NULL)")
		require.NoError(t, err)

		result, err := conn.Exec(ctx, "insert into t(duration) values($1)", time.Minute)
		require.NoError(t, err)

		n := result.RowsAffected()
		require.EqualValues(t, 1, n)
	})
}

// TestRawValuesUnderlyingMemoryReused checks that a byte slice obtained from Rows.RawValues does not keep its contents
// forever: after enough further queries on the same connection the bytes it points at must change. The test fails if
// that has not happened after one million queries.
func TestRawValuesUnderlyingMemoryReused(t *testing.T) {
	const maxAttempts = 1_000_000

	defaultConnTestRunner.RunTest(context.Background(), t, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
		first, err := conn.Query(ctx, `select 1::int`)
		require.NoError(t, err)

		// Hold on to the raw slice itself (not a copy) so later changes to its backing memory are observable.
		var raw []byte
		for first.Next() {
			raw = first.RawValues()[0]
		}
		require.NoError(t, first.Err())

		// Independent copy of what the slice contained right after the first query.
		snapshot := append([]byte(nil), raw...)

		for attempt := 0; attempt < maxAttempts; attempt++ {
			rows, err := conn.Query(ctx, `select $1::int`, attempt)
			require.NoError(t, err)
			rows.Close()
			require.NoError(t, rows.Err())

			// Success as soon as the retained slice no longer matches the snapshot.
			if changed := !bytes.Equal(snapshot, raw); changed {
				return
			}
		}

		t.Fatal("expected buffer from RawValues to be overwritten by subsequent queries but it was not")
	})
}

// TestConnDeallocateInvalidatedCachedStatementsWhenCanceled checks that a connection stays usable when a query is
// issued with an already-canceled context right after a cached statement has been invalidated. It runs once per query
// exec mode.
//
// https://github.com/jackc/pgx/issues/1847
func TestConnDeallocateInvalidatedCachedStatementsWhenCanceled(t *testing.T) {
	const divideSQL = "select 1 / $1::int"

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	pgxtest.RunWithQueryExecModes(ctx, t, defaultConnTestRunner, nil, func(ctx context.Context, t testing.TB, conn *pgx.Conn) {
		pgxtest.SkipCockroachDB(t, conn, "CockroachDB returns decimal instead of integer for integer division")

		var quotient int32

		// First execution succeeds.
		require.NoError(t, conn.QueryRow(ctx, divideSQL, 1).Scan(&quotient))
		require.EqualValues(t, 1, quotient)

		// Dividing by zero makes the query fail. baseRows.Close() calls Invalidate on the statement cache whenever
		// the query hit an error, so this is a way to invalidate the statement on purpose. With access to conn's
		// private fields, conn.statementCache.InvalidateAll() could be called instead.
		require.Error(t, conn.QueryRow(ctx, divideSQL, 0).Scan(&quotient))

		// Issue the same query with a context that is canceled before the call.
		canceledCtx, cancelNow := context.WithCancel(ctx)
		cancelNow()
		err := conn.QueryRow(canceledCtx, divideSQL, 1).Scan(&quotient)
		require.Error(t, err)
		require.ErrorIs(t, err, context.Canceled)

		// The connection must still execute the query normally afterwards.
		require.NoError(t, conn.QueryRow(ctx, divideSQL, 1).Scan(&quotient))
		require.EqualValues(t, 1, quotient)
	})
}

// TestConnDeallocateInvalidatedCachedStatementsInTransactionWithBatch checks that a batch can be sent inside a
// transaction after a statement has been pushed out of a statement cache of capacity 2, and that the connection is
// still valid after the transaction is rolled back.
//
// https://github.com/jackc/pgx/issues/1847
func TestConnDeallocateInvalidatedCachedStatementsInTransactionWithBatch(t *testing.T) {
	t.Parallel()

	ctx := context.Background()

	connString := os.Getenv("PGX_TEST_DATABASE")
	config := mustParseConfig(t, connString)
	config.DefaultQueryExecMode = pgx.QueryExecModeCacheStatement
	config.StatementCacheCapacity = 2

	conn, err := pgx.ConnectConfig(ctx, config)
	require.NoError(t, err)
	// Close the connection when the test ends. Registered before the rollback defer below so that, with defers
	// running last-in-first-out, the rollback still runs on an open connection.
	defer closeConn(t, conn)

	tx, err := conn.Begin(ctx)
	require.NoError(t, err)
	defer tx.Rollback(ctx)

	_, err = tx.Exec(ctx, "select $1::int + 1", 1)
	require.NoError(t, err)

	_, err = tx.Exec(ctx, "select $1::int + 2", 1)
	require.NoError(t, err)

	// This should invalidate the first cached statement.
	_, err = tx.Exec(ctx, "select $1::int + 3", 1)
	require.NoError(t, err)

	// Re-run the first statement through a batch while still inside the transaction.
	batch := &pgx.Batch{}
	batch.Queue("select $1::int + 1", 1)
	err = tx.SendBatch(ctx, batch).Close()
	require.NoError(t, err)

	err = tx.Rollback(ctx)
	require.NoError(t, err)

	ensureConnValid(t, conn)
}

// TestErrNoRows pins the contract of pgx.ErrNoRows: its message text, that it matches sql.ErrNoRows under errors.Is,
// and that it matches itself under errors.Is.
func TestErrNoRows(t *testing.T) {
	t.Parallel()

	// ensure we preserve old error message
	require.Equal(t, "no rows in result set", pgx.ErrNoRows.Error())

	require.ErrorIs(t, pgx.ErrNoRows, sql.ErrNoRows, "pgx.ErrNoRows must match sql.ErrNoRows")
	// Only the pgx -> sql direction and self-identity are asserted.
	require.ErrorIs(t, pgx.ErrNoRows, pgx.ErrNoRows, "pgx.ErrNoRows must match itself")
}