diff --git a/pgconn/config.go b/pgconn/config.go index 172e7478..0eab23af 100644 --- a/pgconn/config.go +++ b/pgconn/config.go @@ -248,21 +248,21 @@ func ParseConfig(connString string) (*Config, error) { config.LookupFunc = makeDefaultResolver().LookupHost notRuntimeParams := map[string]struct{}{ - "host": struct{}{}, - "port": struct{}{}, - "database": struct{}{}, - "user": struct{}{}, - "password": struct{}{}, - "passfile": struct{}{}, - "connect_timeout": struct{}{}, - "sslmode": struct{}{}, - "sslkey": struct{}{}, - "sslcert": struct{}{}, - "sslrootcert": struct{}{}, - "target_session_attrs": struct{}{}, - "min_read_buffer_size": struct{}{}, - "service": struct{}{}, - "servicefile": struct{}{}, + "host": {}, + "port": {}, + "database": {}, + "user": {}, + "password": {}, + "passfile": {}, + "connect_timeout": {}, + "sslmode": {}, + "sslkey": {}, + "sslcert": {}, + "sslrootcert": {}, + "target_session_attrs": {}, + "min_read_buffer_size": {}, + "service": {}, + "servicefile": {}, } for k, v := range settings { @@ -329,10 +329,19 @@ func ParseConfig(connString string) (*Config, error) { } } - if settings["target_session_attrs"] == "read-write" { + switch tsa := settings["target_session_attrs"]; tsa { + case "read-write": config.ValidateConnect = ValidateConnectTargetSessionAttrsReadWrite - } else if settings["target_session_attrs"] != "any" { - return nil, &parseConfigError{connString: connString, msg: fmt.Sprintf("unknown target_session_attrs value: %v", settings["target_session_attrs"])} + case "read-only": + config.ValidateConnect = ValidateConnectTargetSessionAttrsReadOnly + case "primary": + config.ValidateConnect = ValidateConnectTargetSessionAttrsPrimary + case "standby": + config.ValidateConnect = ValidateConnectTargetSessionAttrsStandby + case "any", "prefer-standby": + // do nothing + default: + return nil, &parseConfigError{connString: connString, msg: fmt.Sprintf("unknown target_session_attrs value: %v", tsa)} } return config, nil @@ -727,3 +736,48 @@ func ValidateConnectTargetSessionAttrsReadWrite(ctx context.Context, pgConn *PgC return nil } + +// ValidateConnectTargetSessionAttrsReadOnly is an ValidateConnectFunc that implements libpq compatible +// target_session_attrs=read-only. +func ValidateConnectTargetSessionAttrsReadOnly(ctx context.Context, pgConn *PgConn) error { + result := pgConn.ExecParams(ctx, "show transaction_read_only", nil, nil, nil, nil).Read() + if result.Err != nil { + return result.Err + } + + if string(result.Rows[0][0]) != "on" { + return errors.New("connection is not read only") + } + + return nil +} + +// ValidateConnectTargetSessionAttrsStandby is an ValidateConnectFunc that implements libpq compatible +// target_session_attrs=standby. +func ValidateConnectTargetSessionAttrsStandby(ctx context.Context, pgConn *PgConn) error { + result := pgConn.ExecParams(ctx, "select pg_is_in_recovery()", nil, nil, nil, nil).Read() + if result.Err != nil { + return result.Err + } + + if string(result.Rows[0][0]) != "t" { + return errors.New("server is not in hot standby mode") + } + + return nil +} + +// ValidateConnectTargetSessionAttrsPrimary is an ValidateConnectFunc that implements libpq compatible +// target_session_attrs=primary. +func ValidateConnectTargetSessionAttrsPrimary(ctx context.Context, pgConn *PgConn) error { + result := pgConn.ExecParams(ctx, "select pg_is_in_recovery()", nil, nil, nil, nil).Read() + if result.Err != nil { + return result.Err + } + + if string(result.Rows[0][0]) == "t" { + return errors.New("server is in standby mode") + } + + return nil +} diff --git a/pgconn/config_test.go b/pgconn/config_test.go index 3ffa384f..335a25ca 100644 --- a/pgconn/config_test.go +++ b/pgconn/config_test.go @@ -541,7 +541,7 @@ func TestParseConfig(t *testing.T) { }, }, { - name: "target_session_attrs", + name: "target_session_attrs read-write", connString: "postgres://jack:secret@localhost:5432/mydb?sslmode=disable&target_session_attrs=read-write", config: &pgconn.Config{ User: "jack", @@ -554,6 +554,87 @@ func TestParseConfig(t *testing.T) { ValidateConnect: pgconn.ValidateConnectTargetSessionAttrsReadWrite, }, }, + { + name: "target_session_attrs read-only", + connString: "postgres://jack:secret@localhost:5432/mydb?sslmode=disable&target_session_attrs=read-only", + config: &pgconn.Config{ + User: "jack", + Password: "secret", + Host: "localhost", + Port: 5432, + Database: "mydb", + TLSConfig: nil, + RuntimeParams: map[string]string{}, + ValidateConnect: pgconn.ValidateConnectTargetSessionAttrsReadOnly, + }, + }, + { + name: "target_session_attrs primary", + connString: "postgres://jack:secret@localhost:5432/mydb?sslmode=disable&target_session_attrs=primary", + config: &pgconn.Config{ + User: "jack", + Password: "secret", + Host: "localhost", + Port: 5432, + Database: "mydb", + TLSConfig: nil, + RuntimeParams: map[string]string{}, + ValidateConnect: pgconn.ValidateConnectTargetSessionAttrsPrimary, + }, + }, + { + name: "target_session_attrs standby", + connString: "postgres://jack:secret@localhost:5432/mydb?sslmode=disable&target_session_attrs=standby", + config: &pgconn.Config{ + User: "jack", + Password: "secret", + Host: "localhost", + Port: 5432, + Database: "mydb", + TLSConfig: nil, + RuntimeParams: map[string]string{}, + ValidateConnect: pgconn.ValidateConnectTargetSessionAttrsStandby, + }, + }, + { + name: "target_session_attrs prefer-standby", + connString: "postgres://jack:secret@localhost:5432/mydb?sslmode=disable&target_session_attrs=prefer-standby", + config: &pgconn.Config{ + User: "jack", + Password: "secret", + Host: "localhost", + Port: 5432, + Database: "mydb", + TLSConfig: nil, + RuntimeParams: map[string]string{}, + }, + }, + { + name: "target_session_attrs any", + connString: "postgres://jack:secret@localhost:5432/mydb?sslmode=disable&target_session_attrs=any", + config: &pgconn.Config{ + User: "jack", + Password: "secret", + Host: "localhost", + Port: 5432, + Database: "mydb", + TLSConfig: nil, + RuntimeParams: map[string]string{}, + }, + }, + { + name: "target_session_attrs not set (any)", + connString: "postgres://jack:secret@localhost:5432/mydb?sslmode=disable", + config: &pgconn.Config{ + User: "jack", + Password: "secret", + Host: "localhost", + Port: 5432, + Database: "mydb", + TLSConfig: nil, + RuntimeParams: map[string]string{}, + }, + }, } for i, tt := range tests { diff --git a/pgconn/pgconn.go b/pgconn/pgconn.go index 3d583dca..843bbef4 100644 --- a/pgconn/pgconn.go +++ b/pgconn/pgconn.go @@ -230,7 +230,7 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig var err error network, address := NetworkAddress(fallbackConfig.Host, fallbackConfig.Port) - pgConn.conn, err = config.DialFunc(ctx, network, address) + netConn, err := config.DialFunc(ctx, network, address) if err != nil { var netErr net.Error if errors.As(err, &netErr) && netErr.Timeout() { @@ -239,24 +239,27 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig return nil, &connectError{config: config, msg: "dial error", err: err} } - pgConn.parameterStatuses = make(map[string]string) + pgConn.conn = netConn + pgConn.contextWatcher = newContextWatcher(netConn) + pgConn.contextWatcher.Watch(ctx) if fallbackConfig.TLSConfig != nil { - if err := pgConn.startTLS(fallbackConfig.TLSConfig); err != nil { - pgConn.conn.Close() + tlsConn, err := startTLS(netConn, fallbackConfig.TLSConfig) + pgConn.contextWatcher.Unwatch() // Always unwatch `netConn` after TLS. + if err != nil { + netConn.Close() return nil, &connectError{config: config, msg: "tls error", err: err} } + + pgConn.conn = tlsConn + pgConn.contextWatcher = newContextWatcher(tlsConn) + pgConn.contextWatcher.Watch(ctx) } - pgConn.status = connStatusConnecting - pgConn.contextWatcher = ctxwatch.NewContextWatcher( - func() { pgConn.conn.SetDeadline(time.Date(1, 1, 1, 1, 1, 1, 1, time.UTC)) }, - func() { pgConn.conn.SetDeadline(time.Time{}) }, - ) - - pgConn.contextWatcher.Watch(ctx) defer pgConn.contextWatcher.Unwatch() + pgConn.parameterStatuses = make(map[string]string) + pgConn.status = connStatusConnecting pgConn.frontend = config.BuildFrontend(pgConn.conn, pgConn.conn) startupMsg := pgproto3.StartupMessage{ @@ -332,7 +335,7 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig } } return pgConn, nil - case *pgproto3.ParameterStatus: + case *pgproto3.ParameterStatus, *pgproto3.NoticeResponse: // handled by ReceiveMessage case *pgproto3.ErrorResponse: pgConn.conn.Close() @@ -344,24 +347,29 @@ func connect(ctx context.Context, config *Config, fallbackConfig *FallbackConfig } } -func (pgConn *PgConn) startTLS(tlsConfig *tls.Config) (err error) { - err = binary.Write(pgConn.conn, binary.BigEndian, []int32{8, 80877103}) +func newContextWatcher(conn net.Conn) *ctxwatch.ContextWatcher { + return ctxwatch.NewContextWatcher( + func() { conn.SetDeadline(time.Date(1, 1, 1, 1, 1, 1, 1, time.UTC)) }, + func() { conn.SetDeadline(time.Time{}) }, + ) +} + +func startTLS(conn net.Conn, tlsConfig *tls.Config) (net.Conn, error) { + err := binary.Write(conn, binary.BigEndian, []int32{8, 80877103}) if err != nil { - return + return nil, err } response := make([]byte, 1) - if _, err = io.ReadFull(pgConn.conn, response); err != nil { - return + if _, err = io.ReadFull(conn, response); err != nil { + return nil, err } if response[0] != 'S' { - return errors.New("server refused TLS connection") + return nil, errors.New("server refused TLS connection") } - pgConn.conn = tls.Client(pgConn.conn, tlsConfig) - - return nil + return tls.Client(conn, tlsConfig), nil } func (pgConn *PgConn) txPasswordMessage(password string) (err error) { @@ -1709,10 +1717,7 @@ func Construct(hc *HijackedConn) (*PgConn, error) { cleanupDone: make(chan struct{}), } - pgConn.contextWatcher = ctxwatch.NewContextWatcher( - func() { pgConn.conn.SetDeadline(time.Date(1, 1, 1, 1, 1, 1, 1, time.UTC)) }, - func() { pgConn.conn.SetDeadline(time.Time{}) }, - ) + pgConn.contextWatcher = newContextWatcher(pgConn.conn) return pgConn, nil } diff --git a/pgconn/pgconn_test.go b/pgconn/pgconn_test.go index 4350452c..d1ba29d2 100644 --- a/pgconn/pgconn_test.go +++ b/pgconn/pgconn_test.go @@ -161,6 +161,84 @@ func TestConnectTimeout(t *testing.T) { } } +func TestConnectTimeoutStuckOnTLSHandshake(t *testing.T) { + t.Parallel() + tests := []struct { + name string + connect func(connStr string) error + }{ + { + name: "via context that times out", + connect: func(connStr string) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) + defer cancel() + _, err := pgconn.Connect(ctx, connStr) + return err + }, + }, + { + name: "via config ConnectTimeout", + connect: func(connStr string) error { + conf, err := pgconn.ParseConfig(connStr) + require.NoError(t, err) + conf.ConnectTimeout = time.Millisecond * 10 + _, err = pgconn.ConnectConfig(context.Background(), conf) + return err + }, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + ln, err := net.Listen("tcp", "127.0.0.1:") + require.NoError(t, err) + defer ln.Close() + + serverErrChan := make(chan error) + defer close(serverErrChan) + go func() { + conn, err := ln.Accept() + if err != nil { + serverErrChan <- err + return + } + defer conn.Close() + + var buf []byte + _, err = conn.Read(buf) + if err != nil { + serverErrChan <- err + return + } + + // Sleeping to hang the TLS handshake. + time.Sleep(time.Minute) + }() + + parts := strings.Split(ln.Addr().String(), ":") + host := parts[0] + port := parts[1] + connStr := fmt.Sprintf("host=%s port=%s", host, port) + + errChan := make(chan error) + go func() { + err := tt.connect(connStr) + errChan <- err + }() + + select { + case err = <-errChan: + require.True(t, pgconn.Timeout(err), err) + case err = <-serverErrChan: + t.Fatalf("server failed with error: %s", err) + case <-time.After(time.Millisecond * 100): + t.Fatal("exceeded connection timeout without erroring out") + } + }) + } +} + func TestConnectInvalidUser(t *testing.T) { t.Parallel() @@ -1220,6 +1298,7 @@ func TestConnOnNotice(t *testing.T) { config.OnNotice = func(c *pgconn.PgConn, notice *pgconn.Notice) { msg = notice.Message } + config.RuntimeParams["client_min_messages"] = "notice" // Ensure we only get the message we expect. pgConn, err := pgconn.ConnectConfig(context.Background(), config) require.NoError(t, err) @@ -1876,7 +1955,11 @@ func TestConnSendBytesAndReceiveMessage(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - pgConn, err := pgconn.Connect(ctx, os.Getenv("PGX_TEST_CONN_STRING")) + config, err := pgconn.ParseConfig(os.Getenv("PGX_TEST_CONN_STRING")) + require.NoError(t, err) + config.RuntimeParams["client_min_messages"] = "notice" // Ensure we only get the messages we expect. + + pgConn, err := pgconn.ConnectConfig(context.Background(), config) require.NoError(t, err) defer closeConn(t, pgConn)