From a1e4b1b9b5a25abf2f8d944a36f56989dda8f284 Mon Sep 17 00:00:00 2001 From: Mark Fletcher Date: Tue, 19 Sep 2017 21:12:15 -0700 Subject: [PATCH 1/4] Changed CreateReplicationSlot to return the consistent_point and snapshot_name. --- replication.go | 10 ++++++++-- replication_test.go | 6 +++--- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/replication.go b/replication.go index bfa81e54..b1329c6b 100644 --- a/replication.go +++ b/replication.go @@ -435,8 +435,14 @@ func (rc *ReplicationConn) StartReplication(slotName string, startLsn uint64, ti } // Create the replication slot, using the given name and output plugin. -func (rc *ReplicationConn) CreateReplicationSlot(slotName, outputPlugin string) (err error) { - _, err = rc.c.Exec(fmt.Sprintf("CREATE_REPLICATION_SLOT %s LOGICAL %s", slotName, outputPlugin)) +func (rc *ReplicationConn) CreateReplicationSlot(slotName, outputPlugin string) (consistentPoint string, snapshotName string, err error) { + var dummy string + var rows *Rows + rows, err = rc.sendReplicationModeQuery(fmt.Sprintf("CREATE_REPLICATION_SLOT %s LOGICAL %s", slotName, outputPlugin)) + defer rows.Close() + for rows.Next() { + rows.Scan(&dummy, &consistentPoint, &snapshotName, &dummy) + } return } diff --git a/replication_test.go b/replication_test.go index d75233c1..9989c982 100644 --- a/replication_test.go +++ b/replication_test.go @@ -56,7 +56,7 @@ func TestSimpleReplicationConnection(t *testing.T) { replicationConn := mustReplicationConnect(t, *replicationConnConfig) defer closeReplicationConn(t, replicationConn) - err = replicationConn.CreateReplicationSlot("pgx_test", "test_decoding") + _, _, err = replicationConn.CreateReplicationSlot("pgx_test", "test_decoding") if err != nil { t.Fatalf("replication slot create failed: %v", err) } @@ -178,7 +178,7 @@ func TestReplicationConn_DropReplicationSlot(t *testing.T) { replicationConn := mustReplicationConnect(t, *replicationConnConfig) defer closeReplicationConn(t, replicationConn) - 
err := replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding") + _, _, err := replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding") if err != nil { t.Logf("replication slot create failed: %v", err) } @@ -188,7 +188,7 @@ func TestReplicationConn_DropReplicationSlot(t *testing.T) { } // We re-create to ensure the drop worked. - err = replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding") + _, _, err = replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding") if err != nil { t.Logf("replication slot create failed: %v", err) } From b4f9d149c1c2c8dcfb7ff4c702625f42f7514e40 Mon Sep 17 00:00:00 2001 From: Gaspard Douady Date: Thu, 21 Sep 2017 11:04:19 +0200 Subject: [PATCH 2/4] Fix queryRow leftover message on conn Those leftover messages are tossed by ensureConnectionReadyForQuery, but in the batch use case this is not called between each query. --- batch_test.go | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ query.go | 2 ++ 2 files changed, 51 insertions(+) diff --git a/batch_test.go b/batch_test.go index e12e4f32..998e8764 100644 --- a/batch_test.go +++ b/batch_test.go @@ -476,3 +476,52 @@ func TestConnBeginBatchQuerySyntaxError(t *testing.T) { t.Error("conn should be dead, but was alive") } } + +func TestConnBeginBatchSelectInsert(t *testing.T) { + t.Parallel() + + conn := mustConnect(t, *defaultConnConfig) + defer closeConn(t, conn) + + sql := `create temporary table ledger( + id serial primary key, + description varchar not null, + amount int not null +);` + mustExec(t, conn, sql) + + batch := conn.BeginBatch() + batch.Queue("select 1", + nil, + nil, + []int16{pgx.BinaryFormatCode}, + ) + batch.Queue("insert into ledger(description, amount) values($1, $2),($1, $2)", + []interface{}{"q1", 1}, + []pgtype.OID{pgtype.VarcharOID, pgtype.Int4OID}, + nil, + ) + + err := batch.Send(context.Background(), nil) + if err != nil { + t.Fatal(err) + } + + var value int + err = 
batch.QueryRowResults().Scan(&value) + if err != nil { + t.Error(err) + } + + ct, err := batch.ExecResults() + if err != nil { + t.Error(err) + } + if ct.RowsAffected() != 2 { + t.Errorf("ct.RowsAffected() => %v, want %v", ct.RowsAffected, 2) + } + + batch.Close() + + ensureConnValid(t, conn) +} diff --git a/query.go b/query.go index e37e6120..07b42e59 100644 --- a/query.go +++ b/query.go @@ -34,6 +34,8 @@ func (r *Row) Scan(dest ...interface{}) (err error) { } rows.Scan(dest...) + for rows.Next() { + } rows.Close() return rows.Err() } From 53e5d8e341f4e64da8814dd6b38593e797c12dae Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Thu, 21 Sep 2017 11:19:52 -0500 Subject: [PATCH 3/4] Fix incomplete selects during batch An incompletely read select followed by an insert would fail. This was caused by query methods in the non-batch path always calling ensureConnectionReadyForQuery. This ensures that connections interrupted by context cancellation are still usable. However, in the batch case query methods are not being called while reading the result. An incompletely read select followed by another select would not manifest this error because it starts by reading until the row description. But when an incomplete select (which even a successful QueryRow would be considered) is followed by an Exec, the CommandComplete message from the select would be considered as the response to the subsequent Exec. The fix is for the batch to track whether a CommandComplete is pending and to read it before advancing to the next result. This is similar in principle to ensureConnectionReadyForQuery, just specific to Batch. 
--- batch.go | 53 ++++++++++++++++++++++++++++++++++++++++++++------- batch_test.go | 51 ++++++++++++++++++++++++++++++++++++++++++++++++- query.go | 5 +++-- 3 files changed, 99 insertions(+), 10 deletions(-) diff --git a/batch.go b/batch.go index fc6f0d03..36671cfd 100644 --- a/batch.go +++ b/batch.go @@ -17,13 +17,14 @@ type batchItem struct { // Batch queries are a way of bundling multiple queries together to avoid // unnecessary network round trips. type Batch struct { - conn *Conn - connPool *ConnPool - items []*batchItem - resultsRead int - sent bool - ctx context.Context - err error + conn *Conn + connPool *ConnPool + items []*batchItem + resultsRead int + sent bool + pendingCommandComplete bool + ctx context.Context + err error } // BeginBatch returns a *Batch query for c. @@ -145,8 +146,15 @@ func (b *Batch) ExecResults() (CommandTag, error) { default: } + if err := b.ensureCommandComplete(); err != nil { + b.die(err) + return "", err + } + b.resultsRead++ + b.pendingCommandComplete = true + for { msg, err := b.conn.rxMsg() if err != nil { @@ -155,6 +163,7 @@ func (b *Batch) ExecResults() (CommandTag, error) { switch msg := msg.(type) { case *pgproto3.CommandComplete: + b.pendingCommandComplete = false return CommandTag(msg.CommandTag), nil default: if err := b.conn.processContextFreeMsg(msg); err != nil { @@ -182,8 +191,16 @@ func (b *Batch) QueryResults() (*Rows, error) { default: } + if err := b.ensureCommandComplete(); err != nil { + b.die(err) + rows.fatal(err) + return rows, err + } + b.resultsRead++ + b.pendingCommandComplete = true + fieldDescriptions, err := b.conn.readUntilRowDescription() if err != nil { b.die(err) @@ -244,3 +261,25 @@ func (b *Batch) die(err error) { b.connPool.Release(b.conn) } } + +func (b *Batch) ensureCommandComplete() error { + for b.pendingCommandComplete { + msg, err := b.conn.rxMsg() + if err != nil { + return err + } + + switch msg := msg.(type) { + case *pgproto3.CommandComplete: + b.pendingCommandComplete = false + 
return nil + default: + err = b.conn.processContextFreeMsg(msg) + if err != nil { + return err + } + } + } + + return nil +} diff --git a/batch_test.go b/batch_test.go index 998e8764..3112f183 100644 --- a/batch_test.go +++ b/batch_test.go @@ -477,7 +477,7 @@ func TestConnBeginBatchQuerySyntaxError(t *testing.T) { } } -func TestConnBeginBatchSelectInsert(t *testing.T) { +func TestConnBeginBatchQueryRowInsert(t *testing.T) { t.Parallel() conn := mustConnect(t, *defaultConnConfig) @@ -525,3 +525,52 @@ func TestConnBeginBatchSelectInsert(t *testing.T) { ensureConnValid(t, conn) } + +func TestConnBeginBatchQueryPartialReadInsert(t *testing.T) { + t.Parallel() + + conn := mustConnect(t, *defaultConnConfig) + defer closeConn(t, conn) + + sql := `create temporary table ledger( + id serial primary key, + description varchar not null, + amount int not null +);` + mustExec(t, conn, sql) + + batch := conn.BeginBatch() + batch.Queue("select 1 union all select 2 union all select 3", + nil, + nil, + []int16{pgx.BinaryFormatCode}, + ) + batch.Queue("insert into ledger(description, amount) values($1, $2),($1, $2)", + []interface{}{"q1", 1}, + []pgtype.OID{pgtype.VarcharOID, pgtype.Int4OID}, + nil, + ) + + err := batch.Send(context.Background(), nil) + if err != nil { + t.Fatal(err) + } + + rows, err := batch.QueryResults() + if err != nil { + t.Error(err) + } + rows.Close() + + ct, err := batch.ExecResults() + if err != nil { + t.Error(err) + } + if ct.RowsAffected() != 2 { + t.Errorf("ct.RowsAffected() => %v, want %v", ct.RowsAffected, 2) + } + + batch.Close() + + ensureConnValid(t, conn) +} diff --git a/query.go b/query.go index 07b42e59..407a792c 100644 --- a/query.go +++ b/query.go @@ -34,8 +34,6 @@ func (r *Row) Scan(dest ...interface{}) (err error) { } rows.Scan(dest...) 
- for rows.Next() { - } rows.Close() return rows.Err() } @@ -151,6 +149,9 @@ func (rows *Rows) Next() bool { rows.values = msg.Values return true case *pgproto3.CommandComplete: + if rows.batch != nil { + rows.batch.pendingCommandComplete = false + } rows.Close() return false From fd93b83433114f14aa048f6e3d24529a3c323602 Mon Sep 17 00:00:00 2001 From: Mark Fletcher Date: Thu, 21 Sep 2017 09:58:20 -0700 Subject: [PATCH 4/4] Reverted breaking API change to CreateReplicationSlot. Instead, the new version that returns the consistent_point and snapshot_name values is called CreateReplicationSlotEx(). --- replication.go | 8 +++++++- replication_test.go | 29 ++++++++++++++++++++++++++--- 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/replication.go b/replication.go index b1329c6b..7dd5efe4 100644 --- a/replication.go +++ b/replication.go @@ -435,7 +435,13 @@ func (rc *ReplicationConn) StartReplication(slotName string, startLsn uint64, ti } // Create the replication slot, using the given name and output plugin. -func (rc *ReplicationConn) CreateReplicationSlot(slotName, outputPlugin string) (consistentPoint string, snapshotName string, err error) { +func (rc *ReplicationConn) CreateReplicationSlot(slotName, outputPlugin string) (err error) { + _, err = rc.c.Exec(fmt.Sprintf("CREATE_REPLICATION_SLOT %s LOGICAL %s", slotName, outputPlugin)) + return +} + +// Create the replication slot, using the given name and output plugin, and return the consistent_point and snapshot_name values. 
+func (rc *ReplicationConn) CreateReplicationSlotEx(slotName, outputPlugin string) (consistentPoint string, snapshotName string, err error) { var dummy string var rows *Rows rows, err = rc.sendReplicationModeQuery(fmt.Sprintf("CREATE_REPLICATION_SLOT %s LOGICAL %s", slotName, outputPlugin)) diff --git a/replication_test.go b/replication_test.go index 9989c982..d06d73cd 100644 --- a/replication_test.go +++ b/replication_test.go @@ -56,10 +56,18 @@ func TestSimpleReplicationConnection(t *testing.T) { replicationConn := mustReplicationConnect(t, *replicationConnConfig) defer closeReplicationConn(t, replicationConn) - _, _, err = replicationConn.CreateReplicationSlot("pgx_test", "test_decoding") + var cp string + var snapshot_name string + cp, snapshot_name, err = replicationConn.CreateReplicationSlotEx("pgx_test", "test_decoding") if err != nil { t.Fatalf("replication slot create failed: %v", err) } + if cp == "" { + t.Logf("consistent_point is empty") + } + if snapshot_name == "" { + t.Logf("snapshot_name is empty") + } // Do a simple change so we can get some wal data _, err = conn.Exec("create table if not exists replication_test (a integer)") @@ -178,20 +186,35 @@ func TestReplicationConn_DropReplicationSlot(t *testing.T) { replicationConn := mustReplicationConnect(t, *replicationConnConfig) defer closeReplicationConn(t, replicationConn) - _, _, err := replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding") + var cp string + var snapshot_name string + cp, snapshot_name, err := replicationConn.CreateReplicationSlotEx("pgx_slot_test", "test_decoding") if err != nil { t.Logf("replication slot create failed: %v", err) } + if cp == "" { + t.Logf("consistent_point is empty") + } + if snapshot_name == "" { + t.Logf("snapshot_name is empty") + } + err = replicationConn.DropReplicationSlot("pgx_slot_test") if err != nil { t.Fatalf("Failed to drop replication slot: %v", err) } // We re-create to ensure the drop worked. 
- _, _, err = replicationConn.CreateReplicationSlot("pgx_slot_test", "test_decoding") + cp, snapshot_name, err = replicationConn.CreateReplicationSlotEx("pgx_slot_test", "test_decoding") if err != nil { t.Logf("replication slot create failed: %v", err) } + if cp == "" { + t.Logf("consistent_point is empty") + } + if snapshot_name == "" { + t.Logf("snapshot_name is empty") + } // And finally we drop to ensure we don't leave dirty state err = replicationConn.DropReplicationSlot("pgx_slot_test")