diff --git a/replication.go b/replication.go index 16755f0e..fdbdce92 100644 --- a/replication.go +++ b/replication.go @@ -12,6 +12,7 @@ const ( walData = 'w' senderKeepalive = 'k' standbyStatusUpdate = 'r' + initialReplicationResponseTimeout = 5 * time.Second ) var epochNano int64 @@ -215,6 +216,14 @@ func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) { replyNow := reader.readByte() h := &ServerHeartbeat{ServerWalEnd: uint64(serverWalEnd), ServerTime: uint64(serverTime), ReplyRequested: replyNow} return &ReplicationMessage{ServerHeartbeat: h}, nil + default: + if c.shouldLog(LogLevelError) { + c.log(LogLevelError,"Unexpected data playload message type %v", t) + } + } + default: + if c.shouldLog(LogLevelError) { + c.log(LogLevelError,"Unexpected replication message type %v", t) } } return @@ -227,10 +236,6 @@ func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) { // updates. The caller also must send standby status updates in order to keep the connection // alive and working. // -// There is also a condition (during startup) which can cause both the replication message -// to return as nil as well as the error, which is a normal part of the replication protocol -// startup. It's important the client correctly handle (ignore) this scenario. -// // This returns pgx.ErrNotificationTimeout when there is no replication message by the specified // duration. func (c *Conn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationMessage, err error) { @@ -270,19 +275,43 @@ func (c *Conn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationM } // Start a replication connection, sending WAL data to the given replication -// receiver. The sql string here should be a "START_REPLICATION" command, as -// per the postgresql docs here: +// receiver. This function wraps a START_REPLICATION command as documented +// here: // https://www.postgresql.org/docs/9.5/static/protocol-replication.html // -// A typical query would look like: -// START_REPLICATION SLOT t LOGICAL test_decoder 0/0 -// // Once started, the client needs to invoke WaitForReplicationMessage() in order // to fetch the WAL and standby status. Also, it is the responsibility of the caller // to periodically send StandbyStatus messages to update the replication slot position. -func (c *Conn) StartReplication(sql string, arguments ...interface{}) (err error) { - if err = c.sendQuery(sql, arguments...); err != nil { +// +// This function assumes that slotName has already been created. In order to omit the timeline argument +// pass a -1 for the timeline to get the server default behavior. +func (c *Conn) StartReplication(slotName string, startLsn uint64, timeline int64, pluginArguments ...string) (err error) { + var queryString string + if timeline >= 0 { + queryString = fmt.Sprintf("START_REPLICATION SLOT %s LOGICAL %s TIMELINE %d", slotName, FormatLSN(startLsn), timeline) + } else { + queryString = fmt.Sprintf("START_REPLICATION SLOT %s LOGICAL %s", slotName, FormatLSN(startLsn)) + } + + for _, arg := range pluginArguments { + queryString += fmt.Sprintf(" %s", arg) + } + + if err = c.sendQuery(queryString); err != nil { return } + + // The first replication message that comes back here will be (in a success case) + // a empty CopyBoth that is (apparently) sent as the confirmation that the replication has + // started. This call will either return nil, nil or if it returns an error + // that indicates the start replication command failed + var r *ReplicationMessage + r, err = c.WaitForReplicationMessage(initialReplicationResponseTimeout) + if err != nil && r != nil { + if c.shouldLog(LogLevelError) { + c.log(LogLevelError, "Unxpected replication message %v", r) + } + } + return } diff --git a/replication_test.go b/replication_test.go index 1d77982a..411b449e 100644 --- a/replication_test.go +++ b/replication_test.go @@ -7,12 +7,13 @@ import ( "testing" "time" "reflect" + "fmt" ) // This function uses a postgresql 9.6 specific column func getConfirmedFlushLsnFor(t *testing.T, conn *pgx.Conn, slot string) string { // Fetch the restart LSN of the slot, to establish a starting point - rows, err := conn.Query("select confirmed_flush_lsn from pg_replication_slots where slot_name='pgx_test'") + rows, err := conn.Query(fmt.Sprintf("select confirmed_flush_lsn from pg_replication_slots where slot_name='%s'", slot)) if err != nil { t.Fatalf("conn.Query failed: %v", err) } @@ -66,7 +67,7 @@ func TestSimpleReplicationConnection(t *testing.T) { t.Fatalf("Failed to create table: %v", err) } - err = replicationConn.StartReplication("START_REPLICATION SLOT pgx_test LOGICAL 0/0") + err = replicationConn.StartReplication("pgx_test", 0, -1) if err != nil { t.Fatalf("Failed to start replication: %v", err) }