diff --git a/README.md b/README.md index ea2038a8..b85f9c0f 100644 --- a/README.md +++ b/README.md @@ -122,6 +122,8 @@ Change the following settings in your postgresql.conf: max_wal_senders=5 max_replication_slots=5 +Set `replicationConnConfig` appropriately in `conn_config_test.go`. + ## Version Policy pgx follows semantic versioning for the documented public API on stable releases. Branch `v2` is the latest stable release. `master` can contain new features or behavior that will change or be removed before being merged to the stable `v2` branch (in practice, this occurs very rarely). diff --git a/conn_config_test.go.example b/conn_config_test.go.example index cac798b7..4f6a5e5a 100644 --- a/conn_config_test.go.example +++ b/conn_config_test.go.example @@ -23,3 +23,5 @@ var replicationConnConfig *pgx.ConnConfig = nil // var invalidUserConnConfig *pgx.ConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "invalid", Database: "pgx_test"} // var tlsConnConfig *pgx.ConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_md5", Password: "secret", Database: "pgx_test", TLSConfig: &tls.Config{InsecureSkipVerify: true}} // var customDialerConnConfig *pgx.ConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_md5", Password: "secret", Database: "pgx_test"} +// var replicationConnConfig *pgx.ConnConfig = &pgx.ConnConfig{Host: "127.0.0.1", User: "pgx_replication", Password: "secret", Database: "pgx_test"} + diff --git a/replication_test.go b/replication_test.go index 1a8063e5..54ef4b66 100644 --- a/replication_test.go +++ b/replication_test.go @@ -39,8 +39,6 @@ func getConfirmedFlushLsnFor(t *testing.T, conn *pgx.Conn, slot string) string { // - Checks the wal position of the slot on the server to make sure // the update succeeded func TestSimpleReplicationConnection(t *testing.T) { - t.Parallel() - var err error if replicationConnConfig == nil { @@ -74,71 +72,63 @@ func TestSimpleReplicationConnection(t *testing.T) { t.Fatalf("Failed to start replication: %v", err) } - var i int32 var insertedTimes []int64 - for i < 5 { + currentTime := time.Now().Unix() + + for i := 0; i < 5; i++ { var ct pgx.CommandTag - currentTime := time.Now().Unix() insertedTimes = append(insertedTimes, currentTime) ct, err = conn.Exec("insert into replication_test(a) values($1)", currentTime) if err != nil { t.Fatalf("Insert failed: %v", err) } t.Logf("Inserted %d rows", ct.RowsAffected()) - i++ + currentTime++ } - i = 0 var foundTimes []int64 var foundCount int var maxWal uint64 + + ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelFn() + for { var message *pgx.ReplicationMessage - ctx, cancelFn := context.WithTimeout(context.Background(), time.Second) - defer cancelFn() message, err = replicationConn.WaitForReplicationMessage(ctx) - if err != nil && err != context.DeadlineExceeded { + if err != nil { t.Fatalf("Replication failed: %v %s", err, reflect.TypeOf(err)) } - if message != nil { - if message.WalMessage != nil { - // The waldata payload with the test_decoding plugin looks like: - // public.replication_test: INSERT: a[integer]:2 - // What we wanna do here is check that once we find one of our inserted times, - // that they occur in the wal stream in the order we executed them. - walString := string(message.WalMessage.WalData) - if strings.Contains(walString, "public.replication_test: INSERT") { - stringParts := strings.Split(walString, ":") - offset, err := strconv.ParseInt(stringParts[len(stringParts)-1], 10, 64) - if err != nil { - t.Fatalf("Failed to parse walString %s", walString) - } - if foundCount > 0 || offset == insertedTimes[0] { - foundTimes = append(foundTimes, offset) - foundCount++ - } - } - if message.WalMessage.WalStart > maxWal { - maxWal = message.WalMessage.WalStart - } + if message.WalMessage != nil { + // The waldata payload with the test_decoding plugin looks like: + // public.replication_test: INSERT: a[integer]:2 + // What we wanna do here is check that once we find one of our inserted times, + // that they occur in the wal stream in the order we executed them. + walString := string(message.WalMessage.WalData) + if strings.Contains(walString, "public.replication_test: INSERT") { + stringParts := strings.Split(walString, ":") + offset, err := strconv.ParseInt(stringParts[len(stringParts)-1], 10, 64) + if err != nil { + t.Fatalf("Failed to parse walString %s", walString) + } + if foundCount > 0 || offset == insertedTimes[0] { + foundTimes = append(foundTimes, offset) + foundCount++ + } + if foundCount == len(insertedTimes) { + break + } } - if message.ServerHeartbeat != nil { - t.Logf("Got heartbeat: %s", message.ServerHeartbeat) + if message.WalMessage.WalStart > maxWal { + maxWal = message.WalMessage.WalStart } - } else { - t.Log("Timed out waiting for wal message") - i++ - } - if i > 3 { - t.Log("Actual timeout") - break - } - } - if foundCount != len(insertedTimes) { - t.Fatalf("Failed to find all inserted time values in WAL stream (found %d expected %d)", foundCount, len(insertedTimes)) + } + if message.ServerHeartbeat != nil { + t.Logf("Got heartbeat: %s", message.ServerHeartbeat) + } } for i := range insertedTimes {