Rename pgx.Connection to pgx.Conn

scan-io
Jack Christensen 2014-05-17 13:03:51 -05:00
parent 4f67f16296
commit 4eb597d20b
8 changed files with 99 additions and 99 deletions

View File

@ -17,7 +17,7 @@ var float8TextVsBinaryTestDataLoaded bool
var boolTextVsBinaryTestDataLoaded bool var boolTextVsBinaryTestDataLoaded bool
var timestampTzTextVsBinaryTestDataLoaded bool var timestampTzTextVsBinaryTestDataLoaded bool
func createNarrowTestData(b *testing.B, conn *pgx.Connection) { func createNarrowTestData(b *testing.B, conn *pgx.Conn) {
if narrowTestDataLoaded { if narrowTestDataLoaded {
return return
} }
@ -158,7 +158,7 @@ func BenchmarkSelectValueToPreparedNarrow(b *testing.B) {
} }
} }
func createJoinsTestData(b *testing.B, conn *pgx.Connection) { func createJoinsTestData(b *testing.B, conn *pgx.Conn) {
if testJoinsDataLoaded { if testJoinsDataLoaded {
return return
} }
@ -259,7 +259,7 @@ func BenchmarkSelectRowsPreparedJoins(b *testing.B) {
} }
} }
func createInt2TextVsBinaryTestData(b *testing.B, conn *pgx.Connection) { func createInt2TextVsBinaryTestData(b *testing.B, conn *pgx.Conn) {
if int2TextVsBinaryTestDataLoaded { if int2TextVsBinaryTestDataLoaded {
return return
} }
@ -312,7 +312,7 @@ func BenchmarkInt2Binary(b *testing.B) {
} }
} }
func createInt4TextVsBinaryTestData(b *testing.B, conn *pgx.Connection) { func createInt4TextVsBinaryTestData(b *testing.B, conn *pgx.Conn) {
if int4TextVsBinaryTestDataLoaded { if int4TextVsBinaryTestDataLoaded {
return return
} }
@ -365,7 +365,7 @@ func BenchmarkInt4Binary(b *testing.B) {
} }
} }
func createInt8TextVsBinaryTestData(b *testing.B, conn *pgx.Connection) { func createInt8TextVsBinaryTestData(b *testing.B, conn *pgx.Conn) {
if int8TextVsBinaryTestDataLoaded { if int8TextVsBinaryTestDataLoaded {
return return
} }
@ -418,7 +418,7 @@ func BenchmarkInt8Binary(b *testing.B) {
} }
} }
func createFloat4TextVsBinaryTestData(b *testing.B, conn *pgx.Connection) { func createFloat4TextVsBinaryTestData(b *testing.B, conn *pgx.Conn) {
if float4TextVsBinaryTestDataLoaded { if float4TextVsBinaryTestDataLoaded {
return return
} }
@ -471,7 +471,7 @@ func BenchmarkFloat4Binary(b *testing.B) {
} }
} }
func createFloat8TextVsBinaryTestData(b *testing.B, conn *pgx.Connection) { func createFloat8TextVsBinaryTestData(b *testing.B, conn *pgx.Conn) {
if float8TextVsBinaryTestDataLoaded { if float8TextVsBinaryTestDataLoaded {
return return
} }
@ -524,7 +524,7 @@ func BenchmarkFloat8Binary(b *testing.B) {
} }
} }
func createBoolTextVsBinaryTestData(b *testing.B, conn *pgx.Connection) { func createBoolTextVsBinaryTestData(b *testing.B, conn *pgx.Conn) {
if boolTextVsBinaryTestDataLoaded { if boolTextVsBinaryTestDataLoaded {
return return
} }
@ -577,7 +577,7 @@ func BenchmarkBoolBinary(b *testing.B) {
} }
} }
func createTimestampTzTextVsBinaryTestData(b *testing.B, conn *pgx.Connection) { func createTimestampTzTextVsBinaryTestData(b *testing.B, conn *pgx.Conn) {
if timestampTzTextVsBinaryTestDataLoaded { if timestampTzTextVsBinaryTestDataLoaded {
return return
} }
@ -644,7 +644,7 @@ func BenchmarkConnectionPool(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
var conn *pgx.Connection var conn *pgx.Conn
if conn, err = pool.Acquire(); err != nil { if conn, err = pool.Acquire(); err != nil {
b.Fatalf("Unable to acquire connection: %v", err) b.Fatalf("Unable to acquire connection: %v", err)
} }

View File

@ -36,10 +36,10 @@ type ConnectionParameters struct {
Logger Logger Logger Logger
} }
// Connection is a PostgreSQL connection handle. It is not safe for concurrent usage. // Conn is a PostgreSQL connection handle. It is not safe for concurrent usage.
// Use ConnectionPool to manage access to multiple database connections from multiple // Use ConnectionPool to manage access to multiple database connections from multiple
// goroutines. // goroutines.
type Connection struct { type Conn struct {
conn net.Conn // the underlying TCP or unix domain socket connection conn net.Conn // the underlying TCP or unix domain socket connection
reader *bufio.Reader // buffered reader to improve read performance reader *bufio.Reader // buffered reader to improve read performance
writer *bufio.Writer // buffered writer to avoid sending tiny packets writer *bufio.Writer // buffered writer to avoid sending tiny packets
@ -101,8 +101,8 @@ var NotificationTimeoutError = errors.New("Notification Timeout")
// Connect establishes a connection with a PostgreSQL server using parameters. One // Connect establishes a connection with a PostgreSQL server using parameters. One
// of parameters.Socket or parameters.Host must be specified. parameters.User // of parameters.Socket or parameters.Host must be specified. parameters.User
// will default to the OS user name. Other parameters fields are optional. // will default to the OS user name. Other parameters fields are optional.
func Connect(parameters ConnectionParameters) (c *Connection, err error) { func Connect(parameters ConnectionParameters) (c *Conn, err error) {
c = new(Connection) c = new(Conn)
c.parameters = parameters c.parameters = parameters
if c.parameters.Logger != nil { if c.parameters.Logger != nil {
@ -211,7 +211,7 @@ func Connect(parameters ConnectionParameters) (c *Connection, err error) {
} }
} }
func (c *Connection) Close() (err error) { func (c *Conn) Close() (err error) {
err = c.txMsg('X', c.getBuf(), true) err = c.txMsg('X', c.getBuf(), true)
c.die(errors.New("Closed")) c.die(errors.New("Closed"))
c.logger.Info("Closed connection") c.logger.Info("Closed connection")
@ -255,7 +255,7 @@ func ParseURI(uri string) (ConnectionParameters, error) {
// need to simultaneously store the entire result set in memory. It also means that // need to simultaneously store the entire result set in memory. It also means that
// it is possible to process some rows and then for an error to occur. Callers // it is possible to process some rows and then for an error to occur. Callers
// should be aware of this possibility. // should be aware of this possibility.
func (c *Connection) SelectFunc(sql string, onDataRow func(*DataRowReader) error, arguments ...interface{}) (err error) { func (c *Conn) SelectFunc(sql string, onDataRow func(*DataRowReader) error, arguments ...interface{}) (err error) {
defer func() { defer func() {
if err != nil { if err != nil {
c.logger.Error(fmt.Sprintf("SelectFunc `%s` with %v failed: %v", sql, arguments, err)) c.logger.Error(fmt.Sprintf("SelectFunc `%s` with %v failed: %v", sql, arguments, err))
@ -307,7 +307,7 @@ func (c *Connection) SelectFunc(sql string, onDataRow func(*DataRowReader) error
// sql can be either a prepared statement name or an SQL string. arguments will be // sql can be either a prepared statement name or an SQL string. arguments will be
// sanitized before being interpolated into sql strings. arguments should be referenced // sanitized before being interpolated into sql strings. arguments should be referenced
// positionally from the sql string as $1, $2, etc. // positionally from the sql string as $1, $2, etc.
func (c *Connection) SelectRows(sql string, arguments ...interface{}) (rows []map[string]interface{}, err error) { func (c *Conn) SelectRows(sql string, arguments ...interface{}) (rows []map[string]interface{}, err error) {
rows = make([]map[string]interface{}, 0, 8) rows = make([]map[string]interface{}, 0, 8)
onDataRow := func(r *DataRowReader) error { onDataRow := func(r *DataRowReader) error {
rows = append(rows, c.rxDataRow(r)) rows = append(rows, c.rxDataRow(r))
@ -323,7 +323,7 @@ func (c *Connection) SelectRows(sql string, arguments ...interface{}) (rows []ma
// positionally from the sql string as $1, $2, etc. // positionally from the sql string as $1, $2, etc.
// //
// Returns a NotSingleRowError if exactly one row is not found // Returns a NotSingleRowError if exactly one row is not found
func (c *Connection) SelectRow(sql string, arguments ...interface{}) (row map[string]interface{}, err error) { func (c *Conn) SelectRow(sql string, arguments ...interface{}) (row map[string]interface{}, err error) {
var numRowsFound int64 var numRowsFound int64
onDataRow := func(r *DataRowReader) error { onDataRow := func(r *DataRowReader) error {
@ -345,7 +345,7 @@ func (c *Connection) SelectRow(sql string, arguments ...interface{}) (row map[st
// //
// Returns a UnexpectedColumnCountError if exactly one column is not found // Returns a UnexpectedColumnCountError if exactly one column is not found
// Returns a NotSingleRowError if exactly one row is not found // Returns a NotSingleRowError if exactly one row is not found
func (c *Connection) SelectValue(sql string, arguments ...interface{}) (v interface{}, err error) { func (c *Conn) SelectValue(sql string, arguments ...interface{}) (v interface{}, err error) {
var numRowsFound int64 var numRowsFound int64
onDataRow := func(r *DataRowReader) error { onDataRow := func(r *DataRowReader) error {
@ -376,7 +376,7 @@ func (c *Connection) SelectValue(sql string, arguments ...interface{}) (v interf
// //
// Returns a UnexpectedColumnCountError if exactly one column is not found // Returns a UnexpectedColumnCountError if exactly one column is not found
// Returns a NotSingleRowError if exactly one row is not found // Returns a NotSingleRowError if exactly one row is not found
func (c *Connection) SelectValueTo(w io.Writer, sql string, arguments ...interface{}) (err error) { func (c *Conn) SelectValueTo(w io.Writer, sql string, arguments ...interface{}) (err error) {
defer func() { defer func() {
if err != nil { if err != nil {
c.logger.Error(fmt.Sprintf("SelectValueTo `%s` with %v failed: %v", sql, arguments, err)) c.logger.Error(fmt.Sprintf("SelectValueTo `%s` with %v failed: %v", sql, arguments, err))
@ -431,7 +431,7 @@ func (c *Connection) SelectValueTo(w io.Writer, sql string, arguments ...interfa
return return
} }
func (c *Connection) rxDataRowValueTo(w io.Writer, bodySize int32) (err error) { func (c *Conn) rxDataRowValueTo(w io.Writer, bodySize int32) (err error) {
var columnCount int16 var columnCount int16
err = binary.Read(c.reader, binary.BigEndian, &columnCount) err = binary.Read(c.reader, binary.BigEndian, &columnCount)
if err != nil { if err != nil {
@ -475,7 +475,7 @@ func (c *Connection) rxDataRowValueTo(w io.Writer, bodySize int32) (err error) {
// the sql string as $1, $2, etc. // the sql string as $1, $2, etc.
// //
// Returns a UnexpectedColumnCountError if exactly one column is not found // Returns a UnexpectedColumnCountError if exactly one column is not found
func (c *Connection) SelectValues(sql string, arguments ...interface{}) (values []interface{}, err error) { func (c *Conn) SelectValues(sql string, arguments ...interface{}) (values []interface{}, err error) {
values = make([]interface{}, 0, 8) values = make([]interface{}, 0, 8)
onDataRow := func(r *DataRowReader) error { onDataRow := func(r *DataRowReader) error {
if len(r.fields) != 1 { if len(r.fields) != 1 {
@ -491,7 +491,7 @@ func (c *Connection) SelectValues(sql string, arguments ...interface{}) (values
// Prepare creates a prepared statement with name and sql. sql can contain placeholders // Prepare creates a prepared statement with name and sql. sql can contain placeholders
// for bound parameters. These placeholders are referenced positional as $1, $2, etc. // for bound parameters. These placeholders are referenced positional as $1, $2, etc.
func (c *Connection) Prepare(name, sql string) (err error) { func (c *Conn) Prepare(name, sql string) (err error) {
defer func() { defer func() {
if err != nil { if err != nil {
c.logger.Error(fmt.Sprintf("Prepare `%s` as `%s` failed: %v", name, sql, err)) c.logger.Error(fmt.Sprintf("Prepare `%s` as `%s` failed: %v", name, sql, err))
@ -565,21 +565,21 @@ func (c *Connection) Prepare(name, sql string) (err error) {
} }
// Deallocate released a prepared statement // Deallocate released a prepared statement
func (c *Connection) Deallocate(name string) (err error) { func (c *Conn) Deallocate(name string) (err error) {
delete(c.preparedStatements, name) delete(c.preparedStatements, name)
_, err = c.Execute("deallocate " + c.QuoteIdentifier(name)) _, err = c.Execute("deallocate " + c.QuoteIdentifier(name))
return return
} }
// Listen establishes a PostgreSQL listen/notify to channel // Listen establishes a PostgreSQL listen/notify to channel
func (c *Connection) Listen(channel string) (err error) { func (c *Conn) Listen(channel string) (err error) {
_, err = c.Execute("listen " + channel) _, err = c.Execute("listen " + channel)
return return
} }
// WaitForNotification waits for a PostgreSQL notification for up to timeout. // WaitForNotification waits for a PostgreSQL notification for up to timeout.
// If the timeout occurs it returns pgx.NotificationTimeoutError // If the timeout occurs it returns pgx.NotificationTimeoutError
func (c *Connection) WaitForNotification(timeout time.Duration) (*Notification, error) { func (c *Conn) WaitForNotification(timeout time.Duration) (*Notification, error) {
if len(c.notifications) > 0 { if len(c.notifications) > 0 {
notification := c.notifications[0] notification := c.notifications[0]
c.notifications = c.notifications[1:] c.notifications = c.notifications[1:]
@ -636,15 +636,15 @@ func (c *Connection) WaitForNotification(timeout time.Duration) (*Notification,
} }
} }
func (c *Connection) IsAlive() bool { func (c *Conn) IsAlive() bool {
return c.alive return c.alive
} }
func (c *Connection) CauseOfDeath() error { func (c *Conn) CauseOfDeath() error {
return c.causeOfDeath return c.causeOfDeath
} }
func (c *Connection) sendQuery(sql string, arguments ...interface{}) (err error) { func (c *Conn) sendQuery(sql string, arguments ...interface{}) (err error) {
if ps, present := c.preparedStatements[sql]; present { if ps, present := c.preparedStatements[sql]; present {
return c.sendPreparedQuery(ps, arguments...) return c.sendPreparedQuery(ps, arguments...)
} else { } else {
@ -652,7 +652,7 @@ func (c *Connection) sendQuery(sql string, arguments ...interface{}) (err error)
} }
} }
func (c *Connection) sendSimpleQuery(sql string, arguments ...interface{}) (err error) { func (c *Conn) sendSimpleQuery(sql string, arguments ...interface{}) (err error) {
if len(arguments) > 0 { if len(arguments) > 0 {
sql, err = c.SanitizeSql(sql, arguments...) sql, err = c.SanitizeSql(sql, arguments...)
if err != nil { if err != nil {
@ -674,7 +674,7 @@ func (c *Connection) sendSimpleQuery(sql string, arguments ...interface{}) (err
return c.txMsg('Q', buf, true) return c.txMsg('Q', buf, true)
} }
func (c *Connection) sendPreparedQuery(ps *preparedStatement, arguments ...interface{}) (err error) { func (c *Conn) sendPreparedQuery(ps *preparedStatement, arguments ...interface{}) (err error) {
if len(ps.ParameterOids) != len(arguments) { if len(ps.ParameterOids) != len(arguments) {
return fmt.Errorf("Prepared statement \"%v\" requires %d parameters, but %d were provided", ps.Name, len(ps.ParameterOids), len(arguments)) return fmt.Errorf("Prepared statement \"%v\" requires %d parameters, but %d were provided", ps.Name, len(ps.ParameterOids), len(arguments))
} }
@ -752,7 +752,7 @@ func (c *Connection) sendPreparedQuery(ps *preparedStatement, arguments ...inter
// Execute executes sql. sql can be either a prepared statement name or an SQL string. // Execute executes sql. sql can be either a prepared statement name or an SQL string.
// arguments will be sanitized before being interpolated into sql strings. arguments // arguments will be sanitized before being interpolated into sql strings. arguments
// should be referenced positionally from the sql string as $1, $2, etc. // should be referenced positionally from the sql string as $1, $2, etc.
func (c *Connection) Execute(sql string, arguments ...interface{}) (commandTag string, err error) { func (c *Conn) Execute(sql string, arguments ...interface{}) (commandTag string, err error) {
defer func() { defer func() {
if err != nil { if err != nil {
c.logger.Error(fmt.Sprintf("Execute `%s` with %v failed: %v", sql, arguments, err)) c.logger.Error(fmt.Sprintf("Execute `%s` with %v failed: %v", sql, arguments, err))
@ -791,7 +791,7 @@ func (c *Connection) Execute(sql string, arguments ...interface{}) (commandTag s
// from err as an explicit rollback is not an error. Transaction will use the default // from err as an explicit rollback is not an error. Transaction will use the default
// isolation level for the current connection. To use a specific isolation level see // isolation level for the current connection. To use a specific isolation level see
// TransactionIso // TransactionIso
func (c *Connection) Transaction(f func() bool) (committed bool, err error) { func (c *Conn) Transaction(f func() bool) (committed bool, err error) {
return c.transaction("", f) return c.transaction("", f)
} }
@ -803,11 +803,11 @@ func (c *Connection) Transaction(f func() bool) (committed bool, err error) {
// repeatable read // repeatable read
// read committed // read committed
// read uncommitted // read uncommitted
func (c *Connection) TransactionIso(isoLevel string, f func() bool) (committed bool, err error) { func (c *Conn) TransactionIso(isoLevel string, f func() bool) (committed bool, err error) {
return c.transaction(isoLevel, f) return c.transaction(isoLevel, f)
} }
func (c *Connection) transaction(isoLevel string, f func() bool) (committed bool, err error) { func (c *Conn) transaction(isoLevel string, f func() bool) (committed bool, err error) {
var beginSql string var beginSql string
if isoLevel == "" { if isoLevel == "" {
beginSql = "begin" beginSql = "begin"
@ -837,7 +837,7 @@ func (c *Connection) transaction(isoLevel string, f func() bool) (committed bool
// Processes messages that are not exclusive to one context such as // Processes messages that are not exclusive to one context such as
// authentication or query response. The response to these messages // authentication or query response. The response to these messages
// is the same regardless of when they occur. // is the same regardless of when they occur.
func (c *Connection) processContextFreeMsg(t byte, r *MessageReader) (err error) { func (c *Conn) processContextFreeMsg(t byte, r *MessageReader) (err error) {
switch t { switch t {
case 'S': case 'S':
c.rxParameterStatus(r) c.rxParameterStatus(r)
@ -853,7 +853,7 @@ func (c *Connection) processContextFreeMsg(t byte, r *MessageReader) (err error)
} }
} }
func (c *Connection) rxMsg() (t byte, r *MessageReader, err error) { func (c *Conn) rxMsg() (t byte, r *MessageReader, err error) {
var bodySize int32 var bodySize int32
t, bodySize, err = c.rxMsgHeader() t, bodySize, err = c.rxMsgHeader()
if err != nil { if err != nil {
@ -869,7 +869,7 @@ func (c *Connection) rxMsg() (t byte, r *MessageReader, err error) {
return return
} }
func (c *Connection) rxMsgHeader() (t byte, bodySize int32, err error) { func (c *Conn) rxMsgHeader() (t byte, bodySize int32, err error) {
if !c.alive { if !c.alive {
err = errors.New("Connection is dead") err = errors.New("Connection is dead")
return return
@ -890,7 +890,7 @@ func (c *Connection) rxMsgHeader() (t byte, bodySize int32, err error) {
return return
} }
func (c *Connection) rxMsgBody(bodySize int32) (*bytes.Buffer, error) { func (c *Conn) rxMsgBody(bodySize int32) (*bytes.Buffer, error) {
if !c.alive { if !c.alive {
return nil, errors.New("Connection is dead") return nil, errors.New("Connection is dead")
} }
@ -905,7 +905,7 @@ func (c *Connection) rxMsgBody(bodySize int32) (*bytes.Buffer, error) {
return buf, nil return buf, nil
} }
func (c *Connection) rxAuthenticationX(r *MessageReader) (err error) { func (c *Conn) rxAuthenticationX(r *MessageReader) (err error) {
code := r.ReadInt32() code := r.ReadInt32()
switch code { switch code {
case 0: // AuthenticationOk case 0: // AuthenticationOk
@ -928,13 +928,13 @@ func hexMD5(s string) string {
return hex.EncodeToString(hash.Sum(nil)) return hex.EncodeToString(hash.Sum(nil))
} }
func (c *Connection) rxParameterStatus(r *MessageReader) { func (c *Conn) rxParameterStatus(r *MessageReader) {
key := r.ReadCString() key := r.ReadCString()
value := r.ReadCString() value := r.ReadCString()
c.RuntimeParams[key] = value c.RuntimeParams[key] = value
} }
func (c *Connection) rxErrorResponse(r *MessageReader) (err PgError) { func (c *Conn) rxErrorResponse(r *MessageReader) (err PgError) {
for { for {
switch r.ReadByte() { switch r.ReadByte() {
case 'S': case 'S':
@ -951,16 +951,16 @@ func (c *Connection) rxErrorResponse(r *MessageReader) (err PgError) {
} }
} }
func (c *Connection) rxBackendKeyData(r *MessageReader) { func (c *Conn) rxBackendKeyData(r *MessageReader) {
c.Pid = r.ReadInt32() c.Pid = r.ReadInt32()
c.SecretKey = r.ReadInt32() c.SecretKey = r.ReadInt32()
} }
func (c *Connection) rxReadyForQuery(r *MessageReader) { func (c *Conn) rxReadyForQuery(r *MessageReader) {
c.TxStatus = r.ReadByte() c.TxStatus = r.ReadByte()
} }
func (c *Connection) rxRowDescription(r *MessageReader) (fields []FieldDescription) { func (c *Conn) rxRowDescription(r *MessageReader) (fields []FieldDescription) {
fieldCount := r.ReadInt16() fieldCount := r.ReadInt16()
fields = make([]FieldDescription, fieldCount) fields = make([]FieldDescription, fieldCount)
for i := int16(0); i < fieldCount; i++ { for i := int16(0); i < fieldCount; i++ {
@ -976,7 +976,7 @@ func (c *Connection) rxRowDescription(r *MessageReader) (fields []FieldDescripti
return return
} }
func (c *Connection) rxParameterDescription(r *MessageReader) (parameters []Oid) { func (c *Conn) rxParameterDescription(r *MessageReader) (parameters []Oid) {
parameterCount := r.ReadInt16() parameterCount := r.ReadInt16()
parameters = make([]Oid, 0, parameterCount) parameters = make([]Oid, 0, parameterCount)
for i := int16(0); i < parameterCount; i++ { for i := int16(0); i < parameterCount; i++ {
@ -985,7 +985,7 @@ func (c *Connection) rxParameterDescription(r *MessageReader) (parameters []Oid)
return return
} }
func (c *Connection) rxDataRow(r *DataRowReader) (row map[string]interface{}) { func (c *Conn) rxDataRow(r *DataRowReader) (row map[string]interface{}) {
fieldCount := len(r.fields) fieldCount := len(r.fields)
row = make(map[string]interface{}, fieldCount) row = make(map[string]interface{}, fieldCount)
@ -995,11 +995,11 @@ func (c *Connection) rxDataRow(r *DataRowReader) (row map[string]interface{}) {
return return
} }
func (c *Connection) rxCommandComplete(r *MessageReader) string { func (c *Conn) rxCommandComplete(r *MessageReader) string {
return r.ReadCString() return r.ReadCString()
} }
func (c *Connection) rxNotificationResponse(r *MessageReader) (err error) { func (c *Conn) rxNotificationResponse(r *MessageReader) (err error) {
n := new(Notification) n := new(Notification)
n.Pid = r.ReadInt32() n.Pid = r.ReadInt32()
n.Channel = r.ReadCString() n.Channel = r.ReadCString()
@ -1008,7 +1008,7 @@ func (c *Connection) rxNotificationResponse(r *MessageReader) (err error) {
return return
} }
func (c *Connection) startTLS() (err error) { func (c *Conn) startTLS() (err error) {
err = binary.Write(c.conn, binary.BigEndian, []int32{8, 80877103}) err = binary.Write(c.conn, binary.BigEndian, []int32{8, 80877103})
if err != nil { if err != nil {
return return
@ -1029,7 +1029,7 @@ func (c *Connection) startTLS() (err error) {
return nil return nil
} }
func (c *Connection) txStartupMessage(msg *startupMessage) (err error) { func (c *Conn) txStartupMessage(msg *startupMessage) (err error) {
_, err = c.writer.Write(msg.Bytes()) _, err = c.writer.Write(msg.Bytes())
if err != nil { if err != nil {
return return
@ -1038,7 +1038,7 @@ func (c *Connection) txStartupMessage(msg *startupMessage) (err error) {
return return
} }
func (c *Connection) txMsg(identifier byte, buf *bytes.Buffer, flush bool) (err error) { func (c *Conn) txMsg(identifier byte, buf *bytes.Buffer, flush bool) (err error) {
if !c.alive { if !c.alive {
return errors.New("Connection is dead") return errors.New("Connection is dead")
} }
@ -1071,7 +1071,7 @@ func (c *Connection) txMsg(identifier byte, buf *bytes.Buffer, flush bool) (err
return return
} }
func (c *Connection) txPasswordMessage(password string) (err error) { func (c *Conn) txPasswordMessage(password string) (err error) {
buf := c.getBuf() buf := c.getBuf()
_, err = buf.WriteString(password) _, err = buf.WriteString(password)
@ -1089,7 +1089,7 @@ func (c *Connection) txPasswordMessage(password string) (err error) {
// Gets the shared connection buffer. Since bytes.Buffer never releases memory from // Gets the shared connection buffer. Since bytes.Buffer never releases memory from
// its internal byte array, check on the size and create a new bytes.Buffer so the // its internal byte array, check on the size and create a new bytes.Buffer so the
// old one can get GC'ed // old one can get GC'ed
func (c *Connection) getBuf() *bytes.Buffer { func (c *Conn) getBuf() *bytes.Buffer {
c.buf.Reset() c.buf.Reset()
if cap(c.buf.Bytes()) > c.bufSize { if cap(c.buf.Bytes()) > c.bufSize {
c.logger.Debug(fmt.Sprintf("c.buf (%d) is larger than c.bufSize (%d) -- resetting", cap(c.buf.Bytes()), c.bufSize)) c.logger.Debug(fmt.Sprintf("c.buf (%d) is larger than c.bufSize (%d) -- resetting", cap(c.buf.Bytes()), c.bufSize))
@ -1098,7 +1098,7 @@ func (c *Connection) getBuf() *bytes.Buffer {
return c.buf return c.buf
} }
func (c *Connection) die(err error) { func (c *Conn) die(err error) {
c.alive = false c.alive = false
c.causeOfDeath = err c.causeOfDeath = err
} }

View File

@ -7,17 +7,17 @@ import (
type ConnectionPoolOptions struct { type ConnectionPoolOptions struct {
MaxConnections int // max simultaneous connections to use MaxConnections int // max simultaneous connections to use
AfterConnect func(*Connection) error AfterConnect func(*Conn) error
Logger Logger Logger Logger
} }
type ConnectionPool struct { type ConnectionPool struct {
allConnections []*Connection allConnections []*Conn
availableConnections []*Connection availableConnections []*Conn
cond *sync.Cond cond *sync.Cond
parameters ConnectionParameters // parameters used when establishing connection parameters ConnectionParameters // parameters used when establishing connection
maxConnections int maxConnections int
afterConnect func(*Connection) error afterConnect func(*Conn) error
logger Logger logger Logger
} }
@ -40,12 +40,12 @@ func NewConnectionPool(parameters ConnectionParameters, options ConnectionPoolOp
p.logger = nullLogger("null") p.logger = nullLogger("null")
} }
p.allConnections = make([]*Connection, 0, p.maxConnections) p.allConnections = make([]*Conn, 0, p.maxConnections)
p.availableConnections = make([]*Connection, 0, p.maxConnections) p.availableConnections = make([]*Conn, 0, p.maxConnections)
p.cond = sync.NewCond(new(sync.Mutex)) p.cond = sync.NewCond(new(sync.Mutex))
// Initially establish one connection // Initially establish one connection
var c *Connection var c *Conn
c, err = p.createConnection() c, err = p.createConnection()
if err != nil { if err != nil {
return return
@ -57,7 +57,7 @@ func NewConnectionPool(parameters ConnectionParameters, options ConnectionPoolOp
} }
// Acquire takes exclusive use of a connection until it is released. // Acquire takes exclusive use of a connection until it is released.
func (p *ConnectionPool) Acquire() (c *Connection, err error) { func (p *ConnectionPool) Acquire() (c *Conn, err error) {
p.cond.L.Lock() p.cond.L.Lock()
defer p.cond.L.Unlock() defer p.cond.L.Unlock()
@ -93,7 +93,7 @@ func (p *ConnectionPool) Acquire() (c *Connection, err error) {
} }
// Release gives up use of a connection. // Release gives up use of a connection.
func (p *ConnectionPool) Release(conn *Connection) { func (p *ConnectionPool) Release(conn *Conn) {
if conn.TxStatus != 'I' { if conn.TxStatus != 'I' {
conn.Execute("rollback") conn.Execute("rollback")
} }
@ -142,7 +142,7 @@ func (p *ConnectionPool) CurrentConnectionCount() int {
return p.maxConnections return p.maxConnections
} }
func (p *ConnectionPool) createConnection() (c *Connection, err error) { func (p *ConnectionPool) createConnection() (c *Conn, err error) {
c, err = Connect(p.parameters) c, err = Connect(p.parameters)
if err != nil { if err != nil {
return return
@ -158,7 +158,7 @@ func (p *ConnectionPool) createConnection() (c *Connection, err error) {
// SelectFunc acquires a connection, delegates the call to that connection, and releases the connection // SelectFunc acquires a connection, delegates the call to that connection, and releases the connection
func (p *ConnectionPool) SelectFunc(sql string, onDataRow func(*DataRowReader) error, arguments ...interface{}) (err error) { func (p *ConnectionPool) SelectFunc(sql string, onDataRow func(*DataRowReader) error, arguments ...interface{}) (err error) {
var c *Connection var c *Conn
if c, err = p.Acquire(); err != nil { if c, err = p.Acquire(); err != nil {
return return
} }
@ -169,7 +169,7 @@ func (p *ConnectionPool) SelectFunc(sql string, onDataRow func(*DataRowReader) e
// SelectRows acquires a connection, delegates the call to that connection, and releases the connection // SelectRows acquires a connection, delegates the call to that connection, and releases the connection
func (p *ConnectionPool) SelectRows(sql string, arguments ...interface{}) (rows []map[string]interface{}, err error) { func (p *ConnectionPool) SelectRows(sql string, arguments ...interface{}) (rows []map[string]interface{}, err error) {
var c *Connection var c *Conn
if c, err = p.Acquire(); err != nil { if c, err = p.Acquire(); err != nil {
return return
} }
@ -180,7 +180,7 @@ func (p *ConnectionPool) SelectRows(sql string, arguments ...interface{}) (rows
// SelectRow acquires a connection, delegates the call to that connection, and releases the connection // SelectRow acquires a connection, delegates the call to that connection, and releases the connection
func (p *ConnectionPool) SelectRow(sql string, arguments ...interface{}) (row map[string]interface{}, err error) { func (p *ConnectionPool) SelectRow(sql string, arguments ...interface{}) (row map[string]interface{}, err error) {
var c *Connection var c *Conn
if c, err = p.Acquire(); err != nil { if c, err = p.Acquire(); err != nil {
return return
} }
@ -191,7 +191,7 @@ func (p *ConnectionPool) SelectRow(sql string, arguments ...interface{}) (row ma
// SelectValue acquires a connection, delegates the call to that connection, and releases the connection // SelectValue acquires a connection, delegates the call to that connection, and releases the connection
func (p *ConnectionPool) SelectValue(sql string, arguments ...interface{}) (v interface{}, err error) { func (p *ConnectionPool) SelectValue(sql string, arguments ...interface{}) (v interface{}, err error) {
var c *Connection var c *Conn
if c, err = p.Acquire(); err != nil { if c, err = p.Acquire(); err != nil {
return return
} }
@ -202,7 +202,7 @@ func (p *ConnectionPool) SelectValue(sql string, arguments ...interface{}) (v in
// SelectValueTo acquires a connection, delegates the call to that connection, and releases the connection // SelectValueTo acquires a connection, delegates the call to that connection, and releases the connection
func (p *ConnectionPool) SelectValueTo(w io.Writer, sql string, arguments ...interface{}) (err error) { func (p *ConnectionPool) SelectValueTo(w io.Writer, sql string, arguments ...interface{}) (err error) {
var c *Connection var c *Conn
if c, err = p.Acquire(); err != nil { if c, err = p.Acquire(); err != nil {
return return
} }
@ -213,7 +213,7 @@ func (p *ConnectionPool) SelectValueTo(w io.Writer, sql string, arguments ...int
// SelectValues acquires a connection, delegates the call to that connection, and releases the connection // SelectValues acquires a connection, delegates the call to that connection, and releases the connection
func (p *ConnectionPool) SelectValues(sql string, arguments ...interface{}) (values []interface{}, err error) { func (p *ConnectionPool) SelectValues(sql string, arguments ...interface{}) (values []interface{}, err error) {
var c *Connection var c *Conn
if c, err = p.Acquire(); err != nil { if c, err = p.Acquire(); err != nil {
return return
} }
@ -224,7 +224,7 @@ func (p *ConnectionPool) SelectValues(sql string, arguments ...interface{}) (val
// Execute acquires a connection, delegates the call to that connection, and releases the connection // Execute acquires a connection, delegates the call to that connection, and releases the connection
func (p *ConnectionPool) Execute(sql string, arguments ...interface{}) (commandTag string, err error) { func (p *ConnectionPool) Execute(sql string, arguments ...interface{}) (commandTag string, err error) {
var c *Connection var c *Conn
if c, err = p.Acquire(); err != nil { if c, err = p.Acquire(); err != nil {
return return
} }
@ -235,9 +235,9 @@ func (p *ConnectionPool) Execute(sql string, arguments ...interface{}) (commandT
// Transaction acquires a connection, delegates the call to that connection, // Transaction acquires a connection, delegates the call to that connection,
// and releases the connection. The call signature differs slightly from the // and releases the connection. The call signature differs slightly from the
// underlying Transaction in that the callback function accepts a *Connection // underlying Transaction in that the callback function accepts a *Conn
func (p *ConnectionPool) Transaction(f func(conn *Connection) bool) (committed bool, err error) { func (p *ConnectionPool) Transaction(f func(conn *Conn) bool) (committed bool, err error) {
var c *Connection var c *Conn
if c, err = p.Acquire(); err != nil { if c, err = p.Acquire(); err != nil {
return return
} }
@ -250,9 +250,9 @@ func (p *ConnectionPool) Transaction(f func(conn *Connection) bool) (committed b
// TransactionIso acquires a connection, delegates the call to that connection, // TransactionIso acquires a connection, delegates the call to that connection,
// and releases the connection. The call signature differs slightly from the // and releases the connection. The call signature differs slightly from the
// underlying TransactionIso in that the callback function accepts a *Connection // underlying TransactionIso in that the callback function accepts a *Conn
func (p *ConnectionPool) TransactionIso(isoLevel string, f func(conn *Connection) bool) (committed bool, err error) { func (p *ConnectionPool) TransactionIso(isoLevel string, f func(conn *Conn) bool) (committed bool, err error) {
var c *Connection var c *Conn
if c, err = p.Acquire(); err != nil { if c, err = p.Acquire(); err != nil {
return return
} }

View File

@ -19,7 +19,7 @@ func createConnectionPool(t *testing.T, maxConnections int) *pgx.ConnectionPool
func TestNewConnectionPool(t *testing.T) { func TestNewConnectionPool(t *testing.T) {
var numCallbacks int var numCallbacks int
afterConnect := func(c *pgx.Connection) error { afterConnect := func(c *pgx.Conn) error {
numCallbacks++ numCallbacks++
return nil return nil
} }
@ -39,7 +39,7 @@ func TestNewConnectionPool(t *testing.T) {
// Pool creation returns an error if any AfterConnect callback does // Pool creation returns an error if any AfterConnect callback does
errAfterConnect := errors.New("Some error") errAfterConnect := errors.New("Some error")
afterConnect = func(c *pgx.Connection) error { afterConnect = func(c *pgx.Conn) error {
return errAfterConnect return errAfterConnect
} }
@ -57,8 +57,8 @@ func TestPoolAcquireAndReleaseCycle(t *testing.T) {
pool := createConnectionPool(t, maxConnections) pool := createConnectionPool(t, maxConnections)
defer pool.Close() defer pool.Close()
acquireAll := func() (connections []*pgx.Connection) { acquireAll := func() (connections []*pgx.Conn) {
connections = make([]*pgx.Connection, maxConnections) connections = make([]*pgx.Conn, maxConnections)
for i := 0; i < maxConnections; i++ { for i := 0; i < maxConnections; i++ {
var err error var err error
if connections[i], err = pool.Acquire(); err != nil { if connections[i], err = pool.Acquire(); err != nil {
@ -206,7 +206,7 @@ func TestPoolReleaseDiscardsDeadConnections(t *testing.T) {
pool := createConnectionPool(t, maxConnections) pool := createConnectionPool(t, maxConnections)
defer pool.Close() defer pool.Close()
var c1, c2 *pgx.Connection var c1, c2 *pgx.Conn
var err error var err error
var stat pgx.ConnectionPoolStat var stat pgx.ConnectionPoolStat
@ -265,7 +265,7 @@ func TestPoolTransaction(t *testing.T) {
pool := createConnectionPool(t, 1) pool := createConnectionPool(t, 1)
defer pool.Close() defer pool.Close()
committed, err := pool.Transaction(func(conn *pgx.Connection) bool { committed, err := pool.Transaction(func(conn *pgx.Conn) bool {
mustExecute(t, conn, "create temporary table foo(id serial primary key)") mustExecute(t, conn, "create temporary table foo(id serial primary key)")
return true return true
}) })
@ -276,7 +276,7 @@ func TestPoolTransaction(t *testing.T) {
t.Fatal("Transaction was not committed when it should have been") t.Fatal("Transaction was not committed when it should have been")
} }
committed, err = pool.Transaction(func(conn *pgx.Connection) bool { committed, err = pool.Transaction(func(conn *pgx.Conn) bool {
n := mustSelectValue(t, conn, "select count(*) from foo") n := mustSelectValue(t, conn, "select count(*) from foo")
if n.(int64) != 0 { if n.(int64) != 0 {
t.Fatalf("Did not receive expected value: %v", n) t.Fatalf("Did not receive expected value: %v", n)
@ -298,7 +298,7 @@ func TestPoolTransaction(t *testing.T) {
t.Fatal("Transaction was committed when it shouldn't have been") t.Fatal("Transaction was committed when it shouldn't have been")
} }
committed, err = pool.Transaction(func(conn *pgx.Connection) bool { committed, err = pool.Transaction(func(conn *pgx.Conn) bool {
n := mustSelectValue(t, conn, "select count(*) from foo") n := mustSelectValue(t, conn, "select count(*) from foo")
if n.(int64) != 0 { if n.(int64) != 0 {
t.Fatalf("Did not receive expected value: %v", n) t.Fatalf("Did not receive expected value: %v", n)
@ -318,7 +318,7 @@ func TestPoolTransactionIso(t *testing.T) {
pool := createConnectionPool(t, 1) pool := createConnectionPool(t, 1)
defer pool.Close() defer pool.Close()
committed, err := pool.TransactionIso("serializable", func(conn *pgx.Connection) bool { committed, err := pool.TransactionIso("serializable", func(conn *pgx.Conn) bool {
if level := mustSelectValue(t, conn, "select current_setting('transaction_isolation')"); level != "serializable" { if level := mustSelectValue(t, conn, "select current_setting('transaction_isolation')"); level != "serializable" {
t.Errorf("Expected to be in isolation level %v but was %v", "serializable", level) t.Errorf("Expected to be in isolation level %v but was %v", "serializable", level)
} }

View File

@ -11,7 +11,7 @@ import (
var pool *pgx.ConnectionPool var pool *pgx.ConnectionPool
// afterConnect creates the prepared statements that this application uses // afterConnect creates the prepared statements that this application uses
func afterConnect(conn *pgx.Connection) (err error) { func afterConnect(conn *pgx.Conn) (err error) {
err = conn.Prepare("getUrl", ` err = conn.Prepare("getUrl", `
select url from shortened_urls where id=$1 select url from shortened_urls where id=$1
`) `)

View File

@ -6,9 +6,9 @@ import (
"testing" "testing"
) )
var sharedConnection *pgx.Connection var sharedConnection *pgx.Conn
func getSharedConnection(t testing.TB) (c *pgx.Connection) { func getSharedConnection(t testing.TB) (c *pgx.Conn) {
if sharedConnection == nil || !sharedConnection.IsAlive() { if sharedConnection == nil || !sharedConnection.IsAlive() {
var err error var err error
sharedConnection, err = pgx.Connect(*defaultConnectionParameters) sharedConnection, err = pgx.Connect(*defaultConnectionParameters)
@ -20,13 +20,13 @@ func getSharedConnection(t testing.TB) (c *pgx.Connection) {
return sharedConnection return sharedConnection
} }
func mustPrepare(t testing.TB, conn *pgx.Connection, name, sql string) { func mustPrepare(t testing.TB, conn *pgx.Conn, name, sql string) {
if err := conn.Prepare(name, sql); err != nil { if err := conn.Prepare(name, sql); err != nil {
t.Fatalf("Could not prepare %v: %v", name, err) t.Fatalf("Could not prepare %v: %v", name, err)
} }
} }
func mustExecute(t testing.TB, conn *pgx.Connection, sql string, arguments ...interface{}) (commandTag string) { func mustExecute(t testing.TB, conn *pgx.Conn, sql string, arguments ...interface{}) (commandTag string) {
var err error var err error
if commandTag, err = conn.Execute(sql, arguments...); err != nil { if commandTag, err = conn.Execute(sql, arguments...); err != nil {
t.Fatalf("Execute unexpectedly failed with %v: %v", sql, err) t.Fatalf("Execute unexpectedly failed with %v: %v", sql, err)
@ -34,7 +34,7 @@ func mustExecute(t testing.TB, conn *pgx.Connection, sql string, arguments ...in
return return
} }
func mustSelectRow(t testing.TB, conn *pgx.Connection, sql string, arguments ...interface{}) (row map[string]interface{}) { func mustSelectRow(t testing.TB, conn *pgx.Conn, sql string, arguments ...interface{}) (row map[string]interface{}) {
var err error var err error
if row, err = conn.SelectRow(sql, arguments...); err != nil { if row, err = conn.SelectRow(sql, arguments...); err != nil {
t.Fatalf("SelectRow unexpectedly failed with %v: %v", sql, err) t.Fatalf("SelectRow unexpectedly failed with %v: %v", sql, err)
@ -42,7 +42,7 @@ func mustSelectRow(t testing.TB, conn *pgx.Connection, sql string, arguments ...
return return
} }
func mustSelectRows(t testing.TB, conn *pgx.Connection, sql string, arguments ...interface{}) (rows []map[string]interface{}) { func mustSelectRows(t testing.TB, conn *pgx.Conn, sql string, arguments ...interface{}) (rows []map[string]interface{}) {
var err error var err error
if rows, err = conn.SelectRows(sql, arguments...); err != nil { if rows, err = conn.SelectRows(sql, arguments...); err != nil {
t.Fatalf("SelectRows unexpected failed with %v: %v", sql, err) t.Fatalf("SelectRows unexpected failed with %v: %v", sql, err)
@ -50,7 +50,7 @@ func mustSelectRows(t testing.TB, conn *pgx.Connection, sql string, arguments ..
return return
} }
func mustSelectValue(t testing.TB, conn *pgx.Connection, sql string, arguments ...interface{}) (value interface{}) { func mustSelectValue(t testing.TB, conn *pgx.Conn, sql string, arguments ...interface{}) (value interface{}) {
var err error var err error
if value, err = conn.SelectValue(sql, arguments...); err != nil { if value, err = conn.SelectValue(sql, arguments...); err != nil {
t.Fatalf("SelectValue unexpectedly failed with %v: %v", sql, err) t.Fatalf("SelectValue unexpectedly failed with %v: %v", sql, err)
@ -58,7 +58,7 @@ func mustSelectValue(t testing.TB, conn *pgx.Connection, sql string, arguments .
return return
} }
func mustSelectValueTo(t testing.TB, conn *pgx.Connection, w io.Writer, sql string, arguments ...interface{}) { func mustSelectValueTo(t testing.TB, conn *pgx.Conn, w io.Writer, sql string, arguments ...interface{}) {
if err := conn.SelectValueTo(w, sql, arguments...); err != nil { if err := conn.SelectValueTo(w, sql, arguments...); err != nil {
t.Fatalf("SelectValueTo unexpectedly failed with %v: %v", sql, err) t.Fatalf("SelectValueTo unexpectedly failed with %v: %v", sql, err)
} }

View File

@ -13,14 +13,14 @@ var literalPattern *regexp.Regexp = regexp.MustCompile(`\$\d+`)
// QuoteString escapes and quotes a string making it safe for interpolation // QuoteString escapes and quotes a string making it safe for interpolation
// into an SQL string. // into an SQL string.
func (c *Connection) QuoteString(input string) (output string) { func (c *Conn) QuoteString(input string) (output string) {
output = "'" + strings.Replace(input, "'", "''", -1) + "'" output = "'" + strings.Replace(input, "'", "''", -1) + "'"
return return
} }
// QuoteIdentifier escapes and quotes an identifier making it safe for // QuoteIdentifier escapes and quotes an identifier making it safe for
// interpolation into an SQL string // interpolation into an SQL string
func (c *Connection) QuoteIdentifier(input string) (output string) { func (c *Conn) QuoteIdentifier(input string) (output string) {
output = `"` + strings.Replace(input, `"`, `""`, -1) + `"` output = `"` + strings.Replace(input, `"`, `""`, -1) + `"`
return return
} }
@ -28,7 +28,7 @@ func (c *Connection) QuoteIdentifier(input string) (output string) {
// SanitizeSql substitutely args positionaly into sql. Placeholder values are // SanitizeSql substitutely args positionaly into sql. Placeholder values are
// $ prefixed integers like $1, $2, $3, etc. args are sanitized and quoted as // $ prefixed integers like $1, $2, $3, etc. args are sanitized and quoted as
// appropriate. // appropriate.
func (c *Connection) SanitizeSql(sql string, args ...interface{}) (output string, err error) { func (c *Conn) SanitizeSql(sql string, args ...interface{}) (output string, err error) {
replacer := func(match string) (replacement string) { replacer := func(match string) (replacement string) {
n, _ := strconv.ParseInt(match[1:], 10, 0) n, _ := strconv.ParseInt(match[1:], 10, 0)
switch arg := args[n-1].(type) { switch arg := args[n-1].(type) {