From 63d16ae3ac7a0131d84ff2daa3addcdac1c7d2b6 Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Mon, 8 Apr 2013 21:23:25 -0500 Subject: [PATCH] Refactor --- conn.go | 95 ++++++++++++++++++++--------------------------------- messages.go | 33 +++++-------------- 2 files changed, 44 insertions(+), 84 deletions(-) diff --git a/conn.go b/conn.go index 4ade6b72..2a3e3fd1 100644 --- a/conn.go +++ b/conn.go @@ -10,11 +10,11 @@ import ( type conn struct { conn net.Conn // the underlying TCP or unix domain socket connection - rowDesc rowDescription // current query rowDescription buf []byte // work buffer to avoid constant alloc and dealloc pid int32 // backend pid secretKey int32 // key to use to send a cancel query message to the server runtimeParams map[string]string // parameters that have been reported by the server + txStatus byte } func Connect(options map[string]string) (c *conn, err error) { @@ -44,11 +44,11 @@ func Connect(options map[string]string) (c *conn, err error) { var r *messageReader if t, r, err = c.rxMsg(); err == nil { switch t { - case 'K': + case backendKeyData: c.rxBackendKeyData(r) - case 'R': + case authenticationX: c.rxAuthenticationX(r) - case 'Z': + case readyForQuery: return c, nil default: if err = c.processContextFreeMsg(t, r); err != nil { @@ -76,20 +76,31 @@ func (c *conn) Query(sql string) (rows []map[string]string, err error) { return } - var response interface{} + var fields []fieldDescription for { - response, err = c.processMsg() - if err != nil { - fmt.Println(err) + var t byte + var r *messageReader + if t, r, err = c.rxMsg(); err == nil { + switch t { + case readyForQuery: + return nil, nil + case rowDescription: + fields = c.rxRowDescription(r) + case dataRow: + c.rxDataRow(r, fields) + case commandComplete: + c.rxCommandComplete(r) + default: + if err = c.processContextFreeMsg(t, r); err != nil { + return nil, err + } + } + } else { return nil, err } - fmt.Println(response) - if _, ok := response.(*readyForQuery); ok { - break - } } - return nil, err + panic("Unreachable") } func (c *conn) sendSimpleQuery(sql string) (err error) { @@ -104,17 +115,6 @@ func (c *conn) sendSimpleQuery(sql string) (err error) { return err } -func (c *conn) processMsg() (msg interface{}, err error) { - var t byte - var r *messageReader - t, r, err = c.rxMsg() - if err != nil { - return - } - - return c.parseMsg(t, r) -} - // Processes messages that are not exclusive to one context such as // authentication or query response. The response to these messages // is the same regardless of when they occur. @@ -131,23 +131,6 @@ func (c *conn) processContextFreeMsg(t byte, r *messageReader) (err error) { } -func (c *conn) parseMsg(t byte, r *messageReader) (msg interface{}, err error) { - switch t { - case 'Z': - return c.rxReadyForQuery(r), nil - case 'T': - return c.rxRowDescription(r) - case 'D': - return c.rxDataRow(r) - case 'C': - return c.rxCommandComplete(r), nil - default: - return nil, c.processContextFreeMsg(t, r) - } - - panic("Unreachable") -} - func (c *conn) rxMsg() (t byte, r *messageReader, err error) { var bodySize int32 t, bodySize, err = c.rxMsgHeader() @@ -181,16 +164,15 @@ func (c *conn) rxMsgBody(bodySize int32) (buf []byte, err error) { return } -func (c *conn) rxAuthenticationX(r *messageReader) (msg interface{}, err error) { +func (c *conn) rxAuthenticationX(r *messageReader) (err error) { code := r.readInt32() switch code { - case 0: - return &authenticationOk{}, nil + case 0: // AuthenticationOk default: - return nil, errors.New("Received unknown authentication message") + err = errors.New("Received unknown authentication message") } - panic("Unreachable") + return } func (c *conn) rxParameterStatus(r *messageReader) { @@ -204,17 +186,15 @@ func (c *conn) rxBackendKeyData(r *messageReader) { c.secretKey = r.readInt32() } -func (c *conn) rxReadyForQuery(r *messageReader) (msg *readyForQuery) { - msg = new(readyForQuery) - msg.txStatus = r.readByte() - return +func (c *conn) rxReadyForQuery(r *messageReader) { + c.txStatus = r.readByte() } -func (c *conn) rxRowDescription(r *messageReader) (msg *rowDescription, err error) { +func (c *conn) rxRowDescription(r *messageReader) (fields []fieldDescription) { fieldCount := r.readInt16() - c.rowDesc.fields = make([]fieldDescription, fieldCount) + fields = make([]fieldDescription, fieldCount) for i := int16(0); i < fieldCount; i++ { - f := &c.rowDesc.fields[i] + f := &fields[i] f.name = r.readString() f.table = r.readOid() f.attributeNumber = r.readInt16() @@ -226,19 +206,14 @@ func (c *conn) rxRowDescription(r *messageReader) (msg *rowDescription, err erro return } -func (c *conn) rxDataRow(r *messageReader) (row map[string]string, err error) { +func (c *conn) rxDataRow(r *messageReader, fields []fieldDescription) (row map[string]string, err error) { fieldCount := r.readInt16() - if fieldCount != int16(len(c.rowDesc.fields)) { - return nil, fmt.Errorf("Received DataRow with %d fields, expected %d fields", fieldCount, c.rowDesc.fields) - } - row = make(map[string]string, fieldCount) for i := int16(0); i < fieldCount; i++ { // TODO - handle nulls size := r.readInt32() - fmt.Println(size) - row[c.rowDesc.fields[i].name] = r.readByteString(size) + row[fields[i].name] = r.readByteString(size) } return } diff --git a/messages.go b/messages.go index ea87dfef..bbc145fc 100644 --- a/messages.go +++ b/messages.go @@ -2,13 +2,21 @@ package pqx import ( "encoding/binary" - "fmt" ) const ( protocolVersionNumber = 196608 // 3.0 ) +const ( + backendKeyData = 'K' + authenticationX = 'R' + readyForQuery = 'Z' + rowDescription = 'T' + dataRow = 'D' + commandComplete = 'C' +) + type startupMessage struct { options map[string]string } @@ -31,21 +39,6 @@ func (self *startupMessage) Bytes() (buf []byte) { return buf } -type authenticationOk struct { -} - -func (self *authenticationOk) String() string { - return "AuthenticationOk" -} - -type readyForQuery struct { - txStatus byte -} - -func (self *readyForQuery) String() string { - return fmt.Sprintf("ReadyForQuery txStatus: %c", self.txStatus) -} - type oid int32 type fieldDescription struct { @@ -57,11 +50,3 @@ type fieldDescription struct { modifier int32 formatCode int16 } - -type rowDescription struct { - fields []fieldDescription -} - -func (self *rowDescription) String() string { - return fmt.Sprintf("RowDescription field count: %d", len(self.fields)) -}