mirror of https://github.com/jackc/pgx.git
Move notice handling to pgconn
parent
89c3d8af5d
commit
19a8df16b6
41
conn.go
41
conn.go
|
@ -40,19 +40,11 @@ func init() {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// NoticeHandler is a function that can handle notices received from the
|
|
||||||
// PostgreSQL server. Notices can be received at any time, usually during
|
|
||||||
// handling of a query response. The *Conn is provided so the handler is aware
|
|
||||||
// of the origin of the notice, but it must not invoke any query method. Be
|
|
||||||
// aware that this is distinct from LISTEN/NOTIFY notification.
|
|
||||||
type NoticeHandler func(*Conn, *Notice)
|
|
||||||
|
|
||||||
// ConnConfig contains all the options used to establish a connection.
|
// ConnConfig contains all the options used to establish a connection.
|
||||||
type ConnConfig struct {
|
type ConnConfig struct {
|
||||||
pgconn.Config
|
pgconn.Config
|
||||||
Logger Logger
|
Logger Logger
|
||||||
LogLevel int
|
LogLevel int
|
||||||
OnNotice NoticeHandler // Callback function called when a notice response is received.
|
|
||||||
CustomConnInfo func(*Conn) (*pgtype.ConnInfo, error) // Callback function to implement connection strategies for different backends. crate, pgbouncer, pgpool, etc.
|
CustomConnInfo func(*Conn) (*pgtype.ConnInfo, error) // Callback function to implement connection strategies for different backends. crate, pgbouncer, pgpool, etc.
|
||||||
CustomCancel func(*Conn) error // Callback function used to override cancellation behavior
|
CustomCancel func(*Conn) error // Callback function used to override cancellation behavior
|
||||||
|
|
||||||
|
@ -83,7 +75,6 @@ type Conn struct {
|
||||||
fp *fastpath
|
fp *fastpath
|
||||||
poolResetCount int
|
poolResetCount int
|
||||||
preallocatedRows []Rows
|
preallocatedRows []Rows
|
||||||
onNotice NoticeHandler
|
|
||||||
|
|
||||||
mux sync.Mutex
|
mux sync.Mutex
|
||||||
status byte // One of connStatus* constants
|
status byte // One of connStatus* constants
|
||||||
|
@ -195,8 +186,6 @@ func connect(ctx context.Context, config *ConnConfig, connInfo *pgtype.ConnInfo)
|
||||||
}
|
}
|
||||||
c.logger = c.config.Logger
|
c.logger = c.config.Logger
|
||||||
|
|
||||||
c.onNotice = config.OnNotice
|
|
||||||
|
|
||||||
if c.shouldLog(LogLevelInfo) {
|
if c.shouldLog(LogLevelInfo) {
|
||||||
c.log(LogLevelInfo, "Dialing PostgreSQL server", map[string]interface{}{"host": config.Config.Host})
|
c.log(LogLevelInfo, "Dialing PostgreSQL server", map[string]interface{}{"host": config.Config.Host})
|
||||||
}
|
}
|
||||||
|
@ -849,8 +838,6 @@ func (c *Conn) processContextFreeMsg(msg pgproto3.BackendMessage) (err error) {
|
||||||
switch msg := msg.(type) {
|
switch msg := msg.(type) {
|
||||||
case *pgproto3.ErrorResponse:
|
case *pgproto3.ErrorResponse:
|
||||||
return c.rxErrorResponse(msg)
|
return c.rxErrorResponse(msg)
|
||||||
case *pgproto3.NoticeResponse:
|
|
||||||
c.rxNoticeResponse(msg)
|
|
||||||
case *pgproto3.NotificationResponse:
|
case *pgproto3.NotificationResponse:
|
||||||
c.rxNotificationResponse(msg)
|
c.rxNotificationResponse(msg)
|
||||||
case *pgproto3.ReadyForQuery:
|
case *pgproto3.ReadyForQuery:
|
||||||
|
@ -904,34 +891,6 @@ func (c *Conn) rxErrorResponse(msg *pgproto3.ErrorResponse) *pgconn.PgError {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) rxNoticeResponse(msg *pgproto3.NoticeResponse) {
|
|
||||||
if c.onNotice == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
notice := &Notice{
|
|
||||||
Severity: msg.Severity,
|
|
||||||
Code: msg.Code,
|
|
||||||
Message: msg.Message,
|
|
||||||
Detail: msg.Detail,
|
|
||||||
Hint: msg.Hint,
|
|
||||||
Position: msg.Position,
|
|
||||||
InternalPosition: msg.InternalPosition,
|
|
||||||
InternalQuery: msg.InternalQuery,
|
|
||||||
Where: msg.Where,
|
|
||||||
SchemaName: msg.SchemaName,
|
|
||||||
TableName: msg.TableName,
|
|
||||||
ColumnName: msg.ColumnName,
|
|
||||||
DataTypeName: msg.DataTypeName,
|
|
||||||
ConstraintName: msg.ConstraintName,
|
|
||||||
File: msg.File,
|
|
||||||
Line: msg.Line,
|
|
||||||
Routine: msg.Routine,
|
|
||||||
}
|
|
||||||
|
|
||||||
c.onNotice(c, notice)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Conn) rxReadyForQuery(msg *pgproto3.ReadyForQuery) {
|
func (c *Conn) rxReadyForQuery(msg *pgproto3.ReadyForQuery) {
|
||||||
c.pendingReadyForQueryCount--
|
c.pendingReadyForQueryCount--
|
||||||
}
|
}
|
||||||
|
|
27
conn_test.go
27
conn_test.go
|
@ -1038,33 +1038,6 @@ func TestIdentifierSanitize(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConnOnNotice(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
var msg string
|
|
||||||
|
|
||||||
connConfig := mustParseConfig(t, os.Getenv("PGX_TEST_DATABASE"))
|
|
||||||
connConfig.OnNotice = func(c *pgx.Conn, notice *pgx.Notice) {
|
|
||||||
msg = notice.Message
|
|
||||||
}
|
|
||||||
conn := mustConnect(t, connConfig)
|
|
||||||
defer closeConn(t, conn)
|
|
||||||
|
|
||||||
_, err := conn.Exec(context.Background(), `do $$
|
|
||||||
begin
|
|
||||||
raise notice 'hello, world';
|
|
||||||
end$$;`)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if msg != "hello, world" {
|
|
||||||
t.Errorf("msg => %v, want %v", msg, "hello, world")
|
|
||||||
}
|
|
||||||
|
|
||||||
ensureConnValid(t, conn)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestConnInitConnInfo(t *testing.T) {
|
func TestConnInitConnInfo(t *testing.T) {
|
||||||
conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
|
conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE"))
|
||||||
defer closeConn(t, conn)
|
defer closeConn(t, conn)
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"reflect"
|
"reflect"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/jackc/pgx/pgconn"
|
|
||||||
"github.com/jackc/pgx/pgio"
|
"github.com/jackc/pgx/pgio"
|
||||||
"github.com/jackc/pgx/pgtype"
|
"github.com/jackc/pgx/pgtype"
|
||||||
)
|
)
|
||||||
|
@ -79,10 +78,6 @@ func (fd FieldDescription) Type() reflect.Type {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notice represents a notice response message reported by the PostgreSQL
|
|
||||||
// server. Be aware that this is distinct from LISTEN/NOTIFY notification.
|
|
||||||
type Notice pgconn.PgError
|
|
||||||
|
|
||||||
// appendParse appends a PostgreSQL wire protocol parse message to buf and returns it.
|
// appendParse appends a PostgreSQL wire protocol parse message to buf and returns it.
|
||||||
func appendParse(buf []byte, name string, query string, parameterOIDs []pgtype.OID) []byte {
|
func appendParse(buf []byte, name string, query string, parameterOIDs []pgtype.OID) []byte {
|
||||||
buf = append(buf, 'P')
|
buf = append(buf, 'P')
|
||||||
|
|
|
@ -40,6 +40,8 @@ type Config struct {
|
||||||
// server is acceptable. If this returns an error the connection is closed and the next fallback config is tried. This
|
// server is acceptable. If this returns an error the connection is closed and the next fallback config is tried. This
|
||||||
// allows implementing high availability behavior such as libpq does with target_session_attrs.
|
// allows implementing high availability behavior such as libpq does with target_session_attrs.
|
||||||
AfterConnectFunc AfterConnectFunc
|
AfterConnectFunc AfterConnectFunc
|
||||||
|
|
||||||
|
OnNotice NoticeHandler // Callback function called when a notice response is received.
|
||||||
}
|
}
|
||||||
|
|
||||||
// FallbackConfig is additional settings to attempt a connection with when the primary Config fails to establish a
|
// FallbackConfig is additional settings to attempt a connection with when the primary Config fails to establish a
|
||||||
|
|
|
@ -48,9 +48,19 @@ func (pe *PgError) Error() string {
|
||||||
return pe.Severity + ": " + pe.Message + " (SQLSTATE " + pe.Code + ")"
|
return pe.Severity + ": " + pe.Message + " (SQLSTATE " + pe.Code + ")"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Notice represents a notice response message reported by the PostgreSQL server. Be aware that this is distinct from
|
||||||
|
// LISTEN/NOTIFY notification.
|
||||||
|
type Notice PgError
|
||||||
|
|
||||||
// DialFunc is a function that can be used to connect to a PostgreSQL server
|
// DialFunc is a function that can be used to connect to a PostgreSQL server
|
||||||
type DialFunc func(ctx context.Context, network, addr string) (net.Conn, error)
|
type DialFunc func(ctx context.Context, network, addr string) (net.Conn, error)
|
||||||
|
|
||||||
|
// NoticeHandler is a function that can handle notices received from the PostgreSQL server. Notices can be received at
|
||||||
|
// any time, usually during handling of a query response. The *PgConn is provided so the handler is aware of the origin
|
||||||
|
// of the notice, but it must not invoke any query method. Be aware that this is distinct from LISTEN/NOTIFY
|
||||||
|
// notification.
|
||||||
|
type NoticeHandler func(*PgConn, *Notice)
|
||||||
|
|
||||||
// ErrTLSRefused occurs when the connection attempt requires TLS and the
|
// ErrTLSRefused occurs when the connection attempt requires TLS and the
|
||||||
// PostgreSQL server refuses to use TLS
|
// PostgreSQL server refuses to use TLS
|
||||||
var ErrTLSRefused = errors.New("server refused TLS connection")
|
var ErrTLSRefused = errors.New("server refused TLS connection")
|
||||||
|
@ -277,6 +287,10 @@ func (pgConn *PgConn) ReceiveMessage() (pgproto3.BackendMessage, error) {
|
||||||
// TODO - close pgConn
|
// TODO - close pgConn
|
||||||
return nil, errorResponseToPgError(msg)
|
return nil, errorResponseToPgError(msg)
|
||||||
}
|
}
|
||||||
|
case *pgproto3.NoticeResponse:
|
||||||
|
if pgConn.Config.OnNotice != nil {
|
||||||
|
pgConn.Config.OnNotice(pgConn, noticeResponseToNotice(msg))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return msg, nil
|
return msg, nil
|
||||||
|
@ -858,6 +872,11 @@ func errorResponseToPgError(msg *pgproto3.ErrorResponse) *PgError {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func noticeResponseToNotice(msg *pgproto3.NoticeResponse) *Notice {
|
||||||
|
pgerr := errorResponseToPgError((*pgproto3.ErrorResponse)(msg))
|
||||||
|
return (*Notice)(pgerr)
|
||||||
|
}
|
||||||
|
|
||||||
// CancelRequest sends a cancel request to the PostgreSQL server. It returns an error if unable to deliver the cancel
|
// CancelRequest sends a cancel request to the PostgreSQL server. It returns an error if unable to deliver the cancel
|
||||||
// request, but lack of an error does not ensure that the query was canceled. As specified in the documentation, there
|
// request, but lack of an error does not ensure that the query was canceled. As specified in the documentation, there
|
||||||
// is no way to be sure a query was canceled. See https://www.postgresql.org/docs/11/protocol-flow.html#id-1.10.5.7.9
|
// is no way to be sure a query was canceled. See https://www.postgresql.org/docs/11/protocol-flow.html#id-1.10.5.7.9
|
||||||
|
|
|
@ -551,3 +551,26 @@ func TestCommandTag(t *testing.T) {
|
||||||
assert.Equalf(t, tt.rowsAffected, actual, "%d. %v", i, tt.commandTag)
|
assert.Equalf(t, tt.rowsAffected, actual, "%d. %v", i, tt.commandTag)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConnOnNotice(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
config, err := pgconn.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
|
||||||
|
require.Nil(t, err)
|
||||||
|
|
||||||
|
var msg string
|
||||||
|
config.OnNotice = func(c *pgconn.PgConn, notice *pgconn.Notice) {
|
||||||
|
msg = notice.Message
|
||||||
|
}
|
||||||
|
|
||||||
|
pgConn, err := pgconn.ConnectConfig(context.Background(), config)
|
||||||
|
require.Nil(t, err)
|
||||||
|
defer closeConn(t, pgConn)
|
||||||
|
|
||||||
|
_, err = pgConn.Exec(context.Background(), `do $$
|
||||||
|
begin
|
||||||
|
raise notice 'hello, world';
|
||||||
|
end$$;`)
|
||||||
|
require.Nil(t, err)
|
||||||
|
assert.Equal(t, "hello, world", msg)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue