Reverted breaking API change to CreateReplicationSlot. Instead, the new version that returns the consistent_point and snapshot_name values is

called CreateReplicationSlotEx().
pull/328/head
Mark Fletcher 2017-09-21 09:58:20 -07:00
parent a1e4b1b9b5
commit fd93b83433
2 changed files with 33 additions and 4 deletions

View File

@ -435,7 +435,13 @@ func (rc *ReplicationConn) StartReplication(slotName string, startLsn uint64, ti
}
// Create the replication slot, using the given name and output plugin.
func (rc *ReplicationConn) CreateReplicationSlot(slotName, outputPlugin string) (consistentPoint string, snapshotName string, err error) {
func (rc *ReplicationConn) CreateReplicationSlot(slotName, outputPlugin string) (err error) {
_, err = rc.c.Exec(fmt.Sprintf("CREATE_REPLICATION_SLOT %s LOGICAL %s", slotName, outputPlugin))
return
}
// Create the replication slot, using the given name and output plugin, and return the consistent_point and snapshot_name values.
func (rc *ReplicationConn) CreateReplicationSlotEx(slotName, outputPlugin string) (consistentPoint string, snapshotName string, err error) {
var dummy string
var rows *Rows
rows, err = rc.sendReplicationModeQuery(fmt.Sprintf("CREATE_REPLICATION_SLOT %s LOGICAL %s", slotName, outputPlugin))

View File

@ -56,10 +56,18 @@ func TestSimpleReplicationConnection(t *testing.T) {
replicationConn := mustReplicationConnect(t, *replicationConnConfig)
defer closeReplicationConn(t, replicationConn)
_, _, err = replicationConn.CreateReplicationSlot("pgx_test", "test_decoding")
var cp string
var snapshot_name string
cp, snapshot_name, err = replicationConn.CreateReplicationSlotEx("pgx_test", "test_decoding")
if err != nil {
t.Fatalf("replication slot create failed: %v", err)
}
if cp == "" {
t.Logf("consistent_point is empty")
}
if snapshot_name == "" {
t.Logf("snapshot_name is empty")
}
// Do a simple change so we can get some wal data
_, err = conn.Exec("create table if not exists replication_test (a integer)")
@ -178,20 +186,35 @@ func TestReplicationConn_DropReplicationSlot(t *testing.T) {
replicationConn := mustReplicationConnect(t, *replicationConnConfig)
defer closeReplicationConn(t, replicationConn)
_, _, err := replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding")
var cp string
var snapshot_name string
cp, snapshot_name, err := replicationConn.CreateReplicationSlotEx("pgx_slot_test", "test_decoding")
if err != nil {
t.Logf("replication slot create failed: %v", err)
}
if cp == "" {
t.Logf("consistent_point is empty")
}
if snapshot_name == "" {
t.Logf("snapshot_name is empty")
}
err = replicationConn.DropReplicationSlot("pgx_slot_test")
if err != nil {
t.Fatalf("Failed to drop replication slot: %v", err)
}
// We re-create to ensure the drop worked.
_, _, err = replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding")
cp, snapshot_name, err = replicationConn.CreateReplicationSlotEx("pgx_slot_test", "test_decoding")
if err != nil {
t.Logf("replication slot create failed: %v", err)
}
if cp == "" {
t.Logf("consistent_point is empty")
}
if snapshot_name == "" {
t.Logf("snapshot_name is empty")
}
// And finally we drop to ensure we don't leave dirty state
err = replicationConn.DropReplicationSlot("pgx_slot_test")