From aa6e9d0ddff91f7403b02865db4881e6370ad2ed Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Mon, 23 Jun 2014 18:26:15 -0500 Subject: [PATCH] Improve prepared statement performance * Do not use bufio.Writer - use net.Conn directly * Use byte slice instead of bytes.Buffer when building query message * Use binary.BigEndian.* instead of binary.Write --- conn.go | 92 ++++++++------------ example_value_transcoder_test.go | 14 +-- messages.go | 51 +++++++++++ value_transcoder.go | 141 +++++++++++-------------------- 4 files changed, 138 insertions(+), 160 deletions(-) diff --git a/conn.go b/conn.go index 0330ddcc..ea979770 100644 --- a/conn.go +++ b/conn.go @@ -49,9 +49,9 @@ type ConnConfig struct { // Use ConnPool to manage access to multiple database connections from multiple // goroutines. type Conn struct { - conn net.Conn // the underlying TCP or unix domain socket connection - reader *bufio.Reader // buffered reader to improve read performance - writer *bufio.Writer // buffered writer to avoid sending tiny packets + conn net.Conn // the underlying TCP or unix domain socket connection + reader *bufio.Reader // buffered reader to improve read performance + wbuf [1024]byte buf *bytes.Buffer // work buffer to avoid constant alloc and dealloc bufSize int // desired size of buf Pid int32 // backend pid @@ -196,7 +196,6 @@ func Connect(config ConnConfig) (c *Conn, err error) { } c.reader = bufio.NewReader(c.conn) - c.writer = bufio.NewWriter(c.conn) msg := newStartupMessage() msg.options["user"] = c.config.User @@ -242,7 +241,7 @@ func (c *Conn) Close() (err error) { return nil } - err = c.txMsg('X', c.getBuf(), true) + err = c.txMsg('X', c.getBuf()) c.die(errors.New("Closed")) c.logger.Info("Closed connection") return err @@ -598,7 +597,7 @@ func (c *Conn) Prepare(name, sql string) (ps *PreparedStatement, err error) { buf.WriteString(sql) buf.WriteByte(0) binary.Write(buf, binary.BigEndian, int16(0)) - err = c.txMsg('P', buf, false) + err = c.txMsg('P', buf) if err != nil { return nil, err } @@ -608,13 +607,13 @@ func (c *Conn) Prepare(name, sql string) (ps *PreparedStatement, err error) { buf.WriteByte('S') buf.WriteString(name) buf.WriteByte(0) - err = c.txMsg('D', buf, false) + err = c.txMsg('D', buf) if err != nil { return nil, err } // sync - err = c.txMsg('S', c.getBuf(), true) + err = c.txMsg('S', c.getBuf()) if err != nil { return nil, err } @@ -763,7 +762,7 @@ func (c *Conn) sendSimpleQuery(sql string, arguments ...interface{}) (err error) return } - return c.txMsg('Q', buf, true) + return c.txMsg('Q', buf) } func (c *Conn) sendPreparedQuery(ps *PreparedStatement, arguments ...interface{}) (err error) { @@ -772,69 +771,57 @@ func (c *Conn) sendPreparedQuery(ps *PreparedStatement, arguments ...interface{} } // bind - buf := c.getBuf() - buf.WriteString("") - buf.WriteByte(0) - buf.WriteString(ps.Name) - buf.WriteByte(0) - binary.Write(buf, binary.BigEndian, int16(len(ps.ParameterOids))) + wbuf := newWriteBuf(c.wbuf[0:0], 'B') + wbuf.WriteByte(0) + wbuf.WriteCString(ps.Name) + + wbuf.WriteInt16(int16(len(ps.ParameterOids))) for _, oid := range ps.ParameterOids { transcoder := ValueTranscoders[oid] if transcoder == nil { transcoder = defaultTranscoder } - binary.Write(buf, binary.BigEndian, transcoder.EncodeFormat) + wbuf.WriteInt16(transcoder.EncodeFormat) } - binary.Write(buf, binary.BigEndian, int16(len(arguments))) + wbuf.WriteInt16(int16(len(arguments))) for i, oid := range ps.ParameterOids { if arguments[i] != nil { transcoder := ValueTranscoders[oid] if transcoder == nil { transcoder = defaultTranscoder } - err = transcoder.EncodeTo(buf, arguments[i]) + err = transcoder.EncodeTo(wbuf, arguments[i]) if err != nil { return err } } else { - binary.Write(buf, binary.BigEndian, int32(-1)) + wbuf.WriteInt32(int32(-1)) } } - binary.Write(buf, binary.BigEndian, int16(len(ps.FieldDescriptions))) + wbuf.WriteInt16(int16(len(ps.FieldDescriptions))) for _, fd := range ps.FieldDescriptions { transcoder := ValueTranscoders[fd.DataType] if transcoder != nil && transcoder.DecodeBinary != nil { - binary.Write(buf, binary.BigEndian, int16(1)) + wbuf.WriteInt16(1) } else { - binary.Write(buf, binary.BigEndian, int16(0)) + wbuf.WriteInt16(0) } } - err = c.txMsg('B', buf, false) - if err != nil { - return err - } - // execute - buf = c.getBuf() - buf.WriteString("") - buf.WriteByte(0) - binary.Write(buf, binary.BigEndian, int32(0)) - err = c.txMsg('E', buf, false) - if err != nil { - return err - } + wbuf.startMsg('E') + wbuf.WriteByte(0) + wbuf.WriteInt32(0) // sync - err = c.txMsg('S', c.getBuf(), true) - if err != nil { - return err - } + wbuf.startMsg('S') + wbuf.closeMsg() - return + _, err = c.conn.Write(wbuf.buf) + return err } // Execute executes sql. sql can be either a prepared statement name or an SQL string. @@ -1126,16 +1113,12 @@ func (c *Conn) startTLS() (err error) { return nil } -func (c *Conn) txStartupMessage(msg *startupMessage) (err error) { - _, err = c.writer.Write(msg.Bytes()) - if err != nil { - return - } - err = c.writer.Flush() - return +func (c *Conn) txStartupMessage(msg *startupMessage) error { + _, err := c.conn.Write(msg.Bytes()) + return err } -func (c *Conn) txMsg(identifier byte, buf *bytes.Buffer, flush bool) (err error) { +func (c *Conn) txMsg(identifier byte, buf *bytes.Buffer) (err error) { if !c.alive { return DeadConnError } @@ -1146,25 +1129,21 @@ func (c *Conn) txMsg(identifier byte, buf *bytes.Buffer, flush bool) (err error) } }() - err = binary.Write(c.writer, binary.BigEndian, identifier) + err = binary.Write(c.conn, binary.BigEndian, identifier) if err != nil { return } - err = binary.Write(c.writer, binary.BigEndian, int32(buf.Len()+4)) + err = binary.Write(c.conn, binary.BigEndian, int32(buf.Len()+4)) if err != nil { return } - _, err = buf.WriteTo(c.writer) + _, err = buf.WriteTo(c.conn) if err != nil { return } - if flush { - err = c.writer.Flush() - } - return } @@ -1179,7 +1158,7 @@ func (c *Conn) txPasswordMessage(password string) (err error) { if err != nil { return } - err = c.txMsg('p', buf, true) + err = c.txMsg('p', buf) return } @@ -1198,6 +1177,5 @@ func (c *Conn) getBuf() *bytes.Buffer { func (c *Conn) die(err error) { c.alive = false c.causeOfDeath = err - c.writer.Flush() c.conn.Close() } diff --git a/example_value_transcoder_test.go b/example_value_transcoder_test.go index a8ff81cc..2d7b25a5 100644 --- a/example_value_transcoder_test.go +++ b/example_value_transcoder_test.go @@ -1,10 +1,8 @@ package pgx_test import ( - "encoding/binary" "fmt" "github.com/jackc/pgx" - "io" "regexp" "strconv" ) @@ -57,19 +55,15 @@ func decodePointFromText(mr *pgx.MessageReader, size int32) interface{} { return p } -func encodePoint(w io.Writer, value interface{}) error { +func encodePoint(w *pgx.WriteBuf, value interface{}) error { p, ok := value.(Point) if !ok { return fmt.Errorf("Expected Point, received %T", value) } s := fmt.Sprintf("point(%v,%v)", p.x, p.y) + w.WriteInt32(int32(len(s))) + w.WriteBytes([]byte(s)) - err := binary.Write(w, binary.BigEndian, int32(len(s))) - if err != nil { - return err - } - - _, err = io.WriteString(w, s) - return err + return nil } diff --git a/messages.go b/messages.go index 27d7444d..892745ba 100644 --- a/messages.go +++ b/messages.go @@ -67,3 +67,54 @@ type PgError struct { func (self PgError) Error() string { return self.Severity + ": " + self.Message + " (SQLSTATE " + self.Code + ")" } + +func newWriteBuf(buf []byte, t byte) *WriteBuf { + buf = append(buf, t, 0, 0, 0, 0) + return &WriteBuf{buf: buf, sizeIdx: 1} +} + +type WriteBuf struct { + buf []byte + sizeIdx int +} + +func (wb *WriteBuf) startMsg(t byte) { + wb.closeMsg() + wb.buf = append(wb.buf, t, 0, 0, 0, 0) + wb.sizeIdx = len(wb.buf) - 4 +} + +func (wb *WriteBuf) closeMsg() { + binary.BigEndian.PutUint32(wb.buf[wb.sizeIdx:wb.sizeIdx+4], uint32(len(wb.buf)-wb.sizeIdx)) +} + +func (wb *WriteBuf) WriteByte(b byte) { + wb.buf = append(wb.buf, b) +} + +func (wb *WriteBuf) WriteCString(s string) { + wb.buf = append(wb.buf, []byte(s)...) + wb.buf = append(wb.buf, 0) +} + +func (wb *WriteBuf) WriteInt16(n int16) { + b := make([]byte, 2) + binary.BigEndian.PutUint16(b, uint16(n)) + wb.buf = append(wb.buf, b...) +} + +func (wb *WriteBuf) WriteInt32(n int32) { + b := make([]byte, 4) + binary.BigEndian.PutUint32(b, uint32(n)) + wb.buf = append(wb.buf, b...) +} + +func (wb *WriteBuf) WriteInt64(n int64) { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(n)) + wb.buf = append(wb.buf, b...) +} + +func (wb *WriteBuf) WriteBytes(b []byte) { + wb.buf = append(wb.buf, b...) +} diff --git a/value_transcoder.go b/value_transcoder.go index 698a67f1..2085346d 100644 --- a/value_transcoder.go +++ b/value_transcoder.go @@ -2,10 +2,8 @@ package pgx import ( "bytes" - "encoding/binary" "encoding/hex" "fmt" - "io" "math" "regexp" "strconv" @@ -21,7 +19,7 @@ type ValueTranscoder struct { // DecodeBinary decodes values returned from the server in binary format DecodeBinary func(*MessageReader, int32) interface{} // EncodeTo encodes values to send to the server - EncodeTo func(io.Writer, interface{}) error + EncodeTo func(*WriteBuf, interface{}) error // EncodeFormat is the format values are encoded for transmission. // 0 = text // 1 = binary @@ -162,23 +160,22 @@ func decodeBoolFromBinary(mr *MessageReader, size int32) interface{} { return b != 0 } -func encodeBool(w io.Writer, value interface{}) error { +func encodeBool(w *WriteBuf, value interface{}) error { v, ok := value.(bool) if !ok { return fmt.Errorf("Expected bool, received %T", value) } - err := binary.Write(w, binary.BigEndian, int32(1)) - if err != nil { - return err - } + w.WriteInt32(1) var n byte if v { n = 1 } - return binary.Write(w, binary.BigEndian, n) + w.WriteByte(n) + + return nil } func decodeInt8FromText(mr *MessageReader, size int32) interface{} { @@ -197,7 +194,7 @@ func decodeInt8FromBinary(mr *MessageReader, size int32) interface{} { return mr.ReadInt64() } -func encodeInt8(w io.Writer, value interface{}) error { +func encodeInt8(w *WriteBuf, value interface{}) error { var v int64 switch value := value.(type) { case int8: @@ -225,12 +222,10 @@ func encodeInt8(w io.Writer, value interface{}) error { return fmt.Errorf("Expected integer representable in int64, received %T %v", value, value) } - err := binary.Write(w, binary.BigEndian, int32(8)) - if err != nil { - return err - } + w.WriteInt32(8) + w.WriteInt64(v) - return binary.Write(w, binary.BigEndian, v) + return nil } func decodeInt2FromText(mr *MessageReader, size int32) interface{} { @@ -249,7 +244,7 @@ func decodeInt2FromBinary(mr *MessageReader, size int32) interface{} { return mr.ReadInt16() } -func encodeInt2(w io.Writer, value interface{}) error { +func encodeInt2(w *WriteBuf, value interface{}) error { var v int16 switch value := value.(type) { case int8: @@ -292,12 +287,10 @@ func encodeInt2(w io.Writer, value interface{}) error { return fmt.Errorf("Expected integer representable in int16, received %T %v", value, value) } - err := binary.Write(w, binary.BigEndian, int32(2)) - if err != nil { - return err - } + w.WriteInt32(2) + w.WriteInt16(v) - return binary.Write(w, binary.BigEndian, v) + return nil } func decodeInt4FromText(mr *MessageReader, size int32) interface{} { @@ -316,7 +309,7 @@ func decodeInt4FromBinary(mr *MessageReader, size int32) interface{} { return mr.ReadInt32() } -func encodeInt4(w io.Writer, value interface{}) error { +func encodeInt4(w *WriteBuf, value interface{}) error { var v int32 switch value := value.(type) { case int8: @@ -353,12 +346,10 @@ func encodeInt4(w io.Writer, value interface{}) error { return fmt.Errorf("Expected integer representable in int32, received %T %v", value, value) } - err := binary.Write(w, binary.BigEndian, int32(4)) - if err != nil { - return err - } + w.WriteInt32(4) + w.WriteInt32(v) - return binary.Write(w, binary.BigEndian, v) + return nil } func decodeFloat4FromText(mr *MessageReader, size int32) interface{} { @@ -380,7 +371,7 @@ func decodeFloat4FromBinary(mr *MessageReader, size int32) interface{} { return *(*float32)(p) } -func encodeFloat4(w io.Writer, value interface{}) error { +func encodeFloat4(w *WriteBuf, value interface{}) error { var v float32 switch value := value.(type) { case float32: @@ -394,12 +385,12 @@ func encodeFloat4(w io.Writer, value interface{}) error { return fmt.Errorf("Expected float representable in float32, received %T %v", value, value) } - err := binary.Write(w, binary.BigEndian, int32(4)) - if err != nil { - return err - } + w.WriteInt32(4) - return binary.Write(w, binary.BigEndian, v) + p := unsafe.Pointer(&v) + w.WriteInt32(*(*int32)(p)) + + return nil } func decodeFloat8FromText(mr *MessageReader, size int32) interface{} { @@ -421,7 +412,7 @@ func decodeFloat8FromBinary(mr *MessageReader, size int32) interface{} { return *(*float64)(p) } -func encodeFloat8(w io.Writer, value interface{}) error { +func encodeFloat8(w *WriteBuf, value interface{}) error { var v float64 switch value := value.(type) { case float32: @@ -432,31 +423,28 @@ func encodeFloat8(w io.Writer, value interface{}) error { return fmt.Errorf("Expected float representable in float64, received %T %v", value, value) } - err := binary.Write(w, binary.BigEndian, int32(8)) - if err != nil { - return err - } + w.WriteInt32(8) - return binary.Write(w, binary.BigEndian, v) + p := unsafe.Pointer(&v) + w.WriteInt64(*(*int64)(p)) + + return nil } func decodeTextFromText(mr *MessageReader, size int32) interface{} { return mr.ReadString(size) } -func encodeText(w io.Writer, value interface{}) error { +func encodeText(w *WriteBuf, value interface{}) error { s, ok := value.(string) if !ok { return fmt.Errorf("Expected string, received %T", value) } - err := binary.Write(w, binary.BigEndian, int32(len(s))) - if err != nil { - return err - } + w.WriteInt32(int32(len(s))) + w.WriteBytes([]byte(s)) - _, err = io.WriteString(w, s) - return err + return nil } func decodeByteaFromText(mr *MessageReader, size int32) interface{} { @@ -468,19 +456,16 @@ func decodeByteaFromText(mr *MessageReader, size int32) interface{} { return b } -func encodeBytea(w io.Writer, value interface{}) error { +func encodeBytea(w *WriteBuf, value interface{}) error { b, ok := value.([]byte) if !ok { return fmt.Errorf("Expected []byte, received %T", value) } - err := binary.Write(w, binary.BigEndian, int32(len(b))) - if err != nil { - return err - } + w.WriteInt32(int32(len(b))) + w.WriteBytes(b) - _, err = w.Write(b) - return err + return nil } func decodeDateFromText(mr *MessageReader, size int32) interface{} { @@ -492,20 +477,14 @@ func decodeDateFromText(mr *MessageReader, size int32) interface{} { return t } -func encodeDate(w io.Writer, value interface{}) error { +func encodeDate(w *WriteBuf, value interface{}) error { t, ok := value.(time.Time) if !ok { return fmt.Errorf("Expected time.Time, received %T", value) } s := t.Format("2006-01-02") - err := binary.Write(w, binary.BigEndian, int32(len(s))) - if err != nil { - return err - } - - _, err = io.WriteString(w, s) - return err + return encodeText(w, s) } func decodeTimestampTzFromText(mr *MessageReader, size int32) interface{} { @@ -531,20 +510,14 @@ func decodeTimestampTzFromBinary(mr *MessageReader, size int32) interface{} { } -func encodeTimestampTz(w io.Writer, value interface{}) error { +func encodeTimestampTz(w *WriteBuf, value interface{}) error { t, ok := value.(time.Time) if !ok { return fmt.Errorf("Expected time.Time, received %T", value) } s := t.Format("2006-01-02 15:04:05.999999 -0700") - err := binary.Write(w, binary.BigEndian, int32(len(s))) - if err != nil { - return err - } - - _, err = io.WriteString(w, s) - return err + return encodeText(w, s) } func decodeInt2ArrayFromText(mr *MessageReader, size int32) interface{} { @@ -594,7 +567,7 @@ func int16SliceToArrayString(nums []int16) (string, error) { return w.String(), nil } -func encodeInt2Array(w io.Writer, value interface{}) error { +func encodeInt2Array(w *WriteBuf, value interface{}) error { v, ok := value.([]int16) if !ok { return fmt.Errorf("Expected []int16, received %T", value) @@ -605,13 +578,7 @@ func encodeInt2Array(w io.Writer, value interface{}) error { return fmt.Errorf("Failed to encode []int16: %v", err) } - err = binary.Write(w, binary.BigEndian, int32(len(s))) - if err != nil { - return err - } - - _, err = io.WriteString(w, s) - return err + return encodeText(w, s) } func decodeInt4ArrayFromText(mr *MessageReader, size int32) interface{} { @@ -662,7 +629,7 @@ func int32SliceToArrayString(nums []int32) (string, error) { return w.String(), nil } -func encodeInt4Array(w io.Writer, value interface{}) error { +func encodeInt4Array(w *WriteBuf, value interface{}) error { v, ok := value.([]int32) if !ok { return fmt.Errorf("Expected []int32, received %T", value) @@ -673,13 +640,7 @@ func encodeInt4Array(w io.Writer, value interface{}) error { return fmt.Errorf("Failed to encode []int32: %v", err) } - err = binary.Write(w, binary.BigEndian, int32(len(s))) - if err != nil { - return err - } - - _, err = io.WriteString(w, s) - return err + return encodeText(w, s) } func decodeInt8ArrayFromText(mr *MessageReader, size int32) interface{} { @@ -730,7 +691,7 @@ func int64SliceToArrayString(nums []int64) (string, error) { return w.String(), nil } -func encodeInt8Array(w io.Writer, value interface{}) error { +func encodeInt8Array(w *WriteBuf, value interface{}) error { v, ok := value.([]int64) if !ok { return fmt.Errorf("Expected []int64, received %T", value) @@ -741,11 +702,5 @@ func encodeInt8Array(w io.Writer, value interface{}) error { return fmt.Errorf("Failed to encode []int64: %v", err) } - err = binary.Write(w, binary.BigEndian, int32(len(s))) - if err != nil { - return err - } - - _, err = io.WriteString(w, s) - return err + return encodeText(w, s) }