diff --git a/replication.go b/replication.go index 8923d533..7b28d6b6 100644 --- a/replication.go +++ b/replication.go @@ -190,20 +190,6 @@ func (rc *ReplicationConn) SendStandbyStatus(k *StandbyStatus) (err error) { return } -// Send the message to formally stop the replication stream. This -// is done before calling Close() during a clean shutdown. -func (rc *ReplicationConn) StopReplication() (err error) { - writeBuf := newWriteBuf(rc.c, copyDone) - - writeBuf.closeMsg() - - _, err = rc.c.conn.Write(writeBuf.buf) - if err != nil { - rc.c.die(err) - } - return -} - func (rc *ReplicationConn) Close() error { return rc.c.Close() } @@ -320,6 +306,74 @@ func (rc *ReplicationConn) WaitForReplicationMessage(timeout time.Duration) (r * return rc.readReplicationMessage() } +func (rc *ReplicationConn) sendReplicationModeQuery(sql string) (*Rows, error) { + rc.c.lastActivityTime = time.Now() + + rows := rc.c.getRows(sql, nil) + + if err := rc.c.lock(); err != nil { + rows.abort(err) + return rows, err + } + rows.unlockConn = true + + err := rc.c.sendSimpleQuery(sql) + if err != nil { + rows.abort(err) + } + + var t byte + var r *msgReader + t, r, err = rc.c.rxMsg() + if err != nil { + return nil, err + } + + switch t { + case rowDescription: + rows.fields = rc.c.rxRowDescription(r) + // We don't have c.PgTypes here because we're a replication + // connection. This means the field descriptions will have + // only Oids. Not much we can do about this. + default: + if e := rc.c.processContextFreeMsg(t, r); e != nil { + rows.abort(e) + return rows, e + } + } + + return rows, rows.err +} + +// Execute the "IDENTIFY_SYSTEM" command as documented here: +// https://www.postgresql.org/docs/9.5/static/protocol-replication.html +// +// This will return (if successful) a result set that has a single row +// that contains the systemid, current timeline, xlogpos and database +// name. +// +// NOTE: Because this is a replication mode connection, we don't have +// type names, so the field descriptions in the result will have only +// Oids and no DataTypeName values +func (rc *ReplicationConn) IdentifySystem() (r *Rows, err error) { + return rc.sendReplicationModeQuery("IDENTIFY_SYSTEM") +} + +// Execute the "TIMELINE_HISTORY" command as documented here: +// https://www.postgresql.org/docs/9.5/static/protocol-replication.html +// +// This will return (if successful) a result set that has a single row +// that contains the filename of the history file and the content +// of the history file. If called for timeline 1, typically this will +// generate an error that the timeline history file does not exist. +// +// NOTE: Because this is a replication mode connection, we don't have +// type names, so the field descriptions in the result will have only +// Oids and no DataTypeName values +func (rc *ReplicationConn) TimelineHistory(timeline int) (r *Rows, err error) { + return rc.sendReplicationModeQuery(fmt.Sprintf("TIMELINE_HISTORY %d", timeline)) +} + // Start a replication connection, sending WAL data to the given replication // receiver. This function wraps a START_REPLICATION command as documented // here: diff --git a/replication_test.go b/replication_test.go index ee187ec2..73874a1f 100644 --- a/replication_test.go +++ b/replication_test.go @@ -150,7 +150,6 @@ func TestSimpleReplicationConnection(t *testing.T) { t.Errorf("Failed to create standby status %v", err) } replicationConn.SendStandbyStatus(status) - replicationConn.StopReplication() if replicationConn.IsAlive() == false { t.Errorf("Connection died: %v", replicationConn.CauseOfDeath()) @@ -161,25 +160,8 @@ func TestSimpleReplicationConnection(t *testing.T) { t.Fatalf("Replication connection close failed: %v", err) } - // Let's push the boundary conditions of the standby status and ensure it errors correctly - status, err = pgx.NewStandbyStatus(0, 1, 2, 3, 4) - if err == nil { - t.Errorf("Expected error from new standby status, got %v", status) - } - - // And if you provide 3 args, ensure the right fields are set - status, err = pgx.NewStandbyStatus(1, 2, 3) - if err != nil { - t.Errorf("Failed to create test status: %v", err) - } - if status.WalFlushPosition != 1 { - t.Errorf("Unexpected flush position %d", status.WalFlushPosition) - } - if status.WalApplyPosition != 2 { - t.Errorf("Unexpected apply position %d", status.WalApplyPosition) - } - if status.WalWritePosition != 3 { - t.Errorf("Unexpected write position %d", status.WalWritePosition) + if replicationConn.IsAlive() == true { + t.Errorf("Connection still alive: %v", replicationConn.CauseOfDeath()) } restartLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test") @@ -200,5 +182,148 @@ func TestSimpleReplicationConnection(t *testing.T) { if droppedLsn != "" { t.Errorf("Got odd flush lsn %s for supposedly dropped slot", droppedLsn) } - } + +func TestReplicationConn_DropReplicationSlot(t *testing.T) { + replicationConn := mustReplicationConnect(t, *replicationConnConfig) + defer closeReplicationConn(t, replicationConn) + + err := replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding") + if err != nil { + t.Logf("replication slot create failed: %v", err) + } + err = replicationConn.DropReplicationSlot("pgx_slot_test") + if err != nil { + t.Fatalf("Failed to drop replication slot: %v", err) + } + + // We re-create to ensure the drop worked. + err = replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding") + if err != nil { + t.Logf("replication slot create failed: %v", err) + } + + // And finally we drop to ensure we don't leave dirty state + err = replicationConn.DropReplicationSlot("pgx_slot_test") + if err != nil { + t.Fatalf("Failed to drop replication slot: %v", err) + } +} + +func TestIdentifySystem(t *testing.T) { + replicationConn2 := mustReplicationConnect(t, *replicationConnConfig) + defer closeReplicationConn(t, replicationConn2) + + r, err := replicationConn2.IdentifySystem() + if err != nil { + t.Error(err) + } + defer r.Close() + for _, fd := range r.FieldDescriptions() { + t.Logf("Field: %s of type %v", fd.Name, fd.DataType) + } + + var rowCount int + for r.Next() { + rowCount++ + values, err := r.Values() + if err != nil { + t.Error(err) + } + t.Logf("Row values: %v", values) + } + if r.Err() != nil { + t.Error(r.Err()) + } + + if rowCount == 0 { + t.Errorf("Failed to find any rows: %d", rowCount) + } +} + +func getCurrentTimeline(t *testing.T, rc *pgx.ReplicationConn) int { + r, err := rc.IdentifySystem() + if err != nil { + t.Error(err) + } + defer r.Close() + for r.Next() { + values, e := r.Values() + if e != nil { + t.Error(e) + } + timeline, e := strconv.Atoi(values[1].(string)) + if e != nil { + t.Error(e) + } + return timeline + } + t.Fatal("Failed to read timeline") + return -1 +} + + +func TestGetTimelineHistory(t *testing.T) { + replicationConn := mustReplicationConnect(t, *replicationConnConfig) + defer closeReplicationConn(t, replicationConn) + + timeline := getCurrentTimeline(t, replicationConn) + + r, err := replicationConn.TimelineHistory(timeline) + if err != nil { + t.Errorf("%#v", err) + } + defer r.Close() + + for _, fd := range r.FieldDescriptions() { + t.Logf("Field: %s of type %v", fd.Name, fd.DataType) + } + + var rowCount int + for r.Next() { + rowCount++ + values, err := r.Values() + if err != nil { + t.Error(err) + } + t.Logf("Row values: %v", values) + } + if r.Err() != nil { + if strings.Contains(r.Err().Error(), "No such file or directory") { + // This is normal, this means the timeline we're on has no + // history, which is the common case in a test db that + // has only one timeline + return + } + t.Error(r.Err()) + } + + // If we have a timeline history (see above) there should have been + // rows emitted + if rowCount == 0 { + t.Errorf("Failed to find any rows: %d", rowCount) + } +} + +func TestStandbyStatusParsing(t *testing.T) { + // Let's push the boundary conditions of the standby status and ensure it errors correctly + status, err := pgx.NewStandbyStatus(0, 1, 2, 3, 4) + if err == nil { + t.Errorf("Expected error from new standby status, got %v", status) + } + + // And if you provide 3 args, ensure the right fields are set + status, err = pgx.NewStandbyStatus(1, 2, 3) + if err != nil { + t.Errorf("Failed to create test status: %v", err) + } + if status.WalFlushPosition != 1 { + t.Errorf("Unexpected flush position %d", status.WalFlushPosition) + } + if status.WalApplyPosition != 2 { + t.Errorf("Unexpected apply position %d", status.WalApplyPosition) + } + if status.WalWritePosition != 3 { + t.Errorf("Unexpected write position %d", status.WalWritePosition) + } +} \ No newline at end of file