Add notification response hook

refs #239
v3-ann
Jack Christensen 2017-06-04 21:18:26 -05:00
parent 6052227803
commit fb90fb2729
4 changed files with 74 additions and 0 deletions

41
conn.go
View File

@ -47,6 +47,13 @@ func init() {
}) })
} }
// NoticeHandler is a function that can handle notices received from the
// PostgreSQL server. Notices can be received at any time, usually during
// handling of a query response. The *Conn is provided so the handler is aware
// of the origin of the notice, but it must not invoke any query method. Be
// aware that this is distinct from LISTEN/NOTIFY notification.
type NoticeHandler func(*Conn, *Notice)
// DialFunc is a function that can be used to connect to a PostgreSQL server // DialFunc is a function that can be used to connect to a PostgreSQL server
type DialFunc func(network, addr string) (net.Conn, error) type DialFunc func(network, addr string) (net.Conn, error)
@ -64,6 +71,7 @@ type ConnConfig struct {
LogLevel int LogLevel int
Dial DialFunc Dial DialFunc
RuntimeParams map[string]string // Run-time parameters to set on connection as session default values (e.g. search_path or application_name) RuntimeParams map[string]string // Run-time parameters to set on connection as session default values (e.g. search_path or application_name)
OnNotice NoticeHandler // Callback function called when a notice response is received.
} }
func (cc *ConnConfig) networkAddress() (network, address string) { func (cc *ConnConfig) networkAddress() (network, address string) {
@ -102,6 +110,7 @@ type Conn struct {
fp *fastpath fp *fastpath
poolResetCount int poolResetCount int
preallocatedRows []Rows preallocatedRows []Rows
onNotice NoticeHandler
mux sync.Mutex mux sync.Mutex
status byte // One of connStatus* constants status byte // One of connStatus* constants
@ -235,6 +244,8 @@ func connect(config ConnConfig, connInfo *pgtype.ConnInfo) (c *Conn, err error)
} }
} }
c.onNotice = config.OnNotice
network, address := c.config.networkAddress() network, address := c.config.networkAddress()
if c.config.Dial == nil { if c.config.Dial == nil {
c.config.Dial = (&net.Dialer{KeepAlive: 5 * time.Minute}).Dial c.config.Dial = (&net.Dialer{KeepAlive: 5 * time.Minute}).Dial
@ -1079,6 +1090,8 @@ func (c *Conn) processContextFreeMsg(msg pgproto3.BackendMessage) (err error) {
switch msg := msg.(type) { switch msg := msg.(type) {
case *pgproto3.ErrorResponse: case *pgproto3.ErrorResponse:
return c.rxErrorResponse(msg) return c.rxErrorResponse(msg)
case *pgproto3.NoticeResponse:
c.rxNoticeResponse(msg)
case *pgproto3.NotificationResponse: case *pgproto3.NotificationResponse:
c.rxNotificationResponse(msg) c.rxNotificationResponse(msg)
case *pgproto3.ReadyForQuery: case *pgproto3.ReadyForQuery:
@ -1163,6 +1176,34 @@ func (c *Conn) rxErrorResponse(msg *pgproto3.ErrorResponse) PgError {
return err return err
} }
func (c *Conn) rxNoticeResponse(msg *pgproto3.NoticeResponse) {
if c.onNotice == nil {
return
}
notice := &Notice{
Severity: msg.Severity,
Code: msg.Code,
Message: msg.Message,
Detail: msg.Detail,
Hint: msg.Hint,
Position: msg.Position,
InternalPosition: msg.InternalPosition,
InternalQuery: msg.InternalQuery,
Where: msg.Where,
SchemaName: msg.SchemaName,
TableName: msg.TableName,
ColumnName: msg.ColumnName,
DataTypeName: msg.DataTypeName,
ConstraintName: msg.ConstraintName,
File: msg.File,
Line: msg.Line,
Routine: msg.Routine,
}
c.onNotice(c, notice)
}
func (c *Conn) rxBackendKeyData(msg *pgproto3.BackendKeyData) { func (c *Conn) rxBackendKeyData(msg *pgproto3.BackendKeyData) {
c.pid = msg.ProcessID c.pid = msg.ProcessID
c.secretKey = msg.SecretKey c.secretKey = msg.SecretKey

View File

@ -1898,3 +1898,30 @@ func TestIdentifierSanitize(t *testing.T) {
} }
} }
} }
func TestConnOnNotice(t *testing.T) {
t.Parallel()
var msg string
connConfig := *defaultConnConfig
connConfig.OnNotice = func(c *pgx.Conn, notice *pgx.Notice) {
msg = notice.Message
}
conn := mustConnect(t, connConfig)
defer closeConn(t, conn)
_, err := conn.Exec(`do $$
begin
raise notice 'hello, world';
end$$;`)
if err != nil {
t.Fatal(err)
}
if msg != "hello, world" {
t.Errorf("msg => %v, want %v", msg, "hello, world")
}
ensureConnValid(t, conn)
}

View File

@ -49,6 +49,10 @@ func (pe PgError) Error() string {
return pe.Severity + ": " + pe.Message + " (SQLSTATE " + pe.Code + ")" return pe.Severity + ": " + pe.Message + " (SQLSTATE " + pe.Code + ")"
} }
// Notice represents a notice response message reported by the PostgreSQL
// server. Be aware that this is distinct from LISTEN/NOTIFY notification.
type Notice PgError
// appendParse appends a PostgreSQL wire protocol parse message to buf and returns it. // appendParse appends a PostgreSQL wire protocol parse message to buf and returns it.
func appendParse(buf []byte, name string, query string, parameterOIDs []pgtype.OID) []byte { func appendParse(buf []byte, name string, query string, parameterOIDs []pgtype.OID) []byte {
buf = append(buf, 'P') buf = append(buf, 'P')

2
v3.md
View File

@ -54,6 +54,8 @@ Added batch operations
Use Go casing convention for OID, UUID, JSON(B), ACLItem, CID, TID, XID, and CIDR Use Go casing convention for OID, UUID, JSON(B), ACLItem, CID, TID, XID, and CIDR
Add OnNotice
## TODO / Possible / Investigate ## TODO / Possible / Investigate
Organize errors better Organize errors better