From 77a2da2b4660734d3280fa7a49f50a59d4f884dd Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Wed, 10 Apr 2019 11:09:42 -0500 Subject: [PATCH] Replace connection pool --- batch.go | 5 - bench_test.go | 59 +-- conn_pool.go | 550 ---------------------- conn_pool_private_test.go | 44 -- conn_pool_test.go | 926 -------------------------------------- pool/pool_test.go | 23 + query.go | 5 - tx.go | 15 +- tx_test.go | 17 +- 9 files changed, 42 insertions(+), 1602 deletions(-) delete mode 100644 conn_pool.go delete mode 100644 conn_pool_private_test.go delete mode 100644 conn_pool_test.go diff --git a/batch.go b/batch.go index f26a398e..77a0a01c 100644 --- a/batch.go +++ b/batch.go @@ -19,7 +19,6 @@ type batchItem struct { // unnecessary network round trips. type Batch struct { conn *Conn - connPool *ConnPool items []*batchItem resultsRead int pendingCommandComplete bool @@ -188,8 +187,4 @@ func (b *Batch) die(err error) { b.err = err b.conn.die(err) - - if b.conn != nil && b.connPool != nil { - b.connPool.Release(b.conn) - } } diff --git a/bench_test.go b/bench_test.go index 1966fabb..2b39cf07 100644 --- a/bench_test.go +++ b/bench_test.go @@ -13,45 +13,6 @@ import ( "github.com/jackc/pgx/pgtype" ) -func BenchmarkConnPool(b *testing.B) { - config := pgx.ConnPoolConfig{ConnConfig: mustParseConfig(b, os.Getenv("PGX_TEST_DATABASE")), MaxConnections: 5} - pool, err := pgx.NewConnPool(config) - if err != nil { - b.Fatalf("Unable to create connection pool: %v", err) - } - defer pool.Close() - - b.ResetTimer() - for i := 0; i < b.N; i++ { - var conn *pgx.Conn - if conn, err = pool.Acquire(); err != nil { - b.Fatalf("Unable to acquire connection: %v", err) - } - pool.Release(conn) - } -} - -func BenchmarkConnPoolQueryRow(b *testing.B) { - config := pgx.ConnPoolConfig{ConnConfig: mustParseConfig(b, os.Getenv("PGX_TEST_DATABASE")), MaxConnections: 5} - pool, err := pgx.NewConnPool(config) - if err != nil { - b.Fatalf("Unable to create connection pool: %v", err) - } - defer pool.Close() - - b.ResetTimer() - for i := 0; i < b.N; i++ { - num := float64(-1) - if err := pool.QueryRow("select random()").Scan(&num); err != nil { - b.Fatal(err) - } - - if num < 0 { - b.Fatalf("expected `select random()` to return between 0 and 1 but it was: %v", num) - } - } -} - func BenchmarkPointerPointerWithNullValues(b *testing.B) { conn := mustConnect(b, mustParseConfig(b, os.Getenv("PGX_TEST_DATABASE"))) defer closeConn(b, conn) @@ -613,19 +574,15 @@ func BenchmarkWrite10000RowsViaCopy(b *testing.B) { } func BenchmarkMultipleQueriesNonBatch(b *testing.B) { - config := pgx.ConnPoolConfig{ConnConfig: mustParseConfig(b, os.Getenv("PGX_TEST_DATABASE")), MaxConnections: 5} - pool, err := pgx.NewConnPool(config) - if err != nil { - b.Fatalf("Unable to create connection pool: %v", err) - } - defer pool.Close() + conn := mustConnect(b, mustParseConfig(b, os.Getenv("PGX_TEST_DATABASE"))) + defer closeConn(b, conn) queryCount := 3 b.ResetTimer() for i := 0; i < b.N; i++ { for j := 0; j < queryCount; j++ { - rows, err := pool.Query("select n from generate_series(0, 5) n") + rows, err := conn.Query("select n from generate_series(0, 5) n") if err != nil { b.Fatal(err) } @@ -648,18 +605,14 @@ func BenchmarkMultipleQueriesNonBatch(b *testing.B) { } func BenchmarkMultipleQueriesBatch(b *testing.B) { - config := pgx.ConnPoolConfig{ConnConfig: mustParseConfig(b, os.Getenv("PGX_TEST_DATABASE")), MaxConnections: 5} - pool, err := pgx.NewConnPool(config) - if err != nil { - b.Fatalf("Unable to create connection pool: %v", err) - } - defer pool.Close() + conn := mustConnect(b, mustParseConfig(b, os.Getenv("PGX_TEST_DATABASE"))) + defer closeConn(b, conn) queryCount := 3 b.ResetTimer() for i := 0; i < b.N; i++ { - batch := pool.BeginBatch() + batch := conn.BeginBatch() for j := 0; j < queryCount; j++ { batch.Queue("select n from generate_series(0,5) n", nil, diff --git a/conn_pool.go b/conn_pool.go deleted file mode 100644 index 471a505c..00000000 --- a/conn_pool.go +++ /dev/null @@ -1,550 +0,0 @@ -package pgx - -import ( - "context" - "sync" - "time" - - "github.com/pkg/errors" - - "github.com/jackc/pgconn" - "github.com/jackc/pgx/pgtype" -) - -type ConnPoolConfig struct { - ConnConfig - MaxConnections int // max simultaneous connections to use, default 5, must be at least 2 - AfterConnect func(*Conn) error // function to call on every new connection - AcquireTimeout time.Duration // max wait time when all connections are busy (0 means no timeout) -} - -type ConnPool struct { - allConnections []*Conn - availableConnections []*Conn - cond *sync.Cond - config ConnConfig // config used when establishing connection - inProgressConnects int - maxConnections int - resetCount int - afterConnect func(*Conn) error - logger Logger - logLevel LogLevel - closed bool - preparedStatements map[string]*PreparedStatement - acquireTimeout time.Duration - connInfo *pgtype.ConnInfo -} - -type ConnPoolStat struct { - MaxConnections int // max simultaneous connections to use - CurrentConnections int // current live connections - AvailableConnections int // unused live connections -} - -// CheckedOutConnections returns the amount of connections that are currently -// checked out from the pool. -func (stat *ConnPoolStat) CheckedOutConnections() int { - return stat.CurrentConnections - stat.AvailableConnections -} - -// ErrAcquireTimeout occurs when an attempt to acquire a connection times out. -var ErrAcquireTimeout = errors.New("timeout acquiring connection from pool") - -// ErrClosedPool occurs on an attempt to acquire a connection from a closed pool. -var ErrClosedPool = errors.New("cannot acquire from closed pool") - -// NewConnPool creates a new ConnPool. config.ConnConfig is passed through to -// Connect directly. -func NewConnPool(config ConnPoolConfig) (p *ConnPool, err error) { - p = new(ConnPool) - p.config = config.ConnConfig - p.connInfo = minimalConnInfo - p.maxConnections = config.MaxConnections - if p.maxConnections == 0 { - p.maxConnections = 5 - } - if p.maxConnections < 1 { - return nil, errors.New("MaxConnections must be at least 1") - } - p.acquireTimeout = config.AcquireTimeout - if p.acquireTimeout < 0 { - return nil, errors.New("AcquireTimeout must be equal to or greater than 0") - } - - p.afterConnect = config.AfterConnect - - if config.LogLevel != 0 { - p.logLevel = config.LogLevel - } else { - // Preserve pre-LogLevel behavior by defaulting to LogLevelDebug - p.logLevel = LogLevelDebug - } - p.logger = config.Logger - if p.logger == nil { - p.logLevel = LogLevelNone - } - - p.allConnections = make([]*Conn, 0, p.maxConnections) - p.availableConnections = make([]*Conn, 0, p.maxConnections) - p.preparedStatements = make(map[string]*PreparedStatement) - p.cond = sync.NewCond(new(sync.Mutex)) - - // Initially establish one connection - var c *Conn - c, err = p.createConnection() - if err != nil { - return - } - p.allConnections = append(p.allConnections, c) - p.availableConnections = append(p.availableConnections, c) - p.connInfo = c.ConnInfo.DeepCopy() - - return -} - -// Acquire takes exclusive use of a connection until it is released. -func (p *ConnPool) Acquire() (*Conn, error) { - p.cond.L.Lock() - c, err := p.acquire(nil) - p.cond.L.Unlock() - return c, err -} - -// deadlinePassed returns true if the given deadline has passed. -func (p *ConnPool) deadlinePassed(deadline *time.Time) bool { - return deadline != nil && time.Now().After(*deadline) -} - -// acquire performs acquision assuming pool is already locked -func (p *ConnPool) acquire(deadline *time.Time) (*Conn, error) { - if p.closed { - return nil, ErrClosedPool - } - - // A connection is available - // The pool works like a queue. Available connection will be returned - // from the head. A new connection will be added to the tail. - numAvailable := len(p.availableConnections) - if numAvailable > 0 { - c := p.availableConnections[0] - c.poolResetCount = p.resetCount - copy(p.availableConnections, p.availableConnections[1:]) - p.availableConnections = p.availableConnections[:numAvailable-1] - return c, nil - } - - // Set initial timeout/deadline value. If the method (acquire) happens to - // recursively call itself the deadline should retain its value. - if deadline == nil && p.acquireTimeout > 0 { - tmp := time.Now().Add(p.acquireTimeout) - deadline = &tmp - } - - // Make sure the deadline (if it is) has not passed yet - if p.deadlinePassed(deadline) { - return nil, ErrAcquireTimeout - } - - // If there is a deadline then start a timeout timer - var timer *time.Timer - if deadline != nil { - timer = time.AfterFunc(deadline.Sub(time.Now()), func() { - p.cond.Broadcast() - }) - defer timer.Stop() - } - - // No connections are available, but we can create more - if len(p.allConnections)+p.inProgressConnects < p.maxConnections { - // Create a new connection. - // Careful here: createConnectionUnlocked() removes the current lock, - // creates a connection and then locks it back. - c, err := p.createConnectionUnlocked() - if err != nil { - return nil, err - } - c.poolResetCount = p.resetCount - p.allConnections = append(p.allConnections, c) - return c, nil - } - // All connections are in use and we cannot create more - if p.logLevel >= LogLevelWarn { - p.logger.Log(LogLevelWarn, "waiting for available connection", nil) - } - - // Wait until there is an available connection OR room to create a new connection - for len(p.availableConnections) == 0 && len(p.allConnections)+p.inProgressConnects == p.maxConnections { - if p.deadlinePassed(deadline) { - return nil, ErrAcquireTimeout - } - p.cond.Wait() - } - - // Stop the timer so that we do not spawn it on every acquire call. - if timer != nil { - timer.Stop() - } - return p.acquire(deadline) -} - -// Release gives up use of a connection. -func (p *ConnPool) Release(conn *Conn) { - if conn.ctxInProgress { - panic("should never release when context is in progress") - } - - if conn.pgConn.TxStatus != 'I' { - conn.Exec(context.TODO(), "rollback") - } - - if len(conn.channels) > 0 { - if err := conn.Unlisten("*"); err != nil { - conn.die(err) - } - conn.channels = make(map[string]struct{}) - } - - p.cond.L.Lock() - - if conn.poolResetCount != p.resetCount { - conn.Close() - p.cond.L.Unlock() - p.cond.Signal() - return - } - - if conn.IsAlive() { - p.availableConnections = append(p.availableConnections, conn) - } else { - p.removeFromAllConnections(conn) - } - p.cond.L.Unlock() - p.cond.Signal() -} - -// removeFromAllConnections Removes the given connection from the list. -// It returns true if the connection was found and removed or false otherwise. -func (p *ConnPool) removeFromAllConnections(conn *Conn) bool { - for i, c := range p.allConnections { - if conn == c { - p.allConnections = append(p.allConnections[:i], p.allConnections[i+1:]...) - return true - } - } - return false -} - -// Close ends the use of a connection pool. It prevents any new connections from -// being acquired and closes available underlying connections. Any acquired -// connections will be closed when they are released. -func (p *ConnPool) Close() { - p.cond.L.Lock() - defer p.cond.L.Unlock() - - p.closed = true - - for _, c := range p.availableConnections { - _ = c.Close() - } - - // This will cause any checked out connections to be closed on release - p.resetCount++ -} - -// Reset closes all open connections, but leaves the pool open. It is intended -// for use when an error is detected that would disrupt all connections (such as -// a network interruption or a server state change). -// -// It is safe to reset a pool while connections are checked out. Those -// connections will be closed when they are returned to the pool. -func (p *ConnPool) Reset() { - p.cond.L.Lock() - defer p.cond.L.Unlock() - - p.resetCount++ - p.allConnections = p.allConnections[0:0] - - for _, conn := range p.availableConnections { - conn.Close() - } - - p.availableConnections = p.availableConnections[0:0] -} - -// invalidateAcquired causes all acquired connections to be closed when released. -// The pool must already be locked. -func (p *ConnPool) invalidateAcquired() { - p.resetCount++ - - for _, c := range p.availableConnections { - c.poolResetCount = p.resetCount - } - - p.allConnections = p.allConnections[:len(p.availableConnections)] - copy(p.allConnections, p.availableConnections) -} - -// Stat returns connection pool statistics -func (p *ConnPool) Stat() (s ConnPoolStat) { - p.cond.L.Lock() - defer p.cond.L.Unlock() - - s.MaxConnections = p.maxConnections - s.CurrentConnections = len(p.allConnections) - s.AvailableConnections = len(p.availableConnections) - return -} - -func (p *ConnPool) createConnection() (*Conn, error) { - c, err := connect(context.TODO(), &p.config, p.connInfo) - if err != nil { - return nil, err - } - return p.afterConnectionCreated(c) -} - -// createConnectionUnlocked Removes the current lock, creates a new connection, and -// then locks it back. -// Here is the point: lets say our pool dialer's OpenTimeout is set to 3 seconds. -// And we have a pool with 20 connections in it, and we try to acquire them all at -// startup. -// If it happens that the remote server is not accessible, then the first connection -// in the pool blocks all the others for 3 secs, before it gets the timeout. Then -// connection #2 holds the lock and locks everything for the next 3 secs until it -// gets OpenTimeout err, etc. And the very last 20th connection will fail only after -// 3 * 20 = 60 secs. -// To avoid this we put Connect(p.config) outside of the lock (it is thread safe) -// what would allow us to make all the 20 connection in parallel (more or less). -func (p *ConnPool) createConnectionUnlocked() (*Conn, error) { - p.inProgressConnects++ - p.cond.L.Unlock() - // c, err := Connect(p.config) - c, err := Connect(context.TODO(), "TODO") - p.cond.L.Lock() - p.inProgressConnects-- - - if err != nil { - return nil, err - } - return p.afterConnectionCreated(c) -} - -// afterConnectionCreated executes (if it is) afterConnect() callback and prepares -// all the known statements for the new connection. -func (p *ConnPool) afterConnectionCreated(c *Conn) (*Conn, error) { - if p.afterConnect != nil { - err := p.afterConnect(c) - if err != nil { - c.die(err) - return nil, err - } - } - - for _, ps := range p.preparedStatements { - if _, err := c.Prepare(ps.Name, ps.SQL); err != nil { - c.die(err) - return nil, err - } - } - - return c, nil -} - -// Exec acquires a connection, delegates the call to that connection, and releases the connection -func (p *ConnPool) Exec(ctx context.Context, sql string, arguments ...interface{}) (commandTag pgconn.CommandTag, err error) { - var c *Conn - if c, err = p.Acquire(); err != nil { - return - } - defer p.Release(c) - - return c.Exec(ctx, sql, arguments...) -} - -// Query acquires a connection and delegates the call to that connection. When -// *Rows are closed, the connection is released automatically. -func (p *ConnPool) Query(sql string, args ...interface{}) (*Rows, error) { - c, err := p.Acquire() - if err != nil { - // Because checking for errors can be deferred to the *Rows, build one with the error - return &Rows{closed: true, err: err}, err - } - - rows, err := c.Query(sql, args...) - if err != nil { - p.Release(c) - return rows, err - } - - rows.connPool = p - - return rows, nil -} - -func (p *ConnPool) QueryEx(ctx context.Context, sql string, options *QueryExOptions, args ...interface{}) (*Rows, error) { - c, err := p.Acquire() - if err != nil { - // Because checking for errors can be deferred to the *Rows, build one with the error - return &Rows{closed: true, err: err}, err - } - - rows, err := c.QueryEx(ctx, sql, options, args...) - if err != nil { - p.Release(c) - return rows, err - } - - rows.connPool = p - - return rows, nil -} - -// QueryRow acquires a connection and delegates the call to that connection. The -// connection is released automatically after Scan is called on the returned -// *Row. -func (p *ConnPool) QueryRow(sql string, args ...interface{}) *Row { - rows, _ := p.Query(sql, args...) - return (*Row)(rows) -} - -func (p *ConnPool) QueryRowEx(ctx context.Context, sql string, options *QueryExOptions, args ...interface{}) *Row { - rows, _ := p.QueryEx(ctx, sql, options, args...) - return (*Row)(rows) -} - -// Begin acquires a connection and begins a transaction on it. When the -// transaction is closed the connection will be automatically released. -func (p *ConnPool) Begin() (*Tx, error) { - return p.BeginEx(context.Background(), nil) -} - -// Prepare creates a prepared statement on a connection in the pool to test the -// statement is valid. If it succeeds all connections accessed through the pool -// will have the statement available. -// -// Prepare creates a prepared statement with name and sql. sql can contain -// placeholders for bound parameters. These placeholders are referenced -// positional as $1, $2, etc. -// -// Prepare is idempotent; i.e. it is safe to call Prepare multiple times with -// the same name and sql arguments. This allows a code path to Prepare and -// Query/Exec/PrepareEx without concern for if the statement has already been prepared. -func (p *ConnPool) Prepare(name, sql string) (*PreparedStatement, error) { - return p.PrepareEx(context.Background(), name, sql, nil) -} - -// PrepareEx creates a prepared statement on a connection in the pool to test the -// statement is valid. If it succeeds all connections accessed through the pool -// will have the statement available. -// -// PrepareEx creates a prepared statement with name and sql. sql can contain placeholders -// for bound parameters. These placeholders are referenced positional as $1, $2, etc. -// It differs from Prepare as it allows additional options (such as parameter OIDs) to be passed via struct -// -// PrepareEx is idempotent; i.e. it is safe to call PrepareEx multiple times with the same -// name and sql arguments. This allows a code path to PrepareEx and Query/Exec/Prepare without -// concern for if the statement has already been prepared. -func (p *ConnPool) PrepareEx(ctx context.Context, name, sql string, opts *PrepareExOptions) (*PreparedStatement, error) { - p.cond.L.Lock() - defer p.cond.L.Unlock() - - if ps, ok := p.preparedStatements[name]; ok && ps.SQL == sql { - return ps, nil - } - - c, err := p.acquire(nil) - if err != nil { - return nil, err - } - - p.availableConnections = append(p.availableConnections, c) - - // Double check that the statement was not prepared by someone else - // while we were acquiring the connection (since acquire is not fully - // blocking now, see createConnectionUnlocked()) - if ps, ok := p.preparedStatements[name]; ok && ps.SQL == sql { - return ps, nil - } - - ps, err := c.PrepareEx(ctx, name, sql, opts) - if err != nil { - return nil, err - } - - for _, c := range p.availableConnections { - _, err := c.PrepareEx(ctx, name, sql, opts) - if err != nil { - return nil, err - } - } - - p.invalidateAcquired() - p.preparedStatements[name] = ps - - return ps, err -} - -// Deallocate releases a prepared statement from all connections in the pool. -func (p *ConnPool) Deallocate(name string) (err error) { - p.cond.L.Lock() - defer p.cond.L.Unlock() - - for _, c := range p.availableConnections { - if err := c.Deallocate(name); err != nil { - return err - } - } - - p.invalidateAcquired() - delete(p.preparedStatements, name) - - return nil -} - -// BeginEx acquires a connection and starts a transaction with txOptions -// determining the transaction mode. When the transaction is closed the -// connection will be automatically released. -func (p *ConnPool) BeginEx(ctx context.Context, txOptions *TxOptions) (*Tx, error) { - for { - c, err := p.Acquire() - if err != nil { - return nil, err - } - - tx, err := c.BeginEx(ctx, txOptions) - if err != nil { - alive := c.IsAlive() - p.Release(c) - - // If connection is still alive then the error is not something trying - // again on a new connection would fix, so just return the error. But - // if the connection is dead try to acquire a new connection and try - // again. - if alive || ctx.Err() != nil { - return nil, err - } - continue - } - - tx.connPool = p - return tx, nil - } -} - -// CopyFrom acquires a connection, delegates the call to that connection, and releases the connection -func (p *ConnPool) CopyFrom(tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int, error) { - c, err := p.Acquire() - if err != nil { - return 0, err - } - defer p.Release(c) - - return c.CopyFrom(tableName, columnNames, rowSrc) -} - -// BeginBatch acquires a connection and begins a batch on that connection. When -// *Batch is finished, the connection is released automatically. -func (p *ConnPool) BeginBatch() *Batch { - c, err := p.Acquire() - return &Batch{conn: c, connPool: p, err: err} -} diff --git a/conn_pool_private_test.go b/conn_pool_private_test.go deleted file mode 100644 index ef0ec1de..00000000 --- a/conn_pool_private_test.go +++ /dev/null @@ -1,44 +0,0 @@ -package pgx - -import ( - "testing" -) - -func compareConnSlices(slice1, slice2 []*Conn) bool { - if len(slice1) != len(slice2) { - return false - } - for i, c := range slice1 { - if c != slice2[i] { - return false - } - } - return true -} - -func TestConnPoolRemoveFromAllConnections(t *testing.T) { - t.Parallel() - pool := ConnPool{} - conn1 := &Conn{} - conn2 := &Conn{} - conn3 := &Conn{} - - // First element - pool.allConnections = []*Conn{conn1, conn2, conn3} - pool.removeFromAllConnections(conn1) - if !compareConnSlices(pool.allConnections, []*Conn{conn2, conn3}) { - t.Fatal("First element test failed") - } - // Element somewhere in the middle - pool.allConnections = []*Conn{conn1, conn2, conn3} - pool.removeFromAllConnections(conn2) - if !compareConnSlices(pool.allConnections, []*Conn{conn1, conn3}) { - t.Fatal("Middle element test failed") - } - // Last element - pool.allConnections = []*Conn{conn1, conn2, conn3} - pool.removeFromAllConnections(conn3) - if !compareConnSlices(pool.allConnections, []*Conn{conn1, conn2}) { - t.Fatal("Last element test failed") - } -} diff --git a/conn_pool_test.go b/conn_pool_test.go deleted file mode 100644 index 2cd43d3f..00000000 --- a/conn_pool_test.go +++ /dev/null @@ -1,926 +0,0 @@ -package pgx_test - -import ( - "context" - "fmt" - "net" - "os" - "sync" - "testing" - "time" - - "github.com/pkg/errors" - - "github.com/jackc/pgconn" - "github.com/jackc/pgx" -) - -func createConnPool(t *testing.T, maxConnections int) *pgx.ConnPool { - config := pgx.ConnPoolConfig{ConnConfig: mustParseConfig(t, os.Getenv("PGX_TEST_DATABASE")), MaxConnections: maxConnections} - pool, err := pgx.NewConnPool(config) - if err != nil { - t.Fatalf("Unable to create connection pool: %v", err) - } - return pool -} - -func acquireAllConnections(t *testing.T, pool *pgx.ConnPool, maxConnections int) []*pgx.Conn { - connections := make([]*pgx.Conn, maxConnections) - for i := 0; i < maxConnections; i++ { - var err error - if connections[i], err = pool.Acquire(); err != nil { - t.Fatalf("Unable to acquire connection: %v", err) - } - } - return connections -} - -func releaseAllConnections(pool *pgx.ConnPool, connections []*pgx.Conn) { - for _, c := range connections { - pool.Release(c) - } -} - -func acquireWithTimeTaken(pool *pgx.ConnPool) (*pgx.Conn, time.Duration, error) { - startTime := time.Now() - c, err := pool.Acquire() - return c, time.Since(startTime), err -} - -func TestNewConnPool(t *testing.T) { - t.Parallel() - - var numCallbacks int - afterConnect := func(c *pgx.Conn) error { - numCallbacks++ - return nil - } - - config := pgx.ConnPoolConfig{ConnConfig: mustParseConfig(t, os.Getenv("PGX_TEST_DATABASE")), MaxConnections: 2, AfterConnect: afterConnect} - pool, err := pgx.NewConnPool(config) - if err != nil { - t.Fatal("Unable to establish connection pool") - } - defer pool.Close() - - // It initially connects once - stat := pool.Stat() - if stat.CurrentConnections != 1 { - t.Errorf("Expected 1 connection to be established immediately, but %v were", numCallbacks) - } - - // Pool creation returns an error if any AfterConnect callback does - errAfterConnect := errors.New("Some error") - afterConnect = func(c *pgx.Conn) error { - return errAfterConnect - } - - config = pgx.ConnPoolConfig{ConnConfig: mustParseConfig(t, os.Getenv("PGX_TEST_DATABASE")), MaxConnections: 2, AfterConnect: afterConnect} - pool, err = pgx.NewConnPool(config) - if err != errAfterConnect { - t.Errorf("Expected errAfterConnect but received unexpected: %v", err) - } -} - -func TestNewConnPoolDefaultsTo5MaxConnections(t *testing.T) { - t.Parallel() - - config := pgx.ConnPoolConfig{ConnConfig: mustParseConfig(t, os.Getenv("PGX_TEST_DATABASE"))} - pool, err := pgx.NewConnPool(config) - if err != nil { - t.Fatal("Unable to establish connection pool") - } - defer pool.Close() - - if n := pool.Stat().MaxConnections; n != 5 { - t.Fatalf("Expected pool to default to 5 max connections, but it was %d", n) - } -} - -func TestPoolAcquireAndReleaseCycle(t *testing.T) { - t.Parallel() - - maxConnections := 2 - incrementCount := int32(100) - completeSync := make(chan int) - pool := createConnPool(t, maxConnections) - defer pool.Close() - - allConnections := acquireAllConnections(t, pool, maxConnections) - - for _, c := range allConnections { - mustExec(t, c, "create temporary table t(counter integer not null)") - mustExec(t, c, "insert into t(counter) values(0);") - } - - releaseAllConnections(pool, allConnections) - - f := func() { - conn, err := pool.Acquire() - if err != nil { - t.Fatal("Unable to acquire connection") - } - defer pool.Release(conn) - - // Increment counter... - mustExec(t, conn, "update t set counter = counter + 1") - completeSync <- 0 - } - - for i := int32(0); i < incrementCount; i++ { - go f() - } - - // Wait for all f() to complete - for i := int32(0); i < incrementCount; i++ { - <-completeSync - } - - // Check that temp table in each connection has been incremented some number of times - actualCount := int32(0) - allConnections = acquireAllConnections(t, pool, maxConnections) - - for _, c := range allConnections { - var n int32 - c.QueryRow("select counter from t").Scan(&n) - if n == 0 { - t.Error("A connection was never used") - } - - actualCount += n - } - - if actualCount != incrementCount { - fmt.Println(actualCount) - t.Error("Wrong number of increments") - } - - releaseAllConnections(pool, allConnections) -} - -func TestPoolNonBlockingConnections(t *testing.T) { - t.Parallel() - - var dialCountLock sync.Mutex - dialCount := 0 - openTimeout := 1 * time.Second - testDialer := func(ctx context.Context, network, address string) (net.Conn, error) { - var firstDial bool - dialCountLock.Lock() - dialCount++ - firstDial = dialCount == 1 - dialCountLock.Unlock() - - if firstDial { - return net.Dial(network, address) - } else { - time.Sleep(openTimeout) - return nil, &net.OpError{Op: "dial", Net: "tcp"} - } - } - - maxConnections := 3 - config := pgx.ConnPoolConfig{ - ConnConfig: mustParseConfig(t, os.Getenv("PGX_TEST_DATABASE")), - MaxConnections: maxConnections, - } - config.ConnConfig.Config.DialFunc = testDialer - - pool, err := pgx.NewConnPool(config) - if err != nil { - t.Fatalf("Expected NewConnPool not to fail, instead it failed with: %v", err) - } - defer pool.Close() - - // NewConnPool establishes an initial connection - // so we need to close that for the rest of the test - if conn, err := pool.Acquire(); err == nil { - conn.Close() - pool.Release(conn) - } else { - t.Fatalf("pool.Acquire unexpectedly failed: %v", err) - } - - var wg sync.WaitGroup - wg.Add(maxConnections) - - startedAt := time.Now() - for i := 0; i < maxConnections; i++ { - go func() { - _, err := pool.Acquire() - wg.Done() - if err == nil { - t.Fatal("Acquire() expected to fail but it did not") - } - }() - } - wg.Wait() - - // Prior to createConnectionUnlocked() use the test took - // maxConnections * openTimeout seconds to complete. - // With createConnectionUnlocked() it takes ~ 1 * openTimeout seconds. - timeTaken := time.Since(startedAt) - if timeTaken > openTimeout+1*time.Second { - t.Fatalf("Expected all Acquire() to run in parallel and take about %v, instead it took '%v'", openTimeout, timeTaken) - } - -} - -func TestPoolErrClosedPool(t *testing.T) { - t.Parallel() - - pool := createConnPool(t, 1) - // Intentionaly close the pool now so we can test ErrClosedPool - pool.Close() - - c, err := pool.Acquire() - if c != nil { - t.Fatalf("Expected acquired connection to be nil, instead it was '%v'", c) - } - - if err == nil || err != pgx.ErrClosedPool { - t.Fatalf("Expected error to be pgx.ErrClosedPool, instead it was '%v'", err) - } -} - -func TestPoolAcquireAndReleaseCycleAutoConnect(t *testing.T) { - t.Parallel() - - maxConnections := 3 - pool := createConnPool(t, maxConnections) - defer pool.Close() - - doSomething := func() { - c, err := pool.Acquire() - if err != nil { - t.Fatalf("Unable to Acquire: %v", err) - } - rows, _ := c.Query("select 1, pg_sleep(0.02)") - rows.Close() - pool.Release(c) - } - - for i := 0; i < 10; i++ { - doSomething() - } - - stat := pool.Stat() - if stat.CurrentConnections != 1 { - t.Fatalf("Pool shouldn't have established more connections when no contention: %v", stat.CurrentConnections) - } - - var wg sync.WaitGroup - for i := 0; i < 10; i++ { - wg.Add(1) - go func() { - defer wg.Done() - doSomething() - }() - } - wg.Wait() - - stat = pool.Stat() - if stat.CurrentConnections != stat.MaxConnections { - t.Fatalf("Pool should have used all possible connections: %v", stat.CurrentConnections) - } -} - -func TestPoolReleaseDiscardsDeadConnections(t *testing.T) { - t.Parallel() - - // Run timing sensitive test many times - for i := 0; i < 50; i++ { - func() { - maxConnections := 3 - pool := createConnPool(t, maxConnections) - defer pool.Close() - - var c1, c2 *pgx.Conn - var err error - var stat pgx.ConnPoolStat - - if c1, err = pool.Acquire(); err != nil { - t.Fatalf("Unexpected error acquiring connection: %v", err) - } - defer func() { - if c1 != nil { - pool.Release(c1) - } - }() - - if c2, err = pool.Acquire(); err != nil { - t.Fatalf("Unexpected error acquiring connection: %v", err) - } - defer func() { - if c2 != nil { - pool.Release(c2) - } - }() - - if _, err = c2.Exec(context.Background(), "select pg_terminate_backend($1)", c1.PID()); err != nil { - t.Fatalf("Unable to kill backend PostgreSQL process: %v", err) - } - - // do something with the connection so it knows it's dead - rows, _ := c1.Query("select 1") - rows.Close() - if rows.Err() == nil { - t.Fatal("Expected error but none occurred") - } - - if c1.IsAlive() { - t.Fatal("Expected connection to be dead but it wasn't") - } - - stat = pool.Stat() - if stat.CurrentConnections != 2 { - t.Fatalf("Unexpected CurrentConnections: %v", stat.CurrentConnections) - } - if stat.AvailableConnections != 0 { - t.Fatalf("Unexpected AvailableConnections: %v", stat.CurrentConnections) - } - - pool.Release(c1) - c1 = nil // so it doesn't get released again by the defer - - stat = pool.Stat() - if stat.CurrentConnections != 1 { - t.Fatalf("Unexpected CurrentConnections: %v", stat.CurrentConnections) - } - if stat.AvailableConnections != 0 { - t.Fatalf("Unexpected AvailableConnections: %v", stat.CurrentConnections) - } - }() - } -} - -func TestConnPoolResetClosesCheckedOutConnectionsOnRelease(t *testing.T) { - t.Parallel() - - pool := createConnPool(t, 5) - defer pool.Close() - - inProgressRows := []*pgx.Rows{} - var inProgressPIDs []int32 - - // Start some queries and reset pool while they are in progress - for i := 0; i < 10; i++ { - rows, err := pool.Query("select pg_backend_pid() union all select 1 union all select 2") - if err != nil { - t.Fatal(err) - } - - rows.Next() - var pid int32 - rows.Scan(&pid) - inProgressPIDs = append(inProgressPIDs, pid) - - inProgressRows = append(inProgressRows, rows) - pool.Reset() - } - - // Check that the queries are completed - for _, rows := range inProgressRows { - var expectedN int32 - - for rows.Next() { - expectedN++ - var n int32 - err := rows.Scan(&n) - if err != nil { - t.Fatal(err) - } - if expectedN != n { - t.Fatalf("Expected n to be %d, but it was %d", expectedN, n) - } - } - - if err := rows.Err(); err != nil { - t.Fatal(err) - } - } - - // pool should be in fresh state due to previous reset - stats := pool.Stat() - if stats.CurrentConnections != 0 || stats.AvailableConnections != 0 { - t.Fatalf("Unexpected connection pool stats: %v", stats) - } - - var connCount int - err := pool.QueryRow("select count(*) from pg_stat_activity where pid = any($1::int4[])", inProgressPIDs).Scan(&connCount) - if err != nil { - t.Fatal(err) - } - if connCount != 0 { - t.Fatalf("%d connections not closed", connCount) - } -} - -func TestConnPoolResetClosesCheckedInConnections(t *testing.T) { - t.Parallel() - - pool := createConnPool(t, 5) - defer pool.Close() - - inProgressRows := []*pgx.Rows{} - var inProgressPIDs []int32 - - // Start some queries and reset pool while they are in progress - for i := 0; i < 5; i++ { - rows, err := pool.Query("select pg_backend_pid()") - if err != nil { - t.Fatal(err) - } - - inProgressRows = append(inProgressRows, rows) - } - - // Check that the queries are completed - for _, rows := range inProgressRows { - for rows.Next() { - var pid int32 - err := rows.Scan(&pid) - if err != nil { - t.Fatal(err) - } - inProgressPIDs = append(inProgressPIDs, pid) - - } - - if err := rows.Err(); err != nil { - t.Fatal(err) - } - } - - // Ensure pool is fully connected and available - stats := pool.Stat() - if stats.CurrentConnections != 5 || stats.AvailableConnections != 5 { - t.Fatalf("Unexpected connection pool stats: %v", stats) - } - - pool.Reset() - - // Pool should be empty after reset - stats = pool.Stat() - if stats.CurrentConnections != 0 || stats.AvailableConnections != 0 { - t.Fatalf("Unexpected connection pool stats: %v", stats) - } - - var connCount int - err := pool.QueryRow("select count(*) from pg_stat_activity where pid = any($1::int4[])", inProgressPIDs).Scan(&connCount) - if err != nil { - t.Fatal(err) - } - if connCount != 0 { - t.Fatalf("%d connections not closed", connCount) - } -} - -func TestConnPoolTransaction(t *testing.T) { - t.Parallel() - - pool := createConnPool(t, 2) - defer pool.Close() - - stats := pool.Stat() - if stats.CurrentConnections != 1 || stats.AvailableConnections != 1 { - t.Fatalf("Unexpected connection pool stats: %v", stats) - } - - tx, err := pool.Begin() - if err != nil { - t.Fatalf("pool.Begin failed: %v", err) - } - defer tx.Rollback() - - var n int32 - err = tx.QueryRow("select 40+$1", 2).Scan(&n) - if err != nil { - t.Fatalf("tx.QueryRow Scan failed: %v", err) - } - if n != 42 { - t.Errorf("Expected 42, got %d", n) - } - - stats = pool.Stat() - if stats.CurrentConnections != 1 || stats.AvailableConnections != 0 { - t.Fatalf("Unexpected connection pool stats: %v", stats) - } - - err = tx.Rollback() - if err != nil { - t.Fatalf("tx.Rollback failed: %v", err) - } - - stats = pool.Stat() - if stats.CurrentConnections != 1 || stats.AvailableConnections != 1 { - t.Fatalf("Unexpected connection pool stats: %v", stats) - } -} - -func TestConnPoolTransactionIso(t *testing.T) { - t.Parallel() - - pool := createConnPool(t, 2) - defer pool.Close() - - tx, err := pool.BeginEx(context.Background(), &pgx.TxOptions{IsoLevel: pgx.Serializable}) - if err != nil { - t.Fatalf("pool.BeginEx failed: %v", err) - } - defer tx.Rollback() - - var level string - err = tx.QueryRow("select current_setting('transaction_isolation')").Scan(&level) - if err != nil { - t.Fatalf("tx.QueryRow failed: %v", level) - } - - if level != "serializable" { - t.Errorf("Expected to be in isolation level %v but was %v", "serializable", level) - } -} - -func TestConnPoolBeginRetry(t *testing.T) { - t.Parallel() - - // Run timing sensitive test many times - for i := 0; i < 50; i++ { - func() { - pool := createConnPool(t, 2) - defer pool.Close() - - killerConn, err := pool.Acquire() - if err != nil { - t.Fatal(err) - } - defer pool.Release(killerConn) - - victimConn, err := pool.Acquire() - if err != nil { - t.Fatal(err) - } - pool.Release(victimConn) - - // Terminate connection that was released to pool - if _, err = killerConn.Exec(context.Background(), "select pg_terminate_backend($1)", victimConn.PID()); err != nil { - t.Fatalf("Unable to kill backend PostgreSQL process: %v", err) - } - - // Since victimConn is the only available connection in the pool, pool.Begin should - // try to use it, fail, and allocate another connection - tx, err := pool.Begin() - if err != nil { - t.Fatalf("pool.Begin failed: %v", err) - } - defer tx.Rollback() - - var txPID uint32 - err = tx.QueryRow("select pg_backend_pid()").Scan(&txPID) - if err != nil { - t.Fatalf("tx.QueryRow Scan failed: %v", err) - } - if txPID == victimConn.PID() { - t.Error("Expected txPID to defer from killed conn pid, but it didn't") - } - }() - } -} - -func TestConnPoolQuery(t *testing.T) { - t.Parallel() - - pool := createConnPool(t, 2) - defer pool.Close() - - var sum, rowCount int32 - - rows, err := pool.Query("select generate_series(1,$1)", 10) - if err != nil { - t.Fatalf("pool.Query failed: %v", err) - } - - stats := pool.Stat() - if stats.CurrentConnections != 1 || stats.AvailableConnections != 0 { - t.Fatalf("Unexpected connection pool stats: %v", stats) - } - - for rows.Next() { - var n int32 - rows.Scan(&n) - sum += n - rowCount++ - } - - if rows.Err() != nil { - t.Fatalf("conn.Query failed: %v", err) - } - - if rowCount != 10 { - t.Error("Select called onDataRow wrong number of times") - } - if sum != 55 { - t.Error("Wrong values returned") - } - - stats = pool.Stat() - if stats.CurrentConnections != 1 || stats.AvailableConnections != 1 { - t.Fatalf("Unexpected connection pool stats: %v", stats) - } -} - -func TestConnPoolQueryConcurrentLoad(t *testing.T) { - t.Parallel() - - pool := createConnPool(t, 10) - defer pool.Close() - - n := 100 - done := make(chan bool) - - for i := 0; i < n; i++ { - go func() { - defer func() { done <- true }() - var rowCount int32 - - rows, err := pool.Query("select generate_series(1,$1)", 1000) - if err != nil { - t.Fatalf("pool.Query failed: %v", err) - } - defer rows.Close() - - for rows.Next() { - var n int32 - err = rows.Scan(&n) - if err != nil { - t.Fatalf("rows.Scan failed: %v", err) - } - if n != rowCount+1 { - t.Fatalf("Expected n to be %d, but it was %d", rowCount+1, n) - } - rowCount++ - } - - if rows.Err() != nil { - t.Fatalf("conn.Query failed: %v", rows.Err()) - } - - if rowCount != 1000 { - t.Error("Select called onDataRow wrong number of times") - } - - _, err = pool.Exec(context.Background(), "--;") - if err != nil { - t.Fatalf("pool.Exec failed: %v", err) - } - }() - } - - for i := 0; i < n; i++ { - <-done - } -} - -func TestConnPoolQueryRow(t *testing.T) { - t.Parallel() - - pool := createConnPool(t, 2) - defer pool.Close() - - var n int32 - err := pool.QueryRow("select 40+$1", 2).Scan(&n) - if err != nil { - t.Fatalf("pool.QueryRow Scan failed: %v", err) - } - - if n != 42 { - t.Errorf("Expected 42, got %d", n) - } - - stats := pool.Stat() - if stats.CurrentConnections != 1 || stats.AvailableConnections != 1 { - t.Fatalf("Unexpected connection pool stats: %v", stats) - } -} - -func TestConnPoolPrepare(t *testing.T) { - t.Parallel() - - pool := createConnPool(t, 2) - defer pool.Close() - - _, err := pool.Prepare("test", "select $1::varchar") - if err != nil { - t.Fatalf("Unable to prepare statement: %v", err) - } - - var s string - err = pool.QueryRow("test", "hello").Scan(&s) - if err != nil { - t.Errorf("Executing prepared statement failed: %v", err) - } - - if s != "hello" { - t.Errorf("Prepared statement did not return expected value: %v", s) - } - - err = pool.Deallocate("test") - if err != nil { - t.Errorf("Deallocate failed: %v", err) - } - - err = pool.QueryRow("test", "hello").Scan(&s) - if err, ok := err.(*pgconn.PgError); !(ok && err.Code == "42601") { - t.Errorf("Expected error calling deallocated prepared statement, but got: %v", err) - } -} - -func TestConnPoolPrepareDeallocatePrepare(t *testing.T) { - t.Parallel() - - pool := createConnPool(t, 2) - defer pool.Close() - - _, err := pool.Prepare("test", "select $1::varchar") - if err != nil { - t.Fatalf("Unable to prepare statement: %v", err) - } - err = pool.Deallocate("test") - if err != nil { - t.Fatalf("Unable to deallocate statement: %v", err) - } - _, err = pool.Prepare("test", "select $1::varchar") - if err != nil { - t.Fatalf("Unable to prepare statement: %v", err) - } - - var s string - err = pool.QueryRow("test", "hello").Scan(&s) - if err != nil { - t.Fatalf("Executing prepared statement failed: %v", err) - } - - if s != "hello" { - t.Errorf("Prepared statement did not return expected value: %v", s) - } -} - -func TestConnPoolPrepareWhenConnIsAlreadyAcquired(t *testing.T) { - t.Parallel() - - t.Skip("TODO") - - // pool := createConnPool(t, 2) - // defer pool.Close() - - // testPreparedStatement := func(db queryRower, desc string) { - // var s string - // err := db.QueryRow("test", "hello").Scan(&s) - // if err != nil { - // t.Fatalf("%s. Executing prepared statement failed: %v", desc, err) - // } - - // if s != "hello" { - // t.Fatalf("%s. Prepared statement did not return expected value: %v", desc, s) - // } - // } - - // newReleaseOnce := func(c *pgx.Conn) func() { - // var once sync.Once - // return func() { - // once.Do(func() { pool.Release(c) }) - // } - // } - - // c1, err := pool.Acquire() - // if err != nil { - // t.Fatalf("Unable to acquire connection: %v", err) - // } - // c1Release := newReleaseOnce(c1) - // defer c1Release() - - // _, err = pool.Prepare("test", "select $1::varchar") - // if err != nil { - // t.Fatalf("Unable to prepare statement: %v", err) - // } - - // testPreparedStatement(pool, "pool") - - // c1Release() - - // c2, err := pool.Acquire() - // if err != nil { - // t.Fatalf("Unable to acquire connection: %v", err) - // } - // c2Release := newReleaseOnce(c2) - // defer c2Release() - - // // This conn will not be available and will be connection at this point - // c3, err := pool.Acquire() - // if err != nil { - // t.Fatalf("Unable to acquire connection: %v", err) - // } - // c3Release := newReleaseOnce(c3) - // defer c3Release() - - // testPreparedStatement(c2, "c2") - // testPreparedStatement(c3, "c3") - - // c2Release() - // c3Release() - - // err = pool.Deallocate("test") - // if err != nil { - // t.Errorf("Deallocate failed: %v", err) - // } - - // var s string - // err = pool.QueryRow("test", "hello").Scan(&s) - // if err, ok := err.(*pgconn.PgError); !(ok && err.Code == "42601") { - // t.Errorf("Expected error calling deallocated prepared statement, but got: %v", err) - // } -} - -func TestConnPoolBeginBatch(t *testing.T) { - t.Parallel() - - t.Skip("TODO") - - // pool := createConnPool(t, 2) - // defer pool.Close() - - // batch := pool.BeginBatch() - // batch.Queue("select n from generate_series(0,5) n", - // nil, - // nil, - // []int16{pgx.BinaryFormatCode}, - // ) - // batch.Queue("select n from generate_series(0,5) n", - // nil, - // nil, - // []int16{pgx.BinaryFormatCode}, - // ) - - // err := batch.Send(context.Background(), nil) - // if err != nil { - // t.Fatal(err) - // } - - // rows, err := batch.QueryResults() - // if err != nil { - // t.Error(err) - // } - - // for i := 0; rows.Next(); i++ { - // var n int - // if err := rows.Scan(&n); err != nil { - // t.Error(err) - // } - // if n != i { - // t.Errorf("n => %v, want %v", n, i) - // } - // } - - // if rows.Err() != nil { - // t.Error(rows.Err()) - // } - - // rows, err = batch.QueryResults() - // if err != nil { - // t.Error(err) - // } - - // for i := 0; rows.Next(); i++ { - // var n int - // if err := rows.Scan(&n); err != nil { - // t.Error(err) - // } - // if n != i { - // t.Errorf("n => %v, want %v", n, i) - // } - // } - - // if rows.Err() != nil { - // t.Error(rows.Err()) - // } - - // err = batch.Close() - // if err != nil { - // t.Fatal(err) - // } -} - -func TestConnPoolBeginEx(t *testing.T) { - t.Parallel() - - pool := createConnPool(t, 2) - defer pool.Close() - - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - tx, err := pool.BeginEx(ctx, nil) - if err == nil || tx != nil { - t.Fatal("Should not be able to create a tx") - } -} diff --git a/pool/pool_test.go b/pool/pool_test.go index 074e717b..a9c36d68 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -209,3 +209,26 @@ func TestConnReleaseDestroysClosedConn(t *testing.T) { assert.Equal(t, 0, pool.Stat().TotalConns()) } + +func TestConnPoolQueryConcurrentLoad(t *testing.T) { + pool, err := pool.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) + require.NoError(t, err) + defer pool.Close() + + n := 100 + done := make(chan bool) + + for i := 0; i < n; i++ { + go func() { + defer func() { done <- true }() + testQuery(t, pool) + testQueryEx(t, pool) + testQueryRow(t, pool) + testQueryRowEx(t, pool) + }() + } + + for i := 0; i < n; i++ { + <-done + } +} diff --git a/query.go b/query.go index 5b301eaf..d2df5f77 100644 --- a/query.go +++ b/query.go @@ -44,7 +44,6 @@ func (r *Row) Scan(dest ...interface{}) (err error) { // calling Next() until it returns false, or when a fatal error occurs. type Rows struct { conn *Conn - connPool *ConnPool batch *Batch values [][]byte fields []FieldDescription @@ -105,10 +104,6 @@ func (rows *Rows) Close() { if rows.batch != nil && rows.err != nil { rows.batch.die(rows.err) } - - if rows.connPool != nil { - rows.connPool.Release(rows.conn) - } } func (rows *Rows) Err() error { diff --git a/tx.go b/tx.go index cc3b2fa7..a697e35d 100644 --- a/tx.go +++ b/tx.go @@ -105,10 +105,9 @@ func (c *Conn) BeginEx(ctx context.Context, txOptions *TxOptions) (*Tx, error) { // All Tx methods return ErrTxClosed if Commit or Rollback has already been // called on the Tx. type Tx struct { - conn *Conn - connPool *ConnPool - err error - status int8 + conn *Conn + err error + status int8 } // Commit commits the transaction @@ -135,10 +134,6 @@ func (tx *Tx) CommitEx(ctx context.Context) error { tx.conn.die(errors.New("commit failed")) } - if tx.connPool != nil { - tx.connPool.Release(tx.conn) - } - return tx.err } @@ -167,10 +162,6 @@ func (tx *Tx) RollbackEx(ctx context.Context) error { tx.conn.die(errors.New("rollback failed")) } - if tx.connPool != nil { - tx.connPool.Release(tx.conn) - } - return tx.err } diff --git a/tx_test.go b/tx_test.go index 4865a084..66016b51 100644 --- a/tx_test.go +++ b/tx_test.go @@ -100,23 +100,26 @@ func TestTxCommitWhenTxBroken(t *testing.T) { func TestTxCommitSerializationFailure(t *testing.T) { t.Parallel() - pool := createConnPool(t, 5) - defer pool.Close() + c1 := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE")) + defer closeConn(t, c1) - pool.Exec(context.Background(), `drop table if exists tx_serializable_sums`) - _, err := pool.Exec(context.Background(), `create table tx_serializable_sums(num integer);`) + c2 := mustConnectString(t, os.Getenv("PGX_TEST_DATABASE")) + defer closeConn(t, c2) + + c1.Exec(context.Background(), `drop table if exists tx_serializable_sums`) + _, err := c1.Exec(context.Background(), `create table tx_serializable_sums(num integer);`) if err != nil { t.Fatalf("Unable to create temporary table: %v", err) } - defer pool.Exec(context.Background(), `drop table tx_serializable_sums`) + defer c1.Exec(context.Background(), `drop table tx_serializable_sums`) - tx1, err := pool.BeginEx(context.Background(), &pgx.TxOptions{IsoLevel: pgx.Serializable}) + tx1, err := c1.BeginEx(context.Background(), &pgx.TxOptions{IsoLevel: pgx.Serializable}) if err != nil { t.Fatalf("BeginEx failed: %v", err) } defer tx1.Rollback() - tx2, err := pool.BeginEx(context.Background(), &pgx.TxOptions{IsoLevel: pgx.Serializable}) + tx2, err := c2.BeginEx(context.Background(), &pgx.TxOptions{IsoLevel: pgx.Serializable}) if err != nil { t.Fatalf("BeginEx failed: %v", err) }