From 79f49ce3002b2a6b2d66a0e13adc416858e416a5 Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Sat, 11 May 2019 11:36:38 -0500 Subject: [PATCH] Convert large objects to prepared statements This allows removing semi-obsolete fastpath interface support. See https://www.postgresql.org/docs/current/libpq-fastpath.html. This also simplifies introducing context support. --- conn.go | 1 - fastpath.go | 112 -------------------------------- large_objects.go | 148 +++++++++++++++++++++++++++--------------- large_objects_test.go | 17 +---- 4 files changed, 97 insertions(+), 181 deletions(-) delete mode 100644 fastpath.go diff --git a/conn.go b/conn.go index 0d06098b..d46c3b4c 100644 --- a/conn.go +++ b/conn.go @@ -38,7 +38,6 @@ type Conn struct { preparedStatements map[string]*PreparedStatement logger Logger logLevel LogLevel - fp *fastpath causeOfDeath error diff --git a/fastpath.go b/fastpath.go deleted file mode 100644 index e80d9014..00000000 --- a/fastpath.go +++ /dev/null @@ -1,112 +0,0 @@ -package pgx - -import ( - "encoding/binary" - - "github.com/jackc/pgio" - "github.com/jackc/pgproto3" - "github.com/jackc/pgtype" -) - -func newFastpath(cn *Conn) *fastpath { - return &fastpath{cn: cn, fns: make(map[string]pgtype.OID)} -} - -type fastpath struct { - cn *Conn - fns map[string]pgtype.OID -} - -func (f *fastpath) functionOID(name string) pgtype.OID { - return f.fns[name] -} - -func (f *fastpath) addFunction(name string, oid pgtype.OID) { - f.fns[name] = oid -} - -func (f *fastpath) addFunctions(rows Rows) error { - for rows.Next() { - var name string - var oid pgtype.OID - if err := rows.Scan(&name, &oid); err != nil { - return err - } - f.addFunction(name, oid) - } - return rows.Err() -} - -type fpArg []byte - -func fpIntArg(n int32) fpArg { - res := make([]byte, 4) - binary.BigEndian.PutUint32(res, uint32(n)) - return res -} - -func fpInt64Arg(n int64) fpArg { - res := make([]byte, 8) - binary.BigEndian.PutUint64(res, uint64(n)) - return res -} - -func (f *fastpath) Call(oid pgtype.OID, args []fpArg) (res []byte, err error) { - buf := f.cn.wbuf - buf = append(buf, 'F') // function call - sp := len(buf) - buf = pgio.AppendInt32(buf, -1) - - buf = pgio.AppendInt32(buf, int32(oid)) // function object id - buf = pgio.AppendInt16(buf, 1) // # of argument format codes - buf = pgio.AppendInt16(buf, 1) // format code: binary - buf = pgio.AppendInt16(buf, int16(len(args))) // # of arguments - for _, arg := range args { - buf = pgio.AppendInt32(buf, int32(len(arg))) // length of argument - buf = append(buf, arg...) // argument value - } - buf = pgio.AppendInt16(buf, 1) // response format code (binary) - pgio.SetInt32(buf[sp:], int32(len(buf[sp:]))) - - if _, err := f.cn.pgConn.Conn().Write(buf); err != nil { - return nil, err - } - - for { - msg, err := f.cn.pgConn.ReceiveMessage() - if err != nil { - return nil, err - } - switch msg := msg.(type) { - case *pgproto3.FunctionCallResponse: - res = make([]byte, len(msg.Result)) - copy(res, msg.Result) - case *pgproto3.ReadyForQuery: - // done - return res, err - default: - if err := f.cn.processContextFreeMsg(msg); err != nil { - return nil, err - } - } - } -} - -func (f *fastpath) CallFn(fn string, args []fpArg) ([]byte, error) { - return f.Call(f.functionOID(fn), args) -} - -func fpInt32(data []byte, err error) (int32, error) { - if err != nil { - return 0, err - } - n := int32(binary.BigEndian.Uint32(data)) - return n, nil -} - -func fpInt64(data []byte, err error) (int64, error) { - if err != nil { - return 0, err - } - return int64(binary.BigEndian.Uint64(data)), nil -} diff --git a/large_objects.go b/large_objects.go index 1a5f32be..0d66ed54 100644 --- a/large_objects.go +++ b/large_objects.go @@ -5,6 +5,7 @@ import ( "io" "github.com/jackc/pgtype" + errors "golang.org/x/xerrors" ) // LargeObjects is a structure used to access the large objects API. It is only @@ -12,9 +13,7 @@ import ( // // For more details see: http://www.postgresql.org/docs/current/static/largeobjects.html type LargeObjects struct { - // Has64 is true if the server is capable of working with 64-bit numbers - Has64 bool - fp *fastpath + tx *Tx } const largeObjectFns = `select proname, oid from pg_catalog.pg_proc @@ -34,24 +33,8 @@ where proname in ( and pronamespace = (select oid from pg_catalog.pg_namespace where nspname = 'pg_catalog')` // LargeObjects returns a LargeObjects instance for the transaction. -func (tx *Tx) LargeObjects() (*LargeObjects, error) { - if tx.conn.fp == nil { - tx.conn.fp = newFastpath(tx.conn) - } - if _, exists := tx.conn.fp.fns["lo_open"]; !exists { - res, err := tx.Query(context.TODO(), largeObjectFns) - if err != nil { - return nil, err - } - if err := tx.conn.fp.addFunctions(res); err != nil { - return nil, err - } - } - - lo := &LargeObjects{fp: tx.conn.fp} - _, lo.Has64 = lo.fp.fns["lo_lseek64"] - - return lo, nil +func (tx *Tx) LargeObjects() LargeObjects { + return LargeObjects{tx: tx} } type LargeObjectMode int32 @@ -61,23 +44,51 @@ const ( LargeObjectModeRead LargeObjectMode = 0x40000 ) -// Create creates a new large object. If id is zero, the server assigns an +// Create creates a new large object. If oid is zero, the server assigns an // unused OID. -func (o *LargeObjects) Create(id pgtype.OID) (pgtype.OID, error) { - newOID, err := fpInt32(o.fp.CallFn("lo_create", []fpArg{fpIntArg(int32(id))})) - return pgtype.OID(newOID), err +func (o *LargeObjects) Create(oid pgtype.OID) (pgtype.OID, error) { + _, err := o.tx.Prepare(context.TODO(), "lo_create", "select lo_create($1)") + if err != nil { + return 0, err + } + + err = o.tx.QueryRow(context.TODO(), "lo_create", oid).Scan(&oid) + return oid, err } // Open opens an existing large object with the given mode. func (o *LargeObjects) Open(oid pgtype.OID, mode LargeObjectMode) (*LargeObject, error) { - fd, err := fpInt32(o.fp.CallFn("lo_open", []fpArg{fpIntArg(int32(oid)), fpIntArg(int32(mode))})) - return &LargeObject{fd: fd, lo: o}, err + _, err := o.tx.Prepare(context.TODO(), "lo_open", "select lo_open($1, $2)") + if err != nil { + return nil, err + } + + var fd int32 + err = o.tx.QueryRow(context.TODO(), "lo_open", oid, mode).Scan(&fd) + if err != nil { + return nil, err + } + return &LargeObject{fd: fd, tx: o.tx}, nil } // Unlink removes a large object from the database. func (o *LargeObjects) Unlink(oid pgtype.OID) error { - _, err := o.fp.CallFn("lo_unlink", []fpArg{fpIntArg(int32(oid))}) - return err + _, err := o.tx.Prepare(context.TODO(), "lo_unlink", "select lo_unlink($1)") + if err != nil { + return err + } + + var result int32 + err = o.tx.QueryRow(context.TODO(), "lo_unlink", oid).Scan(&result) + if err != nil { + return err + } + + if result != 1 { + return errors.New("failed to remove large object") + } + + return nil } // A LargeObject is a large object stored on the server. It is only valid within @@ -89,62 +100,91 @@ func (o *LargeObjects) Unlink(oid pgtype.OID) error { // io.Closer type LargeObject struct { fd int32 - lo *LargeObjects + tx *Tx } // Write writes p to the large object and returns the number of bytes written // and an error if not all of p was written. func (o *LargeObject) Write(p []byte) (int, error) { - n, err := fpInt32(o.lo.fp.CallFn("lowrite", []fpArg{fpIntArg(o.fd), p})) - return int(n), err + _, err := o.tx.Prepare(context.TODO(), "lowrite", "select lowrite($1, $2)") + if err != nil { + return 0, err + } + + var n int + err = o.tx.QueryRow(context.TODO(), "lowrite", o.fd, p).Scan(&n) + if err != nil { + return n, err + } + + if n < 0 { + return 0, errors.New("failed to write to large object") + } + + return n, nil } // Read reads up to len(p) bytes into p returning the number of bytes read. func (o *LargeObject) Read(p []byte) (int, error) { - res, err := o.lo.fp.CallFn("loread", []fpArg{fpIntArg(o.fd), fpIntArg(int32(len(p)))}) + _, err := o.tx.Prepare(context.TODO(), "loread", "select loread($1, $2)") + if err != nil { + return 0, err + } + + var res []byte + err = o.tx.QueryRow(context.TODO(), "loread", o.fd, len(p)).Scan(&res) + copy(p, res) + if err != nil { + return len(res), err + } + if len(res) < len(p) { err = io.EOF } - return copy(p, res), err + return len(res), err } // Seek moves the current location pointer to the new location specified by offset. func (o *LargeObject) Seek(offset int64, whence int) (n int64, err error) { - if o.lo.Has64 { - n, err = fpInt64(o.lo.fp.CallFn("lo_lseek64", []fpArg{fpIntArg(o.fd), fpInt64Arg(offset), fpIntArg(int32(whence))})) - } else { - var n32 int32 - n32, err = fpInt32(o.lo.fp.CallFn("lo_lseek", []fpArg{fpIntArg(o.fd), fpIntArg(int32(offset)), fpIntArg(int32(whence))})) - n = int64(n32) + _, err = o.tx.Prepare(context.TODO(), "lo_lseek64", "select lo_lseek64($1, $2, $3)") + if err != nil { + return 0, err } - return + + err = o.tx.QueryRow(context.TODO(), "lo_lseek64", o.fd, offset, whence).Scan(&n) + return n, err } // Tell returns the current read or write location of the large object // descriptor. func (o *LargeObject) Tell() (n int64, err error) { - if o.lo.Has64 { - n, err = fpInt64(o.lo.fp.CallFn("lo_tell64", []fpArg{fpIntArg(o.fd)})) - } else { - var n32 int32 - n32, err = fpInt32(o.lo.fp.CallFn("lo_tell", []fpArg{fpIntArg(o.fd)})) - n = int64(n32) + _, err = o.tx.Prepare(context.TODO(), "lo_tell64", "select lo_tell64($1)") + if err != nil { + return 0, err } - return + + err = o.tx.QueryRow(context.TODO(), "lo_tell64", o.fd).Scan(&n) + return n, err } // Trunctes the large object to size. func (o *LargeObject) Truncate(size int64) (err error) { - if o.lo.Has64 { - _, err = o.lo.fp.CallFn("lo_truncate64", []fpArg{fpIntArg(o.fd), fpInt64Arg(size)}) - } else { - _, err = o.lo.fp.CallFn("lo_truncate", []fpArg{fpIntArg(o.fd), fpIntArg(int32(size))}) + _, err = o.tx.Prepare(context.TODO(), "lo_truncate64", "select lo_truncate64($1, $2)") + if err != nil { + return err } - return + + _, err = o.tx.Exec(context.TODO(), "lo_truncate64", o.fd, size) + return err } // Close closees the large object descriptor. func (o *LargeObject) Close() error { - _, err := o.lo.fp.CallFn("lo_close", []fpArg{fpIntArg(o.fd)}) + _, err := o.tx.Prepare(context.TODO(), "lo_close", "select lo_close($1)") + if err != nil { + return err + } + + _, err = o.tx.Exec(context.TODO(), "lo_close", o.fd) return err } diff --git a/large_objects_test.go b/large_objects_test.go index 332699ec..e1065ba6 100644 --- a/large_objects_test.go +++ b/large_objects_test.go @@ -11,7 +11,6 @@ import ( ) func TestLargeObjects(t *testing.T) { - t.Skip("TODO: fix or (re)move") t.Parallel() conn, err := pgx.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) @@ -24,10 +23,7 @@ func TestLargeObjects(t *testing.T) { t.Fatal(err) } - lo, err := tx.LargeObjects() - if err != nil { - t.Fatal(err) - } + lo := tx.LargeObjects() id, err := lo.Create(0) if err != nil { @@ -125,7 +121,6 @@ func TestLargeObjects(t *testing.T) { } func TestLargeObjectsMultipleTransactions(t *testing.T) { - t.Skip("TODO: fix or (re)move") t.Parallel() conn, err := pgx.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) @@ -138,10 +133,7 @@ func TestLargeObjectsMultipleTransactions(t *testing.T) { t.Fatal(err) } - lo, err := tx.LargeObjects() - if err != nil { - t.Fatal(err) - } + lo := tx.LargeObjects() id, err := lo.Create(0) if err != nil { @@ -181,10 +173,7 @@ func TestLargeObjectsMultipleTransactions(t *testing.T) { t.Fatal(err) } - lo2, err := tx2.LargeObjects() - if err != nil { - t.Fatal(err) - } + lo2 := tx2.LargeObjects() // Reopen the large object in the new transaction obj2, err := lo2.Open(id, pgx.LargeObjectModeRead|pgx.LargeObjectModeWrite)