Add IdentifySystem and TimelineHistory functions, and tighten up the testing

pull/226/head
Kris Wehner 2017-01-30 11:35:48 -08:00
parent 41d9c0f338
commit 1424fb2b42
2 changed files with 214 additions and 35 deletions

View File

@ -190,20 +190,6 @@ func (rc *ReplicationConn) SendStandbyStatus(k *StandbyStatus) (err error) {
return
}
// Send the message to formally stop the replication stream. This
// is done before calling Close() during a clean shutdown.
func (rc *ReplicationConn) StopReplication() (err error) {
writeBuf := newWriteBuf(rc.c, copyDone)
writeBuf.closeMsg()
_, err = rc.c.conn.Write(writeBuf.buf)
if err != nil {
rc.c.die(err)
}
return
}
func (rc *ReplicationConn) Close() error {
return rc.c.Close()
}
@ -320,6 +306,74 @@ func (rc *ReplicationConn) WaitForReplicationMessage(timeout time.Duration) (r *
return rc.readReplicationMessage()
}
func (rc *ReplicationConn) sendReplicationModeQuery(sql string) (*Rows, error) {
rc.c.lastActivityTime = time.Now()
rows := rc.c.getRows(sql, nil)
if err := rc.c.lock(); err != nil {
rows.abort(err)
return rows, err
}
rows.unlockConn = true
err := rc.c.sendSimpleQuery(sql)
if err != nil {
rows.abort(err)
}
var t byte
var r *msgReader
t, r, err = rc.c.rxMsg()
if err != nil {
return nil, err
}
switch t {
case rowDescription:
rows.fields = rc.c.rxRowDescription(r)
// We don't have c.PgTypes here because we're a replication
// connection. This means the field descriptions will have
// only Oids. Not much we can do about this.
default:
if e := rc.c.processContextFreeMsg(t, r); e != nil {
rows.abort(e)
return rows, e
}
}
return rows, rows.err
}
// Execute the "IDENTIFY_SYSTEM" command as documented here:
// https://www.postgresql.org/docs/9.5/static/protocol-replication.html
//
// This will return (if successful) a result set that has a single row
// that contains the systemid, current timeline, xlogpos and database
// name.
//
// NOTE: Because this is a replication mode connection, we don't have
// type names, so the field descriptions in the result will have only
// Oids and no DataTypeName values
func (rc *ReplicationConn) IdentifySystem() (r *Rows, err error) {
return rc.sendReplicationModeQuery("IDENTIFY_SYSTEM")
}
// Execute the "TIMELINE_HISTORY" command as documented here:
// https://www.postgresql.org/docs/9.5/static/protocol-replication.html
//
// This will return (if successful) a result set that has a single row
// that contains the filename of the history file and the content
// of the history file. If called for timeline 1, typically this will
// generate an error that the timeline history file does not exist.
//
// NOTE: Because this is a replication mode connection, we don't have
// type names, so the field descriptions in the result will have only
// Oids and no DataTypeName values
func (rc *ReplicationConn) TimelineHistory(timeline int) (r *Rows, err error) {
return rc.sendReplicationModeQuery(fmt.Sprintf("TIMELINE_HISTORY %d", timeline))
}
// Start a replication connection, sending WAL data to the given replication
// receiver. This function wraps a START_REPLICATION command as documented
// here:

View File

@ -150,7 +150,6 @@ func TestSimpleReplicationConnection(t *testing.T) {
t.Errorf("Failed to create standby status %v", err)
}
replicationConn.SendStandbyStatus(status)
replicationConn.StopReplication()
if replicationConn.IsAlive() == false {
t.Errorf("Connection died: %v", replicationConn.CauseOfDeath())
@ -161,25 +160,8 @@ func TestSimpleReplicationConnection(t *testing.T) {
t.Fatalf("Replication connection close failed: %v", err)
}
// Let's push the boundary conditions of the standby status and ensure it errors correctly
status, err = pgx.NewStandbyStatus(0, 1, 2, 3, 4)
if err == nil {
t.Errorf("Expected error from new standby status, got %v", status)
}
// And if you provide 3 args, ensure the right fields are set
status, err = pgx.NewStandbyStatus(1, 2, 3)
if err != nil {
t.Errorf("Failed to create test status: %v", err)
}
if status.WalFlushPosition != 1 {
t.Errorf("Unexpected flush position %d", status.WalFlushPosition)
}
if status.WalApplyPosition != 2 {
t.Errorf("Unexpected apply position %d", status.WalApplyPosition)
}
if status.WalWritePosition != 3 {
t.Errorf("Unexpected write position %d", status.WalWritePosition)
if replicationConn.IsAlive() == true {
t.Errorf("Connection still alive: %v", replicationConn.CauseOfDeath())
}
restartLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test")
@ -200,5 +182,148 @@ func TestSimpleReplicationConnection(t *testing.T) {
if droppedLsn != "" {
t.Errorf("Got odd flush lsn %s for supposedly dropped slot", droppedLsn)
}
}
func TestReplicationConn_DropReplicationSlot(t *testing.T) {
replicationConn := mustReplicationConnect(t, *replicationConnConfig)
defer closeReplicationConn(t, replicationConn)
err := replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding")
if err != nil {
t.Logf("replication slot create failed: %v", err)
}
err = replicationConn.DropReplicationSlot("pgx_slot_test")
if err != nil {
t.Fatalf("Failed to drop replication slot: %v", err)
}
// We re-create to ensure the drop worked.
err = replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding")
if err != nil {
t.Logf("replication slot create failed: %v", err)
}
// And finally we drop to ensure we don't leave dirty state
err = replicationConn.DropReplicationSlot("pgx_slot_test")
if err != nil {
t.Fatalf("Failed to drop replication slot: %v", err)
}
}
func TestIdentifySystem(t *testing.T) {
replicationConn2 := mustReplicationConnect(t, *replicationConnConfig)
defer closeReplicationConn(t, replicationConn2)
r, err := replicationConn2.IdentifySystem()
if err != nil {
t.Error(err)
}
defer r.Close()
for _, fd := range r.FieldDescriptions() {
t.Logf("Field: %s of type %v", fd.Name, fd.DataType)
}
var rowCount int
for r.Next() {
rowCount++
values, err := r.Values()
if err != nil {
t.Error(err)
}
t.Logf("Row values: %v", values)
}
if r.Err() != nil {
t.Error(r.Err())
}
if rowCount == 0 {
t.Errorf("Failed to find any rows: %d", rowCount)
}
}
func getCurrentTimeline(t *testing.T, rc *pgx.ReplicationConn) int {
r, err := rc.IdentifySystem()
if err != nil {
t.Error(err)
}
defer r.Close()
for r.Next() {
values, e := r.Values()
if e != nil {
t.Error(e)
}
timeline, e := strconv.Atoi(values[1].(string))
if e != nil {
t.Error(e)
}
return timeline
}
t.Fatal("Failed to read timeline")
return -1
}
func TestGetTimelineHistory(t *testing.T) {
replicationConn := mustReplicationConnect(t, *replicationConnConfig)
defer closeReplicationConn(t, replicationConn)
timeline := getCurrentTimeline(t, replicationConn)
r, err := replicationConn.TimelineHistory(timeline)
if err != nil {
t.Errorf("%#v", err)
}
defer r.Close()
for _, fd := range r.FieldDescriptions() {
t.Logf("Field: %s of type %v", fd.Name, fd.DataType)
}
var rowCount int
for r.Next() {
rowCount++
values, err := r.Values()
if err != nil {
t.Error(err)
}
t.Logf("Row values: %v", values)
}
if r.Err() != nil {
if strings.Contains(r.Err().Error(), "No such file or directory") {
// This is normal, this means the timeline we're on has no
// history, which is the common case in a test db that
// has only one timeline
return
}
t.Error(r.Err())
}
// If we have a timeline history (see above) there should have been
// rows emitted
if rowCount == 0 {
t.Errorf("Failed to find any rows: %d", rowCount)
}
}
func TestStandbyStatusParsing(t *testing.T) {
// Let's push the boundary conditions of the standby status and ensure it errors correctly
status, err := pgx.NewStandbyStatus(0, 1, 2, 3, 4)
if err == nil {
t.Errorf("Expected error from new standby status, got %v", status)
}
// And if you provide 3 args, ensure the right fields are set
status, err = pgx.NewStandbyStatus(1, 2, 3)
if err != nil {
t.Errorf("Failed to create test status: %v", err)
}
if status.WalFlushPosition != 1 {
t.Errorf("Unexpected flush position %d", status.WalFlushPosition)
}
if status.WalApplyPosition != 2 {
t.Errorf("Unexpected apply position %d", status.WalApplyPosition)
}
if status.WalWritePosition != 3 {
t.Errorf("Unexpected write position %d", status.WalWritePosition)
}
}