mirror of https://github.com/jackc/pgx.git
Fix replication with context
The normal connection context timeout cancels the current query. That isn't appropriate for a replication connection.v3-numeric-wip
parent
071f4cc2ad
commit
5702f34407
|
@ -270,16 +270,43 @@ func (rc *ReplicationConn) readReplicationMessage() (r *ReplicationMessage, err
|
||||||
//
|
//
|
||||||
// This returns the context error when there is no replication message before
|
// This returns the context error when there is no replication message before
|
||||||
// the context is canceled.
|
// the context is canceled.
|
||||||
func (rc *ReplicationConn) WaitForReplicationMessage(ctx context.Context) (r *ReplicationMessage, err error) {
|
func (rc *ReplicationConn) WaitForReplicationMessage(ctx context.Context) (*ReplicationMessage, error) {
|
||||||
err = rc.c.initContext(ctx)
|
select {
|
||||||
if err != nil {
|
case <-ctx.Done():
|
||||||
return nil, err
|
return nil, ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
if err := rc.c.conn.SetDeadline(time.Now()); err != nil {
|
||||||
|
rc.Close() // Close connection if unable to set deadline
|
||||||
|
return
|
||||||
|
}
|
||||||
|
rc.c.closedChan <- ctx.Err()
|
||||||
|
case <-rc.c.doneChan:
|
||||||
}
|
}
|
||||||
defer func() {
|
|
||||||
err = rc.c.termContext(err)
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return rc.readReplicationMessage()
|
r, opErr := rc.readReplicationMessage()
|
||||||
|
|
||||||
|
var err error
|
||||||
|
select {
|
||||||
|
case err = <-rc.c.closedChan:
|
||||||
|
if err := rc.c.conn.SetDeadline(time.Time{}); err != nil {
|
||||||
|
rc.Close() // Close connection if unable to disable deadline
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if opErr == nil {
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
|
case rc.c.doneChan <- struct{}{}:
|
||||||
|
err = opErr
|
||||||
|
}
|
||||||
|
|
||||||
|
return r, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rc *ReplicationConn) sendReplicationModeQuery(sql string) (*Rows, error) {
|
func (rc *ReplicationConn) sendReplicationModeQuery(sql string) (*Rows, error) {
|
||||||
|
|
|
@ -3,12 +3,13 @@ package pgx_test
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/jackc/pgx"
|
|
||||||
"reflect"
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx"
|
||||||
)
|
)
|
||||||
|
|
||||||
// This function uses a postgresql 9.6 specific column
|
// This function uses a postgresql 9.6 specific column
|
||||||
|
@ -47,14 +48,19 @@ func TestSimpleReplicationConnection(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
conn := mustConnect(t, *replicationConnConfig)
|
conn := mustConnect(t, *replicationConnConfig)
|
||||||
defer closeConn(t, conn)
|
defer func() {
|
||||||
|
// Ensure replication slot is destroyed, but don't check for errors as it
|
||||||
|
// should have already been destroyed.
|
||||||
|
conn.Exec("select pg_drop_replication_slot('pgx_test')")
|
||||||
|
closeConn(t, conn)
|
||||||
|
}()
|
||||||
|
|
||||||
replicationConn := mustReplicationConnect(t, *replicationConnConfig)
|
replicationConn := mustReplicationConnect(t, *replicationConnConfig)
|
||||||
defer closeReplicationConn(t, replicationConn)
|
defer closeReplicationConn(t, replicationConn)
|
||||||
|
|
||||||
err = replicationConn.CreateReplicationSlot("pgx_test", "test_decoding")
|
err = replicationConn.CreateReplicationSlot("pgx_test", "test_decoding")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Logf("replication slot create failed: %v", err)
|
t.Fatalf("replication slot create failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do a simple change so we can get some wal data
|
// Do a simple change so we can get some wal data
|
||||||
|
|
Loading…
Reference in New Issue