mirror of https://github.com/jackc/pgx.git
Extract replication to pgxrepl package
parent dd8c63c839
commit 159d82e772

README.md (21 lines removed)

@@ -97,7 +97,6 @@ Then run the following SQL:
 create user pgx_md5 password 'secret';
 create user " tricky, ' } "" \ test user " password 'secret';
 create database pgx_test;
-create user pgx_replication with replication password 'secret';

 Connect to database pgx_test and run:

@@ -141,26 +140,6 @@ Each different test connection type uses a different connection string in an environment variable.
 export PGX_TEST_MD5_PASSWORD_CONN_STRING="host=127.0.0.1 user=pgx_md5 password=secret database=pgx_test"
 export PGX_TEST_PLAIN_PASSWORD_CONN_STRING="host=127.0.0.1 user=pgx_pw password=secret database=pgx_test"

-### Replication Test Environment
-
-Add a replication user:
-
-create user pgx_replication with replication password 'secret';
-
-Add a replication line to your pg_hba.conf:
-
-host replication pgx_replication 127.0.0.1/32 md5
-
-Change the following settings in your postgresql.conf:
-
-wal_level=logical
-max_wal_senders=5
-max_replication_slots=5
-
-Set the replication environment variable.
-
-export PGX_TEST_REPLICATION_CONN_STRING="host=127.0.0.1 user=pgx_replication password=secret database=pgx_test"
-
 ## Version Policy

 pgx follows semantic versioning for the documented public API on stable releases. Branch `v3` is the latest stable release. `master` can contain new features or behavior that will change or be removed before being merged to the stable `v3` branch (in practice, this occurs very rarely). `v2` is the previous stable release.
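
For context on the section removed above, a minimal sketch (not part of this commit) of how PGX_TEST_REPLICATION_CONN_STRING was consumed: the replication tests parsed it and opened a replication-mode connection through the pgx.ReplicationConnect API that this commit also deletes (see replication.go below). The program name and error handling here are illustrative only.

package main

import (
	"fmt"
	"os"

	"github.com/jackc/pgx/v4"
)

func main() {
	// Parse the replication test connection string from the environment.
	config, err := pgx.ParseConfig(os.Getenv("PGX_TEST_REPLICATION_CONN_STRING"))
	if err != nil {
		panic(err)
	}
	// ReplicationConnect sets RuntimeParams["replication"]="database" before connecting.
	rc, err := pgx.ReplicationConnect(config)
	if err != nil {
		panic(err)
	}
	defer rc.Close()
	fmt.Println("replication connection alive:", rc.IsAlive())
}
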
@@ -31,14 +31,6 @@ func mustConnect(t testing.TB, config pgx.ConnConfig) *pgx.Conn {
 	return conn
 }

-func mustReplicationConnect(t testing.TB, config pgx.ConnConfig) *pgx.ReplicationConn {
-	conn, err := pgx.ReplicationConnect(&config)
-	if err != nil {
-		t.Fatalf("Unable to establish connection: %v", err)
-	}
-	return conn
-}
-
 func closeConn(t testing.TB, conn *pgx.Conn) {
 	err := conn.Close(context.Background())
 	if err != nil {
@@ -46,13 +38,6 @@ func closeConn(t testing.TB, conn *pgx.Conn) {
 	}
 }

-func closeReplicationConn(t testing.TB, conn *pgx.ReplicationConn) {
-	err := conn.Close()
-	if err != nil {
-		t.Fatalf("conn.Close unexpectedly failed: %v", err)
-	}
-}
-
 func mustExec(t testing.TB, conn *pgx.Conn, sql string, arguments ...interface{}) (commandTag pgconn.CommandTag) {
 	var err error
 	if commandTag, err = conn.Exec(context.Background(), sql, arguments...); err != nil {

replication.go (445 lines removed)

@@ -1,445 +0,0 @@
package pgx

import (
	"context"
	"encoding/binary"
	"fmt"
	"strings"
	"time"

	errors "golang.org/x/xerrors"

	"github.com/jackc/pgconn"
	"github.com/jackc/pgio"
	"github.com/jackc/pgproto3/v2"
	"github.com/jackc/pgtype"
)

const (
	copyBothResponse                  = 'W'
	walData                           = 'w'
	senderKeepalive                   = 'k'
	standbyStatusUpdate               = 'r'
	initialReplicationResponseTimeout = 5 * time.Second
)

var epochNano int64

func init() {
	epochNano = time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC).UnixNano()
}

// Format the given 64-bit LSN value into the XXX/XXX format,
// which is the format reported by postgres.
func FormatLSN(lsn uint64) string {
	return fmt.Sprintf("%X/%X", uint32(lsn>>32), uint32(lsn))
}

// Parse the given XXX/XXX format LSN as reported by postgres,
// into a 64-bit integer as used internally by the wire protocols.
func ParseLSN(lsn string) (outputLsn uint64, err error) {
	var upperHalf uint64
	var lowerHalf uint64
	var nparsed int
	nparsed, err = fmt.Sscanf(lsn, "%X/%X", &upperHalf, &lowerHalf)
	if err != nil {
		return
	}

	if nparsed != 2 {
		err = errors.New(fmt.Sprintf("Failed to parse LSN: %s", lsn))
		return
	}

	outputLsn = (upperHalf << 32) + lowerHalf
	return
}
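
For reference, a small usage sketch (not part of the diff) of the LSN round trip these two helpers implement, using the pgx v4 API as it existed before this commit; the value 16/B374D848 is an arbitrary example.

package main

import (
	"fmt"

	"github.com/jackc/pgx/v4"
)

func main() {
	// Parse a postgres-reported LSN into the 64-bit form used on the wire...
	lsn, err := pgx.ParseLSN("16/B374D848")
	if err != nil {
		panic(err)
	}
	// ...and format it back into the textual XXX/XXX form.
	fmt.Println(lsn, pgx.FormatLSN(lsn)) // 97500059720 16/B374D848
}
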
// The WAL message contains WAL payload entry data
type WalMessage struct {
	// The WAL start position of this data. This
	// is the WAL position we need to track.
	WalStart uint64
	// The server wal end and server time are
	// documented to track the end position and current
	// time of the server, both of which appear to be
	// unimplemented in pg 9.5.
	ServerWalEnd uint64
	ServerTime   uint64
	// The WAL data is the raw unparsed binary WAL entry.
	// The contents of this are determined by the output
	// logical encoding plugin.
	WalData []byte
}

func (w *WalMessage) Time() time.Time {
	return time.Unix(0, (int64(w.ServerTime)*1000)+epochNano)
}

func (w *WalMessage) ByteLag() uint64 {
	return (w.ServerWalEnd - w.WalStart)
}

func (w *WalMessage) String() string {
	return fmt.Sprintf("Wal: %s Time: %s Lag: %d", FormatLSN(w.WalStart), w.Time(), w.ByteLag())
}

// The server heartbeat is sent periodically from the server,
// including server status, and a reply request field
type ServerHeartbeat struct {
	// The current max wal position on the server,
	// used for lag tracking
	ServerWalEnd uint64
	// The server time, in microseconds since jan 1 2000
	ServerTime uint64
	// If 1, the server is requesting a standby status message
	// to be sent immediately.
	ReplyRequested byte
}

func (s *ServerHeartbeat) Time() time.Time {
	return time.Unix(0, (int64(s.ServerTime)*1000)+epochNano)
}

func (s *ServerHeartbeat) String() string {
	return fmt.Sprintf("WalEnd: %s ReplyRequested: %d T: %s", FormatLSN(s.ServerWalEnd), s.ReplyRequested, s.Time())
}

// The replication message wraps all possible messages from the
// server received during replication. At most one of the wal message
// or server heartbeat will be non-nil
type ReplicationMessage struct {
	WalMessage      *WalMessage
	ServerHeartbeat *ServerHeartbeat
}

// The standby status is the client-side heartbeat sent to the postgresql
// server to track the client wal positions. For practical purposes,
// all wal positions are typically set to the same value.
type StandbyStatus struct {
	// The WAL position that's been locally written
	WalWritePosition uint64
	// The WAL position that's been locally flushed
	WalFlushPosition uint64
	// The WAL position that's been locally applied
	WalApplyPosition uint64
	// The client time in microseconds since jan 1 2000
	ClientTime uint64
	// If 1, requests the server to immediately send a
	// server heartbeat
	ReplyRequested byte
}

// Create a standby status struct, which sets all the WAL positions
// to the given wal position, and the client time to the current time.
// The wal positions are, in order:
// WalFlushPosition
// WalApplyPosition
// WalWritePosition
//
// If only one position is provided, it will be used as the value for all 3
// status fields. Note you must provide either 1 wal position, or all 3,
// in order to initialize the standby status.
func NewStandbyStatus(walPositions ...uint64) (status *StandbyStatus, err error) {
	if len(walPositions) == 1 {
		status = new(StandbyStatus)
		status.WalFlushPosition = walPositions[0]
		status.WalApplyPosition = walPositions[0]
		status.WalWritePosition = walPositions[0]
	} else if len(walPositions) == 3 {
		status = new(StandbyStatus)
		status.WalFlushPosition = walPositions[0]
		status.WalApplyPosition = walPositions[1]
		status.WalWritePosition = walPositions[2]
	} else {
		err = errors.New(fmt.Sprintf("Invalid number of wal positions provided, need 1 or 3, got %d", len(walPositions)))
		return
	}
	status.ClientTime = uint64((time.Now().UnixNano() - epochNano) / 1000)
	return
}
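
A short usage sketch (not part of the diff): the one-argument form mirrors the common case where written, flushed, and applied WAL positions are reported as the same value; the three-argument form is (flush, apply, write), as TestStandbyStatusParsing below also checks. The LSN value is an arbitrary example.

package main

import (
	"fmt"

	"github.com/jackc/pgx/v4"
)

func main() {
	lsn, _ := pgx.ParseLSN("0/16B6E80") // arbitrary example position

	// One argument: all three WAL positions are set to the same value, and
	// ClientTime is filled in automatically (microseconds since 2000-01-01).
	status, err := pgx.NewStandbyStatus(lsn)
	if err != nil {
		panic(err)
	}
	fmt.Println(status.WalWritePosition == status.WalFlushPosition) // true

	// Three arguments: flush, apply, write, in that order.
	status, err = pgx.NewStandbyStatus(lsn, lsn, lsn+8)
	if err != nil {
		panic(err)
	}
	fmt.Println(status.WalWritePosition - status.WalFlushPosition) // 8
}
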
func ReplicationConnect(config *ConnConfig) (r *ReplicationConn, err error) {
	if config.Config.RuntimeParams == nil {
		config.Config.RuntimeParams = make(map[string]string)
	}
	config.Config.RuntimeParams["replication"] = "database"

	c, err := ConnectConfig(context.TODO(), config)
	if err != nil {
		return
	}
	return &ReplicationConn{c: c}, nil
}

type ReplicationConn struct {
	c *Conn
}

// Send standby status to the server, which both acts as a keepalive
// message to the server and carries the WAL position of the client,
// which then updates the server's replication slot position.
func (rc *ReplicationConn) SendStandbyStatus(k *StandbyStatus) (err error) {
	buf := rc.c.wbuf
	buf = append(buf, copyData)
	sp := len(buf)
	buf = pgio.AppendInt32(buf, -1)

	buf = append(buf, standbyStatusUpdate)
	buf = pgio.AppendInt64(buf, int64(k.WalWritePosition))
	buf = pgio.AppendInt64(buf, int64(k.WalFlushPosition))
	buf = pgio.AppendInt64(buf, int64(k.WalApplyPosition))
	buf = pgio.AppendInt64(buf, int64(k.ClientTime))
	buf = append(buf, k.ReplyRequested)

	pgio.SetInt32(buf[sp:], int32(len(buf[sp:])))

	_, err = rc.c.pgConn.Conn().Write(buf)
	if err != nil {
		rc.c.die(err)
	}

	return
}

func (rc *ReplicationConn) Close() error {
	return rc.c.Close(context.TODO())
}

func (rc *ReplicationConn) IsAlive() bool {
	return rc.c.IsAlive()
}

func (rc *ReplicationConn) CauseOfDeath() error {
	return rc.c.CauseOfDeath()
}

func (rc *ReplicationConn) GetConnInfo() *pgtype.ConnInfo {
	return rc.c.ConnInfo
}
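
A sketch (not part of the diff) of the periodic standby-status reporting that the doc comments require of callers. The package name, the 10-second interval, and the latestWal callback (returning the newest WalStart the application has processed) are illustrative assumptions.

package replexample

import (
	"time"

	"github.com/jackc/pgx/v4"
)

// keepSlotUpdated periodically reports the client's WAL position so the
// server-side replication slot advances and the connection stays alive.
// latestWal is a hypothetical callback supplied by the application.
func keepSlotUpdated(rc *pgx.ReplicationConn, latestWal func() uint64) error {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		status, err := pgx.NewStandbyStatus(latestWal())
		if err != nil {
			return err
		}
		if err := rc.SendStandbyStatus(status); err != nil {
			return err
		}
	}
	return nil
}
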
func (rc *ReplicationConn) readReplicationMessage() (r *ReplicationMessage, err error) {
	msg, err := rc.c.pgConn.ReceiveMessage()
	if err != nil {
		return
	}

	switch msg := msg.(type) {
	case *pgproto3.NoticeResponse:
		pgError := rc.c.rxErrorResponse((*pgproto3.ErrorResponse)(msg))
		if rc.c.shouldLog(LogLevelInfo) {
			rc.c.log(LogLevelInfo, pgError.Error(), nil)
		}
	case *pgproto3.ErrorResponse:
		err = rc.c.rxErrorResponse(msg)
		if rc.c.shouldLog(LogLevelError) {
			rc.c.log(LogLevelError, err.Error(), nil)
		}
		return
	case *pgproto3.CopyBothResponse:
		// This is the tail end of the replication process start,
		// and can be safely ignored
		return
	case *pgproto3.CopyData:
		msgType := msg.Data[0]
		rp := 1

		switch msgType {
		case walData:
			walStart := binary.BigEndian.Uint64(msg.Data[rp:])
			rp += 8
			serverWalEnd := binary.BigEndian.Uint64(msg.Data[rp:])
			rp += 8
			serverTime := binary.BigEndian.Uint64(msg.Data[rp:])
			rp += 8
			walData := msg.Data[rp:]
			walMessage := WalMessage{WalStart: walStart,
				ServerWalEnd: serverWalEnd,
				ServerTime:   serverTime,
				WalData:      walData,
			}

			return &ReplicationMessage{WalMessage: &walMessage}, nil
		case senderKeepalive:
			serverWalEnd := binary.BigEndian.Uint64(msg.Data[rp:])
			rp += 8
			serverTime := binary.BigEndian.Uint64(msg.Data[rp:])
			rp += 8
			replyNow := msg.Data[rp]
			rp += 1
			h := &ServerHeartbeat{ServerWalEnd: serverWalEnd, ServerTime: serverTime, ReplyRequested: replyNow}
			return &ReplicationMessage{ServerHeartbeat: h}, nil
		default:
			if rc.c.shouldLog(LogLevelError) {
				rc.c.log(LogLevelError, "Unexpected data payload message type", map[string]interface{}{"type": msgType})
			}
		}
	default:
		if rc.c.shouldLog(LogLevelError) {
			rc.c.log(LogLevelError, "Unexpected replication message type", map[string]interface{}{"type": msg})
		}
	}
	return
}

// Wait for a single replication message.
//
// Properly using this requires some knowledge of the postgres replication mechanisms,
// as the client can receive both WAL data (the ultimate payload) and server heartbeat
// updates. The caller also must send standby status updates in order to keep the connection
// alive and working.
//
// This returns the context error when there is no replication message before
// the context is canceled.
func (rc *ReplicationConn) WaitForReplicationMessage(ctx context.Context) (*ReplicationMessage, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
	}

	go func() {
		select {
		case <-ctx.Done():
			if err := rc.c.pgConn.Conn().SetDeadline(time.Now()); err != nil {
				rc.Close() // Close connection if unable to set deadline
				return
			}
			rc.c.closedChan <- ctx.Err()
		case <-rc.c.doneChan:
		}
	}()

	r, opErr := rc.readReplicationMessage()

	var err error
	select {
	case err = <-rc.c.closedChan:
		if err := rc.c.pgConn.Conn().SetDeadline(time.Time{}); err != nil {
			rc.Close() // Close connection if unable to disable deadline
			return nil, err
		}

		if opErr == nil {
			err = nil
		}
	case rc.c.doneChan <- struct{}{}:
		err = opErr
	}

	return r, err
}
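
A receive-loop sketch (not part of the diff, modeled on the test file below): it distinguishes WAL payloads from server heartbeats and answers heartbeats that request an immediate reply. The package name and logging are illustrative assumptions.

package replexample

import (
	"context"
	"log"

	"github.com/jackc/pgx/v4"
)

// consumeWAL drains replication messages until ctx is canceled, printing WAL
// payloads and answering heartbeats that ask for an immediate status update.
func consumeWAL(ctx context.Context, rc *pgx.ReplicationConn) error {
	var lastWal uint64
	for {
		msg, err := rc.WaitForReplicationMessage(ctx)
		if err != nil {
			return err // ctx.Err() when the context is canceled
		}
		if msg == nil {
			continue // e.g. the CopyBothResponse at stream start
		}
		if msg.WalMessage != nil {
			lastWal = msg.WalMessage.WalStart
			log.Printf("wal %s: %s", pgx.FormatLSN(lastWal), msg.WalMessage.WalData)
		}
		if msg.ServerHeartbeat != nil && msg.ServerHeartbeat.ReplyRequested == 1 {
			status, err := pgx.NewStandbyStatus(lastWal)
			if err != nil {
				return err
			}
			if err := rc.SendStandbyStatus(status); err != nil {
				return err
			}
		}
	}
}
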
// Execute the "IDENTIFY_SYSTEM" command as documented here:
|
|
||||||
// https://www.postgresql.org/docs/9.5/static/protocol-replication.html
|
|
||||||
//
|
|
||||||
// This will return (if successful) a result set that has a single row
|
|
||||||
// that contains the systemid, current timeline, xlogpos and database
|
|
||||||
// name.
|
|
||||||
//
|
|
||||||
// NOTE: Because this is a replication mode connection, we don't have
|
|
||||||
// type names, so the field descriptions in the result will have only
|
|
||||||
// OIDs and no DataTypeName values
|
|
||||||
func (rc *ReplicationConn) IdentifySystem() (r Rows, err error) {
|
|
||||||
return nil, errors.New("TODO")
|
|
||||||
// return rc.sendReplicationModeQuery("IDENTIFY_SYSTEM")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Execute the "TIMELINE_HISTORY" command as documented here:
|
|
||||||
// https://www.postgresql.org/docs/9.5/static/protocol-replication.html
|
|
||||||
//
|
|
||||||
// This will return (if successful) a result set that has a single row
|
|
||||||
// that contains the filename of the history file and the content
|
|
||||||
// of the history file. If called for timeline 1, typically this will
|
|
||||||
// generate an error that the timeline history file does not exist.
|
|
||||||
//
|
|
||||||
// NOTE: Because this is a replication mode connection, we don't have
|
|
||||||
// type names, so the field descriptions in the result will have only
|
|
||||||
// OIDs and no DataTypeName values
|
|
||||||
func (rc *ReplicationConn) TimelineHistory(timeline int) (r Rows, err error) {
|
|
||||||
return nil, errors.New("TODO")
|
|
||||||
// return rc.sendReplicationModeQuery(fmt.Sprintf("TIMELINE_HISTORY %d", timeline))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start a replication connection, sending WAL data to the given replication
|
|
||||||
// receiver. This function wraps a START_REPLICATION command as documented
|
|
||||||
// here:
|
|
||||||
// https://www.postgresql.org/docs/9.5/static/protocol-replication.html
|
|
||||||
//
|
|
||||||
// Once started, the client needs to invoke WaitForReplicationMessage() in order
|
|
||||||
// to fetch the WAL and standby status. Also, it is the responsibility of the caller
|
|
||||||
// to periodically send StandbyStatus messages to update the replication slot position.
|
|
||||||
//
|
|
||||||
// This function assumes that slotName has already been created. In order to omit the timeline argument
|
|
||||||
// pass a -1 for the timeline to get the server default behavior.
|
|
||||||
func (rc *ReplicationConn) StartReplication(slotName string, startLsn uint64, timeline int64, pluginArguments ...string) (err error) {
|
|
||||||
queryString := fmt.Sprintf("START_REPLICATION SLOT %s LOGICAL %s", slotName, FormatLSN(startLsn))
|
|
||||||
if timeline >= 0 {
|
|
||||||
timelineOption := fmt.Sprintf("TIMELINE %d", timeline)
|
|
||||||
pluginArguments = append(pluginArguments, timelineOption)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(pluginArguments) > 0 {
|
|
||||||
queryString += fmt.Sprintf(" ( %s )", strings.Join(pluginArguments, ", "))
|
|
||||||
}
|
|
||||||
|
|
||||||
buf := appendQuery(rc.c.wbuf, queryString)
|
|
||||||
|
|
||||||
_, err = rc.c.pgConn.Conn().Write(buf)
|
|
||||||
if err != nil {
|
|
||||||
rc.c.die(err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, cancelFn := context.WithTimeout(context.Background(), initialReplicationResponseTimeout)
|
|
||||||
defer cancelFn()
|
|
||||||
|
|
||||||
// The first replication message that comes back here will be (in a success case)
|
|
||||||
// a empty CopyBoth that is (apparently) sent as the confirmation that the replication has
|
|
||||||
// started. This call will either return nil, nil or if it returns an error
|
|
||||||
// that indicates the start replication command failed
|
|
||||||
var r *ReplicationMessage
|
|
||||||
r, err = rc.WaitForReplicationMessage(ctx)
|
|
||||||
if err != nil && r != nil {
|
|
||||||
if rc.c.shouldLog(LogLevelError) {
|
|
||||||
rc.c.log(LogLevelError, "Unexpected replication message", map[string]interface{}{"msg": r, "err": err})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
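
A call-site sketch (not part of the diff): the "include-timestamp" string is a hypothetical test_decoding plugin argument, shown only to illustrate how pluginArguments are joined into the command's parenthesized option list; the slot name matches the one used by the tests below.

package replexample

import "github.com/jackc/pgx/v4"

// startFromZero begins logical replication on an existing slot from LSN 0/0,
// letting the server choose the timeline (-1 omits the TIMELINE option).
// The resulting command is:
//   START_REPLICATION SLOT pgx_test LOGICAL 0/0 ( "include-timestamp" 'on' )
func startFromZero(rc *pgx.ReplicationConn) error {
	return rc.StartReplication("pgx_test", 0, -1, `"include-timestamp" 'on'`)
}
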
// Create the replication slot, using the given name and output plugin.
func (rc *ReplicationConn) CreateReplicationSlot(slotName, outputPlugin string) (err error) {
	_, err = rc.c.Exec(context.TODO(), fmt.Sprintf("CREATE_REPLICATION_SLOT %s LOGICAL %s NOEXPORT_SNAPSHOT", slotName, outputPlugin))
	return
}

// Create the replication slot, using the given name and output plugin, and return the consistent_point and snapshot_name values.
func (rc *ReplicationConn) CreateReplicationSlotEx(slotName, outputPlugin string) (consistentPoint string, snapshotName string, err error) {
	var results []*pgconn.Result
	results, err = rc.c.pgConn.Exec(context.TODO(), fmt.Sprintf("CREATE_REPLICATION_SLOT %s LOGICAL %s", slotName, outputPlugin)).ReadAll()
	if err != nil {
		return "", "", err
	}

	if len(results) != 1 {
		return "", "", errors.Errorf("expected 1 result got: %d", len(results))
	}

	if len(results[0].Rows) != 1 {
		return "", "", errors.Errorf("expected 1 row got: %d", len(results[0].Rows))
	}

	if len(results[0].Rows[0]) != 4 {
		return "", "", errors.Errorf("expected 4 columns got: %d", len(results[0].Rows[0]))
	}

	return string(results[0].Rows[0][1]), string(results[0].Rows[0][2]), nil
}

// Drop the replication slot for the given name
func (rc *ReplicationConn) DropReplicationSlot(slotName string) (err error) {
	_, err = rc.c.Exec(context.TODO(), fmt.Sprintf("DROP_REPLICATION_SLOT %s", slotName))
	return
}

@@ -1,360 +0,0 @@
package pgx_test

import (
	"context"
	"fmt"
	"os"
	"reflect"
	"strconv"
	"strings"
	"testing"
	"time"

	"github.com/jackc/pgconn"
	"github.com/jackc/pgx/v4"
)

// This function uses a postgresql 9.6 specific column
func getConfirmedFlushLsnFor(t *testing.T, conn *pgx.Conn, slot string) string {
	// Fetch the restart LSN of the slot, to establish a starting point
	rows, err := conn.Query(context.Background(), fmt.Sprintf("select confirmed_flush_lsn from pg_replication_slots where slot_name='%s'", slot))
	if err != nil {
		t.Fatalf("conn.Query failed: %v", err)
	}
	defer rows.Close()

	var restartLsn string
	for rows.Next() {
		rows.Scan(&restartLsn)
	}
	return restartLsn
}

// This battleship test (at least somewhat by necessity) does
// several things all at once in a single run. It:
// - Establishes a replication connection & slot
// - Does a series of operations to create some known WAL entries
// - Replicates the entries down, and checks that the rows it
//   created come down in order
// - Sends a standby status message to update the server with the
//   wal position of the slot
// - Checks the wal position of the slot on the server to make sure
//   the update succeeded
func TestSimpleReplicationConnection(t *testing.T) {
	t.Skipf("TODO - replication needs to be revisited when v4 churn settles down. For now just skip")

	var err error

	connString := os.Getenv("PGX_TEST_REPLICATION_CONN_STRING")
	if connString == "" {
		t.Skipf("Skipping due to missing environment variable %v", "PGX_TEST_REPLICATION_CONN_STRING")
	}

	conn := mustConnectString(t, connString)
	defer func() {
		// Ensure replication slot is destroyed, but don't check for errors as it
		// should have already been destroyed.
		conn.Exec(context.Background(), "select pg_drop_replication_slot('pgx_test')")
		closeConn(t, conn)
	}()

	replicationConnConfig := mustParseConfig(t, connString)
	replicationConn := mustReplicationConnect(t, replicationConnConfig)
	defer closeReplicationConn(t, replicationConn)

	var cp string
	var snapshot_name string
	cp, snapshot_name, err = replicationConn.CreateReplicationSlotEx("pgx_test", "test_decoding")
	if err != nil {
		t.Fatalf("replication slot create failed: %v", err)
	}
	if cp == "" {
		t.Logf("consistent_point is empty")
	}
	if snapshot_name == "" {
		t.Logf("snapshot_name is empty")
	}

	// Do a simple change so we can get some wal data
	_, err = conn.Exec(context.Background(), "create table if not exists replication_test (a integer)")
	if err != nil {
		t.Fatalf("Failed to create table: %v", err)
	}

	err = replicationConn.StartReplication("pgx_test", 0, -1)
	if err != nil {
		t.Fatalf("Failed to start replication: %v", err)
	}

	var insertedTimes []int64
	currentTime := time.Now().Unix()

	for i := 0; i < 5; i++ {
		var ct pgconn.CommandTag
		insertedTimes = append(insertedTimes, currentTime)
		ct, err = conn.Exec(context.Background(), "insert into replication_test(a) values($1)", currentTime)
		if err != nil {
			t.Fatalf("Insert failed: %v", err)
		}
		t.Logf("Inserted %d rows", ct.RowsAffected())
		currentTime++
	}

	var foundTimes []int64
	var foundCount int
	var maxWal uint64

	ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancelFn()

	for {
		var message *pgx.ReplicationMessage

		message, err = replicationConn.WaitForReplicationMessage(ctx)
		if err != nil {
			t.Fatalf("Replication failed: %v %s", err, reflect.TypeOf(err))
		}

		if message.WalMessage != nil {
			// The waldata payload with the test_decoding plugin looks like:
			// public.replication_test: INSERT: a[integer]:2
			// What we want to do here is check that once we find one of our inserted times,
			// they occur in the wal stream in the order we executed them.
			walString := string(message.WalMessage.WalData)
			if strings.Contains(walString, "public.replication_test: INSERT") {
				stringParts := strings.Split(walString, ":")
				offset, err := strconv.ParseInt(stringParts[len(stringParts)-1], 10, 64)
				if err != nil {
					t.Fatalf("Failed to parse walString %s", walString)
				}
				if foundCount > 0 || offset == insertedTimes[0] {
					foundTimes = append(foundTimes, offset)
					foundCount++
				}
				if foundCount == len(insertedTimes) {
					break
				}
			}
			if message.WalMessage.WalStart > maxWal {
				maxWal = message.WalMessage.WalStart
			}

		}
		if message.ServerHeartbeat != nil {
			t.Logf("Got heartbeat: %s", message.ServerHeartbeat)
		}
	}

	for i := range insertedTimes {
		if foundTimes[i] != insertedTimes[i] {
			t.Fatalf("Found %d expected %d", foundTimes[i], insertedTimes[i])
		}
	}

	t.Logf("Found %d times, as expected", len(foundTimes))

	// Before closing our connection, let's send a standby status to update our wal
	// position, which should then be reflected if we fetch out our current wal position
	// for the slot
	status, err := pgx.NewStandbyStatus(maxWal)
	if err != nil {
		t.Errorf("Failed to create standby status %v", err)
	}
	replicationConn.SendStandbyStatus(status)

	restartLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test")
	integerRestartLsn, _ := pgx.ParseLSN(restartLsn)
	if integerRestartLsn != maxWal {
		t.Fatalf("Wal offset update failed, expected %s found %s", pgx.FormatLSN(maxWal), restartLsn)
	}

	closeReplicationConn(t, replicationConn)

	replicationConn2 := mustReplicationConnect(t, replicationConnConfig)
	defer closeReplicationConn(t, replicationConn2)

	err = replicationConn2.DropReplicationSlot("pgx_test")
	if err != nil {
		t.Fatalf("Failed to drop replication slot: %v", err)
	}

	droppedLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test")
	if droppedLsn != "" {
		t.Errorf("Got odd flush lsn %s for supposedly dropped slot", droppedLsn)
	}
}

func TestReplicationConn_DropReplicationSlot(t *testing.T) {
	connString := os.Getenv("PGX_TEST_REPLICATION_CONN_STRING")
	if connString == "" {
		t.Skipf("Skipping due to missing environment variable %v", "PGX_TEST_REPLICATION_CONN_STRING")
	}

	replicationConnConfig := mustParseConfig(t, connString)
	replicationConn := mustReplicationConnect(t, replicationConnConfig)
	defer closeReplicationConn(t, replicationConn)

	var cp string
	var snapshot_name string
	cp, snapshot_name, err := replicationConn.CreateReplicationSlotEx("pgx_slot_test", "test_decoding")
	if err != nil {
		t.Logf("replication slot create failed: %v", err)
	}
	if cp == "" {
		t.Logf("consistent_point is empty")
	}
	if snapshot_name == "" {
		t.Logf("snapshot_name is empty")
	}

	err = replicationConn.DropReplicationSlot("pgx_slot_test")
	if err != nil {
		t.Fatalf("Failed to drop replication slot: %v", err)
	}

	// We re-create to ensure the drop worked.
	cp, snapshot_name, err = replicationConn.CreateReplicationSlotEx("pgx_slot_test", "test_decoding")
	if err != nil {
		t.Logf("replication slot create failed: %v", err)
	}
	if cp == "" {
		t.Logf("consistent_point is empty")
	}
	if snapshot_name == "" {
		t.Logf("snapshot_name is empty")
	}

	// And finally we drop to ensure we don't leave dirty state
	err = replicationConn.DropReplicationSlot("pgx_slot_test")
	if err != nil {
		t.Fatalf("Failed to drop replication slot: %v", err)
	}
}

func TestIdentifySystem(t *testing.T) {
	t.Skipf("TODO")
	connString := os.Getenv("PGX_TEST_REPLICATION_CONN_STRING")
	if connString == "" {
		t.Skipf("Skipping due to missing environment variable %v", "PGX_TEST_REPLICATION_CONN_STRING")
	}

	replicationConnConfig := mustParseConfig(t, connString)
	replicationConn2 := mustReplicationConnect(t, replicationConnConfig)
	defer closeReplicationConn(t, replicationConn2)

	r, err := replicationConn2.IdentifySystem()
	if err != nil {
		t.Error(err)
	}
	defer r.Close()
	for _, fd := range r.FieldDescriptions() {
		t.Logf("Field: %s of type %v", fd.Name, fd.DataTypeOID)
	}

	var rowCount int
	for r.Next() {
		rowCount++
		values, err := r.Values()
		if err != nil {
			t.Error(err)
		}
		t.Logf("Row values: %v", values)
	}
	if r.Err() != nil {
		t.Error(r.Err())
	}

	if rowCount == 0 {
		t.Errorf("Failed to find any rows: %d", rowCount)
	}
}

func getCurrentTimeline(t *testing.T, rc *pgx.ReplicationConn) int {
	r, err := rc.IdentifySystem()
	if err != nil {
		t.Error(err)
	}
	defer r.Close()
	for r.Next() {
		values, e := r.Values()
		if e != nil {
			t.Error(e)
		}
		return int(values[1].(int32))
	}
	t.Fatal("Failed to read timeline")
	return -1
}

func TestGetTimelineHistory(t *testing.T) {
	t.Skipf("TODO")

	connString := os.Getenv("PGX_TEST_REPLICATION_CONN_STRING")
	if connString == "" {
		t.Skipf("Skipping due to missing environment variable %v", "PGX_TEST_REPLICATION_CONN_STRING")
	}

	replicationConnConfig := mustParseConfig(t, connString)
	replicationConn := mustReplicationConnect(t, replicationConnConfig)
	defer closeReplicationConn(t, replicationConn)

	timeline := getCurrentTimeline(t, replicationConn)

	r, err := replicationConn.TimelineHistory(timeline)
	if err != nil {
		t.Errorf("%#v", err)
	}
	defer r.Close()

	for _, fd := range r.FieldDescriptions() {
		t.Logf("Field: %s of type %v", fd.Name, fd.DataTypeOID)
	}

	var rowCount int
	for r.Next() {
		rowCount++
		values, err := r.Values()
		if err != nil {
			t.Error(err)
		}
		t.Logf("Row values: %v", values)
	}
	if r.Err() != nil {
		if strings.Contains(r.Err().Error(), "No such file or directory") {
			// This is normal, this means the timeline we're on has no
			// history, which is the common case in a test db that
			// has only one timeline
			return
		}
		t.Error(r.Err())
	}

	// If we have a timeline history (see above) there should have been
	// rows emitted
	if rowCount == 0 {
		t.Errorf("Failed to find any rows: %d", rowCount)
	}
}

func TestStandbyStatusParsing(t *testing.T) {
	// Let's push the boundary conditions of the standby status and ensure it errors correctly
	status, err := pgx.NewStandbyStatus(0, 1, 2, 3, 4)
	if err == nil {
		t.Errorf("Expected error from new standby status, got %v", status)
	}

	// And if you provide 3 args, ensure the right fields are set
	status, err = pgx.NewStandbyStatus(1, 2, 3)
	if err != nil {
		t.Errorf("Failed to create test status: %v", err)
	}
	if status.WalFlushPosition != 1 {
		t.Errorf("Unexpected flush position %d", status.WalFlushPosition)
	}
	if status.WalApplyPosition != 2 {
		t.Errorf("Unexpected apply position %d", status.WalApplyPosition)
	}
	if status.WalWritePosition != 3 {
		t.Errorf("Unexpected write position %d", status.WalWritePosition)
	}
}