diff --git a/large_objects.go b/large_objects.go index 0d66ed54..0167aa8e 100644 --- a/large_objects.go +++ b/large_objects.go @@ -8,8 +8,8 @@ import ( errors "golang.org/x/xerrors" ) -// LargeObjects is a structure used to access the large objects API. It is only -// valid within the transaction where it was created. +// LargeObjects is a structure used to access the large objects API. It is only valid within the transaction where it +// was created. // // For more details see: http://www.postgresql.org/docs/current/static/largeobjects.html type LargeObjects struct { @@ -44,42 +44,42 @@ const ( LargeObjectModeRead LargeObjectMode = 0x40000 ) -// Create creates a new large object. If oid is zero, the server assigns an -// unused OID. -func (o *LargeObjects) Create(oid pgtype.OID) (pgtype.OID, error) { - _, err := o.tx.Prepare(context.TODO(), "lo_create", "select lo_create($1)") +// Create creates a new large object. If oid is zero, the server assigns an unused OID. +func (o *LargeObjects) Create(ctx context.Context, oid pgtype.OID) (pgtype.OID, error) { + _, err := o.tx.Prepare(ctx, "lo_create", "select lo_create($1)") if err != nil { return 0, err } - err = o.tx.QueryRow(context.TODO(), "lo_create", oid).Scan(&oid) + err = o.tx.QueryRow(ctx, "lo_create", oid).Scan(&oid) return oid, err } -// Open opens an existing large object with the given mode. -func (o *LargeObjects) Open(oid pgtype.OID, mode LargeObjectMode) (*LargeObject, error) { - _, err := o.tx.Prepare(context.TODO(), "lo_open", "select lo_open($1, $2)") +// Open opens an existing large object with the given mode. ctx will also be used for all operations on the opened large +// object. +func (o *LargeObjects) Open(ctx context.Context, oid pgtype.OID, mode LargeObjectMode) (*LargeObject, error) { + _, err := o.tx.Prepare(ctx, "lo_open", "select lo_open($1, $2)") if err != nil { return nil, err } var fd int32 - err = o.tx.QueryRow(context.TODO(), "lo_open", oid, mode).Scan(&fd) + err = o.tx.QueryRow(ctx, "lo_open", oid, mode).Scan(&fd) if err != nil { return nil, err } - return &LargeObject{fd: fd, tx: o.tx}, nil + return &LargeObject{fd: fd, tx: o.tx, ctx: ctx}, nil } // Unlink removes a large object from the database. -func (o *LargeObjects) Unlink(oid pgtype.OID) error { - _, err := o.tx.Prepare(context.TODO(), "lo_unlink", "select lo_unlink($1)") +func (o *LargeObjects) Unlink(ctx context.Context, oid pgtype.OID) error { + _, err := o.tx.Prepare(ctx, "lo_unlink", "select lo_unlink($1)") if err != nil { return err } var result int32 - err = o.tx.QueryRow(context.TODO(), "lo_unlink", oid).Scan(&result) + err = o.tx.QueryRow(ctx, "lo_unlink", oid).Scan(&result) if err != nil { return err } @@ -91,28 +91,28 @@ func (o *LargeObjects) Unlink(oid pgtype.OID) error { return nil } -// A LargeObject is a large object stored on the server. It is only valid within -// the transaction that it was initialized in. It implements these interfaces: +// A LargeObject is a large object stored on the server. It is only valid within the transaction that it was initialized +// in. It uses the context it was initialized with for all operations. It implements these interfaces: // // io.Writer // io.Reader // io.Seeker // io.Closer type LargeObject struct { - fd int32 - tx *Tx + ctx context.Context + tx *Tx + fd int32 } -// Write writes p to the large object and returns the number of bytes written -// and an error if not all of p was written. +// Write writes p to the large object and returns the number of bytes written and an error if not all of p was written. func (o *LargeObject) Write(p []byte) (int, error) { - _, err := o.tx.Prepare(context.TODO(), "lowrite", "select lowrite($1, $2)") + _, err := o.tx.Prepare(o.ctx, "lowrite", "select lowrite($1, $2)") if err != nil { return 0, err } var n int - err = o.tx.QueryRow(context.TODO(), "lowrite", o.fd, p).Scan(&n) + err = o.tx.QueryRow(o.ctx, "lowrite", o.fd, p).Scan(&n) if err != nil { return n, err } @@ -126,13 +126,13 @@ func (o *LargeObject) Write(p []byte) (int, error) { // Read reads up to len(p) bytes into p returning the number of bytes read. func (o *LargeObject) Read(p []byte) (int, error) { - _, err := o.tx.Prepare(context.TODO(), "loread", "select loread($1, $2)") + _, err := o.tx.Prepare(o.ctx, "loread", "select loread($1, $2)") if err != nil { return 0, err } var res []byte - err = o.tx.QueryRow(context.TODO(), "loread", o.fd, len(p)).Scan(&res) + err = o.tx.QueryRow(o.ctx, "loread", o.fd, len(p)).Scan(&res) copy(p, res) if err != nil { return len(res), err @@ -146,45 +146,44 @@ func (o *LargeObject) Read(p []byte) (int, error) { // Seek moves the current location pointer to the new location specified by offset. func (o *LargeObject) Seek(offset int64, whence int) (n int64, err error) { - _, err = o.tx.Prepare(context.TODO(), "lo_lseek64", "select lo_lseek64($1, $2, $3)") + _, err = o.tx.Prepare(o.ctx, "lo_lseek64", "select lo_lseek64($1, $2, $3)") if err != nil { return 0, err } - err = o.tx.QueryRow(context.TODO(), "lo_lseek64", o.fd, offset, whence).Scan(&n) + err = o.tx.QueryRow(o.ctx, "lo_lseek64", o.fd, offset, whence).Scan(&n) return n, err } -// Tell returns the current read or write location of the large object -// descriptor. +// Tell returns the current read or write location of the large object descriptor. func (o *LargeObject) Tell() (n int64, err error) { - _, err = o.tx.Prepare(context.TODO(), "lo_tell64", "select lo_tell64($1)") + _, err = o.tx.Prepare(o.ctx, "lo_tell64", "select lo_tell64($1)") if err != nil { return 0, err } - err = o.tx.QueryRow(context.TODO(), "lo_tell64", o.fd).Scan(&n) + err = o.tx.QueryRow(o.ctx, "lo_tell64", o.fd).Scan(&n) return n, err } // Trunctes the large object to size. func (o *LargeObject) Truncate(size int64) (err error) { - _, err = o.tx.Prepare(context.TODO(), "lo_truncate64", "select lo_truncate64($1, $2)") + _, err = o.tx.Prepare(o.ctx, "lo_truncate64", "select lo_truncate64($1, $2)") if err != nil { return err } - _, err = o.tx.Exec(context.TODO(), "lo_truncate64", o.fd, size) + _, err = o.tx.Exec(o.ctx, "lo_truncate64", o.fd, size) return err } // Close closees the large object descriptor. func (o *LargeObject) Close() error { - _, err := o.tx.Prepare(context.TODO(), "lo_close", "select lo_close($1)") + _, err := o.tx.Prepare(o.ctx, "lo_close", "select lo_close($1)") if err != nil { return err } - _, err = o.tx.Exec(context.TODO(), "lo_close", o.fd) + _, err = o.tx.Exec(o.ctx, "lo_close", o.fd) return err } diff --git a/large_objects_test.go b/large_objects_test.go index e1065ba6..2d8dd9c4 100644 --- a/large_objects_test.go +++ b/large_objects_test.go @@ -5,6 +5,7 @@ import ( "io" "os" "testing" + "time" "github.com/jackc/pgconn" "github.com/jackc/pgx/v4" @@ -13,24 +14,27 @@ import ( func TestLargeObjects(t *testing.T) { t.Parallel() - conn, err := pgx.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + conn, err := pgx.Connect(ctx, os.Getenv("PGX_TEST_DATABASE")) if err != nil { t.Fatal(err) } - tx, err := conn.Begin(context.Background(), nil) + tx, err := conn.Begin(ctx, nil) if err != nil { t.Fatal(err) } lo := tx.LargeObjects() - id, err := lo.Create(0) + id, err := lo.Create(ctx, 0) if err != nil { t.Fatal(err) } - obj, err := lo.Open(id, pgx.LargeObjectModeRead|pgx.LargeObjectModeWrite) + obj, err := lo.Open(ctx, id, pgx.LargeObjectModeRead|pgx.LargeObjectModeWrite) if err != nil { t.Fatal(err) } @@ -109,12 +113,12 @@ func TestLargeObjects(t *testing.T) { t.Fatal(err) } - err = lo.Unlink(id) + err = lo.Unlink(ctx, id) if err != nil { t.Fatal(err) } - _, err = lo.Open(id, pgx.LargeObjectModeRead) + _, err = lo.Open(ctx, id, pgx.LargeObjectModeRead) if e, ok := err.(*pgconn.PgError); !ok || e.Code != "42704" { t.Errorf("Expected undefined_object error (42704), got %#v", err) } @@ -123,24 +127,27 @@ func TestLargeObjects(t *testing.T) { func TestLargeObjectsMultipleTransactions(t *testing.T) { t.Parallel() - conn, err := pgx.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + conn, err := pgx.Connect(ctx, os.Getenv("PGX_TEST_DATABASE")) if err != nil { t.Fatal(err) } - tx, err := conn.Begin(context.Background(), nil) + tx, err := conn.Begin(ctx, nil) if err != nil { t.Fatal(err) } lo := tx.LargeObjects() - id, err := lo.Create(0) + id, err := lo.Create(ctx, 0) if err != nil { t.Fatal(err) } - obj, err := lo.Open(id, pgx.LargeObjectModeWrite) + obj, err := lo.Open(ctx, id, pgx.LargeObjectModeWrite) if err != nil { t.Fatal(err) } @@ -154,21 +161,21 @@ func TestLargeObjectsMultipleTransactions(t *testing.T) { } // Commit the first transaction - err = tx.Commit(context.Background()) + err = tx.Commit(ctx) if err != nil { t.Fatal(err) } // IMPORTANT: Use the same connection for another query query := `select n from generate_series(1,10) n` - rows, err := conn.Query(context.Background(), query) + rows, err := conn.Query(ctx, query) if err != nil { t.Fatal(err) } rows.Close() // Start a new transaction - tx2, err := conn.Begin(context.Background(), nil) + tx2, err := conn.Begin(ctx, nil) if err != nil { t.Fatal(err) } @@ -176,7 +183,7 @@ func TestLargeObjectsMultipleTransactions(t *testing.T) { lo2 := tx2.LargeObjects() // Reopen the large object in the new transaction - obj2, err := lo2.Open(id, pgx.LargeObjectModeRead|pgx.LargeObjectModeWrite) + obj2, err := lo2.Open(ctx, id, pgx.LargeObjectModeRead|pgx.LargeObjectModeWrite) if err != nil { t.Fatal(err) } @@ -247,12 +254,12 @@ func TestLargeObjectsMultipleTransactions(t *testing.T) { t.Fatal(err) } - err = lo2.Unlink(id) + err = lo2.Unlink(ctx, id) if err != nil { t.Fatal(err) } - _, err = lo2.Open(id, pgx.LargeObjectModeRead) + _, err = lo2.Open(ctx, id, pgx.LargeObjectModeRead) if e, ok := err.(*pgconn.PgError); !ok || e.Code != "42704" { t.Errorf("Expected undefined_object error (42704), got %#v", err) }