diff --git a/README.md b/README.md index 18eed26e..8db2011f 100644 --- a/README.md +++ b/README.md @@ -97,7 +97,6 @@ Then run the following SQL: create user pgx_md5 password 'secret'; create user " tricky, ' } "" \ test user " password 'secret'; create database pgx_test; - create user pgx_replication with replication password 'secret'; Connect to database pgx_test and run: @@ -141,26 +140,6 @@ Each different test connection type uses a different connection string in an env export PGX_TEST_MD5_PASSWORD_CONN_STRING="host=127.0.0.1 user=pgx_md5 password=secret database=pgx_test" export PGX_TEST_PLAIN_PASSWORD_CONN_STRING="host=127.0.0.1 user=pgx_pw password=secret database=pgx_test" -### Replication Test Environment - -Add a replication user: - - create user pgx_replication with replication password 'secret'; - -Add a replication line to your pg_hba.conf: - - host replication pgx_replication 127.0.0.1/32 md5 - -Change the following settings in your postgresql.conf: - - wal_level=logical - max_wal_senders=5 - max_replication_slots=5 - -Set the replication environment variable. - - export PGX_TEST_REPLICATION_CONN_STRING="host=127.0.0.1 user=pgx_replication password=secret database=pgx_test" - ## Version Policy pgx follows semantic versioning for the documented public API on stable releases. Branch `v3` is the latest stable release. `master` can contain new features or behavior that will change or be removed before being merged to the stable `v3` branch (in practice, this occurs very rarely). `v2` is the previous stable release. diff --git a/helper_test.go b/helper_test.go index 87e60792..c67cbfd4 100644 --- a/helper_test.go +++ b/helper_test.go @@ -31,14 +31,6 @@ func mustConnect(t testing.TB, config pgx.ConnConfig) *pgx.Conn { return conn } -func mustReplicationConnect(t testing.TB, config pgx.ConnConfig) *pgx.ReplicationConn { - conn, err := pgx.ReplicationConnect(&config) - if err != nil { - t.Fatalf("Unable to establish connection: %v", err) - } - return conn -} - func closeConn(t testing.TB, conn *pgx.Conn) { err := conn.Close(context.Background()) if err != nil { @@ -46,13 +38,6 @@ func closeConn(t testing.TB, conn *pgx.Conn) { } } -func closeReplicationConn(t testing.TB, conn *pgx.ReplicationConn) { - err := conn.Close() - if err != nil { - t.Fatalf("conn.Close unexpectedly failed: %v", err) - } -} - func mustExec(t testing.TB, conn *pgx.Conn, sql string, arguments ...interface{}) (commandTag pgconn.CommandTag) { var err error if commandTag, err = conn.Exec(context.Background(), sql, arguments...); err != nil { diff --git a/replication.go b/replication.go deleted file mode 100644 index cad9ebbb..00000000 --- a/replication.go +++ /dev/null @@ -1,445 +0,0 @@ -package pgx - -import ( - "context" - "encoding/binary" - "fmt" - "strings" - "time" - - errors "golang.org/x/xerrors" - - "github.com/jackc/pgconn" - "github.com/jackc/pgio" - "github.com/jackc/pgproto3/v2" - "github.com/jackc/pgtype" -) - -const ( - copyBothResponse = 'W' - walData = 'w' - senderKeepalive = 'k' - standbyStatusUpdate = 'r' - initialReplicationResponseTimeout = 5 * time.Second -) - -var epochNano int64 - -func init() { - epochNano = time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC).UnixNano() -} - -// Format the given 64bit LSN value into the XXX/XXX format, -// which is the format reported by postgres. -func FormatLSN(lsn uint64) string { - return fmt.Sprintf("%X/%X", uint32(lsn>>32), uint32(lsn)) -} - -// Parse the given XXX/XXX format LSN as reported by postgres, -// into a 64 bit integer as used internally by the wire procotols -func ParseLSN(lsn string) (outputLsn uint64, err error) { - var upperHalf uint64 - var lowerHalf uint64 - var nparsed int - nparsed, err = fmt.Sscanf(lsn, "%X/%X", &upperHalf, &lowerHalf) - if err != nil { - return - } - - if nparsed != 2 { - err = errors.New(fmt.Sprintf("Failed to parsed LSN: %s", lsn)) - return - } - - outputLsn = (upperHalf << 32) + lowerHalf - return -} - -// The WAL message contains WAL payload entry data -type WalMessage struct { - // The WAL start position of this data. This - // is the WAL position we need to track. - WalStart uint64 - // The server wal end and server time are - // documented to track the end position and current - // time of the server, both of which appear to be - // unimplemented in pg 9.5. - ServerWalEnd uint64 - ServerTime uint64 - // The WAL data is the raw unparsed binary WAL entry. - // The contents of this are determined by the output - // logical encoding plugin. - WalData []byte -} - -func (w *WalMessage) Time() time.Time { - return time.Unix(0, (int64(w.ServerTime)*1000)+epochNano) -} - -func (w *WalMessage) ByteLag() uint64 { - return (w.ServerWalEnd - w.WalStart) -} - -func (w *WalMessage) String() string { - return fmt.Sprintf("Wal: %s Time: %s Lag: %d", FormatLSN(w.WalStart), w.Time(), w.ByteLag()) -} - -// The server heartbeat is sent periodically from the server, -// including server status, and a reply request field -type ServerHeartbeat struct { - // The current max wal position on the server, - // used for lag tracking - ServerWalEnd uint64 - // The server time, in microseconds since jan 1 2000 - ServerTime uint64 - // If 1, the server is requesting a standby status message - // to be sent immediately. - ReplyRequested byte -} - -func (s *ServerHeartbeat) Time() time.Time { - return time.Unix(0, (int64(s.ServerTime)*1000)+epochNano) -} - -func (s *ServerHeartbeat) String() string { - return fmt.Sprintf("WalEnd: %s ReplyRequested: %d T: %s", FormatLSN(s.ServerWalEnd), s.ReplyRequested, s.Time()) -} - -// The replication message wraps all possible messages from the -// server received during replication. At most one of the wal message -// or server heartbeat will be non-nil -type ReplicationMessage struct { - WalMessage *WalMessage - ServerHeartbeat *ServerHeartbeat -} - -// The standby status is the client side heartbeat sent to the postgresql -// server to track the client wal positions. For practical purposes, -// all wal positions are typically set to the same value. -type StandbyStatus struct { - // The WAL position that's been locally written - WalWritePosition uint64 - // The WAL position that's been locally flushed - WalFlushPosition uint64 - // The WAL position that's been locally applied - WalApplyPosition uint64 - // The client time in microseconds since jan 1 2000 - ClientTime uint64 - // If 1, requests the server to immediately send a - // server heartbeat - ReplyRequested byte -} - -// Create a standby status struct, which sets all the WAL positions -// to the given wal position, and the client time to the current time. -// The wal positions are, in order: -// WalFlushPosition -// WalApplyPosition -// WalWritePosition -// -// If only one position is provided, it will be used as the value for all 3 -// status fields. Note you must provide either 1 wal position, or all 3 -// in order to initialize the standby status. -func NewStandbyStatus(walPositions ...uint64) (status *StandbyStatus, err error) { - if len(walPositions) == 1 { - status = new(StandbyStatus) - status.WalFlushPosition = walPositions[0] - status.WalApplyPosition = walPositions[0] - status.WalWritePosition = walPositions[0] - } else if len(walPositions) == 3 { - status = new(StandbyStatus) - status.WalFlushPosition = walPositions[0] - status.WalApplyPosition = walPositions[1] - status.WalWritePosition = walPositions[2] - } else { - err = errors.New(fmt.Sprintf("Invalid number of wal positions provided, need 1 or 3, got %d", len(walPositions))) - return - } - status.ClientTime = uint64((time.Now().UnixNano() - epochNano) / 1000) - return -} - -func ReplicationConnect(config *ConnConfig) (r *ReplicationConn, err error) { - if config.Config.RuntimeParams == nil { - config.Config.RuntimeParams = make(map[string]string) - } - config.Config.RuntimeParams["replication"] = "database" - - c, err := ConnectConfig(context.TODO(), config) - if err != nil { - return - } - return &ReplicationConn{c: c}, nil -} - -type ReplicationConn struct { - c *Conn -} - -// Send standby status to the server, which both acts as a keepalive -// message to the server, as well as carries the WAL position of the -// client, which then updates the server's replication slot position. -func (rc *ReplicationConn) SendStandbyStatus(k *StandbyStatus) (err error) { - buf := rc.c.wbuf - buf = append(buf, copyData) - sp := len(buf) - buf = pgio.AppendInt32(buf, -1) - - buf = append(buf, standbyStatusUpdate) - buf = pgio.AppendInt64(buf, int64(k.WalWritePosition)) - buf = pgio.AppendInt64(buf, int64(k.WalFlushPosition)) - buf = pgio.AppendInt64(buf, int64(k.WalApplyPosition)) - buf = pgio.AppendInt64(buf, int64(k.ClientTime)) - buf = append(buf, k.ReplyRequested) - - pgio.SetInt32(buf[sp:], int32(len(buf[sp:]))) - - _, err = rc.c.pgConn.Conn().Write(buf) - if err != nil { - rc.c.die(err) - } - - return -} - -func (rc *ReplicationConn) Close() error { - return rc.c.Close(context.TODO()) -} - -func (rc *ReplicationConn) IsAlive() bool { - return rc.c.IsAlive() -} - -func (rc *ReplicationConn) CauseOfDeath() error { - return rc.c.CauseOfDeath() -} - -func (rc *ReplicationConn) GetConnInfo() *pgtype.ConnInfo { - return rc.c.ConnInfo -} - -func (rc *ReplicationConn) readReplicationMessage() (r *ReplicationMessage, err error) { - msg, err := rc.c.pgConn.ReceiveMessage() - if err != nil { - return - } - - switch msg := msg.(type) { - case *pgproto3.NoticeResponse: - pgError := rc.c.rxErrorResponse((*pgproto3.ErrorResponse)(msg)) - if rc.c.shouldLog(LogLevelInfo) { - rc.c.log(LogLevelInfo, pgError.Error(), nil) - } - case *pgproto3.ErrorResponse: - err = rc.c.rxErrorResponse(msg) - if rc.c.shouldLog(LogLevelError) { - rc.c.log(LogLevelError, err.Error(), nil) - } - return - case *pgproto3.CopyBothResponse: - // This is the tail end of the replication process start, - // and can be safely ignored - return - case *pgproto3.CopyData: - msgType := msg.Data[0] - rp := 1 - - switch msgType { - case walData: - walStart := binary.BigEndian.Uint64(msg.Data[rp:]) - rp += 8 - serverWalEnd := binary.BigEndian.Uint64(msg.Data[rp:]) - rp += 8 - serverTime := binary.BigEndian.Uint64(msg.Data[rp:]) - rp += 8 - walData := msg.Data[rp:] - walMessage := WalMessage{WalStart: walStart, - ServerWalEnd: serverWalEnd, - ServerTime: serverTime, - WalData: walData, - } - - return &ReplicationMessage{WalMessage: &walMessage}, nil - case senderKeepalive: - serverWalEnd := binary.BigEndian.Uint64(msg.Data[rp:]) - rp += 8 - serverTime := binary.BigEndian.Uint64(msg.Data[rp:]) - rp += 8 - replyNow := msg.Data[rp] - rp += 1 - h := &ServerHeartbeat{ServerWalEnd: serverWalEnd, ServerTime: serverTime, ReplyRequested: replyNow} - return &ReplicationMessage{ServerHeartbeat: h}, nil - default: - if rc.c.shouldLog(LogLevelError) { - rc.c.log(LogLevelError, "Unexpected data playload message type", map[string]interface{}{"type": msgType}) - } - } - default: - if rc.c.shouldLog(LogLevelError) { - rc.c.log(LogLevelError, "Unexpected replication message type", map[string]interface{}{"type": msg}) - } - } - return -} - -// Wait for a single replication message. -// -// Properly using this requires some knowledge of the postgres replication mechanisms, -// as the client can receive both WAL data (the ultimate payload) and server heartbeat -// updates. The caller also must send standby status updates in order to keep the connection -// alive and working. -// -// This returns the context error when there is no replication message before -// the context is canceled. -func (rc *ReplicationConn) WaitForReplicationMessage(ctx context.Context) (*ReplicationMessage, error) { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - } - - go func() { - select { - case <-ctx.Done(): - if err := rc.c.pgConn.Conn().SetDeadline(time.Now()); err != nil { - rc.Close() // Close connection if unable to set deadline - return - } - rc.c.closedChan <- ctx.Err() - case <-rc.c.doneChan: - } - }() - - r, opErr := rc.readReplicationMessage() - - var err error - select { - case err = <-rc.c.closedChan: - if err := rc.c.pgConn.Conn().SetDeadline(time.Time{}); err != nil { - rc.Close() // Close connection if unable to disable deadline - return nil, err - } - - if opErr == nil { - err = nil - } - case rc.c.doneChan <- struct{}{}: - err = opErr - } - - return r, err -} - -// Execute the "IDENTIFY_SYSTEM" command as documented here: -// https://www.postgresql.org/docs/9.5/static/protocol-replication.html -// -// This will return (if successful) a result set that has a single row -// that contains the systemid, current timeline, xlogpos and database -// name. -// -// NOTE: Because this is a replication mode connection, we don't have -// type names, so the field descriptions in the result will have only -// OIDs and no DataTypeName values -func (rc *ReplicationConn) IdentifySystem() (r Rows, err error) { - return nil, errors.New("TODO") - // return rc.sendReplicationModeQuery("IDENTIFY_SYSTEM") -} - -// Execute the "TIMELINE_HISTORY" command as documented here: -// https://www.postgresql.org/docs/9.5/static/protocol-replication.html -// -// This will return (if successful) a result set that has a single row -// that contains the filename of the history file and the content -// of the history file. If called for timeline 1, typically this will -// generate an error that the timeline history file does not exist. -// -// NOTE: Because this is a replication mode connection, we don't have -// type names, so the field descriptions in the result will have only -// OIDs and no DataTypeName values -func (rc *ReplicationConn) TimelineHistory(timeline int) (r Rows, err error) { - return nil, errors.New("TODO") - // return rc.sendReplicationModeQuery(fmt.Sprintf("TIMELINE_HISTORY %d", timeline)) -} - -// Start a replication connection, sending WAL data to the given replication -// receiver. This function wraps a START_REPLICATION command as documented -// here: -// https://www.postgresql.org/docs/9.5/static/protocol-replication.html -// -// Once started, the client needs to invoke WaitForReplicationMessage() in order -// to fetch the WAL and standby status. Also, it is the responsibility of the caller -// to periodically send StandbyStatus messages to update the replication slot position. -// -// This function assumes that slotName has already been created. In order to omit the timeline argument -// pass a -1 for the timeline to get the server default behavior. -func (rc *ReplicationConn) StartReplication(slotName string, startLsn uint64, timeline int64, pluginArguments ...string) (err error) { - queryString := fmt.Sprintf("START_REPLICATION SLOT %s LOGICAL %s", slotName, FormatLSN(startLsn)) - if timeline >= 0 { - timelineOption := fmt.Sprintf("TIMELINE %d", timeline) - pluginArguments = append(pluginArguments, timelineOption) - } - - if len(pluginArguments) > 0 { - queryString += fmt.Sprintf(" ( %s )", strings.Join(pluginArguments, ", ")) - } - - buf := appendQuery(rc.c.wbuf, queryString) - - _, err = rc.c.pgConn.Conn().Write(buf) - if err != nil { - rc.c.die(err) - return err - } - - ctx, cancelFn := context.WithTimeout(context.Background(), initialReplicationResponseTimeout) - defer cancelFn() - - // The first replication message that comes back here will be (in a success case) - // a empty CopyBoth that is (apparently) sent as the confirmation that the replication has - // started. This call will either return nil, nil or if it returns an error - // that indicates the start replication command failed - var r *ReplicationMessage - r, err = rc.WaitForReplicationMessage(ctx) - if err != nil && r != nil { - if rc.c.shouldLog(LogLevelError) { - rc.c.log(LogLevelError, "Unexpected replication message", map[string]interface{}{"msg": r, "err": err}) - } - } - - return -} - -// Create the replication slot, using the given name and output plugin. -func (rc *ReplicationConn) CreateReplicationSlot(slotName, outputPlugin string) (err error) { - _, err = rc.c.Exec(context.TODO(), fmt.Sprintf("CREATE_REPLICATION_SLOT %s LOGICAL %s NOEXPORT_SNAPSHOT", slotName, outputPlugin)) - return -} - -// Create the replication slot, using the given name and output plugin, and return the consistent_point and snapshot_name values. -func (rc *ReplicationConn) CreateReplicationSlotEx(slotName, outputPlugin string) (consistentPoint string, snapshotName string, err error) { - var results []*pgconn.Result - results, err = rc.c.pgConn.Exec(context.TODO(), fmt.Sprintf("CREATE_REPLICATION_SLOT %s LOGICAL %s", slotName, outputPlugin)).ReadAll() - if err != nil { - return "", "", err - } - - if len(results) != 1 { - return "", "", errors.Errorf("expected 1 result got: %d", len(results)) - } - - if len(results[0].Rows) != 1 { - return "", "", errors.Errorf("expected 1 row got: %d", len(results[0].Rows)) - } - - if len(results[0].Rows[0]) != 4 { - return "", "", errors.Errorf("expected 4 columns got: %d", len(results[0].Rows[0])) - } - - return string(results[0].Rows[0][1]), string(results[0].Rows[0][2]), nil -} - -// Drop the replication slot for the given name -func (rc *ReplicationConn) DropReplicationSlot(slotName string) (err error) { - _, err = rc.c.Exec(context.TODO(), fmt.Sprintf("DROP_REPLICATION_SLOT %s", slotName)) - return -} diff --git a/replication_test.go b/replication_test.go deleted file mode 100644 index d84b390e..00000000 --- a/replication_test.go +++ /dev/null @@ -1,360 +0,0 @@ -package pgx_test - -import ( - "context" - "fmt" - "os" - "reflect" - "strconv" - "strings" - "testing" - "time" - - "github.com/jackc/pgconn" - "github.com/jackc/pgx/v4" -) - -// This function uses a postgresql 9.6 specific column -func getConfirmedFlushLsnFor(t *testing.T, conn *pgx.Conn, slot string) string { - // Fetch the restart LSN of the slot, to establish a starting point - rows, err := conn.Query(context.Background(), fmt.Sprintf("select confirmed_flush_lsn from pg_replication_slots where slot_name='%s'", slot)) - if err != nil { - t.Fatalf("conn.Query failed: %v", err) - } - defer rows.Close() - - var restartLsn string - for rows.Next() { - rows.Scan(&restartLsn) - } - return restartLsn -} - -// This battleship test (at least somewhat by necessity) does -// several things all at once in a single run. It: -// - Establishes a replication connection & slot -// - Does a series of operations to create some known WAL entries -// - Replicates the entries down, and checks that the rows it -// created come down in order -// - Sends a standby status message to update the server with the -// wal position of the slot -// - Checks the wal position of the slot on the server to make sure -// the update succeeded -func TestSimpleReplicationConnection(t *testing.T) { - t.Skipf("TODO - replication needs to be revisited when v4 churn settles down. For now just skip") - - var err error - - connString := os.Getenv("PGX_TEST_REPLICATION_CONN_STRING") - if connString == "" { - t.Skipf("Skipping due to missing environment variable %v", "PGX_TEST_REPLICATION_CONN_STRING") - } - - conn := mustConnectString(t, connString) - defer func() { - // Ensure replication slot is destroyed, but don't check for errors as it - // should have already been destroyed. - conn.Exec(context.Background(), "select pg_drop_replication_slot('pgx_test')") - closeConn(t, conn) - }() - - replicationConnConfig := mustParseConfig(t, connString) - replicationConn := mustReplicationConnect(t, replicationConnConfig) - defer closeReplicationConn(t, replicationConn) - - var cp string - var snapshot_name string - cp, snapshot_name, err = replicationConn.CreateReplicationSlotEx("pgx_test", "test_decoding") - if err != nil { - t.Fatalf("replication slot create failed: %v", err) - } - if cp == "" { - t.Logf("consistent_point is empty") - } - if snapshot_name == "" { - t.Logf("snapshot_name is empty") - } - - // Do a simple change so we can get some wal data - _, err = conn.Exec(context.Background(), "create table if not exists replication_test (a integer)") - if err != nil { - t.Fatalf("Failed to create table: %v", err) - } - - err = replicationConn.StartReplication("pgx_test", 0, -1) - if err != nil { - t.Fatalf("Failed to start replication: %v", err) - } - - var insertedTimes []int64 - currentTime := time.Now().Unix() - - for i := 0; i < 5; i++ { - var ct pgconn.CommandTag - insertedTimes = append(insertedTimes, currentTime) - ct, err = conn.Exec(context.Background(), "insert into replication_test(a) values($1)", currentTime) - if err != nil { - t.Fatalf("Insert failed: %v", err) - } - t.Logf("Inserted %d rows", ct.RowsAffected()) - currentTime++ - } - - var foundTimes []int64 - var foundCount int - var maxWal uint64 - - ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) - defer cancelFn() - - for { - var message *pgx.ReplicationMessage - - message, err = replicationConn.WaitForReplicationMessage(ctx) - if err != nil { - t.Fatalf("Replication failed: %v %s", err, reflect.TypeOf(err)) - } - - if message.WalMessage != nil { - // The waldata payload with the test_decoding plugin looks like: - // public.replication_test: INSERT: a[integer]:2 - // What we wanna do here is check that once we find one of our inserted times, - // that they occur in the wal stream in the order we executed them. - walString := string(message.WalMessage.WalData) - if strings.Contains(walString, "public.replication_test: INSERT") { - stringParts := strings.Split(walString, ":") - offset, err := strconv.ParseInt(stringParts[len(stringParts)-1], 10, 64) - if err != nil { - t.Fatalf("Failed to parse walString %s", walString) - } - if foundCount > 0 || offset == insertedTimes[0] { - foundTimes = append(foundTimes, offset) - foundCount++ - } - if foundCount == len(insertedTimes) { - break - } - } - if message.WalMessage.WalStart > maxWal { - maxWal = message.WalMessage.WalStart - } - - } - if message.ServerHeartbeat != nil { - t.Logf("Got heartbeat: %s", message.ServerHeartbeat) - } - } - - for i := range insertedTimes { - if foundTimes[i] != insertedTimes[i] { - t.Fatalf("Found %d expected %d", foundTimes[i], insertedTimes[i]) - } - } - - t.Logf("Found %d times, as expected", len(foundTimes)) - - // Before closing our connection, let's send a standby status to update our wal - // position, which should then be reflected if we fetch out our current wal position - // for the slot - status, err := pgx.NewStandbyStatus(maxWal) - if err != nil { - t.Errorf("Failed to create standby status %v", err) - } - replicationConn.SendStandbyStatus(status) - - restartLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test") - integerRestartLsn, _ := pgx.ParseLSN(restartLsn) - if integerRestartLsn != maxWal { - t.Fatalf("Wal offset update failed, expected %s found %s", pgx.FormatLSN(maxWal), restartLsn) - } - - closeReplicationConn(t, replicationConn) - - replicationConn2 := mustReplicationConnect(t, replicationConnConfig) - defer closeReplicationConn(t, replicationConn2) - - err = replicationConn2.DropReplicationSlot("pgx_test") - if err != nil { - t.Fatalf("Failed to drop replication slot: %v", err) - } - - droppedLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test") - if droppedLsn != "" { - t.Errorf("Got odd flush lsn %s for supposedly dropped slot", droppedLsn) - } -} - -func TestReplicationConn_DropReplicationSlot(t *testing.T) { - connString := os.Getenv("PGX_TEST_REPLICATION_CONN_STRING") - if connString == "" { - t.Skipf("Skipping due to missing environment variable %v", "PGX_TEST_REPLICATION_CONN_STRING") - } - - replicationConnConfig := mustParseConfig(t, connString) - replicationConn := mustReplicationConnect(t, replicationConnConfig) - defer closeReplicationConn(t, replicationConn) - - var cp string - var snapshot_name string - cp, snapshot_name, err := replicationConn.CreateReplicationSlotEx("pgx_slot_test", "test_decoding") - if err != nil { - t.Logf("replication slot create failed: %v", err) - } - if cp == "" { - t.Logf("consistent_point is empty") - } - if snapshot_name == "" { - t.Logf("snapshot_name is empty") - } - - err = replicationConn.DropReplicationSlot("pgx_slot_test") - if err != nil { - t.Fatalf("Failed to drop replication slot: %v", err) - } - - // We re-create to ensure the drop worked. - cp, snapshot_name, err = replicationConn.CreateReplicationSlotEx("pgx_slot_test", "test_decoding") - if err != nil { - t.Logf("replication slot create failed: %v", err) - } - if cp == "" { - t.Logf("consistent_point is empty") - } - if snapshot_name == "" { - t.Logf("snapshot_name is empty") - } - - // And finally we drop to ensure we don't leave dirty state - err = replicationConn.DropReplicationSlot("pgx_slot_test") - if err != nil { - t.Fatalf("Failed to drop replication slot: %v", err) - } -} - -func TestIdentifySystem(t *testing.T) { - t.Skipf("TODO") - connString := os.Getenv("PGX_TEST_REPLICATION_CONN_STRING") - if connString == "" { - t.Skipf("Skipping due to missing environment variable %v", "PGX_TEST_REPLICATION_CONN_STRING") - } - - replicationConnConfig := mustParseConfig(t, connString) - replicationConn2 := mustReplicationConnect(t, replicationConnConfig) - defer closeReplicationConn(t, replicationConn2) - - r, err := replicationConn2.IdentifySystem() - if err != nil { - t.Error(err) - } - defer r.Close() - for _, fd := range r.FieldDescriptions() { - t.Logf("Field: %s of type %v", fd.Name, fd.DataTypeOID) - } - - var rowCount int - for r.Next() { - rowCount++ - values, err := r.Values() - if err != nil { - t.Error(err) - } - t.Logf("Row values: %v", values) - } - if r.Err() != nil { - t.Error(r.Err()) - } - - if rowCount == 0 { - t.Errorf("Failed to find any rows: %d", rowCount) - } -} - -func getCurrentTimeline(t *testing.T, rc *pgx.ReplicationConn) int { - r, err := rc.IdentifySystem() - if err != nil { - t.Error(err) - } - defer r.Close() - for r.Next() { - values, e := r.Values() - if e != nil { - t.Error(e) - } - return int(values[1].(int32)) - } - t.Fatal("Failed to read timeline") - return -1 -} - -func TestGetTimelineHistory(t *testing.T) { - t.Skipf("TODO") - - connString := os.Getenv("PGX_TEST_REPLICATION_CONN_STRING") - if connString == "" { - t.Skipf("Skipping due to missing environment variable %v", "PGX_TEST_REPLICATION_CONN_STRING") - } - - replicationConnConfig := mustParseConfig(t, connString) - replicationConn := mustReplicationConnect(t, replicationConnConfig) - defer closeReplicationConn(t, replicationConn) - - timeline := getCurrentTimeline(t, replicationConn) - - r, err := replicationConn.TimelineHistory(timeline) - if err != nil { - t.Errorf("%#v", err) - } - defer r.Close() - - for _, fd := range r.FieldDescriptions() { - t.Logf("Field: %s of type %v", fd.Name, fd.DataTypeOID) - } - - var rowCount int - for r.Next() { - rowCount++ - values, err := r.Values() - if err != nil { - t.Error(err) - } - t.Logf("Row values: %v", values) - } - if r.Err() != nil { - if strings.Contains(r.Err().Error(), "No such file or directory") { - // This is normal, this means the timeline we're on has no - // history, which is the common case in a test db that - // has only one timeline - return - } - t.Error(r.Err()) - } - - // If we have a timeline history (see above) there should have been - // rows emitted - if rowCount == 0 { - t.Errorf("Failed to find any rows: %d", rowCount) - } -} - -func TestStandbyStatusParsing(t *testing.T) { - // Let's push the boundary conditions of the standby status and ensure it errors correctly - status, err := pgx.NewStandbyStatus(0, 1, 2, 3, 4) - if err == nil { - t.Errorf("Expected error from new standby status, got %v", status) - } - - // And if you provide 3 args, ensure the right fields are set - status, err = pgx.NewStandbyStatus(1, 2, 3) - if err != nil { - t.Errorf("Failed to create test status: %v", err) - } - if status.WalFlushPosition != 1 { - t.Errorf("Unexpected flush position %d", status.WalFlushPosition) - } - if status.WalApplyPosition != 2 { - t.Errorf("Unexpected apply position %d", status.WalApplyPosition) - } - if status.WalWritePosition != 3 { - t.Errorf("Unexpected write position %d", status.WalWritePosition) - } -}