From 8bc6aa6b4959d1fa9737e3e2867baf7b400ae6b2 Mon Sep 17 00:00:00 2001 From: Daniel Date: Fri, 29 Oct 2021 14:07:46 +0200 Subject: [PATCH] Fix goroutine leak and unclosed connections --- pgxpool/pool.go | 12 ++++--- pgxpool/pool_test.go | 75 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+), 4 deletions(-) diff --git a/pgxpool/pool.go b/pgxpool/pool.go index 28b1e1a2..86ba40c9 100644 --- a/pgxpool/pool.go +++ b/pgxpool/pool.go @@ -226,6 +226,8 @@ func ConnectConfig(ctx context.Context, config *Config) (*Pool, error) { if !config.LazyConnect { if err := p.createIdleResources(ctx, int(p.minConns)); err != nil { + // Couldn't create resources for minpool size. Close unhealthy pool. + p.Close() return nil, err } @@ -385,7 +387,7 @@ func (p *Pool) createIdleResources(parentCtx context.Context, targetResources in ctx, cancel := context.WithCancel(parentCtx) defer cancel() - errs := make(chan error) + errs := make(chan error, targetResources) for i := 0; i < targetResources; i++ { go func() { @@ -394,14 +396,16 @@ func (p *Pool) createIdleResources(parentCtx context.Context, targetResources in }() } + var firstError error for i := 0; i < targetResources; i++ { - if err := <-errs; err != nil { + err := <-errs + if err != nil && firstError == nil { cancel() - return err + firstError = err } } - return nil + return firstError } // Acquire returns a connection (*Conn) from the Pool diff --git a/pgxpool/pool_test.go b/pgxpool/pool_test.go index cdf59c24..ac38c1a4 100644 --- a/pgxpool/pool_test.go +++ b/pgxpool/pool_test.go @@ -902,3 +902,78 @@ func TestConnectMinPoolZero(t *testing.T) { require.Equal(t, int64(0), acquireAttempts) require.Equal(t, int64(1), connectAttempts) } + +func TestCreateMinPoolClosesConnectionsOnError(t *testing.T) { + t.Parallel() + + config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE")) + require.NoError(t, err) + + config.MinConns = int32(12) + config.MaxConns = int32(15) + config.LazyConnect = false + + acquireAttempts := int64(0) + madeConnections := int64(0) + conns := make(chan *pgx.Conn, 15) + + config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool { + atomic.AddInt64(&acquireAttempts, 1) + return true + } + config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error { + conns <- conn + + atomic.AddInt64(&madeConnections, 1) + mc := atomic.LoadInt64(&madeConnections) + if mc == 10 { + return errors.New("mock error") + } + return nil + } + pool, err := pgxpool.ConnectConfig(context.Background(), config) + require.Error(t, err) + require.Nil(t, pool) + + close(conns) + for conn := range conns { + require.True(t, conn.IsClosed()) + } + + require.Equal(t, int64(0), acquireAttempts) + require.True(t, madeConnections >= 10, "Expected %d got %d", 10, madeConnections) +} + +func TestCreateMinPoolReturnsFirstError(t *testing.T) { + t.Parallel() + + config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE")) + require.NoError(t, err) + + config.MinConns = int32(12) + config.MaxConns = int32(15) + config.LazyConnect = false + + acquireAttempts := int64(0) + connectAttempts := int64(0) + + config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool { + atomic.AddInt64(&acquireAttempts, 1) + return true + } + config.BeforeConnect = func(ctx context.Context, cfg *pgx.ConnConfig) error { + atomic.AddInt64(&connectAttempts, 1) + ca := atomic.LoadInt64(&connectAttempts) + if ca >= 5 { + return fmt.Errorf("error %d", ca) + } + return nil + } + + pool, err := pgxpool.ConnectConfig(context.Background(), config) + require.Nil(t, pool) + require.Error(t, err) + + require.True(t, connectAttempts >= 5, "Expected %d got %d", 5, connectAttempts) + require.Equal(t, "error 5", err.Error()) +}