diff --git a/replication.go b/replication.go index 5586209c..16755f0e 100644 --- a/replication.go +++ b/replication.go @@ -22,15 +22,15 @@ func init() { // Format the given 64bit LSN value into the XXX/XXX format, // which is the format reported by postgres. -func FormatLSN(lsn int64) string { +func FormatLSN(lsn uint64) string { return fmt.Sprintf("%X/%X", uint32(lsn>>32), uint32(lsn)) } // Parse the given XXX/XXX format LSN as reported by postgres, // into a 64 bit integer as used internally by the wire procotols -func ParseLSN(lsn string) (outputLsn int64, err error) { - var upperHalf int64 - var lowerHalf int64 +func ParseLSN(lsn string) (outputLsn uint64, err error) { + var upperHalf uint64 + var lowerHalf uint64 var nparsed int nparsed, err = fmt.Sscanf(lsn, "%X/%X", &upperHalf, &lowerHalf) if err != nil { @@ -50,13 +50,13 @@ func ParseLSN(lsn string) (outputLsn int64, err error) { type WalMessage struct { // The WAL start position of this data. This // is the WAL position we need to track. - WalStart int64 + WalStart uint64 // The server wal end and server time are // documented to track the end position and current // time of the server, both of which appear to be // unimplemented in pg 9.5. - ServerWalEnd int64 - ServerTime int64 + ServerWalEnd uint64 + ServerTime uint64 // The WAL data is the raw unparsed binary WAL entry. // The contents of this are determined by the output // logical encoding plugin. @@ -64,10 +64,10 @@ type WalMessage struct { } func (w *WalMessage) Time() time.Time { - return time.Unix(0, (w.ServerTime*1000)+epochNano) + return time.Unix(0, (int64(w.ServerTime)*1000)+epochNano) } -func (w *WalMessage) ByteLag() int64 { +func (w *WalMessage) ByteLag() uint64 { return (w.ServerWalEnd - w.WalStart) } @@ -80,16 +80,16 @@ func (w *WalMessage) String() string { type ServerHeartbeat struct { // The current max wal position on the server, // used for lag tracking - ServerWalEnd int64 + ServerWalEnd uint64 // The server time, in microseconds since jan 1 2000 - ServerTime int64 + ServerTime uint64 // If 1, the server is requesting a standby status message // to be sent immediately. ReplyRequested byte } func (s *ServerHeartbeat) Time() time.Time { - return time.Unix(0, (s.ServerTime*1000)+epochNano) + return time.Unix(0, (int64(s.ServerTime)*1000)+epochNano) } func (s *ServerHeartbeat) String() string { @@ -109,13 +109,13 @@ type ReplicationMessage struct { // all wal positions are typically set to the same value. type StandbyStatus struct { // The WAL position that's been locally written - WalWritePosition int64 + WalWritePosition uint64 // The WAL position that's been locally flushed - WalFlushPosition int64 + WalFlushPosition uint64 // The WAL position that's been locally applied - WalApplyPosition int64 + WalApplyPosition uint64 // The client time in microseconds since jan 1 2000 - ClientTime int64 + ClientTime uint64 // If 1, requests the server to immediately send a // server heartbeat ReplyRequested byte @@ -123,12 +123,12 @@ type StandbyStatus struct { // Create a standby status struct, which sets all the WAL positions // to the given wal position, and the client time to the current time. -func NewStandbyStatus(walPosition int64) (status *StandbyStatus) { +func NewStandbyStatus(walPosition uint64) (status *StandbyStatus) { status = new(StandbyStatus) status.WalFlushPosition = walPosition status.WalApplyPosition = walPosition status.WalWritePosition = walPosition - status.ClientTime = (time.Now().UnixNano() - epochNano) / 1000 + status.ClientTime = uint64((time.Now().UnixNano() - epochNano) / 1000) return } @@ -138,10 +138,10 @@ func NewStandbyStatus(walPosition int64) (status *StandbyStatus) { func (c *Conn) SendStandbyStatus(k *StandbyStatus) (err error) { writeBuf := newWriteBuf(c, copyData) writeBuf.WriteByte(standbyStatusUpdate) - writeBuf.WriteInt64(k.WalWritePosition) - writeBuf.WriteInt64(k.WalFlushPosition) - writeBuf.WriteInt64(k.WalApplyPosition) - writeBuf.WriteInt64(k.ClientTime) + writeBuf.WriteInt64(int64(k.WalWritePosition)) + writeBuf.WriteInt64(int64(k.WalFlushPosition)) + writeBuf.WriteInt64(int64(k.WalApplyPosition)) + writeBuf.WriteInt64(int64(k.ClientTime)) writeBuf.WriteByte(k.ReplyRequested) writeBuf.closeMsg() @@ -202,9 +202,9 @@ func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) { serverWalEnd := reader.readInt64() serverTime := reader.readInt64() walData := reader.readBytes(reader.msgBytesRemaining) - walMessage := WalMessage{WalStart: walStart, - ServerWalEnd: serverWalEnd, - ServerTime: serverTime, + walMessage := WalMessage{WalStart: uint64(walStart), + ServerWalEnd: uint64(serverWalEnd), + ServerTime: uint64(serverTime), WalData: walData, } @@ -213,7 +213,7 @@ func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) { serverWalEnd := reader.readInt64() serverTime := reader.readInt64() replyNow := reader.readByte() - h := &ServerHeartbeat{ServerWalEnd: serverWalEnd, ServerTime: serverTime, ReplyRequested: replyNow} + h := &ServerHeartbeat{ServerWalEnd: uint64(serverWalEnd), ServerTime: uint64(serverTime), ReplyRequested: replyNow} return &ReplicationMessage{ServerHeartbeat: h}, nil } } diff --git a/replication_test.go b/replication_test.go index 08affdf1..1d77982a 100644 --- a/replication_test.go +++ b/replication_test.go @@ -88,7 +88,7 @@ func TestSimpleReplicationConnection(t *testing.T) { i = 0 var foundTimes []int64 var foundCount int - var maxWal int64 + var maxWal uint64 for { var message *pgx.ReplicationMessage