pgx-vs-pq
Jack Christensen 2013-04-08 21:23:25 -05:00
parent f56abaf154
commit 63d16ae3ac
2 changed files with 44 additions and 84 deletions

95
conn.go
View File

@ -10,11 +10,11 @@ import (
type conn struct { type conn struct {
conn net.Conn // the underlying TCP or unix domain socket connection conn net.Conn // the underlying TCP or unix domain socket connection
rowDesc rowDescription // current query rowDescription
buf []byte // work buffer to avoid constant alloc and dealloc buf []byte // work buffer to avoid constant alloc and dealloc
pid int32 // backend pid pid int32 // backend pid
secretKey int32 // key to use to send a cancel query message to the server secretKey int32 // key to use to send a cancel query message to the server
runtimeParams map[string]string // parameters that have been reported by the server runtimeParams map[string]string // parameters that have been reported by the server
txStatus byte
} }
func Connect(options map[string]string) (c *conn, err error) { func Connect(options map[string]string) (c *conn, err error) {
@ -44,11 +44,11 @@ func Connect(options map[string]string) (c *conn, err error) {
var r *messageReader var r *messageReader
if t, r, err = c.rxMsg(); err == nil { if t, r, err = c.rxMsg(); err == nil {
switch t { switch t {
case 'K': case backendKeyData:
c.rxBackendKeyData(r) c.rxBackendKeyData(r)
case 'R': case authenticationX:
c.rxAuthenticationX(r) c.rxAuthenticationX(r)
case 'Z': case readyForQuery:
return c, nil return c, nil
default: default:
if err = c.processContextFreeMsg(t, r); err != nil { if err = c.processContextFreeMsg(t, r); err != nil {
@ -76,20 +76,31 @@ func (c *conn) Query(sql string) (rows []map[string]string, err error) {
return return
} }
var response interface{} var fields []fieldDescription
for { for {
response, err = c.processMsg() var t byte
if err != nil { var r *messageReader
fmt.Println(err) if t, r, err = c.rxMsg(); err == nil {
switch t {
case readyForQuery:
return nil, nil
case rowDescription:
fields = c.rxRowDescription(r)
case dataRow:
c.rxDataRow(r, fields)
case commandComplete:
c.rxCommandComplete(r)
default:
if err = c.processContextFreeMsg(t, r); err != nil {
return nil, err
}
}
} else {
return nil, err return nil, err
} }
fmt.Println(response)
if _, ok := response.(*readyForQuery); ok {
break
}
} }
return nil, err panic("Unreachable")
} }
func (c *conn) sendSimpleQuery(sql string) (err error) { func (c *conn) sendSimpleQuery(sql string) (err error) {
@ -104,17 +115,6 @@ func (c *conn) sendSimpleQuery(sql string) (err error) {
return err return err
} }
func (c *conn) processMsg() (msg interface{}, err error) {
var t byte
var r *messageReader
t, r, err = c.rxMsg()
if err != nil {
return
}
return c.parseMsg(t, r)
}
// Processes messages that are not exclusive to one context such as // Processes messages that are not exclusive to one context such as
// authentication or query response. The response to these messages // authentication or query response. The response to these messages
// is the same regardless of when they occur. // is the same regardless of when they occur.
@ -131,23 +131,6 @@ func (c *conn) processContextFreeMsg(t byte, r *messageReader) (err error) {
} }
func (c *conn) parseMsg(t byte, r *messageReader) (msg interface{}, err error) {
switch t {
case 'Z':
return c.rxReadyForQuery(r), nil
case 'T':
return c.rxRowDescription(r)
case 'D':
return c.rxDataRow(r)
case 'C':
return c.rxCommandComplete(r), nil
default:
return nil, c.processContextFreeMsg(t, r)
}
panic("Unreachable")
}
func (c *conn) rxMsg() (t byte, r *messageReader, err error) { func (c *conn) rxMsg() (t byte, r *messageReader, err error) {
var bodySize int32 var bodySize int32
t, bodySize, err = c.rxMsgHeader() t, bodySize, err = c.rxMsgHeader()
@ -181,16 +164,15 @@ func (c *conn) rxMsgBody(bodySize int32) (buf []byte, err error) {
return return
} }
func (c *conn) rxAuthenticationX(r *messageReader) (msg interface{}, err error) { func (c *conn) rxAuthenticationX(r *messageReader) (err error) {
code := r.readInt32() code := r.readInt32()
switch code { switch code {
case 0: case 0: // AuthenticationOk
return &authenticationOk{}, nil
default: default:
return nil, errors.New("Received unknown authentication message") err = errors.New("Received unknown authentication message")
} }
panic("Unreachable") return
} }
func (c *conn) rxParameterStatus(r *messageReader) { func (c *conn) rxParameterStatus(r *messageReader) {
@ -204,17 +186,15 @@ func (c *conn) rxBackendKeyData(r *messageReader) {
c.secretKey = r.readInt32() c.secretKey = r.readInt32()
} }
func (c *conn) rxReadyForQuery(r *messageReader) (msg *readyForQuery) { func (c *conn) rxReadyForQuery(r *messageReader) {
msg = new(readyForQuery) c.txStatus = r.readByte()
msg.txStatus = r.readByte()
return
} }
func (c *conn) rxRowDescription(r *messageReader) (msg *rowDescription, err error) { func (c *conn) rxRowDescription(r *messageReader) (fields []fieldDescription) {
fieldCount := r.readInt16() fieldCount := r.readInt16()
c.rowDesc.fields = make([]fieldDescription, fieldCount) fields = make([]fieldDescription, fieldCount)
for i := int16(0); i < fieldCount; i++ { for i := int16(0); i < fieldCount; i++ {
f := &c.rowDesc.fields[i] f := &fields[i]
f.name = r.readString() f.name = r.readString()
f.table = r.readOid() f.table = r.readOid()
f.attributeNumber = r.readInt16() f.attributeNumber = r.readInt16()
@ -226,19 +206,14 @@ func (c *conn) rxRowDescription(r *messageReader) (msg *rowDescription, err erro
return return
} }
func (c *conn) rxDataRow(r *messageReader) (row map[string]string, err error) { func (c *conn) rxDataRow(r *messageReader, fields []fieldDescription) (row map[string]string, err error) {
fieldCount := r.readInt16() fieldCount := r.readInt16()
if fieldCount != int16(len(c.rowDesc.fields)) {
return nil, fmt.Errorf("Received DataRow with %d fields, expected %d fields", fieldCount, c.rowDesc.fields)
}
row = make(map[string]string, fieldCount) row = make(map[string]string, fieldCount)
for i := int16(0); i < fieldCount; i++ { for i := int16(0); i < fieldCount; i++ {
// TODO - handle nulls // TODO - handle nulls
size := r.readInt32() size := r.readInt32()
fmt.Println(size) row[fields[i].name] = r.readByteString(size)
row[c.rowDesc.fields[i].name] = r.readByteString(size)
} }
return return
} }

View File

@ -2,13 +2,21 @@ package pqx
import ( import (
"encoding/binary" "encoding/binary"
"fmt"
) )
const ( const (
protocolVersionNumber = 196608 // 3.0 protocolVersionNumber = 196608 // 3.0
) )
const (
backendKeyData = 'K'
authenticationX = 'R'
readyForQuery = 'Z'
rowDescription = 'T'
dataRow = 'D'
commandComplete = 'C'
)
type startupMessage struct { type startupMessage struct {
options map[string]string options map[string]string
} }
@ -31,21 +39,6 @@ func (self *startupMessage) Bytes() (buf []byte) {
return buf return buf
} }
type authenticationOk struct {
}
func (self *authenticationOk) String() string {
return "AuthenticationOk"
}
type readyForQuery struct {
txStatus byte
}
func (self *readyForQuery) String() string {
return fmt.Sprintf("ReadyForQuery txStatus: %c", self.txStatus)
}
type oid int32 type oid int32
type fieldDescription struct { type fieldDescription struct {
@ -57,11 +50,3 @@ type fieldDescription struct {
modifier int32 modifier int32
formatCode int16 formatCode int16
} }
type rowDescription struct {
fields []fieldDescription
}
func (self *rowDescription) String() string {
return fmt.Sprintf("RowDescription field count: %d", len(self.fields))
}