diff --git a/conn.go b/conn.go index 95144f21..e02f905f 100644 --- a/conn.go +++ b/conn.go @@ -40,19 +40,11 @@ func init() { }) } -// NoticeHandler is a function that can handle notices received from the -// PostgreSQL server. Notices can be received at any time, usually during -// handling of a query response. The *Conn is provided so the handler is aware -// of the origin of the notice, but it must not invoke any query method. Be -// aware that this is distinct from LISTEN/NOTIFY notification. -type NoticeHandler func(*Conn, *Notice) - // ConnConfig contains all the options used to establish a connection. type ConnConfig struct { pgconn.Config Logger Logger LogLevel int - OnNotice NoticeHandler // Callback function called when a notice response is received. CustomConnInfo func(*Conn) (*pgtype.ConnInfo, error) // Callback function to implement connection strategies for different backends. crate, pgbouncer, pgpool, etc. CustomCancel func(*Conn) error // Callback function used to override cancellation behavior @@ -83,7 +75,6 @@ type Conn struct { fp *fastpath poolResetCount int preallocatedRows []Rows - onNotice NoticeHandler mux sync.Mutex status byte // One of connStatus* constants @@ -195,8 +186,6 @@ func connect(ctx context.Context, config *ConnConfig, connInfo *pgtype.ConnInfo) } c.logger = c.config.Logger - c.onNotice = config.OnNotice - if c.shouldLog(LogLevelInfo) { c.log(LogLevelInfo, "Dialing PostgreSQL server", map[string]interface{}{"host": config.Config.Host}) } @@ -849,8 +838,6 @@ func (c *Conn) processContextFreeMsg(msg pgproto3.BackendMessage) (err error) { switch msg := msg.(type) { case *pgproto3.ErrorResponse: return c.rxErrorResponse(msg) - case *pgproto3.NoticeResponse: - c.rxNoticeResponse(msg) case *pgproto3.NotificationResponse: c.rxNotificationResponse(msg) case *pgproto3.ReadyForQuery: @@ -904,34 +891,6 @@ func (c *Conn) rxErrorResponse(msg *pgproto3.ErrorResponse) *pgconn.PgError { return err } -func (c *Conn) rxNoticeResponse(msg *pgproto3.NoticeResponse) { - if c.onNotice == nil { - return - } - - notice := &Notice{ - Severity: msg.Severity, - Code: msg.Code, - Message: msg.Message, - Detail: msg.Detail, - Hint: msg.Hint, - Position: msg.Position, - InternalPosition: msg.InternalPosition, - InternalQuery: msg.InternalQuery, - Where: msg.Where, - SchemaName: msg.SchemaName, - TableName: msg.TableName, - ColumnName: msg.ColumnName, - DataTypeName: msg.DataTypeName, - ConstraintName: msg.ConstraintName, - File: msg.File, - Line: msg.Line, - Routine: msg.Routine, - } - - c.onNotice(c, notice) -} - func (c *Conn) rxReadyForQuery(msg *pgproto3.ReadyForQuery) { c.pendingReadyForQueryCount-- } diff --git a/conn_test.go b/conn_test.go index d77e8847..c9816b58 100644 --- a/conn_test.go +++ b/conn_test.go @@ -1038,33 +1038,6 @@ func TestIdentifierSanitize(t *testing.T) { } } -func TestConnOnNotice(t *testing.T) { - t.Parallel() - - var msg string - - connConfig := mustParseConfig(t, os.Getenv("PGX_TEST_DATABASE")) - connConfig.OnNotice = func(c *pgx.Conn, notice *pgx.Notice) { - msg = notice.Message - } - conn := mustConnect(t, connConfig) - defer closeConn(t, conn) - - _, err := conn.Exec(context.Background(), `do $$ -begin - raise notice 'hello, world'; -end$$;`) - if err != nil { - t.Fatal(err) - } - - if msg != "hello, world" { - t.Errorf("msg => %v, want %v", msg, "hello, world") - } - - ensureConnValid(t, conn) -} - func TestConnInitConnInfo(t *testing.T) { conn := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE")) defer closeConn(t, conn) diff --git a/messages.go b/messages.go index 633ddad1..cd504bfc 100644 --- a/messages.go +++ b/messages.go @@ -6,7 +6,6 @@ import ( "reflect" "time" - "github.com/jackc/pgx/pgconn" "github.com/jackc/pgx/pgio" "github.com/jackc/pgx/pgtype" ) @@ -79,10 +78,6 @@ func (fd FieldDescription) Type() reflect.Type { } } -// Notice represents a notice response message reported by the PostgreSQL -// server. Be aware that this is distinct from LISTEN/NOTIFY notification. -type Notice pgconn.PgError - // appendParse appends a PostgreSQL wire protocol parse message to buf and returns it. func appendParse(buf []byte, name string, query string, parameterOIDs []pgtype.OID) []byte { buf = append(buf, 'P') diff --git a/pgconn/config.go b/pgconn/config.go index d8872f66..bd1fec9b 100644 --- a/pgconn/config.go +++ b/pgconn/config.go @@ -40,6 +40,8 @@ type Config struct { // server is acceptable. If this returns an error the connection is closed and the next fallback config is tried. This // allows implementing high availability behavior such as libpq does with target_session_attrs. AfterConnectFunc AfterConnectFunc + + OnNotice NoticeHandler // Callback function called when a notice response is received. } // FallbackConfig is additional settings to attempt a connection with when the primary Config fails to establish a diff --git a/pgconn/pgconn.go b/pgconn/pgconn.go index b3abe8e0..6b6330dc 100644 --- a/pgconn/pgconn.go +++ b/pgconn/pgconn.go @@ -48,9 +48,19 @@ func (pe *PgError) Error() string { return pe.Severity + ": " + pe.Message + " (SQLSTATE " + pe.Code + ")" } +// Notice represents a notice response message reported by the PostgreSQL server. Be aware that this is distinct from +// LISTEN/NOTIFY notification. +type Notice PgError + // DialFunc is a function that can be used to connect to a PostgreSQL server type DialFunc func(ctx context.Context, network, addr string) (net.Conn, error) +// NoticeHandler is a function that can handle notices received from the PostgreSQL server. Notices can be received at +// any time, usually during handling of a query response. The *PgConn is provided so the handler is aware of the origin +// of the notice, but it must not invoke any query method. Be aware that this is distinct from LISTEN/NOTIFY +// notification. +type NoticeHandler func(*PgConn, *Notice) + // ErrTLSRefused occurs when the connection attempt requires TLS and the // PostgreSQL server refuses to use TLS var ErrTLSRefused = errors.New("server refused TLS connection") @@ -277,6 +287,10 @@ func (pgConn *PgConn) ReceiveMessage() (pgproto3.BackendMessage, error) { // TODO - close pgConn return nil, errorResponseToPgError(msg) } + case *pgproto3.NoticeResponse: + if pgConn.Config.OnNotice != nil { + pgConn.Config.OnNotice(pgConn, noticeResponseToNotice(msg)) + } } return msg, nil @@ -858,6 +872,11 @@ func errorResponseToPgError(msg *pgproto3.ErrorResponse) *PgError { } } +func noticeResponseToNotice(msg *pgproto3.NoticeResponse) *Notice { + pgerr := errorResponseToPgError((*pgproto3.ErrorResponse)(msg)) + return (*Notice)(pgerr) +} + // CancelRequest sends a cancel request to the PostgreSQL server. It returns an error if unable to deliver the cancel // request, but lack of an error does not ensure that the query was canceled. As specified in the documentation, there // is no way to be sure a query was canceled. See https://www.postgresql.org/docs/11/protocol-flow.html#id-1.10.5.7.9 diff --git a/pgconn/pgconn_test.go b/pgconn/pgconn_test.go index 8d6b606a..98ec9664 100644 --- a/pgconn/pgconn_test.go +++ b/pgconn/pgconn_test.go @@ -551,3 +551,26 @@ func TestCommandTag(t *testing.T) { assert.Equalf(t, tt.rowsAffected, actual, "%d. %v", i, tt.commandTag) } } + +func TestConnOnNotice(t *testing.T) { + t.Parallel() + + config, err := pgconn.ParseConfig(os.Getenv("PGX_TEST_DATABASE")) + require.Nil(t, err) + + var msg string + config.OnNotice = func(c *pgconn.PgConn, notice *pgconn.Notice) { + msg = notice.Message + } + + pgConn, err := pgconn.ConnectConfig(context.Background(), config) + require.Nil(t, err) + defer closeConn(t, pgConn) + + _, err = pgConn.Exec(context.Background(), `do $$ +begin + raise notice 'hello, world'; +end$$;`) + require.Nil(t, err) + assert.Equal(t, "hello, world", msg) +}