mirror of https://github.com/jackc/pgx.git
Merge branch 'wingedpig-master'
* wingedpig-master: Reverted breaking API change to CreateReplicationSlot. Instead, the new version that returns the consistent_point and snapshot_name values is called CreateReplicationSlotEx(). Changed CreateReplicationSlot to return the consistent_point and snapshot_name.pull/333/head^2
commit
015b56a04f
|
@ -440,6 +440,18 @@ func (rc *ReplicationConn) CreateReplicationSlot(slotName, outputPlugin string)
|
|||
return
|
||||
}
|
||||
|
||||
// Create the replication slot, using the given name and output plugin, and return the consistent_point and snapshot_name values.
|
||||
func (rc *ReplicationConn) CreateReplicationSlotEx(slotName, outputPlugin string) (consistentPoint string, snapshotName string, err error) {
|
||||
var dummy string
|
||||
var rows *Rows
|
||||
rows, err = rc.sendReplicationModeQuery(fmt.Sprintf("CREATE_REPLICATION_SLOT %s LOGICAL %s", slotName, outputPlugin))
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
rows.Scan(&dummy, &consistentPoint, &snapshotName, &dummy)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Drop the replication slot for the given name
|
||||
func (rc *ReplicationConn) DropReplicationSlot(slotName string) (err error) {
|
||||
_, err = rc.c.Exec(fmt.Sprintf("DROP_REPLICATION_SLOT %s", slotName))
|
||||
|
|
|
@ -56,10 +56,18 @@ func TestSimpleReplicationConnection(t *testing.T) {
|
|||
replicationConn := mustReplicationConnect(t, *replicationConnConfig)
|
||||
defer closeReplicationConn(t, replicationConn)
|
||||
|
||||
err = replicationConn.CreateReplicationSlot("pgx_test", "test_decoding")
|
||||
var cp string
|
||||
var snapshot_name string
|
||||
cp, snapshot_name, err = replicationConn.CreateReplicationSlotEx("pgx_test", "test_decoding")
|
||||
if err != nil {
|
||||
t.Fatalf("replication slot create failed: %v", err)
|
||||
}
|
||||
if cp == "" {
|
||||
t.Logf("consistent_point is empty")
|
||||
}
|
||||
if snapshot_name == "" {
|
||||
t.Logf("snapshot_name is empty")
|
||||
}
|
||||
|
||||
// Do a simple change so we can get some wal data
|
||||
_, err = conn.Exec("create table if not exists replication_test (a integer)")
|
||||
|
@ -178,20 +186,35 @@ func TestReplicationConn_DropReplicationSlot(t *testing.T) {
|
|||
replicationConn := mustReplicationConnect(t, *replicationConnConfig)
|
||||
defer closeReplicationConn(t, replicationConn)
|
||||
|
||||
err := replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding")
|
||||
var cp string
|
||||
var snapshot_name string
|
||||
cp, snapshot_name, err := replicationConn.CreateReplicationSlotEx("pgx_slot_test", "test_decoding")
|
||||
if err != nil {
|
||||
t.Logf("replication slot create failed: %v", err)
|
||||
}
|
||||
if cp == "" {
|
||||
t.Logf("consistent_point is empty")
|
||||
}
|
||||
if snapshot_name == "" {
|
||||
t.Logf("snapshot_name is empty")
|
||||
}
|
||||
|
||||
err = replicationConn.DropReplicationSlot("pgx_slot_test")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to drop replication slot: %v", err)
|
||||
}
|
||||
|
||||
// We re-create to ensure the drop worked.
|
||||
err = replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding")
|
||||
cp, snapshot_name, err = replicationConn.CreateReplicationSlotEx("pgx_slot_test", "test_decoding")
|
||||
if err != nil {
|
||||
t.Logf("replication slot create failed: %v", err)
|
||||
}
|
||||
if cp == "" {
|
||||
t.Logf("consistent_point is empty")
|
||||
}
|
||||
if snapshot_name == "" {
|
||||
t.Logf("snapshot_name is empty")
|
||||
}
|
||||
|
||||
// And finally we drop to ensure we don't leave dirty state
|
||||
err = replicationConn.DropReplicationSlot("pgx_slot_test")
|
||||
|
|
Loading…
Reference in New Issue