pgx/copy_from_test.go
Jack Christensen 34da2fed95 Improve CopyFrom auto-conversion of text-ish values
CopyFrom requires that all values are encoded in the binary format. It
already tried to parse strings to values that can then be encoded into
the binary format. But it didn't handle types that can be encoded as
text and then parsed and converted to binary. It now does.
2024-02-03 09:49:56 -06:00

895 lines
22 KiB
Go

package pgx_test
import (
"context"
"fmt"
"os"
"reflect"
"testing"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxtest"
"github.com/stretchr/testify/require"
)
// TestConnCopyWithAllQueryExecModes round-trips a small data set through
// CopyFrom once for every entry in pgxtest.AllQueryExecModes and checks that the
// rows read back are deeply equal to the rows that were written.
func TestConnCopyWithAllQueryExecModes(t *testing.T) {
	for _, execMode := range pgxtest.AllQueryExecModes {
		t.Run(execMode.String(), func(t *testing.T) {
			ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
			defer cancel()

			// Each subtest gets its own connection configured with the mode under test.
			config := mustParseConfig(t, os.Getenv("PGX_TEST_DATABASE"))
			config.DefaultQueryExecMode = execMode
			conn := mustConnect(t, config)
			defer closeConn(t, conn)

			mustExec(t, conn, `create temporary table foo(
a int2,
b int4,
c int8,
d text,
e timestamptz
)`)

			localTime := time.Date(2010, 2, 3, 4, 5, 6, 0, time.Local)
			want := [][]any{
				{int16(0), int32(1), int64(2), "abc", localTime},
				{nil, nil, nil, nil, nil},
			}
			columns := []string{"a", "b", "c", "d", "e"}

			copied, copyErr := conn.CopyFrom(ctx, pgx.Identifier{"foo"}, columns, pgx.CopyFromRows(want))
			if copyErr != nil {
				t.Errorf("Unexpected error for CopyFrom: %v", copyErr)
			}
			if int(copied) != len(want) {
				t.Errorf("Expected CopyFrom to return %d copied rows, but got %d", len(want), copied)
			}

			rows, queryErr := conn.Query(ctx, "select * from foo")
			if queryErr != nil {
				t.Errorf("Unexpected error for Query: %v", queryErr)
			}

			var got [][]any
			for rows.Next() {
				values, valuesErr := rows.Values()
				if valuesErr != nil {
					t.Errorf("Unexpected error for rows.Values(): %v", valuesErr)
				}
				got = append(got, values)
			}
			if rowsErr := rows.Err(); rowsErr != nil {
				t.Errorf("Unexpected error for rows.Err(): %v", rowsErr)
			}

			if !reflect.DeepEqual(want, got) {
				t.Errorf("Input rows and output rows do not equal: %v -> %v", want, got)
			}

			ensureConnValid(t, conn)
		})
	}
}
// TestConnCopyWithKnownOIDQueryExecModes round-trips a small data set
// (including varchar and date columns) through CopyFrom once for every entry in
// pgxtest.KnownOIDQueryExecModes and compares what is read back with the input.
func TestConnCopyWithKnownOIDQueryExecModes(t *testing.T) {
	for _, execMode := range pgxtest.KnownOIDQueryExecModes {
		t.Run(execMode.String(), func(t *testing.T) {
			ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
			defer cancel()

			// Each subtest gets its own connection configured with the mode under test.
			config := mustParseConfig(t, os.Getenv("PGX_TEST_DATABASE"))
			config.DefaultQueryExecMode = execMode
			conn := mustConnect(t, config)
			defer closeConn(t, conn)

			mustExec(t, conn, `create temporary table foo(
a int2,
b int4,
c int8,
d varchar,
e text,
f date,
g timestamptz
)`)

			localTime := time.Date(2010, 2, 3, 4, 5, 6, 0, time.Local)
			utcDate := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
			want := [][]any{
				{int16(0), int32(1), int64(2), "abc", "efg", utcDate, localTime},
				{nil, nil, nil, nil, nil, nil, nil},
			}
			columns := []string{"a", "b", "c", "d", "e", "f", "g"}

			copied, copyErr := conn.CopyFrom(ctx, pgx.Identifier{"foo"}, columns, pgx.CopyFromRows(want))
			if copyErr != nil {
				t.Errorf("Unexpected error for CopyFrom: %v", copyErr)
			}
			if int(copied) != len(want) {
				t.Errorf("Expected CopyFrom to return %d copied rows, but got %d", len(want), copied)
			}

			rows, queryErr := conn.Query(ctx, "select * from foo")
			if queryErr != nil {
				t.Errorf("Unexpected error for Query: %v", queryErr)
			}

			var got [][]any
			for rows.Next() {
				values, valuesErr := rows.Values()
				if valuesErr != nil {
					t.Errorf("Unexpected error for rows.Values(): %v", valuesErr)
				}
				got = append(got, values)
			}
			if rowsErr := rows.Err(); rowsErr != nil {
				t.Errorf("Unexpected error for rows.Err(): %v", rowsErr)
			}

			if !reflect.DeepEqual(want, got) {
				t.Errorf("Input rows and output rows do not equal: %v -> %v", want, got)
			}

			ensureConnValid(t, conn)
		})
	}
}
// TestConnCopyFromSmall copies two rows (one fully populated, one all-nil) into
// a temporary table with CopyFromRows and verifies the copy count and that the
// rows read back are deeply equal to the input.
func TestConnCopyFromSmall(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, conn)

	mustExec(t, conn, `create temporary table foo(
a int2,
b int4,
c int8,
d varchar,
e text,
f date,
g timestamptz
)`)

	localTime := time.Date(2010, 2, 3, 4, 5, 6, 0, time.Local)
	utcDate := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
	want := [][]any{
		{int16(0), int32(1), int64(2), "abc", "efg", utcDate, localTime},
		{nil, nil, nil, nil, nil, nil, nil},
	}
	columns := []string{"a", "b", "c", "d", "e", "f", "g"}

	copied, copyErr := conn.CopyFrom(ctx, pgx.Identifier{"foo"}, columns, pgx.CopyFromRows(want))
	if copyErr != nil {
		t.Errorf("Unexpected error for CopyFrom: %v", copyErr)
	}
	if int(copied) != len(want) {
		t.Errorf("Expected CopyFrom to return %d copied rows, but got %d", len(want), copied)
	}

	rows, queryErr := conn.Query(ctx, "select * from foo")
	if queryErr != nil {
		t.Errorf("Unexpected error for Query: %v", queryErr)
	}

	var got [][]any
	for rows.Next() {
		values, valuesErr := rows.Values()
		if valuesErr != nil {
			t.Errorf("Unexpected error for rows.Values(): %v", valuesErr)
		}
		got = append(got, values)
	}
	if rowsErr := rows.Err(); rowsErr != nil {
		t.Errorf("Unexpected error for rows.Err(): %v", rowsErr)
	}

	if !reflect.DeepEqual(want, got) {
		t.Errorf("Input rows and output rows do not equal: %v -> %v", want, got)
	}

	ensureConnValid(t, conn)
}
// TestConnCopyFromSliceSmall is the CopyFromSlice counterpart of the small
// CopyFromRows test: the rows are supplied through an index callback, and the
// copy count and the rows read back are compared with the input.
func TestConnCopyFromSliceSmall(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, conn)

	mustExec(t, conn, `create temporary table foo(
a int2,
b int4,
c int8,
d varchar,
e text,
f date,
g timestamptz
)`)

	localTime := time.Date(2010, 2, 3, 4, 5, 6, 0, time.Local)
	utcDate := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
	want := [][]any{
		{int16(0), int32(1), int64(2), "abc", "efg", utcDate, localTime},
		{nil, nil, nil, nil, nil, nil, nil},
	}
	columns := []string{"a", "b", "c", "d", "e", "f", "g"}

	// The callback hands back the row at the requested index and never fails.
	rowAt := func(idx int) ([]any, error) {
		return want[idx], nil
	}

	copied, copyErr := conn.CopyFrom(ctx, pgx.Identifier{"foo"}, columns, pgx.CopyFromSlice(len(want), rowAt))
	if copyErr != nil {
		t.Errorf("Unexpected error for CopyFrom: %v", copyErr)
	}
	if int(copied) != len(want) {
		t.Errorf("Expected CopyFrom to return %d copied rows, but got %d", len(want), copied)
	}

	rows, queryErr := conn.Query(ctx, "select * from foo")
	if queryErr != nil {
		t.Errorf("Unexpected error for Query: %v", queryErr)
	}

	var got [][]any
	for rows.Next() {
		values, valuesErr := rows.Values()
		if valuesErr != nil {
			t.Errorf("Unexpected error for rows.Values(): %v", valuesErr)
		}
		got = append(got, values)
	}
	if rowsErr := rows.Err(); rowsErr != nil {
		t.Errorf("Unexpected error for rows.Err(): %v", rowsErr)
	}

	if !reflect.DeepEqual(want, got) {
		t.Errorf("Input rows and output rows do not equal: %v -> %v", want, got)
	}

	ensureConnValid(t, conn)
}
// TestConnCopyFromLarge copies 10000 identical rows (including a bytea column)
// with CopyFromRows and verifies the copy count and that the rows read back are
// deeply equal to the input.
func TestConnCopyFromLarge(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, conn)

	mustExec(t, conn, `create temporary table foo(
a int2,
b int4,
c int8,
d varchar,
e text,
f date,
g timestamptz,
h bytea
)`)

	localTime := time.Date(2010, 2, 3, 4, 5, 6, 0, time.Local)

	const rowCount = 10000
	want := make([][]any, rowCount)
	for i := range want {
		want[i] = []any{int16(0), int32(1), int64(2), "abc", "efg", time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), localTime, []byte{111, 111, 111, 111}}
	}
	columns := []string{"a", "b", "c", "d", "e", "f", "g", "h"}

	copied, copyErr := conn.CopyFrom(ctx, pgx.Identifier{"foo"}, columns, pgx.CopyFromRows(want))
	if copyErr != nil {
		t.Errorf("Unexpected error for CopyFrom: %v", copyErr)
	}
	if int(copied) != len(want) {
		t.Errorf("Expected CopyFrom to return %d copied rows, but got %d", len(want), copied)
	}

	rows, queryErr := conn.Query(ctx, "select * from foo")
	if queryErr != nil {
		t.Errorf("Unexpected error for Query: %v", queryErr)
	}

	var got [][]any
	for rows.Next() {
		values, valuesErr := rows.Values()
		if valuesErr != nil {
			t.Errorf("Unexpected error for rows.Values(): %v", valuesErr)
		}
		got = append(got, values)
	}
	if rowsErr := rows.Err(); rowsErr != nil {
		t.Errorf("Unexpected error for rows.Err(): %v", rowsErr)
	}

	// The rows are deliberately left out of the message: there are 10000 of them.
	if !reflect.DeepEqual(want, got) {
		t.Errorf("Input rows and output rows do not equal")
	}

	ensureConnValid(t, conn)
}
// TestConnCopyFromEnum creates two enum types inside a transaction, registers
// them on the connection's type map, copies string values into enum and text
// columns with CopyFrom, and verifies the rows read back equal the input. The
// transaction is rolled back at the end.
func TestConnCopyFromEnum(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, conn)

	tx, err := conn.Begin(ctx)
	require.NoError(t, err)
	defer tx.Rollback(ctx)

	// Drop any leftovers first, then (re)create both enum types.
	typeSetup := []string{
		`drop type if exists color`,
		`drop type if exists fruit`,
		`create type color as enum ('blue', 'green', 'orange')`,
		`create type fruit as enum ('apple', 'orange', 'grape')`,
	}
	for _, stmt := range typeSetup {
		_, err = tx.Exec(ctx, stmt)
		require.NoError(t, err)
	}

	// Obviously using conn while a tx is in use and registering a type after the connection has been established are
	// really bad practices, but for the sake of convenience we do it in the test here.
	for _, enumName := range []string{"fruit", "color"} {
		loaded, loadErr := conn.LoadType(ctx, enumName)
		require.NoError(t, loadErr)
		conn.TypeMap().RegisterType(loaded)
	}

	_, err = tx.Exec(ctx, `create temporary table foo(
a text,
b color,
c fruit,
d color,
e fruit,
f text
)`)
	require.NoError(t, err)

	want := [][]any{
		{"abc", "blue", "grape", "orange", "orange", "def"},
		{nil, nil, nil, nil, nil, nil},
	}
	columns := []string{"a", "b", "c", "d", "e", "f"}

	copied, err := tx.CopyFrom(ctx, pgx.Identifier{"foo"}, columns, pgx.CopyFromRows(want))
	require.NoError(t, err)
	require.EqualValues(t, len(want), copied)

	rows, err := tx.Query(ctx, "select * from foo")
	require.NoError(t, err)

	var got [][]any
	for rows.Next() {
		values, valuesErr := rows.Values()
		require.NoError(t, valuesErr)
		got = append(got, values)
	}
	require.NoError(t, rows.Err())

	if !reflect.DeepEqual(want, got) {
		t.Errorf("Input rows and output rows do not equal: %v -> %v", want, got)
	}

	require.NoError(t, tx.Rollback(ctx))

	ensureConnValid(t, conn)
}
// TestConnCopyFromJSON copies Go maps into json and jsonb columns with CopyFrom
// and verifies the copy count and that the rows read back are deeply equal to
// the input. The test is skipped when the connection's type map has no json or
// jsonb type.
func TestConnCopyFromJSON(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, conn)

	for _, typeName := range []string{"json", "jsonb"} {
		if _, ok := conn.TypeMap().TypeForName(typeName); !ok {
			// Report a skip rather than returning silently, so an unexercised
			// code path is not shown as a pass.
			t.Skipf("No %s type -- must be running against old PostgreSQL", typeName)
		}
	}

	mustExec(t, conn, `create temporary table foo(
a json,
b jsonb
)`)

	inputRows := [][]any{
		{map[string]any{"foo": "bar"}, map[string]any{"bar": "quz"}},
		{nil, nil},
	}

	copyCount, err := conn.CopyFrom(ctx, pgx.Identifier{"foo"}, []string{"a", "b"}, pgx.CopyFromRows(inputRows))
	if err != nil {
		t.Errorf("Unexpected error for CopyFrom: %v", err)
	}
	if int(copyCount) != len(inputRows) {
		t.Errorf("Expected CopyFrom to return %d copied rows, but got %d", len(inputRows), copyCount)
	}

	rows, err := conn.Query(ctx, "select * from foo")
	if err != nil {
		t.Errorf("Unexpected error for Query: %v", err)
	}

	var outputRows [][]any
	for rows.Next() {
		row, err := rows.Values()
		if err != nil {
			t.Errorf("Unexpected error for rows.Values(): %v", err)
		}
		outputRows = append(outputRows, row)
	}
	if err := rows.Err(); err != nil {
		t.Errorf("Unexpected error for rows.Err(): %v", err)
	}

	if !reflect.DeepEqual(inputRows, outputRows) {
		t.Errorf("Input rows and output rows do not equal: %v -> %v", inputRows, outputRows)
	}

	ensureConnValid(t, conn)
}
// clientFailSource is a row source whose Values call fails with "client error"
// when its counter is exactly 3. Every other row is a single 100000-byte slice.
// Next stops reporting rows once the counter reaches 100.
type clientFailSource struct {
	count int   // number of Next calls so far
	err   error // error produced by Values, if any; reported by Err
}

// Next advances the counter and reports whether another row is available.
func (src *clientFailSource) Next() bool {
	src.count++
	hasMore := src.count < 100
	return hasMore
}

// Values returns the current row, or records and returns the client error when
// the counter is 3.
func (src *clientFailSource) Values() ([]any, error) {
	if src.count != 3 {
		payload := make([]byte, 100000)
		return []any{payload}, nil
	}

	src.err = fmt.Errorf("client error")
	return nil, src.err
}

// Err returns the error recorded by Values, or nil if none has occurred.
func (src *clientFailSource) Err() error {
	return src.err
}
// TestConnCopyFromFailServerSideMidway sends three rows where the second has a
// nil for the NOT NULL column b. It expects CopyFrom to return a
// *pgconn.PgError, report zero copied rows, and leave the table empty.
func TestConnCopyFromFailServerSideMidway(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, conn)

	mustExec(t, conn, `create temporary table foo(
a int4,
b varchar not null
)`)

	badRows := [][]any{
		{int32(1), "abc"},
		{int32(2), nil}, // this row should trigger a failure
		{int32(3), "def"},
	}

	copied, copyErr := conn.CopyFrom(ctx, pgx.Identifier{"foo"}, []string{"a", "b"}, pgx.CopyFromRows(badRows))
	if copyErr == nil {
		t.Errorf("Expected CopyFrom return error, but it did not")
	}
	_, isPgErr := copyErr.(*pgconn.PgError)
	if !isPgErr {
		t.Errorf("Expected CopyFrom return pgx.PgError, but instead it returned: %v", copyErr)
	}
	if copied != 0 {
		t.Errorf("Expected CopyFrom to return 0 copied rows, but got %d", copied)
	}

	rows, queryErr := conn.Query(ctx, "select * from foo")
	if queryErr != nil {
		t.Errorf("Unexpected error for Query: %v", queryErr)
	}

	var got [][]any
	for rows.Next() {
		values, valuesErr := rows.Values()
		if valuesErr != nil {
			t.Errorf("Unexpected error for rows.Values(): %v", valuesErr)
		}
		got = append(got, values)
	}
	if rowsErr := rows.Err(); rowsErr != nil {
		t.Errorf("Unexpected error for rows.Err(): %v", rowsErr)
	}

	if len(got) != 0 {
		t.Errorf("Expected 0 rows, but got %v", got)
	}

	mustExec(t, conn, "truncate foo")

	ensureConnValid(t, conn)
}
// failSource is a deliberately slow row source: Next sleeps 100ms per call and
// stops once its counter reaches 100. The row produced when the counter is 3
// holds a single nil value; every other row is a single 100000-byte slice.
type failSource struct {
	count int // number of Next calls so far
}

// Next pauses, advances the counter and reports whether another row is available.
func (src *failSource) Next() bool {
	time.Sleep(100 * time.Millisecond)
	src.count++
	hasMore := src.count < 100
	return hasMore
}

// Values returns the current row; it never returns an error.
func (src *failSource) Values() ([]any, error) {
	switch src.count {
	case 3:
		return []any{nil}, nil
	default:
		return []any{make([]byte, 100000)}, nil
	}
}

// Err always returns nil.
func (src *failSource) Err() error {
	return nil
}
// TestConnCopyFromFailServerSideMidwayAbortsWithoutWaiting feeds CopyFrom from
// the slow failSource, whose third row violates the NOT NULL constraint. It
// expects a *pgconn.PgError, zero copied rows, an empty table, and that the call
// returned within one second. The test is skipped on CockroachDB.
func TestConnCopyFromFailServerSideMidwayAbortsWithoutWaiting(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, conn)

	pgxtest.SkipCockroachDB(t, conn, "Server copy error does not fail fast")

	mustExec(t, conn, `create temporary table foo(
a bytea not null
)`)

	startTime := time.Now()

	copied, copyErr := conn.CopyFrom(ctx, pgx.Identifier{"foo"}, []string{"a"}, &failSource{})
	if copyErr == nil {
		t.Errorf("Expected CopyFrom return error, but it did not")
	}
	_, isPgErr := copyErr.(*pgconn.PgError)
	if !isPgErr {
		t.Errorf("Expected CopyFrom return pgx.PgError, but instead it returned: %v", copyErr)
	}
	if copied != 0 {
		t.Errorf("Expected CopyFrom to return 0 copied rows, but got %d", copied)
	}

	// The source would need roughly ten seconds to drain at 100ms per row, so a
	// sub-second return shows the copy was aborted early.
	elapsed := time.Since(startTime)
	if elapsed > time.Second {
		t.Errorf("Failing CopyFrom shouldn't have taken so long: %v", elapsed)
	}

	rows, queryErr := conn.Query(ctx, "select * from foo")
	if queryErr != nil {
		t.Errorf("Unexpected error for Query: %v", queryErr)
	}

	var got [][]any
	for rows.Next() {
		values, valuesErr := rows.Values()
		if valuesErr != nil {
			t.Errorf("Unexpected error for rows.Values(): %v", valuesErr)
		}
		got = append(got, values)
	}
	if rowsErr := rows.Err(); rowsErr != nil {
		t.Errorf("Unexpected error for rows.Err(): %v", rowsErr)
	}

	if len(got) != 0 {
		t.Errorf("Expected 0 rows, but got %v", got)
	}

	ensureConnValid(t, conn)
}
// slowFailRaceSource is a row source that sleeps 1ms per Next call and stops
// once its counter reaches 1000. The row produced when the counter is 500 holds
// two nil values; every other row is the int 1 plus a 1000-byte slice.
type slowFailRaceSource struct {
	count int // number of Next calls so far
}

// Next pauses briefly, advances the counter and reports whether another row is
// available.
func (src *slowFailRaceSource) Next() bool {
	time.Sleep(time.Millisecond)
	src.count++
	hasMore := src.count < 1000
	return hasMore
}

// Values returns the current row; it never returns an error.
func (src *slowFailRaceSource) Values() ([]any, error) {
	switch src.count {
	case 500:
		return []any{nil, nil}, nil
	default:
		return []any{1, make([]byte, 1000)}, nil
	}
}

// Err always returns nil.
func (src *slowFailRaceSource) Err() error {
	return nil
}
// TestConnCopyFromSlowFailRace feeds CopyFrom from slowFailRaceSource, whose
// 500th row holds nils for NOT NULL columns. It expects a *pgconn.PgError, zero
// copied rows, and a connection that is still usable afterwards.
func TestConnCopyFromSlowFailRace(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, conn)

	mustExec(t, conn, `create temporary table foo(
a int not null,
b bytea not null
)`)

	source := &slowFailRaceSource{}
	copied, copyErr := conn.CopyFrom(ctx, pgx.Identifier{"foo"}, []string{"a", "b"}, source)
	if copyErr == nil {
		t.Errorf("Expected CopyFrom return error, but it did not")
	}
	_, isPgErr := copyErr.(*pgconn.PgError)
	if !isPgErr {
		t.Errorf("Expected CopyFrom return pgx.PgError, but instead it returned: %v", copyErr)
	}
	if copied != 0 {
		t.Errorf("Expected CopyFrom to return 0 copied rows, but got %d", copied)
	}

	ensureConnValid(t, conn)
}
// TestConnCopyFromCopyFromSourceErrorMidway feeds CopyFrom from
// clientFailSource, whose Values call fails partway through. It expects CopyFrom
// to return an error, report zero copied rows, and leave the table empty.
func TestConnCopyFromCopyFromSourceErrorMidway(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, conn)

	mustExec(t, conn, `create temporary table foo(
a bytea not null
)`)

	source := &clientFailSource{}
	copied, copyErr := conn.CopyFrom(ctx, pgx.Identifier{"foo"}, []string{"a"}, source)
	if copyErr == nil {
		t.Errorf("Expected CopyFrom return error, but it did not")
	}
	if copied != 0 {
		t.Errorf("Expected CopyFrom to return 0 copied rows, but got %d", copied)
	}

	rows, queryErr := conn.Query(ctx, "select * from foo")
	if queryErr != nil {
		t.Errorf("Unexpected error for Query: %v", queryErr)
	}

	var got [][]any
	for rows.Next() {
		values, valuesErr := rows.Values()
		if valuesErr != nil {
			t.Errorf("Unexpected error for rows.Values(): %v", valuesErr)
		}
		got = append(got, values)
	}
	if rowsErr := rows.Err(); rowsErr != nil {
		t.Errorf("Unexpected error for rows.Err(): %v", rowsErr)
	}

	// Only the count is printed here; each row would be a 100000-byte slice.
	if rowCount := len(got); rowCount != 0 {
		t.Errorf("Expected 0 rows, but got %v", rowCount)
	}

	ensureConnValid(t, conn)
}
// clientFinalErrSource is a row source that yields rows without error while its
// counter is below 5, but whose Err method always returns "final error".
type clientFinalErrSource struct {
	count int // number of Next calls so far
}

// Next advances the counter and reports whether another row is available.
func (src *clientFinalErrSource) Next() bool {
	src.count++
	hasMore := src.count < 5
	return hasMore
}

// Values returns a row holding a single 100000-byte slice; it never fails.
func (src *clientFinalErrSource) Values() ([]any, error) {
	payload := make([]byte, 100000)
	return []any{payload}, nil
}

// Err always returns a "final error" error.
func (src *clientFinalErrSource) Err() error {
	return fmt.Errorf("final error")
}
// TestConnCopyFromCopyFromSourceErrorEnd feeds CopyFrom from
// clientFinalErrSource, which yields all of its rows but reports an error from
// Err. It expects CopyFrom to return an error, report zero copied rows, and
// leave the table empty.
func TestConnCopyFromCopyFromSourceErrorEnd(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, conn)

	mustExec(t, conn, `create temporary table foo(
a bytea not null
)`)

	source := &clientFinalErrSource{}
	copied, copyErr := conn.CopyFrom(ctx, pgx.Identifier{"foo"}, []string{"a"}, source)
	if copyErr == nil {
		t.Errorf("Expected CopyFrom return error, but it did not")
	}
	if copied != 0 {
		t.Errorf("Expected CopyFrom to return 0 copied rows, but got %d", copied)
	}

	rows, queryErr := conn.Query(ctx, "select * from foo")
	if queryErr != nil {
		t.Errorf("Unexpected error for Query: %v", queryErr)
	}

	var got [][]any
	for rows.Next() {
		values, valuesErr := rows.Values()
		if valuesErr != nil {
			t.Errorf("Unexpected error for rows.Values(): %v", valuesErr)
		}
		got = append(got, values)
	}
	if rowsErr := rows.Err(); rowsErr != nil {
		t.Errorf("Unexpected error for rows.Err(): %v", rowsErr)
	}

	if len(got) != 0 {
		t.Errorf("Expected 0 rows, but got %v", got)
	}

	ensureConnValid(t, conn)
}
// TestConnCopyFromAutomaticStringConversion copies a mix of numeric strings and
// an int into an int8 column with CopyFrom and verifies that the values read
// back are the corresponding int64 numbers.
func TestConnCopyFromAutomaticStringConversion(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, conn)

	mustExec(t, conn, `create temporary table foo(
a int8
)`)

	inputRows := [][]any{
		{"42"},
		{"7"},
		{8},
	}

	copyCount, err := conn.CopyFrom(ctx, pgx.Identifier{"foo"}, []string{"a"}, pgx.CopyFromRows(inputRows))
	require.NoError(t, err)
	require.EqualValues(t, len(inputRows), copyCount)

	// Check the Query error explicitly instead of discarding it.
	rows, err := conn.Query(ctx, "select * from foo")
	require.NoError(t, err)
	nums, err := pgx.CollectRows(rows, pgx.RowTo[int64])
	require.NoError(t, err)

	require.Equal(t, []int64{42, 7, 8}, nums)

	ensureConnValid(t, conn)
}
// TestConnCopyFromAutomaticStringConversionArray copies one- and two-dimensional
// string slices into a numeric[] column with CopyFrom and verifies the values
// read back as flattened []int64 slices.
//
// https://github.com/jackc/pgx/discussions/1891
func TestConnCopyFromAutomaticStringConversionArray(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, conn)

	mustExec(t, conn, `create temporary table foo(
a numeric[]
)`)

	inputRows := [][]any{
		{[]string{"42"}},
		{[]string{"7"}},
		{[]string{"8", "9"}},
		{[][]string{{"10", "11"}, {"12", "13"}}},
	}

	copyCount, err := conn.CopyFrom(ctx, pgx.Identifier{"foo"}, []string{"a"}, pgx.CopyFromRows(inputRows))
	require.NoError(t, err)
	require.EqualValues(t, len(inputRows), copyCount)

	// Test reads as int64 and flattened array for simplicity.
	// Check the Query error explicitly instead of discarding it.
	rows, err := conn.Query(ctx, "select * from foo")
	require.NoError(t, err)
	nums, err := pgx.CollectRows(rows, pgx.RowTo[[]int64])
	require.NoError(t, err)

	require.Equal(t, [][]int64{{42}, {7}, {8, 9}, {10, 11, 12, 13}}, nums)

	ensureConnValid(t, conn)
}
// TestCopyFromFunc exercises pgx.CopyFromFunc in two scenarios. First, rows are
// streamed from a channel until it is closed, and the copied count and table
// contents are verified. Second, the row function returns an error partway
// through, and the test expects CopyFrom to fail and report zero copied rows.
func TestCopyFromFunc(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
	defer cancel()

	conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
	defer closeConn(t, conn)

	mustExec(t, conn, `create temporary table foo(
a int
)`)

	dataCh := make(chan int, 1)

	const channelItems = 10
	go func() {
		defer close(dataCh)
		for i := 0; i < channelItems; i++ {
			// Stop producing once the test context ends so the goroutine
			// cannot block forever if the consumer stops reading.
			select {
			case dataCh <- i:
			case <-ctx.Done():
				return
			}
		}
	}()

	copyCount, err := conn.CopyFrom(ctx, pgx.Identifier{"foo"}, []string{"a"},
		pgx.CopyFromFunc(func() ([]any, error) {
			v, ok := <-dataCh
			if !ok {
				// A nil row with a nil error ends the copy.
				return nil, nil
			}
			return []any{v}, nil
		}))

	require.NoError(t, err)
	require.EqualValues(t, channelItems, copyCount)

	rows, err := conn.Query(ctx, "select * from foo order by a")
	require.NoError(t, err)
	nums, err := pgx.CollectRows(rows, pgx.RowTo[int64])
	require.NoError(t, err)

	// The producer sent 0..channelItems-1 in order.
	wantNums := make([]int64, channelItems)
	for i := range wantNums {
		wantNums[i] = int64(i)
	}
	require.Equal(t, wantNums, nums)

	// simulate a failure
	copyCount, err = conn.CopyFrom(ctx, pgx.Identifier{"foo"}, []string{"a"},
		pgx.CopyFromFunc(func() func() ([]any, error) {
			x := 9
			return func() ([]any, error) {
				x++
				if x > 100 {
					return nil, fmt.Errorf("simulated error")
				}
				return []any{x}, nil
			}
		}()))

	require.Error(t, err)
	require.EqualValues(t, 0, copyCount) // no change, due to error

	ensureConnValid(t, conn)
}