From b72ebe050fdf729cd46ad86d1670f2124ee54919 Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Sat, 31 Mar 2018 11:11:48 -0500 Subject: [PATCH] Fix fastpath / largeobjects query counting fastpath.Call was not incrementing pendingReadyForQueryCount when it sent a function call. But it was being decremented when the function call was finished and the server sent the ready for query message. This caused pendingReadyForQueryCount to go negative. This meant that any subsequent activity that depended on ensureConnectionReadyForQuery would not operate correctly because the connection would be considered ready before it had read all previous data off the wire. fixes #403 --- fastpath.go | 2 + large_objects_test.go | 144 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+) diff --git a/fastpath.go b/fastpath.go index 06e1354a..f8af6190 100644 --- a/fastpath.go +++ b/fastpath.go @@ -76,6 +76,8 @@ func (f *fastpath) Call(oid pgtype.OID, args []fpArg) (res []byte, err error) { return nil, err } + f.cn.pendingReadyForQueryCount++ + for { msg, err := f.cn.rxMsg() if err != nil { diff --git a/large_objects_test.go b/large_objects_test.go index a19c851d..1d0a4f32 100644 --- a/large_objects_test.go +++ b/large_objects_test.go @@ -119,3 +119,147 @@ func TestLargeObjects(t *testing.T) { t.Errorf("Expected undefined_object error (42704), got %#v", err) } } + +func TestLargeObjectsMultipleTransactions(t *testing.T) { + t.Parallel() + + conn, err := pgx.Connect(*defaultConnConfig) + if err != nil { + t.Fatal(err) + } + + tx, err := conn.Begin() + if err != nil { + t.Fatal(err) + } + + lo, err := tx.LargeObjects() + if err != nil { + t.Fatal(err) + } + + id, err := lo.Create(0) + if err != nil { + t.Fatal(err) + } + + obj, err := lo.Open(id, pgx.LargeObjectModeWrite) + if err != nil { + t.Fatal(err) + } + + n, err := obj.Write([]byte("testing")) + if err != nil { + t.Fatal(err) + } + if n != 7 { + t.Errorf("Expected n to be 7, got %d", n) + } + + // Commit the first transaction + err = tx.Commit() + if err != nil { + t.Fatal(err) + } + + // IMPORTANT: Use the same connection for another query + query := `select n from generate_series(1,10) n` + rows, err := conn.Query(query) + if err != nil { + t.Fatal(err) + } + rows.Close() + + // Start a new transaction + tx2, err := conn.Begin() + if err != nil { + t.Fatal(err) + } + + lo2, err := tx2.LargeObjects() + if err != nil { + t.Fatal(err) + } + + // Reopen the large object in the new transaction + obj2, err := lo2.Open(id, pgx.LargeObjectModeRead|pgx.LargeObjectModeWrite) + if err != nil { + t.Fatal(err) + } + + pos, err := obj2.Seek(1, 0) + if err != nil { + t.Fatal(err) + } + if pos != 1 { + t.Errorf("Expected pos to be 1, got %d", pos) + } + + res := make([]byte, 6) + n, err = obj2.Read(res) + if err != nil { + t.Fatal(err) + } + if string(res) != "esting" { + t.Errorf(`Expected res to be "esting", got %q`, res) + } + if n != 6 { + t.Errorf("Expected n to be 6, got %d", n) + } + + n, err = obj2.Read(res) + if err != io.EOF { + t.Error("Expected io.EOF, go nil") + } + if n != 0 { + t.Errorf("Expected n to be 0, got %d", n) + } + + pos, err = obj2.Tell() + if err != nil { + t.Fatal(err) + } + if pos != 7 { + t.Errorf("Expected pos to be 7, got %d", pos) + } + + err = obj2.Truncate(1) + if err != nil { + t.Fatal(err) + } + + pos, err = obj2.Seek(-1, 2) + if err != nil { + t.Fatal(err) + } + if pos != 0 { + t.Errorf("Expected pos to be 0, got %d", pos) + } + + res = make([]byte, 2) + n, err = obj2.Read(res) + if err != io.EOF { + t.Errorf("Expected err to be io.EOF, got %v", err) + } + if n != 1 { + t.Errorf("Expected n to be 1, got %d", n) + } + if res[0] != 't' { + t.Errorf("Expected res[0] to be 't', got %v", res[0]) + } + + err = obj2.Close() + if err != nil { + t.Fatal(err) + } + + err = lo2.Unlink(id) + if err != nil { + t.Fatal(err) + } + + _, err = lo2.Open(id, pgx.LargeObjectModeRead) + if e, ok := err.(pgx.PgError); !ok || e.Code != "42704" { + t.Errorf("Expected undefined_object error (42704), got %#v", err) + } +}