Start replication now wraps the sql and returns errors properly

pull/224/head
Kris Wehner 2017-01-03 11:49:13 -08:00
parent 2b096a7d08
commit dad2c383af
2 changed files with 43 additions and 13 deletions

View File

@ -12,6 +12,7 @@ const (
walData = 'w'
senderKeepalive = 'k'
standbyStatusUpdate = 'r'
initialReplicationResponseTimeout = 5 * time.Second
)
var epochNano int64
@ -215,6 +216,14 @@ func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) {
replyNow := reader.readByte()
h := &ServerHeartbeat{ServerWalEnd: uint64(serverWalEnd), ServerTime: uint64(serverTime), ReplyRequested: replyNow}
return &ReplicationMessage{ServerHeartbeat: h}, nil
default:
if c.shouldLog(LogLevelError) {
c.log(LogLevelError,"Unexpected data playload message type %v", t)
}
}
default:
if c.shouldLog(LogLevelError) {
c.log(LogLevelError,"Unexpected replication message type %v", t)
}
}
return
@ -227,10 +236,6 @@ func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) {
// updates. The caller also must send standby status updates in order to keep the connection
// alive and working.
//
// There is also a condition (during startup) which can cause both the replication message
// to return as nil as well as the error, which is a normal part of the replication protocol
// startup. It's important the client correctly handle (ignore) this scenario.
//
// This returns pgx.ErrNotificationTimeout when there is no replication message by the specified
// duration.
func (c *Conn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationMessage, err error) {
@ -270,19 +275,43 @@ func (c *Conn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationM
}
// Start a replication connection, sending WAL data to the given replication
// receiver. The sql string here should be a "START_REPLICATION" command, as
// per the postgresql docs here:
// receiver. This function wraps a START_REPLICATION command as documented
// here:
// https://www.postgresql.org/docs/9.5/static/protocol-replication.html
//
// A typical query would look like:
// START_REPLICATION SLOT t LOGICAL test_decoder 0/0
//
// Once started, the client needs to invoke WaitForReplicationMessage() in order
// to fetch the WAL and standby status. Also, it is the responsibility of the caller
// to periodically send StandbyStatus messages to update the replication slot position.
func (c *Conn) StartReplication(sql string, arguments ...interface{}) (err error) {
if err = c.sendQuery(sql, arguments...); err != nil {
//
// This function assumes that slotName has already been created. In order to omit the timeline argument
// pass a -1 for the timeline to get the server default behavior.
func (c *Conn) StartReplication(slotName string, startLsn uint64, timeline int64, pluginArguments ...string) (err error) {
var queryString string
if timeline >= 0 {
queryString = fmt.Sprintf("START_REPLICATION SLOT %s LOGICAL %s TIMELINE %d", slotName, FormatLSN(startLsn), timeline)
} else {
queryString = fmt.Sprintf("START_REPLICATION SLOT %s LOGICAL %s", slotName, FormatLSN(startLsn))
}
for _, arg := range pluginArguments {
queryString += fmt.Sprintf(" %s", arg)
}
if err = c.sendQuery(queryString); err != nil {
return
}
// The first replication message that comes back here will be (in a success case)
// a empty CopyBoth that is (apparently) sent as the confirmation that the replication has
// started. This call will either return nil, nil or if it returns an error
// that indicates the start replication command failed
var r *ReplicationMessage
r, err = c.WaitForReplicationMessage(initialReplicationResponseTimeout)
if err != nil && r != nil {
if c.shouldLog(LogLevelError) {
c.log(LogLevelError, "Unxpected replication message %v", r)
}
}
return
}

View File

@ -7,12 +7,13 @@ import (
"testing"
"time"
"reflect"
"fmt"
)
// This function uses a postgresql 9.6 specific column
func getConfirmedFlushLsnFor(t *testing.T, conn *pgx.Conn, slot string) string {
// Fetch the restart LSN of the slot, to establish a starting point
rows, err := conn.Query("select confirmed_flush_lsn from pg_replication_slots where slot_name='pgx_test'")
rows, err := conn.Query(fmt.Sprintf("select confirmed_flush_lsn from pg_replication_slots where slot_name='%s'", slot))
if err != nil {
t.Fatalf("conn.Query failed: %v", err)
}
@ -66,7 +67,7 @@ func TestSimpleReplicationConnection(t *testing.T) {
t.Fatalf("Failed to create table: %v", err)
}
err = replicationConn.StartReplication("START_REPLICATION SLOT pgx_test LOGICAL 0/0")
err = replicationConn.StartReplication("pgx_test", 0, -1)
if err != nil {
t.Fatalf("Failed to start replication: %v", err)
}