mirror of
https://github.com/jackc/pgx.git
synced 2025-05-24 00:09:55 +00:00
Large objects use context
This commit is contained in:
parent
79f49ce300
commit
03abfc6452
@ -8,8 +8,8 @@ import (
|
|||||||
errors "golang.org/x/xerrors"
|
errors "golang.org/x/xerrors"
|
||||||
)
|
)
|
||||||
|
|
||||||
// LargeObjects is a structure used to access the large objects API. It is only
|
// LargeObjects is a structure used to access the large objects API. It is only valid within the transaction where it
|
||||||
// valid within the transaction where it was created.
|
// was created.
|
||||||
//
|
//
|
||||||
// For more details see: http://www.postgresql.org/docs/current/static/largeobjects.html
|
// For more details see: http://www.postgresql.org/docs/current/static/largeobjects.html
|
||||||
type LargeObjects struct {
|
type LargeObjects struct {
|
||||||
@ -44,42 +44,42 @@ const (
|
|||||||
LargeObjectModeRead LargeObjectMode = 0x40000
|
LargeObjectModeRead LargeObjectMode = 0x40000
|
||||||
)
|
)
|
||||||
|
|
||||||
// Create creates a new large object. If oid is zero, the server assigns an
|
// Create creates a new large object. If oid is zero, the server assigns an unused OID.
|
||||||
// unused OID.
|
func (o *LargeObjects) Create(ctx context.Context, oid pgtype.OID) (pgtype.OID, error) {
|
||||||
func (o *LargeObjects) Create(oid pgtype.OID) (pgtype.OID, error) {
|
_, err := o.tx.Prepare(ctx, "lo_create", "select lo_create($1)")
|
||||||
_, err := o.tx.Prepare(context.TODO(), "lo_create", "select lo_create($1)")
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = o.tx.QueryRow(context.TODO(), "lo_create", oid).Scan(&oid)
|
err = o.tx.QueryRow(ctx, "lo_create", oid).Scan(&oid)
|
||||||
return oid, err
|
return oid, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open opens an existing large object with the given mode.
|
// Open opens an existing large object with the given mode. ctx will also be used for all operations on the opened large
|
||||||
func (o *LargeObjects) Open(oid pgtype.OID, mode LargeObjectMode) (*LargeObject, error) {
|
// object.
|
||||||
_, err := o.tx.Prepare(context.TODO(), "lo_open", "select lo_open($1, $2)")
|
func (o *LargeObjects) Open(ctx context.Context, oid pgtype.OID, mode LargeObjectMode) (*LargeObject, error) {
|
||||||
|
_, err := o.tx.Prepare(ctx, "lo_open", "select lo_open($1, $2)")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var fd int32
|
var fd int32
|
||||||
err = o.tx.QueryRow(context.TODO(), "lo_open", oid, mode).Scan(&fd)
|
err = o.tx.QueryRow(ctx, "lo_open", oid, mode).Scan(&fd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &LargeObject{fd: fd, tx: o.tx}, nil
|
return &LargeObject{fd: fd, tx: o.tx, ctx: ctx}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unlink removes a large object from the database.
|
// Unlink removes a large object from the database.
|
||||||
func (o *LargeObjects) Unlink(oid pgtype.OID) error {
|
func (o *LargeObjects) Unlink(ctx context.Context, oid pgtype.OID) error {
|
||||||
_, err := o.tx.Prepare(context.TODO(), "lo_unlink", "select lo_unlink($1)")
|
_, err := o.tx.Prepare(ctx, "lo_unlink", "select lo_unlink($1)")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var result int32
|
var result int32
|
||||||
err = o.tx.QueryRow(context.TODO(), "lo_unlink", oid).Scan(&result)
|
err = o.tx.QueryRow(ctx, "lo_unlink", oid).Scan(&result)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -91,28 +91,28 @@ func (o *LargeObjects) Unlink(oid pgtype.OID) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// A LargeObject is a large object stored on the server. It is only valid within
|
// A LargeObject is a large object stored on the server. It is only valid within the transaction that it was initialized
|
||||||
// the transaction that it was initialized in. It implements these interfaces:
|
// in. It uses the context it was initialized with for all operations. It implements these interfaces:
|
||||||
//
|
//
|
||||||
// io.Writer
|
// io.Writer
|
||||||
// io.Reader
|
// io.Reader
|
||||||
// io.Seeker
|
// io.Seeker
|
||||||
// io.Closer
|
// io.Closer
|
||||||
type LargeObject struct {
|
type LargeObject struct {
|
||||||
fd int32
|
ctx context.Context
|
||||||
tx *Tx
|
tx *Tx
|
||||||
|
fd int32
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write writes p to the large object and returns the number of bytes written
|
// Write writes p to the large object and returns the number of bytes written and an error if not all of p was written.
|
||||||
// and an error if not all of p was written.
|
|
||||||
func (o *LargeObject) Write(p []byte) (int, error) {
|
func (o *LargeObject) Write(p []byte) (int, error) {
|
||||||
_, err := o.tx.Prepare(context.TODO(), "lowrite", "select lowrite($1, $2)")
|
_, err := o.tx.Prepare(o.ctx, "lowrite", "select lowrite($1, $2)")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var n int
|
var n int
|
||||||
err = o.tx.QueryRow(context.TODO(), "lowrite", o.fd, p).Scan(&n)
|
err = o.tx.QueryRow(o.ctx, "lowrite", o.fd, p).Scan(&n)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
@ -126,13 +126,13 @@ func (o *LargeObject) Write(p []byte) (int, error) {
|
|||||||
|
|
||||||
// Read reads up to len(p) bytes into p returning the number of bytes read.
|
// Read reads up to len(p) bytes into p returning the number of bytes read.
|
||||||
func (o *LargeObject) Read(p []byte) (int, error) {
|
func (o *LargeObject) Read(p []byte) (int, error) {
|
||||||
_, err := o.tx.Prepare(context.TODO(), "loread", "select loread($1, $2)")
|
_, err := o.tx.Prepare(o.ctx, "loread", "select loread($1, $2)")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var res []byte
|
var res []byte
|
||||||
err = o.tx.QueryRow(context.TODO(), "loread", o.fd, len(p)).Scan(&res)
|
err = o.tx.QueryRow(o.ctx, "loread", o.fd, len(p)).Scan(&res)
|
||||||
copy(p, res)
|
copy(p, res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return len(res), err
|
return len(res), err
|
||||||
@ -146,45 +146,44 @@ func (o *LargeObject) Read(p []byte) (int, error) {
|
|||||||
|
|
||||||
// Seek moves the current location pointer to the new location specified by offset.
|
// Seek moves the current location pointer to the new location specified by offset.
|
||||||
func (o *LargeObject) Seek(offset int64, whence int) (n int64, err error) {
|
func (o *LargeObject) Seek(offset int64, whence int) (n int64, err error) {
|
||||||
_, err = o.tx.Prepare(context.TODO(), "lo_lseek64", "select lo_lseek64($1, $2, $3)")
|
_, err = o.tx.Prepare(o.ctx, "lo_lseek64", "select lo_lseek64($1, $2, $3)")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = o.tx.QueryRow(context.TODO(), "lo_lseek64", o.fd, offset, whence).Scan(&n)
|
err = o.tx.QueryRow(o.ctx, "lo_lseek64", o.fd, offset, whence).Scan(&n)
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tell returns the current read or write location of the large object
|
// Tell returns the current read or write location of the large object descriptor.
|
||||||
// descriptor.
|
|
||||||
func (o *LargeObject) Tell() (n int64, err error) {
|
func (o *LargeObject) Tell() (n int64, err error) {
|
||||||
_, err = o.tx.Prepare(context.TODO(), "lo_tell64", "select lo_tell64($1)")
|
_, err = o.tx.Prepare(o.ctx, "lo_tell64", "select lo_tell64($1)")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = o.tx.QueryRow(context.TODO(), "lo_tell64", o.fd).Scan(&n)
|
err = o.tx.QueryRow(o.ctx, "lo_tell64", o.fd).Scan(&n)
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Trunctes the large object to size.
|
// Trunctes the large object to size.
|
||||||
func (o *LargeObject) Truncate(size int64) (err error) {
|
func (o *LargeObject) Truncate(size int64) (err error) {
|
||||||
_, err = o.tx.Prepare(context.TODO(), "lo_truncate64", "select lo_truncate64($1, $2)")
|
_, err = o.tx.Prepare(o.ctx, "lo_truncate64", "select lo_truncate64($1, $2)")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = o.tx.Exec(context.TODO(), "lo_truncate64", o.fd, size)
|
_, err = o.tx.Exec(o.ctx, "lo_truncate64", o.fd, size)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closees the large object descriptor.
|
// Close closees the large object descriptor.
|
||||||
func (o *LargeObject) Close() error {
|
func (o *LargeObject) Close() error {
|
||||||
_, err := o.tx.Prepare(context.TODO(), "lo_close", "select lo_close($1)")
|
_, err := o.tx.Prepare(o.ctx, "lo_close", "select lo_close($1)")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = o.tx.Exec(context.TODO(), "lo_close", o.fd)
|
_, err = o.tx.Exec(o.ctx, "lo_close", o.fd)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -5,6 +5,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/jackc/pgconn"
|
"github.com/jackc/pgconn"
|
||||||
"github.com/jackc/pgx/v4"
|
"github.com/jackc/pgx/v4"
|
||||||
@ -13,24 +14,27 @@ import (
|
|||||||
func TestLargeObjects(t *testing.T) {
|
func TestLargeObjects(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
conn, err := pgx.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
conn, err := pgx.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
tx, err := conn.Begin(context.Background(), nil)
|
tx, err := conn.Begin(ctx, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
lo := tx.LargeObjects()
|
lo := tx.LargeObjects()
|
||||||
|
|
||||||
id, err := lo.Create(0)
|
id, err := lo.Create(ctx, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
obj, err := lo.Open(id, pgx.LargeObjectModeRead|pgx.LargeObjectModeWrite)
|
obj, err := lo.Open(ctx, id, pgx.LargeObjectModeRead|pgx.LargeObjectModeWrite)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -109,12 +113,12 @@ func TestLargeObjects(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = lo.Unlink(id)
|
err = lo.Unlink(ctx, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = lo.Open(id, pgx.LargeObjectModeRead)
|
_, err = lo.Open(ctx, id, pgx.LargeObjectModeRead)
|
||||||
if e, ok := err.(*pgconn.PgError); !ok || e.Code != "42704" {
|
if e, ok := err.(*pgconn.PgError); !ok || e.Code != "42704" {
|
||||||
t.Errorf("Expected undefined_object error (42704), got %#v", err)
|
t.Errorf("Expected undefined_object error (42704), got %#v", err)
|
||||||
}
|
}
|
||||||
@ -123,24 +127,27 @@ func TestLargeObjects(t *testing.T) {
|
|||||||
func TestLargeObjectsMultipleTransactions(t *testing.T) {
|
func TestLargeObjectsMultipleTransactions(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
conn, err := pgx.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
conn, err := pgx.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
tx, err := conn.Begin(context.Background(), nil)
|
tx, err := conn.Begin(ctx, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
lo := tx.LargeObjects()
|
lo := tx.LargeObjects()
|
||||||
|
|
||||||
id, err := lo.Create(0)
|
id, err := lo.Create(ctx, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
obj, err := lo.Open(id, pgx.LargeObjectModeWrite)
|
obj, err := lo.Open(ctx, id, pgx.LargeObjectModeWrite)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -154,21 +161,21 @@ func TestLargeObjectsMultipleTransactions(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Commit the first transaction
|
// Commit the first transaction
|
||||||
err = tx.Commit(context.Background())
|
err = tx.Commit(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// IMPORTANT: Use the same connection for another query
|
// IMPORTANT: Use the same connection for another query
|
||||||
query := `select n from generate_series(1,10) n`
|
query := `select n from generate_series(1,10) n`
|
||||||
rows, err := conn.Query(context.Background(), query)
|
rows, err := conn.Query(ctx, query)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
rows.Close()
|
rows.Close()
|
||||||
|
|
||||||
// Start a new transaction
|
// Start a new transaction
|
||||||
tx2, err := conn.Begin(context.Background(), nil)
|
tx2, err := conn.Begin(ctx, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -176,7 +183,7 @@ func TestLargeObjectsMultipleTransactions(t *testing.T) {
|
|||||||
lo2 := tx2.LargeObjects()
|
lo2 := tx2.LargeObjects()
|
||||||
|
|
||||||
// Reopen the large object in the new transaction
|
// Reopen the large object in the new transaction
|
||||||
obj2, err := lo2.Open(id, pgx.LargeObjectModeRead|pgx.LargeObjectModeWrite)
|
obj2, err := lo2.Open(ctx, id, pgx.LargeObjectModeRead|pgx.LargeObjectModeWrite)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -247,12 +254,12 @@ func TestLargeObjectsMultipleTransactions(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = lo2.Unlink(id)
|
err = lo2.Unlink(ctx, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = lo2.Open(id, pgx.LargeObjectModeRead)
|
_, err = lo2.Open(ctx, id, pgx.LargeObjectModeRead)
|
||||||
if e, ok := err.(*pgconn.PgError); !ok || e.Code != "42704" {
|
if e, ok := err.(*pgconn.PgError); !ok || e.Code != "42704" {
|
||||||
t.Errorf("Expected undefined_object error (42704), got %#v", err)
|
t.Errorf("Expected undefined_object error (42704), got %#v", err)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user