mirror of https://github.com/jackc/pgx.git
Separate receiving and processing a message
parent
0cbf3b6f9e
commit
577f9707f7
46
conn.go
46
conn.go
|
@ -38,10 +38,10 @@ func Connect(options map[string]string) (c *conn, err error) {
|
|||
c.txStartupMessage(msg)
|
||||
|
||||
var response interface{}
|
||||
response, err = c.rxMsg()
|
||||
response, err = c.processMsg()
|
||||
|
||||
for {
|
||||
response, err = c.rxMsg()
|
||||
response, err = c.processMsg()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -69,7 +69,7 @@ func (c *conn) Query(sql string) (rows []map[string]string, err error) {
|
|||
|
||||
var response interface{}
|
||||
for {
|
||||
response, err = c.rxMsg()
|
||||
response, err = c.processMsg()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return nil, err
|
||||
|
@ -95,34 +95,33 @@ func (c *conn) sendSimpleQuery(sql string) (err error) {
|
|||
return err
|
||||
}
|
||||
|
||||
func (c *conn) rxMsg() (msg interface{}, err error) {
|
||||
func (c *conn) processMsg() (msg interface{}, err error) {
|
||||
var t byte
|
||||
var bodySize int32
|
||||
t, bodySize, err = c.rxMsgHeader()
|
||||
var body []byte
|
||||
t, body, err = c.rxMsg()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return
|
||||
}
|
||||
|
||||
var buf []byte
|
||||
if buf, err = c.rxMsgBody(bodySize); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return c.parseMsg(t, body)
|
||||
}
|
||||
|
||||
func (c *conn) parseMsg(t byte, body []byte) (msg interface{}, err error) {
|
||||
switch t {
|
||||
case 'K':
|
||||
return c.rxBackendKeyData(buf), nil
|
||||
return c.rxBackendKeyData(body), nil
|
||||
case 'R':
|
||||
return c.rxAuthenticationX(buf)
|
||||
return c.rxAuthenticationX(body)
|
||||
case 'S':
|
||||
return c.rxParameterStatus(buf)
|
||||
return c.rxParameterStatus(body)
|
||||
case 'Z':
|
||||
return c.rxReadyForQuery(buf), nil
|
||||
return c.rxReadyForQuery(body), nil
|
||||
case 'T':
|
||||
return c.rxRowDescription(buf)
|
||||
return c.rxRowDescription(body)
|
||||
case 'D':
|
||||
return c.rxDataRow(buf)
|
||||
return c.rxDataRow(body)
|
||||
case 'C':
|
||||
return c.rxCommandComplete(buf), nil
|
||||
return c.rxCommandComplete(body), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("Received unknown message type: %c", t)
|
||||
}
|
||||
|
@ -130,6 +129,17 @@ func (c *conn) rxMsg() (msg interface{}, err error) {
|
|||
panic("Unreachable")
|
||||
}
|
||||
|
||||
func (c *conn) rxMsg() (t byte, body []byte, err error) {
|
||||
var bodySize int32
|
||||
t, bodySize, err = c.rxMsgHeader()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
body, err = c.rxMsgBody(bodySize)
|
||||
return
|
||||
}
|
||||
|
||||
func (c *conn) rxMsgHeader() (t byte, bodySize int32, err error) {
|
||||
buf := c.buf[:5]
|
||||
if _, err = io.ReadFull(c.conn, buf); err != nil {
|
||||
|
|
Loading…
Reference in New Issue