diff --git a/replication.go b/replication.go index bfa81e54..7dd5efe4 100644 --- a/replication.go +++ b/replication.go @@ -440,6 +440,18 @@ func (rc *ReplicationConn) CreateReplicationSlot(slotName, outputPlugin string) return } +// Create the replication slot, using the given name and output plugin, and return the consistent_point and snapshot_name values. +func (rc *ReplicationConn) CreateReplicationSlotEx(slotName, outputPlugin string) (consistentPoint string, snapshotName string, err error) { + var dummy string + var rows *Rows + rows, err = rc.sendReplicationModeQuery(fmt.Sprintf("CREATE_REPLICATION_SLOT %s LOGICAL %s", slotName, outputPlugin)) + defer rows.Close() + for rows.Next() { + rows.Scan(&dummy, &consistentPoint, &snapshotName, &dummy) + } + return +} + // Drop the replication slot for the given name func (rc *ReplicationConn) DropReplicationSlot(slotName string) (err error) { _, err = rc.c.Exec(fmt.Sprintf("DROP_REPLICATION_SLOT %s", slotName)) diff --git a/replication_test.go b/replication_test.go index d75233c1..d06d73cd 100644 --- a/replication_test.go +++ b/replication_test.go @@ -56,10 +56,18 @@ func TestSimpleReplicationConnection(t *testing.T) { replicationConn := mustReplicationConnect(t, *replicationConnConfig) defer closeReplicationConn(t, replicationConn) - err = replicationConn.CreateReplicationSlot("pgx_test", "test_decoding") + var cp string + var snapshot_name string + cp, snapshot_name, err = replicationConn.CreateReplicationSlotEx("pgx_test", "test_decoding") if err != nil { t.Fatalf("replication slot create failed: %v", err) } + if cp == "" { + t.Logf("consistent_point is empty") + } + if snapshot_name == "" { + t.Logf("snapshot_name is empty") + } // Do a simple change so we can get some wal data _, err = conn.Exec("create table if not exists replication_test (a integer)") @@ -178,20 +186,35 @@ func TestReplicationConn_DropReplicationSlot(t *testing.T) { replicationConn := mustReplicationConnect(t, *replicationConnConfig) defer closeReplicationConn(t, replicationConn) - err := replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding") + var cp string + var snapshot_name string + cp, snapshot_name, err := replicationConn.CreateReplicationSlotEx("pgx_slot_test", "test_decoding") if err != nil { t.Logf("replication slot create failed: %v", err) } + if cp == "" { + t.Logf("consistent_point is empty") + } + if snapshot_name == "" { + t.Logf("snapshot_name is empty") + } + err = replicationConn.DropReplicationSlot("pgx_slot_test") if err != nil { t.Fatalf("Failed to drop replication slot: %v", err) } // We re-create to ensure the drop worked. - err = replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding") + cp, snapshot_name, err = replicationConn.CreateReplicationSlotEx("pgx_slot_test", "test_decoding") if err != nil { t.Logf("replication slot create failed: %v", err) } + if cp == "" { + t.Logf("consistent_point is empty") + } + if snapshot_name == "" { + t.Logf("snapshot_name is empty") + } // And finally we drop to ensure we don't leave dirty state err = replicationConn.DropReplicationSlot("pgx_slot_test")