diff --git a/conn.go b/conn.go index f256b714..97500229 100644 --- a/conn.go +++ b/conn.go @@ -50,6 +50,7 @@ type Conn struct { causeOfDeath error logger Logger mr msgReader + fp *fastpath } type PreparedStatement struct { diff --git a/fastpath.go b/fastpath.go new file mode 100644 index 00000000..5eee1ea1 --- /dev/null +++ b/fastpath.go @@ -0,0 +1,110 @@ +package pgx + +import ( + "encoding/binary" +) + +type fastpathArg []byte + +func newFastpath(cn *Conn) *fastpath { + return &fastpath{cn: cn, fns: make(map[string]Oid)} +} + +type fastpath struct { + cn *Conn + fns map[string]Oid +} + +func (f *fastpath) functionOID(name string) Oid { + return f.fns[name] +} + +func (f *fastpath) addFunction(name string, oid Oid) { + f.fns[name] = oid +} + +func (f *fastpath) addFunctions(rows *Rows) error { + for rows.Next() { + var name string + var oid Oid + if err := rows.Scan(&name, &oid); err != nil { + return err + } + f.addFunction(name, oid) + } + return rows.Err() +} + +type fpArg []byte + +func fpIntArg(n int32) fpArg { + res := make([]byte, 4) + binary.BigEndian.PutUint32(res, uint32(n)) + return res +} + +func fpInt64Arg(n int64) fpArg { + res := make([]byte, 8) + binary.BigEndian.PutUint64(res, uint64(n)) + return res +} + +func (f *fastpath) Call(oid Oid, args []fpArg) (res []byte, err error) { + wbuf := newWriteBuf(f.cn.wbuf[:0], 'F') // function call + wbuf.WriteInt32(int32(oid)) // function object id + wbuf.WriteInt16(1) // # of argument format codes + wbuf.WriteInt16(1) // format code: binary + wbuf.WriteInt16(int16(len(args))) // # of arguments + for _, arg := range args { + wbuf.WriteInt32(int32(len(arg))) // length of argument + wbuf.WriteBytes(arg) // argument value + } + wbuf.WriteInt16(1) // response format code (binary) + wbuf.closeMsg() + + if _, err := f.cn.conn.Write(wbuf.buf); err != nil { + return nil, err + } + + for { + var t byte + var r *msgReader + t, r, err = f.cn.rxMsg() + if err != nil { + return nil, err + } + switch t { + case 'V': // FunctionCallResponse + data := r.readBytes(r.readInt32()) + res = make([]byte, len(data)) + copy(res, data) + case 'Z': // Ready for query + f.cn.rxReadyForQuery(r) + // done + return + default: + if err := f.cn.processContextFreeMsg(t, r); err != nil { + return nil, err + } + } + } +} + +func (f *fastpath) CallFn(fn string, args []fpArg) ([]byte, error) { + return f.Call(f.functionOID(fn), args) +} + +func fpInt32(data []byte, err error) (int32, error) { + if err != nil { + return 0, err + } + n := int32(binary.BigEndian.Uint32(data)) + return n, nil +} + +func fpInt64(data []byte, err error) (int64, error) { + if err != nil { + return 0, err + } + return int64(binary.BigEndian.Uint64(data)), nil +} diff --git a/large_objects.go b/large_objects.go new file mode 100644 index 00000000..b208aede --- /dev/null +++ b/large_objects.go @@ -0,0 +1,146 @@ +package pgx + +import ( + "io" +) + +// LargeObjects is a structure used to access the large objects API. It is only +// valid within the transaction where it was created. +// +// For more details see: http://www.postgresql.org/docs/current/static/largeobjects.html +type LargeObjects struct { + // Has64 is true if the server is capable of working with 64-bit numbers + Has64 bool + fp *fastpath +} + +const largeObjectFns = `select proname, oid from pg_catalog.pg_proc +where proname in ( +'lo_open', +'lo_close', +'lo_create', +'lo_unlink', +'lo_lseek', +'lo_lseek64', +'lo_tell', +'lo_tell64', +'lo_truncate', +'lo_truncate64', +'loread', +'lowrite') +and pronamespace = (select oid from pg_catalog.pg_namespace where nspname = 'pg_catalog')` + +// LargeObjects returns a LargeObjects instance for the transaction. +func (tx *Tx) LargeObjects() (*LargeObjects, error) { + if tx.conn.fp == nil { + tx.conn.fp = newFastpath(tx.conn) + } + if _, exists := tx.conn.fp.fns["lo_open"]; !exists { + res, err := tx.Query(largeObjectFns) + if err != nil { + return nil, err + } + if err := tx.conn.fp.addFunctions(res); err != nil { + return nil, err + } + } + + lo := &LargeObjects{fp: tx.conn.fp} + _, lo.Has64 = lo.fp.fns["lo_lseek64"] + + return lo, nil +} + +type LargeObjectMode int32 + +const ( + LargeObjectModeWrite LargeObjectMode = 0x20000 + LargeObjectModeRead LargeObjectMode = 0x40000 +) + +// Create creates a new large object. If id is zero, the server assigns an +// unused OID. +func (o *LargeObjects) Create(id Oid) (Oid, error) { + newOid, err := fpInt32(o.fp.CallFn("lo_create", []fpArg{fpIntArg(int32(id))})) + return Oid(newOid), err +} + +// Open opens an existing large object with the given mode. +func (o *LargeObjects) Open(oid Oid, mode LargeObjectMode) (*LargeObject, error) { + fd, err := fpInt32(o.fp.CallFn("lo_open", []fpArg{fpIntArg(int32(oid)), fpIntArg(int32(mode))})) + return &LargeObject{fd: fd, lo: o}, err +} + +// Unlink removes a large object from the database. +func (o *LargeObjects) Unlink(oid Oid) error { + _, err := o.fp.CallFn("lo_unlink", []fpArg{fpIntArg(int32(oid))}) + return err +} + +// A LargeObject is a large object stored on the server. It is only valid within +// the transaction that it was initialized in. It implements the these interfaces: +// +// - io.Writer +// - io.Reader +// - io.Seeker +// - io.Closer +type LargeObject struct { + fd int32 + lo *LargeObjects +} + +// Write writes +func (o *LargeObject) Write(p []byte) (int, error) { + n, err := fpInt32(o.lo.fp.CallFn("lowrite", []fpArg{fpIntArg(o.fd), p})) + return int(n), err +} + +// Read reads up to len(p) bytes into p returning the number of bytes read. +func (o *LargeObject) Read(p []byte) (int, error) { + res, err := o.lo.fp.CallFn("loread", []fpArg{fpIntArg(o.fd), fpIntArg(int32(len(p)))}) + if len(res) < len(p) { + err = io.EOF + } + return copy(p, res), err +} + +// Seek moves the current location pointer to the new location specified by offset. +func (o *LargeObject) Seek(offset int64, whence int) (n int64, err error) { + if o.lo.Has64 { + n, err = fpInt64(o.lo.fp.CallFn("lo_lseek64", []fpArg{fpIntArg(o.fd), fpInt64Arg(offset), fpIntArg(int32(whence))})) + } else { + var n32 int32 + n32, err = fpInt32(o.lo.fp.CallFn("lo_lseek", []fpArg{fpIntArg(o.fd), fpIntArg(int32(offset)), fpIntArg(int32(whence))})) + n = int64(n32) + } + return +} + +// Tell returns the current read or write location of the large object +// descriptor. +func (o *LargeObject) Tell() (n int64, err error) { + if o.lo.Has64 { + n, err = fpInt64(o.lo.fp.CallFn("lo_tell64", []fpArg{fpIntArg(o.fd)})) + } else { + var n32 int32 + n32, err = fpInt32(o.lo.fp.CallFn("lo_tell", []fpArg{fpIntArg(o.fd)})) + n = int64(n32) + } + return +} + +// Trunctes the large object to size. +func (o *LargeObject) Truncate(size int64) (err error) { + if o.lo.Has64 { + _, err = o.lo.fp.CallFn("lo_truncate64", []fpArg{fpIntArg(o.fd), fpInt64Arg(size)}) + } else { + _, err = o.lo.fp.CallFn("lo_truncate", []fpArg{fpIntArg(o.fd), fpIntArg(int32(size))}) + } + return +} + +// Close closees the large object descriptor. +func (o *LargeObject) Close() error { + _, err := o.lo.fp.CallFn("lo_close", []fpArg{fpIntArg(o.fd)}) + return err +} diff --git a/large_objects_test.go b/large_objects_test.go new file mode 100644 index 00000000..a19c851d --- /dev/null +++ b/large_objects_test.go @@ -0,0 +1,121 @@ +package pgx_test + +import ( + "io" + "testing" + + "github.com/jackc/pgx" +) + +func TestLargeObjects(t *testing.T) { + t.Parallel() + + conn, err := pgx.Connect(*defaultConnConfig) + if err != nil { + t.Fatal(err) + } + + tx, err := conn.Begin() + if err != nil { + t.Fatal(err) + } + + lo, err := tx.LargeObjects() + if err != nil { + t.Fatal(err) + } + + id, err := lo.Create(0) + if err != nil { + t.Fatal(err) + } + + obj, err := lo.Open(id, pgx.LargeObjectModeRead|pgx.LargeObjectModeWrite) + if err != nil { + t.Fatal(err) + } + + n, err := obj.Write([]byte("testing")) + if err != nil { + t.Fatal(err) + } + if n != 7 { + t.Errorf("Expected n to be 7, got %d", n) + } + + pos, err := obj.Seek(1, 0) + if err != nil { + t.Fatal(err) + } + if pos != 1 { + t.Errorf("Expected pos to be 1, got %d", pos) + } + + res := make([]byte, 6) + n, err = obj.Read(res) + if err != nil { + t.Fatal(err) + } + if string(res) != "esting" { + t.Errorf(`Expected res to be "esting", got %q`, res) + } + if n != 6 { + t.Errorf("Expected n to be 6, got %d", n) + } + + n, err = obj.Read(res) + if err != io.EOF { + t.Error("Expected io.EOF, go nil") + } + if n != 0 { + t.Errorf("Expected n to be 0, got %d", n) + } + + pos, err = obj.Tell() + if err != nil { + t.Fatal(err) + } + if pos != 7 { + t.Errorf("Expected pos to be 7, got %d", pos) + } + + err = obj.Truncate(1) + if err != nil { + t.Fatal(err) + } + + pos, err = obj.Seek(-1, 2) + if err != nil { + t.Fatal(err) + } + if pos != 0 { + t.Errorf("Expected pos to be 0, got %d", pos) + } + + res = make([]byte, 2) + n, err = obj.Read(res) + if err != io.EOF { + t.Errorf("Expected err to be io.EOF, got %v", err) + } + if n != 1 { + t.Errorf("Expected n to be 1, got %d", n) + } + if res[0] != 't' { + t.Errorf("Expected res[0] to be 't', got %v", res[0]) + } + + err = obj.Close() + if err != nil { + t.Fatal(err) + } + + err = lo.Unlink(id) + if err != nil { + t.Fatal(err) + } + + _, err = lo.Open(id, pgx.LargeObjectModeRead) + if e, ok := err.(pgx.PgError); !ok || e.Code != "42704" { + t.Errorf("Expected undefined_object error (42704), got %#v", err) + } +}