Add tests for replication slot drop, and go fmt

pull/226/head
Kris Wehner 2017-01-26 18:24:21 -08:00
parent b2f416c07d
commit 41d9c0f338
2 changed files with 38 additions and 19 deletions

View File

@ -8,10 +8,10 @@ import (
) )
const ( const (
copyBothResponse = 'W' copyBothResponse = 'W'
walData = 'w' walData = 'w'
senderKeepalive = 'k' senderKeepalive = 'k'
standbyStatusUpdate = 'r' standbyStatusUpdate = 'r'
initialReplicationResponseTimeout = 5 * time.Second initialReplicationResponseTimeout = 5 * time.Second
) )
@ -157,7 +157,7 @@ func ReplicationConnect(config ConnConfig) (r *ReplicationConn, err error) {
} }
config.RuntimeParams["replication"] = "database" config.RuntimeParams["replication"] = "database"
c,err := Connect(config) c, err := Connect(config)
if err != nil { if err != nil {
return return
} }
@ -208,6 +208,13 @@ func (rc *ReplicationConn) Close() error {
return rc.c.Close() return rc.c.Close()
} }
func (rc *ReplicationConn) IsAlive() bool {
return rc.c.IsAlive()
}
func (rc *ReplicationConn) CauseOfDeath() error {
return rc.c.CauseOfDeath()
}
func (rc *ReplicationConn) readReplicationMessage() (r *ReplicationMessage, err error) { func (rc *ReplicationConn) readReplicationMessage() (r *ReplicationMessage, err error) {
var t byte var t byte
@ -257,12 +264,12 @@ func (rc *ReplicationConn) readReplicationMessage() (r *ReplicationMessage, err
return &ReplicationMessage{ServerHeartbeat: h}, nil return &ReplicationMessage{ServerHeartbeat: h}, nil
default: default:
if rc.c.shouldLog(LogLevelError) { if rc.c.shouldLog(LogLevelError) {
rc.c.log(LogLevelError,"Unexpected data playload message type %v", t) rc.c.log(LogLevelError, "Unexpected data playload message type %v", t)
} }
} }
default: default:
if rc.c.shouldLog(LogLevelError) { if rc.c.shouldLog(LogLevelError) {
rc.c.log(LogLevelError,"Unexpected replication message type %v", t) rc.c.log(LogLevelError, "Unexpected replication message type %v", t)
} }
} }
return return

View File

@ -1,13 +1,13 @@
package pgx_test package pgx_test
import ( import (
"fmt"
"github.com/jackc/pgx" "github.com/jackc/pgx"
"reflect"
"strconv" "strconv"
"strings" "strings"
"testing" "testing"
"time" "time"
"reflect"
"fmt"
) )
// This function uses a postgresql 9.6 specific column // This function uses a postgresql 9.6 specific column
@ -51,7 +51,7 @@ func TestSimpleReplicationConnection(t *testing.T) {
replicationConn := mustReplicationConnect(t, *replicationConnConfig) replicationConn := mustReplicationConnect(t, *replicationConnConfig)
defer closeReplicationConn(t, replicationConn) defer closeReplicationConn(t, replicationConn)
err = replicationConn.CreateReplicationSlot("pgx_test","test_decoding") err = replicationConn.CreateReplicationSlot("pgx_test", "test_decoding")
if err != nil { if err != nil {
t.Logf("replication slot create failed: %v", err) t.Logf("replication slot create failed: %v", err)
} }
@ -152,14 +152,23 @@ func TestSimpleReplicationConnection(t *testing.T) {
replicationConn.SendStandbyStatus(status) replicationConn.SendStandbyStatus(status)
replicationConn.StopReplication() replicationConn.StopReplication()
if replicationConn.IsAlive() == false {
t.Errorf("Connection died: %v", replicationConn.CauseOfDeath())
}
err = replicationConn.Close()
if err != nil {
t.Fatalf("Replication connection close failed: %v", err)
}
// Let's push the boundary conditions of the standby status and ensure it errors correctly // Let's push the boundary conditions of the standby status and ensure it errors correctly
status, err = pgx.NewStandbyStatus(0,1,2,3,4) status, err = pgx.NewStandbyStatus(0, 1, 2, 3, 4)
if err == nil { if err == nil {
t.Errorf("Expected error from new standby status, got %v",status) t.Errorf("Expected error from new standby status, got %v", status)
} }
// And if you provide 3 args, ensure the right fields are set // And if you provide 3 args, ensure the right fields are set
status, err = pgx.NewStandbyStatus(1,2,3) status, err = pgx.NewStandbyStatus(1, 2, 3)
if err != nil { if err != nil {
t.Errorf("Failed to create test status: %v", err) t.Errorf("Failed to create test status: %v", err)
} }
@ -173,20 +182,23 @@ func TestSimpleReplicationConnection(t *testing.T) {
t.Errorf("Unexpected write position %d", status.WalWritePosition) t.Errorf("Unexpected write position %d", status.WalWritePosition)
} }
err = replicationConn.Close()
if err != nil {
t.Fatalf("Replication connection close failed: %v", err)
}
restartLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test") restartLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test")
integerRestartLsn, _ := pgx.ParseLSN(restartLsn) integerRestartLsn, _ := pgx.ParseLSN(restartLsn)
if integerRestartLsn != maxWal { if integerRestartLsn != maxWal {
t.Fatalf("Wal offset update failed, expected %s found %s", pgx.FormatLSN(maxWal), restartLsn) t.Fatalf("Wal offset update failed, expected %s found %s", pgx.FormatLSN(maxWal), restartLsn)
} }
_, err = conn.Exec("select pg_drop_replication_slot($1)", "pgx_test") replicationConn2 := mustReplicationConnect(t, *replicationConnConfig)
defer closeReplicationConn(t, replicationConn2)
err = replicationConn2.DropReplicationSlot("pgx_test")
if err != nil { if err != nil {
t.Fatalf("Failed to drop replication slot: %v", err) t.Fatalf("Failed to drop replication slot: %v", err)
} }
droppedLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test")
if droppedLsn != "" {
t.Errorf("Got odd flush lsn %s for supposedly dropped slot", droppedLsn)
}
} }