mirror of https://github.com/jackc/pgx.git
Improve replication test reliability
It was failing intermittently when run concurrently.v3-numeric-wip
parent
8162634259
commit
6c26c3a4a3
|
@ -122,6 +122,8 @@ Change the following settings in your postgresql.conf:
|
||||||
max_wal_senders=5
|
max_wal_senders=5
|
||||||
max_replication_slots=5
|
max_replication_slots=5
|
||||||
|
|
||||||
|
Set `replicationConnConfig` appropriately in `conn_config_test.go`.
|
||||||
|
|
||||||
## Version Policy
|
## Version Policy
|
||||||
|
|
||||||
pgx follows semantic versioning for the documented public API on stable releases. Branch `v2` is the latest stable release. `master` can contain new features or behavior that will change or be removed before being merged to the stable `v2` branch (in practice, this occurs very rarely).
|
pgx follows semantic versioning for the documented public API on stable releases. Branch `v2` is the latest stable release. `master` can contain new features or behavior that will change or be removed before being merged to the stable `v2` branch (in practice, this occurs very rarely).
|
||||||
|
|
|
@ -23,3 +23,5 @@ var replicationConnConfig *pgx.ConnConfig = nil
|
||||||
// var invalidUserConnConfig *pgx.ConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "invalid", Database: "pgx_test"}
|
// var invalidUserConnConfig *pgx.ConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "invalid", Database: "pgx_test"}
|
||||||
// var tlsConnConfig *pgx.ConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_md5", Password: "secret", Database: "pgx_test", TLSConfig: &tls.Config{InsecureSkipVerify: true}}
|
// var tlsConnConfig *pgx.ConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_md5", Password: "secret", Database: "pgx_test", TLSConfig: &tls.Config{InsecureSkipVerify: true}}
|
||||||
// var customDialerConnConfig *pgx.ConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_md5", Password: "secret", Database: "pgx_test"}
|
// var customDialerConnConfig *pgx.ConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_md5", Password: "secret", Database: "pgx_test"}
|
||||||
|
// var replicationConnConfig *pgx.ConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_replication", Password: "secret", Database: "pgx_test"}
|
||||||
|
|
||||||
|
|
|
@ -39,8 +39,6 @@ func getConfirmedFlushLsnFor(t *testing.T, conn *pgx.Conn, slot string) string {
|
||||||
// - Checks the wal position of the slot on the server to make sure
|
// - Checks the wal position of the slot on the server to make sure
|
||||||
// the update succeeded
|
// the update succeeded
|
||||||
func TestSimpleReplicationConnection(t *testing.T) {
|
func TestSimpleReplicationConnection(t *testing.T) {
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
if replicationConnConfig == nil {
|
if replicationConnConfig == nil {
|
||||||
|
@ -74,71 +72,63 @@ func TestSimpleReplicationConnection(t *testing.T) {
|
||||||
t.Fatalf("Failed to start replication: %v", err)
|
t.Fatalf("Failed to start replication: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var i int32
|
|
||||||
var insertedTimes []int64
|
var insertedTimes []int64
|
||||||
for i < 5 {
|
currentTime := time.Now().Unix()
|
||||||
|
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
var ct pgx.CommandTag
|
var ct pgx.CommandTag
|
||||||
currentTime := time.Now().Unix()
|
|
||||||
insertedTimes = append(insertedTimes, currentTime)
|
insertedTimes = append(insertedTimes, currentTime)
|
||||||
ct, err = conn.Exec("insert into replication_test(a) values($1)", currentTime)
|
ct, err = conn.Exec("insert into replication_test(a) values($1)", currentTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Insert failed: %v", err)
|
t.Fatalf("Insert failed: %v", err)
|
||||||
}
|
}
|
||||||
t.Logf("Inserted %d rows", ct.RowsAffected())
|
t.Logf("Inserted %d rows", ct.RowsAffected())
|
||||||
i++
|
currentTime++
|
||||||
}
|
}
|
||||||
|
|
||||||
i = 0
|
|
||||||
var foundTimes []int64
|
var foundTimes []int64
|
||||||
var foundCount int
|
var foundCount int
|
||||||
var maxWal uint64
|
var maxWal uint64
|
||||||
|
|
||||||
|
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancelFn()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
var message *pgx.ReplicationMessage
|
var message *pgx.ReplicationMessage
|
||||||
|
|
||||||
ctx, cancelFn := context.WithTimeout(context.Background(), time.Second)
|
|
||||||
defer cancelFn()
|
|
||||||
message, err = replicationConn.WaitForReplicationMessage(ctx)
|
message, err = replicationConn.WaitForReplicationMessage(ctx)
|
||||||
if err != nil && err != context.DeadlineExceeded {
|
if err != nil {
|
||||||
t.Fatalf("Replication failed: %v %s", err, reflect.TypeOf(err))
|
t.Fatalf("Replication failed: %v %s", err, reflect.TypeOf(err))
|
||||||
}
|
}
|
||||||
if message != nil {
|
|
||||||
if message.WalMessage != nil {
|
|
||||||
// The waldata payload with the test_decoding plugin looks like:
|
|
||||||
// public.replication_test: INSERT: a[integer]:2
|
|
||||||
// What we wanna do here is check that once we find one of our inserted times,
|
|
||||||
// that they occur in the wal stream in the order we executed them.
|
|
||||||
walString := string(message.WalMessage.WalData)
|
|
||||||
if strings.Contains(walString, "public.replication_test: INSERT") {
|
|
||||||
stringParts := strings.Split(walString, ":")
|
|
||||||
offset, err := strconv.ParseInt(stringParts[len(stringParts)-1], 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to parse walString %s", walString)
|
|
||||||
}
|
|
||||||
if foundCount > 0 || offset == insertedTimes[0] {
|
|
||||||
foundTimes = append(foundTimes, offset)
|
|
||||||
foundCount++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if message.WalMessage.WalStart > maxWal {
|
|
||||||
maxWal = message.WalMessage.WalStart
|
|
||||||
}
|
|
||||||
|
|
||||||
|
if message.WalMessage != nil {
|
||||||
|
// The waldata payload with the test_decoding plugin looks like:
|
||||||
|
// public.replication_test: INSERT: a[integer]:2
|
||||||
|
// What we wanna do here is check that once we find one of our inserted times,
|
||||||
|
// that they occur in the wal stream in the order we executed them.
|
||||||
|
walString := string(message.WalMessage.WalData)
|
||||||
|
if strings.Contains(walString, "public.replication_test: INSERT") {
|
||||||
|
stringParts := strings.Split(walString, ":")
|
||||||
|
offset, err := strconv.ParseInt(stringParts[len(stringParts)-1], 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to parse walString %s", walString)
|
||||||
|
}
|
||||||
|
if foundCount > 0 || offset == insertedTimes[0] {
|
||||||
|
foundTimes = append(foundTimes, offset)
|
||||||
|
foundCount++
|
||||||
|
}
|
||||||
|
if foundCount == len(insertedTimes) {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if message.ServerHeartbeat != nil {
|
if message.WalMessage.WalStart > maxWal {
|
||||||
t.Logf("Got heartbeat: %s", message.ServerHeartbeat)
|
maxWal = message.WalMessage.WalStart
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
t.Log("Timed out waiting for wal message")
|
|
||||||
i++
|
|
||||||
}
|
|
||||||
if i > 3 {
|
|
||||||
t.Log("Actual timeout")
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if foundCount != len(insertedTimes) {
|
}
|
||||||
t.Fatalf("Failed to find all inserted time values in WAL stream (found %d expected %d)", foundCount, len(insertedTimes))
|
if message.ServerHeartbeat != nil {
|
||||||
|
t.Logf("Got heartbeat: %s", message.ServerHeartbeat)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range insertedTimes {
|
for i := range insertedTimes {
|
||||||
|
|
Loading…
Reference in New Issue