mirror of https://github.com/jackc/pgx.git
Merge branch 'master' into v3-experimental
* master: Tweak replication test setup Properly make it a func init() The naming really matters Fix the syntax Properly make the replication tests skippable on 9.5 and below I forgot the tests are 9.6+ One more try for travis Valid YAML helps. Dont break old postgres Try to fix travis Add the ability to set all the fields in the constructor Start replication now wraps the sql and returns errors properly It should all be unsigned. Capitalization Add replication stop mechanism Add basic logical replication protocol supportv3-experimental-wait-ping-context
commit
f895e970b5
|
@ -19,10 +19,17 @@ before_install:
|
|||
- echo "host all pgx_md5 127.0.0.1/32 md5" >> /etc/postgresql/$PGVERSION/main/pg_hba.conf
|
||||
- echo "host all pgx_pw 127.0.0.1/32 password" >> /etc/postgresql/$PGVERSION/main/pg_hba.conf
|
||||
- echo "hostssl all pgx_ssl 127.0.0.1/32 md5" >> /etc/postgresql/$PGVERSION/main/pg_hba.conf
|
||||
- echo "host replication pgx_replication 127.0.0.1/32 md5" >> /etc/postgresql/$PGVERSION/main/pg_hba.conf
|
||||
- echo "host pgx_test pgx_replication 127.0.0.1/32 md5" >> /etc/postgresql/$PGVERSION/main/pg_hba.conf
|
||||
- sudo chmod 777 /etc/postgresql/$PGVERSION/main/postgresql.conf
|
||||
- "[[ $PGVERSION < 9.6 ]] || echo \"wal_level='logical'\" >> /etc/postgresql/$PGVERSION/main/postgresql.conf"
|
||||
- "[[ $PGVERSION < 9.6 ]] || echo \"max_wal_senders=5\" >> /etc/postgresql/$PGVERSION/main/postgresql.conf"
|
||||
- "[[ $PGVERSION < 9.6 ]] || echo \"max_replication_slots=5\" >> /etc/postgresql/$PGVERSION/main/postgresql.conf"
|
||||
- sudo /etc/init.d/postgresql restart
|
||||
|
||||
env:
|
||||
matrix:
|
||||
- PGVERSION=9.6
|
||||
- PGVERSION=9.5
|
||||
- PGVERSION=9.4
|
||||
- PGVERSION=9.3
|
||||
|
@ -38,6 +45,7 @@ before_script:
|
|||
- psql -U postgres -c "create user pgx_ssl SUPERUSER PASSWORD 'secret'"
|
||||
- psql -U postgres -c "create user pgx_md5 SUPERUSER PASSWORD 'secret'"
|
||||
- psql -U postgres -c "create user pgx_pw SUPERUSER PASSWORD 'secret'"
|
||||
- psql -U postgres -c "create user pgx_replication with replication password 'secret'"
|
||||
- psql -U postgres -c "create user \" tricky, ' } \"\" \\ test user \" superuser password 'secret'"
|
||||
|
||||
install:
|
||||
|
|
18
README.md
18
README.md
|
@ -29,6 +29,7 @@ Pgx supports many additional features beyond what is available through database/
|
|||
* Large object support
|
||||
* Null mapping to Null* struct or pointer to pointer.
|
||||
* Supports database/sql.Scanner and database/sql/driver.Valuer interfaces for custom types
|
||||
* Logical replication connections, including receiving WAL and sending standby status updates
|
||||
|
||||
## Performance
|
||||
|
||||
|
@ -72,6 +73,7 @@ Then run the following SQL:
|
|||
create user pgx_md5 password 'secret';
|
||||
create user " tricky, ' } "" \ test user " password 'secret';
|
||||
create database pgx_test;
|
||||
create user pgx_replication with replication password 'secret';
|
||||
|
||||
Connect to database pgx_test and run:
|
||||
|
||||
|
@ -104,6 +106,22 @@ If you are developing on Windows with TCP connections:
|
|||
host pgx_test pgx_pw 127.0.0.1/32 password
|
||||
host pgx_test pgx_md5 127.0.0.1/32 md5
|
||||
|
||||
### Replication Test Environment
|
||||
|
||||
Add a replication user:
|
||||
|
||||
create user pgx_replication with replication password 'secret';
|
||||
|
||||
Add a replication line to your pg_hba.conf:
|
||||
|
||||
host replication pgx_replication 127.0.0.1/32 md5
|
||||
|
||||
Change the following settings in your postgresql.conf:
|
||||
|
||||
wal_level=logical
|
||||
max_wal_senders=5
|
||||
max_replication_slots=5
|
||||
|
||||
## Version Policy
|
||||
|
||||
pgx follows semantic versioning for the documented public API on stable releases. Branch ```v2``` is the latest stable release. ```master``` can contain new features or behavior that will change or be removed before being merged to the stable ```v2``` branch (in practice, this occurs very rarely).
|
||||
|
|
6
conn.go
6
conn.go
|
@ -305,6 +305,12 @@ func (c *Conn) connect(config ConnConfig, network, address string, tlsConfig *tl
|
|||
c.log(LogLevelInfo, "Connection established")
|
||||
}
|
||||
|
||||
// Replication connections can't execute the queries to
|
||||
// populate the c.PgTypes and c.pgsqlAfInet
|
||||
if _, ok := msg.options["replication"]; ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
if c.PgTypes == nil {
|
||||
err = c.loadPgTypes()
|
||||
if err != nil {
|
||||
|
|
|
@ -14,6 +14,7 @@ var plainPasswordConnConfig *pgx.ConnConfig = nil
|
|||
var invalidUserConnConfig *pgx.ConnConfig = nil
|
||||
var tlsConnConfig *pgx.ConnConfig = nil
|
||||
var customDialerConnConfig *pgx.ConnConfig = nil
|
||||
var replicationConnConfig *pgx.ConnConfig = nil
|
||||
|
||||
// var tcpConnConfig *pgx.ConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_md5", Password: "secret", Database: "pgx_test"}
|
||||
// var unixSocketConnConfig *pgx.ConnConfig = &pgx.ConnConfig{Host: "/private/tmp", User: "pgx_none", Database: "pgx_test"}
|
||||
|
|
|
@ -3,6 +3,8 @@ package pgx_test
|
|||
import (
|
||||
"crypto/tls"
|
||||
"github.com/jackc/pgx"
|
||||
"os"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
var defaultConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_md5", Password: "secret", Database: "pgx_test"}
|
||||
|
@ -13,3 +15,16 @@ var plainPasswordConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_pw",
|
|||
var invalidUserConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "invalid", Database: "pgx_test"}
|
||||
var tlsConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_ssl", Password: "secret", Database: "pgx_test", TLSConfig: &tls.Config{InsecureSkipVerify: true}}
|
||||
var customDialerConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_md5", Password: "secret", Database: "pgx_test"}
|
||||
var replicationConnConfig *pgx.ConnConfig = nil
|
||||
|
||||
func init() {
|
||||
version := os.Getenv("PGVERSION")
|
||||
|
||||
if len(version) > 0 {
|
||||
v, err := strconv.ParseFloat(version,64)
|
||||
if err == nil && v >= 9.6 {
|
||||
replicationConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_replication", Password: "secret", Database: "pgx_test"}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,335 @@
|
|||
package pgx
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
	// Replication protocol message discriminator bytes
	// (streaming replication protocol message formats).
	copyBothResponse    = 'W' // server: entering CopyBoth mode
	walData             = 'w' // CopyData payload: XLogData (WAL entry)
	senderKeepalive     = 'k' // CopyData payload: primary keepalive
	standbyStatusUpdate = 'r' // CopyData payload: standby status update

	// How long StartReplication waits for the server's initial
	// response before giving up.
	initialReplicationResponseTimeout = 5 * time.Second
)
|
||||
|
||||
// epochNano is the PostgreSQL epoch (2000-01-01 00:00:00 UTC) in Unix
// nanoseconds. Replication protocol timestamps are microseconds since this
// epoch, so conversions to/from time.Time offset by it. Initialized directly
// instead of via a side-effecting init() — the package-init-time semantics
// are identical.
var epochNano = time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC).UnixNano()
|
||||
|
||||
// FormatLSN formats a 64 bit LSN value into the XXX/XXX form reported by
// postgres: the upper 32 bits in hex, a slash, then the lower 32 bits in hex.
func FormatLSN(lsn uint64) string {
	upper := uint32(lsn >> 32)
	lower := uint32(lsn)
	return fmt.Sprintf("%X/%X", upper, lower)
}
|
||||
|
||||
// ParseLSN parses an XXX/XXX format LSN as reported by postgres
// into the 64 bit integer representation used internally by the wire
// protocols. It is the inverse of FormatLSN.
func ParseLSN(lsn string) (outputLsn uint64, err error) {
	var upperHalf uint64
	var lowerHalf uint64
	var nparsed int
	nparsed, err = fmt.Sscanf(lsn, "%X/%X", &upperHalf, &lowerHalf)
	if err != nil {
		return
	}

	// Sscanf can return nil error while matching fewer operands than
	// requested; require both halves of the LSN.
	if nparsed != 2 {
		// fmt.Errorf replaces errors.New(fmt.Sprintf(...)) (staticcheck S1028).
		err = fmt.Errorf("Failed to parsed LSN: %s", lsn)
		return
	}

	outputLsn = (upperHalf << 32) + lowerHalf
	return
}
|
||||
|
||||
// WalMessage contains WAL payload entry data received during
// logical replication.
type WalMessage struct {
	// The WAL start position of this data. This
	// is the WAL position we need to track.
	WalStart uint64
	// The server wal end and server time are
	// documented to track the end position and current
	// time of the server, both of which appear to be
	// unimplemented in pg 9.5.
	ServerWalEnd uint64
	ServerTime   uint64
	// The WAL data is the raw unparsed binary WAL entry.
	// The contents of this are determined by the output
	// logical encoding plugin.
	WalData []byte
}

// Time converts ServerTime (microseconds since 2000-01-01 UTC) into a
// time.Time.
func (w *WalMessage) Time() time.Time {
	return time.Unix(0, (int64(w.ServerTime)*1000)+epochNano)
}

// ByteLag reports how far, in WAL bytes, this message's start position
// trails the server's reported WAL end.
func (w *WalMessage) ByteLag() uint64 {
	return (w.ServerWalEnd - w.WalStart)
}

// String returns a human-readable summary of the message for logging.
func (w *WalMessage) String() string {
	return fmt.Sprintf("Wal: %s Time: %s Lag: %d", FormatLSN(w.WalStart), w.Time(), w.ByteLag())
}
|
||||
|
||||
// ServerHeartbeat is sent periodically from the server,
// including server status, and a reply request field.
type ServerHeartbeat struct {
	// The current max wal position on the server,
	// used for lag tracking
	ServerWalEnd uint64
	// The server time, in microseconds since jan 1 2000
	ServerTime uint64
	// If 1, the server is requesting a standby status message
	// to be sent immediately.
	ReplyRequested byte
}

// Time converts ServerTime (microseconds since 2000-01-01 UTC) into a
// time.Time.
func (s *ServerHeartbeat) Time() time.Time {
	return time.Unix(0, (int64(s.ServerTime)*1000)+epochNano)
}

// String returns a human-readable summary of the heartbeat for logging.
func (s *ServerHeartbeat) String() string {
	return fmt.Sprintf("WalEnd: %s ReplyRequested: %d T: %s", FormatLSN(s.ServerWalEnd), s.ReplyRequested, s.Time())
}
|
||||
|
||||
// ReplicationMessage wraps all possible messages from the
// server received during replication. At most one of the wal message
// or server heartbeat will be non-nil.
type ReplicationMessage struct {
	WalMessage      *WalMessage
	ServerHeartbeat *ServerHeartbeat
}
|
||||
|
||||
// StandbyStatus is the client side heartbeat sent to the postgresql
// server to track the client wal positions. For practical purposes,
// all wal positions are typically set to the same value.
type StandbyStatus struct {
	// The WAL position that's been locally written
	WalWritePosition uint64
	// The WAL position that's been locally flushed
	WalFlushPosition uint64
	// The WAL position that's been locally applied
	WalApplyPosition uint64
	// The client time in microseconds since jan 1 2000
	ClientTime uint64
	// If 1, requests the server to immediately send a
	// server heartbeat
	ReplyRequested byte
}
|
||||
|
||||
// Create a standby status struct, which sets all the WAL positions
|
||||
// to the given wal position, and the client time to the current time.
|
||||
// The wal positions are, in order:
|
||||
// WalFlushPosition
|
||||
// WalApplyPosition
|
||||
// WalWritePosition
|
||||
//
|
||||
// If only one position is provided, it will be used as the value for all 3
|
||||
// status fields. Note you must provide either 1 wal position, or all 3
|
||||
// in order to initialize the standby status.
|
||||
func NewStandbyStatus(walPositions ...uint64) (status *StandbyStatus, err error) {
|
||||
if len(walPositions) == 1 {
|
||||
status = new(StandbyStatus)
|
||||
status.WalFlushPosition = walPositions[0]
|
||||
status.WalApplyPosition = walPositions[0]
|
||||
status.WalWritePosition = walPositions[0]
|
||||
} else if len(walPositions) == 3 {
|
||||
status = new(StandbyStatus)
|
||||
status.WalFlushPosition = walPositions[0]
|
||||
status.WalApplyPosition = walPositions[1]
|
||||
status.WalWritePosition = walPositions[2]
|
||||
} else {
|
||||
err = errors.New(fmt.Sprintf("Invalid number of wal positions provided, need 1 or 3, got %d", len(walPositions)))
|
||||
return
|
||||
}
|
||||
status.ClientTime = uint64((time.Now().UnixNano() - epochNano) / 1000)
|
||||
return
|
||||
}
|
||||
|
||||
// SendStandbyStatus sends a standby status to the server, which both acts
// as a keepalive message to the server, as well as carries the WAL position
// of the client, which then updates the server's replication slot position.
func (c *Conn) SendStandbyStatus(k *StandbyStatus) (err error) {
	// The status travels inside a CopyData message whose payload begins with
	// the 'r' (standbyStatusUpdate) discriminator; the field order below is
	// the wire format: write/flush/apply LSNs, client time, reply flag.
	writeBuf := newWriteBuf(c, copyData)
	writeBuf.WriteByte(standbyStatusUpdate)
	writeBuf.WriteInt64(int64(k.WalWritePosition))
	writeBuf.WriteInt64(int64(k.WalFlushPosition))
	writeBuf.WriteInt64(int64(k.WalApplyPosition))
	writeBuf.WriteInt64(int64(k.ClientTime))
	writeBuf.WriteByte(k.ReplyRequested)

	writeBuf.closeMsg()

	_, err = c.conn.Write(writeBuf.buf)
	if err != nil {
		// A failed write leaves the connection state unknown; kill it.
		c.die(err)
	}

	return
}
|
||||
|
||||
// StopReplication sends the message to formally stop the replication
// stream. This is done before calling Close() during a clean shutdown.
func (c *Conn) StopReplication() (err error) {
	// An empty CopyDone message signals the end of the CopyBoth stream.
	writeBuf := newWriteBuf(c, copyDone)

	writeBuf.closeMsg()

	_, err = c.conn.Write(writeBuf.buf)
	if err != nil {
		// A failed write leaves the connection state unknown; kill it.
		c.die(err)
	}
	return
}
|
||||
|
||||
|
||||
func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) {
|
||||
var t byte
|
||||
var reader *msgReader
|
||||
t, reader, err = c.rxMsg()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
switch t {
|
||||
case noticeResponse:
|
||||
pgError := c.rxErrorResponse(reader)
|
||||
if c.shouldLog(LogLevelInfo) {
|
||||
c.log(LogLevelInfo, pgError.Error())
|
||||
}
|
||||
case errorResponse:
|
||||
err = c.rxErrorResponse(reader)
|
||||
if c.shouldLog(LogLevelError) {
|
||||
c.log(LogLevelError, err.Error())
|
||||
}
|
||||
return
|
||||
case copyBothResponse:
|
||||
// This is the tail end of the replication process start,
|
||||
// and can be safely ignored
|
||||
return
|
||||
case copyData:
|
||||
var msgType byte
|
||||
msgType = reader.readByte()
|
||||
switch msgType {
|
||||
case walData:
|
||||
walStart := reader.readInt64()
|
||||
serverWalEnd := reader.readInt64()
|
||||
serverTime := reader.readInt64()
|
||||
walData := reader.readBytes(reader.msgBytesRemaining)
|
||||
walMessage := WalMessage{WalStart: uint64(walStart),
|
||||
ServerWalEnd: uint64(serverWalEnd),
|
||||
ServerTime: uint64(serverTime),
|
||||
WalData: walData,
|
||||
}
|
||||
|
||||
return &ReplicationMessage{WalMessage: &walMessage}, nil
|
||||
case senderKeepalive:
|
||||
serverWalEnd := reader.readInt64()
|
||||
serverTime := reader.readInt64()
|
||||
replyNow := reader.readByte()
|
||||
h := &ServerHeartbeat{ServerWalEnd: uint64(serverWalEnd), ServerTime: uint64(serverTime), ReplyRequested: replyNow}
|
||||
return &ReplicationMessage{ServerHeartbeat: h}, nil
|
||||
default:
|
||||
if c.shouldLog(LogLevelError) {
|
||||
c.log(LogLevelError,"Unexpected data playload message type %v", t)
|
||||
}
|
||||
}
|
||||
default:
|
||||
if c.shouldLog(LogLevelError) {
|
||||
c.log(LogLevelError,"Unexpected replication message type %v", t)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// WaitForReplicationMessage waits for a single replication message up to
// timeout time.
//
// Properly using this requires some knowledge of the postgres replication mechanisms,
// as the client can receive both WAL data (the ultimate payload) and server heartbeat
// updates. The caller also must send standby status updates in order to keep the connection
// alive and working.
//
// This returns pgx.ErrNotificationTimeout when there is no replication message by the specified
// duration.
func (c *Conn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationMessage, err error) {
	var zeroTime time.Time

	deadline := time.Now().Add(timeout)

	// Use SetReadDeadline to implement the timeout. SetReadDeadline will
	// cause operations to fail with a *net.OpError that has a Timeout()
	// of true. Because the normal pgx rxMsg path considers any error to
	// have potentially corrupted the state of the connection, it dies
	// on any errors. So to avoid timeout errors in rxMsg we set the
	// deadline and peek into the reader. If a timeout error occurs there
	// we don't break the pgx connection. If the Peek returns that data
	// is available then we turn off the read deadline before the rxMsg.
	err = c.conn.SetReadDeadline(deadline)
	if err != nil {
		return nil, err
	}

	// Wait until there is a byte available before continuing onto the normal msg reading path
	_, err = c.reader.Peek(1)
	if err != nil {
		c.conn.SetReadDeadline(zeroTime) // we can only return one error and we already have one -- so ignore possible error from SetReadDeadline
		if err, ok := err.(*net.OpError); ok && err.Timeout() {
			return nil, ErrNotificationTimeout
		}
		return nil, err
	}

	err = c.conn.SetReadDeadline(zeroTime)
	if err != nil {
		return nil, err
	}

	return c.readReplicationMessage()
}
|
||||
|
||||
// Start a replication connection, sending WAL data to the given replication
|
||||
// receiver. This function wraps a START_REPLICATION command as documented
|
||||
// here:
|
||||
// https://www.postgresql.org/docs/9.5/static/protocol-replication.html
|
||||
//
|
||||
// Once started, the client needs to invoke WaitForReplicationMessage() in order
|
||||
// to fetch the WAL and standby status. Also, it is the responsibility of the caller
|
||||
// to periodically send StandbyStatus messages to update the replication slot position.
|
||||
//
|
||||
// This function assumes that slotName has already been created. In order to omit the timeline argument
|
||||
// pass a -1 for the timeline to get the server default behavior.
|
||||
func (c *Conn) StartReplication(slotName string, startLsn uint64, timeline int64, pluginArguments ...string) (err error) {
|
||||
var queryString string
|
||||
if timeline >= 0 {
|
||||
queryString = fmt.Sprintf("START_REPLICATION SLOT %s LOGICAL %s TIMELINE %d", slotName, FormatLSN(startLsn), timeline)
|
||||
} else {
|
||||
queryString = fmt.Sprintf("START_REPLICATION SLOT %s LOGICAL %s", slotName, FormatLSN(startLsn))
|
||||
}
|
||||
|
||||
for _, arg := range pluginArguments {
|
||||
queryString += fmt.Sprintf(" %s", arg)
|
||||
}
|
||||
|
||||
if err = c.sendQuery(queryString); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// The first replication message that comes back here will be (in a success case)
|
||||
// a empty CopyBoth that is (apparently) sent as the confirmation that the replication has
|
||||
// started. This call will either return nil, nil or if it returns an error
|
||||
// that indicates the start replication command failed
|
||||
var r *ReplicationMessage
|
||||
r, err = c.WaitForReplicationMessage(initialReplicationResponseTimeout)
|
||||
if err != nil && r != nil {
|
||||
if c.shouldLog(LogLevelError) {
|
||||
c.log(LogLevelError, "Unxpected replication message %v", r)
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
|
@ -0,0 +1,195 @@
|
|||
package pgx_test
|
||||
|
||||
import (
|
||||
"github.com/jackc/pgx"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
"reflect"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// This function uses a postgresql 9.6 specific column
|
||||
func getConfirmedFlushLsnFor(t *testing.T, conn *pgx.Conn, slot string) string {
|
||||
// Fetch the restart LSN of the slot, to establish a starting point
|
||||
rows, err := conn.Query(fmt.Sprintf("select confirmed_flush_lsn from pg_replication_slots where slot_name='%s'", slot))
|
||||
if err != nil {
|
||||
t.Fatalf("conn.Query failed: %v", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var restartLsn string
|
||||
for rows.Next() {
|
||||
rows.Scan(&restartLsn)
|
||||
}
|
||||
return restartLsn
|
||||
}
|
||||
|
||||
// This battleship test (at least somewhat by necessity) does
// several things all at once in a single run. It:
// - Establishes a replication connection & slot
// - Does a series of operations to create some known WAL entries
// - Replicates the entries down, and checks that the rows it
//   created come down in order
// - Sends a standby status message to update the server with the
//   wal position of the slot
// - Checks the wal position of the slot on the server to make sure
//   the update succeeded
func TestSimpleReplicationConnection(t *testing.T) {
	t.Parallel()

	var err error

	// replicationConnConfig is only set for PGVERSION >= 9.6.
	if replicationConnConfig == nil {
		t.Skip("Skipping due to undefined replicationConnConfig")
	}

	conn := mustConnect(t, *replicationConnConfig)
	defer closeConn(t, conn)

	// The "replication" runtime parameter switches the second connection
	// into the walsender replication protocol.
	replicationConnConfig.RuntimeParams = make(map[string]string)
	replicationConnConfig.RuntimeParams["replication"] = "database"

	replicationConn := mustConnect(t, *replicationConnConfig)
	defer closeConn(t, replicationConn)

	// Slot may already exist from an earlier aborted run; log and continue.
	_, err = replicationConn.Exec("CREATE_REPLICATION_SLOT pgx_test LOGICAL test_decoding")
	if err != nil {
		t.Logf("replication slot create failed: %v", err)
	}

	// Do a simple change so we can get some wal data
	_, err = conn.Exec("create table if not exists replication_test (a integer)")
	if err != nil {
		t.Fatalf("Failed to create table: %v", err)
	}

	// Start from LSN 0 with the server-default timeline (-1).
	err = replicationConn.StartReplication("pgx_test", 0, -1)
	if err != nil {
		t.Fatalf("Failed to start replication: %v", err)
	}

	// Insert 5 rows whose values (current Unix time) we can later spot in
	// the decoded WAL stream.
	var i int32
	var insertedTimes []int64
	for i < 5 {
		var ct pgx.CommandTag
		currentTime := time.Now().Unix()
		insertedTimes = append(insertedTimes, currentTime)
		ct, err = conn.Exec("insert into replication_test(a) values($1)", currentTime)
		if err != nil {
			t.Fatalf("Insert failed: %v", err)
		}
		t.Logf("Inserted %d rows", ct.RowsAffected())
		i++
	}

	// Drain the replication stream until we've timed out a few times,
	// collecting our inserted values and the max WAL position seen.
	i = 0
	var foundTimes []int64
	var foundCount int
	var maxWal uint64
	for {
		var message *pgx.ReplicationMessage

		message, err = replicationConn.WaitForReplicationMessage(time.Duration(1 * time.Second))
		if err != nil {
			if err != pgx.ErrNotificationTimeout {
				t.Fatalf("Replication failed: %v %s", err, reflect.TypeOf(err))
			}
		}
		if message != nil {
			if message.WalMessage != nil {
				// The waldata payload with the test_decoding plugin looks like:
				// public.replication_test: INSERT: a[integer]:2
				// What we wanna do here is check that once we find one of our inserted times,
				// that they occur in the wal stream in the order we executed them.
				walString := string(message.WalMessage.WalData)
				if strings.Contains(walString, "public.replication_test: INSERT") {
					stringParts := strings.Split(walString, ":")
					offset, err := strconv.ParseInt(stringParts[len(stringParts)-1], 10, 64)
					if err != nil {
						t.Fatalf("Failed to parse walString %s", walString)
					}
					if foundCount > 0 || offset == insertedTimes[0] {
						foundTimes = append(foundTimes, offset)
						foundCount++
					}
				}
				if message.WalMessage.WalStart > maxWal {
					maxWal = message.WalMessage.WalStart
				}

			}
			if message.ServerHeartbeat != nil {
				t.Logf("Got heartbeat: %s", message.ServerHeartbeat)
			}
		} else {
			t.Log("Timed out waiting for wal message")
			i++
		}
		// Give up after 4 consecutive-ish timeouts.
		if i > 3 {
			t.Log("Actual timeout")
			break
		}
	}

	if foundCount != len(insertedTimes) {
		t.Fatalf("Failed to find all inserted time values in WAL stream (found %d expected %d)", foundCount, len(insertedTimes))
	}

	// Values must come down in the order we inserted them.
	for i := range insertedTimes {
		if foundTimes[i] != insertedTimes[i] {
			t.Fatalf("Found %d expected %d", foundTimes[i], insertedTimes[i])
		}
	}

	t.Logf("Found %d times, as expected", len(foundTimes))

	// Before closing our connection, let's send a standby status to update our wal
	// position, which should then be reflected if we fetch out our current wal position
	// for the slot
	status, err := pgx.NewStandbyStatus(maxWal)
	if err != nil {
		t.Errorf("Failed to create standby status %v", err)
	}
	replicationConn.SendStandbyStatus(status)
	replicationConn.StopReplication()

	// Let's push the boundary conditions of the standby status and ensure it errors correctly
	status, err = pgx.NewStandbyStatus(0,1,2,3,4)
	if err == nil {
		t.Errorf("Expected error from new standby status, got %v",status)
	}

	// And if you provide 3 args, ensure the right fields are set
	status, err = pgx.NewStandbyStatus(1,2,3)
	if err != nil {
		t.Errorf("Failed to create test status: %v", err)
	}
	if status.WalFlushPosition != 1 {
		t.Errorf("Unexpected flush position %d", status.WalFlushPosition)
	}
	if status.WalApplyPosition != 2 {
		t.Errorf("Unexpected apply position %d", status.WalApplyPosition)
	}
	if status.WalWritePosition != 3 {
		t.Errorf("Unexpected write position %d", status.WalWritePosition)
	}

	err = replicationConn.Close()
	if err != nil {
		t.Fatalf("Replication connection close failed: %v", err)
	}

	// The server's confirmed flush position should reflect the standby
	// status we sent above.
	restartLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test")
	integerRestartLsn, _ := pgx.ParseLSN(restartLsn)
	if integerRestartLsn != maxWal {
		t.Fatalf("Wal offset update failed, expected %s found %s", pgx.FormatLSN(maxWal), restartLsn)
	}

	_, err = conn.Exec("select pg_drop_replication_slot($1)", "pgx_test")
	if err != nil {
		t.Fatalf("Failed to drop replication slot: %v", err)
	}

}
|
|
@ -30,7 +30,7 @@ func ensureConnValid(t *testing.T, db *sql.DB) {
|
|||
|
||||
rows, err := db.Query("select generate_series(1,$1)", 10)
|
||||
if err != nil {
|
||||
t.Fatalf("db.Query failed: ", err)
|
||||
t.Fatalf("db.Query failed: %v", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
|
@ -42,7 +42,7 @@ func ensureConnValid(t *testing.T, db *sql.DB) {
|
|||
}
|
||||
|
||||
if rows.Err() != nil {
|
||||
t.Fatalf("db.Query failed: ", err)
|
||||
t.Fatalf("db.Query failed: %v", err)
|
||||
}
|
||||
|
||||
if rowCount != 10 {
|
||||
|
|
Loading…
Reference in New Issue