mirror of https://github.com/jackc/pgx.git
Convert large objects to prepared statements
This allows removing semi-obsolete fastpath interface support. See https://www.postgresql.org/docs/current/libpq-fastpath.html. This also simplifies introducing context support.pull/586/head
parent
f572b336b1
commit
79f49ce300
1
conn.go
1
conn.go
|
@ -38,7 +38,6 @@ type Conn struct {
|
|||
preparedStatements map[string]*PreparedStatement
|
||||
logger Logger
|
||||
logLevel LogLevel
|
||||
fp *fastpath
|
||||
|
||||
causeOfDeath error
|
||||
|
||||
|
|
112
fastpath.go
112
fastpath.go
|
@ -1,112 +0,0 @@
|
|||
package pgx
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
|
||||
"github.com/jackc/pgio"
|
||||
"github.com/jackc/pgproto3"
|
||||
"github.com/jackc/pgtype"
|
||||
)
|
||||
|
||||
func newFastpath(cn *Conn) *fastpath {
|
||||
return &fastpath{cn: cn, fns: make(map[string]pgtype.OID)}
|
||||
}
|
||||
|
||||
type fastpath struct {
|
||||
cn *Conn
|
||||
fns map[string]pgtype.OID
|
||||
}
|
||||
|
||||
func (f *fastpath) functionOID(name string) pgtype.OID {
|
||||
return f.fns[name]
|
||||
}
|
||||
|
||||
func (f *fastpath) addFunction(name string, oid pgtype.OID) {
|
||||
f.fns[name] = oid
|
||||
}
|
||||
|
||||
func (f *fastpath) addFunctions(rows Rows) error {
|
||||
for rows.Next() {
|
||||
var name string
|
||||
var oid pgtype.OID
|
||||
if err := rows.Scan(&name, &oid); err != nil {
|
||||
return err
|
||||
}
|
||||
f.addFunction(name, oid)
|
||||
}
|
||||
return rows.Err()
|
||||
}
|
||||
|
||||
type fpArg []byte
|
||||
|
||||
func fpIntArg(n int32) fpArg {
|
||||
res := make([]byte, 4)
|
||||
binary.BigEndian.PutUint32(res, uint32(n))
|
||||
return res
|
||||
}
|
||||
|
||||
func fpInt64Arg(n int64) fpArg {
|
||||
res := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(res, uint64(n))
|
||||
return res
|
||||
}
|
||||
|
||||
func (f *fastpath) Call(oid pgtype.OID, args []fpArg) (res []byte, err error) {
|
||||
buf := f.cn.wbuf
|
||||
buf = append(buf, 'F') // function call
|
||||
sp := len(buf)
|
||||
buf = pgio.AppendInt32(buf, -1)
|
||||
|
||||
buf = pgio.AppendInt32(buf, int32(oid)) // function object id
|
||||
buf = pgio.AppendInt16(buf, 1) // # of argument format codes
|
||||
buf = pgio.AppendInt16(buf, 1) // format code: binary
|
||||
buf = pgio.AppendInt16(buf, int16(len(args))) // # of arguments
|
||||
for _, arg := range args {
|
||||
buf = pgio.AppendInt32(buf, int32(len(arg))) // length of argument
|
||||
buf = append(buf, arg...) // argument value
|
||||
}
|
||||
buf = pgio.AppendInt16(buf, 1) // response format code (binary)
|
||||
pgio.SetInt32(buf[sp:], int32(len(buf[sp:])))
|
||||
|
||||
if _, err := f.cn.pgConn.Conn().Write(buf); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for {
|
||||
msg, err := f.cn.pgConn.ReceiveMessage()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
switch msg := msg.(type) {
|
||||
case *pgproto3.FunctionCallResponse:
|
||||
res = make([]byte, len(msg.Result))
|
||||
copy(res, msg.Result)
|
||||
case *pgproto3.ReadyForQuery:
|
||||
// done
|
||||
return res, err
|
||||
default:
|
||||
if err := f.cn.processContextFreeMsg(msg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (f *fastpath) CallFn(fn string, args []fpArg) ([]byte, error) {
|
||||
return f.Call(f.functionOID(fn), args)
|
||||
}
|
||||
|
||||
func fpInt32(data []byte, err error) (int32, error) {
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
n := int32(binary.BigEndian.Uint32(data))
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func fpInt64(data []byte, err error) (int64, error) {
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return int64(binary.BigEndian.Uint64(data)), nil
|
||||
}
|
148
large_objects.go
148
large_objects.go
|
@ -5,6 +5,7 @@ import (
|
|||
"io"
|
||||
|
||||
"github.com/jackc/pgtype"
|
||||
errors "golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
// LargeObjects is a structure used to access the large objects API. It is only
|
||||
|
@ -12,9 +13,7 @@ import (
|
|||
//
|
||||
// For more details see: http://www.postgresql.org/docs/current/static/largeobjects.html
|
||||
type LargeObjects struct {
|
||||
// Has64 is true if the server is capable of working with 64-bit numbers
|
||||
Has64 bool
|
||||
fp *fastpath
|
||||
tx *Tx
|
||||
}
|
||||
|
||||
const largeObjectFns = `select proname, oid from pg_catalog.pg_proc
|
||||
|
@ -34,24 +33,8 @@ where proname in (
|
|||
and pronamespace = (select oid from pg_catalog.pg_namespace where nspname = 'pg_catalog')`
|
||||
|
||||
// LargeObjects returns a LargeObjects instance for the transaction.
|
||||
func (tx *Tx) LargeObjects() (*LargeObjects, error) {
|
||||
if tx.conn.fp == nil {
|
||||
tx.conn.fp = newFastpath(tx.conn)
|
||||
}
|
||||
if _, exists := tx.conn.fp.fns["lo_open"]; !exists {
|
||||
res, err := tx.Query(context.TODO(), largeObjectFns)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := tx.conn.fp.addFunctions(res); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
lo := &LargeObjects{fp: tx.conn.fp}
|
||||
_, lo.Has64 = lo.fp.fns["lo_lseek64"]
|
||||
|
||||
return lo, nil
|
||||
func (tx *Tx) LargeObjects() LargeObjects {
|
||||
return LargeObjects{tx: tx}
|
||||
}
|
||||
|
||||
type LargeObjectMode int32
|
||||
|
@ -61,23 +44,51 @@ const (
|
|||
LargeObjectModeRead LargeObjectMode = 0x40000
|
||||
)
|
||||
|
||||
// Create creates a new large object. If id is zero, the server assigns an
|
||||
// Create creates a new large object. If oid is zero, the server assigns an
|
||||
// unused OID.
|
||||
func (o *LargeObjects) Create(id pgtype.OID) (pgtype.OID, error) {
|
||||
newOID, err := fpInt32(o.fp.CallFn("lo_create", []fpArg{fpIntArg(int32(id))}))
|
||||
return pgtype.OID(newOID), err
|
||||
func (o *LargeObjects) Create(oid pgtype.OID) (pgtype.OID, error) {
|
||||
_, err := o.tx.Prepare(context.TODO(), "lo_create", "select lo_create($1)")
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
err = o.tx.QueryRow(context.TODO(), "lo_create", oid).Scan(&oid)
|
||||
return oid, err
|
||||
}
|
||||
|
||||
// Open opens an existing large object with the given mode.
|
||||
func (o *LargeObjects) Open(oid pgtype.OID, mode LargeObjectMode) (*LargeObject, error) {
|
||||
fd, err := fpInt32(o.fp.CallFn("lo_open", []fpArg{fpIntArg(int32(oid)), fpIntArg(int32(mode))}))
|
||||
return &LargeObject{fd: fd, lo: o}, err
|
||||
_, err := o.tx.Prepare(context.TODO(), "lo_open", "select lo_open($1, $2)")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var fd int32
|
||||
err = o.tx.QueryRow(context.TODO(), "lo_open", oid, mode).Scan(&fd)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &LargeObject{fd: fd, tx: o.tx}, nil
|
||||
}
|
||||
|
||||
// Unlink removes a large object from the database.
|
||||
func (o *LargeObjects) Unlink(oid pgtype.OID) error {
|
||||
_, err := o.fp.CallFn("lo_unlink", []fpArg{fpIntArg(int32(oid))})
|
||||
return err
|
||||
_, err := o.tx.Prepare(context.TODO(), "lo_unlink", "select lo_unlink($1)")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var result int32
|
||||
err = o.tx.QueryRow(context.TODO(), "lo_unlink", oid).Scan(&result)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if result != 1 {
|
||||
return errors.New("failed to remove large object")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// A LargeObject is a large object stored on the server. It is only valid within
|
||||
|
@ -89,62 +100,91 @@ func (o *LargeObjects) Unlink(oid pgtype.OID) error {
|
|||
// io.Closer
|
||||
type LargeObject struct {
|
||||
fd int32
|
||||
lo *LargeObjects
|
||||
tx *Tx
|
||||
}
|
||||
|
||||
// Write writes p to the large object and returns the number of bytes written
|
||||
// and an error if not all of p was written.
|
||||
func (o *LargeObject) Write(p []byte) (int, error) {
|
||||
n, err := fpInt32(o.lo.fp.CallFn("lowrite", []fpArg{fpIntArg(o.fd), p}))
|
||||
return int(n), err
|
||||
_, err := o.tx.Prepare(context.TODO(), "lowrite", "select lowrite($1, $2)")
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var n int
|
||||
err = o.tx.QueryRow(context.TODO(), "lowrite", o.fd, p).Scan(&n)
|
||||
if err != nil {
|
||||
return n, err
|
||||
}
|
||||
|
||||
if n < 0 {
|
||||
return 0, errors.New("failed to write to large object")
|
||||
}
|
||||
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// Read reads up to len(p) bytes into p returning the number of bytes read.
|
||||
func (o *LargeObject) Read(p []byte) (int, error) {
|
||||
res, err := o.lo.fp.CallFn("loread", []fpArg{fpIntArg(o.fd), fpIntArg(int32(len(p)))})
|
||||
_, err := o.tx.Prepare(context.TODO(), "loread", "select loread($1, $2)")
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
var res []byte
|
||||
err = o.tx.QueryRow(context.TODO(), "loread", o.fd, len(p)).Scan(&res)
|
||||
copy(p, res)
|
||||
if err != nil {
|
||||
return len(res), err
|
||||
}
|
||||
|
||||
if len(res) < len(p) {
|
||||
err = io.EOF
|
||||
}
|
||||
return copy(p, res), err
|
||||
return len(res), err
|
||||
}
|
||||
|
||||
// Seek moves the current location pointer to the new location specified by offset.
|
||||
func (o *LargeObject) Seek(offset int64, whence int) (n int64, err error) {
|
||||
if o.lo.Has64 {
|
||||
n, err = fpInt64(o.lo.fp.CallFn("lo_lseek64", []fpArg{fpIntArg(o.fd), fpInt64Arg(offset), fpIntArg(int32(whence))}))
|
||||
} else {
|
||||
var n32 int32
|
||||
n32, err = fpInt32(o.lo.fp.CallFn("lo_lseek", []fpArg{fpIntArg(o.fd), fpIntArg(int32(offset)), fpIntArg(int32(whence))}))
|
||||
n = int64(n32)
|
||||
_, err = o.tx.Prepare(context.TODO(), "lo_lseek64", "select lo_lseek64($1, $2, $3)")
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return
|
||||
|
||||
err = o.tx.QueryRow(context.TODO(), "lo_lseek64", o.fd, offset, whence).Scan(&n)
|
||||
return n, err
|
||||
}
|
||||
|
||||
// Tell returns the current read or write location of the large object
|
||||
// descriptor.
|
||||
func (o *LargeObject) Tell() (n int64, err error) {
|
||||
if o.lo.Has64 {
|
||||
n, err = fpInt64(o.lo.fp.CallFn("lo_tell64", []fpArg{fpIntArg(o.fd)}))
|
||||
} else {
|
||||
var n32 int32
|
||||
n32, err = fpInt32(o.lo.fp.CallFn("lo_tell", []fpArg{fpIntArg(o.fd)}))
|
||||
n = int64(n32)
|
||||
_, err = o.tx.Prepare(context.TODO(), "lo_tell64", "select lo_tell64($1)")
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return
|
||||
|
||||
err = o.tx.QueryRow(context.TODO(), "lo_tell64", o.fd).Scan(&n)
|
||||
return n, err
|
||||
}
|
||||
|
||||
// Trunctes the large object to size.
|
||||
func (o *LargeObject) Truncate(size int64) (err error) {
|
||||
if o.lo.Has64 {
|
||||
_, err = o.lo.fp.CallFn("lo_truncate64", []fpArg{fpIntArg(o.fd), fpInt64Arg(size)})
|
||||
} else {
|
||||
_, err = o.lo.fp.CallFn("lo_truncate", []fpArg{fpIntArg(o.fd), fpIntArg(int32(size))})
|
||||
_, err = o.tx.Prepare(context.TODO(), "lo_truncate64", "select lo_truncate64($1, $2)")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return
|
||||
|
||||
_, err = o.tx.Exec(context.TODO(), "lo_truncate64", o.fd, size)
|
||||
return err
|
||||
}
|
||||
|
||||
// Close closees the large object descriptor.
|
||||
func (o *LargeObject) Close() error {
|
||||
_, err := o.lo.fp.CallFn("lo_close", []fpArg{fpIntArg(o.fd)})
|
||||
_, err := o.tx.Prepare(context.TODO(), "lo_close", "select lo_close($1)")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = o.tx.Exec(context.TODO(), "lo_close", o.fd)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -11,7 +11,6 @@ import (
|
|||
)
|
||||
|
||||
func TestLargeObjects(t *testing.T) {
|
||||
t.Skip("TODO: fix or (re)move")
|
||||
t.Parallel()
|
||||
|
||||
conn, err := pgx.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||
|
@ -24,10 +23,7 @@ func TestLargeObjects(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
lo, err := tx.LargeObjects()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
lo := tx.LargeObjects()
|
||||
|
||||
id, err := lo.Create(0)
|
||||
if err != nil {
|
||||
|
@ -125,7 +121,6 @@ func TestLargeObjects(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestLargeObjectsMultipleTransactions(t *testing.T) {
|
||||
t.Skip("TODO: fix or (re)move")
|
||||
t.Parallel()
|
||||
|
||||
conn, err := pgx.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE"))
|
||||
|
@ -138,10 +133,7 @@ func TestLargeObjectsMultipleTransactions(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
lo, err := tx.LargeObjects()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
lo := tx.LargeObjects()
|
||||
|
||||
id, err := lo.Create(0)
|
||||
if err != nil {
|
||||
|
@ -181,10 +173,7 @@ func TestLargeObjectsMultipleTransactions(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
lo2, err := tx2.LargeObjects()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
lo2 := tx2.LargeObjects()
|
||||
|
||||
// Reopen the large object in the new transaction
|
||||
obj2, err := lo2.Open(id, pgx.LargeObjectModeRead|pgx.LargeObjectModeWrite)
|
||||
|
|
Loading…
Reference in New Issue