Improve prepared statement performance

* Do not use bufio.Writer - use net.Conn directly
* Use byte slice instead of bytes.Buffer when building query message
* Use binary.BigEndian.* instead of binary.Write
This commit is contained in:
Jack Christensen 2014-06-23 18:26:15 -05:00
parent 3892d8bd70
commit aa6e9d0ddf
4 changed files with 138 additions and 160 deletions

90
conn.go
View File

@ -51,7 +51,7 @@ type ConnConfig struct {
type Conn struct {
conn net.Conn // the underlying TCP or unix domain socket connection
reader *bufio.Reader // buffered reader to improve read performance
writer *bufio.Writer // buffered writer to avoid sending tiny packets
wbuf [1024]byte
buf *bytes.Buffer // work buffer to avoid constant alloc and dealloc
bufSize int // desired size of buf
Pid int32 // backend pid
@ -196,7 +196,6 @@ func Connect(config ConnConfig) (c *Conn, err error) {
}
c.reader = bufio.NewReader(c.conn)
c.writer = bufio.NewWriter(c.conn)
msg := newStartupMessage()
msg.options["user"] = c.config.User
@ -242,7 +241,7 @@ func (c *Conn) Close() (err error) {
return nil
}
err = c.txMsg('X', c.getBuf(), true)
err = c.txMsg('X', c.getBuf())
c.die(errors.New("Closed"))
c.logger.Info("Closed connection")
return err
@ -598,7 +597,7 @@ func (c *Conn) Prepare(name, sql string) (ps *PreparedStatement, err error) {
buf.WriteString(sql)
buf.WriteByte(0)
binary.Write(buf, binary.BigEndian, int16(0))
err = c.txMsg('P', buf, false)
err = c.txMsg('P', buf)
if err != nil {
return nil, err
}
@ -608,13 +607,13 @@ func (c *Conn) Prepare(name, sql string) (ps *PreparedStatement, err error) {
buf.WriteByte('S')
buf.WriteString(name)
buf.WriteByte(0)
err = c.txMsg('D', buf, false)
err = c.txMsg('D', buf)
if err != nil {
return nil, err
}
// sync
err = c.txMsg('S', c.getBuf(), true)
err = c.txMsg('S', c.getBuf())
if err != nil {
return nil, err
}
@ -763,7 +762,7 @@ func (c *Conn) sendSimpleQuery(sql string, arguments ...interface{}) (err error)
return
}
return c.txMsg('Q', buf, true)
return c.txMsg('Q', buf)
}
func (c *Conn) sendPreparedQuery(ps *PreparedStatement, arguments ...interface{}) (err error) {
@ -772,71 +771,59 @@ func (c *Conn) sendPreparedQuery(ps *PreparedStatement, arguments ...interface{}
}
// bind
buf := c.getBuf()
buf.WriteString("")
buf.WriteByte(0)
buf.WriteString(ps.Name)
buf.WriteByte(0)
binary.Write(buf, binary.BigEndian, int16(len(ps.ParameterOids)))
wbuf := newWriteBuf(c.wbuf[0:0], 'B')
wbuf.WriteByte(0)
wbuf.WriteCString(ps.Name)
wbuf.WriteInt16(int16(len(ps.ParameterOids)))
for _, oid := range ps.ParameterOids {
transcoder := ValueTranscoders[oid]
if transcoder == nil {
transcoder = defaultTranscoder
}
binary.Write(buf, binary.BigEndian, transcoder.EncodeFormat)
wbuf.WriteInt16(transcoder.EncodeFormat)
}
binary.Write(buf, binary.BigEndian, int16(len(arguments)))
wbuf.WriteInt16(int16(len(arguments)))
for i, oid := range ps.ParameterOids {
if arguments[i] != nil {
transcoder := ValueTranscoders[oid]
if transcoder == nil {
transcoder = defaultTranscoder
}
err = transcoder.EncodeTo(buf, arguments[i])
err = transcoder.EncodeTo(wbuf, arguments[i])
if err != nil {
return err
}
} else {
binary.Write(buf, binary.BigEndian, int32(-1))
wbuf.WriteInt32(int32(-1))
}
}
binary.Write(buf, binary.BigEndian, int16(len(ps.FieldDescriptions)))
wbuf.WriteInt16(int16(len(ps.FieldDescriptions)))
for _, fd := range ps.FieldDescriptions {
transcoder := ValueTranscoders[fd.DataType]
if transcoder != nil && transcoder.DecodeBinary != nil {
binary.Write(buf, binary.BigEndian, int16(1))
wbuf.WriteInt16(1)
} else {
binary.Write(buf, binary.BigEndian, int16(0))
wbuf.WriteInt16(0)
}
}
err = c.txMsg('B', buf, false)
if err != nil {
return err
}
// execute
buf = c.getBuf()
buf.WriteString("")
buf.WriteByte(0)
binary.Write(buf, binary.BigEndian, int32(0))
err = c.txMsg('E', buf, false)
if err != nil {
return err
}
wbuf.startMsg('E')
wbuf.WriteByte(0)
wbuf.WriteInt32(0)
// sync
err = c.txMsg('S', c.getBuf(), true)
if err != nil {
wbuf.startMsg('S')
wbuf.closeMsg()
_, err = c.conn.Write(wbuf.buf)
return err
}
return
}
// Execute executes sql. sql can be either a prepared statement name or an SQL string.
// arguments will be sanitized before being interpolated into sql strings. arguments
// should be referenced positionally from the sql string as $1, $2, etc.
@ -1126,16 +1113,12 @@ func (c *Conn) startTLS() (err error) {
return nil
}
func (c *Conn) txStartupMessage(msg *startupMessage) (err error) {
_, err = c.writer.Write(msg.Bytes())
if err != nil {
return
}
err = c.writer.Flush()
return
func (c *Conn) txStartupMessage(msg *startupMessage) error {
_, err := c.conn.Write(msg.Bytes())
return err
}
func (c *Conn) txMsg(identifier byte, buf *bytes.Buffer, flush bool) (err error) {
func (c *Conn) txMsg(identifier byte, buf *bytes.Buffer) (err error) {
if !c.alive {
return DeadConnError
}
@ -1146,25 +1129,21 @@ func (c *Conn) txMsg(identifier byte, buf *bytes.Buffer, flush bool) (err error)
}
}()
err = binary.Write(c.writer, binary.BigEndian, identifier)
err = binary.Write(c.conn, binary.BigEndian, identifier)
if err != nil {
return
}
err = binary.Write(c.writer, binary.BigEndian, int32(buf.Len()+4))
err = binary.Write(c.conn, binary.BigEndian, int32(buf.Len()+4))
if err != nil {
return
}
_, err = buf.WriteTo(c.writer)
_, err = buf.WriteTo(c.conn)
if err != nil {
return
}
if flush {
err = c.writer.Flush()
}
return
}
@ -1179,7 +1158,7 @@ func (c *Conn) txPasswordMessage(password string) (err error) {
if err != nil {
return
}
err = c.txMsg('p', buf, true)
err = c.txMsg('p', buf)
return
}
@ -1198,6 +1177,5 @@ func (c *Conn) getBuf() *bytes.Buffer {
func (c *Conn) die(err error) {
c.alive = false
c.causeOfDeath = err
c.writer.Flush()
c.conn.Close()
}

View File

@ -1,10 +1,8 @@
package pgx_test
import (
"encoding/binary"
"fmt"
"github.com/jackc/pgx"
"io"
"regexp"
"strconv"
)
@ -57,19 +55,15 @@ func decodePointFromText(mr *pgx.MessageReader, size int32) interface{} {
return p
}
func encodePoint(w io.Writer, value interface{}) error {
func encodePoint(w *pgx.WriteBuf, value interface{}) error {
p, ok := value.(Point)
if !ok {
return fmt.Errorf("Expected Point, received %T", value)
}
s := fmt.Sprintf("point(%v,%v)", p.x, p.y)
w.WriteInt32(int32(len(s)))
w.WriteBytes([]byte(s))
err := binary.Write(w, binary.BigEndian, int32(len(s)))
if err != nil {
return err
}
_, err = io.WriteString(w, s)
return err
return nil
}

View File

@ -67,3 +67,54 @@ type PgError struct {
func (self PgError) Error() string {
return self.Severity + ": " + self.Message + " (SQLSTATE " + self.Code + ")"
}
func newWriteBuf(buf []byte, t byte) *WriteBuf {
buf = append(buf, t, 0, 0, 0, 0)
return &WriteBuf{buf: buf, sizeIdx: 1}
}
type WriteBuf struct {
buf []byte
sizeIdx int
}
func (wb *WriteBuf) startMsg(t byte) {
wb.closeMsg()
wb.buf = append(wb.buf, t, 0, 0, 0, 0)
wb.sizeIdx = len(wb.buf) - 4
}
func (wb *WriteBuf) closeMsg() {
binary.BigEndian.PutUint32(wb.buf[wb.sizeIdx:wb.sizeIdx+4], uint32(len(wb.buf)-wb.sizeIdx))
}
func (wb *WriteBuf) WriteByte(b byte) {
wb.buf = append(wb.buf, b)
}
func (wb *WriteBuf) WriteCString(s string) {
wb.buf = append(wb.buf, []byte(s)...)
wb.buf = append(wb.buf, 0)
}
func (wb *WriteBuf) WriteInt16(n int16) {
b := make([]byte, 2)
binary.BigEndian.PutUint16(b, uint16(n))
wb.buf = append(wb.buf, b...)
}
func (wb *WriteBuf) WriteInt32(n int32) {
b := make([]byte, 4)
binary.BigEndian.PutUint32(b, uint32(n))
wb.buf = append(wb.buf, b...)
}
func (wb *WriteBuf) WriteInt64(n int64) {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(n))
wb.buf = append(wb.buf, b...)
}
func (wb *WriteBuf) WriteBytes(b []byte) {
wb.buf = append(wb.buf, b...)
}

View File

@ -2,10 +2,8 @@ package pgx
import (
"bytes"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"math"
"regexp"
"strconv"
@ -21,7 +19,7 @@ type ValueTranscoder struct {
// DecodeBinary decodes values returned from the server in binary format
DecodeBinary func(*MessageReader, int32) interface{}
// EncodeTo encodes values to send to the server
EncodeTo func(io.Writer, interface{}) error
EncodeTo func(*WriteBuf, interface{}) error
// EncodeFormat is the format values are encoded for transmission.
// 0 = text
// 1 = binary
@ -162,23 +160,22 @@ func decodeBoolFromBinary(mr *MessageReader, size int32) interface{} {
return b != 0
}
func encodeBool(w io.Writer, value interface{}) error {
func encodeBool(w *WriteBuf, value interface{}) error {
v, ok := value.(bool)
if !ok {
return fmt.Errorf("Expected bool, received %T", value)
}
err := binary.Write(w, binary.BigEndian, int32(1))
if err != nil {
return err
}
w.WriteInt32(1)
var n byte
if v {
n = 1
}
return binary.Write(w, binary.BigEndian, n)
w.WriteByte(n)
return nil
}
func decodeInt8FromText(mr *MessageReader, size int32) interface{} {
@ -197,7 +194,7 @@ func decodeInt8FromBinary(mr *MessageReader, size int32) interface{} {
return mr.ReadInt64()
}
func encodeInt8(w io.Writer, value interface{}) error {
func encodeInt8(w *WriteBuf, value interface{}) error {
var v int64
switch value := value.(type) {
case int8:
@ -225,12 +222,10 @@ func encodeInt8(w io.Writer, value interface{}) error {
return fmt.Errorf("Expected integer representable in int64, received %T %v", value, value)
}
err := binary.Write(w, binary.BigEndian, int32(8))
if err != nil {
return err
}
w.WriteInt32(8)
w.WriteInt64(v)
return binary.Write(w, binary.BigEndian, v)
return nil
}
func decodeInt2FromText(mr *MessageReader, size int32) interface{} {
@ -249,7 +244,7 @@ func decodeInt2FromBinary(mr *MessageReader, size int32) interface{} {
return mr.ReadInt16()
}
func encodeInt2(w io.Writer, value interface{}) error {
func encodeInt2(w *WriteBuf, value interface{}) error {
var v int16
switch value := value.(type) {
case int8:
@ -292,12 +287,10 @@ func encodeInt2(w io.Writer, value interface{}) error {
return fmt.Errorf("Expected integer representable in int16, received %T %v", value, value)
}
err := binary.Write(w, binary.BigEndian, int32(2))
if err != nil {
return err
}
w.WriteInt32(2)
w.WriteInt16(v)
return binary.Write(w, binary.BigEndian, v)
return nil
}
func decodeInt4FromText(mr *MessageReader, size int32) interface{} {
@ -316,7 +309,7 @@ func decodeInt4FromBinary(mr *MessageReader, size int32) interface{} {
return mr.ReadInt32()
}
func encodeInt4(w io.Writer, value interface{}) error {
func encodeInt4(w *WriteBuf, value interface{}) error {
var v int32
switch value := value.(type) {
case int8:
@ -353,12 +346,10 @@ func encodeInt4(w io.Writer, value interface{}) error {
return fmt.Errorf("Expected integer representable in int32, received %T %v", value, value)
}
err := binary.Write(w, binary.BigEndian, int32(4))
if err != nil {
return err
}
w.WriteInt32(4)
w.WriteInt32(v)
return binary.Write(w, binary.BigEndian, v)
return nil
}
func decodeFloat4FromText(mr *MessageReader, size int32) interface{} {
@ -380,7 +371,7 @@ func decodeFloat4FromBinary(mr *MessageReader, size int32) interface{} {
return *(*float32)(p)
}
func encodeFloat4(w io.Writer, value interface{}) error {
func encodeFloat4(w *WriteBuf, value interface{}) error {
var v float32
switch value := value.(type) {
case float32:
@ -394,12 +385,12 @@ func encodeFloat4(w io.Writer, value interface{}) error {
return fmt.Errorf("Expected float representable in float32, received %T %v", value, value)
}
err := binary.Write(w, binary.BigEndian, int32(4))
if err != nil {
return err
}
w.WriteInt32(4)
return binary.Write(w, binary.BigEndian, v)
p := unsafe.Pointer(&v)
w.WriteInt32(*(*int32)(p))
return nil
}
func decodeFloat8FromText(mr *MessageReader, size int32) interface{} {
@ -421,7 +412,7 @@ func decodeFloat8FromBinary(mr *MessageReader, size int32) interface{} {
return *(*float64)(p)
}
func encodeFloat8(w io.Writer, value interface{}) error {
func encodeFloat8(w *WriteBuf, value interface{}) error {
var v float64
switch value := value.(type) {
case float32:
@ -432,31 +423,28 @@ func encodeFloat8(w io.Writer, value interface{}) error {
return fmt.Errorf("Expected float representable in float64, received %T %v", value, value)
}
err := binary.Write(w, binary.BigEndian, int32(8))
if err != nil {
return err
}
w.WriteInt32(8)
return binary.Write(w, binary.BigEndian, v)
p := unsafe.Pointer(&v)
w.WriteInt64(*(*int64)(p))
return nil
}
func decodeTextFromText(mr *MessageReader, size int32) interface{} {
return mr.ReadString(size)
}
func encodeText(w io.Writer, value interface{}) error {
func encodeText(w *WriteBuf, value interface{}) error {
s, ok := value.(string)
if !ok {
return fmt.Errorf("Expected string, received %T", value)
}
err := binary.Write(w, binary.BigEndian, int32(len(s)))
if err != nil {
return err
}
w.WriteInt32(int32(len(s)))
w.WriteBytes([]byte(s))
_, err = io.WriteString(w, s)
return err
return nil
}
func decodeByteaFromText(mr *MessageReader, size int32) interface{} {
@ -468,19 +456,16 @@ func decodeByteaFromText(mr *MessageReader, size int32) interface{} {
return b
}
func encodeBytea(w io.Writer, value interface{}) error {
func encodeBytea(w *WriteBuf, value interface{}) error {
b, ok := value.([]byte)
if !ok {
return fmt.Errorf("Expected []byte, received %T", value)
}
err := binary.Write(w, binary.BigEndian, int32(len(b)))
if err != nil {
return err
}
w.WriteInt32(int32(len(b)))
w.WriteBytes(b)
_, err = w.Write(b)
return err
return nil
}
func decodeDateFromText(mr *MessageReader, size int32) interface{} {
@ -492,20 +477,14 @@ func decodeDateFromText(mr *MessageReader, size int32) interface{} {
return t
}
func encodeDate(w io.Writer, value interface{}) error {
func encodeDate(w *WriteBuf, value interface{}) error {
t, ok := value.(time.Time)
if !ok {
return fmt.Errorf("Expected time.Time, received %T", value)
}
s := t.Format("2006-01-02")
err := binary.Write(w, binary.BigEndian, int32(len(s)))
if err != nil {
return err
}
_, err = io.WriteString(w, s)
return err
return encodeText(w, s)
}
func decodeTimestampTzFromText(mr *MessageReader, size int32) interface{} {
@ -531,20 +510,14 @@ func decodeTimestampTzFromBinary(mr *MessageReader, size int32) interface{} {
}
func encodeTimestampTz(w io.Writer, value interface{}) error {
func encodeTimestampTz(w *WriteBuf, value interface{}) error {
t, ok := value.(time.Time)
if !ok {
return fmt.Errorf("Expected time.Time, received %T", value)
}
s := t.Format("2006-01-02 15:04:05.999999 -0700")
err := binary.Write(w, binary.BigEndian, int32(len(s)))
if err != nil {
return err
}
_, err = io.WriteString(w, s)
return err
return encodeText(w, s)
}
func decodeInt2ArrayFromText(mr *MessageReader, size int32) interface{} {
@ -594,7 +567,7 @@ func int16SliceToArrayString(nums []int16) (string, error) {
return w.String(), nil
}
func encodeInt2Array(w io.Writer, value interface{}) error {
func encodeInt2Array(w *WriteBuf, value interface{}) error {
v, ok := value.([]int16)
if !ok {
return fmt.Errorf("Expected []int16, received %T", value)
@ -605,13 +578,7 @@ func encodeInt2Array(w io.Writer, value interface{}) error {
return fmt.Errorf("Failed to encode []int16: %v", err)
}
err = binary.Write(w, binary.BigEndian, int32(len(s)))
if err != nil {
return err
}
_, err = io.WriteString(w, s)
return err
return encodeText(w, s)
}
func decodeInt4ArrayFromText(mr *MessageReader, size int32) interface{} {
@ -662,7 +629,7 @@ func int32SliceToArrayString(nums []int32) (string, error) {
return w.String(), nil
}
func encodeInt4Array(w io.Writer, value interface{}) error {
func encodeInt4Array(w *WriteBuf, value interface{}) error {
v, ok := value.([]int32)
if !ok {
return fmt.Errorf("Expected []int32, received %T", value)
@ -673,13 +640,7 @@ func encodeInt4Array(w io.Writer, value interface{}) error {
return fmt.Errorf("Failed to encode []int32: %v", err)
}
err = binary.Write(w, binary.BigEndian, int32(len(s)))
if err != nil {
return err
}
_, err = io.WriteString(w, s)
return err
return encodeText(w, s)
}
func decodeInt8ArrayFromText(mr *MessageReader, size int32) interface{} {
@ -730,7 +691,7 @@ func int64SliceToArrayString(nums []int64) (string, error) {
return w.String(), nil
}
func encodeInt8Array(w io.Writer, value interface{}) error {
func encodeInt8Array(w *WriteBuf, value interface{}) error {
v, ok := value.([]int64)
if !ok {
return fmt.Errorf("Expected []int64, received %T", value)
@ -741,11 +702,5 @@ func encodeInt8Array(w io.Writer, value interface{}) error {
return fmt.Errorf("Failed to encode []int64: %v", err)
}
err = binary.Write(w, binary.BigEndian, int32(len(s)))
if err != nil {
return err
}
_, err = io.WriteString(w, s)
return err
return encodeText(w, s)
}