It should all be unsigned.

pull/224/head
Kris Wehner 2016-12-08 15:26:44 -08:00
parent 14497e4c65
commit 2b096a7d08
2 changed files with 27 additions and 27 deletions

View File

@ -22,15 +22,15 @@ func init() {
// Format the given 64bit LSN value into the XXX/XXX format, // Format the given 64bit LSN value into the XXX/XXX format,
// which is the format reported by postgres. // which is the format reported by postgres.
func FormatLSN(lsn int64) string { func FormatLSN(lsn uint64) string {
return fmt.Sprintf("%X/%X", uint32(lsn>>32), uint32(lsn)) return fmt.Sprintf("%X/%X", uint32(lsn>>32), uint32(lsn))
} }
// Parse the given XXX/XXX format LSN as reported by postgres, // Parse the given XXX/XXX format LSN as reported by postgres,
// into a 64 bit integer as used internally by the wire procotols // into a 64 bit integer as used internally by the wire procotols
func ParseLSN(lsn string) (outputLsn int64, err error) { func ParseLSN(lsn string) (outputLsn uint64, err error) {
var upperHalf int64 var upperHalf uint64
var lowerHalf int64 var lowerHalf uint64
var nparsed int var nparsed int
nparsed, err = fmt.Sscanf(lsn, "%X/%X", &upperHalf, &lowerHalf) nparsed, err = fmt.Sscanf(lsn, "%X/%X", &upperHalf, &lowerHalf)
if err != nil { if err != nil {
@ -50,13 +50,13 @@ func ParseLSN(lsn string) (outputLsn int64, err error) {
type WalMessage struct { type WalMessage struct {
// The WAL start position of this data. This // The WAL start position of this data. This
// is the WAL position we need to track. // is the WAL position we need to track.
WalStart int64 WalStart uint64
// The server wal end and server time are // The server wal end and server time are
// documented to track the end position and current // documented to track the end position and current
// time of the server, both of which appear to be // time of the server, both of which appear to be
// unimplemented in pg 9.5. // unimplemented in pg 9.5.
ServerWalEnd int64 ServerWalEnd uint64
ServerTime int64 ServerTime uint64
// The WAL data is the raw unparsed binary WAL entry. // The WAL data is the raw unparsed binary WAL entry.
// The contents of this are determined by the output // The contents of this are determined by the output
// logical encoding plugin. // logical encoding plugin.
@ -64,10 +64,10 @@ type WalMessage struct {
} }
func (w *WalMessage) Time() time.Time { func (w *WalMessage) Time() time.Time {
return time.Unix(0, (w.ServerTime*1000)+epochNano) return time.Unix(0, (int64(w.ServerTime)*1000)+epochNano)
} }
func (w *WalMessage) ByteLag() int64 { func (w *WalMessage) ByteLag() uint64 {
return (w.ServerWalEnd - w.WalStart) return (w.ServerWalEnd - w.WalStart)
} }
@ -80,16 +80,16 @@ func (w *WalMessage) String() string {
type ServerHeartbeat struct { type ServerHeartbeat struct {
// The current max wal position on the server, // The current max wal position on the server,
// used for lag tracking // used for lag tracking
ServerWalEnd int64 ServerWalEnd uint64
// The server time, in microseconds since jan 1 2000 // The server time, in microseconds since jan 1 2000
ServerTime int64 ServerTime uint64
// If 1, the server is requesting a standby status message // If 1, the server is requesting a standby status message
// to be sent immediately. // to be sent immediately.
ReplyRequested byte ReplyRequested byte
} }
func (s *ServerHeartbeat) Time() time.Time { func (s *ServerHeartbeat) Time() time.Time {
return time.Unix(0, (s.ServerTime*1000)+epochNano) return time.Unix(0, (int64(s.ServerTime)*1000)+epochNano)
} }
func (s *ServerHeartbeat) String() string { func (s *ServerHeartbeat) String() string {
@ -109,13 +109,13 @@ type ReplicationMessage struct {
// all wal positions are typically set to the same value. // all wal positions are typically set to the same value.
type StandbyStatus struct { type StandbyStatus struct {
// The WAL position that's been locally written // The WAL position that's been locally written
WalWritePosition int64 WalWritePosition uint64
// The WAL position that's been locally flushed // The WAL position that's been locally flushed
WalFlushPosition int64 WalFlushPosition uint64
// The WAL position that's been locally applied // The WAL position that's been locally applied
WalApplyPosition int64 WalApplyPosition uint64
// The client time in microseconds since jan 1 2000 // The client time in microseconds since jan 1 2000
ClientTime int64 ClientTime uint64
// If 1, requests the server to immediately send a // If 1, requests the server to immediately send a
// server heartbeat // server heartbeat
ReplyRequested byte ReplyRequested byte
@ -123,12 +123,12 @@ type StandbyStatus struct {
// Create a standby status struct, which sets all the WAL positions // Create a standby status struct, which sets all the WAL positions
// to the given wal position, and the client time to the current time. // to the given wal position, and the client time to the current time.
func NewStandbyStatus(walPosition int64) (status *StandbyStatus) { func NewStandbyStatus(walPosition uint64) (status *StandbyStatus) {
status = new(StandbyStatus) status = new(StandbyStatus)
status.WalFlushPosition = walPosition status.WalFlushPosition = walPosition
status.WalApplyPosition = walPosition status.WalApplyPosition = walPosition
status.WalWritePosition = walPosition status.WalWritePosition = walPosition
status.ClientTime = (time.Now().UnixNano() - epochNano) / 1000 status.ClientTime = uint64((time.Now().UnixNano() - epochNano) / 1000)
return return
} }
@ -138,10 +138,10 @@ func NewStandbyStatus(walPosition int64) (status *StandbyStatus) {
func (c *Conn) SendStandbyStatus(k *StandbyStatus) (err error) { func (c *Conn) SendStandbyStatus(k *StandbyStatus) (err error) {
writeBuf := newWriteBuf(c, copyData) writeBuf := newWriteBuf(c, copyData)
writeBuf.WriteByte(standbyStatusUpdate) writeBuf.WriteByte(standbyStatusUpdate)
writeBuf.WriteInt64(k.WalWritePosition) writeBuf.WriteInt64(int64(k.WalWritePosition))
writeBuf.WriteInt64(k.WalFlushPosition) writeBuf.WriteInt64(int64(k.WalFlushPosition))
writeBuf.WriteInt64(k.WalApplyPosition) writeBuf.WriteInt64(int64(k.WalApplyPosition))
writeBuf.WriteInt64(k.ClientTime) writeBuf.WriteInt64(int64(k.ClientTime))
writeBuf.WriteByte(k.ReplyRequested) writeBuf.WriteByte(k.ReplyRequested)
writeBuf.closeMsg() writeBuf.closeMsg()
@ -202,9 +202,9 @@ func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) {
serverWalEnd := reader.readInt64() serverWalEnd := reader.readInt64()
serverTime := reader.readInt64() serverTime := reader.readInt64()
walData := reader.readBytes(reader.msgBytesRemaining) walData := reader.readBytes(reader.msgBytesRemaining)
walMessage := WalMessage{WalStart: walStart, walMessage := WalMessage{WalStart: uint64(walStart),
ServerWalEnd: serverWalEnd, ServerWalEnd: uint64(serverWalEnd),
ServerTime: serverTime, ServerTime: uint64(serverTime),
WalData: walData, WalData: walData,
} }
@ -213,7 +213,7 @@ func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) {
serverWalEnd := reader.readInt64() serverWalEnd := reader.readInt64()
serverTime := reader.readInt64() serverTime := reader.readInt64()
replyNow := reader.readByte() replyNow := reader.readByte()
h := &ServerHeartbeat{ServerWalEnd: serverWalEnd, ServerTime: serverTime, ReplyRequested: replyNow} h := &ServerHeartbeat{ServerWalEnd: uint64(serverWalEnd), ServerTime: uint64(serverTime), ReplyRequested: replyNow}
return &ReplicationMessage{ServerHeartbeat: h}, nil return &ReplicationMessage{ServerHeartbeat: h}, nil
} }
} }

View File

@ -88,7 +88,7 @@ func TestSimpleReplicationConnection(t *testing.T) {
i = 0 i = 0
var foundTimes []int64 var foundTimes []int64
var foundCount int var foundCount int
var maxWal int64 var maxWal uint64
for { for {
var message *pgx.ReplicationMessage var message *pgx.ReplicationMessage