From c88c1101698756d3d42ecc05e2d2896ed9b61deb Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Mon, 9 Jan 2017 11:32:02 -0800 Subject: [PATCH 01/13] ReplicationConn refactor --- helper_test.go | 16 ++++++++ msg_reader.go | 2 +- replication.go | 95 ++++++++++++++++++++++++++++++--------------- replication_test.go | 9 ++--- value_reader.go | 2 +- values.go | 8 ++-- 6 files changed, 89 insertions(+), 43 deletions(-) diff --git a/helper_test.go b/helper_test.go index ed5a9644..eff731e8 100644 --- a/helper_test.go +++ b/helper_test.go @@ -13,6 +13,15 @@ func mustConnect(t testing.TB, config pgx.ConnConfig) *pgx.Conn { return conn } +func mustReplicationConnect(t testing.TB, config pgx.ConnConfig) *pgx.ReplicationConn { + conn, err := pgx.ReplicationConnect(config) + if err != nil { + t.Fatalf("Unable to establish connection: %v", err) + } + return conn +} + + func closeConn(t testing.TB, conn *pgx.Conn) { err := conn.Close() if err != nil { @@ -20,6 +29,13 @@ func closeConn(t testing.TB, conn *pgx.Conn) { } } +func closeReplicationConn(t testing.TB, conn *pgx.ReplicationConn) { + err := conn.Close() + if err != nil { + t.Fatalf("conn.Close unexpectedly failed: %v", err) + } +} + func mustExec(t testing.TB, conn *pgx.Conn, sql string, arguments ...interface{}) (commandTag pgx.CommandTag) { var err error if commandTag, err = conn.Exec(sql, arguments...); err != nil { diff --git a/msg_reader.go b/msg_reader.go index 59617b73..21db5d26 100644 --- a/msg_reader.go +++ b/msg_reader.go @@ -21,7 +21,7 @@ func (r *msgReader) Err() error { return r.err } -// fatal tells r that a Fatal error has occurred +// fatal tells rc that a Fatal error has occurred func (r *msgReader) fatal(err error) { if r.shouldLog(LogLevelTrace) { r.log(LogLevelTrace, "msgReader.fatal", "error", err, "msgBytesRemaining", r.msgBytesRemaining) diff --git a/replication.go b/replication.go index 7d4c56e2..f73b8528 100644 --- a/replication.go +++ b/replication.go @@ -151,11 +151,28 @@ func NewStandbyStatus(walPositions ...uint64) (status *StandbyStatus, err error) return } +func ReplicationConnect(config ConnConfig) (r *ReplicationConn, err error) { + if config.RuntimeParams == nil { + config.RuntimeParams = make(map[string]string) + } + config.RuntimeParams["replication"] = "database" + + c,err := Connect(config) + if err != nil { + return + } + return &ReplicationConn{c: c}, nil +} + +type ReplicationConn struct { + c *Conn +} + // Send standby status to the server, which both acts as a keepalive // message to the server, as well as carries the WAL position of the // client, which then updates the server's replication slot position. -func (c *Conn) SendStandbyStatus(k *StandbyStatus) (err error) { - writeBuf := newWriteBuf(c, copyData) +func (rc *ReplicationConn) SendStandbyStatus(k *StandbyStatus) (err error) { + writeBuf := newWriteBuf(rc.c, copyData) writeBuf.WriteByte(standbyStatusUpdate) writeBuf.WriteInt64(int64(k.WalWritePosition)) writeBuf.WriteInt64(int64(k.WalFlushPosition)) @@ -165,9 +182,9 @@ func (c *Conn) SendStandbyStatus(k *StandbyStatus) (err error) { writeBuf.closeMsg() - _, err = c.conn.Write(writeBuf.buf) + _, err = rc.c.conn.Write(writeBuf.buf) if err != nil { - c.die(err) + rc.c.die(err) } return @@ -175,37 +192,41 @@ func (c *Conn) SendStandbyStatus(k *StandbyStatus) (err error) { // Send the message to formally stop the replication stream. This // is done before calling Close() during a clean shutdown. -func (c *Conn) StopReplication() (err error) { - writeBuf := newWriteBuf(c, copyDone) +func (rc *ReplicationConn) StopReplication() (err error) { + writeBuf := newWriteBuf(rc.c, copyDone) writeBuf.closeMsg() - _, err = c.conn.Write(writeBuf.buf) + _, err = rc.c.conn.Write(writeBuf.buf) if err != nil { - c.die(err) + rc.c.die(err) } return } +func (rc *ReplicationConn) Close() error { + return rc.c.Close() +} -func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) { + +func (rc *ReplicationConn) readReplicationMessage() (r *ReplicationMessage, err error) { var t byte var reader *msgReader - t, reader, err = c.rxMsg() + t, reader, err = rc.c.rxMsg() if err != nil { return } switch t { case noticeResponse: - pgError := c.rxErrorResponse(reader) - if c.shouldLog(LogLevelInfo) { - c.log(LogLevelInfo, pgError.Error()) + pgError := rc.c.rxErrorResponse(reader) + if rc.c.shouldLog(LogLevelInfo) { + rc.c.log(LogLevelInfo, pgError.Error()) } case errorResponse: - err = c.rxErrorResponse(reader) - if c.shouldLog(LogLevelError) { - c.log(LogLevelError, err.Error()) + err = rc.c.rxErrorResponse(reader) + if rc.c.shouldLog(LogLevelError) { + rc.c.log(LogLevelError, err.Error()) } return case copyBothResponse: @@ -235,13 +256,13 @@ func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) { h := &ServerHeartbeat{ServerWalEnd: uint64(serverWalEnd), ServerTime: uint64(serverTime), ReplyRequested: replyNow} return &ReplicationMessage{ServerHeartbeat: h}, nil default: - if c.shouldLog(LogLevelError) { - c.log(LogLevelError,"Unexpected data playload message type %v", t) + if rc.c.shouldLog(LogLevelError) { + rc.c.log(LogLevelError,"Unexpected data playload message type %v", t) } } default: - if c.shouldLog(LogLevelError) { - c.log(LogLevelError,"Unexpected replication message type %v", t) + if rc.c.shouldLog(LogLevelError) { + rc.c.log(LogLevelError,"Unexpected replication message type %v", t) } } return @@ -256,7 +277,7 @@ func (c *Conn) readReplicationMessage() (r *ReplicationMessage, err error) { // // This returns pgx.ErrNotificationTimeout when there is no replication message by the specified // duration. -func (c *Conn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationMessage, err error) { +func (rc *ReplicationConn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationMessage, err error) { var zeroTime time.Time deadline := time.Now().Add(timeout) @@ -269,27 +290,27 @@ func (c *Conn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationM // deadline and peek into the reader. If a timeout error occurs there // we don't break the pgx connection. If the Peek returns that data // is available then we turn off the read deadline before the rxMsg. - err = c.conn.SetReadDeadline(deadline) + err = rc.c.conn.SetReadDeadline(deadline) if err != nil { return nil, err } // Wait until there is a byte available before continuing onto the normal msg reading path - _, err = c.reader.Peek(1) + _, err = rc.c.reader.Peek(1) if err != nil { - c.conn.SetReadDeadline(zeroTime) // we can only return one error and we already have one -- so ignore possiple error from SetReadDeadline + rc.c.conn.SetReadDeadline(zeroTime) // we can only return one error and we already have one -- so ignore possiple error from SetReadDeadline if err, ok := err.(*net.OpError); ok && err.Timeout() { return nil, ErrNotificationTimeout } return nil, err } - err = c.conn.SetReadDeadline(zeroTime) + err = rc.c.conn.SetReadDeadline(zeroTime) if err != nil { return nil, err } - return c.readReplicationMessage() + return rc.readReplicationMessage() } // Start a replication connection, sending WAL data to the given replication @@ -303,7 +324,7 @@ func (c *Conn) WaitForReplicationMessage(timeout time.Duration) (r *ReplicationM // // This function assumes that slotName has already been created. In order to omit the timeline argument // pass a -1 for the timeline to get the server default behavior. -func (c *Conn) StartReplication(slotName string, startLsn uint64, timeline int64, pluginArguments ...string) (err error) { +func (rc *ReplicationConn) StartReplication(slotName string, startLsn uint64, timeline int64, pluginArguments ...string) (err error) { var queryString string if timeline >= 0 { queryString = fmt.Sprintf("START_REPLICATION SLOT %s LOGICAL %s TIMELINE %d", slotName, FormatLSN(startLsn), timeline) @@ -315,7 +336,7 @@ func (c *Conn) StartReplication(slotName string, startLsn uint64, timeline int64 queryString += fmt.Sprintf(" %s", arg) } - if err = c.sendQuery(queryString); err != nil { + if err = rc.c.sendQuery(queryString); err != nil { return } @@ -324,12 +345,24 @@ func (c *Conn) StartReplication(slotName string, startLsn uint64, timeline int64 // started. This call will either return nil, nil or if it returns an error // that indicates the start replication command failed var r *ReplicationMessage - r, err = c.WaitForReplicationMessage(initialReplicationResponseTimeout) + r, err = rc.WaitForReplicationMessage(initialReplicationResponseTimeout) if err != nil && r != nil { - if c.shouldLog(LogLevelError) { - c.log(LogLevelError, "Unxpected replication message %v", r) + if rc.c.shouldLog(LogLevelError) { + rc.c.log(LogLevelError, "Unxpected replication message %v", r) } } return } + +// Create the replication slot, using the given name and output plugin. +func (rc *ReplicationConn) CreateReplicationSlot(slotName, outputPlugin string) (err error) { + _, err = rc.c.Exec(fmt.Sprintf("CREATE_REPLICATION_SLOT %s LOGICAL %s", slotName, outputPlugin)) + return +} + +// Drop the replication slot for the given name +func (rc *ReplicationConn) DropReplicationSlot(slotName, outputPlugin string) (err error) { + _, err = rc.c.Exec(fmt.Sprintf("DROP_REPLICATION_SLOT %s", slotName)) + return +} diff --git a/replication_test.go b/replication_test.go index 866fe45e..20572edd 100644 --- a/replication_test.go +++ b/replication_test.go @@ -48,13 +48,10 @@ func TestSimpleReplicationConnection(t *testing.T) { conn := mustConnect(t, *replicationConnConfig) defer closeConn(t, conn) - replicationConnConfig.RuntimeParams = make(map[string]string) - replicationConnConfig.RuntimeParams["replication"] = "database" + replicationConn := mustReplicationConnect(t, *replicationConnConfig) + defer closeReplicationConn(t, replicationConn) - replicationConn := mustConnect(t, *replicationConnConfig) - defer closeConn(t, replicationConn) - - _, err = replicationConn.Exec("CREATE_REPLICATION_SLOT pgx_test LOGICAL test_decoding") + err = replicationConn.CreateReplicationSlot("pgx_test","test_decoding") if err != nil { t.Logf("replication slot create failed: %v", err) } diff --git a/value_reader.go b/value_reader.go index a4897543..fb036dd6 100644 --- a/value_reader.go +++ b/value_reader.go @@ -17,7 +17,7 @@ func (r *ValueReader) Err() error { return r.err } -// Fatal tells r that a Fatal error has occurred +// Fatal tells rc that a Fatal error has occurred func (r *ValueReader) Fatal(err error) { r.err = err } diff --git a/values.go b/values.go index b4466b82..bd912020 100644 --- a/values.go +++ b/values.go @@ -129,9 +129,9 @@ func (e SerializationError) Error() string { // Scanner is an interface used to decode values from the PostgreSQL server. type Scanner interface { - // Scan MUST check r.Type().DataType (to check by OID) or - // r.Type().DataTypeName (to check by name) to ensure that it is scanning an - // expected column type. It also MUST check r.Type().FormatCode before + // Scan MUST check rc.Type().DataType (to check by OID) or + // rc.Type().DataTypeName (to check by name) to ensure that it is scanning an + // expected column type. It also MUST check rc.Type().FormatCode before // decoding. It should not assume that it was called on a data type or format // that it understands. Scan(r *ValueReader) error @@ -3167,7 +3167,7 @@ func parseQuotedAclItem(reader *strings.Reader) (AclItem, error) { } } -// Returns the next rune from r, unless it is a backslash; +// Returns the next rune from rc, unless it is a backslash; // in that case, it returns the rune after the backslash. The second // return value tells us whether or not the rune was // preceeded by a backslash (escaped). From af01afca001022722c6239be8b076c202422627a Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Mon, 9 Jan 2017 14:19:08 -0800 Subject: [PATCH 02/13] Add the drop replication slot functionality --- replication.go | 2 +- replication_test.go | 10 ++++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/replication.go b/replication.go index f73b8528..6d78a7a1 100644 --- a/replication.go +++ b/replication.go @@ -362,7 +362,7 @@ func (rc *ReplicationConn) CreateReplicationSlot(slotName, outputPlugin string) } // Drop the replication slot for the given name -func (rc *ReplicationConn) DropReplicationSlot(slotName, outputPlugin string) (err error) { +func (rc *ReplicationConn) DropReplicationSlot(slotName string) (err error) { _, err = rc.c.Exec(fmt.Sprintf("DROP_REPLICATION_SLOT %s", slotName)) return } diff --git a/replication_test.go b/replication_test.go index 20572edd..42d133c4 100644 --- a/replication_test.go +++ b/replication_test.go @@ -173,6 +173,10 @@ func TestSimpleReplicationConnection(t *testing.T) { t.Errorf("Unexpected write position %d", status.WalWritePosition) } + err = replicationConn.DropReplicationSlot("pgx_test") + if err != nil { + t.Fatalf("Failed to drop replication slot %v", err) + } err = replicationConn.Close() if err != nil { t.Fatalf("Replication connection close failed: %v", err) @@ -183,10 +187,4 @@ func TestSimpleReplicationConnection(t *testing.T) { if integerRestartLsn != maxWal { t.Fatalf("Wal offset update failed, expected %s found %s", pgx.FormatLSN(maxWal), restartLsn) } - - _, err = conn.Exec("select pg_drop_replication_slot($1)", "pgx_test") - if err != nil { - t.Fatalf("Failed to drop replication slot: %v", err) - } - } From b2f416c07d8ed08f4dc21602a9c57c6eb602a03d Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Mon, 9 Jan 2017 14:27:34 -0800 Subject: [PATCH 03/13] Drop replication slot has to run on a live connection, so we'll use the function form for the test. --- replication_test.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/replication_test.go b/replication_test.go index 42d133c4..20572edd 100644 --- a/replication_test.go +++ b/replication_test.go @@ -173,10 +173,6 @@ func TestSimpleReplicationConnection(t *testing.T) { t.Errorf("Unexpected write position %d", status.WalWritePosition) } - err = replicationConn.DropReplicationSlot("pgx_test") - if err != nil { - t.Fatalf("Failed to drop replication slot %v", err) - } err = replicationConn.Close() if err != nil { t.Fatalf("Replication connection close failed: %v", err) @@ -187,4 +183,10 @@ func TestSimpleReplicationConnection(t *testing.T) { if integerRestartLsn != maxWal { t.Fatalf("Wal offset update failed, expected %s found %s", pgx.FormatLSN(maxWal), restartLsn) } + + _, err = conn.Exec("select pg_drop_replication_slot($1)", "pgx_test") + if err != nil { + t.Fatalf("Failed to drop replication slot: %v", err) + } + } From d398d957649de7b8958725bc73a42fac10e70e9b Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Mon, 23 Jan 2017 22:48:17 -0600 Subject: [PATCH 04/13] Explicitly close checked-in connections on ConnPool.Reset --- CHANGELOG.md | 1 + conn_pool.go | 9 +++-- conn_pool_test.go | 83 ++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 87 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bedf106b..baef2bdd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Fixes * Oid underlying type changed to uint32, previously it was incorrectly int32 (Manni Wood) +* Explicitly close checked-in connections on ConnPool.Reset, previously they were closed by GC ## Features diff --git a/conn_pool.go b/conn_pool.go index eac731dc..6d04565d 100644 --- a/conn_pool.go +++ b/conn_pool.go @@ -255,8 +255,13 @@ func (p *ConnPool) Reset() { defer p.cond.L.Unlock() p.resetCount++ - p.allConnections = make([]*Conn, 0, p.maxConnections) - p.availableConnections = make([]*Conn, 0, p.maxConnections) + p.allConnections = p.allConnections[0:0] + + for _, conn := range p.availableConnections { + conn.Close() + } + + p.availableConnections = p.availableConnections[0:0] } // invalidateAcquired causes all acquired connections to be closed when released. diff --git a/conn_pool_test.go b/conn_pool_test.go index 71a361a6..ab76bfb7 100644 --- a/conn_pool_test.go +++ b/conn_pool_test.go @@ -465,32 +465,38 @@ func TestPoolReleaseDiscardsDeadConnections(t *testing.T) { } } -func TestConnPoolReset(t *testing.T) { +func TestConnPoolResetClosesCheckedOutConnectionsOnRelease(t *testing.T) { t.Parallel() pool := createConnPool(t, 5) defer pool.Close() inProgressRows := []*pgx.Rows{} + var inProgressPIDs []int32 // Start some queries and reset pool while they are in progress for i := 0; i < 10; i++ { - rows, err := pool.Query("select generate_series(1,5)::bigint") + rows, err := pool.Query("select pg_backend_pid() union all select 1 union all select 2") if err != nil { t.Fatal(err) } + rows.Next() + var pid int32 + rows.Scan(&pid) + inProgressPIDs = append(inProgressPIDs, pid) + inProgressRows = append(inProgressRows, rows) pool.Reset() } // Check that the queries are completed for _, rows := range inProgressRows { - var expectedN int64 + var expectedN int32 for rows.Next() { expectedN++ - var n int64 + var n int32 err := rows.Scan(&n) if err != nil { t.Fatal(err) @@ -510,6 +516,75 @@ func TestConnPoolReset(t *testing.T) { if stats.CurrentConnections != 0 || stats.AvailableConnections != 0 { t.Fatalf("Unexpected connection pool stats: %v", stats) } + + var connCount int + err := pool.QueryRow("select count(*) from pg_stat_activity where pid = any($1::int4[])", inProgressPIDs).Scan(&connCount) + if err != nil { + t.Fatal(err) + } + if connCount != 0 { + t.Fatalf("%d connections not closed", connCount) + } +} + +func TestConnPoolResetClosesCheckedInConnections(t *testing.T) { + t.Parallel() + + pool := createConnPool(t, 5) + defer pool.Close() + + inProgressRows := []*pgx.Rows{} + var inProgressPIDs []int32 + + // Start some queries and reset pool while they are in progress + for i := 0; i < 5; i++ { + rows, err := pool.Query("select pg_backend_pid()") + if err != nil { + t.Fatal(err) + } + + inProgressRows = append(inProgressRows, rows) + } + + // Check that the queries are completed + for _, rows := range inProgressRows { + for rows.Next() { + var pid int32 + err := rows.Scan(&pid) + if err != nil { + t.Fatal(err) + } + inProgressPIDs = append(inProgressPIDs, pid) + + } + + if err := rows.Err(); err != nil { + t.Fatal(err) + } + } + + // Ensure pool is fully connected and available + stats := pool.Stat() + if stats.CurrentConnections != 5 || stats.AvailableConnections != 5 { + t.Fatalf("Unexpected connection pool stats: %v", stats) + } + + pool.Reset() + + // Pool should be empty after reset + stats = pool.Stat() + if stats.CurrentConnections != 0 || stats.AvailableConnections != 0 { + t.Fatalf("Unexpected connection pool stats: %v", stats) + } + + var connCount int + err := pool.QueryRow("select count(*) from pg_stat_activity where pid = any($1::int4[])", inProgressPIDs).Scan(&connCount) + if err != nil { + t.Fatal(err) + } + if connCount != 0 { + t.Fatalf("%d connections not closed", connCount) + } } func TestConnPoolTransaction(t *testing.T) { From c8080fc4a1bfa44bf90383ad0fdce2f68b7d313c Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Tue, 24 Jan 2017 08:29:01 -0600 Subject: [PATCH 05/13] Update travis config --- .travis.yml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 32b35bbd..d9ea43b0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,8 @@ language: go go: - - 1.7.1 - - 1.6.3 + - 1.7.4 + - 1.6.4 - tip # Derived from https://github.com/lib/pq/blob/master/.travis.yml @@ -34,7 +34,6 @@ env: - PGVERSION=9.4 - PGVERSION=9.3 - PGVERSION=9.2 - - PGVERSION=9.1 # The tricky test user, below, has to actually exist so that it can be used in a test # of aclitem formatting. It turns out aclitems cannot contain non-existing users/roles. From 41d9c0f3381c66e534ff8c9ca16f5c637d93326d Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Thu, 26 Jan 2017 18:24:21 -0800 Subject: [PATCH 06/13] Add tests for replication slot drop, and go fmt --- replication.go | 21 ++++++++++++++------- replication_test.go | 36 ++++++++++++++++++++++++------------ 2 files changed, 38 insertions(+), 19 deletions(-) diff --git a/replication.go b/replication.go index 6d78a7a1..8923d533 100644 --- a/replication.go +++ b/replication.go @@ -8,10 +8,10 @@ import ( ) const ( - copyBothResponse = 'W' - walData = 'w' - senderKeepalive = 'k' - standbyStatusUpdate = 'r' + copyBothResponse = 'W' + walData = 'w' + senderKeepalive = 'k' + standbyStatusUpdate = 'r' initialReplicationResponseTimeout = 5 * time.Second ) @@ -157,7 +157,7 @@ func ReplicationConnect(config ConnConfig) (r *ReplicationConn, err error) { } config.RuntimeParams["replication"] = "database" - c,err := Connect(config) + c, err := Connect(config) if err != nil { return } @@ -208,6 +208,13 @@ func (rc *ReplicationConn) Close() error { return rc.c.Close() } +func (rc *ReplicationConn) IsAlive() bool { + return rc.c.IsAlive() +} + +func (rc *ReplicationConn) CauseOfDeath() error { + return rc.c.CauseOfDeath() +} func (rc *ReplicationConn) readReplicationMessage() (r *ReplicationMessage, err error) { var t byte @@ -257,12 +264,12 @@ func (rc *ReplicationConn) readReplicationMessage() (r *ReplicationMessage, err return &ReplicationMessage{ServerHeartbeat: h}, nil default: if rc.c.shouldLog(LogLevelError) { - rc.c.log(LogLevelError,"Unexpected data playload message type %v", t) + rc.c.log(LogLevelError, "Unexpected data playload message type %v", t) } } default: if rc.c.shouldLog(LogLevelError) { - rc.c.log(LogLevelError,"Unexpected replication message type %v", t) + rc.c.log(LogLevelError, "Unexpected replication message type %v", t) } } return diff --git a/replication_test.go b/replication_test.go index 20572edd..ee187ec2 100644 --- a/replication_test.go +++ b/replication_test.go @@ -1,13 +1,13 @@ package pgx_test import ( + "fmt" "github.com/jackc/pgx" + "reflect" "strconv" "strings" "testing" "time" - "reflect" - "fmt" ) // This function uses a postgresql 9.6 specific column @@ -51,7 +51,7 @@ func TestSimpleReplicationConnection(t *testing.T) { replicationConn := mustReplicationConnect(t, *replicationConnConfig) defer closeReplicationConn(t, replicationConn) - err = replicationConn.CreateReplicationSlot("pgx_test","test_decoding") + err = replicationConn.CreateReplicationSlot("pgx_test", "test_decoding") if err != nil { t.Logf("replication slot create failed: %v", err) } @@ -152,14 +152,23 @@ func TestSimpleReplicationConnection(t *testing.T) { replicationConn.SendStandbyStatus(status) replicationConn.StopReplication() + if replicationConn.IsAlive() == false { + t.Errorf("Connection died: %v", replicationConn.CauseOfDeath()) + } + + err = replicationConn.Close() + if err != nil { + t.Fatalf("Replication connection close failed: %v", err) + } + // Let's push the boundary conditions of the standby status and ensure it errors correctly - status, err = pgx.NewStandbyStatus(0,1,2,3,4) + status, err = pgx.NewStandbyStatus(0, 1, 2, 3, 4) if err == nil { - t.Errorf("Expected error from new standby status, got %v",status) + t.Errorf("Expected error from new standby status, got %v", status) } // And if you provide 3 args, ensure the right fields are set - status, err = pgx.NewStandbyStatus(1,2,3) + status, err = pgx.NewStandbyStatus(1, 2, 3) if err != nil { t.Errorf("Failed to create test status: %v", err) } @@ -173,20 +182,23 @@ func TestSimpleReplicationConnection(t *testing.T) { t.Errorf("Unexpected write position %d", status.WalWritePosition) } - err = replicationConn.Close() - if err != nil { - t.Fatalf("Replication connection close failed: %v", err) - } - restartLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test") integerRestartLsn, _ := pgx.ParseLSN(restartLsn) if integerRestartLsn != maxWal { t.Fatalf("Wal offset update failed, expected %s found %s", pgx.FormatLSN(maxWal), restartLsn) } - _, err = conn.Exec("select pg_drop_replication_slot($1)", "pgx_test") + replicationConn2 := mustReplicationConnect(t, *replicationConnConfig) + defer closeReplicationConn(t, replicationConn2) + + err = replicationConn2.DropReplicationSlot("pgx_test") if err != nil { t.Fatalf("Failed to drop replication slot: %v", err) } + droppedLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test") + if droppedLsn != "" { + t.Errorf("Got odd flush lsn %s for supposedly dropped slot", droppedLsn) + } + } From 1424fb2b4284fdf5dc42b93036f4b51ed66b7083 Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Mon, 30 Jan 2017 11:35:48 -0800 Subject: [PATCH 07/13] Add IdentifySystem and TimelineHistory functions, and tighten up the testing --- replication.go | 82 ++++++++++++++++++---- replication_test.go | 167 ++++++++++++++++++++++++++++++++++++++------ 2 files changed, 214 insertions(+), 35 deletions(-) diff --git a/replication.go b/replication.go index 8923d533..7b28d6b6 100644 --- a/replication.go +++ b/replication.go @@ -190,20 +190,6 @@ func (rc *ReplicationConn) SendStandbyStatus(k *StandbyStatus) (err error) { return } -// Send the message to formally stop the replication stream. This -// is done before calling Close() during a clean shutdown. -func (rc *ReplicationConn) StopReplication() (err error) { - writeBuf := newWriteBuf(rc.c, copyDone) - - writeBuf.closeMsg() - - _, err = rc.c.conn.Write(writeBuf.buf) - if err != nil { - rc.c.die(err) - } - return -} - func (rc *ReplicationConn) Close() error { return rc.c.Close() } @@ -320,6 +306,74 @@ func (rc *ReplicationConn) WaitForReplicationMessage(timeout time.Duration) (r * return rc.readReplicationMessage() } +func (rc *ReplicationConn) sendReplicationModeQuery(sql string) (*Rows, error) { + rc.c.lastActivityTime = time.Now() + + rows := rc.c.getRows(sql, nil) + + if err := rc.c.lock(); err != nil { + rows.abort(err) + return rows, err + } + rows.unlockConn = true + + err := rc.c.sendSimpleQuery(sql) + if err != nil { + rows.abort(err) + } + + var t byte + var r *msgReader + t, r, err = rc.c.rxMsg() + if err != nil { + return nil, err + } + + switch t { + case rowDescription: + rows.fields = rc.c.rxRowDescription(r) + // We don't have c.PgTypes here because we're a replication + // connection. This means the field descriptions will have + // only Oids. Not much we can do about this. + default: + if e := rc.c.processContextFreeMsg(t, r); e != nil { + rows.abort(e) + return rows, e + } + } + + return rows, rows.err +} + +// Execute the "IDENTIFY_SYSTEM" command as documented here: +// https://www.postgresql.org/docs/9.5/static/protocol-replication.html +// +// This will return (if successful) a result set that has a single row +// that contains the systemid, current timeline, xlogpos and database +// name. +// +// NOTE: Because this is a replication mode connection, we don't have +// type names, so the field descriptions in the result will have only +// Oids and no DataTypeName values +func (rc *ReplicationConn) IdentifySystem() (r *Rows, err error) { + return rc.sendReplicationModeQuery("IDENTIFY_SYSTEM") +} + +// Execute the "TIMELINE_HISTORY" command as documented here: +// https://www.postgresql.org/docs/9.5/static/protocol-replication.html +// +// This will return (if successful) a result set that has a single row +// that contains the filename of the history file and the content +// of the history file. If called for timeline 1, typically this will +// generate an error that the timeline history file does not exist. +// +// NOTE: Because this is a replication mode connection, we don't have +// type names, so the field descriptions in the result will have only +// Oids and no DataTypeName values +func (rc *ReplicationConn) TimelineHistory(timeline int) (r *Rows, err error) { + return rc.sendReplicationModeQuery(fmt.Sprintf("TIMELINE_HISTORY %d", timeline)) +} + // Start a replication connection, sending WAL data to the given replication // receiver. This function wraps a START_REPLICATION command as documented // here: diff --git a/replication_test.go b/replication_test.go index ee187ec2..73874a1f 100644 --- a/replication_test.go +++ b/replication_test.go @@ -150,7 +150,6 @@ func TestSimpleReplicationConnection(t *testing.T) { t.Errorf("Failed to create standby status %v", err) } replicationConn.SendStandbyStatus(status) - replicationConn.StopReplication() if replicationConn.IsAlive() == false { t.Errorf("Connection died: %v", replicationConn.CauseOfDeath()) @@ -161,25 +160,8 @@ func TestSimpleReplicationConnection(t *testing.T) { t.Fatalf("Replication connection close failed: %v", err) } - // Let's push the boundary conditions of the standby status and ensure it errors correctly - status, err = pgx.NewStandbyStatus(0, 1, 2, 3, 4) - if err == nil { - t.Errorf("Expected error from new standby status, got %v", status) - } - - // And if you provide 3 args, ensure the right fields are set - status, err = pgx.NewStandbyStatus(1, 2, 3) - if err != nil { - t.Errorf("Failed to create test status: %v", err) - } - if status.WalFlushPosition != 1 { - t.Errorf("Unexpected flush position %d", status.WalFlushPosition) - } - if status.WalApplyPosition != 2 { - t.Errorf("Unexpected apply position %d", status.WalApplyPosition) - } - if status.WalWritePosition != 3 { - t.Errorf("Unexpected write position %d", status.WalWritePosition) + if replicationConn.IsAlive() == true { + t.Errorf("Connection still alive: %v", replicationConn.CauseOfDeath()) } restartLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test") @@ -200,5 +182,148 @@ func TestSimpleReplicationConnection(t *testing.T) { if droppedLsn != "" { t.Errorf("Got odd flush lsn %s for supposedly dropped slot", droppedLsn) } - } + +func TestReplicationConn_DropReplicationSlot(t *testing.T) { + replicationConn := mustReplicationConnect(t, *replicationConnConfig) + defer closeReplicationConn(t, replicationConn) + + err := replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding") + if err != nil { + t.Logf("replication slot create failed: %v", err) + } + err = replicationConn.DropReplicationSlot("pgx_slot_test") + if err != nil { + t.Fatalf("Failed to drop replication slot: %v", err) + } + + // We re-create to ensure the drop worked. + err = replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding") + if err != nil { + t.Logf("replication slot create failed: %v", err) + } + + // And finally we drop to ensure we don't leave dirty state + err = replicationConn.DropReplicationSlot("pgx_slot_test") + if err != nil { + t.Fatalf("Failed to drop replication slot: %v", err) + } +} + +func TestIdentifySystem(t *testing.T) { + replicationConn2 := mustReplicationConnect(t, *replicationConnConfig) + defer closeReplicationConn(t, replicationConn2) + + r, err := replicationConn2.IdentifySystem() + if err != nil { + t.Error(err) + } + defer r.Close() + for _, fd := range r.FieldDescriptions() { + t.Logf("Field: %s of type %v", fd.Name, fd.DataType) + } + + var rowCount int + for r.Next() { + rowCount++ + values, err := r.Values() + if err != nil { + t.Error(err) + } + t.Logf("Row values: %v", values) + } + if r.Err() != nil { + t.Error(r.Err()) + } + + if rowCount == 0 { + t.Errorf("Failed to find any rows: %d", rowCount) + } +} + +func getCurrentTimeline(t *testing.T, rc *pgx.ReplicationConn) int { + r, err := rc.IdentifySystem() + if err != nil { + t.Error(err) + } + defer r.Close() + for r.Next() { + values, e := r.Values() + if e != nil { + t.Error(e) + } + timeline, e := strconv.Atoi(values[1].(string)) + if e != nil { + t.Error(e) + } + return timeline + } + t.Fatal("Failed to read timeline") + return -1 +} + + +func TestGetTimelineHistory(t *testing.T) { + replicationConn := mustReplicationConnect(t, *replicationConnConfig) + defer closeReplicationConn(t, replicationConn) + + timeline := getCurrentTimeline(t, replicationConn) + + r, err := replicationConn.TimelineHistory(timeline) + if err != nil { + t.Errorf("%#v", err) + } + defer r.Close() + + for _, fd := range r.FieldDescriptions() { + t.Logf("Field: %s of type %v", fd.Name, fd.DataType) + } + + var rowCount int + for r.Next() { + rowCount++ + values, err := r.Values() + if err != nil { + t.Error(err) + } + t.Logf("Row values: %v", values) + } + if r.Err() != nil { + if strings.Contains(r.Err().Error(), "No such file or directory") { + // This is normal, this means the timeline we're on has no + // history, which is the common case in a test db that + // has only one timeline + return + } + t.Error(r.Err()) + } + + // If we have a timeline history (see above) there should have been + // rows emitted + if rowCount == 0 { + t.Errorf("Failed to find any rows: %d", rowCount) + } +} + +func TestStandbyStatusParsing(t *testing.T) { + // Let's push the boundary conditions of the standby status and ensure it errors correctly + status, err := pgx.NewStandbyStatus(0, 1, 2, 3, 4) + if err == nil { + t.Errorf("Expected error from new standby status, got %v", status) + } + + // And if you provide 3 args, ensure the right fields are set + status, err = pgx.NewStandbyStatus(1, 2, 3) + if err != nil { + t.Errorf("Failed to create test status: %v", err) + } + if status.WalFlushPosition != 1 { + t.Errorf("Unexpected flush position %d", status.WalFlushPosition) + } + if status.WalApplyPosition != 2 { + t.Errorf("Unexpected apply position %d", status.WalApplyPosition) + } + if status.WalWritePosition != 3 { + t.Errorf("Unexpected write position %d", status.WalWritePosition) + } +} \ No newline at end of file From 86fef0e5d74d2bb3472d8a2443ea46fb9f25d519 Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Mon, 30 Jan 2017 11:37:07 -0800 Subject: [PATCH 08/13] go fmt --- replication_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/replication_test.go b/replication_test.go index 73874a1f..96e223df 100644 --- a/replication_test.go +++ b/replication_test.go @@ -262,7 +262,6 @@ func getCurrentTimeline(t *testing.T, rc *pgx.ReplicationConn) int { return -1 } - func TestGetTimelineHistory(t *testing.T) { replicationConn := mustReplicationConnect(t, *replicationConnConfig) defer closeReplicationConn(t, replicationConn) @@ -326,4 +325,4 @@ func TestStandbyStatusParsing(t *testing.T) { if status.WalWritePosition != 3 { t.Errorf("Unexpected write position %d", status.WalWritePosition) } -} \ No newline at end of file +} From 76ac06083e7d32fab964ba782a54ce140cd241c6 Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Mon, 30 Jan 2017 13:55:18 -0800 Subject: [PATCH 09/13] Dont test when you dont have a config --- replication_test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/replication_test.go b/replication_test.go index 96e223df..26399d0c 100644 --- a/replication_test.go +++ b/replication_test.go @@ -185,6 +185,10 @@ func TestSimpleReplicationConnection(t *testing.T) { } func TestReplicationConn_DropReplicationSlot(t *testing.T) { + if replicationConnConfig == nil { + t.Skip("Skipping due to undefined replicationConnConfig") + } + replicationConn := mustReplicationConnect(t, *replicationConnConfig) defer closeReplicationConn(t, replicationConn) @@ -211,6 +215,10 @@ func TestReplicationConn_DropReplicationSlot(t *testing.T) { } func TestIdentifySystem(t *testing.T) { + if replicationConnConfig == nil { + t.Skip("Skipping due to undefined replicationConnConfig") + } + replicationConn2 := mustReplicationConnect(t, *replicationConnConfig) defer closeReplicationConn(t, replicationConn2) @@ -263,6 +271,10 @@ func getCurrentTimeline(t *testing.T, rc *pgx.ReplicationConn) int { } func TestGetTimelineHistory(t *testing.T) { + if replicationConnConfig == nil { + t.Skip("Skipping due to undefined replicationConnConfig") + } + replicationConn := mustReplicationConnect(t, *replicationConnConfig) defer closeReplicationConn(t, replicationConn) From be5a9a0aff30cb9fd8870d1292ec9d1cc9a5d21f Mon Sep 17 00:00:00 2001 From: Kris Wehner Date: Mon, 30 Jan 2017 14:12:12 -0800 Subject: [PATCH 10/13] Clean shutdown after the flush lsn check --- replication_test.go | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/replication_test.go b/replication_test.go index 26399d0c..4f810c78 100644 --- a/replication_test.go +++ b/replication_test.go @@ -151,25 +151,14 @@ func TestSimpleReplicationConnection(t *testing.T) { } replicationConn.SendStandbyStatus(status) - if replicationConn.IsAlive() == false { - t.Errorf("Connection died: %v", replicationConn.CauseOfDeath()) - } - - err = replicationConn.Close() - if err != nil { - t.Fatalf("Replication connection close failed: %v", err) - } - - if replicationConn.IsAlive() == true { - t.Errorf("Connection still alive: %v", replicationConn.CauseOfDeath()) - } - restartLsn := getConfirmedFlushLsnFor(t, conn, "pgx_test") integerRestartLsn, _ := pgx.ParseLSN(restartLsn) if integerRestartLsn != maxWal { t.Fatalf("Wal offset update failed, expected %s found %s", pgx.FormatLSN(maxWal), restartLsn) } + closeReplicationConn(t, replicationConn) + replicationConn2 := mustReplicationConnect(t, *replicationConnConfig) defer closeReplicationConn(t, replicationConn2) From 27b90681e860cb2cc7de8c777a51850f3587f32e Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Thu, 2 Feb 2017 19:30:26 -0600 Subject: [PATCH 11/13] Fix find-and-replace errors --- value_reader.go | 2 +- values.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/value_reader.go b/value_reader.go index fb036dd6..a4897543 100644 --- a/value_reader.go +++ b/value_reader.go @@ -17,7 +17,7 @@ func (r *ValueReader) Err() error { return r.err } -// Fatal tells rc that a Fatal error has occurred +// Fatal tells r that a Fatal error has occurred func (r *ValueReader) Fatal(err error) { r.err = err } diff --git a/values.go b/values.go index bd912020..b4466b82 100644 --- a/values.go +++ b/values.go @@ -129,9 +129,9 @@ func (e SerializationError) Error() string { // Scanner is an interface used to decode values from the PostgreSQL server. type Scanner interface { - // Scan MUST check rc.Type().DataType (to check by OID) or - // rc.Type().DataTypeName (to check by name) to ensure that it is scanning an - // expected column type. It also MUST check rc.Type().FormatCode before + // Scan MUST check r.Type().DataType (to check by OID) or + // r.Type().DataTypeName (to check by name) to ensure that it is scanning an + // expected column type. It also MUST check r.Type().FormatCode before // decoding. It should not assume that it was called on a data type or format // that it understands. Scan(r *ValueReader) error @@ -3167,7 +3167,7 @@ func parseQuotedAclItem(reader *strings.Reader) (AclItem, error) { } } -// Returns the next rune from rc, unless it is a backslash; +// Returns the next rune from r, unless it is a backslash; // in that case, it returns the rune after the backslash. The second // return value tells us whether or not the rune was // preceeded by a backslash (escaped). From 63e482f6bc338442a8b712a16a275105ab2ffc9f Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Thu, 2 Feb 2017 19:33:03 -0600 Subject: [PATCH 12/13] Update changelog for logical replication --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index baef2bdd..a02c8e11 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ * Add NullOid type (Manni Wood) * Add json/jsonb binary support to allow use with CopyTo * Add named error ErrAcquireTimeout (Alexander Staubo) +* Add logical replication decoding (Kris Wehner) ## Compatibility From a52a6bd5558b1450ee69178739caf0489c23c3e6 Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Thu, 2 Feb 2017 20:20:31 -0600 Subject: [PATCH 13/13] Add PgxScanner interface Enables types to support database/sql at the same time as pgx. fixes #232 --- CHANGELOG.md | 1 + doc.go | 11 ++++--- example_custom_type_test.go | 2 +- query.go | 5 +++ query_test.go | 62 +++++++++++++++++++++++++++++++++++++ values.go | 16 +++++++++- 6 files changed, 90 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a02c8e11..126baef4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ * Add json/jsonb binary support to allow use with CopyTo * Add named error ErrAcquireTimeout (Alexander Staubo) * Add logical replication decoding (Kris Wehner) +* Add PgxScanner interface to allow types to simultaneously support database/sql and pgx (Jack Christensen) ## Compatibility diff --git a/doc.go b/doc.go index 980c5a74..2b735bd5 100644 --- a/doc.go +++ b/doc.go @@ -157,14 +157,15 @@ Custom Type Support pgx includes support for the common data types like integers, floats, strings, dates, and times that have direct mappings between Go and SQL. Support can be added for additional types like point, hstore, numeric, etc. that do not have -direct mappings in Go by the types implementing Scanner and Encoder. +direct mappings in Go by the types implementing ScannerPgx and Encoder. Custom types can support text or binary formats. Binary format can provide a large performance increase. The natural place for deciding the format for a -value would be in Scanner as it is responsible for decoding the returned data. -However, that is impossible as the query has already been sent by the time the -Scanner is invoked. The solution to this is the global DefaultTypeFormats. If a -custom type prefers binary format it should register it there. +value would be in ScannerPgx as it is responsible for decoding the returned +data. However, that is impossible as the query has already been sent by the time +the ScannerPgx is invoked. The solution to this is the global +DefaultTypeFormats. If a custom type prefers binary format it should register it +there. pgx.DefaultTypeFormats["point"] = pgx.BinaryFormatCode diff --git a/example_custom_type_test.go b/example_custom_type_test.go index c8d8e220..34cc3165 100644 --- a/example_custom_type_test.go +++ b/example_custom_type_test.go @@ -18,7 +18,7 @@ type NullPoint struct { Valid bool // Valid is true if not NULL } -func (p *NullPoint) Scan(vr *pgx.ValueReader) error { +func (p *NullPoint) ScanPgx(vr *pgx.ValueReader) error { if vr.Type().DataTypeName != "point" { return pgx.SerializationError(fmt.Sprintf("NullPoint.Scan cannot decode %s (OID %d)", vr.Type().DataTypeName, vr.Type().DataType)) } diff --git a/query.go b/query.go index 4e4b8e53..19b867e2 100644 --- a/query.go +++ b/query.go @@ -264,6 +264,11 @@ func (rows *Rows) Scan(dest ...interface{}) (err error) { if err != nil { rows.Fatal(scanArgError{col: i, err: err}) } + } else if s, ok := d.(PgxScanner); ok { + err = s.ScanPgx(vr) + if err != nil { + rows.Fatal(scanArgError{col: i, err: err}) + } } else if s, ok := d.(sql.Scanner); ok { var val interface{} if 0 <= vr.Len() { diff --git a/query_test.go b/query_test.go index 457bc1fb..f08887b5 100644 --- a/query_test.go +++ b/query_test.go @@ -3,6 +3,7 @@ package pgx_test import ( "bytes" "database/sql" + "fmt" "strings" "testing" "time" @@ -291,6 +292,67 @@ func TestConnQueryScanner(t *testing.T) { ensureConnValid(t, conn) } +type pgxNullInt64 struct { + Int64 int64 + Valid bool // Valid is true if Int64 is not NULL +} + +func (n *pgxNullInt64) ScanPgx(vr *pgx.ValueReader) error { + if vr.Type().DataType != pgx.Int8Oid { + return pgx.SerializationError(fmt.Sprintf("pgxNullInt64.Scan cannot decode OID %d", vr.Type().DataType)) + } + + if vr.Len() == -1 { + n.Int64, n.Valid = 0, false + return nil + } + n.Valid = true + + err := pgx.Decode(vr, &n.Int64) + if err != nil { + return err + } + return vr.Err() +} + +func TestConnQueryPgxScanner(t *testing.T) { + t.Parallel() + + conn := mustConnect(t, *defaultConnConfig) + defer closeConn(t, conn) + + rows, err := conn.Query("select null::int8, 1::int8") + if err != nil { + t.Fatalf("conn.Query failed: %v", err) + } + + ok := rows.Next() + if !ok { + t.Fatal("rows.Next terminated early") + } + + var n, m pgxNullInt64 + err = rows.Scan(&n, &m) + if err != nil { + t.Fatalf("rows.Scan failed: %v", err) + } + rows.Close() + + if n.Valid { + t.Error("Null should not be valid, but it was") + } + + if !m.Valid { + t.Error("1 should be valid, but it wasn't") + } + + if m.Int64 != 1 { + t.Errorf("m.Int64 should have been 1, but it was %v", m.Int64) + } + + ensureConnValid(t, conn) +} + func TestConnQueryErrorWhileReturningRows(t *testing.T) { t.Parallel() diff --git a/values.go b/values.go index b4466b82..938462d9 100644 --- a/values.go +++ b/values.go @@ -127,7 +127,9 @@ func (e SerializationError) Error() string { return string(e) } -// Scanner is an interface used to decode values from the PostgreSQL server. +// Deprecated: Scanner is an interface used to decode values from the PostgreSQL +// server. To allow types to support pgx and database/sql.Scan this interface +// has been deprecated in favor of PgxScanner. type Scanner interface { // Scan MUST check r.Type().DataType (to check by OID) or // r.Type().DataTypeName (to check by name) to ensure that it is scanning an @@ -137,6 +139,18 @@ type Scanner interface { Scan(r *ValueReader) error } +// PgxScanner is an interface used to decode values from the PostgreSQL server. +// It is used exactly the same as the Scanner interface. It simply has renamed +// the method. +type PgxScanner interface { + // ScanPgx MUST check r.Type().DataType (to check by OID) or + // r.Type().DataTypeName (to check by name) to ensure that it is scanning an + // expected column type. It also MUST check r.Type().FormatCode before + // decoding. It should not assume that it was called on a data type or format + // that it understands. + ScanPgx(r *ValueReader) error +} + // Encoder is an interface used to encode values for transmission to the // PostgreSQL server. type Encoder interface {