Merge pull request #57 from flynn/large-objects

Implement large object support
pull/59/head
Jack Christensen 2015-01-02 14:18:36 -06:00
commit b5eb23da7b
4 changed files with 378 additions and 0 deletions

View File

@ -50,6 +50,7 @@ type Conn struct {
causeOfDeath error
logger Logger
mr msgReader
fp *fastpath
}
type PreparedStatement struct {

110
fastpath.go Normal file
View File

@ -0,0 +1,110 @@
package pgx
import (
"encoding/binary"
)
type fastpathArg []byte
func newFastpath(cn *Conn) *fastpath {
return &fastpath{cn: cn, fns: make(map[string]Oid)}
}
type fastpath struct {
cn *Conn
fns map[string]Oid
}
func (f *fastpath) functionOID(name string) Oid {
return f.fns[name]
}
func (f *fastpath) addFunction(name string, oid Oid) {
f.fns[name] = oid
}
func (f *fastpath) addFunctions(rows *Rows) error {
for rows.Next() {
var name string
var oid Oid
if err := rows.Scan(&name, &oid); err != nil {
return err
}
f.addFunction(name, oid)
}
return rows.Err()
}
type fpArg []byte
func fpIntArg(n int32) fpArg {
res := make([]byte, 4)
binary.BigEndian.PutUint32(res, uint32(n))
return res
}
func fpInt64Arg(n int64) fpArg {
res := make([]byte, 8)
binary.BigEndian.PutUint64(res, uint64(n))
return res
}
func (f *fastpath) Call(oid Oid, args []fpArg) (res []byte, err error) {
wbuf := newWriteBuf(f.cn.wbuf[:0], 'F') // function call
wbuf.WriteInt32(int32(oid)) // function object id
wbuf.WriteInt16(1) // # of argument format codes
wbuf.WriteInt16(1) // format code: binary
wbuf.WriteInt16(int16(len(args))) // # of arguments
for _, arg := range args {
wbuf.WriteInt32(int32(len(arg))) // length of argument
wbuf.WriteBytes(arg) // argument value
}
wbuf.WriteInt16(1) // response format code (binary)
wbuf.closeMsg()
if _, err := f.cn.conn.Write(wbuf.buf); err != nil {
return nil, err
}
for {
var t byte
var r *msgReader
t, r, err = f.cn.rxMsg()
if err != nil {
return nil, err
}
switch t {
case 'V': // FunctionCallResponse
data := r.readBytes(r.readInt32())
res = make([]byte, len(data))
copy(res, data)
case 'Z': // Ready for query
f.cn.rxReadyForQuery(r)
// done
return
default:
if err := f.cn.processContextFreeMsg(t, r); err != nil {
return nil, err
}
}
}
}
func (f *fastpath) CallFn(fn string, args []fpArg) ([]byte, error) {
return f.Call(f.functionOID(fn), args)
}
func fpInt32(data []byte, err error) (int32, error) {
if err != nil {
return 0, err
}
n := int32(binary.BigEndian.Uint32(data))
return n, nil
}
func fpInt64(data []byte, err error) (int64, error) {
if err != nil {
return 0, err
}
return int64(binary.BigEndian.Uint64(data)), nil
}

146
large_objects.go Normal file
View File

@ -0,0 +1,146 @@
package pgx
import (
"io"
)
// LargeObjects is a structure used to access the large objects API. It is only
// valid within the transaction where it was created.
//
// For more details see: http://www.postgresql.org/docs/current/static/largeobjects.html
type LargeObjects struct {
// Has64 is true if the server is capable of working with 64-bit numbers
Has64 bool
fp *fastpath
}
const largeObjectFns = `select proname, oid from pg_catalog.pg_proc
where proname in (
'lo_open',
'lo_close',
'lo_create',
'lo_unlink',
'lo_lseek',
'lo_lseek64',
'lo_tell',
'lo_tell64',
'lo_truncate',
'lo_truncate64',
'loread',
'lowrite')
and pronamespace = (select oid from pg_catalog.pg_namespace where nspname = 'pg_catalog')`
// LargeObjects returns a LargeObjects instance for the transaction.
func (tx *Tx) LargeObjects() (*LargeObjects, error) {
if tx.conn.fp == nil {
tx.conn.fp = newFastpath(tx.conn)
}
if _, exists := tx.conn.fp.fns["lo_open"]; !exists {
res, err := tx.Query(largeObjectFns)
if err != nil {
return nil, err
}
if err := tx.conn.fp.addFunctions(res); err != nil {
return nil, err
}
}
lo := &LargeObjects{fp: tx.conn.fp}
_, lo.Has64 = lo.fp.fns["lo_lseek64"]
return lo, nil
}
type LargeObjectMode int32
const (
LargeObjectModeWrite LargeObjectMode = 0x20000
LargeObjectModeRead LargeObjectMode = 0x40000
)
// Create creates a new large object. If id is zero, the server assigns an
// unused OID.
func (o *LargeObjects) Create(id Oid) (Oid, error) {
newOid, err := fpInt32(o.fp.CallFn("lo_create", []fpArg{fpIntArg(int32(id))}))
return Oid(newOid), err
}
// Open opens an existing large object with the given mode.
func (o *LargeObjects) Open(oid Oid, mode LargeObjectMode) (*LargeObject, error) {
fd, err := fpInt32(o.fp.CallFn("lo_open", []fpArg{fpIntArg(int32(oid)), fpIntArg(int32(mode))}))
return &LargeObject{fd: fd, lo: o}, err
}
// Unlink removes a large object from the database.
func (o *LargeObjects) Unlink(oid Oid) error {
_, err := o.fp.CallFn("lo_unlink", []fpArg{fpIntArg(int32(oid))})
return err
}
// A LargeObject is a large object stored on the server. It is only valid within
// the transaction that it was initialized in. It implements the these interfaces:
//
// - io.Writer
// - io.Reader
// - io.Seeker
// - io.Closer
type LargeObject struct {
fd int32
lo *LargeObjects
}
// Write writes
func (o *LargeObject) Write(p []byte) (int, error) {
n, err := fpInt32(o.lo.fp.CallFn("lowrite", []fpArg{fpIntArg(o.fd), p}))
return int(n), err
}
// Read reads up to len(p) bytes into p returning the number of bytes read.
func (o *LargeObject) Read(p []byte) (int, error) {
res, err := o.lo.fp.CallFn("loread", []fpArg{fpIntArg(o.fd), fpIntArg(int32(len(p)))})
if len(res) < len(p) {
err = io.EOF
}
return copy(p, res), err
}
// Seek moves the current location pointer to the new location specified by offset.
func (o *LargeObject) Seek(offset int64, whence int) (n int64, err error) {
if o.lo.Has64 {
n, err = fpInt64(o.lo.fp.CallFn("lo_lseek64", []fpArg{fpIntArg(o.fd), fpInt64Arg(offset), fpIntArg(int32(whence))}))
} else {
var n32 int32
n32, err = fpInt32(o.lo.fp.CallFn("lo_lseek", []fpArg{fpIntArg(o.fd), fpIntArg(int32(offset)), fpIntArg(int32(whence))}))
n = int64(n32)
}
return
}
// Tell returns the current read or write location of the large object
// descriptor.
func (o *LargeObject) Tell() (n int64, err error) {
if o.lo.Has64 {
n, err = fpInt64(o.lo.fp.CallFn("lo_tell64", []fpArg{fpIntArg(o.fd)}))
} else {
var n32 int32
n32, err = fpInt32(o.lo.fp.CallFn("lo_tell", []fpArg{fpIntArg(o.fd)}))
n = int64(n32)
}
return
}
// Trunctes the large object to size.
func (o *LargeObject) Truncate(size int64) (err error) {
if o.lo.Has64 {
_, err = o.lo.fp.CallFn("lo_truncate64", []fpArg{fpIntArg(o.fd), fpInt64Arg(size)})
} else {
_, err = o.lo.fp.CallFn("lo_truncate", []fpArg{fpIntArg(o.fd), fpIntArg(int32(size))})
}
return
}
// Close closees the large object descriptor.
func (o *LargeObject) Close() error {
_, err := o.lo.fp.CallFn("lo_close", []fpArg{fpIntArg(o.fd)})
return err
}

121
large_objects_test.go Normal file
View File

@ -0,0 +1,121 @@
package pgx_test
import (
"io"
"testing"
"github.com/jackc/pgx"
)
func TestLargeObjects(t *testing.T) {
t.Parallel()
conn, err := pgx.Connect(*defaultConnConfig)
if err != nil {
t.Fatal(err)
}
tx, err := conn.Begin()
if err != nil {
t.Fatal(err)
}
lo, err := tx.LargeObjects()
if err != nil {
t.Fatal(err)
}
id, err := lo.Create(0)
if err != nil {
t.Fatal(err)
}
obj, err := lo.Open(id, pgx.LargeObjectModeRead|pgx.LargeObjectModeWrite)
if err != nil {
t.Fatal(err)
}
n, err := obj.Write([]byte("testing"))
if err != nil {
t.Fatal(err)
}
if n != 7 {
t.Errorf("Expected n to be 7, got %d", n)
}
pos, err := obj.Seek(1, 0)
if err != nil {
t.Fatal(err)
}
if pos != 1 {
t.Errorf("Expected pos to be 1, got %d", pos)
}
res := make([]byte, 6)
n, err = obj.Read(res)
if err != nil {
t.Fatal(err)
}
if string(res) != "esting" {
t.Errorf(`Expected res to be "esting", got %q`, res)
}
if n != 6 {
t.Errorf("Expected n to be 6, got %d", n)
}
n, err = obj.Read(res)
if err != io.EOF {
t.Error("Expected io.EOF, go nil")
}
if n != 0 {
t.Errorf("Expected n to be 0, got %d", n)
}
pos, err = obj.Tell()
if err != nil {
t.Fatal(err)
}
if pos != 7 {
t.Errorf("Expected pos to be 7, got %d", pos)
}
err = obj.Truncate(1)
if err != nil {
t.Fatal(err)
}
pos, err = obj.Seek(-1, 2)
if err != nil {
t.Fatal(err)
}
if pos != 0 {
t.Errorf("Expected pos to be 0, got %d", pos)
}
res = make([]byte, 2)
n, err = obj.Read(res)
if err != io.EOF {
t.Errorf("Expected err to be io.EOF, got %v", err)
}
if n != 1 {
t.Errorf("Expected n to be 1, got %d", n)
}
if res[0] != 't' {
t.Errorf("Expected res[0] to be 't', got %v", res[0])
}
err = obj.Close()
if err != nil {
t.Fatal(err)
}
err = lo.Unlink(id)
if err != nil {
t.Fatal(err)
}
_, err = lo.Open(id, pgx.LargeObjectModeRead)
if e, ok := err.(pgx.PgError); !ok || e.Code != "42704" {
t.Errorf("Expected undefined_object error (42704), got %#v", err)
}
}