diff --git a/replication.go b/replication.go index b1329c6b..7dd5efe4 100644 --- a/replication.go +++ b/replication.go @@ -435,7 +435,13 @@ func (rc *ReplicationConn) StartReplication(slotName string, startLsn uint64, ti } // Create the replication slot, using the given name and output plugin. -func (rc *ReplicationConn) CreateReplicationSlot(slotName, outputPlugin string) (consistentPoint string, snapshotName string, err error) { +func (rc *ReplicationConn) CreateReplicationSlot(slotName, outputPlugin string) (err error) { + _, err = rc.c.Exec(fmt.Sprintf("CREATE_REPLICATION_SLOT %s LOGICAL %s", slotName, outputPlugin)) + return +} + +// Create the replication slot, using the given name and output plugin, and return the consistent_point and snapshot_name values. +func (rc *ReplicationConn) CreateReplicationSlotEx(slotName, outputPlugin string) (consistentPoint string, snapshotName string, err error) { var dummy string var rows *Rows rows, err = rc.sendReplicationModeQuery(fmt.Sprintf("CREATE_REPLICATION_SLOT %s LOGICAL %s", slotName, outputPlugin)) diff --git a/replication_test.go b/replication_test.go index 9989c982..d06d73cd 100644 --- a/replication_test.go +++ b/replication_test.go @@ -56,10 +56,18 @@ func TestSimpleReplicationConnection(t *testing.T) { replicationConn := mustReplicationConnect(t, *replicationConnConfig) defer closeReplicationConn(t, replicationConn) - _, _, err = replicationConn.CreateReplicationSlot("pgx_test", "test_decoding") + var cp string + var snapshot_name string + cp, snapshot_name, err = replicationConn.CreateReplicationSlotEx("pgx_test", "test_decoding") if err != nil { t.Fatalf("replication slot create failed: %v", err) } + if cp == "" { + t.Logf("consistent_point is empty") + } + if snapshot_name == "" { + t.Logf("snapshot_name is empty") + } // Do a simple change so we can get some wal data _, err = conn.Exec("create table if not exists replication_test (a integer)") @@ -178,20 +186,35 @@ func TestReplicationConn_DropReplicationSlot(t *testing.T) { replicationConn := mustReplicationConnect(t, *replicationConnConfig) defer closeReplicationConn(t, replicationConn) - _, _, err := replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding") + var cp string + var snapshot_name string + cp, snapshot_name, err := replicationConn.CreateReplicationSlotEx("pgx_slot_test", "test_decoding") if err != nil { t.Logf("replication slot create failed: %v", err) } + if cp == "" { + t.Logf("consistent_point is empty") + } + if snapshot_name == "" { + t.Logf("snapshot_name is empty") + } + err = replicationConn.DropReplicationSlot("pgx_slot_test") if err != nil { t.Fatalf("Failed to drop replication slot: %v", err) } // We re-create to ensure the drop worked. - _, _, err = replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding") + cp, snapshot_name, err = replicationConn.CreateReplicationSlotEx("pgx_slot_test", "test_decoding") if err != nil { t.Logf("replication slot create failed: %v", err) } + if cp == "" { + t.Logf("consistent_point is empty") + } + if snapshot_name == "" { + t.Logf("snapshot_name is empty") + } // And finally we drop to ensure we don't leave dirty state err = replicationConn.DropReplicationSlot("pgx_slot_test")