From edbd30ea6ad35cb4df53f845cd403d4fb706bc79 Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Tue, 6 Dec 2016 15:44:37 -0800 Subject: [PATCH] Add replication stop mechanism --- replication.go | 20 ++++++++++++++++++-- replication_test.go | 7 +++++++ stdlib/sql_test.go | 4 ++-- 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/replication.go b/replication.go index a107274b..66860787 100644 --- a/replication.go +++ b/replication.go @@ -148,14 +148,27 @@ func (c *Conn) SendStandbyStatus(k *StandbyStatus) (err error) { _, err = c.conn.Write(writeBuf.buf) if err != nil { - fmt.Printf("Error sending standby status %v\n", err) c.die(err) } - fmt.Printf("Write complete, wal position is %s\n", FormatLsn(k.WalApplyPosition)) return } +// Send the message to formally stop the replication stream. This +// is done before calling Close() during a clean shutdown. +func (c *Conn) StopReplication() (err error) { + writeBuf := newWriteBuf(c, copyDone) + + writeBuf.closeMsg() + + _, err = c.conn.Write(writeBuf.buf) + if err != nil { + c.die(err) + } + return +} + + func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) { var t byte var reader *msgReader @@ -217,6 +230,9 @@ func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) { // There is also a condition (during startup) which can cause both the replication message // to return as nil as well as the error, which is a normal part of the replication protocol // startup. It's important the client correctly handle (ignore) this scenario. +// +// This returns pgx.ErrNotificationTimeout when there is no replication message by the specified +// duration. func (c *Conn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationMessage, err error) { var zeroTime time.Time diff --git a/replication_test.go b/replication_test.go index 4c76deaf..60119b14 100644 --- a/replication_test.go +++ b/replication_test.go @@ -6,6 +6,7 @@ import ( "strings" "testing" "time" + "reflect" ) // This function uses a postgresql 9.6 specific column @@ -92,6 +93,11 @@ func TestSimpleReplicationConnection(t *testing.T) { var message *pgx.ReplicationMessage message, err = replicationConn.WaitForReplicationMessage(time.Duration(1 * time.Second)) + if err != nil { + if err != pgx.ErrNotificationTimeout { + t.Fatalf("Replication failed: %v %s", err, reflect.TypeOf(err)) + } + } if message != nil { if message.WalMessage != nil { // The waldata payload with the test_decoding plugin looks like: @@ -144,6 +150,7 @@ func TestSimpleReplicationConnection(t *testing.T) { // position, which should then be reflected if we fetch out our current wal position // for the slot replicationConn.SendStandbyStatus(pgx.NewStandbyStatus(maxWal)) + replicationConn.StopReplication() err = replicationConn.Close() if err != nil { diff --git a/stdlib/sql_test.go b/stdlib/sql_test.go index 602a9171..5a5f7049 100644 --- a/stdlib/sql_test.go +++ b/stdlib/sql_test.go @@ -30,7 +30,7 @@ func ensureConnValid(t *testing.T, db *sql.DB) { rows, err := db.Query("select generate_series(1,$1)", 10) if err != nil { - t.Fatalf("db.Query failed: ", err) + t.Fatalf("db.Query failed: %v", err) } defer rows.Close() @@ -42,7 +42,7 @@ func ensureConnValid(t *testing.T, db *sql.DB) { } if rows.Err() != nil { - t.Fatalf("db.Query failed: ", err) + t.Fatalf("db.Query failed: %v", err) } if rowCount != 10 {