Reduce nesting in message loops

scan-io
Jack Christensen 2014-06-17 16:39:44 -05:00
parent 928612917d
commit 5fe3dd7ecf
1 changed files with 133 additions and 111 deletions

244
conn.go
View File

@ -211,27 +211,28 @@ func Connect(config ConnConfig) (c *Conn, err error) {
for { for {
var t byte var t byte
var r *MessageReader var r *MessageReader
if t, r, err = c.rxMsg(); err == nil { t, r, err = c.rxMsg()
switch t { if err != nil {
case backendKeyData:
c.rxBackendKeyData(r)
case authenticationX:
if err = c.rxAuthenticationX(r); err != nil {
return nil, err
}
case readyForQuery:
c.rxReadyForQuery(r)
c.logger = c.logger.New("pid", c.Pid)
c.logger.Info("Connection established")
return c, nil
default:
if err = c.processContextFreeMsg(t, r); err != nil {
return nil, err
}
}
} else {
return nil, err return nil, err
} }
switch t {
case backendKeyData:
c.rxBackendKeyData(r)
case authenticationX:
if err = c.rxAuthenticationX(r); err != nil {
return nil, err
}
case readyForQuery:
c.rxReadyForQuery(r)
c.logger = c.logger.New("pid", c.Pid)
c.logger.Info("Connection established")
return c, nil
default:
if err = c.processContextFreeMsg(t, r); err != nil {
return nil, err
}
}
} }
} }
@ -305,31 +306,36 @@ func (c *Conn) selectFunc(sql string, onDataRow func(*DataRowReader) error, argu
return return
} }
var softErr error
for { for {
if t, r, rxErr := c.rxMsg(); rxErr == nil { var t byte
switch t { var r *MessageReader
case readyForQuery: t, r, err = c.rxMsg()
c.rxReadyForQuery(r) if err != nil {
return return err
case rowDescription: }
fields = c.rxRowDescription(r)
case dataRow: switch t {
if err == nil { case readyForQuery:
var drr *DataRowReader c.rxReadyForQuery(r)
drr, err = newDataRowReader(r, fields) return softErr
if err == nil { case rowDescription:
err = onDataRow(drr) fields = c.rxRowDescription(r)
} case dataRow:
} if softErr == nil {
case commandComplete: var drr *DataRowReader
case bindComplete: drr, softErr = newDataRowReader(r, fields)
default: if softErr == nil {
if e := c.processContextFreeMsg(t, r); e != nil && err == nil { softErr = onDataRow(drr)
err = e
} }
} }
} else { case commandComplete:
return rxErr case bindComplete:
default:
if e := c.processContextFreeMsg(t, r); e != nil && softErr == nil {
softErr = e
}
} }
} }
} }
@ -456,44 +462,50 @@ func (c *Conn) SelectValueTo(w io.Writer, sql string, arguments ...interface{})
} }
var numRowsFound int64 var numRowsFound int64
var softErr error
for { for {
if t, bodySize, rxErr := c.rxMsgHeader(); rxErr == nil { var t byte
if t == dataRow { var bodySize int32
numRowsFound++
if numRowsFound > 1 { t, bodySize, err = c.rxMsgHeader()
err = NotSingleRowError{RowCount: numRowsFound} if err != nil {
} return err
}
if err != nil { if t == dataRow {
c.rxMsgBody(bodySize) // Read and discard rest of message numRowsFound++
continue
}
err = c.rxDataRowValueTo(w, bodySize) if numRowsFound > 1 {
} else { softErr = NotSingleRowError{RowCount: numRowsFound}
var body *bytes.Buffer }
if body, rxErr = c.rxMsgBody(bodySize); rxErr == nil {
r := newMessageReader(body) if softErr != nil {
switch t { c.rxMsgBody(bodySize) // Read and discard rest of message
case readyForQuery: continue
c.rxReadyForQuery(r) }
return
case rowDescription: softErr = c.rxDataRowValueTo(w, bodySize)
case commandComplete: } else {
case bindComplete: var body *bytes.Buffer
default: body, err = c.rxMsgBody(bodySize)
if e := c.processContextFreeMsg(t, r); e != nil && err == nil { if err != nil {
err = e return err
} }
}
} else { r := newMessageReader(body)
return rxErr switch t {
case readyForQuery:
c.rxReadyForQuery(r)
return softErr
case rowDescription:
case commandComplete:
case bindComplete:
default:
if e := c.processContextFreeMsg(t, r); e != nil && softErr == nil {
softErr = e
} }
} }
} else {
return rxErr
} }
} }
} }
@ -610,32 +622,37 @@ func (c *Conn) Prepare(name, sql string) (err error) {
ps := preparedStatement{Name: name} ps := preparedStatement{Name: name}
var softErr error
for { for {
if t, r, rxErr := c.rxMsg(); rxErr == nil { var t byte
switch t { var r *MessageReader
case parseComplete: t, r, err := c.rxMsg()
case parameterDescription: if err != nil {
ps.ParameterOids = c.rxParameterDescription(r) return err
case rowDescription: }
ps.FieldDescriptions = c.rxRowDescription(r)
for i := range ps.FieldDescriptions { switch t {
oid := ps.FieldDescriptions[i].DataType case parseComplete:
if ValueTranscoders[oid] != nil && ValueTranscoders[oid].DecodeBinary != nil { case parameterDescription:
ps.FieldDescriptions[i].FormatCode = 1 ps.ParameterOids = c.rxParameterDescription(r)
} case rowDescription:
} ps.FieldDescriptions = c.rxRowDescription(r)
case noData: for i := range ps.FieldDescriptions {
case readyForQuery: oid := ps.FieldDescriptions[i].DataType
c.rxReadyForQuery(r) if ValueTranscoders[oid] != nil && ValueTranscoders[oid].DecodeBinary != nil {
c.preparedStatements[name] = &ps ps.FieldDescriptions[i].FormatCode = 1
return
default:
if e := c.processContextFreeMsg(t, r); e != nil && err == nil {
err = e
} }
} }
} else { case noData:
return rxErr case readyForQuery:
c.rxReadyForQuery(r)
c.preparedStatements[name] = &ps
return softErr
default:
if e := c.processContextFreeMsg(t, r); e != nil && softErr == nil {
softErr = e
}
} }
} }
} }
@ -844,24 +861,29 @@ func (c *Conn) Execute(sql string, arguments ...interface{}) (commandTag Command
return return
} }
var softErr error
for { for {
if t, r, rxErr := c.rxMsg(); rxErr == nil { var t byte
switch t { var r *MessageReader
case readyForQuery: t, r, err = c.rxMsg()
c.rxReadyForQuery(r) if err != nil {
return return commandTag, err
case rowDescription: }
case dataRow:
case bindComplete: switch t {
case commandComplete: case readyForQuery:
commandTag = CommandTag(r.ReadCString()) c.rxReadyForQuery(r)
default: return commandTag, softErr
if e := c.processContextFreeMsg(t, r); e != nil && err == nil { case rowDescription:
err = e case dataRow:
} case bindComplete:
case commandComplete:
commandTag = CommandTag(r.ReadCString())
default:
if e := c.processContextFreeMsg(t, r); e != nil && softErr == nil {
softErr = e
} }
} else {
return "", rxErr
} }
} }
} }