From 7dd26a34a182d4aacaed3bf8c09f9cc48a7b6156 Mon Sep 17 00:00:00 2001 From: Jack Christensen Date: Sat, 4 Jun 2022 20:28:23 -0500 Subject: [PATCH] Fix block when reading more than buffered --- internal/nbconn/nbconn.go | 4 ++- internal/nbconn/nbconn_test.go | 46 ++++++++++++++++++++++++++++++---- 2 files changed, 44 insertions(+), 6 deletions(-) diff --git a/internal/nbconn/nbconn.go b/internal/nbconn/nbconn.go index 0ceb3c79..a4fead7e 100644 --- a/internal/nbconn/nbconn.go +++ b/internal/nbconn/nbconn.go @@ -107,7 +107,9 @@ func (c *NetConn) Read(b []byte) (n int, err error) { n += copiedN } - if n == len(b) { + // If any bytes were already buffered return them without trying to do a Read. Otherwise, when the caller is trying to + // Read up to len(b) bytes but all available bytes have already been buffered the underlying Read would block. + if n > 0 { return n, nil } diff --git a/internal/nbconn/nbconn_test.go b/internal/nbconn/nbconn_test.go index 8622fcff..f26ccea3 100644 --- a/internal/nbconn/nbconn_test.go +++ b/internal/nbconn/nbconn_test.go @@ -123,7 +123,7 @@ func testVariants(t *testing.T, f func(t *testing.T, local nbconn.Conn, remote n } // makePipeConns returns a connected pair of net.Conns created with net.Pipe(). It is entirely synchronous so it is -// useful for testing an exact sequence of reads and writes. +// useful for testing an exact sequence of reads and writes with the underlying connection blocking. func makePipeConns(t *testing.T) (local, remote net.Conn) { local, remote = net.Pipe() t.Cleanup(func() { @@ -294,7 +294,7 @@ func TestReadPreviouslyBuffered(t *testing.T) { _, err := conn.Write([]byte("test")) require.NoError(t, err) - // Because net.Pipe() is synchronous conn.Flust must buffer a read. + // Because net.Pipe() is synchronous conn.Flush must buffer a read. err = conn.Flush() require.NoError(t, err) @@ -306,6 +306,42 @@ func TestReadPreviouslyBuffered(t *testing.T) { }) } +func TestReadMoreThanPreviouslyBufferedDoesNotBlock(t *testing.T) { + testVariants(t, func(t *testing.T, conn nbconn.Conn, remote net.Conn) { + errChan := make(chan error, 1) + go func() { + err := func() error { + _, err := remote.Write([]byte("alpha")) + if err != nil { + return err + } + + readBuf := make([]byte, 4) + _, err = remote.Read(readBuf) + if err != nil { + return err + } + + return nil + }() + errChan <- err + }() + + _, err := conn.Write([]byte("test")) + require.NoError(t, err) + + // Because net.Pipe() is synchronous conn.Flush must buffer a read. + err = conn.Flush() + require.NoError(t, err) + + readBuf := make([]byte, 10) + n, err := conn.Read(readBuf) + require.NoError(t, err) + require.EqualValues(t, 5, n) + require.Equal(t, []byte("alpha"), readBuf[:n]) + }) +} + func TestReadPreviouslyBufferedPartialRead(t *testing.T) { testVariants(t, func(t *testing.T, conn nbconn.Conn, remote net.Conn) { @@ -331,7 +367,7 @@ func TestReadPreviouslyBufferedPartialRead(t *testing.T) { _, err := conn.Write([]byte("test")) require.NoError(t, err) - // Because net.Pipe() is synchronous conn.Flust must buffer a read. + // Because net.Pipe() is synchronous conn.Flush must buffer a read. err = conn.Flush() require.NoError(t, err) @@ -378,7 +414,7 @@ func TestReadMultiplePreviouslyBuffered(t *testing.T) { _, err := conn.Write([]byte("test")) require.NoError(t, err) - // Because net.Pipe() is synchronous conn.Flust must buffer a read. + // Because net.Pipe() is synchronous conn.Flush must buffer a read. err = conn.Flush() require.NoError(t, err) @@ -423,7 +459,7 @@ func TestReadPreviouslyBufferedAndReadMore(t *testing.T) { _, err := conn.Write([]byte("test")) require.NoError(t, err) - // Because net.Pipe() is synchronous conn.Flust must buffer a read. + // Because net.Pipe() is synchronous conn.Flush must buffer a read. err = conn.Flush() require.NoError(t, err)