mirror of https://github.com/jackc/pgx.git
*Pipeline.getResults should close pipeline on error
Otherwise, it might be possible to panic when closing the pipeline if it tries to read a connection that should be closed but still has a fatal error on the wire. https://github.com/jackc/pgx/issues/1920pull/1925/head
parent
d149d3fe5c
commit
2e84dccaf5
|
@ -2094,6 +2094,8 @@ func (p *Pipeline) getResults() (results any, err error) {
|
|||
for {
|
||||
msg, err := p.conn.receiveMessage()
|
||||
if err != nil {
|
||||
p.closed = true
|
||||
p.err = err
|
||||
p.conn.asyncClose()
|
||||
return nil, normalizeTimeoutError(p.ctx, err)
|
||||
}
|
||||
|
|
|
@ -3389,3 +3389,86 @@ func TestSNISupport(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
// https://github.com/jackc/pgx/issues/1920
|
||||
func TestFatalErrorReceivedInPipelineMode(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
|
||||
defer cancel()
|
||||
|
||||
steps := pgmock.AcceptUnauthenticatedConnRequestSteps()
|
||||
steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Parse{}))
|
||||
steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Describe{}))
|
||||
steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Parse{}))
|
||||
steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Describe{}))
|
||||
steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Parse{}))
|
||||
steps = append(steps, pgmock.ExpectAnyMessage(&pgproto3.Describe{}))
|
||||
steps = append(steps, pgmock.SendMessage(&pgproto3.RowDescription{Fields: []pgproto3.FieldDescription{
|
||||
{Name: []byte("mock")},
|
||||
}}))
|
||||
steps = append(steps, pgmock.SendMessage(&pgproto3.ErrorResponse{Severity: "FATAL", Code: "57P01"}))
|
||||
// We shouldn't get anything after the first fatal error. But the reported issue was with PgBouncer so maybe that
|
||||
// causes the issue. Anyway, a FATAL error after the connection had already been killed could cause a panic.
|
||||
steps = append(steps, pgmock.SendMessage(&pgproto3.ErrorResponse{Severity: "FATAL", Code: "57P01"}))
|
||||
|
||||
script := &pgmock.Script{Steps: steps}
|
||||
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:")
|
||||
require.NoError(t, err)
|
||||
defer ln.Close()
|
||||
|
||||
serverKeepAlive := make(chan struct{})
|
||||
defer close(serverKeepAlive)
|
||||
|
||||
serverErrChan := make(chan error, 1)
|
||||
go func() {
|
||||
defer close(serverErrChan)
|
||||
|
||||
conn, err := ln.Accept()
|
||||
if err != nil {
|
||||
serverErrChan <- err
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
err = conn.SetDeadline(time.Now().Add(59 * time.Second))
|
||||
if err != nil {
|
||||
serverErrChan <- err
|
||||
return
|
||||
}
|
||||
|
||||
err = script.Run(pgproto3.NewBackend(conn, conn))
|
||||
if err != nil {
|
||||
serverErrChan <- err
|
||||
return
|
||||
}
|
||||
|
||||
<-serverKeepAlive
|
||||
}()
|
||||
|
||||
parts := strings.Split(ln.Addr().String(), ":")
|
||||
host := parts[0]
|
||||
port := parts[1]
|
||||
connStr := fmt.Sprintf("sslmode=disable host=%s port=%s", host, port)
|
||||
|
||||
ctx, cancel = context.WithTimeout(ctx, 59*time.Second)
|
||||
defer cancel()
|
||||
conn, err := pgconn.Connect(ctx, connStr)
|
||||
require.NoError(t, err)
|
||||
|
||||
pipeline := conn.StartPipeline(ctx)
|
||||
pipeline.SendPrepare("s1", "select 1", nil)
|
||||
pipeline.SendPrepare("s2", "select 2", nil)
|
||||
pipeline.SendPrepare("s3", "select 3", nil)
|
||||
err = pipeline.Sync()
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = pipeline.GetResults()
|
||||
require.NoError(t, err)
|
||||
_, err = pipeline.GetResults()
|
||||
require.Error(t, err)
|
||||
|
||||
err = pipeline.Close()
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue