mirror of https://github.com/jackc/pgx.git
ReplicationConn refactor
parent
5584040249
commit
c88c110169
|
@ -13,6 +13,15 @@ func mustConnect(t testing.TB, config pgx.ConnConfig) *pgx.Conn {
|
||||||
return conn
|
return conn
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func mustReplicationConnect(t testing.TB, config pgx.ConnConfig) *pgx.ReplicationConn {
|
||||||
|
conn, err := pgx.ReplicationConnect(config)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unable to establish connection: %v", err)
|
||||||
|
}
|
||||||
|
return conn
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
func closeConn(t testing.TB, conn *pgx.Conn) {
|
func closeConn(t testing.TB, conn *pgx.Conn) {
|
||||||
err := conn.Close()
|
err := conn.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -20,6 +29,13 @@ func closeConn(t testing.TB, conn *pgx.Conn) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func closeReplicationConn(t testing.TB, conn *pgx.ReplicationConn) {
|
||||||
|
err := conn.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("conn.Close unexpectedly failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func mustExec(t testing.TB, conn *pgx.Conn, sql string, arguments ...interface{}) (commandTag pgx.CommandTag) {
|
func mustExec(t testing.TB, conn *pgx.Conn, sql string, arguments ...interface{}) (commandTag pgx.CommandTag) {
|
||||||
var err error
|
var err error
|
||||||
if commandTag, err = conn.Exec(sql, arguments...); err != nil {
|
if commandTag, err = conn.Exec(sql, arguments...); err != nil {
|
||||||
|
|
|
@ -21,7 +21,7 @@ func (r *msgReader) Err() error {
|
||||||
return r.err
|
return r.err
|
||||||
}
|
}
|
||||||
|
|
||||||
// fatal tells r that a Fatal error has occurred
|
// fatal tells rc that a Fatal error has occurred
|
||||||
func (r *msgReader) fatal(err error) {
|
func (r *msgReader) fatal(err error) {
|
||||||
if r.shouldLog(LogLevelTrace) {
|
if r.shouldLog(LogLevelTrace) {
|
||||||
r.log(LogLevelTrace, "msgReader.fatal", "error", err, "msgBytesRemaining", r.msgBytesRemaining)
|
r.log(LogLevelTrace, "msgReader.fatal", "error", err, "msgBytesRemaining", r.msgBytesRemaining)
|
||||||
|
|
|
@ -151,11 +151,28 @@ func NewStandbyStatus(walPositions ...uint64) (status *StandbyStatus, err error)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ReplicationConnect(config ConnConfig) (r *ReplicationConn, err error) {
|
||||||
|
if config.RuntimeParams == nil {
|
||||||
|
config.RuntimeParams = make(map[string]string)
|
||||||
|
}
|
||||||
|
config.RuntimeParams["replication"] = "database"
|
||||||
|
|
||||||
|
c,err := Connect(config)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return &ReplicationConn{c: c}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type ReplicationConn struct {
|
||||||
|
c *Conn
|
||||||
|
}
|
||||||
|
|
||||||
// Send standby status to the server, which both acts as a keepalive
|
// Send standby status to the server, which both acts as a keepalive
|
||||||
// message to the server, as well as carries the WAL position of the
|
// message to the server, as well as carries the WAL position of the
|
||||||
// client, which then updates the server's replication slot position.
|
// client, which then updates the server's replication slot position.
|
||||||
func (c *Conn) SendStandbyStatus(k *StandbyStatus) (err error) {
|
func (rc *ReplicationConn) SendStandbyStatus(k *StandbyStatus) (err error) {
|
||||||
writeBuf := newWriteBuf(c, copyData)
|
writeBuf := newWriteBuf(rc.c, copyData)
|
||||||
writeBuf.WriteByte(standbyStatusUpdate)
|
writeBuf.WriteByte(standbyStatusUpdate)
|
||||||
writeBuf.WriteInt64(int64(k.WalWritePosition))
|
writeBuf.WriteInt64(int64(k.WalWritePosition))
|
||||||
writeBuf.WriteInt64(int64(k.WalFlushPosition))
|
writeBuf.WriteInt64(int64(k.WalFlushPosition))
|
||||||
|
@ -165,9 +182,9 @@ func (c *Conn) SendStandbyStatus(k *StandbyStatus) (err error) {
|
||||||
|
|
||||||
writeBuf.closeMsg()
|
writeBuf.closeMsg()
|
||||||
|
|
||||||
_, err = c.conn.Write(writeBuf.buf)
|
_, err = rc.c.conn.Write(writeBuf.buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.die(err)
|
rc.c.die(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
|
@ -175,37 +192,41 @@ func (c *Conn) SendStandbyStatus(k *StandbyStatus) (err error) {
|
||||||
|
|
||||||
// Send the message to formally stop the replication stream. This
|
// Send the message to formally stop the replication stream. This
|
||||||
// is done before calling Close() during a clean shutdown.
|
// is done before calling Close() during a clean shutdown.
|
||||||
func (c *Conn) StopReplication() (err error) {
|
func (rc *ReplicationConn) StopReplication() (err error) {
|
||||||
writeBuf := newWriteBuf(c, copyDone)
|
writeBuf := newWriteBuf(rc.c, copyDone)
|
||||||
|
|
||||||
writeBuf.closeMsg()
|
writeBuf.closeMsg()
|
||||||
|
|
||||||
_, err = c.conn.Write(writeBuf.buf)
|
_, err = rc.c.conn.Write(writeBuf.buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.die(err)
|
rc.c.die(err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (rc *ReplicationConn) Close() error {
|
||||||
|
return rc.c.Close()
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) {
|
|
||||||
|
func (rc *ReplicationConn) readReplicationMessage() (r *ReplicationMessage, err error) {
|
||||||
var t byte
|
var t byte
|
||||||
var reader *msgReader
|
var reader *msgReader
|
||||||
t, reader, err = c.rxMsg()
|
t, reader, err = rc.c.rxMsg()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
switch t {
|
switch t {
|
||||||
case noticeResponse:
|
case noticeResponse:
|
||||||
pgError := c.rxErrorResponse(reader)
|
pgError := rc.c.rxErrorResponse(reader)
|
||||||
if c.shouldLog(LogLevelInfo) {
|
if rc.c.shouldLog(LogLevelInfo) {
|
||||||
c.log(LogLevelInfo, pgError.Error())
|
rc.c.log(LogLevelInfo, pgError.Error())
|
||||||
}
|
}
|
||||||
case errorResponse:
|
case errorResponse:
|
||||||
err = c.rxErrorResponse(reader)
|
err = rc.c.rxErrorResponse(reader)
|
||||||
if c.shouldLog(LogLevelError) {
|
if rc.c.shouldLog(LogLevelError) {
|
||||||
c.log(LogLevelError, err.Error())
|
rc.c.log(LogLevelError, err.Error())
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
case copyBothResponse:
|
case copyBothResponse:
|
||||||
|
@ -235,13 +256,13 @@ func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) {
|
||||||
h := &ServerHeartbeat{ServerWalEnd: uint64(serverWalEnd), ServerTime: uint64(serverTime), ReplyRequested: replyNow}
|
h := &ServerHeartbeat{ServerWalEnd: uint64(serverWalEnd), ServerTime: uint64(serverTime), ReplyRequested: replyNow}
|
||||||
return &ReplicationMessage{ServerHeartbeat: h}, nil
|
return &ReplicationMessage{ServerHeartbeat: h}, nil
|
||||||
default:
|
default:
|
||||||
if c.shouldLog(LogLevelError) {
|
if rc.c.shouldLog(LogLevelError) {
|
||||||
c.log(LogLevelError,"Unexpected data playload message type %v", t)
|
rc.c.log(LogLevelError,"Unexpected data playload message type %v", t)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
if c.shouldLog(LogLevelError) {
|
if rc.c.shouldLog(LogLevelError) {
|
||||||
c.log(LogLevelError,"Unexpected replication message type %v", t)
|
rc.c.log(LogLevelError,"Unexpected replication message type %v", t)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
@ -256,7 +277,7 @@ func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) {
|
||||||
//
|
//
|
||||||
// This returns pgx.ErrNotificationTimeout when there is no replication message by the specified
|
// This returns pgx.ErrNotificationTimeout when there is no replication message by the specified
|
||||||
// duration.
|
// duration.
|
||||||
func (c *Conn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationMessage, err error) {
|
func (rc *ReplicationConn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationMessage, err error) {
|
||||||
var zeroTime time.Time
|
var zeroTime time.Time
|
||||||
|
|
||||||
deadline := time.Now().Add(timeout)
|
deadline := time.Now().Add(timeout)
|
||||||
|
@ -269,27 +290,27 @@ func (c *Conn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationM
|
||||||
// deadline and peek into the reader. If a timeout error occurs there
|
// deadline and peek into the reader. If a timeout error occurs there
|
||||||
// we don't break the pgx connection. If the Peek returns that data
|
// we don't break the pgx connection. If the Peek returns that data
|
||||||
// is available then we turn off the read deadline before the rxMsg.
|
// is available then we turn off the read deadline before the rxMsg.
|
||||||
err = c.conn.SetReadDeadline(deadline)
|
err = rc.c.conn.SetReadDeadline(deadline)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait until there is a byte available before continuing onto the normal msg reading path
|
// Wait until there is a byte available before continuing onto the normal msg reading path
|
||||||
_, err = c.reader.Peek(1)
|
_, err = rc.c.reader.Peek(1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.conn.SetReadDeadline(zeroTime) // we can only return one error and we already have one -- so ignore possiple error from SetReadDeadline
|
rc.c.conn.SetReadDeadline(zeroTime) // we can only return one error and we already have one -- so ignore possiple error from SetReadDeadline
|
||||||
if err, ok := err.(*net.OpError); ok && err.Timeout() {
|
if err, ok := err.(*net.OpError); ok && err.Timeout() {
|
||||||
return nil, ErrNotificationTimeout
|
return nil, ErrNotificationTimeout
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.conn.SetReadDeadline(zeroTime)
|
err = rc.c.conn.SetReadDeadline(zeroTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.readReplicationMessage()
|
return rc.readReplicationMessage()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a replication connection, sending WAL data to the given replication
|
// Start a replication connection, sending WAL data to the given replication
|
||||||
|
@ -303,7 +324,7 @@ func (c *Conn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationM
|
||||||
//
|
//
|
||||||
// This function assumes that slotName has already been created. In order to omit the timeline argument
|
// This function assumes that slotName has already been created. In order to omit the timeline argument
|
||||||
// pass a -1 for the timeline to get the server default behavior.
|
// pass a -1 for the timeline to get the server default behavior.
|
||||||
func (c *Conn) StartReplication(slotName string, startLsn uint64, timeline int64, pluginArguments ...string) (err error) {
|
func (rc *ReplicationConn) StartReplication(slotName string, startLsn uint64, timeline int64, pluginArguments ...string) (err error) {
|
||||||
var queryString string
|
var queryString string
|
||||||
if timeline >= 0 {
|
if timeline >= 0 {
|
||||||
queryString = fmt.Sprintf("START_REPLICATION SLOT %s LOGICAL %s TIMELINE %d", slotName, FormatLSN(startLsn), timeline)
|
queryString = fmt.Sprintf("START_REPLICATION SLOT %s LOGICAL %s TIMELINE %d", slotName, FormatLSN(startLsn), timeline)
|
||||||
|
@ -315,7 +336,7 @@ func (c *Conn) StartReplication(slotName string, startLsn uint64, timeline int64
|
||||||
queryString += fmt.Sprintf(" %s", arg)
|
queryString += fmt.Sprintf(" %s", arg)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = c.sendQuery(queryString); err != nil {
|
if err = rc.c.sendQuery(queryString); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -324,12 +345,24 @@ func (c *Conn) StartReplication(slotName string, startLsn uint64, timeline int64
|
||||||
// started. This call will either return nil, nil or if it returns an error
|
// started. This call will either return nil, nil or if it returns an error
|
||||||
// that indicates the start replication command failed
|
// that indicates the start replication command failed
|
||||||
var r *ReplicationMessage
|
var r *ReplicationMessage
|
||||||
r, err = c.WaitForReplicationMessage(initialReplicationResponseTimeout)
|
r, err = rc.WaitForReplicationMessage(initialReplicationResponseTimeout)
|
||||||
if err != nil && r != nil {
|
if err != nil && r != nil {
|
||||||
if c.shouldLog(LogLevelError) {
|
if rc.c.shouldLog(LogLevelError) {
|
||||||
c.log(LogLevelError, "Unxpected replication message %v", r)
|
rc.c.log(LogLevelError, "Unxpected replication message %v", r)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create the replication slot, using the given name and output plugin.
|
||||||
|
func (rc *ReplicationConn) CreateReplicationSlot(slotName, outputPlugin string) (err error) {
|
||||||
|
_, err = rc.c.Exec(fmt.Sprintf("CREATE_REPLICATION_SLOT %s LOGICAL %s", slotName, outputPlugin))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Drop the replication slot for the given name
|
||||||
|
func (rc *ReplicationConn) DropReplicationSlot(slotName, outputPlugin string) (err error) {
|
||||||
|
_, err = rc.c.Exec(fmt.Sprintf("DROP_REPLICATION_SLOT %s", slotName))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
|
@ -48,13 +48,10 @@ func TestSimpleReplicationConnection(t *testing.T) {
|
||||||
conn := mustConnect(t, *replicationConnConfig)
|
conn := mustConnect(t, *replicationConnConfig)
|
||||||
defer closeConn(t, conn)
|
defer closeConn(t, conn)
|
||||||
|
|
||||||
replicationConnConfig.RuntimeParams = make(map[string]string)
|
replicationConn := mustReplicationConnect(t, *replicationConnConfig)
|
||||||
replicationConnConfig.RuntimeParams["replication"] = "database"
|
defer closeReplicationConn(t, replicationConn)
|
||||||
|
|
||||||
replicationConn := mustConnect(t, *replicationConnConfig)
|
err = replicationConn.CreateReplicationSlot("pgx_test","test_decoding")
|
||||||
defer closeConn(t, replicationConn)
|
|
||||||
|
|
||||||
_, err = replicationConn.Exec("CREATE_REPLICATION_SLOT pgx_test LOGICAL test_decoding")
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Logf("replication slot create failed: %v", err)
|
t.Logf("replication slot create failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,7 @@ func (r *ValueReader) Err() error {
|
||||||
return r.err
|
return r.err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fatal tells r that a Fatal error has occurred
|
// Fatal tells rc that a Fatal error has occurred
|
||||||
func (r *ValueReader) Fatal(err error) {
|
func (r *ValueReader) Fatal(err error) {
|
||||||
r.err = err
|
r.err = err
|
||||||
}
|
}
|
||||||
|
|
|
@ -129,9 +129,9 @@ func (e SerializationError) Error() string {
|
||||||
|
|
||||||
// Scanner is an interface used to decode values from the PostgreSQL server.
|
// Scanner is an interface used to decode values from the PostgreSQL server.
|
||||||
type Scanner interface {
|
type Scanner interface {
|
||||||
// Scan MUST check r.Type().DataType (to check by OID) or
|
// Scan MUST check rc.Type().DataType (to check by OID) or
|
||||||
// r.Type().DataTypeName (to check by name) to ensure that it is scanning an
|
// rc.Type().DataTypeName (to check by name) to ensure that it is scanning an
|
||||||
// expected column type. It also MUST check r.Type().FormatCode before
|
// expected column type. It also MUST check rc.Type().FormatCode before
|
||||||
// decoding. It should not assume that it was called on a data type or format
|
// decoding. It should not assume that it was called on a data type or format
|
||||||
// that it understands.
|
// that it understands.
|
||||||
Scan(r *ValueReader) error
|
Scan(r *ValueReader) error
|
||||||
|
@ -3167,7 +3167,7 @@ func parseQuotedAclItem(reader *strings.Reader) (AclItem, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns the next rune from r, unless it is a backslash;
|
// Returns the next rune from rc, unless it is a backslash;
|
||||||
// in that case, it returns the rune after the backslash. The second
|
// in that case, it returns the rune after the backslash. The second
|
||||||
// return value tells us whether or not the rune was
|
// return value tells us whether or not the rune was
|
||||||
// preceeded by a backslash (escaped).
|
// preceeded by a backslash (escaped).
|
||||||
|
|
Loading…
Reference in New Issue